from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type, cast
import html as html_std
import re
import requests
import sys
import json
import subprocess

from API.HTTP import HTTPClient
from ProviderCore.base import SearchResult

try:  # Optional dependency for IMDb scraping
    from imdbinfo.services import search_title  # type: ignore
except ImportError:  # pragma: no cover - optional
    search_title = None  # type: ignore[assignment]

from SYS.logger import log, debug
from SYS.metadata import imdb_tag
from SYS.json_table import normalize_record

try:  # Optional dependency
    import musicbrainzngs  # type: ignore
except ImportError:  # pragma: no cover - optional
    musicbrainzngs = None

try:  # Optional dependency
    import yt_dlp  # type: ignore
except ImportError:  # pragma: no cover - optional
    yt_dlp = None


class MetadataProvider(ABC):
    """Base class for metadata providers (music, movies, books, etc.)."""

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        # Provider-specific configuration; empty dict when none supplied.
        self.config = config or {}

    @property
    def name(self) -> str:
        # Derive a short provider key from the class name,
        # e.g. "ITunesProvider" -> "itunes".
        return self.__class__.__name__.replace("Provider", "").lower()

    @abstractmethod
    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return a list of candidate metadata records."""

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Convert a result item into a list of "key:value" tags.

        Only truthy fields produce tags; a "source:<name>" tag is always
        appended so callers can trace where a record came from.
        """
        tags: List[str] = []
        for key in ("title", "artist", "album", "year"):
            value = item.get(key)
            if value:
                tags.append(f"{key}:{value}")
        tags.append(f"source:{self.name}")
        return tags


class ITunesProvider(MetadataProvider):
    """Metadata provider using the iTunes Search API."""

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        params = {
            "term": query,
            "media": "music",
            "entity": "song",
            "limit": limit,
        }
        try:
            resp = requests.get(
                "https://itunes.apple.com/search", params=params, timeout=10
            )
            resp.raise_for_status()
            results = resp.json().get("results", [])
        except Exception as exc:
            # Best-effort: network/API failures degrade to an empty result.
            log(f"iTunes search failed: {exc}", file=sys.stderr)
            return []
        items: List[Dict[str, Any]] = []
        for r in results:
            items.append(
                {
                    "title": r.get("trackName"),
                    "artist": r.get("artistName"),
                    "album": r.get("collectionName"),
                    # releaseDate is ISO-formatted; first 4 chars are the year.
                    "year": str(r.get("releaseDate", ""))[:4],
                    "provider": self.name,
                    "raw": r,
                }
            )
        debug(f"iTunes returned {len(items)} items for '{query}'")
        return items


class OpenLibraryMetadataProvider(MetadataProvider):
    """Metadata provider for OpenLibrary book metadata."""

    @property
    def name(self) -> str:  # type: ignore[override]
        return "openlibrary"

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        query_clean = (query or "").strip()
        if not query_clean:
            return []
        try:
            # Prefer ISBN-specific search when the query looks like one.
            digits = query_clean.replace("-", "")
            if digits.isdigit() and len(digits) in (10, 13):
                q = f"isbn:{digits}"
            else:
                q = query_clean
            resp = requests.get(
                "https://openlibrary.org/search.json",
                params={"q": q, "limit": limit},
                timeout=10,
            )
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            log(f"OpenLibrary search failed: {exc}", file=sys.stderr)
            return []
        items: List[Dict[str, Any]] = []
        for doc in data.get("docs", [])[:limit]:
            authors = doc.get("author_name") or []
            publisher = ""
            publishers = doc.get("publisher") or []
            if isinstance(publishers, list) and publishers:
                publisher = publishers[0]
            # Prefer 13-digit ISBN when available, otherwise 10-digit.
            isbn_list = doc.get("isbn") or []
            isbn_13 = next((i for i in isbn_list if len(str(i)) == 13), None)
            isbn_10 = next((i for i in isbn_list if len(str(i)) == 10), None)
            # Derive OLID from the document key (e.g. "/books/OL123M").
            olid = ""
            key = doc.get("key", "")
            if isinstance(key, str) and key:
                olid = key.split("/")[-1]
            items.append(
                {
                    "title": doc.get("title") or "",
                    "artist": ", ".join(authors) if authors else "",
                    "album": publisher,
                    "year": str(doc.get("first_publish_year") or ""),
                    "provider": self.name,
                    "authors": authors,
                    "publisher": publisher,
                    "identifiers": {
                        "isbn_13": isbn_13,
                        "isbn_10": isbn_10,
                        "openlibrary": olid,
                        "oclc": (doc.get("oclc_numbers") or [None])[0],
                        "lccn": (doc.get("lccn") or [None])[0],
                    },
                    "description": None,
                }
            )
        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Build book-oriented tags (author/publisher/identifier aware)."""
        tags: List[str] = []
        title = item.get("title")
        authors = item.get("authors") or []
        publisher = item.get("publisher")
        year = item.get("year")
        description = item.get("description") or ""
        if title:
            tags.append(f"title:{title}")
        for author in authors:
            if author:
                tags.append(f"author:{author}")
        if publisher:
            tags.append(f"publisher:{publisher}")
        if year:
            tags.append(f"year:{year}")
        if description:
            # Cap description tags to keep them displayable.
            tags.append(f"description:{description[:200]}")
        identifiers = item.get("identifiers") or {}
        for key, value in identifiers.items():
            if value:
                tags.append(f"{key}:{value}")
        tags.append(f"source:{self.name}")
        return tags


class GoogleBooksMetadataProvider(MetadataProvider):
    """Metadata provider for Google Books volumes API."""

    @property
    def name(self) -> str:  # type: ignore[override]
        return "googlebooks"

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        query_clean = (query or "").strip()
        if not query_clean:
            return []
        # Prefer ISBN queries when possible.
        digits = query_clean.replace("-", "")
        if digits.isdigit() and len(digits) in (10, 13):
            q = f"isbn:{digits}"
        else:
            q = query_clean
        try:
            resp = requests.get(
                "https://www.googleapis.com/books/v1/volumes",
                params={"q": q, "maxResults": limit},
                timeout=10,
            )
            resp.raise_for_status()
            payload = resp.json()
        except Exception as exc:
            log(f"Google Books search failed: {exc}", file=sys.stderr)
            return []
        items: List[Dict[str, Any]] = []
        for volume in payload.get("items", [])[:limit]:
            info = volume.get("volumeInfo") or {}
            authors = info.get("authors") or []
            publisher = info.get("publisher", "")
            published_date = info.get("publishedDate", "")
            year = str(published_date)[:4] if published_date else ""
            identifiers_raw = info.get("industryIdentifiers") or []
            identifiers: Dict[str, Optional[str]] = {
                "googlebooks": volume.get("id")
            }
            for ident in identifiers_raw:
                if not isinstance(ident, dict):
                    continue
                ident_type = ident.get("type", "").lower()
                ident_value = ident.get("identifier")
                if not ident_value:
                    continue
                # setdefault keeps the first value seen for each type.
                if ident_type == "isbn_13":
                    identifiers.setdefault("isbn_13", ident_value)
                elif ident_type == "isbn_10":
                    identifiers.setdefault("isbn_10", ident_value)
                else:
                    identifiers.setdefault(ident_type, ident_value)
            items.append(
                {
                    "title": info.get("title") or "",
                    "artist": ", ".join(authors) if authors else "",
                    "album": publisher,
                    "year": year,
                    "provider": self.name,
                    "authors": authors,
                    "publisher": publisher,
                    "identifiers": identifiers,
                    "description": info.get("description", ""),
                }
            )
        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Build book-oriented tags; mirrors OpenLibraryMetadataProvider."""
        tags: List[str] = []
        title = item.get("title")
        authors = item.get("authors") or []
        publisher = item.get("publisher")
        year = item.get("year")
        description = item.get("description") or ""
        if title:
            tags.append(f"title:{title}")
        for author in authors:
            if author:
                tags.append(f"author:{author}")
        if publisher:
            tags.append(f"publisher:{publisher}")
        if year:
            tags.append(f"year:{year}")
        if description:
            tags.append(f"description:{description[:200]}")
        identifiers = item.get("identifiers") or {}
        for key, value in identifiers.items():
            if value:
                tags.append(f"{key}:{value}")
        tags.append(f"source:{self.name}")
        return tags


