"""Pipeline execution utilities for the Textual UI. The TUI is a frontend to the CLI, so it must use the same pipeline executor implementation as the CLI (`CLI.PipelineExecutor`). """ from __future__ import annotations import contextlib import io import shlex import sys from pathlib import Path from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional, Sequence BASE_DIR = Path(__file__).resolve().parent ROOT_DIR = BASE_DIR.parent for path in (ROOT_DIR, BASE_DIR): str_path = str(path) if str_path not in sys.path: sys.path.insert(0, str_path) from SYS import pipeline as ctx # Lazily import CLI dependencies to avoid import-time failures in test environments try: from CLI import ConfigLoader, PipelineExecutor as CLIPipelineExecutor, WorkerManagerRegistry except Exception: ConfigLoader = None CLIPipelineExecutor = None WorkerManagerRegistry = None from SYS.logger import set_debug from SYS.rich_display import capture_rich_output from SYS.result_table import ResultTable @dataclass(slots=True) class PipelineStageResult: """Summary for a single pipeline stage.""" name: str args: Sequence[str] emitted: List[Any] = field(default_factory=list) result_table: Optional[Any] = None # ResultTable object if available status: str = "pending" error: Optional[str] = None @dataclass(slots=True) class PipelineRunResult: """Aggregate result for a pipeline run.""" pipeline: str success: bool stages: List[PipelineStageResult] = field(default_factory=list) emitted: List[Any] = field(default_factory=list) result_table: Optional[Any] = None # Final ResultTable object if available stdout: str = "" stderr: str = "" error: Optional[str] = None def to_summary(self) -> Dict[str, Any]: """Provide a JSON-friendly representation for logging or UI.""" return { "pipeline": self.pipeline, "success": self.success, "error": self.error, "stages": [ { "name": stage.name, "status": stage.status, "error": stage.error, "emitted": len(stage.emitted), } for stage in self.stages ], } class PipelineRunner: """TUI wrapper that delegates to the canonical CLI pipeline executor.""" def __init__(self, config_loader: Optional[Any] = None, executor: Optional[Any] = None) -> None: # Allow dependency injection or lazily construct CLI dependencies so tests # don't fail due to import-order issues in pytest environments. self._config_loader = config_loader if config_loader is not None else (ConfigLoader(root=ROOT_DIR) if ConfigLoader else None) if executor is not None: self._executor = executor else: self._executor = CLIPipelineExecutor(config_loader=self._config_loader) if CLIPipelineExecutor else None self._worker_manager = None @property def worker_manager(self): return self._worker_manager def run_pipeline( self, pipeline_text: str, *, seeds: Optional[Any] = None, isolate: bool = False, on_log: Optional[Callable[[str], None]] = None, ) -> PipelineRunResult: snapshot: Optional[Dict[str, Any]] = None if isolate: snapshot = self._snapshot_ctx_state() normalized = str(pipeline_text or "").strip() result = PipelineRunResult(pipeline=normalized, success=False) if not normalized: result.error = "Pipeline is empty" return result try: from SYS.cli_syntax import validate_pipeline_text syntax_error = validate_pipeline_text(normalized) if syntax_error: result.error = syntax_error.message result.stderr = syntax_error.message return result except Exception: pass try: tokens = shlex.split(normalized) except Exception as exc: result.error = f"Syntax error: {exc}" result.stderr = result.error return result if not tokens: result.error = "Pipeline contains no tokens" return result config = self._config_loader.load() try: set_debug(bool(config.get("debug", False))) except Exception: pass try: self._worker_manager = WorkerManagerRegistry.ensure(config) except Exception: self._worker_manager = None ctx.reset() ctx.set_current_command_text(normalized) if seeds is not None: try: if not isinstance(seeds, list): seeds = [seeds] ctx.set_last_result_items_only(list(seeds)) except Exception: pass stdout_buffer = io.StringIO() stderr_buffer = io.StringIO() try: with capture_rich_output(stdout=stdout_buffer, stderr=stderr_buffer): with ( contextlib.redirect_stdout(stdout_buffer), contextlib.redirect_stderr(stderr_buffer), ): if on_log: on_log("Executing pipeline via CLI executor...") self._executor.execute_tokens(list(tokens)) except Exception as exc: result.error = f"{type(exc).__name__}: {exc}" finally: try: ctx.clear_current_command_text() except Exception: pass result.stdout = stdout_buffer.getvalue() result.stderr = stderr_buffer.getvalue() # Pull the canonical state out of pipeline context. table = None try: table = ( ctx.get_display_table() or ctx.get_current_stage_table() or ctx.get_last_result_table() ) except Exception: table = None items: List[Any] = [] try: items = list(ctx.get_last_result_items() or []) except Exception: items = [] if table is None and items: try: synth = ResultTable("Results") for item in items: synth.add_result(item) table = synth except Exception: table = None result.emitted = items result.result_table = table combined = (result.stdout + "\n" + result.stderr).strip().lower() failure_markers = ( "unknown command:", "pipeline order error:", "invalid selection:", "invalid pipeline syntax", "failed to execute pipeline", "[error]", ) if result.error: result.success = False elif any(m in combined for m in failure_markers): result.success = False if not result.error: result.error = "Pipeline failed" else: result.success = True if isolate and snapshot is not None: try: self._restore_ctx_state(snapshot) except Exception: # Best-effort; isolation should never break normal operation. pass return result @staticmethod def _snapshot_ctx_state() -> Dict[str, Any]: """Best-effort snapshot of pipeline context using PipelineState. This reads from the active PipelineState (ContextVar or global fallback) to produce a consistent snapshot that can be restored later. """ def _copy(val: Any) -> Any: if isinstance(val, list): return val.copy() if isinstance(val, dict): return val.copy() return val state = ctx.get_pipeline_state() snap: Dict[str, Any] = {} # Simple scalar/list/dict fields snap["live_progress"] = _copy(state.live_progress) snap["current_context"] = state.current_context snap["last_search_query"] = state.last_search_query snap["pipeline_refreshed"] = state.pipeline_refreshed snap["last_items"] = _copy(state.last_items) snap["last_result_table"] = state.last_result_table snap["last_result_items"] = _copy(state.last_result_items) snap["last_result_subject"] = state.last_result_subject # Deep-copy history/forward stacks (copy nested item lists) def _copy_history(hist: Optional[List[tuple]]) -> List[tuple]: out: List[tuple] = [] try: for (t, items, subj) in list(hist or []): items_copy = items.copy() if isinstance(items, list) else list(items) if items else [] out.append((t, items_copy, subj)) except Exception: pass return out snap["result_table_history"] = _copy_history(state.result_table_history) snap["result_table_forward"] = _copy_history(state.result_table_forward) snap["current_stage_table"] = state.current_stage_table snap["display_items"] = _copy(state.display_items) snap["display_table"] = state.display_table snap["display_subject"] = state.display_subject snap["last_selection"] = _copy(state.last_selection) snap["pipeline_command_text"] = state.pipeline_command_text snap["current_cmdlet_name"] = state.current_cmdlet_name snap["current_stage_text"] = state.current_stage_text snap["pipeline_values"] = _copy(state.pipeline_values) if isinstance(state.pipeline_values, dict) else state.pipeline_values snap["pending_pipeline_tail"] = [list(stage) for stage in (state.pending_pipeline_tail or [])] snap["pending_pipeline_source"] = state.pending_pipeline_source snap["ui_library_refresh_callback"] = state.ui_library_refresh_callback snap["pipeline_stop"] = state.pipeline_stop return snap @staticmethod def _restore_ctx_state(snapshot: Dict[str, Any]) -> None: if not snapshot: return state = ctx.get_pipeline_state() # Helper for restoring history-like stacks def _restore_history(key: str, val: Any) -> None: try: if isinstance(val, list): out: List[tuple] = [] for (t, items, subj) in val: items_copy = items.copy() if isinstance(items, list) else list(items) if items else [] out.append((t, items_copy, subj)) setattr(state, key, out) except Exception: pass try: if "live_progress" in snapshot: state.live_progress = snapshot["live_progress"] if "current_context" in snapshot: state.current_context = snapshot["current_context"] if "last_search_query" in snapshot: state.last_search_query = snapshot["last_search_query"] if "pipeline_refreshed" in snapshot: state.pipeline_refreshed = snapshot["pipeline_refreshed"] if "last_items" in snapshot: state.last_items = snapshot["last_items"] or [] if "last_result_table" in snapshot: state.last_result_table = snapshot["last_result_table"] if "last_result_items" in snapshot: state.last_result_items = snapshot["last_result_items"] or [] if "last_result_subject" in snapshot: state.last_result_subject = snapshot["last_result_subject"] if "result_table_history" in snapshot: _restore_history("result_table_history", snapshot["result_table_history"]) if "result_table_forward" in snapshot: _restore_history("result_table_forward", snapshot["result_table_forward"]) if "current_stage_table" in snapshot: state.current_stage_table = snapshot["current_stage_table"] if "display_items" in snapshot: state.display_items = snapshot["display_items"] or [] if "display_table" in snapshot: state.display_table = snapshot["display_table"] if "display_subject" in snapshot: state.display_subject = snapshot["display_subject"] if "last_selection" in snapshot: state.last_selection = snapshot["last_selection"] or [] if "pipeline_command_text" in snapshot: state.pipeline_command_text = snapshot["pipeline_command_text"] or "" if "current_cmdlet_name" in snapshot: state.current_cmdlet_name = snapshot["current_cmdlet_name"] or "" if "current_stage_text" in snapshot: state.current_stage_text = snapshot["current_stage_text"] or "" if "pipeline_values" in snapshot: state.pipeline_values = snapshot["pipeline_values"] or {} if "pending_pipeline_tail" in snapshot: state.pending_pipeline_tail = snapshot["pending_pipeline_tail"] or [] if "pending_pipeline_source" in snapshot: state.pending_pipeline_source = snapshot["pending_pipeline_source"] if "ui_library_refresh_callback" in snapshot: state.ui_library_refresh_callback = snapshot["ui_library_refresh_callback"] if "pipeline_stop" in snapshot: state.pipeline_stop = snapshot["pipeline_stop"] except Exception: # Best-effort; don't break the pipeline runner pass # Ensure module-level variables reflect restored state ctx.sync_module_state(state)