Huge refactor of the entire codebase, with the goal of improving maintainability, readability, and extensibility. This commit includes changes to almost every file in the project.

This commit is contained in:
2026-04-19 00:41:09 -07:00
parent d9e736172a
commit bafd37fdfb
50 changed files with 3258 additions and 4177 deletions
+355 -2
View File
@@ -40,6 +40,42 @@ except ImportError: # pragma: no cover - optional
yt_dlp = None
def _dedup_text_values(values: List[str]) -> List[str]:
out: List[str] = []
seen: set[str] = set()
for value in values or []:
if value is None:
continue
text = str(value).strip()
if not text:
continue
key = text.lower()
if key in seen:
continue
seen.add(key)
out.append(text)
return out
def _filter_default_scraped_tags(tags: List[str]) -> List[str]:
blocked = {"title", "artist", "source"}
out: List[str] = []
seen: set[str] = set()
for tag in tags or []:
text = str(tag or "").strip()
if not text:
continue
namespace = text.split(":", 1)[0].strip().lower() if ":" in text else ""
if namespace in blocked:
continue
key = text.lower()
if key in seen:
continue
seen.add(key)
out.append(text)
return out
class MetadataProvider(ABC):
"""Base class for metadata providers (music, movies, books, etc.)."""
@@ -122,6 +158,64 @@ class MetadataProvider(ABC):
return False
def default_subject_scrape_priority(self) -> int:
    """Rank this provider for `get-tag -scrape` calls that name no provider.

    The base implementation returns 0; subclasses return a larger number to
    take part in the default selection.
    """
    base_priority = 0
    return base_priority
def url_scrape_priority(self, url: str) -> int:
    """Rank this provider for a raw URL given to `get-tag -scrape <url>`.

    The base implementation does not inspect ``url`` and always returns 0.
    """
    del url  # accepted for interface compatibility only
    return 0
def resolve_subject_query(
    self,
    result: Any,
    get_field: Any,
    *,
    backend: Any = None,
    file_hash: Optional[str] = None,
) -> Optional[str]:
    """Derive the query this provider should use for the current subject.

    The base implementation ignores ``backend`` and ``file_hash`` and simply
    returns whatever ``self.extract_url_query(result, get_field)`` returns.
    """
    del backend, file_hash  # only meaningful to overriding providers
    query = self.extract_url_query(result, get_field)
    return query
def prefers_store_tag_overwrite(self) -> bool:
    """Tell whether a direct subject scrape should replace the store tag set.

    The base implementation answers ``False``.
    """
    overwrite = False
    return overwrite
def filter_tags_for_selection(self, tags: List[str]) -> List[str]:
    """Filter scraped tags before they are shown as a selectable metadata row.

    Delegates to the module-level ``_filter_default_scraped_tags`` helper and
    returns its result unchanged.
    """
    selectable = _filter_default_scraped_tags(tags)
    return selectable
def filter_tags_for_store_apply(self, tags: List[str]) -> List[str]:
    """Filter scraped tags before applying them to an existing store-backed item.

    The base implementation reuses ``self.filter_tags_for_selection`` so both
    paths apply the same filtering unless a subclass overrides one of them.
    """
    applicable = self.filter_tags_for_selection(tags)
    return applicable
def scrape_url_payload(self, url: str) -> Optional[Dict[str, Any]]:
    """Build the `get-tag -scrape <url>` payload from a one-result search.

    Calls ``self.search(url, limit=1)`` and returns ``None`` when nothing
    comes back.  Otherwise the first hit (or an empty dict when that hit is
    not a dict) is converted to tags through ``self.to_tags``; any exception
    raised there yields an empty tag list.  The payload carries the hit's
    ``title``, the de-duplicated tags, and empty ``formats`` and
    ``playlist_items`` lists.
    """
    hits = self.search(url, limit=1)
    if not hits:
        return None

    first_hit = hits[0]
    if not isinstance(first_hit, dict):
        first_hit = {}

    try:
        tag_texts = [str(tag) for tag in self.to_tags(first_hit) if tag is not None]
    except Exception:
        # Tag conversion is best-effort; the payload is still returned.
        tag_texts = []

    payload = {
        "title": first_hit.get("title"),
        "tag": _dedup_text_values(tag_texts),
        "formats": [],
        "playlist_items": [],
    }
    return payload
