2025-12-05 03:42:57 -08:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from abc import ABC, abstractmethod
|
2025-12-22 02:11:53 -08:00
|
|
|
from typing import Any, Dict, List, Optional, Type, cast
|
2025-12-25 04:49:22 -08:00
|
|
|
import html as html_std
|
|
|
|
|
import re
|
2025-12-05 03:42:57 -08:00
|
|
|
import requests
|
|
|
|
|
import sys
|
2025-12-22 02:11:53 -08:00
|
|
|
import json
|
|
|
|
|
import subprocess
|
2025-12-05 03:42:57 -08:00
|
|
|
|
2025-12-11 19:04:02 -08:00
|
|
|
from SYS.logger import log, debug
|
2025-12-05 03:42:57 -08:00
|
|
|
|
2025-12-07 00:21:30 -08:00
|
|
|
try: # Optional dependency
|
|
|
|
|
import musicbrainzngs # type: ignore
|
|
|
|
|
except ImportError: # pragma: no cover - optional
|
|
|
|
|
musicbrainzngs = None
|
|
|
|
|
|
2025-12-05 03:42:57 -08:00
|
|
|
|
2025-12-22 02:11:53 -08:00
|
|
|
try: # Optional dependency
|
|
|
|
|
import yt_dlp # type: ignore
|
|
|
|
|
except ImportError: # pragma: no cover - optional
|
|
|
|
|
yt_dlp = None
|
|
|
|
|
|
|
|
|
|
|
2025-12-05 03:42:57 -08:00
|
|
|
class MetadataProvider(ABC):
|
|
|
|
|
"""Base class for metadata providers (music, movies, books, etc.)."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
|
|
|
|
|
self.config = config or {}
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def name(self) -> str:
|
|
|
|
|
return self.__class__.__name__.replace("Provider", "").lower()
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
|
|
|
|
"""Return a list of candidate metadata records."""
|
|
|
|
|
|
|
|
|
|
def to_tags(self, item: Dict[str, Any]) -> List[str]:
|
|
|
|
|
"""Convert a result item into a list of tags."""
|
|
|
|
|
tags: List[str] = []
|
|
|
|
|
title = item.get("title")
|
|
|
|
|
artist = item.get("artist")
|
|
|
|
|
album = item.get("album")
|
|
|
|
|
year = item.get("year")
|
|
|
|
|
|
|
|
|
|
if title:
|
|
|
|
|
tags.append(f"title:{title}")
|
|
|
|
|
if artist:
|
|
|
|
|
tags.append(f"artist:{artist}")
|
|
|
|
|
if album:
|
|
|
|
|
tags.append(f"album:{album}")
|
|
|
|
|
if year:
|
|
|
|
|
tags.append(f"year:{year}")
|
|
|
|
|
|
|
|
|
|
tags.append(f"source:{self.name}")
|
|
|
|
|
return tags
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ITunesProvider(MetadataProvider):
|
|
|
|
|
"""Metadata provider using the iTunes Search API."""
|
|
|
|
|
|
|
|
|
|
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
|
|
|
|
params = {"term": query, "media": "music", "entity": "song", "limit": limit}
|
|
|
|
|
try:
|
|
|
|
|
resp = requests.get("https://itunes.apple.com/search", params=params, timeout=10)
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
results = resp.json().get("results", [])
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log(f"iTunes search failed: {exc}", file=sys.stderr)
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
items: List[Dict[str, Any]] = []
|
|
|
|
|
for r in results:
|
|
|
|
|
item = {
|
|
|
|
|
"title": r.get("trackName"),
|
|
|
|
|
"artist": r.get("artistName"),
|
|
|
|
|
"album": r.get("collectionName"),
|
|
|
|
|
"year": str(r.get("releaseDate", ""))[:4],
|
|
|
|
|
"provider": self.name,
|
|
|
|
|
"raw": r,
|
|
|
|
|
}
|
|
|
|
|
items.append(item)
|
|
|
|
|
debug(f"iTunes returned {len(items)} items for '{query}'")
|
|
|
|
|
return items
|
|
|
|
|
|
|
|
|
|
|
2025-12-06 00:10:19 -08:00
|
|
|
class OpenLibraryMetadataProvider(MetadataProvider):
|
|
|
|
|
"""Metadata provider for OpenLibrary book metadata."""
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def name(self) -> str: # type: ignore[override]
|
|
|
|
|
return "openlibrary"
|
|
|
|
|
|
|
|
|
|
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
|
|
|
|
query_clean = (query or "").strip()
|
|
|
|
|
if not query_clean:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Prefer ISBN-specific search when the query looks like one
|
|
|
|
|
if query_clean.replace("-", "").isdigit() and len(query_clean.replace("-", "")) in (10, 13):
|
|
|
|
|
q = f"isbn:{query_clean.replace('-', '')}"
|
|
|
|
|
else:
|
|
|
|
|
q = query_clean
|
|
|
|
|
|
|
|
|
|
resp = requests.get(
|
|
|
|
|
"https://openlibrary.org/search.json",
|
|
|
|
|
params={"q": q, "limit": limit},
|
|
|
|
|
timeout=10,
|
|
|
|
|
)
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
data = resp.json()
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log(f"OpenLibrary search failed: {exc}", file=sys.stderr)
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
items: List[Dict[str, Any]] = []
|
|
|
|
|
for doc in data.get("docs", [])[:limit]:
|
|
|
|
|
authors = doc.get("author_name") or []
|
|
|
|
|
publisher = ""
|
|
|
|
|
publishers = doc.get("publisher") or []
|
|
|
|
|
if isinstance(publishers, list) and publishers:
|
|
|
|
|
publisher = publishers[0]
|
|
|
|
|
|
|
|
|
|
# Prefer 13-digit ISBN when available, otherwise 10-digit
|
|
|
|
|
isbn_list = doc.get("isbn") or []
|
|
|
|
|
isbn_13 = next((i for i in isbn_list if len(str(i)) == 13), None)
|
|
|
|
|
isbn_10 = next((i for i in isbn_list if len(str(i)) == 10), None)
|
|
|
|
|
|
|
|
|
|
# Derive OLID from key
|
|
|
|
|
olid = ""
|
|
|
|
|
key = doc.get("key", "")
|
|
|
|
|
if isinstance(key, str) and key:
|
|
|
|
|
olid = key.split("/")[-1]
|
|
|
|
|
|
|
|
|
|
items.append({
|
|
|
|
|
"title": doc.get("title") or "",
|
|
|
|
|
"artist": ", ".join(authors) if authors else "",
|
|
|
|
|
"album": publisher,
|
|
|
|
|
"year": str(doc.get("first_publish_year") or ""),
|
|
|
|
|
"provider": self.name,
|
|
|
|
|
"authors": authors,
|
|
|
|
|
"publisher": publisher,
|
|
|
|
|
"identifiers": {
|
|
|
|
|
"isbn_13": isbn_13,
|
|
|
|
|
"isbn_10": isbn_10,
|
|
|
|
|
"openlibrary": olid,
|
|
|
|
|
"oclc": (doc.get("oclc_numbers") or [None])[0],
|
|
|
|
|
"lccn": (doc.get("lccn") or [None])[0],
|
|
|
|
|
},
|
|
|
|
|
"description": None,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return items
|
|
|
|
|
|
|
|
|
|
def to_tags(self, item: Dict[str, Any]) -> List[str]:
|
|
|
|
|
tags: List[str] = []
|
|
|
|
|
title = item.get("title")
|
|
|
|
|
authors = item.get("authors") or []
|
|
|
|
|
publisher = item.get("publisher")
|
|
|
|
|
year = item.get("year")
|
|
|
|
|
description = item.get("description") or ""
|
|
|
|
|
|
|
|
|
|
if title:
|
|
|
|
|
tags.append(f"title:{title}")
|
|
|
|
|
for author in authors:
|
|
|
|
|
if author:
|
|
|
|
|
tags.append(f"author:{author}")
|
|
|
|
|
if publisher:
|
|
|
|
|
tags.append(f"publisher:{publisher}")
|
|
|
|
|
if year:
|
|
|
|
|
tags.append(f"year:{year}")
|
|
|
|
|
if description:
|
|
|
|
|
tags.append(f"description:{description[:200]}")
|
|
|
|
|
|
|
|
|
|
identifiers = item.get("identifiers") or {}
|
|
|
|
|
for key, value in identifiers.items():
|
|
|
|
|
if value:
|
|
|
|
|
tags.append(f"{key}:{value}")
|
|
|
|
|
|
|
|
|
|
tags.append(f"source:{self.name}")
|
|
|
|
|
return tags
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GoogleBooksMetadataProvider(MetadataProvider):
|
|
|
|
|
"""Metadata provider for Google Books volumes API."""
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def name(self) -> str: # type: ignore[override]
|
|
|
|
|
return "googlebooks"
|
|
|
|
|
|
|
|
|
|
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
|
|
|
|
query_clean = (query or "").strip()
|
|
|
|
|
if not query_clean:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
# Prefer ISBN queries when possible
|
|
|
|
|
if query_clean.replace("-", "").isdigit() and len(query_clean.replace("-", "")) in (10, 13):
|
|
|
|
|
q = f"isbn:{query_clean.replace('-', '')}"
|
|
|
|
|
else:
|
|
|
|
|
q = query_clean
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
resp = requests.get(
|
|
|
|
|
"https://www.googleapis.com/books/v1/volumes",
|
|
|
|
|
params={"q": q, "maxResults": limit},
|
|
|
|
|
timeout=10,
|
|
|
|
|
)
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
payload = resp.json()
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log(f"Google Books search failed: {exc}", file=sys.stderr)
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
items: List[Dict[str, Any]] = []
|
|
|
|
|
for volume in payload.get("items", [])[:limit]:
|
|
|
|
|
info = volume.get("volumeInfo") or {}
|
|
|
|
|
authors = info.get("authors") or []
|
|
|
|
|
publisher = info.get("publisher", "")
|
|
|
|
|
published_date = info.get("publishedDate", "")
|
|
|
|
|
year = str(published_date)[:4] if published_date else ""
|
|
|
|
|
|
|
|
|
|
identifiers_raw = info.get("industryIdentifiers") or []
|
|
|
|
|
identifiers: Dict[str, Optional[str]] = {"googlebooks": volume.get("id")}
|
|
|
|
|
for ident in identifiers_raw:
|
|
|
|
|
if not isinstance(ident, dict):
|
|
|
|
|
continue
|
|
|
|
|
ident_type = ident.get("type", "").lower()
|
|
|
|
|
ident_value = ident.get("identifier")
|
|
|
|
|
if not ident_value:
|
|
|
|
|
continue
|
|
|
|
|
if ident_type == "isbn_13":
|
|
|
|
|
identifiers.setdefault("isbn_13", ident_value)
|
|
|
|
|
elif ident_type == "isbn_10":
|
|
|
|
|
identifiers.setdefault("isbn_10", ident_value)
|
|
|
|
|
else:
|
|
|
|
|
identifiers.setdefault(ident_type, ident_value)
|
|
|
|
|
|
|
|
|
|
items.append({
|
|
|
|
|
"title": info.get("title") or "",
|
|
|
|
|
"artist": ", ".join(authors) if authors else "",
|
|
|
|
|
"album": publisher,
|
|
|
|
|
"year": year,
|
|
|
|
|
"provider": self.name,
|
|
|
|
|
"authors": authors,
|
|
|
|
|
"publisher": publisher,
|
|
|
|
|
"identifiers": identifiers,
|
|
|
|
|
"description": info.get("description", ""),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return items
|
|
|
|
|
|
|
|
|
|
def to_tags(self, item: Dict[str, Any]) -> List[str]:
|
|
|
|
|
tags: List[str] = []
|
|
|
|
|
title = item.get("title")
|
|
|
|
|
authors = item.get("authors") or []
|
|
|
|
|
publisher = item.get("publisher")
|
|
|
|
|
year = item.get("year")
|
|
|
|
|
description = item.get("description") or ""
|
|
|
|
|
|
|
|
|
|
if title:
|
|
|
|
|
tags.append(f"title:{title}")
|
|
|
|
|
for author in authors:
|
|
|
|
|
if author:
|
|
|
|
|
tags.append(f"author:{author}")
|
|
|
|
|
if publisher:
|
|
|
|
|
tags.append(f"publisher:{publisher}")
|
|
|
|
|
if year:
|
|
|
|
|
tags.append(f"year:{year}")
|
|
|
|
|
if description:
|
|
|
|
|
tags.append(f"description:{description[:200]}")
|
|
|
|
|
|
|
|
|
|
identifiers = item.get("identifiers") or {}
|
|
|
|
|
for key, value in identifiers.items():
|
|
|
|
|
if value:
|
|
|
|
|
tags.append(f"{key}:{value}")
|
|
|
|
|
|
|
|
|
|
tags.append(f"source:{self.name}")
|
|
|
|
|
return tags
|
|
|
|
|
|
|
|
|
|
|
2025-12-25 04:49:22 -08:00
|
|
|
class ISBNsearchMetadataProvider(MetadataProvider):
|
|
|
|
|
"""Metadata provider that scrapes isbnsearch.org by ISBN.
|
|
|
|
|
|
|
|
|
|
This is a best-effort HTML scrape. It expects the query to be an ISBN.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def name(self) -> str: # type: ignore[override]
|
|
|
|
|
return "isbnsearch"
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _strip_html_to_text(raw: str) -> str:
|
|
|
|
|
s = html_std.unescape(str(raw or ""))
|
|
|
|
|
s = re.sub(r"(?i)<br\s*/?>", "\n", s)
|
|
|
|
|
s = re.sub(r"<[^>]+>", " ", s)
|
|
|
|
|
s = re.sub(r"\s+", " ", s)
|
|
|
|
|
return s.strip()
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _clean_isbn(query: str) -> str:
|
|
|
|
|
s = str(query or "").strip()
|
|
|
|
|
if not s:
|
|
|
|
|
return ""
|
|
|
|
|
s = s.replace("isbn:", "").replace("ISBN:", "")
|
|
|
|
|
s = re.sub(r"[^0-9Xx]", "", s).upper()
|
|
|
|
|
if len(s) in (10, 13):
|
|
|
|
|
return s
|
|
|
|
|
# Try to locate an ISBN-like token inside the query.
|
|
|
|
|
m = re.search(r"\b(?:97[89])?\d{9}[\dXx]\b", s)
|
|
|
|
|
return str(m.group(0)).upper() if m else ""
|
|
|
|
|
|
|
|
|
|
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
|
|
|
|
_ = limit
|
|
|
|
|
isbn = self._clean_isbn(query)
|
|
|
|
|
if not isbn:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
url = f"https://isbnsearch.org/isbn/{isbn}"
|
|
|
|
|
try:
|
|
|
|
|
resp = requests.get(url, timeout=10)
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
html = str(resp.text or "")
|
|
|
|
|
if not html:
|
|
|
|
|
return []
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log(f"ISBNsearch scrape failed: {exc}", file=sys.stderr)
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
title = ""
|
|
|
|
|
m_title = re.search(r"(?is)<h1\b[^>]*>(.*?)</h1>", html)
|
|
|
|
|
if m_title:
|
|
|
|
|
title = self._strip_html_to_text(m_title.group(1))
|
|
|
|
|
|
|
|
|
|
raw_fields: Dict[str, str] = {}
|
|
|
|
|
strong_matches = list(re.finditer(r"(?is)<strong\b[^>]*>(.*?)</strong>", html))
|
|
|
|
|
for idx, m in enumerate(strong_matches):
|
|
|
|
|
label_raw = self._strip_html_to_text(m.group(1))
|
|
|
|
|
label = str(label_raw or "").strip()
|
|
|
|
|
if not label:
|
|
|
|
|
continue
|
|
|
|
|
if label.endswith(":"):
|
|
|
|
|
label = label[:-1].strip()
|
|
|
|
|
|
|
|
|
|
chunk_start = m.end()
|
|
|
|
|
# Stop at next <strong> or end of document.
|
|
|
|
|
chunk_end = strong_matches[idx + 1].start() if (idx + 1) < len(strong_matches) else len(html)
|
|
|
|
|
chunk = html[chunk_start:chunk_end]
|
|
|
|
|
# Prefer stopping within the same paragraph when possible.
|
|
|
|
|
m_end = re.search(r"(?is)(</p>|<br\s*/?>)", chunk)
|
|
|
|
|
if m_end:
|
|
|
|
|
chunk = chunk[: m_end.start()]
|
|
|
|
|
|
|
|
|
|
val_text = self._strip_html_to_text(chunk)
|
|
|
|
|
if not val_text:
|
|
|
|
|
continue
|
|
|
|
|
raw_fields[label] = val_text
|
|
|
|
|
|
|
|
|
|
def _get(*labels: str) -> str:
|
|
|
|
|
for lab in labels:
|
|
|
|
|
for k, v in raw_fields.items():
|
|
|
|
|
if str(k).strip().lower() == str(lab).strip().lower():
|
|
|
|
|
return str(v or "").strip()
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
# Map common ISBNsearch labels.
|
|
|
|
|
author_text = _get("Author", "Authors", "Author(s)")
|
|
|
|
|
publisher = _get("Publisher")
|
|
|
|
|
published = _get("Published", "Publication Date", "Publish Date")
|
|
|
|
|
language = _get("Language")
|
|
|
|
|
pages = _get("Pages")
|
|
|
|
|
isbn_13 = _get("ISBN-13", "ISBN13")
|
|
|
|
|
isbn_10 = _get("ISBN-10", "ISBN10")
|
|
|
|
|
|
|
|
|
|
year = ""
|
|
|
|
|
if published:
|
|
|
|
|
m_year = re.search(r"\b(\d{4})\b", published)
|
|
|
|
|
year = str(m_year.group(1)) if m_year else ""
|
|
|
|
|
|
|
|
|
|
authors: List[str] = []
|
|
|
|
|
if author_text:
|
|
|
|
|
# Split on common separators; keep multi-part names intact.
|
|
|
|
|
for part in re.split(r"\s*(?:,|;|\band\b|\&|\|)\s*", author_text, flags=re.IGNORECASE):
|
|
|
|
|
p = str(part or "").strip()
|
|
|
|
|
if p:
|
|
|
|
|
authors.append(p)
|
|
|
|
|
|
|
|
|
|
# Prefer parsed title, but fall back to og:title if needed.
|
|
|
|
|
if not title:
|
|
|
|
|
m_og = re.search(r"(?is)<meta\b[^>]*property=['\"]og:title['\"][^>]*content=['\"](.*?)['\"][^>]*>", html)
|
|
|
|
|
if m_og:
|
|
|
|
|
title = self._strip_html_to_text(m_og.group(1))
|
|
|
|
|
|
|
|
|
|
# Ensure ISBN tokens are normalized.
|
|
|
|
|
isbn_tokens: List[str] = []
|
|
|
|
|
for token in [isbn_13, isbn_10, isbn]:
|
|
|
|
|
t = self._clean_isbn(token)
|
|
|
|
|
if t and t not in isbn_tokens:
|
|
|
|
|
isbn_tokens.append(t)
|
|
|
|
|
|
|
|
|
|
item: Dict[str, Any] = {
|
|
|
|
|
"title": title or "",
|
|
|
|
|
# Keep UI columns compatible with the generic metadata table.
|
|
|
|
|
"artist": ", ".join(authors) if authors else "",
|
|
|
|
|
"album": publisher or "",
|
|
|
|
|
"year": year or "",
|
|
|
|
|
"provider": self.name,
|
|
|
|
|
"authors": authors,
|
|
|
|
|
"publisher": publisher or "",
|
|
|
|
|
"language": language or "",
|
|
|
|
|
"pages": pages or "",
|
|
|
|
|
"identifiers": {
|
|
|
|
|
"isbn_13": next((t for t in isbn_tokens if len(t) == 13), None),
|
|
|
|
|
"isbn_10": next((t for t in isbn_tokens if len(t) == 10), None),
|
|
|
|
|
},
|
|
|
|
|
"raw_fields": raw_fields,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Only return usable items.
|
|
|
|
|
if not item.get("title") and not any(item["identifiers"].values()):
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
return [item]
|
|
|
|
|
|
|
|
|
|
def to_tags(self, item: Dict[str, Any]) -> List[str]:
|
|
|
|
|
tags: List[str] = []
|
|
|
|
|
|
|
|
|
|
title = str(item.get("title") or "").strip()
|
|
|
|
|
if title:
|
|
|
|
|
tags.append(f"title:{title}")
|
|
|
|
|
|
|
|
|
|
authors = item.get("authors") or []
|
|
|
|
|
if isinstance(authors, list):
|
|
|
|
|
for a in authors:
|
|
|
|
|
a = str(a or "").strip()
|
|
|
|
|
if a:
|
|
|
|
|
tags.append(f"author:{a}")
|
|
|
|
|
|
|
|
|
|
publisher = str(item.get("publisher") or "").strip()
|
|
|
|
|
if publisher:
|
|
|
|
|
tags.append(f"publisher:{publisher}")
|
|
|
|
|
|
|
|
|
|
year = str(item.get("year") or "").strip()
|
|
|
|
|
if year:
|
|
|
|
|
tags.append(f"year:{year}")
|
|
|
|
|
|
|
|
|
|
language = str(item.get("language") or "").strip()
|
|
|
|
|
if language:
|
|
|
|
|
tags.append(f"language:{language}")
|
|
|
|
|
|
|
|
|
|
identifiers = item.get("identifiers") or {}
|
|
|
|
|
if isinstance(identifiers, dict):
|
|
|
|
|
for key in ("isbn_13", "isbn_10"):
|
|
|
|
|
val = identifiers.get(key)
|
|
|
|
|
if val:
|
|
|
|
|
tags.append(f"isbn:{val}")
|
|
|
|
|
|
|
|
|
|
tags.append(f"source:{self.name}")
|
|
|
|
|
|
|
|
|
|
# Dedup case-insensitively, preserve order.
|
|
|
|
|
seen: set[str] = set()
|
|
|
|
|
out: List[str] = []
|
|
|
|
|
for t in tags:
|
|
|
|
|
s = str(t or "").strip()
|
|
|
|
|
if not s:
|
|
|
|
|
continue
|
|
|
|
|
k = s.lower()
|
|
|
|
|
if k in seen:
|
|
|
|
|
continue
|
|
|
|
|
seen.add(k)
|
|
|
|
|
out.append(s)
|
|
|
|
|
return out
|
|
|
|
|
|
|
|
|
|
|
2025-12-07 00:21:30 -08:00
|
|
|
class MusicBrainzMetadataProvider(MetadataProvider):
|
|
|
|
|
"""Metadata provider for MusicBrainz recordings."""
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def name(self) -> str: # type: ignore[override]
|
|
|
|
|
return "musicbrainz"
|
|
|
|
|
|
|
|
|
|
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
|
|
|
|
if not musicbrainzngs:
|
|
|
|
|
log("musicbrainzngs is not installed; skipping MusicBrainz scrape", file=sys.stderr)
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
q = (query or "").strip()
|
|
|
|
|
if not q:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Ensure user agent is set (required by MusicBrainz)
|
|
|
|
|
musicbrainzngs.set_useragent("Medeia-Macina", "0.1")
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
resp = musicbrainzngs.search_recordings(query=q, limit=limit)
|
|
|
|
|
recordings = resp.get("recording-list") or resp.get("recordings") or []
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log(f"MusicBrainz search failed: {exc}", file=sys.stderr)
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
items: List[Dict[str, Any]] = []
|
|
|
|
|
for rec in recordings[:limit]:
|
|
|
|
|
if not isinstance(rec, dict):
|
|
|
|
|
continue
|
|
|
|
|
title = rec.get("title") or ""
|
|
|
|
|
|
|
|
|
|
artist = ""
|
|
|
|
|
artist_credit = rec.get("artist-credit") or rec.get("artist_credit")
|
|
|
|
|
if isinstance(artist_credit, list) and artist_credit:
|
|
|
|
|
first = artist_credit[0]
|
|
|
|
|
if isinstance(first, dict):
|
|
|
|
|
artist = first.get("name") or first.get("artist", {}).get("name", "")
|
|
|
|
|
elif isinstance(first, str):
|
|
|
|
|
artist = first
|
|
|
|
|
|
|
|
|
|
album = ""
|
|
|
|
|
release_list = rec.get("release-list") or rec.get("releases") or rec.get("release")
|
|
|
|
|
if isinstance(release_list, list) and release_list:
|
|
|
|
|
first_rel = release_list[0]
|
|
|
|
|
if isinstance(first_rel, dict):
|
|
|
|
|
album = first_rel.get("title", "") or ""
|
|
|
|
|
release_date = first_rel.get("date") or ""
|
|
|
|
|
else:
|
|
|
|
|
album = str(first_rel)
|
|
|
|
|
release_date = ""
|
|
|
|
|
else:
|
|
|
|
|
release_date = rec.get("first-release-date") or ""
|
|
|
|
|
|
|
|
|
|
year = str(release_date)[:4] if release_date else ""
|
|
|
|
|
mbid = rec.get("id") or ""
|
|
|
|
|
|
|
|
|
|
items.append({
|
|
|
|
|
"title": title,
|
|
|
|
|
"artist": artist,
|
|
|
|
|
"album": album,
|
|
|
|
|
"year": year,
|
|
|
|
|
"provider": self.name,
|
|
|
|
|
"mbid": mbid,
|
|
|
|
|
"raw": rec,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return items
|
|
|
|
|
|
|
|
|
|
def to_tags(self, item: Dict[str, Any]) -> List[str]:
|
|
|
|
|
tags = super().to_tags(item)
|
|
|
|
|
mbid = item.get("mbid")
|
|
|
|
|
if mbid:
|
|
|
|
|
tags.append(f"musicbrainz:{mbid}")
|
|
|
|
|
return tags
|
|
|
|
|
|
|
|
|
|
|
2025-12-22 02:11:53 -08:00
|
|
|
class YtdlpMetadataProvider(MetadataProvider):
|
|
|
|
|
"""Metadata provider that extracts tags from a supported URL using yt-dlp.
|
|
|
|
|
|
|
|
|
|
This does NOT download media; it only probes metadata.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def name(self) -> str: # type: ignore[override]
|
|
|
|
|
return "ytdlp"
|
|
|
|
|
|
|
|
|
|
def _extract_info(self, url: str) -> Optional[Dict[str, Any]]:
|
|
|
|
|
url = (url or "").strip()
|
|
|
|
|
if not url:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# Prefer Python module when available.
|
|
|
|
|
if yt_dlp is not None:
|
|
|
|
|
try:
|
|
|
|
|
opts: Any = {
|
|
|
|
|
"quiet": True,
|
|
|
|
|
"no_warnings": True,
|
|
|
|
|
"skip_download": True,
|
|
|
|
|
"noprogress": True,
|
|
|
|
|
"socket_timeout": 15,
|
|
|
|
|
"retries": 1,
|
|
|
|
|
"playlist_items": "1-10",
|
|
|
|
|
}
|
|
|
|
|
with yt_dlp.YoutubeDL(opts) as ydl: # type: ignore[attr-defined]
|
|
|
|
|
info = ydl.extract_info(url, download=False)
|
|
|
|
|
return cast(Dict[str, Any], info) if isinstance(info, dict) else None
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# Fallback to CLI.
|
|
|
|
|
try:
|
|
|
|
|
cmd = [
|
|
|
|
|
"yt-dlp",
|
|
|
|
|
"-J",
|
|
|
|
|
"--no-warnings",
|
|
|
|
|
"--skip-download",
|
|
|
|
|
"--playlist-items",
|
|
|
|
|
"1-10",
|
|
|
|
|
url,
|
|
|
|
|
]
|
|
|
|
|
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
|
|
|
if proc.returncode != 0:
|
|
|
|
|
return None
|
|
|
|
|
payload = (proc.stdout or "").strip()
|
|
|
|
|
if not payload:
|
|
|
|
|
return None
|
|
|
|
|
data = json.loads(payload)
|
|
|
|
|
return data if isinstance(data, dict) else None
|
|
|
|
|
except Exception:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
|
|
|
|
url = (query or "").strip()
|
|
|
|
|
if not url.startswith(("http://", "https://")):
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
info = self._extract_info(url)
|
|
|
|
|
if not isinstance(info, dict):
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
upload_date = str(info.get("upload_date") or "")
|
|
|
|
|
release_date = str(info.get("release_date") or "")
|
|
|
|
|
year = (release_date or upload_date)[:4] if (release_date or upload_date) else ""
|
|
|
|
|
|
|
|
|
|
# Provide basic columns for the standard metadata selection table.
|
|
|
|
|
# NOTE: This is best-effort; many extractors don't provide artist/album.
|
|
|
|
|
artist = (
|
|
|
|
|
info.get("artist")
|
|
|
|
|
or info.get("uploader")
|
|
|
|
|
or info.get("channel")
|
|
|
|
|
or ""
|
|
|
|
|
)
|
|
|
|
|
album = info.get("album") or info.get("playlist_title") or ""
|
|
|
|
|
title = info.get("title") or ""
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
{
|
|
|
|
|
"title": title,
|
|
|
|
|
"artist": str(artist or ""),
|
|
|
|
|
"album": str(album or ""),
|
|
|
|
|
"year": str(year or ""),
|
|
|
|
|
"provider": self.name,
|
|
|
|
|
"url": url,
|
|
|
|
|
"raw": info,
|
|
|
|
|
}
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
def to_tags(self, item: Dict[str, Any]) -> List[str]:
|
|
|
|
|
raw = item.get("raw")
|
|
|
|
|
if not isinstance(raw, dict):
|
|
|
|
|
return super().to_tags(item)
|
|
|
|
|
|
|
|
|
|
tags: List[str] = []
|
|
|
|
|
try:
|
|
|
|
|
from metadata import extract_ytdlp_tags
|
|
|
|
|
except Exception:
|
|
|
|
|
extract_ytdlp_tags = None # type: ignore[assignment]
|
|
|
|
|
|
|
|
|
|
if extract_ytdlp_tags:
|
|
|
|
|
try:
|
|
|
|
|
tags.extend(extract_ytdlp_tags(raw))
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# Subtitle availability tags
|
|
|
|
|
def _langs(value: Any) -> List[str]:
|
|
|
|
|
if not isinstance(value, dict):
|
|
|
|
|
return []
|
|
|
|
|
out: List[str] = []
|
|
|
|
|
for k in value.keys():
|
|
|
|
|
if isinstance(k, str) and k.strip():
|
|
|
|
|
out.append(k.strip().lower())
|
|
|
|
|
return sorted(set(out))
|
|
|
|
|
|
|
|
|
|
# If this is a playlist container, subtitle/captions are usually per-entry.
|
|
|
|
|
info_for_subs: Dict[str, Any] = raw
|
|
|
|
|
entries = raw.get("entries")
|
|
|
|
|
if isinstance(entries, list) and entries:
|
|
|
|
|
first = entries[0]
|
|
|
|
|
if isinstance(first, dict):
|
|
|
|
|
info_for_subs = first
|
|
|
|
|
|
|
|
|
|
for lang in _langs(info_for_subs.get("subtitles")):
|
|
|
|
|
tags.append(f"subs:{lang}")
|
|
|
|
|
for lang in _langs(info_for_subs.get("automatic_captions")):
|
|
|
|
|
tags.append(f"subs_auto:{lang}")
|
|
|
|
|
|
|
|
|
|
# Always include source tag for parity with other providers.
|
|
|
|
|
tags.append(f"source:{self.name}")
|
|
|
|
|
|
|
|
|
|
# Dedup case-insensitively, preserve order.
|
|
|
|
|
seen = set()
|
|
|
|
|
out: List[str] = []
|
|
|
|
|
for t in tags:
|
|
|
|
|
if not isinstance(t, str):
|
|
|
|
|
continue
|
|
|
|
|
s = t.strip()
|
|
|
|
|
if not s:
|
|
|
|
|
continue
|
|
|
|
|
k = s.lower()
|
|
|
|
|
if k in seen:
|
|
|
|
|
continue
|
|
|
|
|
seen.add(k)
|
|
|
|
|
out.append(s)
|
|
|
|
|
return out
|
|
|
|
|
|
|
|
|
|
|
2025-12-05 03:42:57 -08:00
|
|
|
# Registry ---------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
_METADATA_PROVIDERS: Dict[str, Type[MetadataProvider]] = {
|
|
|
|
|
"itunes": ITunesProvider,
|
2025-12-06 00:10:19 -08:00
|
|
|
"openlibrary": OpenLibraryMetadataProvider,
|
|
|
|
|
"googlebooks": GoogleBooksMetadataProvider,
|
|
|
|
|
"google": GoogleBooksMetadataProvider,
|
2025-12-25 04:49:22 -08:00
|
|
|
"isbnsearch": ISBNsearchMetadataProvider,
|
2025-12-07 00:21:30 -08:00
|
|
|
"musicbrainz": MusicBrainzMetadataProvider,
|
2025-12-22 02:11:53 -08:00
|
|
|
"ytdlp": YtdlpMetadataProvider,
|
2025-12-05 03:42:57 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def register_provider(name: str, provider_cls: Type[MetadataProvider]) -> None:
|
|
|
|
|
_METADATA_PROVIDERS[name.lower()] = provider_cls
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_metadata_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
|
|
|
|
|
availability: Dict[str, bool] = {}
|
|
|
|
|
for name, cls in _METADATA_PROVIDERS.items():
|
|
|
|
|
try:
|
2025-12-22 02:11:53 -08:00
|
|
|
_ = cls(config)
|
2025-12-05 03:42:57 -08:00
|
|
|
# Basic availability check: perform lightweight validation if defined
|
|
|
|
|
availability[name] = True
|
|
|
|
|
except Exception:
|
|
|
|
|
availability[name] = False
|
|
|
|
|
return availability
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_metadata_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[MetadataProvider]:
|
|
|
|
|
cls = _METADATA_PROVIDERS.get(name.lower())
|
|
|
|
|
if not cls:
|
|
|
|
|
return None
|
|
|
|
|
try:
|
|
|
|
|
return cls(config)
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
log(f"Provider init failed for '{name}': {exc}", file=sys.stderr)
|
|
|
|
|
return None
|