This commit is contained in:
2026-01-19 06:24:09 -08:00
parent a961ac3ce7
commit 7ddf0065d1
45 changed files with 627 additions and 411 deletions

View File

@@ -17,6 +17,14 @@ try: # Optional; used for IMDb lookup without API key
from imdbinfo.services import search_title # type: ignore
except Exception: # pragma: no cover - optional dependency
search_title = None # type: ignore[assignment]
try:
import mutagen
except ImportError:
mutagen = None
try:
import musicbrainzngs
except ImportError:
musicbrainzngs = None
def value_normalize(value: Any) -> str:
@@ -93,6 +101,52 @@ def _sanitize_url(value: Optional[str]) -> Optional[str]:
return cleaned
def sanitize_metadata_value(value: Any) -> str:
    """Coerce *value* into a single-line string for metadata fields.

    ``None`` yields the empty string; lists/tuples are comma-joined with
    falsy elements dropped; embedded CR/LF characters become spaces.
    """
    if value is None:
        return ""
    if isinstance(value, (list, tuple)):
        parts = [str(item) for item in value if item]
        value = ", ".join(parts)
    text = str(value).strip()
    return text.replace("\n", " ").replace("\r", " ")
def unique_preserve_order(items: Iterable[Any]) -> list[Any]:
    """Return *items* with duplicates removed, keeping first-seen order.

    Items must be hashable (the original used a ``set`` for membership,
    so this is the same requirement).
    """
    # dict preserves insertion order (3.7+), so fromkeys dedupes in order.
    return list(dict.fromkeys(items))
def fetch_musicbrainz_tags(mbid: str, entity: str = "release") -> Dict[str, Any]:
    """Fetch community tag names for a MusicBrainz entity (best effort).

    *entity* may be "release", "recording" or "artist"; anything else —
    or a missing ``musicbrainzngs`` dependency — yields ``{"tag": []}``.
    Lookup failures are logged via ``debug`` and swallowed.
    """
    if not musicbrainzngs:
        return {"tag": []}
    musicbrainzngs.set_useragent("Medeia-Macina", "0.1")
    lookups = {
        "release": musicbrainzngs.get_release_by_id,
        "recording": musicbrainzngs.get_recording_by_id,
        "artist": musicbrainzngs.get_artist_by_id,
    }
    lookup = lookups.get(entity)
    if lookup is None:
        return {"tag": []}
    names: list[str] = []
    try:
        response = lookup(mbid, includes=["tags"])
        raw_tags = response.get(entity, {}).get("tag-list", [])
        names = [
            entry["name"]
            for entry in raw_tags
            if isinstance(entry, dict) and "name" in entry
        ]
    except Exception as exc:
        debug(f"MusicBrainz lookup failed: {exc}")
    return {"tag": names}
