Files
Medios-Macina/MPV/lyric.py

1410 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

r"""Timed lyric overlay for mpv via JSON IPC.
This is intentionally implemented from scratch (no vendored/copied code) while
providing the same *kind* of functionality as popular mpv lyric scripts:
- Parse LRC (timestamped lyrics)
- Track mpv playback time via IPC
- Show the current line on mpv's OSD
Primary intended usage in this repo:
- Auto mode (no stdin / no --lrc): loads lyrics from store notes.
A lyric note is stored under the note name 'lyric'.
- If the lyric note is missing, auto mode will attempt to auto-fetch synced lyrics
from a public API (LRCLIB) and store it into the 'lyric' note.
You can disable this by setting config key `lyric_autofetch` to false.
- You can still pipe LRC into this script (stdin) and it will render lyrics in mpv.
Example (PowerShell):
Get-Content .\song.lrc | python -m MPV.lyric
If you want to connect to a non-default mpv IPC server:
Get-Content .\song.lrc | python -m MPV.lyric --ipc "\\.\pipe\mpv-custom"
"""
from __future__ import annotations
import argparse
import bisect
import hashlib
import os
import re
import sys
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, TextIO
from urllib.parse import parse_qs, unquote, urlencode
from urllib.request import Request, urlopen
from urllib.parse import urlparse
from MPV.mpv_ipc import MPV, MPVIPCClient
# [mm:ss.frac] timestamps; the fraction may be 1-3 digits (2 = centiseconds is common).
_TIMESTAMP_RE = re.compile(r"\[(?P<m>\d+):(?P<s>\d{2})(?:\.(?P<frac>\d{1,3}))?\]")
# Global LRC "[offset:±ms]" directive (applies to lines parsed after it).
_OFFSET_RE = re.compile(r"^\[offset:(?P<ms>[+-]?\d+)\]$", re.IGNORECASE)
# A bare SHA256 hex digest.
_HASH_RE = re.compile(r"[0-9a-f]{64}", re.IGNORECASE)
# "hash=<sha256>" query parameter in Hydrus-style URLs.
_HYDRUS_HASH_QS_RE = re.compile(r"hash=([0-9a-f]{64})", re.IGNORECASE)
# Windows local-path shapes: drive letter ("C:\" / "C:/") and UNC ("\\server\...").
_WIN_DRIVE_RE = re.compile(r"^[a-zA-Z]:[\\/]")
_WIN_UNC_RE = re.compile(r"^\\\\")
# Diagnostic log file handle; opened in main() when --log is given, else stderr is used.
_LOG_FH: Optional[TextIO] = None
# Kept open for the process lifetime while we own the single-instance lock.
_SINGLE_INSTANCE_LOCK_FH: Optional[TextIO] = None
# mpv property toggled by the companion Lua script to show/hide lyrics.
_LYRIC_VISIBLE_PROP = "user-data/medeia-lyric-visible"
# mpv osd-overlay IDs are scoped to the IPC client connection.
# MPV.lyric keeps a persistent connection, so we can safely reuse a constant ID.
_LYRIC_OSD_OVERLAY_ID = 4242
def _single_instance_lock_path(ipc_path: str) -> Path:
# Key the lock to the mpv IPC target so multiple mpv instances with different
# IPC servers can still run independent lyric helpers.
key = hashlib.sha1((ipc_path or "").encode("utf-8", errors="ignore")).hexdigest()
tmp_dir = Path(tempfile.gettempdir())
return (tmp_dir / f"medeia-mpv-lyric-{key}.lock").resolve()
def _acquire_single_instance_lock(ipc_path: str) -> bool:
    """Ensure only one MPV.lyric process runs per IPC server.

    This prevents duplicate overlays (e.g. one old show-text overlay + one new osd-overlay).
    Returns True when the lock was acquired (or locking is unavailable — we fail
    open rather than block playback), False when another instance holds it.
    """
    global _SINGLE_INSTANCE_LOCK_FH
    # Already acquired by this process.
    if _SINGLE_INSTANCE_LOCK_FH is not None:
        return True
    lock_path = _single_instance_lock_path(ipc_path)
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        fh = open(lock_path, "a", encoding="utf-8", errors="replace")
    except Exception:
        # If we can't create the lock file, don't block playback; just proceed.
        return True
    try:
        if os.name == "nt":
            import msvcrt
            # Lock the first byte (non-blocking); raises if another process holds it.
            msvcrt.locking(fh.fileno(), msvcrt.LK_NBLCK, 1)
        else:
            import fcntl
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        # Keep the handle open for the process lifetime; the OS releases the
        # lock automatically when the process exits.
        _SINGLE_INSTANCE_LOCK_FH = fh
        try:
            # Best-effort diagnostics; the lock itself, not the content, matters.
            fh.write(f"pid={os.getpid()} ipc={ipc_path}\n")
            fh.flush()
        except Exception:
            pass
        return True
    except Exception:
        # Locking failed => another instance is running for this IPC target.
        try:
            fh.close()
        except Exception:
            pass
        return False
def _ass_escape(text: str) -> str:
# Escape braces/backslashes so lyric text can't break ASS formatting.
t = str(text or "")
t = t.replace("\\", "\\\\")
t = t.replace("{", "\\{")
t = t.replace("}", "\\}")
t = t.replace("\r\n", "\n").replace("\r", "\n")
t = t.replace("\n", "\\N")
return t
def _format_lyric_as_subtitle(text: str) -> str:
    """Render lyric text bottom-center like a subtitle (ASS alignment tag 2).

    show-text escapes ASS by default; the osd-overlay path honors the tag.
    """
    alignment_tag = "{\\an2}"
    return alignment_tag + _ass_escape(text)
