Files
Medios-Macina/SYS/pipeline.py
2026-01-31 19:00:04 -08:00

2915 lines
123 KiB
Python

"""
Pipeline execution context and state management for cmdlet.
"""
from __future__ import annotations
import sys
from contextlib import contextmanager
from dataclasses import dataclass, field
from contextvars import ContextVar
from typing import Any, Dict, List, Optional, Sequence, Callable
from SYS.models import PipelineStageContext
from SYS.logger import log, debug, is_debug_enabled
import logging
logger = logging.getLogger(__name__)
from SYS.worker import WorkerManagerRegistry, WorkerStages
from SYS.cli_parsing import SelectionSyntax, SelectionFilterSyntax
from SYS.rich_display import stdout_console
from SYS.background_notifier import ensure_background_notifier
from SYS.result_table import Table
import re
from datetime import datetime
from SYS.cmdlet_catalog import import_cmd_module
HELP_EXAMPLE_SOURCE_COMMANDS = {
".help-example",
"help-example",
}
def set_live_progress(progress_ui: Any) -> None:
"""Register the current Live progress UI so cmdlets can suspend it during prompts."""
state = _get_pipeline_state()
state.live_progress = progress_ui
def get_live_progress() -> Any:
state = _get_pipeline_state()
return state.live_progress
@contextmanager
def suspend_live_progress():
"""Temporarily pause Live progress rendering.
This avoids Rich Live cursor control interfering with interactive tables/prompts
emitted by cmdlets during preflight (e.g. URL-duplicate confirmation).
"""
ui = get_live_progress()
paused = False
try:
if ui is not None and hasattr(ui, "pause"):
try:
ui.pause()
paused = True
except Exception:
paused = False
yield
finally:
# If a stage requested the pipeline stop (e.g. user declined a preflight prompt),
# do not resume Live rendering.
if get_pipeline_stop() is not None:
return
if paused and ui is not None and hasattr(ui, "resume"):
try:
ui.resume()
except Exception:
logger.exception("Failed to resume live progress UI after suspend")
def _is_selectable_table(table: Any) -> bool:
"""Return True when a table can be used for @ selection."""
# Avoid relying on truthiness for selectability.
# `ResultTable` can be falsey when it has 0 rows, but `@` selection/filtering
# should still be allowed when the backing `last_result_items` exist.
return table is not None and not getattr(table, "no_choice", False)
# Pipeline state container (prototype)
@dataclass
class PipelineState:
current_context: Optional[PipelineStageContext] = None
last_search_query: Optional[str] = None
pipeline_refreshed: bool = False
last_items: List[Any] = field(default_factory=list)
last_result_table: Optional[Any] = None
last_result_items: List[Any] = field(default_factory=list)
last_result_subject: Optional[Any] = None
result_table_history: List[tuple[Optional[Any], List[Any], Optional[Any]]] = field(default_factory=list)
result_table_forward: List[tuple[Optional[Any], List[Any], Optional[Any]]] = field(default_factory=list)
current_stage_table: Optional[Any] = None
display_items: List[Any] = field(default_factory=list)
display_table: Optional[Any] = None
display_subject: Optional[Any] = None
last_selection: List[int] = field(default_factory=list)
pipeline_command_text: str = ""
current_cmdlet_name: str = ""
current_stage_text: str = ""
pipeline_values: Dict[str, Any] = field(default_factory=dict)
pending_pipeline_tail: List[List[str]] = field(default_factory=list)
pending_pipeline_source: Optional[str] = None
ui_library_refresh_callback: Optional[Any] = None
pipeline_stop: Optional[Dict[str, Any]] = None
live_progress: Any = None
def reset(self) -> None:
self.current_context = None
self.last_search_query = None
self.pipeline_refreshed = False
self.last_items = []
self.last_result_table = None
self.last_result_items = []
self.last_result_subject = None
self.result_table_history = []
self.result_table_forward = []
self.current_stage_table = None
self.display_items = []
self.display_table = None
self.display_subject = None
self.last_selection = []
self.pipeline_command_text = ""
self.current_cmdlet_name = ""
self.current_stage_text = ""
self.pipeline_values = {}
self.pending_pipeline_tail = []
self.pending_pipeline_source = None
self.ui_library_refresh_callback = None
self.pipeline_stop = None
self.live_progress = None
# ContextVar for per-run state (prototype)
_CTX_STATE: ContextVar[Optional[PipelineState]] = ContextVar("_pipeline_state", default=None)
_GLOBAL_STATE: PipelineState = PipelineState()
def _get_pipeline_state() -> PipelineState:
"""Return the PipelineState for the current context or the global fallback."""
state = _CTX_STATE.get()
return state if state is not None else _GLOBAL_STATE
@contextmanager
def new_pipeline_state():
"""Context manager to use a fresh PipelineState for a run."""
token = _CTX_STATE.set(PipelineState())
try:
yield _CTX_STATE.get()
finally:
_CTX_STATE.reset(token)
# Legacy module-level synchronization removed — module-level pipeline globals are no longer maintained.
# Use `get_pipeline_state()` to access or mutate the per-run PipelineState.
# Public accessors for pipeline state (for external callers that need to inspect
# or mutate the PipelineState directly). These provide stable, non-underscored
# entrypoints so other modules don't rely on implementation-internal names.
def get_pipeline_state() -> PipelineState:
"""Return the active PipelineState for the current context or the global fallback."""
return _get_pipeline_state()
# No module-level pipeline runtime variables; per-run pipeline state is stored in PipelineState (use `get_pipeline_state()`).
MAX_RESULT_TABLE_HISTORY = 20
PIPELINE_MISSING = object()
def request_pipeline_stop(*, reason: str = "", exit_code: int = 0) -> None:
"""Request that the pipeline runner stop gracefully after the current stage."""
state = _get_pipeline_state()
state.pipeline_stop = {
"reason": str(reason or "").strip(),
"exit_code": int(exit_code)
}
def get_pipeline_stop() -> Optional[Dict[str, Any]]:
state = _get_pipeline_state()
return state.pipeline_stop
def clear_pipeline_stop() -> None:
state = _get_pipeline_state()
state.pipeline_stop = None
# ============================================================================
# PUBLIC API
# ============================================================================
def set_stage_context(context: Optional[PipelineStageContext]) -> None:
"""Set the current pipeline stage context."""
state = _get_pipeline_state()
state.current_context = context
def get_stage_context() -> Optional[PipelineStageContext]:
"""Get the current pipeline stage context."""
state = _get_pipeline_state()
return state.current_context
def emit(obj: Any) -> None:
"""
Emit an object to the current pipeline stage output.
"""
ctx = _get_pipeline_state().current_context
if ctx is not None:
ctx.emit(obj)
def emit_list(objects: List[Any]) -> None:
"""
Emit a list of objects to the next pipeline stage.
"""
ctx = _get_pipeline_state().current_context
if ctx is not None:
ctx.emit(objects)
def print_if_visible(*args: Any, file=None, **kwargs: Any) -> None:
"""
Print only if this is not a quiet mid-pipeline stage.
"""
try:
# Print if: not in a pipeline OR this is the last stage
ctx = _get_pipeline_state().current_context
should_print = (ctx is None) or (ctx and ctx.is_last_stage)
# Always print to stderr regardless
if file is not None:
should_print = True
if should_print:
log(*args, **kwargs) if file is None else log(*args, file=file, **kwargs)
except Exception:
logger.exception("Error in print_if_visible")
def store_value(key: str, value: Any) -> None:
"""
Store a value to pass to later pipeline stages.
"""
if not isinstance(key, str):
return
text = key.strip().lower()
if not text:
return
try:
state = _get_pipeline_state()
state.pipeline_values[text] = value
except Exception:
logger.exception("Failed to store pipeline value '%s'", key)
def load_value(key: str, default: Any = None) -> Any:
"""
Retrieve a value stored by an earlier pipeline stage.
"""
if not isinstance(key, str):
return default
text = key.strip()
if not text:
return default
parts = [segment.strip() for segment in text.split(".") if segment.strip()]
if not parts:
return default
root_key = parts[0].lower()
state = _get_pipeline_state()
container = state.pipeline_values.get(root_key, PIPELINE_MISSING)
if container is PIPELINE_MISSING:
return default
if len(parts) == 1:
return container
current: Any = container
for fragment in parts[1:]:
if isinstance(current, dict):
fragment_lower = fragment.lower()
if fragment in current:
current = current[fragment]
continue
match = PIPELINE_MISSING
for key_name, value in current.items():
if isinstance(key_name, str) and key_name.lower() == fragment_lower:
match = value
break
if match is PIPELINE_MISSING:
return default
current = match
continue
if isinstance(current, (list, tuple)):
if fragment.isdigit():
try:
idx = int(fragment)
except ValueError:
return default
if 0 <= idx < len(current):
current = current[idx]
continue
return default
if hasattr(current, fragment):
try:
current = getattr(current, fragment)
continue
except Exception:
return default
return default
return current
def set_pending_pipeline_tail(
stages: Optional[Sequence[Sequence[str]]],
source_command: Optional[str] = None
) -> None:
"""
Store the remaining pipeline stages when execution pauses for @N selection.
"""
state = _get_pipeline_state()
try:
pending: List[List[str]] = []
for stage in stages or []:
if isinstance(stage, (list, tuple)):
pending.append([str(token) for token in stage])
state.pending_pipeline_tail = pending
clean_source = (source_command or "").strip()
state.pending_pipeline_source = clean_source if clean_source else None
except Exception:
# Keep existing pending tail on failure
logger.exception("Failed to set pending pipeline tail; keeping existing pending tail")
def get_pending_pipeline_tail() -> List[List[str]]:
"""Get a copy of the pending pipeline tail (stages queued after selection)."""
state = _get_pipeline_state()
return [list(stage) for stage in state.pending_pipeline_tail]
def get_pending_pipeline_source() -> Optional[str]:
"""Get the source command associated with the pending pipeline tail."""
state = _get_pipeline_state()
return state.pending_pipeline_source
def clear_pending_pipeline_tail() -> None:
"""Clear any stored pending pipeline tail."""
state = _get_pipeline_state()
state.pending_pipeline_tail = []
state.pending_pipeline_source = None
def reset() -> None:
"""Reset all pipeline state. Called between pipeline executions."""
state = _get_pipeline_state()
state.reset()
def get_emitted_items() -> List[Any]:
"""
Get a copy of all items emitted by the current pipeline stage.
"""
state = _get_pipeline_state()
ctx = state.current_context
if ctx is not None:
return list(ctx.emits)
return []
def clear_emits() -> None:
"""Clear the emitted items list (called between stages)."""
state = _get_pipeline_state()
ctx = state.current_context
if ctx is not None:
ctx.emits.clear()
def set_last_selection(indices: Sequence[int]) -> None:
"""Record the indices selected via @ syntax for the next cmdlet.
Args:
indices: Iterable of 0-based indices captured from the REPL parser
"""
state = _get_pipeline_state()
state.last_selection = list(indices or [])
def get_last_selection() -> List[int]:
"""Return the indices selected via @ syntax for the current invocation."""
state = _get_pipeline_state()
return list(state.last_selection)
def clear_last_selection() -> None:
"""Clear the cached selection indices after a cmdlet finishes."""
state = _get_pipeline_state()
state.last_selection = []
def set_current_command_text(command_text: Optional[str]) -> None:
"""Record the raw pipeline/command text for downstream consumers."""
state = _get_pipeline_state()
state.pipeline_command_text = (command_text or "").strip()
def get_current_command_text(default: str = "") -> str:
"""Return the last recorded command/pipeline text."""
state = _get_pipeline_state()
text = state.pipeline_command_text.strip()
return text if text else default
def clear_current_command_text() -> None:
"""Clear the cached command text after execution completes."""
state = _get_pipeline_state()
state.pipeline_command_text = ""
def split_pipeline_text(pipeline_text: str) -> List[str]:
"""Split a pipeline string on unquoted '|' characters.
Preserves original quoting/spacing within each returned stage segment.
"""
text = str(pipeline_text or "")
if not text:
return []
stages: List[str] = []
buf: List[str] = []
quote: Optional[str] = None
escape = False
for ch in text:
if escape:
buf.append(ch)
escape = False
continue
if ch == "\\" and quote is not None:
buf.append(ch)
escape = True
continue
if ch in ('"', "'"):
if quote is None:
quote = ch
elif quote == ch:
quote = None
buf.append(ch)
continue
if ch == "|" and quote is None:
stages.append("".join(buf).strip())
buf = []
continue
buf.append(ch)
tail = "".join(buf).strip()
if tail:
stages.append(tail)
return [s for s in stages if s]
def get_current_command_stages() -> List[str]:
"""Return the raw stage segments for the current command text."""
return split_pipeline_text(get_current_command_text(""))
def set_current_stage_text(stage_text: Optional[str]) -> None:
"""Record the raw stage text currently being executed."""
state = _get_pipeline_state()
state.current_stage_text = str(stage_text or "").strip()
def get_current_stage_text(default: str = "") -> str:
"""Return the raw stage text currently being executed."""
state = _get_pipeline_state()
text = state.current_stage_text.strip()
return text if text else default
def clear_current_stage_text() -> None:
"""Clear the cached stage text after a stage completes."""
state = _get_pipeline_state()
state.current_stage_text = ""
def set_current_cmdlet_name(cmdlet_name: Optional[str]) -> None:
"""Record the currently executing cmdlet name (stage-local)."""
state = _get_pipeline_state()
state.current_cmdlet_name = str(cmdlet_name or "").strip()
def get_current_cmdlet_name(default: str = "") -> str:
"""Return the currently executing cmdlet name (stage-local)."""
state = _get_pipeline_state()
text = state.current_cmdlet_name.strip()
return text if text else default
def clear_current_cmdlet_name() -> None:
"""Clear the cached cmdlet name after a stage completes."""
state = _get_pipeline_state()
state.current_cmdlet_name = ""
def set_search_query(query: Optional[str]) -> None:
"""Set the last search query for refresh purposes."""
state = _get_pipeline_state()
state.last_search_query = query
def get_search_query() -> Optional[str]:
"""Get the last search query."""
state = _get_pipeline_state()
return state.last_search_query
def set_pipeline_refreshed(refreshed: bool) -> None:
"""Track whether the pipeline already refreshed results."""
state = _get_pipeline_state()
state.pipeline_refreshed = refreshed
def was_pipeline_refreshed() -> bool:
"""Check if the pipeline already refreshed results."""
state = _get_pipeline_state()
return state.pipeline_refreshed
def set_last_items(items: list) -> None:
"""Cache the last pipeline outputs."""
state = _get_pipeline_state()
state.last_items = list(items) if items else []
def get_last_items() -> List[Any]:
"""Get the last pipeline outputs."""
state = _get_pipeline_state()
return list(state.last_items)
def set_ui_library_refresh_callback(callback: Any) -> None:
"""
Set a callback to be called when library content is updated.
"""
state = _get_pipeline_state()
state.ui_library_refresh_callback = callback
def get_ui_library_refresh_callback() -> Optional[Any]:
"""Get the current library refresh callback."""
state = _get_pipeline_state()
return state.ui_library_refresh_callback
def trigger_ui_library_refresh(library_filter: str = "local") -> None:
"""Trigger a library refresh in the UI if callback is registered.
This should be called from cmdlet/funacts after content is added to library.
Args:
library_filter: Which library to refresh ('local', 'hydrus', etc)
"""
callback = get_ui_library_refresh_callback()
if callback:
try:
callback(library_filter)
except Exception as e:
print(
f"[trigger_ui_library_refresh] Error calling refresh callback: {e}",
file=sys.stderr
)
def set_last_result_table(
result_table: Optional[Any],
items: Optional[List[Any]] = None,
subject: Optional[Any] = None
) -> None:
"""
Store the last result table and items for @ selection syntax.
"""
state = _get_pipeline_state()
# Push current table to history before replacing
if state.last_result_table is not None:
state.result_table_history.append(
(
state.last_result_table,
state.last_result_items.copy(),
state.last_result_subject,
)
)
# Keep history size limited
if len(state.result_table_history) > MAX_RESULT_TABLE_HISTORY:
state.result_table_history.pop(0)
# Set new current table and clear any display items/table
state.display_items = []
state.display_table = None
state.display_subject = None
state.last_result_table = result_table
state.last_result_items = items or []
state.last_result_subject = subject
# Sort table by Title/Name column alphabetically if available
if (
result_table is not None
and hasattr(result_table, "sort_by_title")
and not getattr(result_table, "preserve_order", False)
):
try:
result_table.sort_by_title()
# Re-order items list to match the sorted table
if state.last_result_items and hasattr(result_table, "rows"):
sorted_items: List[Any] = []
for row in result_table.rows:
src_idx = getattr(row, "source_index", None)
if isinstance(src_idx, int) and 0 <= src_idx < len(state.last_result_items):
sorted_items.append(state.last_result_items[src_idx])
# Only reassign when the table actually contains rows and the reordering
# produced a complete mapping. Avoid clearing items when the table has no rows.
if result_table.rows and len(sorted_items) == len(result_table.rows):
state.last_result_items = sorted_items
except Exception:
logger.exception("Failed to sort result_table and reorder items")
if (
result_table is not None
and hasattr(result_table, "sort_by_title")
and not getattr(result_table, "preserve_order", False)
):
try:
result_table.sort_by_title()
# Re-order items list to match the sorted table
if state.display_items and hasattr(result_table, "rows"):
sorted_items: List[Any] = []
for row in result_table.rows:
src_idx = getattr(row, "source_index", None)
if isinstance(src_idx, int) and 0 <= src_idx < len(state.display_items):
sorted_items.append(state.display_items[src_idx])
if len(sorted_items) == len(result_table.rows):
state.display_items = sorted_items
except Exception:
logger.exception("Failed to sort overlay result_table and reorder items")
def set_last_result_items_only(items: Optional[List[Any]]) -> None:
"""
Store items for @N selection WITHOUT affecting history or saved search data.
"""
state = _get_pipeline_state()
# Store items for immediate @N selection, but DON'T modify last_result_items
# This ensures history contains original search data, not display transformations
state.display_items = items or []
# Clear display table since we're setting items only (CLI will generate table if needed)
state.display_table = None
state.display_subject = None
def restore_previous_result_table() -> bool:
"""
Restore the previous result table from history (for @.. navigation).
"""
state = _get_pipeline_state()
# If we have an active overlay (display items/table), clear it to "go back" to the underlying table
if state.display_items or state.display_table or state.display_subject is not None:
state.display_items = []
state.display_table = None
state.display_subject = None
# If an underlying table exists, we're done.
# Otherwise, fall through to history restore so @.. actually returns to the last table.
if state.last_result_table is not None:
# Ensure subsequent @N selection uses the table the user sees.
state.current_stage_table = state.last_result_table
return True
if not state.result_table_history:
state.current_stage_table = state.last_result_table
return True
if not state.result_table_history:
return False
# Save current state to forward stack before popping
state.result_table_forward.append(
(state.last_result_table, state.last_result_items, state.last_result_subject)
)
# Pop from history and restore
prev = state.result_table_history.pop()
if isinstance(prev, tuple) and len(prev) >= 3:
state.last_result_table, state.last_result_items, state.last_result_subject = prev[0], prev[1], prev[2]
elif isinstance(prev, tuple) and len(prev) == 2:
state.last_result_table, state.last_result_items = prev
state.last_result_subject = None
else:
state.last_result_table, state.last_result_items, state.last_result_subject = None, [], None
# Clear display items so get_last_result_items() falls back to restored items
state.display_items = []
state.display_table = None
state.display_subject = None
# Sync current stage table to the restored view so provider selectors run
# against the correct table type.
state.current_stage_table = state.last_result_table
try:
debug_table_state("restore_previous_result_table")
except Exception:
logger.exception("Failed to debug_table_state during restore_previous_result_table")
return True
def restore_next_result_table() -> bool:
"""
Restore the next result table from forward history (for @,, navigation).
"""
state = _get_pipeline_state()
# If we have an active overlay (display items/table), clear it to "go forward" to the underlying table
if state.display_items or state.display_table or state.display_subject is not None:
state.display_items = []
state.display_table = None
state.display_subject = None
# If an underlying table exists, we're done.
# Otherwise, fall through to forward restore when available.
if state.last_result_table is not None:
# Ensure subsequent @N selection uses the table the user sees.
state.current_stage_table = state.last_result_table
return True
if not state.result_table_forward:
state.current_stage_table = state.last_result_table
return True
if not state.result_table_forward:
return False
# Save current state to history stack before popping forward
state.result_table_history.append(
(state.last_result_table, state.last_result_items, state.last_result_subject)
)
# Pop from forward stack and restore
next_state = state.result_table_forward.pop()
if isinstance(next_state, tuple) and len(next_state) >= 3:
state.last_result_table, state.last_result_items, state.last_result_subject = (
next_state[0], next_state[1], next_state[2]
)
elif isinstance(next_state, tuple) and len(next_state) == 2:
state.last_result_table, state.last_result_items = next_state
state.last_result_subject = None
else:
state.last_result_table, state.last_result_items, state.last_result_subject = None, [], None
# Clear display items so get_last_result_items() falls back to restored items
state.display_items = []
state.display_table = None
state.display_subject = None
# Sync current stage table to the restored view so provider selectors run
# against the correct table type.
state.current_stage_table = state.last_result_table
try:
debug_table_state("restore_next_result_table")
except Exception:
logger.exception("Failed to debug_table_state during restore_next_result_table")
return True
def get_display_table() -> Optional[Any]:
"""
Get the current display overlay table.
"""
state = _get_pipeline_state()
return state.display_table
def get_last_result_subject() -> Optional[Any]:
"""
Get the subject associated with the current result table or overlay.
"""
state = _get_pipeline_state()
if state.display_subject is not None:
return state.display_subject
return state.last_result_subject
def get_last_result_table() -> Optional[Any]:
"""Get the current last result table.
Returns:
The ResultTable object, or None if no table is set
"""
state = _get_pipeline_state()
return state.last_result_table
def get_last_result_items() -> List[Any]:
"""
Get the items available for @N selection.
"""
state = _get_pipeline_state()
# Prioritize items from display commands (get-tag, delete-tag, etc.)
# These are available for immediate @N selection
if state.display_items:
if state.display_table is not None and not _is_selectable_table(state.display_table):
return []
return state.display_items
# Fall back to items from last search/selectable command
if state.last_result_table is None:
return state.last_result_items
if _is_selectable_table(state.last_result_table):
return state.last_result_items
return []
def debug_table_state(label: str = "") -> None:
"""Dump pipeline table and item-buffer state (debug-only).
Useful for diagnosing cases where `@N` selection appears to act on a different
table than the one currently displayed.
"""
if not is_debug_enabled():
return
state = _get_pipeline_state()
def _tbl(name: str, t: Any) -> None:
if t is None:
debug(f"[table] {name}: None")
return
try:
table_type = getattr(t, "table", None)
except Exception:
table_type = None
try:
title = getattr(t, "title", None)
except Exception:
title = None
try:
src_cmd = getattr(t, "source_command", None)
except Exception:
src_cmd = None
try:
src_args = getattr(t, "source_args", None)
except Exception:
src_args = None
try:
no_choice = bool(getattr(t, "no_choice", False))
except Exception:
no_choice = False
try:
preserve_order = bool(getattr(t, "preserve_order", False))
except Exception:
preserve_order = False
try:
row_count = len(getattr(t, "rows", []) or [])
except Exception:
row_count = 0
try:
meta = (
t.get_table_metadata() if hasattr(t, "get_table_metadata") else getattr(t, "table_metadata", None)
)
except Exception:
meta = None
meta_keys = list(meta.keys()) if isinstance(meta, dict) else []
debug(
f"[table] {name}: id={id(t)} class={type(t).__name__} title={repr(title)} table={repr(table_type)} rows={row_count} "
f"source={repr(src_cmd)} source_args={repr(src_args)} no_choice={no_choice} preserve_order={preserve_order} meta_keys={meta_keys}"
)
if label:
debug(f"[table] state: {label}")
_tbl("display_table", getattr(state, "display_table", None))
_tbl("current_stage_table", getattr(state, "current_stage_table", None))
_tbl("last_result_table", getattr(state, "last_result_table", None))
try:
debug(
f"[table] buffers: display_items={len(state.display_items or [])} last_result_items={len(state.last_result_items or [])} "
f"history={len(state.result_table_history or [])} forward={len(state.result_table_forward or [])} last_selection={list(state.last_selection or [])}"
)
except Exception:
logger.exception("Failed to debug_table_state buffers summary")
def get_last_selectable_result_items() -> List[Any]:
"""Get items from the last *selectable* result table, ignoring display-only items.
This is useful when a selection stage should target the last visible selectable table
(e.g., a playlist/search table), even if a prior action command emitted items and
populated display_items.
"""
state = _get_pipeline_state()
if state.last_result_table is None:
return list(state.last_result_items)
if _is_selectable_table(state.last_result_table):
return list(state.last_result_items)
return []
def get_last_result_table_source_command() -> Optional[str]:
"""Get the source command from the last displayed result table.
Returns:
Command name (e.g., 'download-file') or None if not set
"""
state = _get_pipeline_state()
table = state.last_result_table
if table is not None and _is_selectable_table(table) and hasattr(table, "source_command"):
return getattr(table, "source_command")
return None
def get_last_result_table_source_args() -> List[str]:
"""Get the base source arguments from the last displayed result table.
Returns:
List of arguments (e.g., ['https://example.com']) or empty list
"""
state = _get_pipeline_state()
table = state.last_result_table
if table is not None and _is_selectable_table(table) and hasattr(table, "source_args"):
return getattr(table, "source_args") or []
return []
def get_last_result_table_row_selection_args(row_index: int) -> Optional[List[str]]:
"""Get the selection arguments for a specific row in the last result table.
Args:
row_index: Index of the row (0-based)
Returns:
Selection arguments (e.g., ['-item', '3']) or None
"""
state = _get_pipeline_state()
table = state.last_result_table
if table is not None and _is_selectable_table(table) and hasattr(table, "rows"):
rows = table.rows
if 0 <= row_index < len(rows):
row = rows[row_index]
if hasattr(row, "selection_args"):
return getattr(row, "selection_args")
return None
def get_last_result_table_row_selection_action(row_index: int) -> Optional[List[str]]:
"""Get the expanded stage tokens for a row in the last result table."""
state = _get_pipeline_state()
table = state.last_result_table
if table is not None and _is_selectable_table(table) and hasattr(table, "rows"):
rows = table.rows
if 0 <= row_index < len(rows):
row = rows[row_index]
if hasattr(row, "selection_action"):
return getattr(row, "selection_action")
return None
def set_current_stage_table(result_table: Optional[Any]) -> None:
"""Store the current pipeline stage table for @N expansion.
Used by cmdlet that display tabular results (e.g., download-file listing formats)
to make their result table available for @N expansion logic.
Does NOT push to history - purely for command expansion in the current pipeline.
Args:
result_table: The ResultTable object (or None to clear)
"""
state = _get_pipeline_state()
state.current_stage_table = result_table
def get_current_stage_table() -> Optional[Any]:
"""Get the current pipeline stage table (if any)."""
state = _get_pipeline_state()
return state.current_stage_table
def get_current_stage_table_source_command() -> Optional[str]:
"""Get the source command from the current pipeline stage table.
Returns:
Command name (e.g., 'download-file') or None
"""
state = _get_pipeline_state()
table = state.current_stage_table
if table is not None and _is_selectable_table(table) and hasattr(table, "source_command"):
return getattr(table, "source_command")
return None
def get_current_stage_table_source_args() -> List[str]:
"""Get the source arguments from the current pipeline stage table.
Returns:
List of arguments or empty list
"""
state = _get_pipeline_state()
table = state.current_stage_table
if table is not None and _is_selectable_table(table) and hasattr(table, "source_args"):
return getattr(table, "source_args") or []
return []
def get_current_stage_table_row_selection_args(row_index: int) -> Optional[List[str]]:
"""Get the selection arguments for a row in the current pipeline stage table.
Args:
row_index: Index of the row (0-based)
Returns:
Selection arguments or None
"""
state = _get_pipeline_state()
table = state.current_stage_table
if table is not None and _is_selectable_table(table) and hasattr(table, "rows"):
rows = table.rows
if 0 <= row_index < len(rows):
row = rows[row_index]
if hasattr(row, "selection_args"):
return getattr(row, "selection_args")
return None
def get_current_stage_table_row_selection_action(row_index: int) -> Optional[List[str]]:
"""Get the expanded stage tokens for a row in the current stage table."""
state = _get_pipeline_state()
table = state.current_stage_table
if table is not None and _is_selectable_table(table) and hasattr(table, "rows"):
rows = table.rows
if 0 <= row_index < len(rows):
row = rows[row_index]
if hasattr(row, "selection_action"):
return getattr(row, "selection_action")
return None
def get_current_stage_table_row_source_index(row_index: int) -> Optional[int]:
"""Get the original source index for a row in the current stage table.
Useful when the table has been sorted for display but selections should map
back to the original item order (e.g., playlist or provider order).
"""
state = _get_pipeline_state()
table = state.current_stage_table
if table is not None and _is_selectable_table(table) and hasattr(table, "rows"):
rows = table.rows
if 0 <= row_index < len(rows):
row = rows[row_index]
return getattr(row, "source_index", None)
return None
def clear_last_result() -> None:
"""Clear the stored last result table and items."""
state = _get_pipeline_state()
state.last_result_table = None
state.last_result_items = []
state.last_result_subject = None
def _split_pipeline_tokens(tokens: Sequence[str]) -> List[List[str]]:
"""Split example tokens into per-stage command sequences using pipe separators."""
stages: List[List[str]] = []
current: List[str] = []
for token in tokens:
if token == "|":
if current:
stages.append(current)
current = []
continue
current.append(str(token))
if current:
stages.append(current)
return [stage for stage in stages if stage]
class PipelineExecutor:
def __init__(self, *, config_loader: Optional[Any] = None) -> None:
self._config_loader = config_loader
self._toolbar_output: Optional[Callable[[str], None]] = None
def _load_config(self) -> Dict[str, Any]:
try:
if self._config_loader is not None:
return self._config_loader.load()
except Exception:
logger.exception("Failed to use config_loader.load(); falling back to SYS.config.load_config")
try:
from SYS.config import load_config
return load_config()
except Exception:
return {}
def set_toolbar_output(self, output: Optional[Callable[[str], None]]) -> None:
self._toolbar_output = output
@staticmethod
def _split_stages(tokens: Sequence[str]) -> List[List[str]]:
stages: List[List[str]] = []
current: List[str] = []
for token in tokens:
if token == "|":
if current:
stages.append(current)
current = []
else:
current.append(token)
if current:
stages.append(current)
return stages
@staticmethod
def _validate_download_file_relationship_order(stages: List[List[str]]) -> bool:
"""Guard against running add-relationship on unstored download-file results.
Intended UX:
download-file ... | add-file -store <store> | add-relationship
Rationale:
download-file outputs items that may not yet have a stable store+hash.
add-relationship is designed to operate in store/hash mode.
"""
def _norm(name: str) -> str:
return str(name or "").replace("_", "-").strip().lower()
names: List[str] = []
for stage in stages or []:
if not stage:
continue
names.append(_norm(stage[0]))
dl_idxs = [i for i, n in enumerate(names) if n == "download-file"]
rel_idxs = [i for i, n in enumerate(names) if n == "add-relationship"]
add_file_idxs = [i for i, n in enumerate(names) if n == "add-file"]
if not dl_idxs or not rel_idxs:
return True
# If download-file is upstream of add-relationship, require an add-file in between.
for rel_i in rel_idxs:
dl_before = [d for d in dl_idxs if d < rel_i]
if not dl_before:
continue
dl_i = max(dl_before)
if not any(dl_i < a < rel_i for a in add_file_idxs):
print(
"Pipeline order error: when using download-file with add-relationship, "
"add-relationship must come after add-file (so items are stored and have store+hash).\n"
"Example: download-file <...> | add-file -store <store> | add-relationship\n"
)
return False
return True
@staticmethod
def _try_clear_pipeline_stop(ctx: Any) -> None:
try:
if hasattr(ctx, "clear_pipeline_stop"):
ctx.clear_pipeline_stop()
except Exception:
logger.exception("Failed to clear pipeline stop via ctx.clear_pipeline_stop")
@staticmethod
def _maybe_seed_current_stage_table(ctx: Any) -> None:
try:
if hasattr(ctx,
"get_current_stage_table") and not ctx.get_current_stage_table():
display_table = (
ctx.get_display_table() if hasattr(ctx,
"get_display_table") else None
)
if display_table:
ctx.set_current_stage_table(display_table)
else:
last_table = (
ctx.get_last_result_table()
if hasattr(ctx,
"get_last_result_table") else None
)
if last_table:
ctx.set_current_stage_table(last_table)
except Exception:
logger.exception("Failed to seed current_stage_table from display or last table")
@staticmethod
def _maybe_apply_pending_pipeline_tail(ctx: Any,
stages: List[List[str]]) -> List[List[str]]:
try:
pending_tail = (
ctx.get_pending_pipeline_tail()
if hasattr(ctx,
"get_pending_pipeline_tail") else []
)
pending_source = (
ctx.get_pending_pipeline_source()
if hasattr(ctx,
"get_pending_pipeline_source") else None
)
except Exception:
pending_tail = []
pending_source = None
try:
current_source = (
ctx.get_current_stage_table_source_command()
if hasattr(ctx,
"get_current_stage_table_source_command") else None
)
except Exception:
current_source = None
try:
effective_source = current_source or (
ctx.get_last_result_table_source_command()
if hasattr(ctx,
"get_last_result_table_source_command") else None
)
except Exception:
effective_source = current_source
selection_start = bool(
stages and stages[0] and stages[0][0].startswith("@")
)
def _tail_is_suffix(existing: List[List[str]], tail: List[List[str]]) -> bool:
if not tail or not existing:
return False
if len(tail) > len(existing):
return False
return existing[-len(tail):] == tail
if pending_tail and selection_start:
if (pending_source is None) or (effective_source
and pending_source == effective_source):
# Only append the pending tail if the user hasn't already provided it.
if not _tail_is_suffix(stages, pending_tail):
stages = list(stages) + list(pending_tail)
try:
if hasattr(ctx, "clear_pending_pipeline_tail"):
ctx.clear_pending_pipeline_tail()
except Exception:
logger.exception("Failed to clear pending pipeline tail after appending pending tail")
else:
try:
if hasattr(ctx, "clear_pending_pipeline_tail"):
ctx.clear_pending_pipeline_tail()
except Exception:
logger.exception("Failed to clear pending pipeline tail (source mismatch branch)")
return stages
def _apply_quiet_background_flag(self, config: Any) -> Any:
if isinstance(config, dict):
# This executor is used by both the REPL and the `pipeline` subcommand.
# Quiet/background mode is helpful for detached/background runners, but
# it suppresses interactive UX (like the pipeline Live progress UI).
config["_quiet_background_output"] = bool(self._toolbar_output is None)
return config
@staticmethod
def _extract_first_stage_selection_tokens(
stages: List[List[str]],
) -> tuple[List[List[str]],
List[int],
bool,
bool]:
first_stage_tokens = stages[0] if stages else []
first_stage_selection_indices: List[int] = []
first_stage_had_extra_args = False
first_stage_select_all = False
if first_stage_tokens:
new_first_stage: List[str] = []
for token in first_stage_tokens:
if token.startswith("@"): # selection
selection = SelectionSyntax.parse(token)
if selection is not None:
first_stage_selection_indices = sorted(
[i - 1 for i in selection]
)
continue
if token == "@*":
first_stage_select_all = True
continue
new_first_stage.append(token)
if new_first_stage:
stages = list(stages)
stages[0] = new_first_stage
if first_stage_selection_indices or first_stage_select_all:
first_stage_had_extra_args = True
elif first_stage_selection_indices or first_stage_select_all:
stages = list(stages)
stages.pop(0)
return (
stages,
first_stage_selection_indices,
first_stage_had_extra_args,
first_stage_select_all,
)
@staticmethod
def _apply_select_all_if_requested(ctx: Any,
indices: List[int],
select_all: bool) -> List[int]:
if not select_all:
return indices
try:
last_items = ctx.get_last_result_items()
except Exception:
last_items = None
if last_items:
return list(range(len(last_items)))
return indices
@staticmethod
def _maybe_run_class_selector(
ctx: Any,
config: Any,
selected_items: list,
*,
stage_is_last: bool
) -> bool:
if not stage_is_last:
return False
candidates: list[str] = []
seen: set[str] = set()
def _add(value) -> None:
try:
text = str(value or "").strip().lower()
except Exception:
return
if not text or text in seen:
return
seen.add(text)
candidates.append(text)
try:
current_table = ctx.get_current_stage_table() or ctx.get_last_result_table()
_add(
current_table.
table if current_table and hasattr(current_table,
"table") else None
)
# Prefer an explicit provider hint from table metadata when available.
# This keeps @N selectors working even when row payloads don't carry a
# provider key (or when they carry a table-type like tidal.album).
try:
meta = (
current_table.get_table_metadata()
if current_table is not None and hasattr(current_table, "get_table_metadata")
else getattr(current_table, "table_metadata", None)
)
except Exception:
meta = None
if isinstance(meta, dict):
_add(meta.get("provider"))
except Exception:
logger.exception("Failed to inspect current_table/table metadata in _maybe_run_class_selector")
for item in selected_items or []:
if isinstance(item, dict):
_add(item.get("provider"))
_add(item.get("store"))
_add(item.get("table"))
else:
_add(getattr(item, "provider", None))
_add(getattr(item, "store", None))
_add(getattr(item, "table", None))
try:
from ProviderCore.registry import get_provider, is_known_provider_name
except Exception:
get_provider = None # type: ignore
is_known_provider_name = None # type: ignore
# If we have a table-type like "tidal.album", also try its provider prefix ("tidal")
# when that prefix is a registered provider name.
if is_known_provider_name is not None:
try:
for key in list(candidates):
if not isinstance(key, str):
continue
if "." not in key:
continue
if is_known_provider_name(key):
continue
prefix = str(key).split(".", 1)[0].strip().lower()
if prefix and is_known_provider_name(prefix):
_add(prefix)
except Exception:
logger.exception("Failed while computing provider prefix heuristics in _maybe_run_class_selector")
if get_provider is not None:
for key in candidates:
try:
if is_known_provider_name is not None and (
not is_known_provider_name(key)):
continue
except Exception:
# If the predicate fails for any reason, fall back to legacy behavior.
logger.exception("is_known_provider_name predicate failed for key %s; falling back", key)
try:
provider = get_provider(key, config)
except Exception:
continue
selector = getattr(provider, "selector", None)
if selector is None:
continue
try:
handled = bool(
selector(selected_items,
ctx=ctx,
stage_is_last=True)
)
except Exception as exc:
print(f"{key} selector failed: {exc}\n")
return True
if handled:
return True
store_keys: list[str] = []
for item in selected_items or []:
if isinstance(item, dict):
v = item.get("store")
else:
v = getattr(item, "store", None)
name = str(v or "").strip()
if name:
store_keys.append(name)
if store_keys:
try:
from Store.registry import Store as StoreRegistry
store_registry = StoreRegistry(config, suppress_debug=True)
_backend_names = list(store_registry.list_backends() or [])
_backend_by_lower = {
str(n).lower(): str(n)
for n in _backend_names if str(n).strip()
}
for name in store_keys:
resolved_name = name
if not store_registry.is_available(resolved_name):
resolved_name = _backend_by_lower.get(str(name).lower(), name)
if not store_registry.is_available(resolved_name):
continue
backend = store_registry[resolved_name]
selector = getattr(backend, "selector", None)
if selector is None:
continue
handled = bool(
selector(selected_items,
ctx=ctx,
stage_is_last=True)
)
if handled:
return True
except Exception:
logger.exception("Failed while running store-based selector logic in _maybe_run_class_selector")
return False
@staticmethod
def _summarize_stage_text(stage_tokens: Sequence[str], limit: int = 140) -> str:
combined = " ".join(str(tok) for tok in stage_tokens if tok is not None).strip()
if not combined:
return ""
normalized = re.sub(r"\s+", " ", combined)
if len(normalized) <= limit:
return normalized
return normalized[:limit - 3].rstrip() + "..."
@staticmethod
def _log_pipeline_event(
worker_manager: Any,
worker_id: Optional[str],
message: str,
) -> None:
if not worker_manager or not worker_id or not message:
return
try:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
except Exception:
timestamp = ""
if timestamp:
text = f"{timestamp} - PIPELINE - {message}"
else:
text = f"PIPELINE - {message}"
try:
worker_manager.append_stdout(worker_id, text + "\n", channel="log")
except Exception:
logger.exception("Failed to append pipeline event to worker stdout for %s", worker_id)
@staticmethod
def _maybe_open_url_selection(
current_table: Any,
selected_items: list,
*,
stage_is_last: bool
) -> bool:
if not stage_is_last:
return False
if not selected_items or len(selected_items) != 1:
return False
table_type = ""
source_cmd = ""
try:
table_type = str(getattr(current_table, "table", "") or "").strip().lower()
except Exception:
table_type = ""
try:
source_cmd = (
str(getattr(current_table,
"source_command",
"") or "").strip().replace("_",
"-").lower()
)
except Exception:
source_cmd = ""
if table_type != "url" and source_cmd != "get-url":
return False
item = selected_items[0]
url = None
try:
from cmdlet._shared import get_field
url = get_field(item, "url")
except Exception:
try:
url = item.get("url") if isinstance(item,
dict
) else getattr(item,
"url",
None)
except Exception:
url = None
url_text = str(url or "").strip()
if not url_text:
return False
try:
import webbrowser
webbrowser.open(url_text, new=2)
return True
except Exception:
return False
def _maybe_enable_background_notifier(
self,
worker_manager: Any,
config: Any,
pipeline_session: Any
) -> None:
if not (pipeline_session and worker_manager and isinstance(config, dict)):
return
session_worker_ids = config.get("_session_worker_ids")
if not session_worker_ids:
return
try:
output_fn = self._toolbar_output
quiet_mode = bool(config.get("_quiet_background_output"))
terminal_only = quiet_mode and not output_fn
kwargs: Dict[str,
Any] = {
"session_worker_ids": session_worker_ids,
"only_terminal_updates": terminal_only,
"overlay_mode": bool(output_fn),
}
if output_fn:
kwargs["output"] = output_fn
ensure_background_notifier(worker_manager, **kwargs)
except Exception:
logger.exception("Failed to enable background notifier for session_worker_ids=%r", session_worker_ids)
@staticmethod
def _get_raw_stage_texts(ctx: Any) -> List[str]:
raw_stage_texts: List[str] = []
try:
if hasattr(ctx, "get_current_command_stages"):
raw_stage_texts = ctx.get_current_command_stages() or []
except Exception:
raw_stage_texts = []
return raw_stage_texts
def _maybe_apply_initial_selection(
self,
ctx: Any,
config: Any,
stages: List[List[str]],
*,
selection_indices: List[int],
first_stage_had_extra_args: bool,
worker_manager: Any,
pipeline_session: Any,
) -> tuple[bool,
Any]:
if not selection_indices:
return True, None
# Selection should operate on the *currently displayed* selectable table.
# Some navigation flows (e.g. @.. back) can show a display table without
# updating current_stage_table. Provider selectors rely on current_stage_table
# to detect table type (e.g. tidal.album -> tracks), so sync it here.
display_table = None
try:
display_table = (
ctx.get_display_table() if hasattr(ctx, "get_display_table") else None
)
except Exception:
display_table = None
current_stage_table = None
try:
current_stage_table = (
ctx.get_current_stage_table()
if hasattr(ctx, "get_current_stage_table") else None
)
except Exception:
current_stage_table = None
try:
if display_table is not None and hasattr(ctx, "set_current_stage_table"):
ctx.set_current_stage_table(display_table)
elif current_stage_table is None and hasattr(ctx, "set_current_stage_table"):
last_table = (
ctx.get_last_result_table()
if hasattr(ctx, "get_last_result_table") else None
)
if last_table is not None:
ctx.set_current_stage_table(last_table)
except Exception:
logger.exception("Failed to sync current_stage_table from display/last table in _maybe_apply_initial_selection")
source_cmd = None
source_args_raw = None
try:
source_cmd = ctx.get_current_stage_table_source_command()
source_args_raw = ctx.get_current_stage_table_source_args()
except Exception:
source_cmd = None
source_args_raw = None
if isinstance(source_args_raw, str):
source_args: List[str] = [source_args_raw]
elif isinstance(source_args_raw, list):
source_args = [str(x) for x in source_args_raw if x is not None]
else:
source_args = []
current_table = None
try:
current_table = ctx.get_current_stage_table()
except Exception:
current_table = None
table_type = (
current_table.table if current_table and hasattr(current_table,
"table") else None
)
command_expanded = False
example_selector_triggered = False
normalized_source_cmd = str(source_cmd or "").replace("_", "-").strip().lower()
if normalized_source_cmd in HELP_EXAMPLE_SOURCE_COMMANDS and selection_indices:
try:
idx = selection_indices[0]
row_args = ctx.get_current_stage_table_row_selection_args(idx)
except Exception:
row_args = None
tokens: List[str] = []
if isinstance(row_args, list) and row_args:
tokens = [str(x) for x in row_args if x is not None]
if tokens:
stage_groups = _split_pipeline_tokens(tokens)
if stage_groups:
for stage in reversed(stage_groups):
stages.insert(0, stage)
selection_indices = []
command_expanded = True
example_selector_triggered = True
if not example_selector_triggered:
if table_type in {"youtube",
"soulseek"}:
command_expanded = False
elif source_cmd == "search-file" and source_args and "youtube" in source_args:
command_expanded = False
else:
selected_row_args: List[str] = []
skip_pipe_expansion = source_cmd in {".pipe", ".mpv"} and len(stages) > 0
# Command expansion via @N:
# - Default behavior: expand ONLY for single-row selections.
# - Special case: allow multi-row expansion for add-file directory tables by
# combining selected rows into a single `-path file1,file2,...` argument.
if source_cmd and not skip_pipe_expansion:
src = str(source_cmd).replace("_", "-").strip().lower()
if src == "add-file" and selection_indices:
row_args_list: List[List[str]] = []
for idx in selection_indices:
try:
row_args = ctx.get_current_stage_table_row_selection_args(
idx
)
except Exception:
row_args = None
if isinstance(row_args, list) and row_args:
row_args_list.append(
[str(x) for x in row_args if x is not None]
)
# Combine `['-path', <file>]` from each row into one `-path` arg.
paths: List[str] = []
can_merge = bool(row_args_list) and (
len(row_args_list) == len(selection_indices)
)
if can_merge:
for ra in row_args_list:
if len(ra) == 2 and str(ra[0]).strip().lower() in {
"-path",
"--path",
"-p",
}:
p = str(ra[1]).strip()
if p:
paths.append(p)
else:
can_merge = False
break
if can_merge and paths:
selected_row_args.extend(["-path", ",".join(paths)])
elif len(selection_indices) == 1 and row_args_list:
selected_row_args.extend(row_args_list[0])
else:
# Only perform @N command expansion for *single-item* selections.
# For multi-item selections (e.g. @*, @1-5), expanding to one row
# would silently drop items. In those cases we pipe items downstream.
if len(selection_indices) == 1:
idx = selection_indices[0]
row_args = ctx.get_current_stage_table_row_selection_args(idx)
if row_args:
selected_row_args.extend(row_args)
if selected_row_args:
if isinstance(source_cmd, list):
cmd_list: List[str] = [str(x) for x in source_cmd if x is not None]
elif isinstance(source_cmd, str):
cmd_list = [source_cmd]
else:
cmd_list = []
# IMPORTANT: Put selected row args *before* source_args.
# Rationale: The cmdlet argument parser treats the *first* unknown
# token as a positional value (e.g., URL). If `source_args`
# contain unknown flags (like -provider which download-file does
# not declare), they could be misinterpreted as the positional
# URL argument and cause attempts to download strings like
# "-provider" (which is invalid). By placing selection args
# first we ensure the intended URL/selection token is parsed
# as the positional URL and avoid this class of parsing errors.
expanded_stage: List[str] = cmd_list + selected_row_args + source_args
if first_stage_had_extra_args and stages:
expanded_stage += stages[0]
stages[0] = expanded_stage
else:
stages.insert(0, expanded_stage)
if pipeline_session and worker_manager:
try:
worker_manager.log_step(
pipeline_session.worker_id,
f"@N expansion: {source_cmd} + selected_args={selected_row_args} + source_args={source_args}",
)
except Exception:
logger.exception("Failed to record pipeline log step for @N expansion (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
stage_table = None
try:
stage_table = ctx.get_current_stage_table()
except Exception:
stage_table = None
display_table = None
try:
display_table = (
ctx.get_display_table() if hasattr(ctx,
"get_display_table") else None
)
except Exception:
display_table = None
if not stage_table and display_table is not None:
stage_table = display_table
if not stage_table:
try:
stage_table = ctx.get_last_result_table()
except Exception:
stage_table = None
# Prefer selecting from the last selectable *table* (search/playlist)
# rather than from display-only emitted items, unless we're explicitly
# selecting from an overlay table.
try:
if display_table is not None and stage_table is display_table:
items_list = ctx.get_last_result_items() or []
else:
if hasattr(ctx, "get_last_selectable_result_items"):
items_list = ctx.get_last_selectable_result_items() or []
else:
items_list = ctx.get_last_result_items() or []
except Exception:
items_list = []
resolved_items = items_list if items_list else []
if items_list:
filtered = [
resolved_items[i] for i in selection_indices
if 0 <= i < len(resolved_items)
]
if not filtered:
print("No items matched selection in pipeline\n")
return False, None
# Provider selection expansion (non-terminal): allow certain provider tables
# (e.g. tidal.album) to expand to multiple downstream items when the user
# pipes into another stage (e.g. @N | .mpv or @N | add-file).
table_type_hint = None
try:
table_type_hint = (
stage_table.table
if stage_table is not None and hasattr(stage_table, "table")
else None
)
except Exception:
table_type_hint = None
if stages and isinstance(table_type_hint, str) and table_type_hint.strip().lower() == "tidal.album":
try:
from ProviderCore.registry import get_provider
prov = get_provider("tidal", config)
except Exception:
prov = None
if prov is not None and hasattr(prov, "_extract_album_selection_context") and hasattr(prov, "_tracks_for_album"):
try:
album_contexts = prov._extract_album_selection_context(filtered) # type: ignore[attr-defined]
except Exception:
album_contexts = []
track_items: List[Any] = []
seen_track_ids: set[int] = set()
for album_id, album_title, artist_name in album_contexts or []:
try:
track_results = prov._tracks_for_album( # type: ignore[attr-defined]
album_id=album_id,
album_title=album_title,
artist_name=artist_name,
limit=500,
)
except Exception:
track_results = []
for tr in track_results or []:
try:
md = getattr(tr, "full_metadata", None)
tid = None
if isinstance(md, dict):
raw_id = md.get("trackId") or md.get("id")
try:
tid = int(raw_id) if raw_id is not None else None
except Exception:
tid = None
if tid is not None:
if tid in seen_track_ids:
continue
seen_track_ids.add(tid)
except Exception:
logger.exception("Failed to extract/parse track metadata in album processing")
track_items.append(tr)
if track_items:
filtered = track_items
table_type_hint = "tidal.track"
if PipelineExecutor._maybe_run_class_selector(
ctx,
config,
filtered,
stage_is_last=(not stages)):
return False, None
from cmdlet._shared import coerce_to_pipe_object
filtered_pipe_objs = [coerce_to_pipe_object(item) for item in filtered]
piped_result = (
filtered_pipe_objs
if len(filtered_pipe_objs) > 1 else filtered_pipe_objs[0]
)
if pipeline_session and worker_manager:
try:
selection_parts = [f"@{i+1}" for i in selection_indices]
worker_manager.log_step(
pipeline_session.worker_id,
f"Applied @N selection {' | '.join(selection_parts)}",
)
except Exception:
logger.exception("Failed to record Applied @N selection log step (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
# Auto-insert downloader stages for provider tables.
try:
current_table = ctx.get_current_stage_table()
if current_table is None and hasattr(ctx, "get_display_table"):
current_table = ctx.get_display_table()
if current_table is None:
current_table = ctx.get_last_result_table()
except Exception:
logger.exception("Failed to determine current_table for selection auto-insert; defaulting to None")
current_table = None
table_type = None
try:
if isinstance(table_type_hint, str) and table_type_hint.strip():
table_type = table_type_hint
else:
table_type = (
current_table.table
if current_table and hasattr(current_table, "table") else None
)
except Exception:
logger.exception("Failed to compute table_type from current_table; using fallback attribute access")
table_type = (
current_table.table
if current_table and hasattr(current_table, "table") else None
)
def _norm_cmd(name: Any) -> str:
return str(name or "").replace("_", "-").strip().lower()
auto_stage = None
if isinstance(table_type, str) and table_type:
try:
from ProviderCore.registry import selection_auto_stage_for_table
auto_stage = selection_auto_stage_for_table(table_type)
except Exception:
auto_stage = None
source_cmd_for_selection = None
source_args_for_selection: List[str] = []
try:
source_cmd_for_selection = (
ctx.get_current_stage_table_source_command()
or ctx.get_last_result_table_source_command()
)
source_args_for_selection = (
ctx.get_current_stage_table_source_args()
or ctx.get_last_result_table_source_args()
or []
)
except Exception:
source_cmd_for_selection = None
source_args_for_selection = []
if not stages and selection_indices and source_cmd_for_selection:
src_norm = _norm_cmd(source_cmd_for_selection)
if src_norm in {".worker", "worker", "workers"}:
if len(selection_indices) == 1:
idx = selection_indices[0]
row_args = None
try:
row_args = ctx.get_current_stage_table_row_selection_args(idx)
except Exception:
row_args = None
if not row_args:
try:
row_args = ctx.get_last_result_table_row_selection_args(idx)
except Exception:
row_args = None
if not row_args:
try:
items = ctx.get_last_result_items() or []
if 0 <= idx < len(items):
maybe = items[idx]
if isinstance(maybe, dict):
candidate = maybe.get("_selection_args")
if isinstance(candidate, (list, tuple)):
row_args = [str(x) for x in candidate if x is not None]
except Exception:
row_args = row_args or None
if row_args:
stages.append(
[str(source_cmd_for_selection)]
+ [str(x) for x in row_args if x is not None]
+ [str(x) for x in source_args_for_selection if x is not None]
)
def _apply_row_action_to_stage(stage_idx: int) -> bool:
if not selection_indices or len(selection_indices) != 1:
return False
try:
row_action = ctx.get_current_stage_table_row_selection_action(
selection_indices[0]
)
except Exception:
row_action = None
if not row_action:
# Fallback to serialized payload when the table row is unavailable
try:
items = ctx.get_last_result_items() or []
if 0 <= selection_indices[0] < len(items):
maybe = items[selection_indices[0]]
if isinstance(maybe, dict):
candidate = maybe.get("_selection_action")
if isinstance(candidate, (list, tuple)):
row_action = [str(x) for x in candidate if x is not None]
debug(f"@N row {selection_indices[0]} restored action from payload: {row_action}")
except Exception:
row_action = row_action or None
if not row_action:
debug(f"@N row {selection_indices[0]} has no selection_action")
return False
normalized = [str(x) for x in row_action if x is not None]
if not normalized:
return False
debug(f"Applying row action for row {selection_indices[0]} -> {normalized}")
if 0 <= stage_idx < len(stages):
debug(f"Replacing stage {stage_idx} {stages[stage_idx]} with row action {normalized}")
stages[stage_idx] = normalized
return True
return False
if not stages:
if isinstance(table_type, str) and table_type.startswith("metadata."):
print("Auto-applying metadata selection via get-tag")
stages.append(["get-tag"])
elif auto_stage:
try:
print(f"Auto-running selection via {auto_stage[0]}")
except Exception:
logger.exception("Failed to print auto-run selection message for %s", auto_stage[0])
# Append the auto stage now. If the user also provided a selection
# (e.g., @1 | add-file ...), we want to attach the row selection
# args *to the auto-inserted stage* so the download command receives
# the selected row information immediately.
stages.append(list(auto_stage))
debug(f"Inserted auto stage before row action: {stages[-1]}")
# If the caller included a selection (e.g., @1) try to attach
# the selection args immediately to the inserted auto stage so
# the expansion is effective in a single pass.
if selection_indices:
try:
if not _apply_row_action_to_stage(len(stages) - 1):
# Only support single-row selection for auto-attach here
if len(selection_indices) == 1:
idx = selection_indices[0]
row_args = ctx.get_current_stage_table_row_selection_args(idx)
if not row_args:
try:
items = ctx.get_last_result_items() or []
if 0 <= idx < len(items):
maybe = items[idx]
if isinstance(maybe, dict):
candidate = maybe.get("_selection_args")
if isinstance(candidate, (list, tuple)):
row_args = [str(x) for x in candidate if x is not None]
except Exception:
row_args = row_args or None
if row_args:
# Place selection args before any existing source args
inserted = stages[-1]
if inserted:
cmd = inserted[0]
tail = [str(x) for x in inserted[1:]]
stages[-1] = [cmd] + [str(x) for x in row_args] + tail
except Exception:
logger.exception("Failed to attach selection args to auto-inserted stage")
# If no auto stage inserted and there are selection-action tokens available
# for the single selected row, apply it as the pipeline stage so a bare
# `@N` runs the intended action (e.g., get-file for hash-backed rows).
if not stages and selection_indices and len(selection_indices) == 1:
try:
idx = selection_indices[0]
debug(f"@N initial selection idx={idx} last_items={len(ctx.get_last_result_items() or [])}")
row_action = None
try:
row_action = ctx.get_current_stage_table_row_selection_action(idx)
except Exception:
logger.exception("Failed to get current_stage_table row selection action for idx %s", idx)
row_action = None
if not row_action:
try:
items = ctx.get_last_result_items() or []
if 0 <= idx < len(items):
maybe = items[idx]
try:
if isinstance(maybe, dict):
debug(f"@N payload: hash={maybe.get('hash')} store={maybe.get('store')} _selection_args={maybe.get('_selection_args')} _selection_action={maybe.get('_selection_action')}")
else:
debug(f"@N payload object type: {type(maybe).__name__}")
except Exception:
logger.exception("Failed to debug selection payload for index %s", idx)
if isinstance(maybe, dict):
candidate = maybe.get("_selection_action")
if isinstance(candidate, (list, tuple)):
row_action = [str(x) for x in candidate if x is not None]
except Exception:
row_action = None
if row_action:
debug(f"@N applying row action -> {row_action}")
stages.append(row_action)
if pipeline_session and worker_manager:
try:
worker_manager.log_step(
pipeline_session.worker_id,
f"@N applied row action -> {' '.join(row_action)}",
)
except Exception:
logger.exception("Failed to record pipeline log step for applied row action (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
except Exception:
logger.exception("Failed to apply single-row selection action")
stages.append(row_action)
if pipeline_session and worker_manager:
try:
worker_manager.log_step(
pipeline_session.worker_id,
f"@N applied row action -> {' '.join(row_action)}",
)
except Exception:
logger.exception("Failed to record pipeline log step for applied row action (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
else:
first_cmd = stages[0][0] if stages and stages[0] else None
if isinstance(table_type, str) and table_type.startswith("metadata.") and first_cmd not in (
"get-tag",
"get_tag",
".pipe",
".mpv",
):
print("Auto-inserting get-tag after metadata selection")
stages.insert(0, ["get-tag"])
elif auto_stage:
first_cmd_norm = _norm_cmd(first_cmd)
auto_cmd_norm = _norm_cmd(auto_stage[0])
if first_cmd_norm not in (auto_cmd_norm, ".pipe", ".mpv"):
debug(f"Auto-inserting {auto_cmd_norm} after selection")
# Insert the auto stage before the user-specified stage
# Note: Do NOT append source_args here - they are search tokens from
# the previous stage and should not be passed to the downloader.
stages.insert(0, list(auto_stage))
debug(f"Inserted auto stage before existing pipeline: {stages[0]}")
# If a selection is present, attach the row selection args to the
# newly-inserted stage so the download stage runs with the
# selected row information.
if selection_indices:
try:
if not _apply_row_action_to_stage(0):
if len(selection_indices) == 1:
idx = selection_indices[0]
row_args = ctx.get_current_stage_table_row_selection_args(idx)
if not row_args:
try:
items = ctx.get_last_result_items() or []
if 0 <= idx < len(items):
maybe = items[idx]
if isinstance(maybe, dict):
candidate = maybe.get("_selection_args")
if isinstance(candidate, (list, tuple)):
row_args = [str(x) for x in candidate if x is not None]
except Exception:
row_args = row_args or None
if row_args:
inserted = stages[0]
if inserted:
cmd = inserted[0]
tail = [str(x) for x in inserted[1:]]
stages[0] = [cmd] + [str(x) for x in row_args] + tail
except Exception:
logger.exception("Failed to attach selection args to inserted auto stage (alternate branch)")
# After inserting/appending an auto-stage, continue processing so later
# selection-expansion logic can still run (e.g., for example selectors).
return True, piped_result
else:
print("No previous results to select from\n")
return False, None
return True, None
@staticmethod
def _maybe_start_live_progress(config: Any,
stages: List[List[str]]) -> tuple[Any,
Dict[int,
int]]:
progress_ui = None
pipe_index_by_stage: Dict[int,
int] = {}
try:
quiet_mode = (
bool(config.get("_quiet_background_output"))
if isinstance(config,
dict) else False
)
except Exception:
quiet_mode = False
try:
import sys as _sys
if (not quiet_mode) and bool(getattr(_sys.stderr,
"isatty", lambda: False)()):
from SYS.models import PipelineLiveProgress
pipe_stage_indices: List[int] = []
pipe_labels: List[str] = []
for idx, stage_tokens in enumerate(stages):
if not stage_tokens:
continue
name = str(stage_tokens[0]).replace("_", "-").lower()
if name == "@" or name.startswith("@"):
continue
# add-file directory selector stage: avoid Live progress so the
# selection table renders cleanly.
if name in {"add-file",
"add_file"}:
try:
from pathlib import Path as _Path
toks = list(stage_tokens[1:])
i = 0
while i < len(toks):
t = str(toks[i])
low = t.lower().strip()
if low in {"-path",
"--path",
"-p"} and i + 1 < len(toks):
nxt = str(toks[i + 1])
if nxt and ("," not in nxt):
p = _Path(nxt)
if p.exists() and p.is_dir():
name = "" # mark as skipped
break
i += 2
continue
i += 1
except Exception:
logger.exception("Failed to inspect add-file stage tokens for potential directory; skipping Live progress")
if not name:
continue
# Display-only: avoid Live progress for relationship viewing.
# This keeps `@1 | get-relationship` clean and prevents progress UI
# from interfering with Rich tables/panels.
if name in {"get-relationship",
"get-rel"}:
continue
if name in {"get-metadata",
"meta"}:
continue
# `.pipe` (MPV) is an interactive launcher; disable pipeline Live progress
# for it because it doesn't meaningfully "complete" (mpv may keep running)
# and Live output interferes with MPV playlist UI.
if name in {".pipe", ".mpv"}:
continue
# `.matrix` uses a two-phase picker (@N then .matrix -send). Pipeline Live
# progress can linger across those phases and interfere with interactive output.
if name == ".matrix":
continue
# `delete-file` prints a Rich table directly; Live progress interferes and
# can truncate/overwrite the output.
if name in {"delete-file",
"del-file"}:
continue
pipe_stage_indices.append(idx)
pipe_labels.append(name)
if pipe_labels:
progress_ui = PipelineLiveProgress(pipe_labels, enabled=True)
progress_ui.start()
try:
from SYS import pipeline as _pipeline_ctx
if hasattr(_pipeline_ctx, "set_live_progress"):
_pipeline_ctx.set_live_progress(progress_ui)
except Exception:
logger.exception("Failed to register PipelineLiveProgress with pipeline context")
pipe_index_by_stage = {
stage_idx: pipe_idx
for pipe_idx, stage_idx in enumerate(pipe_stage_indices)
}
except Exception:
progress_ui = None
pipe_index_by_stage = {}
return progress_ui, pipe_index_by_stage
def execute_tokens(self, tokens: List[str]) -> None:
from cmdlet import REGISTRY
ctx = sys.modules[__name__]
try:
self._try_clear_pipeline_stop(ctx)
# REPL guard: stage-local tables should not persist across independent
# commands. Selection stages can always seed from last/display tables.
try:
if hasattr(ctx, "set_current_stage_table"):
ctx.set_current_stage_table(None)
except Exception:
logger.exception("Failed to clear current_stage_table in execute_tokens")
# Preflight (URL-duplicate prompts, etc.) should be cached within a single
# pipeline run, not across independent pipelines.
try:
ctx.store_value("preflight",
{})
except Exception:
logger.exception("Failed to set preflight cache in execute_tokens")
stages = self._split_stages(tokens)
if not stages:
print("Invalid pipeline syntax\n")
return
self._maybe_seed_current_stage_table(ctx)
stages = self._maybe_apply_pending_pipeline_tail(ctx, stages)
config = self._load_config()
config = self._apply_quiet_background_flag(config)
(
stages,
first_stage_selection_indices,
first_stage_had_extra_args,
first_stage_select_all,
) = self._extract_first_stage_selection_tokens(stages)
first_stage_selection_indices = self._apply_select_all_if_requested(
ctx,
first_stage_selection_indices,
first_stage_select_all
)
piped_result: Any = None
worker_manager = WorkerManagerRegistry.ensure(config)
pipeline_text = " | ".join(" ".join(stage) for stage in stages)
pipeline_session = WorkerStages.begin_pipeline(
worker_manager,
pipeline_text=pipeline_text,
config=config
)
if pipeline_session and worker_manager:
self._log_pipeline_event(
worker_manager,
pipeline_session.worker_id,
f"Pipeline start: {pipeline_text or '(empty pipeline)'}",
)
raw_stage_texts = self._get_raw_stage_texts(ctx)
self._maybe_enable_background_notifier(
worker_manager,
config,
pipeline_session
)
pipeline_status = "completed"
pipeline_error = ""
progress_ui = None
pipe_index_by_stage: Dict[int,
int] = {}
ok, initial_piped = self._maybe_apply_initial_selection(
ctx,
config,
stages,
selection_indices=first_stage_selection_indices,
first_stage_had_extra_args=first_stage_had_extra_args,
worker_manager=worker_manager,
pipeline_session=pipeline_session,
)
if not ok:
return
if initial_piped is not None:
piped_result = initial_piped
# REPL guard: prevent add-relationship before add-file for download-file pipelines.
if not self._validate_download_file_relationship_order(stages):
pipeline_status = "failed"
pipeline_error = "Invalid pipeline order"
return
# ------------------------------------------------------------------
# Multi-level pipeline progress (pipes = stages, tasks = items)
# ------------------------------------------------------------------
progress_ui, pipe_index_by_stage = self._maybe_start_live_progress(config, stages)
for stage_index, stage_tokens in enumerate(stages):
if not stage_tokens:
continue
raw_stage_name = str(stage_tokens[0])
cmd_name = raw_stage_name.replace("_", "-").lower()
stage_args = stage_tokens[1:]
if cmd_name == "@":
# Prefer piping the last emitted/visible items (e.g. add-file results)
# over the result-table subject. The subject can refer to older context
# (e.g. a playlist row) and may not contain store+hash.
last_items = None
try:
last_items = ctx.get_last_result_items()
except Exception:
last_items = None
if last_items:
from cmdlet._shared import coerce_to_pipe_object
try:
pipe_items = [
coerce_to_pipe_object(x) for x in list(last_items)
]
except Exception:
pipe_items = list(last_items)
piped_result = pipe_items if len(pipe_items
) > 1 else pipe_items[0]
try:
ctx.set_last_items(pipe_items)
except Exception:
logger.exception("Failed to set last items after @ selection")
if pipeline_session and worker_manager:
try:
worker_manager.log_step(
pipeline_session.worker_id,
"@ used last result items"
)
except Exception:
logger.exception("Failed to record pipeline log step for '@ used last result items' (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
continue
subject = ctx.get_last_result_subject()
if subject is None:
print("No current result context available for '@'\n")
pipeline_status = "failed"
pipeline_error = "No result items/subject for @"
return
piped_result = subject
try:
subject_items = subject if isinstance(subject,
list) else [subject]
ctx.set_last_items(subject_items)
except Exception:
logger.exception("Failed to set last_items from subject during @ handling")
if pipeline_session and worker_manager:
try:
worker_manager.log_step(
pipeline_session.worker_id,
"@ used current table subject"
)
except Exception:
logger.exception("Failed to record pipeline log step for '@ used current table subject' (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
continue
if cmd_name.startswith("@"): # selection stage
selection_token = raw_stage_name
selection = SelectionSyntax.parse(selection_token)
filter_spec = SelectionFilterSyntax.parse(selection_token)
is_select_all = selection_token.strip() == "@*"
if selection is None and filter_spec is None and not is_select_all:
print(f"Invalid selection: {selection_token}\n")
pipeline_status = "failed"
pipeline_error = f"Invalid selection {selection_token}"
return
selected_indices = []
# Prefer selecting from the last selectable *table* (search/playlist)
# rather than from display-only emitted items, unless we're explicitly
# selecting from an overlay table.
display_table = None
try:
display_table = (
ctx.get_display_table()
if hasattr(ctx,
"get_display_table") else None
)
except Exception:
display_table = None
stage_table = ctx.get_current_stage_table()
# Selection should operate on the table the user sees.
# If a display overlay table exists, force it as the current-stage table
# so provider selectors (e.g. tidal.album -> tracks) behave consistently.
try:
if display_table is not None and hasattr(ctx, "set_current_stage_table"):
ctx.set_current_stage_table(display_table)
stage_table = display_table
except Exception:
logger.exception("Failed to set current_stage_table from display table during selection processing")
if not stage_table and display_table is not None:
stage_table = display_table
if not stage_table:
stage_table = ctx.get_last_result_table()
try:
if hasattr(ctx, "debug_table_state"):
ctx.debug_table_state(f"selection {selection_token}")
except Exception:
logger.exception("Failed to debug_table_state during selection %s", selection_token)
if display_table is not None and stage_table is display_table:
items_list = ctx.get_last_result_items() or []
else:
if hasattr(ctx, "get_last_selectable_result_items"):
items_list = ctx.get_last_selectable_result_items(
) or []
else:
items_list = ctx.get_last_result_items() or []
if is_select_all:
selected_indices = list(range(len(items_list)))
elif filter_spec is not None:
selected_indices = [
i for i, item in enumerate(items_list)
if SelectionFilterSyntax.matches(item, filter_spec)
]
else:
selected_indices = sorted(
[i - 1 for i in selection]
) # type: ignore[arg-type]
resolved_items = items_list if items_list else []
filtered = [
resolved_items[i] for i in selected_indices
if 0 <= i < len(resolved_items)
]
# Debug: show selection resolution and sample payload info
try:
debug(f"Selection {selection_token} -> resolved_indices={selected_indices} filtered_count={len(filtered)}")
if filtered:
sample = filtered[0]
if isinstance(sample, dict):
debug(f"Selection sample: hash={sample.get('hash')} store={sample.get('store')} _selection_args={sample.get('_selection_args')} _selection_action={sample.get('_selection_action')}")
else:
try:
debug(f"Selection sample object: provider={getattr(sample, 'provider', None)} store={getattr(sample, 'store', None)}")
except Exception:
logger.exception("Failed to debug selection sample object")
except Exception:
logger.exception("Failed to produce selection debug sample for token %s", selection_token)
if not filtered:
print("No items matched selection\n")
pipeline_status = "failed"
pipeline_error = "Empty selection"
return
# Filter UX: if the stage token is a filter and it's terminal,
# render a filtered table overlay rather than selecting/auto-downloading.
stage_is_last = (stage_index + 1 >= len(stages))
if filter_spec is not None and stage_is_last:
try:
base_table = stage_table
if base_table is None:
base_table = ctx.get_last_result_table()
if base_table is not None and hasattr(base_table, "copy_with_title"):
new_table = base_table.copy_with_title(getattr(base_table, "title", "") or "Results")
else:
new_table = Table(getattr(base_table, "title", "") if base_table is not None else "Results")
try:
if base_table is not None and getattr(base_table, "table", None):
new_table.set_table(str(getattr(base_table, "table")))
except Exception:
logger.exception("Failed to set table on new_table for filter overlay")
try:
# Attach a one-line header so users see the active filter.
safe = str(selection_token)[1:].strip()
new_table.set_header_line(f'filter: "{safe}"')
except Exception:
logger.exception("Failed to set header line for filter overlay for token %s", selection_token)
for item in filtered:
new_table.add_result(item)
try:
ctx.set_last_result_table_overlay(new_table, items=list(filtered), subject=ctx.get_last_result_subject())
except Exception:
logger.exception("Failed to set last_result_table_overlay for filter selection")
try:
stdout_console().print()
stdout_console().print(new_table)
except Exception:
logger.exception("Failed to render filter overlay to stdout_console")
except Exception:
logger.exception("Failed while rendering filter overlay for selection %s", selection_token)
continue
# UX: selecting a single URL row from get-url tables should open it.
# Only do this when the selection stage is terminal to avoid surprising
# side-effects in pipelines like `@1 | download-file`.
current_table = ctx.get_current_stage_table(
) or ctx.get_last_result_table()
if (not is_select_all) and (len(filtered) == 1):
try:
PipelineExecutor._maybe_open_url_selection(
current_table,
filtered,
stage_is_last=(stage_index + 1 >= len(stages)),
)
except Exception:
logger.exception("Failed to open URL selection for table %s", getattr(current_table, 'table', None))
if PipelineExecutor._maybe_run_class_selector(
ctx,
config,
filtered,
stage_is_last=(stage_index + 1 >= len(stages))):
return
# Special case: selecting multiple tags from get-tag and piping into delete-tag
# should batch into a single operation (one backend call).
next_cmd = None
try:
if stage_index + 1 < len(stages) and stages[stage_index + 1]:
next_cmd = str(stages[stage_index + 1][0]
).replace("_",
"-").lower()
except Exception:
logger.exception("Failed to determine next_cmd during selection expansion for stage_index %s", stage_index)
next_cmd = None
def _is_tag_row(obj: Any) -> bool:
try:
if (hasattr(obj,
"__class__")
and obj.__class__.__name__ == "TagItem"
and hasattr(obj,
"tag_name")):
return True
except Exception:
logger.exception("Failed to inspect TagItem object while checking _is_tag_row")
try:
if isinstance(obj, dict) and obj.get("tag_name"):
return True
except Exception:
logger.exception("Failed to inspect dict tag_name while checking _is_tag_row")
return False
if (next_cmd in {"delete-tag",
"delete_tag"} and len(filtered) > 1
and all(_is_tag_row(x) for x in filtered)):
from cmdlet._shared import get_field
tags: List[str] = []
first_hash = None
first_store = None
first_path = None
for item in filtered:
tag_name = get_field(item, "tag_name")
if tag_name:
tags.append(str(tag_name))
if first_hash is None:
first_hash = get_field(item, "hash")
if first_store is None:
first_store = get_field(item, "store")
if first_path is None:
first_path = get_field(item,
"path") or get_field(
item,
"target"
)
if tags:
grouped = {
"table": "tag.selection",
"media_kind": "tag",
"hash": first_hash,
"store": first_store,
"path": first_path,
"tag": tags,
}
piped_result = grouped
continue
from cmdlet._shared import coerce_to_pipe_object
filtered_pipe_objs = [
coerce_to_pipe_object(item) for item in filtered
]
piped_result = (
filtered_pipe_objs
if len(filtered_pipe_objs) > 1 else filtered_pipe_objs[0]
)
current_table = ctx.get_current_stage_table(
) or ctx.get_last_result_table()
table_type = (
current_table.table
if current_table and hasattr(current_table,
"table") else None
)
def _norm_stage_cmd(name: Any) -> str:
return str(name or "").replace("_", "-").strip().lower()
next_cmd = None
if stage_index + 1 < len(stages) and stages[stage_index + 1]:
next_cmd = _norm_stage_cmd(stages[stage_index + 1][0])
auto_stage = None
if isinstance(table_type, str) and table_type:
try:
from ProviderCore.registry import selection_auto_stage_for_table
# Preserve historical behavior: only forward selection-stage args
# to the auto stage when we are appending a new last stage.
at_end = bool(stage_index + 1 >= len(stages))
auto_stage = selection_auto_stage_for_table(
table_type,
stage_args if at_end else None,
)
except Exception:
auto_stage = None
# Auto-insert downloader stages for provider tables.
# IMPORTANT: do not auto-download for filter selections; they may match many rows.
if filter_spec is None:
if stage_index + 1 >= len(stages):
if auto_stage:
try:
print(f"Auto-running selection via {auto_stage[0]}")
except Exception:
logger.exception("Failed to print auto-run selection message for %s", auto_stage[0])
stages.append(list(auto_stage))
else:
if auto_stage:
auto_cmd = _norm_stage_cmd(auto_stage[0])
if next_cmd not in (auto_cmd, ".pipe", ".mpv"):
debug(f"Auto-inserting {auto_cmd} after selection")
stages.insert(stage_index + 1, list(auto_stage))
continue
cmd_fn = REGISTRY.get(cmd_name)
if not cmd_fn:
try:
mod = import_cmd_module(cmd_name)
data = getattr(mod, "CMDLET", None) if mod else None
if data and hasattr(data, "exec") and callable(getattr(data, "exec")):
run_fn = getattr(data, "exec")
REGISTRY[cmd_name] = run_fn
cmd_fn = run_fn
except Exception:
cmd_fn = None
if not cmd_fn:
print(f"Unknown command: {cmd_name}\n")
pipeline_status = "failed"
pipeline_error = f"Unknown command: {cmd_name}"
return
try:
from SYS.models import PipelineStageContext
pipe_idx = pipe_index_by_stage.get(stage_index)
overlay_table: Any | None = None
session = WorkerStages.begin_stage(
worker_manager,
cmd_name=cmd_name,
stage_tokens=stage_tokens,
config=config,
command_text=pipeline_text if pipeline_text else " ".join(stage_tokens),
)
try:
stage_ctx = PipelineStageContext(
stage_index=stage_index,
total_stages=len(stages),
pipe_index=pipe_idx,
worker_id=session.worker_id if session else None,
on_emit=(lambda x: progress_ui.on_emit(pipe_idx, x))
if progress_ui is not None and pipe_idx is not None else None,
)
# Set context for the current run
ctx.set_stage_context(stage_ctx)
ctx.set_current_cmdlet_name(cmd_name)
ctx.set_current_stage_text(" ".join(stage_tokens))
ctx.clear_emits()
if progress_ui is not None and pipe_idx is not None:
# Start the pipe task in the UI. For most cmdlets we assume 1 item
# initially; cmdlets that process multiple items (like search)
# should call begin_pipe themselves with the actual count.
progress_ui.begin_pipe(pipe_idx, total_items=1)
# RUN THE CMDLET
cmd_fn(piped_result, stage_args, config)
# Pipeline overlay tables (e.g., get-url detail views) need to be
# rendered when running inside a pipeline because the CLI path
# normally handles rendering. The overlay is only useful when
# we're at the terminal stage of the pipeline. Save the table so
# it can be printed after the pipe finishes.
overlay_table = None
if stage_index + 1 >= len(stages):
try:
overlay_table = (
ctx.get_display_table()
if hasattr(ctx, "get_display_table") else None
)
except Exception:
overlay_table = None
# Update piped_result for next stage from emitted items
stage_emits = list(stage_ctx.emits)
if stage_emits:
piped_result = stage_emits if len(stage_emits) > 1 else stage_emits[0]
else:
piped_result = None
finally:
if progress_ui is not None and pipe_idx is not None:
progress_ui.finish_pipe(pipe_idx)
if overlay_table is not None:
try:
from SYS.rich_display import stdout_console
stdout_console().print()
stdout_console().print(overlay_table)
except Exception:
logger.exception("Failed to render overlay_table to stdout_console")
if session:
try:
session.close()
except Exception:
logger.exception("Failed to close pipeline stage session")
except Exception as exc:
pipeline_status = "failed"
pipeline_error = f"{cmd_name}: {exc}"
debug(f"Error in stage {stage_index} ({cmd_name}): {exc}")
return
except Exception as exc:
pipeline_status = "failed"
pipeline_error = f"{type(exc).__name__}: {exc}"
print(f"[error] {type(exc).__name__}: {exc}\n")
finally:
# Stop Live progress and clear pipeline-level live progress
if progress_ui is not None:
try:
progress_ui.complete_all_pipes()
except Exception:
logger.exception("Failed to complete all pipe UI tasks in progress_ui.complete_all_pipes")
try:
progress_ui.stop()
except Exception:
logger.exception("Failed to stop progress_ui")
try:
from SYS import pipeline as _pipeline_ctx
if hasattr(_pipeline_ctx, "set_live_progress"):
_pipeline_ctx.set_live_progress(None)
except Exception:
logger.exception("Failed to clear live_progress on pipeline context")
# Close pipeline session and log final status
try:
if pipeline_session and worker_manager:
pipeline_session.close(status=pipeline_status, error_msg=pipeline_error)
except Exception:
logger.exception("Failed to close pipeline session during finalization")
try:
if pipeline_session and worker_manager:
self._log_pipeline_event(worker_manager, pipeline_session.worker_id,
f"Pipeline {pipeline_status}: {pipeline_error or ''}")
except Exception:
logger.exception("Failed to log final pipeline status (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))