Files
Medios-Macina/pipeline.py
2025-11-25 20:09:33 -08:00

680 lines
21 KiB
Python

"""Pipeline execution context and state management for cmdlets.
This module provides functions for managing pipeline state, allowing cmdlets to
emit results and control printing behavior within a piped execution context.
Key Concepts:
- Pipeline stages are chained command invocations
- Each stage receives input items and emits output items
- Printing behavior is controlled based on pipeline position
- Stage context tracks whether this is the last stage (affects output verbosity)
PowerShell-like piping model:
- Each stage processes items individually
- Stage calls emit() for each output item
- Output items become input for next stage
- Batch commands receive all items at once (special case)
"""
from __future__ import annotations
import sys
from typing import Any, Dict, List, Optional, Sequence
from models import PipelineStageContext
from helper.logger import log
# ============================================================================
# PIPELINE GLOBALS (maintained for backward compatibility)
# ============================================================================
# NOTE: reset() and several setters in this module REBIND these names (assign a
# fresh object) instead of mutating in place, so other modules should use the
# accessor functions rather than caching a reference to the objects below.
# Current pipeline context (thread-local in real world, global here for simplicity)
_CURRENT_CONTEXT: Optional[PipelineStageContext] = None
# Active execution state
# Legacy emit buffer: emit()/emit_list() append here when no stage context is set
_PIPE_EMITS: List[Any] = []
# Read by print_if_visible(): output is suppressed while a pipeline is active
# unless the current stage is the last one
_PIPE_ACTIVE: bool = False
_PIPE_IS_LAST: bool = False
# Ephemeral handoff for direct pipelines (e.g., URL --screen-shot | ...)
_LAST_PIPELINE_CAPTURE: Optional[Any] = None
# Remember last search query to support refreshing results after pipeline actions
_LAST_SEARCH_QUERY: Optional[str] = None
# Track whether the last pipeline execution already refreshed and displayed results
_PIPELINE_REFRESHED: bool = False
# Cache the last pipeline outputs so non-interactive callers can inspect results
_PIPELINE_LAST_ITEMS: List[Any] = []
# Store the last result table for @ selection syntax (e.g., @2, @2-5, @{1,3,5})
_LAST_RESULT_TABLE: Optional[Any] = None
_LAST_RESULT_ITEMS: List[Any] = []
# History of (table, items) pairs for @.. navigation (LIFO stack, max 20 tables);
# the oldest entry is dropped once the limit below is exceeded
_RESULT_TABLE_HISTORY: List[tuple[Optional[Any], List[Any]]] = []
_MAX_RESULT_TABLE_HISTORY = 20
# Current stage table for @N expansion (separate from history)
# Used to track the ResultTable with source_command + row_selection_args from current pipeline stage
# This is set by cmdlets that display tabular results (e.g., download-data showing formats)
# and used by CLI to expand @N into full commands like "download-data URL -item 2"
_CURRENT_STAGE_TABLE: Optional[Any] = None
# Items displayed by non-selectable commands (get-tag, delete-tag, etc.)
# These are available for @N selection but NOT saved to history
_DISPLAY_ITEMS: List[Any] = []
# Table for display-only commands (overlay)
# Used when a command wants to show a specific table formatting but not affect history
_DISPLAY_TABLE: Optional[Any] = None
# Track the indices the user selected via @ syntax for the current invocation
_PIPELINE_LAST_SELECTION: List[int] = []
# Track the currently executing command/pipeline string for worker attribution
_PIPELINE_COMMAND_TEXT: str = ""
# Shared scratchpad for cmdlets/funacts to stash structured data between stages
# (written by store_value(), read by load_value(); root keys are lowercase)
_PIPELINE_VALUES: Dict[str, Any] = {}
# Sentinel that lets load_value() tell "key absent" apart from a stored None
_PIPELINE_MISSING = object()
# Global callback to notify UI when library content changes
_UI_LIBRARY_REFRESH_CALLBACK: Optional[Any] = None
# ============================================================================
# PUBLIC API
# ============================================================================
def set_stage_context(context: Optional[PipelineStageContext]) -> None:
    """Internal: install ``context`` as the active pipeline stage context.

    Args:
        context: Stage context to make current, or ``None`` to clear it.
    """
    global _CURRENT_CONTEXT
    _CURRENT_CONTEXT = context
def get_stage_context() -> Optional[PipelineStageContext]:
    """Return the active pipeline stage context, or ``None`` when none is set."""
    active_context = _CURRENT_CONTEXT
    return active_context