class ITunesProvider(MetadataProvider):
"""Metadata provider using the iTunes Search API."""
@@ -1015,6 +1109,226 @@ class YtdlpMetadataProvider(MetadataProvider):
def emits_direct_tags(self) -> bool:
    """Always answer ``True`` for this provider."""
    direct = True
    return direct
def default_subject_scrape_priority(self) -> int:
    """Return 100 as this provider's priority for provider-less subject scrapes."""
    priority = 100
    return priority
def url_scrape_priority(self, url: str) -> int:
    """Return 100 for ``http://``/``https://`` URLs and 0 for anything else.

    ``url`` is converted to text and stripped first; a falsy ``url`` is
    treated as the empty string.  The scheme check is case-sensitive.
    """
    candidate = str(url or "").strip()
    is_web_url = candidate.startswith("http://") or candidate.startswith("https://")
    return 100 if is_web_url else 0
def prefers_store_tag_overwrite(self) -> bool:
    """Answer ``True``: this provider asks for the store tag set to be replaced."""
    overwrite = True
    return overwrite
def filter_tags_for_store_apply(self, tags: List[str]) -> List[str]:
    """Return ``tags`` cleaned by ``_dedup_text_values`` only.

    Unlike the selection filter, no namespace is removed here; the tags are
    just stripped and de-duplicated case-insensitively.
    """
    deduplicated = _dedup_text_values(tags)
    return deduplicated
def _resolve_candidate_urls_for_subject(
    self,
    result: Any,
    get_field: Any,
    *,
    backend: Any = None,
    file_hash: Optional[str] = None,
) -> List[str]:
    """Collect every URL associated with a subject, de-duplicated in order.

    URLs are gathered from these sources, in this order:

    1. ``backend.get_url(file_hash, config=self.config)``
    2. the ``"url"`` entry of ``backend.get_metadata(file_hash, config=self.config)``
    3. the ``url``, ``webpage_url``, ``source_url`` and ``target`` fields of
       ``result`` (read through ``get_field(result, key, None)``)
    4. the ``"url"`` entry of the ``metadata`` field of ``result``

    Sources 1 and 2 are only consulted when both ``backend`` and
    ``file_hash`` are provided, and any exception they raise is ignored.

    Args:
        result: The subject/result object the fields are read from.
        get_field: Callable invoked as ``get_field(result, key, default)``.
        backend: Optional store backend exposing ``get_url``/``get_metadata``.
        file_hash: Hash identifying the subject inside ``backend``.

    Returns:
        The collected URLs after ``_dedup_text_values`` (stripped, non-empty,
        case-insensitively unique, first occurrence kept).
    """
    try:
        from SYS.metadata import normalize_urls
    except Exception:
        normalize_urls = None  # type: ignore[assignment]

    def _coerce_urls(value: Any) -> List[str]:
        """Turn one raw URL value (string or iterable) into cleaned strings."""
        if not value:
            return []
        if normalize_urls:
            return list(normalize_urls(value))
        if isinstance(value, str):
            # A lone string is ONE url; iterating it would yield characters.
            stripped = value.strip()
            return [stripped] if stripped else []
        try:
            members = iter(value)
        except TypeError:
            return []
        return [m.strip() for m in members if isinstance(m, str) and m.strip()]

    urls: List[str] = []

    if backend is not None and file_hash:
        # Backend lookups are deliberately best-effort: a failing store must
        # not prevent the URLs carried by the result itself from being used.
        try:
            urls.extend(_coerce_urls(backend.get_url(file_hash, config=self.config)))
        except Exception:
            pass
        try:
            meta = backend.get_metadata(file_hash, config=self.config)
            if isinstance(meta, dict):
                urls.extend(_coerce_urls(meta.get("url")))
        except Exception:
            pass

    for key in ("url", "webpage_url", "source_url", "target"):
        urls.extend(_coerce_urls(get_field(result, key, None)))

    meta_field = get_field(result, "metadata", None)
    if isinstance(meta_field, dict):
        urls.extend(_coerce_urls(meta_field.get("url")))

    return _dedup_text_values(urls)
