"""Pipeline execution context and state management for cmdlet."""
from __future__ import annotations

import sys
import shlex
from contextlib import contextmanager
from dataclasses import dataclass, field
from contextvars import ContextVar
from typing import Any, Dict, List, Optional, Sequence

from SYS.models import PipelineStageContext
from SYS.logger import log

# Legacy module-level mirror of PipelineState.live_progress; refreshed by
# _sync_module_state().
_LIVE_PROGRESS: Any = None


def set_live_progress(progress_ui: Any) -> None:
    """Register the current Live progress UI so cmdlets can suspend it during prompts."""
    state = _get_pipeline_state()
    state.live_progress = progress_ui
    _sync_module_state(state)


def get_live_progress() -> Any:
    """Return the progress UI registered on the active state (None when unset)."""
    state = _get_pipeline_state()
    return state.live_progress


@contextmanager
def suspend_live_progress():
    """Temporarily pause Live progress rendering.

    This avoids Rich Live cursor control interfering with interactive
    tables/prompts emitted by cmdlets during preflight (e.g. URL-duplicate
    confirmation).

    The registered UI is paused on entry when it exposes ``pause()`` and is
    resumed on exit when it exposes ``resume()``, unless a pipeline stop has
    been requested in the meantime. Failures of ``pause()``/``resume()`` are
    ignored. Exceptions raised inside the ``with`` body always propagate.
    """
    ui = get_live_progress()
    paused = False
    try:
        if ui is not None and hasattr(ui, "pause"):
            try:
                ui.pause()
                paused = True
            except Exception:
                paused = False
        yield
    finally:
        # If a stage requested the pipeline stop (e.g. user declined a preflight
        # prompt), do not resume Live rendering.  This is a condition rather
        # than an early ``return`` on purpose: returning from ``finally`` would
        # discard any exception raised inside the ``with`` body.
        stop_requested = get_pipeline_stop() is not None
        if not stop_requested and paused and ui is not None and hasattr(ui, "resume"):
            try:
                ui.resume()
            except Exception:
                pass


def _is_selectable_table(table: Any) -> bool:
    """Return True when a table can be used for @ selection.

    A table qualifies when it is truthy and its ``no_choice`` attribute is
    missing or falsy.
    """
    return bool(table) and not getattr(table, "no_choice", False)


# Pipeline state container (prototype)
@dataclass
class PipelineState:
    """All mutable state tracked for one pipeline run.

    Every field has a default, so ``PipelineState()`` yields a blank state;
    ``reset()`` returns an existing instance to those same defaults.
    """

    current_context: Optional[PipelineStageContext] = None
    last_search_query: Optional[str] = None
    pipeline_refreshed: bool = False
    last_items: List[Any] = field(default_factory=list)
    last_result_table: Optional[Any] = None
    last_result_items: List[Any] = field(default_factory=list)
    last_result_subject: Optional[Any] = None
    # Back/forward stacks of (table, items, subject) snapshots.
    result_table_history: List[tuple[Optional[Any], List[Any], Optional[Any]]] = field(default_factory=list)
    result_table_forward: List[tuple[Optional[Any], List[Any], Optional[Any]]] = field(default_factory=list)
    current_stage_table: Optional[Any] = None
    # Overlay (display-only) table data; takes precedence over last_result_*.
    display_items: List[Any] = field(default_factory=list)
    display_table: Optional[Any] = None
    display_subject: Optional[Any] = None
    last_selection: List[int] = field(default_factory=list)
    pipeline_command_text: str = ""
    current_cmdlet_name: str = ""
    current_stage_text: str = ""
    pipeline_values: Dict[str, Any] = field(default_factory=dict)
    pending_pipeline_tail: List[List[str]] = field(default_factory=list)
    pending_pipeline_source: Optional[str] = None
    ui_library_refresh_callback: Optional[Any] = None
    pipeline_stop: Optional[Dict[str, Any]] = None
    live_progress: Any = None

    def reset(self) -> None:
        """Restore every field to its default, using fresh container objects."""
        self.current_context = None
        self.last_search_query = None
        self.pipeline_refreshed = False
        self.last_items = []
        self.last_result_table = None
        self.last_result_items = []
        self.last_result_subject = None
        self.result_table_history = []
        self.result_table_forward = []
        self.current_stage_table = None
        self.display_items = []
        self.display_table = None
        self.display_subject = None
        self.last_selection = []
        self.pipeline_command_text = ""
        self.current_cmdlet_name = ""
        self.current_stage_text = ""
        self.pipeline_values = {}
        self.pending_pipeline_tail = []
        self.pending_pipeline_source = None
        self.ui_library_refresh_callback = None
        self.pipeline_stop = None
        self.live_progress = None