def emit(obj: Any) -> None:
    """Emit an object to the current pipeline stage output.

    Call this from a cmdlet to pass data to the next pipeline stage.
    When a stage context is active the object is handed to that context;
    otherwise it is appended to the legacy module-level emit buffer.

    Args:
        obj: Any object to emit downstream

    Example:
        ```python
        def _run(item, args, config):
            result = process(item)
            if result:
                emit(result)  # Pass to next stage
            return 0
        ```
    """
    # Function-scope import: this module does not import logging at top level.
    import logging

    logger = logging.getLogger(__name__)

    # Preferred path: delegate to the active stage context.
    context = _CURRENT_CONTEXT
    if context is not None:
        # Lazy %-style arguments: obj is only rendered when DEBUG is enabled, so an
        # expensive or raising __str__ can neither slow down nor abort the emit.
        logger.debug(
            "[EMIT] Context-based: appending to _CURRENT_CONTEXT.emits. obj=%s", obj
        )
        context.emit(obj)
        return

    # Fallback to the legacy global buffer (kept for backward compatibility).
    # Best effort by design: a failure here is logged, never raised to the cmdlet.
    try:
        logger.debug(
            "[EMIT] Legacy: appending to _PIPE_EMITS. obj type=%s, _PIPE_EMITS len before=%d",
            type(obj).__name__,
            len(_PIPE_EMITS),
        )
        _PIPE_EMITS.append(obj)
        logger.debug("[EMIT] Legacy: _PIPE_EMITS len after=%d", len(_PIPE_EMITS))
    except Exception as exc:
        logger.error("[EMIT] Error appending to _PIPE_EMITS: %s", exc, exc_info=True)
def print_if_visible(*args: Any, file=None, **kwargs: Any) -> None:
    """Forward a message to ``log`` unless this is a quiet mid-pipeline stage.

    - A call that passes an explicit ``file`` (e.g. ``file=sys.stderr``) is
      always forwarded.
    - Without ``file``, the message is forwarded only when no pipeline is
      active or the current stage is the last one.
    - Any exception raised while logging is swallowed.

    Args:
        *args: Arguments to print (same as built-in print)
        file: Output stream (default: stdout)
        **kwargs: Keyword arguments for print

    Example:
        ```python
        # Always shows errors
        print_if_visible("[error] Something failed", file=sys.stderr)
        # Only shows in non-piped context or as final stage
        print_if_visible(f"Processed {count} items")
        ```
    """
    try:
        if file is not None:
            # Explicit stream requested: never suppressed.
            log(*args, file=file, **kwargs)
        elif _PIPE_IS_LAST or not _PIPE_ACTIVE:
            # Visible stage: final stage of a pipeline, or no pipeline at all.
            log(*args, **kwargs)
    except Exception:
        pass
def store_value(key: str, value: Any) -> None:
    """Stash ``value`` in the shared pipeline scratchpad for later stages.

    The key is stripped and lowercased before use. Non-string keys and keys
    that are empty after stripping are ignored silently.

    Args:
        key: Variable name (normalized to lowercase, non-empty)
        value: Any Python object to store
    """
    normalized = key.strip().lower() if isinstance(key, str) else ""
    if not normalized:
        return
    try:
        _PIPELINE_VALUES[normalized] = value
    except Exception:
        pass
def load_value(key: str, default: Any = None) -> Any:
    """Look up a value stored by an earlier pipeline stage.

    ``key`` may be a dotted path (e.g. "metadata.tags" or "items.0"). The first
    segment is lowercased and looked up in the shared scratchpad; every further
    segment descends one level:

    - dict: exact key match first, then a case-insensitive match on string keys
    - list/tuple with an all-digit segment: bounds-checked index
    - anything else: attribute access

    Args:
        key: Variable name or dotted path (e.g., "my_var", "metadata.title", "list.0")
        default: Value to return if key not found or access fails

    Returns:
        The stored value, or default if not found
    """
    if not isinstance(key, str):
        return default
    path = key.strip()
    if not path:
        return default

    # Empty segments ("a..b", leading/trailing dots) are dropped.
    stripped = (piece.strip() for piece in path.split('.'))
    segments = [piece for piece in stripped if piece]
    if not segments:
        return default

    node: Any = _PIPELINE_VALUES.get(segments[0].lower(), _PIPELINE_MISSING)
    if node is _PIPELINE_MISSING:
        return default

    for step in segments[1:]:
        if isinstance(node, dict):
            if step in node:
                node = node[step]
            else:
                wanted = step.lower()
                found = next(
                    (
                        candidate
                        for name, candidate in node.items()
                        if isinstance(name, str) and name.lower() == wanted
                    ),
                    _PIPELINE_MISSING,
                )
                if found is _PIPELINE_MISSING:
                    return default
                node = found
            continue

        if isinstance(node, (list, tuple)) and step.isdigit():
            try:
                position = int(step)
            except ValueError:
                # isdigit() also accepts characters int() cannot parse.
                return default
            if not 0 <= position < len(node):
                return default
            node = node[position]
            continue

        # Plain objects, and sequences addressed by a non-numeric segment.
        if not hasattr(node, step):
            return default
        try:
            node = getattr(node, step)
        except Exception:
            return default

    return node
