# Files
# Medios-Macina/Provider/metadata_provider.py
# 2025-12-22 02:11:53 -08:00
#
# 550 lines
# 18 KiB
# Python

from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type, cast
import requests
import sys
import json
import subprocess
from SYS.logger import log, debug
try: # Optional dependency
import musicbrainzngs # type: ignore
except ImportError: # pragma: no cover - optional
musicbrainzngs = None
try: # Optional dependency
import yt_dlp # type: ignore
except ImportError: # pragma: no cover - optional
yt_dlp = None
class MetadataProvider(ABC):
    """Abstract base for metadata providers (music, movies, books, etc.).

    Subclasses implement :meth:`search`. :meth:`to_tags` turns one search
    result into a list of ``key:value`` tag strings.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Store *config*; a missing or empty config becomes a new empty dict."""
        self.config = config if config else {}

    @property
    def name(self) -> str:
        """Lowercased class name with every "Provider" substring removed."""
        class_name = self.__class__.__name__
        return class_name.replace("Provider", "").lower()

    @abstractmethod
    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return a list of candidate metadata records."""

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Convert a result item into a list of tags.

        Emits ``title``, ``artist``, ``album`` and ``year`` tags (in that
        order) for every field holding a truthy value, followed by a
        ``source:<provider name>`` tag that is always present.
        """
        tags: List[str] = []
        for field in ("title", "artist", "album", "year"):
            value = item.get(field)
            if value:
                tags.append(f"{field}:{value}")
        tags.append(f"source:{self.name}")
        return tags