def _osd_overlay_set_ass(client: MPVIPCClient, ass_text: str) -> Optional[dict]:
    """Install/replace the lyric osd-overlay with *ass_text*; return mpv's reply.

    The "ass-events" format is required so override tags (e.g. {\\an2}) apply.
    A low z keeps UI scripts (like uosc) free to draw above the lyric line.
    """
    overlay_args = {
        "name": "osd-overlay",
        "id": _LYRIC_OSD_OVERLAY_ID,
        "format": "ass-events",
        "data": ass_text,
        "res_y": 720,
        "z": -50,
    }
    return client.send_command({"command": overlay_args})
def _osd_overlay_clear(client: MPVIPCClient) -> None:
    """Remove the lyric osd-overlay (format "none" deletes the overlay by ID)."""
    clear_args = {
        "name": "osd-overlay",
        "id": _LYRIC_OSD_OVERLAY_ID,
        "format": "none",
    }
    client.send_command({"command": clear_args})
def _log(msg: str) -> None:
line = f"[{datetime.now().isoformat(timespec='seconds')}] {msg}"
try:
if _LOG_FH is not None:
_LOG_FH.write(line + "\n")
_LOG_FH.flush()
return
except Exception:
pass
print(line, file=sys.stderr, flush=True)
def _ipc_get_property(
client: MPVIPCClient,
name: str,
default: object = None,
*,
raise_on_disconnect: bool = False,
) -> object:
resp = client.send_command({
"command": ["get_property",
name]
})
if resp is None:
if raise_on_disconnect:
raise ConnectionError("Lost mpv IPC connection")
return default
if resp and resp.get("error") == "success":
return resp.get("data", default)
return default
def _http_get_json(url: str, *, timeout_s: float = 10.0) -> Optional[dict]:
    """GET *url* and decode the body as a JSON object.

    Returns None on any network/decoding error or when the payload is not a
    JSON object (dict); failures are logged, never raised.
    """
    import json
    try:
        request = Request(
            url,
            headers={
                "User-Agent": "medeia-macina/lyric",
                "Accept": "application/json",
            },
            method="GET",
        )
        with urlopen(request, timeout=timeout_s) as response:
            body = response.read()
        decoded = json.loads(body.decode("utf-8", errors="replace"))
        return decoded if isinstance(decoded, dict) else None
    except Exception as exc:
        _log(f"HTTP JSON failed: {exc} ({url})")
        return None
def _http_get_json_list(url: str, *, timeout_s: float = 10.0) -> Optional[list]:
    """GET *url* and decode the body as a JSON array.

    Returns None on any network/decoding error or when the payload is not a
    JSON array (list); failures are logged, never raised.
    """
    import json
    try:
        request = Request(
            url,
            headers={
                "User-Agent": "medeia-macina/lyric",
                "Accept": "application/json",
            },
            method="GET",
        )
        with urlopen(request, timeout=timeout_s) as response:
            body = response.read()
        decoded = json.loads(body.decode("utf-8", errors="replace"))
        return decoded if isinstance(decoded, list) else None
    except Exception as exc:
        _log(f"HTTP JSON(list) failed: {exc} ({url})")
        return None
def _sanitize_query(s: Optional[str]) -> Optional[str]:
if not isinstance(s, str):
return None
t = s.strip().strip("\ufeff")
return t if t else None
def _infer_artist_title_from_tags(
    tags: List[str]
) -> tuple[Optional[str], Optional[str]]:
    """Pull the first 'artist:' and 'title:' namespaced values out of *tags*.

    Matching is case-insensitive on the namespace; the first non-empty value
    per namespace wins. Both results pass through _sanitize_query.
    """
    found: Dict[str, Optional[str]] = {"artist": None, "title": None}
    for raw in tags or []:
        tag = str(raw)
        namespace, sep, value = tag.partition(":")
        if not sep:
            continue
        key = namespace.lower()
        if key in found and found[key] is None:
            found[key] = value.strip() or None
        if found["artist"] and found["title"]:
            break
    return _sanitize_query(found["artist"]), _sanitize_query(found["title"])
def _wrap_plain_lyrics_as_lrc(text: str) -> str:
# Fallback: create a crude LRC that advances every 4 seconds.
# This is intentionally simple and deterministic.
lines = [ln.strip() for ln in (text or "").splitlines()]
lines = [ln for ln in lines if ln]
if not lines:
return ""
out: List[str] = []
t_s = 0
for ln in lines:
mm = t_s // 60
ss = t_s % 60
out.append(f"[{mm:02d}:{ss:02d}.00]{ln}")
t_s += 4
return "\n".join(out) + "\n"
def _fetch_lrclib(
    *,
    artist: Optional[str],
    title: Optional[str],
    duration_s: Optional[float] = None
) -> Optional[str]:
    """Fetch lyrics from LRCLIB, preferring time-synced LRC.

    Strategy: direct /get lookup (artist + title, optionally duration), then a
    /search fallback scanning first for synced then for plain lyrics. Plain
    lyrics are wrapped into a crude LRC. Returns None when nothing usable is
    found or both artist and title are not provided.
    """
    base = "https://lrclib.net/api"
    # Require both artist and title; title-only lookups cause frequent mismatches.
    if not artist or not title:
        return None
    # Try direct get.
    q: Dict[str, str] = {
        "artist_name": artist,
        "track_name": title,
    }
    # Duration (whole seconds) narrows the match when known and positive.
    if isinstance(duration_s, (int, float)) and duration_s and duration_s > 0:
        q["duration"] = str(int(duration_s))
    url = f"{base}/get?{urlencode(q)}"
    obj = _http_get_json(url)
    if isinstance(obj, dict):
        synced = obj.get("syncedLyrics")
        if isinstance(synced, str) and synced.strip():
            _log("LRCLIB: got syncedLyrics")
            return synced
        plain = obj.get("plainLyrics")
        if isinstance(plain, str) and plain.strip():
            _log("LRCLIB: only plainLyrics; wrapping")
            wrapped = _wrap_plain_lyrics_as_lrc(plain)
            return wrapped if wrapped.strip() else None
    # Fallback: search using artist+title only.
    q_text = f"{artist} {title}"
    url = f"{base}/search?{urlencode({'q': q_text})}"
    items = _http_get_json_list(url) or []
    # First pass: prefer any search hit with time-synced lyrics.
    for item in items:
        if not isinstance(item, dict):
            continue
        synced = item.get("syncedLyrics")
        if isinstance(synced, str) and synced.strip():
            _log("LRCLIB: search hit with syncedLyrics")
            return synced
    # Plain lyrics fallback from search if available
    for item in items:
        if not isinstance(item, dict):
            continue
        plain = item.get("plainLyrics")
        if isinstance(plain, str) and plain.strip():
            _log("LRCLIB: search hit only plainLyrics; wrapping")
            wrapped = _wrap_plain_lyrics_as_lrc(plain)
            return wrapped if wrapped.strip() else None
    return None