def reset() -> None:
    """Reset per-execution pipeline state. Called between pipeline executions.

    Rebinds the emit buffer, active/last-stage flags, last capture, refreshed
    flag, cached last items, shared values and command text to fresh defaults.
    """
    global _PIPE_EMITS, _PIPE_ACTIVE, _PIPE_IS_LAST, _PIPELINE_VALUES, \
        _LAST_PIPELINE_CAPTURE, _PIPELINE_REFRESHED, _PIPELINE_LAST_ITEMS, \
        _PIPELINE_COMMAND_TEXT

    # Flags
    _PIPE_ACTIVE = _PIPE_IS_LAST = _PIPELINE_REFRESHED = False
    # Containers are replaced with new objects rather than cleared in place.
    _PIPE_EMITS = []
    _PIPELINE_LAST_ITEMS = []
    _PIPELINE_VALUES = {}
    # Scalars
    _LAST_PIPELINE_CAPTURE = None
    _PIPELINE_COMMAND_TEXT = ""
def get_emitted_items() -> List[Any]:
    """Return a shallow copy of the items in the legacy emit buffer."""
    return [*_PIPE_EMITS]
def clear_emits() -> None:
    """Replace the emit buffer with a new empty list (called between stages)."""
    global _PIPE_EMITS
    # Rebind instead of .clear() so a list handed out earlier is left untouched.
    _PIPE_EMITS = list()
def set_last_selection(indices: Sequence[int]) -> None:
    """Remember the indices chosen via @ syntax for the next cmdlet.

    A copy of ``indices`` is stored; a falsy value (``None``, empty) stores
    an empty list.

    Args:
        indices: Iterable of 0-based indices captured from the REPL parser
    """
    global _PIPELINE_LAST_SELECTION
    _PIPELINE_LAST_SELECTION = list(indices) if indices else []
def get_last_selection() -> List[int]:
    """Return a copy of the indices selected via @ syntax for this invocation."""
    return [*_PIPELINE_LAST_SELECTION]
def clear_last_selection() -> None:
    """Forget the cached @ selection indices once a cmdlet has finished."""
    global _PIPELINE_LAST_SELECTION
    _PIPELINE_LAST_SELECTION = list()
def set_current_command_text(command_text: Optional[str]) -> None:
    """Record the raw pipeline/command text for downstream consumers.

    Args:
        command_text: Command string; stored stripped. ``None`` or an empty
            string stores "".
    """
    global _PIPELINE_COMMAND_TEXT
    _PIPELINE_COMMAND_TEXT = command_text.strip() if command_text else ""
def get_current_command_text(default: str = "") -> str:
    """Return the last recorded command/pipeline text.

    Args:
        default: Value returned when the recorded text is empty after stripping.
    """
    return _PIPELINE_COMMAND_TEXT.strip() or default
def clear_current_command_text() -> None:
    """Drop the cached command text once execution has completed."""
    global _PIPELINE_COMMAND_TEXT
    _PIPELINE_COMMAND_TEXT = str()
def set_active(active: bool) -> None:
    """Internal: record whether execution is currently inside a pipeline.

    Args:
        active: Stored as-is in the pipeline-active flag.
    """
    global _PIPE_ACTIVE
    _PIPE_ACTIVE = active
def set_last_stage(is_last: bool) -> None:
    """Internal: record whether the running stage is the pipeline's last one.

    Args:
        is_last: Stored as-is in the last-stage flag.
    """
    global _PIPE_IS_LAST
    _PIPE_IS_LAST = is_last
def set_search_query(query: Optional[str]) -> None:
    """Internal: remember the most recent search query for later refreshes.

    Args:
        query: Query text, or ``None`` to forget the previous one.
    """
    global _LAST_SEARCH_QUERY
    _LAST_SEARCH_QUERY = query
