"""Metadata providers for music, books and URL-based media.

Each provider wraps one external source (iTunes Search API, OpenLibrary,
Google Books, MusicBrainz, yt-dlp), turns its results into plain dicts that
share the ``title`` / ``artist`` / ``album`` / ``year`` / ``provider`` keys,
and can convert one such dict into ``key:value`` tag strings.  Providers are
looked up by name through the registry at the bottom of the module.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
import json
import subprocess
import sys
from typing import Any, Dict, List, Optional, Type, cast

import requests

from SYS.logger import log, debug

try:  # Optional dependency
    import musicbrainzngs  # type: ignore
except ImportError:  # pragma: no cover - optional
    musicbrainzngs = None

try:  # Optional dependency
    import yt_dlp  # type: ignore
except ImportError:  # pragma: no cover - optional
    yt_dlp = None


# Largest ``maxResults`` value the Google Books volumes API accepts; larger
# values make the request fail instead of being truncated server-side.
_GOOGLE_BOOKS_MAX_RESULTS = 40


def _isbn_query(query: str) -> str:
    """Return ``isbn:<value>`` when *query* looks like an ISBN, else *query*.

    Hyphens are ignored.  A 13-character value must be all digits; a
    10-character value may end in the check character ``X`` (either case),
    which is upper-cased in the returned query.
    """
    compact = query.replace("-", "")
    if len(compact) == 13 and compact.isdigit():
        return f"isbn:{compact}"
    if len(compact) == 10 and compact[:9].isdigit():
        check = compact[9]
        if check.isdigit() or check in "xX":
            return f"isbn:{compact.upper()}"
    return query


def _book_tags(item: Dict[str, Any], source: str) -> List[str]:
    """Build the tag list shared by the book providers.

    Emits ``title``, one ``author`` tag per non-empty author, ``publisher``,
    ``year``, a ``description`` truncated to 200 characters, one tag per
    non-empty entry of ``item["identifiers"]`` and finally ``source:<source>``.
    """
    tags: List[str] = []

    title = item.get("title")
    if title:
        tags.append(f"title:{title}")

    tags.extend(f"author:{author}" for author in (item.get("authors") or []) if author)

    publisher = item.get("publisher")
    if publisher:
        tags.append(f"publisher:{publisher}")

    year = item.get("year")
    if year:
        tags.append(f"year:{year}")

    description = item.get("description") or ""
    if description:
        tags.append(f"description:{str(description)[:200]}")

    identifiers = item.get("identifiers") or {}
    tags.extend(f"{key}:{value}" for key, value in identifiers.items() if value)

    tags.append(f"source:{source}")
    return tags


def _provider_key(name: Optional[str]) -> str:
    """Normalise a provider name into its registry key (stripped, lower-case)."""
    return (name or "").strip().lower()


class MetadataProvider(ABC):
    """Base class for metadata providers (music, movies, books, etc.)."""

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        # A missing config is stored as an empty dict so lookups never fail.
        self.config = config or {}

    @property
    def name(self) -> str:
        """Provider name derived from the class name (``FooProvider`` -> ``foo``)."""
        return self.__class__.__name__.replace("Provider", "").lower()

    @abstractmethod
    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return a list of candidate metadata records."""

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Convert a result item into a list of tags.

        Emits ``title``, ``artist``, ``album`` and ``year`` tags (in that
        order) for every non-empty field, followed by ``source:<name>``.
        """
        tags: List[str] = []
        for field in ("title", "artist", "album", "year"):
            value = item.get(field)
            if value:
                tags.append(f"{field}:{value}")
        tags.append(f"source:{self.name}")
        return tags


class ITunesProvider(MetadataProvider):
    """Metadata provider using the iTunes Search API."""

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search songs on iTunes; returns ``[]`` (after logging) on any failure."""
        params = {"term": query, "media": "music", "entity": "song", "limit": limit}
        try:
            resp = requests.get("https://itunes.apple.com/search", params=params, timeout=10)
            resp.raise_for_status()
            # ``or []`` guards against an explicit null "results" value.
            results = resp.json().get("results") or []
        except Exception as exc:
            log(f"iTunes search failed: {exc}", file=sys.stderr)
            return []

        items: List[Dict[str, Any]] = [
            {
                "title": r.get("trackName"),
                "artist": r.get("artistName"),
                "album": r.get("collectionName"),
                # ``or ""`` keeps a null releaseDate from becoming the year "None".
                "year": str(r.get("releaseDate") or "")[:4],
                "provider": self.name,
                "raw": r,
            }
            for r in results
        ]
        debug(f"iTunes returned {len(items)} items for '{query}'")
        return items