def _fetch_lyrics_ovh(*, artist: Optional[str], title: Optional[str]) -> Optional[str]:
    """Fetch plain lyrics from lyrics.ovh and wrap them as a crude LRC.

    lyrics.ovh is a public, no-auth provider that returns plain (not
    time-synced) lyrics. Returns None when artist/title are missing, the API
    has no match, or the response is malformed. Network errors are logged,
    never raised.

    Fix: removed the unreachable trailing ``try: print(line, ...)`` remnant
    that followed this function — it referenced an undefined name ``line``
    (leftover from a ``_log`` refactor) and could never do anything useful.
    """
    if not artist or not title:
        return None
    try:
        # Endpoint uses path segments, so we urlencode each part.
        from urllib.parse import quote
        url = f"https://api.lyrics.ovh/v1/{quote(artist)}/{quote(title)}"
        obj = _http_get_json(url)
        if not isinstance(obj, dict):
            return None
        lyr = obj.get("lyrics")
        if isinstance(lyr, str) and lyr.strip():
            _log("lyrics.ovh: got plain lyrics; wrapping")
            wrapped = _wrap_plain_lyrics_as_lrc(lyr)
            return wrapped if wrapped.strip() else None
    except Exception as exc:
        _log(f"lyrics.ovh failed: {exc}")
    return None
@dataclass(frozen=True)
class LrcLine:
    """One timestamped lyric line parsed from an LRC document."""
    # Timestamp in seconds from the start of the track (global offset applied).
    time_s: float
    # Lyric text shown from time_s until the next line's timestamp.
    text: str
def _frac_to_ms(frac: str) -> int:
# LRC commonly uses centiseconds (2 digits), but can be 13 digits.
if not frac:
return 0
if len(frac) == 3:
return int(frac)
if len(frac) == 2:
return int(frac) * 10
return int(frac) * 100
def parse_lrc(text: str) -> List[LrcLine]:
    """Parse LRC into sorted timestamped lines.

    Supports multiple timestamps per line (all get the same text), a global
    "[offset:±ms]" directive (affecting only lines parsed after it), and
    skips metadata lines without timestamps. Lines that would end up before
    t=0 after the offset are dropped. Near-equal timestamps are de-duplicated.
    """
    offset_ms = 0
    lines: List[LrcLine] = []
    for raw_line in text.splitlines():
        line = raw_line.strip("\ufeff\r\n")
        if not line:
            continue
        # Optional global offset.
        off_m = _OFFSET_RE.match(line)
        if off_m:
            try:
                offset_ms = int(off_m.group("ms"))
            except Exception:
                offset_ms = 0
            continue
        matches = list(_TIMESTAMP_RE.finditer(line))
        if not matches:
            # Ignore non-timestamp metadata lines like [ar:], [ti:], etc.
            continue
        # The lyric text is everything after the LAST timestamp tag.
        lyric_text = line[matches[-1].end():].strip()
        for m in matches:
            mm = int(m.group("m"))
            ss = int(m.group("s"))
            frac = m.group("frac") or ""
            ts_ms = (mm * 60 + ss) * 1000 + _frac_to_ms(frac) + offset_ms
            if ts_ms < 0:
                continue
            lines.append(LrcLine(time_s=ts_ms / 1000.0, text=lyric_text))
    # Sort and de-dupe by timestamp (prefer last non-empty text).
    lines.sort(key=lambda x: x.time_s)
    deduped: List[LrcLine] = []
    for item in lines:
        if deduped and abs(deduped[-1].time_s - item.time_s) < 1e-6:
            # Same (within 1 microsecond) timestamp: keep the later non-empty text.
            if item.text:
                deduped[-1] = item
        else:
            deduped.append(item)
    return deduped
def _read_all_stdin() -> str:
    """Blockingly consume and return all of stdin (used for piped-in LRC text)."""
    return sys.stdin.read()
def _current_index(time_s: float, times: List[float]) -> int:
# Index of last timestamp <= time_s
return bisect.bisect_right(times, time_s) - 1
def _unwrap_memory_m3u(text: Optional[str]) -> Optional[str]:
"""Extract the real target URL/path from a memory:// M3U payload."""
if not isinstance(text, str) or not text.startswith("memory://"):
return text
for line in text.splitlines():
s = line.strip()
if not s or s.startswith("#") or s.startswith("memory://"):
continue
return s
return text
def _extract_hash_from_target(target: str) -> Optional[str]:
    """Extract a SHA256 hex digest from a Hydrus-style URL or a bare hash string.

    Returns the lowercased digest, or None when *target* is not a string and
    carries neither a "hash=<sha256>" query parameter nor a plain digest.
    """
    if not isinstance(target, str):
        return None
    qs_match = _HYDRUS_HASH_QS_RE.search(target)
    if qs_match is not None:
        return qs_match.group(1).lower()
    # Fallback: interpret the entire (trimmed) target as a plain hash.
    candidate = target.strip().lower()
    return candidate if _HASH_RE.fullmatch(candidate) else None