def _clean_existing_tags(existing: Any) -> List[str]:
tags: List[str] = []
seen: Set[str] = set()
@@ -601,7 +655,7 @@ def write_tags(
# Write via consolidated function
try:
lines = []
lines: List[str] = []
lines.extend(str(tag).strip().lower() for tag in tag_list if str(tag).strip())
if lines:
@@ -2415,11 +2469,6 @@ def scrape_url_metadata(
try:
import json as json_module
try:
from SYS.metadata import extract_ytdlp_tags
except ImportError:
extract_ytdlp_tags = None
# Build yt-dlp command with playlist support
# IMPORTANT: Do NOT use --flat-playlist! It strips metadata like artist, album, uploader, genre
# Without it, yt-dlp gives us full metadata in an 'entries' array within a single JSON object
@@ -2462,14 +2511,13 @@ def scrape_url_metadata(
# is_playlist = 'entries' in data and isinstance(data.get('entries'), list)
# Extract tags and playlist items
tags = []
playlist_items = []
tags: List[str] = []
playlist_items: List[Dict[str, Any]] = []
# IMPORTANT: Extract album/playlist-level tags FIRST (before processing entries)
# This ensures we get metadata about the collection, not just individual tracks
if extract_ytdlp_tags:
album_tags = extract_ytdlp_tags(data)
tags.extend(album_tags)
album_tags = extract_ytdlp_tags(data)
tags.extend(album_tags)
# Case 1: Entries are nested in the main object (standard playlist structure)
if "entries" in data and isinstance(data.get("entries"), list):
@@ -2493,41 +2541,40 @@ def scrape_url_metadata(
# Extract tags from each entry and merge (but don't duplicate album-level tags)
# Only merge entry tags that are multi-value prefixes (not single-value like title:, artist:, etc.)
if extract_ytdlp_tags:
entry_tags = extract_ytdlp_tags(entry)
entry_tags = extract_ytdlp_tags(entry)
# Single-value namespaces that should not be duplicated from entries
single_value_namespaces = {
"title",
"artist",
"album",
"creator",
"channel",
"release_date",
"upload_date",
"license",
"location",
}
# Single-value namespaces that should not be duplicated from entries
single_value_namespaces = {
"title",
"artist",
"album",
"creator",
"channel",
"release_date",
"upload_date",
"license",
"location",
}
for tag in entry_tags:
# Extract the namespace (part before the colon)
tag_namespace = tag.split(":",
1)[0].lower(
) if ":" in tag else None
for tag in entry_tags:
# Extract the namespace (part before the colon)
tag_namespace = tag.split(":",
1)[0].lower(
) if ":" in tag else None
# Skip if this namespace already exists in tags (from album level)
if tag_namespace and tag_namespace in single_value_namespaces:
# Check if any tag with this namespace already exists in tags
already_has_namespace = any(
t.split(":",
1)[0].lower() == tag_namespace for t in tags
if ":" in t
)
if already_has_namespace:
continue # Skip this tag, keep the album-level one
# Skip if this namespace already exists in tags (from album level)
if tag_namespace and tag_namespace in single_value_namespaces:
# Check if any tag with this namespace already exists in tags
already_has_namespace = any(
t.split(":",
1)[0].lower() == tag_namespace for t in tags
if ":" in t
)
if already_has_namespace:
continue # Skip this tag, keep the album-level one
if tag not in tags: # Avoid exact duplicates
tags.append(tag)
if tag not in tags: # Avoid exact duplicates
tags.append(tag)
# Case 2: Playlist detected by playlist_count field (BandCamp albums, etc.)
# These need a separate call with --flat-playlist to get the actual entries
@@ -2586,7 +2633,7 @@ def scrape_url_metadata(
pass # Silently ignore if we can't get playlist entries
# Fallback: if still no tags detected, get from first item
if not tags and extract_ytdlp_tags:
if not tags:
tags = extract_ytdlp_tags(data)
# Extract formats from the main data object
@@ -2595,11 +2642,7 @@ def scrape_url_metadata(
formats = extract_url_formats(data.get("formats", []))
# Deduplicate tags by namespace to prevent duplicate title:, artist:, etc.
try:
if dedup_tags_by_namespace:
tags = dedup_tags_by_namespace(tags, keep_first=True)
except Exception:
pass # If dedup fails, return tags as-is
tags = dedup_tags_by_namespace(tags, keep_first=True)
return title, tags, formats, playlist_items
@@ -2617,8 +2660,8 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]:
Returns list of (display_label, format_id) tuples.
"""
try:
video_formats = {} # {resolution: format_data}
audio_formats = {} # {quality_label: format_data}
video_formats: Dict[str, Dict[str, Any]] = {} # {resolution: format_data}
audio_formats: Dict[str, Dict[str, Any]] = {} # {quality_label: format_data}
for fmt in formats:
vcodec = fmt.get("vcodec", "none")
@@ -2655,7 +2698,7 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]:
"abr": abr,
}
result = []
result: List[Tuple[str, str]] = []
# Add video formats in descending resolution order
for res in sorted(video_formats.keys(),
@@ -2674,3 +2717,237 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]:
except Exception as e:
log(f"Error extracting formats: {e}", file=sys.stderr)
return []
def prepare_ffmpeg_metadata(payload: Optional[dict[str, Any]]) -> dict[str, str]:
    """Derive a sanitized mapping for ffmpeg ``-metadata`` flags from *payload*.

    Reads ``title``, ``artist``, ``album``, ``year``/``date``, ``comment`` and
    ``tags`` from *payload*. Namespaced tag strings ("artist:Name",
    "genre:rock", ...) fill in fields the payload did not provide; bare tags
    are collected as genre keywords. Returns only non-empty, length-limited
    string values. Non-dict payloads yield an empty mapping.
    """
    if not isinstance(payload, dict):
        return {}
    metadata: dict[str, str] = {}

    def set_field(key: str, raw: Any, limit: int = 2000) -> None:
        # Normalize to a single-line string, truncate, and drop empties.
        sanitized = sanitize_metadata_value(raw)
        if not sanitized:
            return
        if len(sanitized) > limit:
            sanitized = sanitized[:limit]
        metadata[key] = sanitized

    # Direct payload fields first; tag-derived values below only fill gaps.
    set_field("title", payload.get("title"))
    set_field("artist", payload.get("artist"), 512)
    set_field("album", payload.get("album"), 512)
    set_field("date", payload.get("year") or payload.get("date"), 20)
    comment = payload.get("comment")
    tags_value = payload.get("tags")
    tag_strings: list[str] = []
    artists_from_tags: list[str] = []
    albums_from_tags: list[str] = []
    genres_from_tags: list[str] = []
    if isinstance(tags_value, list):
        for raw_tag in tags_value:
            if raw_tag is None:
                continue
            if not isinstance(raw_tag, str):
                raw_tag = str(raw_tag)
            tag = raw_tag.strip()
            if not tag:
                continue
            tag_strings.append(tag)
            # Split "namespace:value" and route to the matching bucket.
            namespace, sep, value = tag.partition(":")
            if sep and value:
                ns = namespace.strip().lower()
                value = value.strip()
                if ns in {"artist", "creator", "author", "performer"}:
                    artists_from_tags.append(value)
                elif ns in {"album", "series", "collection", "group"}:
                    albums_from_tags.append(value)
                elif ns in {"genre", "rating"}:
                    genres_from_tags.append(value)
                elif ns in {"comment", "description"} and not comment:
                    # First comment-like tag wins, unless payload had one.
                    comment = value
                elif ns in {"year", "date"} and not (payload.get("year") or payload.get("date")):
                    set_field("date", value, 20)
            else:
                # Bare (un-namespaced) tags are treated as genre keywords.
                # NOTE(review): diff lost indentation here — this else is
                # reconstructed as belonging to `if sep and value`; confirm.
                genres_from_tags.append(tag)
    # Tag-derived fallbacks: only applied when payload fields were empty.
    if "artist" not in metadata and artists_from_tags:
        set_field("artist", ", ".join(unique_preserve_order(artists_from_tags)[:3]), 512)
    if "album" not in metadata and albums_from_tags:
        set_field("album", unique_preserve_order(albums_from_tags)[0], 512)
    if genres_from_tags:
        set_field("genre", ", ".join(unique_preserve_order(genres_from_tags)[:5]), 256)
    if tag_strings:
        # Cap keywords at 50 tags; they also double as the default comment.
        joined_tags = ", ".join(tag_strings[:50])
        set_field("keywords", joined_tags, 2000)
        if not comment:
            comment = joined_tags
    if comment:
        # Write both keys: different containers read comment vs description.
        set_field("comment", str(comment), 2000)
        set_field("description", str(comment), 2000)
    return metadata
def apply_mutagen_metadata(path: Path, metadata: dict[str, str], fmt: str) -> None:
    """Best-effort: write *metadata* into an audio file's tags via mutagen.

    No-op unless *fmt* is "audio", *metadata* is non-empty and the optional
    ``mutagen`` dependency was imported. Load/save failures are logged to
    stderr and never raised.
    """
    if fmt != "audio" or not metadata or mutagen is None:
        return
    try:
        tag_file = mutagen.File(path, easy=True)  # type: ignore[attr-defined]
    except Exception as exc:  # pragma: no cover - best effort only
        log(f"mutagen load failed: {exc}", file=sys.stderr)
        return
    if tag_file is None:
        return
    # (metadata key, easy-tag key); "description" intentionally maps onto
    # "comment" and, being last, overwrites it when both are present.
    key_pairs = (
        ("title", "title"),
        ("artist", "artist"),
        ("album", "album"),
        ("genre", "genre"),
        ("comment", "comment"),
        ("description", "comment"),
        ("date", "date"),
    )
    wrote_any = False
    for source_key, target_key in key_pairs:
        text = metadata.get(source_key)
        if not text:
            continue
        try:
            tag_file[target_key] = [text]
            wrote_any = True
        except Exception:  # pragma: no cover - best effort only
            continue
    if not wrote_any:
        return
    try:
        tag_file.save()
    except Exception as exc:  # pragma: no cover - best effort only
        log(f"mutagen save failed: {exc}", file=sys.stderr)
def build_ffmpeg_command(
    ffmpeg_path: str,
    input_path: Path,
    output_path: Path,
    fmt: str,
    max_width: int,
    metadata: Optional[dict[str, str]] = None,
) -> list[str]:
    """Assemble the ffmpeg argv for transcoding *input_path* to *output_path*.

    ``fmt`` selects the target: "mp4"/"webm" for video, one of the audio
    presets below ("audio" is a legacy alias for mp3), or "copy" to add no
    codec arguments at all. ``max_width`` caps the video width for mp4/webm
    (0 or negative disables the scale filter). *metadata* key/value pairs are
    emitted as ``-metadata`` flags. Raises ``ValueError`` on an unknown fmt.
    """
    argv: list[str] = [ffmpeg_path, "-y", "-i", str(input_path)]

    # Downscale video targets only; min(...,iw) avoids upscaling and -2
    # keeps the height divisible by 2 for the encoders.
    if fmt in {"mp4", "webm"} and max_width and max_width > 0:
        argv += ["-vf", f"scale='min({max_width},iw)':-2"]

    if metadata:
        for field, text in metadata.items():
            argv += ["-metadata", f"{field}={text}"]

    # Audio-only targets: fmt -> (codec arguments, container for -f).
    audio_presets: dict[str, tuple[list[str], str]] = {
        "mp3": (["-c:a", "libmp3lame", "-b:a", "192k"], "mp3"),
        "flac": (["-c:a", "flac"], "flac"),
        "wav": (["-c:a", "pcm_s16le"], "wav"),
        "aac": (["-c:a", "aac", "-b:a", "192k"], "adts"),
        "m4a": (["-c:a", "aac", "-b:a", "192k"], "ipod"),
        "ogg": (["-c:a", "libvorbis", "-b:a", "192k"], "ogg"),
        "opus": (["-c:a", "libopus", "-b:a", "192k"], "opus"),
        # Legacy alias kept for old callers: "audio" means mp3.
        "audio": (["-c:a", "libmp3lame", "-b:a", "192k"], "mp3"),
    }

    if fmt == "mp4":
        # HEVC tagged hvc1, AAC audio; +faststart moves the moov atom to the
        # front of the file so playback can start while downloading.
        # (No explicit -f: the container is inferred from output_path.)
        argv += [
            "-c:v", "libx265",
            "-preset", "medium",
            "-crf", "26",
            "-tag:v", "hvc1",
            "-pix_fmt", "yuv420p",
            "-c:a", "aac",
            "-b:a", "192k",
            "-movflags", "+faststart",
        ]
    elif fmt == "webm":
        # VP9 constant-quality mode (-b:v 0 with -crf) plus Opus audio.
        argv += [
            "-c:v", "libvpx-vp9",
            "-b:v", "0",
            "-crf", "32",
            "-c:a", "libopus",
            "-b:a", "160k",
            "-f", "webm",
        ]
    elif fmt in audio_presets:
        codec_args, container = audio_presets[fmt]
        argv += ["-vn", *codec_args, "-f", container]
    elif fmt != "copy":
        raise ValueError(f"Unsupported format: {fmt}")

    argv.append(str(output_path))
    return argv