class ISBNsearchMetadataProvider(MetadataProvider):
    """Metadata provider that scrapes isbnsearch.org by ISBN.

    This is a best-effort HTML scrape. It expects the query to be an ISBN.
    """

    @property
    def name(self) -> str:  # type: ignore[override]
        return "isbnsearch"

    @staticmethod
    def _strip_html_to_text(raw: str) -> str:
        """Flatten an HTML fragment into collapsed plain text."""
        s = html_std.unescape(str(raw or ""))
        # BUGFIX: the previous pattern was an empty regex that matched at
        # every position and inserted "\n" between every character.
        # NOTE(review): reconstructed as a <br> matcher — verify against
        # live isbnsearch.org markup.
        s = re.sub(r"(?i)<br\s*/?>", "\n", s)
        s = re.sub(r"<[^>]+>", " ", s)
        s = re.sub(r"\s+", " ", s)
        return s.strip()

    @staticmethod
    def _clean_isbn(query: str) -> str:
        """Extract a bare 10- or 13-char ISBN token from arbitrary input."""
        s = str(query or "").strip()
        if not s:
            return ""
        s = s.replace("isbn:", "").replace("ISBN:", "")
        s = re.sub(r"[^0-9Xx]", "", s).upper()
        if len(s) in (10, 13):
            return s
        # Try to locate an ISBN-like token inside the query.
        m = re.search(r"\b(?:97[89])?\d{9}[\dXx]\b", s)
        return str(m.group(0)).upper() if m else ""

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        _ = limit  # A single ISBN yields at most one item.
        isbn = self._clean_isbn(query)
        if not isbn:
            return []
        url = f"https://isbnsearch.org/isbn/{isbn}"
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            html = str(resp.text or "")
            if not html:
                return []
        except Exception as exc:
            log(f"ISBNsearch scrape failed: {exc}", file=sys.stderr)
            return []

        # NOTE(review): the tag patterns below were reconstructed — the
        # original literals had lost their HTML tag text (e.g. "]*>(.*?)").
        # Confirm against the live page structure.
        title = ""
        m_title = re.search(r"(?is)<h1[^>]*>(.*?)</h1>", html)
        if m_title:
            title = self._strip_html_to_text(m_title.group(1))

        # Parse "<strong>Label:</strong> value" pairs into raw_fields.
        raw_fields: Dict[str, str] = {}
        strong_matches = list(
            re.finditer(r"(?is)<strong[^>]*>(.*?)</strong>", html)
        )
        for idx, m in enumerate(strong_matches):
            label_raw = self._strip_html_to_text(m.group(1))
            label = str(label_raw or "").strip()
            if not label:
                continue
            if label.endswith(":"):
                label = label[:-1].strip()
            chunk_start = m.end()
            # Stop at the next <strong> or end of document.
            chunk_end = (
                strong_matches[idx + 1].start()
                if (idx + 1) < len(strong_matches)
                else len(html)
            )
            chunk = html[chunk_start:chunk_end]
            # Prefer stopping within the same paragraph when possible.
            m_end = re.search(r"(?is)(</p>|<p[^>]*>)", chunk)
            if m_end:
                chunk = chunk[:m_end.start()]
            val_text = self._strip_html_to_text(chunk)
            if not val_text:
                continue
            raw_fields[label] = val_text

        def _get(*labels: str) -> str:
            # Case-insensitive lookup across candidate labels.
            for lab in labels:
                for k, v in raw_fields.items():
                    if str(k).strip().lower() == str(lab).strip().lower():
                        return str(v or "").strip()
            return ""

        # Map common ISBNsearch labels.
        author_text = _get("Author", "Authors", "Author(s)")
        publisher = _get("Publisher")
        published = _get("Published", "Publication Date", "Publish Date")
        language = _get("Language")
        pages = _get("Pages")
        isbn_13 = _get("ISBN-13", "ISBN13")
        isbn_10 = _get("ISBN-10", "ISBN10")

        year = ""
        if published:
            m_year = re.search(r"\b(\d{4})\b", published)
            year = str(m_year.group(1)) if m_year else ""

        authors: List[str] = []
        if author_text:
            # Split on common separators; keep multi-part names intact.
            for part in re.split(
                r"\s*(?:,|;|\band\b|\&|\|)\s*", author_text, flags=re.IGNORECASE
            ):
                p = str(part or "").strip()
                if p:
                    authors.append(p)

        # Prefer parsed title, but fall back to og:title if needed.
        if not title:
            m_og = re.search(
                r"(?is)<meta[^>]*property=['\"]og:title['\"][^>]*content=['\"](.*?)['\"][^>]*>",
                html,
            )
            if m_og:
                title = self._strip_html_to_text(m_og.group(1))

        # Ensure ISBN tokens are normalized and de-duplicated.
        isbn_tokens: List[str] = []
        for token in [isbn_13, isbn_10, isbn]:
            t = self._clean_isbn(token)
            if t and t not in isbn_tokens:
                isbn_tokens.append(t)

        item: Dict[str, Any] = {
            "title": title or "",
            # Keep UI columns compatible with the generic metadata table.
            "artist": ", ".join(authors) if authors else "",
            "album": publisher or "",
            "year": year or "",
            "provider": self.name,
            "authors": authors,
            "publisher": publisher or "",
            "language": language or "",
            "pages": pages or "",
            "identifiers": {
                "isbn_13": next((t for t in isbn_tokens if len(t) == 13), None),
                "isbn_10": next((t for t in isbn_tokens if len(t) == 10), None),
            },
            "raw_fields": raw_fields,
        }
        # Only return usable items.
        if not item.get("title") and not any(item["identifiers"].values()):
            return []
        return [item]

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Build tags from a scraped item; de-dupes case-insensitively."""
        tags: List[str] = []
        title = str(item.get("title") or "").strip()
        if title:
            tags.append(f"title:{title}")
        authors = item.get("authors") or []
        if isinstance(authors, list):
            for a in authors:
                a = str(a or "").strip()
                if a:
                    tags.append(f"author:{a}")
        publisher = str(item.get("publisher") or "").strip()
        if publisher:
            tags.append(f"publisher:{publisher}")
        year = str(item.get("year") or "").strip()
        if year:
            tags.append(f"year:{year}")
        language = str(item.get("language") or "").strip()
        if language:
            tags.append(f"language:{language}")
        identifiers = item.get("identifiers") or {}
        if isinstance(identifiers, dict):
            for key in ("isbn_13", "isbn_10"):
                val = identifiers.get(key)
                if val:
                    tags.append(f"isbn:{val}")
        tags.append(f"source:{self.name}")
        # Dedup case-insensitively, preserve order.
        seen: set[str] = set()
        out: List[str] = []
        for t in tags:
            s = str(t or "").strip()
            if not s:
                continue
            k = s.lower()
            if k in seen:
                continue
            seen.add(k)
            out.append(s)
        return out


class MusicBrainzMetadataProvider(MetadataProvider):
    """Metadata provider for MusicBrainz recordings."""

    @property
    def name(self) -> str:  # type: ignore[override]
        return "musicbrainz"

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        if not musicbrainzngs:
            log(
                "musicbrainzngs is not installed; skipping MusicBrainz scrape",
                file=sys.stderr,
            )
            return []
        q = (query or "").strip()
        if not q:
            return []
        try:
            # Ensure user agent is set (required by MusicBrainz).
            musicbrainzngs.set_useragent("Medeia-Macina", "0.1")
        except Exception:
            pass
        try:
            resp = musicbrainzngs.search_recordings(query=q, limit=limit)
            recordings = resp.get("recording-list") or resp.get("recordings") or []
        except Exception as exc:
            log(f"MusicBrainz search failed: {exc}", file=sys.stderr)
            return []
        items: List[Dict[str, Any]] = []
        for rec in recordings[:limit]:
            if not isinstance(rec, dict):
                continue
            title = rec.get("title") or ""
            artist = ""
            artist_credit = rec.get("artist-credit") or rec.get("artist_credit")
            if isinstance(artist_credit, list) and artist_credit:
                first = artist_credit[0]
                if isinstance(first, dict):
                    artist = first.get("name") or first.get("artist", {}).get("name", "")
                elif isinstance(first, str):
                    artist = first
            album = ""
            release_list = (
                rec.get("release-list") or rec.get("releases") or rec.get("release")
            )
            if isinstance(release_list, list) and release_list:
                first_rel = release_list[0]
                if isinstance(first_rel, dict):
                    album = first_rel.get("title", "") or ""
                    release_date = first_rel.get("date") or ""
                else:
                    album = str(first_rel)
                    release_date = ""
            else:
                release_date = rec.get("first-release-date") or ""
            year = str(release_date)[:4] if release_date else ""
            mbid = rec.get("id") or ""
            items.append(
                {
                    "title": title,
                    "artist": artist,
                    "album": album,
                    "year": year,
                    "provider": self.name,
                    "mbid": mbid,
                    "raw": rec,
                }
            )
        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Standard tags plus a musicbrainz:<mbid> identifier tag."""
        tags = super().to_tags(item)
        mbid = item.get("mbid")
        if mbid:
            tags.append(f"musicbrainz:{mbid}")
        return tags


