# Files
# Medios-Macina/MPV/pipeline_helper.py
# 2026-03-21 19:02:30 -07:00
#
# 1892 lines
# 63 KiB
# Python

"""Persistent MPV pipeline helper.
This process connects to MPV's IPC server, observes a user-data property for
pipeline execution requests, runs the pipeline in-process, and posts results
back to MPV via user-data properties.
Why:
- Avoid spawning a new Python process for every MPV action.
- Enable MPV Lua scripts to trigger any cmdlet pipeline cheaply.
Protocol (user-data properties):
- Request: user-data/medeia-pipeline-request (JSON string)
{"id": "...", "pipeline": "...", "seeds": [...]} (seeds optional)
- Response: user-data/medeia-pipeline-response (JSON string)
{"id": "...", "success": bool, "stdout": "...", "stderr": "...", "error": "..."}
- Ready: user-data/medeia-pipeline-ready ("1")
This helper is intentionally minimal: one request at a time, last-write-wins.
"""
from __future__ import annotations
MEDEIA_MPV_HELPER_VERSION = "2026-03-22.2"
import argparse
import json
import os
import sys
import tempfile
import time
import threading
import logging
import re
import hashlib
import subprocess
import platform
from pathlib import Path
from typing import Any, Callable, Dict, Optional
def _repo_root() -> Path:
return Path(__file__).resolve().parent.parent
def _runtime_config_root() -> Path:
    """Best-effort config root for runtime execution.

    MPV can spawn this helper from an installed location while setting `cwd` to
    the repo root (see MPV.mpv_ipc). Prefer `cwd` when it contains `config.conf`.
    """
    current = None
    has_config = False
    try:
        current = Path.cwd().resolve()
        has_config = (current / "config.conf").exists()
    except Exception:
        # Fall through to the repo root below.
        current = None
        has_config = False
    if current is not None and has_config:
        return current
    return _repo_root()
# Make repo-local packages importable even when mpv starts us from another cwd.
# NOTE: this must run before the project-local imports below (MPV.*, SYS.*).
_ROOT = str(_repo_root())
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)
from MPV.mpv_ipc import MPVIPCClient, _windows_kill_pids, _windows_hidden_subprocess_kwargs # noqa: E402
from SYS.config import load_config, reload_config # noqa: E402
from SYS.logger import set_debug, debug, set_thread_stream # noqa: E402
from SYS.repl_queue import enqueue_repl_command # noqa: E402
from SYS.utils import format_bytes # noqa: E402
# mpv user-data property names forming the request/response protocol
# (see the module docstring for the payload shapes).
REQUEST_PROP = "user-data/medeia-pipeline-request"
RESPONSE_PROP = "user-data/medeia-pipeline-response"
READY_PROP = "user-data/medeia-pipeline-ready"
VERSION_PROP = "user-data/medeia-pipeline-helper-version"
# observe_property id used when subscribing to REQUEST_PROP.
OBS_ID_REQUEST = 1001
# Optional callback that mirrors helper log lines back into mpv (set elsewhere).
_HELPER_MPV_LOG_EMITTER: Optional[Callable[[str], None]] = None
# Bounded in-memory backlog of recent helper log lines.
_HELPER_LOG_BACKLOG: list[str] = []
_HELPER_LOG_BACKLOG_LIMIT = 200
# Async pipeline job registry (job_id -> job record), guarded by its lock.
_ASYNC_PIPELINE_JOBS: Dict[str, Dict[str, Any]] = {}
_ASYNC_PIPELINE_JOBS_LOCK = threading.Lock()
# Job records expire this long after their last update (15 minutes).
_ASYNC_PIPELINE_JOB_TTL_SECONDS = 900.0
# Cached, normalized store-backend choice list, guarded by its lock.
_STORE_CHOICES_CACHE: list[str] = []
_STORE_CHOICES_CACHE_LOCK = threading.Lock()
def _normalize_store_choices(values: Any) -> list[str]:
out: list[str] = []
seen: set[str] = set()
if not isinstance(values, (list, tuple, set)):
return out
for item in values:
text = str(item or "").strip()
if not text:
continue
key = text.casefold()
if key in seen:
continue
seen.add(key)
out.append(text)
return sorted(out, key=str.casefold)
def _get_cached_store_choices() -> list[str]:
    """Return a snapshot copy of the cached store-choice list, taken under its lock."""
    with _STORE_CHOICES_CACHE_LOCK:
        snapshot = list(_STORE_CHOICES_CACHE)
    return snapshot
def _set_cached_store_choices(choices: Any) -> list[str]:
    """Replace the cached store-choice list with the normalized *choices*.

    Returns a copy of what was stored.
    """
    fresh = _normalize_store_choices(choices)
    with _STORE_CHOICES_CACHE_LOCK:
        _STORE_CHOICES_CACHE[:] = fresh
        return list(_STORE_CHOICES_CACHE)
def _store_choices_payload(choices: Any) -> Optional[str]:
    """Serialize normalized *choices* as a JSON success payload.

    Returns None when normalization leaves nothing to publish.
    """
    normalized = _normalize_store_choices(choices)
    if not normalized:
        return None
    payload = {"success": True, "choices": normalized}
    return json.dumps(payload, ensure_ascii=False)
def _load_store_choices_from_config(*, force_reload: bool = False) -> list[str]:
    """Read configured store backend names from config, normalized and sorted.

    When *force_reload* is true the configuration is re-read via reload_config()
    instead of using load_config()'s cached copy.
    """
    from Store.registry import list_configured_backend_names  # noqa: WPS433
    cfg = reload_config() if force_reload else load_config()
    return _normalize_store_choices(list_configured_backend_names(cfg or {}))
def _prune_async_pipeline_jobs(now: Optional[float] = None) -> None:
    """Drop async job records whose last update is older than the TTL.

    *now* overrides the reference time (defaults to time.time()).
    """
    reference = float(now or time.time())
    cutoff = reference - _ASYNC_PIPELINE_JOB_TTL_SECONDS
    with _ASYNC_PIPELINE_JOBS_LOCK:
        stale = [
            key
            for key, record in _ASYNC_PIPELINE_JOBS.items()
            if float(record.get("updated_at") or 0.0) < cutoff
        ]
        for key in stale:
            _ASYNC_PIPELINE_JOBS.pop(key, None)
def _update_async_pipeline_job(job_id: str, **fields: Any) -> Optional[Dict[str, Any]]:
    """Create or update the async job record for *job_id*.

    Expired records are pruned first. Missing jobs are created with queued
    defaults; *fields* are merged in and updated_at is stamped. Returns a
    snapshot copy of the stored record, or None when job_id is empty.
    """
    if not job_id:
        return None
    stamp = time.time()
    _prune_async_pipeline_jobs(stamp)
    with _ASYNC_PIPELINE_JOBS_LOCK:
        existing = _ASYNC_PIPELINE_JOBS.get(job_id)
        if existing:
            record = dict(existing)
        else:
            # First sighting of this job id: seed the queued-state defaults.
            record = {
                "job_id": job_id,
                "status": "queued",
                "success": False,
                "created_at": stamp,
            }
        record.update(fields)
        record["updated_at"] = stamp
        _ASYNC_PIPELINE_JOBS[job_id] = record
        return dict(record)
