import json import re import subprocess import sys import shutil from SYS.logger import log, debug from urllib.parse import urlsplit, urlunsplit, unquote from collections import deque from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple from API.HydrusNetwork import apply_hydrus_tag_mutation, fetch_hydrus_metadata, fetch_hydrus_metadata_by_url from SYS.models import FileRelationshipTracker try: # Optional; used when available for richer metadata fetches import yt_dlp except Exception: # pragma: no cover - optional dependency yt_dlp = None try: # Optional; used for IMDb lookup without API key from imdbinfo.services import search_title # type: ignore except Exception: # pragma: no cover - optional dependency search_title = None # type: ignore[assignment] def value_normalize(value: Any) -> str: text = str(value).strip() return text.lower() if text else "" def _append_unique(target: List[str], seen: Set[str], value: Any) -> None: normalized = value_normalize(str(value)) if not normalized or normalized in seen: return seen.add(normalized) target.append(normalized) def _normalize_tag(tag: Any) -> Optional[str]: if tag is None: return None normalized = value_normalize(tag) return normalized or None def _extend_namespaced( target: List[str], seen: Set[str], namespace: str, values: Iterable[Optional[str]] ) -> None: """Append namespaced values if not already in seen set.""" for val in values: if val: _append_unique(target, seen, f"{namespace}:{val}") def _coerce_duration(metadata: Dict[str, Any]) -> Optional[float]: for key in ("duration", "duration_seconds", "length", "duration_sec"): value = metadata.get(key) if value is None: continue if isinstance(value, (int, float)): if value > 0: return float(value) elif isinstance(value, str): try: candidate = float(value.strip()) except ValueError: continue if candidate > 0: return candidate return None def _sanitize_url(value: Optional[str]) -> Optional[str]: """Sanitize URL: normalize and remove ytdl:// prefix.""" if value is None: return None cleaned = value_normalize(str(value)) if not cleaned: return None if cleaned.lower().startswith("ytdl://"): cleaned = cleaned[7:] return cleaned def _clean_existing_tags(existing: Any) -> List[str]: tags: List[str] = [] seen: Set[str] = set() if isinstance(existing, (list, tuple, set)): iterable = existing elif existing is None: iterable = [] else: iterable = [existing] for tag in iterable: _append_unique(tags, seen, tag) return tags def _should_fetch_url(url: Optional[str]) -> bool: if not url or not isinstance(url, str): return False return url.lower().startswith(("http://", "https://")) def fetch_remote_metadata(url: str, options: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], List[str]]: warnings: List[str] = [] info: Optional[Dict[str, Any]] = None if yt_dlp is not None: try: # pragma: no cover - depends on runtime availability ydl_opts = { "quiet": True, "no_warnings": True, "skip_download": True, "noplaylist": True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[attr-defined] info_dict = ydl.extract_info(url, download=False) if info_dict is not None: info = dict(info_dict) except Exception as exc: # pragma: no cover - best effort warnings.append(f"yt_dlp extract failed: {exc}") if info is None: executable = str(options.get("ytdlp_path") or "yt-dlp") extra_args = options.get("ytdlp_args") or [] if isinstance(extra_args, (str, bytes)): extra_args = [extra_args] cmd = [ executable, "--dump-single-json", "--no-playlist", "--skip-download", 
"--no-warnings", ] cmd.extend(str(arg) for arg in extra_args) cmd.append(url) timeout = float(options.get("timeout") or 45.0) try: completed = subprocess.run( cmd, capture_output=True, text=True, check=False, timeout=timeout ) except Exception as exc: # pragma: no cover - subprocess failure warnings.append(f"yt-dlp invocation failed: {exc}") return None, warnings if completed.returncode != 0: message = ( completed.stderr.strip() or completed.stdout.strip() or f"status {completed.returncode}" ) warnings.append(message) return None, warnings try: info = json.loads(completed.stdout) except json.JSONDecodeError as exc: # pragma: no cover - parse failure warnings.append(f"invalid JSON from yt-dlp: {exc}") return None, warnings if isinstance(info, dict) and "entries" in info: entries = info.get("entries") if isinstance(entries, list) and entries: info = entries[0] if isinstance(info, dict): info.setdefault("source_url", url) return info if isinstance(info, dict) else None, warnings def resolve_remote_metadata(payload: Dict[str, Any]) -> Dict[str, Any]: options_raw = payload.get("options") options: Dict[str, Any] = options_raw if isinstance(options_raw, dict) else {} source_url = payload.get("source_url") sanitized = _sanitize_url(source_url) or source_url existing_tags = _clean_existing_tags(payload.get("existing_tags")) metadata_sources: List[Dict[str, Any]] = [] for key in ("metadata", "mpv_metadata", "remote_metadata", "info"): candidate = payload.get(key) if isinstance(candidate, dict): metadata_sources.append(candidate) remote_info: Optional[Dict[str, Any]] = None warnings: List[str] = [] if not options.get("no_fetch"): fetch_url = sanitized if _should_fetch_url(fetch_url): remote_info, fetch_warnings = fetch_remote_metadata(fetch_url or "", options) warnings.extend(fetch_warnings) if remote_info: metadata_sources.append(remote_info) combined_metadata = {} for source in metadata_sources: if isinstance(source, dict): combined_metadata.update(source) context = { "source_url": sanitized } bundle = build_remote_bundle(combined_metadata, existing_tags, context) merged_metadata = { **combined_metadata, **(bundle.get("metadata") or {}) } bundle["metadata"] = merged_metadata if not bundle.get("source_url"): bundle["source_url"] = sanitized mpv_meta_candidate = payload.get("mpv_metadata") mpv_metadata = mpv_meta_candidate if isinstance(mpv_meta_candidate, dict) else None result_tags = bundle.get("tags") or existing_tags result = { "source": "remote-metadata", "id": sanitized or "unknown", "tags": result_tags, "title": bundle.get("title"), "source_url": bundle.get("source_url") or sanitized, "duration": bundle.get("duration"), "metadata": merged_metadata, "remote_metadata": remote_info, "warnings": warnings, "mpv_metadata": mpv_metadata, } return result def imdb_tag(imdb_id: str, timeout: float = 10.0) -> Dict[str, Any]: """Fetch IMDb data using imdbinfo (no API key required). Returns at minimum an imdb: tag. When imdbinfo is installed, enriches with title/year/type/rating from the first search result for the id. 
""" normalized = value_normalize(imdb_id) if not normalized: raise ValueError("imdb_id is required") if not normalized.startswith("tt"): normalized = f"tt{normalized}" tags: List[str] = [] seen: Set[str] = set() _append_unique(tags, seen, f"imdb:{normalized}") result: Dict[str, Any] = { "id": normalized, "tag": tags, } if search_title is None: result["warnings"] = ["imdbinfo is not installed; returning minimal IMDb tag"] return result try: search_result = search_title(normalized, timeout=timeout) except Exception as exc: # pragma: no cover - network dependent result["warnings"] = [f"IMDb lookup failed: {exc}"] return result titles = getattr(search_result, "titles", None) or [] if not titles: result["warnings"] = ["IMDb lookup returned no data"] return result entry = titles[0] title = getattr(entry, "title", None) or getattr(entry, "title_localized", None) year = getattr(entry, "year", None) kind = getattr(entry, "kind", None) rating = getattr(entry, "rating", None) if title: _append_unique(tags, seen, f"title:{title}") if year: _append_unique(tags, seen, f"year:{year}") if kind: _append_unique(tags, seen, f"type:{kind}") if rating: _append_unique(tags, seen, f"rating:{rating}") result["metadata"] = { "title": title, "year": year, "type": kind, "rating": rating, } result["tag"] = tags return result def normalize_urls(value: Any) -> List[str]: """Normalize a URL field into a stable, deduplicated list. Accepts: - None - a single URL string (optionally containing multiple URLs) - a list/tuple/set of URL strings This helper is used by cmdlets/stores/pipeline to keep `url` consistent. """ def _iter_raw_urls(raw: Any) -> Iterable[str]: if raw is None: return if isinstance(raw, str): text = raw.strip() if not text: return # Support legacy prefixes like "url:https://...". if text.lower().startswith("url:"): text = text.split(":", 1)[1].strip() # Prefer extracting obvious URLs to avoid splitting inside query strings. matches = re.findall(r"https?://[^\s,]+", text, flags=re.IGNORECASE) if matches: for m in matches: yield m return # Fallback: split on commas/whitespace. for token in text.replace("\n", " ").replace("\r", " ").replace(",", " ").split(): if token: yield token return if isinstance(raw, (list, tuple, set)): for item in raw: if item is None: continue if isinstance(item, str): if item.strip(): yield item else: text = str(item).strip() if text: yield text return # Last resort: string-coerce. text = str(raw).strip() if text: yield text def _canonicalize(url_text: str) -> Optional[str]: u = str(url_text or "").strip() if not u: return None # Trim common wrappers and trailing punctuation. u = u.strip("<>\"' ") u = u.rstrip(')].,;"') if not u: return None # IMPORTANT: URLs can be case-sensitive in the path/query on some hosts # (e.g., https://0x0.st/PzGY.webp). Do not lowercase or otherwise rewrite # the URL here; preserve exact casing and percent-encoding. 
return u seen: Set[str] = set() out: List[str] = [] for raw_url in _iter_raw_urls(value): canonical = _canonicalize(raw_url) if not canonical: continue if canonical in seen: continue seen.add(canonical) out.append(canonical) return out def _normalise_string_list(values: Optional[Iterable[Any]]) -> List[str]: if not values: return [] seen: Set[str] = set() items: List[str] = [] for value in values: if value is None: continue text = str(value).strip().lower() if not text: continue if text in seen: continue seen.add(text) items.append(text) return items def _derive_sidecar_path(media_path: Path) -> Path: """Return sidecar path (.tag).""" try: preferred = media_path.parent / (media_path.name + ".tag") except ValueError: preferred = media_path.with_name(media_path.name + ".tag") return preferred def _read_sidecar_metadata( sidecar_path: Path, ) -> tuple[Optional[str], List[str], List[str]]: # pyright: ignore[reportUnusedFunction] """Read hash, tags, and url from sidecar file. Consolidated with read_tags_from_file - this extracts extra metadata (hash, url). """ if not sidecar_path.exists(): return None, [], [] try: raw = sidecar_path.read_text(encoding="utf-8") except OSError: return None, [], [] hash_value: Optional[str] = None tags: List[str] = [] urls: List[str] = [] for raw_line in raw.splitlines(): line = raw_line.strip() if not line or line.startswith("#"): continue lower = line.lower() if lower.startswith("hash:"): hash_value = line.split(":", 1)[1].strip() if ":" in line else "" elif lower.startswith("url:") or lower.startswith("url:"): # Parse url (handle legacy 'url:' format) url_part = line.split(":", 1)[1].strip() if ":" in line else "" if url_part: for url_segment in url_part.split(","): for url_token in url_segment.split(): url_clean = url_token.strip() if url_clean and url_clean not in urls: urls.append(url_clean) else: # Everything else is a tag (including relationship: lines) tags.append(line.lower()) return hash_value, tags, urls def rename(file_path: Path, tags: Iterable[str]) -> Optional[Path]: """Rename a file based on a title: tag. If a title: tag is present, renames the file and any .tag/.metadata sidecars. 
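
    Returns the new path on success, or None if there is no title: tag, the file
    does not exist, or the rename fails.

    Example (illustrative; assumes clip.mp4 exists, POSIX path repr):
        >>> rename(Path("clip.mp4"), ["title:My Clip"])
        PosixPath('My Clip.mp4')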
""" new_title: Optional[str] = None for tag in tags: if isinstance(tag, str) and tag.lower().startswith("title:"): new_title = tag.split(":", 1)[1].strip() break if not new_title or not file_path.exists(): return None old_name = file_path.name old_suffix = file_path.suffix new_name = f"{new_title}{old_suffix}" new_path = file_path.with_name(new_name) if new_path == file_path: return None def _rename_sidecar(ext: str) -> None: old_sidecar = file_path.parent / (old_name + ext) if not old_sidecar.exists(): return new_sidecar = file_path.parent / (new_name + ext) if new_sidecar.exists(): try: new_sidecar.unlink() except Exception as exc: debug( f"Warning: Could not replace target sidecar {new_sidecar.name}: {exc}", file=sys.stderr, ) return old_sidecar.rename(new_sidecar) debug( f"Renamed sidecar: {old_sidecar.name} -> {new_sidecar.name}", file=sys.stderr ) try: if new_path.exists(): try: new_path.unlink() debug(f"Replaced existing file: {new_name}", file=sys.stderr) except Exception as exc: debug( f"Warning: Could not replace target file {new_name}: {exc}", file=sys.stderr ) return None file_path.rename(new_path) debug(f"Renamed file: {old_name} -> {new_name}", file=sys.stderr) _rename_sidecar(".tag") _rename_sidecar(".metadata") return new_path except Exception as exc: debug(f"Warning: Failed to rename file: {exc}", file=sys.stderr) return None def write_tags( media_path: Path, tags: Iterable[str], url: Iterable[str], hash_value: Optional[str] = None, db=None, ) -> None: """Write tags to database or sidecar file (tags only). Hash/URL data is no longer written to the tag sidecar; it belongs in metadata. If db is provided, inserts tags only into LocalLibraryDB. Otherwise, writes .tag sidecar. """ if media_path.exists() and media_path.is_dir(): raise ValueError(f"write_tags_sidecar: media_path is a directory: {media_path}") # Prepare tags lines and convert to list if needed (tags only) tag_list = list(tags) if not isinstance(tags, list) else tags tag_list = [str(tag).strip().lower() for tag in tag_list if str(tag).strip()] # If database provided, insert directly and skip sidecar if db is not None: try: db_tags = [str(tag).strip().lower() for tag in tag_list if str(tag).strip()] if db_tags: db.add_tags(media_path, db_tags) debug(f"Added tags to database for {media_path.name}") return except Exception as e: debug(f"Failed to add tags to database: {e}", file=sys.stderr) # Fall through to sidecar creation as fallback # Create sidecar path try: sidecar = media_path.parent / (media_path.name + ".tag") except Exception: sidecar = media_path.with_name(media_path.name + ".tag") # Handle edge case: empty/invalid base name try: if not sidecar.stem or sidecar.name in {".tag", "-.tag", "_.tag"}: fallback_base = ( media_path.stem or _sanitize_title_for_filename(extract_title(tag_list) or "") or "untitled" ) sidecar = media_path.parent / f"{fallback_base}.tag" except Exception: pass # Write via consolidated function try: lines = [] lines.extend(str(tag).strip().lower() for tag in tag_list if str(tag).strip()) if lines: sidecar.write_text("\n".join(lines) + "\n", encoding="utf-8") debug(f"Tags: {sidecar}") else: try: sidecar.unlink() except FileNotFoundError: pass except OSError as exc: debug(f"Failed to write tag sidecar {sidecar}: {exc}", file=sys.stderr) def write_metadata( media_path: Path, hash_value: Optional[str] = None, url: Optional[Iterable[str]] = None, relationships: Optional[Iterable[str]] = None, db=None, ) -> None: """Write metadata to database or sidecar file. 
If db is provided, inserts into LocalLibraryDB and skips sidecar file creation. Otherwise, creates .metadata sidecar file with hash, url, and relationships. Args: media_path: Path to the media file hash_value: Optional hash value for the file url: Optional iterable of known URL strings relationships: Optional iterable of relationship strings db: Optional LocalLibraryDB instance. If provided, skips sidecar creation. """ if media_path.exists() and media_path.is_dir(): raise ValueError( f"write_metadata_sidecar: media_path is a directory: {media_path}" ) # Prepare metadata lines url_list = list(url) if url else [] rel_list = list(relationships) if relationships else [] # If database provided, insert directly and skip sidecar if db is not None: try: # Build metadata tag list db_tags = [] if hash_value: db_tags.append(f"hash:{hash_value}") for url in url_list: if str(url).strip(): clean = str(url).strip() db_tags.append(f"url:{clean}") for rel in rel_list: if str(rel).strip(): db_tags.append(f"relationship:{str(rel).strip()}") if db_tags: db.add_tags(media_path, db_tags) debug(f"Added metadata to database for {media_path.name}") return except Exception as e: debug(f"Failed to add metadata to database: {e}", file=sys.stderr) # Fall through to sidecar creation as fallback # Create sidecar path try: sidecar = media_path.parent / (media_path.name + ".metadata") except Exception: sidecar = media_path.with_name(media_path.name + ".metadata") try: lines = [] # Add hash if available if hash_value: lines.append(f"hash:{hash_value}") # Add known url for url in url_list: if str(url).strip(): clean = str(url).strip() lines.append(f"url:{clean}") # Add relationships for rel in rel_list: if str(rel).strip(): lines.append(f"relationship:{str(rel).strip()}") # Write metadata file if lines: sidecar.write_text("\n".join(lines) + "\n", encoding="utf-8") debug(f"Wrote metadata to {sidecar}") else: # Remove if no content try: sidecar.unlink() except FileNotFoundError: pass except OSError as exc: debug(f"Failed to write metadata sidecar {sidecar}: {exc}", file=sys.stderr) def extract_title(tags: Iterable[str]) -> Optional[str]: """ Extracts a title from a list of tags (looks for 'title:...'). """ for tag in tags: tag = tag.strip() if tag.lower().startswith("title:"): title_tag = tag.split(":", 1)[1].strip() if title_tag: return title_tag return None def _sanitize_title_for_filename(title: str) -> str: # Allow alnum, hyphen, underscore, and space; replace other chars with space temp = [] for ch in title: if ch.isalnum() or ch in {"-", "_", " "}: temp.append(ch) else: temp.append(" ") # Collapse whitespace and trim hyphens/underscores around words rough = "".join(temp) tokens = [] for seg in rough.split(): cleaned = seg.strip("-_ ") if cleaned: tokens.append(cleaned) sanitized = "_".join(tokens) sanitized = sanitized.strip("-_") return sanitized or "untitled" def apply_title_to_path(media_path: Path, tags: Iterable[str]) -> Path: """ If a title tag is present, returns a new Path with the title as filename; else returns original path. 
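
    Example (illustrative; POSIX path repr):
        >>> apply_title_to_path(Path("downloads/song.mp3"), ["title:My Song!"])
        PosixPath('downloads/My_Song.mp3')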
""" title = extract_title(tags) if not title: return media_path parent = media_path.parent sanitized = _sanitize_title_for_filename(title) destination = parent / f"{sanitized}{media_path.suffix}" return destination def sync_sidecar(payload: Dict[str, Any]) -> Dict[str, Any]: path_value = payload.get("path") if not path_value: raise ValueError("path is required to synchronise sidecar") candidate = Path(str(path_value)).expanduser() if candidate.suffix.lower() == ".tag": sidecar_path = candidate else: sidecar_path = _derive_sidecar_path(candidate) tags = _normalise_string_list(payload.get("tag")) if not tags and sidecar_path.exists(): tags = read_tags_from_file(sidecar_path) sidecar_path.parent.mkdir(parents=True, exist_ok=True) if tags: sidecar_path.write_text("\n".join(tags) + "\n", encoding="utf-8") return { "path": str(sidecar_path), "tag": tags, } try: sidecar_path.unlink() except FileNotFoundError: pass return { "path": str(sidecar_path), "tag": [], "deleted": True, } def apply_tag_mutation(payload: Dict[str, Any], operation: str = "add") -> Dict[str, Any]: """Unified tag mutation for add and update operations (Hydrus and local). Consolidates: add_tag, update_tag, _add_local_tag, _update_local_tag Args: payload: Mutation payload with type, tags, old_tag, new_tag operation: 'add' or 'update' Returns: Dict with tags and operation result """ file_type = str(payload.get("type", "local")).lower() if file_type == "hydrus": if operation == "add": new_tag = _normalize_tag(payload.get("new_tag")) if not new_tag: raise ValueError("new_tag is required") result = apply_hydrus_tag_mutation(payload, [new_tag], []) result["added"] = True return result else: # update old_tag = _normalize_tag(payload.get("old_tag")) new_tag = _normalize_tag(payload.get("new_tag")) result = apply_hydrus_tag_mutation( payload, [new_tag] if new_tag else [], [old_tag] if old_tag else [] ) result["updated"] = True return result else: # local tag = _clean_existing_tags(payload.get("tag")) if operation == "add": new_tag = _normalize_tag(payload.get("new_tag")) if not new_tag: raise ValueError("new_tag is required") added = new_tag not in tag if added: tag.append(new_tag) return { "tag": tag, "added": added } else: # update old_tag = _normalize_tag(payload.get("old_tag")) new_tag = _normalize_tag(payload.get("new_tag")) if not old_tag: raise ValueError("old_tag is required") remaining = [] removed_count = 0 for item in tag: if item == old_tag: removed_count += 1 else: remaining.append(item) if new_tag and removed_count > 0: remaining.extend([new_tag] * removed_count) updated = removed_count > 0 or (bool(new_tag) and new_tag not in tag) return { "tag": remaining, "updated": updated, "removed_count": removed_count } def extract_ytdlp_tags(entry: Dict[str, Any]) -> List[str]: """ """ tags: List[str] = [] seen_namespaces: Set[str] = set() # Meaningful yt-dlp fields that should become tags # This mapping excludes technical fields: filesize, duration, format_id, vcodec, acodec, ext, etc. 
field_to_namespace = { "artist": "artist", "album": "album", "creator": "creator", "uploader": "creator", # Map uploader to creator (deduplicate) "uploader_id": "creator", "channel": "channel", "genre": "genre", "track": "track", "track_number": "track_number", "release_date": "release_date", "upload_date": "upload_date", "title": "title", "license": "license", "location": "location", } # Extract simple field mappings for yt_field, namespace in field_to_namespace.items(): value = entry.get(yt_field) if value is not None: value_str = value_normalize(str(value)) if value_str: # Prevent duplicate creator tags (only use first creator) if namespace == "creator": if "creator" in seen_namespaces: continue seen_namespaces.add("creator") _add_tag(tags, namespace, value_str) # Handle tags field specially (could be list, dict, or string) # For list/sequence tags, capture as freeform (no namespace prefix) tags_field = entry.get("tags") if tags_field is not None: if isinstance(tags_field, list): # Tags is list: ["tag1", "tag2", ...] → capture as freeform tags (no "tag:" prefix) # These are typically genre/category tags from the source (BandCamp genres, etc.) for tag_value in tags_field: if tag_value: normalized = value_normalize(str(tag_value)) if normalized and normalized not in tags: tags.append(normalized) elif isinstance(tags_field, dict): # Tags is dict: {"key": "val"} → tag:key:val for key, val in tags_field.items(): if key and val: key_normalized = value_normalize(str(key)) val_normalized = value_normalize(str(val)) if key_normalized and val_normalized: _add_tag(tags, f"tag:{key_normalized}", val_normalized) else: # Tags is string or other: add as freeform if tags_field: normalized = value_normalize(str(tags_field)) if normalized and normalized not in tags: tags.append(normalized) return tags def dedup_tags_by_namespace(tags: List[str], keep_first: bool = True) -> List[str]: """Deduplicate tags by namespace, keeping consistent order. This is the UNIFIED API for tag deduplication used across all cmdlet. Replaces custom deduplication logic in merge_file.py and other modules. Groups tags by namespace (e.g., "artist", "album", "tag") and keeps either the first or last occurrence of each namespace, then preserves order based on first appearance. Args: tags: List of tags (with or without namespace prefixes) keep_first: If True, keep first occurrence per namespace (default). If False, keep last occurrence per namespace. Returns: Deduplicated tag list with consistent order Example: >>> tags = [ ... 'artist:Beatles', 'album:Abbey Road', ... 'artist:Beatles', 'tag:rock', ... 'album:Abbey Road', 'artist:Beatles' ... ] >>> dedup = dedup_tags_by_namespace(tags) >>> debug(dedup) ['artist:Beatles', 'album:Abbey Road', 'tag:rock'] """ if not tags: return [] # Group tags by namespace namespace_to_tags: Dict[Optional[str], List[Tuple[int, str]]] = ( {} ) # namespace → [(index, full_tag), ...] 
first_appearance: Dict[Optional[str], int] = {} # namespace → first_index for idx, tag in enumerate(tags): # Extract namespace (part before ':') if ":" in tag: namespace: Optional[str] = tag.split(":", 1)[0] else: namespace = None # No namespace # Track first appearance if namespace not in first_appearance: first_appearance[namespace] = idx # Store tag with its index if namespace not in namespace_to_tags: namespace_to_tags[namespace] = [] namespace_to_tags[namespace].append((idx, tag)) # Build result: keep first or last occurrence per namespace result: List[Tuple[int, str]] = [] # (first_appearance_index, tag) for namespace, tag_list in namespace_to_tags.items(): if keep_first: chosen_tag = tag_list[0][1] # First occurrence else: chosen_tag = tag_list[-1][1] # Last occurrence result.append((first_appearance[namespace], chosen_tag)) # Sort by first appearance order, then extract tags result.sort(key=lambda x: x[0]) return [tag for _, tag in result] def merge_multiple_tag_lists(sources: List[List[str]], strategy: str = "first") -> List[str]: """Intelligently merge multiple tag lists with smart deduplication. This is the UNIFIED API for merging tags from multiple sources (e.g., when merging multiple files or combining metadata sources). Strategies: - 'first': Keep first occurrence of each namespace (default) - 'all': Keep all different values (different artists possible) - 'combine': For non-namespace tags, combine all unique values Args: sources: List of tag lists to merge strategy: Merge strategy - 'first', 'all', or 'combine' Returns: Merged and deduplicated tag list Example: >>> list1 = ['artist:Beatles', 'album:Abbey Road'] >>> list2 = ['artist:Beatles', 'album:Abbey Road', 'tag:rock'] >>> merged = merge_multiple_tag_lists([list1, list2]) >>> debug(merged) ['artist:Beatles', 'album:Abbey Road', 'tag:rock'] """ if not sources: return [] if strategy == "first": # Concatenate all lists and deduplicate by namespace all_tags = [] for tag_list in sources: all_tags.extend(tag_list or []) return dedup_tags_by_namespace(all_tags, keep_first=True) elif strategy == "all": # Keep all different values per namespace namespace_to_values: Dict[Optional[str], Set[str]] = {} order: List[Tuple[int, str, str]] = [] # (first_index, namespace, value) global_index = 0 for source in sources: if not source: continue for tag in source: if ":" in tag: namespace: Optional[str] = tag.split(":", 1)[0] value = tag.split(":", 1)[1] else: namespace = None value = tag if namespace not in namespace_to_values: namespace_to_values[namespace] = set() order.append((global_index, namespace or "", tag)) elif value not in namespace_to_values[namespace]: order.append((global_index, namespace or "", tag)) namespace_to_values[namespace].add(value) global_index += 1 # Sort by order of first appearance and extract order.sort(key=lambda x: x[0]) return [tag for _, _, tag in order] elif strategy == "combine": # Combine all unique plain (non-namespace) tags all_tags = [] namespaced: Dict[str, str] = {} # namespace → tag (first occurrence) for source in sources: if not source: continue for tag in source: if ":" in tag: namespace = tag.split(":", 1)[0] if namespace not in namespaced: namespaced[namespace] = tag all_tags.append(tag) else: if tag not in all_tags: all_tags.append(tag) return all_tags else: raise ValueError(f"Unknown merge strategy: {strategy}") def read_tags_from_file(file_path: Path) -> List[str]: """Read and normalize tags from .tag sidecar file. This is the UNIFIED API for reading .tag files across all cmdlet. 
Handles normalization, deduplication, and format validation. Args: file_path: Path to .tag sidecar file Returns: List of normalized tag strings Raises: FileNotFoundError: If file doesn't exist Example: >>> tags = read_tags_from_file(Path('file.txt.tag')) >>> debug(tags) ['artist:Beatles', 'album:Abbey Road'] """ file_path = Path(file_path) if not file_path.exists(): raise FileNotFoundError(f"Tag file not found: {file_path}") tags: List[str] = [] seen: Set[str] = set() try: with open(file_path, "r", encoding="utf-8") as f: for line in f: # Strip whitespace and skip empty lines line = line.strip() if not line: continue # Skip comment lines if line.startswith("#"): continue # Normalize the tag normalized = value_normalize(line).lower() if normalized and normalized not in seen: seen.add(normalized) tags.append(normalized) except Exception as exc: raise ValueError(f"Error reading tag file {file_path}: {exc}") return tags def embed_metadata_in_file( file_path: Path, tags: List[str], file_kind: str = "" ) -> bool: """ """ if not tags: return True file_path = Path(file_path) # Tag namespace to FFmpeg metadata key mapping tag_map = { "title": "title", "artist": "artist", "album": "album", "track": "track", "track_number": "track", "date": "date", "year": "date", "genre": "genre", "composer": "composer", "comment": "comment", "url": "comment", # Embed known url in comment field "creator": "artist", # Map creator to artist "channel": "album_artist", # Map channel to album_artist } # Extract metadata from tags metadata = {} comments = [] # Collect comments (including url) for tag in tags: tag_str = str(tag).strip() if ":" in tag_str: namespace, value = tag_str.split(":", 1) namespace = namespace.lower().strip() value = value.strip() if namespace in tag_map and value: ffmpeg_key = tag_map[namespace] if namespace == "url": # Collect url as comments comments.append(f"URL: {value}") elif ffmpeg_key == "comment": # Collect other comment-type tags comments.append(value) elif ffmpeg_key not in metadata: # Don't overwrite if already set from earlier tag metadata[ffmpeg_key] = value # Add collected comments to metadata if comments: if "comment" in metadata: metadata["comment"] = metadata["comment"] + " | " + " | ".join(comments) else: metadata["comment"] = " | ".join(comments) # Apply sensible defaults for audio files if file_kind == "audio" or (not file_kind and file_path.suffix.lower() in {".mp3", ".flac", ".wav", ".m4a", ".aac", ".ogg", ".opus", ".mka"}): # If no album, use title as album if "album" not in metadata and "title" in metadata: metadata["album"] = metadata["title"] # If no track, default to 1 if "track" not in metadata: metadata["track"] = "1" # If no album_artist, use artist if "artist" in metadata: metadata["album_artist"] = metadata["artist"] if not metadata: return True # Check if FFmpeg is available ffmpeg_path = shutil.which("ffmpeg") if not ffmpeg_path: debug( f"⚠️ FFmpeg not found; cannot embed metadata in {file_path.name}", file=sys.stderr ) return False # Create temporary file for output temp_file = file_path.parent / f"{file_path.stem}.ffmpeg_tmp{file_path.suffix}" try: cmd = [ffmpeg_path, "-y", "-i", str(file_path)] for key, value in metadata.items(): cmd.extend(["-metadata", f"{key}={value}"]) cmd.extend(["-c", "copy", str(temp_file)]) # Run ffmpeg with error handling for non-UTF8 output result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False, # Don't decode as text - ffmpeg may output binary data timeout=30, ) if result.returncode == 0 and 
temp_file.exists(): # Replace original with temp file file_path.unlink() temp_file.rename(file_path) debug(f"Embedded metadata in file: {file_path.name}", file=sys.stderr) return True else: # Clean up temp file if it exists if temp_file.exists(): temp_file.unlink() debug( f"❌ FFmpeg metadata embedding failed for {file_path.name}", file=sys.stderr ) if result.stderr: # Safely decode stderr, ignoring invalid UTF-8 bytes try: stderr_text = result.stderr.decode("utf-8", errors="replace")[:200] debug(f"FFmpeg stderr: {stderr_text}", file=sys.stderr) except Exception: pass return False except Exception as exc: if temp_file.exists(): try: temp_file.unlink() except Exception: pass debug(f"❌ Error embedding metadata: {exc}", file=sys.stderr) return False def write_tags_to_file( file_path: Path, tags: List[str], source_hashes: Optional[List[str]] = None, url: Optional[List[str]] = None, append: bool = False, ) -> bool: """Write tags to .tag sidecar file. This is the UNIFIED API for writing .tag files across all cmdlet. Uses consistent format and handles file creation/overwriting. Args: file_path: Path to .tag file (will be created if doesn't exist) tags: List of tags to write source_hashes: Optional source file hashes (written as source:hash1,hash2) url: Optional known url (each written on separate line as url:url) append: If True, append to existing file; if False, overwrite (default) Returns: True if successful Raises: Exception: If file write fails Example: >>> tags = ['artist:Beatles', 'album:Abbey Road'] >>> write_tags_to_file(Path('file.txt.tag'), tags) True """ file_path = Path(file_path) try: # Prepare content content_lines: List[str] = [] # Add source hashes if provided if source_hashes: content_lines.append(f"source:{','.join(source_hashes)}") # Add known url if provided - each on separate line to prevent corruption if url: for url_item in url: content_lines.append(f"url:{url_item}") # Add tags if tags: content_lines.extend( [str(t).strip().lower() for t in tags if str(t).strip()] ) # Write to file mode = "a" if (append and file_path.exists()) else "w" with open(file_path, mode, encoding="utf-8") as f: for line in content_lines: f.write(line + "\n") return True except Exception as exc: raise ValueError(f"Error writing tag file {file_path}: {exc}") def normalize_tags_from_source(source_data: Any, source_type: str = "auto") -> List[str]: """Normalize tags from any source format. Universal function to normalize tags from different sources: - yt-dlp entry dicts - Raw tag lists - .tag file content strings - Metadata dictionaries Args: source_data: Source data (type determined by source_type or auto-detected) source_type: One of 'auto', 'ytdlp', 'list', 'text', 'dict' 'auto' attempts to auto-detect the type Returns: Normalized, deduplicated tag list Example: >>> entry = {'artist': 'Beatles', 'album': 'Abbey Road'} >>> tags = normalize_tags_from_source(entry, 'ytdlp') >>> debug(tags) ['artist:Beatles', 'album:Abbey Road'] """ if source_type == "auto": # Auto-detect source type if isinstance(source_data, dict): # Check if it looks like a yt-dlp entry (has id, title, url, etc.) 
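            # e.g. {"id": "abc123", "title": "Some Song", "uploader": "Artist"} is treated as a yt-dlp entry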
if "id" in source_data or "title" in source_data or "uploader" in source_data: source_type = "ytdlp" else: source_type = "dict" elif isinstance(source_data, list): source_type = "list" elif isinstance(source_data, str): source_type = "text" else: source_type = "dict" # Process based on detected/specified type if source_type == "ytdlp": if not isinstance(source_data, dict): raise ValueError("ytdlp source must be a dict") return extract_ytdlp_tags(source_data) elif source_type == "list": if not isinstance(source_data, (list, tuple)): raise ValueError("list source must be a list or tuple") # Normalize each tag in the list result = [] for tag in source_data: normalized = value_normalize(str(tag)) if normalized: result.append(normalized) return result elif source_type == "text": if not isinstance(source_data, str): raise ValueError("text source must be a string") # Split by lines and normalize lines = source_data.split("\n") result = [] seen = set() for line in lines: line = line.strip() if line and not line.startswith("#"): normalized = value_normalize(line) if normalized and normalized not in seen: seen.add(normalized) result.append(normalized) return result elif source_type == "dict": if not isinstance(source_data, dict): raise ValueError("dict source must be a dict") # Extract as generic metadata (similar to yt-dlp but from any dict) return extract_ytdlp_tags(source_data) else: raise ValueError(f"Unknown source type: {source_type}") def detect_metadata_request(tag: str) -> Optional[Dict[str, str]]: trimmed = value_normalize(tag) if not trimmed: return None lower = trimmed.lower() imdb_match = re.match(r"^imdb:\s*(tt[\w]+)$", lower) if imdb_match: imdb_id = imdb_match.group(1) return { "source": "imdb", "id": imdb_id, "base": f"imdb:{imdb_id}", } remainder = re.match(r"^musicbrainz:\s*(.+)$", lower) if remainder: raw = remainder.group(1) entity = "release" identifier = raw specific = re.match(r"^(?P[a-zA-Z]+)\s*:\s*(?P[\w-]+)$", raw) if specific: entity = specific.group("entity") identifier = specific.group("id") identifier = identifier.replace(" ", "") if identifier: return { "source": "musicbrainz", "entity": entity.lower(), "id": identifier, "base": f"musicbrainz:{identifier}", } return None def expand_metadata_tag(payload: Dict[str, Any]) -> Dict[str, Any]: tag = payload.get("tag") if not isinstance(tag, str): return { "tag": [] } trimmed = value_normalize(tag) if not trimmed: return { "tag": [] } request = detect_metadata_request(trimmed) tags: List[str] = [] seen: Set[str] = set() if request: _append_unique(tags, seen, request["base"]) else: _append_unique(tags, seen, trimmed) return { "tag": tags } try: if request["source"] == "imdb": data = imdb_tag(request["id"]) else: data = fetch_musicbrainz_tags(request["id"], request["entity"]) except Exception as exc: # pragma: no cover - network/service errors return { "tag": tags, "error": str(exc) } # Add tags from fetched data (no namespace, just unique append) raw_tags = data.get("tag") if isinstance(data, dict) else None if isinstance(raw_tags, str): tag_iter: Iterable[str] = [raw_tags] elif isinstance(raw_tags, (list, tuple, set)): tag_iter = [t for t in raw_tags if isinstance(t, str)] else: tag_iter = [] for tag_value in tag_iter: _append_unique(tags, seen, tag_value) result = { "tag": tags, "source": request["source"], "id": request["id"], } if request["source"] == "musicbrainz": result["entity"] = request["entity"] return result def build_remote_bundle( metadata: Optional[Dict[str, Any]], existing: Optional[Sequence[str]] = None, 
context: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: metadata = metadata or {} context = context or {} tags: List[str] = [] seen: Set[str] = set() if existing: for tag in existing: _append_unique(tags, seen, tag) # Add tags from various sources for tag in metadata.get("tag") or []: _append_unique(tags, seen, tag) for tag in metadata.get("categories") or []: _append_unique(tags, seen, tag) # Extract and namespace genres raw_genres = metadata.get("genres") keywords = metadata.get("keywords") if isinstance(keywords, str): for token in keywords.split(","): _append_unique(tags, seen, token) if raw_genres: for genre in (raw_genres if isinstance(raw_genres, (list, tuple)) else [raw_genres]): if genre: _append_unique(tags, seen, f"genre:{genre}") # Extract creators/artists artists = metadata.get("artists") or metadata.get("artist") if artists: artist_list = artists if isinstance(artists, (list, tuple)) else [artists] for artist in artist_list: if artist: _append_unique(tags, seen, f"creator:{artist}") creator = ( metadata.get("uploader") or metadata.get("channel") or metadata.get("artist") or metadata.get("creator") ) if creator: _append_unique(tags, seen, f"creator:{creator}") # Extract title title_value = metadata.get("title") if title_value: _extend_namespaced(tags, seen, "title", [title_value]) source_url = ( context.get("source_url") or metadata.get("original_url") or metadata.get("webpage_url") or metadata.get("url") ) clean_title = value_normalize(str(title_value)) if title_value is not None else None result = { "tag": tags, "title": clean_title, "source_url": _sanitize_url(source_url), "duration": _coerce_duration(metadata), "metadata": metadata, } return result def _load_payload(value: Optional[str]) -> Dict[str, Any]: text = value if text is None: text = sys.stdin.read() if text is None or text.strip() == "": raise ValueError("Expected JSON payload") data = json.loads(text) if not isinstance(data, dict): raise ValueError("Payload must be a JSON object") return data import typer app = typer.Typer(help="Fetch metadata tags for known services") @app.command(help="Lookup an IMDb title") def imdb(imdb_id: str = typer.Argument(..., help="IMDb identifier (ttXXXXXXX)")): """Lookup an IMDb title.""" try: result = imdb_tag(imdb_id) debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, ensure_ascii=False), flush=True) raise typer.Exit(code=1) @app.command(help="Lookup a MusicBrainz entity") def musicbrainz( mbid: str = typer.Argument(..., help="MusicBrainz identifier (UUID)"), entity: str = typer.Option( "release", help="Entity type (release, recording, artist)" ), ): """Lookup a MusicBrainz entity.""" try: result = fetch_musicbrainz_tags(mbid, entity) debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, ensure_ascii=False), flush=True) raise typer.Exit(code=1) @app.command(name="remote-tags", help="Normalize a remote metadata payload") def remote_tags( payload: Optional[str] = typer.Option( None, "--payload", help="JSON payload; reads stdin if omitted" ) ): """Normalize a remote metadata payload.""" try: payload_data = _load_payload(payload) metadata = payload_data.get("metadata") or {} existing = payload_data.get("existing_tags") or [] context = payload_data.get("context") or {} if not isinstance(existing, list): raise ValueError("existing_tags must be a list") if context and not 
isinstance(context, dict): raise ValueError("context must be an object") result = build_remote_bundle(metadata, existing, context) debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, ensure_ascii=False), flush=True) raise typer.Exit(code=1) @app.command(name="remote-fetch", help="Resolve remote metadata bundle") def remote_fetch( payload: Optional[str] = typer.Option( None, "--payload", help="JSON payload; reads stdin if omitted" ) ): """Resolve remote metadata bundle.""" try: payload_data = _load_payload(payload) result = resolve_remote_metadata(payload_data) debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, ensure_ascii=False), flush=True) raise typer.Exit(code=1) @app.command(name="expand-tag", help="Expand metadata references into tags") def expand_tag( payload: Optional[str] = typer.Option( None, "--payload", help="JSON payload; reads stdin if omitted" ) ): """Expand metadata references into tags.""" try: payload_data = _load_payload(payload) result = expand_metadata_tag(payload_data) debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, ensure_ascii=False), flush=True) raise typer.Exit(code=1) @app.command(name="hydrus-fetch", help="Fetch Hydrus metadata for a file") def hydrus_fetch( payload: Optional[str] = typer.Option( None, "--payload", help="JSON payload; reads stdin if omitted" ) ): """Fetch Hydrus metadata for a file.""" try: payload_data = _load_payload(payload) result = fetch_hydrus_metadata(payload_data) debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, ensure_ascii=False), flush=True) raise typer.Exit(code=1) @app.command(name="hydrus-fetch-url", help="Fetch Hydrus metadata using a source URL") def hydrus_fetch_url( payload: Optional[str] = typer.Option( None, "--payload", help="JSON payload; reads stdin if omitted" ) ): """Fetch Hydrus metadata using a source URL.""" try: payload_data = _load_payload(payload) result = fetch_hydrus_metadata_by_url(payload_data) debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, ensure_ascii=False), flush=True) raise typer.Exit(code=1) @app.command(name="sync-sidecar", help="Synchronise .tag sidecar with supplied data") def sync_sidecar_cmd( payload: Optional[str] = typer.Option( None, "--payload", help="JSON payload; reads stdin if omitted" ) ): """Synchronise .tag sidecar with supplied data.""" try: payload_data = _load_payload(payload) result = sync_sidecar(payload_data) debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, ensure_ascii=False), flush=True) raise typer.Exit(code=1) @app.command(name="update-tag", help="Update or rename a tag") def update_tag_cmd( payload: Optional[str] = typer.Option( None, "--payload", help="JSON payload; reads stdin if omitted" ) ): """Update or rename a tag.""" try: payload_data = _load_payload(payload) result = apply_tag_mutation(payload_data, "update") debug(json.dumps(result, ensure_ascii=False), flush=True) except Exception as exc: error_payload = { "error": str(exc) } debug(json.dumps(error_payload, 
ensure_ascii=False), flush=True) raise typer.Exit(code=1) def main(argv: Optional[List[str]] = None) -> int: """Main entry point using Typer.""" try: app(argv, standalone_mode=False) return 0 except SystemExit as e: return e.code if isinstance(e.code, int) else 1 # ============================================================================ # TAG OPERATIONS - Consolidated from tag_operations.py and tag_helpers.py # ============================================================================ def sort_tags(tags: List[str]) -> List[str]: """ Sort tags into namespace tags and freeform tags, then alphabetically. Args: tags: List of tag strings Returns: Sorted list with namespace tags first, then freeform tags """ if not tags: return [] namespace_tags = [] freeform_tags = [] for tag in tags: if isinstance(tag, str): if ":" in tag: namespace_tags.append(tag) else: freeform_tags.append(tag) namespace_tags.sort() freeform_tags.sort() return namespace_tags + freeform_tags def format_tags_display(tags: List[str], namespace_filter: Optional[str] = None) -> List[str]: """ Format tags for display, optionally filtered by namespace. Args: tags: List of tags namespace_filter: Optional namespace to filter by (e.g., "creator:") Returns: Formatted list of tags """ if not tags: return [] if namespace_filter: filtered = [t for t in tags if t.startswith(namespace_filter)] return sort_tags(filtered) return sort_tags(tags) def split_tag(tag: str) -> tuple[str, str]: """ Split a tag into namespace and value. Args: tag: Tag string (e.g., "creator:Author Name" or "freeform tag") Returns: Tuple of (namespace, value). For freeform tags, namespace is empty string. """ if ":" in tag: parts = tag.split(":", 1) return parts[0], parts[1] return "", tag def filter_tags_by_namespace(tags: List[str], namespace: str) -> List[str]: """ Get all tags in a specific namespace. Args: tags: List of tags namespace: Namespace to filter by Returns: List of values in that namespace """ prefix = namespace + ":" return [split_tag(t)[1] for t in tags if t.startswith(prefix)] def ensure_title_tag(tags: List[str], title: str) -> List[str]: """ Ensure there's a title: tag with the given title. Args: tags: List of existing tags title: Title to ensure exists Returns: Updated tag list """ if not title: return tags # Remove any existing title tags filtered = [t for t in tags if not t.startswith("title:")] # Add new title tag new_tags = filtered + [f"title:{title}"] return sort_tags(new_tags) def remove_title_tags(tags: List[str]) -> List[str]: """Remove all title: tags.""" return [t for t in tags if not t.startswith("title:")] def is_namespace_tag(tag: str) -> bool: """Check if a tag is a namespace tag (contains :).""" return ":" in tag if isinstance(tag, str) else False def validate_tag(tag: str) -> bool: """ Validate that a tag is properly formatted. Args: tag: Tag to validate Returns: True if tag is valid """ if not isinstance(tag, str) or not tag.strip(): return False # Tag shouldn't have leading/trailing whitespace if tag != tag.strip(): return False # Tag shouldn't be empty if not tag: return False return True def normalize_tags(tags: List[Any]) -> List[str]: """ Normalize a tag list by filtering and cleaning. 
Args: tags: List of tags (may contain invalid entries) Returns: Cleaned list of valid tags """ if not tags: return [] normalized = [] for tag in tags: if isinstance(tag, str): trimmed = tag.strip() if trimmed and validate_tag(trimmed): normalized.append(trimmed) return sort_tags(normalized) def compute_namespaced_tag_overwrite( existing_tags: Sequence[Any], incoming_tags: Sequence[Any], ) -> Tuple[List[str], List[str], List[str]]: """Compute a tag mutation with namespace overwrite semantics. Rules: - Incoming namespaced tags ("ns:value") overwrite any existing tags in that namespace. - Overwrite is based on namespace match (case-insensitive). - Additions are deduped case-insensitively against kept existing tags and within the incoming list. - If an existing tag matches an incoming tag exactly, it is kept (no remove/add). Returns: (tags_to_remove, tags_to_add, merged_tags) Notes: This is intentionally store-agnostic: stores decide how to persist/apply the returned mutation (DB merge write, Hydrus delete/add, etc.). """ def _clean(values: Sequence[Any]) -> List[str]: out: List[str] = [] for v in values or []: if not isinstance(v, str): continue t = v.strip() if t: out.append(t.lower()) return out def _ns_of(tag: str) -> str: if ":" not in tag: return "" return tag.split(":", 1)[0].strip().lower() existing = _clean(existing_tags) incoming = _clean(incoming_tags) if not incoming: return [], [], existing namespaces_to_replace: Set[str] = set() for t in incoming: ns = _ns_of(t) if ns: namespaces_to_replace.add(ns) kept_existing: List[str] = [] kept_existing_lower: Set[str] = set() tags_to_remove: List[str] = [] for t in existing: ns = _ns_of(t) if ns and ns in namespaces_to_replace: # If it matches exactly, keep it; otherwise remove it. if t in incoming: kept_existing.append(t) kept_existing_lower.add(t.lower()) else: # If incoming has the same tag value but different casing, treat as replace. tags_to_remove.append(t) continue kept_existing.append(t) kept_existing_lower.add(t.lower()) tags_to_add: List[str] = [] added_lower: Set[str] = set() for t in incoming: tl = t.lower() if tl in kept_existing_lower: continue if tl in added_lower: continue tags_to_add.append(t) added_lower.add(tl) merged = kept_existing + tags_to_add return tags_to_remove, tags_to_add, merged def merge_tag_lists(*tag_lists: List[str]) -> List[str]: """ Merge multiple tag lists, removing duplicates. Args: *tag_lists: Variable number of tag lists Returns: Merged, deduplicated, sorted list """ merged = set() for tag_list in tag_lists: if isinstance(tag_list, list): merged.update(tag_list) return sort_tags(list(merged)) def tag_diff(old_tags: List[str], new_tags: List[str]) -> Dict[str, List[str]]: """ Calculate the difference between two tag lists. Args: old_tags: Original tags new_tags: New tags Returns: Dict with 'added' and 'removed' keys """ old_set = set(old_tags) if old_tags else set() new_set = set(new_tags) if new_tags else set() return { "added": sorted(list(new_set - old_set)), "removed": sorted(list(old_set - new_set)) } def expand_tag_lists(tags_set: Set[str]) -> Set[str]: """Expand tag list references like {psychology} to actual tags from adjective.json. Removes the reference after expansion (e.g., {psychology} is deleted, psychology tags added). 
Args: tags_set: Set of tag strings that may include {list_name} references Returns: Set of expanded tags with all {list_name} references replaced with actual tags """ # Load adjective.json from workspace root adjective_path = Path(__file__).parent / "adjective.json" if not adjective_path.exists(): debug(f"adjective.json not found at {adjective_path}") return tags_set try: with open(adjective_path, "r") as f: adjective_lists = json.load(f) except Exception as e: debug(f"Error loading adjective.json: {e}") return tags_set expanded_tags = set() for tag in tags_set: # Check if tag is a list reference like {psychology} if tag.startswith("{") and tag.endswith("}"): list_name = tag[1:-1].lower() # Extract name, make lowercase # Find matching list (case-insensitive) matched_list = None for key in adjective_lists.keys(): if key.lower() == list_name: matched_list = adjective_lists[key] break if matched_list: # Add all tags from the list expanded_tags.update(matched_list) debug(f"Expanded {tag} to {len(matched_list)} tags") else: # List not found, log warning but don't add the reference debug(f"Tag list '{list_name}' not found in adjective.json") else: # Regular tag, keep as is expanded_tags.add(tag) return expanded_tags def process_tags_from_string(tags_str: str, expand_lists: bool = False) -> Set[str]: """Process a tag string into a set of tags. Handles: - Multiple formats: comma-separated, newline-separated, space-separated - Tag list expansion: {psychology} -> psychology tags (if expand_lists=True) - Whitespace trimming Args: tags_str: Raw tag string expand_lists: If True, expand {list_name} references using adjective.json Returns: Set of processed tags """ if not tags_str: return set() # Try to detect delimiter and split accordingly # Prefer newlines, then commas, then spaces if "\n" in tags_str: delimiter = "\n" elif "," in tags_str: delimiter = "," else: delimiter = " " # Split and clean tags tags_set = set() for tag in tags_str.split(delimiter): tag = tag.strip() if tag: tags_set.add(tag) # Expand list references if requested if expand_lists: tags_set = expand_tag_lists(tags_set) return tags_set def build_book_tags( *, title: Optional[str] = None, author: Optional[str] = None, isbn: Optional[str] = None, year: Optional[str] = None, source: Optional[str] = None, extra: Optional[Sequence[str]] = None, ) -> List[str]: """Build consistent book tags for downloads (LibGen, OpenLibrary, etc.).""" tags: List[str] = ["book"] def _add(tag: Optional[str]) -> None: if tag and isinstance(tag, str) and tag.strip(): tags.append(tag.strip()) _add(source) if title: _add(f"title:{title}") if author: _add(f"author:{author}") if isbn: _add(f"isbn:{isbn}") if year: _add(f"year:{year}") if extra: for tag in extra: _add(tag) # Deduplicate while preserving order deduped = list(dict.fromkeys(tags)) return deduped def enrich_playlist_entries(entries: list, extractor: str) -> list: """Enrich playlist entries with full metadata by fetching individual entry info. When extract_flat is used, entries contain minimal info (title, id, url). This function fetches full metadata for each entry. 
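    Fetching is best-effort: entries whose URL cannot be resolved are returned unchanged.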
Args: entries: List of entry dicts from probe_url extractor: Extractor name Returns: List of enriched entry dicts """ # Import here to avoid circular dependency from tool.ytdlp import is_url_supported_by_ytdlp if not entries: return entries enriched = [] for entry in entries: # If entry has a direct URL, fetch its full metadata entry_url = entry.get("url") if entry_url and is_url_supported_by_ytdlp(entry_url): try: import yt_dlp ydl_opts: Any = { "quiet": True, "no_warnings": True, "skip_download": True, "noprogress": True, "socket_timeout": 5, "retries": 1, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: full_info = ydl.extract_info(entry_url, download=False) if full_info: enriched.append(full_info) continue except Exception: pass # Fallback to original entry if fetch failed enriched.append(entry) return enriched def format_playlist_entry(entry: Dict[str, Any], index: int, extractor: str) -> Dict[str, Any]: """Format a playlist entry for display in result table. Args: entry: Single playlist entry from yt-dlp (fully enriched if possible) index: 1-based track number extractor: Extractor name (youtube, bandcamp, spotify, etc.) Returns: Dict with displayable fields for result table """ result = { "index": index, "title": entry.get("title", "Unknown"), "duration": entry.get("duration") or entry.get("length") or 0, "uploader": entry.get("uploader") or entry.get("creator") or "", "artist": entry.get("artist") or entry.get("uploader") or entry.get("creator") or "", "album": entry.get("album") or "", "track_number": entry.get("track_number") or index, } # Normalize extractor for comparison ext_lower = extractor.lower().replace(":", "").replace(" ", "") # Add site-specific fields if "youtube" in ext_lower: result["video_id"] = entry.get("id", "") result["channel"] = entry.get("uploader") or entry.get("channel", "") result["views"] = entry.get("view_count", 0) elif "bandcamp" in ext_lower: result["track_number"] = entry.get("track_number") or index # For Bandcamp album entries, track info may be in different fields result["artist"] = entry.get("artist") or entry.get("uploader", "") result["album"] = entry.get("album") or "" elif "spotify" in ext_lower: result["artists"] = entry.get("creator") or entry.get("uploader", "") result["album"] = entry.get("album", "") result["release_date"] = entry.get("release_date", "") return result # ============================================================================ # Metadata helper functions for tag processing and scraping # ============================================================================ def extract_title_from_tags(tags_list: List[str]) -> Optional[str]: """Extract title from tags list.""" try: extracted = extract_title(tags_list) if extracted: return extracted except Exception: pass for t in tags_list: if isinstance(t, str) and t.lower().startswith("title:"): val = t.split(":", 1)[1].strip() if val: return val return None def summarize_tags(tags_list: List[str], limit: int = 8) -> str: """Create a summary of tags for display.""" shown = [t for t in tags_list[:limit] if t] summary = ", ".join(shown) remaining = max(0, len(tags_list) - len(shown)) if remaining > 0: summary = f"{summary} (+{remaining} more)" if summary else f"(+{remaining} more)" if len(summary) > 200: summary = summary[:197] + "..." 
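    # Illustrative: 10 tags with limit=8 -> "tag1, tag2, ..., tag8 (+2 more)"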

def extract_scrapable_identifiers(tags_list: List[str]) -> Dict[str, str]:
    """Extract scrapable identifiers from tags."""
    identifiers = {}
    scrapable_prefixes = {
        "openlibrary",
        "isbn",
        "isbn_10",
        "isbn_13",
        "musicbrainz",
        "musicbrainzalbum",
        "imdb",
        "tmdb",
        "tvdb",
    }
    for tag in tags_list:
        if not isinstance(tag, str) or ":" not in tag:
            continue
        parts = tag.split(":", 1)
        if len(parts) != 2:
            continue
        key_raw = parts[0].strip().lower()
        key = key_raw.replace("-", "_")
        if key == "isbn10":
            key = "isbn_10"
        elif key == "isbn13":
            key = "isbn_13"
        value = parts[1].strip()
        # Normalize ISBN values by removing hyphens for API friendliness
        if key.startswith("isbn"):
            value = value.replace("-", "")
        if key in scrapable_prefixes and value:
            identifiers[key] = value
    return identifiers


def extract_tag_value(tags_list: List[str], namespace: str) -> Optional[str]:
    """Get first tag value for a namespace (e.g., artist:, title:)."""
    ns = namespace.lower()
    for tag in tags_list:
        if not isinstance(tag, str) or ":" not in tag:
            continue
        prefix, _, value = tag.partition(":")
        if prefix.strip().lower() != ns:
            continue
        candidate = value.strip()
        if candidate:
            return candidate
    return None
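
# Illustrative sketch: identifier extraction over a hypothetical tag list. Note that the
# "isbn-13" key is normalized to "isbn_13" and the hyphens are stripped from its value,
# while non-scrapable namespaces (title:, author:) are ignored.
def _demo_identifier_extraction() -> None:
    tags = ["title:Dune", "author:Frank Herbert", "isbn-13:978-0-441-01359-3", "imdb:tt0087182"]
    assert extract_scrapable_identifiers(tags) == {
        "isbn_13": "9780441013593",
        "imdb": "tt0087182",
    }
    assert extract_tag_value(tags, "author") == "Frank Herbert"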

def scrape_url_metadata(
    url: str,
) -> Tuple[Optional[str], List[str], List[Tuple[str, str]], List[Dict[str, Any]]]:
    """Scrape metadata from a URL using yt-dlp.

    Returns:
        (title, tags, formats, playlist_items) tuple where:
        - title: Video/content title
        - tags: List of extracted tags (both namespaced and freeform)
        - formats: List of (display_label, format_id) tuples
        - playlist_items: List of playlist entry dicts (empty if not a playlist)
    """
    try:
        import json as json_module
        try:
            from SYS.metadata import extract_ytdlp_tags
        except ImportError:
            extract_ytdlp_tags = None
        # Build yt-dlp command with playlist support
        # IMPORTANT: Do NOT use --flat-playlist! It strips metadata like artist, album, uploader, genre
        # Without it, yt-dlp gives us full metadata in an 'entries' array within a single JSON object
        # This ensures we get album-level metadata from sources like BandCamp, YouTube Music, etc.
        cmd = [
            "yt-dlp",
            "-j",  # Output JSON
            "--no-warnings",
            "--playlist-items", "1-10",  # Get first 10 items if it's a playlist (provides entries)
            "-f", "best",
            url,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            log(f"yt-dlp error: {result.stderr}", file=sys.stderr)
            return None, [], [], []
        # Parse JSON output - WITHOUT --flat-playlist, we get ONE JSON object with 'entries' array
        # This gives us full metadata instead of flat format
        lines = result.stdout.strip().split("\n")
        if not lines or not lines[0]:
            log("yt-dlp returned empty output", file=sys.stderr)
            return None, [], [], []
        # Parse the single JSON object
        try:
            data = json_module.loads(lines[0])
        except json_module.JSONDecodeError as e:
            log(f"Failed to parse yt-dlp JSON: {e}", file=sys.stderr)
            return None, [], [], []
        # Extract title - use the main title
        title = data.get("title", "Unknown")
        # Determine if this is a playlist/album (has entries array)
        # is_playlist = 'entries' in data and isinstance(data.get('entries'), list)
        # Extract tags and playlist items
        tags = []
        playlist_items = []
        # IMPORTANT: Extract album/playlist-level tags FIRST (before processing entries)
        # This ensures we get metadata about the collection, not just individual tracks
        if extract_ytdlp_tags:
            album_tags = extract_ytdlp_tags(data)
            tags.extend(album_tags)
        # Case 1: Entries are nested in the main object (standard playlist structure)
        if "entries" in data and isinstance(data.get("entries"), list):
            entries = data["entries"]
            # Build playlist items with title and duration
            for idx, entry in enumerate(entries, 1):
                if isinstance(entry, dict):
                    item_title = entry.get("title", entry.get("id", f"Track {idx}"))
                    item_duration = entry.get("duration", 0)
                    playlist_items.append(
                        {
                            "index": idx,
                            "id": entry.get("id", f"track_{idx}"),
                            "title": item_title,
                            "duration": item_duration,
                            "url": entry.get("url") or entry.get("webpage_url", ""),
                        }
                    )
                    # Extract tags from each entry and merge (but don't duplicate album-level tags)
                    # Only merge entry tags that are multi-value prefixes (not single-value like title:, artist:, etc.)
                    if extract_ytdlp_tags:
                        entry_tags = extract_ytdlp_tags(entry)
                        # Single-value namespaces that should not be duplicated from entries
                        single_value_namespaces = {
                            "title",
                            "artist",
                            "album",
                            "creator",
                            "channel",
                            "release_date",
                            "upload_date",
                            "license",
                            "location",
                        }
                        for tag in entry_tags:
                            # Extract the namespace (part before the colon)
                            tag_namespace = tag.split(":", 1)[0].lower() if ":" in tag else None
                            # Skip if this namespace already exists in tags (from album level)
                            if tag_namespace and tag_namespace in single_value_namespaces:
                                # Check if any tag with this namespace already exists in tags
                                already_has_namespace = any(
                                    t.split(":", 1)[0].lower() == tag_namespace
                                    for t in tags
                                    if ":" in t
                                )
                                if already_has_namespace:
                                    continue  # Skip this tag, keep the album-level one
                            if tag not in tags:  # Avoid exact duplicates
                                tags.append(tag)
        # Case 2: Playlist detected by playlist_count field (BandCamp albums, etc.)
        # These need a separate call with --flat-playlist to get the actual entries
        elif (data.get("playlist_count") or 0) > 0 and "entries" not in data:
            try:
                # Make a second call with --flat-playlist to get the actual tracks
                flat_cmd = [
                    "yt-dlp",
                    "-j",
                    "--no-warnings",
                    "--flat-playlist",
                    "-f", "best",
                    url,
                ]
                flat_result = subprocess.run(
                    flat_cmd, capture_output=True, text=True, timeout=30
                )
                if flat_result.returncode == 0:
                    flat_lines = flat_result.stdout.strip().split("\n")
                    # With --flat-playlist, each line is a separate track JSON object
                    # (not nested in a playlist container), so process ALL lines
                    for idx, line in enumerate(flat_lines, 1):
                        if line.strip().startswith("{"):
                            try:
                                entry = json_module.loads(line)
                                item_title = entry.get(
                                    "title", entry.get("id", f"Track {idx}")
                                )
                                item_duration = entry.get("duration", 0)
                                playlist_items.append(
                                    {
                                        "index": idx,
                                        "id": entry.get("id", f"track_{idx}"),
                                        "title": item_title,
                                        "duration": item_duration,
                                        "url": entry.get("url") or entry.get("webpage_url", ""),
                                    }
                                )
                            except json_module.JSONDecodeError:
                                pass
            except Exception:
                pass  # Silently ignore if we can't get playlist entries
        # Fallback: if still no tags detected, get from first item
        if not tags and extract_ytdlp_tags:
            tags = extract_ytdlp_tags(data)
        # Extract formats from the main data object
        formats = []
        if "formats" in data:
            formats = extract_url_formats(data.get("formats", []))
        # Deduplicate tags by namespace to prevent duplicate title:, artist:, etc.
        try:
            if dedup_tags_by_namespace:
                tags = dedup_tags_by_namespace(tags, keep_first=True)
        except Exception:
            pass  # If dedup fails, return tags as-is
        return title, tags, formats, playlist_items
    except subprocess.TimeoutExpired:
        log("yt-dlp timeout (>30s)", file=sys.stderr)
        return None, [], [], []
    except Exception as e:
        log(f"URL scraping error: {e}", file=sys.stderr)
        return None, [], [], []
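
# Illustrative sketch: a typical call into scrape_url_metadata. The URL is a placeholder;
# a real run needs the yt-dlp executable on PATH and network access, and the function
# returns (None, [], [], []) on any failure.
def _demo_scrape_url_metadata(url: str = "https://example.com/watch?v=abc123") -> None:
    title, tags, formats, playlist_items = scrape_url_metadata(url)
    debug(
        f"scraped title={title!r}, tags=[{summarize_tags(tags)}], "
        f"{len(formats)} formats, {len(playlist_items)} playlist items"
    )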

def extract_url_formats(formats: list) -> List[Tuple[str, str]]:
    """Extract best formats from yt-dlp formats list.

    Returns list of (display_label, format_id) tuples.
    """
    try:
        video_formats = {}  # {resolution: format_data}
        audio_formats = {}  # {quality_label: format_data}
        for fmt in formats:
            vcodec = fmt.get("vcodec", "none")
            acodec = fmt.get("acodec", "none")
            height = fmt.get("height")
            ext = fmt.get("ext", "unknown")
            format_id = fmt.get("format_id", "")
            # yt-dlp may report tbr/abr as null; coerce None to 0 so comparisons below are safe
            tbr = fmt.get("tbr", 0) or 0
            abr = fmt.get("abr", 0) or 0
            # Video format
            if vcodec and vcodec != "none" and height:
                if height < 480:
                    continue
                res_key = f"{height}p"
                if res_key not in video_formats or tbr > video_formats[res_key].get("tbr", 0):
                    video_formats[res_key] = {
                        "label": f"{height}p ({ext})",
                        "format_id": format_id,
                        "tbr": tbr,
                    }
            # Audio-only format
            elif acodec and acodec != "none" and (not vcodec or vcodec == "none"):
                audio_key = f"audio_{abr}"
                if audio_key not in audio_formats or abr > audio_formats[audio_key].get("abr", 0):
                    audio_formats[audio_key] = {
                        "label": f"audio ({ext})",
                        "format_id": format_id,
                        "abr": abr,
                    }
        result = []
        # Add video formats in descending resolution order
        for res in sorted(video_formats.keys(), key=lambda x: int(x.replace("p", "")), reverse=True):
            fmt = video_formats[res]
            result.append((fmt["label"], fmt["format_id"]))
        # Add best audio format
        if audio_formats:
            best_audio = max(audio_formats.values(), key=lambda x: x.get("abr", 0))
            result.append((best_audio["label"], best_audio["format_id"]))
        return result
    except Exception as e:
        log(f"Error extracting formats: {e}", file=sys.stderr)
        return []
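
# Illustrative sketch: how extract_url_formats reduces a hypothetical yt-dlp formats list
# to one pick per resolution (>= 480p) plus the single best audio-only stream; all
# format_id and bitrate values below are invented for demonstration.
def _demo_extract_url_formats() -> List[Tuple[str, str]]:
    formats = [
        {"format_id": "18", "ext": "mp4", "vcodec": "avc1", "acodec": "mp4a", "height": 360, "tbr": 500},
        {"format_id": "22", "ext": "mp4", "vcodec": "avc1", "acodec": "mp4a", "height": 720, "tbr": 1800},
        {"format_id": "137", "ext": "mp4", "vcodec": "avc1", "acodec": "none", "height": 1080, "tbr": 4400},
        {"format_id": "140", "ext": "m4a", "vcodec": "none", "acodec": "mp4a", "abr": 128},
        {"format_id": "251", "ext": "webm", "vcodec": "none", "acodec": "opus", "abr": 160},
    ]
    picks = extract_url_formats(formats)
    # picks == [("1080p (mp4)", "137"), ("720p (mp4)", "22"), ("audio (webm)", "251")]
    return picks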