"""Pipeline execution context and state management for cmdlets.

This module provides functions for managing pipeline state, allowing cmdlets
to emit results and control printing behavior within a piped execution
context.

Key concepts:
- Pipeline stages are chained command invocations.
- Each stage receives input items and emits output items.
- Printing behavior is controlled based on pipeline position.
- The stage context tracks whether this is the last stage (which affects
  output verbosity).

PowerShell-like piping model:
- Each stage processes items individually.
- A stage calls emit() for each output item.
- Output items become input for the next stage.
- Batch commands receive all items at once (special case).

All state below lives in plain module-level globals, so it is shared by every
caller (and every thread) that imports this module.
"""

from __future__ import annotations

import sys
from typing import Any, Dict, List, Optional, Sequence

from models import PipelineStageContext
from helper.logger import log


# ============================================================================
# PIPELINE GLOBALS (maintained for backward compatibility)
# ============================================================================

# Current pipeline stage context, set via set_stage_context(). When it is
# None, emit() falls back to the legacy _PIPE_EMITS list below. This is a
# plain module global, not thread-local.
_CURRENT_CONTEXT: Optional[PipelineStageContext] = None

# Legacy execution state: emitted items, "inside a pipeline" flag, and
# "current stage is the last one" flag (the two flags drive print_if_visible).
_PIPE_EMITS: List[Any] = []
_PIPE_ACTIVE: bool = False
_PIPE_IS_LAST: bool = False

# Ephemeral handoff for direct pipelines (e.g., URL --screen-shot | ...)
_LAST_PIPELINE_CAPTURE: Optional[Any] = None

# Remember last search query to support refreshing results after pipeline actions
_LAST_SEARCH_QUERY: Optional[str] = None

# Track whether the last pipeline execution already refreshed and displayed results
_PIPELINE_REFRESHED: bool = False

# Cache the last pipeline outputs so non-interactive callers can inspect results
_PIPELINE_LAST_ITEMS: List[Any] = []

# Store the last result table for @ selection syntax (e.g., @2, @2-5, @{1,3,5})
_LAST_RESULT_TABLE: Optional[Any] = None
_LAST_RESULT_ITEMS: List[Any] = []

# History of (table, items) pairs for @.. navigation. Used as a LIFO stack;
# the oldest entry is dropped once it exceeds _MAX_RESULT_TABLE_HISTORY.
_RESULT_TABLE_HISTORY: List[tuple[Optional[Any], List[Any]]] = []
_MAX_RESULT_TABLE_HISTORY = 20

# Current stage table for @N expansion (separate from history).
# Holds the ResultTable (with source_command + per-row selection_args) from the
# current pipeline stage. It is set by cmdlets that display tabular results
# (e.g., download-data showing formats) and used by the CLI to expand @N into
# full commands like "download-data URL -item 2".
_CURRENT_STAGE_TABLE: Optional[Any] = None

# Items displayed by non-selectable commands (get-tag, delete-tag, etc.)
# These are available for @N selection but are NOT saved to history.
_DISPLAY_ITEMS: List[Any] = []

# Table for display-only commands (overlay).
# Used when a command wants to show a specific table formatting without
# affecting history.
_DISPLAY_TABLE: Optional[Any] = None

# Track the indices the user selected via @ syntax for the current invocation
_PIPELINE_LAST_SELECTION: List[int] = []

# Track the currently executing command/pipeline string for worker attribution
_PIPELINE_COMMAND_TEXT: str = ""

# Shared scratchpad for cmdlets/funacts to stash structured data between stages
_PIPELINE_VALUES: Dict[str, Any] = {}
# Private sentinel so load_value() can tell "absent" apart from a stored None
_PIPELINE_MISSING = object()

# Global callback to notify UI when library content changes
_UI_LIBRARY_REFRESH_CALLBACK: Optional[Any] = None


# ============================================================================
# PUBLIC API
# ============================================================================


def set_stage_context(context: Optional[PipelineStageContext]) -> None:
    """Internal: Set the current pipeline stage context.

    Args:
        context: Object whose ``emit()`` receives items from emit()/emit_list(),
            or None to fall back to the legacy module-level emit list.
    """
    global _CURRENT_CONTEXT
    _CURRENT_CONTEXT = context