def _get_async_pipeline_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Return a snapshot of one async job record, pruning expired entries first.

    Returns None for an empty id or an unknown/expired job.
    """
    if not job_id:
        return None
    _prune_async_pipeline_jobs()
    with _ASYNC_PIPELINE_JOBS_LOCK:
        record = _ASYNC_PIPELINE_JOBS.get(job_id)
        if record:
            return dict(record)
        return None
def _append_prefixed_log_lines(prefix: str, text: Any, *, max_lines: int = 40) -> None:
    """Log each non-empty line of *text* with *prefix*, capped at *max_lines*.

    Line endings are normalized to "\n"; blank lines are skipped. When the cap
    is hit a trailing truncation marker is logged.
    """
    normalized = str(text or "").replace("\r\n", "\n").replace("\r", "\n")
    if not normalized.strip():
        return
    count = 0
    for raw_line in normalized.split("\n"):
        stripped = raw_line.rstrip()
        if not stripped:
            continue
        _append_helper_log(f"{prefix}{stripped}")
        count += 1
        if count >= max_lines:
            _append_helper_log(f"{prefix}... truncated after {max_lines} lines")
            break
def _start_ready_heartbeat(ipc_path: str, stop_event: threading.Event) -> threading.Thread:
    """Keep READY_PROP fresh even when the main loop blocks on Windows pipes.

    Starts (and returns) a daemon thread that publishes the current unix
    timestamp to READY_PROP roughly every 0.75s over its own IPC connection,
    until *stop_event* is set.
    """
    def _heartbeat_loop() -> None:
        # Dedicated IPC connection so heartbeats do not contend with the main loop.
        hb_client = MPVIPCClient(socket_path=ipc_path, timeout=0.5, silent=True)
        while not stop_event.is_set():
            try:
                # Reconnect lazily; back off briefly when mpv is unreachable.
                if hb_client.sock is None and not hb_client.connect():
                    stop_event.wait(0.25)
                    continue
                hb_client.send_command_no_wait(
                    ["set_property_string", READY_PROP, str(int(time.time()))]
                )
            except Exception:
                # Drop the connection on any error; the next iteration reconnects.
                try:
                    hb_client.disconnect()
                except Exception:
                    pass
            stop_event.wait(0.75)
        try:
            hb_client.disconnect()
        except Exception:
            pass
    thread = threading.Thread(
        target=_heartbeat_loop,
        name="mpv-helper-heartbeat",
        daemon=True,
    )
    thread.start()
    return thread
def _windows_list_pipeline_helper_pids(ipc_path: str) -> list[int]:
    """Return PIDs of pipeline-helper processes bound to *ipc_path* (Windows only).

    Uses PowerShell + CIM to match command lines that reference this module
    (script or ``-m`` invocation) together with the same ``--ipc`` argument.
    Best-effort: any failure, timeout, or non-Windows platform yields [].
    """
    if platform.system() != "Windows":
        return []
    try:
        ipc_path = str(ipc_path or "")
    except Exception:
        ipc_path = ""
    if not ipc_path:
        return []
    # Match either a script invocation (pipeline_helper.py) or a module
    # invocation (-m MPV.pipeline_helper), constrained to the exact --ipc value.
    ps_script = (
        "$ipc = " + json.dumps(ipc_path) + "; "
        "Get-CimInstance Win32_Process | "
        "Where-Object { $_.CommandLine -and (($_.CommandLine -match 'pipeline_helper\\.py') -or ($_.CommandLine -match ' -m\\s+MPV\\.pipeline_helper(\\s|$)')) -and $_.CommandLine -match ('--ipc\\s+' + [regex]::Escape($ipc)) } | "
        "Select-Object -ExpandProperty ProcessId | ConvertTo-Json -Compress"
    )
    try:
        out = subprocess.check_output(
            ["powershell", "-NoProfile", "-Command", ps_script],
            stdin=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=3,
            text=True,
            **_windows_hidden_subprocess_kwargs(),
        )
    except Exception:
        return []
    txt = (out or "").strip()
    if not txt or txt == "null":
        return []
    try:
        obj = json.loads(txt)
    except Exception:
        return []
    # ConvertTo-Json emits a bare value for one match and a list for several.
    raw_pids = obj if isinstance(obj, list) else [obj]
    out_pids: list[int] = []
    for value in raw_pids:
        try:
            pid = int(value)
        except Exception:
            continue
        # Keep positive, de-duplicated PIDs in first-seen order.
        if pid > 0 and pid not in out_pids:
            out_pids.append(pid)
    return out_pids
def _run_pipeline(
    pipeline_text: str,
    *,
    seeds: Any = None,
    json_output: bool = False,
) -> Dict[str, Any]:
    """Execute a cmdlet pipeline in-process and return a JSON-safe summary.

    Returns a dict with keys: success, stdout, stderr, error, table (a
    title+rows payload for the Lua UI, or None) and data (JSON-safe emitted
    items when *json_output* is true, else None).
    """
    # Import after sys.path fix.
    from TUI.pipeline_runner import PipelineRunner  # noqa: WPS433

    def _json_safe(value: Any) -> Any:
        # Recursively reduce arbitrary objects to JSON-serializable primitives,
        # preferring a to_dict() representation and falling back to str().
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        if isinstance(value, dict):
            out: Dict[str, Any] = {}
            for key, item in value.items():
                out[str(key)] = _json_safe(item)
            return out
        if isinstance(value, (list, tuple, set)):
            return [_json_safe(item) for item in value]
        if hasattr(value, "to_dict") and callable(getattr(value, "to_dict")):
            try:
                return _json_safe(value.to_dict())
            except Exception:
                pass
        return str(value)

    def _table_to_payload(table: Any) -> Optional[Dict[str, Any]]:
        # Flatten a result-table object into {"title": ..., "rows": [...]}
        # using defensive getattr access throughout (attributes may be absent).
        if table is None:
            return None
        try:
            title = getattr(table, "title", "")
        except Exception:
            title = ""
        rows_payload = []
        try:
            rows = getattr(table, "rows", None)
        except Exception:
            rows = None
        if isinstance(rows, list):
            for r in rows:
                cols_payload = []
                try:
                    cols = getattr(r, "columns", None)
                except Exception:
                    cols = None
                if isinstance(cols, list):
                    for c in cols:
                        try:
                            cols_payload.append(
                                {
                                    "name": getattr(c, "name", ""),
                                    "value": getattr(c, "value", ""),
                                }
                            )
                        except Exception:
                            continue
                sel_args = None
                try:
                    sel = getattr(r, "selection_args", None)
                    if isinstance(sel, list):
                        sel_args = [str(x) for x in sel]
                except Exception:
                    sel_args = None
                rows_payload.append(
                    {
                        "columns": cols_payload,
                        "selection_args": sel_args
                    }
                )
        # Only return JSON-serializable data (Lua only needs title + rows).
        return {
            "title": str(title or ""),
            "rows": rows_payload
        }

    start_time = time.time()
    runner = PipelineRunner()
    result = runner.run_pipeline(pipeline_text, seeds=seeds)
    duration = time.time() - start_time
    # Best-effort timing breadcrumb in the helper log.
    try:
        _append_helper_log(
            f"[pipeline] run_pipeline completed in {duration:.2f}s pipeline={pipeline_text[:64]}"
        )
    except Exception:
        pass
    table_payload = None
    try:
        table_payload = _table_to_payload(getattr(result, "result_table", None))
    except Exception:
        table_payload = None
    data_payload = None
    if json_output:
        # Only serialize emitted items when the caller asked for JSON output.
        try:
            data_payload = _json_safe(getattr(result, "emitted", None) or [])
        except Exception:
            data_payload = []
    return {
        "success": bool(result.success),
        "stdout": result.stdout or "",
        "stderr": result.stderr or "",
        "error": result.error,
        "table": table_payload,
        "data": data_payload,
    }
def _run_pipeline_background(
    pipeline_text: str,
    *,
    seeds: Any,
    req_id: str,
    job_id: Optional[str] = None,
    json_output: bool = False,
) -> None:
    """Run a pipeline on a daemon thread, optionally tracking it as an async job.

    When *job_id* is provided, the job record transitions running ->
    success/failed and captures error/stdout/stderr/table/data. Output is also
    mirrored into the helper log, tagged with *req_id*. Fire-and-forget: the
    thread is started and not joined.
    """
    def _target() -> None:
        try:
            if job_id:
                # Mark the job running before any work starts, clearing prior results.
                _update_async_pipeline_job(
                    job_id,
                    status="running",
                    success=False,
                    pipeline=pipeline_text,
                    req_id=req_id,
                    started_at=time.time(),
                    finished_at=None,
                    error=None,
                    stdout="",
                    stderr="",
                    table=None,
                    data=None,
                )
            result = _run_pipeline(
                pipeline_text,
                seeds=seeds,
                json_output=json_output,
            )
            status = "success" if result.get("success") else "failed"
            if job_id:
                # Record the final outcome for later job-status queries.
                _update_async_pipeline_job(
                    job_id,
                    status=status,
                    success=bool(result.get("success")),
                    finished_at=time.time(),
                    error=result.get("error"),
                    stdout=result.get("stdout", ""),
                    stderr=result.get("stderr", ""),
                    table=result.get("table"),
                    data=result.get("data"),
                )
            _append_helper_log(
                f"[pipeline async {req_id}] {status}"
                + (f" job={job_id}" if job_id else "")
                + f" error={result.get('error')}"
            )
            _append_prefixed_log_lines(
                f"[pipeline async stdout {req_id}] ",
                result.get("stdout", ""),
            )
            _append_prefixed_log_lines(
                f"[pipeline async stderr {req_id}] ",
                result.get("stderr", ""),
            )
        except Exception as exc:  # pragma: no cover - best-effort logging
            if job_id:
                # Fold unexpected exceptions into the job record as a failure.
                _update_async_pipeline_job(
                    job_id,
                    status="failed",
                    success=False,
                    finished_at=time.time(),
                    error=f"{type(exc).__name__}: {exc}",
                    stdout="",
                    stderr="",
                    table=None,
                    data=None,
                )
            _append_helper_log(
                f"[pipeline async {req_id}] exception"
                + (f" job={job_id}" if job_id else "")
                + f": {type(exc).__name__}: {exc}"
            )
    thread = threading.Thread(
        target=_target,
        name=f"pipeline-async-{req_id}",
        daemon=True,
    )
    thread.start()
def _is_load_url_pipeline(pipeline_text: str) -> bool:
return str(pipeline_text or "").lstrip().lower().startswith(".mpv -url")
def _run_op(op: str, data: Any) -> Dict[str, Any]:
    """Run a helper-only operation.

    These are NOT cmdlets and are not available via CLI pipelines. They exist
    solely so MPV Lua can query lightweight metadata (e.g., autocomplete lists)
    without inventing user-facing commands.

    Supported ops (hyphen and underscore spellings accepted):
    - run-background: queue a pipeline on a daemon thread; returns a job_id.
    - job-status: snapshot a previously queued async job by job_id.
    - queue-repl-command: append a command to the REPL command queue.
    - run-detached: spawn ``CLI.py pipeline`` as a fully detached process.
    - store-choices: list configured store backend names (with caching).
    - url-exists: search store backends for items matching URL needles.
    - ytdlp-formats: probe yt-dlp formats for a URL as a table payload.

    Every branch returns a dict with at least success/stdout/stderr/error/table.
    """
    op_name = str(op or "").strip().lower()
    # --- run-background: async pipeline execution tracked in the job registry ---
    if op_name in {"run-background", "run_background", "pipeline-background", "pipeline_background"}:
        pipeline_text = ""
        seeds = None
        json_output = False
        requested_job_id = ""
        if isinstance(data, dict):
            pipeline_text = str(data.get("pipeline") or "").strip()
            seeds = data.get("seeds")
            json_output = bool(data.get("json") or data.get("output_json"))
            requested_job_id = str(data.get("job_id") or "").strip()
        if not pipeline_text:
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": "Missing pipeline",
                "table": None,
            }
        # Derive a short, collision-resistant job id even when the caller
        # supplies its own token (time + thread id keep retries distinct).
        token = requested_job_id or f"job-{int(time.time() * 1000)}-{threading.get_ident()}"
        job_id = hashlib.sha1(
            f"{token}|{pipeline_text}|{time.time()}".encode("utf-8", "ignore")
        ).hexdigest()[:16]
        _update_async_pipeline_job(
            job_id,
            status="queued",
            success=False,
            pipeline=pipeline_text,
            req_id=job_id,
            finished_at=None,
            error=None,
            stdout="",
            stderr="",
            table=None,
            data=None,
        )
        _run_pipeline_background(
            pipeline_text,
            seeds=seeds,
            req_id=job_id,
            job_id=job_id,
            json_output=json_output,
        )
        return {
            "success": True,
            "stdout": "",
            "stderr": "",
            "error": None,
            "table": None,
            "job_id": job_id,
            "status": "queued",
        }
    # --- job-status: read back an async job record by id ---
    if op_name in {"job-status", "job_status", "pipeline-job-status", "pipeline_job_status"}:
        job_id = ""
        if isinstance(data, dict):
            job_id = str(data.get("job_id") or "").strip()
        if not job_id:
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": "Missing job_id",
                "table": None,
            }
        job = _get_async_pipeline_job(job_id)
        if not job:
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": f"Unknown job_id: {job_id}",
                "table": None,
            }
        payload = dict(job)
        # Surface stdout/stderr/error/table/data at the top level for Lua,
        # and the full record under "job".
        return {
            "success": True,
            "stdout": str(payload.get("stdout") or ""),
            "stderr": str(payload.get("stderr") or ""),
            "error": payload.get("error"),
            "table": payload.get("table"),
            "data": payload.get("data"),
            "job": payload,
        }
    # --- queue-repl-command: hand a command to the REPL via its file queue ---
    if op_name in {"queue-repl-command", "queue_repl_command", "repl-command", "repl_command"}:
        command_text = ""
        source = "mpv"
        metadata = None
        if isinstance(data, dict):
            command_text = str(data.get("command") or data.get("pipeline") or "").strip()
            source = str(data.get("source") or "mpv").strip() or "mpv"
            metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else None
        if not command_text:
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": "Missing command",
                "table": None,
            }
        queue_path = enqueue_repl_command(
            _repo_root(),
            command_text,
            source=source,
            metadata=metadata,
        )
        _append_helper_log(
            f"[repl-queue] queued source={source} path={queue_path.name} cmd={command_text}"
        )
        return {
            "success": True,
            "stdout": "",
            "stderr": "",
            "error": None,
            "table": None,
            "path": str(queue_path),
            "queued": True,
        }
    # --- run-detached: spawn the pipeline as a separate, fully detached process ---
    if op_name in {"run-detached", "run_detached", "pipeline-detached", "pipeline_detached"}:
        pipeline_text = ""
        seeds = None
        if isinstance(data, dict):
            pipeline_text = str(data.get("pipeline") or "").strip()
            seeds = data.get("seeds")
        if not pipeline_text:
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": "Missing pipeline",
                "table": None,
            }
        py = sys.executable or "python"
        if platform.system() == "Windows":
            # Prefer pythonw.exe next to python.exe so no console window appears.
            try:
                exe = str(py or "").strip()
            except Exception:
                exe = ""
            low = exe.lower()
            if low.endswith("python.exe"):
                try:
                    candidate = exe[:-10] + "pythonw.exe"
                    if os.path.exists(candidate):
                        py = candidate
                except Exception:
                    pass
        cmd = [
            py,
            str((_repo_root() / "CLI.py").resolve()),
            "pipeline",
            "--pipeline",
            pipeline_text,
        ]
        if seeds is not None:
            try:
                cmd.extend(["--seeds-json", json.dumps(seeds, ensure_ascii=False)])
            except Exception:
                # Best-effort; seeds are optional.
                pass
        popen_kwargs: Dict[str, Any] = {
            "stdin": subprocess.DEVNULL,
            "stdout": subprocess.DEVNULL,
            "stderr": subprocess.DEVNULL,
            "cwd": str(_repo_root()),
        }
        if platform.system() == "Windows":
            # Detach from this console and suppress any new window.
            flags = 0
            try:
                flags |= int(getattr(subprocess, "DETACHED_PROCESS", 0x00000008))
            except Exception:
                flags |= 0x00000008
            try:
                flags |= int(getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000))
            except Exception:
                flags |= 0x08000000
            popen_kwargs["creationflags"] = int(flags)
            try:
                si = subprocess.STARTUPINFO()
                si.dwFlags |= int(getattr(subprocess, "STARTF_USESHOWWINDOW", 0x00000001))
                si.wShowWindow = subprocess.SW_HIDE
                popen_kwargs["startupinfo"] = si
            except Exception:
                pass
        else:
            # POSIX: a new session detaches the child from our process group.
            popen_kwargs["start_new_session"] = True
        try:
            proc = subprocess.Popen(cmd, **popen_kwargs)
        except Exception as exc:
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": f"Failed to spawn detached pipeline: {type(exc).__name__}: {exc}",
                "table": None,
            }
        return {
            "success": True,
            "stdout": "",
            "stderr": "",
            "error": None,
            "table": None,
            "pid": int(getattr(proc, "pid", 0) or 0),
        }
    # Provide store backend choices using the dynamic registered store registry only.
    if op_name in {"store-choices", "store_choices", "get-store-choices", "get_store_choices"}:
        cached_choices = _get_cached_store_choices()
        refresh = False
        if isinstance(data, dict):
            refresh = bool(data.get("refresh") or data.get("reload"))
        if cached_choices and not refresh:
            # Fast path: serve the cache unless the caller forces a reload.
            debug(f"[store-choices] using cached choices={len(cached_choices)}")
            return {
                "success": True,
                "stdout": "",
                "stderr": "",
                "error": None,
                "table": None,
                "choices": cached_choices,
            }
        try:
            config_root = _runtime_config_root()
            choices = _load_store_choices_from_config(force_reload=refresh)
            if not choices and cached_choices:
                # Keep serving the last good list rather than an empty one.
                choices = cached_choices
                debug(
                    f"[store-choices] config returned empty; falling back to cached choices={len(choices)}"
                )
            if choices:
                choices = _set_cached_store_choices(choices)
            debug(f"[store-choices] config_dir={config_root} choices={len(choices)}")
            return {
                "success": True,
                "stdout": "",
                "stderr": "",
                "error": None,
                "table": None,
                "choices": choices,
            }
        except Exception as exc:
            if cached_choices:
                # A failed refresh is non-fatal when we still have a cache.
                debug(
                    f"[store-choices] refresh failed; returning cached choices={len(cached_choices)} error={type(exc).__name__}: {exc}"
                )
                return {
                    "success": True,
                    "stdout": "",
                    "stderr": "",
                    "error": None,
                    "table": None,
                    "choices": cached_choices,
                }
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": f"store-choices failed: {type(exc).__name__}: {exc}",
                "table": None,
                "choices": [],
            }
    # --- url-exists: look up URL needles across configured store backends ---
    if op_name in {"url-exists", "url_exists", "find-url", "find_url"}:
        try:
            from Store import Store  # noqa: WPS433
            cfg = load_config() or {}
            storage = Store(config=cfg, suppress_debug=True)
            # Collect de-duplicated, non-empty needles from "needles" or "url".
            raw_needles: list[str] = []
            if isinstance(data, dict):
                maybe_needles = data.get("needles")
                if isinstance(maybe_needles, (list, tuple, set)):
                    for item in maybe_needles:
                        text = str(item or "").strip()
                        if text and text not in raw_needles:
                            raw_needles.append(text)
                elif isinstance(maybe_needles, str):
                    text = maybe_needles.strip()
                    if text:
                        raw_needles.append(text)
                if not raw_needles:
                    text = str(data.get("url") or "").strip()
                    if text:
                        raw_needles.append(text)
            if not raw_needles:
                return {
                    "success": False,
                    "stdout": "",
                    "stderr": "",
                    "error": "Missing url",
                    "table": None,
                    "data": [],
                }
            matches: list[dict[str, Any]] = []
            seen_keys: set[str] = set()
            # Stop at the first backend/needle that yields any match.
            for backend_name in storage.list_backends() or []:
                try:
                    backend = storage[backend_name]
                except Exception:
                    continue
                search_fn = getattr(backend, "search", None)
                if not callable(search_fn):
                    continue
                for needle in raw_needles:
                    query = f"url:{needle}"
                    try:
                        results = backend.search(
                            query,
                            limit=1,
                            minimal=True,
                            url_only=True,
                        ) or []
                    except Exception:
                        continue
                    for item in results:
                        # Normalize each hit to a plain dict.
                        if hasattr(item, "to_dict") and callable(getattr(item, "to_dict")):
                            try:
                                item = item.to_dict()
                            except Exception:
                                item = {"title": str(item)}
                        elif not isinstance(item, dict):
                            item = {"title": str(item)}
                        payload = dict(item)
                        payload.setdefault("store", str(backend_name))
                        payload.setdefault("needle", str(needle))
                        # De-duplicate on hash, then url, then title.
                        key = str(payload.get("hash") or payload.get("url") or payload.get("title") or needle).strip().lower()
                        if key in seen_keys:
                            continue
                        seen_keys.add(key)
                        matches.append(payload)
                    if matches:
                        break
                if matches:
                    break
            return {
                "success": True,
                "stdout": "",
                "stderr": "",
                "error": None,
                "table": None,
                "data": matches,
            }
        except Exception as exc:
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": f"url-exists failed: {type(exc).__name__}: {exc}",
                "table": None,
                "data": [],
            }
    # Provide yt-dlp format list for a URL (for MPV "Change format" menu).
    # Returns a ResultTable-like payload so the Lua UI can render without running cmdlets.
    if op_name in {"ytdlp-formats", "ytdlp_formats", "ytdl-formats", "ytdl_formats"}:
        try:
            url = None
            if isinstance(data, dict):
                url = data.get("url")
            url = str(url or "").strip()
            if not url:
                return {
                    "success": False,
                    "stdout": "",
                    "stderr": "",
                    "error": "Missing url",
                    "table": None,
                }
            try:
                from tool.ytdlp import list_formats, is_browseable_format  # noqa: WPS433
            except Exception as exc:
                return {
                    "success": False,
                    "stdout": "",
                    "stderr": "",
                    "error": f"yt-dlp tool unavailable: {type(exc).__name__}: {exc}",
                    "table": None,
                }
            # Best-effort cookie file resolution; probing works without one.
            cookiefile = None
            try:
                from tool.ytdlp import YtDlpTool  # noqa: WPS433
                cfg = load_config() or {}
                cookie_path = YtDlpTool(cfg).resolve_cookiefile()
                if cookie_path is not None:
                    cookiefile = str(cookie_path)
            except Exception:
                cookiefile = None
            def _format_bytes(n: Any) -> str:
                """Format bytes using centralized utility."""
                return format_bytes(n)
            formats = list_formats(
                url,
                no_playlist=True,
                cookiefile=cookiefile,
                timeout_seconds=25,
            )
            # None means the probe itself failed; [] means no formats found.
            if formats is None:
                return {
                    "success": False,
                    "stdout": "",
                    "stderr": "",
                    "error": "yt-dlp format probe failed or timed out",
                    "table": None,
                }
            if not formats:
                return {
                    "success": True,
                    "stdout": "",
                    "stderr": "",
                    "error": None,
                    "table": {"title": "Formats", "rows": []},
                }
            # Prefer browseable formats, but keep the full list if none qualify.
            browseable = [f for f in formats if is_browseable_format(f)]
            if browseable:
                formats = browseable
            # Debug: dump a short summary of the format list to the helper log.
            try:
                count = len(formats)
                _append_helper_log(
                    f"[ytdlp-formats] extracted formats count={count} url={url}"
                )
                limit = 60
                for i, f in enumerate(formats[:limit], start=1):
                    if not isinstance(f, dict):
                        continue
                    fid = str(f.get("format_id") or "")
                    ext = str(f.get("ext") or "")
                    note = f.get("format_note") or f.get("format") or ""
                    vcodec = str(f.get("vcodec") or "")
                    acodec = str(f.get("acodec") or "")
                    size = f.get("filesize") or f.get("filesize_approx")
                    res = str(f.get("resolution") or "")
                    if not res:
                        try:
                            w = f.get("width")
                            h = f.get("height")
                            if w and h:
                                res = f"{int(w)}x{int(h)}"
                            elif h:
                                res = f"{int(h)}p"
                        except Exception:
                            res = ""
                    _append_helper_log(
                        f"[ytdlp-format {i:02d}] id={fid} ext={ext} res={res} note={note} codecs={vcodec}/{acodec} size={size}"
                    )
                if count > limit:
                    _append_helper_log(
                        f"[ytdlp-formats] (truncated; total={count})"
                    )
            except Exception:
                pass
            rows = []
            for fmt in formats:
                if not isinstance(fmt, dict):
                    continue
                format_id = str(fmt.get("format_id") or "").strip()
                if not format_id:
                    continue
                # Prefer human-ish resolution.
                resolution = str(fmt.get("resolution") or "").strip()
                if not resolution:
                    w = fmt.get("width")
                    h = fmt.get("height")
                    try:
                        if w and h:
                            resolution = f"{int(w)}x{int(h)}"
                        elif h:
                            resolution = f"{int(h)}p"
                    except Exception:
                        resolution = ""
                ext = str(fmt.get("ext") or "").strip()
                size = _format_bytes(fmt.get("filesize") or fmt.get("filesize_approx"))
                vcodec = str(fmt.get("vcodec") or "none")
                acodec = str(fmt.get("acodec") or "none")
                selection_id = format_id
                # Video-only streams get "+ba" so yt-dlp pairs best audio.
                if vcodec != "none" and acodec == "none":
                    selection_id = f"{format_id}+ba"
                # Build selection args compatible with MPV Lua picker.
                # Use -format instead of -query so Lua can extract the ID easily.
                selection_args = ["-format", selection_id]
                rows.append(
                    {
                        "columns": [
                            {"name": "ID", "value": format_id},
                            {"name": "Resolution", "value": resolution or ""},
                            {"name": "Ext", "value": ext or ""},
                            {"name": "Size", "value": size or ""},
                        ],
                        "selection_args": selection_args,
                    }
                )
            return {
                "success": True,
                "stdout": "",
                "stderr": "",
                "error": None,
                "table": {"title": "Formats", "rows": rows},
            }
        except Exception as exc:
            return {
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": f"{type(exc).__name__}: {exc}",
                "table": None,
            }
    # No branch matched: report the unknown op name.
    return {
        "success": False,
        "stdout": "",
        "stderr": "",
        "error": f"Unknown op: {op_name}",
        "table": None,
    }
def _append_helper_log(text: str) -> None:
    """Record one helper log line across all available sinks.

    The line is kept in a bounded in-memory backlog, written to the central
    database log when available (falling back to stderr), and forwarded to the
    registered MPV log emitter, if any. Every sink is best-effort; this never
    raises. Empty/whitespace-only input is ignored.
    """
    payload = (text or "").rstrip()
    if not payload:
        return
    # Bounded backlog of recent lines for later replay/diagnostics.
    _HELPER_LOG_BACKLOG.append(payload)
    if len(_HELPER_LOG_BACKLOG) > _HELPER_LOG_BACKLOG_LIMIT:
        del _HELPER_LOG_BACKLOG[:-_HELPER_LOG_BACKLOG_LIMIT]
    try:
        # Try database logging first (best practice: unified logging)
        from SYS.database import log_to_db
        log_to_db("INFO", "mpv", payload)
    except Exception:
        # Fallback to stderr if database unavailable. `sys` is already imported
        # at module level; the previous local `import sys` was redundant.
        print(f"[mpv-helper] {payload}", file=sys.stderr)
    # Mirror into mpv (OSD/console) when an emitter has been registered.
    emitter = _HELPER_MPV_LOG_EMITTER
    if emitter is not None:
        try:
            emitter(payload)
        except Exception:
            pass
def _helper_log_path() -> str:
try:
log_dir = _repo_root() / "Log"
log_dir.mkdir(parents=True, exist_ok=True)
return str((log_dir / "medeia-mpv-helper.log").resolve())
except Exception:
tmp = tempfile.gettempdir()
return str((Path(tmp) / "medeia-mpv-helper.log").resolve())
def _acquire_ipc_lock(ipc_path: str) -> Optional[Any]:
    """Best-effort singleton lock per IPC path.

    Multiple helpers subscribing to mpv log-message events causes duplicated
    log output. Use a tiny file lock to ensure one helper per mpv instance.

    Returns the open lock-file handle (the caller must keep it referenced to
    hold the lock), or None when another process holds it or locking failed.
    """
    try:
        # Sanitize the IPC path into a filesystem-safe lock-file name.
        safe = re.sub(r"[^a-zA-Z0-9_.-]+", "_", str(ipc_path or ""))
        if not safe:
            safe = "mpv"
        # Keep lock files out of the repo's Log/ directory to avoid clutter.
        lock_dir = Path(tempfile.gettempdir()) / "medeia-mpv-helper"
        lock_dir.mkdir(parents=True, exist_ok=True)
        lock_path = lock_dir / f"medeia-mpv-helper-{safe}.lock"
        fh = open(lock_path, "a+", encoding="utf-8", errors="replace")
        if os.name == "nt":
            # Windows: non-blocking one-byte region lock via msvcrt.
            try:
                import msvcrt  # type: ignore
                fh.seek(0)
                msvcrt.locking(fh.fileno(), msvcrt.LK_NBLCK, 1)
            except Exception:
                # Lock held elsewhere (or locking failed): close and give up.
                try:
                    fh.close()
                except Exception:
                    pass
                return None
        else:
            # POSIX: non-blocking exclusive flock.
            try:
                import fcntl  # type: ignore
                fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            except Exception:
                try:
                    fh.close()
                except Exception:
                    pass
                return None
        return fh
    except Exception:
        return None
def _parse_request(data: Any) -> Optional[Dict[str, Any]]:
if data is None:
return None
if isinstance(data, str):
text = data.strip()
if not text:
return None
try:
obj = json.loads(text)
except Exception:
return None
return obj if isinstance(obj, dict) else None
if isinstance(data, dict):
return data
return None
def _start_request_poll_loop(
    ipc_path: str,
    stop_event: threading.Event,
    handle_request: Callable[[Any, str], bool],
) -> threading.Thread:
    """Poll the request property on a separate IPC connection.

    Windows named-pipe event delivery can stall even while direct get/set
    property commands still work. Polling the request property provides a more
    reliable fallback transport for helper ops and pipeline dispatch.

    Starts (and returns) a daemon thread that runs until *stop_event* is set.
    """
    def _poll_loop() -> None:
        # Dedicated connection so polling never contends with other IPC traffic.
        poll_client = MPVIPCClient(socket_path=ipc_path, timeout=0.75, silent=True)
        while not stop_event.is_set():
            try:
                if poll_client.sock is None and not poll_client.connect():
                    stop_event.wait(0.10)
                    continue
                resp = poll_client.send_command(["get_property", REQUEST_PROP])
                if not resp:
                    # Empty reply: treat as a broken connection and retry.
                    try:
                        poll_client.disconnect()
                    except Exception:
                        pass
                    stop_event.wait(0.10)
                    continue
                if resp.get("error") == "success":
                    # Forward the raw payload; "poll" tags this transport.
                    handle_request(resp.get("data"), "poll")
                stop_event.wait(0.05)
            except Exception:
                # On any failure: reset the connection and back off briefly.
                try:
                    poll_client.disconnect()
                except Exception:
                    pass
                stop_event.wait(0.15)
        try:
            poll_client.disconnect()
        except Exception:
            pass
    thread = threading.Thread(
        target=_poll_loop,
        name="mpv-helper-request-poll",
        daemon=True,
    )
    thread.start()
    return thread
def main(argv: Optional[list[str]] = None) -> int:
parser = argparse.ArgumentParser(prog="mpv-pipeline-helper")
parser.add_argument("--ipc", required=True, help="mpv --input-ipc-server path")
parser.add_argument("--timeout", type=float, default=15.0)
args = parser.parse_args(argv)
# Load config once and configure logging similar to CLI.pipeline.
try:
cfg = load_config() or {}
except Exception:
cfg = {}
try:
debug_enabled = bool(isinstance(cfg, dict) and cfg.get("debug", False))
set_debug(debug_enabled)
if debug_enabled:
logging.basicConfig(
level=logging.DEBUG,
format="[%(name)s] %(levelname)s: %(message)s",
stream=sys.stderr,
)
for noisy in ("httpx",
"httpcore",
"httpcore.http11",
"httpcore.connection"):
try:
logging.getLogger(noisy).setLevel(logging.WARNING)
except Exception:
pass
except Exception:
pass
# Ensure all in-process cmdlets that talk to MPV pick up the exact IPC server
# path used by this helper (which comes from the running MPV instance).
os.environ["MEDEIA_MPV_IPC"] = str(args.ipc)
if platform.system() == "Windows":
try:
sibling_pids = [
pid
for pid in _windows_list_pipeline_helper_pids(str(args.ipc))
if pid and pid != os.getpid()
]
if sibling_pids:
_append_helper_log(
f"[helper] terminating older helper pids for ipc={args.ipc}: {','.join(str(pid) for pid in sibling_pids)}"
)
_windows_kill_pids(sibling_pids)
time.sleep(0.25)
except Exception as exc:
_append_helper_log(
f"[helper] failed to terminate older helpers: {type(exc).__name__}: {exc}"
)
# Ensure single helper instance per ipc.
_lock_fh = _acquire_ipc_lock(str(args.ipc))
if _lock_fh is None:
_append_helper_log(
f"[helper] another instance already holds lock for ipc={args.ipc}; exiting"
)
return 0
try:
_append_helper_log(
f"[helper] version={MEDEIA_MPV_HELPER_VERSION} started ipc={args.ipc}"
)
try:
_append_helper_log(
f"[helper] file={Path(__file__).resolve()} cwd={Path.cwd().resolve()}"
)
except Exception:
pass
try:
runtime_root = _runtime_config_root()
_append_helper_log(
f"[helper] config_root={runtime_root} exists={bool((runtime_root / 'config.conf').exists())}"
)
except Exception:
pass
debug(f"[mpv-helper] logging to: {_helper_log_path()}")
except Exception:
pass
# Route SYS.logger output into the helper log file so diagnostics are not
# lost in mpv's console/terminal output.
try:
class _HelperLogStream:
def __init__(self) -> None:
self._pending = ""
def write(self, s: str) -> int:
if not s:
return 0
text = self._pending + str(s)
lines = text.splitlines(keepends=True)
if lines and not lines[-1].endswith(("\n", "\r")):
self._pending = lines[-1]
lines = lines[:-1]
else:
self._pending = ""
for line in lines:
payload = line.rstrip("\r\n")
if payload:
_append_helper_log("[py] " + payload)
return len(s)
def flush(self) -> None:
if self._pending:
_append_helper_log("[py] " + self._pending.rstrip("\r\n"))
self._pending = ""
set_thread_stream(_HelperLogStream())
except Exception:
pass
# Prefer a stable repo-local log folder for discoverability.
error_log_dir = _repo_root() / "Log"
try:
error_log_dir.mkdir(parents=True, exist_ok=True)
except Exception:
error_log_dir = Path(tempfile.gettempdir())
last_error_log = error_log_dir / "medeia-mpv-pipeline-last-error.log"
seen_request_ids: Dict[str, float] = {}
seen_request_ids_lock = threading.Lock()
seen_request_ttl_seconds = 180.0
request_processing_lock = threading.Lock()
command_client_lock = threading.Lock()
_send_helper_command = lambda _command, _label='': False
_publish_store_choices_cached_property = lambda _choices: None
def _write_error_log(text: str, *, req_id: str) -> Optional[str]:
try:
error_log_dir.mkdir(parents=True, exist_ok=True)
except Exception:
pass
payload = (text or "").strip()
if not payload:
return None
stamped = error_log_dir / f"medeia-mpv-pipeline-error-{req_id}.log"
try:
stamped.write_text(payload, encoding="utf-8", errors="replace")
except Exception:
stamped = None
try:
last_error_log.write_text(payload, encoding="utf-8", errors="replace")
except Exception:
pass
return str(stamped) if stamped else str(last_error_log)
def _mark_request_seen(req_id: str) -> bool:
if not req_id:
return False
now = time.time()
cutoff = now - seen_request_ttl_seconds
with seen_request_ids_lock:
expired = [key for key, ts in seen_request_ids.items() if ts < cutoff]
for key in expired:
seen_request_ids.pop(key, None)
if req_id in seen_request_ids:
return False
seen_request_ids[req_id] = now
return True
def _publish_response(resp: Dict[str, Any]) -> None:
req_id = str(resp.get("id") or "")
ok = _send_helper_command(
[
"set_property_string",
RESPONSE_PROP,
json.dumps(resp, ensure_ascii=False),
],
f"response:{req_id or 'unknown'}",
)
if ok:
_append_helper_log(
f"[response {req_id or '?'}] published success={bool(resp.get('success'))}"
)
else:
_append_helper_log(
f"[response {req_id or '?'}] publish failed success={bool(resp.get('success'))}"
)
def _process_request(raw: Any, source: str) -> bool:
    """Parse, de-duplicate, and execute one request payload from mpv.

    Supports either an "op" request (handled by _run_op) or a "pipeline"
    request (run inline, or dispatched to the background for load-url
    pipelines). Always publishes a response for processed requests; on
    failure also writes an error log and attaches its path to the response.

    Returns True when a fresh, well-formed request was processed; False for
    unparseable payloads, missing ids, empty requests, or duplicates.
    """
    req = _parse_request(raw)
    if not req:
        # Log a trimmed snippet of unparseable payloads to aid debugging.
        try:
            if isinstance(raw, str) and raw.strip():
                snippet = raw.strip().replace("\r", "").replace("\n", " ")
                if len(snippet) > 220:
                    # Append a visible truncation marker (previously an
                    # empty string, so readers could not tell the snippet
                    # had been cut short).
                    snippet = snippet[:220] + "..."
                _append_helper_log(
                    f"[request-raw {source}] could not parse request json: {snippet}"
                )
        except Exception:
            pass
        return False
    req_id = str(req.get("id") or "")
    op = str(req.get("op") or "").strip()
    data = req.get("data")
    pipeline_text = str(req.get("pipeline") or "").strip()
    seeds = req.get("seeds")
    json_output = bool(req.get("json") or req.get("output_json"))
    if not req_id:
        return False
    if not _mark_request_seen(req_id):
        # Observer and poll loop can both deliver the same request; drop dupes.
        return False
    try:
        # Conditional expression instead of the error-prone `and/or` idiom.
        label = pipeline_text if pipeline_text else (("op=" + op) if op else "(empty)")
        _append_helper_log(f"\n[request {req_id} via {source}] {label}")
    except Exception:
        pass
    with request_processing_lock:
        async_dispatch = False
        try:
            if op:
                run = _run_op(op, data)
            else:
                if not pipeline_text:
                    return False
                if _is_load_url_pipeline(pipeline_text):
                    # load-url pipelines can block for a long time; answer
                    # immediately and run the pipeline in the background.
                    async_dispatch = True
                    run = {
                        "success": True,
                        "stdout": "",
                        "stderr": "",
                        "error": "",
                        "table": None,
                    }
                    _run_pipeline_background(
                        pipeline_text,
                        seeds=seeds,
                        req_id=req_id,
                    )
                else:
                    run = _run_pipeline(
                        pipeline_text,
                        seeds=seeds,
                        json_output=json_output,
                    )
            resp = {
                "id": req_id,
                "success": bool(run.get("success")),
                "stdout": run.get("stdout", ""),
                "stderr": run.get("stderr", ""),
                "error": run.get("error"),
                "table": run.get("table"),
                "data": run.get("data"),
            }
            # Optional fields are forwarded only when the runner produced them.
            if "choices" in run:
                resp["choices"] = run.get("choices")
            if "job_id" in run:
                resp["job_id"] = run.get("job_id")
            if "job" in run:
                resp["job"] = run.get("job")
            if "status" in run:
                resp["status"] = run.get("status")
            if "pid" in run:
                resp["pid"] = run.get("pid")
            if async_dispatch:
                resp["info"] = "queued asynchronously"
        except Exception as exc:
            # Convert any execution failure into an error response.
            resp = {
                "id": req_id,
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": f"{type(exc).__name__}: {exc}",
                "table": None,
            }
        try:
            if op:
                extra = ""
                if isinstance(resp.get("choices"), list):
                    extra = f" choices={len(resp.get('choices') or [])}"
                _append_helper_log(
                    f"[request {req_id}] op-finished success={bool(resp.get('success'))}{extra}"
                )
        except Exception:
            pass
        try:
            if resp.get("stdout"):
                _append_helper_log("[stdout]\n" + str(resp.get("stdout")))
            if resp.get("stderr"):
                _append_helper_log("[stderr]\n" + str(resp.get("stderr")))
            if resp.get("error"):
                _append_helper_log("[error]\n" + str(resp.get("error")))
        except Exception:
            pass
        if not resp.get("success"):
            # Persist full failure details to disk and point the client at it.
            details = ""
            if resp.get("error"):
                details += str(resp.get("error"))
            if resp.get("stderr"):
                details = (details + "\n" if details else "") + str(resp.get("stderr"))
            log_path = _write_error_log(details, req_id=req_id)
            if log_path:
                resp["log_path"] = log_path
        try:
            _publish_response(resp)
        except Exception:
            pass
        if resp.get("success") and isinstance(resp.get("choices"), list):
            try:
                _publish_store_choices_cached_property(resp.get("choices"))
            except Exception:
                pass
        return True
# Connect to mpv's JSON IPC. On Windows, the pipe can exist but reject opens
# briefly during startup; also mpv may create the IPC server slightly after
# the Lua script launches us. Retry until timeout.
connect_deadline = time.time() + max(0.5, float(args.timeout))
last_connect_error: Optional[str] = None
client = MPVIPCClient(socket_path=args.ipc, timeout=0.5, silent=True)
while True:
try:
if client.connect():
break
except Exception as exc:
last_connect_error = f"{type(exc).__name__}: {exc}"
if time.time() > connect_deadline:
_append_helper_log(
f"[helper] failed to connect ipc={args.ipc} error={last_connect_error or 'timeout'}"
)
return 2
# Keep trying.
time.sleep(0.10)
command_client = MPVIPCClient(socket_path=str(args.ipc), timeout=0.75, silent=True)
def _send_helper_command(command: Any, label: str = "") -> bool:
    """Fire-and-forget an IPC command over the shared command client.

    Connects lazily when no socket is open, and drops the connection after
    a failed send or an exception so the next call starts fresh. Returns
    True only when the command was handed to the socket.
    """
    with command_client_lock:
        try:
            if command_client.sock is None and not command_client.connect():
                _append_helper_log(
                    f"[helper-ipc] connect failed label={label or '?'}"
                )
                return False
            if command_client.send_command_no_wait(command) is not None:
                return True
            _append_helper_log(
                f"[helper-ipc] send failed label={label or '?'}"
            )
        except Exception as exc:
            _append_helper_log(
                f"[helper-ipc] exception label={label or '?'} error={type(exc).__name__}: {exc}"
            )
        # Reset the connection on send failure or exception.
        try:
            command_client.disconnect()
        except Exception:
            pass
        return False
def _emit_helper_log_to_mpv(payload: str) -> None:
    """Mirror one helper-log line onto mpv's console via print-text."""
    line = str(payload or "").replace("\r", " ").replace("\n", " ").strip()
    if not line:
        return
    if len(line) > 900:
        line = line[:900] + "..."
    try:
        client.send_command_no_wait(["print-text", f"medeia-helper: {line}"])
    except Exception:
        return