def get_search_query() -> Optional[str]:
    """Return the most recently recorded search query, or ``None``."""
    remembered = _LAST_SEARCH_QUERY
    return remembered
def set_pipeline_refreshed(refreshed: bool) -> None:
    """Internal: note whether the pipeline has already refreshed its results.

    Args:
        refreshed: Stored as-is in the refreshed flag.
    """
    global _PIPELINE_REFRESHED
    _PIPELINE_REFRESHED = refreshed
def was_pipeline_refreshed() -> bool:
    """Report whether the pipeline has already refreshed its results."""
    already_refreshed = _PIPELINE_REFRESHED
    return already_refreshed
def set_last_items(items: list) -> None:
    """Internal: cache a copy of the last pipeline outputs.

    Args:
        items: Output items; a falsy value stores an empty list.
    """
    global _PIPELINE_LAST_ITEMS
    if not items:
        _PIPELINE_LAST_ITEMS = []
        return
    _PIPELINE_LAST_ITEMS = list(items)
def get_last_items() -> List[Any]:
    """Return a shallow copy of the cached last pipeline outputs."""
    return [*_PIPELINE_LAST_ITEMS]
def set_last_capture(obj: Any) -> None:
    """Internal: keep ``obj`` as the ephemeral handoff for direct pipelines.

    Args:
        obj: Object to hand off; stored as-is.
    """
    global _LAST_PIPELINE_CAPTURE
    _LAST_PIPELINE_CAPTURE = obj
def get_last_capture() -> Optional[Any]:
    """Return the ephemeral pipeline handoff (e.g., URL --screen-shot | ...)."""
    captured = _LAST_PIPELINE_CAPTURE
    return captured
def set_ui_library_refresh_callback(callback: Any) -> None:
    """Register the callable invoked when library content is updated.

    trigger_ui_library_refresh() calls it with one positional argument,
    the library filter:
        callback(library_filter: str = 'local')

    Args:
        callback: A callable that accepts optional library_filter parameter

    Example:
        def my_refresh_callback(library_filter='local'):
            print(f"Refresh library: {library_filter}")
        set_ui_library_refresh_callback(my_refresh_callback)
    """
    global _UI_LIBRARY_REFRESH_CALLBACK
    _UI_LIBRARY_REFRESH_CALLBACK = callback
def get_ui_library_refresh_callback() -> Optional[Any]:
    """Return the registered library refresh callback, or ``None`` if unset."""
    registered = _UI_LIBRARY_REFRESH_CALLBACK
    return registered
def trigger_ui_library_refresh(library_filter: str = 'local') -> None:
    """Ask the UI to refresh its library view, if a callback is registered.

    This should be called from cmdlets/funacts after content is added to library.
    An exception raised by the callback is reported on stderr and not re-raised.

    Args:
        library_filter: Which library to refresh ('local', 'hydrus', etc)
    """
    refresh = get_ui_library_refresh_callback()
    if not refresh:
        # Nothing registered: silently do nothing.
        return
    try:
        refresh(library_filter)
    except Exception as exc:
        print(f"[trigger_ui_library_refresh] Error calling refresh callback: {exc}", file=sys.stderr)
def set_last_result_table(result_table: Optional[Any], items: Optional[List[Any]] = None) -> None:
    """Make ``result_table`` the current table for @ selection and record history.

    Call this after displaying a result table so later commands can reference
    rows with @2, @2-5, @{1,3,5}. If a table is already current, it is pushed
    (with a copy of its items) onto the history stack used by @.. navigation;
    the oldest history entry is dropped once the stack exceeds its limit.
    Any display overlay (items/table) is cleared.

    Only selectable commands (search-file, download-data) should call this to create history.
    For action commands (delete-tag, add-tag, etc), use set_last_result_table_preserve_history() instead.

    Args:
        result_table: The ResultTable object that was displayed (or None)
        items: List of items that populated the table (optional)
    """
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS, _RESULT_TABLE_HISTORY, _DISPLAY_ITEMS, _DISPLAY_TABLE

    outgoing_table = _LAST_RESULT_TABLE
    if outgoing_table is not None:
        # Snapshot the outgoing table together with a copy of its items.
        snapshot = (outgoing_table, _LAST_RESULT_ITEMS.copy())
        _RESULT_TABLE_HISTORY.append(snapshot)
        if len(_RESULT_TABLE_HISTORY) > _MAX_RESULT_TABLE_HISTORY:
            # Over the cap: discard the oldest snapshot.
            del _RESULT_TABLE_HISTORY[0]

    # Install the new table; the items list is stored as passed (not copied).
    _LAST_RESULT_TABLE = result_table
    _LAST_RESULT_ITEMS = items or []
    # A new selectable table always closes any overlay.
    _DISPLAY_TABLE = None
    _DISPLAY_ITEMS = []
