2025-11-25 20:09:33 -08:00
|
|
|
"""Modern Textual UI for driving Medeia-Macina pipelines."""
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import sys
|
|
|
|
|
from pathlib import Path
|
2025-11-27 10:59:01 -08:00
|
|
|
from typing import Any, List, Optional, Sequence
|
2025-11-25 20:09:33 -08:00
|
|
|
|
|
|
|
|
from textual import work
|
|
|
|
|
from textual.app import App, ComposeResult
|
|
|
|
|
from textual.binding import Binding
|
|
|
|
|
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
|
|
|
|
|
from textual.widgets import (
|
|
|
|
|
Button,
|
|
|
|
|
DataTable,
|
|
|
|
|
Footer,
|
|
|
|
|
Header,
|
|
|
|
|
Input,
|
|
|
|
|
ListItem,
|
|
|
|
|
ListView,
|
|
|
|
|
Static,
|
|
|
|
|
TextArea,
|
|
|
|
|
Tree,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent
|
|
|
|
|
ROOT_DIR = BASE_DIR.parent
|
|
|
|
|
for path in (BASE_DIR, ROOT_DIR):
|
|
|
|
|
str_path = str(path)
|
|
|
|
|
if str_path not in sys.path:
|
|
|
|
|
sys.path.insert(0, str_path)
|
|
|
|
|
|
|
|
|
|
from menu_actions import ( # type: ignore # noqa: E402
|
|
|
|
|
PIPELINE_PRESETS,
|
|
|
|
|
PipelinePreset,
|
|
|
|
|
)
|
|
|
|
|
from pipeline_runner import PipelineExecutor, PipelineRunResult # type: ignore # noqa: E402
|
2025-11-27 10:59:01 -08:00
|
|
|
from result_table import ResultTable # type: ignore # noqa: E402
|
2025-11-25 20:09:33 -08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class PresetListItem(ListItem):
|
|
|
|
|
"""List entry that stores its pipeline preset."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, preset: PipelinePreset) -> None:
|
|
|
|
|
super().__init__(
|
|
|
|
|
Static(
|
|
|
|
|
f"[b]{preset.label}[/b]\n[pale_green4]{preset.description}[/pale_green4]",
|
|
|
|
|
classes="preset-entry",
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
self.preset = preset
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PipelineHubApp(App):
|
|
|
|
|
"""Textual front-end that executes cmdlet pipelines inline."""
|
|
|
|
|
|
|
|
|
|
CSS_PATH = "tui.tcss"
|
|
|
|
|
BINDINGS = [
|
|
|
|
|
Binding("ctrl+enter", "run_pipeline", "Run Pipeline"),
|
|
|
|
|
Binding("f5", "refresh_workers", "Refresh Workers"),
|
|
|
|
|
Binding("ctrl+l", "focus_command", "Focus Input", show=False),
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
def __init__(self) -> None:
|
|
|
|
|
super().__init__()
|
|
|
|
|
self.executor = PipelineExecutor()
|
|
|
|
|
self.result_items: List[Any] = []
|
|
|
|
|
self.log_lines: List[str] = []
|
|
|
|
|
self.command_input: Optional[Input] = None
|
|
|
|
|
self.log_output: Optional[TextArea] = None
|
|
|
|
|
self.results_table: Optional[DataTable] = None
|
|
|
|
|
self.metadata_tree: Optional[Tree] = None
|
|
|
|
|
self.worker_table: Optional[DataTable] = None
|
|
|
|
|
self.preset_list: Optional[ListView] = None
|
|
|
|
|
self.status_panel: Optional[Static] = None
|
2025-11-27 10:59:01 -08:00
|
|
|
self.current_result_table: Optional[ResultTable] = None
|
2025-11-25 20:09:33 -08:00
|
|
|
self._pipeline_running = False
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# Layout
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
def compose(self) -> ComposeResult: # noqa: D401 - Textual compose hook
|
|
|
|
|
yield Header(show_clock=True)
|
|
|
|
|
with Container(id="app-shell"):
|
2025-11-27 10:59:01 -08:00
|
|
|
with Horizontal(id="command-pane"):
|
2025-11-25 20:09:33 -08:00
|
|
|
self.command_input = Input(
|
|
|
|
|
placeholder='download-data "<url>" | merge-file | add-tag | add-file -storage local',
|
|
|
|
|
id="pipeline-input",
|
|
|
|
|
)
|
|
|
|
|
yield self.command_input
|
|
|
|
|
yield Button("Run", id="run-button", variant="primary")
|
|
|
|
|
self.status_panel = Static("Idle", id="status-panel")
|
|
|
|
|
yield self.status_panel
|
|
|
|
|
with Horizontal(id="content-row"):
|
|
|
|
|
with VerticalScroll(id="left-pane"):
|
|
|
|
|
yield Static("Pipeline Presets", classes="section-title")
|
|
|
|
|
self.preset_list = ListView(
|
|
|
|
|
*(PresetListItem(preset) for preset in PIPELINE_PRESETS),
|
|
|
|
|
id="preset-list",
|
|
|
|
|
)
|
|
|
|
|
yield self.preset_list
|
|
|
|
|
yield Static("Logs", classes="section-title")
|
|
|
|
|
self.log_output = TextArea(id="log-output", read_only=True)
|
|
|
|
|
yield self.log_output
|
|
|
|
|
yield Static("Workers", classes="section-title")
|
|
|
|
|
self.worker_table = DataTable(id="workers-table")
|
|
|
|
|
yield self.worker_table
|
|
|
|
|
with Vertical(id="right-pane"):
|
|
|
|
|
yield Static("Results", classes="section-title")
|
|
|
|
|
self.results_table = DataTable(id="results-table")
|
|
|
|
|
yield self.results_table
|
|
|
|
|
yield Static("Metadata", classes="section-title")
|
|
|
|
|
self.metadata_tree = Tree("Run a pipeline", id="metadata-tree")
|
|
|
|
|
yield self.metadata_tree
|
|
|
|
|
yield Footer()
|
|
|
|
|
|
|
|
|
|
def on_mount(self) -> None:
|
|
|
|
|
if self.results_table:
|
|
|
|
|
self.results_table.add_columns("Row", "Title", "Source", "File")
|
|
|
|
|
if self.worker_table:
|
|
|
|
|
self.worker_table.add_columns("ID", "Type", "Status", "Details")
|
|
|
|
|
if self.executor.worker_manager:
|
|
|
|
|
self.set_interval(2.0, self.refresh_workers)
|
|
|
|
|
self.refresh_workers()
|
|
|
|
|
if self.command_input:
|
|
|
|
|
self.command_input.focus()
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# Actions
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
def action_focus_command(self) -> None:
|
|
|
|
|
if self.command_input:
|
|
|
|
|
self.command_input.focus()
|
|
|
|
|
|
|
|
|
|
def action_run_pipeline(self) -> None:
|
|
|
|
|
if self._pipeline_running:
|
|
|
|
|
self.notify("Pipeline already running", severity="warning", timeout=3)
|
|
|
|
|
return
|
|
|
|
|
if not self.command_input:
|
|
|
|
|
return
|
|
|
|
|
pipeline_text = self.command_input.value.strip()
|
|
|
|
|
if not pipeline_text:
|
|
|
|
|
self.notify("Enter a pipeline to run", severity="warning", timeout=3)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
self._pipeline_running = True
|
|
|
|
|
self._set_status("Running…", level="info")
|
|
|
|
|
self._clear_log()
|
|
|
|
|
self._append_log_line(f"$ {pipeline_text}")
|
|
|
|
|
self._clear_results()
|
|
|
|
|
self._run_pipeline_background(pipeline_text)
|
|
|
|
|
|
|
|
|
|
def action_refresh_workers(self) -> None:
|
|
|
|
|
self.refresh_workers()
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# Event handlers
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
|
|
|
if event.button.id == "run-button":
|
|
|
|
|
self.action_run_pipeline()
|
|
|
|
|
|
|
|
|
|
def on_input_submitted(self, event: Input.Submitted) -> None:
|
|
|
|
|
if event.input.id == "pipeline-input":
|
|
|
|
|
self.action_run_pipeline()
|
|
|
|
|
|
|
|
|
|
def on_list_view_selected(self, event: ListView.Selected) -> None:
|
|
|
|
|
if isinstance(event.item, PresetListItem) and self.command_input:
|
|
|
|
|
self.command_input.value = event.item.preset.pipeline
|
|
|
|
|
self.notify(f"Loaded preset: {event.item.preset.label}", timeout=2)
|
|
|
|
|
event.stop()
|
|
|
|
|
|
|
|
|
|
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
|
|
|
|
|
if not self.results_table or event.control is not self.results_table:
|
|
|
|
|
return
|
|
|
|
|
index = event.cursor_row
|
|
|
|
|
if 0 <= index < len(self.result_items):
|
2025-11-27 10:59:01 -08:00
|
|
|
self._display_metadata(index)
|
2025-11-25 20:09:33 -08:00
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# Pipeline execution helpers
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
@work(exclusive=True, thread=True)
|
|
|
|
|
def _run_pipeline_background(self, pipeline_text: str) -> None:
|
|
|
|
|
run_result = self.executor.run_pipeline(pipeline_text, on_log=self._log_from_worker)
|
|
|
|
|
self.call_from_thread(self._on_pipeline_finished, run_result)
|
|
|
|
|
|
|
|
|
|
def _on_pipeline_finished(self, run_result: PipelineRunResult) -> None:
|
|
|
|
|
self._pipeline_running = False
|
|
|
|
|
status_level = "success" if run_result.success else "error"
|
|
|
|
|
status_text = "Completed" if run_result.success else "Failed"
|
|
|
|
|
self._set_status(status_text, level=status_level)
|
|
|
|
|
|
|
|
|
|
if not run_result.success:
|
|
|
|
|
self.notify(run_result.error or "Pipeline failed", severity="error", timeout=6)
|
|
|
|
|
else:
|
|
|
|
|
self.notify("Pipeline completed", timeout=3)
|
|
|
|
|
|
|
|
|
|
if run_result.stdout.strip():
|
|
|
|
|
self._append_log_line("stdout:")
|
|
|
|
|
self._append_block(run_result.stdout)
|
|
|
|
|
if run_result.stderr.strip():
|
|
|
|
|
self._append_log_line("stderr:")
|
|
|
|
|
self._append_block(run_result.stderr)
|
|
|
|
|
|
|
|
|
|
for stage in run_result.stages:
|
|
|
|
|
summary = f"[{stage.status}] {stage.name} -> {len(stage.emitted)} item(s)"
|
|
|
|
|
if stage.error:
|
|
|
|
|
summary += f" ({stage.error})"
|
|
|
|
|
self._append_log_line(summary)
|
|
|
|
|
|
|
|
|
|
emitted = run_result.emitted
|
|
|
|
|
if isinstance(emitted, list):
|
|
|
|
|
self.result_items = emitted
|
|
|
|
|
elif emitted:
|
|
|
|
|
self.result_items = [emitted]
|
|
|
|
|
else:
|
|
|
|
|
self.result_items = []
|
|
|
|
|
|
2025-11-27 10:59:01 -08:00
|
|
|
self.current_result_table = run_result.result_table
|
2025-11-25 20:09:33 -08:00
|
|
|
self._populate_results_table()
|
|
|
|
|
self.refresh_workers()
|
|
|
|
|
|
|
|
|
|
def _log_from_worker(self, message: str) -> None:
|
|
|
|
|
self.call_from_thread(self._append_log_line, message)
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# UI helpers
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
def _populate_results_table(self) -> None:
|
|
|
|
|
if not self.results_table:
|
|
|
|
|
return
|
2025-11-27 10:59:01 -08:00
|
|
|
self.results_table.clear(columns=True)
|
2025-11-25 20:09:33 -08:00
|
|
|
|
2025-11-27 10:59:01 -08:00
|
|
|
if self.current_result_table and self.current_result_table.rows:
|
|
|
|
|
# Use ResultTable headers from the first row
|
|
|
|
|
first_row = self.current_result_table.rows[0]
|
|
|
|
|
headers = ["#"] + [col.name for col in first_row.columns]
|
|
|
|
|
self.results_table.add_columns(*headers)
|
|
|
|
|
|
|
|
|
|
rows = self.current_result_table.to_datatable_rows()
|
|
|
|
|
for idx, row_values in enumerate(rows, 1):
|
|
|
|
|
self.results_table.add_row(str(idx), *row_values, key=str(idx - 1))
|
|
|
|
|
else:
|
|
|
|
|
# Fallback or empty state
|
|
|
|
|
self.results_table.add_columns("Row", "Title", "Source", "File")
|
|
|
|
|
if not self.result_items:
|
|
|
|
|
self.results_table.add_row("—", "No results", "", "")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# Fallback for items without a table
|
|
|
|
|
for idx, item in enumerate(self.result_items, start=1):
|
|
|
|
|
self.results_table.add_row(str(idx), str(item), "—", "—", key=str(idx - 1))
|
|
|
|
|
|
|
|
|
|
def _display_metadata(self, index: int) -> None:
|
2025-11-25 20:09:33 -08:00
|
|
|
if not self.metadata_tree:
|
|
|
|
|
return
|
|
|
|
|
root = self.metadata_tree.root
|
|
|
|
|
root.label = "Metadata"
|
|
|
|
|
root.remove_children()
|
|
|
|
|
|
2025-11-27 10:59:01 -08:00
|
|
|
if self.current_result_table and 0 <= index < len(self.current_result_table.rows):
|
|
|
|
|
row = self.current_result_table.rows[index]
|
|
|
|
|
for col in row.columns:
|
|
|
|
|
root.add(f"[b]{col.name}[/b]: {col.value}")
|
|
|
|
|
elif 0 <= index < len(self.result_items):
|
|
|
|
|
item = self.result_items[index]
|
|
|
|
|
if isinstance(item, dict):
|
|
|
|
|
self._populate_tree_node(root, item)
|
2025-11-25 20:09:33 -08:00
|
|
|
else:
|
2025-11-27 10:59:01 -08:00
|
|
|
root.add(str(item))
|
2025-11-25 20:09:33 -08:00
|
|
|
|
|
|
|
|
def _populate_tree_node(self, node, data: Any) -> None:
|
|
|
|
|
if isinstance(data, dict):
|
|
|
|
|
for key, value in data.items():
|
|
|
|
|
child = node.add(f"[b]{key}[/b]")
|
|
|
|
|
self._populate_tree_node(child, value)
|
|
|
|
|
elif isinstance(data, Sequence) and not isinstance(data, (str, bytes)):
|
|
|
|
|
for idx, value in enumerate(data):
|
|
|
|
|
child = node.add(f"[{idx}]")
|
|
|
|
|
self._populate_tree_node(child, value)
|
|
|
|
|
else:
|
|
|
|
|
node.add(str(data))
|
|
|
|
|
|
|
|
|
|
def _clear_log(self) -> None:
|
|
|
|
|
self.log_lines = []
|
|
|
|
|
if self.log_output:
|
2025-11-27 10:59:01 -08:00
|
|
|
self.log_output.text = ""
|
2025-11-25 20:09:33 -08:00
|
|
|
|
|
|
|
|
def _append_log_line(self, line: str) -> None:
|
|
|
|
|
self.log_lines.append(line)
|
|
|
|
|
if len(self.log_lines) > 500:
|
|
|
|
|
self.log_lines = self.log_lines[-500:]
|
|
|
|
|
if self.log_output:
|
2025-11-27 10:59:01 -08:00
|
|
|
self.log_output.text = "\n".join(self.log_lines)
|
2025-11-25 20:09:33 -08:00
|
|
|
|
|
|
|
|
def _append_block(self, text: str) -> None:
|
|
|
|
|
for line in text.strip().splitlines():
|
|
|
|
|
self._append_log_line(f" {line}")
|
|
|
|
|
|
|
|
|
|
def _clear_results(self) -> None:
|
|
|
|
|
self.result_items = []
|
|
|
|
|
if self.results_table:
|
|
|
|
|
self.results_table.clear()
|
|
|
|
|
if self.metadata_tree:
|
|
|
|
|
self.metadata_tree.root.label = "Awaiting results"
|
|
|
|
|
self.metadata_tree.root.remove_children()
|
|
|
|
|
|
|
|
|
|
def _set_status(self, message: str, *, level: str = "info") -> None:
|
|
|
|
|
if not self.status_panel:
|
|
|
|
|
return
|
|
|
|
|
for css in ("status-info", "status-success", "status-error"):
|
|
|
|
|
self.status_panel.remove_class(css)
|
|
|
|
|
css_class = f"status-{level if level in {'success', 'error'} else 'info'}"
|
|
|
|
|
self.status_panel.add_class(css_class)
|
|
|
|
|
self.status_panel.update(message)
|
|
|
|
|
|
|
|
|
|
def refresh_workers(self) -> None:
|
|
|
|
|
if not self.worker_table:
|
|
|
|
|
return
|
|
|
|
|
manager = self.executor.worker_manager
|
|
|
|
|
self.worker_table.clear()
|
|
|
|
|
if manager is None:
|
|
|
|
|
self.worker_table.add_row("—", "—", "—", "Worker manager unavailable")
|
|
|
|
|
return
|
|
|
|
|
workers = manager.get_active_workers()
|
|
|
|
|
if not workers:
|
|
|
|
|
self.worker_table.add_row("—", "—", "—", "No active workers")
|
|
|
|
|
return
|
|
|
|
|
for worker in workers:
|
|
|
|
|
worker_id = str(worker.get("worker_id") or worker.get("id") or "?")[:8]
|
|
|
|
|
worker_type = str(worker.get("worker_type") or worker.get("type") or "?")
|
|
|
|
|
status = str(worker.get("status") or worker.get("result") or "running")
|
|
|
|
|
details = worker.get("current_step") or worker.get("description") or worker.get("pipe") or ""
|
|
|
|
|
self.worker_table.add_row(worker_id, worker_type, status, str(details)[:80])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
PipelineHubApp().run()
|