# Route future helper-log output through mpv, then replay any lines that
# were buffered before the IPC connection existed.
global _HELPER_MPV_LOG_EMITTER
_HELPER_MPV_LOG_EMITTER = _emit_helper_log_to_mpv
for backlog_line in list(_HELPER_LOG_BACKLOG):
    try:
        _emit_helper_log_to_mpv(backlog_line)
    except Exception:
        # Stop replaying after the first failure.
        break
# Mark ready ASAP and keep it fresh.
# Use a unix timestamp so the Lua side can treat it as a heartbeat.
last_ready_ts: float = 0.0
def _touch_ready() -> None:
nonlocal last_ready_ts
now = time.time()
# Throttle updates to reduce IPC chatter.
if (now - last_ready_ts) < 0.75:
return
try:
_send_helper_command(
["set_property_string", READY_PROP, str(int(now))],
"ready-heartbeat",
)
last_ready_ts = now
except Exception:
return
def _publish_store_choices_cached_property(choices: Any) -> None:
    """Publish serialized store choices to the cached user-data property."""
    payload = _store_choices_payload(choices)
    if not payload:
        # Nothing serializable to publish.
        return
    _send_helper_command(
        [
            "set_property_string",
            "user-data/medeia-store-choices-cached",
            payload,
        ],
        "store-choices-cache",
    )