def set_last_result_table_overlay(result_table: Optional[Any], items: Optional[List[Any]] = None) -> None:
    """Show ``result_table`` as an overlay (display only, no history).

    Used for commands like get-tag that want to show a formatted table but
    should be treated as a transient view (closing it returns to previous table).
    Neither the last result table nor the history stack is touched.

    Args:
        result_table: The ResultTable object to display
        items: List of items for @N selection
    """
    global _DISPLAY_ITEMS, _DISPLAY_TABLE
    _DISPLAY_ITEMS = items or []
    _DISPLAY_TABLE = result_table
def set_last_result_table_preserve_history(result_table: Optional[Any], items: Optional[List[Any]] = None) -> None:
    """Replace the last result table WITHOUT adding to history.

    Used for action commands (delete-tag, add-tag, etc.) that modify data but shouldn't
    create history entries. This allows @.. to navigate search results, not undo stacks.
    The display overlay is left as it is.

    Args:
        result_table: The ResultTable object that was displayed (or None)
        items: List of items that populated the table (optional)
    """
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS
    # Straight overwrite: the previous table is deliberately not pushed to history.
    _LAST_RESULT_ITEMS = items or []
    _LAST_RESULT_TABLE = result_table
def set_last_result_items_only(items: Optional[List[Any]]) -> None:
    """Expose ``items`` for @N selection WITHOUT affecting history or saved search data.

    Used for display-only commands (get-tag, get-url, etc.) and action commands
    (delete-tag, add-tag, etc.) that emit results but shouldn't affect history.
    The items become the display items for @1, @2, etc. in the next command;
    the last result table/items and the history stack are not modified, which
    preserves search context for @.. navigation.

    Args:
        items: List of items to select from
    """
    global _DISPLAY_ITEMS, _DISPLAY_TABLE
    # No overlay table accompanies items-only results (CLI will generate table if needed).
    _DISPLAY_TABLE = None
    # Only the display slot is written; _LAST_RESULT_ITEMS stays untouched.
    _DISPLAY_ITEMS = items or []
def restore_previous_result_table() -> bool:
    """Restore the previous result table from history (for @.. navigation).

    If a display overlay is active (display items present or an overlay table
    set), only the overlay is closed, revealing the underlying table. Otherwise
    the most recent history entry becomes the current table and items.

    Returns:
        True if an overlay was closed or a previous table was restored,
        False if there was no overlay and history is empty
    """
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS, _RESULT_TABLE_HISTORY, _DISPLAY_ITEMS, _DISPLAY_TABLE

    # Compare the overlay table against None instead of testing truthiness: a table
    # object may be falsy (e.g. it defines __len__ and has no rows) while still being
    # the active overlay, and must then be closed rather than skipped.
    overlay_active = bool(_DISPLAY_ITEMS) or _DISPLAY_TABLE is not None
    if overlay_active:
        _DISPLAY_ITEMS = []
        _DISPLAY_TABLE = None
        return True

    if not _RESULT_TABLE_HISTORY:
        return False

    # Pop the most recent snapshot and make it current.
    _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS = _RESULT_TABLE_HISTORY.pop()
    # Clear display items so get_last_result_items() falls back to restored items.
    _DISPLAY_ITEMS = []
    _DISPLAY_TABLE = None
    return True
def get_display_table() -> Optional[Any]:
    """Return the table currently shown as a display overlay.

    Returns:
        The ResultTable object, or None if no overlay table is set
    """
    overlay_table = _DISPLAY_TABLE
    return overlay_table
def get_last_result_table() -> Optional[Any]:
    """Return the table stored as the current last result.

    Returns:
        The ResultTable object, or None if no table is set
    """
    current_table = _LAST_RESULT_TABLE
    return current_table
def get_last_result_items() -> List[Any]:
    """Return the items that @N selection should draw from.

    Display items (set by display/action commands such as get-tag, delete-tag)
    take priority when non-empty; otherwise the items of the last result table
    are returned. The stored list itself is returned, not a copy.

    Returns:
        List of items, or empty list if no prior results
    """
    # `or` picks the display items when non-empty, else the last search results.
    return _DISPLAY_ITEMS or _LAST_RESULT_ITEMS