# ContextVar for per-run state (prototype)
_CTX_STATE: ContextVar[Optional[PipelineState]] = ContextVar("_pipeline_state", default=None)
_GLOBAL_STATE: PipelineState = PipelineState()


def _get_pipeline_state() -> PipelineState:
    """Return the PipelineState for the current context or the global fallback."""
    state = _CTX_STATE.get()
    return state if state is not None else _GLOBAL_STATE


@contextmanager
def new_pipeline_state():
    """Context manager to use a fresh PipelineState for a run.

    Yields the new state; the previous context value is restored on exit.
    """
    token = _CTX_STATE.set(PipelineState())
    try:
        yield _CTX_STATE.get()
    finally:
        _CTX_STATE.reset(token)


def _sync_module_state(state: PipelineState) -> None:
    """Synchronize module-level pipeline globals from a PipelineState instance.

    Centralizing `global` declarations reduces the chance of syntax problems
    during incremental migration (all module globals are updated from a single
    place).

    The globals are rebound to the same objects the state holds (no copies).
    """
    global _CURRENT_CONTEXT, _LAST_SEARCH_QUERY, _PIPELINE_REFRESHED, _PIPELINE_LAST_ITEMS
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS, _LAST_RESULT_SUBJECT, _RESULT_TABLE_HISTORY, _RESULT_TABLE_FORWARD
    global _CURRENT_STAGE_TABLE, _DISPLAY_ITEMS, _DISPLAY_TABLE, _DISPLAY_SUBJECT
    global _PIPELINE_LAST_SELECTION, _PIPELINE_COMMAND_TEXT, _CURRENT_CMDLET_NAME, _CURRENT_STAGE_TEXT
    global _PIPELINE_VALUES, _PENDING_PIPELINE_TAIL, _PENDING_PIPELINE_SOURCE, _UI_LIBRARY_REFRESH_CALLBACK
    global _PIPELINE_STOP, _LIVE_PROGRESS

    _CURRENT_CONTEXT = state.current_context
    _LAST_SEARCH_QUERY = state.last_search_query
    _PIPELINE_REFRESHED = state.pipeline_refreshed
    _PIPELINE_LAST_ITEMS = state.last_items
    _LAST_RESULT_TABLE = state.last_result_table
    _LAST_RESULT_ITEMS = state.last_result_items
    _LAST_RESULT_SUBJECT = state.last_result_subject
    _RESULT_TABLE_HISTORY = state.result_table_history
    _RESULT_TABLE_FORWARD = state.result_table_forward
    _CURRENT_STAGE_TABLE = state.current_stage_table
    _DISPLAY_ITEMS = state.display_items
    _DISPLAY_TABLE = state.display_table
    _DISPLAY_SUBJECT = state.display_subject
    _PIPELINE_LAST_SELECTION = state.last_selection
    _PIPELINE_COMMAND_TEXT = state.pipeline_command_text
    _CURRENT_CMDLET_NAME = state.current_cmdlet_name
    _CURRENT_STAGE_TEXT = state.current_stage_text
    _PIPELINE_VALUES = state.pipeline_values
    _PENDING_PIPELINE_TAIL = state.pending_pipeline_tail
    _PENDING_PIPELINE_SOURCE = state.pending_pipeline_source
    _UI_LIBRARY_REFRESH_CALLBACK = state.ui_library_refresh_callback
    _PIPELINE_STOP = state.pipeline_stop
    _LIVE_PROGRESS = state.live_progress


# Public accessors for pipeline state (for external callers that need to inspect
# or mutate the PipelineState directly). These provide stable, non-underscored
# entrypoints so other modules don't rely on implementation-internal names.
def get_pipeline_state() -> PipelineState:
    """Public alias of ``_get_pipeline_state()``.

    Returns:
        The PipelineState active in the current context, or the global
        fallback when none is active.
    """
    active_state = _get_pipeline_state()
    return active_state


def sync_module_state(state: PipelineState) -> None:
    """Public alias of ``_sync_module_state()``.

    Args:
        state: The PipelineState whose fields are copied onto the
            module-level globals.
    """
    _sync_module_state(state)


# ============================================================================
# PIPELINE STATE
# ============================================================================

# Stage context of the pipeline stage currently running
_CURRENT_CONTEXT: Optional[PipelineStageContext] = None

# Most recent search query, kept so results can be refreshed after pipeline actions
_LAST_SEARCH_QUERY: Optional[str] = None

# Whether the last pipeline execution already refreshed and displayed results
_PIPELINE_REFRESHED: bool = False

# Outputs of the last pipeline, cached for non-interactive callers to inspect
_PIPELINE_LAST_ITEMS: List[Any] = []