def _pick_supported_subject_url(self, urls: List[str]) -> Optional[str]:
if not urls:
return None
def _is_hydrus_file_url(u: str) -> bool:
text = str(u or "").strip().lower()
return bool(text and "/get_files/file" in text and "hash=" in text)
candidates = []
for url in urls:
text = str(url or "").strip()
if not text.startswith(("http://", "https://")):
continue
if _is_hydrus_file_url(text):
continue
candidates.append(text)
if not candidates:
return None
try:
from tool.ytdlp import is_url_supported_by_ytdlp
for text in candidates:
try:
if is_url_supported_by_ytdlp(text):
return text
except Exception:
continue
except Exception:
pass
return candidates[0] if candidates else None
def resolve_subject_query(
    self,
    result: Any,
    get_field: Any,
    *,
    backend: Any = None,
    file_hash: Optional[str] = None,
) -> Optional[str]:
    """Resolve the URL this provider should scrape for the current subject.

    Gathers candidate URLs with ``self._resolve_candidate_urls_for_subject``
    (forwarding ``backend`` and ``file_hash``) and returns the choice made by
    ``self._pick_supported_subject_url``.
    """
    lookup = {"backend": backend, "file_hash": file_hash}
    candidates = self._resolve_candidate_urls_for_subject(result, get_field, **lookup)
    chosen = self._pick_supported_subject_url(candidates)
    return chosen
@staticmethod
def _extract_url_formats(formats: Any) -> List[tuple[str, str]]:
if not isinstance(formats, list):
return []
video_formats: Dict[str, Dict[str, Any]] = {}
audio_formats: Dict[str, Dict[str, Any]] = {}
for fmt in formats:
if not isinstance(fmt, dict):
continue
vcodec = fmt.get("vcodec", "none")
acodec = fmt.get("acodec", "none")
height = fmt.get("height")
ext = fmt.get("ext", "unknown")
format_id = fmt.get("format_id", "")
tbr = fmt.get("tbr", 0)
abr = fmt.get("abr", 0)
if vcodec and vcodec != "none" and height:
if int(height) < 480:
continue
res_key = f"{int(height)}p"
if res_key not in video_formats or tbr > video_formats[res_key].get("tbr", 0):
video_formats[res_key] = {
"label": f"{int(height)}p ({ext})",
"format_id": str(format_id),
"tbr": tbr,
}
elif acodec and acodec != "none" and (not vcodec or vcodec == "none"):
audio_key = f"audio_{abr}"
if audio_key not in audio_formats or abr > audio_formats[audio_key].get("abr", 0):
audio_formats[audio_key] = {
"label": f"audio ({ext})",
"format_id": str(format_id),
"abr": abr,
}
result: List[tuple[str, str]] = []
for res in sorted(video_formats.keys(), key=lambda value: int(value.replace("p", "")), reverse=True):
fmt = video_formats[res]
result.append((str(fmt.get("label") or res), str(fmt.get("format_id") or "")))
if audio_formats:
best_audio_key = max(audio_formats.keys(), key=lambda key: float(audio_formats[key].get("abr", 0) or 0))
fmt = audio_formats[best_audio_key]
result.append((str(fmt.get("label") or "audio"), str(fmt.get("format_id") or "")))
return [entry for entry in result if entry[1]]
@staticmethod
def _build_playlist_items(raw: Dict[str, Any]) -> List[Dict[str, Any]]:
entries = raw.get("entries")
if not isinstance(entries, list):
return []
playlist_items: List[Dict[str, Any]] = []
for idx, entry in enumerate(entries, 1):
if not isinstance(entry, dict):
continue
playlist_items.append(
{
"index": idx,
"id": entry.get("id", f"track_{idx}"),
"title": entry.get("title", entry.get("id", f"Track {idx}")),
"duration": entry.get("duration", 0),
"url": entry.get("url") or entry.get("webpage_url", ""),
}
)
return playlist_items
def scrape_url_payload(self, url: str) -> Optional[Dict[str, Any]]:
    """Build the `get-tag -scrape <url>` payload from ``self._extract_info``.

    Returns ``None`` when ``self._extract_info(url)`` does not return a dict.
    Otherwise an item is assembled from the info dict (title; artist from
    ``artist``/``uploader``/``channel``; album from ``album``/
    ``playlist_title``; year from the first four characters of
    ``release_date`` or ``upload_date``) and handed to ``self.to_tags``.
    The payload holds the title (``None`` when empty), the de-duplicated
    tags, the selectable formats and the playlist items.
    """
    info = self._extract_info(url)
    if not isinstance(info, dict):
        return None

    artist = info.get("artist") or info.get("uploader") or info.get("channel") or ""
    album = info.get("album") or info.get("playlist_title") or ""
    date_text = str(info.get("release_date") or "") or str(info.get("upload_date") or "")

    item = {
        "title": info.get("title") or "",
        "artist": str(artist),
        "album": str(album),
        "year": date_text[:4],
        "provider": self.name,
        "url": str(url or "").strip(),
        "raw": info,
    }

    tag_texts = [str(tag) for tag in self.to_tags(item) if tag is not None]
    payload = {
        "title": item.get("title") or None,
        "tag": _dedup_text_values(tag_texts),
        "formats": self._extract_url_formats(info.get("formats", [])),
        "playlist_items": self._build_playlist_items(info),
    }
    return payload