class OpenLibraryMetadataProvider(MetadataProvider):
    """Metadata provider for OpenLibrary book metadata."""

    @property
    def name(self) -> str:  # type: ignore[override]
        return "openlibrary"

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search OpenLibrary; ISBN-looking queries are sent as ``isbn:<value>``.

        Returns ``[]`` for a blank query or (after logging) on any failure.
        """
        query_clean = (query or "").strip()
        if not query_clean:
            return []

        try:
            resp = requests.get(
                "https://openlibrary.org/search.json",
                # Prefer ISBN-specific search when the query looks like one.
                params={"q": _isbn_query(query_clean), "limit": limit},
                timeout=10,
            )
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            log(f"OpenLibrary search failed: {exc}", file=sys.stderr)
            return []

        items: List[Dict[str, Any]] = []
        for doc in (data.get("docs") or [])[:limit]:
            if not isinstance(doc, dict):
                continue

            authors = doc.get("author_name") or []

            # Only the first listed publisher is kept.
            publishers = doc.get("publisher") or []
            publisher = publishers[0] if isinstance(publishers, list) and publishers else ""

            # First 13-character and first 10-character entry of the ISBN list.
            isbn_list = doc.get("isbn") or []
            isbn_13 = next((i for i in isbn_list if len(str(i)) == 13), None)
            isbn_10 = next((i for i in isbn_list if len(str(i)) == 10), None)

            # Derive the OLID from the last path segment of the key.
            key = doc.get("key", "")
            olid = key.split("/")[-1] if isinstance(key, str) and key else ""

            items.append({
                "title": doc.get("title") or "",
                "artist": ", ".join(authors) if authors else "",
                "album": publisher,
                "year": str(doc.get("first_publish_year") or ""),
                "provider": self.name,
                "authors": authors,
                "publisher": publisher,
                "identifiers": {
                    "isbn_13": isbn_13,
                    "isbn_10": isbn_10,
                    "openlibrary": olid,
                    "oclc": (doc.get("oclc_numbers") or [None])[0],
                    "lccn": (doc.get("lccn") or [None])[0],
                },
                "description": None,
            })

        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Convert a book result into title/author/publisher/year/identifier tags."""
        return _book_tags(item, self.name)


class GoogleBooksMetadataProvider(MetadataProvider):
    """Metadata provider for Google Books volumes API."""

    @property
    def name(self) -> str:  # type: ignore[override]
        return "googlebooks"

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search Google Books; ISBN-looking queries are sent as ``isbn:<value>``.

        ``maxResults`` is clamped to the range the API accepts, so a large
        *limit* yields at most that many results instead of a failed request.
        Returns ``[]`` for a blank query or (after logging) on any failure.
        """
        query_clean = (query or "").strip()
        if not query_clean:
            return []

        max_results = max(0, min(limit, _GOOGLE_BOOKS_MAX_RESULTS))

        try:
            resp = requests.get(
                "https://www.googleapis.com/books/v1/volumes",
                # Prefer ISBN queries when possible.
                params={"q": _isbn_query(query_clean), "maxResults": max_results},
                timeout=10,
            )
            resp.raise_for_status()
            payload = resp.json()
        except Exception as exc:
            log(f"Google Books search failed: {exc}", file=sys.stderr)
            return []

        items: List[Dict[str, Any]] = []
        for volume in (payload.get("items") or [])[:limit]:
            if not isinstance(volume, dict):
                continue

            info = volume.get("volumeInfo") or {}
            authors = info.get("authors") or []
            publisher = info.get("publisher", "")
            published_date = info.get("publishedDate", "")
            year = str(published_date)[:4] if published_date else ""

            identifiers: Dict[str, Optional[str]] = {"googlebooks": volume.get("id")}
            for ident in info.get("industryIdentifiers") or []:
                if not isinstance(ident, dict):
                    continue
                # The lower-cased type (e.g. "isbn_13", "isbn_10") is the key.
                ident_type = str(ident.get("type") or "").lower()
                ident_value = ident.get("identifier")
                if not ident_type or not ident_value:
                    continue
                # First value seen for a given type wins.
                identifiers.setdefault(ident_type, ident_value)

            items.append({
                "title": info.get("title") or "",
                "artist": ", ".join(authors) if authors else "",
                "album": publisher,
                "year": year,
                "provider": self.name,
                "authors": authors,
                "publisher": publisher,
                "identifiers": identifiers,
                "description": info.get("description", ""),
            })

        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Convert a book result into title/author/publisher/year/identifier tags."""
        return _book_tags(item, self.name)