class ImdbMetadataProvider(MetadataProvider):
    """Metadata provider for IMDb titles (movies/series/episodes)."""

    @property
    def name(self) -> str:  # type: ignore[override]
        return "imdb"

    @staticmethod
    def _extract_imdb_id(text: str) -> str:
        """Normalize arbitrary input to a lowercase "tt<digits>" IMDb id."""
        raw = str(text or "").strip()
        if not raw:
            return ""
        # Exact tt123 pattern (the match always starts with "tt").
        m = re.search(r"(tt\d+)", raw, re.IGNORECASE)
        if m:
            return m.group(1).lower()
        # Bare numeric IDs (e.g., "0118883").
        if raw.isdigit() and len(raw) >= 6:
            return f"tt{raw}"
        # Last-resort: extract first digit run.
        m_digits = re.search(r"(\d{6,})", raw)
        if m_digits:
            return f"tt{m_digits.group(1)}"
        return ""

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        q = (query or "").strip()
        if not q:
            return []
        imdb_id = self._extract_imdb_id(q)
        if imdb_id:
            # Direct id lookup via the local imdb_tag helper.
            try:
                data = imdb_tag(imdb_id)
                raw_tags = data.get("tag") if isinstance(data, dict) else []
                title = None
                year = None
                if isinstance(raw_tags, list):
                    for tag in raw_tags:
                        if not isinstance(tag, str):
                            continue
                        if tag.startswith("title:"):
                            title = tag.split(":", 1)[1]
                        elif tag.startswith("year:"):
                            year = tag.split(":", 1)[1]
                return [
                    {
                        "title": title or imdb_id,
                        "artist": "",
                        "album": "",
                        "year": str(year or ""),
                        "provider": self.name,
                        "imdb_id": imdb_id,
                        "raw": data,
                    }
                ]
            except Exception as exc:
                log(f"IMDb lookup failed: {exc}", file=sys.stderr)
                return []
        if search_title is None:
            log("imdbinfo is not installed; skipping IMDb scrape", file=sys.stderr)
            return []
        try:
            search_result = search_title(q)
            titles = getattr(search_result, "titles", None) or []
        except Exception as exc:
            log(f"IMDb search failed: {exc}", file=sys.stderr)
            return []
        items: List[Dict[str, Any]] = []
        for entry in titles[:limit]:
            entry_id = self._extract_imdb_id(
                getattr(entry, "imdb_id", None)
                or getattr(entry, "imdbId", None)
                or getattr(entry, "id", None)
            )
            title = getattr(entry, "title", "") or getattr(entry, "title_localized", "")
            year = str(getattr(entry, "year", "") or "")[:4]
            kind = getattr(entry, "kind", "") or ""
            rating = getattr(entry, "rating", None)
            items.append(
                {
                    "title": title,
                    "artist": "",
                    "album": kind,
                    "year": year,
                    "provider": self.name,
                    "imdb_id": entry_id,
                    "kind": kind,
                    "rating": rating,
                    "raw": entry,
                }
            )
        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Prefer full imdb_tag() output; fall back to generic tags."""
        imdb_id = self._extract_imdb_id(
            item.get("imdb_id") or item.get("id") or item.get("imdb") or ""
        )
        try:
            if imdb_id:
                data = imdb_tag(imdb_id)
                raw_tags = data.get("tag") if isinstance(data, dict) else []
                tags = [t for t in raw_tags if isinstance(t, str)]
                if tags:
                    return tags
        except Exception as exc:
            log(f"IMDb tag extraction failed: {exc}", file=sys.stderr)
        tags = super().to_tags(item)
        if imdb_id:
            tags.append(f"imdb:{imdb_id}")
        # Dedup case-insensitively, preserve order.
        seen: set[str] = set()
        deduped: List[str] = []
        for t in tags:
            s = str(t or "").strip()
            if not s:
                continue
            k = s.lower()
            if k in seen:
                continue
            seen.add(k)
            deduped.append(s)
        return deduped


class YtdlpMetadataProvider(MetadataProvider):
    """Metadata provider that extracts tags from a supported URL using yt-dlp.

    This does NOT download media; it only probes metadata.
    """

    @property
    def name(self) -> str:  # type: ignore[override]
        return "ytdlp"

    def _extract_info(self, url: str) -> Optional[Dict[str, Any]]:
        """Probe a URL for metadata via the yt_dlp module, else the CLI."""
        url = (url or "").strip()
        if not url:
            return None
        # Prefer Python module when available.
        if yt_dlp is not None:
            try:
                opts: Any = {
                    "quiet": True,
                    "no_warnings": True,
                    "skip_download": True,
                    "noprogress": True,
                    "socket_timeout": 15,
                    "retries": 1,
                    "playlist_items": "1-10",
                }
                with yt_dlp.YoutubeDL(opts) as ydl:  # type: ignore[attr-defined]
                    info = ydl.extract_info(url, download=False)
                    return cast(Dict[str, Any], info) if isinstance(info, dict) else None
            except Exception:
                pass
        # Fallback to CLI.
        try:
            cmd = [
                "yt-dlp",
                "-J",
                "--no-warnings",
                "--skip-download",
                "--playlist-items",
                "1-10",
                url,
            ]
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if proc.returncode != 0:
                return None
            payload = (proc.stdout or "").strip()
            if not payload:
                return None
            data = json.loads(payload)
            return data if isinstance(data, dict) else None
        except Exception:
            return None

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        url = (query or "").strip()
        if not url.startswith(("http://", "https://")):
            return []
        info = self._extract_info(url)
        if not isinstance(info, dict):
            return []
        upload_date = str(info.get("upload_date") or "")
        release_date = str(info.get("release_date") or "")
        year = (release_date or upload_date)[:4] if (release_date or upload_date) else ""
        # Provide basic columns for the standard metadata selection table.
        # NOTE: This is best-effort; many extractors don't provide artist/album.
        artist = info.get("artist") or info.get("uploader") or info.get("channel") or ""
        album = info.get("album") or info.get("playlist_title") or ""
        title = info.get("title") or ""
        return [
            {
                "title": title,
                "artist": str(artist or ""),
                "album": str(album or ""),
                "year": str(year or ""),
                "provider": self.name,
                "url": url,
                "raw": info,
            }
        ]

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Build tags from the raw yt-dlp info dict, including subtitles."""
        raw = item.get("raw")
        if not isinstance(raw, dict):
            return super().to_tags(item)
        tags: List[str] = []
        try:
            from SYS.metadata import extract_ytdlp_tags
        except Exception:
            extract_ytdlp_tags = None  # type: ignore[assignment]
        if extract_ytdlp_tags:
            try:
                tags.extend(extract_ytdlp_tags(raw))
            except Exception:
                pass

        # Subtitle availability tags.
        def _langs(value: Any) -> List[str]:
            if not isinstance(value, dict):
                return []
            out: List[str] = []
            for k in value.keys():
                if isinstance(k, str) and k.strip():
                    out.append(k.strip().lower())
            return sorted(set(out))

        # If this is a playlist container, subtitle/captions are usually
        # per-entry; look at the first entry instead.
        info_for_subs: Dict[str, Any] = raw
        entries = raw.get("entries")
        if isinstance(entries, list) and entries:
            first = entries[0]
            if isinstance(first, dict):
                info_for_subs = first
        for lang in _langs(info_for_subs.get("subtitles")):
            tags.append(f"subs:{lang}")
        for lang in _langs(info_for_subs.get("automatic_captions")):
            tags.append(f"subs_auto:{lang}")
        # Always include source tag for parity with other providers.
        tags.append(f"source:{self.name}")
        # Dedup case-insensitively, preserve order.
        seen = set()
        out: List[str] = []
        for t in tags:
            if not isinstance(t, str):
                continue
            s = t.strip()
            if not s:
                continue
            k = s.lower()
            if k in seen:
                continue
            seen.add(k)
            out.append(s)
        return out


