# Medios-Macina/metadata.py
import json
import re
import subprocess
import sys
import shutil
import sqlite3
import requests
from helper.logger import log, debug
from urllib.parse import urlsplit, urlunsplit, unquote
from collections import deque
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple
from models import PipeObject, FileRelationshipTracker, _get_file_hash
try:
import musicbrainzngs # type: ignore
except ImportError: # pragma: no cover
musicbrainzngs = None
from imdbinfo.services import get_movie # type: ignore
try:
import yt_dlp # type: ignore
except ImportError: # pragma: no cover
yt_dlp = None
try:
from config import load_config, resolve_output_dir # type: ignore
except ImportError: # pragma: no cover
load_config = None # type: ignore[assignment]
resolve_output_dir = None # type: ignore[assignment]
try:
from helpers.hydrus import HydrusClient, HydrusRequestError, HydrusRequestSpec # type: ignore
except ImportError: # pragma: no cover
HydrusClient = None # type: ignore[assignment]
HydrusRequestError = RuntimeError # type: ignore[assignment]
HydrusRequestSpec = None # type: ignore[assignment]
if musicbrainzngs: # pragma: no branch
musicbrainzngs.set_useragent("DownlowScript", "0.1", "admin@example.com")
MusicBrainzRequestError = getattr(musicbrainzngs, "MusicBrainzRequestError", Exception)
else: # pragma: no cover
MusicBrainzRequestError = Exception
# Global relationship tracker for the current session
_CURRENT_RELATIONSHIP_TRACKER = FileRelationshipTracker()
def _generate_hydrus_url_variants(url: str) -> List[str]:
seen: Set[str] = set()
variants: List[str] = []
def push(candidate: Optional[str]) -> None:
if not candidate:
return
text = candidate.strip()
if not text or text in seen:
return
seen.add(text)
variants.append(text)
push(url)
try:
parsed = urlsplit(url)
except Exception:
return variants
if parsed.scheme in {"http", "https"}:
alternate_scheme = "https" if parsed.scheme == "http" else "http"
push(urlunsplit((alternate_scheme, parsed.netloc, parsed.path, parsed.query, parsed.fragment)))
normalised_netloc = parsed.netloc.lower()
if normalised_netloc and normalised_netloc != parsed.netloc:
push(urlunsplit((parsed.scheme, normalised_netloc, parsed.path, parsed.query, parsed.fragment)))
if parsed.path:
trimmed_path = parsed.path.rstrip('/')
if trimmed_path != parsed.path:
push(urlunsplit((parsed.scheme, parsed.netloc, trimmed_path, parsed.query, parsed.fragment)))
else:
push(urlunsplit((parsed.scheme, parsed.netloc, parsed.path + '/', parsed.query, parsed.fragment)))
unquoted_path = unquote(parsed.path)
if unquoted_path != parsed.path:
push(urlunsplit((parsed.scheme, parsed.netloc, unquoted_path, parsed.query, parsed.fragment)))
if parsed.query or parsed.fragment:
push(urlunsplit((parsed.scheme, parsed.netloc, parsed.path, '', '')))
if parsed.path:
unquoted_path = unquote(parsed.path)
push(urlunsplit((parsed.scheme, parsed.netloc, unquoted_path, '', '')))
return variants
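# Illustrative sketch of the variant expansion above (URL is invented for the example):
#   _generate_hydrus_url_variants("http://Example.com/watch/")
#   -> ["http://Example.com/watch/",     # original
#       "https://Example.com/watch/",    # alternate scheme
#       "http://example.com/watch/",     # lower-cased host
#       "http://Example.com/watch"]      # trailing slash trimmed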
def value_normalize(value: str) -> str:
"""Normalize whitespace: collapse internal spaces, strip, remove newlines."""
value = value.replace("\n", " ").replace("\r", " ")
value = re.sub(r"\s+", " ", value).strip()
return value
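# Quick example of the normalisation above: value_normalize("  Abbey \n Road  ") -> "Abbey Road"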
def import_pending_sidecars(db_root: Path, db: Any) -> None:
"""Import any .tags or .metadata sidecars that exist in the filesystem.
Scans for sidecar files (.tags, .metadata, .notes) and imports their contents
into the database as tags and metadata for the associated files.
Args:
db_root: Root directory to search for sidecar files
db: LocalLibraryDB instance to import metadata into
"""
try:
sidecar_patterns = ['**/*.tags', '**/*.metadata', '**/*.notes']
for pattern in sidecar_patterns:
for sidecar_path in db_root.glob(pattern):
if '.downlow' in sidecar_path.parts:
continue
if sidecar_path.suffix == '.tags':
orig_path = sidecar_path.parent / sidecar_path.name[:-5]
elif sidecar_path.suffix == '.metadata':
orig_path = sidecar_path.parent / sidecar_path.name[:-9]
elif sidecar_path.suffix == '.notes':
orig_path = sidecar_path.parent / sidecar_path.name[:-6]
else:
continue
if not orig_path.exists():
continue
file_id = None
try:
cursor = db.connection.cursor() if db.connection else None
if cursor:
cursor.execute('SELECT id FROM files WHERE file_path = ?', (str(orig_path),))
result = cursor.fetchone()
file_id = result[0] if result else None
except Exception:
file_id = None
if not file_id:
try:
cursor = db.connection.cursor() if db.connection else None
if cursor:
cursor.execute(
'INSERT INTO files (file_path, indexed_at, updated_at) VALUES (?, datetime("now"), datetime("now"))',
(str(orig_path),)
)
db.connection.commit()
file_id = cursor.lastrowid
except Exception:
continue
if sidecar_path.suffix == '.tags' and file_id:
try:
with open(sidecar_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
if content:
if '\n' in content:
tags = [tag.strip() for tag in content.split('\n') if tag.strip()]
else:
tags = [tag.strip() for tag in content.split(',') if tag.strip()]
cursor = db.connection.cursor() if db.connection else None
if cursor:
for tag in tags:
cursor.execute(
'INSERT OR IGNORE INTO tags (file_id, tag, tag_type) VALUES (?, ?, ?)',
(file_id, tag, 'sidecar_import')
)
db.connection.commit()
sidecar_path.unlink()
except Exception:
pass
elif sidecar_path.suffix == '.metadata' and file_id:
try:
with open(sidecar_path, 'r', encoding='utf-8') as f:
metadata_dict = json.load(f)
cursor = db.connection.cursor() if db.connection else None
if cursor and metadata_dict:
cursor.execute(
'INSERT OR REPLACE INTO metadata (file_id, hash, size, ext, duration, media_type, time_imported, time_modified) VALUES (?, ?, ?, ?, ?, ?, datetime("now"), datetime("now"))',
(
file_id,
metadata_dict.get('hash'),
metadata_dict.get('size'),
metadata_dict.get('ext'),
metadata_dict.get('duration'),
metadata_dict.get('media_type'),
)
)
db.connection.commit()
sidecar_path.unlink()
except Exception:
pass
except Exception:
pass
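# Expected on-disk layout handled above (filenames are hypothetical):
#   music/song.mp3            <- indexed media file
#   music/song.mp3.tags       <- newline- or comma-separated tags, deleted after import
#   music/song.mp3.metadata   <- JSON with hash/size/ext/duration/media_type, deleted after import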
def _extract_from_sequence(values: Sequence) -> Iterable[str]:
"""Extract string values from a sequence of mixed types (dicts, strings, etc.)."""
seen = set()
for item in values:
candidate = None
if isinstance(item, dict):
candidate = item.get("name") or item.get("title") or item.get("value") or item.get("text") or item.get("id") or item.get("imdb_id")
else:
candidate = str(item)
if candidate:
normalized = value_normalize(str(candidate))
if normalized and normalized not in seen:
seen.add(normalized)
yield normalized
def _add_tag(tags: List[str], namespace: str, value: Optional[str]) -> None:
"""Add a single namespaced tag (e.g., 'artist:Beatles')."""
if not value:
return
value = value_normalize(str(value))
if not value:
return
tags.append(f"{namespace}:{value}")
def _extend_tags(tags: List[str], namespace: str, values) -> None:
"""Extend tags from a single value or sequence, with optional namespace."""
if not values:
return
if isinstance(values, set):
values = list(values)
if isinstance(values, (list, tuple)):
for candidate in _extract_from_sequence(values):
_add_tag(tags, namespace, candidate)
else:
_add_tag(tags, namespace, values)
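# Sketch of how the tag helpers above compose (illustrative values):
#   tags: List[str] = []
#   _add_tag(tags, "artist", " The  Beatles ")               # -> ["artist:The Beatles"]
#   _extend_tags(tags, "genre", ["rock", {"name": "pop"}])   # adds "genre:rock", "genre:pop"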
def imdb_tag(imdb_id: str) -> Dict[str, object]:
movie = get_movie(imdb_id)
if movie is None:
raise ValueError(f"IMDb title not found: {imdb_id}")
if hasattr(movie, "model_dump"):
info = movie.model_dump()
elif hasattr(movie, "dict"):
info = movie.dict()
else:
info = {}
tags: List[str] = []
canonical_id = getattr(movie, "imdb_id", None) or info.get("imdb_id") or imdb_id
if canonical_id:
canonical_id = str(canonical_id).strip().lower()
if not canonical_id.startswith("tt"):
canonical_id = f"tt{canonical_id}"
else:
canonical_id = imdb_id.lower()
if not canonical_id.startswith("tt"):
canonical_id = f"tt{canonical_id}"
_add_tag(tags, "imdb", canonical_id)
_add_tag(tags, "title", info.get("title") or getattr(movie, "title", None))
_add_tag(tags, "year", info.get("year") or info.get("start_year") or getattr(movie, "year", None))
_add_tag(tags, "rating", info.get("rating"))
runtime_value = None
if isinstance(info.get("runtime"), (str, int)):
runtime_value = info["runtime"]
elif isinstance(info.get("runtimes"), (list, tuple)) and info["runtimes"]:
runtime_value = info["runtimes"][0]
elif info.get("duration"):
runtime_value = info["duration"]
_add_tag(tags, "runtime", runtime_value)
kind = None
if hasattr(movie, "is_series") and movie.is_series():
kind = "series"
elif hasattr(movie, "is_episode") and movie.is_episode():
kind = "episode"
else:
kind = info.get("kind") or "movie"
_add_tag(tags, "kind", kind)
_extend_tags(tags, "genre", info.get("genres") or info.get("genre"))
_extend_tags(tags, "language", info.get("languages"))
_extend_tags(tags, "country", info.get("countries"))
creators = info.get("directors") or info.get("director") or info.get("producers") or info.get("writers")
if creators:
_extend_tags(tags, "creator", creators)
info_episode = getattr(movie, "info_episode", None)
series_title = None
season = info.get("season") or info.get("series_season")
episode = info.get("episode") or info.get("series_episode")
if info_episode:
if hasattr(info_episode, "model_dump"):
episode_meta = info_episode.model_dump()
elif hasattr(info_episode, "dict"):
episode_meta = info_episode.dict()
else:
episode_meta = getattr(info_episode, "__dict__", {}) or {}
season = season or episode_meta.get("season") or episode_meta.get("season_n")
episode = episode or episode_meta.get("episode") or episode_meta.get("episode_n")
series_title = episode_meta.get("series_title")
if not series_title:
series_title = getattr(getattr(movie, "series_info", None), "title", None)
if kind == "episode" and not season:
season = getattr(getattr(movie, "series_info", None), "season", None)
if season:
_add_tag(tags, "season", season)
if episode:
_add_tag(tags, "episode", episode)
series_title = series_title or info.get("series_title") or info.get("series") or getattr(getattr(movie, "series_info", None), "title", None)
if series_title:
_add_tag(tags, "series", series_title)
summary = info.get("plot outline") or info.get("plot_outline") or info.get("plot")
if isinstance(summary, (list, tuple)):
summary = summary[0] if summary else None
if not summary and hasattr(movie, "plot_outline"):
summary = getattr(movie, "plot_outline")
if not summary:
summaries = info.get("summaries")
if isinstance(summaries, (list, tuple)) and summaries:
summary = summaries[0]
if summary:
_add_tag(tags, "summary", summary)
cast_sources = info.get("cast") or info.get("actors") or info.get("cast_members") or info.get("stars")
cast_names: List[str] = []
if cast_sources:
for name in _extract_from_sequence(cast_sources):
if name:
cast_names.append(name)
if len(cast_names) >= 10:
break
if cast_names:
_extend_tags(tags, "cast", cast_names)
return PipeObject("imdb", canonical_id, tags=tags).to_dict()
def fetch_musicbrainz_tags(mbid: str, entity: str) -> Dict[str, object]:
if not musicbrainzngs:
raise RuntimeError("musicbrainzngs package is not available")
entity = entity.lower()
if entity not in {"release", "recording", "artist"}:
raise ValueError("Unsupported MusicBrainz entity: %s" % entity)
def _fetch_with_fallback(getter, key: str, includes: List[str]):
try:
return getter(mbid, includes=includes)[key]
except MusicBrainzRequestError as exc:
if "Bad includes" in str(exc) and "genres" in includes:
fallback = [inc for inc in includes if inc != "genres"]
return getter(mbid, includes=fallback)[key]
raise
include = ["tags", "genres"]
match entity:
case "release":
include.extend(["artist-credits", "release-groups"])
data = _fetch_with_fallback(musicbrainzngs.get_release_by_id, "release", include)
case "recording":
include.extend(["artists", "releases"])
data = _fetch_with_fallback(musicbrainzngs.get_recording_by_id, "recording", include)
case _:
include.extend(["release-groups", "aliases"])
data = _fetch_with_fallback(musicbrainzngs.get_artist_by_id, "artist", include)
tags: List[str] = []
_add_tag(tags, "musicbrainz", mbid)
_add_tag(tags, "entity", entity)
_add_tag(tags, "title", data.get("title"))
if entity != "artist":
date = data.get("date") or data.get("first-release-date")
if date:
_add_tag(tags, "date", date)
_add_tag(tags, "year", date[:4])
if data.get("country"):
_add_tag(tags, "country", data["country"])
if data.get("status"):
_add_tag(tags, "status", data["status"])
artist_credit = data.get("artist-credit") or data.get("artists")
if artist_credit:
names = []
for item in artist_credit:
if isinstance(item, dict):
name = item.get("name") or item.get("artist", {}).get("name")
if name:
names.append(name)
_extend_tags(tags, "artist", names)
tag_list = data.get("tag-list") or data.get("tags") or []
for tag in tag_list:
if isinstance(tag, dict) and tag.get("name"):
_add_tag(tags, "tag", tag["name"])
genre_list = data.get("genre-list") or data.get("genres") or []
for genre in genre_list:
if isinstance(genre, dict) and genre.get("name"):
_add_tag(tags, "genre", genre["name"])
return PipeObject("musicbrainz", mbid, tags=tags, extra={"entity": entity}).to_dict()
def fetch_openlibrary_tags(ol_id: str) -> Dict[str, object]:
"""Fetch metadata tags from OpenLibrary.
Args:
ol_id: OpenLibrary ID (e.g., 'OL123456M' for a book)
Returns:
Dictionary with 'tags' key containing list of extracted tags
"""
import urllib.request
# Normalize OL ID
ol_id = ol_id.strip().upper()
if not ol_id.startswith('OL'):
ol_id = f'OL{ol_id}'
# Fetch from OpenLibrary API
url = f"https://openlibrary.org/books/{ol_id}.json"
tags: List[str] = []
try:
with urllib.request.urlopen(url, timeout=10) as response:
data = json.loads(response.read().decode('utf-8'))
except Exception as e:
raise ValueError(f"Failed to fetch OpenLibrary data for {ol_id}: {e}")
# Add OpenLibrary ID tag
_add_tag(tags, "openlibrary", ol_id)
# Extract title
_add_tag(tags, "title", data.get("title"))
# Extract subtitle if present
if data.get("subtitle"):
_add_tag(tags, "subtitle", data["subtitle"])
# Extract authors
authors = data.get("authors", [])
author_names: List[str] = []
for author in authors:
if isinstance(author, dict):
name = author.get("name")
else:
name = str(author)
if name:
author_names.append(name)
if author_names:
_extend_tags(tags, "author", author_names)
# Extract publication details
if data.get("publish_date"):
_add_tag(tags, "publish_date", data["publish_date"])
# Extract year if present
year_match = re.search(r'\b(\d{4})\b', str(data.get("publish_date", "")))
if year_match:
_add_tag(tags, "year", year_match.group(1))
# Extract publishers
publishers = data.get("publishers", [])
if publishers:
publisher_names = []
for pub in publishers:
if isinstance(pub, dict):
name = pub.get("name")
else:
name = str(pub)
if name:
publisher_names.append(name)
if publisher_names:
_extend_tags(tags, "publisher", publisher_names)
# Extract languages
languages = data.get("languages", [])
if languages:
lang_codes = []
for lang in languages:
if isinstance(lang, dict):
code = lang.get("key", "").split("/")[-1]
else:
code = str(lang).split("/")[-1]
if code:
lang_codes.append(code)
if lang_codes:
_extend_tags(tags, "language", lang_codes)
# Extract ISBN
isbns = data.get("isbn_10", []) + data.get("isbn_13", [])
if isbns:
for isbn in isbns[:1]: # Just take first one
if len(str(isbn)) == 10:
_add_tag(tags, "isbn_10", isbn)
elif len(str(isbn)) == 13:
_add_tag(tags, "isbn_13", isbn)
# Extract page count
_add_tag(tags, "pages", data.get("number_of_pages"))
# Extract genres/subjects (OpenLibrary calls them subjects)
# Subjects are added as plain freeform tags (no namespace prefix)
subjects = data.get("subjects", [])
if subjects:
for subject in subjects[:10]: # Limit to 10 subjects
if isinstance(subject, dict):
name = subject.get("name")
else:
name = str(subject)
if name:
# Add subject as plain tag without "subject:" prefix
normalized = value_normalize(str(name))
if normalized:
tags.append(normalized)
# Extract OpenLibrary description
description = data.get("description")
if description:
if isinstance(description, dict):
description = description.get("value")
_add_tag(tags, "summary", description)
return PipeObject("openlibrary", ol_id, tags=tags).to_dict()
def _append_unique(target: List[str], seen: Set[str], value: Optional[str]) -> None:
"""Append a single value if not already in seen set (deduplication)."""
if value is None:
return
normalized = value_normalize(str(value))
if not normalized or normalized in seen:
return
seen.add(normalized)
target.append(normalized)
def _extend_namespaced(target: List[str], seen: Set[str], namespace: str, values: Iterable[Optional[str]]) -> None:
"""Append namespaced values if not already in seen set."""
for val in values:
if val:
_append_unique(target, seen, f"{namespace}:{val}")
def _coerce_duration(metadata: Dict[str, Any]) -> Optional[float]:
for key in ("duration", "duration_seconds", "length", "duration_sec"):
value = metadata.get(key)
if value is None:
continue
if isinstance(value, (int, float)):
if value > 0:
return float(value)
elif isinstance(value, str):
try:
candidate = float(value.strip())
except ValueError:
continue
if candidate > 0:
return candidate
return None
def _sanitize_url(value: Optional[str]) -> Optional[str]:
"""Sanitize URL: normalize and remove ytdl:// prefix."""
if value is None:
return None
cleaned = value_normalize(str(value))
if not cleaned:
return None
if cleaned.lower().startswith("ytdl://"):
cleaned = cleaned[7:]
return cleaned
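# Example: _sanitize_url("ytdl://https://example.com/v/1") -> "https://example.com/v/1"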
def _clean_existing_tags(existing: Any) -> List[str]:
tags: List[str] = []
seen: Set[str] = set()
if isinstance(existing, (list, tuple, set)):
iterable = existing
elif existing is None:
iterable = []
else:
iterable = [existing]
for tag in iterable:
_append_unique(tags, seen, tag)
return tags
def _should_fetch_url(url: Optional[str]) -> bool:
if not url or not isinstance(url, str):
return False
return url.lower().startswith(('http://', 'https://'))
def fetch_remote_metadata(url: str, options: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], List[str]]:
warnings: List[str] = []
info: Optional[Dict[str, Any]] = None
if yt_dlp is not None:
try: # pragma: no cover - depends on runtime availability
ydl_opts = {
'quiet': True,
'no_warnings': True,
'skip_download': True,
'noplaylist': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[attr-defined]
info_dict = ydl.extract_info(url, download=False)
if info_dict is not None:
info = dict(info_dict)
except Exception as exc: # pragma: no cover - best effort
warnings.append(f"yt_dlp extract failed: {exc}")
if info is None:
executable = str(options.get('ytdlp_path') or 'yt-dlp')
extra_args = options.get('ytdlp_args') or []
if isinstance(extra_args, (str, bytes)):
extra_args = [extra_args]
cmd = [executable, '--dump-single-json', '--no-playlist', '--skip-download', '--no-warnings']
cmd.extend(str(arg) for arg in extra_args)
cmd.append(url)
timeout = float(options.get('timeout') or 45.0)
try:
completed = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=timeout)
except Exception as exc: # pragma: no cover - subprocess failure
warnings.append(f"yt-dlp invocation failed: {exc}")
return None, warnings
if completed.returncode != 0:
message = completed.stderr.strip() or completed.stdout.strip() or f"status {completed.returncode}"
warnings.append(message)
return None, warnings
try:
info = json.loads(completed.stdout)
except json.JSONDecodeError as exc: # pragma: no cover - parse failure
warnings.append(f"invalid JSON from yt-dlp: {exc}")
return None, warnings
if isinstance(info, dict) and 'entries' in info:
entries = info.get('entries')
if isinstance(entries, list) and entries:
info = entries[0]
if isinstance(info, dict):
info.setdefault('source_url', url)
return info if isinstance(info, dict) else None, warnings
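# Options recognised above (all optional; values shown are illustrative):
#   {"ytdlp_path": "yt-dlp",            # external binary used when the yt_dlp module path fails
#    "ytdlp_args": ["--proxy", "..."],  # extra CLI args passed through verbatim
#    "timeout": 45.0}                   # subprocess timeout in seconds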
def resolve_remote_metadata(payload: Dict[str, Any]) -> Dict[str, Any]:
options_raw = payload.get('options')
options: Dict[str, Any] = options_raw if isinstance(options_raw, dict) else {}
source_url = payload.get('source_url')
sanitized = _sanitize_url(source_url) or source_url
existing_tags = _clean_existing_tags(payload.get('existing_tags'))
metadata_sources: List[Dict[str, Any]] = []
for key in ('metadata', 'mpv_metadata', 'remote_metadata', 'info'):
candidate = payload.get(key)
if isinstance(candidate, dict):
metadata_sources.append(candidate)
remote_info: Optional[Dict[str, Any]] = None
warnings: List[str] = []
if not options.get('no_fetch'):
fetch_url = sanitized
if _should_fetch_url(fetch_url):
remote_info, fetch_warnings = fetch_remote_metadata(fetch_url or '', options)
warnings.extend(fetch_warnings)
if remote_info:
metadata_sources.append(remote_info)
combined_metadata = {}
for source in metadata_sources:
if isinstance(source, dict):
combined_metadata.update(source)
context = {'source_url': sanitized}
bundle = build_remote_bundle(combined_metadata, existing_tags, context)
merged_metadata = {**combined_metadata, **(bundle.get('metadata') or {})}
bundle['metadata'] = merged_metadata
if not bundle.get('source_url'):
bundle['source_url'] = sanitized
mpv_meta_candidate = payload.get('mpv_metadata')
mpv_metadata = mpv_meta_candidate if isinstance(mpv_meta_candidate, dict) else None
result_tags = bundle.get('tags') or existing_tags
result = PipeObject(
source='remote-metadata',
identifier=sanitized or 'unknown',
tags=result_tags,
title=bundle.get('title'),
source_url=bundle.get('source_url') or sanitized,
duration=bundle.get('duration'),
metadata=merged_metadata,
remote_metadata=remote_info,
warnings=warnings,
mpv_metadata=mpv_metadata,
)
return result.to_serializable()
def _ensure_hydrus_client() -> None:
if HydrusClient is None or HydrusRequestSpec is None: # pragma: no cover - depends on optional module
raise RuntimeError("Hydrus helpers are unavailable")
def _normalize_hash(value: Any) -> str:
candidate = str(value or '').strip().lower()
if not candidate:
raise ValueError("Hydrus hash is required")
if len(candidate) != 64 or any(ch not in '0123456789abcdef' for ch in candidate):
raise ValueError("Hydrus hash must be a 64-character hex string")
return candidate
def _normalize_tag(tag: Any) -> Optional[str]:
if tag is None:
return None
if isinstance(tag, str):
candidate = tag.strip()
else:
candidate = str(tag).strip()
return candidate or None
def _extract_tag_services(entry: Dict[str, Any]) -> List[Dict[str, Any]]:
tags_section = entry.get('tags')
services: List[Dict[str, Any]] = []
if not isinstance(tags_section, dict):
return services
names_map = tags_section.get('service_keys_to_names')
if not isinstance(names_map, dict):
names_map = {}
def get_record(service_key: Optional[str], service_name: Optional[str]) -> Dict[str, Any]:
key_lower = service_key.lower() if isinstance(service_key, str) else None
name_lower = service_name.lower() if isinstance(service_name, str) else None
for record in services:
existing_key = record.get('service_key')
if key_lower and isinstance(existing_key, str) and existing_key.lower() == key_lower:
if service_name and not record.get('service_name'):
record['service_name'] = service_name
return record
existing_name = record.get('service_name')
if name_lower and isinstance(existing_name, str) and existing_name.lower() == name_lower:
if service_key and not record.get('service_key'):
record['service_key'] = service_key
return record
record = {
'service_key': service_key,
'service_name': service_name,
'tags': [],
}
services.append(record)
return record
def _iter_current_status_lists(container: Any) -> Iterable[List[Any]]:
if isinstance(container, dict):
for status_key, tags_list in container.items():
if str(status_key) != '0':
continue
if isinstance(tags_list, list):
yield tags_list
elif isinstance(container, list):
yield container
statuses_map = tags_section.get('service_keys_to_statuses_to_tags')
if isinstance(statuses_map, dict):
for service_key, status_map in statuses_map.items():
record = get_record(service_key if isinstance(service_key, str) else None, names_map.get(service_key))
for tags_list in _iter_current_status_lists(status_map):
for tag in tags_list:
normalized = _normalize_tag(tag)
if normalized:
record['tags'].append(normalized)
ignored_keys = {
'service_keys_to_statuses_to_tags',
'service_keys_to_statuses_to_display_tags',
'service_keys_to_display_friendly_tags',
'service_keys_to_names',
'tag_display_types_to_namespaces',
'namespace_display_string_lookup',
'tag_display_decoration_colour_lookup',
}
for key, service in tags_section.items():
if key in ignored_keys:
continue
if isinstance(service, dict):
service_key = service.get('service_key') or (key if isinstance(key, str) else None)
service_name = service.get('service_name') or service.get('name') or names_map.get(service_key)
record = get_record(service_key if isinstance(service_key, str) else None, service_name)
storage = service.get('storage_tags') or service.get('statuses_to_tags') or service.get('tags')
if isinstance(storage, dict):
for tags_list in _iter_current_status_lists(storage):
for tag in tags_list:
normalized = _normalize_tag(tag)
if normalized:
record['tags'].append(normalized)
elif isinstance(storage, list):
for tag in storage:
normalized = _normalize_tag(tag)
if normalized:
record['tags'].append(normalized)
# Use canonical dedup function
for record in services:
record['tags'] = dedup_tags_by_namespace(record['tags'], keep_first=True)
return services
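# Simplified shape of the Hydrus "tags" block this walks (service key is illustrative):
#   {"service_keys_to_names": {"abc123": "my tags"},
#    "service_keys_to_statuses_to_tags": {"abc123": {"0": ["title:foo", "creator:bar"]}}}
# Only status "0" (current tags) is collected; other statuses are skipped.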
def _select_primary_tags(services: List[Dict[str, Any]], aggregated: List[str], prefer_service: Optional[str]) -> Tuple[Optional[str], List[str]]:
prefer_lower = prefer_service.lower() if isinstance(prefer_service, str) else None
if prefer_lower:
for record in services:
name = record.get('service_name')
if isinstance(name, str) and name.lower() == prefer_lower and record['tags']:
return record.get('service_key'), record['tags']
for record in services:
if record['tags']:
return record.get('service_key'), record['tags']
return None, aggregated
def _derive_title(tags_primary: List[str], tags_aggregated: List[str], entry: Dict[str, Any]) -> Optional[str]:
for source in (tags_primary, tags_aggregated):
for tag in source:
namespace, sep, value = tag.partition(':')
if sep and namespace and namespace.lower() == 'title':
cleaned = value.strip()
if cleaned:
return cleaned
for key in ('title', 'display_name', 'pretty_name', 'original_display_filename', 'original_filename'):
value = entry.get(key)
if isinstance(value, str):
cleaned = value.strip()
if cleaned:
return cleaned
return None
def _derive_clip_time(tags_primary: List[str], tags_aggregated: List[str], entry: Dict[str, Any]) -> Optional[str]:
namespaces = {'clip', 'clip_time', 'cliptime'}
for source in (tags_primary, tags_aggregated):
for tag in source:
namespace, sep, value = tag.partition(':')
if sep and namespace and namespace.lower() in namespaces:
cleaned = value.strip()
if cleaned:
return cleaned
clip_value = entry.get('clip_time')
if isinstance(clip_value, str):
cleaned_clip = clip_value.strip()
if cleaned_clip:
return cleaned_clip
return None
def _summarize_hydrus_entry(entry: Dict[str, Any], prefer_service: Optional[str]) -> Tuple[Dict[str, Any], List[str], Optional[str], Optional[str], Optional[str]]:
services = _extract_tag_services(entry)
aggregated: List[str] = []
seen: Set[str] = set()
for record in services:
for tag in record['tags']:
if tag not in seen:
seen.add(tag)
aggregated.append(tag)
service_key, primary_tags = _select_primary_tags(services, aggregated, prefer_service)
title = _derive_title(primary_tags, aggregated, entry)
clip_time = _derive_clip_time(primary_tags, aggregated, entry)
summary = dict(entry)
if title and not summary.get('title'):
summary['title'] = title
if clip_time and not summary.get('clip_time'):
summary['clip_time'] = clip_time
summary['tag_service_key'] = service_key
summary['has_current_file_service'] = _has_current_file_service(entry)
if 'is_local' not in summary:
summary['is_local'] = bool(entry.get('is_local'))
return summary, primary_tags, service_key, title, clip_time
def _looks_like_hash(value: Any) -> bool:
if not isinstance(value, str):
return False
candidate = value.strip().lower()
return len(candidate) == 64 and all(ch in '0123456789abcdef' for ch in candidate)
def _collect_relationship_hashes(payload: Any, accumulator: Set[str]) -> None:
if isinstance(payload, dict):
for value in payload.values():
_collect_relationship_hashes(value, accumulator)
elif isinstance(payload, (list, tuple, set)):
for value in payload:
_collect_relationship_hashes(value, accumulator)
elif isinstance(payload, str) and _looks_like_hash(payload):
accumulator.add(payload)
def _build_hydrus_query(
hashes: Optional[Sequence[str]],
file_ids: Optional[Sequence[int]],
include_relationships: bool,
minimal: bool,
) -> Dict[str, str]:
query: Dict[str, str] = {}
if hashes:
query['hashes'] = json.dumps(list(hashes))
if file_ids:
query['file_ids'] = json.dumps([int(value) for value in file_ids])
if not query:
raise ValueError('hashes or file_ids must be provided')
query['include_service_keys_to_tags'] = json.dumps(True)
query['include_tag_services'] = json.dumps(True)
query['include_file_services'] = json.dumps(True)
if include_relationships:
query['include_file_relationships'] = json.dumps(True)
if not minimal:
extras = (
'include_known_urls',
'include_size',
'include_width',
'include_height',
'include_duration',
'include_mime',
'include_has_audio',
'include_is_trashed',
)
for key in extras:
query[key] = json.dumps(True)
return query
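# Sketch of the query built above for a single hash (minimal=False, no relationships):
#   {"hashes": '["<64-hex>"]', "include_service_keys_to_tags": "true",
#    "include_tag_services": "true", "include_file_services": "true",
#    "include_known_urls": "true", ...}
# Every value is JSON-encoded because it is sent as URL query parameters.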
def _fetch_hydrus_entries(
client: Any,
hashes: Optional[Sequence[str]],
file_ids: Optional[Sequence[int]],
include_relationships: bool,
minimal: bool,
) -> List[Dict[str, Any]]:
if not hashes and not file_ids:
return []
assert HydrusRequestSpec is not None
spec = HydrusRequestSpec(
method='GET',
endpoint='/get_files/file_metadata',
query=_build_hydrus_query(hashes, file_ids, include_relationships, minimal),
)
response = client._perform_request(spec) # type: ignore[attr-defined]
metadata = response.get('metadata') if isinstance(response, dict) else None
if isinstance(metadata, list):
return [entry for entry in metadata if isinstance(entry, dict)]
return []
def _has_current_file_service(entry: Dict[str, Any]) -> bool:
services = entry.get('file_services')
if not isinstance(services, dict):
return False
current = services.get('current')
if isinstance(current, dict):
for value in current.values():
if value:
return True
return False
if isinstance(current, list):
return len(current) > 0
return False
def _compute_file_flags(entry: Dict[str, Any]) -> Tuple[bool, bool, bool]:
mime = entry.get('mime')
mime_lower = mime.lower() if isinstance(mime, str) else ''
is_video = mime_lower.startswith('video/')
is_audio = mime_lower.startswith('audio/')
is_deleted = False
if entry.get('is_trashed'):
is_deleted = True
file_services = entry.get('file_services')
if not is_deleted and isinstance(file_services, dict):
deleted = file_services.get('deleted')
if isinstance(deleted, dict) and deleted:
is_deleted = True
return is_video, is_audio, is_deleted
def fetch_hydrus_metadata(payload: Dict[str, Any]) -> Dict[str, Any]:
_ensure_hydrus_client()
assert HydrusClient is not None
hash_hex = None
raw_hash_value = payload.get('hash')
if raw_hash_value is not None:
hash_hex = _normalize_hash(raw_hash_value)
file_ids: List[int] = []
raw_file_ids = payload.get('file_ids')
if isinstance(raw_file_ids, (list, tuple, set)):
for value in raw_file_ids:
try:
file_ids.append(int(value))
except (TypeError, ValueError):
continue
elif raw_file_ids is not None:
try:
file_ids.append(int(raw_file_ids))
except (TypeError, ValueError):
file_ids = []
raw_file_id = payload.get('file_id')
if raw_file_id is not None:
try:
coerced = int(raw_file_id)
except (TypeError, ValueError):
coerced = None
if coerced is not None and coerced not in file_ids:
file_ids.append(coerced)
base_url = str(payload.get('api_url') or '').strip()
if not base_url:
raise ValueError('Hydrus api_url is required')
access_key = str(payload.get('access_key') or '').strip()
options_raw = payload.get('options')
options = options_raw if isinstance(options_raw, dict) else {}
prefer_service = options.get('prefer_service_name')
if isinstance(prefer_service, str):
prefer_service = prefer_service.strip()
else:
prefer_service = None
include_relationships = bool(options.get('include_relationships'))
minimal = bool(options.get('minimal'))
timeout = float(options.get('timeout') or 60.0)
client = HydrusClient(base_url, access_key, timeout)
hashes: Optional[List[str]] = None
if hash_hex:
hashes = [hash_hex]
if not hashes and not file_ids:
raise ValueError('Hydrus hash or file id is required')
try:
entries = _fetch_hydrus_entries(client, hashes, file_ids or None, include_relationships, minimal)
except HydrusRequestError as exc: # type: ignore[misc]
raise RuntimeError(str(exc))
if not entries:
response: Dict[str, Any] = {
'hash': hash_hex,
'metadata': {},
'tags': [],
'warnings': [f'No Hydrus metadata for {hash_hex or file_ids}'],
'error': 'not_found',
}
if file_ids:
response['file_id'] = file_ids[0]
return response
entry = entries[0]
if not hash_hex:
entry_hash = entry.get('hash')
if isinstance(entry_hash, str) and entry_hash:
hash_hex = entry_hash
hashes = [hash_hex]
summary, primary_tags, service_key, title, clip_time = _summarize_hydrus_entry(entry, prefer_service)
is_video, is_audio, is_deleted = _compute_file_flags(entry)
has_current_file_service = _has_current_file_service(entry)
is_local = bool(entry.get('is_local'))
size_bytes = entry.get('size') or entry.get('file_size')
filesize_mb = None
if isinstance(size_bytes, (int, float)) and size_bytes > 0:
filesize_mb = float(size_bytes) / (1024.0 * 1024.0)
duration = entry.get('duration')
if duration is None and isinstance(entry.get('duration_ms'), (int, float)):
duration = float(entry['duration_ms']) / 1000.0
warnings: List[str] = []
if not primary_tags:
warnings.append('No tags returned for preferred service')
relationships = None
relationship_metadata: Dict[str, Dict[str, Any]] = {}
if include_relationships and hash_hex:
try:
assert HydrusRequestSpec is not None
rel_spec = HydrusRequestSpec(
method='GET',
endpoint='/manage_file_relationships/get_file_relationships',
query={'hash': hash_hex},
)
relationships = client._perform_request(rel_spec) # type: ignore[attr-defined]
except HydrusRequestError as exc: # type: ignore[misc]
warnings.append(f'Relationship lookup failed: {exc}')
relationships = None
if isinstance(relationships, dict):
related_hashes: Set[str] = set()
_collect_relationship_hashes(relationships, related_hashes)
related_hashes.discard(hash_hex)
if related_hashes:
try:
related_entries = _fetch_hydrus_entries(client, sorted(related_hashes), None, False, True)
except HydrusRequestError as exc: # type: ignore[misc]
warnings.append(f'Relationship metadata fetch failed: {exc}')
else:
for rel_entry in related_entries:
rel_hash = rel_entry.get('hash')
if not isinstance(rel_hash, str):
continue
rel_summary, rel_tags, _, rel_title, rel_clip = _summarize_hydrus_entry(rel_entry, prefer_service)
rel_summary['tags'] = rel_tags
if rel_title:
rel_summary['title'] = rel_title
if rel_clip:
rel_summary['clip_time'] = rel_clip
relationship_metadata[rel_hash] = rel_summary
result: Dict[str, Any] = {
'hash': entry.get('hash') or hash_hex,
'metadata': summary,
'tags': primary_tags,
'tag_service_key': service_key,
'title': title,
'clip_time': clip_time,
'duration': duration,
'filesize_mb': filesize_mb,
'is_video': is_video,
'is_audio': is_audio,
'is_deleted': is_deleted,
'is_local': is_local,
'has_current_file_service': has_current_file_service,
'matched_hash': entry.get('hash') or hash_hex,
'swap_recommended': False,
}
file_id_value = entry.get('file_id')
if isinstance(file_id_value, (int, float)):
result['file_id'] = int(file_id_value)
if relationships is not None:
result['relationships'] = relationships
if relationship_metadata:
result['relationship_metadata'] = relationship_metadata
if warnings:
result['warnings'] = warnings
return result
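# Minimal calling sketch (URL, key, and hash are placeholders):
#   fetch_hydrus_metadata({
#       "hash": "<64-char sha256 hex>",
#       "api_url": "http://127.0.0.1:45869",
#       "access_key": "<hydrus access key>",
#       "options": {"include_relationships": True, "prefer_service_name": "my tags"},
#   })
# Returns a dict with "tags", "metadata", "title", "duration", file flags, and optional "relationships".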
def fetch_hydrus_metadata_by_url(payload: Dict[str, Any]) -> Dict[str, Any]:
_ensure_hydrus_client()
assert HydrusClient is not None
raw_url = payload.get('url') or payload.get('source_url')
url = str(raw_url or '').strip()
if not url:
raise ValueError('URL is required to fetch Hydrus metadata by URL')
base_url = str(payload.get('api_url') or '').strip()
if not base_url:
raise ValueError('Hydrus api_url is required')
access_key = str(payload.get('access_key') or '').strip()
options_raw = payload.get('options')
options = options_raw if isinstance(options_raw, dict) else {}
timeout = float(options.get('timeout') or 60.0)
client = HydrusClient(base_url, access_key, timeout)
hashes: Optional[List[str]] = None
file_ids: Optional[List[int]] = None
matched_url = None
normalised_reported = None
seen: Set[str] = set()
queue = deque()
for variant in _generate_hydrus_url_variants(url):
queue.append(variant)
if not queue:
queue.append(url)
tried_variants: List[str] = []
while queue:
candidate = queue.popleft()
candidate = str(candidate or '').strip()
if not candidate or candidate in seen:
continue
seen.add(candidate)
tried_variants.append(candidate)
assert HydrusRequestSpec is not None
spec = HydrusRequestSpec(
method='GET',
endpoint='/add_urls/get_url_files',
query={'url': candidate},
)
try:
response = client._perform_request(spec) # type: ignore[attr-defined]
except HydrusRequestError as exc: # type: ignore[misc]
raise RuntimeError(str(exc))
response_hashes_list: List[str] = []
response_file_ids_list: List[int] = []
if isinstance(response, dict):
normalised_value = response.get('normalised_url')
if isinstance(normalised_value, str):
trimmed = normalised_value.strip()
if trimmed:
normalised_reported = normalised_reported or trimmed
if trimmed not in seen:
queue.append(trimmed)
for redirect_key in ('redirect_url', 'url'):
redirect_value = response.get(redirect_key)
if isinstance(redirect_value, str):
redirect_trimmed = redirect_value.strip()
if redirect_trimmed and redirect_trimmed not in seen:
queue.append(redirect_trimmed)
raw_hashes = response.get('hashes') or response.get('file_hashes')
if isinstance(raw_hashes, list):
for item in raw_hashes:
try:
normalized = _normalize_hash(item)
except ValueError:
continue
if normalized:
response_hashes_list.append(normalized)
raw_ids = response.get('file_ids') or response.get('file_id')
if isinstance(raw_ids, list):
for item in raw_ids:
try:
response_file_ids_list.append(int(item))
except (TypeError, ValueError):
continue
elif raw_ids is not None:
try:
response_file_ids_list.append(int(raw_ids))
except (TypeError, ValueError):
pass
statuses = response.get('url_file_statuses')
if isinstance(statuses, list):
for entry in statuses:
if not isinstance(entry, dict):
continue
status_hash = entry.get('hash') or entry.get('file_hash')
if status_hash:
try:
normalized = _normalize_hash(status_hash)
except ValueError:
normalized = None
if normalized:
response_hashes_list.append(normalized)
status_id = entry.get('file_id') or entry.get('fileid')
if status_id is not None:
try:
response_file_ids_list.append(int(status_id))
except (TypeError, ValueError):
continue
if response_hashes_list:
hashes = response_hashes_list
if response_file_ids_list:
file_ids = response_file_ids_list
if hashes or file_ids:
matched_url = candidate
break
if not hashes and not file_ids:
result = {
'found': False,
'url': url,
'variants': tried_variants,
'metadata': {},
'tags': [],
'warnings': [f'No Hydrus file found for {url}'],
'error': 'not_found',
}
if normalised_reported:
result['normalised_url'] = normalised_reported
return result
hash_value = str(hashes[0]) if hashes else None
followup_payload: Dict[str, Any] = {
'api_url': base_url,
'access_key': access_key,
'options': options,
}
if hash_value:
followup_payload['hash'] = hash_value
if file_ids:
followup_payload['file_id'] = file_ids[0]
result = fetch_hydrus_metadata(followup_payload)
result['found'] = True
result['url'] = url
if matched_url and matched_url != url:
result['matched_url'] = matched_url
if file_ids:
result['file_id'] = file_ids[0]
if normalised_reported:
result['normalised_url'] = normalised_reported
result['variants'] = tried_variants
return result
def _normalise_string_list(values: Optional[Iterable[Any]]) -> List[str]:
if not values:
return []
seen: Set[str] = set()
items: List[str] = []
for value in values:
if value is None:
continue
text = str(value).strip()
if not text:
continue
if text in seen:
continue
seen.add(text)
items.append(text)
return items
def _derive_sidecar_path(media_path: Path) -> Path:
try:
return media_path.parent / (media_path.name + '.tags')
except ValueError:
return media_path.with_name(media_path.name + '.tags')
def _read_sidecar_metadata(sidecar_path: Path) -> tuple[Optional[str], List[str], List[str]]:
"""Read hash, tags, and known_urls from .tags sidecar file.
Consolidated with read_tags_from_file - this extracts extra metadata (hash, urls).
"""
if not sidecar_path.exists():
return None, [], []
try:
raw = sidecar_path.read_text(encoding='utf-8')
except OSError:
return None, [], []
hash_value: Optional[str] = None
tags: List[str] = []
known_urls: List[str] = []
for raw_line in raw.splitlines():
line = raw_line.strip()
if not line or line.startswith('#'):
continue
lower = line.lower()
if lower.startswith('hash:'):
hash_value = line.split(':', 1)[1].strip() if ':' in line else ''
elif lower.startswith('known_url:') or lower.startswith('url:'):
# Parse URLs (handle legacy 'url:' format)
urls_part = line.split(':', 1)[1].strip() if ':' in line else ''
if urls_part:
for url_segment in urls_part.split(','):
for url in url_segment.split():
url_clean = url.strip()
if url_clean and url_clean not in known_urls:
known_urls.append(url_clean)
else:
# Everything else is a tag (including relationship: lines)
tags.append(line)
return hash_value, tags, known_urls
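# Sidecar format parsed above (one entry per line; content is illustrative):
#   hash:<64-char sha256 hex>
#   title:My Clip
#   known_url:https://example.com/watch?v=abc
# Lines starting with '#' are ignored; anything else is treated as a plain tag.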
def rename(file_path: Path, tags: Iterable[str]) -> Optional[Path]:
"""Rename a file based on title: tag in the tags list.
If a title: tag is present, renames the file and any .tags/.metadata sidecars.
Args:
file_path: Path to the file to potentially rename
tags: Iterable of tag strings (should contain title: tag if rename needed)
Returns:
New path if renamed, None if not renamed or error occurred
"""
# Extract title from tags
new_title = None
for tag in tags:
if isinstance(tag, str) and tag.lower().startswith('title:'):
new_title = tag.split(':', 1)[1].strip()
break
if not new_title or not file_path.exists():
return None
try:
old_name = file_path.name
old_suffix = file_path.suffix
# Create new filename: title + extension
new_name = f"{new_title}{old_suffix}"
new_path = file_path.parent / new_name
# Don't rename if already the same name
if new_path == file_path:
return None
# If target exists, delete it first (replace mode)
if new_path.exists():
try:
new_path.unlink()
debug(f"Replaced existing file: {new_name}", file=sys.stderr)
except Exception as e:
debug(f"Warning: Could not replace target file {new_name}: {e}", file=sys.stderr)
return None
file_path.rename(new_path)
debug(f"Renamed file: {old_name}{new_name}", file=sys.stderr)
# Rename the .tags sidecar if it exists
old_tags_path = file_path.parent / (old_name + '.tags')
if old_tags_path.exists():
new_tags_path = file_path.parent / (new_name + '.tags')
if new_tags_path.exists():
try:
new_tags_path.unlink()
except Exception:
pass
else:
old_tags_path.rename(new_tags_path)
debug(f"Renamed sidecar: {old_tags_path.name}{new_tags_path.name}", file=sys.stderr)
# Rename the .metadata sidecar if it exists
old_metadata_path = file_path.parent / (old_name + '.metadata')
if old_metadata_path.exists():
new_metadata_path = file_path.parent / (new_name + '.metadata')
if new_metadata_path.exists():
debug(f"Warning: Target metadata already exists: {new_metadata_path.name}", file=sys.stderr)
else:
old_metadata_path.rename(new_metadata_path)
debug(f"Renamed metadata: {old_metadata_path.name}{new_metadata_path.name}", file=sys.stderr)
return new_path
except Exception as exc:
debug(f"Warning: Failed to rename file: {exc}", file=sys.stderr)
return None
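# Illustrative effect (filenames invented): with tags ["title:My Talk"], a file
# "clip123.mp4" is renamed to "My Talk.mp4", and any "clip123.mp4.tags" /
# "clip123.mp4.metadata" sidecars are renamed alongside it.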
def write_tags(media_path: Path, tags: Iterable[str], known_urls: Iterable[str], hash_value: Optional[str] = None, db=None) -> None:
"""Write tags and metadata to database or sidecar file.
If db is provided, inserts into LocalLibraryDB and skips sidecar file creation.
Otherwise, creates a .tags sidecar file named '<media filename>.tags' (e.g. song.mp3.tags).
Args:
media_path: Path to the media file
tags: Iterable of tag strings
known_urls: Iterable of known URL strings
hash_value: Optional hash value for the file
db: Optional LocalLibraryDB instance. If provided, skips sidecar creation.
"""
if media_path.exists() and media_path.is_dir():
raise ValueError(f"write_tags_sidecar: media_path is a directory: {media_path}")
# Prepare tags lines and convert to list if needed
tag_list = list(tags) if not isinstance(tags, list) else tags
url_list = list(known_urls) if not isinstance(known_urls, list) else known_urls
# If database provided, insert directly and skip sidecar
if db is not None:
try:
# Build tag list with hash and known_urls
db_tags = []
if hash_value:
db_tags.append(f"hash:{hash_value}")
db_tags.extend(str(tag).strip() for tag in tag_list if str(tag).strip())
db_tags.extend(f"known_url:{str(url).strip()}" for url in url_list if str(url).strip())
if db_tags:
db.add_tags(media_path, db_tags)
debug(f"Added tags to database for {media_path.name}")
return
except Exception as e:
debug(f"Failed to add tags to database: {e}", file=sys.stderr)
# Fall through to sidecar creation as fallback
# Create sidecar path
try:
sidecar = media_path.parent / (media_path.name + '.tags')
except Exception:
sidecar = media_path.with_name(media_path.name + '.tags')
# Handle edge case: empty/invalid base name
try:
if not sidecar.stem or sidecar.name in {'.tags', '-.tags', '_.tags'}:
fallback_base = media_path.stem or _sanitize_title_for_filename(extract_title(tag_list) or '') or 'untitled'
sidecar = media_path.parent / f"{fallback_base}.tags"
except Exception:
pass
# Write via consolidated function
try:
lines = []
if hash_value:
lines.append(f"hash:{hash_value}")
lines.extend(str(tag).strip() for tag in tag_list if str(tag).strip())
lines.extend(f"known_url:{str(url).strip()}" for url in url_list if str(url).strip())
if lines:
sidecar.write_text("\n".join(lines) + "\n", encoding="utf-8")
debug(f"Tags: {sidecar}")
# Clean up legacy files
for legacy_path in [media_path.with_name(media_path.name + '.tags'),
media_path.with_name(media_path.name + '.tags.txt')]:
if legacy_path.exists() and legacy_path != sidecar:
try:
legacy_path.unlink()
except OSError:
pass
else:
try:
sidecar.unlink()
except FileNotFoundError:
pass
except OSError as exc:
debug(f"Failed to write tag sidecar {sidecar}: {exc}", file=sys.stderr)
def write_metadata(media_path: Path, hash_value: Optional[str] = None, known_urls: Optional[Iterable[str]] = None, relationships: Optional[Iterable[str]] = None, db=None) -> None:
"""Write metadata to database or sidecar file.
If db is provided, inserts into LocalLibraryDB and skips sidecar file creation.
Otherwise, creates .metadata sidecar file with hash, URLs, and relationships.
Args:
media_path: Path to the media file
hash_value: Optional hash value for the file
known_urls: Optional iterable of known URL strings
relationships: Optional iterable of relationship strings
db: Optional LocalLibraryDB instance. If provided, skips sidecar creation.
"""
if media_path.exists() and media_path.is_dir():
raise ValueError(f"write_metadata_sidecar: media_path is a directory: {media_path}")
# Prepare metadata lines
url_list = list(known_urls) if known_urls else []
rel_list = list(relationships) if relationships else []
# If database provided, insert directly and skip sidecar
if db is not None:
try:
# Build metadata tag list
db_tags = []
if hash_value:
db_tags.append(f"hash:{hash_value}")
for url in url_list:
if str(url).strip():
db_tags.append(f"known_url:{str(url).strip()}")
for rel in rel_list:
if str(rel).strip():
db_tags.append(f"relationship:{str(rel).strip()}")
if db_tags:
db.add_tags(media_path, db_tags)
debug(f"Added metadata to database for {media_path.name}")
return
except Exception as e:
debug(f"Failed to add metadata to database: {e}", file=sys.stderr)
# Fall through to sidecar creation as fallback
# Create sidecar path
try:
sidecar = media_path.parent / (media_path.name + '.metadata')
except Exception:
sidecar = media_path.with_name(media_path.name + '.metadata')
try:
lines = []
# Add hash if available
if hash_value:
lines.append(f"hash:{hash_value}")
# Add known URLs
for url in url_list:
if str(url).strip():
lines.append(f"known_url:{str(url).strip()}")
# Add relationships
for rel in rel_list:
if str(rel).strip():
lines.append(f"relationship:{str(rel).strip()}")
# Write metadata file
if lines:
sidecar.write_text("\n".join(lines) + "\n", encoding="utf-8")
debug(f"Wrote metadata to {sidecar}")
else:
# Remove if no content
try:
sidecar.unlink()
except FileNotFoundError:
pass
except OSError as exc:
debug(f"Failed to write metadata sidecar {sidecar}: {exc}", file=sys.stderr)
def extract_title(tags: Iterable[str]) -> Optional[str]:
"""
Extracts a title from a list of tags (looks for 'title:...').
"""
for tag in tags:
tag = tag.strip()
if tag.lower().startswith("title:"):
title_tag = tag.split(":", 1)[1].strip()
if title_tag:
return title_tag
return None
def _sanitize_title_for_filename(title: str) -> str:
# Allow alnum, hyphen, underscore, and space; replace other chars with space
temp = []
for ch in title:
if ch.isalnum() or ch in {"-", "_", " "}:
temp.append(ch)
else:
temp.append(" ")
# Collapse whitespace and trim hyphens/underscores around words
rough = "".join(temp)
tokens = []
for seg in rough.split():
cleaned = seg.strip("-_ ")
if cleaned:
tokens.append(cleaned)
sanitized = "_".join(tokens)
sanitized = sanitized.strip("-_")
return sanitized or "untitled"
def apply_title_to_path(media_path: Path, tags: Iterable[str]) -> Path:
"""
If a title tag is present, returns a new Path with the title as filename; else returns original path.
"""
title = extract_title(tags)
if not title:
return media_path
parent = media_path.parent
sanitized = _sanitize_title_for_filename(title)
destination = parent / f"{sanitized}{media_path.suffix}"
return destination
def _collect_search_roots(payload: Dict[str, Any]) -> List[Path]:
roots: List[Path] = []
for key in ('paths', 'search_paths', 'roots', 'directories'):
raw = payload.get(key)
if not raw:
continue
entries = raw if isinstance(raw, (list, tuple, set)) else [raw]
for entry in entries:
if not entry:
continue
try:
candidate = Path(str(entry)).expanduser()
except Exception:
continue
roots.append(candidate)
if load_config is not None and resolve_output_dir is not None:
try:
config = load_config()
except Exception:
config = None
if isinstance(config, dict) and config:
try:
default_root = resolve_output_dir(config)
except Exception:
default_root = None
if default_root is not None:
roots.append(default_root)
return roots
def _locate_sidecar_by_hash(hash_value: str, roots: Iterable[Path]) -> Optional[Path]:
target = f'hash:{hash_value.strip().lower()}'
for root in roots:
try:
root_path = root.expanduser()
except Exception:
continue
if not root_path.exists() or not root_path.is_dir():
continue
for pattern in ('*.tags', '*.tags.txt'):
try:
iterator = root_path.rglob(pattern)
except OSError:
continue
for candidate in iterator:
if not candidate.is_file():
continue
try:
with candidate.open('r', encoding='utf-8', errors='ignore') as handle:
for line in handle:
if line.strip().lower() == target:
return candidate
except OSError:
continue
return None
def sync_sidecar(payload: Dict[str, Any]) -> Dict[str, Any]:
path_value = payload.get('path')
sidecar_path: Optional[Path] = None
media_path: Optional[Path] = None
if path_value:
candidate = Path(str(path_value)).expanduser()
if candidate.name.lower().endswith(('.tags', '.tags.txt')):
sidecar_path = candidate
else:
media_path = candidate
hash_input = payload.get('hash')
hash_value = None
if hash_input:
hash_value = _normalize_hash(hash_input)
tags = _normalise_string_list(payload.get('tags'))
known_urls = _normalise_string_list(payload.get('known_urls'))
if media_path is not None:
sidecar_path = _derive_sidecar_path(media_path)
search_roots = _collect_search_roots(payload)
if sidecar_path is None and hash_value:
located = _locate_sidecar_by_hash(hash_value, search_roots)
if located is not None:
sidecar_path = located
if sidecar_path is None:
if media_path is not None:
sidecar_path = _derive_sidecar_path(media_path)
elif hash_value:
return {
'error': 'not_found',
'hash': hash_value,
'tags': tags,
'known_urls': known_urls,
}
else:
raise ValueError('path or hash is required to synchronise sidecar')
existing_hash, existing_tags, existing_known = _read_sidecar_metadata(sidecar_path)
if not tags:
tags = existing_tags
if not known_urls:
known_urls = existing_known
hash_line = hash_value or existing_hash
title_value: Optional[str] = None
for tag in tags:
if isinstance(tag, str):
if tag.lower().startswith('title:'):
title_value = tag.split(':', 1)[1].strip() if ':' in tag else ''
if title_value == '':
title_value = None
break
lines: List[str] = []
if hash_line:
lines.append(f'hash:{hash_line}')
lines.extend(tags)
lines.extend(f'known_url:{url}' for url in known_urls)
sidecar_path.parent.mkdir(parents=True, exist_ok=True)
if lines:
sidecar_path.write_text('\n'.join(lines) + '\n', encoding='utf-8')
else:
try:
sidecar_path.unlink()
except FileNotFoundError:
pass
return {
'path': str(sidecar_path),
'hash': hash_line,
'tags': [],
'known_urls': [],
'deleted': True,
'title': title_value,
}
return {
'path': str(sidecar_path),
'hash': hash_line,
'tags': tags,
'known_urls': known_urls,
'title': title_value,
}
def _build_hydrus_context(payload: Dict[str, Any]) -> Tuple[Any, str, str, float, Optional[str]]:
_ensure_hydrus_client()
assert HydrusClient is not None
base_url = str(payload.get('api_url') or '').strip()
if not base_url:
raise ValueError('Hydrus api_url is required')
access_key = str(payload.get('access_key') or '').strip()
options_raw = payload.get('options')
options = options_raw if isinstance(options_raw, dict) else {}
timeout = float(options.get('timeout') or payload.get('timeout') or 60.0)
prefer_service = payload.get('prefer_service_name') or options.get('prefer_service_name')
if isinstance(prefer_service, str):
prefer_service = prefer_service.strip() or None
else:
prefer_service = None
client = HydrusClient(base_url, access_key, timeout)
return client, base_url, access_key, timeout, prefer_service
def _refetch_hydrus_summary(base_url: str, access_key: str, hash_hex: str, timeout: float, prefer_service: Optional[str]) -> Dict[str, Any]:
payload: Dict[str, Any] = {
'hash': hash_hex,
'api_url': base_url,
'access_key': access_key,
'options': {
'minimal': True,
'include_relationships': False,
'timeout': timeout,
},
}
if prefer_service:
payload['options']['prefer_service_name'] = prefer_service
return fetch_hydrus_metadata(payload)
def _apply_hydrus_tag_mutation(payload: Dict[str, Any], add: Iterable[Any], remove: Iterable[Any]) -> Dict[str, Any]:
client, base_url, access_key, timeout, prefer_service = _build_hydrus_context(payload)
hash_hex = _normalize_hash(payload.get('hash'))
add_list = [_normalize_tag(tag) for tag in add if _normalize_tag(tag)]
remove_list = [_normalize_tag(tag) for tag in remove if _normalize_tag(tag)]
if not add_list and not remove_list:
raise ValueError('No tag changes supplied')
service_key = payload.get('service_key') or payload.get('tag_service_key')
summary = None
if not service_key:
summary = _refetch_hydrus_summary(base_url, access_key, hash_hex, timeout, prefer_service)
service_key = summary.get('tag_service_key')
if not isinstance(service_key, str) or not service_key:
raise RuntimeError('Unable to determine Hydrus tag service key')
actions: Dict[str, List[str]] = {}
if add_list:
actions['0'] = [tag for tag in add_list if tag]
if remove_list:
actions['1'] = [tag for tag in remove_list if tag]
if not actions:
raise ValueError('Tag mutation produced no actionable changes')
request_payload = {
'hashes': [hash_hex],
'service_keys_to_actions_to_tags': {
service_key: actions,
},
}
try:
assert HydrusRequestSpec is not None
tag_spec = HydrusRequestSpec(
method='POST',
endpoint='/add_tags/add_tags',
data=request_payload,
)
client._perform_request(tag_spec)
except HydrusRequestError as exc: # type: ignore[misc]
raise RuntimeError(str(exc))
summary_after = _refetch_hydrus_summary(base_url, access_key, hash_hex, timeout, prefer_service)
result = dict(summary_after)
result['added_tags'] = actions.get('0', [])
result['removed_tags'] = actions.get('1', [])
result['tag_service_key'] = summary_after.get('tag_service_key')
return result
def apply_tag_mutation(payload: Dict[str, Any], operation: str = 'add') -> Dict[str, Any]:
"""Unified tag mutation for add and update operations (Hydrus and local).
Consolidates: add_tag, update_tag, _add_local_tag, _update_local_tag
Args:
payload: Mutation payload with type, tags, old_tag, new_tag
operation: 'add' or 'update'
Returns:
Dict with tags and operation result
"""
file_type = str(payload.get('type', 'local')).lower()
if file_type == 'hydrus':
if operation == 'add':
new_tag = _normalize_tag(payload.get('new_tag'))
if not new_tag:
raise ValueError('new_tag is required')
result = _apply_hydrus_tag_mutation(payload, [new_tag], [])
result['added'] = True
return result
else: # update
old_tag = _normalize_tag(payload.get('old_tag'))
new_tag = _normalize_tag(payload.get('new_tag'))
result = _apply_hydrus_tag_mutation(
payload,
[new_tag] if new_tag else [],
[old_tag] if old_tag else []
)
result['updated'] = True
return result
else: # local
tags = _clean_existing_tags(payload.get('tags'))
if operation == 'add':
new_tag = _normalize_tag(payload.get('new_tag'))
if not new_tag:
raise ValueError('new_tag is required')
added = new_tag not in tags
if added:
tags.append(new_tag)
return {'tags': tags, 'added': added}
else: # update
old_tag = _normalize_tag(payload.get('old_tag'))
new_tag = _normalize_tag(payload.get('new_tag'))
if not old_tag:
raise ValueError('old_tag is required')
remaining = []
removed_count = 0
for tag in tags:
if tag == old_tag:
removed_count += 1
else:
remaining.append(tag)
if new_tag and removed_count > 0:
remaining.extend([new_tag] * removed_count)
updated = removed_count > 0
return {'tags': remaining, 'updated': updated, 'removed_count': removed_count}
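# Illustrative local rename (assumes _clean_existing_tags/_normalize_tag pass these simple
# lowercase tags through unchanged):
#   apply_tag_mutation({'type': 'local', 'tags': ['genre:rock', 'artist:x'],
#                       'old_tag': 'genre:rock', 'new_tag': 'genre:jazz'}, 'update')
#   -> {'tags': ['artist:x', 'genre:jazz'], 'updated': True, 'removed_count': 1}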
def extract_ytdlp_tags(entry: Dict[str, Any]) -> List[str]:
"""Extract meaningful metadata tags from yt-dlp entry.
This is the UNIFIED API for extracting tags from yt-dlp metadata.
All modules (download_data, merge_file, etc.) should use this function
instead of implementing their own extraction logic.
Extracts meaningful tags (artist, album, creator, genre, track, etc.)
while excluding technical fields (filesize, duration, format, etc.).
Args:
entry: yt-dlp entry metadata dictionary from download
Returns:
List of normalized tag strings in format "namespace:value"
Example:
>>> entry = {'artist': 'The Beatles', 'album': 'Abbey Road', 'duration': 5247}
>>> tags = extract_ytdlp_tags(entry)
>>> tags
['artist:The Beatles', 'album:Abbey Road']
"""
tags: List[str] = []
seen_namespaces: Set[str] = set()
# Meaningful yt-dlp fields that should become tags
# This mapping excludes technical fields: filesize, duration, format_id, vcodec, acodec, ext, etc.
field_to_namespace = {
'artist': 'artist',
'album': 'album',
'creator': 'creator',
'uploader': 'creator', # Map uploader to creator (deduplicate)
'uploader_id': 'creator',
'channel': 'channel',
'genre': 'genre',
'track': 'track',
'track_number': 'track_number',
'release_date': 'release_date',
'upload_date': 'upload_date',
'title': 'title',
'license': 'license',
'location': 'location',
}
# Extract simple field mappings
for yt_field, namespace in field_to_namespace.items():
value = entry.get(yt_field)
if value is not None:
value_str = value_normalize(str(value))
if value_str:
# Prevent duplicate creator tags (only use first creator)
if namespace == 'creator':
if 'creator' in seen_namespaces:
continue
seen_namespaces.add('creator')
_add_tag(tags, namespace, value_str)
# Handle tags field specially (could be list, dict, or string)
# For list/sequence tags, capture as freeform (no namespace prefix)
tags_field = entry.get('tags')
if tags_field is not None:
if isinstance(tags_field, list):
# Tags is list: ["tag1", "tag2", ...] → capture as freeform tags (no "tag:" prefix)
# These are typically genre/category tags from the source (BandCamp genres, etc.)
for tag_value in tags_field:
if tag_value:
normalized = value_normalize(str(tag_value))
if normalized and normalized not in tags:
tags.append(normalized)
elif isinstance(tags_field, dict):
# Tags is dict: {"key": "val"} → tag:key:val
for key, val in tags_field.items():
if key and val:
key_normalized = value_normalize(str(key))
val_normalized = value_normalize(str(val))
if key_normalized and val_normalized:
_add_tag(tags, f'tag:{key_normalized}', val_normalized)
else:
# Tags is string or other: add as freeform
if tags_field:
normalized = value_normalize(str(tags_field))
if normalized and normalized not in tags:
tags.append(normalized)
return tags
def dedup_tags_by_namespace(tags: List[str], keep_first: bool = True) -> List[str]:
"""Deduplicate tags by namespace, keeping consistent order.
This is the UNIFIED API for tag deduplication used across all cmdlets.
Replaces custom deduplication logic in merge_file.py and other modules.
Groups tags by namespace (e.g., "artist", "album", "tag") and keeps
either the first or last occurrence of each namespace, then preserves
order based on first appearance.
Args:
tags: List of tags (with or without namespace prefixes)
keep_first: If True, keep first occurrence per namespace (default).
If False, keep last occurrence per namespace.
Returns:
Deduplicated tag list with consistent order
Example:
>>> tags = [
... 'artist:Beatles', 'album:Abbey Road',
... 'artist:Beatles', 'tag:rock',
... 'album:Abbey Road', 'artist:Beatles'
... ]
>>> dedup = dedup_tags_by_namespace(tags)
>>> dedup
['artist:Beatles', 'album:Abbey Road', 'tag:rock']
"""
if not tags:
return []
# Group tags by namespace
namespace_to_tags: Dict[Optional[str], List[Tuple[int, str]]] = {} # namespace → [(index, full_tag), ...]
first_appearance: Dict[Optional[str], int] = {} # namespace → first_index
for idx, tag in enumerate(tags):
# Extract namespace (part before ':')
if ':' in tag:
namespace: Optional[str] = tag.split(':', 1)[0]
else:
namespace = None # No namespace
# Track first appearance
if namespace not in first_appearance:
first_appearance[namespace] = idx
# Store tag with its index
if namespace not in namespace_to_tags:
namespace_to_tags[namespace] = []
namespace_to_tags[namespace].append((idx, tag))
# Build result: keep first or last occurrence per namespace
result: List[Tuple[int, str]] = [] # (first_appearance_index, tag)
for namespace, tag_list in namespace_to_tags.items():
if keep_first:
chosen_tag = tag_list[0][1] # First occurrence
else:
chosen_tag = tag_list[-1][1] # Last occurrence
result.append((first_appearance[namespace], chosen_tag))
# Sort by first appearance order, then extract tags
result.sort(key=lambda x: x[0])
return [tag for _, tag in result]
def merge_multiple_tag_lists(
sources: List[List[str]],
strategy: str = 'first'
) -> List[str]:
"""Intelligently merge multiple tag lists with smart deduplication.
This is the UNIFIED API for merging tags from multiple sources
(e.g., when merging multiple files or combining metadata sources).
Strategies:
- 'first': Keep first occurrence of each namespace (default)
- 'all': Keep all different values (different artists possible)
- 'combine': For non-namespace tags, combine all unique values
Args:
sources: List of tag lists to merge
strategy: Merge strategy - 'first', 'all', or 'combine'
Returns:
Merged and deduplicated tag list
Example:
>>> list1 = ['artist:Beatles', 'album:Abbey Road']
>>> list2 = ['artist:Beatles', 'album:Abbey Road', 'tag:rock']
>>> merged = merge_multiple_tag_lists([list1, list2])
>>> merged
['artist:Beatles', 'album:Abbey Road', 'tag:rock']
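With strategy='all', distinct values per namespace are each kept (illustrative):
>>> merge_multiple_tag_lists([['artist:Beatles'], ['artist:Stones']], strategy='all')
['artist:Beatles', 'artist:Stones']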
"""
if not sources:
return []
if strategy == 'first':
# Concatenate all lists and deduplicate by namespace
all_tags = []
for tag_list in sources:
all_tags.extend(tag_list or [])
return dedup_tags_by_namespace(all_tags, keep_first=True)
elif strategy == 'all':
# Keep all different values per namespace
namespace_to_values: Dict[Optional[str], Set[str]] = {}
order: List[Tuple[int, str, str]] = [] # (first_index, namespace, value)
global_index = 0
for source in sources:
if not source:
continue
for tag in source:
if ':' in tag:
namespace: Optional[str] = tag.split(':', 1)[0]
value = tag.split(':', 1)[1]
else:
namespace = None
value = tag
if namespace not in namespace_to_values:
namespace_to_values[namespace] = set()
order.append((global_index, namespace or '', tag))
elif value not in namespace_to_values[namespace]:
order.append((global_index, namespace or '', tag))
namespace_to_values[namespace].add(value)
global_index += 1
# Sort by order of first appearance and extract
order.sort(key=lambda x: x[0])
return [tag for _, _, tag in order]
elif strategy == 'combine':
# Combine all unique plain (non-namespace) tags
all_tags = []
namespaced: Dict[str, str] = {} # namespace → tag (first occurrence)
for source in sources:
if not source:
continue
for tag in source:
if ':' in tag:
namespace = tag.split(':', 1)[0]
if namespace not in namespaced:
namespaced[namespace] = tag
all_tags.append(tag)
else:
if tag not in all_tags:
all_tags.append(tag)
return all_tags
else:
raise ValueError(f"Unknown merge strategy: {strategy}")
def read_tags_from_file(file_path: Path) -> List[str]:
"""Read and normalize tags from .tags sidecar file.
This is the UNIFIED API for reading .tags files across all cmdlets.
Handles normalization, deduplication, and format validation.
Args:
file_path: Path to .tags sidecar file
Returns:
List of normalized tag strings
Raises:
FileNotFoundError: If file doesn't exist
Example:
>>> tags = read_tags_from_file(Path('file.txt.tags'))
>>> tags
['artist:Beatles', 'album:Abbey Road']
"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"Tag file not found: {file_path}")
tags: List[str] = []
seen: Set[str] = set()
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
# Strip whitespace and skip empty lines
line = line.strip()
if not line:
continue
# Skip comment lines
if line.startswith('#'):
continue
# Normalize the tag
normalized = value_normalize(line)
if normalized and normalized not in seen:
seen.add(normalized)
tags.append(normalized)
except Exception as exc:
raise ValueError(f"Error reading tag file {file_path}: {exc}")
return tags
def embed_metadata_in_file(
file_path: Path,
tags: List[str],
file_kind: str = ''
) -> bool:
"""Embed metadata tags into a media file using FFmpeg.
Extracts metadata from tags (namespace:value format) and writes to the file's
metadata using FFmpeg with -c copy (no re-encoding).
Supported tag namespaces:
- title, artist, album, track/track_number, date/year, genre, composer, comment
For audio files, applies sensible defaults:
- If no album, uses title as album
- If no track, defaults to 1
- album_artist is set to artist value
Args:
file_path: Path to media file
tags: List of tags in format ['namespace:value', ...] (e.g., ['artist:Beatles', 'album:Abbey Road'])
file_kind: Type of file: 'audio', 'video', or '' for auto-detect (optional)
Returns:
True if successful, False otherwise
Raises:
None (logs errors to stderr)
Example:
>>> tags = ['artist:Beatles', 'album:Abbey Road', 'track:1']
>>> success = embed_metadata_in_file(Path('song.mp3'), tags, file_kind='audio')
"""
if not tags:
return True
file_path = Path(file_path)
# Tag namespace to FFmpeg metadata key mapping
tag_map = {
'title': 'title',
'artist': 'artist',
'album': 'album',
'track': 'track',
'track_number': 'track',
'date': 'date',
'year': 'date',
'genre': 'genre',
'composer': 'composer',
'comment': 'comment',
'known_url': 'comment', # Embed known URLs in comment field
'creator': 'artist', # Map creator to artist
'channel': 'album_artist', # Map channel to album_artist
}
# Extract metadata from tags
metadata = {}
comments = [] # Collect comments (including URLs)
for tag in tags:
tag_str = str(tag).strip()
if ':' in tag_str:
namespace, value = tag_str.split(':', 1)
namespace = namespace.lower().strip()
value = value.strip()
if namespace in tag_map and value:
ffmpeg_key = tag_map[namespace]
if namespace == 'known_url':
# Collect URLs as comments
comments.append(f"URL: {value}")
elif ffmpeg_key == 'comment':
# Collect other comment-type tags
comments.append(value)
elif ffmpeg_key not in metadata:
# Don't overwrite if already set from earlier tag
metadata[ffmpeg_key] = value
# Add collected comments to metadata
if comments:
if 'comment' in metadata:
metadata['comment'] = metadata['comment'] + ' | ' + ' | '.join(comments)
else:
metadata['comment'] = ' | '.join(comments)
# Apply sensible defaults for audio files
if file_kind == 'audio' or (not file_kind and file_path.suffix.lower() in {'.mp3', '.flac', '.wav', '.m4a', '.aac', '.ogg', '.opus', '.mka'}):
# If no album, use title as album
if 'album' not in metadata and 'title' in metadata:
metadata['album'] = metadata['title']
# If no track, default to 1
if 'track' not in metadata:
metadata['track'] = '1'
# If no album_artist, use artist
if 'artist' in metadata:
metadata['album_artist'] = metadata['artist']
if not metadata:
return True
# Check if FFmpeg is available
ffmpeg_path = shutil.which('ffmpeg')
if not ffmpeg_path:
debug(f"⚠️ FFmpeg not found; cannot embed metadata in {file_path.name}", file=sys.stderr)
return False
# Create temporary file for output
temp_file = file_path.parent / f"{file_path.stem}.ffmpeg_tmp{file_path.suffix}"
try:
cmd = [ffmpeg_path, '-y', '-i', str(file_path)]
for key, value in metadata.items():
cmd.extend(['-metadata', f'{key}={value}'])
cmd.extend(['-c', 'copy', str(temp_file)])
# Run ffmpeg with error handling for non-UTF8 output
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=False, # Don't decode as text - ffmpeg may output binary data
timeout=30
)
if result.returncode == 0 and temp_file.exists():
# Replace original with temp file
file_path.unlink()
temp_file.rename(file_path)
debug(f"✅ Embedded metadata in file: {file_path.name}", file=sys.stderr)
return True
else:
# Clean up temp file if it exists
if temp_file.exists():
temp_file.unlink()
debug(f"❌ FFmpeg metadata embedding failed for {file_path.name}", file=sys.stderr)
if result.stderr:
# Safely decode stderr, ignoring invalid UTF-8 bytes
try:
stderr_text = result.stderr.decode('utf-8', errors='replace')[:200]
debug(f"FFmpeg stderr: {stderr_text}", file=sys.stderr)
except Exception:
pass
return False
except Exception as exc:
if temp_file.exists():
try:
temp_file.unlink()
except Exception:
pass
debug(f"❌ Error embedding metadata: {exc}", file=sys.stderr)
return False
def write_tags_to_file(
file_path: Path,
tags: List[str],
source_hashes: Optional[List[str]] = None,
known_urls: Optional[List[str]] = None,
append: bool = False
) -> bool:
"""Write tags to .tags sidecar file.
This is the UNIFIED API for writing .tags files across all cmdlets.
Uses consistent format and handles file creation/overwriting.
Args:
file_path: Path to .tags file (will be created if doesn't exist)
tags: List of tags to write
source_hashes: Optional source file hashes (written as source:hash1,hash2)
known_urls: Optional known URLs (each written on separate line as known_url:url)
append: If True, append to existing file; if False, overwrite (default)
Returns:
True if successful
Raises:
Exception: If file write fails
Example:
>>> tags = ['artist:Beatles', 'album:Abbey Road']
>>> write_tags_to_file(Path('file.txt.tags'), tags)
True
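With source_hashes and known_urls supplied, the file contains one line per entry, e.g.:
source:<hash1>,<hash2>
known_url:https://example.com/item
artist:Beatles
album:Abbey Road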
"""
file_path = Path(file_path)
try:
# Prepare content
content_lines: List[str] = []
# Add source hashes if provided
if source_hashes:
content_lines.append(f"source:{','.join(source_hashes)}")
# Add known URLs if provided - each on separate line to prevent corruption
if known_urls:
for url in known_urls:
content_lines.append(f"known_url:{url}")
# Add tags
if tags:
content_lines.extend(tags)
# Write to file
mode = 'a' if (append and file_path.exists()) else 'w'
with open(file_path, mode, encoding='utf-8') as f:
for line in content_lines:
f.write(line + '\n')
return True
except Exception as exc:
raise ValueError(f"Error writing tag file {file_path}: {exc}")
def normalize_tags_from_source(
source_data: Any,
source_type: str = 'auto'
) -> List[str]:
"""Normalize tags from any source format.
Universal function to normalize tags from different sources:
- yt-dlp entry dicts
- Raw tag lists
- .tags file content strings
- Metadata dictionaries
Args:
source_data: Source data (type determined by source_type or auto-detected)
source_type: One of 'auto', 'ytdlp', 'list', 'text', 'dict'
'auto' attempts to auto-detect the type
Returns:
Normalized, deduplicated tag list
Example:
>>> entry = {'artist': 'Beatles', 'album': 'Abbey Road'}
>>> tags = normalize_tags_from_source(entry, 'ytdlp')
>>> tags
['artist:Beatles', 'album:Abbey Road']
"""
if source_type == 'auto':
# Auto-detect source type
if isinstance(source_data, dict):
# Check if it looks like a yt-dlp entry (has id, title, url, etc.)
if 'id' in source_data or 'title' in source_data or 'uploader' in source_data:
source_type = 'ytdlp'
else:
source_type = 'dict'
elif isinstance(source_data, list):
source_type = 'list'
elif isinstance(source_data, str):
source_type = 'text'
else:
source_type = 'dict'
# Process based on detected/specified type
if source_type == 'ytdlp':
if not isinstance(source_data, dict):
raise ValueError("ytdlp source must be a dict")
return extract_ytdlp_tags(source_data)
elif source_type == 'list':
if not isinstance(source_data, (list, tuple)):
raise ValueError("list source must be a list or tuple")
# Normalize each tag in the list
result = []
for tag in source_data:
normalized = value_normalize(str(tag))
if normalized:
result.append(normalized)
return result
elif source_type == 'text':
if not isinstance(source_data, str):
raise ValueError("text source must be a string")
# Split by lines and normalize
lines = source_data.split('\n')
result = []
seen = set()
for line in lines:
line = line.strip()
if line and not line.startswith('#'):
normalized = value_normalize(line)
if normalized and normalized not in seen:
seen.add(normalized)
result.append(normalized)
return result
elif source_type == 'dict':
if not isinstance(source_data, dict):
raise ValueError("dict source must be a dict")
# Extract as generic metadata (similar to yt-dlp but from any dict)
return extract_ytdlp_tags(source_data)
else:
raise ValueError(f"Unknown source type: {source_type}")
def detect_metadata_request(tag: str) -> Optional[Dict[str, str]]:
trimmed = value_normalize(tag)
if not trimmed:
return None
lower = trimmed.lower()
imdb_match = re.match(r'^imdb:\s*(tt[\w]+)$', lower)
if imdb_match:
imdb_id = imdb_match.group(1)
return {
'source': 'imdb',
'id': imdb_id,
'base': f'imdb:{imdb_id}',
}
remainder = re.match(r'^musicbrainz:\s*(.+)$', lower)
if remainder:
raw = remainder.group(1)
entity = 'release'
identifier = raw
specific = re.match(r'^(?P<entity>[a-zA-Z]+)\s*:\s*(?P<id>[\w-]+)$', raw)
if specific:
entity = specific.group('entity')
identifier = specific.group('id')
identifier = identifier.replace(' ', '')
if identifier:
return {
'source': 'musicbrainz',
'entity': entity.lower(),
'id': identifier,
'base': f'musicbrainz:{identifier}',
}
return None
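# Illustrative results (input is lowercased before matching):
#   detect_metadata_request('imdb:tt0111161')
#   -> {'source': 'imdb', 'id': 'tt0111161', 'base': 'imdb:tt0111161'}
#   detect_metadata_request('musicbrainz:release:b84ee12a-09ef-421b-82de-0441a926375b')
#   -> {'source': 'musicbrainz', 'entity': 'release',
#       'id': 'b84ee12a-09ef-421b-82de-0441a926375b',
#       'base': 'musicbrainz:b84ee12a-09ef-421b-82de-0441a926375b'}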
def expand_metadata_tag(payload: Dict[str, Any]) -> Dict[str, Any]:
tag = payload.get('tag')
if not isinstance(tag, str):
return {'tags': []}
trimmed = value_normalize(tag)
if not trimmed:
return {'tags': []}
request = detect_metadata_request(trimmed)
tags: List[str] = []
seen: Set[str] = set()
if request:
_append_unique(tags, seen, request['base'])
else:
_append_unique(tags, seen, trimmed)
return {'tags': tags}
try:
if request['source'] == 'imdb':
data = imdb_tag(request['id'])
else:
data = fetch_musicbrainz_tags(request['id'], request['entity'])
except Exception as exc: # pragma: no cover - network/service errors
return {'tags': tags, 'error': str(exc)}
# Add tags from fetched data (no namespace, just unique append)
for tag in (data.get('tags') or []):
_append_unique(tags, seen, tag)
result = {
'tags': tags,
'source': request['source'],
'id': request['id'],
}
if request['source'] == 'musicbrainz':
result['entity'] = request['entity']
return result
def build_remote_bundle(metadata: Optional[Dict[str, Any]], existing: Optional[Sequence[str]] = None, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
metadata = metadata or {}
context = context or {}
tags: List[str] = []
seen: Set[str] = set()
if existing:
for tag in existing:
_append_unique(tags, seen, tag)
# Add tags from various sources
for tag in (metadata.get("tags") or []):
_append_unique(tags, seen, tag)
for tag in (metadata.get("categories") or []):
_append_unique(tags, seen, tag)
# Extract and namespace genres
raw_genres = metadata.get("genres")
keywords = metadata.get("keywords")
if isinstance(keywords, str):
for token in keywords.split(","):
_append_unique(tags, seen, token)
if raw_genres:
for genre in (raw_genres if isinstance(raw_genres, (list, tuple)) else [raw_genres]):
if genre:
_append_unique(tags, seen, f"genre:{genre}")
# Extract creators/artists
artists = metadata.get("artists") or metadata.get("artist")
if artists:
artist_list = artists if isinstance(artists, (list, tuple)) else [artists]
for artist in artist_list:
if artist:
_append_unique(tags, seen, f"creator:{artist}")
creator = metadata.get("uploader") or metadata.get("channel") or metadata.get("artist") or metadata.get("creator")
if creator:
_append_unique(tags, seen, f"creator:{creator}")
# Extract title
title_value = metadata.get("title")
if title_value:
_extend_namespaced(tags, seen, "title", [title_value])
source_url = context.get("source_url") or metadata.get("original_url") or metadata.get("webpage_url") or metadata.get("url")
clean_title = value_normalize(str(title_value)) if title_value is not None else None
result = {
"tags": tags,
"title": clean_title,
"source_url": _sanitize_url(source_url),
"duration": _coerce_duration(metadata),
"metadata": metadata,
}
return result
def _load_payload(value: Optional[str]) -> Dict[str, Any]:
text = value
if text is None:
text = sys.stdin.read()
if text is None or text.strip() == "":
raise ValueError("Expected JSON payload")
data = json.loads(text)
if not isinstance(data, dict):
raise ValueError("Payload must be a JSON object")
return data
import typer
app = typer.Typer(help="Fetch metadata tags for known services")
@app.command(help="Lookup an IMDb title")
def imdb(imdb_id: str = typer.Argument(..., help="IMDb identifier (ttXXXXXXX)")):
"""Lookup an IMDb title."""
try:
result = imdb_tag(imdb_id)
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
@app.command(help="Lookup a MusicBrainz entity")
def musicbrainz(
mbid: str = typer.Argument(..., help="MusicBrainz identifier (UUID)"),
entity: str = typer.Option("release", help="Entity type (release, recording, artist)")
):
"""Lookup a MusicBrainz entity."""
try:
result = fetch_musicbrainz_tags(mbid, entity)
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
@app.command(name="remote-tags", help="Normalize a remote metadata payload")
def remote_tags(payload: Optional[str] = typer.Option(None, "--payload", help="JSON payload; reads stdin if omitted")):
"""Normalize a remote metadata payload."""
try:
payload_data = _load_payload(payload)
metadata = payload_data.get("metadata") or {}
existing = payload_data.get("existing_tags") or []
context = payload_data.get("context") or {}
if not isinstance(existing, list):
raise ValueError("existing_tags must be a list")
if context and not isinstance(context, dict):
raise ValueError("context must be an object")
result = build_remote_bundle(metadata, existing, context)
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
@app.command(name="remote-fetch", help="Resolve remote metadata bundle")
def remote_fetch(payload: Optional[str] = typer.Option(None, "--payload", help="JSON payload; reads stdin if omitted")):
"""Resolve remote metadata bundle."""
try:
payload_data = _load_payload(payload)
result = resolve_remote_metadata(payload_data)
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
@app.command(name="expand-tag", help="Expand metadata references into tags")
def expand_tag(payload: Optional[str] = typer.Option(None, "--payload", help="JSON payload; reads stdin if omitted")):
"""Expand metadata references into tags."""
try:
payload_data = _load_payload(payload)
result = expand_metadata_tag(payload_data)
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
@app.command(name="hydrus-fetch", help="Fetch Hydrus metadata for a file")
def hydrus_fetch(payload: Optional[str] = typer.Option(None, "--payload", help="JSON payload; reads stdin if omitted")):
"""Fetch Hydrus metadata for a file."""
try:
payload_data = _load_payload(payload)
result = fetch_hydrus_metadata(payload_data)
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
@app.command(name="hydrus-fetch-url", help="Fetch Hydrus metadata using a source URL")
def hydrus_fetch_url(payload: Optional[str] = typer.Option(None, "--payload", help="JSON payload; reads stdin if omitted")):
"""Fetch Hydrus metadata using a source URL."""
try:
payload_data = _load_payload(payload)
result = fetch_hydrus_metadata_by_url(payload_data)
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
@app.command(name="sync-sidecar", help="Synchronise .tags sidecar with supplied data")
def sync_sidecar_cmd(payload: Optional[str] = typer.Option(None, "--payload", help="JSON payload; reads stdin if omitted")):
"""Synchronise .tags sidecar with supplied data."""
try:
payload_data = _load_payload(payload)
result = sync_sidecar(payload_data)
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
@app.command(name="update-tag", help="Update or rename a tag")
def update_tag_cmd(payload: Optional[str] = typer.Option(None, "--payload", help="JSON payload; reads stdin if omitted")):
"""Update or rename a tag."""
try:
payload_data = _load_payload(payload)
result = apply_tag_mutation(payload_data, 'update')
debug(json.dumps(result, ensure_ascii=False), flush=True)
except Exception as exc:
error_payload = {"error": str(exc)}
debug(json.dumps(error_payload, ensure_ascii=False), flush=True)
raise typer.Exit(code=1)
def main(argv: Optional[List[str]] = None) -> int:
"""Main entry point using Typer."""
try:
result = app(argv, standalone_mode=False)
return result if isinstance(result, int) else 0
except SystemExit as e:
return e.code if isinstance(e.code, int) else 1
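# Typical invocations, assuming this module is executed directly as a script:
#   python metadata.py imdb tt0111161
#   python metadata.py musicbrainz b84ee12a-09ef-421b-82de-0441a926375b --entity release
#   python metadata.py sync-sidecar --payload '{"path": "song.mp3", "tags": ["artist:x"]}'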
# ============================================================================
# TAG OPERATIONS - Consolidated from tag_operations.py and tag_helpers.py
# ============================================================================
def sort_tags(tags: List[str]) -> List[str]:
"""
Sort tags into namespace tags and freeform tags, then alphabetically.
Args:
tags: List of tag strings
Returns:
Sorted list with namespace tags first, then freeform tags
"""
if not tags:
return []
namespace_tags = []
freeform_tags = []
for tag in tags:
if isinstance(tag, str):
if ':' in tag:
namespace_tags.append(tag)
else:
freeform_tags.append(tag)
namespace_tags.sort()
freeform_tags.sort()
return namespace_tags + freeform_tags
def format_tags_display(tags: List[str], namespace_filter: Optional[str] = None) -> List[str]:
"""
Format tags for display, optionally filtered by namespace.
Args:
tags: List of tags
namespace_filter: Optional namespace to filter by (e.g., "creator:")
Returns:
Formatted list of tags
"""
if not tags:
return []
if namespace_filter:
filtered = [t for t in tags if t.startswith(namespace_filter)]
return sort_tags(filtered)
return sort_tags(tags)
def split_tag(tag: str) -> Tuple[str, str]:
"""
Split a tag into namespace and value.
Args:
tag: Tag string (e.g., "creator:Author Name" or "freeform tag")
Returns:
Tuple of (namespace, value). For freeform tags, namespace is empty string.
"""
if ':' in tag:
parts = tag.split(':', 1)
return parts[0], parts[1]
return '', tag
def filter_tags_by_namespace(tags: List[str], namespace: str) -> List[str]:
"""
Get all tags in a specific namespace.
Args:
tags: List of tags
namespace: Namespace to filter by
Returns:
List of values in that namespace
"""
prefix = namespace + ':'
return [split_tag(t)[1] for t in tags if t.startswith(prefix)]
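# e.g. split_tag('creator:Author Name') -> ('creator', 'Author Name');
#      filter_tags_by_namespace(['artist:Beatles', 'album:Abbey Road'], 'artist') -> ['Beatles']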
def ensure_title_tag(tags: List[str], title: str) -> List[str]:
"""
Ensure there's a title: tag with the given title.
Args:
tags: List of existing tags
title: Title to ensure exists
Returns:
Updated tag list
"""
if not title:
return tags
# Remove any existing title tags
filtered = [t for t in tags if not t.startswith('title:')]
# Add new title tag
new_tags = filtered + [f'title:{title}']
return sort_tags(new_tags)
def remove_title_tags(tags: List[str]) -> List[str]:
"""Remove all title: tags."""
return [t for t in tags if not t.startswith('title:')]
def is_namespace_tag(tag: str) -> bool:
"""Check if a tag is a namespace tag (contains :)."""
return ':' in tag if isinstance(tag, str) else False
def validate_tag(tag: str) -> bool:
"""
Validate that a tag is properly formatted.
Args:
tag: Tag to validate
Returns:
True if tag is valid
"""
if not isinstance(tag, str) or not tag.strip():
return False
# Tag shouldn't have leading/trailing whitespace
if tag != tag.strip():
return False
return True
def normalize_tags(tags: List[Any]) -> List[str]:
"""
Normalize a tag list by filtering and cleaning.
Args:
tags: List of tags (may contain invalid entries)
Returns:
Cleaned list of valid tags
"""
if not tags:
return []
normalized = []
for tag in tags:
if isinstance(tag, str):
trimmed = tag.strip()
if trimmed and validate_tag(trimmed):
normalized.append(trimmed)
return sort_tags(normalized)
def merge_tag_lists(*tag_lists: List[str]) -> List[str]:
"""
Merge multiple tag lists, removing duplicates.
Args:
*tag_lists: Variable number of tag lists
Returns:
Merged, deduplicated, sorted list
"""
merged = set()
for tag_list in tag_lists:
if isinstance(tag_list, list):
merged.update(tag_list)
return sort_tags(list(merged))
def tag_diff(old_tags: List[str], new_tags: List[str]) -> Dict[str, List[str]]:
"""
Calculate the difference between two tag lists.
Args:
old_tags: Original tags
new_tags: New tags
Returns:
Dict with 'added' and 'removed' keys
"""
old_set = set(old_tags) if old_tags else set()
new_set = set(new_tags) if new_tags else set()
return {
'added': sorted(list(new_set - old_set)),
'removed': sorted(list(old_set - new_set))
}
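# e.g. tag_diff(['artist:a', 'genre:rock'], ['artist:a', 'genre:jazz'])
#   -> {'added': ['genre:jazz'], 'removed': ['genre:rock']}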
def expand_tag_lists(tags_set: Set[str]) -> Set[str]:
"""Expand tag list references like {psychology} to actual tags from adjective.json.
Removes the reference after expansion (e.g., {psychology} is deleted, psychology tags added).
Args:
tags_set: Set of tag strings that may include {list_name} references
Returns:
Set of expanded tags with all {list_name} references replaced with actual tags
"""
# Load adjective.json from the directory containing this module
adjective_path = Path(__file__).parent / "adjective.json"
if not adjective_path.exists():
log.debug(f"adjective.json not found at {adjective_path}")
return tags_set
try:
with open(adjective_path, 'r') as f:
adjective_lists = json.load(f)
except Exception as e:
log.error(f"Error loading adjective.json: {e}")
return tags_set
expanded_tags = set()
for tag in tags_set:
# Check if tag is a list reference like {psychology}
if tag.startswith('{') and tag.endswith('}'):
list_name = tag[1:-1].lower() # Extract name, make lowercase
# Find matching list (case-insensitive)
matched_list = None
for key in adjective_lists.keys():
if key.lower() == list_name:
matched_list = adjective_lists[key]
break
if matched_list:
# Add all tags from the list
expanded_tags.update(matched_list)
log.info(f"Expanded {tag} to {len(matched_list)} tags")
else:
# List not found, log warning but don't add the reference
log.warning(f"Tag list '{list_name}' not found in adjective.json")
else:
# Regular tag, keep as is
expanded_tags.add(tag)
return expanded_tags
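# Illustrative, assuming adjective.json maps list names to arrays of tags, e.g.
#   {"psychology": ["cognition", "memory", "perception"]}
# then expand_tag_lists({'{psychology}', 'title:x'}) returns
#   {'cognition', 'memory', 'perception', 'title:x'} and drops the '{psychology}' reference.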
def process_tags_from_string(tags_str: str, expand_lists: bool = False) -> Set[str]:
"""Process a tag string into a set of tags.
Handles:
- Multiple formats: comma-separated, newline-separated, space-separated
- Tag list expansion: {psychology} -> psychology tags (if expand_lists=True)
- Whitespace trimming
Args:
tags_str: Raw tag string
expand_lists: If True, expand {list_name} references using adjective.json
Returns:
Set of processed tags
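Example (sorted here because the return value is an unordered set):
>>> sorted(process_tags_from_string('rock, jazz, lo-fi'))
['jazz', 'lo-fi', 'rock']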
"""
if not tags_str:
return set()
# Try to detect delimiter and split accordingly
# Prefer newlines, then commas, then spaces
if '\n' in tags_str:
delimiter = '\n'
elif ',' in tags_str:
delimiter = ','
else:
delimiter = ' '
# Split and clean tags
tags_set = set()
for tag in tags_str.split(delimiter):
tag = tag.strip()
if tag:
tags_set.add(tag)
# Expand list references if requested
if expand_lists:
tags_set = expand_tag_lists(tags_set)
return tags_set
def fetch_openlibrary_metadata_tags(isbn: Optional[str] = None, olid: Optional[str] = None) -> List[str]:
"""Fetch book metadata from OpenLibrary and return as tags.
Args:
isbn: ISBN number (with or without isbn: prefix)
olid: OpenLibrary ID
Returns:
List of tags extracted from OpenLibrary metadata
"""
metadata_tags = []
# Try OLID first (preferred), then ISBN
url = None
if olid:
# Clean up OLID format
olid_clean = str(olid).replace('OL', '').replace('M', '').replace('W', '')
if olid_clean.isdigit():
url = f"https://openlibrary.org/books/OL{olid_clean}M.json"
else:
url = f"https://openlibrary.org/books/{olid}.json"
elif isbn:
# Clean up ISBN
isbn_clean = str(isbn).replace('isbn:', '').strip()
url = f"https://openlibrary.org/isbn/{isbn_clean}.json"
if not url:
return metadata_tags
try:
response = requests.get(url, timeout=10)
if response.status_code != 200:
return metadata_tags
data = response.json()
if not data:
return metadata_tags
# Extract title
if 'title' in data:
metadata_tags.append(f"title:{data['title']}")
# Extract authors
if 'authors' in data and isinstance(data['authors'], list):
for author in data['authors'][:3]:
if isinstance(author, dict) and 'name' in author:
metadata_tags.append(f"author:{author['name']}")
elif isinstance(author, str):
metadata_tags.append(f"author:{author}")
# Extract publish date
if 'publish_date' in data:
metadata_tags.append(f"publish_date:{data['publish_date']}")
# Extract publishers
if 'publishers' in data and isinstance(data['publishers'], list):
for pub in data['publishers'][:1]:
if isinstance(pub, dict) and 'name' in pub:
metadata_tags.append(f"publisher:{pub['name']}")
elif isinstance(pub, str):
metadata_tags.append(f"publisher:{pub}")
# Extract number of pages
if 'number_of_pages' in data:
page_count = data['number_of_pages']
if page_count and isinstance(page_count, int) and page_count > 0:
metadata_tags.append(f"pages:{page_count}")
# Extract language
if 'languages' in data and isinstance(data['languages'], list) and data['languages']:
lang = data['languages'][0]
if isinstance(lang, dict) and 'key' in lang:
lang_code = lang['key'].split('/')[-1]
metadata_tags.append(f"language:{lang_code}")
elif isinstance(lang, str):
metadata_tags.append(f"language:{lang}")
# Extract subjects as freeform tags (limit to 5)
if 'subjects' in data and isinstance(data['subjects'], list):
for subject in data['subjects'][:5]:
if subject and isinstance(subject, str):
subject_clean = str(subject).strip()
if subject_clean:
metadata_tags.append(subject_clean)
except Exception as e:
debug(f"⚠ Failed to fetch OpenLibrary metadata: {e}")
return metadata_tags
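# Illustrative output shape (actual values depend on the OpenLibrary record):
#   ['title:...', 'author:...', 'publish_date:...', 'publisher:...', 'pages:...',
#    'language:...', '<subject strings as freeform tags>']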
def enrich_playlist_entries(entries: list, extractor: str) -> list:
"""Enrich playlist entries with full metadata by fetching individual entry info.
When extract_flat is used, entries contain minimal info (title, id, url).
This function fetches full metadata for each entry.
Args:
entries: List of entry dicts from probe_url
extractor: Extractor name
Returns:
List of enriched entry dicts
"""
# Import here to avoid circular dependency
from helper.download import is_url_supported_by_ytdlp
if not entries:
return entries
enriched = []
for entry in entries:
# If entry has a direct URL, fetch its full metadata
entry_url = entry.get("url")
if entry_url and is_url_supported_by_ytdlp(entry_url):
try:
import yt_dlp
ydl_opts = {
"quiet": True,
"no_warnings": True,
"skip_download": True,
"noprogress": True,
"socket_timeout": 5,
"retries": 1,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
full_info = ydl.extract_info(entry_url, download=False)
if full_info:
enriched.append(full_info)
continue
except Exception:
pass
# Fallback to original entry if fetch failed
enriched.append(entry)
return enriched
def format_playlist_entry(entry: Dict[str, Any], index: int, extractor: str) -> Dict[str, Any]:
"""Format a playlist entry for display in result table.
Args:
entry: Single playlist entry from yt-dlp (fully enriched if possible)
index: 1-based track number
extractor: Extractor name (youtube, bandcamp, spotify, etc.)
Returns:
Dict with displayable fields for result table
"""
result = {
"index": index,
"title": entry.get("title", "Unknown"),
"duration": entry.get("duration") or entry.get("length") or 0,
"uploader": entry.get("uploader") or entry.get("creator") or "",
"artist": entry.get("artist") or entry.get("uploader") or entry.get("creator") or "",
"album": entry.get("album") or "",
"track_number": entry.get("track_number") or index,
}
# Normalize extractor for comparison
ext_lower = extractor.lower().replace(":", "").replace(" ", "")
# Add site-specific fields
if "youtube" in ext_lower:
result["video_id"] = entry.get("id", "")
result["channel"] = entry.get("uploader") or entry.get("channel", "")
result["views"] = entry.get("view_count", 0)
elif "bandcamp" in ext_lower:
result["track_number"] = entry.get("track_number") or index
# For Bandcamp album entries, track info may be in different fields
result["artist"] = entry.get("artist") or entry.get("uploader", "")
result["album"] = entry.get("album") or ""
elif "spotify" in ext_lower:
result["artists"] = entry.get("creator") or entry.get("uploader", "")
result["album"] = entry.get("album", "")
result["release_date"] = entry.get("release_date", "")
return result
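# Illustrative (youtube extractor):
#   format_playlist_entry({'title': 'Song', 'id': 'abc123', 'uploader': 'Chan',
#                          'duration': 215, 'view_count': 42}, 1, 'youtube')
#   -> {'index': 1, 'title': 'Song', 'duration': 215, 'uploader': 'Chan', 'artist': 'Chan',
#       'album': '', 'track_number': 1, 'video_id': 'abc123', 'channel': 'Chan', 'views': 42}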