# Last result table and its items, used by @ selection syntax (e.g., @2, @2-5, @{1,3,5})
_LAST_RESULT_TABLE: Optional[Any] = None
_LAST_RESULT_ITEMS: List[Any] = []
# Subject of the current result table (e.g., the file whose tags/url are displayed)
_LAST_RESULT_SUBJECT: Optional[Any] = None

# Result-table history for @.. navigation (LIFO stack, max 20 tables)
_RESULT_TABLE_HISTORY: List[tuple[Optional[Any], List[Any], Optional[Any]]] = []
_MAX_RESULT_TABLE_HISTORY = 20

# Forward history for @,, navigation (LIFO stack of popped tables)
_RESULT_TABLE_FORWARD: List[tuple[Optional[Any], List[Any], Optional[Any]]] = []

# Table of the current stage, used for @N expansion (kept apart from history)
_CURRENT_STAGE_TABLE: Optional[Any] = None

# Items displayed by non-selectable commands (get-tag, delete-tag, etc.)
_DISPLAY_ITEMS: List[Any] = []

# Table for display-only commands (overlay)
_DISPLAY_TABLE: Optional[Any] = None
# Subject for overlay/display-only tables (takes precedence over _LAST_RESULT_SUBJECT)
_DISPLAY_SUBJECT: Optional[Any] = None

# Track the indices the user selected via @ syntax for the current invocation
_PIPELINE_LAST_SELECTION: List[int] = []

# Track the currently executing command/pipeline string for worker attribution
_PIPELINE_COMMAND_TEXT: str = ""

# Track the currently executing cmdlet name so debug helpers can label objects
# with the active stage (e.g., "1 - add-file").
_CURRENT_CMDLET_NAME: str = ""

# Track the currently executing stage text (best-effort, quotes preserved).
_CURRENT_STAGE_TEXT: str = ""

# Shared scratchpad for cmdlet/funacts to stash structured data between stages
_PIPELINE_VALUES: Dict[str, Any] = {}
# Sentinel distinguishing "no such key" from a stored None
_PIPELINE_MISSING = object()

# Preserve downstream pipeline stages when a command pauses for @N selection
_PENDING_PIPELINE_TAIL: List[List[str]] = []
_PENDING_PIPELINE_SOURCE: Optional[str] = None

# Global callback to notify UI when library content changes
_UI_LIBRARY_REFRESH_CALLBACK: Optional[Any] = None

# ============================================================================
# PIPELINE STOP SIGNAL
# ============================================================================

_PIPELINE_STOP: Optional[Dict[str, Any]] = None


def request_pipeline_stop(*, reason: str = "", exit_code: int = 0) -> None:
    """Request that the pipeline runner stop gracefully after the current stage.

    Stores ``{"reason": <stripped str>, "exit_code": <int>}`` on the active state.
    """
    state = _get_pipeline_state()
    state.pipeline_stop = {
        "reason": str(reason or "").strip(),
        "exit_code": int(exit_code)
    }
    _sync_module_state(state)


def get_pipeline_stop() -> Optional[Dict[str, Any]]:
    """Return the pending stop request dict, or None when no stop was requested."""
    state = _get_pipeline_state()
    return state.pipeline_stop


def clear_pipeline_stop() -> None:
    """Discard any pending stop request."""
    state = _get_pipeline_state()
    state.pipeline_stop = None
    _sync_module_state(state)


# ============================================================================
# PRIVATE HELPERS
# ============================================================================

def _clear_overlay(state: PipelineState) -> None:
    """Drop the display-only (overlay) table, items and subject from *state*."""
    state.display_items = []
    state.display_table = None
    state.display_subject = None


def _unpack_history_entry(entry: Any) -> tuple:
    """Normalize a history/forward stack entry to ``(table, items, subject)``.

    Two-element tuples get a ``None`` subject; anything else that is not a
    tuple of length >= 2 yields ``(None, [], None)``.
    """
    if isinstance(entry, tuple):
        if len(entry) >= 3:
            return entry[0], entry[1], entry[2]
        if len(entry) == 2:
            return entry[0], entry[1], None
    return None, [], None


def _sort_table_and_align_items(result_table: Optional[Any], items: List[Any]) -> List[Any]:
    """Sort *result_table* by title and return *items* in the matching order.

    Sorting only happens when the table has ``sort_by_title`` and its
    ``preserve_order`` attribute is missing or falsy.  The items are re-ordered
    through each row's ``source_index``; if any row lacks a valid index, or
    anything raises, the original *items* object is returned unchanged.
    """
    if (
        result_table is None
        or not hasattr(result_table, "sort_by_title")
        or getattr(result_table, "preserve_order", False)
    ):
        return items
    try:
        result_table.sort_by_title()
        if items and hasattr(result_table, "rows"):
            aligned: List[Any] = []
            for row in result_table.rows:
                src_idx = getattr(row, "source_index", None)
                if isinstance(src_idx, int) and 0 <= src_idx < len(items):
                    aligned.append(items[src_idx])
            # Only adopt the new order when every row mapped to an item.
            if len(aligned) == len(result_table.rows):
                return aligned
    except Exception:
        # Best-effort: a failed sort must never break result storage.
        pass
    return items