def get_stage_context() -> Optional[PipelineStageContext]:
    """Return the current pipeline stage context, or None if none is set."""
    return _CURRENT_CONTEXT


def emit(obj: Any) -> None:
    """Emit an object to the current pipeline stage output.

    Call this from a cmdlet to pass data to the next pipeline stage.

    If a stage context is set (see set_stage_context()), the object is passed
    to ``context.emit(obj)``. Otherwise it is appended to the legacy
    module-level emit list, which get_emitted_items() returns.

    Args:
        obj: Any object to emit downstream.

    Example:
        ```python
        def _run(item, args, config):
            result = process(item)
            if result:
                emit(result)  # Pass to next stage
            return 0
        ```
    """
    import logging

    logger = logging.getLogger(__name__)

    # Preferred path: the per-stage context object.
    if _CURRENT_CONTEXT is not None:
        # Log only the type name, with lazy %-args: rendering ``obj`` itself
        # would call its __format__/__str__ on every emit even with debug
        # logging off (costly for large objects, fatal if that method raises).
        logger.debug(
            "[EMIT] Context-based: appending to _CURRENT_CONTEXT.emits. obj type=%s",
            type(obj).__name__,
        )
        _CURRENT_CONTEXT.emit(obj)
        return

    # Fallback to legacy global approach (for backward compatibility).
    try:
        logger.debug(
            "[EMIT] Legacy: appending to _PIPE_EMITS. obj type=%s, _PIPE_EMITS len before=%d",
            type(obj).__name__,
            len(_PIPE_EMITS),
        )
        _PIPE_EMITS.append(obj)
        logger.debug("[EMIT] Legacy: _PIPE_EMITS len after=%d", len(_PIPE_EMITS))
    except Exception as exc:
        logger.error("[EMIT] Error appending to _PIPE_EMITS: %s", exc, exc_info=True)


def print_if_visible(*args: Any, file=None, **kwargs: Any) -> None:
    """Print via log() unless this is a quiet, non-final pipeline stage.

    Output is suppressed only when a pipeline is active (set_active(True)) and
    the current stage is not the last one (set_last_stage(False)). Passing ANY
    explicit ``file`` (callers typically pass ``sys.stderr`` for errors)
    bypasses the suppression and always prints.

    Use this instead of log() in cmdlets when you want stage-aware output.
    Exceptions raised while printing are swallowed, so output problems never
    abort the calling stage.

    Args:
        *args: Positional arguments forwarded to log().
        file: Output stream forwarded to log(); when given, always prints.
        **kwargs: Keyword arguments forwarded to log().

    Example:
        ```python
        # Always shows errors
        print_if_visible("[error] Something failed", file=sys.stderr)

        # Only shows in non-piped context or as final stage
        print_if_visible(f"Processed {count} items")
        ```
    """
    try:
        visible = file is not None or not _PIPE_ACTIVE or _PIPE_IS_LAST
        if not visible:
            return
        if file is None:
            log(*args, **kwargs)
        else:
            log(*args, file=file, **kwargs)
    except Exception:
        # Best-effort output: a printing failure must not break the pipeline.
        pass


def store_value(key: str, value: Any) -> None:
    """Store a value to pass to later pipeline stages.

    Values live in a shared dictionary keyed by the stripped, lowercased key.
    This lets one stage prepare data for a later stage without intermediate
    output. Non-string or blank keys are silently ignored.

    Args:
        key: Variable name (normalized to lowercase; must be non-blank).
        value: Any Python object to store.
    """
    if not isinstance(key, str):
        return
    normalized = key.strip().lower()
    if not normalized:
        return
    _PIPELINE_VALUES[normalized] = value