def _coerce_archive_field_list(value: Any) -> List[str]:
    """Coerce an Archive.org metadata field to a list of strings."""
    if value is None:
        return []
    if isinstance(value, (list, tuple, set)):
        out: List[str] = []
        for v in value:
            try:
                s = str(v).strip()
            except Exception:
                continue
            if s:
                out.append(s)
        return out
    try:
        s = str(value).strip()
    except Exception:
        return []
    return [s] if s else []


def archive_item_metadata_to_tags(archive_id: str, item_metadata: Dict[str, Any]) -> List[str]:
    """Coerce Archive.org metadata into a stable set of bibliographic tags."""
    archive_id_clean = str(archive_id or "").strip()
    meta = item_metadata if isinstance(item_metadata, dict) else {}
    tags: List[str] = []
    seen: set[str] = set()

    def _add(tag: str) -> None:
        # Append with case-insensitive de-duplication.
        try:
            t = str(tag).strip()
        except Exception:
            return
        if not t:
            return
        if t.lower() in seen:
            return
        seen.add(t.lower())
        tags.append(t)

    if archive_id_clean:
        _add(f"internet_archive:{archive_id_clean}")
    for title in _coerce_archive_field_list(meta.get("title"))[:1]:
        _add(f"title:{title}")
    creators: List[str] = []
    creators.extend(_coerce_archive_field_list(meta.get("creator")))
    creators.extend(_coerce_archive_field_list(meta.get("author")))
    for creator in creators[:3]:
        _add(f"author:{creator}")
    for publisher in _coerce_archive_field_list(meta.get("publisher"))[:3]:
        _add(f"publisher:{publisher}")
    for date_val in _coerce_archive_field_list(meta.get("date"))[:1]:
        _add(f"publish_date:{date_val}")
    for year_val in _coerce_archive_field_list(meta.get("year"))[:1]:
        _add(f"publish_date:{year_val}")
    for lang in _coerce_archive_field_list(meta.get("language"))[:3]:
        _add(f"language:{lang}")
    for subj in _coerce_archive_field_list(meta.get("subject"))[:15]:
        if len(subj) > 200:
            subj = subj[:200]
        _add(subj)

    def _clean_isbn(raw: str) -> str:
        return str(raw or "").replace("-", "").strip()

    for isbn in _coerce_archive_field_list(meta.get("isbn"))[:10]:
        isbn_clean = _clean_isbn(isbn)
        if isbn_clean:
            _add(f"isbn:{isbn_clean}")

    identifiers: List[str] = []
    identifiers.extend(_coerce_archive_field_list(meta.get("identifier")))
    identifiers.extend(_coerce_archive_field_list(meta.get("external-identifier")))
    added_other = 0
    for ident in identifiers:
        ident_s = str(ident or "").strip()
        if not ident_s:
            continue
        low = ident_s.lower()
        # Recognized namespaced identifiers map to dedicated tag prefixes;
        # "urn:" forms need the third colon-separated field.
        if low.startswith("urn:isbn:"):
            val = _clean_isbn(ident_s.split(":", 2)[-1])
            if val:
                _add(f"isbn:{val}")
            continue
        if low.startswith("isbn:"):
            val = _clean_isbn(ident_s.split(":", 1)[-1])
            if val:
                _add(f"isbn:{val}")
            continue
        if low.startswith("urn:oclc:"):
            val = ident_s.split(":", 2)[-1].strip()
            if val:
                _add(f"oclc:{val}")
            continue
        if low.startswith("oclc:"):
            val = ident_s.split(":", 1)[-1].strip()
            if val:
                _add(f"oclc:{val}")
            continue
        if low.startswith("urn:lccn:"):
            val = ident_s.split(":", 2)[-1].strip()
            if val:
                _add(f"lccn:{val}")
            continue
        if low.startswith("lccn:"):
            val = ident_s.split(":", 1)[-1].strip()
            if val:
                _add(f"lccn:{val}")
            continue
        if low.startswith("doi:"):
            val = ident_s.split(":", 1)[-1].strip()
            if val:
                _add(f"doi:{val}")
            continue
        if archive_id_clean and low == archive_id_clean.lower():
            continue
        # Cap unrecognized identifiers at 5 to keep the tag list bounded.
        if added_other >= 5:
            continue
        if len(ident_s) > 200:
            ident_s = ident_s[:200]
        _add(f"identifier:{ident_s}")
        added_other += 1
    return tags


