From 3bd36baf5a1f2df9e16f0d9b5d68e84f0b9513b8 Mon Sep 17 00:00:00 2001 From: Nose Date: Mon, 12 Jan 2026 20:50:39 -0800 Subject: [PATCH] f --- SYS/result_table.py | 104 ++- SYS/result_table_new.py | 1831 --------------------------------------- 2 files changed, 102 insertions(+), 1833 deletions(-) delete mode 100644 SYS/result_table_new.py diff --git a/SYS/result_table.py b/SYS/result_table.py index 7ab94bc..7bd316b 100644 --- a/SYS/result_table.py +++ b/SYS/result_table.py @@ -34,6 +34,14 @@ except ImportError: TEXTUAL_AVAILABLE = False +# Import ResultModel from the API for unification +try: + from SYS.result_table_api import ResultModel +except ImportError: + # Fallback if not available yet in directory structure (unlikely) + ResultModel = None + + def _sanitize_cell_text(value: Any) -> str: """Coerce to a single-line, tab-free string suitable for terminal display.""" if value is None: @@ -741,8 +749,11 @@ class ResultTable: row = self.add_row() row.payload = result + # Handle ResultModel from the new strict API (SYS/result_table_api.py) + if ResultModel and isinstance(result, ResultModel): + self._add_result_model(row, result) # Handle TagItem from get_tag.py (tag display with index) - if hasattr(result, "__class__") and result.__class__.__name__ == "TagItem": + elif hasattr(result, "__class__") and result.__class__.__name__ == "TagItem": self._add_tag_item(row, result) # Handle ResultItem from search_file.py (compact display) elif hasattr(result, "__class__") and result.__class__.__name__ == "ResultItem": @@ -781,6 +792,62 @@ class ResultTable: payloads.append(payload) return payloads + @classmethod + def from_api_table(cls, api_table: Any) -> "ResultTable": + """Convert a strict SYS.result_table_api.ResultTable into an interactive monolith ResultTable. + + This allows providers using the new strict API to benefit from the monolith's + interactive selection (@N) and rich layout features. + """ + # Duck typing check instead of strict isinstance to keep dependencies light + if not hasattr(api_table, "rows") or not hasattr(api_table, "columns"): + return cls(str(api_table)) + + title = getattr(api_table, "provider", "") + # Try to get provider metadata title if available + meta = getattr(api_table, "meta", {}) + if meta and isinstance(meta, dict): + title = meta.get("title") or title + + instance = cls(title) + + # Import adapters if we want to extract selection args automatically + # but let's keep it simple: we rely on add_result logic for most things. + + # Iterate rows and build interactive ones + for r in api_table.rows: + row = instance.add_row() + row.payload = r + + # Use columns defined in the API table + for col in api_table.columns: + try: + val = col.extractor(r) + if col.format_fn: + val = col.format_fn(val) + row.add_column(col.header, val) + except Exception: + pass + + return instance + + def _add_result_model(self, row: ResultRow, result: ResultModel) -> None: + """Extract and add ResultModel fields from the new API to row.""" + row.add_column("Title", result.title) + + if result.ext: + row.add_column("Ext", result.ext) + + if result.size_bytes is not None: + # Use the existing format_mb helper in this file + row.add_column("Size", format_mb(result.size_bytes)) + + if result.source: + row.add_column("Source", result.source) + + # Add a placeholder for metadata-like display if needed in the main table + # but usually metadata is handled by the detail panel now + def _add_search_result(self, row: ResultRow, result: Any) -> None: """Extract and add SearchResult fields to row.""" cols = getattr(result, "columns", None) @@ -1830,12 +1897,45 @@ def format_result(result: Any, title: str = "") -> str: return str(table) def extract_item_metadata(item: Any) -> Dict[str, Any]: - """Extract a comprehensive set of metadata from an item for the ItemDetailView.""" + """Extract a comprehensive set of metadata from an item for the ItemDetailView. + + Now supports SYS.result_table_api.ResultModel as a first-class input. + """ if item is None: return {} out = {} + # Handle ResultModel specifically for better detail display + if ResultModel and isinstance(item, ResultModel): + if item.title: out["Title"] = item.title + if item.path: out["Path"] = item.path + if item.ext: out["Ext"] = item.ext + if item.size_bytes: out["Size"] = format_mb(item.size_bytes) + if item.source: out["Store"] = item.source + + # Merge internal metadata dict + if item.metadata: + for k, v in item.metadata.items(): + # Convert keys to readable labels (snake_case -> Title Case) + label = str(k).replace("_", " ").title() + if label not in out and v is not None: + out[label] = v + + # URLs/Tags/Relations from metadata if present + data = item.metadata or {} + url = _get_first_dict_value(data, ["url", "URL"]) + if url: out["Url"] = url + + rels = _get_first_dict_value(data, ["relationships", "rel"]) + if rels: out["Relations"] = rels + + tags = _get_first_dict_value(data, ["tags", "tag"]) + if tags: out["Tags"] = tags + + return out + + # Fallback to existing extraction logic for legacy objects/dicts # Use existing extractors from match-standard result table columns title = extract_title_value(item) if title: diff --git a/SYS/result_table_new.py b/SYS/result_table_new.py deleted file mode 100644 index cee5a05..0000000 --- a/SYS/result_table_new.py +++ /dev/null @@ -1,1831 +0,0 @@ -"""Unified result table formatter for CLI display. - -Provides a structured way to convert search results, metadata, and pipeline objects -into formatted tables suitable for display in the REPL and CLI output. - -Features: -- Format results as aligned tables with row numbers -- Support multiple selection formats (single, ranges, lists, combined) -- Interactive selection with user input -- Input options for cmdlet arguments (location, source selection, etc) -""" - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional, Callable, Set -from pathlib import Path -import json -import shutil - -from rich.box import SIMPLE -from rich.console import Group -from rich.panel import Panel -from rich.prompt import Prompt -from rich.table import Table as RichTable -from rich.text import Text - -# Optional Textual imports - graceful fallback if not available -try: - from textual.widgets import Tree - - TEXTUAL_AVAILABLE = True -except ImportError: - TEXTUAL_AVAILABLE = False - - -def _sanitize_cell_text(value: Any) -> str: - """Coerce to a single-line, tab-free string suitable for terminal display.""" - if value is None: - return "" - text = str(value) - if not text: - return "" - return text.replace("\r\n", - " ").replace("\n", - " ").replace("\r", - " ").replace("\t", - " ") - - -def _format_duration_hms(duration: Any) -> str: - """Format a duration in seconds into a compact h/m/s string. - - Examples: - 3150 -> "52m30s" - 59 -> "59s" - 3600 -> "1h0m0s" - - If the value is not numeric, returns an empty string. - """ - if duration is None: - return "" - try: - if isinstance(duration, str): - s = duration.strip() - if not s: - return "" - # If it's already formatted (contains letters/colon), leave it to caller. - if any(ch.isalpha() for ch in s) or ":" in s: - return "" - seconds = float(s) - else: - seconds = float(duration) - except Exception: - return "" - - if seconds < 0: - return "" - - total_seconds = int(seconds) - minutes, secs = divmod(total_seconds, 60) - hours, minutes = divmod(minutes, 60) - - parts: List[str] = [] - if hours > 0: - parts.append(f"{hours}h") - if minutes > 0 or hours > 0: - parts.append(f"{minutes}m") - parts.append(f"{secs}s") - return "".join(parts) - - -@dataclass(frozen=True) -class TableColumn: - """Reusable column specification. - - This is intentionally separate from `ResultColumn`: - - `ResultColumn` is a rendered (name,value) pair attached to a single row. - - `TableColumn` is a reusable extractor/formatter used to build rows consistently - across cmdlets and stores. - """ - - key: str - header: str - extractor: Callable[[Any], Any] - - def extract(self, item: Any) -> Any: - try: - return self.extractor(item) - except Exception: - return None - - -def _get_first_dict_value(data: Dict[str, Any], keys: List[str]) -> Any: - for k in keys: - if k in data: - v = data.get(k) - if v is not None and str(v).strip() != "": - return v - return None - - -def _as_dict(item: Any) -> Optional[Dict[str, Any]]: - if isinstance(item, dict): - return item - try: - if hasattr(item, "__dict__"): - return dict(getattr(item, "__dict__")) - except Exception: - return None - return None - - -def extract_store_value(item: Any) -> str: - data = _as_dict(item) or {} - store = _get_first_dict_value( - data, - ["store", - "table", - "source", - "storage"] - ) # storage is legacy - return str(store or "").strip() - - -def extract_hash_value(item: Any) -> str: - data = _as_dict(item) or {} - hv = _get_first_dict_value(data, ["hash", "hash_hex", "file_hash", "sha256"]) - return str(hv or "").strip() - - -def extract_title_value(item: Any) -> str: - data = _as_dict(item) or {} - title = _get_first_dict_value(data, ["title", "name", "filename"]) - if not title: - title = _get_first_dict_value( - data, - ["target", - "path", - "url"] - ) # last resort display - return str(title or "").strip() - - -def extract_ext_value(item: Any) -> str: - data = _as_dict(item) or {} - - meta = data.get("metadata") if isinstance(data.get("metadata"), - dict) else {} - raw_path = data.get("path") or data.get("target") or data.get( - "filename" - ) or data.get("title") - - ext = _get_first_dict_value(data, - ["ext", - "file_ext", - "extension"]) or _get_first_dict_value( - meta, - ["ext", - "file_ext", - "extension"] - ) - - if (not ext) and raw_path: - try: - suf = Path(str(raw_path)).suffix - if suf: - ext = suf.lstrip(".") - except Exception: - ext = "" - - ext_str = str(ext or "").strip().lstrip(".") - for idx, ch in enumerate(ext_str): - if not ch.isalnum(): - ext_str = ext_str[:idx] - break - return ext_str[:5] - - -def extract_size_bytes_value(item: Any) -> Optional[int]: - data = _as_dict(item) or {} - meta = data.get("metadata") if isinstance(data.get("metadata"), - dict) else {} - - size_val = _get_first_dict_value( - data, - ["size_bytes", - "size", - "file_size", - "bytes", - "filesize"] - ) or _get_first_dict_value( - meta, - ["size_bytes", - "size", - "file_size", - "bytes", - "filesize"] - ) - if size_val is None: - return None - try: - s = str(size_val).strip() - if not s: - return None - # Some sources might provide floats or numeric strings - return int(float(s)) - except Exception: - return None - - -COMMON_COLUMNS: Dict[str, - TableColumn] = { - "title": TableColumn("title", - "Title", - extract_title_value), - "store": TableColumn("store", - "Store", - extract_store_value), - "hash": TableColumn("hash", - "Hash", - extract_hash_value), - "ext": TableColumn("ext", - "Ext", - extract_ext_value), - "size": TableColumn("size", - "Size", - extract_size_bytes_value), - } - - -def build_display_row(item: Any, *, keys: List[str]) -> Dict[str, Any]: - """Build a dict suitable for `ResultTable.add_result()` using shared column specs.""" - out: Dict[str, - Any] = {} - for k in keys: - spec = COMMON_COLUMNS.get(k) - if spec is None: - continue - val = spec.extract(item) - out[spec.key] = val - return out - - -@dataclass -class InputOption: - """Represents an interactive input option (cmdlet argument) in a table. - - Allows users to select options that translate to cmdlet arguments, - enabling interactive configuration right from the result table. - - Example: - # Create an option for location selection - location_opt = InputOption( - "location", - type="enum", - choices=["local", "hydrus", "0x0"], - description="Download destination" - ) - - # Use in result table - table.add_input_option(location_opt) - selected = table.select_option("location") # Returns user choice - """ - - name: str - """Option name (maps to cmdlet argument)""" - type: str = "string" - """Option type: 'string', 'enum', 'flag', 'integer'""" - choices: List[str] = field(default_factory=list) - """Valid choices for enum type""" - default: Optional[str] = None - """Default value if not specified""" - description: str = "" - """Description of what this option does""" - validator: Optional[Callable[[str], bool]] = None - """Optional validator function: takes value, returns True if valid""" - - def to_dict(self) -> Dict[str, Any]: - """Convert to dictionary.""" - return { - "name": self.name, - "type": self.type, - "choices": self.choices if self.choices else None, - "default": self.default, - "description": self.description, - } - - -@dataclass -class TUIResultCard: - """Represents a result as a UI card with title, metadata, and actions. - - Used in hub-ui and TUI contexts to render individual search results - as grouped components with visual structure. - """ - - title: str - subtitle: Optional[str] = None - metadata: Optional[Dict[str, str]] = None - media_kind: Optional[str] = None - tag: Optional[List[str]] = None - file_hash: Optional[str] = None - file_size: Optional[str] = None - duration: Optional[str] = None - - def __post_init__(self): - """Initialize default values.""" - if self.metadata is None: - self.metadata = {} - if self.tag is None: - self.tag = [] - - -@dataclass -class ResultColumn: - """Represents a single column in a result table.""" - - name: str - value: str - width: Optional[int] = None - - def __str__(self) -> str: - """String representation of the column.""" - return f"{self.name}: {self.value}" - - def to_dict(self) -> Dict[str, str]: - """Convert to dictionary.""" - return { - "name": self.name, - "value": self.value - } - - -@dataclass -class ResultRow: - """Represents a single row in a result table.""" - - columns: List[ResultColumn] = field(default_factory=list) - selection_args: Optional[List[str]] = None - """Arguments to use for this row when selected via @N syntax (e.g., ['-item', '3'])""" - selection_action: Optional[List[str]] = None - """Full expanded stage tokens that should run when this row is selected.""" - source_index: Optional[int] = None - """Original insertion order index (used to map sorted views back to source items).""" - payload: Optional[Any] = None - """Original object that contributed to this row.""" - - def add_column(self, name: str, value: Any) -> None: - """Add a column to this row.""" - # Normalize column header names. - normalized_name = str(name or "").strip() - if normalized_name.lower() == "name": - normalized_name = "Title" - - str_value = _sanitize_cell_text(value) - - # Normalize extension columns globally and cap to 5 characters - if normalized_name.lower() == "ext": - str_value = str_value.strip().lstrip(".") - for idx, ch in enumerate(str_value): - if not ch.isalnum(): - str_value = str_value[:idx] - break - str_value = str_value[:5] - - # Normalize Duration columns: providers often pass raw seconds. - if normalized_name.lower() == "duration": - formatted = _format_duration_hms(value) - if formatted: - str_value = formatted - - self.columns.append(ResultColumn(normalized_name, str_value)) - - def get_column(self, name: str) -> Optional[str]: - """Get column value by name.""" - for col in self.columns: - if col.name.lower() == name.lower(): - return col.value - return None - - def to_dict(self) -> List[Dict[str, str]]: - """Convert to list of column dicts.""" - return [col.to_dict() for col in self.columns] - - def to_list(self) -> List[tuple[str, str]]: - """Convert to list of (name, value) tuples.""" - return [(col.name, col.value) for col in self.columns] - - def __str__(self) -> str: - """String representation of the row.""" - return " | ".join(str(col) for col in self.columns) - - -class ResultTable: - """Unified table formatter for search results, metadata, and pipeline objects. - - Provides a structured way to display results in the CLI with consistent formatting. - Handles conversion from various result types (SearchResult, PipeObject, dicts) into - a formatted table with rows and columns. - - Example: - >>> result_table = ResultTable("Search Results") - >>> row = result_table.add_row() - >>> row.add_column("File", "document.pdf") - >>> row.add_column("Size", "2.5 MB") - >>> row.add_column("Tag", "pdf, document") - >>> print(result_table) - """ - - def __init__( - self, - title: str = "", - title_width: int = 80, - max_columns: Optional[int] = None, - preserve_order: bool = False, - ): - """Initialize a result table. - - Args: - title: Optional title for the table - title_width: Width for formatting the title line - max_columns: Maximum number of columns to display (None for unlimited, default: 5 for search results) - preserve_order: When True, skip automatic sorting so row order matches source - """ - self.title = title - try: - from SYS import pipeline as ctx - - cmdlet_name = "" - try: - cmdlet_name = ( - ctx.get_current_cmdlet_name("") - if hasattr(ctx, - "get_current_cmdlet_name") else "" - ) - except Exception: - cmdlet_name = "" - - stage_text = "" - try: - stage_text = ( - ctx.get_current_stage_text("") - if hasattr(ctx, - "get_current_stage_text") else "" - ) - except Exception: - stage_text = "" - - if cmdlet_name and stage_text: - normalized_cmd = str(cmdlet_name).replace("_", "-").strip().lower() - normalized_title = str(self.title or "").strip().lower() - normalized_stage = str(stage_text).strip() - if normalized_stage and normalized_stage.lower().startswith( - normalized_cmd): - if (not normalized_title) or normalized_title.replace( - "_", - "-").startswith(normalized_cmd): - self.title = normalized_stage - except Exception: - pass - self.title_width = title_width - self.max_columns = ( - max_columns if max_columns is not None else 5 - ) # Default 5 for cleaner display - self.rows: List[ResultRow] = [] - self.column_widths: Dict[str, - int] = {} - self.input_options: Dict[str, - InputOption] = {} - """Options available for user input (cmdlet arguments)""" - self.source_command: Optional[str] = None - """Command that generated this table (e.g., 'download-file URL')""" - self.source_args: List[str] = [] - """Base arguments for the source command""" - self.header_lines: List[str] = [] - """Optional metadata lines rendered under the title""" - self.preserve_order: bool = preserve_order - """If True, skip automatic sorting so display order matches input order.""" - self.no_choice: bool = False - """When True, suppress row numbers/selection to make the table non-interactive.""" - self.table: Optional[str] = None - """Table type (e.g., 'youtube', 'soulseek') for context-aware selection logic.""" - - self.table_metadata: Dict[str, Any] = {} - """Optional provider/table metadata (e.g., provider name, view).""" - - self.value_case: str = "lower" - """Display-only value casing: 'lower' (default), 'upper', or 'preserve'.""" - - def set_value_case(self, value_case: str) -> "ResultTable": - """Configure display-only casing for rendered cell values.""" - case = str(value_case or "").strip().lower() - if case not in {"lower", - "upper", - "preserve"}: - case = "lower" - self.value_case = case - return self - - def _apply_value_case(self, text: str) -> str: - if not text: - return "" - if self.value_case == "upper": - return text.upper() - if self.value_case == "preserve": - return text - return text.lower() - - def set_table(self, table: str) -> "ResultTable": - """Set the table type for context-aware selection logic.""" - self.table = table - return self - - def set_table_metadata(self, metadata: Optional[Dict[str, Any]]) -> "ResultTable": - """Attach provider/table metadata for downstream selection logic.""" - self.table_metadata = dict(metadata or {}) - return self - - def get_table_metadata(self) -> Dict[str, Any]: - """Return attached provider/table metadata (copy to avoid mutation).""" - try: - return dict(self.table_metadata) - except Exception: - return {} - - def set_no_choice(self, no_choice: bool = True) -> "ResultTable": - """Mark the table as non-interactive (no row numbers, no selection parsing).""" - self.no_choice = bool(no_choice) - return self - - def set_preserve_order(self, preserve: bool = True) -> "ResultTable": - """Configure whether this table should skip automatic sorting.""" - self.preserve_order = bool(preserve) - return self - - def add_row(self) -> ResultRow: - """Add a new row to the table and return it for configuration.""" - row = ResultRow() - row.source_index = len(self.rows) - self.rows.append(row) - return row - - def set_source_command( - self, - command: str, - args: Optional[List[str]] = None - ) -> "ResultTable": - """Set the source command that generated this table. - - This is used for @N expansion: when user runs @2 | next-cmd, it will expand to: - source_command + source_args + row_selection_args | next-cmd - - Args: - command: Command name (e.g., 'download-file') - args: Base arguments for the command (e.g., ['URL']) - - Returns: - Self for chaining - """ - self.source_command = command - self.source_args = args or [] - return self - - def init_command( - self, - title: str, - command: str, - args: Optional[List[str]] = None, - preserve_order: bool = False, - ) -> "ResultTable": - """Initialize table with title, command, args, and preserve_order in one call. - - Consolidates common initialization pattern: ResultTable(title) + set_source_command(cmd, args) + set_preserve_order(preserve_order) - - Args: - title: Table title - command: Source command name - args: Command arguments - preserve_order: Whether to preserve input row order - - Returns: - self for method chaining - """ - self.title = title - self.source_command = command - self.source_args = args or [] - self.preserve_order = preserve_order - return self - - def copy_with_title(self, new_title: str) -> "ResultTable": - """Create a new table copying settings from this one but with a new title. - - Consolidates pattern: new_table = ResultTable(title); new_table.set_source_command(...) - Useful for intermediate processing that needs to preserve source command but update display title. - - Args: - new_title: New title for the copied table - - Returns: - New ResultTable with copied settings and new title - """ - new_table = ResultTable( - title=new_title, - title_width=self.title_width, - max_columns=self.max_columns, - preserve_order=self.preserve_order, - ) - new_table.source_command = self.source_command - new_table.source_args = list(self.source_args) if self.source_args else [] - new_table.input_options = dict(self.input_options) if self.input_options else {} - new_table.no_choice = self.no_choice - new_table.table = self.table - new_table.table_metadata = ( - dict(self.table_metadata) if getattr(self, "table_metadata", None) else {} - ) - new_table.header_lines = list(self.header_lines) if self.header_lines else [] - return new_table - - def set_row_selection_args(self, row_index: int, selection_args: List[str]) -> None: - """Set the selection arguments for a specific row. - - When user selects this row via @N, these arguments will be appended to the - source command to re-execute with that item selected. - - Args: - row_index: Index of the row (0-based) - selection_args: Arguments to use (e.g., ['-item', '3']) - """ - if 0 <= row_index < len(self.rows): - self.rows[row_index].selection_args = selection_args - - def set_row_selection_action(self, row_index: int, selection_action: List[str]) -> None: - """Specify the entire stage tokens to run for this row on @N.""" - if 0 <= row_index < len(self.rows): - self.rows[row_index].selection_action = selection_action - - def set_header_lines(self, lines: List[str]) -> "ResultTable": - """Attach metadata lines that render beneath the title.""" - self.header_lines = [line for line in lines if line] - return self - - def set_header_line(self, line: str) -> "ResultTable": - """Attach a single metadata line beneath the title.""" - return self.set_header_lines([line] if line else []) - - def set_storage_summary( - self, - storage_counts: Dict[str, - int], - filter_text: Optional[str] = None, - inline: bool = False, - ) -> str: - """Render a storage count summary (e.g., "Hydrus:0 Local:1 | filter: \"q\""). - - Returns the summary string so callers can place it inline with the title if desired. - """ - summary_parts: List[str] = [] - - if storage_counts: - summary_parts.append( - " ".join(f"{name}:{count}" for name, count in storage_counts.items()) - ) - - if filter_text: - safe_filter = filter_text.replace('"', '\\"') - summary_parts.append(f'filter: "{safe_filter}"') - - summary = " | ".join(summary_parts) - if not inline: - self.set_header_line(summary) - return summary - - def sort_by_title(self) -> "ResultTable": - """Sort rows alphabetically by Title or Name column. - - Looks for columns named 'Title', 'Name', or 'Tag' (in that order). - Case-insensitive sort. Returns self for chaining. - - NOTE: This only affects display order. Each row keeps its original - `source_index` (insertion order) for callers that need stable mapping. - """ - if getattr(self, "preserve_order", False): - return self - # Find the title column (try Title, Name, Tag in order) - title_col_idx = None - for row in self.rows: - if not row.columns: - continue - for idx, col in enumerate(row.columns): - col_lower = col.name.lower() - if col_lower in ("title", "name", "tag"): - title_col_idx = idx - break - if title_col_idx is not None: - break - - if title_col_idx is None: - # No title column found, return unchanged - return self - - # Sort rows by the title column value (case-insensitive) - self.rows.sort( - key=lambda row: ( - row.columns[title_col_idx].value.lower() - if title_col_idx < len(row.columns) else "" - ) - ) - - return self - - def add_result(self, result: Any) -> "ResultTable": - """Add a result object (SearchResult, PipeObject, ResultItem, TagItem, or dict) as a row. - - Args: - result: Result object to add - - Returns: - Self for chaining - """ - row = self.add_row() - row.payload = result - - # Handle TagItem from get_tag.py (tag display with index) - if hasattr(result, "__class__") and result.__class__.__name__ == "TagItem": - self._add_tag_item(row, result) - # Handle ResultItem from search_file.py (compact display) - elif hasattr(result, "__class__") and result.__class__.__name__ == "ResultItem": - self._add_result_item(row, result) - # Handle SearchResult from search_file.py - elif hasattr(result, - "__class__") and result.__class__.__name__ == "SearchResult": - self._add_search_result(row, result) - # Handle PipeObject from models.py - elif hasattr(result, "__class__") and result.__class__.__name__ == "PipeObject": - self._add_pipe_object(row, result) - # Handle dict - elif isinstance(result, dict): - self._add_dict(row, result) - # Handle generic objects with __dict__ - elif hasattr(result, "__dict__"): - self._add_generic_object(row, result) - # Handle strings (simple text result) - elif isinstance(result, str): - row.add_column("Result", result) - - return self - - def get_row_payload(self, row_index: int) -> Optional[Any]: - """Return the original payload for the row at ``row_index`` if available.""" - if 0 <= row_index < len(self.rows): - return getattr(self.rows[row_index], "payload", None) - return None - - def get_payloads(self) -> List[Any]: - """Return the payloads for every row, preserving table order.""" - payloads: List[Any] = [] - for row in self.rows: - payload = getattr(row, "payload", None) - if payload is not None: - payloads.append(payload) - return payloads - - def _add_search_result(self, row: ResultRow, result: Any) -> None: - """Extract and add SearchResult fields to row.""" - cols = getattr(result, "columns", None) - used_explicit_columns = False - if cols: - used_explicit_columns = True - for name, value in cols: - row.add_column(name, value) - else: - # Core fields (legacy fallback) - title = getattr(result, "title", "") - table = str(getattr(result, "table", "") or "").lower() - - # Handle extension separation for local files - extension = "" - if title and table == "local": - path_obj = Path(title) - if path_obj.suffix: - extension = path_obj.suffix.lstrip(".") - title = path_obj.stem - - if title: - row.add_column("Title", title) - - # Extension column - row.add_column("Ext", extension) - - if hasattr(result, "table") and getattr(result, "table", None): - row.add_column("Source", str(getattr(result, "table"))) - - if hasattr(result, "detail") and result.detail: - row.add_column("Detail", result.detail) - - if hasattr(result, "media_kind") and result.media_kind: - row.add_column("Type", result.media_kind) - - # Tag summary - if hasattr(result, "tag_summary") and result.tag_summary: - row.add_column("Tag", str(result.tag_summary)) - - # Duration (for media) - if hasattr(result, "duration_seconds") and result.duration_seconds: - dur = _format_duration_hms(result.duration_seconds) - row.add_column("Duration", dur or str(result.duration_seconds)) - - # Size (for files) - if hasattr(result, "size_bytes") and result.size_bytes: - row.add_column("Size", _format_size(result.size_bytes, integer_only=False)) - - # Annotations - if hasattr(result, "annotations") and result.annotations: - row.add_column("Annotations", ", ".join(str(a) for a in result.annotations)) - - try: - md = getattr(result, "full_metadata", None) - md_dict = dict(md) if isinstance(md, dict) else {} - except Exception: - md_dict = {} - - try: - selection_args = getattr(result, "selection_args", None) - except Exception: - selection_args = None - if selection_args is None: - selection_args = md_dict.get("_selection_args") or md_dict.get("selection_args") - if selection_args: - row.selection_args = [str(a) for a in selection_args if a is not None] - - try: - selection_action = getattr(result, "selection_action", None) - except Exception: - selection_action = None - if selection_action is None: - selection_action = md_dict.get("_selection_action") or md_dict.get("selection_action") - if selection_action: - row.selection_action = [str(a) for a in selection_action if a is not None] - - def _add_result_item(self, row: ResultRow, item: Any) -> None: - """Extract and add ResultItem fields to row (compact display for search results). - - Shows only essential columns: - - Title (required) - - Ext (extension) - - Storage (source backend) - - Size (formatted MB, integer only) - - All other fields are stored in item but not displayed to keep table compact. - Use @row# syntax to pipe full item data to next command. - """ - # Title (required) - title = getattr(item, "title", None) or "Unknown" - table = str(getattr(item, - "table", - "") or getattr(item, - "store", - "") or "").lower() - - # Handle extension separation for local files - extension = "" - if title and table == "local": - # Try to split extension - path_obj = Path(title) - if path_obj.suffix: - extension = path_obj.suffix.lstrip(".") - title = path_obj.stem - - if title: - row.add_column("Title", title) - - # Extension column - always add to maintain column order - row.add_column("Ext", extension) - - # Storage (source backend - hydrus, local, debrid, etc) - if getattr(item, "table", None): - row.add_column("Storage", str(getattr(item, "table"))) - elif getattr(item, "store", None): - row.add_column("Storage", str(getattr(item, "store"))) - - # Size (for files) - if hasattr(item, "size_bytes") and item.size_bytes: - row.add_column("Size", _format_size(item.size_bytes, integer_only=False)) - - def _add_tag_item(self, row: ResultRow, item: Any) -> None: - """Extract and add TagItem fields to row (compact tag display). - - Shows the Tag column with the tag name and Source column to identify - which storage backend the tag values come from (Hydrus, local, etc.). - All data preserved in TagItem for piping and operations. - Tag row selection is handled by the CLI pipeline (e.g. `@N | ...`). - """ - # Tag name - if hasattr(item, "tag_name") and item.tag_name: - row.add_column("Tag", item.tag_name) - - # Source/Store (where the tag values come from) - if hasattr(item, "source") and item.source: - row.add_column("Store", item.source) - - def _add_pipe_object(self, row: ResultRow, obj: Any) -> None: - """Extract and add PipeObject fields to row.""" - # Source and identifier - if hasattr(obj, "source") and obj.source: - row.add_column("Source", obj.source) - - # Title - if hasattr(obj, "title") and obj.title: - row.add_column("Title", obj.title) - - # File info - if hasattr(obj, "path") and obj.path: - row.add_column("Path", str(obj.path)) - - # Tag - if hasattr(obj, "tag") and obj.tag: - tag_str = ", ".join(obj.tag[:3]) # First 3 tag values - if len(obj.tag) > 3: - tag_str += f", +{len(obj.tag) - 3} more" - row.add_column("Tag", tag_str) - - # Duration - if hasattr(obj, "duration") and obj.duration: - dur = _format_duration_hms(obj.duration) - row.add_column("Duration", dur or str(obj.duration)) - - # Warnings - if hasattr(obj, "warnings") and obj.warnings: - warnings_str = "; ".join(obj.warnings[:2]) - if len(obj.warnings) > 2: - warnings_str += f" (+{len(obj.warnings) - 2} more)" - row.add_column("Warnings", warnings_str) - - def _add_dict(self, row: ResultRow, data: Dict[str, Any]) -> None: - """Extract and add dict fields to row using first-match priority groups. - - Respects max_columns limit to keep table compact and readable. - - Special handling for 'columns' field: if present, uses it to populate row columns - instead of treating it as a regular field. This allows dynamic column definitions - from search providers. - - Priority field groups (first match per group): - - title | name | filename - - store | table | source - - size | size_bytes - - ext - """ - - # Helper to determine if a field should be hidden from display - def is_hidden_field(field_name: Any) -> bool: - # Hide internal/metadata fields - hidden_fields = { - "__", - "id", - "action", - "parent_id", - "is_temp", - "path", - "extra", - "target", - "hash", - "hash_hex", - "file_hash", - "tag", - "tag_summary", - } - if isinstance(field_name, str): - if field_name.startswith("__"): - return True - if field_name in hidden_fields: - return True - return False - - # Strip out hidden metadata fields (prefixed with __) - visible_data = { - k: v - for k, v in data.items() if not is_hidden_field(k) - } - - # Normalize common fields using shared extractors so nested metadata/path values work. - # This keeps Ext/Size/Store consistent across all dict-based result sources. - try: - store_extracted = extract_store_value(data) - if (store_extracted and "store" not in visible_data - and "table" not in visible_data and "source" not in visible_data): - visible_data["store"] = store_extracted - except Exception: - pass - - try: - ext_extracted = extract_ext_value(data) - # Always ensure `ext` exists so priority_groups keeps a stable column. - visible_data["ext"] = str(ext_extracted or "") - except Exception: - visible_data.setdefault("ext", "") - - try: - size_extracted = extract_size_bytes_value(data) - if (size_extracted is not None and "size_bytes" not in visible_data - and "size" not in visible_data): - visible_data["size_bytes"] = size_extracted - except Exception: - pass - - # Handle extension separation for local files - store_val = str( - visible_data.get("store", - "") or visible_data.get("table", - "") - or visible_data.get("source", - "") - ).lower() - - # Debug logging - # print(f"DEBUG: Processing dict result. Store: {store_val}, Keys: {list(visible_data.keys())}") - - if store_val == "local": - # Find title field - title_field = next( - (f for f in ["title", "name", "filename"] if f in visible_data), - None - ) - if title_field: - title_val = str(visible_data[title_field]) - path_obj = Path(title_val) - if path_obj.suffix: - extension = path_obj.suffix.lstrip(".") - visible_data[title_field] = path_obj.stem - visible_data["ext"] = extension - # print(f"DEBUG: Split extension. Title: {visible_data[title_field]}, Ext: {extension}") - else: - visible_data["ext"] = "" - - # Ensure 'ext' is present so it gets picked up by priority_groups in correct order - if "ext" not in visible_data: - visible_data["ext"] = "" - - # Track which fields we've already added to avoid duplicates - added_fields = set() - column_count = 0 # Track total columns added - - # Helper function to format values - def format_value(value: Any) -> str: - if isinstance(value, list): - formatted = ", ".join(str(v) for v in value[:3]) - if len(value) > 3: - formatted += f", +{len(value) - 3} more" - return formatted - return str(value) - - # Special handling for 'columns' field from search providers - # If present, use it to populate row columns dynamically - if ("columns" in visible_data and isinstance(visible_data["columns"], - list) and visible_data["columns"]): - try: - for col_name, col_value in visible_data["columns"]: - # Skip the "#" column as ResultTable already adds row numbers - if col_name == "#": - continue - if column_count >= self.max_columns: - break - # When providers supply raw numeric fields, keep formatting consistent. - if isinstance(col_name, str) and col_name.strip().lower() == "size": - try: - if col_value is None or str(col_value).strip() == "": - col_value_str = "" - else: - col_value_str = _format_size( - col_value, - integer_only=False - ) - except Exception: - col_value_str = format_value(col_value) - elif isinstance(col_name, - str) and col_name.strip().lower() == "duration": - try: - if col_value is None or str(col_value).strip() == "": - col_value_str = "" - else: - dur = _format_duration_hms(col_value) - col_value_str = dur or format_value(col_value) - except Exception: - col_value_str = format_value(col_value) - else: - col_value_str = format_value(col_value) - row.add_column(col_name, col_value_str) - added_fields.add(col_name.lower()) - column_count += 1 - # Mark 'columns' as handled so we don't add it as a field - added_fields.add("columns") - # Also mark common fields that shouldn't be re-displayed if they're in columns - # This prevents showing both "Store" (from columns) and "Store" (from data fields) - added_fields.add("table") - added_fields.add("source") - added_fields.add("target") - added_fields.add("path") - added_fields.add("media_kind") - added_fields.add("detail") - added_fields.add("annotations") - added_fields.add( - "full_metadata" - ) # Don't display full metadata as column - except Exception: - # Fall back to regular field handling if columns format is unexpected - pass - - # Only add priority groups if we haven't already filled columns from 'columns' field - if column_count == 0: - # Explicitly set which columns to display in order - priority_groups = [ - ("title", - ["title", - "name", - "filename"]), - ("store", - ["store", - "table", - "source"]), - ("size", - ["size", - "size_bytes"]), - ("ext", - ["ext"]), - ] - - # Add priority field groups first - use first match in each group - for _group_label, field_options in priority_groups: - if column_count >= self.max_columns: - break - for field in field_options: - if field in visible_data and field not in added_fields: - # Special handling for size fields - format with unit and decimals - if field in ["size", "size_bytes"]: - value_str = _format_size( - visible_data[field], - integer_only=False - ) - else: - value_str = format_value(visible_data[field]) - - # Map field names to display column names - if field in ["store", "table", "source"]: - col_name = "Store" - elif field in ["size", "size_bytes"]: - col_name = "Size" - elif field in ["title", "name", "filename"]: - col_name = "Title" - else: - col_name = field.replace("_", " ").title() - - row.add_column(col_name, value_str) - added_fields.add(field) - column_count += 1 - break # Use first match in this group, skip rest - - # Add remaining fields only if we haven't hit max_columns (and no explicit columns were set) - # Don't add any remaining fields - only use priority_groups for dict results - - # Check for selection args - if "_selection_args" in data: - row.selection_args = data["_selection_args"] - # Don't display it - added_fields.add("_selection_args") - - def _add_generic_object(self, row: ResultRow, obj: Any) -> None: - """Extract and add fields from generic objects.""" - if hasattr(obj, "__dict__"): - for key, value in obj.__dict__.items(): - if key.startswith("_"): # Skip private attributes - continue - - row.add_column(key.replace("_", " ").title(), str(value)) - - def to_rich(self): - """Return a Rich renderable representing this table.""" - if not self.rows: - empty = Text("No results") - return Panel(empty, title=self.title) if self.title else empty - - col_names: List[str] = [] - seen: Set[str] = set() - for row in self.rows: - for col in row.columns: - if col.name not in seen: - seen.add(col.name) - col_names.append(col.name) - - table = RichTable( - show_header=True, - header_style="bold", - box=SIMPLE, - expand=True, - show_lines=False, - ) - - if not self.no_choice: - table.add_column("#", justify="right", no_wrap=True) - - # Render headers in uppercase, but keep original column keys for lookup. - header_by_key: Dict[str, - str] = { - name: str(name).upper() - for name in col_names - } - - for name in col_names: - header = header_by_key.get(name, str(name).upper()) - if name.lower() == "ext": - table.add_column(header, no_wrap=True) - else: - table.add_column(header) - - for row_idx, row in enumerate(self.rows, 1): - cells: List[str] = [] - if not self.no_choice: - cells.append(str(row_idx)) - for name in col_names: - val = row.get_column(name) or "" - cells.append(self._apply_value_case(_sanitize_cell_text(val))) - table.add_row(*cells) - - if self.title or self.header_lines: - header_bits = [Text(line) for line in (self.header_lines or [])] - renderable = Group(*header_bits, table) if header_bits else table - return Panel(renderable, title=self.title) if self.title else renderable - - return table - - def format_compact(self) -> str: - """Format table in compact form (one line per row). - - Returns: - Formatted table string - """ - lines = [] - - if self.title: - lines.append(f"\n{self.title}") - lines.append("-" * len(self.title)) - - for i, row in enumerate(self.rows, 1): - row_str = " | ".join(str(col) for col in row.columns) - lines.append(f"{i}. {row_str}") - - return "\n".join(lines) - - def format_json(self) -> str: - """Format table as JSON. - - Returns: - JSON string - """ - data = { - "title": self.title, - "row_count": len(self.rows), - "rows": [row.to_list() for row in self.rows], - } - return json.dumps(data, indent=2) - - def to_dict(self) -> Dict[str, Any]: - """Convert table to dictionary. - - Returns: - Dictionary representation - """ - return { - "title": self.title, - "rows": [row.to_list() for row in self.rows] - } - - def __str__(self) -> str: - """String representation. - - Rich is the primary rendering path. This keeps accidental `print(table)` - usage from emitting ASCII box-drawn tables. - """ - label = self.title or "ResultTable" - return f"{label} ({len(self.rows)} rows)" - - def __rich__(self): - return self.to_rich() - - def __repr__(self) -> str: - """Developer representation.""" - return f"ResultTable(title={self.title!r}, rows={len(self.rows)})" - - def __len__(self) -> int: - """Number of rows in the table.""" - return len(self.rows) - - def __iter__(self): - """Iterate over rows.""" - return iter(self.rows) - - def __getitem__(self, index: int) -> ResultRow: - """Get row by index.""" - return self.rows[index] - - def select_interactive( - self, - prompt: str = "Select an item", - accept_args: bool = False - ) -> Optional[List[int]] | dict: - """Display table and get interactive user selection (single or multiple). - - Supports multiple input formats: - - Single: "5" or "q" to quit - - Range: "3-5" (selects items 3, 4, 5) - - Multiple: "3,5,13" (selects items 3, 5, and 13) - - Combined: "1-3,7,9-11" (selects 1,2,3,7,9,10,11) - - If accept_args=True, also supports cmdlet arguments: - - "5 -storage hydrus" → returns indices [4] + args {"-storage": "hydrus"} - - "2-4 -storage hydrus -tag important" → returns indices [1,2,3] + multiple args - - Args: - prompt: Custom prompt text - accept_args: If True, parse and return cmdlet arguments from input - - Returns: - If accept_args=False: List of 0-based indices, or None if cancelled - If accept_args=True: Dict with "indices" and "args" keys, or None if cancelled - """ - if self.no_choice: - from SYS.rich_display import stdout_console - - stdout_console().print(self) - stdout_console().print(Panel(Text("Selection is disabled for this table."))) - return None - - # Display the table - from SYS.rich_display import stdout_console - - stdout_console().print(self) - - # Get user input - while True: - try: - if accept_args: - choice = Prompt.ask( - f"{prompt} (e.g., '5' or '2 -storage hydrus' or 'q' to quit)" - ).strip() - else: - choice = Prompt.ask( - f"{prompt} (e.g., '5' or '3-5' or '1,3,5' or 'q' to quit)" - ).strip() - - if choice.lower() == "q": - return None - - if accept_args: - # Parse selection and arguments - result = self._parse_selection_with_args(choice) - if result is not None: - return result - stdout_console().print( - Panel( - Text( - "Invalid format. Use: selection (5 or 3-5 or 1,3,5) optionally followed by flags (e.g., '5 -storage hydrus')." - ) - ) - ) - else: - # Parse just the selection - selected_indices = self._parse_selection(choice) - if selected_indices is not None: - return selected_indices - stdout_console().print( - Panel( - Text( - "Invalid format. Use: single (5), range (3-5), list (1,3,5), combined (1-3,7,9-11), or 'q' to quit." - ) - ) - ) - except (ValueError, EOFError): - if accept_args: - stdout_console().print( - Panel( - Text( - "Invalid format. Use: selection (5 or 3-5 or 1,3,5) optionally followed by flags (e.g., '5 -storage hydrus')." - ) - ) - ) - else: - stdout_console().print( - Panel( - Text( - "Invalid format. Use: single (5), range (3-5), list (1,3,5), combined (1-3,7,9-11), or 'q' to quit." - ) - ) - ) - - def _parse_selection(self, selection_str: str) -> Optional[List[int]]: - """Parse user selection string into list of 0-based indices. - - Supports: - - Single: "5" → [4] - - Range: "3-5" → [2, 3, 4] - - Multiple: "3,5,13" → [2, 4, 12] - - Combined: "1-3,7,9-11" → [0, 1, 2, 6, 8, 9, 10] - - Args: - selection_str: User input string - - Returns: - List of 0-based indices, or None if invalid - """ - if self.no_choice: - return None - - indices = set() - - # Split by comma for multiple selections - parts = selection_str.split(",") - - for part in parts: - part = part.strip() - if not part: - continue - - # Check if it's a range (contains dash) - if "-" in part: - # Handle ranges like "3-5" - try: - range_parts = part.split("-") - if len(range_parts) != 2: - return None - - start = int(range_parts[0].strip()) - end = int(range_parts[1].strip()) - - # Validate range - if start < 1 or end < 1 or start > len(self.rows) or end > len( - self.rows): - return None - - if start > end: - start, end = end, start - - # Add all indices in range (convert to 0-based) - for i in range(start, end + 1): - indices.add(i - 1) - - except (ValueError, IndexError): - return None - else: - # Single number - try: - num = int(part) - if num < 1 or num > len(self.rows): - return None - indices.add(num - 1) # Convert to 0-based - except ValueError: - return None - - if not indices: - return None - - # Return sorted list - return sorted(list(indices)) - - def _parse_selection_with_args(self, input_str: str) -> Optional[dict]: - """Parse user input into selection indices and cmdlet arguments. - - Supports formats like: - - "5" → {"indices": [4], "args": {}} - - "2 -storage hydrus" → {"indices": [1], "args": {"-storage": "hydrus"}} - - "3-5 -storage hydrus -tag important" → {"indices": [2,3,4], "args": {"-storage": "hydrus", "-tag": "important"}} - - Args: - input_str: User input string with selection and optional flags - - Returns: - Dict with "indices" and "args" keys, or None if invalid - """ - parts = input_str.split() - if not parts: - return None - - # First part should be the selection - selection_str = parts[0] - selected_indices = self._parse_selection(selection_str) - - if selected_indices is None: - return None - - # Remaining parts are cmdlet arguments - cmdlet_args = {} - i = 1 - while i < len(parts): - part = parts[i] - - # Check if it's a flag (starts with -) - if part.startswith("-"): - flag = part - value = None - - # Get the value if it exists and doesn't start with - - if i + 1 < len(parts) and not parts[i + 1].startswith("-"): - value = parts[i + 1] - i += 2 - else: - i += 1 - - # Store the flag - if value is not None: - cmdlet_args[flag] = value - else: - cmdlet_args[flag] = True # Flag without value - else: - i += 1 - - return { - "indices": selected_indices, - "args": cmdlet_args - } - - def add_input_option(self, option: InputOption) -> "ResultTable": - """Add an interactive input option to the table. - - Input options allow users to specify cmdlet arguments interactively, - like choosing a download location or source. - - Args: - option: InputOption definition - - Returns: - Self for chaining - """ - self.input_options[option.name] = option - return self - - def select_option(self, option_name: str, prompt: str = "") -> Optional[str]: - """Interactively get user input for a specific option. - - Displays the option choices (if enum) and prompts user for input. - - Args: - option_name: Name of the option to get input for - prompt: Custom prompt text (uses option description if not provided) - - Returns: - User's selected/entered value, or None if cancelled - """ - if option_name not in self.input_options: - print(f"Unknown option: {option_name}") - return None - - option = self.input_options[option_name] - prompt_text = prompt or option.description or option_name - - while True: - try: - # For enum options, show choices - if option.type == "enum" and option.choices: - print(f"\n{prompt_text}") - for i, choice in enumerate(option.choices, 1): - print(f" {i}. {choice}") - - choice_input = input( - f"Select {option_name} (1-{len(option.choices)}, or 'q' to cancel): " - ).strip() - - if choice_input.lower() == "q": - return None - - try: - idx = int(choice_input) - 1 - if 0 <= idx < len(option.choices): - return option.choices[idx] - print(f"Invalid choice. Enter 1-{len(option.choices)}") - except ValueError: - print(f"Invalid choice. Enter 1-{len(option.choices)}") - - # For string/integer options, get direct input - elif option.type in ("string", "integer"): - value = input(f"{prompt_text} (or 'q' to cancel): ").strip() - - if value.lower() == "q": - return None - - # Validate if validator provided - if option.validator and not option.validator(value): - print(f"Invalid value for {option_name}") - continue - - # Type conversion - if option.type == "integer": - try: - int(value) - except ValueError: - print(f"Must be an integer") - continue - - return value - - # For flag options - elif option.type == "flag": - response = input(f"{prompt_text} (y/n): ").strip().lower() - if response == "q": - return None - return "true" if response in ("y", "yes", "true") else "false" - - except (ValueError, EOFError): - return None - - def get_all_options(self) -> Dict[str, str]: - """Get all input options at once with user prompts. - - Interactively prompts user for all registered options. - - Returns: - Dictionary mapping option names to selected values - """ - result = {} - for name, _option in self.input_options.items(): - value = self.select_option(name) - if value is not None: - result[name] = value - return result - - def select_by_index(self, index: int) -> Optional[ResultRow]: - """Get a row by 1-based index (user-friendly). - - Args: - index: 1-based index - - Returns: - ResultRow if valid, None otherwise - """ - idx = index - 1 - if 0 <= idx < len(self.rows): - return self.rows[idx] - return None - - # TUI-specific formatting methods - - def to_datatable_rows(self, source: str = "unknown") -> List[List[str]]: - """Convert results to rows suitable for Textual DataTable widget. - - Args: - source: Source type for formatting context (openlibrary, soulseek, etc.) - - Returns: - List of row value lists - """ - rows = [] - for result in self.rows: - row_values = self._format_datatable_row(result, source) - rows.append(row_values) - return rows - - def _format_datatable_row(self, - row: ResultRow, - source: str = "unknown") -> List[str]: - """Format a ResultRow for DataTable display. - - Args: - row: ResultRow to format - source: Source type - - Returns: - List of column values as strings - """ - # Extract values from row columns - values = [col.value for col in row.columns] - - # Truncate to reasonable lengths for table display - return [v[:60] if len(v) > 60 else v for v in values] - - def to_result_cards(self) -> List[TUIResultCard]: - """Convert all rows to TUIResultCard objects for card-based UI display. - - Returns: - List of TUIResultCard objects - """ - cards = [] - for row in self.rows: - card = self._row_to_card(row) - cards.append(card) - return cards - - def _row_to_card(self, row: ResultRow) -> TUIResultCard: - """Convert a ResultRow to a TUIResultCard. - - Args: - row: ResultRow to convert - - Returns: - TUIResultCard with extracted metadata - """ - # Build metadata dict from row columns - metadata = {} - title = "" - - for col in row.columns: - if col.name.lower() == "title": - title = col.value - metadata[col.name] = col.value - - # Extract tag values if present - tag = [] - if "Tag" in metadata: - tag_val = metadata["Tag"] - if tag_val: - tag = [t.strip() for t in tag_val.split(",")][:5] - - # Try to find useful metadata fields - subtitle = metadata.get("Artist", metadata.get("Author", "")) - media_kind = metadata.get("Type", metadata.get("Media Kind", "")) - file_size = metadata.get("Size", "") - duration = metadata.get("Duration", "") - file_hash = metadata.get("Hash", "") - - return TUIResultCard( - title=title or "Unknown", - subtitle=subtitle, - metadata=metadata, - media_kind=media_kind, - tag=tag, - file_hash=file_hash or None, - file_size=file_size or None, - duration=duration or None, - ) - - def build_metadata_tree(self, tree_widget: "Tree") -> None: - """Populate a Textual Tree widget with result metadata hierarchy. - - Args: - tree_widget: Textual Tree widget to populate - - Raises: - ImportError: If Textual not available - """ - if not TEXTUAL_AVAILABLE: - raise ImportError("Textual not available for tree building") - - tree_widget.reset(self.title or "Results") - root = tree_widget.root - - # Add each row as a top-level node - for i, row in enumerate(self.rows, 1): - row_node = root.add(f"[bold]Result {i}[/bold]") - - # Add columns as children - for col in row.columns: - value_str = col.value - if len(value_str) > 100: - value_str = value_str[:97] + "..." - row_node.add_leaf(f"[cyan]{col.name}[/cyan]: {value_str}") - - -def _format_size(size: Any, integer_only: bool = False) -> str: - """Format file size as human-readable string. - - Args: - size: Size in bytes or already formatted string - integer_only: If True, show MB as an integer (e.g., "250 MB") - - Returns: - Formatted size string with units (e.g., "3.53 MB", "0.57 MB", "1.2 GB") - """ - if isinstance(size, str): - return size if size else "" - - try: - bytes_val = int(size) - if bytes_val < 0: - return "" - - # Keep display consistent with the CLI expectation: show MB with unit - # (including values under 1 MB as fractional MB), and show GB for very - # large sizes. - if bytes_val >= 1024**3: - value = bytes_val / (1024**3) - unit = "GB" - else: - value = bytes_val / (1024**2) - unit = "MB" - - if integer_only: - return f"{int(round(value))} {unit}" - - num = f"{value:.2f}".rstrip("0").rstrip(".") - return f"{num} {unit}" - except (ValueError, TypeError): - return "" - - -def format_result(result: Any, title: str = "") -> str: - """Quick function to format a single result or list of results. - - Args: - result: Result object, list of results, or dict - title: Optional title for the table - - Returns: - Formatted string - """ - table = ResultTable(title) - - if isinstance(result, list): - for item in result: - table.add_result(item) - else: - table.add_result(result) - - return str(table) -