r"""Timed lyric overlay for mpv via JSON IPC. This is intentionally implemented from scratch (no vendored/copied code) while providing the same *kind* of functionality as popular mpv lyric scripts: - Parse LRC (timestamped lyrics) - Track mpv playback time via IPC - Show the current line on mpv's OSD Primary intended usage in this repo: - Auto mode (no stdin / no --lrc): loads lyrics from store notes. A lyric note is stored under the note name 'lyric'. - If the lyric note is missing, auto mode will attempt to auto-fetch synced lyrics from a public API (LRCLIB) and store it into the 'lyric' note. You can disable this by setting config key `lyric_autofetch` to false. - You can still pipe LRC into this script (stdin) and it will render lyrics in mpv. Example (PowerShell): Get-Content .\song.lrc | python -m MPV.lyric If you want to connect to a non-default mpv IPC server: Get-Content .\song.lrc | python -m MPV.lyric --ipc "\\.\pipe\mpv-custom" """ from __future__ import annotations import argparse import bisect import hashlib import os import re import sys import tempfile import time from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, TextIO from urllib.parse import parse_qs, unquote, urlencode from urllib.request import Request, urlopen from urllib.parse import urlparse from MPV.mpv_ipc import MPV, MPVIPCClient _TIMESTAMP_RE = re.compile(r"\[(?P\d+):(?P\d{2})(?:\.(?P\d{1,3}))?\]") _OFFSET_RE = re.compile(r"^\[offset:(?P[+-]?\d+)\]$", re.IGNORECASE) _HASH_RE = re.compile(r"[0-9a-f]{64}", re.IGNORECASE) _HYDRUS_HASH_QS_RE = re.compile(r"hash=([0-9a-f]{64})", re.IGNORECASE) _WIN_DRIVE_RE = re.compile(r"^[a-zA-Z]:[\\/]") _WIN_UNC_RE = re.compile(r"^\\\\") _LOG_FH: Optional[TextIO] = None _SINGLE_INSTANCE_LOCK_FH: Optional[TextIO] = None _LYRIC_VISIBLE_PROP = "user-data/medeia-lyric-visible" # mpv osd-overlay IDs are scoped to the IPC client connection. # MPV.lyric keeps a persistent connection, so we can safely reuse a constant ID. _LYRIC_OSD_OVERLAY_ID = 4242 def _single_instance_lock_path(ipc_path: str) -> Path: # Key the lock to the mpv IPC target so multiple mpv instances with different # IPC servers can still run independent lyric helpers. key = hashlib.sha1((ipc_path or "").encode("utf-8", errors="ignore")).hexdigest() tmp_dir = Path(tempfile.gettempdir()) return (tmp_dir / f"medeia-mpv-lyric-{key}.lock").resolve() def _acquire_single_instance_lock(ipc_path: str) -> bool: """Ensure only one MPV.lyric process runs per IPC server. This prevents duplicate overlays (e.g. one old show-text overlay + one new osd-overlay). """ global _SINGLE_INSTANCE_LOCK_FH if _SINGLE_INSTANCE_LOCK_FH is not None: return True lock_path = _single_instance_lock_path(ipc_path) lock_path.parent.mkdir(parents=True, exist_ok=True) try: fh = open(lock_path, "a", encoding="utf-8", errors="replace") except Exception: # If we can't create the lock file, don't block playback; just proceed. return True try: if os.name == "nt": import msvcrt # Lock the first byte (non-blocking). msvcrt.locking(fh.fileno(), msvcrt.LK_NBLCK, 1) else: import fcntl fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) _SINGLE_INSTANCE_LOCK_FH = fh try: fh.write(f"pid={os.getpid()} ipc={ipc_path}\n") fh.flush() except Exception: pass return True except Exception: try: fh.close() except Exception: pass return False def _ass_escape(text: str) -> str: # Escape braces/backslashes so lyric text can't break ASS formatting. t = str(text or "") t = t.replace("\\", "\\\\") t = t.replace("{", "\\{") t = t.replace("}", "\\}") t = t.replace("\r\n", "\n").replace("\r", "\n") t = t.replace("\n", "\\N") return t def _format_lyric_as_subtitle(text: str) -> str: # Bottom-center like a subtitle (ASS alignment 2). # NOTE: show-text escapes ASS by default; we use osd-overlay so this is honored. return "{\\an2}" + _ass_escape(text) def _osd_overlay_set_ass(client: MPVIPCClient, ass_text: str) -> Optional[dict]: # Use osd-overlay with ass-events so ASS override tags (e.g. {\an2}) are applied. # Keep z low so UI scripts (like uosc) can draw above it if they use higher z. return client.send_command( { "command": { "name": "osd-overlay", "id": _LYRIC_OSD_OVERLAY_ID, "format": "ass-events", "data": ass_text, "res_y": 720, "z": -50, } } ) def _osd_overlay_clear(client: MPVIPCClient) -> None: client.send_command({"command": {"name": "osd-overlay", "id": _LYRIC_OSD_OVERLAY_ID, "format": "none"}}) def _log(msg: str) -> None: line = f"[{datetime.now().isoformat(timespec='seconds')}] {msg}" try: if _LOG_FH is not None: _LOG_FH.write(line + "\n") _LOG_FH.flush() return except Exception: pass print(line, file=sys.stderr, flush=True) def _ipc_get_property( client: MPVIPCClient, name: str, default: object = None, *, raise_on_disconnect: bool = False, ) -> object: resp = client.send_command({"command": ["get_property", name]}) if resp is None: if raise_on_disconnect: raise ConnectionError("Lost mpv IPC connection") return default if resp and resp.get("error") == "success": return resp.get("data", default) return default def _http_get_json(url: str, *, timeout_s: float = 10.0) -> Optional[dict]: try: req = Request( url, headers={ "User-Agent": "medeia-macina/lyric", "Accept": "application/json", }, method="GET", ) with urlopen(req, timeout=timeout_s) as resp: data = resp.read() import json obj = json.loads(data.decode("utf-8", errors="replace")) return obj if isinstance(obj, dict) else None except Exception as exc: _log(f"HTTP JSON failed: {exc} ({url})") return None def _http_get_json_list(url: str, *, timeout_s: float = 10.0) -> Optional[list]: try: req = Request( url, headers={ "User-Agent": "medeia-macina/lyric", "Accept": "application/json", }, method="GET", ) with urlopen(req, timeout=timeout_s) as resp: data = resp.read() import json obj = json.loads(data.decode("utf-8", errors="replace")) return obj if isinstance(obj, list) else None except Exception as exc: _log(f"HTTP JSON(list) failed: {exc} ({url})") return None def _sanitize_query(s: Optional[str]) -> Optional[str]: if not isinstance(s, str): return None t = s.strip().strip("\ufeff") return t if t else None def _infer_artist_title_from_tags(tags: List[str]) -> tuple[Optional[str], Optional[str]]: artist = None title = None for t in tags or []: ts = str(t) low = ts.lower() if low.startswith("artist:") and artist is None: artist = ts.split(":", 1)[1].strip() or None elif low.startswith("title:") and title is None: title = ts.split(":", 1)[1].strip() or None if artist and title: break return _sanitize_query(artist), _sanitize_query(title) def _wrap_plain_lyrics_as_lrc(text: str) -> str: # Fallback: create a crude LRC that advances every 4 seconds. # This is intentionally simple and deterministic. lines = [ln.strip() for ln in (text or "").splitlines()] lines = [ln for ln in lines if ln] if not lines: return "" out: List[str] = [] t_s = 0 for ln in lines: mm = t_s // 60 ss = t_s % 60 out.append(f"[{mm:02d}:{ss:02d}.00]{ln}") t_s += 4 return "\n".join(out) + "\n" def _fetch_lrclib(*, artist: Optional[str], title: Optional[str], duration_s: Optional[float] = None) -> Optional[str]: base = "https://lrclib.net/api" # Require both artist and title; title-only lookups cause frequent mismatches. if not artist or not title: return None # Try direct get. q: Dict[str, str] = { "artist_name": artist, "track_name": title, } if isinstance(duration_s, (int, float)) and duration_s and duration_s > 0: q["duration"] = str(int(duration_s)) url = f"{base}/get?{urlencode(q)}" obj = _http_get_json(url) if isinstance(obj, dict): synced = obj.get("syncedLyrics") if isinstance(synced, str) and synced.strip(): _log("LRCLIB: got syncedLyrics") return synced plain = obj.get("plainLyrics") if isinstance(plain, str) and plain.strip(): _log("LRCLIB: only plainLyrics; wrapping") wrapped = _wrap_plain_lyrics_as_lrc(plain) return wrapped if wrapped.strip() else None # Fallback: search using artist+title only. q_text = f"{artist} {title}" url = f"{base}/search?{urlencode({'q': q_text})}" items = _http_get_json_list(url) or [] for item in items: if not isinstance(item, dict): continue synced = item.get("syncedLyrics") if isinstance(synced, str) and synced.strip(): _log("LRCLIB: search hit with syncedLyrics") return synced # Plain lyrics fallback from search if available for item in items: if not isinstance(item, dict): continue plain = item.get("plainLyrics") if isinstance(plain, str) and plain.strip(): _log("LRCLIB: search hit only plainLyrics; wrapping") wrapped = _wrap_plain_lyrics_as_lrc(plain) return wrapped if wrapped.strip() else None return None def _fetch_lyrics_ovh(*, artist: Optional[str], title: Optional[str]) -> Optional[str]: # Public, no-auth lyrics provider (typically plain lyrics, not time-synced). if not artist or not title: return None try: # Endpoint uses path segments, so we urlencode each part. from urllib.parse import quote url = f"https://api.lyrics.ovh/v1/{quote(artist)}/{quote(title)}" obj = _http_get_json(url) if not isinstance(obj, dict): return None lyr = obj.get("lyrics") if isinstance(lyr, str) and lyr.strip(): _log("lyrics.ovh: got plain lyrics; wrapping") wrapped = _wrap_plain_lyrics_as_lrc(lyr) return wrapped if wrapped.strip() else None except Exception as exc: _log(f"lyrics.ovh failed: {exc}") return None try: print(line, file=sys.stderr) except Exception: pass @dataclass(frozen=True) class LrcLine: time_s: float text: str def _frac_to_ms(frac: str) -> int: # LRC commonly uses centiseconds (2 digits), but can be 1–3 digits. if not frac: return 0 if len(frac) == 3: return int(frac) if len(frac) == 2: return int(frac) * 10 return int(frac) * 100 def parse_lrc(text: str) -> List[LrcLine]: """Parse LRC into sorted timestamped lines.""" offset_ms = 0 lines: List[LrcLine] = [] for raw_line in text.splitlines(): line = raw_line.strip("\ufeff\r\n") if not line: continue # Optional global offset. off_m = _OFFSET_RE.match(line) if off_m: try: offset_ms = int(off_m.group("ms")) except Exception: offset_ms = 0 continue matches = list(_TIMESTAMP_RE.finditer(line)) if not matches: # Ignore non-timestamp metadata lines like [ar:], [ti:], etc. continue lyric_text = line[matches[-1].end() :].strip() for m in matches: mm = int(m.group("m")) ss = int(m.group("s")) frac = m.group("frac") or "" ts_ms = (mm * 60 + ss) * 1000 + _frac_to_ms(frac) + offset_ms if ts_ms < 0: continue lines.append(LrcLine(time_s=ts_ms / 1000.0, text=lyric_text)) # Sort and de-dupe by timestamp (prefer last non-empty text). lines.sort(key=lambda x: x.time_s) deduped: List[LrcLine] = [] for item in lines: if deduped and abs(deduped[-1].time_s - item.time_s) < 1e-6: if item.text: deduped[-1] = item else: deduped.append(item) return deduped def _read_all_stdin() -> str: return sys.stdin.read() def _current_index(time_s: float, times: List[float]) -> int: # Index of last timestamp <= time_s return bisect.bisect_right(times, time_s) - 1 def _unwrap_memory_m3u(text: Optional[str]) -> Optional[str]: """Extract the real target URL/path from a memory:// M3U payload.""" if not isinstance(text, str) or not text.startswith("memory://"): return text for line in text.splitlines(): s = line.strip() if not s or s.startswith("#") or s.startswith("memory://"): continue return s return text def _extract_hash_from_target(target: str) -> Optional[str]: if not isinstance(target, str): return None m = _HYDRUS_HASH_QS_RE.search(target) if m: return m.group(1).lower() # Fallback: plain hash string s = target.strip().lower() if _HASH_RE.fullmatch(s): return s return None def _load_config_best_effort() -> dict: try: from config import load_config cfg = load_config() return cfg if isinstance(cfg, dict) else {} except Exception: return {} def _extract_lrc_from_notes(notes: Dict[str, str]) -> Optional[str]: """Return raw LRC text from the note named 'lyric'.""" if not isinstance(notes, dict) or not notes: return None raw = None for k, v in notes.items(): if not isinstance(k, str): continue if k.strip() == "lyric": raw = v break if not isinstance(raw, str): return None text = raw.strip("\ufeff\r\n") return text if text.strip() else None def _extract_sub_from_notes(notes: Dict[str, str]) -> Optional[str]: """Return raw subtitle text from the note named 'sub'.""" if not isinstance(notes, dict) or not notes: return None raw = None for k, v in notes.items(): if not isinstance(k, str): continue if k.strip() == "sub": raw = v break if not isinstance(raw, str): return None text = raw.strip("\ufeff\r\n") return text if text.strip() else None def _infer_sub_extension(text: str) -> str: # Best-effort: mpv generally understands SRT/VTT; choose based on content. t = (text or "").lstrip("\ufeff\r\n").lstrip() if t.upper().startswith("WEBVTT"): return ".vtt" if "-->" in t: # SRT typically uses commas for milliseconds, VTT uses dots. if re.search(r"\d\d:\d\d:\d\d,\d\d\d\s*-->\s*\d\d:\d\d:\d\d,\d\d\d", t): return ".srt" return ".vtt" return ".vtt" def _write_temp_sub_file(*, key: str, text: str) -> Path: # Write to a content-addressed temp path so updates force mpv reload. tmp_dir = Path(tempfile.gettempdir()) / "medeia-mpv-notes" tmp_dir.mkdir(parents=True, exist_ok=True) ext = _infer_sub_extension(text) digest = hashlib.sha1((key + "\n" + (text or "")).encode("utf-8", errors="ignore")).hexdigest()[:16] safe_key = hashlib.sha1((key or "").encode("utf-8", errors="ignore")).hexdigest()[:12] path = (tmp_dir / f"sub-{safe_key}-{digest}{ext}").resolve() path.write_text(text or "", encoding="utf-8", errors="replace") return path def _try_remove_selected_external_sub(client: MPVIPCClient) -> None: try: client.send_command({"command": ["sub-remove"]}) except Exception: return def _try_add_external_sub(client: MPVIPCClient, path: Path) -> None: try: client.send_command({"command": ["sub-add", str(path), "select", "medeia-sub"]}) except Exception: return def _is_stream_target(target: str) -> bool: """Return True when mpv's 'path' is not a local filesystem file. We intentionally treat any URL/streaming scheme as invalid for lyrics in auto mode. """ if not isinstance(target, str): return False s = target.strip() if not s: return False # Windows local paths: drive letter or UNC. if _WIN_DRIVE_RE.match(s) or _WIN_UNC_RE.match(s): return False # Common streaming prefixes. if s.startswith("http://") or s.startswith("https://"): return True # Generic scheme:// (e.g. ytdl://, edl://, rtmp://, etc.). if "://" in s: try: parsed = urlparse(s) scheme = (parsed.scheme or "").lower() if scheme and scheme not in {"file"}: return True except Exception: return True return False def _normalize_file_uri_target(target: str) -> str: """Convert file:// URIs to a local filesystem path string when possible.""" if not isinstance(target, str): return target s = target.strip() if not s: return target if not s.lower().startswith("file://"): return target try: parsed = urlparse(s) path = unquote(parsed.path or "") if os.name == "nt": # UNC: file://server/share/path -> \\server\share\path if parsed.netloc: p = path.replace("/", "\\") if p.startswith("\\"): p = p.lstrip("\\") return f"\\\\{parsed.netloc}\\{p}" if p else f"\\\\{parsed.netloc}" # Drive letter: file:///C:/path -> C:/path if path.startswith("/") and len(path) >= 3 and path[2] == ":": path = path[1:] return path or target except Exception: return target def _extract_store_from_url_target(target: str) -> Optional[str]: """Extract explicit store name from a URL query param `store=...` (if present).""" if not isinstance(target, str): return None s = target.strip() if not (s.startswith("http://") or s.startswith("https://")): return None try: parsed = urlparse(s) if not parsed.query: return None qs = parse_qs(parsed.query) raw = qs.get("store", [None])[0] if isinstance(raw, str) and raw.strip(): return raw.strip() except Exception: return None return None def _infer_hydrus_store_from_url_target(*, target: str, config: dict) -> Optional[str]: """Infer a Hydrus store backend by matching the URL prefix to the backend base URL.""" if not isinstance(target, str): return None s = target.strip() if not (s.startswith("http://") or s.startswith("https://")): return None try: from Store import Store as StoreRegistry reg = StoreRegistry(config, suppress_debug=True) backends = [(name, reg[name]) for name in reg.list_backends()] except Exception: return None matches: List[str] = [] for name, backend in backends: if type(backend).__name__ != "HydrusNetwork": continue base_url = getattr(backend, "_url", None) if not base_url: client = getattr(backend, "_client", None) base_url = getattr(client, "url", None) if client else None if not base_url: continue base = str(base_url).rstrip("/") if s.startswith(base): matches.append(name) if len(matches) == 1: return matches[0] return None def _resolve_store_backend_for_target( *, target: str, file_hash: str, config: dict, ) -> tuple[Optional[str], Any]: """Resolve a store backend for a local mpv target using the store DB. A target is considered valid only when: - target is a local filesystem file - a backend's get_file(hash) returns a local file path - that path resolves to the same target path """ try: p = Path(target) if not p.exists() or not p.is_file(): return None, None target_resolved = p.resolve() except Exception: return None, None try: from Store import Store as StoreRegistry reg = StoreRegistry(config, suppress_debug=True) backend_names = list(reg.list_backends()) except Exception: return None, None # Prefer the inferred Folder store (fast), but still validate via get_file(). preferred = _infer_store_for_target(target=target, config=config) if preferred and preferred in backend_names: backend_names.remove(preferred) backend_names.insert(0, preferred) for name in backend_names: try: backend = reg[name] except Exception: continue store_file = None try: store_file = backend.get_file(file_hash, config=config) except TypeError: try: store_file = backend.get_file(file_hash) except Exception: store_file = None except Exception: store_file = None if not store_file: continue # Only accept local files; if the backend returns a URL, it's not valid for lyrics. try: store_path = Path(str(store_file)).expanduser() if not store_path.exists() or not store_path.is_file(): continue if store_path.resolve() != target_resolved: continue except Exception: continue return name, backend return None, None def _infer_store_for_target(*, target: str, config: dict) -> Optional[str]: """Infer store name from the current mpv target (local path under a folder root). Note: URLs/streams are intentionally not mapped to stores for lyrics. """ if isinstance(target, str) and _is_stream_target(target): return None try: from Store import Store as StoreRegistry reg = StoreRegistry(config, suppress_debug=True) backends = [(name, reg[name]) for name in reg.list_backends()] except Exception: backends = [] # Local file path: choose the deepest Folder root that contains it. try: p = Path(target) if not p.exists() or not p.is_file(): return None p_str = str(p.resolve()).lower() except Exception: return None best: Optional[str] = None best_len = -1 for name, backend in backends: if type(backend).__name__ != "Folder": continue root = None try: root = getattr(backend, "_location", None) or getattr(backend, "location", lambda: None)() except Exception: root = None if not root: continue try: root_path = Path(str(root)).expanduser().resolve() root_str = str(root_path).lower().rstrip("\\/") except Exception: continue if p_str.startswith(root_str) and len(root_str) > best_len: best = name best_len = len(root_str) return best def _infer_hash_for_target(target: str) -> Optional[str]: """Infer SHA256 hash from Hydrus URL query, hash-named local files, or by hashing local file content.""" h = _extract_hash_from_target(target) if h: return h try: p = Path(target) if not p.exists() or not p.is_file(): return None stem = p.stem if isinstance(stem, str) and _HASH_RE.fullmatch(stem.strip()): return stem.strip().lower() from SYS.utils import sha256_file return sha256_file(p) except Exception: return None def run_auto_overlay(*, mpv: MPV, poll_s: float = 0.15, config: Optional[dict] = None) -> int: """Auto mode: track mpv's current file and render lyrics (note: 'lyric') or load subtitles (note: 'sub').""" cfg = config or {} client = mpv.client() if not client.connect(): _log("mpv IPC is not reachable (is mpv running with --input-ipc-server?).") return 3 _log(f"Auto overlay connected (ipc={getattr(mpv, 'ipc_path', None)})") last_target: Optional[str] = None current_store_name: Optional[str] = None current_file_hash: Optional[str] = None current_key: Optional[str] = None current_backend: Optional[Any] = None last_loaded_key: Optional[str] = None last_loaded_mode: Optional[str] = None # 'lyric' | 'sub' last_loaded_sub_path: Optional[Path] = None last_fetch_attempt_key: Optional[str] = None last_fetch_attempt_at: float = 0.0 entries: List[LrcLine] = [] times: List[float] = [] last_idx: Optional[int] = None last_text: Optional[str] = None last_visible: Optional[bool] = None while True: try: # Toggle support (mpv Lua script sets this property; default to visible). visible_raw = _ipc_get_property(client, _LYRIC_VISIBLE_PROP, True, raise_on_disconnect=True) raw_path = _ipc_get_property(client, "path", None, raise_on_disconnect=True) except ConnectionError: try: _osd_overlay_clear(client) except Exception: pass try: client.disconnect() except Exception: pass if not client.connect(): _log("mpv IPC disconnected; exiting MPV.lyric") return 4 time.sleep(poll_s) continue visible = bool(visible_raw) if isinstance(visible_raw, (bool, int)) else True if last_visible is None: last_visible = visible elif last_visible is True and visible is False: # Clear immediately when switching off. try: _osd_overlay_clear(client) except Exception: pass last_idx = None last_text = None last_visible = visible elif last_visible is False and visible is True: # Force a refresh on next tick. last_idx = None last_text = None last_visible = visible else: last_visible = visible target = _unwrap_memory_m3u(str(raw_path)) if isinstance(raw_path, str) else None if isinstance(target, str): target = _normalize_file_uri_target(target) if not isinstance(target, str) or not target: time.sleep(poll_s) continue is_http = target.startswith("http://") or target.startswith("https://") if (not is_http) and _is_stream_target(target): # Non-http streams (ytdl://, edl://, rtmp://, etc.) are never valid for lyrics. if last_loaded_key is not None: try: _osd_overlay_clear(client) except Exception: pass if last_loaded_sub_path is not None: _try_remove_selected_external_sub(client) last_loaded_sub_path = None last_target = target current_store_name = None current_file_hash = None current_key = None current_backend = None entries = [] times = [] last_loaded_key = None last_loaded_mode = None time.sleep(poll_s) continue if target != last_target: last_target = target last_idx = None last_text = None _log(f"Target changed: {target}") current_file_hash = _infer_hash_for_target(target) if not current_file_hash: entries = [] times = [] if last_loaded_key is not None: _osd_overlay_clear(client) last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: _try_remove_selected_external_sub(client) last_loaded_sub_path = None time.sleep(poll_s) continue if is_http: # HTTP/HTTPS targets are only valid if they map to a store backend. store_from_url = _extract_store_from_url_target(target) store_name = store_from_url or _infer_hydrus_store_from_url_target(target=target, config=cfg) if not store_name: _log("HTTP target has no store mapping; lyrics disabled") current_store_name = None current_backend = None current_key = None entries = [] times = [] if last_loaded_key is not None: _osd_overlay_clear(client) last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: _try_remove_selected_external_sub(client) last_loaded_sub_path = None time.sleep(poll_s) continue try: from Store import Store as StoreRegistry reg = StoreRegistry(cfg, suppress_debug=True) current_backend = reg[store_name] current_store_name = store_name except Exception: _log(f"HTTP target store {store_name!r} not available; lyrics disabled") current_store_name = None current_backend = None current_key = None entries = [] times = [] if last_loaded_key is not None: _osd_overlay_clear(client) last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: _try_remove_selected_external_sub(client) last_loaded_sub_path = None time.sleep(poll_s) continue # Optional existence check: if metadata is unavailable, treat as not-a-store-item. try: meta = current_backend.get_metadata(current_file_hash, config=cfg) except Exception: meta = None if meta is None: _log(f"HTTP target not found in store DB (store={store_name!r} hash={current_file_hash}); lyrics disabled") current_store_name = None current_backend = None current_key = None entries = [] times = [] if last_loaded_key is not None: _osd_overlay_clear(client) last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: _try_remove_selected_external_sub(client) last_loaded_sub_path = None time.sleep(poll_s) continue current_key = f"{current_store_name}:{current_file_hash}" _log(f"Resolved store={current_store_name!r} hash={current_file_hash!r} valid=True") else: # Local files: resolve store item via store DB. If not resolvable, lyrics are disabled. current_store_name, current_backend = _resolve_store_backend_for_target( target=target, file_hash=current_file_hash, config=cfg, ) current_key = f"{current_store_name}:{current_file_hash}" if current_store_name and current_file_hash else None _log(f"Resolved store={current_store_name!r} hash={current_file_hash!r} valid={bool(current_key)}") if not current_key or not current_backend: current_store_name = None current_backend = None current_key = None entries = [] times = [] if last_loaded_key is not None: _osd_overlay_clear(client) last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: _try_remove_selected_external_sub(client) last_loaded_sub_path = None time.sleep(poll_s) continue # Load/reload lyrics when we have a resolvable key and it differs from what we loaded. # This is important for the autofetch path: the note can appear without the mpv target changing. if current_key and current_key != last_loaded_key and current_store_name and current_file_hash and current_backend: notes: Dict[str, str] = {} try: notes = current_backend.get_note(current_file_hash, config=cfg) or {} except Exception: notes = {} try: _log(f"Loaded notes keys: {sorted([str(k) for k in notes.keys()]) if isinstance(notes, dict) else 'N/A'}") except Exception: _log("Loaded notes keys: ") sub_text = _extract_sub_from_notes(notes) if sub_text: # Treat subtitles as an alternative to lyrics; do not show the lyric overlay. try: _osd_overlay_clear(client) except Exception: pass try: sub_path = _write_temp_sub_file(key=current_key, text=sub_text) except Exception as exc: _log(f"Failed to write sub note temp file: {exc}") sub_path = None if sub_path is not None: # If we previously loaded a sub, remove it first to avoid stacking. if last_loaded_sub_path is not None: _try_remove_selected_external_sub(client) _try_add_external_sub(client, sub_path) last_loaded_sub_path = sub_path entries = [] times = [] last_loaded_key = current_key last_loaded_mode = "sub" else: # Switching away from sub-note mode: best-effort unload the selected external subtitle. if last_loaded_mode == "sub" and last_loaded_sub_path is not None: _try_remove_selected_external_sub(client) last_loaded_sub_path = None lrc_text = _extract_lrc_from_notes(notes) if not lrc_text: _log("No lyric note found (note name: 'lyric')") # Auto-fetch path: fetch and persist lyrics into the note named 'lyric'. # Throttle attempts per key to avoid hammering APIs. autofetch_enabled = bool(cfg.get("lyric_autofetch", True)) now = time.time() if autofetch_enabled and current_key != last_fetch_attempt_key and (now - last_fetch_attempt_at) > 2.0: last_fetch_attempt_key = current_key last_fetch_attempt_at = now artist = None title = None duration_s = None try: duration_s = _ipc_get_property(client, "duration", None) except Exception: duration_s = None # Use store tags only (artist:/title:). No filename/metadata/media-title fallbacks. try: tags, _src = current_backend.get_tag(current_file_hash, config=cfg) if isinstance(tags, list): artist, title = _infer_artist_title_from_tags([str(x) for x in tags]) except Exception: pass _log(f"Autofetch query artist={artist!r} title={title!r} duration={duration_s!r}") if not artist or not title: _log("Autofetch skipped: requires both artist and title") fetched = None else: fetched = _fetch_lrclib( artist=artist, title=title, duration_s=float(duration_s) if isinstance(duration_s, (int, float)) else None, ) if not fetched or not fetched.strip(): fetched = _fetch_lyrics_ovh(artist=artist, title=title) if fetched and fetched.strip(): try: ok = bool(current_backend.set_note(current_file_hash, "lyric", fetched, config=cfg)) _log(f"Autofetch stored lyric note ok={ok}") # Next loop iteration will re-load the note. except Exception as exc: _log(f"Autofetch failed to store lyric note: {exc}") else: _log("Autofetch: no lyrics found") entries = [] times = [] if last_loaded_key is not None: _osd_overlay_clear(client) last_loaded_key = None last_loaded_mode = None else: _log(f"Loaded lyric note ({len(lrc_text)} chars)") parsed = parse_lrc(lrc_text) entries = parsed times = [e.time_s for e in entries] last_loaded_key = current_key last_loaded_mode = "lyric" try: # mpv returns None when idle/no file. t = _ipc_get_property(client, "time-pos", None, raise_on_disconnect=True) except ConnectionError: try: _osd_overlay_clear(client) except Exception: pass try: client.disconnect() except Exception: pass if not client.connect(): _log("mpv IPC disconnected; exiting MPV.lyric") return 4 time.sleep(poll_s) continue if not isinstance(t, (int, float)): time.sleep(poll_s) continue if not entries: time.sleep(poll_s) continue if not visible: time.sleep(poll_s) continue idx = _current_index(float(t), times) if idx < 0: time.sleep(poll_s) continue line = entries[idx] if idx != last_idx or line.text != last_text: # osd-overlay has no duration; refresh periodically. resp = _osd_overlay_set_ass(client, _format_lyric_as_subtitle(line.text)) if resp is None: client.disconnect() if not client.connect(): print("Lost mpv IPC connection.", file=sys.stderr) return 4 elif isinstance(resp, dict) and resp.get("error") not in (None, "success"): try: _log(f"mpv osd-overlay returned error={resp.get('error')!r}") except Exception: pass last_idx = idx last_text = line.text time.sleep(poll_s) def run_overlay(*, mpv: MPV, entries: List[LrcLine], poll_s: float = 0.15) -> int: if not entries: print("No timestamped LRC lines found.", file=sys.stderr) return 2 times = [e.time_s for e in entries] last_idx: Optional[int] = None last_text: Optional[str] = None client = mpv.client() if not client.connect(): print("mpv IPC is not reachable (is mpv running with --input-ipc-server?).", file=sys.stderr) return 3 while True: try: # mpv returns None when idle/no file. t = _ipc_get_property(client, "time-pos", None, raise_on_disconnect=True) except ConnectionError: try: _osd_overlay_clear(client) except Exception: pass try: client.disconnect() except Exception: pass if not client.connect(): print("Lost mpv IPC connection.", file=sys.stderr) return 4 time.sleep(poll_s) continue if not isinstance(t, (int, float)): time.sleep(poll_s) continue idx = _current_index(float(t), times) if idx < 0: # Before first lyric timestamp. time.sleep(poll_s) continue line = entries[idx] if idx != last_idx or line.text != last_text: # osd-overlay has no duration; refresh periodically. resp = _osd_overlay_set_ass(client, _format_lyric_as_subtitle(line.text)) if resp is None: client.disconnect() if not client.connect(): print("Lost mpv IPC connection.", file=sys.stderr) return 4 elif isinstance(resp, dict) and resp.get("error") not in (None, "success"): try: _log(f"mpv osd-overlay returned error={resp.get('error')!r}") except Exception: pass last_idx = idx last_text = line.text time.sleep(poll_s) def main(argv: Optional[List[str]] = None) -> int: parser = argparse.ArgumentParser(prog="python -m MPV.lyric", add_help=True) parser.add_argument( "--ipc", default=None, help="mpv IPC path. Defaults to the repo's fixed IPC pipe name.", ) parser.add_argument( "--lrc", default=None, help="Path to an .lrc file. If omitted, reads LRC from stdin.", ) parser.add_argument( "--poll", type=float, default=0.15, help="Polling interval in seconds for time-pos updates.", ) parser.add_argument( "--log", default=None, help="Optional path to a log file for diagnostics.", ) args = parser.parse_args(argv) # Configure logging early. global _LOG_FH if args.log: try: log_path = Path(str(args.log)).expanduser().resolve() log_path.parent.mkdir(parents=True, exist_ok=True) _LOG_FH = open(log_path, "a", encoding="utf-8", errors="replace") _log("MPV.lyric starting") except Exception: _LOG_FH = None mpv = MPV(ipc_path=args.ipc) if args.ipc else MPV() # Prevent multiple lyric helpers from running at once for the same mpv IPC. if not _acquire_single_instance_lock(getattr(mpv, "ipc_path", "") or ""): _log("Another MPV.lyric instance is already running for this IPC; exiting.") return 0 # If --lrc is provided, use it. if args.lrc: with open(args.lrc, "r", encoding="utf-8", errors="replace") as f: lrc_text = f.read() entries = parse_lrc(lrc_text) try: return run_overlay(mpv=mpv, entries=entries, poll_s=float(args.poll)) except KeyboardInterrupt: return 0 # Otherwise: if stdin has content, treat it as LRC; if stdin is empty/TTY, auto-discover. lrc_text = "" try: if not sys.stdin.isatty(): lrc_text = _read_all_stdin() or "" except Exception: lrc_text = "" if lrc_text.strip(): entries = parse_lrc(lrc_text) try: return run_overlay(mpv=mpv, entries=entries, poll_s=float(args.poll)) except KeyboardInterrupt: return 0 cfg = _load_config_best_effort() try: return run_auto_overlay(mpv=mpv, poll_s=float(args.poll), config=cfg) except KeyboardInterrupt: return 0 if __name__ == "__main__": raise SystemExit(main())