def _coerce_archive_field_list(value: Any) -> List[str]:
"""Coerce an Archive.org metadata field to a list of strings."""
@@ -1420,7 +1734,7 @@ try:
from typing import Iterable
from SYS.result_table_api import ColumnSpec, ResultModel, metadata_column, title_column
from SYS.result_table_adapters import register_provider
from SYS.result_table_adapters import register_plugin
def _ensure_search_result(item: Any) -> SearchResult:
if isinstance(item, SearchResult):
@@ -1526,7 +1840,7 @@ try:
return ["-url", url]
return ["-title", row.title or ""]
register_provider(
register_plugin(
"openlibrary",
_adapter,
columns=_columns_factory,
@@ -1671,3 +1985,42 @@ def get_metadata_provider(name: str,
except Exception as exc:
log(f"Provider init failed for '{name}': {exc}", file=sys.stderr)
return None
def get_default_subject_scrape_provider(
    config: Optional[Dict[str, Any]] = None,
) -> Optional[MetadataProvider]:
    """Return the registered provider with the highest default scrape priority.

    Every class in ``_METADATA_PROVIDERS`` is instantiated with ``config`` and
    asked for ``default_subject_scrape_priority()``.  Providers that raise
    during construction or while reporting their priority are skipped.  Only
    priorities greater than 0 qualify; on ties the provider registered first
    wins.  ``None`` is returned when no provider qualifies.
    """
    winner: Optional[MetadataProvider] = None
    top_score = 0
    for provider_cls in _METADATA_PROVIDERS.values():
        try:
            candidate = provider_cls(config)
            score = int(candidate.default_subject_scrape_priority())
        except Exception:
            continue
        if score <= top_score:
            continue
        winner, top_score = candidate, score
    return winner
def get_metadata_provider_for_url(
    url: str,
    config: Optional[Dict[str, Any]] = None,
) -> Optional[MetadataProvider]:
    """Return the registered provider that ranks ``url`` highest.

    ``url`` is converted to text and stripped; an empty result returns
    ``None`` immediately.  Otherwise every class in ``_METADATA_PROVIDERS`` is
    instantiated with ``config`` and asked for ``url_scrape_priority(url)``.
    Providers that raise are skipped, only priorities greater than 0 qualify,
    and on ties the provider registered first wins.
    """
    target = str(url or "").strip()
    if not target:
        return None

    winner: Optional[MetadataProvider] = None
    top_score = 0
    for provider_cls in _METADATA_PROVIDERS.values():
        try:
            candidate = provider_cls(config)
            score = int(candidate.url_scrape_priority(target))
        except Exception:
            continue
        if score <= top_score:
            continue
        winner, top_score = candidate, score
    return winner