class ITunesProvider(MetadataProvider):
    """Metadata provider using the iTunes Search API."""

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search iTunes for songs matching *query*.

        Args:
            query: Free-text search term. Blank or missing queries return
                an empty list without contacting the API.
            limit: Value sent as the API's ``limit`` parameter.

        Returns:
            One dict per result with ``title``, ``artist``, ``album``,
            ``year``, ``provider`` and ``raw`` keys. Any request or decoding
            failure is logged to stderr and yields an empty list.
        """
        term = (query or "").strip()
        if not term:
            # Same short-circuit the other providers in this file use.
            return []

        params = {"term": term, "media": "music", "entity": "song", "limit": limit}
        try:
            resp = requests.get("https://itunes.apple.com/search", params=params, timeout=10)
            resp.raise_for_status()
            # "or []" also covers an explicit null "results" value.
            results = resp.json().get("results") or []
        except Exception as exc:
            log(f"iTunes search failed: {exc}", file=sys.stderr)
            return []

        items: List[Dict[str, Any]] = []
        for r in results:
            if not isinstance(r, dict):
                continue
            items.append({
                "title": r.get("trackName"),
                "artist": r.get("artistName"),
                "album": r.get("collectionName"),
                # "or ''" keeps a null releaseDate from becoming the year "None".
                "year": str(r.get("releaseDate") or "")[:4],
                "provider": self.name,
                "raw": r,
            })
        debug(f"iTunes returned {len(items)} items for '{query}'")
        return items
class OpenLibraryMetadataProvider(MetadataProvider):
    """Metadata provider for OpenLibrary book metadata."""

    @property
    def name(self) -> str:  # type: ignore[override]
        """Fixed provider name, independent of the class name."""
        return "openlibrary"

    @staticmethod
    def _isbn_from_query(query: str) -> Optional[str]:
        """Return the hyphen-free ISBN if *query* looks like one, else None.

        Accepts 13 digits, or 10 characters where the first nine are digits
        and the last is a digit or the ``X`` check digit.
        """
        compact = query.replace("-", "").upper()
        if len(compact) == 13 and compact.isdigit():
            return compact
        if (
            len(compact) == 10
            and compact[:9].isdigit()
            and (compact[9].isdigit() or compact[9] == "X")
        ):
            return compact
        return None

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search OpenLibrary's ``search.json`` endpoint.

        Args:
            query: Free text, or an ISBN (hyphens allowed), which is sent as
                an ``isbn:`` query. Blank queries return an empty list.
            limit: Sent as the ``limit`` parameter and also used to cap the
                number of returned records.

        Returns:
            One dict per document with ``title``, ``artist`` (comma-joined
            authors), ``album`` (first publisher), ``year``, ``provider``,
            ``authors``, ``publisher``, ``identifiers`` and ``description``.
            Request or decoding failures are logged to stderr and yield an
            empty list.
        """
        query_clean = (query or "").strip()
        if not query_clean:
            return []

        # Prefer ISBN-specific search when the query looks like one.
        isbn = self._isbn_from_query(query_clean)
        q = f"isbn:{isbn}" if isbn else query_clean

        try:
            resp = requests.get(
                "https://openlibrary.org/search.json",
                params={"q": q, "limit": limit},
                timeout=10,
            )
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            log(f"OpenLibrary search failed: {exc}", file=sys.stderr)
            return []

        # Tolerate an unexpected payload shape instead of raising.
        docs = data.get("docs") if isinstance(data, dict) else None
        if not isinstance(docs, list):
            return []

        items: List[Dict[str, Any]] = []
        for doc in docs[:limit]:
            if not isinstance(doc, dict):
                continue
            authors = doc.get("author_name") or []

            publisher = ""
            publishers = doc.get("publisher") or []
            if isinstance(publishers, list) and publishers:
                publisher = publishers[0]

            # Pick the first 13-character and first 10-character ISBN, if any.
            isbn_list = doc.get("isbn") or []
            isbn_13 = next((i for i in isbn_list if len(str(i)) == 13), None)
            isbn_10 = next((i for i in isbn_list if len(str(i)) == 10), None)

            # Derive OLID from key (its last path segment).
            olid = ""
            key = doc.get("key", "")
            if isinstance(key, str) and key:
                olid = key.split("/")[-1]

            items.append({
                "title": doc.get("title") or "",
                "artist": ", ".join(authors) if authors else "",
                "album": publisher,
                "year": str(doc.get("first_publish_year") or ""),
                "provider": self.name,
                "authors": authors,
                "publisher": publisher,
                "identifiers": {
                    "isbn_13": isbn_13,
                    "isbn_10": isbn_10,
                    "openlibrary": olid,
                    "oclc": (doc.get("oclc_numbers") or [None])[0],
                    "lccn": (doc.get("lccn") or [None])[0],
                },
                "description": None,
            })
        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Convert a book record into tags.

        Emits ``title``, one ``author`` tag per non-empty author,
        ``publisher``, ``year``, ``description`` (first 200 characters), one
        tag per identifier with a truthy value, and finally ``source``.
        """
        tags: List[str] = []
        title = item.get("title")
        authors = item.get("authors") or []
        publisher = item.get("publisher")
        year = item.get("year")
        description = item.get("description") or ""

        if title:
            tags.append(f"title:{title}")
        for author in authors:
            if author:
                tags.append(f"author:{author}")
        if publisher:
            tags.append(f"publisher:{publisher}")
        if year:
            tags.append(f"year:{year}")
        if description:
            tags.append(f"description:{description[:200]}")

        identifiers = item.get("identifiers") or {}
        for key, value in identifiers.items():
            if value:
                tags.append(f"{key}:{value}")

        tags.append(f"source:{self.name}")
        return tags
class GoogleBooksMetadataProvider(MetadataProvider):
    """Metadata provider for Google Books volumes API."""

    @property
    def name(self) -> str:  # type: ignore[override]
        """Fixed provider name, independent of the class name."""
        return "googlebooks"

    @staticmethod
    def _isbn_from_query(query: str) -> Optional[str]:
        """Return the hyphen-free ISBN if *query* looks like one, else None.

        Accepts 13 digits, or 10 characters where the first nine are digits
        and the last is a digit or the ``X`` check digit.
        """
        compact = query.replace("-", "").upper()
        if len(compact) == 13 and compact.isdigit():
            return compact
        if (
            len(compact) == 10
            and compact[:9].isdigit()
            and (compact[9].isdigit() or compact[9] == "X")
        ):
            return compact
        return None

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search the Google Books ``volumes`` endpoint.

        Args:
            query: Free text, or an ISBN (hyphens allowed), which is sent as
                an ``isbn:`` query. Blank queries return an empty list.
            limit: Maximum number of records to return. The value sent as
                ``maxResults`` is clamped to 1..40 because the API rejects
                larger values.

        Returns:
            One dict per volume with ``title``, ``artist`` (comma-joined
            authors), ``album`` (publisher), ``year``, ``provider``,
            ``authors``, ``publisher``, ``identifiers`` and ``description``.
            Request or decoding failures are logged to stderr and yield an
            empty list.
        """
        query_clean = (query or "").strip()
        if not query_clean:
            return []

        # Prefer ISBN queries when possible.
        isbn = self._isbn_from_query(query_clean)
        q = f"isbn:{isbn}" if isbn else query_clean

        # An out-of-range maxResults would fail the whole request.
        max_results = max(1, min(limit, 40))

        try:
            resp = requests.get(
                "https://www.googleapis.com/books/v1/volumes",
                params={"q": q, "maxResults": max_results},
                timeout=10,
            )
            resp.raise_for_status()
            payload = resp.json()
        except Exception as exc:
            log(f"Google Books search failed: {exc}", file=sys.stderr)
            return []

        # Tolerate an unexpected payload shape instead of raising.
        volumes = payload.get("items") if isinstance(payload, dict) else None
        if not isinstance(volumes, list):
            return []

        items: List[Dict[str, Any]] = []
        for volume in volumes[:limit]:
            if not isinstance(volume, dict):
                continue
            info = volume.get("volumeInfo") or {}
            authors = info.get("authors") or []
            publisher = info.get("publisher") or ""
            published_date = info.get("publishedDate", "")
            year = str(published_date)[:4] if published_date else ""

            identifiers: Dict[str, Optional[str]] = {"googlebooks": volume.get("id")}
            for ident in info.get("industryIdentifiers") or []:
                if not isinstance(ident, dict):
                    continue
                # str(... or "") keeps a null "type" from raising on .lower().
                ident_type = str(ident.get("type") or "").lower()
                ident_value = ident.get("identifier")
                if not ident_type or not ident_value:
                    continue
                # First value seen for a given type wins.
                identifiers.setdefault(ident_type, ident_value)

            items.append({
                "title": info.get("title") or "",
                "artist": ", ".join(authors) if authors else "",
                "album": publisher,
                "year": year,
                "provider": self.name,
                "authors": authors,
                "publisher": publisher,
                "identifiers": identifiers,
                "description": info.get("description", ""),
            })
        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Convert a book record into tags.

        Emits ``title``, one ``author`` tag per non-empty author,
        ``publisher``, ``year``, ``description`` (first 200 characters), one
        tag per identifier with a truthy value, and finally ``source``.
        """
        tags: List[str] = []
        title = item.get("title")
        authors = item.get("authors") or []
        publisher = item.get("publisher")
        year = item.get("year")
        description = item.get("description") or ""

        if title:
            tags.append(f"title:{title}")
        for author in authors:
            if author:
                tags.append(f"author:{author}")
        if publisher:
            tags.append(f"publisher:{publisher}")
        if year:
            tags.append(f"year:{year}")
        if description:
            tags.append(f"description:{description[:200]}")

        identifiers = item.get("identifiers") or {}
        for key, value in identifiers.items():
            if value:
                tags.append(f"{key}:{value}")

        tags.append(f"source:{self.name}")
        return tags
class MusicBrainzMetadataProvider(MetadataProvider):
    """Metadata provider for MusicBrainz recordings."""

    @property
    def name(self) -> str:  # type: ignore[override]
        """Fixed provider name, independent of the class name."""
        return "musicbrainz"

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search MusicBrainz recordings through ``musicbrainzngs``.

        Args:
            query: Search text. Blank queries return an empty list.
            limit: Passed to ``search_recordings`` and also used to cap the
                number of returned records.

        Returns:
            One dict per recording with ``title``, ``artist`` (first artist
            credit only), ``album`` (first release title), ``year``,
            ``provider``, ``mbid`` and ``raw``. Returns an empty list when
            ``musicbrainzngs`` is not installed or the search fails; both
            cases are logged to stderr.
        """
        if not musicbrainzngs:
            log("musicbrainzngs is not installed; skipping MusicBrainz scrape", file=sys.stderr)
            return []

        q = (query or "").strip()
        if not q:
            return []

        try:
            # Ensure user agent is set (required by MusicBrainz).
            musicbrainzngs.set_useragent("Medeia-Macina", "0.1")
        except Exception as exc:
            # Best-effort: record the failure and let the search attempt proceed.
            debug(f"MusicBrainz user agent setup failed: {exc}")

        try:
            resp = musicbrainzngs.search_recordings(query=q, limit=limit)
            recordings = resp.get("recording-list") or resp.get("recordings") or []
        except Exception as exc:
            log(f"MusicBrainz search failed: {exc}", file=sys.stderr)
            return []

        if not isinstance(recordings, list):
            return []

        items: List[Dict[str, Any]] = []
        for rec in recordings[:limit]:
            if not isinstance(rec, dict):
                continue

            artist = ""
            artist_credit = rec.get("artist-credit") or rec.get("artist_credit")
            if isinstance(artist_credit, list) and artist_credit:
                first = artist_credit[0]
                if isinstance(first, dict):
                    # The nested "artist" entry may be absent or not a dict.
                    nested = first.get("artist")
                    nested_name = nested.get("name") if isinstance(nested, dict) else ""
                    artist = first.get("name") or nested_name or ""
                elif isinstance(first, str):
                    artist = first

            album = ""
            release_date = ""
            release_list = rec.get("release-list") or rec.get("releases") or rec.get("release")
            if isinstance(release_list, list) and release_list:
                first_rel = release_list[0]
                if isinstance(first_rel, dict):
                    album = first_rel.get("title") or ""
                    release_date = first_rel.get("date") or ""
                else:
                    album = str(first_rel)
            if not release_date:
                # Fall back to the recording-level date when no release date was found.
                release_date = rec.get("first-release-date") or ""

            items.append({
                "title": rec.get("title") or "",
                "artist": artist,
                "album": album,
                "year": str(release_date)[:4] if release_date else "",
                "provider": self.name,
                "mbid": rec.get("id") or "",
                "raw": rec,
            })
        return items

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Return the base-class tags plus ``musicbrainz:<mbid>`` when an MBID is set."""
        tags = super().to_tags(item)
        mbid = item.get("mbid")
        if mbid:
            tags.append(f"musicbrainz:{mbid}")
        return tags
class YtdlpMetadataProvider(MetadataProvider):
    """Metadata provider that extracts tags from a supported URL using yt-dlp.

    This does NOT download media; it only probes metadata.
    """

    @property
    def name(self) -> str:  # type: ignore[override]
        """Fixed provider name, independent of the class name."""
        return "ytdlp"

    def _extract_info(self, url: str) -> Optional[Dict[str, Any]]:
        """Probe *url* and return yt-dlp's info dict, or None.

        Uses the ``yt_dlp`` Python module when it was imported. If that
        path raises, or the module is unavailable, falls back to running
        the ``yt-dlp`` executable with ``-J``. Every failure yields None.
        """
        target = (url or "").strip()
        if not target:
            return None

        # Prefer Python module when available.
        if yt_dlp is not None:
            ydl_opts: Any = {
                "quiet": True,
                "no_warnings": True,
                "skip_download": True,
                "noprogress": True,
                "socket_timeout": 15,
                "retries": 1,
                "playlist_items": "1-10",
            }
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[attr-defined]
                    info = ydl.extract_info(target, download=False)
                if isinstance(info, dict):
                    return cast(Dict[str, Any], info)
                return None
            except Exception:
                # Any module-path failure drops through to the CLI attempt.
                pass

        # Fallback to CLI.
        command = [
            "yt-dlp",
            "-J",
            "--no-warnings",
            "--skip-download",
            "--playlist-items",
            "1-10",
            target,
        ]
        try:
            completed = subprocess.run(command, capture_output=True, text=True, timeout=30)
            if completed.returncode != 0:
                return None
            stdout = (completed.stdout or "").strip()
            if not stdout:
                return None
            parsed = json.loads(stdout)
        except Exception:
            return None
        return parsed if isinstance(parsed, dict) else None

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Probe the URL given as *query* and return at most one record.

        Queries that do not start with ``http://`` or ``https://`` return an
        empty list, as does a failed probe. *limit* is not used.
        """
        url = (query or "").strip()
        if not url.startswith(("http://", "https://")):
            return []

        info = self._extract_info(url)
        if not isinstance(info, dict):
            return []

        # release_date takes precedence over upload_date; the first four
        # characters are used as the year.
        upload_date = str(info.get("upload_date") or "")
        release_date = str(info.get("release_date") or "")
        year = (release_date or upload_date)[:4]

        # Provide basic columns for the standard metadata selection table.
        # NOTE: This is best-effort; many extractors don't provide artist/album.
        artist: Any = ""
        for field in ("artist", "uploader", "channel"):
            candidate = info.get(field)
            if candidate:
                artist = candidate
                break

        album: Any = ""
        for field in ("album", "playlist_title"):
            candidate = info.get(field)
            if candidate:
                album = candidate
                break

        record: Dict[str, Any] = {
            "title": info.get("title") or "",
            "artist": str(artist or ""),
            "album": str(album or ""),
            "year": str(year or ""),
            "provider": self.name,
            "url": url,
            "raw": info,
        }
        return [record]

    def to_tags(self, item: Dict[str, Any]) -> List[str]:
        """Build tags from the raw yt-dlp info stored on *item*.

        Without a dict under ``raw`` this defers to the base implementation.
        Otherwise it collects tags from ``metadata.extract_ytdlp_tags`` (when
        importable), ``subs:<lang>`` / ``subs_auto:<lang>`` tags, and a
        ``source`` tag, then removes case-insensitive duplicates while
        preserving order.
        """
        raw = item.get("raw")
        if not isinstance(raw, dict):
            return super().to_tags(item)

        tags: List[str] = []
        try:
            from metadata import extract_ytdlp_tags
        except Exception:
            extract_ytdlp_tags = None  # type: ignore[assignment]
        if extract_ytdlp_tags:
            try:
                tags.extend(extract_ytdlp_tags(raw))
            except Exception:
                pass

        # Subtitle availability tags.
        def _langs(value: Any) -> List[str]:
            """Sorted, de-duplicated, lowercased non-blank string keys of *value*."""
            if not isinstance(value, dict):
                return []
            return sorted({
                key.strip().lower()
                for key in value.keys()
                if isinstance(key, str) and key.strip()
            })

        # If this is a playlist container, subtitle/captions are usually per-entry.
        info_for_subs: Dict[str, Any] = raw
        entries = raw.get("entries")
        if isinstance(entries, list) and entries and isinstance(entries[0], dict):
            info_for_subs = entries[0]

        tags.extend(f"subs:{lang}" for lang in _langs(info_for_subs.get("subtitles")))
        tags.extend(
            f"subs_auto:{lang}" for lang in _langs(info_for_subs.get("automatic_captions"))
        )

        # Always include source tag for parity with other providers.
        tags.append(f"source:{self.name}")

        # Dedup case-insensitively, preserve order.
        seen = set()
        unique: List[str] = []
        for tag in tags:
            if not isinstance(tag, str):
                continue
            cleaned = tag.strip()
            if not cleaned:
                continue
            folded = cleaned.lower()
            if folded not in seen:
                seen.add(folded)
                unique.append(cleaned)
        return unique