def _load_config_best_effort() -> dict:
try:
from SYS.config import load_config
cfg = load_config()
return cfg if isinstance(cfg,
dict) else {}
except Exception:
return {}
def _extract_lrc_from_notes(notes: Dict[str, str]) -> Optional[str]:
"""Return raw LRC text from the note named 'lyric'."""
if not isinstance(notes, dict) or not notes:
return None
raw = None
for k, v in notes.items():
if not isinstance(k, str):
continue
if k.strip() == "lyric":
raw = v
break
if not isinstance(raw, str):
return None
text = raw.strip("\ufeff\r\n")
return text if text.strip() else None
def _extract_sub_from_notes(notes: Dict[str, str]) -> Optional[str]:
"""Return raw subtitle text from the note named 'sub'."""
if not isinstance(notes, dict) or not notes:
return None
raw = None
for k, v in notes.items():
if not isinstance(k, str):
continue
if k.strip() == "sub":
raw = v
break
if not isinstance(raw, str):
return None
text = raw.strip("\ufeff\r\n")
return text if text.strip() else None
def _infer_sub_extension(text: str) -> str:
# Best-effort: mpv generally understands SRT/VTT; choose based on content.
t = (text or "").lstrip("\ufeff\r\n").lstrip()
if t.upper().startswith("WEBVTT"):
return ".vtt"
if "-->" in t:
# SRT typically uses commas for milliseconds, VTT uses dots.
if re.search(r"\d\d:\d\d:\d\d,\d\d\d\s*-->\s*\d\d:\d\d:\d\d,\d\d\d", t):
return ".srt"
return ".vtt"
return ".vtt"
def _write_temp_sub_file(*, key: str, text: str) -> Path:
    """Write subtitle *text* to a content-addressed temp file and return its path.

    The file name embeds hashes of both the key and key+content, so changed
    note text yields a new path and forces mpv to reload instead of reusing a
    stale subtitle.
    """
    out_dir = Path(tempfile.gettempdir()) / "medeia-mpv-notes"
    out_dir.mkdir(parents=True, exist_ok=True)
    suffix = _infer_sub_extension(text)
    content_digest = hashlib.sha1(
        (key + "\n" + (text or "")).encode("utf-8", errors="ignore")
    ).hexdigest()[:16]
    key_digest = hashlib.sha1(
        (key or "").encode("utf-8", errors="ignore")
    ).hexdigest()[:12]
    out_path = (out_dir / f"sub-{key_digest}-{content_digest}{suffix}").resolve()
    out_path.write_text(text or "", encoding="utf-8", errors="replace")
    return out_path
def _try_remove_selected_external_sub(client: MPVIPCClient) -> None:
try:
client.send_command({
"command": ["sub-remove"]
})
except Exception:
return
def _try_add_external_sub(client: MPVIPCClient, path: Path) -> None:
try:
client.send_command(
{
"command": ["sub-add",
str(path),
"select",
"medeia-sub"]
}
)
except Exception:
return
def _is_stream_target(target: str) -> bool:
"""Return True when mpv's 'path' is not a local filesystem file.
We intentionally treat any URL/streaming scheme as invalid for lyrics in auto mode.
"""
if not isinstance(target, str):
return False
s = target.strip()
if not s:
return False
# Windows local paths: drive letter or UNC.
if _WIN_DRIVE_RE.match(s) or _WIN_UNC_RE.match(s):
return False
# Common streaming prefixes.
if s.startswith("http://") or s.startswith("https://"):
return True
# Generic scheme:// (e.g. ytdl://, edl://, rtmp://, etc.).
if "://" in s:
try:
parsed = urlparse(s)
scheme = (parsed.scheme or "").lower()
if scheme and scheme not in {"file"}:
return True
except Exception:
return True
return False
def _normalize_file_uri_target(target: str) -> str:
    """Convert file:// URIs to a local filesystem path string when possible.

    Non-string, empty, and non-file:// inputs pass through unchanged, as does
    anything that fails to parse.
    """
    if not isinstance(target, str):
        return target
    s = target.strip()
    if not s:
        return target
    if not s.lower().startswith("file://"):
        return target
    try:
        parsed = urlparse(s)
        path = unquote(parsed.path or "")
        if os.name == "nt":
            # UNC: file://server/share/path -> \\server\share\path
            if parsed.netloc:
                p = path.replace("/", "\\")
                if p.startswith("\\"):
                    # NOTE(review): lstrip removes ALL leading backslashes, not
                    # just one — assumed intentional so the join below always
                    # produces exactly "\\server\share\...".
                    p = p.lstrip("\\")
                return f"\\\\{parsed.netloc}\\{p}" if p else f"\\\\{parsed.netloc}"
            # Drive letter: file:///C:/path -> C:/path
            if path.startswith("/") and len(path) >= 3 and path[2] == ":":
                path = path[1:]
        # POSIX (and the Windows drive-letter case above) land here; an empty
        # decoded path falls back to the original target.
        return path or target
    except Exception:
        return target
def _extract_store_from_url_target(target: str) -> Optional[str]:
"""Extract explicit store name from a URL query param `store=...` (if present)."""
if not isinstance(target, str):
return None
s = target.strip()
if not (s.startswith("http://") or s.startswith("https://")):
return None
try:
parsed = urlparse(s)
if not parsed.query:
return None
qs = parse_qs(parsed.query)
raw = qs.get("store", [None])[0]
if isinstance(raw, str) and raw.strip():
return raw.strip()
except Exception:
return None
return None
def _infer_hydrus_store_from_url_target(*, target: str, config: dict) -> Optional[str]:
    """Infer a Hydrus store backend by matching the URL prefix to the backend base URL.

    Returns the backend name only when exactly one HydrusNetwork backend's
    base URL is a prefix of *target*; ambiguous (multiple) or zero matches
    yield None, as do non-http(s) targets and registry failures.
    """
    if not isinstance(target, str):
        return None
    s = target.strip()
    if not (s.startswith("http://") or s.startswith("https://")):
        return None
    try:
        from Store import Store as StoreRegistry
        reg = StoreRegistry(config, suppress_debug=True)
        backends = [(name, reg[name]) for name in reg.list_backends()]
    except Exception:
        return None
    matches: List[str] = []
    for name, backend in backends:
        # Only Hydrus backends expose a base URL worth prefix-matching.
        if type(backend).__name__ != "HydrusNetwork":
            continue
        # NOTE(review): pokes private attributes (_url, then _client.url) —
        # assumed stable internals of the HydrusNetwork backend; verify if the
        # Store package changes.
        base_url = getattr(backend, "_url", None)
        if not base_url:
            client = getattr(backend, "_client", None)
            base_url = getattr(client, "url", None) if client else None
        if not base_url:
            continue
        base = str(base_url).rstrip("/")
        if s.startswith(base):
            matches.append(name)
    # Require an unambiguous match; with several candidates we cannot tell
    # which store the URL belongs to.
    if len(matches) == 1:
        return matches[0]
    return None