def _publish_helper_version() -> None:
    """Expose this helper's version via a user-data property; log on success."""
    published = _send_helper_command(
        ["set_property_string", VERSION_PROP, MEDEIA_MPV_HELPER_VERSION],
        "helper-version",
    )
    if published:
        _append_helper_log(
            f"[helper] published helper version {MEDEIA_MPV_HELPER_VERSION}"
        )
# Mirror mpv's own log messages into our helper log file so debugging does
# not depend on the mpv on-screen console or mpv's log-file.
try:
# IMPORTANT: mpv debug logs can be extremely chatty (especially ytdl_hook)
# and can starve request handling. Default to warn unless explicitly overridden.
level = os.environ.get("MEDEIA_MPV_HELPER_MPVLOG", "").strip() or "warn"
client.send_command_no_wait(["request_log_messages", level])
_append_helper_log(f"[helper] requested mpv log messages level={level}")
except Exception:
pass
# De-dup/throttle mpv log-message lines (mpv and yt-dlp can be very chatty).
last_mpv_line: Optional[str] = None
last_mpv_count: int = 0
last_mpv_ts: float = 0.0
def _flush_mpv_repeat() -> None:
nonlocal last_mpv_line, last_mpv_count
if last_mpv_line and last_mpv_count > 1:
_append_helper_log(f"[mpv] (previous line repeated {last_mpv_count}x)")
last_mpv_line = None
last_mpv_count = 0
# Observe request property changes.
try:
    client.send_command_no_wait(
        ["observe_property",
         OBS_ID_REQUEST,
         REQUEST_PROP,
         "string"]
    )
