Files
Medios-Macina/TUI/modalscreen/workers.py

586 lines
27 KiB
Python
Raw Permalink Normal View History

2025-11-25 20:09:33 -08:00
"""Workers modal screen for monitoring and managing background tasks."""
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Horizontal, Vertical
from textual.widgets import Static, Button, DataTable, TextArea
from textual.binding import Binding
from textual.message import Message
import logging
from typing import Optional, Dict, List, Any
from pathlib import Path
import sys
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
logger = logging.getLogger(__name__)
class WorkersModal(ModalScreen):
"""Modal screen for monitoring running and finished workers."""
BINDINGS = [
Binding("escape", "cancel", "Cancel"),
]
CSS_PATH = "workers.tcss"
class WorkerUpdated(Message):
    """Message that carries a refreshed list of worker records."""

    def __init__(self, workers: List[Dict[str, Any]]) -> None:
        # Initialise the base message, then attach the payload for handlers.
        super().__init__()
        self.workers = workers
class WorkerCancelled(Message):
    """Message that identifies a worker the user asked to cancel."""

    def __init__(self, worker_id: str) -> None:
        # Initialise the base message, then record which worker is meant.
        super().__init__()
        self.worker_id = worker_id
def __init__(self, app_instance=None):
"""Initialize the workers modal.
Args:
app_instance: Reference to the hub app for accessing worker info
"""
super().__init__()
self.app_instance = app_instance
self.running_table: Optional[DataTable] = None
self.finished_table: Optional[DataTable] = None
self.stdout_display: Optional[TextArea] = None
self.running_workers: List[Dict[str, Any]] = []
self.finished_workers: List[Dict[str, Any]] = []
self.selected_worker_id: Optional[str] = None
self.show_running = False # Start with finished tab
def compose(self) -> ComposeResult:
    """Create child widgets for the workers modal.

    Layout, top to bottom: a title bar with the Running/Finished toggle
    buttons, the running-workers section, the finished-workers section,
    a shared read-only log area, and a Close button.  The two tables and
    the log TextArea are stored on ``self`` so other methods can reach
    them without querying the DOM.
    """
    # Highlight the toggle that matches the tab selected by
    # ``self.show_running`` instead of hard-coding "Running" as active,
    # which contradicted the default (finished) view.
    running_variant = "primary" if self.show_running else "default"
    finished_variant = "default" if self.show_running else "primary"

    with Vertical(id="workers-container"):
        # Title with toggle buttons
        with Horizontal(id="workers-title-bar"):
            yield Static("Workers Monitor", id="workers-title")
            yield Button("Running", id="toggle-running-btn", variant=running_variant)
            yield Button("Finished", id="toggle-finished-btn", variant=finished_variant)

        # Running tab content
        with Vertical(id="running-section"):
            self.running_table = DataTable(id="running-table")
            yield self.running_table
            with Horizontal(id="running-controls"):
                yield Button("Refresh", id="running-refresh-btn", variant="primary")
                yield Button("Stop Selected", id="running-stop-btn", variant="warning")
                yield Button("Stop All", id="running-stop-all-btn", variant="error")

        # Finished tab content
        with Vertical(id="finished-section"):
            self.finished_table = DataTable(id="finished-table")
            yield self.finished_table
            with Horizontal(id="finished-controls"):
                yield Button("Refresh", id="finished-refresh-btn", variant="primary")
                yield Button("Clear Selected", id="finished-clear-btn", variant="warning")
                yield Button("Clear All", id="finished-clear-all-btn", variant="error")

        # Shared textarea for displaying worker logs
        with Vertical(id="logs-section"):
            yield Static("Worker Logs:", id="logs-label")
            self.stdout_display = TextArea(id="stdout-display", read_only=True)
            yield self.stdout_display

        with Horizontal(id="workers-buttons"):
            yield Button("Close", id="close-btn", variant="primary")
