From d9a6b1bfb41242aaa81ecd718af247c3e4346300 Mon Sep 17 00:00:00 2001 From: Nose Date: Sat, 21 Mar 2026 15:12:52 -0700 Subject: [PATCH] hkh --- CLI.py | 136 ++++++++++- MPV/LUA/main.lua | 220 +++++++++++++++++- MPV/mpv_ipc.py | 88 ++++++- .../scripts/uosc/scripts/uosc/main.lua | 10 + SYS/models.py | 63 +++++ SYS/pipeline.py | 16 ++ 6 files changed, 512 insertions(+), 21 deletions(-) diff --git a/CLI.py b/CLI.py index ba1a481..031ad4b 100644 --- a/CLI.py +++ b/CLI.py @@ -115,7 +115,13 @@ from SYS.cli_parsing import SelectionSyntax, SelectionFilterSyntax, MedeiaLexer -def _send_mpv_ipc_command(command: List[Any], *, ipc_path: Optional[str] = None, timeout: float = 0.75) -> bool: +def _send_mpv_ipc_command( + command: List[Any], + *, + ipc_path: Optional[str] = None, + timeout: float = 0.75, + wait_for_response: bool = True, +) -> bool: if not isinstance(command, list) or not command: return False @@ -130,13 +136,16 @@ def _send_mpv_ipc_command(command: List[Any], *, ipc_path: Optional[str] = None, try: response = client.send_command({ "command": command, - }) + }, wait=bool(wait_for_response)) finally: try: client.disconnect() except Exception: pass + if not wait_for_response: + return bool(response and (response.get("async") or response.get("request_id") is not None)) + return bool(response and response.get("error") == "success") except Exception as exc: debug(f"mpv ipc command failed: {exc}") @@ -155,10 +164,11 @@ def _notify_mpv_osd(text: str, *, duration_ms: int = 3500, ipc_path: Optional[st max(0, int(duration_ms)), ], ipc_path=ipc_path, + wait_for_response=False, ) -def _notify_mpv_callback(metadata: Dict[str, Any], execution_result: Dict[str, Any]) -> bool: +def _send_mpv_callback_event(metadata: Dict[str, Any], payload: Dict[str, Any]) -> bool: callback = metadata.get("mpv_callback") if isinstance(metadata, dict) else None if not isinstance(callback, dict): return False @@ -169,26 +179,102 @@ def _notify_mpv_callback(metadata: Dict[str, Any], execution_result: Dict[str, A if not script_name or not message_name: return False - payload = { - "phase": "completed", - "success": bool(execution_result.get("success")), - "status": str(execution_result.get("status") or "completed"), - "error": str(execution_result.get("error") or "").strip(), - "command_text": str(execution_result.get("command_text") or "").strip(), + event_payload = { "kind": str(metadata.get("kind") or "").strip(), } + if isinstance(payload, dict): + event_payload.update(payload) return _send_mpv_ipc_command( [ "script-message-to", script_name, message_name, - json.dumps(payload, ensure_ascii=False), + json.dumps(event_payload, ensure_ascii=False), ], ipc_path=ipc_path, + wait_for_response=False, ) +def _notify_mpv_callback(metadata: Dict[str, Any], execution_result: Dict[str, Any]) -> bool: + return _send_mpv_callback_event( + metadata, + { + "phase": "completed", + "success": bool(execution_result.get("success")), + "status": str(execution_result.get("status") or "completed"), + "error": str(execution_result.get("error") or "").strip(), + "command_text": str(execution_result.get("command_text") or "").strip(), + }, + ) + + +def _build_mpv_progress_callback(metadata: Dict[str, Any]) -> Optional[Any]: + callback = metadata.get("mpv_callback") if isinstance(metadata, dict) else None + if not isinstance(callback, dict): + return None + + last_sent_at: Dict[str, float] = {} + last_percent: Dict[str, int] = {} + last_text: Dict[str, str] = {} + + def emit(payload: Dict[str, Any]) -> bool: + if not isinstance(payload, dict): + return False + + event_name = str(payload.get("event") or "").strip().lower() + now = time.monotonic() + throttle_key = event_name or "progress" + + if event_name == "pipe-percent": + pipe_index = int(payload.get("pipe_index") or 0) + percent = max(0, min(100, int(payload.get("percent") or 0))) + throttle_key = f"pipe-percent:{pipe_index}" + prev = last_percent.get(throttle_key) + if prev == percent: + return False + if prev is not None and percent < 100 and (percent - prev) < 5 and (now - last_sent_at.get(throttle_key, 0.0)) < 0.35: + return False + last_percent[throttle_key] = percent + payload = dict(payload) + payload["percent"] = percent + elif event_name == "transfer": + label = str(payload.get("label") or "transfer").strip() or "transfer" + throttle_key = f"transfer:{label}" + completed = payload.get("completed") + total = payload.get("total") + percent = None + try: + if total is not None and int(total) > 0 and completed is not None: + percent = max(0, min(100, int(round((int(completed) / max(1, int(total))) * 100.0)))) + except Exception: + percent = None + if percent is not None: + prev = last_percent.get(throttle_key) + if prev == percent: + return False + if prev is not None and percent < 100 and (percent - prev) < 3 and (now - last_sent_at.get(throttle_key, 0.0)) < 0.35: + return False + last_percent[throttle_key] = percent + payload = dict(payload) + payload["percent"] = percent + elif event_name == "status": + pipe_index = int(payload.get("pipe_index") or 0) + throttle_key = f"status:{pipe_index}" + text = str(payload.get("text") or "").strip() + if last_text.get(throttle_key) == text and (now - last_sent_at.get(throttle_key, 0.0)) < 0.5: + return False + last_text[throttle_key] = text + + last_sent_at[throttle_key] = now + event_payload = dict(payload) + event_payload.setdefault("phase", "progress") + return _send_mpv_callback_event(metadata, event_payload) + + return emit + + def _notify_mpv_completion(metadata: Dict[str, Any], execution_result: Dict[str, Any]) -> bool: callback_sent = _notify_mpv_callback(metadata, execution_result) @@ -1024,6 +1110,15 @@ class CmdletExecutor: ctx.set_live_progress(progress_ui) except Exception: pass + try: + progress_cb = ( + ctx.get_progress_event_callback() + if hasattr(ctx, "get_progress_event_callback") else None + ) + if callable(progress_cb) and hasattr(progress_ui, "set_event_callback"): + progress_ui.set_event_callback(progress_cb) + except Exception: + pass pipe_idx = 0 @@ -2418,14 +2513,30 @@ Come to love it when others take what you share, as there is no greater joy if isinstance(queued_payload, dict) and isinstance(queued_payload.get("metadata"), dict) else None ) + progress_event_callback = _build_mpv_progress_callback(queued_metadata) if queued_metadata else None try: from SYS import pipeline as ctx ctx.set_current_command_text(user_input) + if hasattr(ctx, "set_progress_event_callback"): + ctx.set_progress_event_callback(progress_event_callback) pipeline_ctx_ref = ctx except Exception: pipeline_ctx_ref = None + if queued_metadata: + try: + _send_mpv_callback_event( + queued_metadata, + { + "phase": "started", + "event": "command-started", + "command_text": user_input, + }, + ) + except Exception: + pass + execution_result: Dict[str, Any] = { "status": "completed", "success": True, @@ -2627,6 +2738,11 @@ Come to love it when others take what you share, as there is no greater joy pass if pipeline_ctx_ref: pipeline_ctx_ref.clear_current_command_text() + if hasattr(pipeline_ctx_ref, "set_progress_event_callback"): + try: + pipeline_ctx_ref.set_progress_event_callback(None) + except Exception: + pass repl_queue_stop.set() diff --git a/MPV/LUA/main.lua b/MPV/LUA/main.lua index f6c238b..f092cb5 100644 --- a/MPV/LUA/main.lua +++ b/MPV/LUA/main.lua @@ -4,7 +4,7 @@ local msg = require 'mp.msg' local M = {} -local MEDEIA_LUA_VERSION = '2026-03-20.1' +local MEDEIA_LUA_VERSION = '2026-03-21.1' -- Expose a tiny breadcrumb for debugging which script version is loaded. pcall(mp.set_property, 'user-data/medeia-lua-version', MEDEIA_LUA_VERSION) @@ -56,6 +56,140 @@ local PIPELINE_REQ_PROP = 'user-data/medeia-pipeline-request' local PIPELINE_RESP_PROP = 'user-data/medeia-pipeline-response' local PIPELINE_READY_PROP = 'user-data/medeia-pipeline-ready' local CURRENT_WEB_URL_PROP = 'user-data/medeia-current-web-url' +local _pipeline_progress_ui = { + overlay = nil, + hide_token = 0, + title = '', + summary = '', + detail = '', +} + +function _pipeline_progress_ui.trim(text) + return tostring(text or ''):gsub('^%s+', ''):gsub('%s+$', '') +end + +function _pipeline_progress_ui.ass_escape(text) + text = tostring(text or '') + text = text:gsub('\\', '\\\\') + text = text:gsub('{', '\\{') + text = text:gsub('}', '\\}') + text = text:gsub('\n', '\\N') + return text +end + +function _pipeline_progress_ui.truncate(text, max_len) + text = _pipeline_progress_ui.trim(text) + max_len = tonumber(max_len or 0) or 0 + if max_len <= 0 or #text <= max_len then + return text + end + if max_len <= 3 then + return text:sub(1, max_len) + end + return text:sub(1, max_len - 3) .. '...' +end + +function _pipeline_progress_ui.kind_title(kind) + kind = _pipeline_progress_ui.trim(kind):lower() + if kind == 'mpv-download' then + return 'Download' + end + if kind == 'mpv-screenshot' then + return 'Screenshot' + end + return 'Pipeline' +end + +function _pipeline_progress_ui.ensure_overlay() + if _pipeline_progress_ui.overlay then + return _pipeline_progress_ui.overlay + end + local ok, overlay = pcall(mp.create_osd_overlay, 'ass-events') + if ok and overlay then + _pipeline_progress_ui.overlay = overlay + end + return _pipeline_progress_ui.overlay +end + +function _pipeline_progress_ui.cancel_hide() + _pipeline_progress_ui.hide_token = (_pipeline_progress_ui.hide_token or 0) + 1 +end + +function _pipeline_progress_ui.render() + local overlay = _pipeline_progress_ui.ensure_overlay() + if not overlay then + return + end + + local width, height = 1280, 720 + local ok, w, h = pcall(mp.get_osd_size) + if ok and tonumber(w or 0) and tonumber(h or 0) and w > 0 and h > 0 then + width = math.floor(w) + height = math.floor(h) + end + + overlay.res_x = width + overlay.res_y = height + + if _pipeline_progress_ui.summary == '' and _pipeline_progress_ui.detail == '' then + overlay.data = '' + overlay:update() + return + end + + local title = _pipeline_progress_ui.truncate(_pipeline_progress_ui.title ~= '' and _pipeline_progress_ui.title or 'Pipeline', 42) + local summary = _pipeline_progress_ui.truncate(_pipeline_progress_ui.summary, 72) + local detail = _pipeline_progress_ui.truncate(_pipeline_progress_ui.detail, 88) + local lines = { + '{\\b1}' .. _pipeline_progress_ui.ass_escape(title) .. '{\\b0}', + } + if summary ~= '' then + lines[#lines + 1] = _pipeline_progress_ui.ass_escape(summary) + end + if detail ~= '' then + lines[#lines + 1] = '{\\fs18\\c&HDDDDDD&}' .. _pipeline_progress_ui.ass_escape(detail) .. '{\\r}' + end + + overlay.data = string.format( + '{\\an9\\pos(%d,%d)\\fs22\\bord2\\shad1\\1c&HFFFFFF&\\3c&H111111&\\4c&H000000&}%s', + width - 28, + 34, + table.concat(lines, '\\N') + ) + overlay:update() +end + +function _pipeline_progress_ui.hide() + _pipeline_progress_ui.cancel_hide() + _pipeline_progress_ui.title = '' + _pipeline_progress_ui.summary = '' + _pipeline_progress_ui.detail = '' + _pipeline_progress_ui.render() +end + +function _pipeline_progress_ui.schedule_hide(delay_seconds) + _pipeline_progress_ui.cancel_hide() + local delay = tonumber(delay_seconds or 0) or 0 + if delay <= 0 then + _pipeline_progress_ui.hide() + return + end + local token = _pipeline_progress_ui.hide_token + mp.add_timeout(delay, function() + if token ~= _pipeline_progress_ui.hide_token then + return + end + _pipeline_progress_ui.hide() + end) +end + +function _pipeline_progress_ui.update(title, summary, detail) + _pipeline_progress_ui.cancel_hide() + _pipeline_progress_ui.title = _pipeline_progress_ui.trim(title) + _pipeline_progress_ui.summary = _pipeline_progress_ui.trim(summary) + _pipeline_progress_ui.detail = _pipeline_progress_ui.trim(detail) + _pipeline_progress_ui.render() +end local function _get_lua_source_path() local info = nil @@ -1705,8 +1839,83 @@ mp.register_script_message('medeia-pipeline-event', function(json) pcall(mp.set_property, 'user-data/medeia-last-pipeline-event', encoded) end + local phase = _pipeline_progress_ui.trim(payload.phase) + local event_name = _pipeline_progress_ui.trim(payload.event) + local kind = _pipeline_progress_ui.trim(payload.kind) + local title = _pipeline_progress_ui.kind_title(kind) + + local summary = '' + local detail = '' + if phase == 'started' then + local command_text = _pipeline_progress_ui.trim(payload.command_text) + summary = command_text ~= '' and ('Started: ' .. command_text) or ('Started: ' .. kind) + detail = 'Queued job started' + elseif phase == 'progress' then + if event_name == 'pipe-percent' then + local label = _pipeline_progress_ui.trim(payload.pipe_label ~= '' and payload.pipe_label or kind ~= '' and kind or 'pipeline') + local percent = tonumber(payload.percent or 0) or 0 + summary = ('%s %d%%'):format(label, math.floor(percent + 0.5)) + detail = 'Processing' + elseif event_name == 'status' then + summary = _pipeline_progress_ui.trim(payload.text) + detail = _pipeline_progress_ui.trim(payload.pipe_label ~= '' and payload.pipe_label or kind) + elseif event_name == 'transfer' then + local label = _pipeline_progress_ui.trim(payload.label ~= '' and payload.label or 'transfer') + local percent = tonumber(payload.percent or 0) + if percent then + summary = ('%s %d%%'):format(label, math.floor(percent + 0.5)) + else + summary = label + end + local completed = tonumber(payload.completed or 0) + local total = tonumber(payload.total or 0) + if completed and total and total > 0 then + detail = ('%d / %d'):format(math.floor(completed + 0.5), math.floor(total + 0.5)) + end + elseif event_name == 'pipe-begin' then + local label = _pipeline_progress_ui.trim(payload.pipe_label ~= '' and payload.pipe_label or kind ~= '' and kind or 'pipeline') + summary = 'Running: ' .. label + local total_items = tonumber(payload.total_items or 0) + if total_items and total_items > 0 then + detail = ('Items: %d'):format(math.floor(total_items + 0.5)) + end + elseif event_name == 'pipe-emit' then + local label = _pipeline_progress_ui.trim(payload.pipe_label ~= '' and payload.pipe_label or kind ~= '' and kind or 'pipeline') + local completed = tonumber(payload.completed or 0) or 0 + local total = tonumber(payload.total or 0) or 0 + summary = total > 0 and ('%s %d/%d'):format(label, completed, total) or label + detail = _pipeline_progress_ui.trim(payload.item_label) + end + end + + if phase == 'completed' then + pcall(mp.set_property, 'user-data/medeia-pipeline-progress', '') + pcall(mp.set_property, 'user-data/medeia-pipeline-progress-summary', '') + if payload.success == false then + summary = title .. ' failed' + detail = _pipeline_progress_ui.trim(payload.error) + if detail == '' then + detail = 'Unknown error' + end + else + summary = title .. ' complete' + detail = _pipeline_progress_ui.trim(payload.command_text) + end + _pipeline_progress_ui.update(title, summary, detail) + _pipeline_progress_ui.schedule_hide(2.5) + else + if type(encoded) == 'string' and encoded ~= '' then + pcall(mp.set_property, 'user-data/medeia-pipeline-progress', encoded) + end + if summary ~= '' then + pcall(mp.set_property, 'user-data/medeia-pipeline-progress-summary', summary) + end + _pipeline_progress_ui.update(title, summary, detail) + end + _lua_log( 'pipeline-event: phase=' .. tostring(payload.phase or '') + .. ' event=' .. tostring(payload.event or '') .. ' success=' .. tostring(payload.success) .. ' kind=' .. tostring(payload.kind or '') .. ' error=' .. tostring(payload.error or '') @@ -2288,7 +2497,6 @@ local function _activate_image_controls() _bind_image_key('d', 'image-pan-d', function() _change_pan(-ImageControl.pan_step, 0) end, {repeatable=true}) _bind_image_key('=', 'image-zoom-in', function() _change_zoom(ImageControl.zoom_step) end, {repeatable=true}) - _disable_image_section() _bind_image_key('-', 'image-zoom-out', function() _change_zoom(-ImageControl.zoom_step) end, {repeatable=true}) _bind_image_key('+', 'image-zoom-in-fine', function() _change_zoom(ImageControl.zoom_step_slow) end, {repeatable=true}) _bind_image_key('_', 'image-zoom-out-fine', function() _change_zoom(-ImageControl.zoom_step_slow) end, {repeatable=true}) @@ -2300,10 +2508,12 @@ end local function _deactivate_image_controls() if not ImageControl.enabled then + _disable_image_section() return end ImageControl.enabled = false _set_image_property(false) + _disable_image_section() _restore_q_default() _unbind_image_keys() mp.osd_message('Image viewer controls disabled', 1.0) @@ -4813,6 +5023,12 @@ mp.register_script_message('medios-load-url-event', function(json) if ensure_uosc_loaded() then _lua_log('[LOAD-URL] Sending close-menu command to UOSC') mp.commandv('script-message-to', 'uosc', 'close-menu', LOAD_URL_MENU_TYPE) + mp.add_timeout(0.05, function() + if ensure_uosc_loaded() then + _lua_log('[LOAD-URL] Requesting UOSC cursor sync after menu close') + pcall(mp.commandv, 'script-message-to', 'uosc', 'sync-cursor') + end + end) else _lua_log('[LOAD-URL] UOSC not loaded, cannot close menu') end diff --git a/MPV/mpv_ipc.py b/MPV/mpv_ipc.py index a3f299c..80cd478 100644 --- a/MPV/mpv_ipc.py +++ b/MPV/mpv_ipc.py @@ -48,6 +48,42 @@ def _windows_pipe_available(path: str) -> bool: return False +def _windows_pipe_bytes_available(pipe: BinaryIO) -> Optional[int]: + """Return the number of bytes ready to read from a Windows named pipe.""" + if platform.system() != "Windows": + return None + try: + import msvcrt + + handle = msvcrt.get_osfhandle(pipe.fileno()) + kernel32 = ctypes.windll.kernel32 + PeekNamedPipe = kernel32.PeekNamedPipe + PeekNamedPipe.argtypes = [ + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_uint32, + ctypes.c_void_p, + ctypes.POINTER(ctypes.c_uint32), + ctypes.c_void_p, + ] + PeekNamedPipe.restype = ctypes.c_bool + + total_available = ctypes.c_uint32(0) + ok = PeekNamedPipe( + ctypes.c_void_p(handle), + None, + 0, + None, + ctypes.byref(total_available), + None, + ) + if not ok: + return None + return int(total_available.value) + except Exception: + return None + + def _windows_pythonw_exe(python_exe: Optional[str]) -> Optional[str]: """Return a pythonw.exe adjacent to python.exe if available (Windows only).""" if platform.system() != "Windows": @@ -921,15 +957,46 @@ class MPVIPCClient: deadline = _time.time() + max(0.0, effective_timeout) if self.is_windows: - try: - pipe = cast(BinaryIO, self.sock) - return pipe.readline() - except (OSError, IOError, BrokenPipeError, ConnectionResetError) as exc: - # Pipe error; try to reconnect once - if not self.silent: - debug(f"Pipe readline failed: {exc}") - self.disconnect() - return None + pipe = cast(BinaryIO, self.sock) + while True: + nl = self._recv_buffer.find(b"\n") + if nl != -1: + line = self._recv_buffer[:nl + 1] + self._recv_buffer = self._recv_buffer[nl + 1:] + return line + + remaining = deadline - _time.time() + if remaining <= 0: + return None + + try: + available = _windows_pipe_bytes_available(pipe) + except Exception as exc: + if not self.silent: + debug(f"Pipe availability probe failed: {exc}") + self.disconnect() + return None + + if available is None: + self.disconnect() + return None + + if available <= 0: + _time.sleep(min(0.01, max(0.001, remaining))) + continue + + try: + chunk = pipe.read(min(available, 4096)) + except (OSError, IOError, BrokenPipeError, ConnectionResetError) as exc: + if not self.silent: + debug(f"Pipe readline failed: {exc}") + self.disconnect() + return None + + if not chunk: + return b"" + + self._recv_buffer += chunk # Unix: buffer until newline. sock_obj = cast(socket.socket, self.sock) @@ -1034,6 +1101,7 @@ class MPVIPCClient: try: # Try to open the named pipe self.sock = open(self.socket_path, "r+b", buffering=0) + self._recv_buffer = b"" return True except (OSError, IOError) as exc: if not self.silent: @@ -1055,6 +1123,7 @@ class MPVIPCClient: self.sock = socket.socket(af_unix, socket.SOCK_STREAM) self.sock.settimeout(self.timeout) self.sock.connect(self.socket_path) + self._recv_buffer = b"" return True except Exception as exc: if not self.silent: @@ -1167,6 +1236,7 @@ class MPVIPCClient: except Exception: pass self.sock = None + self._recv_buffer = b"" def __del__(self) -> None: """Cleanup on object destruction.""" diff --git a/MPV/portable_config/scripts/uosc/scripts/uosc/main.lua b/MPV/portable_config/scripts/uosc/scripts/uosc/main.lua index 5fa6376..4a9bac8 100644 --- a/MPV/portable_config/scripts/uosc/scripts/uosc/main.lua +++ b/MPV/portable_config/scripts/uosc/scripts/uosc/main.lua @@ -1112,6 +1112,16 @@ end) mp.register_script_message('close-menu', function(type) if Menu:is_open(type) then Menu:close() end end) +mp.register_script_message('sync-cursor', function() + local mouse = mp.get_property_native('mouse-pos') + if type(mouse) == 'table' and mouse.hover and mouse.x and mouse.y then + cursor:move(mouse.x, mouse.y) + else + cursor:leave() + end + cursor:queue_autohide() + request_render() +end) mp.register_script_message('menu-action', function(name, ...) local menu = Menu:is_open() if menu then diff --git a/SYS/models.py b/SYS/models.py index c39b465..e87b31a 100644 --- a/SYS/models.py +++ b/SYS/models.py @@ -769,6 +769,7 @@ class PipelineLiveProgress: self._enabled = bool(enabled) self._pipe_labels = [str(x) for x in (pipe_labels or [])] self._lock = RLock() + self._event_callback: Any = None self._console: Optional[Console] = None self._live: Optional[Live] = None @@ -809,6 +810,24 @@ class PipelineLiveProgress: # Title line state (active per-item context) self._active_subtask_text: Optional[str] = None + def set_event_callback(self, callback: Any) -> None: + with self._lock: + self._event_callback = callback + + def _emit_event(self, event: str, **payload: Any) -> None: + callback = self._event_callback + if not callable(callback): + return + data: Dict[str, Any] = { + "phase": "progress", + "event": str(event or "").strip(), + } + data.update(payload) + try: + callback(data) + except Exception: + logger.exception("Failed to emit PipelineLiveProgress event '%s'", event) + def _title_text(self) -> str: """Compute the Pipeline panel title. @@ -1093,6 +1112,12 @@ class PipelineLiveProgress: prog.update(task_id, description=msg, refresh=True) except Exception: logger.exception("Failed to update status task %s in set_pipe_status_text", task_id) + self._emit_event( + "status", + pipe_index=pidx, + pipe_label=self._pipe_labels[pidx] if 0 <= pidx < len(self._pipe_labels) else str(pidx), + text=msg, + ) def clear_pipe_status_text(self, pipe_index: int) -> None: if not self._enabled: @@ -1134,6 +1159,12 @@ class PipelineLiveProgress: pipe_task = self._pipe_tasks[pidx] pipe_progress.update(pipe_task, completed=pct, total=100, refresh=True) self._update_overall() + self._emit_event( + "pipe-percent", + pipe_index=pidx, + pipe_label=self._pipe_labels[pidx] if 0 <= pidx < len(self._pipe_labels) else str(pidx), + percent=pct, + ) except Exception: logger.exception("Failed to set pipe percent for pipe %s in set_pipe_percent", pipe_index) @@ -1262,6 +1293,7 @@ class PipelineLiveProgress: try: task_id = self._transfers.add_task(key, total=task_total) self._transfer_tasks[key] = task_id + self._emit_event("transfer-begin", label=key, total=task_total) except Exception: logger.exception("Failed to add transfer task %s in begin_transfer", key) @@ -1290,6 +1322,12 @@ class PipelineLiveProgress: if total is not None and total > 0: kwargs["total"] = int(total) self._transfers.update(task_id, refresh=True, **kwargs) + self._emit_event( + "transfer", + label=key, + completed=(int(completed) if completed is not None else None), + total=(int(total) if total is not None and total > 0 else None), + ) except Exception: logger.exception("Failed to update transfer '%s'", key) @@ -1302,6 +1340,7 @@ class PipelineLiveProgress: return try: self._transfers.remove_task(task_id) + self._emit_event("transfer-finish", label=key) except Exception: logger.exception("Failed to remove transfer task '%s' in finish_transfer", key) @@ -1363,6 +1402,13 @@ class PipelineLiveProgress: logger.exception("Failed to start pipe task timer in begin_pipe for %s", pipe_index) self._update_overall() + self._emit_event( + "pipe-begin", + pipe_index=pipe_index, + pipe_label=self._pipe_labels[pipe_index] if 0 <= pipe_index < len(self._pipe_labels) else str(pipe_index), + total_items=total_items, + percent_mode=percent_mode, + ) labels: List[str] = [] if isinstance(items_preview, list) and items_preview: @@ -1452,6 +1498,15 @@ class PipelineLiveProgress: else: pipe_progress.update(pipe_task, completed=done) + self._emit_event( + "pipe-emit", + pipe_index=pipe_index, + pipe_label=self._pipe_labels[pipe_index] if 0 <= pipe_index < len(self._pipe_labels) else str(pipe_index), + completed=done, + total=self._pipe_totals[pipe_index], + item_label=_pipeline_progress_item_label(emitted), + ) + self._update_overall() # Clear any status line now that it emitted. @@ -1538,6 +1593,14 @@ class PipelineLiveProgress: except Exception: logger.exception("Failed to stop pipe task %s during finish_pipe", pipe_index) + self._emit_event( + "pipe-finish", + pipe_index=pipe_index, + pipe_label=self._pipe_labels[pipe_index] if 0 <= pipe_index < len(self._pipe_labels) else str(pipe_index), + completed=self._pipe_done[pipe_index], + total=self._pipe_totals[pipe_index], + ) + self._update_overall() def complete_all_pipes(self) -> None: diff --git a/SYS/pipeline.py b/SYS/pipeline.py index a8890a3..5d90518 100644 --- a/SYS/pipeline.py +++ b/SYS/pipeline.py @@ -39,6 +39,16 @@ def get_live_progress() -> Any: return state.live_progress +def set_progress_event_callback(callback: Any) -> None: + state = _get_pipeline_state() + state.progress_event_callback = callback + + +def get_progress_event_callback() -> Any: + state = _get_pipeline_state() + return state.progress_event_callback + + @contextmanager def suspend_live_progress(): """Temporarily pause Live progress rendering. @@ -104,6 +114,7 @@ class PipelineState: pipeline_stop: Optional[Dict[str, Any]] = None live_progress: Any = None last_execution_result: Dict[str, Any] = field(default_factory=dict) + progress_event_callback: Any = None def reset(self) -> None: self.current_context = None @@ -130,6 +141,7 @@ class PipelineState: self.pipeline_stop = None self.live_progress = None self.last_execution_result = {} + self.progress_event_callback = None # ContextVar for per-run state (prototype) @@ -2431,6 +2443,10 @@ class PipelineExecutor: if hasattr(_pipeline_ctx, "set_live_progress"): _pipeline_ctx.set_live_progress(progress_ui) + if hasattr(_pipeline_ctx, "get_progress_event_callback"): + progress_cb = _pipeline_ctx.get_progress_event_callback() + if callable(progress_cb) and hasattr(progress_ui, "set_event_callback"): + progress_ui.set_event_callback(progress_cb) except Exception: logger.exception("Failed to register PipelineLiveProgress with pipeline context") pipe_index_by_stage = {