Add REPL command injection support (queue REPL commands and async pipeline jobs via the mpv helper)

This commit is contained in:
2026-03-18 20:17:28 -07:00
parent 7c526784a8
commit 5cbc2c09df
7 changed files with 896 additions and 184 deletions

View File

@@ -20,7 +20,7 @@ This helper is intentionally minimal: one request at a time, last-write-wins.
from __future__ import annotations
MEDEIA_MPV_HELPER_VERSION = "2025-12-19"
MEDEIA_MPV_HELPER_VERSION = "2026-03-19.1"
import argparse
import json
@@ -65,6 +65,7 @@ if _ROOT not in sys.path:
from MPV.mpv_ipc import MPVIPCClient # noqa: E402
from SYS.config import load_config # noqa: E402
from SYS.logger import set_debug, debug, set_thread_stream # noqa: E402
from SYS.repl_queue import enqueue_repl_command # noqa: E402
from SYS.utils import format_bytes # noqa: E402
REQUEST_PROP = "user-data/medeia-pipeline-request"
@@ -76,6 +77,69 @@ OBS_ID_REQUEST = 1001
_HELPER_MPV_LOG_EMITTER: Optional[Callable[[str], None]] = None
_HELPER_LOG_BACKLOG: list[str] = []
_HELPER_LOG_BACKLOG_LIMIT = 200
# In-process registry of background pipeline jobs, keyed by job id.
# Guarded by _ASYNC_PIPELINE_JOBS_LOCK; entries expire after the TTL below.
_ASYNC_PIPELINE_JOBS: Dict[str, Dict[str, Any]] = {}
_ASYNC_PIPELINE_JOBS_LOCK = threading.Lock()
_ASYNC_PIPELINE_JOB_TTL_SECONDS = 900.0


def _prune_async_pipeline_jobs(now: Optional[float] = None) -> None:
    """Evict registry entries whose last update is older than the TTL."""
    cutoff = float(now or time.time()) - _ASYNC_PIPELINE_JOB_TTL_SECONDS
    with _ASYNC_PIPELINE_JOBS_LOCK:
        stale = [
            key
            for key, entry in _ASYNC_PIPELINE_JOBS.items()
            if float(entry.get("updated_at") or 0.0) < cutoff
        ]
        for key in stale:
            _ASYNC_PIPELINE_JOBS.pop(key, None)


def _update_async_pipeline_job(job_id: str, **fields: Any) -> Optional[Dict[str, Any]]:
    """Create or update a job record and return a snapshot copy.

    A fresh record starts as status="queued"/success=False. Returns None
    when job_id is empty.
    """
    if not job_id:
        return None
    now = time.time()
    _prune_async_pipeline_jobs(now)
    with _ASYNC_PIPELINE_JOBS_LOCK:
        record = dict(_ASYNC_PIPELINE_JOBS.get(job_id) or {})
        if not record:
            record = {
                "job_id": job_id,
                "status": "queued",
                "success": False,
                "created_at": now,
            }
        record.update(fields)
        record["updated_at"] = now
        _ASYNC_PIPELINE_JOBS[job_id] = record
        return dict(record)