def load_value(key: str, default: Any = None) -> Any:
    """Retrieve a value stored by an earlier pipeline stage.

    Supports dotted paths for nested access (e.g. "metadata.tags" or
    "items.0"). The root name is matched case-insensitively. Each later
    segment is resolved against the current value:

    - dict: exact key first, then a case-insensitive match on string keys;
    - list/tuple: the segment must be a non-negative in-range integer index;
    - anything else: attribute access.

    Args:
        key: Variable name or dotted path (e.g. "my_var", "metadata.title",
            "list.0").
        default: Value returned if the key/path is missing or access fails.

    Returns:
        The stored (or nested) value, or ``default`` if not found.
    """
    if not isinstance(key, str):
        return default
    text = key.strip()
    if not text:
        return default
    parts = [segment.strip() for segment in text.split('.') if segment.strip()]
    if not parts:
        return default

    current = _PIPELINE_VALUES.get(parts[0].lower(), _PIPELINE_MISSING)
    if current is _PIPELINE_MISSING:
        return default

    for fragment in parts[1:]:
        current = _resolve_path_fragment(current, fragment)
        if current is _PIPELINE_MISSING:
            return default
    return current


def _resolve_path_fragment(current: Any, fragment: str) -> Any:
    """Resolve one dotted-path segment for load_value(); _PIPELINE_MISSING on failure."""
    if isinstance(current, dict):
        if fragment in current:
            return current[fragment]
        fragment_lower = fragment.lower()
        for key_name, value in current.items():
            if isinstance(key_name, str) and key_name.lower() == fragment_lower:
                return value
        return _PIPELINE_MISSING

    if isinstance(current, (list, tuple)):
        # Sequences only support index access, never attribute lookup.
        if fragment.isdigit():
            try:
                # isdigit() also accepts characters such as '²' that int() rejects.
                idx = int(fragment)
            except ValueError:
                return _PIPELINE_MISSING
            if 0 <= idx < len(current):
                return current[idx]
        return _PIPELINE_MISSING

    if hasattr(current, fragment):
        try:
            return getattr(current, fragment)
        except Exception:
            return _PIPELINE_MISSING
    return _PIPELINE_MISSING


def reset() -> None:
    """Reset per-execution pipeline state. Called between pipeline executions.

    Cleared here: the legacy emit list, the active/last-stage flags, the
    ephemeral pipeline capture, the "refreshed" flag, the cached last items,
    the store_value() scratchpad, and the current command text.

    NOT cleared here: the stage context, the last search query, the result
    table and its history, display overlay items/table, the current stage
    table, the last @ selection, and the UI refresh callback.
    """
    global _PIPE_EMITS, _PIPE_ACTIVE, _PIPE_IS_LAST, _PIPELINE_VALUES
    global _LAST_PIPELINE_CAPTURE, _PIPELINE_REFRESHED, _PIPELINE_LAST_ITEMS
    global _PIPELINE_COMMAND_TEXT
    _PIPE_EMITS = []
    _PIPE_ACTIVE = False
    _PIPE_IS_LAST = False
    _LAST_PIPELINE_CAPTURE = None
    _PIPELINE_REFRESHED = False
    _PIPELINE_LAST_ITEMS = []
    _PIPELINE_VALUES = {}
    _PIPELINE_COMMAND_TEXT = ""


def get_emitted_items() -> List[Any]:
    """Return a copy of the items emitted through the legacy (no-context) path."""
    return list(_PIPE_EMITS)


def clear_emits() -> None:
    """Clear the legacy emitted-items list (called between stages)."""
    global _PIPE_EMITS
    _PIPE_EMITS = []


def set_last_selection(indices: Sequence[int]) -> None:
    """Record the indices selected via @ syntax for the next cmdlet.

    Args:
        indices: 0-based indices captured from the REPL parser (None/empty
            records an empty selection).
    """
    global _PIPELINE_LAST_SELECTION
    _PIPELINE_LAST_SELECTION = list(indices or [])


def get_last_selection() -> List[int]:
    """Return a copy of the indices selected via @ syntax for this invocation."""
    return list(_PIPELINE_LAST_SELECTION)


def clear_last_selection() -> None:
    """Clear the cached selection indices after a cmdlet finishes."""
    global _PIPELINE_LAST_SELECTION
    _PIPELINE_LAST_SELECTION = []


def set_current_command_text(command_text: Optional[str]) -> None:
    """Record the raw pipeline/command text (stripped; None becomes "")."""
    global _PIPELINE_COMMAND_TEXT
    _PIPELINE_COMMAND_TEXT = (command_text or "").strip()


