Files
Medios-Macina/TUI/pipeline_runner.py
2025-12-30 04:47:13 -08:00

367 lines
14 KiB
Python

"""Pipeline execution utilities for the Textual UI.
The TUI is a frontend to the CLI, so it must use the same pipeline executor
implementation as the CLI (`CLI.PipelineExecutor`).
"""
from __future__ import annotations
import contextlib
import io
import shlex
import sys
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Sequence
# Directory containing this file, and its parent directory.
BASE_DIR = Path(__file__).resolve().parent
ROOT_DIR = BASE_DIR.parent

# Put both directories on the import path so the absolute imports below
# resolve. Each missing entry is inserted at index 0, so when both are added
# BASE_DIR ends up ahead of ROOT_DIR.
for path in (ROOT_DIR, BASE_DIR):
    str_path = str(path)
    if str_path in sys.path:
        continue
    sys.path.insert(0, str_path)
from SYS import pipeline as ctx
# Guarded import of the CLI layer: the import is attempted at module load,
# but any failure is tolerated so this module stays importable in test
# environments where the CLI package cannot be loaded.
try:
    from CLI import (
        ConfigLoader,
        PipelineExecutor as CLIPipelineExecutor,
        WorkerManagerRegistry,
    )
except Exception:
    # Reset all three names, including any bound before the failure, so the
    # rest of the module can test them for availability.
    ConfigLoader = CLIPipelineExecutor = WorkerManagerRegistry = None
