diff --git a/SYS/result_table.py b/SYS/result_table.py index 843f1e3..8dfd836 100644 --- a/SYS/result_table.py +++ b/SYS/result_table.py @@ -1828,3 +1828,153 @@ def format_result(result: Any, title: str = "") -> str: table.add_result(result) return str(table) + +def extract_item_metadata(item: Any) -> Dict[str, Any]: + """Extract a comprehensive set of metadata from an item for the ItemDetailView.""" + if item is None: + return {} + + out = {} + + # Use existing extractors from match-standard result table columns + title = extract_title_value(item) + if title: out["Title"] = title + + hv = extract_hash_value(item) + if hv: out["Hash"] = hv + + store = extract_store_value(item) + if store: out["Store"] = store + + # Path/Target + data = _as_dict(item) or {} + path = data.get("path") or data.get("target") or data.get("filename") + if path: out["Path"] = path + + ext = extract_ext_value(item) + if ext: out["Ext"] = ext + + size = extract_size_bytes_value(item) + if size: out["Size"] = size + + # Duration + dur = _get_first_dict_value(data, ["duration_seconds", "duration"]) + if dur: + out["Duration"] = _format_duration_hms(dur) + + # URL + url = _get_first_dict_value(data, ["url", "URL"]) + if url: out["Url"] = url + + # Relationships + rels = _get_first_dict_value(data, ["relationships", "rel"]) + if rels: out["Relations"] = rels + + # Tags Summary + tags = _get_first_dict_value(data, ["tags", "tag"]) + if tags: out["Tags"] = tags + + return out + + +class ItemDetailView(ResultTable): + """A specialized view that displays item details alongside a list of related items (tags, urls, etc). + + This is used for 'get-tag', 'get-url' and similar cmdlets where we want to contextually show + what is being operated on (the main item) along with the selection list. + """ + + def __init__( + self, + title: str = "", + item_metadata: Optional[Dict[str, Any]] = None, + **kwargs + ): + super().__init__(title, **kwargs) + self.item_metadata = item_metadata or {} + + def to_rich(self): + """Render the item details panel above the standard results table.""" + from rich.table import Table as RichTable + from rich.panel import Panel + from rich.console import Group, Columns + from rich.text import Text + + # 1. 
Create Detail Grid + details_table = RichTable(show_header=False, box=None, padding=(0, 2), expand=True) + details_table.add_column("Key", style="bold cyan", justify="right", width=12) + details_table.add_column("Value") + + # Canonical display order for metadata + order = ["Title", "Hash", "Store", "Path", "Ext", "Size", "Duration", "Url", "Relations"] + + has_details = False + # Add ordered items first + for key in order: + val = self.item_metadata.get(key) or self.item_metadata.get(key.lower()) or self.item_metadata.get(key.upper()) + + # Special formatting for certain types + if key == "Size" and val and isinstance(val, (int, float, str)) and str(val).isdigit(): + val = _format_size(int(val), integer_only=False) + + if key == "Relations" and isinstance(val, list) and val: + if isinstance(val[0], dict): + val = "\n".join([f"[dim]→[/dim] {r.get('type','rel')}: {r.get('title','?')}" for r in val]) + else: + val = "\n".join([f"[dim]→[/dim] {r}" for r in val]) + + if val: + details_table.add_row(f"{key}:", str(val)) + has_details = True + elif key in ["Url", "Relations"]: + # User requested for these if blank + details_table.add_row(f"{key}:", "[dim][/dim]") + has_details = True + + # Add any remaining metadata not in the canonical list + for k, v in self.item_metadata.items(): + k_norm = k.lower() + if k_norm not in [x.lower() for x in order] and v and k_norm not in ["tags", "tag"]: + details_table.add_row(f"{k.capitalize()}:", str(v)) + has_details = True + + # Tags Summary + tags = self.item_metadata.get("Tags") or self.item_metadata.get("tags") or self.item_metadata.get("tag") + if tags and isinstance(tags, (list, str)): + if isinstance(tags, str): + tags = [t.strip() for t in tags.split(",") if t.strip()] + tags_sorted = sorted(map(str, tags)) + tag_cols = Columns([f"[dim]#[/dim]{t}" for t in tags_sorted], equal=True, expand=True) + details_table.add_row("", "") # Spacer + details_table.add_row("Tags:", tag_cols) + has_details = True + + # 2. Get the standard table render + original_title = self.title + original_header_lines = self.header_lines + self.title = "" + self.header_lines = [] + + try: + results_renderable = super().to_rich() + finally: + self.title = original_title + self.header_lines = original_header_lines + + # 3. Assemble components + elements = [] + + if has_details: + elements.append(Panel(details_table, title="Item Details", border_style="blue")) + + # Wrap the results in a titled panel + display_title = "Items" + if original_title: + display_title = original_title + + # Add a bit of padding + results_group = Group(Text(""), results_renderable, Text("")) + + elements.append(Panel(results_group, title=display_title, border_style="green")) + + return Group(*elements) diff --git a/SYS/result_table_new.py b/SYS/result_table_new.py new file mode 100644 index 0000000..cee5a05 --- /dev/null +++ b/SYS/result_table_new.py @@ -0,0 +1,1831 @@ +"""Unified result table formatter for CLI display. + +Provides a structured way to convert search results, metadata, and pipeline objects +into formatted tables suitable for display in the REPL and CLI output. 
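+
+Typical usage (illustrative; rendering goes through the Rich console helper in SYS.rich_display):
+
+    from SYS.rich_display import stdout_console
+
+    table = ResultTable("Search Results")
+    row = table.add_row()
+    row.add_column("Title", "document.pdf")
+    row.add_column("Size", "2.5 MB")
+    stdout_console().print(table)  # Rich rendering via __rich__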
+ +Features: +- Format results as aligned tables with row numbers +- Support multiple selection formats (single, ranges, lists, combined) +- Interactive selection with user input +- Input options for cmdlet arguments (location, source selection, etc) +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Callable, Set +from pathlib import Path +import json +import shutil + +from rich.box import SIMPLE +from rich.console import Group +from rich.panel import Panel +from rich.prompt import Prompt +from rich.table import Table as RichTable +from rich.text import Text + +# Optional Textual imports - graceful fallback if not available +try: + from textual.widgets import Tree + + TEXTUAL_AVAILABLE = True +except ImportError: + TEXTUAL_AVAILABLE = False + + +def _sanitize_cell_text(value: Any) -> str: + """Coerce to a single-line, tab-free string suitable for terminal display.""" + if value is None: + return "" + text = str(value) + if not text: + return "" + return text.replace("\r\n", + " ").replace("\n", + " ").replace("\r", + " ").replace("\t", + " ") + + +def _format_duration_hms(duration: Any) -> str: + """Format a duration in seconds into a compact h/m/s string. + + Examples: + 3150 -> "52m30s" + 59 -> "59s" + 3600 -> "1h0m0s" + + If the value is not numeric, returns an empty string. + """ + if duration is None: + return "" + try: + if isinstance(duration, str): + s = duration.strip() + if not s: + return "" + # If it's already formatted (contains letters/colon), leave it to caller. + if any(ch.isalpha() for ch in s) or ":" in s: + return "" + seconds = float(s) + else: + seconds = float(duration) + except Exception: + return "" + + if seconds < 0: + return "" + + total_seconds = int(seconds) + minutes, secs = divmod(total_seconds, 60) + hours, minutes = divmod(minutes, 60) + + parts: List[str] = [] + if hours > 0: + parts.append(f"{hours}h") + if minutes > 0 or hours > 0: + parts.append(f"{minutes}m") + parts.append(f"{secs}s") + return "".join(parts) + + +@dataclass(frozen=True) +class TableColumn: + """Reusable column specification. + + This is intentionally separate from `ResultColumn`: + - `ResultColumn` is a rendered (name,value) pair attached to a single row. + - `TableColumn` is a reusable extractor/formatter used to build rows consistently + across cmdlets and stores. 
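+
+    Example (illustrative):
+        title_col = TableColumn("title", "Title", extract_title_value)
+        title_col.extract({"name": "clip.mp4"})  # -> "clip.mp4"; extractor errors return None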
+ """ + + key: str + header: str + extractor: Callable[[Any], Any] + + def extract(self, item: Any) -> Any: + try: + return self.extractor(item) + except Exception: + return None + + +def _get_first_dict_value(data: Dict[str, Any], keys: List[str]) -> Any: + for k in keys: + if k in data: + v = data.get(k) + if v is not None and str(v).strip() != "": + return v + return None + + +def _as_dict(item: Any) -> Optional[Dict[str, Any]]: + if isinstance(item, dict): + return item + try: + if hasattr(item, "__dict__"): + return dict(getattr(item, "__dict__")) + except Exception: + return None + return None + + +def extract_store_value(item: Any) -> str: + data = _as_dict(item) or {} + store = _get_first_dict_value( + data, + ["store", + "table", + "source", + "storage"] + ) # storage is legacy + return str(store or "").strip() + + +def extract_hash_value(item: Any) -> str: + data = _as_dict(item) or {} + hv = _get_first_dict_value(data, ["hash", "hash_hex", "file_hash", "sha256"]) + return str(hv or "").strip() + + +def extract_title_value(item: Any) -> str: + data = _as_dict(item) or {} + title = _get_first_dict_value(data, ["title", "name", "filename"]) + if not title: + title = _get_first_dict_value( + data, + ["target", + "path", + "url"] + ) # last resort display + return str(title or "").strip() + + +def extract_ext_value(item: Any) -> str: + data = _as_dict(item) or {} + + meta = data.get("metadata") if isinstance(data.get("metadata"), + dict) else {} + raw_path = data.get("path") or data.get("target") or data.get( + "filename" + ) or data.get("title") + + ext = _get_first_dict_value(data, + ["ext", + "file_ext", + "extension"]) or _get_first_dict_value( + meta, + ["ext", + "file_ext", + "extension"] + ) + + if (not ext) and raw_path: + try: + suf = Path(str(raw_path)).suffix + if suf: + ext = suf.lstrip(".") + except Exception: + ext = "" + + ext_str = str(ext or "").strip().lstrip(".") + for idx, ch in enumerate(ext_str): + if not ch.isalnum(): + ext_str = ext_str[:idx] + break + return ext_str[:5] + + +def extract_size_bytes_value(item: Any) -> Optional[int]: + data = _as_dict(item) or {} + meta = data.get("metadata") if isinstance(data.get("metadata"), + dict) else {} + + size_val = _get_first_dict_value( + data, + ["size_bytes", + "size", + "file_size", + "bytes", + "filesize"] + ) or _get_first_dict_value( + meta, + ["size_bytes", + "size", + "file_size", + "bytes", + "filesize"] + ) + if size_val is None: + return None + try: + s = str(size_val).strip() + if not s: + return None + # Some sources might provide floats or numeric strings + return int(float(s)) + except Exception: + return None + + +COMMON_COLUMNS: Dict[str, + TableColumn] = { + "title": TableColumn("title", + "Title", + extract_title_value), + "store": TableColumn("store", + "Store", + extract_store_value), + "hash": TableColumn("hash", + "Hash", + extract_hash_value), + "ext": TableColumn("ext", + "Ext", + extract_ext_value), + "size": TableColumn("size", + "Size", + extract_size_bytes_value), + } + + +def build_display_row(item: Any, *, keys: List[str]) -> Dict[str, Any]: + """Build a dict suitable for `ResultTable.add_result()` using shared column specs.""" + out: Dict[str, + Any] = {} + for k in keys: + spec = COMMON_COLUMNS.get(k) + if spec is None: + continue + val = spec.extract(item) + out[spec.key] = val + return out + + +@dataclass +class InputOption: + """Represents an interactive input option (cmdlet argument) in a table. 
+ + Allows users to select options that translate to cmdlet arguments, + enabling interactive configuration right from the result table. + + Example: + # Create an option for location selection + location_opt = InputOption( + "location", + type="enum", + choices=["local", "hydrus", "0x0"], + description="Download destination" + ) + + # Use in result table + table.add_input_option(location_opt) + selected = table.select_option("location") # Returns user choice + """ + + name: str + """Option name (maps to cmdlet argument)""" + type: str = "string" + """Option type: 'string', 'enum', 'flag', 'integer'""" + choices: List[str] = field(default_factory=list) + """Valid choices for enum type""" + default: Optional[str] = None + """Default value if not specified""" + description: str = "" + """Description of what this option does""" + validator: Optional[Callable[[str], bool]] = None + """Optional validator function: takes value, returns True if valid""" + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "name": self.name, + "type": self.type, + "choices": self.choices if self.choices else None, + "default": self.default, + "description": self.description, + } + + +@dataclass +class TUIResultCard: + """Represents a result as a UI card with title, metadata, and actions. + + Used in hub-ui and TUI contexts to render individual search results + as grouped components with visual structure. + """ + + title: str + subtitle: Optional[str] = None + metadata: Optional[Dict[str, str]] = None + media_kind: Optional[str] = None + tag: Optional[List[str]] = None + file_hash: Optional[str] = None + file_size: Optional[str] = None + duration: Optional[str] = None + + def __post_init__(self): + """Initialize default values.""" + if self.metadata is None: + self.metadata = {} + if self.tag is None: + self.tag = [] + + +@dataclass +class ResultColumn: + """Represents a single column in a result table.""" + + name: str + value: str + width: Optional[int] = None + + def __str__(self) -> str: + """String representation of the column.""" + return f"{self.name}: {self.value}" + + def to_dict(self) -> Dict[str, str]: + """Convert to dictionary.""" + return { + "name": self.name, + "value": self.value + } + + +@dataclass +class ResultRow: + """Represents a single row in a result table.""" + + columns: List[ResultColumn] = field(default_factory=list) + selection_args: Optional[List[str]] = None + """Arguments to use for this row when selected via @N syntax (e.g., ['-item', '3'])""" + selection_action: Optional[List[str]] = None + """Full expanded stage tokens that should run when this row is selected.""" + source_index: Optional[int] = None + """Original insertion order index (used to map sorted views back to source items).""" + payload: Optional[Any] = None + """Original object that contributed to this row.""" + + def add_column(self, name: str, value: Any) -> None: + """Add a column to this row.""" + # Normalize column header names. + normalized_name = str(name or "").strip() + if normalized_name.lower() == "name": + normalized_name = "Title" + + str_value = _sanitize_cell_text(value) + + # Normalize extension columns globally and cap to 5 characters + if normalized_name.lower() == "ext": + str_value = str_value.strip().lstrip(".") + for idx, ch in enumerate(str_value): + if not ch.isalnum(): + str_value = str_value[:idx] + break + str_value = str_value[:5] + + # Normalize Duration columns: providers often pass raw seconds. 
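+        # e.g. a raw value of 3150 is rendered as "52m30s" by _format_duration_hms.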
+ if normalized_name.lower() == "duration": + formatted = _format_duration_hms(value) + if formatted: + str_value = formatted + + self.columns.append(ResultColumn(normalized_name, str_value)) + + def get_column(self, name: str) -> Optional[str]: + """Get column value by name.""" + for col in self.columns: + if col.name.lower() == name.lower(): + return col.value + return None + + def to_dict(self) -> List[Dict[str, str]]: + """Convert to list of column dicts.""" + return [col.to_dict() for col in self.columns] + + def to_list(self) -> List[tuple[str, str]]: + """Convert to list of (name, value) tuples.""" + return [(col.name, col.value) for col in self.columns] + + def __str__(self) -> str: + """String representation of the row.""" + return " | ".join(str(col) for col in self.columns) + + +class ResultTable: + """Unified table formatter for search results, metadata, and pipeline objects. + + Provides a structured way to display results in the CLI with consistent formatting. + Handles conversion from various result types (SearchResult, PipeObject, dicts) into + a formatted table with rows and columns. + + Example: + >>> result_table = ResultTable("Search Results") + >>> row = result_table.add_row() + >>> row.add_column("File", "document.pdf") + >>> row.add_column("Size", "2.5 MB") + >>> row.add_column("Tag", "pdf, document") + >>> print(result_table) + """ + + def __init__( + self, + title: str = "", + title_width: int = 80, + max_columns: Optional[int] = None, + preserve_order: bool = False, + ): + """Initialize a result table. + + Args: + title: Optional title for the table + title_width: Width for formatting the title line + max_columns: Maximum number of columns to display (None for unlimited, default: 5 for search results) + preserve_order: When True, skip automatic sorting so row order matches source + """ + self.title = title + try: + from SYS import pipeline as ctx + + cmdlet_name = "" + try: + cmdlet_name = ( + ctx.get_current_cmdlet_name("") + if hasattr(ctx, + "get_current_cmdlet_name") else "" + ) + except Exception: + cmdlet_name = "" + + stage_text = "" + try: + stage_text = ( + ctx.get_current_stage_text("") + if hasattr(ctx, + "get_current_stage_text") else "" + ) + except Exception: + stage_text = "" + + if cmdlet_name and stage_text: + normalized_cmd = str(cmdlet_name).replace("_", "-").strip().lower() + normalized_title = str(self.title or "").strip().lower() + normalized_stage = str(stage_text).strip() + if normalized_stage and normalized_stage.lower().startswith( + normalized_cmd): + if (not normalized_title) or normalized_title.replace( + "_", + "-").startswith(normalized_cmd): + self.title = normalized_stage + except Exception: + pass + self.title_width = title_width + self.max_columns = ( + max_columns if max_columns is not None else 5 + ) # Default 5 for cleaner display + self.rows: List[ResultRow] = [] + self.column_widths: Dict[str, + int] = {} + self.input_options: Dict[str, + InputOption] = {} + """Options available for user input (cmdlet arguments)""" + self.source_command: Optional[str] = None + """Command that generated this table (e.g., 'download-file URL')""" + self.source_args: List[str] = [] + """Base arguments for the source command""" + self.header_lines: List[str] = [] + """Optional metadata lines rendered under the title""" + self.preserve_order: bool = preserve_order + """If True, skip automatic sorting so display order matches input order.""" + self.no_choice: bool = False + """When True, suppress row numbers/selection to make the table 
non-interactive.""" + self.table: Optional[str] = None + """Table type (e.g., 'youtube', 'soulseek') for context-aware selection logic.""" + + self.table_metadata: Dict[str, Any] = {} + """Optional provider/table metadata (e.g., provider name, view).""" + + self.value_case: str = "lower" + """Display-only value casing: 'lower' (default), 'upper', or 'preserve'.""" + + def set_value_case(self, value_case: str) -> "ResultTable": + """Configure display-only casing for rendered cell values.""" + case = str(value_case or "").strip().lower() + if case not in {"lower", + "upper", + "preserve"}: + case = "lower" + self.value_case = case + return self + + def _apply_value_case(self, text: str) -> str: + if not text: + return "" + if self.value_case == "upper": + return text.upper() + if self.value_case == "preserve": + return text + return text.lower() + + def set_table(self, table: str) -> "ResultTable": + """Set the table type for context-aware selection logic.""" + self.table = table + return self + + def set_table_metadata(self, metadata: Optional[Dict[str, Any]]) -> "ResultTable": + """Attach provider/table metadata for downstream selection logic.""" + self.table_metadata = dict(metadata or {}) + return self + + def get_table_metadata(self) -> Dict[str, Any]: + """Return attached provider/table metadata (copy to avoid mutation).""" + try: + return dict(self.table_metadata) + except Exception: + return {} + + def set_no_choice(self, no_choice: bool = True) -> "ResultTable": + """Mark the table as non-interactive (no row numbers, no selection parsing).""" + self.no_choice = bool(no_choice) + return self + + def set_preserve_order(self, preserve: bool = True) -> "ResultTable": + """Configure whether this table should skip automatic sorting.""" + self.preserve_order = bool(preserve) + return self + + def add_row(self) -> ResultRow: + """Add a new row to the table and return it for configuration.""" + row = ResultRow() + row.source_index = len(self.rows) + self.rows.append(row) + return row + + def set_source_command( + self, + command: str, + args: Optional[List[str]] = None + ) -> "ResultTable": + """Set the source command that generated this table. + + This is used for @N expansion: when user runs @2 | next-cmd, it will expand to: + source_command + source_args + row_selection_args | next-cmd + + Args: + command: Command name (e.g., 'download-file') + args: Base arguments for the command (e.g., ['URL']) + + Returns: + Self for chaining + """ + self.source_command = command + self.source_args = args or [] + return self + + def init_command( + self, + title: str, + command: str, + args: Optional[List[str]] = None, + preserve_order: bool = False, + ) -> "ResultTable": + """Initialize table with title, command, args, and preserve_order in one call. + + Consolidates common initialization pattern: ResultTable(title) + set_source_command(cmd, args) + set_preserve_order(preserve_order) + + Args: + title: Table title + command: Source command name + args: Command arguments + preserve_order: Whether to preserve input row order + + Returns: + self for method chaining + """ + self.title = title + self.source_command = command + self.source_args = args or [] + self.preserve_order = preserve_order + return self + + def copy_with_title(self, new_title: str) -> "ResultTable": + """Create a new table copying settings from this one but with a new title. + + Consolidates pattern: new_table = ResultTable(title); new_table.set_source_command(...) 
+ Useful for intermediate processing that needs to preserve source command but update display title. + + Args: + new_title: New title for the copied table + + Returns: + New ResultTable with copied settings and new title + """ + new_table = ResultTable( + title=new_title, + title_width=self.title_width, + max_columns=self.max_columns, + preserve_order=self.preserve_order, + ) + new_table.source_command = self.source_command + new_table.source_args = list(self.source_args) if self.source_args else [] + new_table.input_options = dict(self.input_options) if self.input_options else {} + new_table.no_choice = self.no_choice + new_table.table = self.table + new_table.table_metadata = ( + dict(self.table_metadata) if getattr(self, "table_metadata", None) else {} + ) + new_table.header_lines = list(self.header_lines) if self.header_lines else [] + return new_table + + def set_row_selection_args(self, row_index: int, selection_args: List[str]) -> None: + """Set the selection arguments for a specific row. + + When user selects this row via @N, these arguments will be appended to the + source command to re-execute with that item selected. + + Args: + row_index: Index of the row (0-based) + selection_args: Arguments to use (e.g., ['-item', '3']) + """ + if 0 <= row_index < len(self.rows): + self.rows[row_index].selection_args = selection_args + + def set_row_selection_action(self, row_index: int, selection_action: List[str]) -> None: + """Specify the entire stage tokens to run for this row on @N.""" + if 0 <= row_index < len(self.rows): + self.rows[row_index].selection_action = selection_action + + def set_header_lines(self, lines: List[str]) -> "ResultTable": + """Attach metadata lines that render beneath the title.""" + self.header_lines = [line for line in lines if line] + return self + + def set_header_line(self, line: str) -> "ResultTable": + """Attach a single metadata line beneath the title.""" + return self.set_header_lines([line] if line else []) + + def set_storage_summary( + self, + storage_counts: Dict[str, + int], + filter_text: Optional[str] = None, + inline: bool = False, + ) -> str: + """Render a storage count summary (e.g., "Hydrus:0 Local:1 | filter: \"q\""). + + Returns the summary string so callers can place it inline with the title if desired. + """ + summary_parts: List[str] = [] + + if storage_counts: + summary_parts.append( + " ".join(f"{name}:{count}" for name, count in storage_counts.items()) + ) + + if filter_text: + safe_filter = filter_text.replace('"', '\\"') + summary_parts.append(f'filter: "{safe_filter}"') + + summary = " | ".join(summary_parts) + if not inline: + self.set_header_line(summary) + return summary + + def sort_by_title(self) -> "ResultTable": + """Sort rows alphabetically by Title or Name column. + + Looks for columns named 'Title', 'Name', or 'Tag' (in that order). + Case-insensitive sort. Returns self for chaining. + + NOTE: This only affects display order. Each row keeps its original + `source_index` (insertion order) for callers that need stable mapping. 
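+
+        Example (illustrative):
+            table.add_result({"title": "beta.txt"})
+            table.add_result({"title": "alpha.txt"})
+            table.sort_by_title()  # display order: alpha, beta; each row keeps its source_index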
+ """ + if getattr(self, "preserve_order", False): + return self + # Find the title column (try Title, Name, Tag in order) + title_col_idx = None + for row in self.rows: + if not row.columns: + continue + for idx, col in enumerate(row.columns): + col_lower = col.name.lower() + if col_lower in ("title", "name", "tag"): + title_col_idx = idx + break + if title_col_idx is not None: + break + + if title_col_idx is None: + # No title column found, return unchanged + return self + + # Sort rows by the title column value (case-insensitive) + self.rows.sort( + key=lambda row: ( + row.columns[title_col_idx].value.lower() + if title_col_idx < len(row.columns) else "" + ) + ) + + return self + + def add_result(self, result: Any) -> "ResultTable": + """Add a result object (SearchResult, PipeObject, ResultItem, TagItem, or dict) as a row. + + Args: + result: Result object to add + + Returns: + Self for chaining + """ + row = self.add_row() + row.payload = result + + # Handle TagItem from get_tag.py (tag display with index) + if hasattr(result, "__class__") and result.__class__.__name__ == "TagItem": + self._add_tag_item(row, result) + # Handle ResultItem from search_file.py (compact display) + elif hasattr(result, "__class__") and result.__class__.__name__ == "ResultItem": + self._add_result_item(row, result) + # Handle SearchResult from search_file.py + elif hasattr(result, + "__class__") and result.__class__.__name__ == "SearchResult": + self._add_search_result(row, result) + # Handle PipeObject from models.py + elif hasattr(result, "__class__") and result.__class__.__name__ == "PipeObject": + self._add_pipe_object(row, result) + # Handle dict + elif isinstance(result, dict): + self._add_dict(row, result) + # Handle generic objects with __dict__ + elif hasattr(result, "__dict__"): + self._add_generic_object(row, result) + # Handle strings (simple text result) + elif isinstance(result, str): + row.add_column("Result", result) + + return self + + def get_row_payload(self, row_index: int) -> Optional[Any]: + """Return the original payload for the row at ``row_index`` if available.""" + if 0 <= row_index < len(self.rows): + return getattr(self.rows[row_index], "payload", None) + return None + + def get_payloads(self) -> List[Any]: + """Return the payloads for every row, preserving table order.""" + payloads: List[Any] = [] + for row in self.rows: + payload = getattr(row, "payload", None) + if payload is not None: + payloads.append(payload) + return payloads + + def _add_search_result(self, row: ResultRow, result: Any) -> None: + """Extract and add SearchResult fields to row.""" + cols = getattr(result, "columns", None) + used_explicit_columns = False + if cols: + used_explicit_columns = True + for name, value in cols: + row.add_column(name, value) + else: + # Core fields (legacy fallback) + title = getattr(result, "title", "") + table = str(getattr(result, "table", "") or "").lower() + + # Handle extension separation for local files + extension = "" + if title and table == "local": + path_obj = Path(title) + if path_obj.suffix: + extension = path_obj.suffix.lstrip(".") + title = path_obj.stem + + if title: + row.add_column("Title", title) + + # Extension column + row.add_column("Ext", extension) + + if hasattr(result, "table") and getattr(result, "table", None): + row.add_column("Source", str(getattr(result, "table"))) + + if hasattr(result, "detail") and result.detail: + row.add_column("Detail", result.detail) + + if hasattr(result, "media_kind") and result.media_kind: + row.add_column("Type", 
result.media_kind) + + # Tag summary + if hasattr(result, "tag_summary") and result.tag_summary: + row.add_column("Tag", str(result.tag_summary)) + + # Duration (for media) + if hasattr(result, "duration_seconds") and result.duration_seconds: + dur = _format_duration_hms(result.duration_seconds) + row.add_column("Duration", dur or str(result.duration_seconds)) + + # Size (for files) + if hasattr(result, "size_bytes") and result.size_bytes: + row.add_column("Size", _format_size(result.size_bytes, integer_only=False)) + + # Annotations + if hasattr(result, "annotations") and result.annotations: + row.add_column("Annotations", ", ".join(str(a) for a in result.annotations)) + + try: + md = getattr(result, "full_metadata", None) + md_dict = dict(md) if isinstance(md, dict) else {} + except Exception: + md_dict = {} + + try: + selection_args = getattr(result, "selection_args", None) + except Exception: + selection_args = None + if selection_args is None: + selection_args = md_dict.get("_selection_args") or md_dict.get("selection_args") + if selection_args: + row.selection_args = [str(a) for a in selection_args if a is not None] + + try: + selection_action = getattr(result, "selection_action", None) + except Exception: + selection_action = None + if selection_action is None: + selection_action = md_dict.get("_selection_action") or md_dict.get("selection_action") + if selection_action: + row.selection_action = [str(a) for a in selection_action if a is not None] + + def _add_result_item(self, row: ResultRow, item: Any) -> None: + """Extract and add ResultItem fields to row (compact display for search results). + + Shows only essential columns: + - Title (required) + - Ext (extension) + - Storage (source backend) + - Size (formatted MB, integer only) + + All other fields are stored in item but not displayed to keep table compact. + Use @row# syntax to pipe full item data to next command. + """ + # Title (required) + title = getattr(item, "title", None) or "Unknown" + table = str(getattr(item, + "table", + "") or getattr(item, + "store", + "") or "").lower() + + # Handle extension separation for local files + extension = "" + if title and table == "local": + # Try to split extension + path_obj = Path(title) + if path_obj.suffix: + extension = path_obj.suffix.lstrip(".") + title = path_obj.stem + + if title: + row.add_column("Title", title) + + # Extension column - always add to maintain column order + row.add_column("Ext", extension) + + # Storage (source backend - hydrus, local, debrid, etc) + if getattr(item, "table", None): + row.add_column("Storage", str(getattr(item, "table"))) + elif getattr(item, "store", None): + row.add_column("Storage", str(getattr(item, "store"))) + + # Size (for files) + if hasattr(item, "size_bytes") and item.size_bytes: + row.add_column("Size", _format_size(item.size_bytes, integer_only=False)) + + def _add_tag_item(self, row: ResultRow, item: Any) -> None: + """Extract and add TagItem fields to row (compact tag display). + + Shows the Tag column with the tag name and Source column to identify + which storage backend the tag values come from (Hydrus, local, etc.). + All data preserved in TagItem for piping and operations. + Tag row selection is handled by the CLI pipeline (e.g. `@N | ...`). 
+ """ + # Tag name + if hasattr(item, "tag_name") and item.tag_name: + row.add_column("Tag", item.tag_name) + + # Source/Store (where the tag values come from) + if hasattr(item, "source") and item.source: + row.add_column("Store", item.source) + + def _add_pipe_object(self, row: ResultRow, obj: Any) -> None: + """Extract and add PipeObject fields to row.""" + # Source and identifier + if hasattr(obj, "source") and obj.source: + row.add_column("Source", obj.source) + + # Title + if hasattr(obj, "title") and obj.title: + row.add_column("Title", obj.title) + + # File info + if hasattr(obj, "path") and obj.path: + row.add_column("Path", str(obj.path)) + + # Tag + if hasattr(obj, "tag") and obj.tag: + tag_str = ", ".join(obj.tag[:3]) # First 3 tag values + if len(obj.tag) > 3: + tag_str += f", +{len(obj.tag) - 3} more" + row.add_column("Tag", tag_str) + + # Duration + if hasattr(obj, "duration") and obj.duration: + dur = _format_duration_hms(obj.duration) + row.add_column("Duration", dur or str(obj.duration)) + + # Warnings + if hasattr(obj, "warnings") and obj.warnings: + warnings_str = "; ".join(obj.warnings[:2]) + if len(obj.warnings) > 2: + warnings_str += f" (+{len(obj.warnings) - 2} more)" + row.add_column("Warnings", warnings_str) + + def _add_dict(self, row: ResultRow, data: Dict[str, Any]) -> None: + """Extract and add dict fields to row using first-match priority groups. + + Respects max_columns limit to keep table compact and readable. + + Special handling for 'columns' field: if present, uses it to populate row columns + instead of treating it as a regular field. This allows dynamic column definitions + from search providers. + + Priority field groups (first match per group): + - title | name | filename + - store | table | source + - size | size_bytes + - ext + """ + + # Helper to determine if a field should be hidden from display + def is_hidden_field(field_name: Any) -> bool: + # Hide internal/metadata fields + hidden_fields = { + "__", + "id", + "action", + "parent_id", + "is_temp", + "path", + "extra", + "target", + "hash", + "hash_hex", + "file_hash", + "tag", + "tag_summary", + } + if isinstance(field_name, str): + if field_name.startswith("__"): + return True + if field_name in hidden_fields: + return True + return False + + # Strip out hidden metadata fields (prefixed with __) + visible_data = { + k: v + for k, v in data.items() if not is_hidden_field(k) + } + + # Normalize common fields using shared extractors so nested metadata/path values work. + # This keeps Ext/Size/Store consistent across all dict-based result sources. + try: + store_extracted = extract_store_value(data) + if (store_extracted and "store" not in visible_data + and "table" not in visible_data and "source" not in visible_data): + visible_data["store"] = store_extracted + except Exception: + pass + + try: + ext_extracted = extract_ext_value(data) + # Always ensure `ext` exists so priority_groups keeps a stable column. + visible_data["ext"] = str(ext_extracted or "") + except Exception: + visible_data.setdefault("ext", "") + + try: + size_extracted = extract_size_bytes_value(data) + if (size_extracted is not None and "size_bytes" not in visible_data + and "size" not in visible_data): + visible_data["size_bytes"] = size_extracted + except Exception: + pass + + # Handle extension separation for local files + store_val = str( + visible_data.get("store", + "") or visible_data.get("table", + "") + or visible_data.get("source", + "") + ).lower() + + # Debug logging + # print(f"DEBUG: Processing dict result. 
Store: {store_val}, Keys: {list(visible_data.keys())}") + + if store_val == "local": + # Find title field + title_field = next( + (f for f in ["title", "name", "filename"] if f in visible_data), + None + ) + if title_field: + title_val = str(visible_data[title_field]) + path_obj = Path(title_val) + if path_obj.suffix: + extension = path_obj.suffix.lstrip(".") + visible_data[title_field] = path_obj.stem + visible_data["ext"] = extension + # print(f"DEBUG: Split extension. Title: {visible_data[title_field]}, Ext: {extension}") + else: + visible_data["ext"] = "" + + # Ensure 'ext' is present so it gets picked up by priority_groups in correct order + if "ext" not in visible_data: + visible_data["ext"] = "" + + # Track which fields we've already added to avoid duplicates + added_fields = set() + column_count = 0 # Track total columns added + + # Helper function to format values + def format_value(value: Any) -> str: + if isinstance(value, list): + formatted = ", ".join(str(v) for v in value[:3]) + if len(value) > 3: + formatted += f", +{len(value) - 3} more" + return formatted + return str(value) + + # Special handling for 'columns' field from search providers + # If present, use it to populate row columns dynamically + if ("columns" in visible_data and isinstance(visible_data["columns"], + list) and visible_data["columns"]): + try: + for col_name, col_value in visible_data["columns"]: + # Skip the "#" column as ResultTable already adds row numbers + if col_name == "#": + continue + if column_count >= self.max_columns: + break + # When providers supply raw numeric fields, keep formatting consistent. + if isinstance(col_name, str) and col_name.strip().lower() == "size": + try: + if col_value is None or str(col_value).strip() == "": + col_value_str = "" + else: + col_value_str = _format_size( + col_value, + integer_only=False + ) + except Exception: + col_value_str = format_value(col_value) + elif isinstance(col_name, + str) and col_name.strip().lower() == "duration": + try: + if col_value is None or str(col_value).strip() == "": + col_value_str = "" + else: + dur = _format_duration_hms(col_value) + col_value_str = dur or format_value(col_value) + except Exception: + col_value_str = format_value(col_value) + else: + col_value_str = format_value(col_value) + row.add_column(col_name, col_value_str) + added_fields.add(col_name.lower()) + column_count += 1 + # Mark 'columns' as handled so we don't add it as a field + added_fields.add("columns") + # Also mark common fields that shouldn't be re-displayed if they're in columns + # This prevents showing both "Store" (from columns) and "Store" (from data fields) + added_fields.add("table") + added_fields.add("source") + added_fields.add("target") + added_fields.add("path") + added_fields.add("media_kind") + added_fields.add("detail") + added_fields.add("annotations") + added_fields.add( + "full_metadata" + ) # Don't display full metadata as column + except Exception: + # Fall back to regular field handling if columns format is unexpected + pass + + # Only add priority groups if we haven't already filled columns from 'columns' field + if column_count == 0: + # Explicitly set which columns to display in order + priority_groups = [ + ("title", + ["title", + "name", + "filename"]), + ("store", + ["store", + "table", + "source"]), + ("size", + ["size", + "size_bytes"]), + ("ext", + ["ext"]), + ] + + # Add priority field groups first - use first match in each group + for _group_label, field_options in priority_groups: + if column_count >= self.max_columns: + 
break + for field in field_options: + if field in visible_data and field not in added_fields: + # Special handling for size fields - format with unit and decimals + if field in ["size", "size_bytes"]: + value_str = _format_size( + visible_data[field], + integer_only=False + ) + else: + value_str = format_value(visible_data[field]) + + # Map field names to display column names + if field in ["store", "table", "source"]: + col_name = "Store" + elif field in ["size", "size_bytes"]: + col_name = "Size" + elif field in ["title", "name", "filename"]: + col_name = "Title" + else: + col_name = field.replace("_", " ").title() + + row.add_column(col_name, value_str) + added_fields.add(field) + column_count += 1 + break # Use first match in this group, skip rest + + # Add remaining fields only if we haven't hit max_columns (and no explicit columns were set) + # Don't add any remaining fields - only use priority_groups for dict results + + # Check for selection args + if "_selection_args" in data: + row.selection_args = data["_selection_args"] + # Don't display it + added_fields.add("_selection_args") + + def _add_generic_object(self, row: ResultRow, obj: Any) -> None: + """Extract and add fields from generic objects.""" + if hasattr(obj, "__dict__"): + for key, value in obj.__dict__.items(): + if key.startswith("_"): # Skip private attributes + continue + + row.add_column(key.replace("_", " ").title(), str(value)) + + def to_rich(self): + """Return a Rich renderable representing this table.""" + if not self.rows: + empty = Text("No results") + return Panel(empty, title=self.title) if self.title else empty + + col_names: List[str] = [] + seen: Set[str] = set() + for row in self.rows: + for col in row.columns: + if col.name not in seen: + seen.add(col.name) + col_names.append(col.name) + + table = RichTable( + show_header=True, + header_style="bold", + box=SIMPLE, + expand=True, + show_lines=False, + ) + + if not self.no_choice: + table.add_column("#", justify="right", no_wrap=True) + + # Render headers in uppercase, but keep original column keys for lookup. + header_by_key: Dict[str, + str] = { + name: str(name).upper() + for name in col_names + } + + for name in col_names: + header = header_by_key.get(name, str(name).upper()) + if name.lower() == "ext": + table.add_column(header, no_wrap=True) + else: + table.add_column(header) + + for row_idx, row in enumerate(self.rows, 1): + cells: List[str] = [] + if not self.no_choice: + cells.append(str(row_idx)) + for name in col_names: + val = row.get_column(name) or "" + cells.append(self._apply_value_case(_sanitize_cell_text(val))) + table.add_row(*cells) + + if self.title or self.header_lines: + header_bits = [Text(line) for line in (self.header_lines or [])] + renderable = Group(*header_bits, table) if header_bits else table + return Panel(renderable, title=self.title) if self.title else renderable + + return table + + def format_compact(self) -> str: + """Format table in compact form (one line per row). + + Returns: + Formatted table string + """ + lines = [] + + if self.title: + lines.append(f"\n{self.title}") + lines.append("-" * len(self.title)) + + for i, row in enumerate(self.rows, 1): + row_str = " | ".join(str(col) for col in row.columns) + lines.append(f"{i}. {row_str}") + + return "\n".join(lines) + + def format_json(self) -> str: + """Format table as JSON. 
+ + Returns: + JSON string + """ + data = { + "title": self.title, + "row_count": len(self.rows), + "rows": [row.to_list() for row in self.rows], + } + return json.dumps(data, indent=2) + + def to_dict(self) -> Dict[str, Any]: + """Convert table to dictionary. + + Returns: + Dictionary representation + """ + return { + "title": self.title, + "rows": [row.to_list() for row in self.rows] + } + + def __str__(self) -> str: + """String representation. + + Rich is the primary rendering path. This keeps accidental `print(table)` + usage from emitting ASCII box-drawn tables. + """ + label = self.title or "ResultTable" + return f"{label} ({len(self.rows)} rows)" + + def __rich__(self): + return self.to_rich() + + def __repr__(self) -> str: + """Developer representation.""" + return f"ResultTable(title={self.title!r}, rows={len(self.rows)})" + + def __len__(self) -> int: + """Number of rows in the table.""" + return len(self.rows) + + def __iter__(self): + """Iterate over rows.""" + return iter(self.rows) + + def __getitem__(self, index: int) -> ResultRow: + """Get row by index.""" + return self.rows[index] + + def select_interactive( + self, + prompt: str = "Select an item", + accept_args: bool = False + ) -> Optional[List[int]] | dict: + """Display table and get interactive user selection (single or multiple). + + Supports multiple input formats: + - Single: "5" or "q" to quit + - Range: "3-5" (selects items 3, 4, 5) + - Multiple: "3,5,13" (selects items 3, 5, and 13) + - Combined: "1-3,7,9-11" (selects 1,2,3,7,9,10,11) + + If accept_args=True, also supports cmdlet arguments: + - "5 -storage hydrus" → returns indices [4] + args {"-storage": "hydrus"} + - "2-4 -storage hydrus -tag important" → returns indices [1,2,3] + multiple args + + Args: + prompt: Custom prompt text + accept_args: If True, parse and return cmdlet arguments from input + + Returns: + If accept_args=False: List of 0-based indices, or None if cancelled + If accept_args=True: Dict with "indices" and "args" keys, or None if cancelled + """ + if self.no_choice: + from SYS.rich_display import stdout_console + + stdout_console().print(self) + stdout_console().print(Panel(Text("Selection is disabled for this table."))) + return None + + # Display the table + from SYS.rich_display import stdout_console + + stdout_console().print(self) + + # Get user input + while True: + try: + if accept_args: + choice = Prompt.ask( + f"{prompt} (e.g., '5' or '2 -storage hydrus' or 'q' to quit)" + ).strip() + else: + choice = Prompt.ask( + f"{prompt} (e.g., '5' or '3-5' or '1,3,5' or 'q' to quit)" + ).strip() + + if choice.lower() == "q": + return None + + if accept_args: + # Parse selection and arguments + result = self._parse_selection_with_args(choice) + if result is not None: + return result + stdout_console().print( + Panel( + Text( + "Invalid format. Use: selection (5 or 3-5 or 1,3,5) optionally followed by flags (e.g., '5 -storage hydrus')." + ) + ) + ) + else: + # Parse just the selection + selected_indices = self._parse_selection(choice) + if selected_indices is not None: + return selected_indices + stdout_console().print( + Panel( + Text( + "Invalid format. Use: single (5), range (3-5), list (1,3,5), combined (1-3,7,9-11), or 'q' to quit." + ) + ) + ) + except (ValueError, EOFError): + if accept_args: + stdout_console().print( + Panel( + Text( + "Invalid format. Use: selection (5 or 3-5 or 1,3,5) optionally followed by flags (e.g., '5 -storage hydrus')." + ) + ) + ) + else: + stdout_console().print( + Panel( + Text( + "Invalid format. 
Use: single (5), range (3-5), list (1,3,5), combined (1-3,7,9-11), or 'q' to quit." + ) + ) + ) + + def _parse_selection(self, selection_str: str) -> Optional[List[int]]: + """Parse user selection string into list of 0-based indices. + + Supports: + - Single: "5" → [4] + - Range: "3-5" → [2, 3, 4] + - Multiple: "3,5,13" → [2, 4, 12] + - Combined: "1-3,7,9-11" → [0, 1, 2, 6, 8, 9, 10] + + Args: + selection_str: User input string + + Returns: + List of 0-based indices, or None if invalid + """ + if self.no_choice: + return None + + indices = set() + + # Split by comma for multiple selections + parts = selection_str.split(",") + + for part in parts: + part = part.strip() + if not part: + continue + + # Check if it's a range (contains dash) + if "-" in part: + # Handle ranges like "3-5" + try: + range_parts = part.split("-") + if len(range_parts) != 2: + return None + + start = int(range_parts[0].strip()) + end = int(range_parts[1].strip()) + + # Validate range + if start < 1 or end < 1 or start > len(self.rows) or end > len( + self.rows): + return None + + if start > end: + start, end = end, start + + # Add all indices in range (convert to 0-based) + for i in range(start, end + 1): + indices.add(i - 1) + + except (ValueError, IndexError): + return None + else: + # Single number + try: + num = int(part) + if num < 1 or num > len(self.rows): + return None + indices.add(num - 1) # Convert to 0-based + except ValueError: + return None + + if not indices: + return None + + # Return sorted list + return sorted(list(indices)) + + def _parse_selection_with_args(self, input_str: str) -> Optional[dict]: + """Parse user input into selection indices and cmdlet arguments. + + Supports formats like: + - "5" → {"indices": [4], "args": {}} + - "2 -storage hydrus" → {"indices": [1], "args": {"-storage": "hydrus"}} + - "3-5 -storage hydrus -tag important" → {"indices": [2,3,4], "args": {"-storage": "hydrus", "-tag": "important"}} + + Args: + input_str: User input string with selection and optional flags + + Returns: + Dict with "indices" and "args" keys, or None if invalid + """ + parts = input_str.split() + if not parts: + return None + + # First part should be the selection + selection_str = parts[0] + selected_indices = self._parse_selection(selection_str) + + if selected_indices is None: + return None + + # Remaining parts are cmdlet arguments + cmdlet_args = {} + i = 1 + while i < len(parts): + part = parts[i] + + # Check if it's a flag (starts with -) + if part.startswith("-"): + flag = part + value = None + + # Get the value if it exists and doesn't start with - + if i + 1 < len(parts) and not parts[i + 1].startswith("-"): + value = parts[i + 1] + i += 2 + else: + i += 1 + + # Store the flag + if value is not None: + cmdlet_args[flag] = value + else: + cmdlet_args[flag] = True # Flag without value + else: + i += 1 + + return { + "indices": selected_indices, + "args": cmdlet_args + } + + def add_input_option(self, option: InputOption) -> "ResultTable": + """Add an interactive input option to the table. + + Input options allow users to specify cmdlet arguments interactively, + like choosing a download location or source. + + Args: + option: InputOption definition + + Returns: + Self for chaining + """ + self.input_options[option.name] = option + return self + + def select_option(self, option_name: str, prompt: str = "") -> Optional[str]: + """Interactively get user input for a specific option. + + Displays the option choices (if enum) and prompts user for input. 
+ + Args: + option_name: Name of the option to get input for + prompt: Custom prompt text (uses option description if not provided) + + Returns: + User's selected/entered value, or None if cancelled + """ + if option_name not in self.input_options: + print(f"Unknown option: {option_name}") + return None + + option = self.input_options[option_name] + prompt_text = prompt or option.description or option_name + + while True: + try: + # For enum options, show choices + if option.type == "enum" and option.choices: + print(f"\n{prompt_text}") + for i, choice in enumerate(option.choices, 1): + print(f" {i}. {choice}") + + choice_input = input( + f"Select {option_name} (1-{len(option.choices)}, or 'q' to cancel): " + ).strip() + + if choice_input.lower() == "q": + return None + + try: + idx = int(choice_input) - 1 + if 0 <= idx < len(option.choices): + return option.choices[idx] + print(f"Invalid choice. Enter 1-{len(option.choices)}") + except ValueError: + print(f"Invalid choice. Enter 1-{len(option.choices)}") + + # For string/integer options, get direct input + elif option.type in ("string", "integer"): + value = input(f"{prompt_text} (or 'q' to cancel): ").strip() + + if value.lower() == "q": + return None + + # Validate if validator provided + if option.validator and not option.validator(value): + print(f"Invalid value for {option_name}") + continue + + # Type conversion + if option.type == "integer": + try: + int(value) + except ValueError: + print(f"Must be an integer") + continue + + return value + + # For flag options + elif option.type == "flag": + response = input(f"{prompt_text} (y/n): ").strip().lower() + if response == "q": + return None + return "true" if response in ("y", "yes", "true") else "false" + + except (ValueError, EOFError): + return None + + def get_all_options(self) -> Dict[str, str]: + """Get all input options at once with user prompts. + + Interactively prompts user for all registered options. + + Returns: + Dictionary mapping option names to selected values + """ + result = {} + for name, _option in self.input_options.items(): + value = self.select_option(name) + if value is not None: + result[name] = value + return result + + def select_by_index(self, index: int) -> Optional[ResultRow]: + """Get a row by 1-based index (user-friendly). + + Args: + index: 1-based index + + Returns: + ResultRow if valid, None otherwise + """ + idx = index - 1 + if 0 <= idx < len(self.rows): + return self.rows[idx] + return None + + # TUI-specific formatting methods + + def to_datatable_rows(self, source: str = "unknown") -> List[List[str]]: + """Convert results to rows suitable for Textual DataTable widget. + + Args: + source: Source type for formatting context (openlibrary, soulseek, etc.) + + Returns: + List of row value lists + """ + rows = [] + for result in self.rows: + row_values = self._format_datatable_row(result, source) + rows.append(row_values) + return rows + + def _format_datatable_row(self, + row: ResultRow, + source: str = "unknown") -> List[str]: + """Format a ResultRow for DataTable display. + + Args: + row: ResultRow to format + source: Source type + + Returns: + List of column values as strings + """ + # Extract values from row columns + values = [col.value for col in row.columns] + + # Truncate to reasonable lengths for table display + return [v[:60] if len(v) > 60 else v for v in values] + + def to_result_cards(self) -> List[TUIResultCard]: + """Convert all rows to TUIResultCard objects for card-based UI display. 
+ + Returns: + List of TUIResultCard objects + """ + cards = [] + for row in self.rows: + card = self._row_to_card(row) + cards.append(card) + return cards + + def _row_to_card(self, row: ResultRow) -> TUIResultCard: + """Convert a ResultRow to a TUIResultCard. + + Args: + row: ResultRow to convert + + Returns: + TUIResultCard with extracted metadata + """ + # Build metadata dict from row columns + metadata = {} + title = "" + + for col in row.columns: + if col.name.lower() == "title": + title = col.value + metadata[col.name] = col.value + + # Extract tag values if present + tag = [] + if "Tag" in metadata: + tag_val = metadata["Tag"] + if tag_val: + tag = [t.strip() for t in tag_val.split(",")][:5] + + # Try to find useful metadata fields + subtitle = metadata.get("Artist", metadata.get("Author", "")) + media_kind = metadata.get("Type", metadata.get("Media Kind", "")) + file_size = metadata.get("Size", "") + duration = metadata.get("Duration", "") + file_hash = metadata.get("Hash", "") + + return TUIResultCard( + title=title or "Unknown", + subtitle=subtitle, + metadata=metadata, + media_kind=media_kind, + tag=tag, + file_hash=file_hash or None, + file_size=file_size or None, + duration=duration or None, + ) + + def build_metadata_tree(self, tree_widget: "Tree") -> None: + """Populate a Textual Tree widget with result metadata hierarchy. + + Args: + tree_widget: Textual Tree widget to populate + + Raises: + ImportError: If Textual not available + """ + if not TEXTUAL_AVAILABLE: + raise ImportError("Textual not available for tree building") + + tree_widget.reset(self.title or "Results") + root = tree_widget.root + + # Add each row as a top-level node + for i, row in enumerate(self.rows, 1): + row_node = root.add(f"[bold]Result {i}[/bold]") + + # Add columns as children + for col in row.columns: + value_str = col.value + if len(value_str) > 100: + value_str = value_str[:97] + "..." + row_node.add_leaf(f"[cyan]{col.name}[/cyan]: {value_str}") + + +def _format_size(size: Any, integer_only: bool = False) -> str: + """Format file size as human-readable string. + + Args: + size: Size in bytes or already formatted string + integer_only: If True, show MB as an integer (e.g., "250 MB") + + Returns: + Formatted size string with units (e.g., "3.53 MB", "0.57 MB", "1.2 GB") + """ + if isinstance(size, str): + return size if size else "" + + try: + bytes_val = int(size) + if bytes_val < 0: + return "" + + # Keep display consistent with the CLI expectation: show MB with unit + # (including values under 1 MB as fractional MB), and show GB for very + # large sizes. + if bytes_val >= 1024**3: + value = bytes_val / (1024**3) + unit = "GB" + else: + value = bytes_val / (1024**2) + unit = "MB" + + if integer_only: + return f"{int(round(value))} {unit}" + + num = f"{value:.2f}".rstrip("0").rstrip(".") + return f"{num} {unit}" + except (ValueError, TypeError): + return "" + + +def format_result(result: Any, title: str = "") -> str: + """Quick function to format a single result or list of results. 
+ + Args: + result: Result object, list of results, or dict + title: Optional title for the table + + Returns: + Formatted string + """ + table = ResultTable(title) + + if isinstance(result, list): + for item in result: + table.add_result(item) + else: + table.add_result(result) + + return str(table) + diff --git a/SYS/rich_display.py b/SYS/rich_display.py index 88f8030..75299e9 100644 --- a/SYS/rich_display.py +++ b/SYS/rich_display.py @@ -265,6 +265,7 @@ def render_item_details_panel(item: Dict[str, Any]) -> None: """Render a comprehensive details panel for a result item.""" from rich.table import Table from rich.columns import Columns + from rich.panel import Panel title = ( item.get("title") @@ -274,31 +275,35 @@ def render_item_details_panel(item: Dict[str, Any]) -> None: ) # Main layout table for the panel - details_table = Table.grid(expand=True) - details_table.add_column(style="cyan", no_wrap=True, width=15) + details_table = Table.grid(expand=True, padding=(0, 2)) + details_table.add_column(style="cyan", no_wrap=True, width=15, justify="right") details_table.add_column(style="white") - # Basic Info - details_table.add_row("Title", f"[bold]{title}[/bold]") + # Canonical order + details_table.add_row("Title:", f"[bold]{title}[/bold]") - if "store" in item: - details_table.add_row("Store", str(item["store"])) + if "hash" in item or "hash_hex" in item or "file_hash" in item: + h = item.get("hash") or item.get("hash_hex") or item.get("file_hash") + details_table.add_row("Hash:", str(h)) + + if "store" in item or "table" in item: + s = item.get("store") or item.get("table") + details_table.add_row("Store:", str(s)) - if "hash" in item: - details_table.add_row("Hash", str(item["hash"])) - - # Metadata / Path if "path" in item or "target" in item: path = item.get("path") or item.get("target") - details_table.add_row("Path", str(path)) + # Only show if it doesn't look like a URL (which would go in Url row) + if path and not str(path).startswith(("http://", "https://")): + details_table.add_row("Path:", str(path)) if "ext" in item or "extension" in item: ext = item.get("ext") or item.get("extension") - details_table.add_row("Extension", str(ext)) + details_table.add_row("Ext:", str(ext)) if "size_bytes" in item or "size" in item: size = item.get("size_bytes") or item.get("size") - if isinstance(size, (int, float)): + if isinstance(size, (int, float, str)) and str(size).isdigit(): + size = int(size) if size > 1024 * 1024 * 1024: size_str = f"{size / (1024*1024*1024):.1f} GB" elif size > 1024 * 1024: @@ -307,33 +312,40 @@ def render_item_details_panel(item: Dict[str, Any]) -> None: size_str = f"{size / 1024:.1f} KB" else: size_str = f"{size} bytes" - details_table.add_row("Size", size_str) + details_table.add_row("Size:", size_str) # URL(s) urls = item.get("url") or item.get("URL") or [] if isinstance(urls, str): urls = [urls] - if isinstance(urls, list) and urls: - url_text = "\n".join(map(str, urls)) - details_table.add_row("URL(s)", url_text) + valid_urls = [str(u).strip() for u in urls if str(u).strip()] + if valid_urls: + url_text = "\n".join(valid_urls) + details_table.add_row("Url:", url_text) + else: + details_table.add_row("Url:", "[dim][/dim]") # Tags tags = item.get("tag") or item.get("tags") or [] if isinstance(tags, str): - tags = [tags] + tags = [t.strip() for t in tags.split(",") if t.strip()] if isinstance(tags, list) and tags: - # Sort and filter tags to look nice tags_sorted = sorted(map(str, tags)) - # Group tags by namespace if they have them tag_cols = 
diff --git a/cmdlet/add_tag.py b/cmdlet/add_tag.py
index daa955f..369434d 100644
--- a/cmdlet/add_tag.py
+++ b/cmdlet/add_tag.py
@@ -669,17 +669,34 @@ class Add_Tag(Cmdlet):
         # treat add-tag as a pipeline mutation (carry tags forward for add-file) instead of a store write.
         if not store_override:
             store_name_str = str(store_name) if store_name is not None else ""
-            local_mode_requested = (
-                (not store_name_str) or (store_name_str.upper() == "PATH")
-                or (store_name_str.lower() == "local")
-            )
-            is_known_backend = bool(store_name_str) and store_registry.is_available(
-                store_name_str
-            )
+
+            is_known_backend = False
+            try:
+                is_known_backend = bool(store_name_str) and store_registry.is_available(
+                    store_name_str
+                )
+            except Exception:
+                pass

-            if local_mode_requested and raw_path:
+            # If the item isn't in a configured store backend yet (e.g., store=PATH),
+            # treat add-tag as a pipeline mutation (carry tags forward for add-file)
+            # instead of a store write.
+            if not is_known_backend:
                 try:
-                    if Path(str(raw_path)).expanduser().exists():
+                    # Allow metadata updates even when the file does not exist locally;
+                    # only check path existence when a path was actually provided.
+                    proceed_local = True
+                    if raw_path:
+                        try:
+                            if not Path(str(raw_path)).expanduser().exists():
+                                # The path is present but missing on disk. A missing file
+                                # should not block a pipeline metadata edit, so tagging
+                                # proceeds anyway.
+                                pass
+                        except Exception:
+                            pass
+
+                    if proceed_local:
                         existing_tag_list = _extract_item_tags(res)
                         existing_lower = {
                             t.lower()
@@ -799,14 +816,9 @@ class Add_Tag(Cmdlet):
                 except Exception:
                     pass

-            if local_mode_requested:
-                log(
-                    "[add_tag] Error: Missing usable local path for tagging (or provide -store)",
-                    file=sys.stderr,
-                )
-                return 1
-            if store_name_str and not is_known_backend:
+            # If the store isn't a known backend and the item wasn't handled above as a
+            # local/pipeline metadata edit, report it as an error.
                 log(
                     f"[add_tag] Error: Unknown store '{store_name_str}'. "
                     f"Available: {store_registry.list_backends()}",
                     file=sys.stderr,
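The add_tag change above is easier to follow as a condensed restatement: a store write now requires the registry to positively confirm the backend, and everything else falls back to a pipeline metadata edit. This is a paraphrase of the guard, not the cmdlet's actual code; the helper name is a stand-in, while store_registry.is_available() comes from the hunk itself.

def _is_known_backend(store_registry, store_name_str: str) -> bool:
    """Hypothetical helper restating the new guard in add_tag.py."""
    try:
        # Only a positive answer from the registry counts as a known backend.
        return bool(store_name_str) and store_registry.is_available(store_name_str)
    except Exception:
        # Registry lookup failures are treated the same as "unknown backend".
        return False

# When this returns False, add-tag mutates the piped item's tag list in memory
# (so a later add-file can persist it) rather than writing to a store, even if
# the local path is missing or was never supplied.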
diff --git a/cmdlet/get_relationship.py b/cmdlet/get_relationship.py
index fa1b2ea..03762b9 100644
--- a/cmdlet/get_relationship.py
+++ b/cmdlet/get_relationship.py
@@ -514,7 +514,19 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
         return 0

     # Display results
-    table = ResultTable(f"Relationships: {source_title}"
+    from SYS.result_table import ItemDetailView, extract_item_metadata
+
+    # Prepare metadata for the detail view
+    metadata = extract_item_metadata(result)
+
+    if hash_hex:
+        metadata["Hash"] = hash_hex
+
+    # Overlay the title resolved earlier in this cmdlet
+    if source_title and source_title != "Unknown":
+        metadata["Title"] = source_title
+
+    table = ItemDetailView("Relationships", item_metadata=metadata
     ).init_command("get-relationship", [])
diff --git a/cmdlet/get_tag.py b/cmdlet/get_tag.py
index 451b810..39004a2 100644
--- a/cmdlet/get_tag.py
+++ b/cmdlet/get_tag.py
@@ -322,15 +322,23 @@ def _emit_tags_as_table(
     This replaces _print_tag_list to make tags pipe-able.
     Stores the table via ctx.set_last_result_table_overlay (or ctx.set_last_result_table)
    for downstream @ selection.
     """
-    from SYS.result_table import ResultTable
+    from SYS.result_table import ItemDetailView, extract_item_metadata

-    # Create ResultTable with just tag column (no title)
-    # Keep the title stable and avoid including hash fragments.
-    table_title = "tag"
+    # Prepare metadata for the detail view
+    metadata = extract_item_metadata(subject)
+
+    # Overlays/overrides from explicit args when the subject was only partial
     if item_title:
-        table_title = f"tag: {item_title}"
+        metadata["Title"] = item_title
+    if file_hash:
+        metadata["Hash"] = file_hash
+    if store:
+        metadata["Store"] = service_name if service_name else store
+    if path:
+        metadata["Path"] = path

-    table = ResultTable(table_title, max_columns=1)
+    # Create ItemDetailView
+    table = ItemDetailView("Tags", item_metadata=metadata, max_columns=1)
     table.set_source_command("get-tag", [])

     # Create TagItem for each tag
diff --git a/cmdlet/get_url.py b/cmdlet/get_url.py
index 685ebc0..d3c1f28 100644
--- a/cmdlet/get_url.py
+++ b/cmdlet/get_url.py
@@ -421,14 +421,20 @@ class Get_Url(Cmdlet):
         from SYS.metadata import normalize_urls
         urls = normalize_urls(urls)

-        title = str(get_field(result, "title") or "").strip()
-        table_title = "Title"
-        if title:
-            table_title = f"Title: {title}"
+        from SYS.result_table import ItemDetailView, extract_item_metadata
+
+        # Prepare metadata for the detail view
+        metadata = extract_item_metadata(result)
+
+        if file_hash:
+            metadata["Hash"] = file_hash
+        if store_name:
+            metadata["Store"] = store_name

         table = (
-            ResultTable(
-                table_title,
+            ItemDetailView(
+                "Urls",
+                item_metadata=metadata,
                 max_columns=1
             ).set_preserve_order(True).set_table("url").set_value_case("preserve")
         )
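The three cmdlet changes above share one pattern: build a metadata dict with extract_item_metadata(), overlay any values the cmdlet resolved itself, then hand both to ItemDetailView. A rough sketch of that pattern for a hypothetical cmdlet follows; emit_detail_view, the ctx parameter, and "my-cmdlet" are illustrative names, and ctx.set_last_result_table_overlay() is assumed to take the table, as described in the get-tag docstring.

from SYS.result_table import ItemDetailView, extract_item_metadata

def emit_detail_view(ctx, item, rows, file_hash=None, store_name=None):
    """Hypothetical helper mirroring the get-tag / get-url wiring shown above."""
    metadata = extract_item_metadata(item)

    # Overlay values the cmdlet resolved itself when the piped item was partial.
    if file_hash:
        metadata["Hash"] = file_hash
    if store_name:
        metadata["Store"] = store_name

    table = ItemDetailView("Items", item_metadata=metadata, max_columns=1)
    table.set_source_command("my-cmdlet", [])
    for row in rows:
        table.add_result(row)

    # Register the table so downstream @ selection keeps working (assumed signature).
    ctx.set_last_result_table_overlay(table)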