"""Modern Textual UI for driving Medeia-Macina pipelines.""" from __future__ import annotations import sys from pathlib import Path from typing import Any, List, Optional, Sequence from textual import work from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Container, Horizontal, Vertical, VerticalScroll from textual.widgets import ( Button, DataTable, Footer, Header, Input, ListItem, ListView, Static, TextArea, Tree, ) BASE_DIR = Path(__file__).resolve().parent ROOT_DIR = BASE_DIR.parent for path in (BASE_DIR, ROOT_DIR): str_path = str(path) if str_path not in sys.path: sys.path.insert(0, str_path) from menu_actions import ( # type: ignore # noqa: E402 PIPELINE_PRESETS, PipelinePreset, ) from pipeline_runner import PipelineExecutor, PipelineRunResult # type: ignore # noqa: E402 from result_table import ResultTable # type: ignore # noqa: E402 class PresetListItem(ListItem): """List entry that stores its pipeline preset.""" def __init__(self, preset: PipelinePreset) -> None: super().__init__( Static( f"[b]{preset.label}[/b]\n[pale_green4]{preset.description}[/pale_green4]", classes="preset-entry", ) ) self.preset = preset class PipelineHubApp(App): """Textual front-end that executes cmdlet pipelines inline.""" CSS_PATH = "tui.tcss" BINDINGS = [ Binding("ctrl+enter", "run_pipeline", "Run Pipeline"), Binding("f5", "refresh_workers", "Refresh Workers"), Binding("ctrl+l", "focus_command", "Focus Input", show=False), ] def __init__(self) -> None: super().__init__() self.executor = PipelineExecutor() self.result_items: List[Any] = [] self.log_lines: List[str] = [] self.command_input: Optional[Input] = None self.log_output: Optional[TextArea] = None self.results_table: Optional[DataTable] = None self.metadata_tree: Optional[Tree] = None self.worker_table: Optional[DataTable] = None self.preset_list: Optional[ListView] = None self.status_panel: Optional[Static] = None self.current_result_table: Optional[ResultTable] = None self._pipeline_running = False # ------------------------------------------------------------------ # Layout # ------------------------------------------------------------------ def compose(self) -> ComposeResult: # noqa: D401 - Textual compose hook yield Header(show_clock=True) with Container(id="app-shell"): with Horizontal(id="command-pane"): self.command_input = Input( placeholder='download-data "" | merge-file | add-tag | add-file -storage local', id="pipeline-input", ) yield self.command_input yield Button("Run", id="run-button", variant="primary") self.status_panel = Static("Idle", id="status-panel") yield self.status_panel with Horizontal(id="content-row"): with VerticalScroll(id="left-pane"): yield Static("Pipeline Presets", classes="section-title") self.preset_list = ListView( *(PresetListItem(preset) for preset in PIPELINE_PRESETS), id="preset-list", ) yield self.preset_list yield Static("Logs", classes="section-title") self.log_output = TextArea(id="log-output", read_only=True) yield self.log_output yield Static("Workers", classes="section-title") self.worker_table = DataTable(id="workers-table") yield self.worker_table with Vertical(id="right-pane"): yield Static("Results", classes="section-title") self.results_table = DataTable(id="results-table") yield self.results_table yield Static("Metadata", classes="section-title") self.metadata_tree = Tree("Run a pipeline", id="metadata-tree") yield self.metadata_tree yield Footer() def on_mount(self) -> None: if self.results_table: self.results_table.add_columns("Row", "Title", "Source", "File") if self.worker_table: self.worker_table.add_columns("ID", "Type", "Status", "Details") if self.executor.worker_manager: self.set_interval(2.0, self.refresh_workers) self.refresh_workers() if self.command_input: self.command_input.focus() # ------------------------------------------------------------------ # Actions # ------------------------------------------------------------------ def action_focus_command(self) -> None: if self.command_input: self.command_input.focus() def action_run_pipeline(self) -> None: if self._pipeline_running: self.notify("Pipeline already running", severity="warning", timeout=3) return if not self.command_input: return pipeline_text = self.command_input.value.strip() if not pipeline_text: self.notify("Enter a pipeline to run", severity="warning", timeout=3) return self._pipeline_running = True self._set_status("Running…", level="info") self._clear_log() self._append_log_line(f"$ {pipeline_text}") self._clear_results() self._run_pipeline_background(pipeline_text) def action_refresh_workers(self) -> None: self.refresh_workers() # ------------------------------------------------------------------ # Event handlers # ------------------------------------------------------------------ def on_button_pressed(self, event: Button.Pressed) -> None: if event.button.id == "run-button": self.action_run_pipeline() def on_input_submitted(self, event: Input.Submitted) -> None: if event.input.id == "pipeline-input": self.action_run_pipeline() def on_list_view_selected(self, event: ListView.Selected) -> None: if isinstance(event.item, PresetListItem) and self.command_input: self.command_input.value = event.item.preset.pipeline self.notify(f"Loaded preset: {event.item.preset.label}", timeout=2) event.stop() def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: if not self.results_table or event.control is not self.results_table: return index = event.cursor_row if 0 <= index < len(self.result_items): self._display_metadata(index) # ------------------------------------------------------------------ # Pipeline execution helpers # ------------------------------------------------------------------ @work(exclusive=True, thread=True) def _run_pipeline_background(self, pipeline_text: str) -> None: run_result = self.executor.run_pipeline(pipeline_text, on_log=self._log_from_worker) self.call_from_thread(self._on_pipeline_finished, run_result) def _on_pipeline_finished(self, run_result: PipelineRunResult) -> None: self._pipeline_running = False status_level = "success" if run_result.success else "error" status_text = "Completed" if run_result.success else "Failed" self._set_status(status_text, level=status_level) if not run_result.success: self.notify(run_result.error or "Pipeline failed", severity="error", timeout=6) else: self.notify("Pipeline completed", timeout=3) if run_result.stdout.strip(): self._append_log_line("stdout:") self._append_block(run_result.stdout) if run_result.stderr.strip(): self._append_log_line("stderr:") self._append_block(run_result.stderr) for stage in run_result.stages: summary = f"[{stage.status}] {stage.name} -> {len(stage.emitted)} item(s)" if stage.error: summary += f" ({stage.error})" self._append_log_line(summary) emitted = run_result.emitted if isinstance(emitted, list): self.result_items = emitted elif emitted: self.result_items = [emitted] else: self.result_items = [] self.current_result_table = run_result.result_table self._populate_results_table() self.refresh_workers() def _log_from_worker(self, message: str) -> None: self.call_from_thread(self._append_log_line, message) # ------------------------------------------------------------------ # UI helpers # ------------------------------------------------------------------ def _populate_results_table(self) -> None: if not self.results_table: return self.results_table.clear(columns=True) if self.current_result_table and self.current_result_table.rows: # Use ResultTable headers from the first row first_row = self.current_result_table.rows[0] headers = ["#"] + [col.name for col in first_row.columns] self.results_table.add_columns(*headers) rows = self.current_result_table.to_datatable_rows() for idx, row_values in enumerate(rows, 1): self.results_table.add_row(str(idx), *row_values, key=str(idx - 1)) else: # Fallback or empty state self.results_table.add_columns("Row", "Title", "Source", "File") if not self.result_items: self.results_table.add_row("—", "No results", "", "") return # Fallback for items without a table for idx, item in enumerate(self.result_items, start=1): self.results_table.add_row(str(idx), str(item), "—", "—", key=str(idx - 1)) def _display_metadata(self, index: int) -> None: if not self.metadata_tree: return root = self.metadata_tree.root root.label = "Metadata" root.remove_children() if self.current_result_table and 0 <= index < len(self.current_result_table.rows): row = self.current_result_table.rows[index] for col in row.columns: root.add(f"[b]{col.name}[/b]: {col.value}") elif 0 <= index < len(self.result_items): item = self.result_items[index] if isinstance(item, dict): self._populate_tree_node(root, item) else: root.add(str(item)) def _populate_tree_node(self, node, data: Any) -> None: if isinstance(data, dict): for key, value in data.items(): child = node.add(f"[b]{key}[/b]") self._populate_tree_node(child, value) elif isinstance(data, Sequence) and not isinstance(data, (str, bytes)): for idx, value in enumerate(data): child = node.add(f"[{idx}]") self._populate_tree_node(child, value) else: node.add(str(data)) def _clear_log(self) -> None: self.log_lines = [] if self.log_output: self.log_output.text = "" def _append_log_line(self, line: str) -> None: self.log_lines.append(line) if len(self.log_lines) > 500: self.log_lines = self.log_lines[-500:] if self.log_output: self.log_output.text = "\n".join(self.log_lines) def _append_block(self, text: str) -> None: for line in text.strip().splitlines(): self._append_log_line(f" {line}") def _clear_results(self) -> None: self.result_items = [] if self.results_table: self.results_table.clear() if self.metadata_tree: self.metadata_tree.root.label = "Awaiting results" self.metadata_tree.root.remove_children() def _set_status(self, message: str, *, level: str = "info") -> None: if not self.status_panel: return for css in ("status-info", "status-success", "status-error"): self.status_panel.remove_class(css) css_class = f"status-{level if level in {'success', 'error'} else 'info'}" self.status_panel.add_class(css_class) self.status_panel.update(message) def refresh_workers(self) -> None: if not self.worker_table: return manager = self.executor.worker_manager self.worker_table.clear() if manager is None: self.worker_table.add_row("—", "—", "—", "Worker manager unavailable") return workers = manager.get_active_workers() if not workers: self.worker_table.add_row("—", "—", "—", "No active workers") return for worker in workers: worker_id = str(worker.get("worker_id") or worker.get("id") or "?")[:8] worker_type = str(worker.get("worker_type") or worker.get("type") or "?") status = str(worker.get("status") or worker.get("result") or "running") details = worker.get("current_step") or worker.get("description") or worker.get("pipe") or "" self.worker_table.add_row(worker_id, worker_type, status, str(details)[:80]) if __name__ == "__main__": PipelineHubApp().run()