"""Background task handling and IPC helpers for mpv integration.""" from __future__ import annotations import errno import json import os import socket import subprocess import sys from helper.logger import log import threading import time from typing import IO, Iterable def connect_ipc(path: str, timeout: float = 5.0) -> IO[bytes] | None: """Connect to the mpv IPC server located at *path*.""" deadline = time.time() + timeout if not path: return None if os.name == 'nt': # mpv exposes a named pipe on Windows. Keep retrying until it is ready. while True: try: return open(path, 'r+b', buffering=0) except FileNotFoundError: if time.time() > deadline: return None time.sleep(0.05) except OSError as exc: # Pipe busy if exc.errno not in (errno.ENOENT, errno.EPIPE, errno.EBUSY): raise if time.time() > deadline: return None time.sleep(0.05) else: sock = socket.socket(socket.AF_UNIX) while True: try: sock.connect(path) return sock.makefile('r+b', buffering=0) except FileNotFoundError: if time.time() > deadline: return None time.sleep(0.05) except OSError as exc: if exc.errno not in (errno.ENOENT, errno.ECONNREFUSED): raise if time.time() > deadline: return None time.sleep(0.05) def ipc_sender(ipc: IO[bytes] | None): """Create a helper function for sending script messages via IPC.""" if ipc is None: def _noop(_event: str, _payload: dict) -> None: return None return _noop lock = threading.Lock() def _send(event: str, payload: dict) -> None: message = json.dumps({'command': ['script-message', event, json.dumps(payload)]}, ensure_ascii=False) encoded = message.encode('utf-8') + b'\n' with lock: try: ipc.write(encoded) ipc.flush() except OSError: pass return _send def iter_stream(stream: Iterable[str]) -> Iterable[str]: for raw in stream: yield raw.rstrip('\r\n') def _run_task(args, parser) -> int: if not args.command: parser.error('run-task requires a command to execute (use "--" before the command).') env = os.environ.copy() for entry in args.env: key, sep, value = entry.partition('=') if not sep: parser.error(f'Invalid environment variable definition: {entry!r}') env[key] = value command = list(args.command) if command and command[0] == '--': command.pop(0) notifier = ipc_sender(connect_ipc(args.ipc, timeout=args.ipc_timeout)) if not command: notifier('downlow-task-event', { 'id': args.task_id, 'event': 'error', 'message': 'No command provided after separator', }) log('[downlow.py] No command provided for run-task', file=sys.stderr) return 1 if command and isinstance(command[0], str) and sys.executable: first = command[0].lower() if first in {'python', 'python3', 'py', 'python.exe', 'python3.exe', 'py.exe'}: command[0] = sys.executable if os.environ.get('DOWNLOW_DEBUG'): log(f"Launching command: {command}", file=sys.stderr) notifier('downlow-task-event', { 'id': args.task_id, 'event': 'start', 'command': command, 'cwd': args.cwd or os.getcwd(), }) try: process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=args.cwd or None, env=env, text=True, bufsize=1, universal_newlines=True, ) except FileNotFoundError as exc: notifier('downlow-task-event', { 'id': args.task_id, 'event': 'error', 'message': f'Executable not found: {exc.filename}', }) log(f"{exc}", file=sys.stderr) return 1 stdout_lines: list[str] = [] stderr_lines: list[str] = [] def pump(stream: IO[str], label: str, sink: list[str]) -> None: for line in iter_stream(stream): sink.append(line) notifier('downlow-task-event', { 'id': args.task_id, 'event': label, 'line': line, }) threads = [] if process.stdout: t_out = threading.Thread(target=pump, args=(process.stdout, 'stdout', stdout_lines), daemon=True) t_out.start() threads.append(t_out) if process.stderr: t_err = threading.Thread(target=pump, args=(process.stderr, 'stderr', stderr_lines), daemon=True) t_err.start() threads.append(t_err) return_code = process.wait() for t in threads: t.join(timeout=0.1) notifier('downlow-task-event', { 'id': args.task_id, 'event': 'exit', 'returncode': return_code, 'success': return_code == 0, }) # Also mirror aggregated output to stdout/stderr for compatibility when IPC is unavailable. if stdout_lines: log('\n'.join(stdout_lines)) if stderr_lines: log('\n'.join(stderr_lines), file=sys.stderr) return return_code