This commit is contained in:
2026-03-21 15:12:52 -07:00
parent 11384266e3
commit d9a6b1bfb4
6 changed files with 512 additions and 21 deletions

136
CLI.py
View File

@@ -115,7 +115,13 @@ from SYS.cli_parsing import SelectionSyntax, SelectionFilterSyntax, MedeiaLexer
def _send_mpv_ipc_command(command: List[Any], *, ipc_path: Optional[str] = None, timeout: float = 0.75) -> bool:
def _send_mpv_ipc_command(
command: List[Any],
*,
ipc_path: Optional[str] = None,
timeout: float = 0.75,
wait_for_response: bool = True,
) -> bool:
if not isinstance(command, list) or not command:
return False
@@ -130,13 +136,16 @@ def _send_mpv_ipc_command(command: List[Any], *, ipc_path: Optional[str] = None,
try:
response = client.send_command({
"command": command,
})
}, wait=bool(wait_for_response))
finally:
try:
client.disconnect()
except Exception:
pass
if not wait_for_response:
return bool(response and (response.get("async") or response.get("request_id") is not None))
return bool(response and response.get("error") == "success")
except Exception as exc:
debug(f"mpv ipc command failed: {exc}")
@@ -155,10 +164,11 @@ def _notify_mpv_osd(text: str, *, duration_ms: int = 3500, ipc_path: Optional[st
max(0, int(duration_ms)),
],
ipc_path=ipc_path,
wait_for_response=False,
)
def _notify_mpv_callback(metadata: Dict[str, Any], execution_result: Dict[str, Any]) -> bool:
def _send_mpv_callback_event(metadata: Dict[str, Any], payload: Dict[str, Any]) -> bool:
callback = metadata.get("mpv_callback") if isinstance(metadata, dict) else None
if not isinstance(callback, dict):
return False
@@ -169,26 +179,102 @@ def _notify_mpv_callback(metadata: Dict[str, Any], execution_result: Dict[str, A
if not script_name or not message_name:
return False
payload = {
"phase": "completed",
"success": bool(execution_result.get("success")),
"status": str(execution_result.get("status") or "completed"),
"error": str(execution_result.get("error") or "").strip(),
"command_text": str(execution_result.get("command_text") or "").strip(),
event_payload = {
"kind": str(metadata.get("kind") or "").strip(),
}
if isinstance(payload, dict):
event_payload.update(payload)
return _send_mpv_ipc_command(
[
"script-message-to",
script_name,
message_name,
json.dumps(payload, ensure_ascii=False),
json.dumps(event_payload, ensure_ascii=False),
],
ipc_path=ipc_path,
wait_for_response=False,
)
def _notify_mpv_callback(metadata: Dict[str, Any], execution_result: Dict[str, Any]) -> bool:
return _send_mpv_callback_event(
metadata,
{
"phase": "completed",
"success": bool(execution_result.get("success")),
"status": str(execution_result.get("status") or "completed"),
"error": str(execution_result.get("error") or "").strip(),
"command_text": str(execution_result.get("command_text") or "").strip(),
},
)
def _build_mpv_progress_callback(metadata: Dict[str, Any]) -> Optional[Any]:
callback = metadata.get("mpv_callback") if isinstance(metadata, dict) else None
if not isinstance(callback, dict):
return None
last_sent_at: Dict[str, float] = {}
last_percent: Dict[str, int] = {}
last_text: Dict[str, str] = {}
def emit(payload: Dict[str, Any]) -> bool:
if not isinstance(payload, dict):
return False
event_name = str(payload.get("event") or "").strip().lower()
now = time.monotonic()
throttle_key = event_name or "progress"
if event_name == "pipe-percent":
pipe_index = int(payload.get("pipe_index") or 0)
percent = max(0, min(100, int(payload.get("percent") or 0)))
throttle_key = f"pipe-percent:{pipe_index}"
prev = last_percent.get(throttle_key)
if prev == percent:
return False
if prev is not None and percent < 100 and (percent - prev) < 5 and (now - last_sent_at.get(throttle_key, 0.0)) < 0.35:
return False
last_percent[throttle_key] = percent
payload = dict(payload)
payload["percent"] = percent
elif event_name == "transfer":
label = str(payload.get("label") or "transfer").strip() or "transfer"
throttle_key = f"transfer:{label}"
completed = payload.get("completed")
total = payload.get("total")
percent = None
try:
if total is not None and int(total) > 0 and completed is not None:
percent = max(0, min(100, int(round((int(completed) / max(1, int(total))) * 100.0))))
except Exception:
percent = None
if percent is not None:
prev = last_percent.get(throttle_key)
if prev == percent:
return False
if prev is not None and percent < 100 and (percent - prev) < 3 and (now - last_sent_at.get(throttle_key, 0.0)) < 0.35:
return False
last_percent[throttle_key] = percent
payload = dict(payload)
payload["percent"] = percent
elif event_name == "status":
pipe_index = int(payload.get("pipe_index") or 0)
throttle_key = f"status:{pipe_index}"
text = str(payload.get("text") or "").strip()
if last_text.get(throttle_key) == text and (now - last_sent_at.get(throttle_key, 0.0)) < 0.5:
return False
last_text[throttle_key] = text
last_sent_at[throttle_key] = now
event_payload = dict(payload)
event_payload.setdefault("phase", "progress")
return _send_mpv_callback_event(metadata, event_payload)
return emit
def _notify_mpv_completion(metadata: Dict[str, Any], execution_result: Dict[str, Any]) -> bool:
callback_sent = _notify_mpv_callback(metadata, execution_result)
@@ -1024,6 +1110,15 @@ class CmdletExecutor:
ctx.set_live_progress(progress_ui)
except Exception:
pass
try:
progress_cb = (
ctx.get_progress_event_callback()
if hasattr(ctx, "get_progress_event_callback") else None
)
if callable(progress_cb) and hasattr(progress_ui, "set_event_callback"):
progress_ui.set_event_callback(progress_cb)
except Exception:
pass
pipe_idx = 0
@@ -2418,14 +2513,30 @@ Come to love it when others take what you share, as there is no greater joy
if isinstance(queued_payload, dict) and isinstance(queued_payload.get("metadata"), dict)
else None
)
progress_event_callback = _build_mpv_progress_callback(queued_metadata) if queued_metadata else None
try:
from SYS import pipeline as ctx
ctx.set_current_command_text(user_input)
if hasattr(ctx, "set_progress_event_callback"):
ctx.set_progress_event_callback(progress_event_callback)
pipeline_ctx_ref = ctx
except Exception:
pipeline_ctx_ref = None
if queued_metadata:
try:
_send_mpv_callback_event(
queued_metadata,
{
"phase": "started",
"event": "command-started",
"command_text": user_input,
},
)
except Exception:
pass
execution_result: Dict[str, Any] = {
"status": "completed",
"success": True,
@@ -2627,6 +2738,11 @@ Come to love it when others take what you share, as there is no greater joy
pass
if pipeline_ctx_ref:
pipeline_ctx_ref.clear_current_command_text()
if hasattr(pipeline_ctx_ref, "set_progress_event_callback"):
try:
pipeline_ctx_ref.set_progress_event_callback(None)
except Exception:
pass
repl_queue_stop.set()

View File

@@ -4,7 +4,7 @@ local msg = require 'mp.msg'
local M = {}
local MEDEIA_LUA_VERSION = '2026-03-20.1'
local MEDEIA_LUA_VERSION = '2026-03-21.1'
-- Expose a tiny breadcrumb for debugging which script version is loaded.
pcall(mp.set_property, 'user-data/medeia-lua-version', MEDEIA_LUA_VERSION)
@@ -56,6 +56,140 @@ local PIPELINE_REQ_PROP = 'user-data/medeia-pipeline-request'
local PIPELINE_RESP_PROP = 'user-data/medeia-pipeline-response'
local PIPELINE_READY_PROP = 'user-data/medeia-pipeline-ready'
local CURRENT_WEB_URL_PROP = 'user-data/medeia-current-web-url'
local _pipeline_progress_ui = {
overlay = nil,
hide_token = 0,
title = '',
summary = '',
detail = '',
}
function _pipeline_progress_ui.trim(text)
return tostring(text or ''):gsub('^%s+', ''):gsub('%s+$', '')
end
function _pipeline_progress_ui.ass_escape(text)
text = tostring(text or '')
text = text:gsub('\\', '\\\\')
text = text:gsub('{', '\\{')
text = text:gsub('}', '\\}')
text = text:gsub('\n', '\\N')
return text
end
function _pipeline_progress_ui.truncate(text, max_len)
text = _pipeline_progress_ui.trim(text)
max_len = tonumber(max_len or 0) or 0
if max_len <= 0 or #text <= max_len then
return text
end
if max_len <= 3 then
return text:sub(1, max_len)
end
return text:sub(1, max_len - 3) .. '...'
end
function _pipeline_progress_ui.kind_title(kind)
kind = _pipeline_progress_ui.trim(kind):lower()
if kind == 'mpv-download' then
return 'Download'
end
if kind == 'mpv-screenshot' then
return 'Screenshot'
end
return 'Pipeline'
end
function _pipeline_progress_ui.ensure_overlay()
if _pipeline_progress_ui.overlay then
return _pipeline_progress_ui.overlay
end
local ok, overlay = pcall(mp.create_osd_overlay, 'ass-events')
if ok and overlay then
_pipeline_progress_ui.overlay = overlay
end
return _pipeline_progress_ui.overlay
end
function _pipeline_progress_ui.cancel_hide()
_pipeline_progress_ui.hide_token = (_pipeline_progress_ui.hide_token or 0) + 1
end
function _pipeline_progress_ui.render()
local overlay = _pipeline_progress_ui.ensure_overlay()
if not overlay then
return
end
local width, height = 1280, 720
local ok, w, h = pcall(mp.get_osd_size)
if ok and tonumber(w or 0) and tonumber(h or 0) and w > 0 and h > 0 then
width = math.floor(w)
height = math.floor(h)
end
overlay.res_x = width
overlay.res_y = height
if _pipeline_progress_ui.summary == '' and _pipeline_progress_ui.detail == '' then
overlay.data = ''
overlay:update()
return
end
local title = _pipeline_progress_ui.truncate(_pipeline_progress_ui.title ~= '' and _pipeline_progress_ui.title or 'Pipeline', 42)
local summary = _pipeline_progress_ui.truncate(_pipeline_progress_ui.summary, 72)
local detail = _pipeline_progress_ui.truncate(_pipeline_progress_ui.detail, 88)
local lines = {
'{\\b1}' .. _pipeline_progress_ui.ass_escape(title) .. '{\\b0}',
}
if summary ~= '' then
lines[#lines + 1] = _pipeline_progress_ui.ass_escape(summary)
end
if detail ~= '' then
lines[#lines + 1] = '{\\fs18\\c&HDDDDDD&}' .. _pipeline_progress_ui.ass_escape(detail) .. '{\\r}'
end
overlay.data = string.format(
'{\\an9\\pos(%d,%d)\\fs22\\bord2\\shad1\\1c&HFFFFFF&\\3c&H111111&\\4c&H000000&}%s',
width - 28,
34,
table.concat(lines, '\\N')
)
overlay:update()
end
function _pipeline_progress_ui.hide()
_pipeline_progress_ui.cancel_hide()
_pipeline_progress_ui.title = ''
_pipeline_progress_ui.summary = ''
_pipeline_progress_ui.detail = ''
_pipeline_progress_ui.render()
end
function _pipeline_progress_ui.schedule_hide(delay_seconds)
_pipeline_progress_ui.cancel_hide()
local delay = tonumber(delay_seconds or 0) or 0
if delay <= 0 then
_pipeline_progress_ui.hide()
return
end
local token = _pipeline_progress_ui.hide_token
mp.add_timeout(delay, function()
if token ~= _pipeline_progress_ui.hide_token then
return
end
_pipeline_progress_ui.hide()
end)
end
function _pipeline_progress_ui.update(title, summary, detail)
_pipeline_progress_ui.cancel_hide()
_pipeline_progress_ui.title = _pipeline_progress_ui.trim(title)
_pipeline_progress_ui.summary = _pipeline_progress_ui.trim(summary)
_pipeline_progress_ui.detail = _pipeline_progress_ui.trim(detail)
_pipeline_progress_ui.render()
end
local function _get_lua_source_path()
local info = nil
@@ -1705,8 +1839,83 @@ mp.register_script_message('medeia-pipeline-event', function(json)
pcall(mp.set_property, 'user-data/medeia-last-pipeline-event', encoded)
end
local phase = _pipeline_progress_ui.trim(payload.phase)
local event_name = _pipeline_progress_ui.trim(payload.event)
local kind = _pipeline_progress_ui.trim(payload.kind)
local title = _pipeline_progress_ui.kind_title(kind)
local summary = ''
local detail = ''
if phase == 'started' then
local command_text = _pipeline_progress_ui.trim(payload.command_text)
summary = command_text ~= '' and ('Started: ' .. command_text) or ('Started: ' .. kind)
detail = 'Queued job started'
elseif phase == 'progress' then
if event_name == 'pipe-percent' then
local label = _pipeline_progress_ui.trim(payload.pipe_label ~= '' and payload.pipe_label or kind ~= '' and kind or 'pipeline')
local percent = tonumber(payload.percent or 0) or 0
summary = ('%s %d%%'):format(label, math.floor(percent + 0.5))
detail = 'Processing'
elseif event_name == 'status' then
summary = _pipeline_progress_ui.trim(payload.text)
detail = _pipeline_progress_ui.trim(payload.pipe_label ~= '' and payload.pipe_label or kind)
elseif event_name == 'transfer' then
local label = _pipeline_progress_ui.trim(payload.label ~= '' and payload.label or 'transfer')
local percent = tonumber(payload.percent or 0)
if percent then
summary = ('%s %d%%'):format(label, math.floor(percent + 0.5))
else
summary = label
end
local completed = tonumber(payload.completed or 0)
local total = tonumber(payload.total or 0)
if completed and total and total > 0 then
detail = ('%d / %d'):format(math.floor(completed + 0.5), math.floor(total + 0.5))
end
elseif event_name == 'pipe-begin' then
local label = _pipeline_progress_ui.trim(payload.pipe_label ~= '' and payload.pipe_label or kind ~= '' and kind or 'pipeline')
summary = 'Running: ' .. label
local total_items = tonumber(payload.total_items or 0)
if total_items and total_items > 0 then
detail = ('Items: %d'):format(math.floor(total_items + 0.5))
end
elseif event_name == 'pipe-emit' then
local label = _pipeline_progress_ui.trim(payload.pipe_label ~= '' and payload.pipe_label or kind ~= '' and kind or 'pipeline')
local completed = tonumber(payload.completed or 0) or 0
local total = tonumber(payload.total or 0) or 0
summary = total > 0 and ('%s %d/%d'):format(label, completed, total) or label
detail = _pipeline_progress_ui.trim(payload.item_label)
end
end
if phase == 'completed' then
pcall(mp.set_property, 'user-data/medeia-pipeline-progress', '')
pcall(mp.set_property, 'user-data/medeia-pipeline-progress-summary', '')
if payload.success == false then
summary = title .. ' failed'
detail = _pipeline_progress_ui.trim(payload.error)
if detail == '' then
detail = 'Unknown error'
end
else
summary = title .. ' complete'
detail = _pipeline_progress_ui.trim(payload.command_text)
end
_pipeline_progress_ui.update(title, summary, detail)
_pipeline_progress_ui.schedule_hide(2.5)
else
if type(encoded) == 'string' and encoded ~= '' then
pcall(mp.set_property, 'user-data/medeia-pipeline-progress', encoded)
end
if summary ~= '' then
pcall(mp.set_property, 'user-data/medeia-pipeline-progress-summary', summary)
end
_pipeline_progress_ui.update(title, summary, detail)
end
_lua_log(
'pipeline-event: phase=' .. tostring(payload.phase or '')
.. ' event=' .. tostring(payload.event or '')
.. ' success=' .. tostring(payload.success)
.. ' kind=' .. tostring(payload.kind or '')
.. ' error=' .. tostring(payload.error or '')
@@ -2288,7 +2497,6 @@ local function _activate_image_controls()
_bind_image_key('d', 'image-pan-d', function() _change_pan(-ImageControl.pan_step, 0) end, {repeatable=true})
_bind_image_key('=', 'image-zoom-in', function() _change_zoom(ImageControl.zoom_step) end, {repeatable=true})
_disable_image_section()
_bind_image_key('-', 'image-zoom-out', function() _change_zoom(-ImageControl.zoom_step) end, {repeatable=true})
_bind_image_key('+', 'image-zoom-in-fine', function() _change_zoom(ImageControl.zoom_step_slow) end, {repeatable=true})
_bind_image_key('_', 'image-zoom-out-fine', function() _change_zoom(-ImageControl.zoom_step_slow) end, {repeatable=true})
@@ -2300,10 +2508,12 @@ end
local function _deactivate_image_controls()
if not ImageControl.enabled then
_disable_image_section()
return
end
ImageControl.enabled = false
_set_image_property(false)
_disable_image_section()
_restore_q_default()
_unbind_image_keys()
mp.osd_message('Image viewer controls disabled', 1.0)
@@ -4813,6 +5023,12 @@ mp.register_script_message('medios-load-url-event', function(json)
if ensure_uosc_loaded() then
_lua_log('[LOAD-URL] Sending close-menu command to UOSC')
mp.commandv('script-message-to', 'uosc', 'close-menu', LOAD_URL_MENU_TYPE)
mp.add_timeout(0.05, function()
if ensure_uosc_loaded() then
_lua_log('[LOAD-URL] Requesting UOSC cursor sync after menu close')
pcall(mp.commandv, 'script-message-to', 'uosc', 'sync-cursor')
end
end)
else
_lua_log('[LOAD-URL] UOSC not loaded, cannot close menu')
end

View File

@@ -48,6 +48,42 @@ def _windows_pipe_available(path: str) -> bool:
return False
def _windows_pipe_bytes_available(pipe: BinaryIO) -> Optional[int]:
"""Return the number of bytes ready to read from a Windows named pipe."""
if platform.system() != "Windows":
return None
try:
import msvcrt
handle = msvcrt.get_osfhandle(pipe.fileno())
kernel32 = ctypes.windll.kernel32
PeekNamedPipe = kernel32.PeekNamedPipe
PeekNamedPipe.argtypes = [
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_uint32,
ctypes.c_void_p,
ctypes.POINTER(ctypes.c_uint32),
ctypes.c_void_p,
]
PeekNamedPipe.restype = ctypes.c_bool
total_available = ctypes.c_uint32(0)
ok = PeekNamedPipe(
ctypes.c_void_p(handle),
None,
0,
None,
ctypes.byref(total_available),
None,
)
if not ok:
return None
return int(total_available.value)
except Exception:
return None
def _windows_pythonw_exe(python_exe: Optional[str]) -> Optional[str]:
"""Return a pythonw.exe adjacent to python.exe if available (Windows only)."""
if platform.system() != "Windows":
@@ -921,15 +957,46 @@ class MPVIPCClient:
deadline = _time.time() + max(0.0, effective_timeout)
if self.is_windows:
try:
pipe = cast(BinaryIO, self.sock)
return pipe.readline()
except (OSError, IOError, BrokenPipeError, ConnectionResetError) as exc:
# Pipe error; try to reconnect once
if not self.silent:
debug(f"Pipe readline failed: {exc}")
self.disconnect()
return None
pipe = cast(BinaryIO, self.sock)
while True:
nl = self._recv_buffer.find(b"\n")
if nl != -1:
line = self._recv_buffer[:nl + 1]
self._recv_buffer = self._recv_buffer[nl + 1:]
return line
remaining = deadline - _time.time()
if remaining <= 0:
return None
try:
available = _windows_pipe_bytes_available(pipe)
except Exception as exc:
if not self.silent:
debug(f"Pipe availability probe failed: {exc}")
self.disconnect()
return None
if available is None:
self.disconnect()
return None
if available <= 0:
_time.sleep(min(0.01, max(0.001, remaining)))
continue
try:
chunk = pipe.read(min(available, 4096))
except (OSError, IOError, BrokenPipeError, ConnectionResetError) as exc:
if not self.silent:
debug(f"Pipe readline failed: {exc}")
self.disconnect()
return None
if not chunk:
return b""
self._recv_buffer += chunk
# Unix: buffer until newline.
sock_obj = cast(socket.socket, self.sock)
@@ -1034,6 +1101,7 @@ class MPVIPCClient:
try:
# Try to open the named pipe
self.sock = open(self.socket_path, "r+b", buffering=0)
self._recv_buffer = b""
return True
except (OSError, IOError) as exc:
if not self.silent:
@@ -1055,6 +1123,7 @@ class MPVIPCClient:
self.sock = socket.socket(af_unix, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect(self.socket_path)
self._recv_buffer = b""
return True
except Exception as exc:
if not self.silent:
@@ -1167,6 +1236,7 @@ class MPVIPCClient:
except Exception:
pass
self.sock = None
self._recv_buffer = b""
def __del__(self) -> None:
"""Cleanup on object destruction."""

View File

@@ -1112,6 +1112,16 @@ end)
mp.register_script_message('close-menu', function(type)
if Menu:is_open(type) then Menu:close() end
end)
mp.register_script_message('sync-cursor', function()
local mouse = mp.get_property_native('mouse-pos')
if type(mouse) == 'table' and mouse.hover and mouse.x and mouse.y then
cursor:move(mouse.x, mouse.y)
else
cursor:leave()
end
cursor:queue_autohide()
request_render()
end)
mp.register_script_message('menu-action', function(name, ...)
local menu = Menu:is_open()
if menu then

View File

@@ -769,6 +769,7 @@ class PipelineLiveProgress:
self._enabled = bool(enabled)
self._pipe_labels = [str(x) for x in (pipe_labels or [])]
self._lock = RLock()
self._event_callback: Any = None
self._console: Optional[Console] = None
self._live: Optional[Live] = None
@@ -809,6 +810,24 @@ class PipelineLiveProgress:
# Title line state (active per-item context)
self._active_subtask_text: Optional[str] = None
def set_event_callback(self, callback: Any) -> None:
with self._lock:
self._event_callback = callback
def _emit_event(self, event: str, **payload: Any) -> None:
callback = self._event_callback
if not callable(callback):
return
data: Dict[str, Any] = {
"phase": "progress",
"event": str(event or "").strip(),
}
data.update(payload)
try:
callback(data)
except Exception:
logger.exception("Failed to emit PipelineLiveProgress event '%s'", event)
def _title_text(self) -> str:
"""Compute the Pipeline panel title.
@@ -1093,6 +1112,12 @@ class PipelineLiveProgress:
prog.update(task_id, description=msg, refresh=True)
except Exception:
logger.exception("Failed to update status task %s in set_pipe_status_text", task_id)
self._emit_event(
"status",
pipe_index=pidx,
pipe_label=self._pipe_labels[pidx] if 0 <= pidx < len(self._pipe_labels) else str(pidx),
text=msg,
)
def clear_pipe_status_text(self, pipe_index: int) -> None:
if not self._enabled:
@@ -1134,6 +1159,12 @@ class PipelineLiveProgress:
pipe_task = self._pipe_tasks[pidx]
pipe_progress.update(pipe_task, completed=pct, total=100, refresh=True)
self._update_overall()
self._emit_event(
"pipe-percent",
pipe_index=pidx,
pipe_label=self._pipe_labels[pidx] if 0 <= pidx < len(self._pipe_labels) else str(pidx),
percent=pct,
)
except Exception:
logger.exception("Failed to set pipe percent for pipe %s in set_pipe_percent", pipe_index)
@@ -1262,6 +1293,7 @@ class PipelineLiveProgress:
try:
task_id = self._transfers.add_task(key, total=task_total)
self._transfer_tasks[key] = task_id
self._emit_event("transfer-begin", label=key, total=task_total)
except Exception:
logger.exception("Failed to add transfer task %s in begin_transfer", key)
@@ -1290,6 +1322,12 @@ class PipelineLiveProgress:
if total is not None and total > 0:
kwargs["total"] = int(total)
self._transfers.update(task_id, refresh=True, **kwargs)
self._emit_event(
"transfer",
label=key,
completed=(int(completed) if completed is not None else None),
total=(int(total) if total is not None and total > 0 else None),
)
except Exception:
logger.exception("Failed to update transfer '%s'", key)
@@ -1302,6 +1340,7 @@ class PipelineLiveProgress:
return
try:
self._transfers.remove_task(task_id)
self._emit_event("transfer-finish", label=key)
except Exception:
logger.exception("Failed to remove transfer task '%s' in finish_transfer", key)
@@ -1363,6 +1402,13 @@ class PipelineLiveProgress:
logger.exception("Failed to start pipe task timer in begin_pipe for %s", pipe_index)
self._update_overall()
self._emit_event(
"pipe-begin",
pipe_index=pipe_index,
pipe_label=self._pipe_labels[pipe_index] if 0 <= pipe_index < len(self._pipe_labels) else str(pipe_index),
total_items=total_items,
percent_mode=percent_mode,
)
labels: List[str] = []
if isinstance(items_preview, list) and items_preview:
@@ -1452,6 +1498,15 @@ class PipelineLiveProgress:
else:
pipe_progress.update(pipe_task, completed=done)
self._emit_event(
"pipe-emit",
pipe_index=pipe_index,
pipe_label=self._pipe_labels[pipe_index] if 0 <= pipe_index < len(self._pipe_labels) else str(pipe_index),
completed=done,
total=self._pipe_totals[pipe_index],
item_label=_pipeline_progress_item_label(emitted),
)
self._update_overall()
# Clear any status line now that it emitted.
@@ -1538,6 +1593,14 @@ class PipelineLiveProgress:
except Exception:
logger.exception("Failed to stop pipe task %s during finish_pipe", pipe_index)
self._emit_event(
"pipe-finish",
pipe_index=pipe_index,
pipe_label=self._pipe_labels[pipe_index] if 0 <= pipe_index < len(self._pipe_labels) else str(pipe_index),
completed=self._pipe_done[pipe_index],
total=self._pipe_totals[pipe_index],
)
self._update_overall()
def complete_all_pipes(self) -> None:

View File

@@ -39,6 +39,16 @@ def get_live_progress() -> Any:
return state.live_progress
def set_progress_event_callback(callback: Any) -> None:
state = _get_pipeline_state()
state.progress_event_callback = callback
def get_progress_event_callback() -> Any:
state = _get_pipeline_state()
return state.progress_event_callback
@contextmanager
def suspend_live_progress():
"""Temporarily pause Live progress rendering.
@@ -104,6 +114,7 @@ class PipelineState:
pipeline_stop: Optional[Dict[str, Any]] = None
live_progress: Any = None
last_execution_result: Dict[str, Any] = field(default_factory=dict)
progress_event_callback: Any = None
def reset(self) -> None:
self.current_context = None
@@ -130,6 +141,7 @@ class PipelineState:
self.pipeline_stop = None
self.live_progress = None
self.last_execution_result = {}
self.progress_event_callback = None
# ContextVar for per-run state (prototype)
@@ -2431,6 +2443,10 @@ class PipelineExecutor:
if hasattr(_pipeline_ctx, "set_live_progress"):
_pipeline_ctx.set_live_progress(progress_ui)
if hasattr(_pipeline_ctx, "get_progress_event_callback"):
progress_cb = _pipeline_ctx.get_progress_event_callback()
if callable(progress_cb) and hasattr(progress_ui, "set_event_callback"):
progress_ui.set_event_callback(progress_cb)
except Exception:
logger.exception("Failed to register PipelineLiveProgress with pipeline context")
pipe_index_by_stage = {