def _resolve_store_backend_for_target(
    *,
    target: str,
    file_hash: str,
    config: dict,
) -> tuple[Optional[str], Any]:
    """Resolve a store backend for a local mpv target using the store DB.

    A target is considered valid only when:
    - target is a local filesystem file
    - a backend's get_file(hash) returns a local file path
    - that path resolves to the same target path

    Returns (backend_name, backend) on success, (None, None) otherwise.
    """
    try:
        p = Path(target)
        if not p.exists() or not p.is_file():
            return None, None
        target_resolved = p.resolve()
    except Exception:
        return None, None
    try:
        from Store import Store as StoreRegistry
        reg = StoreRegistry(config, suppress_debug=True)
        backend_names = list(reg.list_backends())
    except Exception:
        return None, None
    # Prefer the inferred Folder store (fast), but still validate via get_file().
    preferred = _infer_store_for_target(target=target, config=config)
    if preferred and preferred in backend_names:
        backend_names.remove(preferred)
        backend_names.insert(0, preferred)
    for name in backend_names:
        try:
            backend = reg[name]
        except Exception:
            continue
        store_file = None
        try:
            store_file = backend.get_file(file_hash, config=config)
        except TypeError:
            # Some backends take no config kwarg; retry positionally.
            try:
                store_file = backend.get_file(file_hash)
            except Exception:
                store_file = None
        except Exception:
            store_file = None
        if not store_file:
            continue
        # Only accept local files; if the backend returns a URL, it's not valid for lyrics.
        try:
            store_path = Path(str(store_file)).expanduser()
            if not store_path.exists() or not store_path.is_file():
                continue
            # The backend's copy must be the very file mpv is playing.
            if store_path.resolve() != target_resolved:
                continue
        except Exception:
            continue
        return name, backend
    return None, None
def _infer_store_for_target(*, target: str, config: dict) -> Optional[str]:
    """Infer store name from the current mpv target (local path under a folder root).

    Picks the deepest Folder-backend root that contains *target*. URLs/streams
    are intentionally not mapped to stores for lyrics.

    Bug fix: the containment test now requires a path separator immediately
    after the root prefix, so a root like ``.../music`` no longer falsely
    claims files under a sibling ``.../music2``.
    """
    if isinstance(target, str) and _is_stream_target(target):
        return None
    try:
        from Store import Store as StoreRegistry
        reg = StoreRegistry(config, suppress_debug=True)
        backends = [(name, reg[name]) for name in reg.list_backends()]
    except Exception:
        backends = []
    # Local file path: choose the deepest Folder root that contains it.
    try:
        p = Path(target)
        if not p.exists() or not p.is_file():
            return None
        p_str = str(p.resolve()).lower()
    except Exception:
        return None
    best: Optional[str] = None
    best_len = -1
    for name, backend in backends:
        if type(backend).__name__ != "Folder":
            continue
        root = None
        try:
            # Folder backends expose the root either as a _location attribute
            # or a location() method.
            root = (
                getattr(backend, "_location", None)
                or getattr(backend, "location", lambda: None)()
            )
        except Exception:
            root = None
        if not root:
            continue
        try:
            root_path = Path(str(root)).expanduser().resolve()
            root_str = str(root_path).lower().rstrip("\\/")
        except Exception:
            continue
        if not p_str.startswith(root_str):
            continue
        # Require a separator right after the root so sibling directories with
        # a shared prefix (e.g. "music" vs "music2") are not falsely matched.
        boundary = p_str[len(root_str):len(root_str) + 1]
        if boundary not in ("\\", "/"):
            continue
        if len(root_str) > best_len:
            best = name
            best_len = len(root_str)
    return best
def _infer_hash_for_target(target: str) -> Optional[str]:
    """Infer SHA256 hash from Hydrus URL query, hash-named local files, or by hashing local file content."""
    direct = _extract_hash_from_target(target)
    if direct:
        return direct
    try:
        candidate = Path(target)
        if not candidate.exists() or not candidate.is_file():
            return None
        # A file whose stem is already a SHA256 digest names its own hash.
        stem = candidate.stem
        if isinstance(stem, str) and _HASH_RE.fullmatch(stem.strip()):
            return stem.strip().lower()
        # Last resort: hash the file bytes (can be slow for large files).
        from SYS.utils import sha256_file
        return sha256_file(candidate)
    except Exception:
        return None