def fetch_archive_item_metadata(archive_id: str, *, timeout: int = 8) -> Dict[str, Any]:
    """Fetch the "metadata" section of an Archive.org item.

    Returns {} for blank ids or non-dict payloads; network and HTTP errors
    propagate to the caller.
    """
    ident = str(archive_id or "").strip()
    if not ident:
        return {}
    resp = requests.get(f"https://archive.org/metadata/{ident}", timeout=int(timeout))
    resp.raise_for_status()
    data = resp.json()
    if not isinstance(data, dict):
        return {}
    meta = data.get("metadata")
    return meta if isinstance(meta, dict) else {}
def scrape_isbn_metadata(isbn: str) -> List[str]:
    """Scrape metadata tags for an ISBN using OpenLibrary's books API.

    Returns a list of "namespace:value" tags; empty list on any failure.
    """
    new_tags: List[str] = []
    isbn_clean = str(isbn or "").replace("isbn:", "").replace("-", "").strip()
    if not isbn_clean:
        return []
    url = f"https://openlibrary.org/api/books?bibkeys=ISBN:{isbn_clean}&jscmd=data&format=json"
    try:
        with HTTPClient() as client:
            response = client.get(url)
            response.raise_for_status()
            data = json.loads(response.content.decode("utf-8"))
    except Exception as exc:
        log(f"Failed to fetch ISBN metadata: {exc}", file=sys.stderr)
        return []
    if not data:
        log(f"No ISBN metadata found for: {isbn}")
        return []
    book_data = next(iter(data.values()), None)
    if not isinstance(book_data, dict):
        return []
    if "title" in book_data:
        new_tags.append(f"title:{book_data['title']}")
    authors = book_data.get("authors")
    if isinstance(authors, list):
        for author in authors[:3]:
            if isinstance(author, dict) and author.get("name"):
                new_tags.append(f"author:{author['name']}")
    if book_data.get("publish_date"):
        new_tags.append(f"publish_date:{book_data['publish_date']}")
    publishers = book_data.get("publishers")
    if isinstance(publishers, list) and publishers:
        pub = publishers[0]
        if isinstance(pub, dict) and pub.get("name"):
            new_tags.append(f"publisher:{pub['name']}")
        elif isinstance(pub, str) and pub:
            # FIX: also accept plain-string publishers, matching
            # scrape_openlibrary_metadata's handling.
            new_tags.append(f"publisher:{pub}")
    if "description" in book_data:
        desc = book_data.get("description")
        # Descriptions can be either a string or {"type": ..., "value": ...}.
        if isinstance(desc, dict) and "value" in desc:
            desc = desc.get("value")
        if desc:
            desc_str = str(desc).strip()
            if desc_str:
                new_tags.append(f"description:{desc_str[:200]}")
    page_count = book_data.get("number_of_pages")
    if isinstance(page_count, int) and page_count > 0:
        new_tags.append(f"pages:{page_count}")
    identifiers = book_data.get("identifiers")
    if isinstance(identifiers, dict):

        def _first(value: Any) -> Any:
            if isinstance(value, list) and value:
                return value[0]
            return value

        for key, ns in (
            ("openlibrary", "openlibrary"),
            ("lccn", "lccn"),
            ("oclc", "oclc"),
            ("goodreads", "goodreads"),
            ("librarything", "librarything"),
            ("doi", "doi"),
            ("internet_archive", "internet_archive"),
        ):
            val = _first(identifiers.get(key))
            if val:
                new_tags.append(f"{ns}:{val}")
    debug(f"Found {len(new_tags)} tag(s) from ISBN lookup")
    return new_tags