class MusicBrainzMetadataProvider(MetadataProvider):
    """Metadata provider for MusicBrainz recordings."""

    @property
    def name(self) -> str:  # type: ignore[override]
        return "musicbrainz"

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search MusicBrainz recordings through ``musicbrainzngs``.

        Returns ``[]`` when the library is not installed, the query is blank,
        or (after logging) the search raises.
        """
        if not musicbrainzngs:
            log("musicbrainzngs is not installed; skipping MusicBrainz scrape", file=sys.stderr)
            return []

        q = (query or "").strip()
        if not q:
            return []

        try:
            # Ensure user agent is set (required by MusicBrainz)
            musicbrainzngs.set_useragent("Medeia-Macina", "0.1")
        except Exception:
            # Best effort: the search below reports its own failure.
            pass

        try:
            resp = musicbrainzngs.search_recordings(query=q, limit=limit)
            recordings = resp.get("recording-list") or resp.get("recordings") or []
        except Exception as exc:
            log(f"MusicBrainz search failed: {exc}", file=sys.stderr)
            return []

        items: List[Dict[str, Any]] = []
        for rec in recordings[:limit]:
            if not isinstance(rec, dict):
                continue

            # Only the first credited artist is used.
            artist = ""
            artist_credit = rec.get("artist-credit") or rec.get("artist_credit")
            if isinstance(artist_credit, list) and artist_credit:
                first = artist_credit[0]
                if isinstance(first, dict):
                    nested = first.get("artist")
                    nested_name = nested.get("name", "") if isinstance(nested, dict) else ""
                    artist = first.get("name") or nested_name or ""
                elif isinstance(first, str):
                    artist = first

            # Album and date come from the first release when one is listed;
            # otherwise the recording's own first-release-date is used.
            album = ""
            release_list = rec.get("release-list") or rec.get("releases") or rec.get("release")
            if isinstance(release_list, list) and release_list:
                first_rel = release_list[0]
                if isinstance(first_rel, dict):
                    album = first_rel.get("title", "") or ""
                    release_date = first_rel.get("date") or ""
                else:
                    album = str(first_rel)
                    release_date = ""
            else:
                release_date = rec.get("first-release-date") or ""

            items.append({
                "title": rec.get("title") or "",
                "artist": artist,
                "album": album,
                "year": str(release_date)[:4] if release_date else "",
                "provider": self.name,
                "mbid": rec.get("id") or "",
                "raw": rec,
            })

        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Base tags plus ``musicbrainz:<mbid>`` when the item carries an MBID."""
        tags = super().to_tags(item)
        mbid = item.get("mbid")
        if mbid:
            tags.append(f"musicbrainz:{mbid}")
        return tags