except Exception:
    # Without the observer we cannot receive requests; bail with exit code 3.
    return 3
# Mark ready only after the observer is installed to avoid races where Lua
# sends a request before we can receive property-change notifications.
try:
    _touch_ready()
    _publish_helper_version()
    _append_helper_log(f"[helper] ready heartbeat armed prop={READY_PROP}")
except Exception:
    pass
# Background workers: a ready-heartbeat refresher and a request-property
# poller (a second delivery path alongside the observer; duplicates are
# filtered by _mark_request_seen inside _process_request).
heartbeat_stop = threading.Event()
_start_ready_heartbeat(str(args.ipc), heartbeat_stop)
_start_request_poll_loop(str(args.ipc), heartbeat_stop, _process_request)
# Pre-compute store choices at startup and publish to a cached property so Lua
# can read immediately without waiting for a request/response cycle (which may timeout).
try:
    choices_result = _run_op("store-choices", None)
    choices = (
        choices_result.get("choices") if isinstance(choices_result, dict) else None
    )
    if isinstance(choices, list):
        choices = _set_cached_store_choices(choices)
        preview = ", ".join(str(item) for item in choices[:50])
        _append_helper_log(
            f"[helper] startup store-choices count={len(choices)} items={preview}"
        )
        # Publish to a cached property for Lua to read without IPC request.
        try:
            _publish_store_choices_cached_property(choices)
            _append_helper_log(
                "[helper] published store-choices to user-data/medeia-store-choices-cached"
            )
        except Exception as exc:
            _append_helper_log(
                f"[helper] failed to publish store-choices: {type(exc).__name__}: {exc}"
            )
    else:
        _append_helper_log("[helper] startup store-choices unavailable")