def scrape_openlibrary_metadata(olid: str) -> List[str]:
    """Scrape metadata tags for an OpenLibrary ID using the edition JSON endpoint."""
    new_tags: List[str] = []
    olid_text = str(olid or "").strip()
    if not olid_text:
        return []
    # Normalize to the canonical "OL<digits>M" edition form for the tag.
    olid_norm = olid_text
    try:
        if not olid_norm.startswith("OL"):
            olid_norm = f"OL{olid_norm}"
        if not olid_norm.endswith("M"):
            olid_norm = f"{olid_norm}M"
    except Exception:
        olid_norm = olid_text
    new_tags.append(f"openlibrary:{olid_norm}")
    olid_clean = olid_text.replace("OL", "").replace("M", "")
    if not olid_clean.isdigit():
        olid_clean = olid_text
    if not olid_text.startswith("OL"):
        url = f"https://openlibrary.org/books/OL{olid_clean}M.json"
    else:
        url = f"https://openlibrary.org/books/{olid_text}.json"
    try:
        with HTTPClient() as client:
            response = client.get(url)
            response.raise_for_status()
            data = json.loads(response.content.decode("utf-8"))
    except Exception as exc:
        log(f"Failed to fetch OpenLibrary metadata: {exc}", file=sys.stderr)
        return []
    if not isinstance(data, dict) or not data:
        log(f"No OpenLibrary metadata found for: {olid_text}")
        return []
    if "title" in data:
        new_tags.append(f"title:{data['title']}")
    authors = data.get("authors")
    if isinstance(authors, list):
        for author in authors[:3]:
            # Inline name, if present, wins without a second request.
            if isinstance(author, dict) and author.get("name"):
                new_tags.append(f"author:{author['name']}")
                continue
            # Otherwise resolve the author reference key via a follow-up fetch.
            author_key = None
            if isinstance(author, dict):
                if isinstance(author.get("author"), dict):
                    author_key = author.get("author", {}).get("key")
                if not author_key:
                    author_key = author.get("key")
            if isinstance(author_key, str) and author_key.startswith("/"):
                try:
                    author_url = f"https://openlibrary.org{author_key}.json"
                    with HTTPClient(timeout=10) as client:
                        author_resp = client.get(author_url)
                        author_resp.raise_for_status()
                        author_data = json.loads(author_resp.content.decode("utf-8"))
                        if isinstance(author_data, dict) and author_data.get("name"):
                            new_tags.append(f"author:{author_data['name']}")
                            continue
                except Exception:
                    pass
            if isinstance(author, str) and author:
                new_tags.append(f"author:{author}")
    if data.get("publish_date"):
        new_tags.append(f"publish_date:{data['publish_date']}")
    publishers = data.get("publishers")
    if isinstance(publishers, list) and publishers:
        pub = publishers[0]
        if isinstance(pub, dict) and pub.get("name"):
            new_tags.append(f"publisher:{pub['name']}")
        elif isinstance(pub, str) and pub:
            new_tags.append(f"publisher:{pub}")
    if "description" in data:
        desc = data.get("description")
        if isinstance(desc, dict) and "value" in desc:
            desc = desc.get("value")
        if desc:
            desc_str = str(desc).strip()
            if desc_str:
                new_tags.append(f"description:{desc_str[:200]}")
    page_count = data.get("number_of_pages")
    if isinstance(page_count, int) and page_count > 0:
        new_tags.append(f"pages:{page_count}")
    subjects = data.get("subjects")
    if isinstance(subjects, list):
        for subject in subjects[:10]:
            if isinstance(subject, str):
                subject_clean = subject.strip()
                if subject_clean and subject_clean not in new_tags:
                    new_tags.append(subject_clean)
    identifiers = data.get("identifiers")
    if isinstance(identifiers, dict):

        def _first(value: Any) -> Any:
            if isinstance(value, list) and value:
                return value[0]
            return value

        for key, ns in (
            ("isbn_10", "isbn_10"),
            ("isbn_13", "isbn_13"),
            ("lccn", "lccn"),
            ("oclc_numbers", "oclc"),
            ("goodreads", "goodreads"),
            ("internet_archive", "internet_archive"),
        ):
            val = _first(identifiers.get(key))
            if val:
                new_tags.append(f"{ns}:{val}")
    ocaid = data.get("ocaid")
    if isinstance(ocaid, str) and ocaid.strip():
        new_tags.append(f"internet_archive:{ocaid.strip()}")
    debug(f"Found {len(new_tags)} tag(s) from OpenLibrary lookup")
    return new_tags