def _selectable_row(table: Optional[Any], row_index: int) -> Optional[Any]:
    """Return row *row_index* of a selectable *table*, or None when unavailable."""
    if _is_selectable_table(table) and hasattr(table, "rows"):
        if 0 <= row_index < len(table.rows):
            return table.rows[row_index]
    return None


# ============================================================================
# PUBLIC API
# ============================================================================

def set_stage_context(context: Optional[PipelineStageContext]) -> None:
    """Set the current pipeline stage context."""
    state = _get_pipeline_state()
    state.current_context = context
    _sync_module_state(state)


def get_stage_context() -> Optional[PipelineStageContext]:
    """Get the current pipeline stage context."""
    state = _get_pipeline_state()
    return state.current_context


def emit(obj: Any) -> None:
    """
    Emit an object to the current pipeline stage output.

    Does nothing when no stage context is active.
    """
    ctx = _get_pipeline_state().current_context
    if ctx is not None:
        ctx.emit(obj)


def emit_list(objects: List[Any]) -> None:
    """
    Emit a list of objects to the next pipeline stage.

    Does nothing when no stage context is active.
    """
    ctx = _get_pipeline_state().current_context
    if ctx is not None:
        # NOTE(review): the list is forwarded in a single emit() call; confirm
        # that PipelineStageContext.emit handles list payloads as intended.
        ctx.emit(objects)


def print_if_visible(*args: Any, file=None, **kwargs: Any) -> None:
    """
    Print only if this is not a quiet mid-pipeline stage.

    Output goes through ``log``.  It is produced when there is no stage
    context, when the context reports ``is_last_stage``, or whenever an
    explicit ``file`` is given.  Any exception is swallowed (best-effort).
    """
    try:
        # Print if: not in a pipeline OR this is the last stage
        ctx = _get_pipeline_state().current_context
        should_print = (ctx is None) or (ctx and ctx.is_last_stage)

        # An explicit file target always prints
        if file is not None:
            should_print = True

        if should_print:
            if file is None:
                log(*args, **kwargs)
            else:
                log(*args, file=file, **kwargs)
    except Exception:
        pass


def store_value(key: str, value: Any) -> None:
    """
    Store a value to pass to later pipeline stages.

    The key is stripped and lower-cased; non-string or blank keys are ignored.
    The value is written to the *active* state's ``pipeline_values`` so it
    pairs with ``load_value`` even before the legacy module alias is synced.
    """
    if not isinstance(key, str):
        return
    text = key.strip().lower()
    if not text:
        return
    state = _get_pipeline_state()
    try:
        state.pipeline_values[text] = value
    except Exception:
        # Best-effort scratchpad: never let a storage failure abort a stage.
        return
    # Keep the legacy module-level alias pointing at the same mapping.
    _sync_module_state(state)


def load_value(key: str, default: Any = None) -> Any:
    """
    Retrieve a value stored by an earlier pipeline stage.

    ``key`` may be a dotted path (``"root.child.0.attr"``).  The root segment
    is looked up lower-cased; each further segment is resolved as a dict key
    (exact match first, then case-insensitive), a list/tuple index (digits
    only), or an attribute.  ``default`` is returned when any step fails.
    """
    if not isinstance(key, str):
        return default
    text = key.strip()
    if not text:
        return default
    parts = [segment.strip() for segment in text.split(".") if segment.strip()]
    if not parts:
        return default

    values = _get_pipeline_state().pipeline_values
    container = values.get(parts[0].lower(), _PIPELINE_MISSING)
    if container is _PIPELINE_MISSING:
        return default
    if len(parts) == 1:
        return container

    current: Any = container
    for fragment in parts[1:]:
        if isinstance(current, dict):
            if fragment in current:
                current = current[fragment]
                continue
            fragment_lower = fragment.lower()
            match = _PIPELINE_MISSING
            for key_name, value in current.items():
                if isinstance(key_name, str) and key_name.lower() == fragment_lower:
                    match = value
                    break
            if match is _PIPELINE_MISSING:
                return default
            current = match
            continue
        if isinstance(current, (list, tuple)):
            if fragment.isdigit():
                try:
                    idx = int(fragment)
                except ValueError:
                    # isdigit() accepts characters int() rejects (e.g. superscripts).
                    return default
                if 0 <= idx < len(current):
                    current = current[idx]
                    continue
            return default
        if hasattr(current, fragment):
            try:
                current = getattr(current, fragment)
                continue
            except Exception:
                return default
        return default
    return current


