Files
Medios-Macina/models.py

803 lines
29 KiB
Python
Raw Normal View History

2025-11-25 20:09:33 -08:00
"""Data models for the pipeline."""
import datetime
import hashlib
import json
import math
import os
import shutil
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Protocol, TextIO, Tuple
@dataclass(slots=True)
class PipeObject:
2025-12-11 23:21:45 -08:00
"""Unified pipeline object for tracking files, metadata, tag values, and relationships through the pipeline.
2025-11-25 20:09:33 -08:00
2025-12-11 12:47:30 -08:00
This is the single source of truth for all result data in the pipeline. Uses the hash+store
canonical pattern for file identification.
2025-11-25 20:09:33 -08:00
Attributes:
2025-12-11 12:47:30 -08:00
hash: SHA-256 hash of the file (canonical identifier)
store: Storage backend name (e.g., 'default', 'hydrus', 'test', 'home')
2025-12-11 23:21:45 -08:00
tag: List of extracted or assigned tag values
2025-11-25 20:09:33 -08:00
title: Human-readable title if applicable
source_url: URL where the object came from
duration: Duration in seconds if applicable
metadata: Full metadata dictionary from source
warnings: Any warnings or issues encountered
2025-12-11 12:47:30 -08:00
path: Path to the file if this object represents a file
relationships: Relationship data (king/alt/related hashes)
2025-11-25 20:09:33 -08:00
is_temp: If True, this is a temporary/intermediate artifact that may be cleaned up
2025-12-11 12:47:30 -08:00
action: The cmdlet that created this object (format: 'cmdlet:cmdlet_name')
parent_hash: Hash of the parent file in the pipeline chain (for tracking provenance/lineage)
2025-11-25 20:09:33 -08:00
extra: Additional fields not covered above
"""
2025-12-11 12:47:30 -08:00
hash: str
store: str
2025-12-11 23:21:45 -08:00
tag: List[str] = field(default_factory=list)
2025-11-25 20:09:33 -08:00
title: Optional[str] = None
2025-12-11 12:47:30 -08:00
url: Optional[str] = None
2025-11-25 20:09:33 -08:00
source_url: Optional[str] = None
duration: Optional[float] = None
metadata: Dict[str, Any] = field(default_factory=dict)
warnings: List[str] = field(default_factory=list)
2025-12-11 12:47:30 -08:00
path: Optional[str] = None
relationships: Dict[str, Any] = field(default_factory=dict)
2025-11-25 20:09:33 -08:00
is_temp: bool = False
action: Optional[str] = None
2025-12-11 12:47:30 -08:00
parent_hash: Optional[str] = None
2025-11-25 20:09:33 -08:00
extra: Dict[str, Any] = field(default_factory=dict)
2025-12-11 12:47:30 -08:00
def add_relationship(self, rel_type: str, rel_hash: str) -> None:
"""Add a relationship hash.
Args:
rel_type: Relationship type ('king', 'alt', 'related')
rel_hash: Hash to add to the relationship
"""
if rel_type not in self.relationships:
self.relationships[rel_type] = []
if isinstance(self.relationships[rel_type], list):
if rel_hash not in self.relationships[rel_type]:
self.relationships[rel_type].append(rel_hash)
else:
# Single value (e.g., king), convert to that value
self.relationships[rel_type] = rel_hash
2025-11-25 20:09:33 -08:00
def get_relationships(self) -> Dict[str, Any]:
"""Get all relationships for this object."""
2025-12-11 12:47:30 -08:00
return self.relationships.copy() if self.relationships else {}
def debug_table(self) -> None:
"""Print a formatted debug table showing PipeObject state.
Only prints when debug logging is enabled. Useful for tracking
object state throughout the pipeline.
"""
try:
2025-12-11 19:04:02 -08:00
from SYS.logger import is_debug_enabled, debug
2025-12-16 01:45:01 -08:00
import shutil
2025-12-11 12:47:30 -08:00
if not is_debug_enabled():
return
except Exception:
return
# Prepare display values
2025-12-16 01:45:01 -08:00
hash_display = str(self.hash or "N/A")
store_display = str(self.store or "N/A")
title_display = str(self.title or "N/A")
2025-12-11 23:21:45 -08:00
tag_display = ", ".join(self.tag[:3]) if self.tag else "[]"
if len(self.tag) > 3:
tag_display += f" (+{len(self.tag) - 3} more)"
2025-12-16 01:45:01 -08:00
file_path_display = str(self.path or "N/A")
2025-12-13 00:18:30 -08:00
url_display: Any = self.url or "N/A"
if isinstance(url_display, (list, tuple, set)):
parts = [str(x) for x in url_display if x]
url_display = ", ".join(parts) if parts else "N/A"
2025-12-16 01:45:01 -08:00
else:
2025-12-13 00:18:30 -08:00
url_display = str(url_display)
2025-12-11 12:47:30 -08:00
relationships_display = "N/A"
if self.relationships:
rel_parts = []
for key, val in self.relationships.items():
if isinstance(val, list):
rel_parts.append(f"{key}({len(val)})")
else:
rel_parts.append(key)
relationships_display = ", ".join(rel_parts)
warnings_display = f"{len(self.warnings)} warning(s)" if self.warnings else "none"
2025-12-16 01:45:01 -08:00
def _fit(text: str, max_len: int) -> str:
if max_len <= 0:
return ""
if len(text) <= max_len:
return text
if max_len <= 3:
return text[:max_len]
return text[: max_len - 3] + "..."
# Compute box width from terminal size, but never allow overflow.
try:
term_cols = int(getattr(shutil.get_terminal_size((120, 20)), "columns", 120))
except Exception:
term_cols = 120
box_inner_max = max(60, term_cols - 3) # line length = box_inner + 3
rows = [
("Hash", hash_display),
("Store", store_display),
("Title", title_display),
("Tag", tag_display),
("URL", str(url_display)),
("File Path", file_path_display),
("Relationships", relationships_display),
("Warnings", warnings_display),
]
label_width = max(len(k) for k, _ in rows)
# Estimate a good inner width from current content, capped to terminal.
base_contents = [f"{k:<{label_width}} : {v}" for k, v in rows]
desired_inner = max([len("PipeObject Debug Info"), *[len(x) for x in base_contents], 60])
box_inner = min(desired_inner, box_inner_max)
def _line(content: str) -> str:
return f"{_fit(content, box_inner):<{box_inner}}"
2025-12-11 12:47:30 -08:00
# Print table
2025-12-16 01:45:01 -08:00
debug("" + ("" * (box_inner + 1)) + "")
debug(_line("PipeObject Debug Info"))
debug("" + ("" * (box_inner + 1)) + "")
for key, val in rows:
content = f"{key:<{label_width}} : {val}"
debug(_line(content))
2025-12-11 12:47:30 -08:00
# Show extra keys as individual rows
if self.extra:
2025-12-16 01:45:01 -08:00
debug("" + ("" * (box_inner + 1)) + "")
debug(_line("Extra Fields:"))
2025-12-11 12:47:30 -08:00
for key, val in self.extra.items():
# Format value for display
if isinstance(val, (list, set)):
val_display = f"{type(val).__name__}({len(val)})"
elif isinstance(val, dict):
val_display = f"dict({len(val)})"
elif isinstance(val, (int, float)):
val_display = str(val)
else:
val_str = str(val)
val_display = val_str if len(val_str) <= 40 else val_str[:37] + "..."
# Truncate key if needed
2025-12-16 01:45:01 -08:00
key_display = str(key)
key_display = key_display if len(key_display) <= 15 else key_display[:12] + "..."
content = f" {key_display:<15}: {val_display}"
debug(_line(content))
2025-12-14 00:53:52 -08:00
# If we have structured provider metadata, expand it for debugging.
full_md = self.extra.get("full_metadata")
if isinstance(full_md, dict) and full_md:
2025-12-16 01:45:01 -08:00
debug("" + ("" * (box_inner + 1)) + "")
debug(_line("full_metadata:"))
2025-12-14 00:53:52 -08:00
for md_key in sorted(full_md.keys(), key=lambda x: str(x)):
md_val = full_md.get(md_key)
if isinstance(md_val, (str, int, float)) or md_val is None or isinstance(md_val, bool):
md_display = str(md_val)
elif isinstance(md_val, list):
if len(md_val) <= 6 and all(isinstance(x, (str, int, float, bool)) or x is None for x in md_val):
md_display = "[" + ", ".join(str(x) for x in md_val) + "]"
else:
md_display = f"list({len(md_val)})"
elif isinstance(md_val, dict):
# Avoid dumping huge nested dicts (like raw provider docs).
keys = list(md_val.keys())
preview = ",".join(str(k) for k in keys[:6])
md_display = f"dict({len(keys)})[{preview}{',...' if len(keys) > 6 else ''}]"
else:
md_str = str(md_val)
md_display = md_str if len(md_str) <= 40 else md_str[:37] + "..."
md_key_display = str(md_key)
md_key_display = md_key_display if len(md_key_display) <= 15 else md_key_display[:12] + "..."
2025-12-16 01:45:01 -08:00
content = f" {md_key_display:<15}: {md_display}"
debug(_line(content))
2025-12-11 12:47:30 -08:00
if self.action:
debug("├─────────────────────────────────────────────────────────────┤")
action_display = self.action[:48]
debug(f"│ Action : {action_display:<48}")
if self.parent_hash:
if not self.action:
debug("├─────────────────────────────────────────────────────────────┤")
parent_display = self.parent_hash[:12] + "..." if len(self.parent_hash) > 12 else self.parent_hash
debug(f"│ Parent Hash : {parent_display:<48}")
debug("└─────────────────────────────────────────────────────────────┘")
2025-11-25 20:09:33 -08:00
def to_dict(self) -> Dict[str, Any]:
"""Serialize to dictionary, excluding None and empty values."""
data: Dict[str, Any] = {
2025-12-11 12:47:30 -08:00
"hash": self.hash,
"store": self.store,
2025-11-25 20:09:33 -08:00
}
2025-12-11 12:47:30 -08:00
2025-12-11 23:21:45 -08:00
if self.tag:
data["tag"] = self.tag
2025-11-25 20:09:33 -08:00
if self.title:
data["title"] = self.title
2025-12-11 12:47:30 -08:00
if self.url:
data["url"] = self.url
2025-11-25 20:09:33 -08:00
if self.source_url:
data["source_url"] = self.source_url
if self.duration is not None:
data["duration"] = self.duration
if self.metadata:
data["metadata"] = self.metadata
if self.warnings:
data["warnings"] = self.warnings
2025-12-11 12:47:30 -08:00
if self.path:
data["path"] = self.path
if self.relationships:
data["relationships"] = self.relationships
2025-11-25 20:09:33 -08:00
if self.is_temp:
data["is_temp"] = self.is_temp
if self.action:
data["action"] = self.action
2025-12-11 12:47:30 -08:00
if self.parent_hash:
data["parent_hash"] = self.parent_hash
# Add extra fields
2025-11-25 20:09:33 -08:00
data.update({k: v for k, v in self.extra.items() if v is not None})
return data
class FileRelationshipTracker:
"""Track relationships between files for sidecar creation.
Allows tagging files with their relationships to other files:
- king: The primary/master version of a file
- alt: Alternate versions of the same content
- related: Related files (e.g., screenshots of a book)
"""
def __init__(self) -> None:
self.relationships: Dict[str, Dict[str, Any]] = {}
def register_king(self, file_path: str, file_hash: str) -> None:
"""Register a file as the king (primary) version."""
if file_path not in self.relationships:
self.relationships[file_path] = {}
self.relationships[file_path]["king"] = file_hash
def add_alt(self, file_path: str, alt_hash: str) -> None:
"""Add an alternate version of a file."""
if file_path not in self.relationships:
self.relationships[file_path] = {}
if "alt" not in self.relationships[file_path]:
self.relationships[file_path]["alt"] = []
if alt_hash not in self.relationships[file_path]["alt"]:
self.relationships[file_path]["alt"].append(alt_hash)
def add_related(self, file_path: str, related_hash: str) -> None:
"""Add a related file."""
if file_path not in self.relationships:
self.relationships[file_path] = {}
if "related" not in self.relationships[file_path]:
self.relationships[file_path]["related"] = []
if related_hash not in self.relationships[file_path]["related"]:
self.relationships[file_path]["related"].append(related_hash)
def get_relationships(self, file_path: str) -> Optional[Dict[str, Any]]:
"""Get relationships for a file."""
return self.relationships.get(file_path)
def link_files(self, primary_path: str, king_hash: str, *alt_paths: str) -> None:
"""Link files together with primary as king and others as alternates.
Args:
primary_path: Path to the primary file (will be marked as 'king')
king_hash: Hash of the primary file
alt_paths: Paths to alternate versions (will be marked as 'alt')
"""
self.register_king(primary_path, king_hash)
for alt_path in alt_paths:
try:
alt_hash = _get_file_hash(alt_path)
self.add_alt(primary_path, alt_hash)
except Exception as e:
import sys
print(f"Error hashing {alt_path}: {e}", file=sys.stderr)
def _get_file_hash(filepath: str) -> str:
"""Calculate SHA256 hash of a file."""
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
# ============= Download Module Classes =============
class DownloadError(RuntimeError):
"""Raised when the download or Hydrus import fails."""
@dataclass(slots=True)
class DownloadOptions:
"""Configuration for downloading media.
Use the add-file cmdlet separately for Hydrus import.
"""
url: str
mode: str # "audio" or "video"
output_dir: Path
cookies_path: Optional[Path] = None
ytdl_format: Optional[str] = None
extra_tags: Optional[List[str]] = None
debug_log: Optional[Path] = None
native_progress: bool = False
clip_sections: Optional[str] = None
playlist_items: Optional[str] = None # yt-dlp --playlist-items format (e.g., "1-3,5,8")
no_playlist: bool = False # If True, pass --no-playlist to yt-dlp
2025-12-11 12:47:30 -08:00
quiet: bool = False # If True, suppress all console output (progress, debug logs)
2025-12-16 23:23:43 -08:00
embed_chapters: bool = False # If True, pass yt-dlp --embed-chapters / embedchapters
write_sub: bool = False # If True, download subtitles (writesubtitles/writeautomaticsub)
2025-11-25 20:09:33 -08:00
class SendFunc(Protocol):
"""Protocol for event sender function."""
def __call__(self, event: str, **payload: Any) -> None:
...
@dataclass(slots=True)
class DownloadMediaResult:
"""Result of a successful media download."""
path: Path
info: Dict[str, Any]
2025-12-11 23:21:45 -08:00
tag: List[str]
2025-11-25 20:09:33 -08:00
source_url: Optional[str]
hash_value: Optional[str] = None
2025-12-01 14:42:30 -08:00
paths: Optional[List[Path]] = None # For multiple files (e.g., section downloads)
2025-11-25 20:09:33 -08:00
@dataclass(slots=True)
class DebugLogger:
"""Logs events to a JSON debug file for troubleshooting downloads."""
path: Path
file: Optional[TextIO] = None
session_started: bool = False
def ensure_open(self) -> None:
"""Open the debug log file if not already open."""
if self.file is not None:
return
try:
parent = self.path.parent
if parent and not parent.exists():
parent.mkdir(parents=True, exist_ok=True)
self.file = self.path.open("a", encoding="utf-8")
except OSError as exc: # pragma: no cover - surfaces to stderr
print(f"Failed to open debug log {self.path}: {exc}", file=sys.stderr)
self.file = None
return
self._write_session_header()
def _write_session_header(self) -> None:
"""Write session start marker to log."""
if self.session_started:
return
self.session_started = True
self.write_record("session-start", {"pid": os.getpid(), "exe": sys.executable})
def write_raw(self, text: str) -> None:
"""Write raw text to debug log."""
self.ensure_open()
if self.file is None:
return
self.file.write(text + "\n")
self.file.flush()
def write_record(self, event: str, payload: Optional[Dict[str, Any]] = None) -> None:
"""Write a structured event record to debug log."""
record = {
"timestamp": datetime.datetime.utcnow().isoformat(timespec="seconds") + "Z",
"event": event,
"payload": payload,
}
self.write_raw(json.dumps(_sanitise_for_json(record), ensure_ascii=False))
def close(self) -> None:
"""Close the debug log file."""
if self.file is None:
return
try:
self.file.close()
finally:
self.file = None
def _sanitise_for_json(value: Any, *, max_depth: int = 8, _seen: Optional[set[int]] = None) -> Any:
"""Best-effort conversion to JSON-serialisable types without raising on cycles."""
import math
from dataclasses import asdict, is_dataclass
if value is None or isinstance(value, (str, bool)):
return value
if isinstance(value, (int, float)):
if isinstance(value, float) and not math.isfinite(value):
return repr(value)
return value
if isinstance(value, Path):
return str(value)
if isinstance(value, bytes):
try:
return value.decode()
except Exception:
return value.hex()
if max_depth <= 0:
return repr(value)
if _seen is None:
_seen = set()
obj_id = id(value)
if obj_id in _seen:
return "<circular>"
_seen.add(obj_id)
try:
if isinstance(value, dict):
return {
str(key): _sanitise_for_json(val, max_depth=max_depth - 1, _seen=_seen)
for key, val in value.items()
}
if isinstance(value, (list, tuple, set)):
iterable = value if not isinstance(value, set) else list(value)
return [
_sanitise_for_json(item, max_depth=max_depth - 1, _seen=_seen)
for item in iterable
]
if is_dataclass(value) and not isinstance(value, type):
return _sanitise_for_json(asdict(value), max_depth=max_depth - 1, _seen=_seen)
finally:
_seen.discard(obj_id)
return repr(value)
class ProgressBar:
"""Formats download progress with visual bar, speed, ETA, and file size."""
def __init__(self, width: Optional[int] = None):
"""Initialize progress bar with optional custom width.
Args:
width: Terminal width, defaults to auto-detect.
"""
if width is None:
width = shutil.get_terminal_size((80, 20))[0]
self.width = max(40, width) # Minimum 40 chars for readability
def format_bytes(self, bytes_val: Optional[float]) -> str:
"""Format bytes to human-readable size.
Args:
bytes_val: Number of bytes or None.
Returns:
Formatted string (e.g., "123.4 MB", "1.2 GB").
"""
if bytes_val is None or bytes_val <= 0:
return "?.? B"
for unit in ("B", "KB", "MB", "GB", "TB"):
if bytes_val < 1024:
return f"{bytes_val:.1f} {unit}"
bytes_val /= 1024
return f"{bytes_val:.1f} PB"
def format_speed(self, speed_str: Optional[str]) -> str:
"""Format download speed.
Args:
speed_str: Speed string from yt-dlp (e.g., "1.23MiB/s").
Returns:
Formatted speed string or "?.? KB/s".
"""
if not speed_str or speed_str.strip() == "":
return "?.? KB/s"
return speed_str.strip()
def format_eta(self, eta_str: Optional[str]) -> str:
"""Format estimated time remaining.
Args:
eta_str: ETA string from yt-dlp (e.g., "00:12:34").
Returns:
Formatted ETA string or "?:?:?".
"""
if not eta_str or eta_str.strip() == "":
return "?:?:?"
return eta_str.strip()
def format_percent(self, percent_str: Optional[str]) -> float:
"""Extract percent as float.
Args:
percent_str: Percent string from yt-dlp (e.g., "45.2%").
Returns:
Float 0-100 or 0 if invalid.
"""
if not percent_str:
return 0.0
try:
return float(percent_str.replace("%", "").strip())
except ValueError:
return 0.0
def build_bar(self, percent: float, width: int = 30) -> str:
"""Build ASCII progress bar.
Args:
percent: Completion percentage (0-100).
width: Bar width in characters.
Returns:
Progress bar string (e.g., "[████████░░░░░░░░░░░░░░░░░░]").
"""
percent = max(0, min(100, percent)) # Clamp to 0-100
filled = int(percent * width / 100)
empty = width - filled
# Use box-drawing characters for nice appearance
bar = "" * filled + "" * empty
return f"[{bar}]"
def format_progress(
self,
percent_str: Optional[str] = None,
downloaded: Optional[int] = None,
total: Optional[int] = None,
speed_str: Optional[str] = None,
eta_str: Optional[str] = None,
) -> str:
"""Format complete progress line.
Args:
percent_str: Percent string (e.g., "45.2%").
downloaded: Downloaded bytes.
total: Total bytes.
speed_str: Speed string (e.g., "1.23MiB/s").
eta_str: ETA string (e.g., "00:12:34").
Returns:
Formatted progress string.
"""
percent = self.format_percent(percent_str)
2025-12-13 12:09:50 -08:00
# Some callers (e.g. yt-dlp hooks) may not provide a stable percent string.
# When we have byte counts, derive percent from them so the bar advances.
if (not percent_str or percent == 0.0) and downloaded is not None and total is not None and total > 0:
try:
percent = (float(downloaded) / float(total)) * 100.0
except Exception:
percent = percent
2025-11-25 20:09:33 -08:00
bar = self.build_bar(percent)
# Format sizes
if downloaded is not None and total is not None and total > 0:
size_str = f"{self.format_bytes(downloaded)} / {self.format_bytes(total)}"
elif total is not None and total > 0:
size_str = f"/ {self.format_bytes(total)}"
elif downloaded is not None and downloaded > 0:
size_str = f"{self.format_bytes(downloaded)} downloaded"
else:
size_str = ""
speed = self.format_speed(speed_str)
eta = self.format_eta(eta_str)
# Build complete line
# Format: [████░░░░] 45.2% | 125.5 MB / 278.3 MB | 1.23 MB/s | ETA 00:12:34
parts = [
bar,
f"{percent:5.1f}%",
]
if size_str:
parts.append(f"| {size_str}")
parts.append(f"| {speed}")
parts.append(f"| ETA {eta}")
return " ".join(parts)
def format_summary(
self,
total: Optional[int] = None,
speed_str: Optional[str] = None,
elapsed_str: Optional[str] = None,
) -> str:
"""Format completion summary.
Args:
total: Total bytes downloaded.
speed_str: Average speed.
elapsed_str: Total time elapsed.
Returns:
Summary string.
"""
parts = ["✓ Download complete"]
if total is not None and total > 0:
parts.append(f"| {self.format_bytes(total)}")
if speed_str:
parts.append(f"| {speed_str.strip()}")
if elapsed_str:
parts.append(f"| {elapsed_str.strip()}")
return " ".join(parts)
# ============================================================================
# PIPELINE EXECUTION CONTEXT
# Consolidated from pipeline_context.py
# ============================================================================
# Note: Pipeline functions and state variables moved to pipeline.py
class PipelineStageContext:
"""Context information for the current pipeline stage."""
2025-12-11 12:47:30 -08:00
def __init__(self, stage_index: int, total_stages: int, worker_id: Optional[str] = None):
2025-11-25 20:09:33 -08:00
self.stage_index = stage_index
self.total_stages = total_stages
self.is_last_stage = (stage_index == total_stages - 1)
2025-12-11 12:47:30 -08:00
self.worker_id = worker_id
2025-11-25 20:09:33 -08:00
self.emits: List[Any] = []
def emit(self, obj: Any) -> None:
"""Emit an object to the next pipeline stage."""
self.emits.append(obj)
2025-12-11 12:47:30 -08:00
def get_current_command_text(self) -> str:
"""Get the current command text (for backward compatibility)."""
# This is maintained for backward compatibility with old code
# In a real implementation, this would come from the stage context
return ""
2025-11-25 20:09:33 -08:00
def __repr__(self) -> str:
2025-12-11 12:47:30 -08:00
return f"PipelineStageContext(stage={self.stage_index}/{self.total_stages}, is_last={self.is_last_stage}, worker_id={self.worker_id})"
2025-11-25 20:09:33 -08:00
# ============================================================================
# RESULT TABLE CLASSES
# Consolidated from result_table.py
# ============================================================================
@dataclass
class InputOption:
"""Represents an interactive input option (cmdlet argument) in a table.
Allows users to select options that translate to cmdlet arguments,
enabling interactive configuration right from the result table.
Example:
# Create an option for location selection
location_opt = InputOption(
"location",
type="enum",
choices=["local", "hydrus", "0x0"],
description="Download destination"
)
# Use in result table
table.add_input_option(location_opt)
selected = table.select_option("location") # Returns user choice
"""
name: str
"""Option name (maps to cmdlet argument)"""
type: str = "string"
"""Option type: 'string', 'enum', 'flag', 'integer'"""
choices: List[str] = field(default_factory=list)
"""Valid choices for enum type"""
default: Optional[str] = None
"""Default value if not specified"""
description: str = ""
"""Description of what this option does"""
validator: Optional[Callable[[str], bool]] = None
"""Optional validator function: takes value, returns True if valid"""
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"name": self.name,
"type": self.type,
"choices": self.choices if self.choices else None,
"default": self.default,
"description": self.description,
}
@dataclass
class TUIResultCard:
"""Represents a result as a UI card with title, metadata, and actions.
Used in hub-ui and TUI contexts to render individual search results
as grouped components with visual structure.
"""
title: str
subtitle: Optional[str] = None
metadata: Optional[Dict[str, str]] = None
media_kind: Optional[str] = None
2025-12-11 23:21:45 -08:00
tag: Optional[List[str]] = None
2025-11-25 20:09:33 -08:00
file_hash: Optional[str] = None
file_size: Optional[str] = None
duration: Optional[str] = None
def __post_init__(self):
"""Initialize default values."""
if self.metadata is None:
self.metadata = {}
2025-12-11 23:21:45 -08:00
if self.tag is None:
self.tag = []
2025-11-25 20:09:33 -08:00
@dataclass
class ResultColumn:
"""Represents a single column in a result table."""
name: str
value: str
width: Optional[int] = None
def __str__(self) -> str:
"""String representation of the column."""
return f"{self.name}: {self.value}"
def to_dict(self) -> Dict[str, str]:
"""Convert to dictionary."""
return {"name": self.name, "value": self.value}
@dataclass
class ResultRow:
"""Represents a single row in a result table."""
columns: List[ResultColumn] = field(default_factory=list)
def add_column(self, name: str, value: Any) -> None:
"""Add a column to this row."""
str_value = str(value) if value is not None else ""
self.columns.append(ResultColumn(name, str_value))
def get_column(self, name: str) -> Optional[str]:
"""Get column value by name."""
for col in self.columns:
if col.name.lower() == name.lower():
return col.value
return None
def to_dict(self) -> List[Dict[str, str]]:
"""Convert to list of column dicts."""
return [col.to_dict() for col in self.columns]
def to_list(self) -> List[tuple[str, str]]:
"""Convert to list of (name, value) tuples."""
return [(col.name, col.value) for col in self.columns]
def __str__(self) -> str:
"""String representation of the row."""
return " | ".join(str(col) for col in self.columns)