# Example record shape consumed by the openlibrary result-table adapter below.
SAMPLE_ITEMS: List[Dict[str, Any]] = [
    {
        "title": "Sample OpenLibrary book",
        "path": "https://openlibrary.org/books/OL123M",
        "openlibrary_id": "OL123M",
        "archive_id": "samplearchive123",
        "availability": "borrow",
        "availability_reason": "sample",
        "direct_url": "https://archive.org/download/sample.pdf",
        "author_name": ["OpenLibrary Demo"],
        "first_publish_year": 2023,
        "ia": ["samplearchive123"],
    },
]

# Best-effort registration with the result-table subsystem; if that optional
# machinery is unavailable the module still works as a plain provider library.
try:
    from typing import Iterable

    from SYS.result_table_api import ColumnSpec, ResultModel, metadata_column, title_column
    from SYS.result_table_adapters import register_provider

    def _ensure_search_result(item: Any) -> SearchResult:
        """Coerce a dict (or anything else) into a SearchResult."""
        if isinstance(item, SearchResult):
            return item
        if isinstance(item, dict):
            data = dict(item)
            title = str(data.get("title") or data.get("name") or "OpenLibrary")
            path = str(data.get("path") or data.get("url") or "")
            detail = str(data.get("detail") or "")
            annotations = list(data.get("annotations") or [])
            media_kind = str(data.get("media_kind") or "book")
            return SearchResult(
                table="openlibrary",
                title=title,
                path=path,
                detail=detail,
                annotations=annotations,
                media_kind=media_kind,
                columns=data.get("columns") or [],
                full_metadata={**data, "raw": dict(item)},
            )
        return SearchResult(
            table="openlibrary",
            title=str(item or "OpenLibrary"),
            path="",
            detail="",
            annotations=[],
            media_kind="book",
            full_metadata={"raw": {}},
        )

    def _adapter(items: Iterable[Any]) -> Iterable[ResultModel]:
        """Yield ResultModel rows for the generic result table."""
        for item in items:
            sr = _ensure_search_result(item)
            metadata = dict(getattr(sr, "full_metadata", {}) or {})
            raw = metadata.get("raw")
            if isinstance(raw, dict):
                normalized = normalize_record(raw)
                for key, val in normalized.items():
                    metadata.setdefault(key, val)

            def _make_url() -> str:
                candidate = (
                    metadata.get("selection_url")
                    or metadata.get("direct_url")
                    or metadata.get("url")
                    or metadata.get("path")
                    or sr.path
                    or ""
                )
                return str(candidate or "").strip()

            selection_url = _make_url()
            if selection_url:
                metadata["selection_url"] = selection_url
            authors_value = (
                metadata.get("authors_display")
                or metadata.get("authors")
                or metadata.get("author_name")
                or ""
            )
            if isinstance(authors_value, list):
                authors_value = ", ".join(str(v) for v in authors_value if v)
            authors_text = str(authors_value or "").strip()
            if authors_text:
                metadata["authors_display"] = authors_text
            year_value = metadata.get("year") or metadata.get("first_publish_year")
            if year_value and not isinstance(year_value, str):
                year_value = str(year_value)
            if year_value:
                metadata["year"] = str(year_value)
            metadata.setdefault(
                "openlibrary_id", metadata.get("openlibrary_id") or metadata.get("olid")
            )
            metadata.setdefault("source", metadata.get("source") or "openlibrary")
            yield ResultModel(
                title=str(sr.title or metadata.get("title") or selection_url or "OpenLibrary"),
                path=selection_url or None,
                metadata=metadata,
                source="openlibrary",
            )

    def _columns_factory(rows: List[ResultModel]) -> List[ColumnSpec]:
        """Build table columns, showing only populated fields."""
        cols: List[ColumnSpec] = [title_column()]

        def _has(key: str) -> bool:
            return any((row.metadata or {}).get(key) for row in rows)

        if _has("authors_display"):
            cols.append(
                ColumnSpec(
                    "authors_display",
                    "Author",
                    lambda r: (r.metadata or {}).get("authors_display") or "",
                )
            )
        if _has("year"):
            cols.append(metadata_column("year", "Year"))
        if _has("availability"):
            cols.append(metadata_column("availability", "Avail"))
        if _has("archive_id"):
            cols.append(metadata_column("archive_id", "Archive ID"))
        if _has("openlibrary_id"):
            cols.append(metadata_column("openlibrary_id", "OLID"))
        return cols

    def _selection_fn(row: ResultModel) -> List[str]:
        """Translate a selected row into CLI-style selection arguments."""
        metadata = row.metadata or {}
        url = str(metadata.get("selection_url") or row.path or "").strip()
        if url:
            return ["-url", url]
        return ["-title", row.title or ""]

    register_provider(
        "openlibrary",
        _adapter,
        columns=_columns_factory,
        selection_fn=_selection_fn,
        metadata={"description": "OpenLibrary search provider (JSON result table template)"},
    )