def set_pending_pipeline_tail(
    stages: Optional[Sequence[Sequence[str]]],
    source_command: Optional[str] = None
) -> None:
    """
    Store the remaining pipeline stages when execution pauses for @N selection.

    Only list/tuple stages are kept, with every token converted to ``str``.
    A blank ``source_command`` is stored as None.  On any failure the
    previously stored tail is left untouched.
    """
    state = _get_pipeline_state()
    try:
        pending: List[List[str]] = []
        for stage in stages or []:
            if isinstance(stage, (list, tuple)):
                pending.append([str(token) for token in stage])
        state.pending_pipeline_tail = pending

        clean_source = (source_command or "").strip()
        state.pending_pipeline_source = clean_source if clean_source else None

        # Sync module-level variables
        _sync_module_state(state)
    except Exception:
        # Keep existing pending tail on failure
        pass


def get_pending_pipeline_tail() -> List[List[str]]:
    """Get a copy of the pending pipeline tail (stages queued after selection)."""
    state = _get_pipeline_state()
    return [list(stage) for stage in state.pending_pipeline_tail]


def get_pending_pipeline_source() -> Optional[str]:
    """Get the source command associated with the pending pipeline tail."""
    state = _get_pipeline_state()
    return state.pending_pipeline_source


def clear_pending_pipeline_tail() -> None:
    """Clear any stored pending pipeline tail."""
    state = _get_pipeline_state()
    state.pending_pipeline_tail = []
    state.pending_pipeline_source = None
    # Sync module-level variables
    _sync_module_state(state)


def reset() -> None:
    """Reset all pipeline state. Called between pipeline executions."""
    state = _get_pipeline_state()
    state.reset()
    # Sync module-level variables for backwards compatibility
    _sync_module_state(state)


def get_emitted_items() -> List[Any]:
    """
    Get a copy of all items emitted by the current pipeline stage.

    Reads the active state's context (the same one ``emit`` writes to);
    returns an empty list when no context is active.
    """
    ctx = _get_pipeline_state().current_context
    if ctx is not None:
        return list(ctx.emits)
    return []


def clear_emits() -> None:
    """Clear the emitted items list (called between stages)."""
    ctx = _get_pipeline_state().current_context
    if ctx is not None:
        ctx.emits.clear()


def set_last_selection(indices: Sequence[int]) -> None:
    """Record the indices selected via @ syntax for the next cmdlet.

    Args:
        indices: Iterable of 0-based indices captured from the REPL parser
    """
    state = _get_pipeline_state()
    state.last_selection = list(indices or [])
    # Sync module-level variables
    _sync_module_state(state)


def get_last_selection() -> List[int]:
    """Return the indices selected via @ syntax for the current invocation."""
    state = _get_pipeline_state()
    return list(state.last_selection)


def clear_last_selection() -> None:
    """Clear the cached selection indices after a cmdlet finishes."""
    state = _get_pipeline_state()
    state.last_selection = []
    _sync_module_state(state)


def set_current_command_text(command_text: Optional[str]) -> None:
    """Record the raw pipeline/command text for downstream consumers."""
    state = _get_pipeline_state()
    state.pipeline_command_text = (command_text or "").strip()
    _sync_module_state(state)


def get_current_command_text(default: str = "") -> str:
    """Return the last recorded command/pipeline text (``default`` when blank)."""
    state = _get_pipeline_state()
    text = state.pipeline_command_text.strip()
    return text if text else default


def clear_current_command_text() -> None:
    """Clear the cached command text after execution completes."""
    state = _get_pipeline_state()
    state.pipeline_command_text = ""
    _sync_module_state(state)


def split_pipeline_text(pipeline_text: str) -> List[str]:
    """Split a pipeline string on unquoted '|' characters.

    Quotes and inner spacing of each stage are kept as written; each stage is
    stripped of surrounding whitespace and empty stages are dropped.  A
    backslash escapes the next character only while inside quotes.
    """
    text = str(pipeline_text or "")
    if not text:
        return []

    stages: List[str] = []
    buf: List[str] = []
    quote: Optional[str] = None
    escape = False

    for ch in text:
        if escape:
            buf.append(ch)
            escape = False
            continue

        if ch == "\\" and quote is not None:
            buf.append(ch)
            escape = True
            continue

        if ch in ('"', "'"):
            if quote is None:
                quote = ch
            elif quote == ch:
                quote = None
            buf.append(ch)
            continue

        if ch == "|" and quote is None:
            stages.append("".join(buf).strip())
            buf = []
            continue

        buf.append(ch)

    tail = "".join(buf).strip()
    if tail:
        stages.append(tail)
    return [s for s in stages if s]


