# Medios-Macina/Provider/metadata_provider.py
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type, cast
import html as html_std
import re
import requests
import sys
import json
import subprocess
from SYS.logger import log, debug
try: # Optional dependency
import musicbrainzngs # type: ignore
except ImportError: # pragma: no cover - optional
musicbrainzngs = None
try: # Optional dependency
import yt_dlp # type: ignore
except ImportError: # pragma: no cover - optional
yt_dlp = None
class MetadataProvider(ABC):
"""Base class for metadata providers (music, movies, books, etc.)."""
def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
self.config = config or {}
@property
def name(self) -> str:
return self.__class__.__name__.replace("Provider", "").lower()
@abstractmethod
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Return a list of candidate metadata records."""
def to_tags(self, item: Dict[str, Any]) -> List[str]:
"""Convert a result item into a list of tags."""
tags: List[str] = []
title = item.get("title")
artist = item.get("artist")
album = item.get("album")
year = item.get("year")
if title:
tags.append(f"title:{title}")
if artist:
tags.append(f"artist:{artist}")
if album:
tags.append(f"album:{album}")
if year:
tags.append(f"year:{year}")
tags.append(f"source:{self.name}")
return tags
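# Illustrative note (not from the original module): the default to_tags()
# maps result fields onto namespaced tag strings. For a provider named
# "itunes", an item like
#   {"title": "Around the World", "artist": "Daft Punk", "year": "1997"}
# yields
#   ["title:Around the World", "artist:Daft Punk", "year:1997", "source:itunes"]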
class ITunesProvider(MetadataProvider):
"""Metadata provider using the iTunes Search API."""
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
params = {"term": query, "media": "music", "entity": "song", "limit": limit}
try:
resp = requests.get("https://itunes.apple.com/search", params=params, timeout=10)
resp.raise_for_status()
results = resp.json().get("results", [])
except Exception as exc:
log(f"iTunes search failed: {exc}", file=sys.stderr)
return []
items: List[Dict[str, Any]] = []
for r in results:
item = {
"title": r.get("trackName"),
"artist": r.get("artistName"),
"album": r.get("collectionName"),
"year": str(r.get("releaseDate", ""))[:4],
"provider": self.name,
"raw": r,
}
items.append(item)
debug(f"iTunes returned {len(items)} items for '{query}'")
return items
class OpenLibraryMetadataProvider(MetadataProvider):
"""Metadata provider for OpenLibrary book metadata."""
@property
def name(self) -> str: # type: ignore[override]
return "openlibrary"
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
query_clean = (query or "").strip()
if not query_clean:
return []
try:
# Prefer ISBN-specific search when the query looks like one
if query_clean.replace("-", "").isdigit() and len(query_clean.replace("-", "")) in (
10,
13,
):
q = f"isbn:{query_clean.replace('-', '')}"
else:
q = query_clean
resp = requests.get(
"https://openlibrary.org/search.json",
params={"q": q, "limit": limit},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
except Exception as exc:
log(f"OpenLibrary search failed: {exc}", file=sys.stderr)
return []
items: List[Dict[str, Any]] = []
for doc in data.get("docs", [])[:limit]:
authors = doc.get("author_name") or []
publisher = ""
publishers = doc.get("publisher") or []
if isinstance(publishers, list) and publishers:
publisher = publishers[0]
# Prefer 13-digit ISBN when available, otherwise 10-digit
isbn_list = doc.get("isbn") or []
isbn_13 = next((i for i in isbn_list if len(str(i)) == 13), None)
isbn_10 = next((i for i in isbn_list if len(str(i)) == 10), None)
# Derive OLID from key
olid = ""
key = doc.get("key", "")
if isinstance(key, str) and key:
olid = key.split("/")[-1]
items.append(
{
"title": doc.get("title") or "",
"artist": ", ".join(authors) if authors else "",
"album": publisher,
"year": str(doc.get("first_publish_year") or ""),
"provider": self.name,
"authors": authors,
"publisher": publisher,
"identifiers": {
"isbn_13": isbn_13,
"isbn_10": isbn_10,
"openlibrary": olid,
"oclc": (doc.get("oclc_numbers") or [None])[0],
"lccn": (doc.get("lccn") or [None])[0],
},
"description": None,
}
)
return items
def to_tags(self, item: Dict[str, Any]) -> List[str]:
tags: List[str] = []
title = item.get("title")
authors = item.get("authors") or []
publisher = item.get("publisher")
year = item.get("year")
description = item.get("description") or ""
if title:
tags.append(f"title:{title}")
for author in authors:
if author:
tags.append(f"author:{author}")
if publisher:
tags.append(f"publisher:{publisher}")
if year:
tags.append(f"year:{year}")
if description:
tags.append(f"description:{description[:200]}")
identifiers = item.get("identifiers") or {}
for key, value in identifiers.items():
if value:
tags.append(f"{key}:{value}")
tags.append(f"source:{self.name}")
return tags
class GoogleBooksMetadataProvider(MetadataProvider):
"""Metadata provider for Google Books volumes API."""
@property
def name(self) -> str: # type: ignore[override]
return "googlebooks"
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
query_clean = (query or "").strip()
if not query_clean:
return []
# Prefer ISBN queries when possible
if query_clean.replace("-", "").isdigit() and len(query_clean.replace("-", "")) in (10, 13):
q = f"isbn:{query_clean.replace('-', '')}"
else:
q = query_clean
try:
resp = requests.get(
"https://www.googleapis.com/books/v1/volumes",
params={"q": q, "maxResults": limit},
timeout=10,
)
resp.raise_for_status()
payload = resp.json()
except Exception as exc:
log(f"Google Books search failed: {exc}", file=sys.stderr)
return []
items: List[Dict[str, Any]] = []
for volume in payload.get("items", [])[:limit]:
info = volume.get("volumeInfo") or {}
authors = info.get("authors") or []
publisher = info.get("publisher", "")
published_date = info.get("publishedDate", "")
year = str(published_date)[:4] if published_date else ""
identifiers_raw = info.get("industryIdentifiers") or []
identifiers: Dict[str, Optional[str]] = {"googlebooks": volume.get("id")}
for ident in identifiers_raw:
if not isinstance(ident, dict):
continue
ident_type = ident.get("type", "").lower()
ident_value = ident.get("identifier")
if not ident_value:
continue
if ident_type == "isbn_13":
identifiers.setdefault("isbn_13", ident_value)
elif ident_type == "isbn_10":
identifiers.setdefault("isbn_10", ident_value)
else:
identifiers.setdefault(ident_type, ident_value)
items.append(
{
"title": info.get("title") or "",
"artist": ", ".join(authors) if authors else "",
"album": publisher,
"year": year,
"provider": self.name,
"authors": authors,
"publisher": publisher,
"identifiers": identifiers,
"description": info.get("description", ""),
}
)
return items
def to_tags(self, item: Dict[str, Any]) -> List[str]:
tags: List[str] = []
title = item.get("title")
authors = item.get("authors") or []
publisher = item.get("publisher")
year = item.get("year")
description = item.get("description") or ""
if title:
tags.append(f"title:{title}")
for author in authors:
if author:
tags.append(f"author:{author}")
if publisher:
tags.append(f"publisher:{publisher}")
if year:
tags.append(f"year:{year}")
if description:
tags.append(f"description:{description[:200]}")
identifiers = item.get("identifiers") or {}
for key, value in identifiers.items():
if value:
tags.append(f"{key}:{value}")
tags.append(f"source:{self.name}")
return tags
class ISBNsearchMetadataProvider(MetadataProvider):
"""Metadata provider that scrapes isbnsearch.org by ISBN.
This is a best-effort HTML scrape. It expects the query to be an ISBN.
"""
@property
def name(self) -> str: # type: ignore[override]
return "isbnsearch"
@staticmethod
def _strip_html_to_text(raw: str) -> str:
s = html_std.unescape(str(raw or ""))
s = re.sub(r"(?i)<br\s*/?>", "\n", s)
s = re.sub(r"<[^>]+>", " ", s)
s = re.sub(r"\s+", " ", s)
return s.strip()
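# Example (illustrative): '<p>Hello&nbsp;<b>World</b></p>' -> 'Hello World'.
# Entities are unescaped, <br> tags become newlines, remaining markup is
# dropped, and all whitespace is collapsed to single spaces.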
@staticmethod
def _clean_isbn(query: str) -> str:
s = str(query or "").strip()
if not s:
return ""
s = s.replace("isbn:", "").replace("ISBN:", "")
s = re.sub(r"[^0-9Xx]", "", s).upper()
if len(s) in (10, 13):
return s
# Try to locate an ISBN-like token inside the query.
m = re.search(r"\b(?:97[89])?\d{9}[\dXx]\b", s)
return str(m.group(0)).upper() if m else ""
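# Example (illustrative): 'ISBN: 978-0-13-468599-1' -> '9780134685991';
# plain text with no digits normalizes to ''.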
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
_ = limit
isbn = self._clean_isbn(query)
if not isbn:
return []
url = f"https://isbnsearch.org/isbn/{isbn}"
try:
resp = requests.get(url, timeout=10)
resp.raise_for_status()
html = str(resp.text or "")
if not html:
return []
except Exception as exc:
log(f"ISBNsearch scrape failed: {exc}", file=sys.stderr)
return []
title = ""
m_title = re.search(r"(?is)<h1\b[^>]*>(.*?)</h1>", html)
if m_title:
title = self._strip_html_to_text(m_title.group(1))
raw_fields: Dict[str, str] = {}
strong_matches = list(re.finditer(r"(?is)<strong\b[^>]*>(.*?)</strong>", html))
for idx, m in enumerate(strong_matches):
label_raw = self._strip_html_to_text(m.group(1))
label = str(label_raw or "").strip()
if not label:
continue
if label.endswith(":"):
label = label[:-1].strip()
chunk_start = m.end()
# Stop at next <strong> or end of document.
chunk_end = (
strong_matches[idx + 1].start() if (idx + 1) < len(strong_matches) else len(html)
)
chunk = html[chunk_start:chunk_end]
# Prefer stopping within the same paragraph when possible.
m_end = re.search(r"(?is)(</p>|<br\s*/?>)", chunk)
if m_end:
chunk = chunk[: m_end.start()]
val_text = self._strip_html_to_text(chunk)
if not val_text:
continue
raw_fields[label] = val_text
def _get(*labels: str) -> str:
for lab in labels:
for k, v in raw_fields.items():
if str(k).strip().lower() == str(lab).strip().lower():
return str(v or "").strip()
return ""
# Map common ISBNsearch labels.
author_text = _get("Author", "Authors", "Author(s)")
publisher = _get("Publisher")
published = _get("Published", "Publication Date", "Publish Date")
language = _get("Language")
pages = _get("Pages")
isbn_13 = _get("ISBN-13", "ISBN13")
isbn_10 = _get("ISBN-10", "ISBN10")
year = ""
if published:
m_year = re.search(r"\b(\d{4})\b", published)
year = str(m_year.group(1)) if m_year else ""
authors: List[str] = []
if author_text:
# Split on common separators; keep multi-part names intact.
for part in re.split(r"\s*(?:,|;|\band\b|\&|\|)\s*", author_text, flags=re.IGNORECASE):
p = str(part or "").strip()
if p:
authors.append(p)
# Prefer parsed title, but fall back to og:title if needed.
if not title:
m_og = re.search(
r"(?is)<meta\b[^>]*property=['\"]og:title['\"][^>]*content=['\"](.*?)['\"][^>]*>",
html,
)
if m_og:
title = self._strip_html_to_text(m_og.group(1))
# Ensure ISBN tokens are normalized.
isbn_tokens: List[str] = []
for token in [isbn_13, isbn_10, isbn]:
t = self._clean_isbn(token)
if t and t not in isbn_tokens:
isbn_tokens.append(t)
item: Dict[str, Any] = {
"title": title or "",
# Keep UI columns compatible with the generic metadata table.
"artist": ", ".join(authors) if authors else "",
"album": publisher or "",
"year": year or "",
"provider": self.name,
"authors": authors,
"publisher": publisher or "",
"language": language or "",
"pages": pages or "",
"identifiers": {
"isbn_13": next((t for t in isbn_tokens if len(t) == 13), None),
"isbn_10": next((t for t in isbn_tokens if len(t) == 10), None),
},
"raw_fields": raw_fields,
}
# Only return usable items.
if not item.get("title") and not any(item["identifiers"].values()):
return []
return [item]
def to_tags(self, item: Dict[str, Any]) -> List[str]:
tags: List[str] = []
title = str(item.get("title") or "").strip()
if title:
tags.append(f"title:{title}")
authors = item.get("authors") or []
if isinstance(authors, list):
for a in authors:
a = str(a or "").strip()
if a:
tags.append(f"author:{a}")
publisher = str(item.get("publisher") or "").strip()
if publisher:
tags.append(f"publisher:{publisher}")
year = str(item.get("year") or "").strip()
if year:
tags.append(f"year:{year}")
language = str(item.get("language") or "").strip()
if language:
tags.append(f"language:{language}")
identifiers = item.get("identifiers") or {}
if isinstance(identifiers, dict):
for key in ("isbn_13", "isbn_10"):
val = identifiers.get(key)
if val:
tags.append(f"isbn:{val}")
tags.append(f"source:{self.name}")
# Dedup case-insensitively, preserve order.
seen: set[str] = set()
out: List[str] = []
for t in tags:
s = str(t or "").strip()
if not s:
continue
k = s.lower()
if k in seen:
continue
seen.add(k)
out.append(s)
return out
class MusicBrainzMetadataProvider(MetadataProvider):
"""Metadata provider for MusicBrainz recordings."""
@property
def name(self) -> str: # type: ignore[override]
return "musicbrainz"
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
if not musicbrainzngs:
log("musicbrainzngs is not installed; skipping MusicBrainz scrape", file=sys.stderr)
return []
q = (query or "").strip()
if not q:
return []
try:
# Ensure user agent is set (required by MusicBrainz)
musicbrainzngs.set_useragent("Medeia-Macina", "0.1")
except Exception:
pass
try:
resp = musicbrainzngs.search_recordings(query=q, limit=limit)
recordings = resp.get("recording-list") or resp.get("recordings") or []
except Exception as exc:
log(f"MusicBrainz search failed: {exc}", file=sys.stderr)
return []
items: List[Dict[str, Any]] = []
for rec in recordings[:limit]:
if not isinstance(rec, dict):
continue
title = rec.get("title") or ""
artist = ""
artist_credit = rec.get("artist-credit") or rec.get("artist_credit")
if isinstance(artist_credit, list) and artist_credit:
first = artist_credit[0]
if isinstance(first, dict):
artist = first.get("name") or first.get("artist", {}).get("name", "")
elif isinstance(first, str):
artist = first
album = ""
release_list = rec.get("release-list") or rec.get("releases") or rec.get("release")
if isinstance(release_list, list) and release_list:
first_rel = release_list[0]
if isinstance(first_rel, dict):
album = first_rel.get("title", "") or ""
release_date = first_rel.get("date") or ""
else:
album = str(first_rel)
release_date = ""
else:
release_date = rec.get("first-release-date") or ""
year = str(release_date)[:4] if release_date else ""
mbid = rec.get("id") or ""
items.append(
{
"title": title,
"artist": artist,
"album": album,
"year": year,
"provider": self.name,
"mbid": mbid,
"raw": rec,
}
)
return items
def to_tags(self, item: Dict[str, Any]) -> List[str]:
tags = super().to_tags(item)
mbid = item.get("mbid")
if mbid:
tags.append(f"musicbrainz:{mbid}")
return tags
class YtdlpMetadataProvider(MetadataProvider):
"""Metadata provider that extracts tags from a supported URL using yt-dlp.
This does NOT download media; it only probes metadata.
"""
@property
def name(self) -> str: # type: ignore[override]
return "ytdlp"
def _extract_info(self, url: str) -> Optional[Dict[str, Any]]:
url = (url or "").strip()
if not url:
return None
# Prefer Python module when available.
if yt_dlp is not None:
try:
opts: Any = {
"quiet": True,
"no_warnings": True,
"skip_download": True,
"noprogress": True,
"socket_timeout": 15,
"retries": 1,
"playlist_items": "1-10",
}
with yt_dlp.YoutubeDL(opts) as ydl: # type: ignore[attr-defined]
info = ydl.extract_info(url, download=False)
return cast(Dict[str, Any], info) if isinstance(info, dict) else None
except Exception:
pass
# Fallback to CLI.
try:
cmd = [
"yt-dlp",
"-J",
"--no-warnings",
"--skip-download",
"--playlist-items",
"1-10",
url,
]
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if proc.returncode != 0:
return None
payload = (proc.stdout or "").strip()
if not payload:
return None
data = json.loads(payload)
return data if isinstance(data, dict) else None
except Exception:
return None
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
url = (query or "").strip()
if not url.startswith(("http://", "https://")):
return []
info = self._extract_info(url)
if not isinstance(info, dict):
return []
upload_date = str(info.get("upload_date") or "")
release_date = str(info.get("release_date") or "")
year = (release_date or upload_date)[:4] if (release_date or upload_date) else ""
# Provide basic columns for the standard metadata selection table.
# NOTE: This is best-effort; many extractors don't provide artist/album.
artist = info.get("artist") or info.get("uploader") or info.get("channel") or ""
album = info.get("album") or info.get("playlist_title") or ""
title = info.get("title") or ""
return [
{
"title": title,
"artist": str(artist or ""),
"album": str(album or ""),
"year": str(year or ""),
"provider": self.name,
"url": url,
"raw": info,
}
]
def to_tags(self, item: Dict[str, Any]) -> List[str]:
raw = item.get("raw")
if not isinstance(raw, dict):
return super().to_tags(item)
tags: List[str] = []
try:
from metadata import extract_ytdlp_tags
except Exception:
extract_ytdlp_tags = None # type: ignore[assignment]
if extract_ytdlp_tags:
try:
tags.extend(extract_ytdlp_tags(raw))
except Exception:
pass
# Subtitle availability tags
def _langs(value: Any) -> List[str]:
if not isinstance(value, dict):
return []
out: List[str] = []
for k in value.keys():
if isinstance(k, str) and k.strip():
out.append(k.strip().lower())
return sorted(set(out))
# If this is a playlist container, subtitle/captions are usually per-entry.
info_for_subs: Dict[str, Any] = raw
entries = raw.get("entries")
if isinstance(entries, list) and entries:
first = entries[0]
if isinstance(first, dict):
info_for_subs = first
for lang in _langs(info_for_subs.get("subtitles")):
tags.append(f"subs:{lang}")
for lang in _langs(info_for_subs.get("automatic_captions")):
tags.append(f"subs_auto:{lang}")
# Always include source tag for parity with other providers.
tags.append(f"source:{self.name}")
# Dedup case-insensitively, preserve order.
seen = set()
out: List[str] = []
for t in tags:
if not isinstance(t, str):
continue
s = t.strip()
if not s:
continue
k = s.lower()
if k in seen:
continue
seen.add(k)
out.append(s)
return out
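# Illustrative example (assumed extractor output, not guaranteed): a video with
# English manual subtitles and automatic captions contributes "subs:en" and
# "subs_auto:en" plus "source:ytdlp", in addition to whatever
# metadata.extract_ytdlp_tags() derives from the raw info dict.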
# Registry ---------------------------------------------------------------
_METADATA_PROVIDERS: Dict[str, Type[MetadataProvider]] = {
"itunes": ITunesProvider,
"openlibrary": OpenLibraryMetadataProvider,
"googlebooks": GoogleBooksMetadataProvider,
"google": GoogleBooksMetadataProvider,
"isbnsearch": ISBNsearchMetadataProvider,
"musicbrainz": MusicBrainzMetadataProvider,
"ytdlp": YtdlpMetadataProvider,
}
def register_provider(name: str, provider_cls: Type[MetadataProvider]) -> None:
_METADATA_PROVIDERS[name.lower()] = provider_cls
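# Sketch (hypothetical provider, not part of this module): external code can
# register additional sources, e.g.
#
#   class DiscogsProvider(MetadataProvider):
#       def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
#           return []  # query the Discogs API here
#
#   register_provider("discogs", DiscogsProvider)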
def list_metadata_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
availability: Dict[str, bool] = {}
for name, cls in _METADATA_PROVIDERS.items():
try:
_ = cls(config)
# Basic availability check: constructing the provider succeeded
# (providers may do their own lightweight validation in __init__).
availability[name] = True
except Exception:
availability[name] = False
return availability
def get_metadata_provider(
name: str, config: Optional[Dict[str, Any]] = None
) -> Optional[MetadataProvider]:
cls = _METADATA_PROVIDERS.get(name.lower())
if not cls:
return None
try:
return cls(config)
except Exception as exc:
log(f"Provider init failed for '{name}': {exc}", file=sys.stderr)
return None
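# Usage sketch (only names defined in this module; assumes network access and
# that SYS.logger is importable):
#
#   provider = get_metadata_provider("itunes")
#   if provider is not None:
#       for item in provider.search("daft punk around the world", limit=5):
#           print(provider.to_tags(item))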