from __future__ import annotations import re from abc import ABC, abstractmethod from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple, Callable @dataclass class SearchResult: """Unified search result format across all search providers.""" table: str # Provider name: "libgen", "soulseek", "bandcamp", "youtube", etc. title: str # Display title/filename path: str # Download target (URL, path, magnet, identifier) detail: str = "" # Additional description annotations: List[str] = field( default_factory=list ) # Tags: ["120MB", "flac", "ready"] media_kind: str = "other" # Type: "book", "audio", "video", "game", "magnet" size_bytes: Optional[int] = None tag: set[str] = field(default_factory=set) # Searchable tag values columns: List[Tuple[str, str]] = field(default_factory=list) # Display columns selection_action: Optional[Dict[str, Any]] = None selection_args: Optional[List[str]] = None full_metadata: Dict[str, Any] = field(default_factory=dict) # Extra metadata def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for pipeline processing.""" out = { "table": self.table, "title": self.title, "path": self.path, "detail": self.detail, "annotations": self.annotations, "media_kind": self.media_kind, "size_bytes": self.size_bytes, "tag": list(self.tag), "columns": list(self.columns), "full_metadata": self.full_metadata, } try: url_value = getattr(self, "url", None) if url_value is not None: out["url"] = url_value except Exception: pass try: selection_args = getattr(self, "selection_args", None) except Exception: selection_args = None if selection_args is None: try: fm = getattr(self, "full_metadata", None) if isinstance(fm, dict): selection_args = fm.get("_selection_args") or fm.get("selection_args") except Exception: selection_args = None if selection_args: out["_selection_args"] = selection_args try: selection_action = getattr(self, "selection_action", None) except Exception: selection_action = None if selection_action is None: try: fm = getattr(self, "full_metadata", None) if isinstance(fm, dict): selection_action = fm.get("_selection_action") or fm.get("selection_action") except Exception: selection_action = None if selection_action: normalized = [str(x) for x in selection_action if x is not None] if normalized: out["_selection_action"] = normalized return out def parse_inline_query_arguments(raw_query: str) -> Tuple[str, Dict[str, str]]: """Extract inline key:value arguments from a provider search query.""" query_text = str(raw_query or "").strip() if not query_text: return "", {} tokens = re.split(r"[,\s]+", query_text) leftover: List[str] = [] parsed_args: Dict[str, str] = {} for token in tokens: if not token: continue sep_index = token.find(":") if sep_index < 0: sep_index = token.find("=") if sep_index > 0: key = token[:sep_index].strip().lower() value = token[sep_index + 1 :].strip() if key and value: parsed_args[key] = value continue leftover.append(token) return " ".join(leftover).strip(), parsed_args class Provider(ABC): """Unified provider base class. This replaces the older split between "search providers" and "file providers". Concrete providers may implement any subset of: - search(query, ...) - download(result, output_dir) - upload(file_path, ...) - login(...) - validate() """ URL: Sequence[str] = () # Optional provider-driven defaults for what to do when a user selects @N from a # provider table. The CLI uses this to auto-insert stages (e.g. download-file) # without hardcoding table names. # # Example: # TABLE_AUTO_STAGES = {"youtube": ["download-file"]} # TABLE_AUTO_PREFIXES = {"tidal": ["download-file"]} # matches tidal.* TABLE_AUTO_STAGES: Dict[str, Sequence[str]] = {} TABLE_AUTO_PREFIXES: Dict[str, Sequence[str]] = {} AUTO_STAGE_USE_SELECTION_ARGS: bool = False # Optional provider-declared configuration keys. # Used for dynamically generating config panels (e.g., missing credentials). REQUIRED_CONFIG_KEYS: Sequence[str] = () def __init__(self, config: Optional[Dict[str, Any]] = None): self.config = config or {} self.name = self.__class__.__name__.lower() @classmethod def config(cls) -> List[Dict[str, Any]]: """Return configuration schema for this provider. Returns a list of dicts, each defining a field: { "key": "api_key", "label": "API Key", "default": "", "required": True, "secret": True, "choices": ["Option 1", "Option 2"] } """ return [] @classmethod def required_config_keys(cls) -> List[str]: keys = getattr(cls, "REQUIRED_CONFIG_KEYS", None) if not keys: return [] out: List[str] = [] try: for k in list(keys): s = str(k or "").strip() if s: out.append(s) except Exception: return [] return out def extract_query_arguments(self, query: str) -> Tuple[str, Dict[str, Any]]: """Allow providers to normalize query text and parse inline arguments.""" normalized = str(query or "").strip() return normalized, {} # Standard lifecycle/auth hook. def login(self, **_kwargs: Any) -> bool: return True def search( self, query: str, limit: int = 50, filters: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[SearchResult]: """Search for items matching the query.""" raise NotImplementedError(f"Provider '{self.name}' does not support search") def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]: """Download an item from a search result.""" return None def download_items( self, result: SearchResult, output_dir: Path, *, emit: Callable[[Path, str, str, Dict[str, Any]], None], progress: Any, quiet_mode: bool, path_from_result: Callable[[Any], Path], config: Optional[Dict[str, Any]] = None, ) -> int: """Optional multi-item download hook (default no-op).""" _ = result _ = output_dir _ = emit _ = progress _ = quiet_mode _ = path_from_result _ = config return 0 def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]: """Optional provider override to parse and act on URLs.""" _ = url _ = output_dir return False, None def upload(self, file_path: str, **kwargs: Any) -> str: """Upload a file and return a URL or identifier.""" raise NotImplementedError(f"Provider '{self.name}' does not support upload") def validate(self) -> bool: """Check if provider is available and properly configured.""" return True def selector( self, selected_items: List[Any], *, ctx: Any, stage_is_last: bool = True, **_kwargs: Any ) -> bool: """Optional hook for handling `@N` selection semantics. The CLI can delegate selection behavior to a provider/store instead of applying the default selection filtering. Return True if the selection was handled and default behavior should be skipped. """ _ = selected_items _ = ctx _ = stage_is_last return False @classmethod def selection_auto_stage( cls, table_type: str, stage_args: Optional[Sequence[str]] = None, ) -> Optional[List[str]]: """Return a stage to auto-run after selecting from `table_type`. This is used by the CLI to auto-insert default stages for provider tables (e.g. select a YouTube row -> auto-run download-file). Providers can implement this via class attributes (TABLE_AUTO_STAGES / TABLE_AUTO_PREFIXES) or by overriding this method. """ t = str(table_type or "").strip().lower() if not t: return None stage: Optional[Sequence[str]] = None try: stage = cls.TABLE_AUTO_STAGES.get(t) except Exception: stage = None if stage is None: try: for prefix, cmd in (cls.TABLE_AUTO_PREFIXES or {}).items(): p = str(prefix or "").strip().lower() if not p: continue if t == p or t.startswith(p + ".") or t.startswith(p): stage = cmd break except Exception: stage = None if not stage: return None out = [str(x) for x in stage if str(x or "").strip()] if not out: return None if cls.AUTO_STAGE_USE_SELECTION_ARGS and stage_args: try: out.extend([str(x) for x in stage_args if str(x or "").strip()]) except Exception: pass return out @classmethod def url_patterns(cls) -> Tuple[str, ...]: """Return normalized URL patterns that this provider handles.""" patterns: List[str] = [] maybe_urls = getattr(cls, "URL", None) if isinstance(maybe_urls, (list, tuple)): for entry in maybe_urls: try: candidate = str(entry or "").strip().lower() except Exception: continue if candidate: patterns.append(candidate) maybe_domains = getattr(cls, "URL_DOMAINS", None) if isinstance(maybe_domains, (list, tuple)): for entry in maybe_domains: try: candidate = str(entry or "").strip().lower() except Exception: continue if candidate and candidate not in patterns: patterns.append(candidate) return tuple(patterns) class SearchProvider(Provider): """Compatibility alias for older code. Prefer inheriting from Provider directly. """ class FileProvider(Provider): """Compatibility alias for older code. Prefer inheriting from Provider directly. """