Files
Medios-Macina/ProviderCore/base.py

348 lines
11 KiB
Python
Raw Normal View History

2025-12-11 19:04:02 -08:00
from __future__ import annotations
import re
2025-12-11 19:04:02 -08:00
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
2026-01-07 05:09:59 -08:00
from typing import Any, Dict, List, Optional, Sequence, Tuple, Callable
2025-12-11 19:04:02 -08:00
@dataclass
class SearchResult:
    """Unified search result format across all search providers."""

    table: str  # Provider name: "libgen", "soulseek", "bandcamp", "youtube", etc.
    title: str  # Display title/filename
    path: str  # Download target (URL, path, magnet, identifier)
    detail: str = ""  # Additional description
    annotations: List[str] = field(default_factory=list)  # Tags: ["120MB", "flac", "ready"]
    media_kind: str = "other"  # Type: "book", "audio", "video", "game", "magnet"
    size_bytes: Optional[int] = None
    tag: set[str] = field(default_factory=set)  # Searchable tag values
    columns: List[Tuple[str, str]] = field(default_factory=list)  # Display columns
    full_metadata: Dict[str, Any] = field(default_factory=dict)  # Extra metadata

    def _optional_extra(self, name: str) -> Any:
        """Return an optional, non-field value attached to this result.

        The instance attribute ``name`` wins when it is set to anything other
        than ``None``. Otherwise ``full_metadata`` is consulted, preferring the
        underscore-prefixed key (``"_<name>"``) over the plain one.
        """
        try:
            value = getattr(self, name, None)
        except Exception:
            # Best effort: a subclass property may raise something other than
            # AttributeError; serialization should not fail because of it.
            value = None
        if value is None:
            metadata = self.full_metadata
            if isinstance(metadata, dict):
                value = metadata.get(f"_{name}") or metadata.get(name)
        return value

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for pipeline processing.

        Always contains one entry per declared field. ``tag`` and ``columns``
        are copied into new lists; ``annotations`` and ``full_metadata`` are
        passed through as the same objects held by this instance.

        Three optional entries are added only when a value is available:

        - ``"url"``: from a ``url`` attribute set on the instance.
        - ``"_selection_args"``: from a ``selection_args`` attribute or the
          ``_selection_args`` / ``selection_args`` metadata keys.
        - ``"_selection_action"``: same lookup for ``selection_action``,
          normalized to a list of strings with ``None`` items dropped.
        """
        out: Dict[str, Any] = {
            "table": self.table,
            "title": self.title,
            "path": self.path,
            "detail": self.detail,
            "annotations": self.annotations,
            "media_kind": self.media_kind,
            "size_bytes": self.size_bytes,
            "tag": list(self.tag),
            "columns": list(self.columns),
            "full_metadata": self.full_metadata,
        }

        # ``url`` is not a declared field; it is only emitted when some caller
        # attached it to the instance.
        try:
            url_value = getattr(self, "url", None)
        except Exception:
            url_value = None
        if url_value is not None:
            out["url"] = url_value

        selection_args = self._optional_extra("selection_args")
        if selection_args:
            out["_selection_args"] = selection_args

        selection_action = self._optional_extra("selection_action")
        if selection_action:
            # A bare string is a single token, not a sequence of characters.
            if isinstance(selection_action, str):
                selection_action = [selection_action]
            try:
                normalized = [str(x) for x in selection_action if x is not None]
            except TypeError:
                # Non-iterable value: nothing usable to emit.
                normalized = []
            if normalized:
                out["_selection_action"] = normalized

        return out
2025-12-11 19:04:02 -08:00
def parse_inline_query_arguments(raw_query: str) -> Tuple[str, Dict[str, str]]:
    """Extract inline key:value arguments from a provider search query.

    The query is split on commas and whitespace. A token of the form
    ``key:value`` or ``key=value`` (non-empty key and value) is removed from
    the query and recorded under the lower-cased key; a later occurrence of
    the same key overwrites an earlier one. Every other token is kept.

    Returns:
        ``(remaining_query, arguments)`` where ``remaining_query`` is the
        kept tokens joined by single spaces.
    """
    query_text = str(raw_query or "").strip()
    if not query_text:
        return "", {}

    leftover: List[str] = []
    parsed_args: Dict[str, str] = {}
    for token in re.split(r"[,\s]+", query_text):
        if not token:
            continue
        # Split on whichever separator appears first, so that a value may
        # itself contain the other one (e.g. ``url=http://host/x``).
        positions = [i for i in (token.find(":"), token.find("=")) if i >= 0]
        sep_index = min(positions) if positions else -1
        if sep_index > 0:
            key = token[:sep_index].strip().lower()
            value = token[sep_index + 1:].strip()
            if key and value:
                parsed_args[key] = value
                continue
        # No separator, a leading separator, or an empty value: plain text.
        # NOTE(review): a bare URL token such as ``http://host`` is still
        # parsed as key ``http`` -- confirm whether callers rely on that.
        leftover.append(token)
    return " ".join(leftover).strip(), parsed_args
2025-12-19 02:29:42 -08:00
class Provider(ABC):
    """Unified provider base class.

    This replaces the older split between "search providers" and "file providers".

    Concrete providers may implement any subset of:

    - search(query, ...)
    - download(result, output_dir)
    - upload(file_path, ...)
    - login(...)
    - validate()
    """

    # URL patterns this provider handles; read by ``url_patterns()``.
    URL: Sequence[str] = ()

    # Optional provider-driven defaults for what to do when a user selects @N from a
    # provider table. The CLI uses this to auto-insert stages (e.g. download-file)
    # without hardcoding table names.
    #
    # Example:
    #   TABLE_AUTO_STAGES = {"youtube": ["download-file"]}
    #   TABLE_AUTO_PREFIXES = {"hifi": ["download-file"]}  # matches hifi.*
    TABLE_AUTO_STAGES: Dict[str, Sequence[str]] = {}
    TABLE_AUTO_PREFIXES: Dict[str, Sequence[str]] = {}
    AUTO_STAGE_USE_SELECTION_ARGS: bool = False

    # Optional provider-declared configuration keys.
    # Used for dynamically generating config panels (e.g., missing credentials).
    REQUIRED_CONFIG_KEYS: Sequence[str] = ()

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store the runtime configuration and derive the provider name.

        ``name`` is the lower-cased class name of the concrete provider.
        """
        # NOTE: this instance attribute shadows the ``config`` classmethod
        # below, so on instances ``self.config`` is the dict; the schema is
        # only reachable through the class (``type(self).config()``).
        self.config = config or {}
        self.name = self.__class__.__name__.lower()

    @classmethod
    def config(cls) -> List[Dict[str, Any]]:
        """Return configuration schema for this provider.

        Returns a list of dicts, each defining a field:
        {
            "key": "api_key",
            "label": "API Key",
            "default": "",
            "required": True,
            "secret": True,
            "choices": ["Option 1", "Option 2"]
        }
        """
        return []

    @classmethod
    def required_config_keys(cls) -> List[str]:
        """Return ``REQUIRED_CONFIG_KEYS`` as a list of stripped, non-empty strings.

        Returns an empty list when the attribute is missing, empty, or cannot
        be iterated.
        """
        keys = getattr(cls, "REQUIRED_CONFIG_KEYS", None)
        if not keys:
            return []
        try:
            cleaned = [str(k or "").strip() for k in keys]
        except Exception:
            # Subclass-declared value is not trusted to be well-formed.
            return []
        return [k for k in cleaned if k]

    def extract_query_arguments(self, query: str) -> Tuple[str, Dict[str, Any]]:
        """Allow providers to normalize query text and parse inline arguments.

        The base implementation only strips the query and reports no arguments.
        """
        normalized = str(query or "").strip()
        return normalized, {}

    # Standard lifecycle/auth hook.
    def login(self, **_kwargs: Any) -> bool:
        """Authenticate with the provider; the base implementation always succeeds."""
        return True

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List["SearchResult"]:
        """Search for items matching the query.

        Raises:
            NotImplementedError: always, unless overridden by the provider.
        """
        raise NotImplementedError(f"Provider '{self.name}' does not support search")

    def download(self, result: "SearchResult", output_dir: Path) -> Optional[Path]:
        """Download an item from a search result.

        The base implementation does nothing and returns ``None``.
        """
        return None

    def download_items(
        self,
        result: "SearchResult",
        output_dir: Path,
        *,
        emit: Callable[[Path, str, str, Dict[str, Any]], None],
        progress: Any,
        quiet_mode: bool,
        path_from_result: Callable[[Any], Path],
        config: Optional[Dict[str, Any]] = None,
    ) -> int:
        """Optional multi-item download hook (default no-op).

        The base implementation ignores every argument and returns ``0``.
        """
        del result, output_dir, emit, progress, quiet_mode, path_from_result, config
        return 0

    def handle_url(
        self, url: str, *, output_dir: Optional[Path] = None
    ) -> Tuple[bool, Optional[Path]]:
        """Optional provider override to parse and act on URLs.

        The base implementation ignores its arguments and returns
        ``(False, None)``.
        """
        del url, output_dir
        return False, None

    def upload(self, file_path: str, **kwargs: Any) -> str:
        """Upload a file and return a URL or identifier.

        Raises:
            NotImplementedError: always, unless overridden by the provider.
        """
        raise NotImplementedError(f"Provider '{self.name}' does not support upload")

    def validate(self) -> bool:
        """Check if provider is available and properly configured."""
        return True

    def selector(
        self,
        selected_items: List[Any],
        *,
        ctx: Any,
        stage_is_last: bool = True,
        **_kwargs: Any,
    ) -> bool:
        """Optional hook for handling `@N` selection semantics.

        The CLI can delegate selection behavior to a provider/store instead of
        applying the default selection filtering.

        Return True if the selection was handled and default behavior should be skipped.
        """
        del selected_items, ctx, stage_is_last
        return False

    @classmethod
    def selection_auto_stage(
        cls,
        table_type: str,
        stage_args: Optional[Sequence[str]] = None,
    ) -> Optional[List[str]]:
        """Return a stage to auto-run after selecting from `table_type`.

        This is used by the CLI to auto-insert default stages for provider tables
        (e.g. select a YouTube row -> auto-run download-file).

        Providers can implement this via class attributes (TABLE_AUTO_STAGES /
        TABLE_AUTO_PREFIXES) or by overriding this method.

        ``table_type`` is stripped and lower-cased before lookup. An exact
        match in ``TABLE_AUTO_STAGES`` takes precedence over prefix matches.
        When ``AUTO_STAGE_USE_SELECTION_ARGS`` is true, the non-blank entries
        of ``stage_args`` are appended to the stage. Returns ``None`` when no
        non-empty stage is found.
        """
        table = str(table_type or "").strip().lower()
        if not table:
            return None

        stage: Optional[Sequence[str]] = None
        try:
            stage = cls.TABLE_AUTO_STAGES.get(table)
        except Exception:
            # Subclass may have replaced the mapping with something unusable.
            stage = None

        if stage is None:
            try:
                for prefix, cmd in (cls.TABLE_AUTO_PREFIXES or {}).items():
                    candidate = str(prefix or "").strip().lower()
                    # Plain string-prefix match: covers the exact name and
                    # dotted sub-tables ("hifi", "hifi.track").
                    # NOTE(review): it also matches e.g. "hifistuff" -- confirm
                    # that this breadth is intended.
                    if candidate and table.startswith(candidate):
                        stage = cmd
                        break
            except Exception:
                stage = None

        if not stage:
            return None

        # A bare string is one stage token, not a sequence of characters.
        if isinstance(stage, str):
            stage = [stage]

        out = [str(x) for x in stage if str(x or "").strip()]
        if not out:
            return None

        if cls.AUTO_STAGE_USE_SELECTION_ARGS and stage_args:
            try:
                out.extend([str(x) for x in stage_args if str(x or "").strip()])
            except Exception:
                # Best effort: unusable extra args must not drop the stage.
                pass
        return out

    @classmethod
    def url_patterns(cls) -> Tuple[str, ...]:
        """Return normalized URL patterns that this provider handles.

        Entries are collected from ``URL`` and then from ``URL_DOMAINS`` (each
        only when it is a list or tuple), stripped, lower-cased, and
        de-duplicated while keeping first-seen order. Blank entries are
        skipped.
        """
        patterns: List[str] = []
        for attr_name in ("URL", "URL_DOMAINS"):
            entries = getattr(cls, attr_name, None)
            if not isinstance(entries, (list, tuple)):
                continue
            for entry in entries:
                try:
                    candidate = str(entry or "").strip().lower()
                except Exception:
                    continue
                if candidate and candidate not in patterns:
                    patterns.append(candidate)
        return tuple(patterns)
2025-12-11 19:04:02 -08:00
2025-12-19 02:29:42 -08:00
class SearchProvider(Provider):
    """Backward-compatibility alias kept for older code.

    Adds nothing to ``Provider``; new code should subclass ``Provider``
    directly.
    """
class FileProvider(Provider):
    """Backward-compatibility alias kept for older code.

    Adds nothing to ``Provider``; new code should subclass ``Provider``
    directly.
    """