r"""Timed lyric overlay for mpv via JSON IPC. This is intentionally implemented from scratch (no vendored/copied code) while providing the same *kind* of functionality as popular mpv lyric scripts: - Parse LRC (timestamped lyrics) - Track mpv playback time via IPC - Show the current line on mpv's OSD Primary intended usage in this repo: - Auto mode (no stdin / no --lrc): loads lyrics from store notes. A lyric note is stored under the note name 'lyric'. - If the lyric note is missing, auto mode will attempt to auto-fetch synced lyrics from a public API (LRCLIB) and store it into the 'lyric' note. You can disable this by setting config key `lyric_autofetch` to false. - You can still pipe LRC into this script (stdin) and it will render lyrics in mpv. Example (PowerShell): Get-Content .\song.lrc | python -m MPV.lyric If you want to connect to a non-default mpv IPC server: Get-Content .\song.lrc | python -m MPV.lyric --ipc "\\.\pipe\mpv-custom" """ from __future__ import annotations import argparse import bisect import hashlib import os import re import sys import tempfile import time from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, TextIO from urllib.parse import parse_qs, unquote, urlencode from urllib.request import Request, urlopen from urllib.parse import urlparse from MPV.mpv_ipc import MPV, MPVIPCClient _TIMESTAMP_RE = re.compile(r"\[(?P\d+):(?P\d{2})(?:\.(?P\d{1,3}))?\]") _OFFSET_RE = re.compile(r"^\[offset:(?P[+-]?\d+)\]$", re.IGNORECASE) _HASH_RE = re.compile(r"[0-9a-f]{64}", re.IGNORECASE) _HYDRUS_HASH_QS_RE = re.compile(r"hash=([0-9a-f]{64})", re.IGNORECASE) _WIN_DRIVE_RE = re.compile(r"^[a-zA-Z]:[\\/]") _WIN_UNC_RE = re.compile(r"^\\\\") _LOG_FH: Optional[TextIO] = None _SINGLE_INSTANCE_LOCK_FH: Optional[TextIO] = None _LYRIC_VISIBLE_PROP = "user-data/medeia-lyric-visible" # Optional overrides set by the playlist controller (.pipe/.mpv) so the lyric # helper can resolve notes even when the local file path cannot be mapped back # to a store via the store DB. _ITEM_STORE_PROP = "user-data/medeia-item-store" _ITEM_HASH_PROP = "user-data/medeia-item-hash" # Note: We previously used `osd-overlay`, but some mpv builds return # error='invalid parameter' for that command. We now use `show-text`, which is # widely supported across mpv versions. _OSD_STYLE_SAVED: Optional[Dict[str, Any]] = None _OSD_STYLE_APPLIED: bool = False def _single_instance_lock_path(ipc_path: str) -> Path: # Key the lock to the mpv IPC target so multiple mpv instances with different # IPC servers can still run independent lyric helpers. key = hashlib.sha1((ipc_path or "").encode("utf-8", errors="ignore")).hexdigest() tmp_dir = Path(tempfile.gettempdir()) return (tmp_dir / f"medeia-mpv-lyric-{key}.lock").resolve() def _acquire_single_instance_lock(ipc_path: str) -> bool: """Ensure only one MPV.lyric process runs per IPC server. This prevents duplicate overlays (e.g. multiple lyric helpers racing to update OSD). """ global _SINGLE_INSTANCE_LOCK_FH if _SINGLE_INSTANCE_LOCK_FH is not None: return True lock_path = _single_instance_lock_path(ipc_path) lock_path.parent.mkdir(parents=True, exist_ok=True) try: fh = open(lock_path, "a", encoding="utf-8", errors="replace") except Exception: # If we can't create the lock file, don't block playback; just proceed. return True try: if os.name == "nt": import msvcrt # Lock the first byte (non-blocking). msvcrt.locking(fh.fileno(), msvcrt.LK_NBLCK, 1) else: import fcntl fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) _SINGLE_INSTANCE_LOCK_FH = fh try: fh.write(f"pid={os.getpid()} ipc={ipc_path}\n") fh.flush() except Exception: pass return True except Exception: try: fh.close() except Exception: pass return False def _ass_escape(text: str) -> str: # Escape braces/backslashes so lyric text can't break ASS formatting. t = str(text or "") t = t.replace("\\", "\\\\") t = t.replace("{", "\\{") t = t.replace("}", "\\}") t = t.replace("\r\n", "\n").replace("\r", "\n") t = t.replace("\n", "\\N") return t def _osd_set_text(client: MPVIPCClient, text: str, *, duration_ms: int = 1000) -> Optional[dict]: # Signature: show-text [] [] # Duration 0 clears immediately; we generally set it to cover until next update. try: d = int(duration_ms) except Exception: d = 1000 if d < 0: d = 0 return client.send_command({ "command": [ "show-text", str(text or ""), d, ] }) def _osd_clear(client: MPVIPCClient) -> None: try: _osd_set_text(client, "", duration_ms=0) except Exception: return def _log(msg: str) -> None: line = f"[{datetime.now().isoformat(timespec='seconds')}] {msg}" try: if _LOG_FH is not None: _LOG_FH.write(line + "\n") _LOG_FH.flush() return except Exception: pass print(line, file=sys.stderr, flush=True) def _ipc_get_property( client: MPVIPCClient, name: str, default: object = None, *, raise_on_disconnect: bool = False, ) -> object: try: resp = client.send_command({ "command": ["get_property", name] }) except Exception as exc: if raise_on_disconnect: raise ConnectionError(f"Lost mpv IPC connection: {exc}") from exc return default if resp is None: if raise_on_disconnect: raise ConnectionError("Lost mpv IPC connection") return default if resp and resp.get("error") == "success": return resp.get("data", default) return default def _ipc_set_property(client: MPVIPCClient, name: str, value: Any) -> bool: resp = client.send_command({ "command": ["set_property", name, value] }) return bool(resp and resp.get("error") == "success") def _osd_capture_style(client: MPVIPCClient) -> Dict[str, Any]: keys = [ "osd-align-x", "osd-align-y", "osd-font-size", "osd-margin-y", ] out: Dict[str, Any] = {} for k in keys: try: out[k] = _ipc_get_property(client, k, None) except Exception: out[k] = None return out def _osd_apply_lyric_style(client: MPVIPCClient, *, config: Dict[str, Any]) -> None: """Apply bottom-center + larger font for lyric show-text messages. This modifies mpv's global OSD settings, so we save and restore them. """ global _OSD_STYLE_SAVED, _OSD_STYLE_APPLIED if not _OSD_STYLE_APPLIED: if _OSD_STYLE_SAVED is None: _OSD_STYLE_SAVED = _osd_capture_style(client) try: _ipc_set_property(client, "osd-align-x", "center") _ipc_set_property(client, "osd-align-y", "bottom") scale = config.get("lyric_osd_font_scale", 1.15) try: scale_f = float(scale) except Exception: scale_f = 1.15 if scale_f < 1.0: scale_f = 1.0 old_size = None try: if _OSD_STYLE_SAVED is not None: old_size = _OSD_STYLE_SAVED.get("osd-font-size") except Exception: old_size = None if isinstance(old_size, (int, float)): new_size = int(max(10, round(float(old_size) * scale_f))) else: # mpv default is typically ~55; choose a conservative readable size. new_size = int(config.get("lyric_osd_font_size", 64)) _ipc_set_property(client, "osd-font-size", new_size) min_margin_y = int(config.get("lyric_osd_min_margin_y", 60)) old_margin_y = None try: if _OSD_STYLE_SAVED is not None: old_margin_y = _OSD_STYLE_SAVED.get("osd-margin-y") except Exception: old_margin_y = None if isinstance(old_margin_y, (int, float)): _ipc_set_property(client, "osd-margin-y", int(max(old_margin_y, min_margin_y))) else: _ipc_set_property(client, "osd-margin-y", min_margin_y) except Exception: return _OSD_STYLE_APPLIED = True def _osd_restore_style(client: MPVIPCClient) -> None: global _OSD_STYLE_SAVED, _OSD_STYLE_APPLIED if not _OSD_STYLE_APPLIED: return try: saved = _OSD_STYLE_SAVED or {} for k, v in saved.items(): if v is None: continue try: _ipc_set_property(client, k, v) except Exception: pass finally: _OSD_STYLE_APPLIED = False def _osd_clear_and_restore(client: MPVIPCClient) -> None: """Clear OSD text and restore any saved OSD style in a single call.""" _osd_clear(client) _osd_restore_style(client) def _http_get_json_raw(url: str, *, timeout_s: float = 10.0) -> Optional[Any]: """HTTP GET and JSON-decode; returns the parsed value (dict, list, etc.) or None on any failure.""" try: req = Request( url, headers={ "User-Agent": "medeia-macina/lyric", "Accept": "application/json", }, method="GET", ) with urlopen(req, timeout=timeout_s) as resp: data = resp.read() import json return json.loads(data.decode("utf-8", errors="replace")) except Exception as exc: _log(f"HTTP JSON failed: {exc} ({url})") return None def _http_get_json(url: str, *, timeout_s: float = 10.0) -> Optional[dict]: """HTTP GET returning a JSON object (dict), or None.""" obj = _http_get_json_raw(url, timeout_s=timeout_s) return obj if isinstance(obj, dict) else None def _http_get_json_list(url: str, *, timeout_s: float = 10.0) -> Optional[list]: """HTTP GET returning a JSON array (list), or None.""" obj = _http_get_json_raw(url, timeout_s=timeout_s) return obj if isinstance(obj, list) else None def _sanitize_query(s: Optional[str]) -> Optional[str]: if not isinstance(s, str): return None t = s.strip().strip("\ufeff") return t if t else None def _infer_artist_title_from_tags( tags: List[str] ) -> tuple[Optional[str], Optional[str]]: artist = None title = None for t in tags or []: ts = str(t) low = ts.lower() if low.startswith("artist:") and artist is None: artist = ts.split(":", 1)[1].strip() or None elif low.startswith("title:") and title is None: title = ts.split(":", 1)[1].strip() or None if artist and title: break return _sanitize_query(artist), _sanitize_query(title) def _wrap_plain_lyrics_as_lrc(text: str) -> str: # Fallback: create a crude LRC that advances every 4 seconds. # This is intentionally simple and deterministic. lines = [ln.strip() for ln in (text or "").splitlines()] lines = [ln for ln in lines if ln] if not lines: return "" out: List[str] = [] t_s = 0 for ln in lines: mm = t_s // 60 ss = t_s % 60 out.append(f"[{mm:02d}:{ss:02d}.00]{ln}") t_s += 4 return "\n".join(out) + "\n" def _fetch_lrclib( *, artist: Optional[str], title: Optional[str], duration_s: Optional[float] = None ) -> Optional[str]: base = "https://lrclib.net/api" # Require both artist and title; title-only lookups cause frequent mismatches. if not artist or not title: return None # Try direct get. q: Dict[str, str] = { "artist_name": artist, "track_name": title, } if isinstance(duration_s, (int, float)) and duration_s and duration_s > 0: q["duration"] = str(int(duration_s)) url = f"{base}/get?{urlencode(q)}" obj = _http_get_json(url) if isinstance(obj, dict): synced = obj.get("syncedLyrics") if isinstance(synced, str) and synced.strip(): _log("LRCLIB: got syncedLyrics") return synced plain = obj.get("plainLyrics") if isinstance(plain, str) and plain.strip(): _log("LRCLIB: only plainLyrics; wrapping") wrapped = _wrap_plain_lyrics_as_lrc(plain) return wrapped if wrapped.strip() else None # Fallback: search using artist+title only. q_text = f"{artist} {title}" url = f"{base}/search?{urlencode({'q': q_text})}" items = _http_get_json_list(url) or [] for item in items: if not isinstance(item, dict): continue synced = item.get("syncedLyrics") if isinstance(synced, str) and synced.strip(): _log("LRCLIB: search hit with syncedLyrics") return synced # Plain lyrics fallback from search if available for item in items: if not isinstance(item, dict): continue plain = item.get("plainLyrics") if isinstance(plain, str) and plain.strip(): _log("LRCLIB: search hit only plainLyrics; wrapping") wrapped = _wrap_plain_lyrics_as_lrc(plain) return wrapped if wrapped.strip() else None return None def _fetch_lyrics_ovh(*, artist: Optional[str], title: Optional[str]) -> Optional[str]: # Public, no-auth lyrics provider (typically plain lyrics, not time-synced). if not artist or not title: return None try: # Endpoint uses path segments, so we urlencode each part. from urllib.parse import quote url = f"https://api.lyrics.ovh/v1/{quote(artist)}/{quote(title)}" obj = _http_get_json(url) if not isinstance(obj, dict): return None lyr = obj.get("lyrics") if isinstance(lyr, str) and lyr.strip(): _log("lyrics.ovh: got plain lyrics; wrapping") wrapped = _wrap_plain_lyrics_as_lrc(lyr) return wrapped if wrapped.strip() else None except Exception as exc: _log(f"lyrics.ovh failed: {exc}") return None @dataclass(frozen=True) class LrcLine: time_s: float text: str def _frac_to_ms(frac: str) -> int: # LRC commonly uses centiseconds (2 digits), but can be 1–3 digits. if not frac: return 0 if len(frac) == 3: return int(frac) if len(frac) == 2: return int(frac) * 10 return int(frac) * 100 def parse_lrc(text: str) -> List[LrcLine]: """Parse LRC into sorted timestamped lines.""" offset_ms = 0 lines: List[LrcLine] = [] for raw_line in text.splitlines(): line = raw_line.strip("\ufeff\r\n") if not line: continue # Optional global offset. off_m = _OFFSET_RE.match(line) if off_m: try: offset_ms = int(off_m.group("ms")) except Exception: offset_ms = 0 continue matches = list(_TIMESTAMP_RE.finditer(line)) if not matches: # Ignore non-timestamp metadata lines like [ar:], [ti:], etc. continue lyric_text = line[matches[-1].end():].strip() for m in matches: mm = int(m.group("m")) ss = int(m.group("s")) frac = m.group("frac") or "" ts_ms = (mm * 60 + ss) * 1000 + _frac_to_ms(frac) + offset_ms if ts_ms < 0: continue lines.append(LrcLine(time_s=ts_ms / 1000.0, text=lyric_text)) # Sort and de-dupe by timestamp (prefer last non-empty text). lines.sort(key=lambda x: x.time_s) deduped: List[LrcLine] = [] for item in lines: if deduped and abs(deduped[-1].time_s - item.time_s) < 1e-6: if item.text: deduped[-1] = item else: deduped.append(item) return deduped def _read_all_stdin() -> str: return sys.stdin.read() def _current_index(time_s: float, times: List[float]) -> int: # Index of last timestamp <= time_s return bisect.bisect_right(times, time_s) - 1 def _lyric_duration_ms(idx: int, times: List[float], current_t: float) -> int: """Duration in ms to display the lyric at *idx* — until the next timestamp or a safe maximum.""" try: if idx + 1 < len(times): return int(max(250, min(8000, (times[idx + 1] - current_t) * 1000))) except Exception: pass return 1200 def _unwrap_memory_m3u(text: Optional[str]) -> Optional[str]: """Extract the real target URL/path from a memory:// M3U payload.""" if not isinstance(text, str) or not text.startswith("memory://"): return text for line in text.splitlines(): s = line.strip() if not s or s.startswith("#") or s.startswith("memory://"): continue return s return text def _extract_hash_from_target(target: str) -> Optional[str]: if not isinstance(target, str): return None m = _HYDRUS_HASH_QS_RE.search(target) if m: return m.group(1).lower() # Fallback: plain hash string s = target.strip().lower() if _HASH_RE.fullmatch(s): return s return None def _load_config_best_effort() -> dict: try: from SYS.config import load_config cfg = load_config() return cfg if isinstance(cfg, dict) else {} except Exception: return {} def _extract_note_text(notes: Dict[str, str], name: str) -> Optional[str]: """Return stripped text from the note named *name*, or None if absent or blank.""" if not isinstance(notes, dict) or not notes: return None raw = None for k, v in notes.items(): if isinstance(k, str) and k.strip() == name: raw = v break if not isinstance(raw, str): return None text = raw.strip("\ufeff\r\n") return text if text.strip() else None def _extract_lrc_from_notes(notes: Dict[str, str]) -> Optional[str]: """Return raw LRC text from the note named 'lyric'.""" return _extract_note_text(notes, "lyric") def _extract_sub_from_notes(notes: Dict[str, str]) -> Optional[str]: """Return raw subtitle text from the note named 'sub'.""" return _extract_note_text(notes, "sub") def _infer_sub_extension(text: str) -> str: # Best-effort: mpv generally understands SRT/VTT; choose based on content. t = (text or "").lstrip("\ufeff\r\n").lstrip() if t.upper().startswith("WEBVTT"): return ".vtt" if "-->" in t: # SRT typically uses commas for milliseconds, VTT uses dots. if re.search(r"\d\d:\d\d:\d\d,\d\d\d\s*-->\s*\d\d:\d\d:\d\d,\d\d\d", t): return ".srt" return ".vtt" return ".vtt" def _write_temp_sub_file(*, key: str, text: str) -> Path: # Write to a content-addressed temp path so updates force mpv reload. tmp_dir = Path(tempfile.gettempdir()) / "medeia-mpv-notes" tmp_dir.mkdir(parents=True, exist_ok=True) ext = _infer_sub_extension(text) digest = hashlib.sha1((key + "\n" + (text or "")).encode("utf-8", errors="ignore") ).hexdigest()[:16] safe_key = hashlib.sha1((key or "").encode("utf-8", errors="ignore")).hexdigest()[:12] path = (tmp_dir / f"sub-{safe_key}-{digest}{ext}").resolve() path.write_text(text or "", encoding="utf-8", errors="replace") return path def _try_remove_selected_external_sub(client: MPVIPCClient) -> None: try: client.send_command({ "command": ["sub-remove"] }) except Exception: return def _try_add_external_sub(client: MPVIPCClient, path: Path) -> None: try: client.send_command( { "command": ["sub-add", str(path), "select", "medeia-sub"] } ) except Exception: return def _is_stream_target(target: str) -> bool: """Return True when mpv's 'path' is not a local filesystem file. We intentionally treat any URL/streaming scheme as invalid for lyrics in auto mode. """ if not isinstance(target, str): return False s = target.strip() if not s: return False # Windows local paths: drive letter or UNC. if _WIN_DRIVE_RE.match(s) or _WIN_UNC_RE.match(s): return False # Common streaming prefixes. if s.startswith("http://") or s.startswith("https://"): return True # Generic scheme:// (e.g. ytdl://, edl://, rtmp://, etc.). if "://" in s: try: parsed = urlparse(s) scheme = (parsed.scheme or "").lower() if scheme and scheme not in {"file"}: return True except Exception: return True return False def _normalize_file_uri_target(target: str) -> str: """Convert file:// URIs to a local filesystem path string when possible.""" if not isinstance(target, str): return target s = target.strip() if not s: return target if not s.lower().startswith("file://"): return target try: parsed = urlparse(s) path = unquote(parsed.path or "") if os.name == "nt": # UNC: file://server/share/path -> \\server\share\path if parsed.netloc: p = path.replace("/", "\\") if p.startswith("\\"): p = p.lstrip("\\") return f"\\\\{parsed.netloc}\\{p}" if p else f"\\\\{parsed.netloc}" # Drive letter: file:///C:/path -> C:/path if path.startswith("/") and len(path) >= 3 and path[2] == ":": path = path[1:] return path or target except Exception: return target def _extract_store_from_url_target(target: str) -> Optional[str]: """Extract explicit store name from a URL query param `store=...` (if present).""" if not isinstance(target, str): return None s = target.strip() if not (s.startswith("http://") or s.startswith("https://")): return None try: parsed = urlparse(s) if not parsed.query: return None qs = parse_qs(parsed.query) raw = qs.get("store", [None])[0] if isinstance(raw, str) and raw.strip(): return raw.strip() except Exception: return None return None def _infer_hydrus_store_from_url_target(*, target: str, config: dict) -> Optional[str]: """Infer a Hydrus store backend by matching the URL prefix to the backend base URL.""" if not isinstance(target, str): return None s = target.strip() if not (s.startswith("http://") or s.startswith("https://")): return None try: from Store import Store as StoreRegistry reg = StoreRegistry(config, suppress_debug=True) backends = [(name, reg[name]) for name in reg.list_backends()] except Exception: return None matches: List[str] = [] for name, backend in backends: if type(backend).__name__ != "HydrusNetwork": continue base_url = getattr(backend, "_url", None) if not base_url: client = getattr(backend, "_client", None) base_url = getattr(client, "url", None) if client else None if not base_url: continue base = str(base_url).rstrip("/") if s.startswith(base): matches.append(name) if len(matches) == 1: return matches[0] return None def _resolve_store_backend_for_target( *, target: str, file_hash: str, config: dict, ) -> tuple[Optional[str], Any]: """Resolve a store backend for a local mpv target using the store DB. A target is considered valid only when: - target is a local filesystem file - a backend's get_file(hash) returns a local file path - that path resolves to the same target path """ try: p = Path(target) if not p.exists() or not p.is_file(): return None, None target_resolved = p.resolve() except Exception: return None, None try: from Store import Store as StoreRegistry reg = StoreRegistry(config, suppress_debug=True) backend_names = list(reg.list_backends()) except Exception: return None, None for name in backend_names: try: backend = reg[name] except Exception: continue store_file = None try: store_file = backend.get_file(file_hash, config=config) except TypeError: try: store_file = backend.get_file(file_hash) except Exception: store_file = None except Exception: store_file = None if not store_file: continue # Only accept local files; if the backend returns a URL, it's not valid for lyrics. try: store_path = Path(str(store_file)).expanduser() if not store_path.exists() or not store_path.is_file(): continue if store_path.resolve() != target_resolved: continue except Exception: continue return name, backend return None, None def _infer_hash_for_target(target: str) -> Optional[str]: """Infer SHA256 hash from Hydrus URL query, hash-named local files, or by hashing local file content.""" h = _extract_hash_from_target(target) if h: return h try: p = Path(target) if not p.exists() or not p.is_file(): return None stem = p.stem if isinstance(stem, str) and _HASH_RE.fullmatch(stem.strip()): return stem.strip().lower() from SYS.utils import sha256_file return sha256_file(p) except Exception: return None @dataclass class _PlaybackState: """Mutable per-track resolution state for the auto overlay loop. Centralising these variables in one object eliminates the repeated 15-line 'reset everything + clear OSD + remove sub' block that previously appeared five times inside run_auto_overlay. """ store_name: Optional[str] = None file_hash: Optional[str] = None key: Optional[str] = None backend: Optional[Any] = None entries: List[LrcLine] = field(default_factory=list) times: List[float] = field(default_factory=list) loaded_key: Optional[str] = None loaded_mode: Optional[str] = None # 'lyric' | 'sub' | None loaded_sub_path: Optional[Path] = None last_target: Optional[str] = None fetch_attempt_key: Optional[str] = None fetch_attempt_at: float = 0.0 def clear(self, client: MPVIPCClient, *, clear_hash: bool = True) -> None: """Reset backend resolution and clean up any active OSD / external subtitle. Pass ``clear_hash=False`` to preserve *file_hash* when the hash is still valid but the store lookup failed (e.g. store temporarily unavailable), so the late-arriving-context fallback can retry later. """ self.store_name = None self.backend = None self.key = None if clear_hash: self.file_hash = None self.entries = [] self.times = [] if self.loaded_key is not None: _osd_clear_and_restore(client) self.loaded_key = None self.loaded_mode = None if self.loaded_sub_path is not None: _try_remove_selected_external_sub(client) self.loaded_sub_path = None def run_auto_overlay( *, mpv: MPV, poll_s: float = 0.15, config: Optional[dict] = None ) -> int: """Auto mode: track mpv's current file and render lyrics (note: 'lyric') or subtitles (note: 'sub'). State is managed via :class:`_PlaybackState` to eliminate the repeated 15-line reset blocks of the previous implementation. """ cfg = config or {} client = mpv.client() if not client.connect(): _log("mpv IPC is not reachable (is mpv running with --input-ipc-server?).") return 3 _log(f"Auto overlay connected (ipc={getattr(mpv, 'ipc_path', None)})") state = _PlaybackState() last_idx: Optional[int] = None last_text: Optional[str] = None last_visible: Optional[bool] = None global _OSD_STYLE_SAVED, _OSD_STYLE_APPLIED # Import the Store registry once so each track change doesn't re-import the module. try: from Store import Store as _StoreRegistry # noqa: PLC0415 _store_cls: Any = _StoreRegistry except Exception: _store_cls = None def _make_registry() -> Optional[Any]: if _store_cls is None: return None try: return _store_cls(cfg, suppress_debug=True) except Exception: return None while True: # ---------------------------------------------------------------- # 1. Read IPC properties; reconnect on disconnect. # ---------------------------------------------------------------- try: visible_raw = _ipc_get_property( client, _LYRIC_VISIBLE_PROP, True, raise_on_disconnect=True ) raw_path = _ipc_get_property(client, "path", None, raise_on_disconnect=True) except ConnectionError: _osd_clear_and_restore(client) try: client.disconnect() except Exception: pass _OSD_STYLE_SAVED = None _OSD_STYLE_APPLIED = False if not client.connect(): _log("mpv IPC disconnected; exiting MPV.lyric") return 4 time.sleep(poll_s) continue # ---------------------------------------------------------------- # 2. Visibility toggle support. # ---------------------------------------------------------------- visible = bool(visible_raw) if isinstance(visible_raw, (bool, int)) else True if last_visible is None: last_visible = visible elif last_visible is True and visible is False: _osd_clear_and_restore(client) _try_remove_selected_external_sub(client) last_idx = None last_text = None last_visible = visible elif last_visible is False and visible is True: last_idx = None last_text = None last_visible = visible else: last_visible = visible # ---------------------------------------------------------------- # 3. Normalise the current playback target. # ---------------------------------------------------------------- target = _unwrap_memory_m3u(str(raw_path)) if isinstance(raw_path, str) else None if isinstance(target, str): target = _normalize_file_uri_target(target) if not isinstance(target, str) or not target: time.sleep(poll_s) continue is_http = target.startswith("http://") or target.startswith("https://") # Non-HTTP streams (ytdl://, edl://, rtmp://, etc.) are never valid for lyrics. if (not is_http) and _is_stream_target(target): state.clear(client) state.last_target = target time.sleep(poll_s) continue # ---------------------------------------------------------------- # 4. Read user-data overrides from the playlist controller. # ---------------------------------------------------------------- store_override: Optional[str] = None hash_override: Optional[str] = None try: raw_so = _ipc_get_property(client, _ITEM_STORE_PROP, None) raw_ho = _ipc_get_property(client, _ITEM_HASH_PROP, None) store_override = str(raw_so).strip() if raw_so else None hash_override = str(raw_ho).strip().lower() if raw_ho else None except Exception: pass # ---------------------------------------------------------------- # 5. Resolve store / hash on target change. # ---------------------------------------------------------------- if target != state.last_target: state.last_target = target last_idx = None last_text = None _log(f"Target changed: {target}") state.file_hash = _infer_hash_for_target(target) if not state.file_hash: state.clear(client, clear_hash=False) time.sleep(poll_s) continue # Reset backend state; user-data override may supply it right away. state.store_name = None state.backend = None state.key = None if store_override and (not hash_override or hash_override == state.file_hash): reg = _make_registry() if reg is not None: try: state.backend = reg[store_override] state.store_name = store_override state.key = f"{state.store_name}:{state.file_hash}" _log( f"Resolved via mpv override" f" store={state.store_name!r} hash={state.file_hash!r} valid=True" ) except Exception: state.backend = None state.store_name = None state.key = None if is_http: store_from_url = _extract_store_from_url_target(target) store_name = store_from_url or _infer_hydrus_store_from_url_target( target=target, config=cfg ) if not store_name: _log("HTTP target has no store mapping; lyrics disabled") state.clear(client, clear_hash=False) time.sleep(poll_s) continue reg = _make_registry() if reg is None: _log(f"HTTP target store {store_name!r} not available; lyrics disabled") state.clear(client, clear_hash=False) time.sleep(poll_s) continue try: state.backend = reg[store_name] state.store_name = store_name except Exception: _log(f"HTTP target store {store_name!r} not available; lyrics disabled") state.clear(client, clear_hash=False) time.sleep(poll_s) continue # Existence check only when store was inferred (not explicit in ?store=…). # When ?store= is in the URL mpv is already streaming — the file provably exists. if not store_from_url: try: meta = state.backend.get_metadata(state.file_hash, config=cfg) except Exception: meta = None if meta is None: _log( f"HTTP target not found in store DB" f" (store={store_name!r} hash={state.file_hash}); lyrics disabled" ) state.clear(client, clear_hash=False) time.sleep(poll_s) continue state.key = f"{state.store_name}:{state.file_hash}" _log(f"Resolved store={state.store_name!r} hash={state.file_hash!r} valid=True") else: # Local file: resolve via store DB (skip if user-data already resolved it). if not state.key or not state.backend: state.store_name, state.backend = _resolve_store_backend_for_target( target=target, file_hash=state.file_hash, config=cfg, ) state.key = ( f"{state.store_name}:{state.file_hash}" if state.store_name and state.file_hash else None ) _log( f"Resolved store={state.store_name!r} hash={state.file_hash!r}" f" valid={bool(state.key)}" ) if not state.key or not state.backend: state.clear(client, clear_hash=False) time.sleep(poll_s) continue # ---------------------------------------------------------------- # 6. Late-arriving context fallback: user-data override published # after the track change was already processed without a backend. # ---------------------------------------------------------------- if (not is_http) and target and (not state.key or not state.backend): try: state.file_hash = _infer_hash_for_target(target) or state.file_hash except Exception: pass if ( store_override and state.file_hash and (not hash_override or hash_override == state.file_hash) ): reg = _make_registry() if reg is not None: try: state.backend = reg[store_override] state.store_name = store_override state.key = f"{state.store_name}:{state.file_hash}" _log( f"Resolved via mpv override" f" store={state.store_name!r} hash={state.file_hash!r} valid=True" ) except Exception: pass # ---------------------------------------------------------------- # 7. Load / reload content when the resolved key changes. # ---------------------------------------------------------------- if ( state.key and state.key != state.loaded_key and state.store_name and state.file_hash and state.backend ): notes: Dict[str, str] = {} try: notes = state.backend.get_note(state.file_hash, config=cfg) or {} except Exception: notes = {} try: _log( f"Loaded notes keys:" f" {sorted(str(k) for k in notes) if isinstance(notes, dict) else 'N/A'}" ) except Exception: _log("Loaded notes keys: ") sub_text = _extract_note_text(notes, "sub") if sub_text: # Hand subtitles to mpv's track subsystem; suppress OSD lyric overlay. _osd_clear_and_restore(client) sub_path: Optional[Path] = None try: sub_path = _write_temp_sub_file(key=state.key, text=sub_text) except Exception as exc: _log(f"Failed to write sub note temp file: {exc}") if sub_path is not None: if state.loaded_sub_path is not None: _try_remove_selected_external_sub(client) _try_add_external_sub(client, sub_path) state.loaded_sub_path = sub_path state.entries = [] state.times = [] state.loaded_key = state.key state.loaded_mode = "sub" else: # Switching away from sub mode: unload the external subtitle. if state.loaded_mode == "sub" and state.loaded_sub_path is not None: _try_remove_selected_external_sub(client) state.loaded_sub_path = None lrc_text = _extract_note_text(notes, "lyric") if not lrc_text: _log("No lyric note found (note name: 'lyric')") # Auto-fetch: throttled per key to avoid hammering APIs. autofetch_enabled = bool(cfg.get("lyric_autofetch", True)) now = time.time() if ( autofetch_enabled and state.key != state.fetch_attempt_key and (now - state.fetch_attempt_at) > 2.0 ): state.fetch_attempt_key = state.key state.fetch_attempt_at = now artist: Optional[str] = None title: Optional[str] = None duration_s: Optional[float] = None try: duration_s = _ipc_get_property(client, "duration", None) except Exception: pass try: tags, _src = state.backend.get_tag(state.file_hash, config=cfg) if isinstance(tags, list): artist, title = _infer_artist_title_from_tags( [str(x) for x in tags] ) except Exception: pass _log( f"Autofetch query artist={artist!r} title={title!r}" f" duration={duration_s!r}" ) if not artist or not title: _log("Autofetch skipped: requires both artist and title") fetched: Optional[str] = None else: fetched = _fetch_lrclib( artist=artist, title=title, duration_s=( float(duration_s) if isinstance(duration_s, (int, float)) else None ), ) if not fetched or not fetched.strip(): fetched = _fetch_lyrics_ovh(artist=artist, title=title) if fetched and fetched.strip(): try: ok = bool( state.backend.set_note( state.file_hash, "lyric", fetched, config=cfg ) ) _log(f"Autofetch stored lyric note ok={ok}") except Exception as exc: _log(f"Autofetch failed to store lyric note: {exc}") else: _log("Autofetch: no lyrics found") state.entries = [] state.times = [] _osd_clear_and_restore(client) state.loaded_key = None state.loaded_mode = None elif not lrc_text: # No lyric and autofetch is throttled or disabled for this key. _osd_clear_and_restore(client) state.entries = [] state.times = [] state.loaded_key = state.key state.loaded_mode = None else: _log(f"Loaded lyric note ({len(lrc_text)} chars)") parsed = parse_lrc(lrc_text) state.entries = parsed state.times = [e.time_s for e in parsed] state.loaded_key = state.key state.loaded_mode = "lyric" # ---------------------------------------------------------------- # 8. Render the current lyric line. # ---------------------------------------------------------------- try: t = _ipc_get_property(client, "time-pos", None, raise_on_disconnect=True) except ConnectionError: _osd_clear_and_restore(client) try: client.disconnect() except Exception: pass _OSD_STYLE_SAVED = None _OSD_STYLE_APPLIED = False if not client.connect(): _log("mpv IPC disconnected; exiting MPV.lyric") return 4 time.sleep(poll_s) continue if not isinstance(t, (int, float)): time.sleep(poll_s) continue if not state.entries: if last_text is not None: _osd_clear_and_restore(client) last_text = None last_idx = None time.sleep(poll_s) continue if not visible: if last_text is not None: _osd_clear_and_restore(client) last_text = None last_idx = None time.sleep(poll_s) continue idx = _current_index(float(t), state.times) if idx < 0: time.sleep(poll_s) continue line = state.entries[idx] if idx != last_idx or line.text != last_text: if state.loaded_mode == "lyric": try: _osd_apply_lyric_style(client, config=cfg) except Exception: pass dur_ms = _lyric_duration_ms(idx, state.times, float(t)) resp = _osd_set_text(client, line.text, duration_ms=dur_ms) if resp is None: client.disconnect() if not client.connect(): print("Lost mpv IPC connection.", file=sys.stderr) return 4 elif isinstance(resp, dict) and resp.get("error") not in (None, "success"): _log(f"mpv show-text returned error={resp.get('error')!r}") last_idx = idx last_text = line.text time.sleep(poll_s) def run_overlay(*, mpv: MPV, entries: List[LrcLine], poll_s: float = 0.15) -> int: if not entries: print("No timestamped LRC lines found.", file=sys.stderr) return 2 times = [e.time_s for e in entries] last_idx: Optional[int] = None last_text: Optional[str] = None client = mpv.client() if not client.connect(): print( "mpv IPC is not reachable (is mpv running with --input-ipc-server?).", file=sys.stderr, ) return 3 while True: try: # mpv returns None when idle/no file. t = _ipc_get_property(client, "time-pos", None, raise_on_disconnect=True) except ConnectionError: _osd_clear(client) try: client.disconnect() except Exception: pass if not client.connect(): print("Lost mpv IPC connection.", file=sys.stderr) return 4 time.sleep(poll_s) continue if not isinstance(t, (int, float)): time.sleep(poll_s) continue idx = _current_index(float(t), times) if idx < 0: # Before first lyric timestamp. time.sleep(poll_s) continue line = entries[idx] if idx != last_idx or line.text != last_text: dur_ms = _lyric_duration_ms(idx, times, float(t)) resp = _osd_set_text(client, line.text, duration_ms=dur_ms) if resp is None: client.disconnect() if not client.connect(): print("Lost mpv IPC connection.", file=sys.stderr) return 4 elif isinstance(resp, dict) and resp.get("error") not in (None, "success"): _log(f"mpv show-text returned error={resp.get('error')!r}") last_idx = idx last_text = line.text time.sleep(poll_s) def main(argv: Optional[List[str]] = None) -> int: parser = argparse.ArgumentParser(prog="python -m MPV.lyric", add_help=True) parser.add_argument( "--ipc", default=None, help="mpv IPC path. Defaults to the repo's fixed IPC pipe name.", ) parser.add_argument( "--lrc", default=None, help="Path to an .lrc file. If omitted, reads LRC from stdin.", ) parser.add_argument( "--poll", type=float, default=0.15, help="Polling interval in seconds for time-pos updates.", ) parser.add_argument( "--log", default=None, help="Optional path to a log file for diagnostics.", ) args = parser.parse_args(argv) # Configure logging early. global _LOG_FH if args.log: try: log_path = Path(str(args.log)).expanduser().resolve() log_path.parent.mkdir(parents=True, exist_ok=True) _LOG_FH = open(log_path, "a", encoding="utf-8", errors="replace") _log("MPV.lyric starting") except Exception: _LOG_FH = None mpv = MPV(ipc_path=args.ipc) if args.ipc else MPV() # Prevent multiple lyric helpers from running at once for the same mpv IPC. if not _acquire_single_instance_lock(getattr(mpv, "ipc_path", "") or ""): _log("Another MPV.lyric instance is already running for this IPC; exiting.") return 0 # If --lrc is provided, use it. if args.lrc: with open(args.lrc, "r", encoding="utf-8", errors="replace") as f: lrc_text = f.read() entries = parse_lrc(lrc_text) try: return run_overlay(mpv=mpv, entries=entries, poll_s=float(args.poll)) except KeyboardInterrupt: return 0 # Otherwise: if stdin has content, treat it as LRC; if stdin is empty/TTY, auto-discover. lrc_text = "" try: if not sys.stdin.isatty(): lrc_text = _read_all_stdin() or "" except Exception: lrc_text = "" if lrc_text.strip(): entries = parse_lrc(lrc_text) try: return run_overlay(mpv=mpv, entries=entries, poll_s=float(args.poll)) except KeyboardInterrupt: return 0 cfg = _load_config_best_effort() try: return run_auto_overlay(mpv=mpv, poll_s=float(args.poll), config=cfg) except KeyboardInterrupt: return 0 if __name__ == "__main__": raise SystemExit(main())