def get_current_command_stages() -> List[str]:
    """Return the raw stage segments for the current command text."""
    return split_pipeline_text(get_current_command_text(""))


def set_current_stage_text(stage_text: Optional[str]) -> None:
    """Record the raw stage text currently being executed."""
    state = _get_pipeline_state()
    state.current_stage_text = str(stage_text or "").strip()
    _sync_module_state(state)


def get_current_stage_text(default: str = "") -> str:
    """Return the raw stage text currently being executed (``default`` when blank)."""
    state = _get_pipeline_state()
    text = state.current_stage_text.strip()
    return text if text else default


def clear_current_stage_text() -> None:
    """Clear the cached stage text after a stage completes."""
    state = _get_pipeline_state()
    state.current_stage_text = ""
    _sync_module_state(state)


def set_current_cmdlet_name(cmdlet_name: Optional[str]) -> None:
    """Record the currently executing cmdlet name (stage-local)."""
    state = _get_pipeline_state()
    state.current_cmdlet_name = str(cmdlet_name or "").strip()
    _sync_module_state(state)


def get_current_cmdlet_name(default: str = "") -> str:
    """Return the currently executing cmdlet name (``default`` when blank)."""
    state = _get_pipeline_state()
    text = state.current_cmdlet_name.strip()
    return text if text else default


def clear_current_cmdlet_name() -> None:
    """Clear the cached cmdlet name after a stage completes."""
    state = _get_pipeline_state()
    state.current_cmdlet_name = ""
    _sync_module_state(state)


def set_search_query(query: Optional[str]) -> None:
    """Set the last search query for refresh purposes."""
    state = _get_pipeline_state()
    state.last_search_query = query
    _sync_module_state(state)


def get_search_query() -> Optional[str]:
    """Get the last search query."""
    state = _get_pipeline_state()
    return state.last_search_query


def set_pipeline_refreshed(refreshed: bool) -> None:
    """Track whether the pipeline already refreshed results."""
    state = _get_pipeline_state()
    state.pipeline_refreshed = refreshed
    _sync_module_state(state)


def was_pipeline_refreshed() -> bool:
    """Check if the pipeline already refreshed results."""
    state = _get_pipeline_state()
    return state.pipeline_refreshed


def set_last_items(items: list) -> None:
    """Cache a copy of the last pipeline outputs (empty list for falsy input)."""
    state = _get_pipeline_state()
    state.last_items = list(items) if items else []
    # Sync module-level variable
    _sync_module_state(state)


def get_last_items() -> List[Any]:
    """Get a copy of the last pipeline outputs."""
    state = _get_pipeline_state()
    return list(state.last_items)


def set_ui_library_refresh_callback(callback: Any) -> None:
    """
    Set a callback to be called when library content is updated.
    """
    state = _get_pipeline_state()
    state.ui_library_refresh_callback = callback
    _sync_module_state(state)


def get_ui_library_refresh_callback() -> Optional[Any]:
    """Get the current library refresh callback."""
    state = _get_pipeline_state()
    return state.ui_library_refresh_callback


def trigger_ui_library_refresh(library_filter: str = "local") -> None:
    """Trigger a library refresh in the UI if callback is registered.

    This should be called from cmdlet/funacts after content is added to library.
    A failing callback is reported on stderr and otherwise ignored.

    Args:
        library_filter: Which library to refresh ('local', 'hydrus', etc)
    """
    callback = get_ui_library_refresh_callback()
    if callback:
        try:
            callback(library_filter)
        except Exception as e:
            print(
                f"[trigger_ui_library_refresh] Error calling refresh callback: {e}",
                file=sys.stderr
            )


def set_last_result_table(
    result_table: Optional[Any],
    items: Optional[List[Any]] = None,
    subject: Optional[Any] = None
) -> None:
    """
    Store the last result table and items for @ selection syntax.

    The table being replaced (if any) is pushed onto the history stack, which
    is capped at ``_MAX_RESULT_TABLE_HISTORY`` entries.  Any overlay is
    cleared, and the new table is sorted by title unless it opts out through
    ``preserve_order``.
    """
    state = _get_pipeline_state()

    # Push current table to history before replacing
    if state.last_result_table is not None:
        state.result_table_history.append(
            (
                state.last_result_table,
                # list() instead of .copy(): snapshot any sequence type.
                list(state.last_result_items),
                state.last_result_subject,
            )
        )
        # Keep history size limited
        if len(state.result_table_history) > _MAX_RESULT_TABLE_HISTORY:
            state.result_table_history.pop(0)

    # Set new current table and clear any display items/table
    _clear_overlay(state)
    state.last_result_table = result_table
    state.last_result_items = items or []
    state.last_result_subject = subject

    # Sort table by Title/Name column alphabetically if available
    state.last_result_items = _sort_table_and_align_items(
        result_table, state.last_result_items
    )

    # Sync module-level variables for backwards compatibility
    _sync_module_state(state)