class YtdlpMetadataProvider(MetadataProvider):
    """Metadata provider that extracts tags from a supported URL using yt-dlp.

    This does NOT download media; it only probes metadata.
    """

    @property
    def name(self) -> str:  # type: ignore[override]
        return "ytdlp"

    def _extract_info(self, url: str) -> Optional[Dict[str, Any]]:
        """Probe *url* and return yt-dlp's info dict, or ``None`` on failure.

        The ``yt_dlp`` Python module is tried first; if it is missing or
        raises, the ``yt-dlp`` command-line tool is run with ``-J`` instead.
        """
        url = (url or "").strip()
        if not url:
            return None

        # Prefer Python module when available.
        if yt_dlp is not None:
            try:
                opts: Any = {
                    "quiet": True,
                    "no_warnings": True,
                    "skip_download": True,
                    "noprogress": True,
                    "socket_timeout": 15,
                    "retries": 1,
                    "playlist_items": "1-10",
                }
                with yt_dlp.YoutubeDL(opts) as ydl:  # type: ignore[attr-defined]
                    info = ydl.extract_info(url, download=False)
                return cast(Dict[str, Any], info) if isinstance(info, dict) else None
            except Exception as exc:
                # Still best-effort, but leave a trace before trying the CLI.
                debug(f"yt-dlp module probe failed, falling back to CLI: {exc}")

        # Fallback to CLI.
        try:
            cmd = [
                "yt-dlp",
                "-J",
                "--no-warnings",
                "--skip-download",
                "--playlist-items",
                "1-10",
                url,
            ]
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if proc.returncode != 0:
                return None
            payload = (proc.stdout or "").strip()
            if not payload:
                return None
            data = json.loads(payload)
            return data if isinstance(data, dict) else None
        except Exception as exc:
            debug(f"yt-dlp CLI probe failed: {exc}")
            return None

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Probe *query* as an http(s) URL and return at most one record.

        *limit* is accepted for interface compatibility and is not used.
        Returns ``[]`` when *query* is not an http(s) URL or probing fails.
        """
        url = (query or "").strip()
        if not url.startswith(("http://", "https://")):
            return []

        info = self._extract_info(url)
        if not isinstance(info, dict):
            return []

        # release_date wins over upload_date; the first four characters are the year.
        upload_date = str(info.get("upload_date") or "")
        release_date = str(info.get("release_date") or "")
        year = (release_date or upload_date)[:4]

        # Provide basic columns for the standard metadata selection table.
        # NOTE: This is best-effort; many extractors don't provide artist/album.
        artist = info.get("artist") or info.get("uploader") or info.get("channel") or ""
        album = info.get("album") or info.get("playlist_title") or ""
        title = info.get("title") or ""

        return [
            {
                "title": title,
                "artist": str(artist or ""),
                "album": str(album or ""),
                "year": str(year or ""),
                "provider": self.name,
                "url": url,
                "raw": info,
            }
        ]

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Build tags from the raw yt-dlp info attached to *item*.

        Falls back to the base-class tags when ``item["raw"]`` is not a dict.
        Otherwise combines the output of ``metadata.extract_ytdlp_tags`` (when
        importable), ``subs:<lang>`` / ``subs_auto:<lang>`` tags and
        ``source:<name>``, then de-duplicates case-insensitively in order.
        """
        raw = item.get("raw")
        if not isinstance(raw, dict):
            return super().to_tags(item)

        tags: List[str] = []
        try:
            from metadata import extract_ytdlp_tags
        except Exception:
            extract_ytdlp_tags = None  # type: ignore[assignment]

        if extract_ytdlp_tags:
            try:
                tags.extend(extract_ytdlp_tags(raw))
            except Exception as exc:
                # Best effort: subtitle and source tags are still produced.
                debug(f"extract_ytdlp_tags failed: {exc}")

        # Subtitle availability tags
        def _langs(value: Any) -> List[str]:
            """Sorted, de-duplicated, lower-cased string keys of a dict."""
            if not isinstance(value, dict):
                return []
            return sorted({k.strip().lower() for k in value if isinstance(k, str) and k.strip()})

        # If this is a playlist container, subtitle/captions are usually per-entry.
        info_for_subs: Dict[str, Any] = raw
        entries = raw.get("entries")
        if isinstance(entries, list) and entries and isinstance(entries[0], dict):
            info_for_subs = entries[0]

        tags.extend(f"subs:{lang}" for lang in _langs(info_for_subs.get("subtitles")))
        tags.extend(f"subs_auto:{lang}" for lang in _langs(info_for_subs.get("automatic_captions")))

        # Always include source tag for parity with other providers.
        tags.append(f"source:{self.name}")

        # Dedup case-insensitively, preserve order.
        seen = set()
        out: List[str] = []
        for tag in tags:
            if not isinstance(tag, str):
                continue
            stripped = tag.strip()
            if not stripped:
                continue
            folded = stripped.lower()
            if folded in seen:
                continue
            seen.add(folded)
            out.append(stripped)
        return out


# Registry ---------------------------------------------------------------

_METADATA_PROVIDERS: Dict[str, Type[MetadataProvider]] = {
    "itunes": ITunesProvider,
    "openlibrary": OpenLibraryMetadataProvider,
    "googlebooks": GoogleBooksMetadataProvider,
    "google": GoogleBooksMetadataProvider,
    "musicbrainz": MusicBrainzMetadataProvider,
    "ytdlp": YtdlpMetadataProvider,
}


def register_provider(name: str, provider_cls: Type[MetadataProvider]) -> None:
    """Register *provider_cls* under *name* (stripped and lower-cased).

    An existing registration with the same key is replaced.
    """
    _METADATA_PROVIDERS[_provider_key(name)] = provider_cls


def list_metadata_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
    """Map every registered provider name to whether it can be instantiated.

    The check only constructs the provider with *config*; it does not verify
    optional dependencies or network reachability.
    """
    availability: Dict[str, bool] = {}
    for name, cls in _METADATA_PROVIDERS.items():
        try:
            cls(config)
        except Exception:
            availability[name] = False
        else:
            availability[name] = True
    return availability


def get_metadata_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[MetadataProvider]:
    """Instantiate the provider registered under *name*.

    The lookup ignores case and surrounding whitespace.  Returns ``None`` when
    the name is unknown or empty, or (after logging) when construction raises.
    """
    cls = _METADATA_PROVIDERS.get(_provider_key(name))
    if not cls:
        return None
    try:
        return cls(config)
    except Exception as exc:
        log(f"Provider init failed for '{name}': {exc}", file=sys.stderr)
        return None