except Exception as exc:
    _append_helper_log(
        f"[helper] startup store-choices failed: {type(exc).__name__}: {exc}"
    )
# Also publish config temp directory if available
try:
    cfg = load_config()
    temp_dir = cfg.get("temp", "").strip() or os.getenv("TEMP") or "/tmp"
    if temp_dir:
        _send_helper_command(
            ["set_property_string",
             "user-data/medeia-config-temp",
             temp_dir],
            "config-temp",
        )
        _append_helper_log(
            f"[helper] published config temp to user-data/medeia-config-temp={temp_dir}"
        )
except Exception as exc:
    _append_helper_log(
        f"[helper] failed to publish config temp: {type(exc).__name__}: {exc}"
    )
# Publish yt-dlp supported domains for Lua menu filtering
try:
    from tool.ytdlp import _build_supported_domains
    supported_domains = sorted(_build_supported_domains())
    if supported_domains:
        # We join them into a space-separated string for Lua to parse easily
        _send_helper_command(
            [
                "set_property_string",
                "user-data/medeia-ytdlp-domains-cached",
                " ".join(supported_domains),
            ],
            "ytdlp-domains",
        )
        _append_helper_log(
            f"[helper] published {len(supported_domains)} ytdlp domains for Lua menu filtering"
        )