def run_auto_overlay(
    *,
    mpv: MPV,
    poll_s: float = 0.15,
    config: Optional[dict] = None
) -> int:
    """Auto mode: track mpv's current file and render lyrics (note: 'lyric') or load subtitles (note: 'sub').

    Polls mpv over IPC every *poll_s* seconds. For each target change it
    resolves a (store, sha256) pair, loads the 'sub' or 'lyric' note, and —
    when no lyric note exists — optionally auto-fetches one (LRCLIB, then
    lyrics.ovh) and stores it. Returns a process exit code: 3 when mpv IPC is
    unreachable at startup, 4 when the connection is lost and cannot be
    re-established.
    """
    cfg = config or {}
    client = mpv.client()
    if not client.connect():
        _log("mpv IPC is not reachable (is mpv running with --input-ipc-server?).")
        return 3
    _log(f"Auto overlay connected (ipc={getattr(mpv, 'ipc_path', None)})")
    # Per-target resolution state.
    last_target: Optional[str] = None
    current_store_name: Optional[str] = None
    current_file_hash: Optional[str] = None
    current_key: Optional[str] = None
    current_backend: Optional[Any] = None
    # What is currently loaded/displayed.
    last_loaded_key: Optional[str] = None
    last_loaded_mode: Optional[str] = None  # 'lyric' | 'sub'
    last_loaded_sub_path: Optional[Path] = None
    # Autofetch throttling (one attempt per key, min 2s apart).
    last_fetch_attempt_key: Optional[str] = None
    last_fetch_attempt_at: float = 0.0
    entries: List[LrcLine] = []
    times: List[float] = []
    last_idx: Optional[int] = None
    last_text: Optional[str] = None
    last_visible: Optional[bool] = None
    while True:
        try:
            # Toggle support (mpv Lua script sets this property; default to visible).
            visible_raw = _ipc_get_property(
                client,
                _LYRIC_VISIBLE_PROP,
                True,
                raise_on_disconnect=True
            )
            raw_path = _ipc_get_property(client, "path", None, raise_on_disconnect=True)
        except ConnectionError:
            # Drop the overlay, then try one reconnect before giving up.
            try:
                _osd_overlay_clear(client)
            except Exception:
                pass
            try:
                client.disconnect()
            except Exception:
                pass
            if not client.connect():
                _log("mpv IPC disconnected; exiting MPV.lyric")
                return 4
            time.sleep(poll_s)
            continue
        visible = bool(visible_raw) if isinstance(visible_raw, (bool, int)) else True
        # Edge-detect visibility transitions.
        if last_visible is None:
            last_visible = visible
        elif last_visible is True and visible is False:
            # Clear immediately when switching off.
            try:
                _osd_overlay_clear(client)
            except Exception:
                pass
            # Also remove any external subtitle that may be showing lyrics so
            # turning lyrics "off" leaves no text on screen.
            try:
                _try_remove_selected_external_sub(client)
            except Exception:
                pass
            last_idx = None
            last_text = None
            last_visible = visible
        elif last_visible is False and visible is True:
            # Force a refresh on next tick.
            last_idx = None
            last_text = None
            last_visible = visible
        else:
            last_visible = visible
        # mpv may report a memory:// M3U wrapper or a file:// URI; unwrap both.
        target = _unwrap_memory_m3u(str(raw_path)) if isinstance(raw_path, str) else None
        if isinstance(target, str):
            target = _normalize_file_uri_target(target)
        if not isinstance(target, str) or not target:
            time.sleep(poll_s)
            continue
        is_http = target.startswith("http://") or target.startswith("https://")
        if (not is_http) and _is_stream_target(target):
            # Non-http streams (ytdl://, edl://, rtmp://, etc.) are never valid for lyrics.
            if last_loaded_key is not None:
                try:
                    _osd_overlay_clear(client)
                except Exception:
                    pass
            if last_loaded_sub_path is not None:
                _try_remove_selected_external_sub(client)
                last_loaded_sub_path = None
            last_target = target
            current_store_name = None
            current_file_hash = None
            current_key = None
            current_backend = None
            entries = []
            times = []
            last_loaded_key = None
            last_loaded_mode = None
            time.sleep(poll_s)
            continue
        if target != last_target:
            # New playback target: re-resolve store membership and hash.
            last_target = target
            last_idx = None
            last_text = None
            _log(f"Target changed: {target}")
            current_file_hash = _infer_hash_for_target(target)
            if not current_file_hash:
                entries = []
                times = []
                if last_loaded_key is not None:
                    _osd_overlay_clear(client)
                    last_loaded_key = None
                    last_loaded_mode = None
                if last_loaded_sub_path is not None:
                    _try_remove_selected_external_sub(client)
                    last_loaded_sub_path = None
                time.sleep(poll_s)
                continue
            if is_http:
                # HTTP/HTTPS targets are only valid if they map to a store backend.
                store_from_url = _extract_store_from_url_target(target)
                store_name = store_from_url or _infer_hydrus_store_from_url_target(
                    target=target,
                    config=cfg
                )
                if not store_name:
                    _log("HTTP target has no store mapping; lyrics disabled")
                    current_store_name = None
                    current_backend = None
                    current_key = None
                    entries = []
                    times = []
                    if last_loaded_key is not None:
                        _osd_overlay_clear(client)
                        last_loaded_key = None
                        last_loaded_mode = None
                    if last_loaded_sub_path is not None:
                        _try_remove_selected_external_sub(client)
                        last_loaded_sub_path = None
                    time.sleep(poll_s)
                    continue
                try:
                    from Store import Store as StoreRegistry
                    reg = StoreRegistry(cfg, suppress_debug=True)
                    current_backend = reg[store_name]
                    current_store_name = store_name
                except Exception:
                    _log(
                        f"HTTP target store {store_name!r} not available; lyrics disabled"
                    )
                    current_store_name = None
                    current_backend = None
                    current_key = None
                    entries = []
                    times = []
                    if last_loaded_key is not None:
                        _osd_overlay_clear(client)
                        last_loaded_key = None
                        last_loaded_mode = None
                    if last_loaded_sub_path is not None:
                        _try_remove_selected_external_sub(client)
                        last_loaded_sub_path = None
                    time.sleep(poll_s)
                    continue
                # Optional existence check: if metadata is unavailable, treat as not-a-store-item.
                try:
                    meta = current_backend.get_metadata(current_file_hash, config=cfg)
                except Exception:
                    meta = None
                if meta is None:
                    _log(
                        f"HTTP target not found in store DB (store={store_name!r} hash={current_file_hash}); lyrics disabled"
                    )
                    current_store_name = None
                    current_backend = None
                    current_key = None
                    entries = []
                    times = []
                    if last_loaded_key is not None:
                        _osd_overlay_clear(client)
                        last_loaded_key = None
                        last_loaded_mode = None
                    if last_loaded_sub_path is not None:
                        _try_remove_selected_external_sub(client)
                        last_loaded_sub_path = None
                    time.sleep(poll_s)
                    continue
                current_key = f"{current_store_name}:{current_file_hash}"
                _log(
                    f"Resolved store={current_store_name!r} hash={current_file_hash!r} valid=True"
                )
            else:
                # Local files: resolve store item via store DB. If not resolvable, lyrics are disabled.
                current_store_name, current_backend = _resolve_store_backend_for_target(
                    target=target,
                    file_hash=current_file_hash,
                    config=cfg,
                )
                current_key = (
                    f"{current_store_name}:{current_file_hash}"
                    if current_store_name and current_file_hash else None
                )
                _log(
                    f"Resolved store={current_store_name!r} hash={current_file_hash!r} valid={bool(current_key)}"
                )
                if not current_key or not current_backend:
                    current_store_name = None
                    current_backend = None
                    current_key = None
                    entries = []
                    times = []
                    if last_loaded_key is not None:
                        _osd_overlay_clear(client)
                        last_loaded_key = None
                        last_loaded_mode = None
                    if last_loaded_sub_path is not None:
                        _try_remove_selected_external_sub(client)
                        last_loaded_sub_path = None
                    time.sleep(poll_s)
                    continue
        # Load/reload lyrics when we have a resolvable key and it differs from what we loaded.
        # This is important for the autofetch path: the note can appear without the mpv target changing.
        if (current_key and current_key != last_loaded_key and current_store_name
                and current_file_hash and current_backend):
            notes: Dict[str, str] = {}
            try:
                notes = current_backend.get_note(
                    current_file_hash,
                    config=cfg
                ) or {}
            except Exception:
                notes = {}
            try:
                _log(
                    f"Loaded notes keys: {sorted([str(k) for k in notes.keys()]) if isinstance(notes, dict) else 'N/A'}"
                )
            except Exception:
                _log("Loaded notes keys: <error>")
            sub_text = _extract_sub_from_notes(notes)
            if sub_text:
                # Treat subtitles as an alternative to lyrics; do not show the lyric overlay.
                try:
                    _osd_overlay_clear(client)
                except Exception:
                    pass
                try:
                    sub_path = _write_temp_sub_file(key=current_key, text=sub_text)
                except Exception as exc:
                    _log(f"Failed to write sub note temp file: {exc}")
                    sub_path = None
                if sub_path is not None:
                    # If we previously loaded a sub, remove it first to avoid stacking.
                    if last_loaded_sub_path is not None:
                        _try_remove_selected_external_sub(client)
                    _try_add_external_sub(client, sub_path)
                    last_loaded_sub_path = sub_path
                entries = []
                times = []
                last_loaded_key = current_key
                last_loaded_mode = "sub"
            else:
                # Switching away from sub-note mode: best-effort unload the selected external subtitle.
                if last_loaded_mode == "sub" and last_loaded_sub_path is not None:
                    _try_remove_selected_external_sub(client)
                    last_loaded_sub_path = None
                lrc_text = _extract_lrc_from_notes(notes)
                if not lrc_text:
                    _log("No lyric note found (note name: 'lyric')")
                    # Auto-fetch path: fetch and persist lyrics into the note named 'lyric'.
                    # Throttle attempts per key to avoid hammering APIs.
                    autofetch_enabled = bool(cfg.get("lyric_autofetch", True))
                    now = time.time()
                    if (autofetch_enabled and current_key != last_fetch_attempt_key
                            and (now - last_fetch_attempt_at) > 2.0):
                        last_fetch_attempt_key = current_key
                        last_fetch_attempt_at = now
                        artist = None
                        title = None
                        duration_s = None
                        try:
                            duration_s = _ipc_get_property(client, "duration", None)
                        except Exception:
                            duration_s = None
                        # Use store tags only (artist:/title:). No filename/metadata/media-title fallbacks.
                        try:
                            tags, _src = current_backend.get_tag(current_file_hash, config=cfg)
                            if isinstance(tags, list):
                                artist, title = _infer_artist_title_from_tags([str(x) for x in tags])
                        except Exception:
                            pass
                        _log(
                            f"Autofetch query artist={artist!r} title={title!r} duration={duration_s!r}"
                        )
                        if not artist or not title:
                            _log("Autofetch skipped: requires both artist and title")
                            fetched = None
                        else:
                            # LRCLIB first (synced), then lyrics.ovh (plain) as fallback.
                            fetched = _fetch_lrclib(
                                artist=artist,
                                title=title,
                                duration_s=(
                                    float(duration_s)
                                    if isinstance(duration_s, (int, float)) else None
                                ),
                            )
                            if not fetched or not fetched.strip():
                                fetched = _fetch_lyrics_ovh(artist=artist, title=title)
                        if fetched and fetched.strip():
                            try:
                                ok = bool(
                                    current_backend.set_note(
                                        current_file_hash,
                                        "lyric",
                                        fetched,
                                        config=cfg
                                    )
                                )
                                _log(f"Autofetch stored lyric note ok={ok}")
                                # Next loop iteration will re-load the note.
                            except Exception as exc:
                                _log(f"Autofetch failed to store lyric note: {exc}")
                        else:
                            _log("Autofetch: no lyrics found")
                    entries = []
                    times = []
                    if last_loaded_key is not None:
                        _osd_overlay_clear(client)
                        last_loaded_key = None
                        last_loaded_mode = None
                else:
                    _log(f"Loaded lyric note ({len(lrc_text)} chars)")
                    parsed = parse_lrc(lrc_text)
                    entries = parsed
                    times = [e.time_s for e in entries]
                    last_loaded_key = current_key
                    last_loaded_mode = "lyric"
        try:
            # mpv returns None when idle/no file.
            t = _ipc_get_property(client, "time-pos", None, raise_on_disconnect=True)
        except ConnectionError:
            try:
                _osd_overlay_clear(client)
            except Exception:
                pass
            try:
                client.disconnect()
            except Exception:
                pass
            if not client.connect():
                _log("mpv IPC disconnected; exiting MPV.lyric")
                return 4
            time.sleep(poll_s)
            continue
        if not isinstance(t, (int, float)):
            time.sleep(poll_s)
            continue
        if not entries:
            time.sleep(poll_s)
            continue
        if not visible:
            time.sleep(poll_s)
            continue
        idx = _current_index(float(t), times)
        if idx < 0:
            # Before the first lyric timestamp.
            time.sleep(poll_s)
            continue
        line = entries[idx]
        if idx != last_idx or line.text != last_text:
            # osd-overlay has no duration; refresh periodically.
            resp = _osd_overlay_set_ass(client, _format_lyric_as_subtitle(line.text))
            if resp is None:
                client.disconnect()
                if not client.connect():
                    print("Lost mpv IPC connection.", file=sys.stderr)
                    return 4
            elif isinstance(resp, dict) and resp.get("error") not in (None, "success"):
                try:
                    _log(f"mpv osd-overlay returned error={resp.get('error')!r}")
                except Exception:
                    pass
            last_idx = idx
            last_text = line.text
        time.sleep(poll_s)
