From 5cbc2c09df04551b7fdad7d2eaabaf4e113e4f41 Mon Sep 17 00:00:00 2001 From: Nose Date: Wed, 18 Mar 2026 20:17:28 -0700 Subject: [PATCH] added repl injection --- API/data/alldebrid.json | 4 +- CLI.py | 95 ++- MPV/LUA/main.lua | 277 +++++++-- MPV/pipeline_helper.py | 609 +++++++++++++++----- MPV/portable_config/script-opts/medeia.conf | 2 +- SYS/pipeline.py | 29 +- SYS/repl_queue.py | 64 ++ 7 files changed, 896 insertions(+), 184 deletions(-) create mode 100644 SYS/repl_queue.py diff --git a/API/data/alldebrid.json b/API/data/alldebrid.json index 52119c2..d929bf4 100644 --- a/API/data/alldebrid.json +++ b/API/data/alldebrid.json @@ -92,7 +92,7 @@ "(hitfile\\.net/[a-z0-9A-Z]{4,9})" ], "regexp": "(hitf\\.(to|cc)/([a-z0-9A-Z]{4,9}))|(htfl\\.(net|to|cc)/([a-z0-9A-Z]{4,9}))|(hitfile\\.(net)/download/free/([a-z0-9A-Z]{4,9}))|((hitfile\\.net/[a-z0-9A-Z]{4,9}))", - "status": false + "status": true }, "mega": { "name": "mega", @@ -494,7 +494,7 @@ "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})" ], "regexp": "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})", - "status": true + "status": false }, "mixdrop": { "name": "mixdrop", diff --git a/CLI.py b/CLI.py index 2fb90ab..cbb5171 100644 --- a/CLI.py +++ b/CLI.py @@ -78,6 +78,7 @@ def _install_rich_traceback(*, show_locals: bool = False) -> None: _install_rich_traceback(show_locals=False) from SYS.logger import debug, set_debug +from SYS.repl_queue import pop_repl_commands from SYS.worker_manager import WorkerManager from SYS.cmdlet_catalog import ( @@ -2155,11 +2156,101 @@ Come to love it when others take what you share, as there is no greater joy refresh_interval=0.5, ) + queued_inputs: List[Dict[str, Any]] = [] + queued_inputs_lock = threading.Lock() + repl_queue_stop = threading.Event() + + def _drain_repl_queue() -> None: + try: + pending = pop_repl_commands(self.ROOT, limit=8) + except Exception: + pending = [] + if not pending: + return + with queued_inputs_lock: + queued_inputs.extend(pending) + + def _inject_repl_command(payload: Dict[str, Any]) -> bool: + nonlocal session + command_text = str(payload.get("command") or "").strip() + source_text = str(payload.get("source") or "external").strip() or "external" + if not command_text or session is None: + return False + + update_toolbar(f"queued from {source_text}: {command_text[:96]}") + app = getattr(session, "app", None) + if app is None or not getattr(app, "is_running", False): + return False + + injected = False + + def _apply() -> None: + nonlocal injected + try: + buffer = getattr(session, "default_buffer", None) + if buffer is not None: + buffer.document = Document(text=command_text, cursor_position=len(command_text)) + try: + buffer.validate_and_handle() + injected = True + return + except Exception: + pass + app.exit(result=command_text) + injected = True + except Exception: + injected = False + + try: + loop = getattr(app, "loop", None) + if loop is not None and hasattr(loop, "call_soon_threadsafe"): + loop.call_soon_threadsafe(_apply) + return True + except Exception: + pass + + try: + _apply() + except Exception: + injected = False + return injected + + def _queue_poll_loop() -> None: + while not repl_queue_stop.is_set(): + _drain_repl_queue() + with queued_inputs_lock: + next_payload = queued_inputs[0] if queued_inputs else None + if next_payload and _inject_repl_command(next_payload): + with queued_inputs_lock: + if queued_inputs and queued_inputs[0] is next_payload: + queued_inputs.pop(0) + repl_queue_stop.wait(0.25) + + _drain_repl_queue() + repl_queue_thread = threading.Thread( + target=_queue_poll_loop, + name="medeia-repl-queue", + daemon=True, + ) + repl_queue_thread.start() + while True: try: - user_input = session.prompt(prompt_text).strip() + with queued_inputs_lock: + queued_payload = queued_inputs.pop(0) if queued_inputs else None + + if queued_payload is not None: + source_text = str(queued_payload.get("source") or "external").strip() or "external" + user_input = str(queued_payload.get("command") or "").strip() + if user_input: + print(f"{prompt_text}{user_input} [queued:{source_text}]") + else: + user_input = "" + else: + user_input = session.prompt(prompt_text).strip() except (EOFError, KeyboardInterrupt): print("He who is victorious through deceit is defeated by the truth.") + repl_queue_stop.set() break if not user_input: @@ -2346,6 +2437,8 @@ Come to love it when others take what you share, as there is no greater joy if pipeline_ctx_ref: pipeline_ctx_ref.clear_current_command_text() + repl_queue_stop.set() + _PTK_Lexer = object # type: ignore diff --git a/MPV/LUA/main.lua b/MPV/LUA/main.lua index 35d3efa..c2933b1 100644 --- a/MPV/LUA/main.lua +++ b/MPV/LUA/main.lua @@ -4,7 +4,7 @@ local msg = require 'mp.msg' local M = {} -local MEDEIA_LUA_VERSION = '2025-12-24' +local MEDEIA_LUA_VERSION = '2026-03-19.1' -- Expose a tiny breadcrumb for debugging which script version is loaded. pcall(mp.set_property, 'user-data/medeia-lua-version', MEDEIA_LUA_VERSION) @@ -43,6 +43,8 @@ local LOAD_URL_MENU_TYPE = 'medios_load_url' local DOWNLOAD_FORMAT_MENU_TYPE = 'medios_download_pick_format' local DOWNLOAD_STORE_MENU_TYPE = 'medios_download_pick_store' local SCREENSHOT_TAG_MENU_TYPE = 'medeia_screenshot_tags' +local SCREENSHOT_SAVE_JOB_TIMEOUT = 120 +local SCREENSHOT_SAVE_JOB_POLL_INTERVAL = 0.75 -- Menu types for the command submenu and trim prompt local CMD_MENU_TYPE = 'medios_cmd_menu' @@ -595,6 +597,8 @@ local _resolve_python_exe local _refresh_store_cache local _uosc_open_list_picker local _run_pipeline_detached +local _run_pipeline_background_job +local _run_pipeline_cli_async local _cached_store_names = {} local _store_cache_loaded = false @@ -923,18 +927,19 @@ local function _run_helper_request_async(req, timeout_seconds, cb) local quiet = type(req) == 'table' and req.quiet and true or false local is_timeout = err_text:find('timeout waiting response', 1, true) ~= nil local retry_count = type(req) == 'table' and tonumber(req._retry or 0) or 0 - local is_retryable = is_timeout and type(req) == 'table' - and tostring(req.op or '') == 'ytdlp-formats' and retry_count < 1 + local op_name = type(req) == 'table' and tostring(req.op or '') or '' + local is_retryable = is_timeout and type(req) == 'table' and retry_count < 1 + and (op_name == 'ytdlp-formats' or op_name == 'run-background') if is_retryable then req._retry = retry_count + 1 req.id = nil _ipc_async_busy = false - _lua_log('ipc-async: timeout on ytdlp-formats; restarting helper and retrying (attempt ' .. tostring(req._retry) .. ')') + _lua_log('ipc-async: timeout on ' .. tostring(op_name) .. '; restarting helper and retrying (attempt ' .. tostring(req._retry) .. ')') pcall(mp.set_property, PIPELINE_READY_PROP, '') attempt_start_pipeline_helper_async(function(success) if success then - _lua_log('ipc-async: helper restart succeeded; retrying ytdlp-formats') + _lua_log('ipc-async: helper restart succeeded; retrying ' .. tostring(op_name)) else _lua_log('ipc-async: helper restart failed; retrying anyway') end @@ -1272,7 +1277,7 @@ local function _check_store_for_existing_url(store, url, cb) local query = 'url:' .. needle _lua_log('store-check: probing global query=' .. tostring(query)) - _run_helper_request_async({ op = 'url-exists', data = { url = url, needles = { needle } }, quiet = true }, 2.5, function(resp, err) + _run_helper_request_async({ op = 'url-exists', data = { url = url, needles = { needle } }, quiet = true }, 5.0, function(resp, err) if resp and resp.success then local data = resp.data if type(data) ~= 'table' or #data == 0 then @@ -1517,6 +1522,45 @@ local function _normalize_tag_list(value) return tags end +local function _queue_pipeline_in_repl(pipeline_cmd, queued_message, failure_prefix, queue_label) + pipeline_cmd = trim(tostring(pipeline_cmd or '')) + if pipeline_cmd == '' then + mp.osd_message((failure_prefix or 'REPL queue failed') .. ': empty pipeline command', 5) + return false + end + + _lua_log(queue_label .. ': queueing repl cmd=' .. pipeline_cmd) + ensure_mpv_ipc_server() + if not ensure_pipeline_helper_running() then + mp.osd_message((failure_prefix or 'REPL queue failed') .. ': helper not running', 5) + return false + end + + _run_helper_request_async( + { + op = 'queue-repl-command', + data = { + command = pipeline_cmd, + source = queue_label, + metadata = { kind = 'mpv-download' }, + }, + }, + 4.0, + function(resp, err) + if resp and resp.success then + local queue_path = trim(tostring(resp.path or '')) + _lua_log(queue_label .. ': queued repl command path=' .. tostring(queue_path)) + mp.osd_message(tostring(queued_message or 'Queued in REPL'), 3) + return + end + local detail = tostring(err or (resp and resp.error) or 'unknown') + _lua_log(queue_label .. ': queue failed err=' .. detail) + mp.osd_message((failure_prefix or 'REPL queue failed') .. ': ' .. detail, 5) + end + ) + return true +end + local function _start_screenshot_store_save(store, out_path, tags) store = _normalize_store_name(store) out_path = _normalize_fs_path(out_path) @@ -1527,21 +1571,32 @@ local function _start_screenshot_store_save(store, out_path, tags) local tag_list = _normalize_tag_list(tags) local cmd = 'add-file -store ' .. quote_pipeline_arg(store) - local seeds = { { path = out_path } } - if #tag_list > 0 then - seeds[1].tag = tag_list - end + .. ' -path ' .. quote_pipeline_arg(out_path) _set_selected_store(store) - if _run_pipeline_detached(cmd, function(_, err) - mp.osd_message('Screenshot upload failed to start: ' .. tostring(err or 'unknown'), 5) - end, seeds) then - local tag_suffix = (#tag_list > 0) and (' | tags: ' .. tostring(#tag_list)) or '' - mp.osd_message('Screenshot saved to store: ' .. store .. tag_suffix, 3) + local tag_suffix = (#tag_list > 0) and (' | tags: ' .. tostring(#tag_list)) or '' + if #tag_list > 0 then + local tag_string = table.concat(tag_list, ',') + cmd = cmd .. ' | add-tag ' .. quote_pipeline_arg(tag_string) + end + + _lua_log('screenshot-save: async pipeline cmd=' .. cmd) + + local started, start_err = _run_pipeline_cli_async(cmd, nil, function(success, _result, detail) + if success then + mp.osd_message('Screenshot saved to store: ' .. store .. tag_suffix, 3) + return + end + mp.osd_message('Screenshot upload failed: ' .. tostring(detail or 'unknown'), 5) + end) + + if started then + mp.osd_message('Screenshot upload started to store: ' .. store .. tag_suffix, 3) return true end - mp.osd_message('Screenshot upload failed to start', 5) + mp.osd_message('Screenshot upload failed to start: ' .. tostring(start_err or 'unknown'), 5) + return false end @@ -1556,6 +1611,9 @@ local function _commit_pending_screenshot(tags) end local function _apply_screenshot_tag_query(query) + pcall(function() + mp.commandv('script-message-to', 'uosc', 'close-menu', SCREENSHOT_TAG_MENU_TYPE) + end) _commit_pending_screenshot(_normalize_tag_list(query)) end @@ -2258,7 +2316,7 @@ _refresh_store_cache = function(timeout_seconds, on_complete) end _lua_log('stores: requesting store-choices via helper (fallback)') - _run_helper_request_async({ op = 'store-choices' }, math.max(timeout_seconds or 0, 3.0), function(resp, err) + _run_helper_request_async({ op = 'store-choices' }, math.max(timeout_seconds or 0, 6.0), function(resp, err) local success = false local changed = false if resp and resp.success and type(resp.choices) == 'table' then @@ -3375,10 +3433,10 @@ local function _resolve_cli_entrypoint() return configured ~= '' and configured or 'CLI.py' end -local function _run_pipeline_cli_detached(pipeline_cmd, seeds) +local function _build_pipeline_cli_args(pipeline_cmd, seeds) pipeline_cmd = trim(tostring(pipeline_cmd or '')) if pipeline_cmd == '' then - return false, 'empty pipeline command' + return nil, 'empty pipeline command' end local python = _resolve_python_exe(true) @@ -3386,7 +3444,7 @@ local function _run_pipeline_cli_detached(pipeline_cmd, seeds) python = _resolve_python_exe(false) end if not python or python == '' then - return false, 'python not found' + return nil, 'python not found' end local cli_path = _resolve_cli_entrypoint() @@ -3399,6 +3457,58 @@ local function _run_pipeline_cli_detached(pipeline_cmd, seeds) end end + return args, nil +end + +_run_pipeline_cli_async = function(pipeline_cmd, seeds, cb) + local args, build_err = _build_pipeline_cli_args(pipeline_cmd, seeds) + if type(args) ~= 'table' then + if type(cb) == 'function' then + cb(false, nil, tostring(build_err or 'invalid pipeline args')) + end + return false, tostring(build_err or 'invalid pipeline args') + end + + mp.command_native_async( + { + name = 'subprocess', + args = args, + capture_stdout = true, + capture_stderr = true, + playback_only = false, + }, + function(success, result, err) + if not success then + if type(cb) == 'function' then + cb(false, result, tostring(err or 'subprocess failed')) + end + return + end + + local ok = false + local detail = 'invalid subprocess result' + if type(result) == 'table' then + local err_text = trim(tostring(result.error or '')) + local status = tonumber(result.status) + ok = (err_text == '' or err_text == 'success') and (status == nil or status == 0) + detail = _describe_subprocess_result(result) + end + + if type(cb) == 'function' then + cb(ok, result, detail) + end + end + ) + + return true, nil +end + +local function _run_pipeline_cli_detached(pipeline_cmd, seeds) + local args, build_err = _build_pipeline_cli_args(pipeline_cmd, seeds) + if type(args) ~= 'table' then + return false, tostring(build_err or 'invalid pipeline args') + end + local cmd = { name = 'subprocess', args = args, @@ -3443,6 +3553,94 @@ _run_pipeline_detached = function(pipeline_cmd, on_failure, seeds) return true end +_run_pipeline_background_job = function(pipeline_cmd, seeds, on_started, on_complete, timeout_seconds, poll_interval_seconds) + pipeline_cmd = trim(tostring(pipeline_cmd or '')) + if pipeline_cmd == '' then + if type(on_complete) == 'function' then + on_complete(nil, 'empty pipeline command') + end + return false + end + + ensure_mpv_ipc_server() + if not ensure_pipeline_helper_running() then + if type(on_complete) == 'function' then + on_complete(nil, 'helper not running') + end + return false + end + + _run_helper_request_async({ op = 'run-background', data = { pipeline = pipeline_cmd, seeds = seeds } }, 8.0, function(resp, err) + if err or not resp or not resp.success then + if type(on_complete) == 'function' then + on_complete(nil, err or (resp and resp.error) or 'failed to start background job') + end + return + end + + local job_id = trim(tostring(resp.job_id or '')) + if job_id == '' then + if type(on_complete) == 'function' then + on_complete(nil, 'missing background job id') + end + return + end + + if type(on_started) == 'function' then + on_started(job_id, resp) + end + + local deadline = mp.get_time() + math.max(tonumber(timeout_seconds or 0) or 0, 15.0) + local poll_interval = math.max(tonumber(poll_interval_seconds or 0) or 0, 0.25) + local poll_inflight = false + local timer = nil + + local function finish(job, finish_err) + if timer then + timer:kill() + timer = nil + end + if type(on_complete) == 'function' then + on_complete(job, finish_err) + end + end + + timer = mp.add_periodic_timer(poll_interval, function() + if poll_inflight then + return + end + if mp.get_time() >= deadline then + finish(nil, 'timeout waiting background job') + return + end + + poll_inflight = true + _run_helper_request_async({ op = 'job-status', data = { job_id = job_id }, quiet = true }, math.max(poll_interval + 0.75, 1.25), function(status_resp, status_err) + poll_inflight = false + + if status_err then + _lua_log('background-job: poll retry job=' .. tostring(job_id) .. ' err=' .. tostring(status_err)) + return + end + + if not status_resp or not status_resp.success then + local status_error = status_resp and status_resp.error or 'job status unavailable' + finish(nil, status_error) + return + end + + local job = status_resp.job + local status = type(job) == 'table' and tostring(job.status or '') or tostring(status_resp.status or '') + if status == 'success' or status == 'failed' then + finish(job, nil) + end + end) + end) + end) + + return true +end + local function _open_save_location_picker_for_pending_download() if type(_pending_download) ~= 'table' or not _pending_download.url or not _pending_download.format then return @@ -3562,13 +3760,12 @@ local function _start_download_flow_for_current() end ensure_mpv_ipc_server() local pipeline_cmd = 'get-file -store ' .. quote_pipeline_arg(store_hash.store) .. ' -query ' .. quote_pipeline_arg('hash:' .. store_hash.hash) .. ' -path ' .. quote_pipeline_arg(folder) - if _run_pipeline_detached(pipeline_cmd, function(_, err) - mp.osd_message('Download failed to start: ' .. tostring(err or 'unknown'), 5) - end) then - mp.osd_message('Download started', 2) - else - mp.osd_message('Download failed to start', 5) - end + _queue_pipeline_in_repl( + pipeline_cmd, + 'Queued in REPL: store copy', + 'REPL queue failed', + 'download-store-copy' + ) return end @@ -3765,13 +3962,12 @@ mp.register_script_message('medios-download-pick-store', function(json) .. ' | add-file -store ' .. quote_pipeline_arg(store) _set_selected_store(store) - if _run_pipeline_detached(pipeline_cmd, function(_, err) - mp.osd_message('Download failed to start: ' .. tostring(err or 'unknown'), 5) - end) then - mp.osd_message('Download started', 3) - else - mp.osd_message('Download failed to start', 5) - end + _queue_pipeline_in_repl( + pipeline_cmd, + 'Queued in REPL: save to store ' .. store, + 'REPL queue failed', + 'download-store-save' + ) _pending_download = nil end) @@ -3796,13 +3992,12 @@ mp.register_script_message('medios-download-pick-path', function() .. ' -query ' .. quote_pipeline_arg('format:' .. fmt) .. ' | add-file -path ' .. quote_pipeline_arg(folder) - if _run_pipeline_detached(pipeline_cmd, function(_, err) - mp.osd_message('Download failed to start: ' .. tostring(err or 'unknown'), 5) - end) then - mp.osd_message('Download started', 3) - else - mp.osd_message('Download failed to start', 5) - end + _queue_pipeline_in_repl( + pipeline_cmd, + 'Queued in REPL: save to folder', + 'REPL queue failed', + 'download-folder-save' + ) _pending_download = nil end) diff --git a/MPV/pipeline_helper.py b/MPV/pipeline_helper.py index fb7feb4..ff39bc7 100644 --- a/MPV/pipeline_helper.py +++ b/MPV/pipeline_helper.py @@ -20,7 +20,7 @@ This helper is intentionally minimal: one request at a time, last-write-wins. from __future__ import annotations -MEDEIA_MPV_HELPER_VERSION = "2025-12-19" +MEDEIA_MPV_HELPER_VERSION = "2026-03-19.1" import argparse import json @@ -65,6 +65,7 @@ if _ROOT not in sys.path: from MPV.mpv_ipc import MPVIPCClient # noqa: E402 from SYS.config import load_config # noqa: E402 from SYS.logger import set_debug, debug, set_thread_stream # noqa: E402 +from SYS.repl_queue import enqueue_repl_command # noqa: E402 from SYS.utils import format_bytes # noqa: E402 REQUEST_PROP = "user-data/medeia-pipeline-request" @@ -76,6 +77,69 @@ OBS_ID_REQUEST = 1001 _HELPER_MPV_LOG_EMITTER: Optional[Callable[[str], None]] = None _HELPER_LOG_BACKLOG: list[str] = [] _HELPER_LOG_BACKLOG_LIMIT = 200 +_ASYNC_PIPELINE_JOBS: Dict[str, Dict[str, Any]] = {} +_ASYNC_PIPELINE_JOBS_LOCK = threading.Lock() +_ASYNC_PIPELINE_JOB_TTL_SECONDS = 900.0 + + +def _prune_async_pipeline_jobs(now: Optional[float] = None) -> None: + cutoff = float(now or time.time()) - _ASYNC_PIPELINE_JOB_TTL_SECONDS + with _ASYNC_PIPELINE_JOBS_LOCK: + expired = [ + job_id + for job_id, job in _ASYNC_PIPELINE_JOBS.items() + if float(job.get("updated_at") or 0.0) < cutoff + ] + for job_id in expired: + _ASYNC_PIPELINE_JOBS.pop(job_id, None) + + +def _update_async_pipeline_job(job_id: str, **fields: Any) -> Optional[Dict[str, Any]]: + if not job_id: + return None + + now = time.time() + _prune_async_pipeline_jobs(now) + + with _ASYNC_PIPELINE_JOBS_LOCK: + job = dict(_ASYNC_PIPELINE_JOBS.get(job_id) or {}) + if not job: + job = { + "job_id": job_id, + "status": "queued", + "success": False, + "created_at": now, + } + job.update(fields) + job["updated_at"] = now + _ASYNC_PIPELINE_JOBS[job_id] = job + return dict(job) + + +def _get_async_pipeline_job(job_id: str) -> Optional[Dict[str, Any]]: + if not job_id: + return None + _prune_async_pipeline_jobs() + with _ASYNC_PIPELINE_JOBS_LOCK: + job = _ASYNC_PIPELINE_JOBS.get(job_id) + return dict(job) if job else None + + +def _append_prefixed_log_lines(prefix: str, text: Any, *, max_lines: int = 40) -> None: + payload = str(text or "").replace("\r\n", "\n").replace("\r", "\n") + if not payload.strip(): + return + + emitted = 0 + for line in payload.split("\n"): + line = line.rstrip() + if not line: + continue + _append_helper_log(f"{prefix}{line}") + emitted += 1 + if emitted >= max_lines: + _append_helper_log(f"{prefix}... truncated after {max_lines} lines") + break def _start_ready_heartbeat(ipc_path: str, stop_event: threading.Event) -> threading.Thread: @@ -229,17 +293,80 @@ def _run_pipeline( } -def _run_pipeline_background(pipeline_text: str, *, seeds: Any, req_id: str) -> None: +def _run_pipeline_background( + pipeline_text: str, + *, + seeds: Any, + req_id: str, + job_id: Optional[str] = None, + json_output: bool = False, +) -> None: def _target() -> None: try: - result = _run_pipeline(pipeline_text, seeds=seeds) + if job_id: + _update_async_pipeline_job( + job_id, + status="running", + success=False, + pipeline=pipeline_text, + req_id=req_id, + started_at=time.time(), + finished_at=None, + error=None, + stdout="", + stderr="", + table=None, + data=None, + ) + + result = _run_pipeline( + pipeline_text, + seeds=seeds, + json_output=json_output, + ) status = "success" if result.get("success") else "failed" + if job_id: + _update_async_pipeline_job( + job_id, + status=status, + success=bool(result.get("success")), + finished_at=time.time(), + error=result.get("error"), + stdout=result.get("stdout", ""), + stderr=result.get("stderr", ""), + table=result.get("table"), + data=result.get("data"), + ) _append_helper_log( - f"[pipeline async {req_id}] {status} error={result.get('error')}" + f"[pipeline async {req_id}] {status}" + + (f" job={job_id}" if job_id else "") + + f" error={result.get('error')}" + ) + _append_prefixed_log_lines( + f"[pipeline async stdout {req_id}] ", + result.get("stdout", ""), + ) + _append_prefixed_log_lines( + f"[pipeline async stderr {req_id}] ", + result.get("stderr", ""), ) except Exception as exc: # pragma: no cover - best-effort logging + if job_id: + _update_async_pipeline_job( + job_id, + status="failed", + success=False, + finished_at=time.time(), + error=f"{type(exc).__name__}: {exc}", + stdout="", + stderr="", + table=None, + data=None, + ) _append_helper_log( - f"[pipeline async {req_id}] exception: {type(exc).__name__}: {exc}" + f"[pipeline async {req_id}] exception" + + (f" job={job_id}" if job_id else "") + + f": {type(exc).__name__}: {exc}" ) thread = threading.Thread( @@ -263,6 +390,141 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: """ op_name = str(op or "").strip().lower() + if op_name in {"run-background", + "run_background", + "pipeline-background", + "pipeline_background"}: + pipeline_text = "" + seeds = None + json_output = False + requested_job_id = "" + if isinstance(data, dict): + pipeline_text = str(data.get("pipeline") or "").strip() + seeds = data.get("seeds") + json_output = bool(data.get("json") or data.get("output_json")) + requested_job_id = str(data.get("job_id") or "").strip() + + if not pipeline_text: + return { + "success": False, + "stdout": "", + "stderr": "", + "error": "Missing pipeline", + "table": None, + } + + token = requested_job_id or f"job-{int(time.time() * 1000)}-{threading.get_ident()}" + job_id = hashlib.sha1( + f"{token}|{pipeline_text}|{time.time()}".encode("utf-8", "ignore") + ).hexdigest()[:16] + + _update_async_pipeline_job( + job_id, + status="queued", + success=False, + pipeline=pipeline_text, + req_id=job_id, + finished_at=None, + error=None, + stdout="", + stderr="", + table=None, + data=None, + ) + _run_pipeline_background( + pipeline_text, + seeds=seeds, + req_id=job_id, + job_id=job_id, + json_output=json_output, + ) + return { + "success": True, + "stdout": "", + "stderr": "", + "error": None, + "table": None, + "job_id": job_id, + "status": "queued", + } + + if op_name in {"job-status", + "job_status", + "pipeline-job-status", + "pipeline_job_status"}: + job_id = "" + if isinstance(data, dict): + job_id = str(data.get("job_id") or "").strip() + if not job_id: + return { + "success": False, + "stdout": "", + "stderr": "", + "error": "Missing job_id", + "table": None, + } + + job = _get_async_pipeline_job(job_id) + if not job: + return { + "success": False, + "stdout": "", + "stderr": "", + "error": f"Unknown job_id: {job_id}", + "table": None, + } + + payload = dict(job) + return { + "success": True, + "stdout": str(payload.get("stdout") or ""), + "stderr": str(payload.get("stderr") or ""), + "error": payload.get("error"), + "table": payload.get("table"), + "data": payload.get("data"), + "job": payload, + } + + if op_name in {"queue-repl-command", + "queue_repl_command", + "repl-command", + "repl_command"}: + command_text = "" + source = "mpv" + metadata = None + if isinstance(data, dict): + command_text = str(data.get("command") or data.get("pipeline") or "").strip() + source = str(data.get("source") or "mpv").strip() or "mpv" + metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else None + + if not command_text: + return { + "success": False, + "stdout": "", + "stderr": "", + "error": "Missing command", + "table": None, + } + + queue_path = enqueue_repl_command( + _repo_root(), + command_text, + source=source, + metadata=metadata, + ) + _append_helper_log( + f"[repl-queue] queued source={source} path={queue_path.name} cmd={command_text}" + ) + return { + "success": True, + "stdout": "", + "stderr": "", + "error": None, + "table": None, + "path": str(queue_path), + "queued": True, + } + if op_name in {"run-detached", "run_detached", "pipeline-detached", @@ -810,6 +1072,59 @@ def _parse_request(data: Any) -> Optional[Dict[str, Any]]: return None +def _start_request_poll_loop( + ipc_path: str, + stop_event: threading.Event, + handle_request: Callable[[Any, str], bool], +) -> threading.Thread: + """Poll the request property on a separate IPC connection. + + Windows named-pipe event delivery can stall even while direct get/set + property commands still work. Polling the request property provides a more + reliable fallback transport for helper ops and pipeline dispatch. + """ + + def _poll_loop() -> None: + poll_client = MPVIPCClient(socket_path=ipc_path, timeout=0.75, silent=True) + while not stop_event.is_set(): + try: + if poll_client.sock is None and not poll_client.connect(): + stop_event.wait(0.10) + continue + + resp = poll_client.send_command(["get_property", REQUEST_PROP]) + if not resp: + try: + poll_client.disconnect() + except Exception: + pass + stop_event.wait(0.10) + continue + + if resp.get("error") == "success": + handle_request(resp.get("data"), "poll") + stop_event.wait(0.05) + except Exception: + try: + poll_client.disconnect() + except Exception: + pass + stop_event.wait(0.15) + + try: + poll_client.disconnect() + except Exception: + pass + + thread = threading.Thread( + target=_poll_loop, + name="mpv-helper-request-poll", + daemon=True, + ) + thread.start() + return thread + + def main(argv: Optional[list[str]] = None) -> int: parser = argparse.ArgumentParser(prog="mpv-pipeline-helper") parser.add_argument("--ipc", required=True, help="mpv --input-ipc-server path") @@ -917,6 +1232,10 @@ def main(argv: Optional[list[str]] = None) -> int: except Exception: error_log_dir = Path(tempfile.gettempdir()) last_error_log = error_log_dir / "medeia-mpv-pipeline-last-error.log" + seen_request_ids: Dict[str, float] = {} + seen_request_ids_lock = threading.Lock() + seen_request_ttl_seconds = 180.0 + request_processing_lock = threading.Lock() def _write_error_log(text: str, *, req_id: str) -> Optional[str]: try: @@ -941,6 +1260,157 @@ def main(argv: Optional[list[str]] = None) -> int: return str(stamped) if stamped else str(last_error_log) + def _mark_request_seen(req_id: str) -> bool: + if not req_id: + return False + now = time.time() + cutoff = now - seen_request_ttl_seconds + with seen_request_ids_lock: + expired = [key for key, ts in seen_request_ids.items() if ts < cutoff] + for key in expired: + seen_request_ids.pop(key, None) + if req_id in seen_request_ids: + return False + seen_request_ids[req_id] = now + return True + + def _publish_response(resp: Dict[str, Any]) -> None: + response_client = MPVIPCClient(socket_path=str(args.ipc), timeout=0.75, silent=True) + try: + response_client.send_command_no_wait( + [ + "set_property_string", + RESPONSE_PROP, + json.dumps(resp, ensure_ascii=False), + ] + ) + finally: + try: + response_client.disconnect() + except Exception: + pass + + def _process_request(raw: Any, source: str) -> bool: + req = _parse_request(raw) + if not req: + try: + if isinstance(raw, str) and raw.strip(): + snippet = raw.strip().replace("\r", "").replace("\n", " ") + if len(snippet) > 220: + snippet = snippet[:220] + "…" + _append_helper_log( + f"[request-raw {source}] could not parse request json: {snippet}" + ) + except Exception: + pass + return False + + req_id = str(req.get("id") or "") + op = str(req.get("op") or "").strip() + data = req.get("data") + pipeline_text = str(req.get("pipeline") or "").strip() + seeds = req.get("seeds") + json_output = bool(req.get("json") or req.get("output_json")) + + if not req_id: + return False + + if not _mark_request_seen(req_id): + return False + + try: + label = pipeline_text if pipeline_text else (op and ("op=" + op) or "(empty)") + _append_helper_log(f"\n[request {req_id} via {source}] {label}") + except Exception: + pass + + with request_processing_lock: + async_dispatch = False + try: + if op: + run = _run_op(op, data) + else: + if not pipeline_text: + return False + if _is_load_url_pipeline(pipeline_text): + async_dispatch = True + run = { + "success": True, + "stdout": "", + "stderr": "", + "error": "", + "table": None, + } + _run_pipeline_background( + pipeline_text, + seeds=seeds, + req_id=req_id, + ) + else: + run = _run_pipeline( + pipeline_text, + seeds=seeds, + json_output=json_output, + ) + + resp = { + "id": req_id, + "success": bool(run.get("success")), + "stdout": run.get("stdout", ""), + "stderr": run.get("stderr", ""), + "error": run.get("error"), + "table": run.get("table"), + "data": run.get("data"), + } + if "choices" in run: + resp["choices"] = run.get("choices") + if "job_id" in run: + resp["job_id"] = run.get("job_id") + if "job" in run: + resp["job"] = run.get("job") + if "status" in run: + resp["status"] = run.get("status") + if "pid" in run: + resp["pid"] = run.get("pid") + if async_dispatch: + resp["info"] = "queued asynchronously" + except Exception as exc: + resp = { + "id": req_id, + "success": False, + "stdout": "", + "stderr": "", + "error": f"{type(exc).__name__}: {exc}", + "table": None, + } + + try: + if resp.get("stdout"): + _append_helper_log("[stdout]\n" + str(resp.get("stdout"))) + if resp.get("stderr"): + _append_helper_log("[stderr]\n" + str(resp.get("stderr"))) + if resp.get("error"): + _append_helper_log("[error]\n" + str(resp.get("error"))) + except Exception: + pass + + if not resp.get("success"): + details = "" + if resp.get("error"): + details += str(resp.get("error")) + if resp.get("stderr"): + details = (details + "\n" if details else "") + str(resp.get("stderr")) + log_path = _write_error_log(details, req_id=req_id) + if log_path: + resp["log_path"] = log_path + + try: + _publish_response(resp) + except Exception: + pass + + return True + # Connect to mpv's JSON IPC. On Windows, the pipe can exist but reject opens # briefly during startup; also mpv may create the IPC server slightly after # the Lua script launches us. Retry until timeout. @@ -1047,6 +1517,7 @@ def main(argv: Optional[list[str]] = None) -> int: heartbeat_stop = threading.Event() _start_ready_heartbeat(str(args.ipc), heartbeat_stop) + _start_request_poll_loop(str(args.ipc), heartbeat_stop, _process_request) # Pre-compute store choices at startup and publish to a cached property so Lua # can read immediately without waiting for a request/response cycle (which may timeout). @@ -1133,8 +1604,6 @@ def main(argv: Optional[list[str]] = None) -> int: f"[helper] failed to publish ytdlp domains: {type(exc).__name__}: {exc}" ) - last_seen_id: Optional[str] = None - try: _append_helper_log(f"[helper] connected to ipc={args.ipc}") except Exception: @@ -1200,131 +1669,7 @@ def main(argv: Optional[list[str]] = None) -> int: if msg.get("id") != OBS_ID_REQUEST: continue - raw = msg.get("data") - req = _parse_request(raw) - if not req: - try: - if isinstance(raw, str) and raw.strip(): - snippet = raw.strip().replace("\r", "").replace("\n", " ") - if len(snippet) > 220: - snippet = snippet[:220] + "…" - _append_helper_log( - f"[request-raw] could not parse request json: {snippet}" - ) - except Exception: - pass - continue - - req_id = str(req.get("id") or "") - op = str(req.get("op") or "").strip() - data = req.get("data") - pipeline_text = str(req.get("pipeline") or "").strip() - seeds = req.get("seeds") - json_output = bool(req.get("json") or req.get("output_json")) - - if not req_id: - continue - - if last_seen_id == req_id: - continue - last_seen_id = req_id - - try: - label = pipeline_text if pipeline_text else ( - op and ("op=" + op) or "(empty)" - ) - _append_helper_log(f"\n[request {req_id}] {label}") - except Exception: - pass - - async_dispatch = False - try: - if op: - run = _run_op(op, data) - else: - if not pipeline_text: - continue - if _is_load_url_pipeline(pipeline_text): - async_dispatch = True - run = { - "success": True, - "stdout": "", - "stderr": "", - "error": "", - "table": None, - } - _run_pipeline_background( - pipeline_text, - seeds=seeds, - req_id=req_id, - ) - else: - run = _run_pipeline( - pipeline_text, - seeds=seeds, - json_output=json_output, - ) - - resp = { - "id": req_id, - "success": bool(run.get("success")), - "stdout": run.get("stdout", - ""), - "stderr": run.get("stderr", - ""), - "error": run.get("error"), - "table": run.get("table"), - "data": run.get("data"), - } - if "choices" in run: - resp["choices"] = run.get("choices") - if async_dispatch: - resp["info"] = "queued asynchronously" - except Exception as exc: - resp = { - "id": req_id, - "success": False, - "stdout": "", - "stderr": "", - "error": f"{type(exc).__name__}: {exc}", - "table": None, - } - - # Persist helper output for debugging MPV menu interactions. - try: - if resp.get("stdout"): - _append_helper_log("[stdout]\n" + str(resp.get("stdout"))) - if resp.get("stderr"): - _append_helper_log("[stderr]\n" + str(resp.get("stderr"))) - if resp.get("error"): - _append_helper_log("[error]\n" + str(resp.get("error"))) - except Exception: - pass - - if not resp.get("success"): - details = "" - if resp.get("error"): - details += str(resp.get("error")) - if resp.get("stderr"): - details = (details + "\n" if details else "") + str(resp.get("stderr")) - log_path = _write_error_log(details, req_id=req_id) - if log_path: - resp["log_path"] = log_path - - try: - # IMPORTANT: don't wait for a response here; waiting would consume - # async events and can drop/skip property-change notifications. - client.send_command_no_wait( - [ - "set_property_string", - RESPONSE_PROP, - json.dumps(resp, - ensure_ascii=False) - ] - ) - except Exception: - # If posting results fails, there's nothing more useful to do. - pass + _process_request(msg.get("data"), "observe") if __name__ == "__main__": diff --git a/MPV/portable_config/script-opts/medeia.conf b/MPV/portable_config/script-opts/medeia.conf index 3d30edf..7606177 100644 --- a/MPV/portable_config/script-opts/medeia.conf +++ b/MPV/portable_config/script-opts/medeia.conf @@ -1,2 +1,2 @@ # Medeia MPV script options -store=local +store=rpi diff --git a/SYS/pipeline.py b/SYS/pipeline.py index 6f9fc43..facdfe2 100644 --- a/SYS/pipeline.py +++ b/SYS/pipeline.py @@ -1778,6 +1778,9 @@ class PipelineExecutor: return [str(x) for x in candidate if x is not None] return None + def _norm_cmd_name(value: Any) -> str: + return str(value or "").replace("_", "-").strip().lower() + # ============================================================================ # PHASE 2: Parse source command and table metadata # ============================================================================ @@ -1841,11 +1844,23 @@ class PipelineExecutor: else: selected_row_args: List[str] = [] skip_pipe_expansion = source_cmd in {".pipe", ".mpv"} and len(stages) > 0 + prefer_row_action = False + if len(selection_indices) == 1 and not stages: + try: + row_action = _get_row_action(selection_indices[0]) + except Exception: + row_action = None + if row_action: + prefer_row_action = True + debug( + "@N: skipping source command expansion because row has explicit selection_action " + f"{row_action}" + ) # Command expansion via @N: # - Default behavior: expand ONLY for single-row selections. # - Special case: allow multi-row expansion for add-file directory tables by # combining selected rows into a single `-path file1,file2,...` argument. - if source_cmd and not skip_pipe_expansion: + if source_cmd and not skip_pipe_expansion and not prefer_row_action: src = str(source_cmd).replace("_", "-").strip().lower() if src == "add-file" and selection_indices: @@ -2119,7 +2134,7 @@ class PipelineExecutor: source_args_for_selection = [] if not stages and selection_indices and source_cmd_for_selection: - src_norm = _norm_cmd(source_cmd_for_selection) + src_norm = _norm_cmd_name(source_cmd_for_selection) if src_norm in {".worker", "worker", "workers"}: if len(selection_indices) == 1: idx = selection_indices[0] @@ -2197,7 +2212,7 @@ class PipelineExecutor: logger.exception("Failed to record pipeline log step for applied row action (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None)) else: first_cmd = stages[0][0] if stages and stages[0] else None - first_cmd_norm = _norm_cmd(first_cmd) + first_cmd_norm = _norm_cmd_name(first_cmd) inserted_provider_download = False if first_cmd_norm == "add-file": @@ -2205,7 +2220,7 @@ class PipelineExecutor: # run download before add-file so add-file receives local files. if len(selection_indices) == 1: row_action = _get_row_action(selection_indices[0], items_list) - if row_action and _norm_cmd(row_action[0]) == "download-file": + if row_action and _norm_cmd_name(row_action[0]) == "download-file": stages.insert(0, [str(x) for x in row_action if x is not None]) inserted_provider_download = True debug("Auto-inserting row download-file action before add-file") @@ -2218,7 +2233,7 @@ class PipelineExecutor: has_download_row_action = False for idx in selection_indices: row_action = _get_row_action(idx, items_list) - if row_action and _norm_cmd(row_action[0]) == "download-file": + if row_action and _norm_cmd_name(row_action[0]) == "download-file": has_download_row_action = True break if has_download_row_action: @@ -2237,8 +2252,8 @@ class PipelineExecutor: print("Auto-inserting get-tag after metadata selection") stages.insert(0, ["get-tag"]) elif auto_stage: - first_cmd_norm = _norm_cmd(stages[0][0] if stages and stages[0] else None) - auto_cmd_norm = _norm_cmd(auto_stage[0]) + first_cmd_norm = _norm_cmd_name(stages[0][0] if stages and stages[0] else None) + auto_cmd_norm = _norm_cmd_name(auto_stage[0]) if first_cmd_norm not in (auto_cmd_norm, ".pipe", ".mpv"): debug(f"Auto-inserting {auto_cmd_norm} after selection") # Insert the auto stage before the user-specified stage diff --git a/SYS/repl_queue.py b/SYS/repl_queue.py new file mode 100644 index 0000000..3a9db69 --- /dev/null +++ b/SYS/repl_queue.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import json +import time +import uuid +from pathlib import Path +from typing import Any, Dict, List, Optional + + +def repl_queue_dir(root: Path) -> Path: + return Path(root) / "Log" / "repl_queue" + + +def enqueue_repl_command( + root: Path, + command: str, + *, + source: str = "external", + metadata: Optional[Dict[str, Any]] = None, +) -> Path: + queue_dir = repl_queue_dir(root) + queue_dir.mkdir(parents=True, exist_ok=True) + + payload: Dict[str, Any] = { + "id": uuid.uuid4().hex, + "command": str(command or "").strip(), + "source": str(source or "external").strip() or "external", + "created_at": time.time(), + } + if isinstance(metadata, dict) and metadata: + payload["metadata"] = metadata + + stamp = int(time.time() * 1000) + token = payload["id"][:8] + final_path = queue_dir / f"{stamp:013d}-{token}.json" + temp_path = final_path.with_suffix(".tmp") + temp_path.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8") + temp_path.replace(final_path) + return final_path + + +def pop_repl_commands(root: Path, *, limit: int = 8) -> List[Dict[str, Any]]: + queue_dir = repl_queue_dir(root) + if not queue_dir.exists(): + return [] + + items: List[Dict[str, Any]] = [] + for entry in sorted(queue_dir.glob("*.json"))[: max(1, int(limit or 1))]: + try: + payload = json.loads(entry.read_text(encoding="utf-8")) + except Exception: + payload = { + "id": entry.stem, + "command": "", + "source": "invalid", + "created_at": entry.stat().st_mtime, + } + try: + entry.unlink() + except Exception: + continue + if isinstance(payload, dict): + items.append(payload) + return items \ No newline at end of file