def set_last_result_table_overlay(
    result_table: Optional[Any],
    items: Optional[List[Any]] = None,
    subject: Optional[Any] = None
) -> None:
    """
    Set a result table as an overlay (display only, no history).

    The overlay is sorted by title unless it opts out through
    ``preserve_order``; the underlying last result table is left untouched.
    """
    state = _get_pipeline_state()

    state.display_table = result_table
    state.display_items = items or []
    state.display_subject = subject

    # Sort table by Title/Name column alphabetically if available
    state.display_items = _sort_table_and_align_items(
        result_table, state.display_items
    )

    # Sync module-level variables
    _sync_module_state(state)


def set_last_result_table_preserve_history(
    result_table: Optional[Any],
    items: Optional[List[Any]] = None,
    subject: Optional[Any] = None
) -> None:
    """
    Update the last result table WITHOUT adding to history.

    No sorting is applied and the overlay is not cleared.
    """
    state = _get_pipeline_state()

    # Update current table WITHOUT pushing to history
    state.last_result_table = result_table
    state.last_result_items = items or []
    state.last_result_subject = subject

    # Sync module-level variables
    _sync_module_state(state)


def set_last_result_items_only(items: Optional[List[Any]]) -> None:
    """
    Store items for @N selection WITHOUT affecting history or saved search data.
    """
    state = _get_pipeline_state()

    # Store items for immediate @N selection, but DON'T modify last_result_items
    # This ensures history contains original search data, not display transformations
    state.display_items = items or []
    # Clear display table since we're setting items only (CLI will generate table if needed)
    state.display_table = None
    state.display_subject = None

    # Sync module-level variables
    _sync_module_state(state)


def restore_previous_result_table() -> bool:
    """
    Restore the previous result table from history (for @.. navigation).

    Returns:
        True when an overlay was dismissed or a table was restored from
        history; False when there was nothing to go back to.
    """
    state = _get_pipeline_state()

    # If we have an active overlay (display items/table), clear it to "go back" to the underlying table
    if state.display_items or state.display_table or state.display_subject is not None:
        _clear_overlay(state)
        # If an underlying table exists, or there is no history to fall back
        # on, dismissing the overlay is the whole operation.  Otherwise fall
        # through so @.. actually returns to the last table.
        if state.last_result_table is not None or not state.result_table_history:
            _sync_module_state(state)
            return True

    if not state.result_table_history:
        return False

    # Save current state to forward stack before popping
    state.result_table_forward.append(
        (state.last_result_table, state.last_result_items, state.last_result_subject)
    )

    # Pop from history and restore
    restored = _unpack_history_entry(state.result_table_history.pop())
    state.last_result_table, state.last_result_items, state.last_result_subject = restored

    # Clear display items so get_last_result_items() falls back to restored items
    _clear_overlay(state)

    # Sync module-level variables
    _sync_module_state(state)
    return True


def restore_next_result_table() -> bool:
    """
    Restore the next result table from forward history (for @,, navigation).

    Returns:
        True when an overlay was dismissed or a table was restored from the
        forward stack; False when there was nothing to go forward to.
    """
    state = _get_pipeline_state()

    # If we have an active overlay (display items/table), clear it to "go forward" to the underlying table
    if state.display_items or state.display_table or state.display_subject is not None:
        _clear_overlay(state)
        # If an underlying table exists, or there is no forward entry to fall
        # back on, dismissing the overlay is the whole operation.
        if state.last_result_table is not None or not state.result_table_forward:
            _sync_module_state(state)
            return True

    if not state.result_table_forward:
        return False

    # Save current state to history stack before popping forward
    state.result_table_history.append(
        (state.last_result_table, state.last_result_items, state.last_result_subject)
    )

    # Pop from forward stack and restore
    restored = _unpack_history_entry(state.result_table_forward.pop())
    state.last_result_table, state.last_result_items, state.last_result_subject = restored

    # Clear display items so get_last_result_items() falls back to restored items
    _clear_overlay(state)

    # Sync module-level variables
    _sync_module_state(state)
    return True


def get_display_table() -> Optional[Any]:
    """
    Get the current display overlay table.
    """
    state = _get_pipeline_state()
    return state.display_table


def get_last_result_subject() -> Optional[Any]:
    """
    Get the subject associated with the current result table or overlay.

    The overlay subject wins when it is set.
    """
    state = _get_pipeline_state()
    if state.display_subject is not None:
        return state.display_subject
    return state.last_result_subject