except Exception:
    pass


# Registry ---------------------------------------------------------------

_METADATA_PROVIDERS: Dict[str, Type[MetadataProvider]] = {
    "itunes": ITunesProvider,
    "openlibrary": OpenLibraryMetadataProvider,
    "googlebooks": GoogleBooksMetadataProvider,
    "google": GoogleBooksMetadataProvider,  # alias
    "isbnsearch": ISBNsearchMetadataProvider,
    "musicbrainz": MusicBrainzMetadataProvider,
    "imdb": ImdbMetadataProvider,
    "ytdlp": YtdlpMetadataProvider,
}


def register_provider(name: str, provider_cls: Type[MetadataProvider]) -> None:
    """Register a MetadataProvider class under a lowercase name.

    NOTE(review): this definition shadows the result-table
    ``register_provider`` imported inside the try-block above; that import
    is only used before this ``def`` executes, so behavior is unaffected —
    but consider renaming one of them for clarity.
    """
    _METADATA_PROVIDERS[name.lower()] = provider_cls


def list_metadata_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
    """Return {provider_name: constructible} for all registered providers."""
    availability: Dict[str, bool] = {}
    for name, cls in _METADATA_PROVIDERS.items():
        try:
            _ = cls(config)
            # Basic availability check: construction succeeded.
            availability[name] = True
        except Exception:
            availability[name] = False
    return availability


def get_metadata_provider(
    name: str, config: Optional[Dict[str, Any]] = None
) -> Optional[MetadataProvider]:
    """Instantiate a registered provider by name; None when unknown/broken."""
    cls = _METADATA_PROVIDERS.get(name.lower())
    if not cls:
        return None
    try:
        return cls(config)
    except Exception as exc:
        log(f"Provider init failed for '{name}': {exc}", file=sys.stderr)
        return None