"""Workers modal screen for monitoring and managing background tasks.""" from textual.app import ComposeResult from textual.screen import ModalScreen from textual.containers import Horizontal, Vertical from textual.widgets import Static, Button, DataTable, TextArea from textual.binding import Binding from textual.message import Message import logging from typing import Optional, Dict, List, Any from pathlib import Path import sys # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) logger = logging.getLogger(__name__) class WorkersModal(ModalScreen): """Modal screen for monitoring running and finished workers.""" BINDINGS = [ Binding("escape", "cancel", "Cancel"), ] CSS_PATH = "workers.tcss" class WorkerUpdated(Message): """Posted when worker list is updated.""" def __init__(self, workers: List[Dict[str, Any]]) -> None: self.workers = workers super().__init__() class WorkerCancelled(Message): """Posted when user cancels a worker.""" def __init__(self, worker_id: str) -> None: self.worker_id = worker_id super().__init__() def __init__(self, app_instance=None): """Initialize the workers modal. Args: app_instance: Reference to the hub app for accessing worker info """ super().__init__() self.app_instance = app_instance self.running_table: Optional[DataTable] = None self.finished_table: Optional[DataTable] = None self.stdout_display: Optional[TextArea] = None self.running_workers: List[Dict[str, Any]] = [] self.finished_workers: List[Dict[str, Any]] = [] self.selected_worker_id: Optional[str] = None self.show_running = False # Start with finished tab def compose(self) -> ComposeResult: """Create child widgets for the workers modal.""" with Vertical(id="workers-container"): # Title with toggle buttons with Horizontal(id="workers-title-bar"): yield Static("Workers Monitor", id="workers-title") yield Button("Running", id="toggle-running-btn", variant="primary") yield Button("Finished", id="toggle-finished-btn", variant="default") # Running tab content (initially hidden) with Vertical(id="running-section"): self.running_table = DataTable(id="running-table") yield self.running_table with Horizontal(id="running-controls"): yield Button("Refresh", id="running-refresh-btn", variant="primary") yield Button("Stop Selected", id="running-stop-btn", variant="warning") yield Button("Stop All", id="running-stop-all-btn", variant="error") # Finished tab content (initially visible) with Vertical(id="finished-section"): self.finished_table = DataTable(id="finished-table") yield self.finished_table with Horizontal(id="finished-controls"): yield Button("Refresh", id="finished-refresh-btn", variant="primary") yield Button("Clear Selected", id="finished-clear-btn", variant="warning") yield Button("Clear All", id="finished-clear-all-btn", variant="error") # Shared textarea for displaying worker logs with Vertical(id="logs-section"): yield Static("Worker Logs:", id="logs-label") self.stdout_display = TextArea(id="stdout-display", read_only=True) yield self.stdout_display with Horizontal(id="workers-buttons"): yield Button("Close", id="close-btn", variant="primary") def on_mount(self) -> None: """Set up the tables and load worker data.""" # Set up running workers table if self.running_table: self.running_table.add_columns( "ID", "Type", "Status", "Pipe", "Progress", "Started", "Details" ) self.running_table.zebra_stripes = True # Set up finished workers table if self.finished_table: self.finished_table.add_columns( "ID", "Type", "Result", "Pipe", "Started", "Completed", "Duration", "Details" ) self.finished_table.zebra_stripes = True # Set initial view (show finished by default) self._update_view_visibility() # Load initial data self.refresh_workers() # Don't set up periodic refresh - it was causing issues with stdout display # Users can click the Refresh button to update manually def refresh_workers(self) -> None: """Refresh the workers data from app instance.""" try: if not self.app_instance: logger.warning("[workers-modal] No app instance provided") return # Get running workers from app instance # This assumes the app has a get_running_workers() method if hasattr(self.app_instance, 'get_running_workers'): self.running_workers = self.app_instance.get_running_workers() else: self.running_workers = [] # Get finished workers from app instance if hasattr(self.app_instance, 'get_finished_workers'): self.finished_workers = self.app_instance.get_finished_workers() if self.finished_workers: logger.info(f"[workers-modal-refresh] Got {len(self.finished_workers)} finished workers from app") # Log the keys in the first worker to verify structure if isinstance(self.finished_workers[0], dict): logger.info(f"[workers-modal-refresh] First worker keys: {list(self.finished_workers[0].keys())}") logger.info(f"[workers-modal-refresh] First worker: {self.finished_workers[0]}") else: logger.warning(f"[workers-modal-refresh] First worker is not a dict: {type(self.finished_workers[0])}") else: self.finished_workers = [] # Update tables self._update_running_table() self._update_finished_table() logger.info(f"[workers-modal] Refreshed: {len(self.running_workers)} running, {len(self.finished_workers)} finished") except Exception as e: logger.error(f"[workers-modal] Error refreshing workers: {e}") def _update_view_visibility(self) -> None: """Toggle visibility between running and finished views.""" try: running_section = self.query_one("#running-section", Vertical) finished_section = self.query_one("#finished-section", Vertical) toggle_running_btn = self.query_one("#toggle-running-btn", Button) toggle_finished_btn = self.query_one("#toggle-finished-btn", Button) if self.show_running: running_section.display = True finished_section.display = False toggle_running_btn.variant = "primary" toggle_finished_btn.variant = "default" logger.debug("[workers-modal] Switched to Running view") else: running_section.display = False finished_section.display = True toggle_running_btn.variant = "default" toggle_finished_btn.variant = "primary" logger.debug("[workers-modal] Switched to Finished view") except Exception as e: logger.error(f"[workers-modal] Error updating view visibility: {e}") def _update_running_table(self) -> None: """Update the running workers table.""" try: if not self.running_table: logger.error("[workers-modal] Running table not initialized") return self.running_table.clear() if not self.running_workers: self.running_table.add_row("---", "---", "---", "---", "---", "---", "No workers running") logger.debug(f"[workers-modal] No running workers to display") return logger.debug(f"[workers-modal] Updating running table with {len(self.running_workers)} workers") for idx, worker_info in enumerate(self.running_workers): try: worker_id = worker_info.get('id', 'unknown') worker_type = worker_info.get('type', 'unknown') status = worker_info.get('status', 'running') progress = worker_info.get('progress', '') started = worker_info.get('started', '') details = worker_info.get('details', '') pipe = worker_info.get('pipe', '') # Ensure values are strings worker_id = str(worker_id) if worker_id else 'unknown' worker_type = str(worker_type) if worker_type else 'unknown' status = str(status) if status else 'running' progress = str(progress) if progress else '---' started = str(started) if started else '---' details = str(details) if details else '---' pipe_display = self._summarize_pipe(pipe) # Truncate long strings progress = progress[:20] started = started[:19] details = details[:30] pipe_display = pipe_display[:40] self.running_table.add_row( worker_id[:8], worker_type[:15], status[:10], pipe_display, progress, started, details ) if idx == 0: # Log first entry logger.debug(f"[workers-modal] Added running row {idx}: {worker_id[:8]} {worker_type[:15]} {status}") except Exception as row_error: logger.error(f"[workers-modal] Error adding running row {idx}: {row_error}", exc_info=True) logger.debug(f"[workers-modal] Updated running table with {len(self.running_workers)} workers") except Exception as e: logger.error(f"[workers-modal] Error updating running table: {e}", exc_info=True) def _update_finished_table(self) -> None: """Update the finished workers table.""" try: if not self.finished_table: logger.error("[workers-modal] Finished table not initialized") return self.finished_table.clear() if not self.finished_workers: self.finished_table.add_row("---", "---", "---", "---", "---", "---", "---", "No finished workers") logger.debug(f"[workers-modal] No finished workers to display") return logger.info(f"[workers-modal-update] STARTING to update finished table with {len(self.finished_workers)} workers") added_count = 0 error_count = 0 for idx, worker_info in enumerate(self.finished_workers): try: worker_id = worker_info.get('id', 'unknown') worker_type = worker_info.get('type', 'unknown') result = worker_info.get('result', 'unknown') completed = worker_info.get('completed', '') duration = worker_info.get('duration', '') details = worker_info.get('details', '') pipe = worker_info.get('pipe', '') started = worker_info.get('started', '') # Ensure values are strings worker_id = str(worker_id) if worker_id else 'unknown' worker_type = str(worker_type) if worker_type else 'unknown' result = str(result) if result else 'unknown' completed = str(completed) if completed else '---' duration = str(duration) if duration else '---' details = str(details) if details else '---' started = str(started) if started else '---' pipe_display = self._summarize_pipe(pipe) # Truncate long strings result = result[:15] completed = completed[:19] started = started[:19] duration = duration[:10] details = details[:30] pipe_display = pipe_display[:40] self.finished_table.add_row( worker_id[:8], worker_type[:15], result, pipe_display, started, completed, duration, details ) added_count += 1 except Exception as row_error: error_count += 1 logger.error(f"[workers-modal-update] Error adding finished row {idx}: {row_error}", exc_info=True) logger.info(f"[workers-modal-update] COMPLETED: Added {added_count}/{len(self.finished_workers)} finished workers (errors: {error_count})") logger.debug(f"[workers-modal-update] Finished table row_count after update: {self.finished_table.row_count}") except Exception as e: logger.error(f"[workers-modal] Error updating finished table: {e}", exc_info=True) def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: """Handle row highlight in tables - display stdout.""" try: logger.info(f"[workers-modal] Row highlighted, cursor_row: {event.cursor_row}") # Get the selected worker from the correct table workers_list = None if event.control == self.running_table: workers_list = self.running_workers logger.debug(f"[workers-modal] Highlighted in running table") elif event.control == self.finished_table: workers_list = self.finished_workers logger.debug(f"[workers-modal] Highlighted in finished table, list size: {len(workers_list)}") else: logger.warning(f"[workers-modal] Unknown table: {event.control}") return # Get the worker at this row if workers_list and 0 <= event.cursor_row < len(workers_list): worker = workers_list[event.cursor_row] worker_id = worker.get('id', '') logger.info(f"[workers-modal] Highlighted worker: {worker_id}") if worker_id: self.selected_worker_id = worker_id # Display the stdout self._update_stdout_display(worker_id, worker) else: logger.warning(f"[workers-modal] Row {event.cursor_row} out of bounds for list of size {len(workers_list) if workers_list else 0}") except Exception as e: logger.error(f"[workers-modal] Error handling row highlight: {e}", exc_info=True) def on_data_table_cell_highlighted(self, event: DataTable.CellHighlighted) -> None: """Handle cell highlight in tables - display stdout (backup for row selection).""" try: # CellHighlighted has coordinate (row, column) not cursor_row cursor_row = event.coordinate.row logger.debug(f"[workers-modal] Cell highlighted, row: {cursor_row}, column: {event.coordinate.column}") # Get the selected worker from the correct table workers_list = None if event.data_table == self.running_table: workers_list = self.running_workers logger.debug(f"[workers-modal] Cell highlighted in running table") elif event.data_table == self.finished_table: workers_list = self.finished_workers logger.debug(f"[workers-modal] Cell highlighted in finished table, list size: {len(workers_list)}") else: return # Get the worker at this row if workers_list and 0 <= cursor_row < len(workers_list): worker = workers_list[cursor_row] worker_id = worker.get('id', '') if worker_id and worker_id != self.selected_worker_id: logger.info(f"[workers-modal] Cell-highlighted worker: {worker_id}") self.selected_worker_id = worker_id # Display the stdout self._update_stdout_display(worker_id, worker) except Exception as e: logger.debug(f"[workers-modal] Error handling cell highlight: {e}") def _update_stdout_display(self, worker_id: str, worker: Optional[Dict[str, Any]] = None) -> None: """Update the stdout textarea with logs from the selected worker.""" try: if not self.stdout_display: logger.error("[workers-modal] stdout_display not initialized") return logger.debug(f"[workers-modal] Updating stdout display for worker: {worker_id}") worker_data = worker or self._locate_worker(worker_id) stdout_text = self._resolve_worker_stdout(worker_id, worker_data) pipe_text = self._resolve_worker_pipe(worker_id, worker_data) events = self._get_worker_events(worker_id) timeline_text = self._format_worker_timeline(events) sections = [] if pipe_text: sections.append(f"Pipe:\n{pipe_text}") if timeline_text: sections.append("Timeline:\n" + timeline_text) logs_body = (stdout_text or "").strip() sections.append("Logs:\n" + (logs_body if logs_body else "(no logs recorded)")) combined_text = "\n\n".join(sections) logger.debug(f"[workers-modal] Setting textarea to {len(combined_text)} chars (stdout_len={len(stdout_text or '')})") self.stdout_display.text = combined_text if len(combined_text) > 10: try: self.stdout_display.cursor_location = (len(combined_text) - 1, 0) except Exception: pass logger.info(f"[workers-modal] Updated stdout display successfully") except Exception as e: logger.error(f"[workers-modal] Error updating stdout display: {e}", exc_info=True) def _locate_worker(self, worker_id: str) -> Optional[Dict[str, Any]]: for worker in self.running_workers or []: if isinstance(worker, dict) and worker.get('id') == worker_id: return worker for worker in self.finished_workers or []: if isinstance(worker, dict) and worker.get('id') == worker_id: return worker return None def _resolve_worker_stdout(self, worker_id: str, worker: Optional[Dict[str, Any]]) -> str: if worker and worker.get('stdout'): return worker.get('stdout', '') or '' manager = getattr(self.app_instance, 'worker_manager', None) if manager: try: return manager.get_stdout(worker_id) or '' except Exception as exc: logger.debug(f"[workers-modal] Could not fetch stdout for {worker_id}: {exc}") return '' def _resolve_worker_pipe(self, worker_id: str, worker: Optional[Dict[str, Any]]) -> str: if worker and worker.get('pipe'): return str(worker.get('pipe')) record = self._fetch_worker_record(worker_id) if record and record.get('pipe'): return str(record.get('pipe')) return '' def _fetch_worker_record(self, worker_id: str) -> Optional[Dict[str, Any]]: manager = getattr(self.app_instance, 'worker_manager', None) if not manager: return None try: return manager.get_worker(worker_id) except Exception as exc: logger.debug(f"[workers-modal] Could not fetch worker record {worker_id}: {exc}") return None def _get_worker_events(self, worker_id: str, limit: int = 250) -> List[Dict[str, Any]]: manager = getattr(self.app_instance, 'worker_manager', None) if not manager: return [] try: return manager.get_worker_events(worker_id, limit=limit) except Exception as exc: logger.debug(f"[workers-modal] Could not fetch worker events {worker_id}: {exc}") return [] def _format_worker_timeline(self, events: List[Dict[str, Any]]) -> str: if not events: return "" lines: List[str] = [] for event in events: timestamp = self._format_event_timestamp(event.get('created_at')) label = (event.get('event_type') or '').upper() or 'EVENT' channel = (event.get('channel') or '').upper() if channel and channel not in label: label = f"{label}/{channel}" step = event.get('step') or '' message = event.get('message') or '' prefix = '' if event.get('event_type') == 'step' and step: prefix = f"{step} :: " elif step and step not in message: prefix = f"{step} :: " formatted_message = self._format_message_block(message) lines.append(f"[{timestamp}] {label}: {prefix}{formatted_message}") return "\n".join(lines) def _format_event_timestamp(self, raw_timestamp: Any) -> str: if not raw_timestamp: return "--:--:--" text = str(raw_timestamp) if "T" in text: time_part = text.split("T", 1)[1] elif " " in text: time_part = text.split(" ", 1)[1] else: time_part = text return time_part[:8] if len(time_part) >= 8 else time_part def _format_message_block(self, message: str) -> str: clean = (message or '').strip() if not clean: return "(empty)" lines = clean.splitlines() if len(lines) == 1: return lines[0] head, *rest = lines indented = "\n".join(f" {line}" for line in rest) return f"{head}\n{indented}" def _summarize_pipe(self, pipe_value: Any, limit: int = 40) -> str: text = str(pipe_value or '').strip() if not text: return "(none)" return text if len(text) <= limit else text[: limit - 3] + '...' def on_button_pressed(self, event: Button.Pressed) -> None: """Handle button presses.""" button_id = event.button.id try: if button_id == "toggle-running-btn": self.show_running = True self._update_view_visibility() return elif button_id == "toggle-finished-btn": self.show_running = False self._update_view_visibility() return if button_id == "running-refresh-btn": self.refresh_workers() elif button_id == "running-stop-btn": # Stop selected running worker if self.running_table and self.running_table.row_count > 0: try: selected_row = self.running_table.cursor_row if 0 <= selected_row < len(self.running_workers): worker = self.running_workers[selected_row] worker_id = worker.get('id') if self.app_instance and hasattr(self.app_instance, 'stop_worker'): self.app_instance.stop_worker(worker_id) logger.info(f"[workers-modal] Stopped worker: {worker_id}") self.refresh_workers() except Exception as e: logger.error(f"[workers-modal] Error stopping worker: {e}") elif button_id == "running-stop-all-btn": # Stop all running workers if self.app_instance and hasattr(self.app_instance, 'stop_all_workers'): self.app_instance.stop_all_workers() logger.info("[workers-modal] Stopped all workers") self.refresh_workers() elif button_id == "finished-refresh-btn": self.refresh_workers() elif button_id == "finished-clear-btn": # Clear selected finished worker if self.finished_table and self.finished_table.row_count > 0: try: selected_row = self.finished_table.cursor_row if 0 <= selected_row < len(self.finished_workers): worker = self.finished_workers[selected_row] worker_id = worker.get('id') if self.app_instance and hasattr(self.app_instance, 'clear_finished_worker'): self.app_instance.clear_finished_worker(worker_id) logger.info(f"[workers-modal] Cleared worker: {worker_id}") self.refresh_workers() except Exception as e: logger.error(f"[workers-modal] Error clearing worker: {e}") elif button_id == "finished-clear-all-btn": # Clear all finished workers if self.app_instance and hasattr(self.app_instance, 'clear_all_finished_workers'): self.app_instance.clear_all_finished_workers() logger.info("[workers-modal] Cleared all finished workers") self.refresh_workers() elif button_id == "close-btn": self.dismiss(None) except Exception as e: logger.error(f"[workers-modal] Error in on_button_pressed: {e}") def action_cancel(self) -> None: """Action for Escape key - close modal.""" self.dismiss(None)