def get_current_command_text(default: str = "") -> str:
    """Return the last recorded command/pipeline text, or ``default`` if blank."""
    return _PIPELINE_COMMAND_TEXT.strip() or default


def clear_current_command_text() -> None:
    """Clear the cached command text after execution completes."""
    global _PIPELINE_COMMAND_TEXT
    _PIPELINE_COMMAND_TEXT = ""


def set_active(active: bool) -> None:
    """Internal: Set whether we're in a pipeline context."""
    global _PIPE_ACTIVE
    _PIPE_ACTIVE = active


def set_last_stage(is_last: bool) -> None:
    """Internal: Set whether this is the last stage of the pipeline."""
    global _PIPE_IS_LAST
    _PIPE_IS_LAST = is_last


def set_search_query(query: Optional[str]) -> None:
    """Internal: Set the last search query for refresh purposes."""
    global _LAST_SEARCH_QUERY
    _LAST_SEARCH_QUERY = query


def get_search_query() -> Optional[str]:
    """Return the last search query, or None if none was recorded."""
    return _LAST_SEARCH_QUERY


def set_pipeline_refreshed(refreshed: bool) -> None:
    """Internal: Track whether the pipeline already refreshed results."""
    global _PIPELINE_REFRESHED
    _PIPELINE_REFRESHED = refreshed


def was_pipeline_refreshed() -> bool:
    """Return True if the pipeline already refreshed results."""
    return _PIPELINE_REFRESHED


def set_last_items(items: list) -> None:
    """Internal: Cache a copy of the last pipeline outputs (falsy -> [])."""
    global _PIPELINE_LAST_ITEMS
    _PIPELINE_LAST_ITEMS = list(items) if items else []


def get_last_items() -> List[Any]:
    """Return a copy of the last pipeline outputs."""
    return list(_PIPELINE_LAST_ITEMS)


def set_last_capture(obj: Any) -> None:
    """Internal: Store ephemeral handoff for direct pipelines."""
    global _LAST_PIPELINE_CAPTURE
    _LAST_PIPELINE_CAPTURE = obj


def get_last_capture() -> Optional[Any]:
    """Get ephemeral pipeline handoff (e.g., URL --screen-shot | ...)."""
    return _LAST_PIPELINE_CAPTURE


def set_ui_library_refresh_callback(callback: Any) -> None:
    """Set a callback to be called when library content is updated.

    trigger_ui_library_refresh() calls it as ``callback(library_filter)``.

    Args:
        callback: A callable accepting one library_filter argument.

    Example:
        def my_refresh_callback(library_filter='local'):
            print(f"Refresh library: {library_filter}")

        set_ui_library_refresh_callback(my_refresh_callback)
    """
    global _UI_LIBRARY_REFRESH_CALLBACK
    _UI_LIBRARY_REFRESH_CALLBACK = callback


def get_ui_library_refresh_callback() -> Optional[Any]:
    """Return the current library refresh callback, or None."""
    return _UI_LIBRARY_REFRESH_CALLBACK


def trigger_ui_library_refresh(library_filter: str = 'local') -> None:
    """Trigger a library refresh in the UI if a callback is registered.

    Call this from cmdlets/funacts after content is added to the library.
    Errors raised by the callback are reported on stderr, not propagated.

    Args:
        library_filter: Which library to refresh ('local', 'hydrus', etc).
    """
    callback = get_ui_library_refresh_callback()
    if callback:
        try:
            callback(library_filter)
        except Exception as e:
            print(f"[trigger_ui_library_refresh] Error calling refresh callback: {e}", file=sys.stderr)