def run_overlay(*, mpv: MPV, entries: List[LrcLine], poll_s: float = 0.15) -> int:
    """Render pre-parsed LRC *entries* over mpv's OSD until interrupted.

    Polls mpv's time-pos every *poll_s* seconds and updates the osd-overlay
    whenever the current lyric line changes. Returns exit codes: 2 for no
    entries, 3 when mpv IPC is unreachable at startup, 4 when the connection
    is lost and cannot be re-established.
    """
    if not entries:
        print("No timestamped LRC lines found.", file=sys.stderr)
        return 2
    times = [e.time_s for e in entries]
    last_idx: Optional[int] = None
    last_text: Optional[str] = None
    client = mpv.client()
    if not client.connect():
        print(
            "mpv IPC is not reachable (is mpv running with --input-ipc-server?).",
            file=sys.stderr
        )
        return 3
    while True:
        try:
            # mpv returns None when idle/no file.
            t = _ipc_get_property(client, "time-pos", None, raise_on_disconnect=True)
        except ConnectionError:
            # Drop the overlay, then try one reconnect before giving up.
            try:
                _osd_overlay_clear(client)
            except Exception:
                pass
            try:
                client.disconnect()
            except Exception:
                pass
            if not client.connect():
                print("Lost mpv IPC connection.", file=sys.stderr)
                return 4
            time.sleep(poll_s)
            continue
        if not isinstance(t, (int, float)):
            time.sleep(poll_s)
            continue
        idx = _current_index(float(t), times)
        if idx < 0:
            # Before first lyric timestamp.
            time.sleep(poll_s)
            continue
        line = entries[idx]
        if idx != last_idx or line.text != last_text:
            # osd-overlay has no duration; refresh periodically.
            resp = _osd_overlay_set_ass(client, _format_lyric_as_subtitle(line.text))
            if resp is None:
                client.disconnect()
                if not client.connect():
                    print("Lost mpv IPC connection.", file=sys.stderr)
                    return 4
            elif isinstance(resp, dict) and resp.get("error") not in (None, "success"):
                try:
                    _log(f"mpv osd-overlay returned error={resp.get('error')!r}")
                except Exception:
                    pass
            last_idx = idx
            last_text = line.text
        time.sleep(poll_s)
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point.

    Source precedence for lyrics: --lrc file, then piped stdin, then auto
    mode (store notes + optional autofetch). Returns a process exit code;
    KeyboardInterrupt exits cleanly with 0.
    """
    parser = argparse.ArgumentParser(prog="python -m MPV.lyric", add_help=True)
    parser.add_argument(
        "--ipc",
        default=None,
        help="mpv IPC path. Defaults to the repo's fixed IPC pipe name.",
    )
    parser.add_argument(
        "--lrc",
        default=None,
        help="Path to an .lrc file. If omitted, reads LRC from stdin.",
    )
    parser.add_argument(
        "--poll",
        type=float,
        default=0.15,
        help="Polling interval in seconds for time-pos updates.",
    )
    parser.add_argument(
        "--log",
        default=None,
        help="Optional path to a log file for diagnostics.",
    )
    args = parser.parse_args(argv)
    # Configure logging early.
    global _LOG_FH
    if args.log:
        try:
            log_path = Path(str(args.log)).expanduser().resolve()
            log_path.parent.mkdir(parents=True, exist_ok=True)
            _LOG_FH = open(log_path, "a", encoding="utf-8", errors="replace")
            _log("MPV.lyric starting")
        except Exception:
            # Fall back to stderr logging when the log file can't be opened.
            _LOG_FH = None
    mpv = MPV(ipc_path=args.ipc) if args.ipc else MPV()
    # Prevent multiple lyric helpers from running at once for the same mpv IPC.
    if not _acquire_single_instance_lock(getattr(mpv, "ipc_path", "") or ""):
        _log("Another MPV.lyric instance is already running for this IPC; exiting.")
        return 0
    # If --lrc is provided, use it.
    if args.lrc:
        with open(args.lrc, "r", encoding="utf-8", errors="replace") as f:
            lrc_text = f.read()
        entries = parse_lrc(lrc_text)
        try:
            return run_overlay(mpv=mpv, entries=entries, poll_s=float(args.poll))
        except KeyboardInterrupt:
            return 0
    # Otherwise: if stdin has content, treat it as LRC; if stdin is empty/TTY, auto-discover.
    lrc_text = ""
    try:
        if not sys.stdin.isatty():
            lrc_text = _read_all_stdin() or ""
    except Exception:
        lrc_text = ""
    if lrc_text.strip():
        entries = parse_lrc(lrc_text)
        try:
            return run_overlay(mpv=mpv, entries=entries, poll_s=float(args.poll))
        except KeyboardInterrupt:
            return 0
    cfg = _load_config_best_effort()
    try:
        return run_auto_overlay(mpv=mpv, poll_s=float(args.poll), config=cfg)
    except KeyboardInterrupt:
        return 0
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())