except Exception as exc:
    _append_helper_log(
        f"[helper] failed to publish ytdlp domains: {type(exc).__name__}: {exc}"
    )
try:
    _append_helper_log(f"[helper] connected to ipc={args.ipc}")
except Exception:
    pass
# Main event loop: read IPC messages, keep the READY heartbeat fresh while
# idle, mirror mpv log lines (deduplicated and noise-filtered), and dispatch
# incoming pipeline requests.
while True:
    msg = client.read_message(timeout=0.25)
    if msg is None:
        # Keep READY fresh even when idle (Lua may clear it on timeouts).
        _touch_ready()
        time.sleep(0.02)
        continue
    if msg.get("event") == "__eof__":
        # mpv closed the IPC connection: flush any pending repeat summary,
        # stop the background threads, and exit cleanly.
        try:
            _flush_mpv_repeat()
        except Exception:
            pass
        heartbeat_stop.set()
        return 0
    if msg.get("event") == "log-message":
        try:
            level = str(msg.get("level") or "")
            prefix = str(msg.get("prefix") or "")
            text = str(msg.get("text") or "").rstrip()
            if not text:
                continue
            # Filter excessive noise unless debug is enabled.
            if not debug_enabled:
                lower_prefix = prefix.lower()
                if "quic" in lower_prefix and "DEBUG:" in text:
                    continue
                # Suppress progress-bar style lines (keep true errors).
                if ("ETA" in text or "%" in text) and ("ERROR:" not in text
                                                       and "WARNING:" not in text):
                    # Typical yt-dlp progress bar line.
                    if text.lstrip().startswith("["):
                        continue
            line = f"[mpv {level}] {prefix} {text}".strip()
            now = time.time()
            # Identical line seen again within 2 seconds: count it instead
            # of logging a duplicate.
            if last_mpv_line == line and (now - last_mpv_ts) < 2.0:
                last_mpv_count += 1
                last_mpv_ts = now
                continue
            # New line: emit the repeat summary for the previous one first.
            _flush_mpv_repeat()
            last_mpv_line = line
            last_mpv_count = 1
            last_mpv_ts = now
            _append_helper_log(line)
        except Exception:
            pass
        continue
    if msg.get("event") != "property-change":
        continue
    if msg.get("id") != OBS_ID_REQUEST:
        continue
    # Request property changed: handle the new payload inline.
    _process_request(msg.get("data"), "observe")
if __name__ == "__main__":
    # Script entry point: propagate main()'s exit code to the shell.
    sys.exit(main())