def set_last_result_table(result_table: Optional[Any], items: Optional[List[Any]] = None) -> None:
    """Store the last result table and items for @ selection syntax.

    Call this after displaying a result table, so users can reference rows
    with @2, @2-5, @{1,3,5} syntax in subsequent commands. The previous table
    (if any) is pushed onto the history stack used by @.. navigation, and any
    display overlay is cleared.

    Only selectable commands (search-file, download-data) should call this to
    create history. For action commands (delete-tag, add-tag, etc), use
    set_last_result_table_preserve_history() instead.

    Args:
        result_table: The ResultTable object that was displayed (or None).
        items: Items that populated the table (None -> []).
    """
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS, _RESULT_TABLE_HISTORY, _DISPLAY_ITEMS, _DISPLAY_TABLE

    # Push current table to history before replacing it. list() (instead of
    # .copy()) also works if a caller stored a tuple of items.
    if _LAST_RESULT_TABLE is not None:
        _RESULT_TABLE_HISTORY.append((_LAST_RESULT_TABLE, list(_LAST_RESULT_ITEMS)))
        # Keep history size limited by dropping the oldest entry.
        if len(_RESULT_TABLE_HISTORY) > _MAX_RESULT_TABLE_HISTORY:
            _RESULT_TABLE_HISTORY.pop(0)

    # Set new current table and clear any display items/table
    _DISPLAY_ITEMS = []
    _DISPLAY_TABLE = None
    _LAST_RESULT_TABLE = result_table
    _LAST_RESULT_ITEMS = items or []


def set_last_result_table_overlay(result_table: Optional[Any], items: Optional[List[Any]] = None) -> None:
    """Set a result table as an overlay (display only, no history).

    Used for commands like get-tag that want to show a formatted table as a
    transient view: restore_previous_result_table() clears the overlay first,
    returning to the underlying table.

    Args:
        result_table: The ResultTable object to display.
        items: Items for @N selection (None -> []).
    """
    global _DISPLAY_ITEMS, _DISPLAY_TABLE
    _DISPLAY_TABLE = result_table
    _DISPLAY_ITEMS = items or []


def set_last_result_table_preserve_history(result_table: Optional[Any], items: Optional[List[Any]] = None) -> None:
    """Update the last result table WITHOUT adding to history.

    Used for action commands (delete-tag, add-tag, etc.) that modify data but
    shouldn't create history entries, so @.. navigates search results rather
    than an undo stack.

    Args:
        result_table: The ResultTable object that was displayed (or None).
        items: Items that populated the table (None -> []).
    """
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS
    _LAST_RESULT_TABLE = result_table
    _LAST_RESULT_ITEMS = items or []


def set_last_result_items_only(items: Optional[List[Any]]) -> None:
    """Store items for @N selection WITHOUT affecting history or saved search data.

    Used for display-only commands (get-tag, get-url, etc.) and action
    commands (delete-tag, add-tag, etc.) that emit results but shouldn't
    affect history. The items take priority in get_last_result_items() for the
    next command; _LAST_RESULT_ITEMS is left untouched so history keeps the
    original search data. Any display overlay table is cleared.

    Args:
        items: Items to select from (None -> []).
    """
    global _DISPLAY_ITEMS, _DISPLAY_TABLE
    _DISPLAY_ITEMS = items or []
    # Clear display table since we're setting items only (CLI will generate table if needed)
    _DISPLAY_TABLE = None


def restore_previous_result_table() -> bool:
    """Restore the previous result table (for @.. navigation).

    If a display overlay (items or table) is active, it is cleared first and
    counts as one "back" step. Otherwise the most recent history entry is
    popped and becomes the current table.

    Returns:
        True if an overlay was cleared or a previous table was restored,
        False if there was nothing to go back to.
    """
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS, _RESULT_TABLE_HISTORY, _DISPLAY_ITEMS, _DISPLAY_TABLE

    if _DISPLAY_ITEMS or _DISPLAY_TABLE:
        _DISPLAY_ITEMS = []
        _DISPLAY_TABLE = None
        return True

    if not _RESULT_TABLE_HISTORY:
        return False

    _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS = _RESULT_TABLE_HISTORY.pop()
    # Clear display items so get_last_result_items() falls back to restored items
    _DISPLAY_ITEMS = []
    _DISPLAY_TABLE = None
    return True


def get_display_table() -> Optional[Any]:
    """Return the current display overlay table, or None if none is set."""
    return _DISPLAY_TABLE


def get_last_result_table() -> Optional[Any]:
    """Return the current last result table, or None if none is set."""
    return _LAST_RESULT_TABLE