def get_last_result_table_source_command() -> Optional[str]:
    """Get the source command from the last displayed result table.

    Returns:
        Command name (e.g., 'download-data'), or None if no table is stored
        or the table has no ``source_command`` attribute
    """
    table = _LAST_RESULT_TABLE
    # Explicit None check: a table object can be falsy (e.g. zero rows with a
    # __len__ method) and still carry a valid source command.
    if table is None:
        return None
    return getattr(table, 'source_command', None)
def get_last_result_table_source_args() -> List[str]:
    """Get the base source arguments from the last displayed result table.

    Returns:
        List of arguments (e.g., ['https://example.com']), or an empty list if
        no table is stored, the table has no ``source_args`` attribute, or the
        attribute is empty/None
    """
    table = _LAST_RESULT_TABLE
    # Explicit None check: a table object can be falsy (e.g. zero rows with a
    # __len__ method) and still carry valid source arguments.
    if table is None:
        return []
    return getattr(table, 'source_args', None) or []
def get_last_result_table_row_selection_args(row_index: int) -> Optional[List[str]]:
    """Look up the selection arguments of one row in the last result table.

    Args:
        row_index: Index of the row (0-based)

    Returns:
        Selection arguments (e.g., ['-item', '3']), or None when there is no
        usable table, the index is out of range, or the row has no
        ``selection_args`` attribute
    """
    table = _LAST_RESULT_TABLE
    if not table or not hasattr(table, 'rows'):
        return None
    rows = table.rows
    if not 0 <= row_index < len(rows):
        return None
    return getattr(rows[row_index], 'selection_args', None)
def set_current_stage_table(result_table: Optional[Any]) -> None:
    """Remember the table of the current pipeline stage for @N expansion.

    Used by cmdlets that display tabular results (e.g., download-data with formats)
    to make their result table available for @N expansion logic.
    Only the current-stage slot is written; nothing is pushed to history.

    Args:
        result_table: The ResultTable object (or None to clear)
    """
    global _CURRENT_STAGE_TABLE
    _CURRENT_STAGE_TABLE = result_table
def get_current_stage_table_source_command() -> Optional[str]:
    """Get the source command from the current pipeline stage table.

    Returns:
        Command name (e.g., 'download-data'), or None if no stage table is
        stored or the table has no ``source_command`` attribute
    """
    table = _CURRENT_STAGE_TABLE
    # Explicit None check: a table object can be falsy (e.g. zero rows with a
    # __len__ method) and still carry a valid source command.
    if table is None:
        return None
    return getattr(table, 'source_command', None)
def get_current_stage_table_source_args() -> List[str]:
    """Get the source arguments from the current pipeline stage table.

    Returns:
        List of arguments, or an empty list if no stage table is stored, the
        table has no ``source_args`` attribute, or the attribute is empty/None
    """
    table = _CURRENT_STAGE_TABLE
    # Explicit None check: a table object can be falsy (e.g. zero rows with a
    # __len__ method) and still carry valid source arguments.
    if table is None:
        return []
    return getattr(table, 'source_args', None) or []
def get_current_stage_table_row_selection_args(row_index: int) -> Optional[List[str]]:
    """Look up the selection arguments of one row in the current stage table.

    Args:
        row_index: Index of the row (0-based)

    Returns:
        Selection arguments, or None when there is no usable stage table, the
        index is out of range, or the row has no ``selection_args`` attribute
    """
    table = _CURRENT_STAGE_TABLE
    if not table or not hasattr(table, 'rows'):
        return None
    rows = table.rows
    if not 0 <= row_index < len(rows):
        return None
    return getattr(rows[row_index], 'selection_args', None)
def clear_last_result() -> None:
    """Forget the stored last result table and its items.

    History and the display overlay are not modified.
    """
    global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS
    _LAST_RESULT_ITEMS = []
    _LAST_RESULT_TABLE = None
def emit_list(objects: List[Any]) -> None:
    """Emit a list of PipeObjects to the next pipeline stage.

    The list is emitted as ONE item (it is not flattened), so downstream
    cmdlets receive it as a list and can process all of its members or
    filter them by metadata.

    Args:
        objects: List of PipeObject instances or dicts to emit
    """
    target = _CURRENT_CONTEXT
    if target is None:
        # No stage context: fall back to the legacy module-level buffer.
        _PIPE_EMITS.append(objects)
        return
    target.emit(objects)