def get_last_result_table() -> Optional[Any]:
    """Get the current last result table.

    Returns:
        The ResultTable object, or None if no table is set
    """
    state = _get_pipeline_state()
    return state.last_result_table


def get_last_result_items() -> List[Any]:
    """
    Get the items available for @N selection.

    Overlay items take priority; an overlay or result table that is not
    selectable yields an empty list.  The stored list itself is returned
    (not a copy).
    """
    state = _get_pipeline_state()

    # Prioritize items from display commands (get-tag, delete-tag, etc.)
    # These are available for immediate @N selection
    if state.display_items:
        if state.display_table is not None and not _is_selectable_table(state.display_table):
            return []
        return state.display_items

    # Fall back to items from last search/selectable command
    if state.last_result_table is None:
        return state.last_result_items
    if _is_selectable_table(state.last_result_table):
        return state.last_result_items
    return []


def get_last_selectable_result_items() -> List[Any]:
    """Get items from the last *selectable* result table, ignoring display-only items.

    This is useful when a selection stage should target the last visible
    selectable table (e.g., a playlist/search table), even if a prior action
    command emitted items and populated display_items.  A copy is returned.
    """
    state = _get_pipeline_state()
    if state.last_result_table is None:
        return list(state.last_result_items)
    if _is_selectable_table(state.last_result_table):
        return list(state.last_result_items)
    return []


def get_last_result_table_source_command() -> Optional[str]:
    """Get the source command from the last displayed result table.

    Returns:
        Command name (e.g., 'download-file') or None if not set
    """
    table = _get_pipeline_state().last_result_table
    if _is_selectable_table(table) and hasattr(table, "source_command"):
        return table.source_command
    return None


def get_last_result_table_source_args() -> List[str]:
    """Get the base source arguments from the last displayed result table.

    Returns:
        List of arguments (e.g., ['https://example.com']) or empty list
    """
    table = _get_pipeline_state().last_result_table
    if _is_selectable_table(table) and hasattr(table, "source_args"):
        return table.source_args or []
    return []


def get_last_result_table_row_selection_args(row_index: int) -> Optional[List[str]]:
    """Get the selection arguments for a specific row in the last result table.

    Args:
        row_index: Index of the row (0-based)

    Returns:
        Selection arguments (e.g., ['-item', '3']) or None
    """
    row = _selectable_row(_get_pipeline_state().last_result_table, row_index)
    if row is not None and hasattr(row, "selection_args"):
        return row.selection_args
    return None


def set_current_stage_table(result_table: Optional[Any]) -> None:
    """Store the current pipeline stage table for @N expansion.

    Used by cmdlet that display tabular results (e.g., download-file listing
    formats) to make their result table available for @N expansion logic.
    Does NOT push to history - purely for command expansion in the current
    pipeline.

    Args:
        result_table: The ResultTable object (or None to clear)
    """
    state = _get_pipeline_state()
    state.current_stage_table = result_table
    # Sync module-level variable
    _sync_module_state(state)


def get_current_stage_table() -> Optional[Any]:
    """Get the current pipeline stage table (if any)."""
    state = _get_pipeline_state()
    return state.current_stage_table


def get_current_stage_table_source_command() -> Optional[str]:
    """Get the source command from the current pipeline stage table.

    Returns:
        Command name (e.g., 'download-file') or None
    """
    table = _get_pipeline_state().current_stage_table
    if _is_selectable_table(table) and hasattr(table, "source_command"):
        return table.source_command
    return None


def get_current_stage_table_source_args() -> List[str]:
    """Get the source arguments from the current pipeline stage table.

    Returns:
        List of arguments or empty list
    """
    table = _get_pipeline_state().current_stage_table
    if _is_selectable_table(table) and hasattr(table, "source_args"):
        return table.source_args or []
    return []


def get_current_stage_table_row_selection_args(row_index: int) -> Optional[List[str]]:
    """Get the selection arguments for a row in the current pipeline stage table.

    Args:
        row_index: Index of the row (0-based)

    Returns:
        Selection arguments or None
    """
    row = _selectable_row(_get_pipeline_state().current_stage_table, row_index)
    if row is not None and hasattr(row, "selection_args"):
        return row.selection_args
    return None


def get_current_stage_table_row_source_index(row_index: int) -> Optional[int]:
    """Get the original source index for a row in the current stage table.

    Useful when the table has been sorted for display but selections should map
    back to the original item order (e.g., playlist or provider order).
    """
    row = _selectable_row(_get_pipeline_state().current_stage_table, row_index)
    if row is None:
        return None
    return getattr(row, "source_index", None)


def clear_last_result() -> None:
    """Clear the stored last result table and items."""
    state = _get_pipeline_state()
    state.last_result_table = None
    state.last_result_items = []
    state.last_result_subject = None
    _sync_module_state(state)