# Registry ---------------------------------------------------------------
# Maps a lowercase lookup name to the provider class registered under it.
# "googlebooks" and "google" both map to GoogleBooksMetadataProvider.
_METADATA_PROVIDERS: Dict[str, Type[MetadataProvider]] = {
    "itunes": ITunesProvider,
    "openlibrary": OpenLibraryMetadataProvider,
    "googlebooks": GoogleBooksMetadataProvider,
    "google": GoogleBooksMetadataProvider,
    "musicbrainz": MusicBrainzMetadataProvider,
    "ytdlp": YtdlpMetadataProvider,
}
def register_provider(name: str, provider_cls: Type[MetadataProvider]) -> None:
    """Register *provider_cls* in the provider registry.

    The key is *name* lowercased; an existing entry under that key is
    replaced.
    """
    registry_key = name.lower()
    _METADATA_PROVIDERS[registry_key] = provider_cls
def list_metadata_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
    """Report which registered providers can be constructed with *config*.

    Returns a dict keyed by registry name. A value is True when calling the
    provider class with *config* succeeds and False when it raises. No
    further validation is performed.
    """

    def _can_construct(provider_cls: Type[MetadataProvider]) -> bool:
        # Construction succeeding is the only availability signal used here.
        try:
            provider_cls(config)
        except Exception:
            return False
        return True

    return {
        registry_name: _can_construct(provider_cls)
        for registry_name, provider_cls in _METADATA_PROVIDERS.items()
    }
def get_metadata_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[MetadataProvider]:
    """Instantiate the provider registered under *name* (case-insensitive).

    Returns None when no provider is registered under the lowercased name,
    or when constructing it raises; the latter is logged to stderr.
    """
    provider_cls = _METADATA_PROVIDERS.get(name.lower())
    if provider_cls:
        try:
            return provider_cls(config)
        except Exception as exc:
            log(f"Provider init failed for '{name}': {exc}", file=sys.stderr)
    return None