"""Pipeline execution utilities for the Textual UI. The TUI is a frontend to the CLI, so it must use the same pipeline executor implementation as the CLI (`CLI.PipelineExecutor`). """ from __future__ import annotations import contextlib import io import shlex import sys import traceback from pathlib import Path from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional, Sequence BASE_DIR = Path(__file__).resolve().parent ROOT_DIR = BASE_DIR.parent for path in (ROOT_DIR, BASE_DIR): str_path = str(path) if str_path not in sys.path: sys.path.insert(0, str_path) from SYS import pipeline as ctx from CLI import ConfigLoader from SYS.pipeline import PipelineExecutor from SYS.worker import WorkerManagerRegistry from SYS.logger import set_debug, debug from SYS.rich_display import capture_rich_output import traceback from SYS.result_table import Table @dataclass(slots=True) class PipelineStageResult: """Summary for a single pipeline stage.""" name: str args: Sequence[str] emitted: List[Any] = field(default_factory=list) result_table: Optional[Any] = None # ResultTable object if available status: str = "pending" error: Optional[str] = None @dataclass(slots=True) class PipelineRunResult: """Aggregate result for a pipeline run.""" pipeline: str success: bool stages: List[PipelineStageResult] = field(default_factory=list) emitted: List[Any] = field(default_factory=list) result_table: Optional[Any] = None # Final ResultTable object if available stdout: str = "" stderr: str = "" error: Optional[str] = None def to_summary(self) -> Dict[str, Any]: """Provide a JSON-friendly representation for logging or UI.""" return { "pipeline": self.pipeline, "success": self.success, "error": self.error, "stages": [ { "name": stage.name, "status": stage.status, "error": stage.error, "emitted": len(stage.emitted), } for stage in self.stages ], } class PipelineRunner: """TUI wrapper that delegates to the canonical CLI pipeline executor.""" def __init__(self, config_loader: Optional[Any] = None, executor: Optional[Any] = None) -> None: # Allow dependency injection or lazily construct CLI dependencies so tests # don't fail due to import-order issues in pytest environments. self._config_loader = config_loader if config_loader is not None else (ConfigLoader(root=ROOT_DIR) if ConfigLoader else None) if executor is not None: self._executor = executor else: self._executor = PipelineExecutor(config_loader=self._config_loader) self._worker_manager = None @property def worker_manager(self): return self._worker_manager def run_pipeline( self, pipeline_text: str, *, seeds: Optional[Any] = None, seed_table: Optional[Any] = None, isolate: bool = False, on_log: Optional[Callable[[str], None]] = None, ) -> PipelineRunResult: snapshot: Optional[Dict[str, Any]] = None if isolate: snapshot = self._snapshot_ctx_state() normalized = str(pipeline_text or "").strip() result = PipelineRunResult(pipeline=normalized, success=False) if not normalized: result.error = "Pipeline is empty" return result try: from SYS.cli_syntax import validate_pipeline_text syntax_error = validate_pipeline_text(normalized) if syntax_error: result.error = syntax_error.message result.stderr = syntax_error.message return result except Exception: debug(traceback.format_exc()) try: tokens = shlex.split(normalized) except Exception as exc: result.error = f"Syntax error: {exc}" result.stderr = result.error return result if not tokens: result.error = "Pipeline contains no tokens" return result config = self._config_loader.load() try: set_debug(bool(config.get("debug", False))) except Exception: debug(traceback.format_exc()) try: self._worker_manager = WorkerManagerRegistry.ensure(config) except Exception: debug(traceback.format_exc()) self._worker_manager = None ctx.reset() ctx.set_current_command_text(normalized) if seeds is not None: try: if not isinstance(seeds, list): seeds = [seeds] ctx.set_last_result_items_only(list(seeds)) except Exception: debug(traceback.format_exc()) if seed_table is not None: try: ctx.set_current_stage_table(seed_table) except Exception: debug(traceback.format_exc()) stdout_buffer = io.StringIO() stderr_buffer = io.StringIO() try: with capture_rich_output(stdout=stdout_buffer, stderr=stderr_buffer): with ( contextlib.redirect_stdout(stdout_buffer), contextlib.redirect_stderr(stderr_buffer), ): if on_log: on_log("Executing pipeline via CLI executor...") self._executor.execute_tokens(list(tokens)) except Exception as exc: result.error = f"{type(exc).__name__}: {exc}" finally: try: ctx.clear_current_command_text() except Exception: debug(traceback.format_exc()) result.stdout = stdout_buffer.getvalue() result.stderr = stderr_buffer.getvalue() # Pull the canonical state out of pipeline context. table = None try: table = ( ctx.get_display_table() or ctx.get_current_stage_table() or ctx.get_last_result_table() ) except Exception: table = None items: List[Any] = [] try: items = list(ctx.get_last_result_items() or []) except Exception: items = [] if table is None and items: try: synth = Table("Results") for item in items: synth.add_result(item) table = synth except Exception: table = None result.emitted = items result.result_table = table combined = (result.stdout + "\n" + result.stderr).strip().lower() failure_markers = ( "unknown command:", "pipeline order error:", "invalid selection:", "invalid pipeline syntax", "failed to execute pipeline", "[error]", ) if result.error: result.success = False elif any(m in combined for m in failure_markers): result.success = False if not result.error: result.error = "Pipeline failed" else: result.success = True if isolate and snapshot is not None: try: self._restore_ctx_state(snapshot) except Exception: # Best-effort; isolation should never break normal operation. pass return result @staticmethod def _snapshot_ctx_state() -> Dict[str, Any]: """Best-effort snapshot of pipeline context using PipelineState. This reads from the active PipelineState (ContextVar or global fallback) to produce a consistent snapshot that can be restored later. """ def _copy(val: Any) -> Any: if isinstance(val, list): return val.copy() if isinstance(val, dict): return val.copy() return val state = ctx.get_pipeline_state() snap: Dict[str, Any] = {} # Simple scalar/list/dict fields snap["live_progress"] = _copy(state.live_progress) snap["current_context"] = state.current_context snap["last_search_query"] = state.last_search_query snap["pipeline_refreshed"] = state.pipeline_refreshed snap["last_items"] = _copy(state.last_items) snap["last_result_table"] = state.last_result_table snap["last_result_items"] = _copy(state.last_result_items) snap["last_result_subject"] = state.last_result_subject # Deep-copy history/forward stacks (copy nested item lists) def _copy_history(hist: Optional[List[tuple]]) -> List[tuple]: out: List[tuple] = [] try: for (t, items, subj) in list(hist or []): items_copy = items.copy() if isinstance(items, list) else list(items) if items else [] out.append((t, items_copy, subj)) except Exception: debug(traceback.format_exc()) return out snap["result_table_history"] = _copy_history(state.result_table_history) snap["result_table_forward"] = _copy_history(state.result_table_forward) snap["current_stage_table"] = state.current_stage_table snap["display_items"] = _copy(state.display_items) snap["display_table"] = state.display_table snap["display_subject"] = state.display_subject snap["last_selection"] = _copy(state.last_selection) snap["pipeline_command_text"] = state.pipeline_command_text snap["current_cmdlet_name"] = state.current_cmdlet_name snap["current_stage_text"] = state.current_stage_text snap["pipeline_values"] = _copy(state.pipeline_values) if isinstance(state.pipeline_values, dict) else state.pipeline_values snap["pending_pipeline_tail"] = [list(stage) for stage in (state.pending_pipeline_tail or [])] snap["pending_pipeline_source"] = state.pending_pipeline_source snap["ui_library_refresh_callback"] = state.ui_library_refresh_callback snap["pipeline_stop"] = state.pipeline_stop return snap @staticmethod def _restore_ctx_state(snapshot: Dict[str, Any]) -> None: if not snapshot: return state = ctx.get_pipeline_state() # Helper for restoring history-like stacks def _restore_history(key: str, val: Any) -> None: try: if isinstance(val, list): out: List[tuple] = [] for (t, items, subj) in val: items_copy = items.copy() if isinstance(items, list) else list(items) if items else [] out.append((t, items_copy, subj)) setattr(state, key, out) except Exception: debug(traceback.format_exc()) try: if "live_progress" in snapshot: state.live_progress = snapshot["live_progress"] if "current_context" in snapshot: state.current_context = snapshot["current_context"] if "last_search_query" in snapshot: state.last_search_query = snapshot["last_search_query"] if "pipeline_refreshed" in snapshot: state.pipeline_refreshed = snapshot["pipeline_refreshed"] if "last_items" in snapshot: state.last_items = snapshot["last_items"] or [] if "last_result_table" in snapshot: state.last_result_table = snapshot["last_result_table"] if "last_result_items" in snapshot: state.last_result_items = snapshot["last_result_items"] or [] if "last_result_subject" in snapshot: state.last_result_subject = snapshot["last_result_subject"] if "result_table_history" in snapshot: _restore_history("result_table_history", snapshot["result_table_history"]) if "result_table_forward" in snapshot: _restore_history("result_table_forward", snapshot["result_table_forward"]) if "current_stage_table" in snapshot: state.current_stage_table = snapshot["current_stage_table"] if "display_items" in snapshot: state.display_items = snapshot["display_items"] or [] if "display_table" in snapshot: state.display_table = snapshot["display_table"] if "display_subject" in snapshot: state.display_subject = snapshot["display_subject"] if "last_selection" in snapshot: state.last_selection = snapshot["last_selection"] or [] if "pipeline_command_text" in snapshot: state.pipeline_command_text = snapshot["pipeline_command_text"] or "" if "current_cmdlet_name" in snapshot: state.current_cmdlet_name = snapshot["current_cmdlet_name"] or "" if "current_stage_text" in snapshot: state.current_stage_text = snapshot["current_stage_text"] or "" if "pipeline_values" in snapshot: state.pipeline_values = snapshot["pipeline_values"] or {} if "pending_pipeline_tail" in snapshot: state.pending_pipeline_tail = snapshot["pending_pipeline_tail"] or [] if "pending_pipeline_source" in snapshot: state.pending_pipeline_source = snapshot["pending_pipeline_source"] if "ui_library_refresh_callback" in snapshot: state.ui_library_refresh_callback = snapshot["ui_library_refresh_callback"] if "pipeline_stop" in snapshot: state.pipeline_stop = snapshot["pipeline_stop"] except Exception: # Best-effort; don't break the pipeline runner pass