"""Provider registry. Concrete provider implementations live in the `Provider/` package. This module is the single source of truth for provider discovery. """ from __future__ import annotations from typing import Any, Dict, Optional, Sequence, Type import sys from urllib.parse import urlparse from SYS.logger import log from ProviderCore.base import Provider, SearchProvider, FileProvider, SearchResult from Provider.alldebrid import AllDebrid from Provider.bandcamp import Bandcamp from Provider.libgen import Libgen from Provider.matrix import Matrix from Provider.openlibrary import OpenLibrary from Provider.soulseek import Soulseek, download_soulseek_file from Provider.telegram import Telegram from Provider.youtube import YouTube from Provider.fileio import FileIO from Provider.zeroxzero import ZeroXZero from Provider.loc import LOC from Provider.internetarchive import InternetArchive from Provider.podcastindex import PodcastIndex from Provider.HIFI import HIFI _PROVIDERS: Dict[str, Type[Provider]] = { # Search-capable providers "alldebrid": AllDebrid, "libgen": Libgen, "openlibrary": OpenLibrary, "internetarchive": InternetArchive, "hifi": HIFI, "soulseek": Soulseek, "bandcamp": Bandcamp, "youtube": YouTube, "telegram": Telegram, "loc": LOC, "podcastindex": PodcastIndex, # Upload-capable providers "0x0": ZeroXZero, "file.io": FileIO, "matrix": Matrix, } def get_provider_class(name: str) -> Optional[Type[Provider]]: """Return the provider class for a registered provider name, if any.""" key = str(name or "").strip().lower() return _PROVIDERS.get(key) def selection_auto_stage_for_table( table_type: str, stage_args: Optional[Sequence[str]] = None, ) -> Optional[list[str]]: """Return the provider-suggested stage to auto-run for a selected table. This is used by the CLI to avoid hardcoding table names and behaviors. """ t = str(table_type or "").strip().lower() if not t: return None # Provider tables are usually either: # - "youtube" (no dot) # - "hifi.tracks" (prefix = provider name) provider_key = t.split(".", 1)[0] if "." in t else t provider_class = get_provider_class(provider_key) or get_provider_class(t) if provider_class is None: return None try: return provider_class.selection_auto_stage(t, stage_args) except Exception: return None def is_known_provider_name(name: str) -> bool: """Return True if `name` matches a registered provider key. This is intentionally cheap (no imports/instantiation) so callers can probe UI strings (table names, store names, etc.) without triggering noisy 'Unknown provider' logs. """ return (name or "").strip().lower() in _PROVIDERS def _supports_search(provider: Provider) -> bool: return provider.__class__.search is not Provider.search def _supports_upload(provider: Provider) -> bool: return provider.__class__.upload is not Provider.upload def _provider_url_patterns(provider_class: Type[Provider]) -> Sequence[str]: try: return list(provider_class.url_patterns()) except Exception: return [] def get_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[Provider]: """Get a provider by name (unified registry).""" provider_class = _PROVIDERS.get((name or "").lower()) if provider_class is None: log(f"[provider] Unknown provider: {name}", file=sys.stderr) return None try: provider = provider_class(config) if not provider.validate(): log(f"[provider] Provider '{name}' is not available", file=sys.stderr) return None return provider except Exception as exc: log(f"[provider] Error initializing '{name}': {exc}", file=sys.stderr) return None def list_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]: """List all providers and their availability.""" availability: Dict[str, bool] = {} for name, provider_class in _PROVIDERS.items(): try: provider = provider_class(config) availability[name] = provider.validate() except Exception: availability[name] = False return availability def get_search_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[SearchProvider]: """Get a search-capable provider by name (compat API).""" provider = get_provider(name, config) if provider is None: return None if not _supports_search(provider): log(f"[provider] Provider '{name}' does not support search", file=sys.stderr) return None return provider # type: ignore[return-value] def list_search_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]: """List all search providers and their availability.""" availability: Dict[str, bool] = {} for name, provider_class in _PROVIDERS.items(): try: provider = provider_class(config) availability[name] = bool( provider.validate() and _supports_search(provider) ) except Exception: availability[name] = False return availability def get_file_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[FileProvider]: """Get an upload-capable provider by name (compat API).""" provider = get_provider(name, config) if provider is None: return None if not _supports_upload(provider): log(f"[provider] Provider '{name}' does not support upload", file=sys.stderr) return None return provider # type: ignore[return-value] def list_file_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]: """List all file providers and their availability.""" availability: Dict[str, bool] = {} for name, provider_class in _PROVIDERS.items(): try: provider = provider_class(config) availability[name] = bool( provider.validate() and _supports_upload(provider) ) except Exception: availability[name] = False return availability def match_provider_name_for_url(url: str) -> Optional[str]: """Return a registered provider name that claims the URL's domain. Providers can declare domains via class attribute `URL` (preferred) or `URL_DOMAINS`. This matcher is intentionally cheap (no provider instantiation, no network). """ raw_url = str(url or "").strip() raw_url_lower = raw_url.lower() try: parsed = urlparse(raw_url) host = (parsed.hostname or "").strip().lower() path = (parsed.path or "").strip() except Exception: host = "" path = "" # Prefer Internet Archive for archive.org links unless the URL clearly refers # to a borrow/loan flow (handled by OpenLibrary provider). # # This keeps direct downloads and item pages routed to `internetarchive`, while # preserving OpenLibrary's scripted borrow pipeline for loan/reader URLs. if host: if host == "openlibrary.org" or host.endswith(".openlibrary.org"): return "openlibrary" if "openlibrary" in _PROVIDERS else None if host == "archive.org" or host.endswith(".archive.org"): low_path = str(path or "").lower() is_borrowish = ( low_path.startswith("/borrow/") or low_path.startswith("/stream/") or low_path.startswith("/services/loans/") or "/services/loans/" in low_path ) if is_borrowish: return "openlibrary" if "openlibrary" in _PROVIDERS else None return "internetarchive" if "internetarchive" in _PROVIDERS else None for name, provider_class in _PROVIDERS.items(): domains = _provider_url_patterns(provider_class) if not domains: continue for d in domains: dom = str(d or "").strip().lower() if not dom: continue if raw_url_lower.startswith(dom): return name for d in domains: dom = str(d or "").strip().lower() if not dom or not host: continue if host == dom or host.endswith("." + dom): return name return None def get_provider_for_url(url: str, config: Optional[Dict[str, Any]] = None) -> Optional[Provider]: """Instantiate and return the matching provider for a URL, if any.""" name = match_provider_name_for_url(url) if not name: return None return get_provider(name, config) __all__ = [ "SearchResult", "Provider", "SearchProvider", "FileProvider", "get_provider", "list_providers", "get_search_provider", "list_search_providers", "get_file_provider", "list_file_providers", "match_provider_name_for_url", "get_provider_for_url", "download_soulseek_file", "get_provider_class", "selection_auto_stage_for_table", ]