def get_last_result_items() -> List[Any]:
    """Return the items available for @N selection.

    Items from display/action commands (get-tag, delete-tag, etc.) take
    priority when non-empty; otherwise the items of the last search/selectable
    command are returned. The stored list itself is returned (not a copy).

    Returns:
        List of items, or an empty list if there are no prior results.
    """
    if _DISPLAY_ITEMS:
        return _DISPLAY_ITEMS
    return _LAST_RESULT_ITEMS


def _table_source_command(table: Optional[Any]) -> Optional[str]:
    """Return ``table.source_command`` if the table exists and defines it."""
    # ``is not None`` rather than truthiness: a table object may be falsy
    # (e.g. zero rows) yet still carry a valid source command.
    if table is not None and hasattr(table, 'source_command'):
        return table.source_command
    return None


def _table_source_args(table: Optional[Any]) -> List[str]:
    """Return ``table.source_args`` (None -> []) if the table exists and defines it."""
    if table is not None and hasattr(table, 'source_args'):
        return table.source_args or []
    return []


def _table_row_selection_args(table: Optional[Any], row_index: int) -> Optional[List[str]]:
    """Return ``selection_args`` of row ``row_index`` (0-based), or None."""
    if table is None or not hasattr(table, 'rows'):
        return None
    rows = table.rows
    if 0 <= row_index < len(rows):
        return getattr(rows[row_index], 'selection_args', None)
    return None


def get_last_result_table_source_command() -> Optional[str]:
    """Get the source command from the last displayed result table.

    Returns:
        Command name (e.g., 'download-data') or None if not set.
    """
    return _table_source_command(_LAST_RESULT_TABLE)


def get_last_result_table_source_args() -> List[str]:
    """Get the base source arguments from the last displayed result table.

    Returns:
        List of arguments (e.g., ['https://example.com']) or an empty list.
    """
    return _table_source_args(_LAST_RESULT_TABLE)


def get_last_result_table_row_selection_args(row_index: int) -> Optional[List[str]]:
    """Get the selection arguments for a specific row in the last result table.

    Args:
        row_index: Index of the row (0-based; negative indices return None).

    Returns:
        Selection arguments (e.g., ['-item', '3']) or None.
    """
    return _table_row_selection_args(_LAST_RESULT_TABLE, row_index)


def set_current_stage_table(result_table: Optional[Any]) -> None:
    """Store the current pipeline stage table for @N expansion.

    Used by cmdlets that display tabular results (e.g., download-data with
    formats) to make their result table available for @N expansion logic.
    Does NOT push to history; it is purely for command expansion in the
    current pipeline.

    Args:
        result_table: The ResultTable object (or None to clear).
    """
    global _CURRENT_STAGE_TABLE
    _CURRENT_STAGE_TABLE = result_table


def get_current_stage_table_source_command() -> Optional[str]:
    """Get the source command from the current pipeline stage table.

    Returns:
        Command name (e.g., 'download-data') or None.
    """
    return _table_source_command(_CURRENT_STAGE_TABLE)


def get_current_stage_table_source_args() -> List[str]:
    """Get the source arguments from the current pipeline stage table.

    Returns:
        List of arguments or an empty list.
    """
    return _table_source_args(_CURRENT_STAGE_TABLE)


def get_current_stage_table_row_selection_args(row_index: int) -> Optional[List[str]]:
    """Get the selection arguments for a row in the current pipeline stage table.

    Args:
        row_index: Index of the row (0-based; negative indices return None).

    Returns:
        Selection arguments or None.
    """
    return _table_row_selection_args(_CURRENT_STAGE_TABLE, row_index)


def clear_last_result() -> None:
    """Clear the stored last result table and items (history is untouched)."""
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS
    _LAST_RESULT_TABLE = None
    _LAST_RESULT_ITEMS = []


def emit_list(objects: List[Any]) -> None:
    """Emit a list of PipeObjects to the next pipeline stage as ONE item.

    The list itself is emitted as a single object (it is not flattened), so
    downstream cmdlets receive all results together and can process them as a
    group or filter by metadata. Routing is identical to emit().

    Args:
        objects: List of PipeObject instances or dicts to emit.
    """
    emit(objects)