2025-11-25 20:09:33 -08:00
|
|
|
"""Unified result table formatter for CLI display.
|
|
|
|
|
|
|
|
|
|
Provides a structured way to convert search results, metadata, and pipeline objects
|
|
|
|
|
into formatted tables suitable for display in the REPL and CLI output.
|
|
|
|
|
|
|
|
|
|
Features:
|
|
|
|
|
- Format results as aligned tables with row numbers
|
|
|
|
|
- Support multiple selection formats (single, ranges, lists, combined)
|
|
|
|
|
- Interactive selection with user input
|
|
|
|
|
- Input options for cmdlet arguments (location, source selection, etc)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
|
from typing import Any, Dict, List, Optional, Union, Callable, Tuple
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
import json
|
|
|
|
|
|
|
|
|
|
# Optional Textual imports - graceful fallback if not available.
# TEXTUAL_AVAILABLE records whether the optional 'textual' dependency could
# be imported; callers can test it before touching the widgets below.
try:
    from textual.widgets import Tree, DataTable
    from textual.containers import Horizontal, Vertical
    from textual.widgets import Static, Button
    TEXTUAL_AVAILABLE = True
except ImportError:
    TEXTUAL_AVAILABLE = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class InputOption:
    """An interactive input option (cmdlet argument) attached to a table.

    Lets users pick values that translate directly into cmdlet arguments,
    so a result table can drive interactive configuration.

    Example:
        # Create an option for location selection
        location_opt = InputOption(
            "location",
            type="enum",
            choices=["local", "hydrus", "0x0"],
            description="Download destination",
        )

        # Use in result table
        table.add_input_option(location_opt)
        selected = table.select_option("location")  # Returns user choice
    """

    # Option name (maps to a cmdlet argument).
    name: str
    # Option type: 'string', 'enum', 'flag', 'integer'.
    type: str = "string"
    # Valid choices for the 'enum' type.
    choices: List[str] = field(default_factory=list)
    # Default value used when none is specified.
    default: Optional[str] = None
    # Human-readable description of what this option does.
    description: str = ""
    # Optional validator: takes the raw value, returns True when valid.
    validator: Optional[Callable[[str], bool]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (the validator callable is omitted)."""
        has_choices = bool(self.choices)
        return {
            "name": self.name,
            "type": self.type,
            "choices": self.choices if has_choices else None,
            "default": self.default,
            "description": self.description,
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class TUIResultCard:
    """A result rendered as a UI card with title, metadata, and actions.

    Used in hub-ui and TUI contexts to render individual search results
    as grouped components with visual structure.
    """

    title: str
    subtitle: Optional[str] = None
    metadata: Optional[Dict[str, str]] = None
    media_kind: Optional[str] = None
    tags: Optional[List[str]] = None
    file_hash: Optional[str] = None
    file_size: Optional[str] = None
    duration: Optional[str] = None

    def __post_init__(self) -> None:
        """Swap None containers for fresh empty ones."""
        self.metadata = {} if self.metadata is None else self.metadata
        self.tags = [] if self.tags is None else self.tags
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class ResultColumn:
    """One named cell in a result-table row."""

    name: str
    value: str
    width: Optional[int] = None

    def __str__(self) -> str:
        """Render as 'Name: value'."""
        return "{}: {}".format(self.name, self.value)

    def to_dict(self) -> Dict[str, str]:
        """Serialize as a {'name': ..., 'value': ...} mapping."""
        return {"name": self.name, "value": self.value}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class ResultRow:
    """A single row of named columns in a result table."""

    columns: List["ResultColumn"] = field(default_factory=list)
    # Arguments used when this row is selected via @N syntax
    # (e.g. ['-item', '3']).
    selection_args: Optional[List[str]] = None

    def add_column(self, name: str, value: Any) -> None:
        """Append a column, stringifying value (None becomes '')."""
        text = "" if value is None else str(value)
        self.columns.append(ResultColumn(name, text))

    def get_column(self, name: str) -> Optional[str]:
        """Return the first column value matching name (case-insensitive)."""
        wanted = name.lower()
        return next(
            (col.value for col in self.columns if col.name.lower() == wanted),
            None,
        )

    def to_dict(self) -> List[Dict[str, str]]:
        """Convert to a list of per-column dicts."""
        return [col.to_dict() for col in self.columns]

    def to_list(self) -> List[Tuple[str, str]]:
        """Convert to a list of (name, value) tuples."""
        return [(col.name, col.value) for col in self.columns]

    def __str__(self) -> str:
        """Join all columns with ' | '."""
        return " | ".join(str(col) for col in self.columns)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ResultTable:
|
|
|
|
|
"""Unified table formatter for search results, metadata, and pipeline objects.
|
|
|
|
|
|
|
|
|
|
Provides a structured way to display results in the CLI with consistent formatting.
|
|
|
|
|
Handles conversion from various result types (SearchResult, PipeObject, dicts) into
|
|
|
|
|
a formatted table with rows and columns.
|
|
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
>>> result_table = ResultTable("Search Results")
|
|
|
|
|
>>> row = result_table.add_row()
|
|
|
|
|
>>> row.add_column("File", "document.pdf")
|
|
|
|
|
>>> row.add_column("Size", "2.5 MB")
|
|
|
|
|
>>> row.add_column("Tags", "pdf, document")
|
|
|
|
|
>>> print(result_table)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, title: str = "", title_width: int = 80, max_columns: int = None):
|
|
|
|
|
"""Initialize a result table.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
title: Optional title for the table
|
|
|
|
|
title_width: Width for formatting the title line
|
|
|
|
|
max_columns: Maximum number of columns to display (None for unlimited, default: 5 for search results)
|
|
|
|
|
"""
|
|
|
|
|
self.title = title
|
|
|
|
|
self.title_width = title_width
|
|
|
|
|
self.max_columns = max_columns if max_columns is not None else 5 # Default 5 for cleaner display
|
|
|
|
|
self.rows: List[ResultRow] = []
|
|
|
|
|
self.column_widths: Dict[str, int] = {}
|
|
|
|
|
self.input_options: Dict[str, InputOption] = {}
|
|
|
|
|
"""Options available for user input (cmdlet arguments)"""
|
|
|
|
|
self.source_command: Optional[str] = None
|
|
|
|
|
"""Command that generated this table (e.g., 'download-data URL')"""
|
|
|
|
|
self.source_args: List[str] = []
|
|
|
|
|
"""Base arguments for the source command"""
|
|
|
|
|
|
|
|
|
|
def add_row(self) -> ResultRow:
|
|
|
|
|
"""Add a new row to the table and return it for configuration."""
|
|
|
|
|
row = ResultRow()
|
|
|
|
|
self.rows.append(row)
|
|
|
|
|
return row
|
|
|
|
|
|
|
|
|
|
def set_source_command(self, command: str, args: Optional[List[str]] = None) -> "ResultTable":
|
|
|
|
|
"""Set the source command that generated this table.
|
|
|
|
|
|
|
|
|
|
This is used for @N expansion: when user runs @2 | next-cmd, it will expand to:
|
|
|
|
|
source_command + source_args + row_selection_args | next-cmd
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
command: Command name (e.g., 'download-data')
|
|
|
|
|
args: Base arguments for the command (e.g., ['URL'])
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Self for chaining
|
|
|
|
|
"""
|
|
|
|
|
self.source_command = command
|
|
|
|
|
self.source_args = args or []
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
def set_row_selection_args(self, row_index: int, selection_args: List[str]) -> None:
|
|
|
|
|
"""Set the selection arguments for a specific row.
|
|
|
|
|
|
|
|
|
|
When user selects this row via @N, these arguments will be appended to the
|
|
|
|
|
source command to re-execute with that item selected.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
row_index: Index of the row (0-based)
|
|
|
|
|
selection_args: Arguments to use (e.g., ['-item', '3'])
|
|
|
|
|
"""
|
|
|
|
|
if 0 <= row_index < len(self.rows):
|
|
|
|
|
self.rows[row_index].selection_args = selection_args
|
|
|
|
|
|
|
|
|
|
def add_result(self, result: Any) -> "ResultTable":
|
|
|
|
|
"""Add a result object (SearchResult, PipeObject, ResultItem, TagItem, or dict) as a row.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
result: Result object to add
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Self for chaining
|
|
|
|
|
"""
|
|
|
|
|
row = self.add_row()
|
|
|
|
|
|
|
|
|
|
# Handle TagItem from get_tag.py (tag display with index)
|
|
|
|
|
if hasattr(result, '__class__') and result.__class__.__name__ == 'TagItem':
|
|
|
|
|
self._add_tag_item(row, result)
|
|
|
|
|
# Handle ResultItem from search_file.py (compact display)
|
|
|
|
|
elif hasattr(result, '__class__') and result.__class__.__name__ == 'ResultItem':
|
|
|
|
|
self._add_result_item(row, result)
|
|
|
|
|
# Handle SearchResult from search_file.py
|
|
|
|
|
elif hasattr(result, '__class__') and result.__class__.__name__ == 'SearchResult':
|
|
|
|
|
self._add_search_result(row, result)
|
|
|
|
|
# Handle PipeObject from models.py
|
|
|
|
|
elif hasattr(result, '__class__') and result.__class__.__name__ == 'PipeObject':
|
|
|
|
|
self._add_pipe_object(row, result)
|
|
|
|
|
# Handle dict
|
|
|
|
|
elif isinstance(result, dict):
|
|
|
|
|
self._add_dict(row, result)
|
|
|
|
|
# Handle generic objects with __dict__
|
|
|
|
|
elif hasattr(result, '__dict__'):
|
|
|
|
|
self._add_generic_object(row, result)
|
|
|
|
|
# Handle strings (simple text result)
|
|
|
|
|
elif isinstance(result, str):
|
|
|
|
|
row.add_column("Result", result)
|
|
|
|
|
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
def _add_search_result(self, row: ResultRow, result: Any) -> None:
|
|
|
|
|
"""Extract and add SearchResult fields to row."""
|
|
|
|
|
# Core fields
|
2025-11-27 10:59:01 -08:00
|
|
|
title = getattr(result, 'title', '')
|
|
|
|
|
origin = getattr(result, 'origin', '').lower()
|
|
|
|
|
|
|
|
|
|
# Handle extension separation for local files
|
|
|
|
|
extension = ""
|
|
|
|
|
if title and origin == 'local':
|
|
|
|
|
path_obj = Path(title)
|
|
|
|
|
if path_obj.suffix:
|
|
|
|
|
extension = path_obj.suffix.lstrip('.')
|
|
|
|
|
title = path_obj.stem
|
|
|
|
|
|
|
|
|
|
if title:
|
|
|
|
|
row.add_column("Title", title)
|
|
|
|
|
|
|
|
|
|
# Extension column
|
|
|
|
|
row.add_column("Ext", extension)
|
2025-11-25 20:09:33 -08:00
|
|
|
|
|
|
|
|
if hasattr(result, 'origin') and result.origin:
|
|
|
|
|
row.add_column("Source", result.origin)
|
|
|
|
|
|
|
|
|
|
if hasattr(result, 'detail') and result.detail:
|
|
|
|
|
row.add_column("Detail", result.detail)
|
|
|
|
|
|
|
|
|
|
if hasattr(result, 'media_kind') and result.media_kind:
|
|
|
|
|
row.add_column("Type", result.media_kind)
|
|
|
|
|
|
|
|
|
|
# Tags summary
|
|
|
|
|
if hasattr(result, 'tag_summary') and result.tag_summary:
|
|
|
|
|
tags_str = str(result.tag_summary)
|
|
|
|
|
if len(tags_str) > 60:
|
|
|
|
|
tags_str = tags_str[:57] + "..."
|
|
|
|
|
row.add_column("Tags", tags_str)
|
|
|
|
|
|
|
|
|
|
# Duration (for media)
|
|
|
|
|
if hasattr(result, 'duration_seconds') and result.duration_seconds:
|
|
|
|
|
minutes = int(result.duration_seconds // 60)
|
|
|
|
|
seconds = int(result.duration_seconds % 60)
|
|
|
|
|
row.add_column("Duration", f"{minutes}m {seconds}s")
|
|
|
|
|
|
|
|
|
|
# Size (for files)
|
|
|
|
|
if hasattr(result, 'size_bytes') and result.size_bytes:
|
|
|
|
|
size_mb = result.size_bytes / (1024 * 1024)
|
|
|
|
|
row.add_column("Size", f"{size_mb:.1f} MB")
|
|
|
|
|
|
|
|
|
|
# Annotations
|
|
|
|
|
if hasattr(result, 'annotations') and result.annotations:
|
|
|
|
|
ann_str = ", ".join(str(a) for a in result.annotations)
|
|
|
|
|
if len(ann_str) > 50:
|
|
|
|
|
ann_str = ann_str[:47] + "..."
|
|
|
|
|
row.add_column("Annotations", ann_str)
|
|
|
|
|
|
|
|
|
|
def _add_result_item(self, row: ResultRow, item: Any) -> None:
|
|
|
|
|
"""Extract and add ResultItem fields to row (compact display for search results).
|
|
|
|
|
|
|
|
|
|
Shows only essential columns:
|
|
|
|
|
- Title (required)
|
2025-11-27 10:59:01 -08:00
|
|
|
- Ext (extension)
|
2025-11-25 20:09:33 -08:00
|
|
|
- Origin (source backend)
|
|
|
|
|
- Size (formatted MB, integer only)
|
|
|
|
|
|
|
|
|
|
All other fields are stored in item but not displayed to keep table compact.
|
|
|
|
|
Use @row# syntax to pipe full item data to next command.
|
|
|
|
|
"""
|
|
|
|
|
# Title (required - use origin as fallback)
|
|
|
|
|
title = getattr(item, 'title', None) or getattr(item, 'origin', 'Unknown')
|
2025-11-27 10:59:01 -08:00
|
|
|
origin = getattr(item, 'origin', '').lower()
|
|
|
|
|
|
|
|
|
|
# Handle extension separation for local files
|
|
|
|
|
extension = ""
|
|
|
|
|
if title and origin == 'local':
|
|
|
|
|
# Try to split extension
|
|
|
|
|
path_obj = Path(title)
|
|
|
|
|
if path_obj.suffix:
|
|
|
|
|
extension = path_obj.suffix.lstrip('.')
|
|
|
|
|
title = path_obj.stem
|
|
|
|
|
|
2025-11-25 20:09:33 -08:00
|
|
|
if title:
|
|
|
|
|
row.add_column("Title", title[:90] + ("..." if len(title) > 90 else ""))
|
|
|
|
|
|
2025-11-27 10:59:01 -08:00
|
|
|
# Extension column - always add to maintain column order
|
|
|
|
|
row.add_column("Ext", extension)
|
|
|
|
|
|
2025-11-25 20:09:33 -08:00
|
|
|
# Storage (source backend - hydrus, local, debrid, etc)
|
|
|
|
|
if hasattr(item, 'origin') and item.origin:
|
|
|
|
|
row.add_column("Storage", item.origin)
|
|
|
|
|
|
|
|
|
|
# Size (for files) - integer MB only
|
|
|
|
|
if hasattr(item, 'size_bytes') and item.size_bytes:
|
|
|
|
|
size_mb = int(item.size_bytes / (1024 * 1024))
|
|
|
|
|
row.add_column("Size", f"{size_mb} MB")
|
|
|
|
|
|
|
|
|
|
def _add_tag_item(self, row: ResultRow, item: Any) -> None:
|
|
|
|
|
"""Extract and add TagItem fields to row (compact tag display).
|
|
|
|
|
|
|
|
|
|
Shows the Tag column with the tag name and Source column to identify
|
|
|
|
|
which storage backend the tags come from (Hydrus, local, etc.).
|
|
|
|
|
All data preserved in TagItem for piping and operations.
|
|
|
|
|
Use @1 to select a tag, @{1,3,5} to select multiple.
|
|
|
|
|
"""
|
|
|
|
|
# Tag name (truncate if too long)
|
|
|
|
|
if hasattr(item, 'tag_name') and item.tag_name:
|
|
|
|
|
tag_name = item.tag_name
|
|
|
|
|
if len(tag_name) > 60:
|
|
|
|
|
tag_name = tag_name[:57] + "..."
|
|
|
|
|
row.add_column("Tag", tag_name)
|
|
|
|
|
|
|
|
|
|
# Source/Store (where the tags come from)
|
|
|
|
|
if hasattr(item, 'source') and item.source:
|
|
|
|
|
row.add_column("Store", item.source)
|
|
|
|
|
elif hasattr(item, 'origin') and item.origin:
|
|
|
|
|
row.add_column("Store", item.origin)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _add_pipe_object(self, row: ResultRow, obj: Any) -> None:
|
|
|
|
|
"""Extract and add PipeObject fields to row."""
|
|
|
|
|
# Source and identifier
|
|
|
|
|
if hasattr(obj, 'source') and obj.source:
|
|
|
|
|
row.add_column("Source", obj.source)
|
|
|
|
|
|
|
|
|
|
# Title
|
|
|
|
|
if hasattr(obj, 'title') and obj.title:
|
|
|
|
|
row.add_column("Title", obj.title[:50] + ("..." if len(obj.title) > 50 else ""))
|
|
|
|
|
|
|
|
|
|
# File info
|
|
|
|
|
if hasattr(obj, 'file_path') and obj.file_path:
|
|
|
|
|
file_str = str(obj.file_path)
|
|
|
|
|
if len(file_str) > 60:
|
|
|
|
|
file_str = "..." + file_str[-57:]
|
|
|
|
|
row.add_column("Path", file_str)
|
|
|
|
|
|
|
|
|
|
# Tags
|
|
|
|
|
if hasattr(obj, 'tags') and obj.tags:
|
|
|
|
|
tags_str = ", ".join(obj.tags[:3]) # First 3 tags
|
|
|
|
|
if len(obj.tags) > 3:
|
|
|
|
|
tags_str += f", +{len(obj.tags) - 3} more"
|
|
|
|
|
row.add_column("Tags", tags_str)
|
|
|
|
|
|
|
|
|
|
# Duration
|
|
|
|
|
if hasattr(obj, 'duration') and obj.duration:
|
|
|
|
|
row.add_column("Duration", f"{obj.duration:.1f}s")
|
|
|
|
|
|
|
|
|
|
# Warnings
|
|
|
|
|
if hasattr(obj, 'warnings') and obj.warnings:
|
|
|
|
|
warnings_str = "; ".join(obj.warnings[:2])
|
|
|
|
|
if len(obj.warnings) > 2:
|
|
|
|
|
warnings_str += f" (+{len(obj.warnings) - 2} more)"
|
|
|
|
|
row.add_column("Warnings", warnings_str)
|
|
|
|
|
|
|
|
|
|
    def _add_dict(self, row: ResultRow, data: Dict[str, Any]) -> None:
        """Extract and add dict fields to row using first-match priority groups.

        Respects max_columns limit to keep table compact and readable.

        Special handling for 'columns' field: if present, uses it to populate row columns
        instead of treating it as a regular field. This allows dynamic column definitions
        from search providers.

        Priority field groups (uses first match within each group):
        - title | name | filename
        - ext
        - origin | source | store (displayed under the header "Store")
        - type | media_kind | kind
        - tags | tag_summary
        - detail | description

        Internal fields (id, action, parent_id, is_temp, file_path, extra,
        target, hash, hash_hex, file_hash, dunder-prefixed keys) are hidden
        from display entirely. A '_selection_args' key is consumed as the
        row's @N selection arguments rather than displayed.
        """
        # Helper to determine if a field should be hidden from display.
        def is_hidden_field(field_name: Any) -> bool:
            # Hide internal/metadata fields. NOTE: the literal '__' set entry
            # only matches a key named exactly '__'; the startswith('__') check
            # below is what actually hides dunder-prefixed keys.
            hidden_fields = {
                '__', 'id', 'action', 'parent_id', 'is_temp', 'file_path', 'extra',
                'target', 'hash', 'hash_hex', 'file_hash'
            }
            if isinstance(field_name, str):
                if field_name.startswith('__'):
                    return True
                if field_name in hidden_fields:
                    return True
            return False

        # Strip out hidden metadata fields (prefixed with __)
        visible_data = {k: v for k, v in data.items() if not is_hidden_field(k)}

        # Handle extension separation for local files. origin falls back to
        # 'source' when 'origin' is absent or falsy.
        origin = str(visible_data.get('origin', '') or visible_data.get('source', '')).lower()

        # Debug logging
        # print(f"DEBUG: Processing dict result. Origin: {origin}, Keys: {list(visible_data.keys())}")

        if origin == 'local':
            # Find title field (first of title/name/filename present).
            title_field = next((f for f in ['title', 'name', 'filename'] if f in visible_data), None)
            if title_field:
                title_val = str(visible_data[title_field])
                path_obj = Path(title_val)
                if path_obj.suffix:
                    extension = path_obj.suffix.lstrip('.')
                    visible_data[title_field] = path_obj.stem
                    visible_data['ext'] = extension
                    # print(f"DEBUG: Split extension. Title: {visible_data[title_field]}, Ext: {extension}")
                else:
                    visible_data['ext'] = ""

        # Ensure 'ext' is present so it gets picked up by priority_groups in correct order
        if 'ext' not in visible_data:
            visible_data['ext'] = ""

        # Track which fields we've already added to avoid duplicates
        added_fields = set()
        column_count = 0  # Track total columns added

        # Helper function to format values: lists show the first three items
        # plus a "+N more" marker; everything else is stringified.
        def format_value(value: Any) -> str:
            if isinstance(value, list):
                formatted = ", ".join(str(v) for v in value[:3])
                if len(value) > 3:
                    formatted += f", +{len(value) - 3} more"
                return formatted
            return str(value)

        # Special handling for 'columns' field from search providers
        # If present, use it to populate row columns dynamically
        if 'columns' in visible_data and isinstance(visible_data['columns'], list) and visible_data['columns']:
            try:
                for col_name, col_value in visible_data['columns']:
                    # Skip the "#" column as ResultTable already adds row numbers
                    if col_name == '#':
                        continue
                    if column_count >= self.max_columns:
                        break
                    col_value_str = format_value(col_value)
                    if len(col_value_str) > 60:
                        col_value_str = col_value_str[:57] + "..."
                    row.add_column(col_name, col_value_str)
                    added_fields.add(col_name.lower())
                    column_count += 1
                # Mark 'columns' as handled so we don't add it as a field
                added_fields.add('columns')
                # Also mark common fields that shouldn't be re-displayed if they're in columns
                # This prevents showing both "Store" (from columns) and "Origin" (from data fields)
                added_fields.add('origin')
                added_fields.add('source')
                added_fields.add('target')
                added_fields.add('path')
                added_fields.add('media_kind')
                added_fields.add('detail')
                added_fields.add('annotations')
                added_fields.add('full_metadata')  # Don't display full metadata as column
            except Exception:
                # Fall back to regular field handling if columns format is unexpected
                pass

        # Only add priority groups if we haven't already filled columns from 'columns' field
        if column_count == 0:
            # Priority field groups - uses first matching field in each group
            priority_groups = [
                ('title | name | filename', ['title', 'name', 'filename']),
                ('ext', ['ext']),
                ('origin | source | store', ['origin', 'source', 'store']),
                ('type | media_kind | kind', ['type', 'media_kind', 'kind']),
                ('tags | tag_summary', ['tags', 'tag_summary']),
                ('detail | description', ['detail', 'description']),
            ]

            # Add priority field groups first - use first match in each group
            for _group_label, field_options in priority_groups:
                if column_count >= self.max_columns:
                    break
                for field in field_options:
                    if field in visible_data and field not in added_fields:
                        value_str = format_value(visible_data[field])
                        if len(value_str) > 60:
                            value_str = value_str[:57] + "..."

                        # Special case for Origin/Source -> Store to match user preference
                        col_name = field.replace('_', ' ').title()
                        if field in ['origin', 'source']:
                            col_name = "Store"

                        row.add_column(col_name, value_str)
                        added_fields.add(field)
                        column_count += 1
                        break  # Use first match in this group, skip rest

        # Add remaining fields only if we haven't hit max_columns (and no explicit columns were set)
        # NOTE: remaining fields use a shorter 40-char truncation than the
        # 60-char limit above.
        if column_count < self.max_columns:
            for key, value in visible_data.items():
                if column_count >= self.max_columns:
                    break
                if key not in added_fields:  # Only add if not already added
                    value_str = format_value(value)
                    if len(value_str) > 40:
                        value_str = value_str[:37] + "..."
                    row.add_column(key.replace('_', ' ').title(), value_str)
                    added_fields.add(key)  # Track in added_fields to prevent re-adding
                    column_count += 1

        # Check for selection args (read from the raw dict, not visible_data)
        if '_selection_args' in data:
            row.selection_args = data['_selection_args']
            # Don't display it
            added_fields.add('_selection_args')
|
|
|
|
|
|
|
|
|
|
def _add_generic_object(self, row: ResultRow, obj: Any) -> None:
|
|
|
|
|
"""Extract and add fields from generic objects."""
|
|
|
|
|
if hasattr(obj, '__dict__'):
|
|
|
|
|
for key, value in obj.__dict__.items():
|
|
|
|
|
if key.startswith('_'): # Skip private attributes
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
value_str = str(value)
|
|
|
|
|
if len(value_str) > 60:
|
|
|
|
|
value_str = value_str[:57] + "..."
|
|
|
|
|
|
|
|
|
|
row.add_column(key.replace('_', ' ').title(), value_str)
|
|
|
|
|
|
|
|
|
|
    def format_plain(self) -> str:
        """Format table as plain text with aligned columns and row numbers.

        Column order follows first-seen insertion order across all rows;
        rows missing a column render it as empty. Column widths are capped
        at 90 characters, with over-long values truncated with "...".

        Returns:
            Formatted table string ("No results" when the table is empty).
        """
        if not self.rows:
            return "No results"

        # Calculate column widths (max of header and every cell value).
        col_widths: Dict[str, int] = {}
        for row in self.rows:
            for col in row.columns:
                col_name = col.name
                col_widths[col_name] = max(
                    col_widths.get(col_name, 0),
                    len(col.name),
                    len(col.value)
                )

        # Calculate row number column width
        num_width = len(str(len(self.rows))) + 1  # +1 for padding

        lines = []

        # Add title if present (centered between '=' rules).
        if self.title:
            lines.append("=" * self.title_width)
            lines.append(self.title.center(self.title_width))
            lines.append("=" * self.title_width)

        # Add header with # column
        header_parts = ["#".ljust(num_width)]
        separator_parts = ["-" * num_width]
        for col_name in col_widths:
            width = min(col_widths[col_name], 90)  # Cap column width (increased for expanded titles)
            header_parts.append(col_name.ljust(width))
            separator_parts.append("-" * width)

        lines.append(" | ".join(header_parts))
        lines.append("-+-".join(separator_parts))

        # Add rows with row numbers (1-based for display)
        for row_num, row in enumerate(self.rows, 1):
            row_parts = [str(row_num).ljust(num_width)]
            for col_name in col_widths:
                width = min(col_widths[col_name], 90)  # Increased cap for expanded titles
                col_value = row.get_column(col_name) or ""
                if len(col_value) > width:
                    col_value = col_value[:width - 3] + "..."
                row_parts.append(col_value.ljust(width))
            lines.append(" | ".join(row_parts))

        return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
def format_compact(self) -> str:
|
|
|
|
|
"""Format table in compact form (one line per row).
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Formatted table string
|
|
|
|
|
"""
|
|
|
|
|
lines = []
|
|
|
|
|
|
|
|
|
|
if self.title:
|
|
|
|
|
lines.append(f"\n{self.title}")
|
|
|
|
|
lines.append("-" * len(self.title))
|
|
|
|
|
|
|
|
|
|
for i, row in enumerate(self.rows, 1):
|
|
|
|
|
row_str = " | ".join(str(col) for col in row.columns)
|
|
|
|
|
lines.append(f"{i}. {row_str}")
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
def format_json(self) -> str:
|
|
|
|
|
"""Format table as JSON.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
JSON string
|
|
|
|
|
"""
|
|
|
|
|
data = {
|
|
|
|
|
"title": self.title,
|
|
|
|
|
"row_count": len(self.rows),
|
|
|
|
|
"rows": [row.to_list() for row in self.rows]
|
|
|
|
|
}
|
|
|
|
|
return json.dumps(data, indent=2)
|
|
|
|
|
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
|
|
|
"""Convert table to dictionary.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Dictionary representation
|
|
|
|
|
"""
|
|
|
|
|
return {
|
|
|
|
|
"title": self.title,
|
|
|
|
|
"rows": [row.to_list() for row in self.rows]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
    def __str__(self) -> str:
        """String representation: the aligned plain-text table (format_plain)."""
        return self.format_plain()
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
"""Developer representation."""
|
|
|
|
|
return f"ResultTable(title={self.title!r}, rows={len(self.rows)})"
|
|
|
|
|
|
|
|
|
|
    def __len__(self) -> int:
        """Number of rows in the table (enables len(table))."""
        return len(self.rows)
|
|
|
|
|
|
|
|
|
|
    def __iter__(self):
        """Iterate over rows in insertion order."""
        return iter(self.rows)
|
|
|
|
|
|
|
|
|
|
    def __getitem__(self, index: int) -> ResultRow:
        """Get row by 0-based index (raises IndexError when out of range)."""
        return self.rows[index]
|
|
|
|
|
|
|
|
|
|
def select_interactive(self, prompt: str = "Select an item", accept_args: bool = False) -> Optional[List[int]] | dict:
|
|
|
|
|
"""Display table and get interactive user selection (single or multiple).
|
|
|
|
|
|
|
|
|
|
Supports multiple input formats:
|
|
|
|
|
- Single: "5" or "q" to quit
|
|
|
|
|
- Range: "3-5" (selects items 3, 4, 5)
|
|
|
|
|
- Multiple: "3,5,13" (selects items 3, 5, and 13)
|
|
|
|
|
- Combined: "1-3,7,9-11" (selects 1,2,3,7,9,10,11)
|
|
|
|
|
|
|
|
|
|
If accept_args=True, also supports cmdlet arguments:
|
|
|
|
|
- "5 -storage hydrus" → returns indices [4] + args {"-storage": "hydrus"}
|
|
|
|
|
- "2-4 -storage hydrus -tag important" → returns indices [1,2,3] + multiple args
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
prompt: Custom prompt text
|
|
|
|
|
accept_args: If True, parse and return cmdlet arguments from input
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
If accept_args=False: List of 0-based indices, or None if cancelled
|
|
|
|
|
If accept_args=True: Dict with "indices" and "args" keys, or None if cancelled
|
|
|
|
|
"""
|
|
|
|
|
# Display the table
|
|
|
|
|
print(f"\n{self}")
|
|
|
|
|
|
|
|
|
|
# Get user input
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
if accept_args:
|
|
|
|
|
choice = input(f"\n{prompt} (e.g., '5' or '2 -storage hydrus' or 'q' to quit): ").strip()
|
|
|
|
|
else:
|
|
|
|
|
choice = input(f"\n{prompt} (e.g., '5' or '3-5' or '1,3,5' or 'q' to quit): ").strip()
|
|
|
|
|
|
|
|
|
|
if choice.lower() == 'q':
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
if accept_args:
|
|
|
|
|
# Parse selection and arguments
|
|
|
|
|
result = self._parse_selection_with_args(choice)
|
|
|
|
|
if result is not None:
|
|
|
|
|
return result
|
|
|
|
|
print(f"Invalid format. Use: selection (5 or 3-5 or 1,3,5) optionally followed by flags (e.g., '5 -storage hydrus').")
|
|
|
|
|
else:
|
|
|
|
|
# Parse just the selection
|
|
|
|
|
selected_indices = self._parse_selection(choice)
|
|
|
|
|
if selected_indices is not None:
|
|
|
|
|
return selected_indices
|
|
|
|
|
print(f"Invalid format. Use: single (5), range (3-5), list (1,3,5), combined (1-3,7,9-11), or 'q' to quit.")
|
|
|
|
|
except (ValueError, EOFError):
|
|
|
|
|
if accept_args:
|
|
|
|
|
print(f"Invalid format. Use: selection (5 or 3-5 or 1,3,5) optionally followed by flags (e.g., '5 -storage hydrus').")
|
|
|
|
|
else:
|
|
|
|
|
print(f"Invalid format. Use: single (5), range (3-5), list (1,3,5), combined (1-3,7,9-11), or 'q' to quit.")
|
|
|
|
|
|
|
|
|
|
def _parse_selection(self, selection_str: str) -> Optional[List[int]]:
|
|
|
|
|
"""Parse user selection string into list of 0-based indices.
|
|
|
|
|
|
|
|
|
|
Supports:
|
|
|
|
|
- Single: "5" → [4]
|
|
|
|
|
- Range: "3-5" → [2, 3, 4]
|
|
|
|
|
- Multiple: "3,5,13" → [2, 4, 12]
|
|
|
|
|
- Combined: "1-3,7,9-11" → [0, 1, 2, 6, 8, 9, 10]
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
selection_str: User input string
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
List of 0-based indices, or None if invalid
|
|
|
|
|
"""
|
|
|
|
|
indices = set()
|
|
|
|
|
|
|
|
|
|
# Split by comma for multiple selections
|
|
|
|
|
parts = selection_str.split(',')
|
|
|
|
|
|
|
|
|
|
for part in parts:
|
|
|
|
|
part = part.strip()
|
|
|
|
|
if not part:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Check if it's a range (contains dash)
|
|
|
|
|
if '-' in part:
|
|
|
|
|
# Handle ranges like "3-5"
|
|
|
|
|
try:
|
|
|
|
|
range_parts = part.split('-')
|
|
|
|
|
if len(range_parts) != 2:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
start = int(range_parts[0].strip())
|
|
|
|
|
end = int(range_parts[1].strip())
|
|
|
|
|
|
|
|
|
|
# Validate range
|
|
|
|
|
if start < 1 or end < 1 or start > len(self.rows) or end > len(self.rows):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
if start > end:
|
|
|
|
|
start, end = end, start
|
|
|
|
|
|
|
|
|
|
# Add all indices in range (convert to 0-based)
|
|
|
|
|
for i in range(start, end + 1):
|
|
|
|
|
indices.add(i - 1)
|
|
|
|
|
|
|
|
|
|
except (ValueError, IndexError):
|
|
|
|
|
return None
|
|
|
|
|
else:
|
|
|
|
|
# Single number
|
|
|
|
|
try:
|
|
|
|
|
num = int(part)
|
|
|
|
|
if num < 1 or num > len(self.rows):
|
|
|
|
|
return None
|
|
|
|
|
indices.add(num - 1) # Convert to 0-based
|
|
|
|
|
except ValueError:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
if not indices:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# Return sorted list
|
|
|
|
|
return sorted(list(indices))
|
|
|
|
|
|
|
|
|
|
def _parse_selection_with_args(self, input_str: str) -> Optional[dict]:
|
|
|
|
|
"""Parse user input into selection indices and cmdlet arguments.
|
|
|
|
|
|
|
|
|
|
Supports formats like:
|
|
|
|
|
- "5" → {"indices": [4], "args": {}}
|
|
|
|
|
- "2 -storage hydrus" → {"indices": [1], "args": {"-storage": "hydrus"}}
|
|
|
|
|
- "3-5 -storage hydrus -tag important" → {"indices": [2,3,4], "args": {"-storage": "hydrus", "-tag": "important"}}
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
input_str: User input string with selection and optional flags
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Dict with "indices" and "args" keys, or None if invalid
|
|
|
|
|
"""
|
|
|
|
|
parts = input_str.split()
|
|
|
|
|
if not parts:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# First part should be the selection
|
|
|
|
|
selection_str = parts[0]
|
|
|
|
|
selected_indices = self._parse_selection(selection_str)
|
|
|
|
|
|
|
|
|
|
if selected_indices is None:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# Remaining parts are cmdlet arguments
|
|
|
|
|
cmdlet_args = {}
|
|
|
|
|
i = 1
|
|
|
|
|
while i < len(parts):
|
|
|
|
|
part = parts[i]
|
|
|
|
|
|
|
|
|
|
# Check if it's a flag (starts with -)
|
|
|
|
|
if part.startswith("-"):
|
|
|
|
|
flag = part
|
|
|
|
|
value = None
|
|
|
|
|
|
|
|
|
|
# Get the value if it exists and doesn't start with -
|
|
|
|
|
if i + 1 < len(parts) and not parts[i + 1].startswith("-"):
|
|
|
|
|
value = parts[i + 1]
|
|
|
|
|
i += 2
|
|
|
|
|
else:
|
|
|
|
|
i += 1
|
|
|
|
|
|
|
|
|
|
# Store the flag
|
|
|
|
|
if value is not None:
|
|
|
|
|
cmdlet_args[flag] = value
|
|
|
|
|
else:
|
|
|
|
|
cmdlet_args[flag] = True # Flag without value
|
|
|
|
|
else:
|
|
|
|
|
i += 1
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"indices": selected_indices,
|
|
|
|
|
"args": cmdlet_args
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def add_input_option(self, option: InputOption) -> "ResultTable":
|
|
|
|
|
"""Add an interactive input option to the table.
|
|
|
|
|
|
|
|
|
|
Input options allow users to specify cmdlet arguments interactively,
|
|
|
|
|
like choosing a download location or source.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
option: InputOption definition
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Self for chaining
|
|
|
|
|
"""
|
|
|
|
|
self.input_options[option.name] = option
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
    def select_option(self, option_name: str, prompt: str = "") -> Optional[str]:
        """Interactively prompt the user for a single registered option.

        Behavior depends on the option's ``type``:
        - "enum": prints the numbered choices, reads a 1-based pick.
        - "string"/"integer": reads free text (integer input is validated
          but still returned as a string).
        - "flag": reads y/n and returns the string "true" or "false".

        Entering 'q' at any prompt cancels. Loops until a valid value is
        entered or the user cancels.

        Args:
            option_name: Name of a previously registered option.
            prompt: Custom prompt text; falls back to the option's
                description, then to the option name.

        Returns:
            The selected/entered value as a string, or None when the
            option is unknown, the user cancels, or input ends (EOF).
        """
        # Unknown option: report and bail out rather than raising.
        if option_name not in self.input_options:
            print(f"Unknown option: {option_name}")
            return None

        option = self.input_options[option_name]
        prompt_text = prompt or option.description or option_name

        # Re-prompt until a valid answer or an explicit cancel.
        while True:
            try:
                # For enum options, show the numbered choices.
                if option.type == "enum" and option.choices:
                    print(f"\n{prompt_text}")
                    for i, choice in enumerate(option.choices, 1):
                        print(f" {i}. {choice}")

                    choice_input = input(f"Select {option_name} (1-{len(option.choices)}, or 'q' to cancel): ").strip()

                    if choice_input.lower() == 'q':
                        return None

                    try:
                        # Map the 1-based pick back to a list index.
                        idx = int(choice_input) - 1
                        if 0 <= idx < len(option.choices):
                            return option.choices[idx]
                        print(f"Invalid choice. Enter 1-{len(option.choices)}")
                    except ValueError:
                        print(f"Invalid choice. Enter 1-{len(option.choices)}")

                # For string/integer options, get direct input.
                elif option.type in ("string", "integer"):
                    value = input(f"{prompt_text} (or 'q' to cancel): ").strip()

                    if value.lower() == 'q':
                        return None

                    # Run the custom validator first, if one was supplied.
                    if option.validator and not option.validator(value):
                        print(f"Invalid value for {option_name}")
                        continue

                    # Integer options: validate convertibility only — the
                    # value is still returned as the original string.
                    if option.type == "integer":
                        try:
                            int(value)
                        except ValueError:
                            print(f"Must be an integer")
                            continue

                    return value

                # For flag options: y/n answer mapped to "true"/"false".
                elif option.type == "flag":
                    response = input(f"{prompt_text} (y/n): ").strip().lower()
                    if response == 'q':
                        return None
                    # Anything other than y/yes/true is treated as "false".
                    return "true" if response in ('y', 'yes', 'true') else "false"

                # NOTE(review): an option with an unrecognized type falls
                # through and loops forever — presumably types are always
                # one of the handled set; verify against InputOption usage.

            except (ValueError, EOFError):
                # EOF (e.g. piped input exhausted) counts as a cancel.
                return None
|
|
|
|
|
|
|
|
|
|
def get_all_options(self) -> Dict[str, str]:
|
|
|
|
|
"""Get all input options at once with user prompts.
|
|
|
|
|
|
|
|
|
|
Interactively prompts user for all registered options.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Dictionary mapping option names to selected values
|
|
|
|
|
"""
|
|
|
|
|
result = {}
|
|
|
|
|
for name, option in self.input_options.items():
|
|
|
|
|
value = self.select_option(name)
|
|
|
|
|
if value is not None:
|
|
|
|
|
result[name] = value
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def select_by_index(self, index: int) -> Optional[ResultRow]:
|
|
|
|
|
"""Get a row by 1-based index (user-friendly).
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
index: 1-based index
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
ResultRow if valid, None otherwise
|
|
|
|
|
"""
|
|
|
|
|
idx = index - 1
|
|
|
|
|
if 0 <= idx < len(self.rows):
|
|
|
|
|
return self.rows[idx]
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# TUI-specific formatting methods
|
|
|
|
|
|
|
|
|
|
def to_datatable_rows(self, source: str = "unknown") -> List[List[str]]:
|
|
|
|
|
"""Convert results to rows suitable for Textual DataTable widget.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
source: Source type for formatting context (openlibrary, soulseek, etc.)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
List of row value lists
|
|
|
|
|
"""
|
|
|
|
|
rows = []
|
|
|
|
|
for result in self.rows:
|
|
|
|
|
row_values = self._format_datatable_row(result, source)
|
|
|
|
|
rows.append(row_values)
|
|
|
|
|
return rows
|
|
|
|
|
|
|
|
|
|
def _format_datatable_row(self, row: ResultRow, source: str = "unknown") -> List[str]:
|
|
|
|
|
"""Format a ResultRow for DataTable display.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
row: ResultRow to format
|
|
|
|
|
source: Source type
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
List of column values as strings
|
|
|
|
|
"""
|
|
|
|
|
# Extract values from row columns
|
|
|
|
|
values = [col.value for col in row.columns]
|
|
|
|
|
|
|
|
|
|
# Truncate to reasonable lengths for table display
|
|
|
|
|
return [v[:60] if len(v) > 60 else v for v in values]
|
|
|
|
|
|
|
|
|
|
def to_result_cards(self) -> List[TUIResultCard]:
|
|
|
|
|
"""Convert all rows to TUIResultCard objects for card-based UI display.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
List of TUIResultCard objects
|
|
|
|
|
"""
|
|
|
|
|
cards = []
|
|
|
|
|
for row in self.rows:
|
|
|
|
|
card = self._row_to_card(row)
|
|
|
|
|
cards.append(card)
|
|
|
|
|
return cards
|
|
|
|
|
|
|
|
|
|
def _row_to_card(self, row: ResultRow) -> TUIResultCard:
|
|
|
|
|
"""Convert a ResultRow to a TUIResultCard.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
row: ResultRow to convert
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
TUIResultCard with extracted metadata
|
|
|
|
|
"""
|
|
|
|
|
# Build metadata dict from row columns
|
|
|
|
|
metadata = {}
|
|
|
|
|
title = ""
|
|
|
|
|
|
|
|
|
|
for col in row.columns:
|
|
|
|
|
if col.name.lower() == "title":
|
|
|
|
|
title = col.value
|
|
|
|
|
metadata[col.name] = col.value
|
|
|
|
|
|
|
|
|
|
# Extract tags if present
|
|
|
|
|
tags = []
|
|
|
|
|
if "tags" in metadata:
|
|
|
|
|
tags_val = metadata["tags"]
|
|
|
|
|
if tags_val:
|
|
|
|
|
tags = [t.strip() for t in tags_val.split(",")][:5]
|
|
|
|
|
|
|
|
|
|
# Try to find useful metadata fields
|
|
|
|
|
subtitle = metadata.get("Artist", metadata.get("Author", ""))
|
|
|
|
|
media_kind = metadata.get("Type", metadata.get("Media Kind", ""))
|
|
|
|
|
file_size = metadata.get("Size", "")
|
|
|
|
|
duration = metadata.get("Duration", "")
|
|
|
|
|
file_hash = metadata.get("Hash", "")
|
|
|
|
|
|
|
|
|
|
return TUIResultCard(
|
|
|
|
|
title=title or "Unknown",
|
|
|
|
|
subtitle=subtitle,
|
|
|
|
|
metadata=metadata,
|
|
|
|
|
media_kind=media_kind,
|
|
|
|
|
tags=tags,
|
|
|
|
|
file_hash=file_hash or None,
|
|
|
|
|
file_size=file_size or None,
|
|
|
|
|
duration=duration or None
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
    def build_metadata_tree(self, tree_widget: "Tree") -> None:
        """Populate a Textual Tree widget with one node per result row.

        Each row becomes a top-level "Result N" node; every column of the
        row becomes a leaf rendered as "name: value" (values longer than
        100 characters are shortened with an ellipsis). Rich markup tags
        ([bold], [cyan]) are used for styling.

        Args:
            tree_widget: Textual Tree widget to populate; its existing
                contents are cleared first.

        Raises:
            ImportError: If Textual is not installed.
        """
        if not TEXTUAL_AVAILABLE:
            raise ImportError("Textual not available for tree building")

        # Clear any previously built nodes before repopulating.
        # NOTE(review): Textual's Tree.reset takes a label argument in
        # some versions — confirm this zero-arg call against the pinned
        # Textual release.
        tree_widget.reset()
        root = tree_widget.root

        # Add each row as a top-level node, numbered from 1 to match
        # the user-facing table display.
        for i, row in enumerate(self.rows, 1):
            row_node = root.add(f"[bold]Result {i}[/bold]")

            # Add the row's columns as leaf children.
            for col in row.columns:
                value_str = col.value
                # Truncate long values so the tree stays readable;
                # 97 chars + "..." keeps the total at 100.
                if len(value_str) > 100:
                    value_str = value_str[:97] + "..."
                row_node.add_leaf(f"[cyan]{col.name}[/cyan]: {value_str}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_duration(duration: Any) -> str:
|
|
|
|
|
"""Format duration value as human-readable string.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
duration: Duration in seconds, milliseconds, or already formatted string
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Formatted duration string (e.g., "2h 18m 5s", "5m 30s")
|
|
|
|
|
"""
|
|
|
|
|
if isinstance(duration, str):
|
|
|
|
|
return duration if duration else ""
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Convert to seconds if needed
|
|
|
|
|
if isinstance(duration, (int, float)):
|
|
|
|
|
seconds = int(duration)
|
|
|
|
|
if seconds < 1000: # Likely already in seconds
|
|
|
|
|
pass
|
|
|
|
|
else: # Likely in milliseconds
|
|
|
|
|
seconds = seconds // 1000
|
|
|
|
|
else:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
hours = seconds // 3600
|
|
|
|
|
minutes = (seconds % 3600) // 60
|
|
|
|
|
secs = seconds % 60
|
|
|
|
|
|
|
|
|
|
if hours > 0:
|
|
|
|
|
return f"{hours}h {minutes}m {secs}s"
|
|
|
|
|
elif minutes > 0:
|
|
|
|
|
return f"{minutes}m {secs}s"
|
|
|
|
|
else:
|
|
|
|
|
return f"{secs}s"
|
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_size(size: Any) -> str:
|
|
|
|
|
"""Format file size as human-readable string.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
size: Size in bytes or already formatted string
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Formatted size string (e.g., "1.5 MB", "250 KB")
|
|
|
|
|
"""
|
|
|
|
|
if isinstance(size, str):
|
|
|
|
|
return size if size else ""
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
bytes_val = int(size)
|
|
|
|
|
if bytes_val < 0:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
for unit, divisor in [("GB", 1024**3), ("MB", 1024**2), ("KB", 1024)]:
|
|
|
|
|
if bytes_val >= divisor:
|
|
|
|
|
return f"{bytes_val / divisor:.1f} {unit}"
|
|
|
|
|
|
|
|
|
|
return f"{bytes_val} B"
|
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_result(result: Any, title: str = "") -> str:
    """Format a single result or a list of results as a table string.

    Args:
        result: A result object, dict, or list of such items.
        title: Optional table title.

    Returns:
        The rendered table as a string.
    """
    table = ResultTable(title)

    # Normalize to a list so one code path handles both cases.
    items = result if isinstance(result, list) else [result]
    for item in items:
        table.add_result(item)

    return str(table)
|