AST
This commit is contained in:
678
models.py
Normal file
678
models.py
Normal file
@@ -0,0 +1,678 @@
|
||||
"""Data models for the pipeline."""
|
||||
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Protocol, TextIO, Tuple
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PipeObject:
|
||||
"""Unified pipeline object for tracking files, metadata, tags, and relationships through the pipeline.
|
||||
|
||||
This is the single source of truth for all result data in the pipeline. It can represent:
|
||||
- Tag extraction results (IMDb, MusicBrainz, OpenLibrary lookups)
|
||||
- Remote metadata fetches
|
||||
- File operations with metadata/tags and relationship tracking
|
||||
- Search results
|
||||
- Files with version relationships (king/alt/related)
|
||||
|
||||
Attributes:
|
||||
source: Source of the object (e.g., 'imdb', 'musicbrainz', 'libgen', 'debrid', 'file', etc.)
|
||||
identifier: Unique identifier from the source (e.g., IMDb ID, MBID, magnet hash, file hash)
|
||||
tags: List of extracted or assigned tags
|
||||
title: Human-readable title if applicable
|
||||
source_url: URL where the object came from
|
||||
duration: Duration in seconds if applicable
|
||||
metadata: Full metadata dictionary from source
|
||||
remote_metadata: Additional remote metadata
|
||||
warnings: Any warnings or issues encountered
|
||||
mpv_metadata: MPV-specific metadata if applicable
|
||||
file_path: Path to the file if this object represents a file
|
||||
file_hash: SHA-256 hash of the file for integrity and relationship tracking
|
||||
king_hash: Hash of the primary/master version of this file (for alternates)
|
||||
alt_hashes: List of hashes for alternate versions of this file
|
||||
related_hashes: List of hashes for related files (e.g., screenshots, editions)
|
||||
is_temp: If True, this is a temporary/intermediate artifact that may be cleaned up
|
||||
action: The cmdlet that created this object (format: 'cmdlet:cmdlet_name', e.g., 'cmdlet:get-file')
|
||||
parent_id: Hash of the parent file in the pipeline chain (for tracking provenance/lineage)
|
||||
extra: Additional fields not covered above
|
||||
"""
|
||||
source: str
|
||||
identifier: str
|
||||
tags: List[str] = field(default_factory=list)
|
||||
title: Optional[str] = None
|
||||
source_url: Optional[str] = None
|
||||
duration: Optional[float] = None
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
remote_metadata: Optional[Dict[str, Any]] = None
|
||||
warnings: List[str] = field(default_factory=list)
|
||||
mpv_metadata: Optional[Dict[str, Any]] = None
|
||||
file_path: Optional[str] = None
|
||||
file_hash: Optional[str] = None
|
||||
king_hash: Optional[str] = None
|
||||
alt_hashes: List[str] = field(default_factory=list)
|
||||
related_hashes: List[str] = field(default_factory=list)
|
||||
is_temp: bool = False
|
||||
action: Optional[str] = None
|
||||
parent_id: Optional[str] = None
|
||||
extra: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def register_as_king(self, file_hash: str) -> None:
|
||||
"""Register this object as the king (primary) version of a file."""
|
||||
self.king_hash = file_hash
|
||||
|
||||
def add_alternate(self, alt_hash: str) -> None:
|
||||
"""Add an alternate version hash for this file."""
|
||||
if alt_hash not in self.alt_hashes:
|
||||
self.alt_hashes.append(alt_hash)
|
||||
|
||||
def add_related(self, related_hash: str) -> None:
|
||||
"""Add a related file hash (e.g., screenshot, edition)."""
|
||||
if related_hash not in self.related_hashes:
|
||||
self.related_hashes.append(related_hash)
|
||||
|
||||
def get_relationships(self) -> Dict[str, Any]:
|
||||
"""Get all relationships for this object."""
|
||||
rels = {}
|
||||
if self.king_hash:
|
||||
rels["king"] = self.king_hash
|
||||
if self.alt_hashes:
|
||||
rels["alt"] = self.alt_hashes
|
||||
if self.related_hashes:
|
||||
rels["related"] = self.related_hashes
|
||||
return rels
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Serialize to dictionary, excluding None and empty values."""
|
||||
data: Dict[str, Any] = {
|
||||
"source": self.source,
|
||||
"tags": self.tags,
|
||||
}
|
||||
if self.identifier:
|
||||
data["id"] = self.identifier
|
||||
if self.title:
|
||||
data["title"] = self.title
|
||||
if self.source_url:
|
||||
data["source_url"] = self.source_url
|
||||
if self.duration is not None:
|
||||
data["duration"] = self.duration
|
||||
if self.metadata:
|
||||
data["metadata"] = self.metadata
|
||||
if self.remote_metadata is not None:
|
||||
data["remote_metadata"] = self.remote_metadata
|
||||
if self.mpv_metadata is not None:
|
||||
data["mpv_metadata"] = self.mpv_metadata
|
||||
if self.warnings:
|
||||
data["warnings"] = self.warnings
|
||||
if self.file_path:
|
||||
data["file_path"] = self.file_path
|
||||
if self.file_hash:
|
||||
data["file_hash"] = self.file_hash
|
||||
# Include pipeline chain tracking fields
|
||||
if self.is_temp:
|
||||
data["is_temp"] = self.is_temp
|
||||
if self.action:
|
||||
data["action"] = self.action
|
||||
if self.parent_id:
|
||||
data["parent_id"] = self.parent_id
|
||||
# Include relationship data if present
|
||||
rels = self.get_relationships()
|
||||
if rels:
|
||||
data["relationships"] = rels
|
||||
data.update({k: v for k, v in self.extra.items() if v is not None})
|
||||
return data
|
||||
|
||||
@property
|
||||
def hash(self) -> str:
|
||||
"""Compute SHA-256 hash from source and identifier."""
|
||||
base = f"{self.source}:{self.identifier}"
|
||||
return hashlib.sha256(base.encode('utf-8')).hexdigest()
|
||||
|
||||
# Backwards compatibility aliases
|
||||
def as_dict(self) -> Dict[str, Any]:
|
||||
"""Alias for to_dict() for backwards compatibility."""
|
||||
return self.to_dict()
|
||||
|
||||
def to_serializable(self) -> Dict[str, Any]:
|
||||
"""Alias for to_dict() for backwards compatibility."""
|
||||
return self.to_dict()
|
||||
|
||||
|
||||
class FileRelationshipTracker:
|
||||
"""Track relationships between files for sidecar creation.
|
||||
|
||||
Allows tagging files with their relationships to other files:
|
||||
- king: The primary/master version of a file
|
||||
- alt: Alternate versions of the same content
|
||||
- related: Related files (e.g., screenshots of a book)
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.relationships: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
def register_king(self, file_path: str, file_hash: str) -> None:
|
||||
"""Register a file as the king (primary) version."""
|
||||
if file_path not in self.relationships:
|
||||
self.relationships[file_path] = {}
|
||||
self.relationships[file_path]["king"] = file_hash
|
||||
|
||||
def add_alt(self, file_path: str, alt_hash: str) -> None:
|
||||
"""Add an alternate version of a file."""
|
||||
if file_path not in self.relationships:
|
||||
self.relationships[file_path] = {}
|
||||
if "alt" not in self.relationships[file_path]:
|
||||
self.relationships[file_path]["alt"] = []
|
||||
if alt_hash not in self.relationships[file_path]["alt"]:
|
||||
self.relationships[file_path]["alt"].append(alt_hash)
|
||||
|
||||
def add_related(self, file_path: str, related_hash: str) -> None:
|
||||
"""Add a related file."""
|
||||
if file_path not in self.relationships:
|
||||
self.relationships[file_path] = {}
|
||||
if "related" not in self.relationships[file_path]:
|
||||
self.relationships[file_path]["related"] = []
|
||||
if related_hash not in self.relationships[file_path]["related"]:
|
||||
self.relationships[file_path]["related"].append(related_hash)
|
||||
|
||||
def get_relationships(self, file_path: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get relationships for a file."""
|
||||
return self.relationships.get(file_path)
|
||||
|
||||
def link_files(self, primary_path: str, king_hash: str, *alt_paths: str) -> None:
|
||||
"""Link files together with primary as king and others as alternates.
|
||||
|
||||
Args:
|
||||
primary_path: Path to the primary file (will be marked as 'king')
|
||||
king_hash: Hash of the primary file
|
||||
alt_paths: Paths to alternate versions (will be marked as 'alt')
|
||||
"""
|
||||
self.register_king(primary_path, king_hash)
|
||||
for alt_path in alt_paths:
|
||||
try:
|
||||
alt_hash = _get_file_hash(alt_path)
|
||||
self.add_alt(primary_path, alt_hash)
|
||||
except Exception as e:
|
||||
import sys
|
||||
print(f"Error hashing {alt_path}: {e}", file=sys.stderr)
|
||||
|
||||
|
||||
def _get_file_hash(filepath: str) -> str:
|
||||
"""Calculate SHA256 hash of a file."""
|
||||
sha256_hash = hashlib.sha256()
|
||||
with open(filepath, "rb") as f:
|
||||
for byte_block in iter(lambda: f.read(4096), b""):
|
||||
sha256_hash.update(byte_block)
|
||||
return sha256_hash.hexdigest()
|
||||
|
||||
|
||||
# ============= Download Module Classes =============
|
||||
|
||||
class DownloadError(RuntimeError):
|
||||
"""Raised when the download or Hydrus import fails."""
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class DownloadOptions:
|
||||
"""Configuration for downloading media.
|
||||
|
||||
Use the add-file cmdlet separately for Hydrus import.
|
||||
"""
|
||||
url: str
|
||||
mode: str # "audio" or "video"
|
||||
output_dir: Path
|
||||
cookies_path: Optional[Path] = None
|
||||
ytdl_format: Optional[str] = None
|
||||
extra_tags: Optional[List[str]] = None
|
||||
debug_log: Optional[Path] = None
|
||||
native_progress: bool = False
|
||||
clip_sections: Optional[str] = None
|
||||
playlist_items: Optional[str] = None # yt-dlp --playlist-items format (e.g., "1-3,5,8")
|
||||
no_playlist: bool = False # If True, pass --no-playlist to yt-dlp
|
||||
|
||||
|
||||
class SendFunc(Protocol):
|
||||
"""Protocol for event sender function."""
|
||||
def __call__(self, event: str, **payload: Any) -> None:
|
||||
...
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class DownloadMediaResult:
|
||||
"""Result of a successful media download."""
|
||||
path: Path
|
||||
info: Dict[str, Any]
|
||||
tags: List[str]
|
||||
source_url: Optional[str]
|
||||
hash_value: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class DebugLogger:
|
||||
"""Logs events to a JSON debug file for troubleshooting downloads."""
|
||||
path: Path
|
||||
file: Optional[TextIO] = None
|
||||
session_started: bool = False
|
||||
|
||||
def ensure_open(self) -> None:
|
||||
"""Open the debug log file if not already open."""
|
||||
if self.file is not None:
|
||||
return
|
||||
try:
|
||||
parent = self.path.parent
|
||||
if parent and not parent.exists():
|
||||
parent.mkdir(parents=True, exist_ok=True)
|
||||
self.file = self.path.open("a", encoding="utf-8")
|
||||
except OSError as exc: # pragma: no cover - surfaces to stderr
|
||||
print(f"Failed to open debug log {self.path}: {exc}", file=sys.stderr)
|
||||
self.file = None
|
||||
return
|
||||
self._write_session_header()
|
||||
|
||||
def _write_session_header(self) -> None:
|
||||
"""Write session start marker to log."""
|
||||
if self.session_started:
|
||||
return
|
||||
self.session_started = True
|
||||
self.write_record("session-start", {"pid": os.getpid(), "exe": sys.executable})
|
||||
|
||||
def write_raw(self, text: str) -> None:
|
||||
"""Write raw text to debug log."""
|
||||
self.ensure_open()
|
||||
if self.file is None:
|
||||
return
|
||||
self.file.write(text + "\n")
|
||||
self.file.flush()
|
||||
|
||||
def write_record(self, event: str, payload: Optional[Dict[str, Any]] = None) -> None:
|
||||
"""Write a structured event record to debug log."""
|
||||
record = {
|
||||
"timestamp": datetime.datetime.utcnow().isoformat(timespec="seconds") + "Z",
|
||||
"event": event,
|
||||
"payload": payload,
|
||||
}
|
||||
self.write_raw(json.dumps(_sanitise_for_json(record), ensure_ascii=False))
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the debug log file."""
|
||||
if self.file is None:
|
||||
return
|
||||
try:
|
||||
self.file.close()
|
||||
finally:
|
||||
self.file = None
|
||||
|
||||
|
||||
def _sanitise_for_json(value: Any, *, max_depth: int = 8, _seen: Optional[set[int]] = None) -> Any:
|
||||
"""Best-effort conversion to JSON-serialisable types without raising on cycles."""
|
||||
import math
|
||||
from dataclasses import asdict, is_dataclass
|
||||
|
||||
if value is None or isinstance(value, (str, bool)):
|
||||
return value
|
||||
if isinstance(value, (int, float)):
|
||||
if isinstance(value, float) and not math.isfinite(value):
|
||||
return repr(value)
|
||||
return value
|
||||
if isinstance(value, Path):
|
||||
return str(value)
|
||||
if isinstance(value, bytes):
|
||||
try:
|
||||
return value.decode()
|
||||
except Exception:
|
||||
return value.hex()
|
||||
|
||||
if max_depth <= 0:
|
||||
return repr(value)
|
||||
|
||||
if _seen is None:
|
||||
_seen = set()
|
||||
|
||||
obj_id = id(value)
|
||||
if obj_id in _seen:
|
||||
return "<circular>"
|
||||
|
||||
_seen.add(obj_id)
|
||||
try:
|
||||
if isinstance(value, dict):
|
||||
return {
|
||||
str(key): _sanitise_for_json(val, max_depth=max_depth - 1, _seen=_seen)
|
||||
for key, val in value.items()
|
||||
}
|
||||
if isinstance(value, (list, tuple, set)):
|
||||
iterable = value if not isinstance(value, set) else list(value)
|
||||
return [
|
||||
_sanitise_for_json(item, max_depth=max_depth - 1, _seen=_seen)
|
||||
for item in iterable
|
||||
]
|
||||
if is_dataclass(value) and not isinstance(value, type):
|
||||
return _sanitise_for_json(asdict(value), max_depth=max_depth - 1, _seen=_seen)
|
||||
finally:
|
||||
_seen.discard(obj_id)
|
||||
|
||||
return repr(value)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# PROGRESS BAR CLASS
|
||||
# ============================================================================
|
||||
|
||||
class ProgressBar:
|
||||
"""Formats download progress with visual bar, speed, ETA, and file size."""
|
||||
|
||||
def __init__(self, width: Optional[int] = None):
|
||||
"""Initialize progress bar with optional custom width.
|
||||
|
||||
Args:
|
||||
width: Terminal width, defaults to auto-detect.
|
||||
"""
|
||||
if width is None:
|
||||
width = shutil.get_terminal_size((80, 20))[0]
|
||||
self.width = max(40, width) # Minimum 40 chars for readability
|
||||
|
||||
def format_bytes(self, bytes_val: Optional[float]) -> str:
|
||||
"""Format bytes to human-readable size.
|
||||
|
||||
Args:
|
||||
bytes_val: Number of bytes or None.
|
||||
|
||||
Returns:
|
||||
Formatted string (e.g., "123.4 MB", "1.2 GB").
|
||||
"""
|
||||
if bytes_val is None or bytes_val <= 0:
|
||||
return "?.? B"
|
||||
|
||||
for unit in ("B", "KB", "MB", "GB", "TB"):
|
||||
if bytes_val < 1024:
|
||||
return f"{bytes_val:.1f} {unit}"
|
||||
bytes_val /= 1024
|
||||
|
||||
return f"{bytes_val:.1f} PB"
|
||||
|
||||
def format_speed(self, speed_str: Optional[str]) -> str:
|
||||
"""Format download speed.
|
||||
|
||||
Args:
|
||||
speed_str: Speed string from yt-dlp (e.g., "1.23MiB/s").
|
||||
|
||||
Returns:
|
||||
Formatted speed string or "?.? KB/s".
|
||||
"""
|
||||
if not speed_str or speed_str.strip() == "":
|
||||
return "?.? KB/s"
|
||||
return speed_str.strip()
|
||||
|
||||
def format_eta(self, eta_str: Optional[str]) -> str:
|
||||
"""Format estimated time remaining.
|
||||
|
||||
Args:
|
||||
eta_str: ETA string from yt-dlp (e.g., "00:12:34").
|
||||
|
||||
Returns:
|
||||
Formatted ETA string or "?:?:?".
|
||||
"""
|
||||
if not eta_str or eta_str.strip() == "":
|
||||
return "?:?:?"
|
||||
return eta_str.strip()
|
||||
|
||||
def format_percent(self, percent_str: Optional[str]) -> float:
|
||||
"""Extract percent as float.
|
||||
|
||||
Args:
|
||||
percent_str: Percent string from yt-dlp (e.g., "45.2%").
|
||||
|
||||
Returns:
|
||||
Float 0-100 or 0 if invalid.
|
||||
"""
|
||||
if not percent_str:
|
||||
return 0.0
|
||||
try:
|
||||
return float(percent_str.replace("%", "").strip())
|
||||
except ValueError:
|
||||
return 0.0
|
||||
|
||||
def build_bar(self, percent: float, width: int = 30) -> str:
|
||||
"""Build ASCII progress bar.
|
||||
|
||||
Args:
|
||||
percent: Completion percentage (0-100).
|
||||
width: Bar width in characters.
|
||||
|
||||
Returns:
|
||||
Progress bar string (e.g., "[████████░░░░░░░░░░░░░░░░░░]").
|
||||
"""
|
||||
percent = max(0, min(100, percent)) # Clamp to 0-100
|
||||
filled = int(percent * width / 100)
|
||||
empty = width - filled
|
||||
|
||||
# Use box-drawing characters for nice appearance
|
||||
bar = "█" * filled + "░" * empty
|
||||
return f"[{bar}]"
|
||||
|
||||
def format_progress(
|
||||
self,
|
||||
percent_str: Optional[str] = None,
|
||||
downloaded: Optional[int] = None,
|
||||
total: Optional[int] = None,
|
||||
speed_str: Optional[str] = None,
|
||||
eta_str: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Format complete progress line.
|
||||
|
||||
Args:
|
||||
percent_str: Percent string (e.g., "45.2%").
|
||||
downloaded: Downloaded bytes.
|
||||
total: Total bytes.
|
||||
speed_str: Speed string (e.g., "1.23MiB/s").
|
||||
eta_str: ETA string (e.g., "00:12:34").
|
||||
|
||||
Returns:
|
||||
Formatted progress string.
|
||||
"""
|
||||
percent = self.format_percent(percent_str)
|
||||
bar = self.build_bar(percent)
|
||||
|
||||
# Format sizes
|
||||
if downloaded is not None and total is not None and total > 0:
|
||||
size_str = f"{self.format_bytes(downloaded)} / {self.format_bytes(total)}"
|
||||
elif total is not None and total > 0:
|
||||
size_str = f"/ {self.format_bytes(total)}"
|
||||
elif downloaded is not None and downloaded > 0:
|
||||
size_str = f"{self.format_bytes(downloaded)} downloaded"
|
||||
else:
|
||||
size_str = ""
|
||||
|
||||
speed = self.format_speed(speed_str)
|
||||
eta = self.format_eta(eta_str)
|
||||
|
||||
# Build complete line
|
||||
# Format: [████░░░░] 45.2% | 125.5 MB / 278.3 MB | 1.23 MB/s | ETA 00:12:34
|
||||
parts = [
|
||||
bar,
|
||||
f"{percent:5.1f}%",
|
||||
]
|
||||
|
||||
if size_str:
|
||||
parts.append(f"| {size_str}")
|
||||
|
||||
parts.append(f"| {speed}")
|
||||
parts.append(f"| ETA {eta}")
|
||||
|
||||
return " ".join(parts)
|
||||
|
||||
def format_summary(
|
||||
self,
|
||||
total: Optional[int] = None,
|
||||
speed_str: Optional[str] = None,
|
||||
elapsed_str: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Format completion summary.
|
||||
|
||||
Args:
|
||||
total: Total bytes downloaded.
|
||||
speed_str: Average speed.
|
||||
elapsed_str: Total time elapsed.
|
||||
|
||||
Returns:
|
||||
Summary string.
|
||||
"""
|
||||
parts = ["✓ Download complete"]
|
||||
|
||||
if total is not None and total > 0:
|
||||
parts.append(f"| {self.format_bytes(total)}")
|
||||
|
||||
if speed_str:
|
||||
parts.append(f"| {speed_str.strip()}")
|
||||
|
||||
if elapsed_str:
|
||||
parts.append(f"| {elapsed_str.strip()}")
|
||||
|
||||
return " ".join(parts)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# PIPELINE EXECUTION CONTEXT
|
||||
# Consolidated from pipeline_context.py
|
||||
# ============================================================================
|
||||
# Note: Pipeline functions and state variables moved to pipeline.py
|
||||
|
||||
class PipelineStageContext:
|
||||
"""Context information for the current pipeline stage."""
|
||||
|
||||
def __init__(self, stage_index: int, total_stages: int):
|
||||
self.stage_index = stage_index
|
||||
self.total_stages = total_stages
|
||||
self.is_last_stage = (stage_index == total_stages - 1)
|
||||
self.emits: List[Any] = []
|
||||
|
||||
def emit(self, obj: Any) -> None:
|
||||
"""Emit an object to the next pipeline stage."""
|
||||
self.emits.append(obj)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"PipelineStageContext(stage={self.stage_index}/{self.total_stages}, is_last={self.is_last_stage})"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# RESULT TABLE CLASSES
|
||||
# Consolidated from result_table.py
|
||||
# ============================================================================
|
||||
|
||||
@dataclass
|
||||
class InputOption:
|
||||
"""Represents an interactive input option (cmdlet argument) in a table.
|
||||
|
||||
Allows users to select options that translate to cmdlet arguments,
|
||||
enabling interactive configuration right from the result table.
|
||||
|
||||
Example:
|
||||
# Create an option for location selection
|
||||
location_opt = InputOption(
|
||||
"location",
|
||||
type="enum",
|
||||
choices=["local", "hydrus", "0x0"],
|
||||
description="Download destination"
|
||||
)
|
||||
|
||||
# Use in result table
|
||||
table.add_input_option(location_opt)
|
||||
selected = table.select_option("location") # Returns user choice
|
||||
"""
|
||||
name: str
|
||||
"""Option name (maps to cmdlet argument)"""
|
||||
type: str = "string"
|
||||
"""Option type: 'string', 'enum', 'flag', 'integer'"""
|
||||
choices: List[str] = field(default_factory=list)
|
||||
"""Valid choices for enum type"""
|
||||
default: Optional[str] = None
|
||||
"""Default value if not specified"""
|
||||
description: str = ""
|
||||
"""Description of what this option does"""
|
||||
validator: Optional[Callable[[str], bool]] = None
|
||||
"""Optional validator function: takes value, returns True if valid"""
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return {
|
||||
"name": self.name,
|
||||
"type": self.type,
|
||||
"choices": self.choices if self.choices else None,
|
||||
"default": self.default,
|
||||
"description": self.description,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class TUIResultCard:
|
||||
"""Represents a result as a UI card with title, metadata, and actions.
|
||||
|
||||
Used in hub-ui and TUI contexts to render individual search results
|
||||
as grouped components with visual structure.
|
||||
"""
|
||||
title: str
|
||||
subtitle: Optional[str] = None
|
||||
metadata: Optional[Dict[str, str]] = None
|
||||
media_kind: Optional[str] = None
|
||||
tags: Optional[List[str]] = None
|
||||
file_hash: Optional[str] = None
|
||||
file_size: Optional[str] = None
|
||||
duration: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
"""Initialize default values."""
|
||||
if self.metadata is None:
|
||||
self.metadata = {}
|
||||
if self.tags is None:
|
||||
self.tags = []
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResultColumn:
|
||||
"""Represents a single column in a result table."""
|
||||
name: str
|
||||
value: str
|
||||
width: Optional[int] = None
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""String representation of the column."""
|
||||
return f"{self.name}: {self.value}"
|
||||
|
||||
def to_dict(self) -> Dict[str, str]:
|
||||
"""Convert to dictionary."""
|
||||
return {"name": self.name, "value": self.value}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResultRow:
|
||||
"""Represents a single row in a result table."""
|
||||
columns: List[ResultColumn] = field(default_factory=list)
|
||||
|
||||
def add_column(self, name: str, value: Any) -> None:
|
||||
"""Add a column to this row."""
|
||||
str_value = str(value) if value is not None else ""
|
||||
self.columns.append(ResultColumn(name, str_value))
|
||||
|
||||
def get_column(self, name: str) -> Optional[str]:
|
||||
"""Get column value by name."""
|
||||
for col in self.columns:
|
||||
if col.name.lower() == name.lower():
|
||||
return col.value
|
||||
return None
|
||||
|
||||
def to_dict(self) -> List[Dict[str, str]]:
|
||||
"""Convert to list of column dicts."""
|
||||
return [col.to_dict() for col in self.columns]
|
||||
|
||||
def to_list(self) -> List[tuple[str, str]]:
|
||||
"""Convert to list of (name, value) tuples."""
|
||||
return [(col.name, col.value) for col in self.columns]
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""String representation of the row."""
|
||||
return " | ".join(str(col) for col in self.columns)
|
||||
Reference in New Issue
Block a user