def _get_async_pipeline_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Return a snapshot of the job record, or None when unknown or id is empty."""
    if not job_id:
        return None
    _prune_async_pipeline_jobs()
    with _ASYNC_PIPELINE_JOBS_LOCK:
        found = _ASYNC_PIPELINE_JOBS.get(job_id)
        return dict(found) if found else None
def _append_prefixed_log_lines(prefix: str, text: Any, *, max_lines: int = 40) -> None:
    """Mirror multi-line text into the helper log, one prefixed entry per line.

    Newlines are normalized, blank lines are skipped, and output is capped at
    max_lines with an explicit truncation marker.
    """
    normalized = str(text or "").replace("\r\n", "\n").replace("\r", "\n")
    if not normalized.strip():
        return
    count = 0
    for raw_line in normalized.split("\n"):
        entry = raw_line.rstrip()
        if not entry:
            continue
        _append_helper_log(f"{prefix}{entry}")
        count += 1
        if count >= max_lines:
            _append_helper_log(f"{prefix}... truncated after {max_lines} lines")
            break
def _start_ready_heartbeat(ipc_path: str, stop_event: threading.Event) -> threading.Thread:
@@ -229,17 +293,80 @@ def _run_pipeline(
}
def _run_pipeline_background(pipeline_text: str, *, seeds: Any, req_id: str) -> None:
def _run_pipeline_background(
pipeline_text: str,
*,
seeds: Any,
req_id: str,
job_id: Optional[str] = None,
json_output: bool = False,
) -> None:
def _target() -> None:
try:
result = _run_pipeline(pipeline_text, seeds=seeds)
if job_id:
_update_async_pipeline_job(
job_id,
status="running",
success=False,
pipeline=pipeline_text,
req_id=req_id,
started_at=time.time(),
finished_at=None,
error=None,
stdout="",
stderr="",
table=None,
data=None,
)
result = _run_pipeline(
pipeline_text,
seeds=seeds,
json_output=json_output,
)
status = "success" if result.get("success") else "failed"
if job_id:
_update_async_pipeline_job(
job_id,
status=status,
success=bool(result.get("success")),
finished_at=time.time(),
error=result.get("error"),
stdout=result.get("stdout", ""),
stderr=result.get("stderr", ""),
table=result.get("table"),
data=result.get("data"),
)
_append_helper_log(
f"[pipeline async {req_id}] {status} error={result.get('error')}"
f"[pipeline async {req_id}] {status}"
+ (f" job={job_id}" if job_id else "")
+ f" error={result.get('error')}"
)
_append_prefixed_log_lines(
f"[pipeline async stdout {req_id}] ",
result.get("stdout", ""),
)
_append_prefixed_log_lines(
f"[pipeline async stderr {req_id}] ",
result.get("stderr", ""),
)
except Exception as exc: # pragma: no cover - best-effort logging
if job_id:
_update_async_pipeline_job(
job_id,
status="failed",
success=False,
finished_at=time.time(),
error=f"{type(exc).__name__}: {exc}",
stdout="",
stderr="",
table=None,
data=None,
)
_append_helper_log(
f"[pipeline async {req_id}] exception: {type(exc).__name__}: {exc}"
f"[pipeline async {req_id}] exception"
+ (f" job={job_id}" if job_id else "")
+ f": {type(exc).__name__}: {exc}"
)
thread = threading.Thread(
@@ -263,6 +390,141 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]:
"""
op_name = str(op or "").strip().lower()
if op_name in {"run-background",
"run_background",
"pipeline-background",
"pipeline_background"}:
pipeline_text = ""
seeds = None
json_output = False
requested_job_id = ""
if isinstance(data, dict):
pipeline_text = str(data.get("pipeline") or "").strip()
seeds = data.get("seeds")
json_output = bool(data.get("json") or data.get("output_json"))
requested_job_id = str(data.get("job_id") or "").strip()
if not pipeline_text:
return {
"success": False,
"stdout": "",
"stderr": "",
"error": "Missing pipeline",
"table": None,
}
token = requested_job_id or f"job-{int(time.time() * 1000)}-{threading.get_ident()}"
job_id = hashlib.sha1(
f"{token}|{pipeline_text}|{time.time()}".encode("utf-8", "ignore")
).hexdigest()[:16]
_update_async_pipeline_job(
job_id,
status="queued",
success=False,
pipeline=pipeline_text,
req_id=job_id,
finished_at=None,
error=None,
stdout="",
stderr="",
table=None,
data=None,
)
_run_pipeline_background(
pipeline_text,
seeds=seeds,
req_id=job_id,
job_id=job_id,
json_output=json_output,
)
return {
"success": True,
"stdout": "",
"stderr": "",
"error": None,
"table": None,
"job_id": job_id,
"status": "queued",
}
if op_name in {"job-status",
"job_status",
"pipeline-job-status",
"pipeline_job_status"}:
job_id = ""
if isinstance(data, dict):
job_id = str(data.get("job_id") or "").strip()
if not job_id:
return {
"success": False,
"stdout": "",
"stderr": "",
"error": "Missing job_id",
"table": None,
}
job = _get_async_pipeline_job(job_id)
if not job:
return {
"success": False,
"stdout": "",
"stderr": "",
"error": f"Unknown job_id: {job_id}",
"table": None,
}
payload = dict(job)
return {
"success": True,
"stdout": str(payload.get("stdout") or ""),
"stderr": str(payload.get("stderr") or ""),
"error": payload.get("error"),
"table": payload.get("table"),
"data": payload.get("data"),
"job": payload,
}
if op_name in {"queue-repl-command",
"queue_repl_command",
"repl-command",
"repl_command"}:
command_text = ""
source = "mpv"
metadata = None
if isinstance(data, dict):
command_text = str(data.get("command") or data.get("pipeline") or "").strip()
source = str(data.get("source") or "mpv").strip() or "mpv"
metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else None
if not command_text:
return {
"success": False,
"stdout": "",
"stderr": "",
"error": "Missing command",
"table": None,
}
queue_path = enqueue_repl_command(
_repo_root(),
command_text,
source=source,
metadata=metadata,
)
_append_helper_log(
f"[repl-queue] queued source={source} path={queue_path.name} cmd={command_text}"
)
return {
"success": True,
"stdout": "",
"stderr": "",
"error": None,
"table": None,
"path": str(queue_path),
"queued": True,
}
if op_name in {"run-detached",
"run_detached",
"pipeline-detached",
@@ -810,6 +1072,59 @@ def _parse_request(data: Any) -> Optional[Dict[str, Any]]:
return None
def _start_request_poll_loop(
    ipc_path: str,
    stop_event: threading.Event,
    handle_request: Callable[[Any, str], bool],
) -> threading.Thread:
    """Poll the request property on a separate IPC connection.

    Windows named-pipe event delivery can stall even while direct get/set
    property commands still work. Polling the request property provides a more
    reliable fallback transport for helper ops and pipeline dispatch.
    """

    def _close_quietly(client: Any) -> None:
        # Best-effort teardown; cleanup failures must never escape the loop.
        try:
            client.disconnect()
        except Exception:
            pass

    def _poll_loop() -> None:
        client = MPVIPCClient(socket_path=ipc_path, timeout=0.75, silent=True)
        while not stop_event.is_set():
            try:
                # (Re)connect lazily; back off briefly when mpv is not ready.
                if client.sock is None and not client.connect():
                    stop_event.wait(0.10)
                    continue
                resp = client.send_command(["get_property", REQUEST_PROP])
                if not resp:
                    # Empty reply usually means a dead pipe; drop and retry.
                    _close_quietly(client)
                    stop_event.wait(0.10)
                    continue
                if resp.get("error") == "success":
                    handle_request(resp.get("data"), "poll")
                stop_event.wait(0.05)
            except Exception:
                _close_quietly(client)
                stop_event.wait(0.15)
        _close_quietly(client)

    thread = threading.Thread(
        target=_poll_loop,
        name="mpv-helper-request-poll",
        daemon=True,
    )
    thread.start()
    return thread
def main(argv: Optional[list[str]] = None) -> int:
parser = argparse.ArgumentParser(prog="mpv-pipeline-helper")
parser.add_argument("--ipc", required=True, help="mpv --input-ipc-server path")
@@ -917,6 +1232,10 @@ def main(argv: Optional[list[str]] = None) -> int:
except Exception:
error_log_dir = Path(tempfile.gettempdir())
last_error_log = error_log_dir / "medeia-mpv-pipeline-last-error.log"
seen_request_ids: Dict[str, float] = {}
seen_request_ids_lock = threading.Lock()
seen_request_ttl_seconds = 180.0
request_processing_lock = threading.Lock()
def _write_error_log(text: str, *, req_id: str) -> Optional[str]:
try:
@@ -941,6 +1260,157 @@ def main(argv: Optional[list[str]] = None) -> int:
return str(stamped) if stamped else str(last_error_log)
def _mark_request_seen(req_id: str) -> bool:
    """Record req_id for de-duplication.

    Returns True on first sighting, False for replays or an empty id.
    Entries older than the TTL are evicted on every call so the map
    stays bounded.
    """
    if not req_id:
        return False
    now = time.time()
    cutoff = now - seen_request_ttl_seconds
    with seen_request_ids_lock:
        for stale in [key for key, ts in seen_request_ids.items() if ts < cutoff]:
            seen_request_ids.pop(stale, None)
        if req_id in seen_request_ids:
            return False
        seen_request_ids[req_id] = now
        return True
def _publish_response(resp: Dict[str, Any]) -> None:
    """Publish a response dict onto RESPONSE_PROP via a throwaway IPC client.

    A dedicated connection avoids consuming async events on the main client;
    the connection is always torn down, even on send failure.
    """
    response_client = MPVIPCClient(socket_path=str(args.ipc), timeout=0.75, silent=True)
    try:
        payload = json.dumps(resp, ensure_ascii=False)
        response_client.send_command_no_wait(
            ["set_property_string", RESPONSE_PROP, payload]
        )
    finally:
        try:
            response_client.disconnect()
        except Exception:
            pass
def _process_request(raw: Any, source: str) -> bool:
    """Parse and execute one helper request; return True when it was handled.

    `raw` is the JSON payload observed on REQUEST_PROP; `source` tags the
    transport ("poll" or "observe") in the log. Duplicate request ids are
    dropped via _mark_request_seen so the observe and poll transports never
    double-execute the same request. The response is always published back,
    best-effort, via _publish_response.
    """
    req = _parse_request(raw)
    if not req:
        try:
            if isinstance(raw, str) and raw.strip():
                snippet = raw.strip().replace("\r", "").replace("\n", " ")
                if len(snippet) > 220:
                    # Fix: append an actual ellipsis; the truncation marker had
                    # been lost ("" was appended), hiding that the snippet was cut.
                    snippet = snippet[:220] + "…"
                _append_helper_log(
                    f"[request-raw {source}] could not parse request json: {snippet}"
                )
        except Exception:
            pass
        return False
    req_id = str(req.get("id") or "")
    op = str(req.get("op") or "").strip()
    data = req.get("data")
    pipeline_text = str(req.get("pipeline") or "").strip()
    seeds = req.get("seeds")
    json_output = bool(req.get("json") or req.get("output_json"))
    if not req_id:
        return False
    if not _mark_request_seen(req_id):
        return False
    try:
        label = pipeline_text if pipeline_text else (op and ("op=" + op) or "(empty)")
        _append_helper_log(f"\n[request {req_id} via {source}] {label}")
    except Exception:
        pass
    # Serialize handling: one request at a time, last-write-wins.
    with request_processing_lock:
        async_dispatch = False
        try:
            if op:
                run = _run_op(op, data)
            else:
                if not pipeline_text:
                    return False
                if _is_load_url_pipeline(pipeline_text):
                    # load-url pipelines run in the background so the IPC
                    # response can return immediately.
                    async_dispatch = True
                    run = {
                        "success": True,
                        "stdout": "",
                        "stderr": "",
                        "error": "",
                        "table": None,
                    }
                    _run_pipeline_background(
                        pipeline_text,
                        seeds=seeds,
                        req_id=req_id,
                    )
                else:
                    run = _run_pipeline(
                        pipeline_text,
                        seeds=seeds,
                        json_output=json_output,
                    )
            resp = {
                "id": req_id,
                "success": bool(run.get("success")),
                "stdout": run.get("stdout", ""),
                "stderr": run.get("stderr", ""),
                "error": run.get("error"),
                "table": run.get("table"),
                "data": run.get("data"),
            }
            # Optional passthrough fields some ops attach to their result.
            for key in ("choices", "job_id", "job", "status", "pid"):
                if key in run:
                    resp[key] = run.get(key)
            if async_dispatch:
                resp["info"] = "queued asynchronously"
        except Exception as exc:
            resp = {
                "id": req_id,
                "success": False,
                "stdout": "",
                "stderr": "",
                "error": f"{type(exc).__name__}: {exc}",
                "table": None,
            }
        # Persist helper output for debugging MPV menu interactions.
        try:
            if resp.get("stdout"):
                _append_helper_log("[stdout]\n" + str(resp.get("stdout")))
            if resp.get("stderr"):
                _append_helper_log("[stderr]\n" + str(resp.get("stderr")))
            if resp.get("error"):
                _append_helper_log("[error]\n" + str(resp.get("error")))
        except Exception:
            pass
        if not resp.get("success"):
            # Write failures to a stamped log file and point the caller at it.
            details = ""
            if resp.get("error"):
                details += str(resp.get("error"))
            if resp.get("stderr"):
                details = (details + "\n" if details else "") + str(resp.get("stderr"))
            log_path = _write_error_log(details, req_id=req_id)
            if log_path:
                resp["log_path"] = log_path
        try:
            _publish_response(resp)
        except Exception:
            # If posting results fails, there's nothing more useful to do.
            pass
        return True
# Connect to mpv's JSON IPC. On Windows, the pipe can exist but reject opens
# briefly during startup; also mpv may create the IPC server slightly after
# the Lua script launches us. Retry until timeout.
@@ -1047,6 +1517,7 @@ def main(argv: Optional[list[str]] = None) -> int:
heartbeat_stop = threading.Event()
_start_ready_heartbeat(str(args.ipc), heartbeat_stop)
_start_request_poll_loop(str(args.ipc), heartbeat_stop, _process_request)
# Pre-compute store choices at startup and publish to a cached property so Lua
# can read immediately without waiting for a request/response cycle (which may timeout).
@@ -1133,8 +1604,6 @@ def main(argv: Optional[list[str]] = None) -> int:
f"[helper] failed to publish ytdlp domains: {type(exc).__name__}: {exc}"
)
last_seen_id: Optional[str] = None
try:
_append_helper_log(f"[helper] connected to ipc={args.ipc}")
except Exception:
@@ -1200,131 +1669,7 @@ def main(argv: Optional[list[str]] = None) -> int:
if msg.get("id") != OBS_ID_REQUEST:
continue
raw = msg.get("data")
req = _parse_request(raw)
if not req:
try:
if isinstance(raw, str) and raw.strip():
snippet = raw.strip().replace("\r", "").replace("\n", " ")
if len(snippet) > 220:
snippet = snippet[:220] + ""
_append_helper_log(
f"[request-raw] could not parse request json: {snippet}"
)
except Exception:
pass
continue
req_id = str(req.get("id") or "")
op = str(req.get("op") or "").strip()
data = req.get("data")
pipeline_text = str(req.get("pipeline") or "").strip()
seeds = req.get("seeds")
json_output = bool(req.get("json") or req.get("output_json"))
if not req_id:
continue
if last_seen_id == req_id:
continue
last_seen_id = req_id
try:
label = pipeline_text if pipeline_text else (
op and ("op=" + op) or "(empty)"
)
_append_helper_log(f"\n[request {req_id}] {label}")
except Exception:
pass
async_dispatch = False
try:
if op:
run = _run_op(op, data)
else:
if not pipeline_text:
continue
if _is_load_url_pipeline(pipeline_text):
async_dispatch = True
run = {
"success": True,
"stdout": "",
"stderr": "",
"error": "",
"table": None,
}
_run_pipeline_background(
pipeline_text,
seeds=seeds,
req_id=req_id,
)
else:
run = _run_pipeline(
pipeline_text,
seeds=seeds,
json_output=json_output,
)
resp = {
"id": req_id,
"success": bool(run.get("success")),
"stdout": run.get("stdout",
""),
"stderr": run.get("stderr",
""),
"error": run.get("error"),
"table": run.get("table"),
"data": run.get("data"),
}
if "choices" in run:
resp["choices"] = run.get("choices")
if async_dispatch:
resp["info"] = "queued asynchronously"
except Exception as exc:
resp = {
"id": req_id,
"success": False,
"stdout": "",
"stderr": "",
"error": f"{type(exc).__name__}: {exc}",
"table": None,
}
# Persist helper output for debugging MPV menu interactions.
try:
if resp.get("stdout"):
_append_helper_log("[stdout]\n" + str(resp.get("stdout")))
if resp.get("stderr"):
_append_helper_log("[stderr]\n" + str(resp.get("stderr")))
if resp.get("error"):
_append_helper_log("[error]\n" + str(resp.get("error")))
except Exception:
pass
if not resp.get("success"):
details = ""
if resp.get("error"):
details += str(resp.get("error"))
if resp.get("stderr"):
details = (details + "\n" if details else "") + str(resp.get("stderr"))
log_path = _write_error_log(details, req_id=req_id)
if log_path:
resp["log_path"] = log_path
try:
# IMPORTANT: don't wait for a response here; waiting would consume
# async events and can drop/skip property-change notifications.
client.send_command_no_wait(
[
"set_property_string",
RESPONSE_PROP,
json.dumps(resp,
ensure_ascii=False)
]
)
except Exception:
# If posting results fails, there's nothing more useful to do.
pass
_process_request(msg.get("data"), "observe")
if __name__ == "__main__":