"""Unified result table formatter for CLI display. Provides a structured way to convert search results, metadata, and pipeline objects into formatted tables suitable for display in the REPL and CLI output. Features: - Format results as aligned tables with row numbers - Support multiple selection formats (single, ranges, lists, combined) - Interactive selection with user input - Input options for cmdlet arguments (location, source selection, etc) """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Callable, Set from pathlib import Path import json import shutil from rich.box import SIMPLE from rich.console import Group from rich.panel import Panel from rich.prompt import Prompt from rich.table import Table as RichTable from rich.text import Text # Optional Textual imports - graceful fallback if not available try: from textual.widgets import Tree TEXTUAL_AVAILABLE = True except ImportError: TEXTUAL_AVAILABLE = False def _sanitize_cell_text(value: Any) -> str: """Coerce to a single-line, tab-free string suitable for terminal display.""" if value is None: return "" text = str(value) if not text: return "" return text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ").replace("\t", " ") def _format_duration_hms(duration: Any) -> str: """Format a duration in seconds into a compact h/m/s string. Examples: 3150 -> "52m30s" 59 -> "59s" 3600 -> "1h0m0s" If the value is not numeric, returns an empty string. """ if duration is None: return "" try: if isinstance(duration, str): s = duration.strip() if not s: return "" # If it's already formatted (contains letters/colon), leave it to caller. if any(ch.isalpha() for ch in s) or ":" in s: return "" seconds = float(s) else: seconds = float(duration) except Exception: return "" if seconds < 0: return "" total_seconds = int(seconds) minutes, secs = divmod(total_seconds, 60) hours, minutes = divmod(minutes, 60) parts: List[str] = [] if hours > 0: parts.append(f"{hours}h") if minutes > 0 or hours > 0: parts.append(f"{minutes}m") parts.append(f"{secs}s") return "".join(parts) @dataclass(frozen=True) class TableColumn: """Reusable column specification. This is intentionally separate from `ResultColumn`: - `ResultColumn` is a rendered (name,value) pair attached to a single row. - `TableColumn` is a reusable extractor/formatter used to build rows consistently across cmdlets and stores. """ key: str header: str extractor: Callable[[Any], Any] def extract(self, item: Any) -> Any: try: return self.extractor(item) except Exception: return None def _get_first_dict_value(data: Dict[str, Any], keys: List[str]) -> Any: for k in keys: if k in data: v = data.get(k) if v is not None and str(v).strip() != "": return v return None def _as_dict(item: Any) -> Optional[Dict[str, Any]]: if isinstance(item, dict): return item try: if hasattr(item, "__dict__"): return dict(getattr(item, "__dict__")) except Exception: return None return None def extract_store_value(item: Any) -> str: data = _as_dict(item) or {} store = _get_first_dict_value( data, ["store", "table", "source", "storage"] ) # storage is legacy return str(store or "").strip() def extract_hash_value(item: Any) -> str: data = _as_dict(item) or {} hv = _get_first_dict_value(data, ["hash", "hash_hex", "file_hash", "sha256"]) return str(hv or "").strip() def extract_title_value(item: Any) -> str: data = _as_dict(item) or {} title = _get_first_dict_value(data, ["title", "name", "filename"]) if not title: title = _get_first_dict_value( data, ["target", "path", "url"] ) # last resort display return str(title or "").strip() def extract_ext_value(item: Any) -> str: data = _as_dict(item) or {} meta = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} raw_path = data.get("path") or data.get("target") or data.get( "filename" ) or data.get("title") ext = _get_first_dict_value(data, ["ext", "file_ext", "extension"]) or _get_first_dict_value( meta, ["ext", "file_ext", "extension"] ) if (not ext) and raw_path: try: suf = Path(str(raw_path)).suffix if suf: ext = suf.lstrip(".") except Exception: ext = "" ext_str = str(ext or "").strip().lstrip(".") for idx, ch in enumerate(ext_str): if not ch.isalnum(): ext_str = ext_str[:idx] break return ext_str[:5] def extract_size_bytes_value(item: Any) -> Optional[int]: data = _as_dict(item) or {} meta = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} size_val = _get_first_dict_value( data, ["size_bytes", "size", "file_size", "bytes", "filesize"] ) or _get_first_dict_value( meta, ["size_bytes", "size", "file_size", "bytes", "filesize"] ) if size_val is None: return None try: s = str(size_val).strip() if not s: return None # Some sources might provide floats or numeric strings return int(float(s)) except Exception: return None COMMON_COLUMNS: Dict[str, TableColumn] = { "title": TableColumn("title", "Title", extract_title_value), "store": TableColumn("store", "Store", extract_store_value), "hash": TableColumn("hash", "Hash", extract_hash_value), "ext": TableColumn("ext", "Ext", extract_ext_value), "size": TableColumn("size", "Size", extract_size_bytes_value), } def build_display_row(item: Any, *, keys: List[str]) -> Dict[str, Any]: """Build a dict suitable for `ResultTable.add_result()` using shared column specs.""" out: Dict[str, Any] = {} for k in keys: spec = COMMON_COLUMNS.get(k) if spec is None: continue val = spec.extract(item) out[spec.key] = val return out @dataclass class InputOption: """Represents an interactive input option (cmdlet argument) in a table. Allows users to select options that translate to cmdlet arguments, enabling interactive configuration right from the result table. Example: # Create an option for location selection location_opt = InputOption( "location", type="enum", choices=["local", "hydrus", "0x0"], description="Download destination" ) # Use in result table table.add_input_option(location_opt) selected = table.select_option("location") # Returns user choice """ name: str """Option name (maps to cmdlet argument)""" type: str = "string" """Option type: 'string', 'enum', 'flag', 'integer'""" choices: List[str] = field(default_factory=list) """Valid choices for enum type""" default: Optional[str] = None """Default value if not specified""" description: str = "" """Description of what this option does""" validator: Optional[Callable[[str], bool]] = None """Optional validator function: takes value, returns True if valid""" def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "name": self.name, "type": self.type, "choices": self.choices if self.choices else None, "default": self.default, "description": self.description, } @dataclass class TUIResultCard: """Represents a result as a UI card with title, metadata, and actions. Used in hub-ui and TUI contexts to render individual search results as grouped components with visual structure. """ title: str subtitle: Optional[str] = None metadata: Optional[Dict[str, str]] = None media_kind: Optional[str] = None tag: Optional[List[str]] = None file_hash: Optional[str] = None file_size: Optional[str] = None duration: Optional[str] = None def __post_init__(self): """Initialize default values.""" if self.metadata is None: self.metadata = {} if self.tag is None: self.tag = [] @dataclass class ResultColumn: """Represents a single column in a result table.""" name: str value: str width: Optional[int] = None def __str__(self) -> str: """String representation of the column.""" return f"{self.name}: {self.value}" def to_dict(self) -> Dict[str, str]: """Convert to dictionary.""" return { "name": self.name, "value": self.value } @dataclass class ResultRow: """Represents a single row in a result table.""" columns: List[ResultColumn] = field(default_factory=list) selection_args: Optional[List[str]] = None """Arguments to use for this row when selected via @N syntax (e.g., ['-item', '3'])""" selection_action: Optional[List[str]] = None """Full expanded stage tokens that should run when this row is selected.""" source_index: Optional[int] = None """Original insertion order index (used to map sorted views back to source items).""" payload: Optional[Any] = None """Original object that contributed to this row.""" def add_column(self, name: str, value: Any) -> None: """Add a column to this row.""" # Normalize column header names. normalized_name = str(name or "").strip() if normalized_name.lower() == "name": normalized_name = "Title" str_value = _sanitize_cell_text(value) # Normalize extension columns globally and cap to 5 characters if normalized_name.lower() == "ext": str_value = str_value.strip().lstrip(".") for idx, ch in enumerate(str_value): if not ch.isalnum(): str_value = str_value[:idx] break str_value = str_value[:5] # Normalize Duration columns: providers often pass raw seconds. if normalized_name.lower() == "duration": formatted = _format_duration_hms(value) if formatted: str_value = formatted self.columns.append(ResultColumn(normalized_name, str_value)) def get_column(self, name: str) -> Optional[str]: """Get column value by name.""" for col in self.columns: if col.name.lower() == name.lower(): return col.value return None def to_dict(self) -> List[Dict[str, str]]: """Convert to list of column dicts.""" return [col.to_dict() for col in self.columns] def to_list(self) -> List[tuple[str, str]]: """Convert to list of (name, value) tuples.""" return [(col.name, col.value) for col in self.columns] def __str__(self) -> str: """String representation of the row.""" return " | ".join(str(col) for col in self.columns) class ResultTable: """Unified table formatter for search results, metadata, and pipeline objects. Provides a structured way to display results in the CLI with consistent formatting. Handles conversion from various result types (SearchResult, PipeObject, dicts) into a formatted table with rows and columns. Example: >>> result_table = ResultTable("Search Results") >>> row = result_table.add_row() >>> row.add_column("File", "document.pdf") >>> row.add_column("Size", "2.5 MB") >>> row.add_column("Tag", "pdf, document") >>> print(result_table) """ def __init__( self, title: str = "", title_width: int = 80, max_columns: Optional[int] = None, preserve_order: bool = False, ): """Initialize a result table. Args: title: Optional title for the table title_width: Width for formatting the title line max_columns: Maximum number of columns to display (None for unlimited, default: 5 for search results) preserve_order: When True, skip automatic sorting so row order matches source """ self.title = title try: from SYS import pipeline as ctx cmdlet_name = "" try: cmdlet_name = ( ctx.get_current_cmdlet_name("") if hasattr(ctx, "get_current_cmdlet_name") else "" ) except Exception: cmdlet_name = "" stage_text = "" try: stage_text = ( ctx.get_current_stage_text("") if hasattr(ctx, "get_current_stage_text") else "" ) except Exception: stage_text = "" if cmdlet_name and stage_text: normalized_cmd = str(cmdlet_name).replace("_", "-").strip().lower() normalized_title = str(self.title or "").strip().lower() normalized_stage = str(stage_text).strip() if normalized_stage and normalized_stage.lower().startswith( normalized_cmd): if (not normalized_title) or normalized_title.replace( "_", "-").startswith(normalized_cmd): self.title = normalized_stage except Exception: pass self.title_width = title_width self.max_columns = ( max_columns if max_columns is not None else 5 ) # Default 5 for cleaner display self.rows: List[ResultRow] = [] self.column_widths: Dict[str, int] = {} self.input_options: Dict[str, InputOption] = {} """Options available for user input (cmdlet arguments)""" self.source_command: Optional[str] = None """Command that generated this table (e.g., 'download-file URL')""" self.source_args: List[str] = [] """Base arguments for the source command""" self.header_lines: List[str] = [] """Optional metadata lines rendered under the title""" self.preserve_order: bool = preserve_order """If True, skip automatic sorting so display order matches input order.""" self.no_choice: bool = False """When True, suppress row numbers/selection to make the table non-interactive.""" self.table: Optional[str] = None """Table type (e.g., 'youtube', 'soulseek') for context-aware selection logic.""" self.table_metadata: Dict[str, Any] = {} """Optional provider/table metadata (e.g., provider name, view).""" self.value_case: str = "lower" """Display-only value casing: 'lower' (default), 'upper', or 'preserve'.""" def set_value_case(self, value_case: str) -> "ResultTable": """Configure display-only casing for rendered cell values.""" case = str(value_case or "").strip().lower() if case not in {"lower", "upper", "preserve"}: case = "lower" self.value_case = case return self def _apply_value_case(self, text: str) -> str: if not text: return "" if self.value_case == "upper": return text.upper() if self.value_case == "preserve": return text return text.lower() def set_table(self, table: str) -> "ResultTable": """Set the table type for context-aware selection logic.""" self.table = table return self def set_table_metadata(self, metadata: Optional[Dict[str, Any]]) -> "ResultTable": """Attach provider/table metadata for downstream selection logic.""" self.table_metadata = dict(metadata or {}) return self def get_table_metadata(self) -> Dict[str, Any]: """Return attached provider/table metadata (copy to avoid mutation).""" try: return dict(self.table_metadata) except Exception: return {} def set_no_choice(self, no_choice: bool = True) -> "ResultTable": """Mark the table as non-interactive (no row numbers, no selection parsing).""" self.no_choice = bool(no_choice) return self def set_preserve_order(self, preserve: bool = True) -> "ResultTable": """Configure whether this table should skip automatic sorting.""" self.preserve_order = bool(preserve) return self def add_row(self) -> ResultRow: """Add a new row to the table and return it for configuration.""" row = ResultRow() row.source_index = len(self.rows) self.rows.append(row) return row def set_source_command( self, command: str, args: Optional[List[str]] = None ) -> "ResultTable": """Set the source command that generated this table. This is used for @N expansion: when user runs @2 | next-cmd, it will expand to: source_command + source_args + row_selection_args | next-cmd Args: command: Command name (e.g., 'download-file') args: Base arguments for the command (e.g., ['URL']) Returns: Self for chaining """ self.source_command = command self.source_args = args or [] return self def init_command( self, title: str, command: str, args: Optional[List[str]] = None, preserve_order: bool = False, ) -> "ResultTable": """Initialize table with title, command, args, and preserve_order in one call. Consolidates common initialization pattern: ResultTable(title) + set_source_command(cmd, args) + set_preserve_order(preserve_order) Args: title: Table title command: Source command name args: Command arguments preserve_order: Whether to preserve input row order Returns: self for method chaining """ self.title = title self.source_command = command self.source_args = args or [] self.preserve_order = preserve_order return self def copy_with_title(self, new_title: str) -> "ResultTable": """Create a new table copying settings from this one but with a new title. Consolidates pattern: new_table = ResultTable(title); new_table.set_source_command(...) Useful for intermediate processing that needs to preserve source command but update display title. Args: new_title: New title for the copied table Returns: New ResultTable with copied settings and new title """ new_table = ResultTable( title=new_title, title_width=self.title_width, max_columns=self.max_columns, preserve_order=self.preserve_order, ) new_table.source_command = self.source_command new_table.source_args = list(self.source_args) if self.source_args else [] new_table.input_options = dict(self.input_options) if self.input_options else {} new_table.no_choice = self.no_choice new_table.table = self.table new_table.table_metadata = ( dict(self.table_metadata) if getattr(self, "table_metadata", None) else {} ) new_table.header_lines = list(self.header_lines) if self.header_lines else [] return new_table def set_row_selection_args(self, row_index: int, selection_args: List[str]) -> None: """Set the selection arguments for a specific row. When user selects this row via @N, these arguments will be appended to the source command to re-execute with that item selected. Args: row_index: Index of the row (0-based) selection_args: Arguments to use (e.g., ['-item', '3']) """ if 0 <= row_index < len(self.rows): self.rows[row_index].selection_args = selection_args def set_row_selection_action(self, row_index: int, selection_action: List[str]) -> None: """Specify the entire stage tokens to run for this row on @N.""" if 0 <= row_index < len(self.rows): self.rows[row_index].selection_action = selection_action def set_header_lines(self, lines: List[str]) -> "ResultTable": """Attach metadata lines that render beneath the title.""" self.header_lines = [line for line in lines if line] return self def set_header_line(self, line: str) -> "ResultTable": """Attach a single metadata line beneath the title.""" return self.set_header_lines([line] if line else []) def set_storage_summary( self, storage_counts: Dict[str, int], filter_text: Optional[str] = None, inline: bool = False, ) -> str: """Render a storage count summary (e.g., "Hydrus:0 Local:1 | filter: \"q\""). Returns the summary string so callers can place it inline with the title if desired. """ summary_parts: List[str] = [] if storage_counts: summary_parts.append( " ".join(f"{name}:{count}" for name, count in storage_counts.items()) ) if filter_text: safe_filter = filter_text.replace('"', '\\"') summary_parts.append(f'filter: "{safe_filter}"') summary = " | ".join(summary_parts) if not inline: self.set_header_line(summary) return summary def sort_by_title(self) -> "ResultTable": """Sort rows alphabetically by Title or Name column. Looks for columns named 'Title', 'Name', or 'Tag' (in that order). Case-insensitive sort. Returns self for chaining. NOTE: This only affects display order. Each row keeps its original `source_index` (insertion order) for callers that need stable mapping. """ if getattr(self, "preserve_order", False): return self # Find the title column (try Title, Name, Tag in order) title_col_idx = None for row in self.rows: if not row.columns: continue for idx, col in enumerate(row.columns): col_lower = col.name.lower() if col_lower in ("title", "name", "tag"): title_col_idx = idx break if title_col_idx is not None: break if title_col_idx is None: # No title column found, return unchanged return self # Sort rows by the title column value (case-insensitive) self.rows.sort( key=lambda row: ( row.columns[title_col_idx].value.lower() if title_col_idx < len(row.columns) else "" ) ) return self def add_result(self, result: Any) -> "ResultTable": """Add a result object (SearchResult, PipeObject, ResultItem, TagItem, or dict) as a row. Args: result: Result object to add Returns: Self for chaining """ row = self.add_row() row.payload = result # Handle TagItem from get_tag.py (tag display with index) if hasattr(result, "__class__") and result.__class__.__name__ == "TagItem": self._add_tag_item(row, result) # Handle ResultItem from search_file.py (compact display) elif hasattr(result, "__class__") and result.__class__.__name__ == "ResultItem": self._add_result_item(row, result) # Handle SearchResult from search_file.py elif hasattr(result, "__class__") and result.__class__.__name__ == "SearchResult": self._add_search_result(row, result) # Handle PipeObject from models.py elif hasattr(result, "__class__") and result.__class__.__name__ == "PipeObject": self._add_pipe_object(row, result) # Handle dict elif isinstance(result, dict): self._add_dict(row, result) # Handle generic objects with __dict__ elif hasattr(result, "__dict__"): self._add_generic_object(row, result) # Handle strings (simple text result) elif isinstance(result, str): row.add_column("Result", result) return self def get_row_payload(self, row_index: int) -> Optional[Any]: """Return the original payload for the row at ``row_index`` if available.""" if 0 <= row_index < len(self.rows): return getattr(self.rows[row_index], "payload", None) return None def get_payloads(self) -> List[Any]: """Return the payloads for every row, preserving table order.""" payloads: List[Any] = [] for row in self.rows: payload = getattr(row, "payload", None) if payload is not None: payloads.append(payload) return payloads def _add_search_result(self, row: ResultRow, result: Any) -> None: """Extract and add SearchResult fields to row.""" cols = getattr(result, "columns", None) used_explicit_columns = False if cols: used_explicit_columns = True for name, value in cols: row.add_column(name, value) else: # Core fields (legacy fallback) title = getattr(result, "title", "") table = str(getattr(result, "table", "") or "").lower() # Handle extension separation for local files extension = "" if title and table == "local": path_obj = Path(title) if path_obj.suffix: extension = path_obj.suffix.lstrip(".") title = path_obj.stem if title: row.add_column("Title", title) # Extension column row.add_column("Ext", extension) if hasattr(result, "table") and getattr(result, "table", None): row.add_column("Source", str(getattr(result, "table"))) if hasattr(result, "detail") and result.detail: row.add_column("Detail", result.detail) if hasattr(result, "media_kind") and result.media_kind: row.add_column("Type", result.media_kind) # Tag summary if hasattr(result, "tag_summary") and result.tag_summary: row.add_column("Tag", str(result.tag_summary)) # Duration (for media) if hasattr(result, "duration_seconds") and result.duration_seconds: dur = _format_duration_hms(result.duration_seconds) row.add_column("Duration", dur or str(result.duration_seconds)) # Size (for files) if hasattr(result, "size_bytes") and result.size_bytes: row.add_column("Size", _format_size(result.size_bytes, integer_only=False)) # Annotations if hasattr(result, "annotations") and result.annotations: row.add_column("Annotations", ", ".join(str(a) for a in result.annotations)) try: md = getattr(result, "full_metadata", None) md_dict = dict(md) if isinstance(md, dict) else {} except Exception: md_dict = {} try: selection_args = getattr(result, "selection_args", None) except Exception: selection_args = None if selection_args is None: selection_args = md_dict.get("_selection_args") or md_dict.get("selection_args") if selection_args: row.selection_args = [str(a) for a in selection_args if a is not None] try: selection_action = getattr(result, "selection_action", None) except Exception: selection_action = None if selection_action is None: selection_action = md_dict.get("_selection_action") or md_dict.get("selection_action") if selection_action: row.selection_action = [str(a) for a in selection_action if a is not None] def _add_result_item(self, row: ResultRow, item: Any) -> None: """Extract and add ResultItem fields to row (compact display for search results). Shows only essential columns: - Title (required) - Ext (extension) - Storage (source backend) - Size (formatted MB, integer only) All other fields are stored in item but not displayed to keep table compact. Use @row# syntax to pipe full item data to next command. """ # Title (required) title = getattr(item, "title", None) or "Unknown" table = str(getattr(item, "table", "") or getattr(item, "store", "") or "").lower() # Handle extension separation for local files extension = "" if title and table == "local": # Try to split extension path_obj = Path(title) if path_obj.suffix: extension = path_obj.suffix.lstrip(".") title = path_obj.stem if title: row.add_column("Title", title) # Extension column - always add to maintain column order row.add_column("Ext", extension) # Storage (source backend - hydrus, local, debrid, etc) if getattr(item, "table", None): row.add_column("Storage", str(getattr(item, "table"))) elif getattr(item, "store", None): row.add_column("Storage", str(getattr(item, "store"))) # Size (for files) if hasattr(item, "size_bytes") and item.size_bytes: row.add_column("Size", _format_size(item.size_bytes, integer_only=False)) def _add_tag_item(self, row: ResultRow, item: Any) -> None: """Extract and add TagItem fields to row (compact tag display). Shows the Tag column with the tag name and Source column to identify which storage backend the tag values come from (Hydrus, local, etc.). All data preserved in TagItem for piping and operations. Tag row selection is handled by the CLI pipeline (e.g. `@N | ...`). """ # Tag name if hasattr(item, "tag_name") and item.tag_name: row.add_column("Tag", item.tag_name) # Source/Store (where the tag values come from) if hasattr(item, "source") and item.source: row.add_column("Store", item.source) def _add_pipe_object(self, row: ResultRow, obj: Any) -> None: """Extract and add PipeObject fields to row.""" # Source and identifier if hasattr(obj, "source") and obj.source: row.add_column("Source", obj.source) # Title if hasattr(obj, "title") and obj.title: row.add_column("Title", obj.title) # File info if hasattr(obj, "path") and obj.path: row.add_column("Path", str(obj.path)) # Tag if hasattr(obj, "tag") and obj.tag: tag_str = ", ".join(obj.tag[:3]) # First 3 tag values if len(obj.tag) > 3: tag_str += f", +{len(obj.tag) - 3} more" row.add_column("Tag", tag_str) # Duration if hasattr(obj, "duration") and obj.duration: dur = _format_duration_hms(obj.duration) row.add_column("Duration", dur or str(obj.duration)) # Warnings if hasattr(obj, "warnings") and obj.warnings: warnings_str = "; ".join(obj.warnings[:2]) if len(obj.warnings) > 2: warnings_str += f" (+{len(obj.warnings) - 2} more)" row.add_column("Warnings", warnings_str) def _add_dict(self, row: ResultRow, data: Dict[str, Any]) -> None: """Extract and add dict fields to row using first-match priority groups. Respects max_columns limit to keep table compact and readable. Special handling for 'columns' field: if present, uses it to populate row columns instead of treating it as a regular field. This allows dynamic column definitions from search providers. Priority field groups (first match per group): - title | name | filename - store | table | source - size | size_bytes - ext """ # Helper to determine if a field should be hidden from display def is_hidden_field(field_name: Any) -> bool: # Hide internal/metadata fields hidden_fields = { "__", "id", "action", "parent_id", "is_temp", "path", "extra", "target", "hash", "hash_hex", "file_hash", "tag", "tag_summary", } if isinstance(field_name, str): if field_name.startswith("__"): return True if field_name in hidden_fields: return True return False # Strip out hidden metadata fields (prefixed with __) visible_data = { k: v for k, v in data.items() if not is_hidden_field(k) } # Normalize common fields using shared extractors so nested metadata/path values work. # This keeps Ext/Size/Store consistent across all dict-based result sources. try: store_extracted = extract_store_value(data) if (store_extracted and "store" not in visible_data and "table" not in visible_data and "source" not in visible_data): visible_data["store"] = store_extracted except Exception: pass try: ext_extracted = extract_ext_value(data) # Always ensure `ext` exists so priority_groups keeps a stable column. visible_data["ext"] = str(ext_extracted or "") except Exception: visible_data.setdefault("ext", "") try: size_extracted = extract_size_bytes_value(data) if (size_extracted is not None and "size_bytes" not in visible_data and "size" not in visible_data): visible_data["size_bytes"] = size_extracted except Exception: pass # Handle extension separation for local files store_val = str( visible_data.get("store", "") or visible_data.get("table", "") or visible_data.get("source", "") ).lower() # Debug logging # print(f"DEBUG: Processing dict result. Store: {store_val}, Keys: {list(visible_data.keys())}") if store_val == "local": # Find title field title_field = next( (f for f in ["title", "name", "filename"] if f in visible_data), None ) if title_field: title_val = str(visible_data[title_field]) path_obj = Path(title_val) if path_obj.suffix: extension = path_obj.suffix.lstrip(".") visible_data[title_field] = path_obj.stem visible_data["ext"] = extension # print(f"DEBUG: Split extension. Title: {visible_data[title_field]}, Ext: {extension}") else: visible_data["ext"] = "" # Ensure 'ext' is present so it gets picked up by priority_groups in correct order if "ext" not in visible_data: visible_data["ext"] = "" # Track which fields we've already added to avoid duplicates added_fields = set() column_count = 0 # Track total columns added # Helper function to format values def format_value(value: Any) -> str: if isinstance(value, list): formatted = ", ".join(str(v) for v in value[:3]) if len(value) > 3: formatted += f", +{len(value) - 3} more" return formatted return str(value) # Special handling for 'columns' field from search providers # If present, use it to populate row columns dynamically if ("columns" in visible_data and isinstance(visible_data["columns"], list) and visible_data["columns"]): try: for col_name, col_value in visible_data["columns"]: # Skip the "#" column as ResultTable already adds row numbers if col_name == "#": continue if column_count >= self.max_columns: break # When providers supply raw numeric fields, keep formatting consistent. if isinstance(col_name, str) and col_name.strip().lower() == "size": try: if col_value is None or str(col_value).strip() == "": col_value_str = "" else: col_value_str = _format_size( col_value, integer_only=False ) except Exception: col_value_str = format_value(col_value) elif isinstance(col_name, str) and col_name.strip().lower() == "duration": try: if col_value is None or str(col_value).strip() == "": col_value_str = "" else: dur = _format_duration_hms(col_value) col_value_str = dur or format_value(col_value) except Exception: col_value_str = format_value(col_value) else: col_value_str = format_value(col_value) row.add_column(col_name, col_value_str) added_fields.add(col_name.lower()) column_count += 1 # Mark 'columns' as handled so we don't add it as a field added_fields.add("columns") # Also mark common fields that shouldn't be re-displayed if they're in columns # This prevents showing both "Store" (from columns) and "Store" (from data fields) added_fields.add("table") added_fields.add("source") added_fields.add("target") added_fields.add("path") added_fields.add("media_kind") added_fields.add("detail") added_fields.add("annotations") added_fields.add( "full_metadata" ) # Don't display full metadata as column except Exception: # Fall back to regular field handling if columns format is unexpected pass # Only add priority groups if we haven't already filled columns from 'columns' field if column_count == 0: # Explicitly set which columns to display in order priority_groups = [ ("title", ["title", "name", "filename"]), ("store", ["store", "table", "source"]), ("size", ["size", "size_bytes"]), ("ext", ["ext"]), ] # Add priority field groups first - use first match in each group for _group_label, field_options in priority_groups: if column_count >= self.max_columns: break for field in field_options: if field in visible_data and field not in added_fields: # Special handling for size fields - format with unit and decimals if field in ["size", "size_bytes"]: value_str = _format_size( visible_data[field], integer_only=False ) else: value_str = format_value(visible_data[field]) # Map field names to display column names if field in ["store", "table", "source"]: col_name = "Store" elif field in ["size", "size_bytes"]: col_name = "Size" elif field in ["title", "name", "filename"]: col_name = "Title" else: col_name = field.replace("_", " ").title() row.add_column(col_name, value_str) added_fields.add(field) column_count += 1 break # Use first match in this group, skip rest # Add remaining fields only if we haven't hit max_columns (and no explicit columns were set) # Don't add any remaining fields - only use priority_groups for dict results # Check for selection args if "_selection_args" in data: row.selection_args = data["_selection_args"] # Don't display it added_fields.add("_selection_args") def _add_generic_object(self, row: ResultRow, obj: Any) -> None: """Extract and add fields from generic objects.""" if hasattr(obj, "__dict__"): for key, value in obj.__dict__.items(): if key.startswith("_"): # Skip private attributes continue row.add_column(key.replace("_", " ").title(), str(value)) def to_rich(self): """Return a Rich renderable representing this table.""" if not self.rows: empty = Text("No results") return Panel(empty, title=self.title) if self.title else empty col_names: List[str] = [] seen: Set[str] = set() for row in self.rows: for col in row.columns: if col.name not in seen: seen.add(col.name) col_names.append(col.name) table = RichTable( show_header=True, header_style="bold", box=SIMPLE, expand=True, show_lines=False, ) if not self.no_choice: table.add_column("#", justify="right", no_wrap=True) # Render headers in uppercase, but keep original column keys for lookup. header_by_key: Dict[str, str] = { name: str(name).upper() for name in col_names } for name in col_names: header = header_by_key.get(name, str(name).upper()) if name.lower() == "ext": table.add_column(header, no_wrap=True) else: table.add_column(header) for row_idx, row in enumerate(self.rows, 1): cells: List[str] = [] if not self.no_choice: cells.append(str(row_idx)) for name in col_names: val = row.get_column(name) or "" cells.append(self._apply_value_case(_sanitize_cell_text(val))) table.add_row(*cells) if self.title or self.header_lines: header_bits = [Text(line) for line in (self.header_lines or [])] renderable = Group(*header_bits, table) if header_bits else table return Panel(renderable, title=self.title) if self.title else renderable return table def format_compact(self) -> str: """Format table in compact form (one line per row). Returns: Formatted table string """ lines = [] if self.title: lines.append(f"\n{self.title}") lines.append("-" * len(self.title)) for i, row in enumerate(self.rows, 1): row_str = " | ".join(str(col) for col in row.columns) lines.append(f"{i}. {row_str}") return "\n".join(lines) def format_json(self) -> str: """Format table as JSON. Returns: JSON string """ data = { "title": self.title, "row_count": len(self.rows), "rows": [row.to_list() for row in self.rows], } return json.dumps(data, indent=2) def to_dict(self) -> Dict[str, Any]: """Convert table to dictionary. Returns: Dictionary representation """ return { "title": self.title, "rows": [row.to_list() for row in self.rows] } def __str__(self) -> str: """String representation. Rich is the primary rendering path. This keeps accidental `print(table)` usage from emitting ASCII box-drawn tables. """ label = self.title or "ResultTable" return f"{label} ({len(self.rows)} rows)" def __rich__(self): return self.to_rich() def __repr__(self) -> str: """Developer representation.""" return f"ResultTable(title={self.title!r}, rows={len(self.rows)})" def __len__(self) -> int: """Number of rows in the table.""" return len(self.rows) def __iter__(self): """Iterate over rows.""" return iter(self.rows) def __getitem__(self, index: int) -> ResultRow: """Get row by index.""" return self.rows[index] def select_interactive( self, prompt: str = "Select an item", accept_args: bool = False ) -> Optional[List[int]] | dict: """Display table and get interactive user selection (single or multiple). Supports multiple input formats: - Single: "5" or "q" to quit - Range: "3-5" (selects items 3, 4, 5) - Multiple: "3,5,13" (selects items 3, 5, and 13) - Combined: "1-3,7,9-11" (selects 1,2,3,7,9,10,11) If accept_args=True, also supports cmdlet arguments: - "5 -storage hydrus" → returns indices [4] + args {"-storage": "hydrus"} - "2-4 -storage hydrus -tag important" → returns indices [1,2,3] + multiple args Args: prompt: Custom prompt text accept_args: If True, parse and return cmdlet arguments from input Returns: If accept_args=False: List of 0-based indices, or None if cancelled If accept_args=True: Dict with "indices" and "args" keys, or None if cancelled """ if self.no_choice: from SYS.rich_display import stdout_console stdout_console().print(self) stdout_console().print(Panel(Text("Selection is disabled for this table."))) return None # Display the table from SYS.rich_display import stdout_console stdout_console().print(self) # Get user input while True: try: if accept_args: choice = Prompt.ask( f"{prompt} (e.g., '5' or '2 -storage hydrus' or 'q' to quit)" ).strip() else: choice = Prompt.ask( f"{prompt} (e.g., '5' or '3-5' or '1,3,5' or 'q' to quit)" ).strip() if choice.lower() == "q": return None if accept_args: # Parse selection and arguments result = self._parse_selection_with_args(choice) if result is not None: return result stdout_console().print( Panel( Text( "Invalid format. Use: selection (5 or 3-5 or 1,3,5) optionally followed by flags (e.g., '5 -storage hydrus')." ) ) ) else: # Parse just the selection selected_indices = self._parse_selection(choice) if selected_indices is not None: return selected_indices stdout_console().print( Panel( Text( "Invalid format. Use: single (5), range (3-5), list (1,3,5), combined (1-3,7,9-11), or 'q' to quit." ) ) ) except (ValueError, EOFError): if accept_args: stdout_console().print( Panel( Text( "Invalid format. Use: selection (5 or 3-5 or 1,3,5) optionally followed by flags (e.g., '5 -storage hydrus')." ) ) ) else: stdout_console().print( Panel( Text( "Invalid format. Use: single (5), range (3-5), list (1,3,5), combined (1-3,7,9-11), or 'q' to quit." ) ) ) def _parse_selection(self, selection_str: str) -> Optional[List[int]]: """Parse user selection string into list of 0-based indices. Supports: - Single: "5" → [4] - Range: "3-5" → [2, 3, 4] - Multiple: "3,5,13" → [2, 4, 12] - Combined: "1-3,7,9-11" → [0, 1, 2, 6, 8, 9, 10] Args: selection_str: User input string Returns: List of 0-based indices, or None if invalid """ if self.no_choice: return None indices = set() # Split by comma for multiple selections parts = selection_str.split(",") for part in parts: part = part.strip() if not part: continue # Check if it's a range (contains dash) if "-" in part: # Handle ranges like "3-5" try: range_parts = part.split("-") if len(range_parts) != 2: return None start = int(range_parts[0].strip()) end = int(range_parts[1].strip()) # Validate range if start < 1 or end < 1 or start > len(self.rows) or end > len( self.rows): return None if start > end: start, end = end, start # Add all indices in range (convert to 0-based) for i in range(start, end + 1): indices.add(i - 1) except (ValueError, IndexError): return None else: # Single number try: num = int(part) if num < 1 or num > len(self.rows): return None indices.add(num - 1) # Convert to 0-based except ValueError: return None if not indices: return None # Return sorted list return sorted(list(indices)) def _parse_selection_with_args(self, input_str: str) -> Optional[dict]: """Parse user input into selection indices and cmdlet arguments. Supports formats like: - "5" → {"indices": [4], "args": {}} - "2 -storage hydrus" → {"indices": [1], "args": {"-storage": "hydrus"}} - "3-5 -storage hydrus -tag important" → {"indices": [2,3,4], "args": {"-storage": "hydrus", "-tag": "important"}} Args: input_str: User input string with selection and optional flags Returns: Dict with "indices" and "args" keys, or None if invalid """ parts = input_str.split() if not parts: return None # First part should be the selection selection_str = parts[0] selected_indices = self._parse_selection(selection_str) if selected_indices is None: return None # Remaining parts are cmdlet arguments cmdlet_args = {} i = 1 while i < len(parts): part = parts[i] # Check if it's a flag (starts with -) if part.startswith("-"): flag = part value = None # Get the value if it exists and doesn't start with - if i + 1 < len(parts) and not parts[i + 1].startswith("-"): value = parts[i + 1] i += 2 else: i += 1 # Store the flag if value is not None: cmdlet_args[flag] = value else: cmdlet_args[flag] = True # Flag without value else: i += 1 return { "indices": selected_indices, "args": cmdlet_args } def add_input_option(self, option: InputOption) -> "ResultTable": """Add an interactive input option to the table. Input options allow users to specify cmdlet arguments interactively, like choosing a download location or source. Args: option: InputOption definition Returns: Self for chaining """ self.input_options[option.name] = option return self def select_option(self, option_name: str, prompt: str = "") -> Optional[str]: """Interactively get user input for a specific option. Displays the option choices (if enum) and prompts user for input. Args: option_name: Name of the option to get input for prompt: Custom prompt text (uses option description if not provided) Returns: User's selected/entered value, or None if cancelled """ if option_name not in self.input_options: print(f"Unknown option: {option_name}") return None option = self.input_options[option_name] prompt_text = prompt or option.description or option_name while True: try: # For enum options, show choices if option.type == "enum" and option.choices: print(f"\n{prompt_text}") for i, choice in enumerate(option.choices, 1): print(f" {i}. {choice}") choice_input = input( f"Select {option_name} (1-{len(option.choices)}, or 'q' to cancel): " ).strip() if choice_input.lower() == "q": return None try: idx = int(choice_input) - 1 if 0 <= idx < len(option.choices): return option.choices[idx] print(f"Invalid choice. Enter 1-{len(option.choices)}") except ValueError: print(f"Invalid choice. Enter 1-{len(option.choices)}") # For string/integer options, get direct input elif option.type in ("string", "integer"): value = input(f"{prompt_text} (or 'q' to cancel): ").strip() if value.lower() == "q": return None # Validate if validator provided if option.validator and not option.validator(value): print(f"Invalid value for {option_name}") continue # Type conversion if option.type == "integer": try: int(value) except ValueError: print(f"Must be an integer") continue return value # For flag options elif option.type == "flag": response = input(f"{prompt_text} (y/n): ").strip().lower() if response == "q": return None return "true" if response in ("y", "yes", "true") else "false" except (ValueError, EOFError): return None def get_all_options(self) -> Dict[str, str]: """Get all input options at once with user prompts. Interactively prompts user for all registered options. Returns: Dictionary mapping option names to selected values """ result = {} for name, _option in self.input_options.items(): value = self.select_option(name) if value is not None: result[name] = value return result def select_by_index(self, index: int) -> Optional[ResultRow]: """Get a row by 1-based index (user-friendly). Args: index: 1-based index Returns: ResultRow if valid, None otherwise """ idx = index - 1 if 0 <= idx < len(self.rows): return self.rows[idx] return None # TUI-specific formatting methods def to_datatable_rows(self, source: str = "unknown") -> List[List[str]]: """Convert results to rows suitable for Textual DataTable widget. Args: source: Source type for formatting context (openlibrary, soulseek, etc.) Returns: List of row value lists """ rows = [] for result in self.rows: row_values = self._format_datatable_row(result, source) rows.append(row_values) return rows def _format_datatable_row(self, row: ResultRow, source: str = "unknown") -> List[str]: """Format a ResultRow for DataTable display. Args: row: ResultRow to format source: Source type Returns: List of column values as strings """ # Extract values from row columns values = [col.value for col in row.columns] # Truncate to reasonable lengths for table display return [v[:60] if len(v) > 60 else v for v in values] def to_result_cards(self) -> List[TUIResultCard]: """Convert all rows to TUIResultCard objects for card-based UI display. Returns: List of TUIResultCard objects """ cards = [] for row in self.rows: card = self._row_to_card(row) cards.append(card) return cards def _row_to_card(self, row: ResultRow) -> TUIResultCard: """Convert a ResultRow to a TUIResultCard. Args: row: ResultRow to convert Returns: TUIResultCard with extracted metadata """ # Build metadata dict from row columns metadata = {} title = "" for col in row.columns: if col.name.lower() == "title": title = col.value metadata[col.name] = col.value # Extract tag values if present tag = [] if "Tag" in metadata: tag_val = metadata["Tag"] if tag_val: tag = [t.strip() for t in tag_val.split(",")][:5] # Try to find useful metadata fields subtitle = metadata.get("Artist", metadata.get("Author", "")) media_kind = metadata.get("Type", metadata.get("Media Kind", "")) file_size = metadata.get("Size", "") duration = metadata.get("Duration", "") file_hash = metadata.get("Hash", "") return TUIResultCard( title=title or "Unknown", subtitle=subtitle, metadata=metadata, media_kind=media_kind, tag=tag, file_hash=file_hash or None, file_size=file_size or None, duration=duration or None, ) def build_metadata_tree(self, tree_widget: "Tree") -> None: """Populate a Textual Tree widget with result metadata hierarchy. Args: tree_widget: Textual Tree widget to populate Raises: ImportError: If Textual not available """ if not TEXTUAL_AVAILABLE: raise ImportError("Textual not available for tree building") tree_widget.reset(self.title or "Results") root = tree_widget.root # Add each row as a top-level node for i, row in enumerate(self.rows, 1): row_node = root.add(f"[bold]Result {i}[/bold]") # Add columns as children for col in row.columns: value_str = col.value if len(value_str) > 100: value_str = value_str[:97] + "..." row_node.add_leaf(f"[cyan]{col.name}[/cyan]: {value_str}") def _format_size(size: Any, integer_only: bool = False) -> str: """Format file size as human-readable string. Args: size: Size in bytes or already formatted string integer_only: If True, show MB as an integer (e.g., "250 MB") Returns: Formatted size string with units (e.g., "3.53 MB", "0.57 MB", "1.2 GB") """ if isinstance(size, str): return size if size else "" try: bytes_val = int(size) if bytes_val < 0: return "" # Keep display consistent with the CLI expectation: show MB with unit # (including values under 1 MB as fractional MB), and show GB for very # large sizes. if bytes_val >= 1024**3: value = bytes_val / (1024**3) unit = "GB" else: value = bytes_val / (1024**2) unit = "MB" if integer_only: return f"{int(round(value))} {unit}" num = f"{value:.2f}".rstrip("0").rstrip(".") return f"{num} {unit}" except (ValueError, TypeError): return "" def format_result(result: Any, title: str = "") -> str: """Quick function to format a single result or list of results. Args: result: Result object, list of results, or dict title: Optional title for the table Returns: Formatted string """ table = ResultTable(title) if isinstance(result, list): for item in result: table.add_result(item) else: table.add_result(result) return str(table)