def on_mount(self) -> None:
"""Set up the tables and load worker data."""
# Set up running workers table
if self.running_table:
self.running_table.add_columns(
"ID",
"Type",
"Status",
"Pipe",
"Progress",
"Started",
"Details"
)
self.running_table.zebra_stripes = True
# Set up finished workers table
if self.finished_table:
self.finished_table.add_columns(
"ID",
"Type",
"Result",
"Pipe",
"Started",
"Completed",
"Duration",
"Details"
)
self.finished_table.zebra_stripes = True
# Set initial view (show finished by default)
self._update_view_visibility()
# Load initial data
self.refresh_workers()
# Don't set up periodic refresh - it was causing issues with stdout display
# Users can click the Refresh button to update manually
def refresh_workers(self) -> None:
"""Refresh the workers data from app instance."""
try:
if not self.app_instance:
logger.warning("[workers-modal] No app instance provided")
return
# Get running workers from app instance
# This assumes the app has a get_running_workers() method
if hasattr(self.app_instance, 'get_running_workers'):
self.running_workers = self.app_instance.get_running_workers()
else:
self.running_workers = []
# Get finished workers from app instance
if hasattr(self.app_instance, 'get_finished_workers'):
self.finished_workers = self.app_instance.get_finished_workers()
if self.finished_workers:
logger.info(f"[workers-modal-refresh] Got {len(self.finished_workers)} finished workers from app")
# Log the keys in the first worker to verify structure
if isinstance(self.finished_workers[0], dict):
logger.info(f"[workers-modal-refresh] First worker keys: {list(self.finished_workers[0].keys())}")
logger.info(f"[workers-modal-refresh] First worker: {self.finished_workers[0]}")
else:
logger.warning(f"[workers-modal-refresh] First worker is not a dict: {type(self.finished_workers[0])}")
else:
self.finished_workers = []
# Update tables
self._update_running_table()
self._update_finished_table()
logger.info(f"[workers-modal] Refreshed: {len(self.running_workers)} running, {len(self.finished_workers)} finished")
except Exception as e:
logger.error(f"[workers-modal] Error refreshing workers: {e}")
def _update_view_visibility(self) -> None:
"""Toggle visibility between running and finished views."""
try:
running_section = self.query_one("#running-section", Vertical)
finished_section = self.query_one("#finished-section", Vertical)
toggle_running_btn = self.query_one("#toggle-running-btn", Button)
toggle_finished_btn = self.query_one("#toggle-finished-btn", Button)
if self.show_running:
running_section.display = True
finished_section.display = False
toggle_running_btn.variant = "primary"
toggle_finished_btn.variant = "default"
logger.debug("[workers-modal] Switched to Running view")
else:
running_section.display = False
finished_section.display = True
toggle_running_btn.variant = "default"
toggle_finished_btn.variant = "primary"
logger.debug("[workers-modal] Switched to Finished view")
except Exception as e:
logger.error(f"[workers-modal] Error updating view visibility: {e}")
def _update_running_table(self) -> None:
"""Update the running workers table."""
try:
if not self.running_table:
logger.error("[workers-modal] Running table not initialized")
return
self.running_table.clear()
if not self.running_workers:
self.running_table.add_row("---", "---", "---", "---", "---", "---", "No workers running")
logger.debug(f"[workers-modal] No running workers to display")
return
logger.debug(f"[workers-modal] Updating running table with {len(self.running_workers)} workers")
for idx, worker_info in enumerate(self.running_workers):
try:
worker_id = worker_info.get('id', 'unknown')
worker_type = worker_info.get('type', 'unknown')
status = worker_info.get('status', 'running')
progress = worker_info.get('progress', '')
started = worker_info.get('started', '')
details = worker_info.get('details', '')
pipe = worker_info.get('pipe', '')
# Ensure values are strings
worker_id = str(worker_id) if worker_id else 'unknown'
worker_type = str(worker_type) if worker_type else 'unknown'
status = str(status) if status else 'running'
progress = str(progress) if progress else '---'
started = str(started) if started else '---'
details = str(details) if details else '---'
pipe_display = self._summarize_pipe(pipe)
# Truncate long strings
progress = progress[:20]
started = started[:19]
details = details[:30]
pipe_display = pipe_display[:40]
self.running_table.add_row(
worker_id[:8],
worker_type[:15],
status[:10],
pipe_display,
progress,
started,
details
)
if idx == 0: # Log first entry
logger.debug(f"[workers-modal] Added running row {idx}: {worker_id[:8]} {worker_type[:15]} {status}")
except Exception as row_error:
logger.error(f"[workers-modal] Error adding running row {idx}: {row_error}", exc_info=True)
logger.debug(f"[workers-modal] Updated running table with {len(self.running_workers)} workers")
except Exception as e:
logger.error(f"[workers-modal] Error updating running table: {e}", exc_info=True)
def _update_finished_table(self) -> None:
"""Update the finished workers table."""
try:
if not self.finished_table:
logger.error("[workers-modal] Finished table not initialized")
return
self.finished_table.clear()
if not self.finished_workers:
self.finished_table.add_row("---", "---", "---", "---", "---", "---", "---", "No finished workers")
logger.debug(f"[workers-modal] No finished workers to display")
return
logger.info(f"[workers-modal-update] STARTING to update finished table with {len(self.finished_workers)} workers")
added_count = 0
error_count = 0
for idx, worker_info in enumerate(self.finished_workers):
try:
worker_id = worker_info.get('id', 'unknown')
worker_type = worker_info.get('type', 'unknown')
result = worker_info.get('result', 'unknown')
completed = worker_info.get('completed', '')
duration = worker_info.get('duration', '')
details = worker_info.get('details', '')
pipe = worker_info.get('pipe', '')
started = worker_info.get('started', '')
# Ensure values are strings
worker_id = str(worker_id) if worker_id else 'unknown'
worker_type = str(worker_type) if worker_type else 'unknown'
result = str(result) if result else 'unknown'
completed = str(completed) if completed else '---'
duration = str(duration) if duration else '---'
details = str(details) if details else '---'
started = str(started) if started else '---'
pipe_display = self._summarize_pipe(pipe)
# Truncate long strings
result = result[:15]
completed = completed[:19]
started = started[:19]
duration = duration[:10]
details = details[:30]
pipe_display = pipe_display[:40]
self.finished_table.add_row(
worker_id[:8],
worker_type[:15],
result,
pipe_display,
started,
completed,
duration,
details
)
added_count += 1
except Exception as row_error:
error_count += 1
logger.error(f"[workers-modal-update] Error adding finished row {idx}: {row_error}", exc_info=True)
logger.info(f"[workers-modal-update] COMPLETED: Added {added_count}/{len(self.finished_workers)} finished workers (errors: {error_count})")
logger.debug(f"[workers-modal-update] Finished table row_count after update: {self.finished_table.row_count}")
except Exception as e:
logger.error(f"[workers-modal] Error updating finished table: {e}", exc_info=True)
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
"""Handle row highlight in tables - display stdout."""
try:
logger.info(f"[workers-modal] Row highlighted, cursor_row: {event.cursor_row}")
# Get the selected worker from the correct table
workers_list = None
if event.control == self.running_table:
workers_list = self.running_workers
logger.debug(f"[workers-modal] Highlighted in running table")
elif event.control == self.finished_table:
workers_list = self.finished_workers
logger.debug(f"[workers-modal] Highlighted in finished table, list size: {len(workers_list)}")
else:
logger.warning(f"[workers-modal] Unknown table: {event.control}")
return
# Get the worker at this row
if workers_list and 0 <= event.cursor_row < len(workers_list):
worker = workers_list[event.cursor_row]
worker_id = worker.get('id', '')
logger.info(f"[workers-modal] Highlighted worker: {worker_id}")
if worker_id:
self.selected_worker_id = worker_id
# Display the stdout
self._update_stdout_display(worker_id, worker)
else:
logger.warning(f"[workers-modal] Row {event.cursor_row} out of bounds for list of size {len(workers_list) if workers_list else 0}")
except Exception as e:
logger.error(f"[workers-modal] Error handling row highlight: {e}", exc_info=True)
def on_data_table_cell_highlighted(self, event: DataTable.CellHighlighted) -> None:
"""Handle cell highlight in tables - display stdout (backup for row selection)."""
try:
# CellHighlighted has coordinate (row, column) not cursor_row
cursor_row = event.coordinate.row
logger.debug(f"[workers-modal] Cell highlighted, row: {cursor_row}, column: {event.coordinate.column}")
# Get the selected worker from the correct table
workers_list = None
if event.data_table == self.running_table:
workers_list = self.running_workers
logger.debug(f"[workers-modal] Cell highlighted in running table")
elif event.data_table == self.finished_table:
workers_list = self.finished_workers
logger.debug(f"[workers-modal] Cell highlighted in finished table, list size: {len(workers_list)}")
else:
return
# Get the worker at this row
if workers_list and 0 <= cursor_row < len(workers_list):
worker = workers_list[cursor_row]
worker_id = worker.get('id', '')
if worker_id and worker_id != self.selected_worker_id:
logger.info(f"[workers-modal] Cell-highlighted worker: {worker_id}")
self.selected_worker_id = worker_id
# Display the stdout
self._update_stdout_display(worker_id, worker)
except Exception as e:
logger.debug(f"[workers-modal] Error handling cell highlight: {e}")
def _update_stdout_display(self, worker_id: str, worker: Optional[Dict[str, Any]] = None) -> None:
"""Update the stdout textarea with logs from the selected worker."""
try:
if not self.stdout_display:
logger.error("[workers-modal] stdout_display not initialized")
return
logger.debug(f"[workers-modal] Updating stdout display for worker: {worker_id}")
worker_data = worker or self._locate_worker(worker_id)
stdout_text = self._resolve_worker_stdout(worker_id, worker_data)
pipe_text = self._resolve_worker_pipe(worker_id, worker_data)
events = self._get_worker_events(worker_id)
timeline_text = self._format_worker_timeline(events)
sections = []
if pipe_text:
sections.append(f"Pipe:\n{pipe_text}")
if timeline_text:
sections.append("Timeline:\n" + timeline_text)
logs_body = (stdout_text or "").strip()
sections.append("Logs:\n" + (logs_body if logs_body else "(no logs recorded)"))
combined_text = "\n\n".join(sections)
logger.debug(f"[workers-modal] Setting textarea to {len(combined_text)} chars (stdout_len={len(stdout_text or '')})")
self.stdout_display.text = combined_text
if len(combined_text) > 10:
try:
self.stdout_display.cursor_location = (len(combined_text) - 1, 0)
except Exception:
pass
logger.info(f"[workers-modal] Updated stdout display successfully")
except Exception as e:
logger.error(f"[workers-modal] Error updating stdout display: {e}", exc_info=True)
def _locate_worker(self, worker_id: str) -> Optional[Dict[str, Any]]:
for worker in self.running_workers or []:
if isinstance(worker, dict) and worker.get('id') == worker_id:
return worker
for worker in self.finished_workers or []:
if isinstance(worker, dict) and worker.get('id') == worker_id:
return worker
return None
def _resolve_worker_stdout(self, worker_id: str, worker: Optional[Dict[str, Any]]) -> str:
if worker and worker.get('stdout'):
return worker.get('stdout', '') or ''
manager = getattr(self.app_instance, 'worker_manager', None)
if manager:
try:
return manager.get_stdout(worker_id) or ''
except Exception as exc:
logger.debug(f"[workers-modal] Could not fetch stdout for {worker_id}: {exc}")
return ''
def _resolve_worker_pipe(self, worker_id: str, worker: Optional[Dict[str, Any]]) -> str:
if worker and worker.get('pipe'):
return str(worker.get('pipe'))
record = self._fetch_worker_record(worker_id)
if record and record.get('pipe'):
return str(record.get('pipe'))
return ''
def _fetch_worker_record(self, worker_id: str) -> Optional[Dict[str, Any]]:
manager = getattr(self.app_instance, 'worker_manager', None)
if not manager:
return None
try:
return manager.get_worker(worker_id)
except Exception as exc:
logger.debug(f"[workers-modal] Could not fetch worker record {worker_id}: {exc}")
return None
def _get_worker_events(self, worker_id: str, limit: int = 250) -> List[Dict[str, Any]]:
manager = getattr(self.app_instance, 'worker_manager', None)
if not manager:
return []
try:
return manager.get_worker_events(worker_id, limit=limit)
except Exception as exc:
logger.debug(f"[workers-modal] Could not fetch worker events {worker_id}: {exc}")
return []
def _format_worker_timeline(self, events: List[Dict[str, Any]]) -> str:
if not events:
return ""
lines: List[str] = []
for event in events:
timestamp = self._format_event_timestamp(event.get('created_at'))
label = (event.get('event_type') or '').upper() or 'EVENT'
channel = (event.get('channel') or '').upper()
if channel and channel not in label:
label = f"{label}/{channel}"
step = event.get('step') or ''
message = event.get('message') or ''
prefix = ''
if event.get('event_type') == 'step' and step:
prefix = f"{step} :: "
elif step and step not in message:
prefix = f"{step} :: "
formatted_message = self._format_message_block(message)
lines.append(f"[{timestamp}] {label}: {prefix}{formatted_message}")
return "\n".join(lines)
def _format_event_timestamp(self, raw_timestamp: Any) -> str:
if not raw_timestamp:
return "--:--:--"
text = str(raw_timestamp)
if "T" in text:
time_part = text.split("T", 1)[1]
elif " " in text:
time_part = text.split(" ", 1)[1]
else:
time_part = text
return time_part[:8] if len(time_part) >= 8 else time_part
def _format_message_block(self, message: str) -> str:
clean = (message or '').strip()
if not clean:
return "(empty)"
lines = clean.splitlines()
if len(lines) == 1:
return lines[0]
head, *rest = lines
indented = "\n".join(f" {line}" for line in rest)
return f"{head}\n{indented}"
def _summarize_pipe(self, pipe_value: Any, limit: int = 40) -> str:
text = str(pipe_value or '').strip()
if not text:
return "(none)"
return text if len(text) <= limit else text[: limit - 3] + '...'
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
button_id = event.button.id
try:
if button_id == "toggle-running-btn":
self.show_running = True
self._update_view_visibility()
return
elif button_id == "toggle-finished-btn":
self.show_running = False
self._update_view_visibility()
return
if button_id == "running-refresh-btn":
self.refresh_workers()
elif button_id == "running-stop-btn":
# Stop selected running worker
if self.running_table and self.running_table.row_count > 0:
try:
selected_row = self.running_table.cursor_row
if 0 <= selected_row < len(self.running_workers):
worker = self.running_workers[selected_row]
worker_id = worker.get('id')
if self.app_instance and hasattr(self.app_instance, 'stop_worker'):
self.app_instance.stop_worker(worker_id)
logger.info(f"[workers-modal] Stopped worker: {worker_id}")
self.refresh_workers()
except Exception as e:
logger.error(f"[workers-modal] Error stopping worker: {e}")
elif button_id == "running-stop-all-btn":
# Stop all running workers
if self.app_instance and hasattr(self.app_instance, 'stop_all_workers'):
self.app_instance.stop_all_workers()
logger.info("[workers-modal] Stopped all workers")
self.refresh_workers()
elif button_id == "finished-refresh-btn":
self.refresh_workers()
elif button_id == "finished-clear-btn":
# Clear selected finished worker
if self.finished_table and self.finished_table.row_count > 0:
try:
selected_row = self.finished_table.cursor_row
if 0 <= selected_row < len(self.finished_workers):
worker = self.finished_workers[selected_row]
worker_id = worker.get('id')
if self.app_instance and hasattr(self.app_instance, 'clear_finished_worker'):
self.app_instance.clear_finished_worker(worker_id)
logger.info(f"[workers-modal] Cleared worker: {worker_id}")
self.refresh_workers()
except Exception as e:
logger.error(f"[workers-modal] Error clearing worker: {e}")
elif button_id == "finished-clear-all-btn":
# Clear all finished workers
if self.app_instance and hasattr(self.app_instance, 'clear_all_finished_workers'):
self.app_instance.clear_all_finished_workers()
logger.info("[workers-modal] Cleared all finished workers")
self.refresh_workers()
elif button_id == "close-btn":
self.dismiss(None)
except Exception as e:
logger.error(f"[workers-modal] Error in on_button_pressed: {e}")
def action_cancel(self) -> None:
    """Close the modal with no result.

    Bound to the Escape key through the "cancel" entry in ``BINDINGS``.
    """
    self.dismiss(None)