from SYS.logger import set_debug
from SYS.rich_display import capture_rich_output
from SYS.result_table import ResultTable
@dataclass(slots=True)
class PipelineStageResult:
"""Summary for a single pipeline stage."""
name: str
args: Sequence[str]
emitted: List[Any] = field(default_factory=list)
result_table: Optional[Any] = None # ResultTable object if available
status: str = "pending"
error: Optional[str] = None
@dataclass(slots=True)
class PipelineRunResult:
"""Aggregate result for a pipeline run."""
pipeline: str
success: bool
stages: List[PipelineStageResult] = field(default_factory=list)
emitted: List[Any] = field(default_factory=list)
result_table: Optional[Any] = None # Final ResultTable object if available
stdout: str = ""
stderr: str = ""
error: Optional[str] = None
def to_summary(self) -> Dict[str, Any]:
"""Provide a JSON-friendly representation for logging or UI."""
return {
"pipeline":
self.pipeline,
"success":
self.success,
"error":
self.error,
"stages": [
{
"name": stage.name,
"status": stage.status,
"error": stage.error,
"emitted": len(stage.emitted),
} for stage in self.stages
],
}
class PipelineRunner:
    """TUI wrapper that delegates to the canonical CLI pipeline executor.

    The runner holds a config loader and an executor (either injected or
    built from the ``CLI`` module when that import succeeded). Its
    :meth:`run_pipeline` method executes one pipeline string, captures the
    output, and packages it with the pipeline-context state into a
    ``PipelineRunResult``.
    """

    # Lower-case substrings whose presence in the captured stdout/stderr marks
    # a run as failed even when the executor raised no exception.
    _FAILURE_MARKERS = (
        "unknown command:",
        "pipeline order error:",
        "invalid selection:",
        "invalid pipeline syntax",
        "failed to execute pipeline",
        "[error]",
    )

    # Snapshot keys holding (table, items, subject) history stacks; these are
    # re-copied entry by entry on restore.
    _HISTORY_FIELDS = ("result_table_history", "result_table_forward")

    # Ordered restore plan: (state attribute, fallback factory). A factory of
    # None restores the snapshot value verbatim; otherwise a falsy snapshot
    # value is replaced by a fresh ``factory()`` result. Entries listed in
    # _HISTORY_FIELDS ignore the factory.
    _RESTORE_PLAN = (
        ("live_progress", None),
        ("current_context", None),
        ("last_search_query", None),
        ("pipeline_refreshed", None),
        ("last_items", list),
        ("last_result_table", None),
        ("last_result_items", list),
        ("last_result_subject", None),
        ("result_table_history", None),
        ("result_table_forward", None),
        ("current_stage_table", None),
        ("display_items", list),
        ("display_table", None),
        ("display_subject", None),
        ("last_selection", list),
        ("pipeline_command_text", str),
        ("current_cmdlet_name", str),
        ("current_stage_text", str),
        ("pipeline_values", dict),
        ("pending_pipeline_tail", list),
        ("pending_pipeline_source", None),
        ("ui_library_refresh_callback", None),
        ("pipeline_stop", None),
    )

    def __init__(self, config_loader: Optional[Any] = None, executor: Optional[Any] = None) -> None:
        """Store the injected dependencies, or build them from the CLI module.

        Args:
            config_loader: Object exposing ``load()``. When omitted, a
                ``ConfigLoader(root=ROOT_DIR)`` is created if the CLI import
                succeeded; otherwise the loader stays ``None``.
            executor: Object exposing ``execute_tokens(tokens)``. When
                omitted, a ``CLIPipelineExecutor`` is created around the
                config loader if the CLI import succeeded; otherwise the
                executor stays ``None``.
        """
        # Dependency injection keeps tests independent of import order; the
        # CLI names are only evaluated when nothing was injected.
        if config_loader is None and ConfigLoader:
            config_loader = ConfigLoader(root=ROOT_DIR)
        self._config_loader = config_loader

        if executor is None and CLIPipelineExecutor:
            executor = CLIPipelineExecutor(config_loader=self._config_loader)
        self._executor = executor

        self._worker_manager = None

    @property
    def worker_manager(self):
        """Worker manager obtained by the latest ``run_pipeline`` call, or ``None``."""
        return self._worker_manager

    def run_pipeline(
        self,
        pipeline_text: str,
        *,
        seeds: Optional[Any] = None,
        isolate: bool = False,
        on_log: Optional[Callable[[str], None]] = None,
    ) -> PipelineRunResult:
        """Execute one pipeline string and report the outcome.

        Args:
            pipeline_text: Pipeline command text; surrounding whitespace is
                stripped and ``None`` is treated as empty.
            seeds: Optional initial items placed into the pipeline context
                before execution. A non-list value is wrapped in a list.
            isolate: When true, the pipeline context is snapshotted up front
                and restored afterwards (best-effort), including when an
                exception escapes after the context has been reset.
            on_log: Optional callback invoked once with a progress message
                just before the executor runs.

        Returns:
            A ``PipelineRunResult``. Empty input, syntax errors, and a
            missing config loader or executor are reported through
            ``error`` with ``success=False`` rather than raised. Exceptions
            raised during execution are captured into ``error`` as
            ``"<ExceptionType>: <message>"``.

        Raises:
            Exception: Whatever ``config_loader.load()`` or the context
                reset raises; those calls are not wrapped.
        """
        snapshot: Optional[Dict[str, Any]] = self._snapshot_ctx_state() if isolate else None

        normalized = str(pipeline_text or "").strip()
        result = PipelineRunResult(pipeline=normalized, success=False)
        if not normalized:
            result.error = "Pipeline is empty"
            return result

        # Optional pre-validation. The validator lives in a project module
        # that may be unavailable, so any failure here is deliberately
        # ignored and tokenization below acts as the fallback check.
        try:
            from SYS.cli_syntax import validate_pipeline_text

            syntax_error = validate_pipeline_text(normalized)
            if syntax_error:
                result.error = syntax_error.message
                result.stderr = syntax_error.message
                return result
        except Exception:
            pass

        try:
            tokens = shlex.split(normalized)
        except Exception as exc:
            result.error = f"Syntax error: {exc}"
            result.stderr = result.error
            return result
        if not tokens:
            result.error = "Pipeline contains no tokens"
            return result

        # The constructor tolerates a failed CLI import by leaving these as
        # None. Report that as a failed run before touching any shared state,
        # instead of crashing on a None attribute access.
        missing = [
            label
            for label, dependency in (
                ("config loader", self._config_loader),
                ("executor", self._executor),
            )
            if dependency is None
        ]
        if missing:
            result.error = f"CLI pipeline components unavailable: {', '.join(missing)}"
            result.stderr = result.error
            return result

        config = self._config_loader.load()
        try:
            set_debug(bool(config.get("debug", False)))
        except Exception:
            pass  # Debug toggling is best-effort.
        try:
            self._worker_manager = WorkerManagerRegistry.ensure(config)
        except Exception:
            self._worker_manager = None

        try:
            ctx.reset()
            ctx.set_current_command_text(normalized)
            if seeds is not None:
                try:
                    if not isinstance(seeds, list):
                        seeds = [seeds]
                    ctx.set_last_result_items_only(list(seeds))
                except Exception:
                    pass  # Seeding is best-effort.

            stdout_buffer = io.StringIO()
            stderr_buffer = io.StringIO()
            try:
                with capture_rich_output(stdout=stdout_buffer, stderr=stderr_buffer):
                    with (
                        contextlib.redirect_stdout(stdout_buffer),
                        contextlib.redirect_stderr(stderr_buffer),
                    ):
                        if on_log:
                            on_log("Executing pipeline via CLI executor...")
                        self._executor.execute_tokens(list(tokens))
            except Exception as exc:
                result.error = f"{type(exc).__name__}: {exc}"
            finally:
                try:
                    ctx.clear_current_command_text()
                except Exception:
                    pass

            result.stdout = stdout_buffer.getvalue()
            result.stderr = stderr_buffer.getvalue()

            # Pull the canonical state out of pipeline context.
            result.result_table, result.emitted = self._collect_ctx_results()

            if result.error:
                result.success = False
            elif self._output_indicates_failure(result.stdout, result.stderr):
                result.success = False
                result.error = "Pipeline failed"
            else:
                result.success = True
        finally:
            if snapshot is not None:
                try:
                    self._restore_ctx_state(snapshot)
                except Exception:
                    # Best-effort; isolation should never break normal operation.
                    pass
        return result

    @staticmethod
    def _collect_ctx_results() -> tuple[Optional[Any], List[Any]]:
        """Read the result table and result items from the pipeline context.

        Returns:
            ``(table, items)``. The table is the first truthy value among the
            display table, current stage table and last result table. When
            no table exists but items do, a ``ResultTable("Results")`` is
            synthesized from the items. Any lookup failure yields ``None``
            for the table or ``[]`` for the items.
        """
        try:
            table = (
                ctx.get_display_table()
                or ctx.get_current_stage_table()
                or ctx.get_last_result_table()
            )
        except Exception:
            table = None

        try:
            items: List[Any] = list(ctx.get_last_result_items() or [])
        except Exception:
            items = []

        if table is None and items:
            try:
                synthesized = ResultTable("Results")
                for item in items:
                    synthesized.add_result(item)
                table = synthesized
            except Exception:
                table = None
        return table, items

    @staticmethod
    def _output_indicates_failure(stdout: str, stderr: str) -> bool:
        """Return True when the captured output contains a known failure marker."""
        combined = (stdout + "\n" + stderr).strip().lower()
        return any(marker in combined for marker in PipelineRunner._FAILURE_MARKERS)

    @staticmethod
    def _snapshot_ctx_state() -> Dict[str, Any]:
        """Best-effort snapshot of pipeline context using PipelineState.

        Reads the state object returned by ``ctx.get_pipeline_state()`` and
        records its fields in a dict that ``_restore_ctx_state`` can apply
        later. Top-level lists and dicts are shallow-copied, history stacks
        get their per-entry item lists copied, and everything else is stored
        by reference.
        """

        def _copy(val: Any) -> Any:
            # Shallow-copy containers; hand anything else back unchanged.
            if isinstance(val, (list, dict)):
                return val.copy()
            return val

        def _copy_history(hist: Optional[List[tuple]]) -> List[tuple]:
            # Copy a stack of (table, items, subject) entries, duplicating
            # each items list. A malformed entry stops the copy and keeps
            # whatever was collected up to that point.
            out: List[tuple] = []
            try:
                for (t, items, subj) in list(hist or []):
                    items_copy = items.copy() if isinstance(items, list) else list(items) if items else []
                    out.append((t, items_copy, subj))
            except Exception:
                pass
            return out

        state = ctx.get_pipeline_state()
        snap: Dict[str, Any] = {}

        # Simple scalar/list/dict fields
        snap["live_progress"] = _copy(state.live_progress)
        snap["current_context"] = state.current_context
        snap["last_search_query"] = state.last_search_query
        snap["pipeline_refreshed"] = state.pipeline_refreshed
        snap["last_items"] = _copy(state.last_items)
        snap["last_result_table"] = state.last_result_table
        snap["last_result_items"] = _copy(state.last_result_items)
        snap["last_result_subject"] = state.last_result_subject

        # History/forward stacks (nested item lists are copied too)
        snap["result_table_history"] = _copy_history(state.result_table_history)
        snap["result_table_forward"] = _copy_history(state.result_table_forward)

        snap["current_stage_table"] = state.current_stage_table
        snap["display_items"] = _copy(state.display_items)
        snap["display_table"] = state.display_table
        snap["display_subject"] = state.display_subject
        snap["last_selection"] = _copy(state.last_selection)
        snap["pipeline_command_text"] = state.pipeline_command_text
        snap["current_cmdlet_name"] = state.current_cmdlet_name
        snap["current_stage_text"] = state.current_stage_text

        # Only a dict is copied here; any other value is stored as-is.
        pipeline_values = state.pipeline_values
        snap["pipeline_values"] = pipeline_values.copy() if isinstance(pipeline_values, dict) else pipeline_values

        snap["pending_pipeline_tail"] = [list(stage) for stage in (state.pending_pipeline_tail or [])]
        snap["pending_pipeline_source"] = state.pending_pipeline_source
        snap["ui_library_refresh_callback"] = state.ui_library_refresh_callback
        snap["pipeline_stop"] = state.pipeline_stop
        return snap

    @staticmethod
    def _restore_ctx_state(snapshot: Dict[str, Any]) -> None:
        """Write a snapshot from ``_snapshot_ctx_state`` back onto the state.

        Args:
            snapshot: Mapping of state attribute names to saved values. An
                empty or ``None`` snapshot is a no-op; keys absent from the
                snapshot are left untouched on the state object.

        Restoration is best-effort: the first failing assignment stops the
        remaining ones without raising. History stacks are handled in their
        own guard, so a malformed stack is skipped without stopping the rest.
        """
        if not snapshot:
            return
        state = ctx.get_pipeline_state()

        def _restore_history(key: str, val: Any) -> None:
            # Rebuild the stack with fresh item lists; a non-list value or a
            # malformed entry leaves the current attribute unchanged.
            try:
                if isinstance(val, list):
                    out: List[tuple] = []
                    for (t, items, subj) in val:
                        items_copy = items.copy() if isinstance(items, list) else list(items) if items else []
                        out.append((t, items_copy, subj))
                    setattr(state, key, out)
            except Exception:
                pass

        try:
            for name, fallback_factory in PipelineRunner._RESTORE_PLAN:
                if name not in snapshot:
                    continue
                value = snapshot[name]
                if name in PipelineRunner._HISTORY_FIELDS:
                    _restore_history(name, value)
                elif fallback_factory is None:
                    setattr(state, name, value)
                else:
                    setattr(state, name, value or fallback_factory())
        except Exception:
            # Best-effort; don't break the pipeline runner
            pass