diff --git a/MPV/LUA/main.lua b/MPV/LUA/main.lua index 58f810b..16a1b65 100644 --- a/MPV/LUA/main.lua +++ b/MPV/LUA/main.lua @@ -858,6 +858,15 @@ local _helper_ready_last_seen_ts = 0 local HELPER_READY_STALE_SECONDS = 10.0 local function _is_pipeline_helper_ready() + local helper_version = mp.get_property('user-data/medeia-pipeline-helper-version') + if helper_version == nil or helper_version == '' then + helper_version = mp.get_property_native('user-data/medeia-pipeline-helper-version') + end + helper_version = tostring(helper_version or '') + if helper_version ~= '2026-03-22.2' then + return false + end + local ready = mp.get_property(PIPELINE_READY_PROP) if ready == nil or ready == '' then ready = mp.get_property_native(PIPELINE_READY_PROP) @@ -910,12 +919,18 @@ local function _helper_ready_diagnostics() if ready == nil or ready == '' then ready = mp.get_property_native(PIPELINE_READY_PROP) end + local helper_version = mp.get_property('user-data/medeia-pipeline-helper-version') + if helper_version == nil or helper_version == '' then + helper_version = mp.get_property_native('user-data/medeia-pipeline-helper-version') + end local now = mp.get_time() or 0 local age = 'n/a' if _helper_ready_last_seen_ts > 0 then age = string.format('%.2fs', math.max(0, now - _helper_ready_last_seen_ts)) end return 'ready=' .. tostring(ready or '') + .. ' helper_version=' .. tostring(helper_version or '') + .. ' required_version=2026-03-22.2' .. ' last_value=' .. tostring(_helper_ready_last_value or '') .. ' last_seen_age=' .. tostring(age) end @@ -975,6 +990,7 @@ local function attempt_start_pipeline_helper_async(callback) -- Clear any stale ready heartbeat from an earlier helper instance before spawning. pcall(mp.set_property, PIPELINE_READY_PROP, '') + pcall(mp.set_property, 'user-data/medeia-pipeline-helper-version', '') _helper_ready_last_value = '' _helper_ready_last_seen_ts = 0 diff --git a/MPV/pipeline_helper.py b/MPV/pipeline_helper.py index aa2bfaa..f1432d2 100644 --- a/MPV/pipeline_helper.py +++ b/MPV/pipeline_helper.py @@ -20,7 +20,7 @@ This helper is intentionally minimal: one request at a time, last-write-wins. from __future__ import annotations -MEDEIA_MPV_HELPER_VERSION = "2026-03-19.1" +MEDEIA_MPV_HELPER_VERSION = "2026-03-22.2" import argparse import json @@ -62,7 +62,7 @@ _ROOT = str(_repo_root()) if _ROOT not in sys.path: sys.path.insert(0, _ROOT) -from MPV.mpv_ipc import MPVIPCClient # noqa: E402 +from MPV.mpv_ipc import MPVIPCClient, _windows_kill_pids, _windows_hidden_subprocess_kwargs # noqa: E402 from SYS.config import load_config, reload_config # noqa: E402 from SYS.logger import set_debug, debug, set_thread_stream # noqa: E402 from SYS.repl_queue import enqueue_repl_command # noqa: E402 @@ -71,6 +71,7 @@ from SYS.utils import format_bytes # noqa: E402 REQUEST_PROP = "user-data/medeia-pipeline-request" RESPONSE_PROP = "user-data/medeia-pipeline-response" READY_PROP = "user-data/medeia-pipeline-ready" +VERSION_PROP = "user-data/medeia-pipeline-helper-version" OBS_ID_REQUEST = 1001 @@ -113,12 +114,11 @@ def _set_cached_store_choices(choices: Any) -> list[str]: return list(_STORE_CHOICES_CACHE) -def _publish_store_choices_cache(ipc_path: str, choices: Any) -> None: +def _store_choices_payload(choices: Any) -> Optional[str]: cached = _normalize_store_choices(choices) - if not ipc_path or not cached: - return - - payload = json.dumps( + if not cached: + return None + return json.dumps( { "success": True, "choices": cached, @@ -126,21 +126,6 @@ def _publish_store_choices_cache(ipc_path: str, choices: Any) -> None: ensure_ascii=False, ) - client = MPVIPCClient(socket_path=ipc_path, timeout=0.75, silent=True) - try: - client.send_command_no_wait( - [ - "set_property_string", - "user-data/medeia-store-choices-cached", - payload, - ] - ) - finally: - try: - client.disconnect() - except Exception: - pass - def _load_store_choices_from_config(*, force_reload: bool = False) -> list[str]: from Store.registry import list_configured_backend_names # noqa: WPS433 @@ -243,6 +228,56 @@ def _start_ready_heartbeat(ipc_path: str, stop_event: threading.Event) -> thread return thread +def _windows_list_pipeline_helper_pids(ipc_path: str) -> list[int]: + if platform.system() != "Windows": + return [] + try: + ipc_path = str(ipc_path or "") + except Exception: + ipc_path = "" + if not ipc_path: + return [] + + ps_script = ( + "$ipc = " + json.dumps(ipc_path) + "; " + "Get-CimInstance Win32_Process | " + "Where-Object { $_.CommandLine -and (($_.CommandLine -match 'pipeline_helper\\.py') -or ($_.CommandLine -match ' -m\\s+MPV\\.pipeline_helper(\\s|$)')) -and $_.CommandLine -match ('--ipc\\s+' + [regex]::Escape($ipc)) } | " + "Select-Object -ExpandProperty ProcessId | ConvertTo-Json -Compress" + ) + + try: + out = subprocess.check_output( + ["powershell", "-NoProfile", "-Command", ps_script], + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + timeout=3, + text=True, + **_windows_hidden_subprocess_kwargs(), + ) + except Exception: + return [] + + txt = (out or "").strip() + if not txt or txt == "null": + return [] + + try: + obj = json.loads(txt) + except Exception: + return [] + + raw_pids = obj if isinstance(obj, list) else [obj] + out_pids: list[int] = [] + for value in raw_pids: + try: + pid = int(value) + except Exception: + continue + if pid > 0 and pid not in out_pids: + out_pids.append(pid) + return out_pids + + def _run_pipeline( pipeline_text: str, *, @@ -705,10 +740,6 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: refresh = bool(data.get("refresh") or data.get("reload")) if cached_choices and not refresh: - try: - _publish_store_choices_cache(os.environ.get("MEDEIA_MPV_IPC", ""), cached_choices) - except Exception: - pass debug(f"[store-choices] using cached choices={len(cached_choices)}") return { "success": True, @@ -731,10 +762,6 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: if choices: choices = _set_cached_store_choices(choices) - try: - _publish_store_choices_cache(os.environ.get("MEDEIA_MPV_IPC", ""), choices) - except Exception: - pass debug(f"[store-choices] config_dir={config_root} choices={len(choices)}") @@ -751,10 +778,6 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: debug( f"[store-choices] refresh failed; returning cached choices={len(cached_choices)} error={type(exc).__name__}: {exc}" ) - try: - _publish_store_choices_cache(os.environ.get("MEDEIA_MPV_IPC", ""), cached_choices) - except Exception: - pass return { "success": True, "stdout": "", @@ -1270,6 +1293,24 @@ def main(argv: Optional[list[str]] = None) -> int: # path used by this helper (which comes from the running MPV instance). os.environ["MEDEIA_MPV_IPC"] = str(args.ipc) + if platform.system() == "Windows": + try: + sibling_pids = [ + pid + for pid in _windows_list_pipeline_helper_pids(str(args.ipc)) + if pid and pid != os.getpid() + ] + if sibling_pids: + _append_helper_log( + f"[helper] terminating older helper pids for ipc={args.ipc}: {','.join(str(pid) for pid in sibling_pids)}" + ) + _windows_kill_pids(sibling_pids) + time.sleep(0.25) + except Exception as exc: + _append_helper_log( + f"[helper] failed to terminate older helpers: {type(exc).__name__}: {exc}" + ) + # Ensure single helper instance per ipc. _lock_fh = _acquire_ipc_lock(str(args.ipc)) if _lock_fh is None: @@ -1344,6 +1385,9 @@ def main(argv: Optional[list[str]] = None) -> int: seen_request_ids_lock = threading.Lock() seen_request_ttl_seconds = 180.0 request_processing_lock = threading.Lock() + command_client_lock = threading.Lock() + _send_helper_command = lambda _command, _label='': False + _publish_store_choices_cached_property = lambda _choices: None def _write_error_log(text: str, *, req_id: str) -> Optional[str]: try: @@ -1383,20 +1427,23 @@ def main(argv: Optional[list[str]] = None) -> int: return True def _publish_response(resp: Dict[str, Any]) -> None: - response_client = MPVIPCClient(socket_path=str(args.ipc), timeout=0.75, silent=True) - try: - response_client.send_command_no_wait( - [ - "set_property_string", - RESPONSE_PROP, - json.dumps(resp, ensure_ascii=False), - ] + req_id = str(resp.get("id") or "") + ok = _send_helper_command( + [ + "set_property_string", + RESPONSE_PROP, + json.dumps(resp, ensure_ascii=False), + ], + f"response:{req_id or 'unknown'}", + ) + if ok: + _append_helper_log( + f"[response {req_id or '?'}] published success={bool(resp.get('success'))}" + ) + else: + _append_helper_log( + f"[response {req_id or '?'}] publish failed success={bool(resp.get('success'))}" ) - finally: - try: - response_client.disconnect() - except Exception: - pass def _process_request(raw: Any, source: str) -> bool: req = _parse_request(raw) @@ -1492,6 +1539,17 @@ def main(argv: Optional[list[str]] = None) -> int: "table": None, } + try: + if op: + extra = "" + if isinstance(resp.get("choices"), list): + extra = f" choices={len(resp.get('choices') or [])}" + _append_helper_log( + f"[request {req_id}] op-finished success={bool(resp.get('success'))}{extra}" + ) + except Exception: + pass + try: if resp.get("stdout"): _append_helper_log("[stdout]\n" + str(resp.get("stdout"))) @@ -1517,6 +1575,12 @@ def main(argv: Optional[list[str]] = None) -> int: except Exception: pass + if resp.get("success") and isinstance(resp.get("choices"), list): + try: + _publish_store_choices_cached_property(resp.get("choices")) + except Exception: + pass + return True # Connect to mpv's JSON IPC. On Windows, the pipe can exist but reject opens @@ -1542,6 +1606,37 @@ def main(argv: Optional[list[str]] = None) -> int: # Keep trying. time.sleep(0.10) + command_client = MPVIPCClient(socket_path=str(args.ipc), timeout=0.75, silent=True) + + def _send_helper_command(command: Any, label: str = "") -> bool: + with command_client_lock: + try: + if command_client.sock is None and not command_client.connect(): + _append_helper_log( + f"[helper-ipc] connect failed label={label or '?'}" + ) + return False + rid = command_client.send_command_no_wait(command) + if rid is None: + _append_helper_log( + f"[helper-ipc] send failed label={label or '?'}" + ) + try: + command_client.disconnect() + except Exception: + pass + return False + return True + except Exception as exc: + _append_helper_log( + f"[helper-ipc] exception label={label or '?'} error={type(exc).__name__}: {exc}" + ) + try: + command_client.disconnect() + except Exception: + pass + return False + def _emit_helper_log_to_mpv(payload: str) -> None: safe = str(payload or "").replace("\r", " ").replace("\n", " ").strip() if not safe: @@ -1572,15 +1667,36 @@ def main(argv: Optional[list[str]] = None) -> int: if (now - last_ready_ts) < 0.75: return try: - client.send_command_no_wait( - ["set_property_string", - READY_PROP, - str(int(now))] + _send_helper_command( + ["set_property_string", READY_PROP, str(int(now))], + "ready-heartbeat", ) last_ready_ts = now except Exception: return + def _publish_store_choices_cached_property(choices: Any) -> None: + payload = _store_choices_payload(choices) + if not payload: + return + _send_helper_command( + [ + "set_property_string", + "user-data/medeia-store-choices-cached", + payload, + ], + "store-choices-cache", + ) + + def _publish_helper_version() -> None: + if _send_helper_command( + ["set_property_string", VERSION_PROP, MEDEIA_MPV_HELPER_VERSION], + "helper-version", + ): + _append_helper_log( + f"[helper] published helper version {MEDEIA_MPV_HELPER_VERSION}" + ) + # Mirror mpv's own log messages into our helper log file so debugging does # not depend on the mpv on-screen console or mpv's log-file. try: @@ -1619,6 +1735,7 @@ def main(argv: Optional[list[str]] = None) -> int: # sends a request before we can receive property-change notifications. try: _touch_ready() + _publish_helper_version() _append_helper_log(f"[helper] ready heartbeat armed prop={READY_PROP}") except Exception: pass @@ -1645,7 +1762,7 @@ def main(argv: Optional[list[str]] = None) -> int: # Publish to a cached property for Lua to read without IPC request. try: - _publish_store_choices_cache(str(args.ipc), startup_choices) + _publish_store_choices_cached_property(startup_choices) _append_helper_log( "[helper] published store-choices to user-data/medeia-store-choices-cached" ) @@ -1665,10 +1782,11 @@ def main(argv: Optional[list[str]] = None) -> int: cfg = load_config() temp_dir = cfg.get("temp", "").strip() or os.getenv("TEMP") or "/tmp" if temp_dir: - client.send_command_no_wait( + _send_helper_command( ["set_property_string", "user-data/medeia-config-temp", - temp_dir] + temp_dir], + "config-temp", ) _append_helper_log( f"[helper] published config temp to user-data/medeia-config-temp={temp_dir}" @@ -1685,12 +1803,13 @@ def main(argv: Optional[list[str]] = None) -> int: if domains: # We join them into a space-separated string for Lua to parse easily domains_str = " ".join(domains) - client.send_command_no_wait( + _send_helper_command( [ "set_property_string", "user-data/medeia-ytdlp-domains-cached", domains_str - ] + ], + "ytdlp-domains", ) _append_helper_log( f"[helper] published {len(domains)} ytdlp domains for Lua menu filtering"