"""Get tags from Hydrus or local sidecar metadata. This cmdlet retrieves tags for a selected result, supporting both: - Hydrus Network (for files with hash) - Local sidecar files (.tag) In interactive mode: navigate with numbers, add/delete tags In pipeline mode: display tags as read-only table, emit as structured JSON """ from __future__ import annotations import sys from SYS.logger import log, debug try: from Provider.openlibrary import OpenLibrary _ol_scrape_isbn_metadata = OpenLibrary.scrape_isbn_metadata _ol_scrape_openlibrary_metadata = OpenLibrary.scrape_openlibrary_metadata except Exception: _ol_scrape_isbn_metadata = None # type: ignore[assignment] _ol_scrape_openlibrary_metadata = None # type: ignore[assignment] from Provider.metadata_provider import get_metadata_provider, list_metadata_providers import subprocess from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple from SYS import pipeline as ctx from API import HydrusNetwork from API.folder import read_sidecar, write_sidecar, find_sidecar, API_folder_store from . import _shared as sh normalize_hash = sh.normalize_hash looks_like_hash = sh.looks_like_hash Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs parse_cmdlet_args = sh.parse_cmdlet_args get_field = sh.get_field from SYS.config import get_local_storage_path try: from SYS.metadata import extract_title except ImportError: extract_title = None def _dedup_tags_preserve_order(tags: List[str]) -> List[str]: """Deduplicate tags case-insensitively while preserving order.""" out: List[str] = [] seen: set[str] = set() for t in tags or []: if not isinstance(t, str): continue s = t.strip() if not s: continue key = s.lower() if key in seen: continue seen.add(key) out.append(s) return out def _extract_subtitle_tags(info: Dict[str, Any]) -> List[str]: """Extract subtitle availability tags from a yt-dlp info dict. Produces multi-valued tags so languages can coexist: - subs: - subs_auto: """ def _langs(value: Any) -> List[str]: if not isinstance(value, dict): return [] langs: List[str] = [] for k in value.keys(): if not isinstance(k, str): continue lang = k.strip().lower() if lang: langs.append(lang) return sorted(set(langs)) out: List[str] = [] for lang in _langs(info.get("subtitles")): out.append(f"subs:{lang}") for lang in _langs(info.get("automatic_captions")): out.append(f"subs_auto:{lang}") return out def _scrape_ytdlp_info(url: str) -> Optional[Dict[str, Any]]: """Fetch a yt-dlp info dict without downloading media.""" if not isinstance(url, str) or not url.strip(): return None url = url.strip() # Prefer the Python module when available (faster, avoids shell quoting issues). try: import yt_dlp # type: ignore opts: Any = { "quiet": True, "no_warnings": True, "skip_download": True, "noprogress": True, "socket_timeout": 15, "retries": 1, "playlist_items": "1-10", } with yt_dlp.YoutubeDL(opts) as ydl: info = ydl.extract_info(url, download=False) return info if isinstance(info, dict) else None except Exception: pass # Fallback to yt-dlp CLI if the module isn't available. 
try: import json as json_module cmd = [ "yt-dlp", "-J", "--no-warnings", "--skip-download", "--playlist-items", "1-10", url, ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode != 0: return None payload = (result.stdout or "").strip() if not payload: return None data = json_module.loads(payload) return data if isinstance(data, dict) else None except Exception: return None def _resolve_candidate_urls_for_item( result: Any, backend: Any, file_hash: str, config: Dict[str, Any], ) -> List[str]: """Get candidate URLs from backend and/or piped result.""" try: from SYS.metadata import normalize_urls except Exception: normalize_urls = None # type: ignore[assignment] urls: List[str] = [] # 1) Backend URL association (best source of truth) try: backend_urls = backend.get_url(file_hash, config=config) if backend_urls: if normalize_urls: urls.extend(normalize_urls(backend_urls)) else: urls.extend( [ str(u).strip() for u in backend_urls if isinstance(u, str) and str(u).strip() ] ) except Exception: pass # 2) Backend metadata url field try: meta = backend.get_metadata(file_hash, config=config) if isinstance(meta, dict) and meta.get("url"): if normalize_urls: urls.extend(normalize_urls(meta.get("url"))) else: raw = meta.get("url") if isinstance(raw, list): urls.extend( [ str(u).strip() for u in raw if isinstance(u, str) and str(u).strip() ] ) elif isinstance(raw, str) and raw.strip(): urls.append(raw.strip()) except Exception: pass # 3) Piped result fields def _get(obj: Any, key: str, default: Any = None) -> Any: if isinstance(obj, dict): return obj.get(key, default) return getattr(obj, key, default) for key in ("url", "webpage_url", "source_url", "target"): val = _get(result, key, None) if not val: continue if normalize_urls: urls.extend(normalize_urls(val)) continue if isinstance(val, str) and val.strip(): urls.append(val.strip()) elif isinstance(val, list): urls.extend( [str(u).strip() for u in val if isinstance(u, str) and str(u).strip()] ) meta_field = _get(result, "metadata", None) if isinstance(meta_field, dict) and meta_field.get("url"): val = meta_field.get("url") if normalize_urls: urls.extend(normalize_urls(val)) elif isinstance(val, list): urls.extend( [str(u).strip() for u in val if isinstance(u, str) and str(u).strip()] ) elif isinstance(val, str) and val.strip(): urls.append(val.strip()) # Dedup return _dedup_tags_preserve_order(urls) def _pick_supported_ytdlp_url(urls: List[str]) -> Optional[str]: """Pick the first URL that looks supported by yt-dlp (best effort).""" if not urls: return None def _is_hydrus_file_url(u: str) -> bool: text = str(u or "").strip().lower() if not text: return False # Hydrus-local file URLs are retrievable blobs, not original source pages. # yt-dlp generally can't extract meaningful metadata from these. return ("/get_files/file" in text) and ("hash=" in text) http_urls: List[str] = [] for u in urls: text = str(u or "").strip() if text.lower().startswith(("http://", "https://")): http_urls.append(text) # Prefer non-Hydrus URLs for yt-dlp scraping. candidates = [u for u in http_urls if not _is_hydrus_file_url(u)] if not candidates: return None # Prefer a true support check when the Python module is available. try: from SYS.download import is_url_supported_by_ytdlp for text in candidates: try: if is_url_supported_by_ytdlp(text): return text except Exception: continue except Exception: pass # Fallback: use the first non-Hydrus http(s) URL and let extraction decide. 
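    # Worked example for this fallback (hypothetical URLs, for illustration only):
    #
    #     candidates built from
    #       "http://127.0.0.1:45869/get_files/file?hash=abc123"  -> filtered out above
    #       "https://example.bandcamp.com/album/some-album"       -> kept
    #
    # so when the is_url_supported_by_ytdlp() check is unavailable, the original
    # page URL (the Bandcamp one here) is what gets returned below.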
return candidates[0] if candidates else None _scrape_isbn_metadata = _ol_scrape_isbn_metadata # type: ignore[assignment] _scrape_openlibrary_metadata = _ol_scrape_openlibrary_metadata # type: ignore[assignment] # Tag item for ResultTable display and piping from dataclasses import dataclass @dataclass class TagItem: """Tag item for display in ResultTable and piping to other cmdlet. Allows tags to be selected and piped like: - delete-tag @{3,4,9} (delete tags at indices 3, 4, 9) - add-tag @"namespace:value" (add this tag) """ tag_name: str tag_index: int # 1-based index for user reference hash: Optional[str] = None store: str = "hydrus" service_name: Optional[str] = None path: Optional[str] = None def __post_init__(self): # Make ResultTable happy by adding standard fields # NOTE: Don't set 'title' - we want only the tag column in ResultTable self.detail = f"Tag #{self.tag_index}" self.target = self.tag_name self.media_kind = "tag" def to_dict(self) -> Dict[str, Any]: """Convert to dict for JSON serialization.""" return { "tag_name": self.tag_name, "tag_index": self.tag_index, "hash": self.hash, "store": self.store, "path": self.path, "service_name": self.service_name, } def _emit_tags_as_table( tags_list: List[str], file_hash: Optional[str], store: str = "hydrus", service_name: Optional[str] = None, config: Optional[Dict[str, Any]] = None, item_title: Optional[str] = None, path: Optional[str] = None, subject: Optional[Any] = None, ) -> None: """Emit tags as TagItem objects and display via ResultTable. This replaces _print_tag_list to make tags pipe-able. Stores the table via ctx.set_last_result_table_overlay (or ctx.set_last_result_table) for downstream @ selection. """ from SYS.result_table import ResultTable # Create ResultTable with just tag column (no title) # Keep the title stable and avoid including hash fragments. table_title = "tag" if item_title: table_title = f"tag: {item_title}" table = ResultTable(table_title, max_columns=1) table.set_source_command("get-tag", []) # Create TagItem for each tag tag_items = [] for idx, tag_name in enumerate(tags_list, start=1): tag_item = TagItem( tag_name=tag_name, tag_index=idx, hash=file_hash, store=store, service_name=service_name, path=path, ) tag_items.append(tag_item) table.add_result(tag_item) # Also emit to pipeline for downstream processing ctx.emit(tag_item) # Store the table and items in history so @.. 
works to go back # Use overlay mode so it doesn't push the previous search to history stack # This makes get-tag behave like a transient view try: ctx.set_last_result_table_overlay(table, tag_items, subject) except AttributeError: ctx.set_last_result_table(table, tag_items, subject) # Note: CLI will handle displaying the table via ResultTable formatting def _filter_scraped_tags(tags: List[str]) -> List[str]: """Filter out tags we don't want to import from scraping.""" blocked = {"title", "artist", "source"} out: List[str] = [] seen: set[str] = set() for t in tags: if not t: continue s = str(t).strip() if not s: continue ns = s.split(":", 1)[0].strip().lower() if ":" in s else "" if ns in blocked: continue key = s.lower() if key in seen: continue seen.add(key) out.append(s) return out def _summarize_tags(tags_list: List[str], limit: int = 8) -> str: """Create a summary of tags for display.""" shown = [t for t in tags_list[:limit] if t] summary = ", ".join(shown) remaining = max(0, len(tags_list) - len(shown)) if remaining > 0: summary = f"{summary} (+{remaining} more)" if summary else f"(+{remaining} more)" if len(summary) > 200: summary = summary[:197] + "..." return summary def _extract_title_from(tags_list: List[str]) -> Optional[str]: """Extract title from tags list.""" if extract_title: try: return extract_title(tags_list) except Exception: pass for t in tags_list: if isinstance(t, str) and t.lower().startswith("title:"): val = t.split(":", 1)[1].strip() if val: return val return None def _rename_file_if_title_tag(media: Optional[Path], tags_added: List[str]) -> bool: """Rename a local file if title: tag was added. Returns True if file was renamed, False otherwise. """ if not media or not tags_added: return False # Check if any of the added tags is a title: tag title_value = None for tag in tags_added: if isinstance(tag, str): lower_tag = tag.lower() if lower_tag.startswith("title:"): title_value = tag.split(":", 1)[1].strip() break if not title_value: return False try: # Get current file path file_path = media if not file_path.exists(): return False # Parse file path dir_path = file_path.parent old_name = file_path.name # Get file extension suffix = file_path.suffix or "" # Sanitize title for use as filename import re safe_title = re.sub(r'[<>:"/\\|?*]', "", title_value).strip() if not safe_title: return False new_name = safe_title + suffix new_file_path = dir_path / new_name if new_file_path == file_path: return False # Build sidecar paths BEFORE renaming the file old_sidecar = Path(str(file_path) + ".tag") new_sidecar = Path(str(new_file_path) + ".tag") # Rename file try: file_path.rename(new_file_path) log(f"Renamed file: {old_name} → {new_name}") # Rename .tag sidecar if it exists if old_sidecar.exists(): try: old_sidecar.rename(new_sidecar) log(f"Renamed sidecar: {old_name}.tag → {new_name}.tag") except Exception as e: log(f"Failed to rename sidecar: {e}", file=sys.stderr) return True except Exception as e: log(f"Failed to rename file: {e}", file=sys.stderr) return False except Exception as e: log(f"Error during file rename: {e}", file=sys.stderr) return False def _apply_result_updates_from_tags(result: Any, tag_list: List[str]) -> None: """Update result object with title and tag summary from tags.""" try: new_title = _extract_title_from(tag_list) if new_title: setattr(result, "title", new_title) setattr(result, "tag_summary", _summarize_tags(tag_list)) except Exception: pass def _handle_title_rename(old_path: Path, tags_list: List[str]) -> Optional[Path]: """If a title: tag is 
    present, rename the file and its .tag sidecar to match.

    Returns the new path if renamed, otherwise returns None.
    """
    # Extract title from tags
    new_title = None
    for tag in tags_list:
        if isinstance(tag, str) and tag.lower().startswith("title:"):
            new_title = tag.split(":", 1)[1].strip()
            break

    if not new_title or not old_path.exists():
        return None

    try:
        # Build new filename with same extension
        old_name = old_path.name
        old_suffix = old_path.suffix

        # Create new filename: title + extension
        new_name = f"{new_title}{old_suffix}"
        new_path = old_path.parent / new_name

        # Don't rename if already the same name
        if new_path == old_path:
            return None

        # Rename the main file
        if new_path.exists():
            log(f"Warning: Target filename already exists: {new_name}", file=sys.stderr)
            return None

        old_path.rename(new_path)
        log(f"Renamed file: {old_name} → {new_name}", file=sys.stderr)

        # Rename the .tag sidecar if it exists
        old_tags_path = old_path.parent / (old_name + ".tag")
        if old_tags_path.exists():
            new_tags_path = old_path.parent / (new_name + ".tag")
            if new_tags_path.exists():
                log(
                    f"Warning: Target sidecar already exists: {new_tags_path.name}",
                    file=sys.stderr,
                )
            else:
                old_tags_path.rename(new_tags_path)
                log(
                    f"Renamed sidecar: {old_tags_path.name} → {new_tags_path.name}",
                    file=sys.stderr,
                )

        return new_path
    except Exception as exc:
        log(f"Warning: Failed to rename file: {exc}", file=sys.stderr)
        return None


def _read_sidecar_fallback(p: Path) -> tuple[Optional[str], List[str], List[str]]:
    """Fallback sidecar reader if the metadata module is unavailable.

    Format:
    - Lines with a "hash:" prefix: file hash
    - Lines with a "url:" prefix: URL
    - Lines with a "relationship:" prefix: ignored (internal relationships)
    - Other "namespace:value" lines: treated as namespace tags
    - Plain lines without colons: freeform tags

    Excluded namespaces (treated as metadata, not tags): hash, url, relationship
    """
    try:
        raw = p.read_text(encoding="utf-8", errors="ignore")
    except OSError:
        return None, [], []

    t: List[str] = []
    u: List[str] = []
    h: Optional[str] = None

    # Namespaces to exclude from tags
    excluded_namespaces = {"hash", "url", "relationship"}

    for line in raw.splitlines():
        s = line.strip()
        if not s:
            continue
        low = s.lower()
        # Check if this is a hash line
        if low.startswith("hash:"):
            h = s.split(":", 1)[1].strip() if ":" in s else h
        # Check if this is a URL line
        elif low.startswith("url:"):
            val = s.split(":", 1)[1].strip() if ":" in s else ""
            if val:
                u.append(val)
        # Check if this is an excluded namespace
        elif ":" in s:
            namespace = s.split(":", 1)[0].strip().lower()
            if namespace not in excluded_namespaces:
                # Include as namespace tag (e.g., "title: The Freemasons")
                t.append(s)
        else:
            # Plain text without colon = freeform tag
            t.append(s)

    return h, t, u


def _write_sidecar(
    p: Path, media: Path, tag_list: List[str], url: List[str], hash_in_sidecar: Optional[str]
) -> Path:
    """Write tags to sidecar file and handle title-based renaming.

    Returns the new media path if renamed, otherwise returns the original media path.
""" success = write_sidecar(media, tag_list, url, hash_in_sidecar) if success: _apply_result_updates_from_tags(None, tag_list) # Check if we should rename the file based on title tag new_media = _handle_title_rename(media, tag_list) if new_media: return new_media return media # Fallback writer ordered = [s for s in tag_list if s and s.strip()] lines = [] if hash_in_sidecar: lines.append(f"hash:{hash_in_sidecar}") lines.extend(ordered) for u in url: lines.append(f"url:{u}") try: p.write_text("\n".join(lines) + "\n", encoding="utf-8") # Check if we should rename the file based on title tag new_media = _handle_title_rename(media, tag_list) if new_media: return new_media return media except OSError as exc: log(f"Failed to write sidecar: {exc}", file=sys.stderr) return media def _emit_tag_payload( source: str, tags_list: List[str], *, hash_value: Optional[str], extra: Optional[Dict[str, Any]] = None, store_label: Optional[str] = None, ) -> int: """Emit tag values as structured payload to pipeline.""" payload: Dict[str, Any] = { "source": source, "tag": list(tags_list), "count": len(tags_list), } if hash_value: payload["hash"] = hash_value if extra: for key, value in extra.items(): if value is not None: payload[key] = value label = None if store_label: label = store_label elif ctx.get_stage_context() is not None: label = "tag" if label: ctx.store_value(label, payload) # Emit individual TagItem objects so they can be selected by bare index # When in pipeline, emit individual TagItem objects if ctx.get_stage_context() is not None: for idx, tag_name in enumerate(tags_list, start=1): tag_item = TagItem( tag_name=tag_name, tag_index=idx, hash=hash_value, store=source, service_name=None ) ctx.emit(tag_item) else: # When not in pipeline, just emit the payload ctx.emit(payload) return 0 def _extract_scrapable_identifiers(tags_list: List[str]) -> Dict[str, str]: """Extract scrapable identifiers from tags.""" identifiers = {} scrapable_prefixes = { "openlibrary", "isbn", "isbn_10", "isbn_13", "musicbrainz", "musicbrainzalbum", "imdb", "tmdb", "tvdb", } for tag in tags_list: if not isinstance(tag, str) or ":" not in tag: continue parts = tag.split(":", 1) if len(parts) != 2: continue key_raw = parts[0].strip().lower() key = key_raw.replace("-", "_") if key == "isbn10": key = "isbn_10" elif key == "isbn13": key = "isbn_13" value = parts[1].strip() # Normalize ISBN values by removing hyphens for API friendliness if key.startswith("isbn"): value = value.replace("-", "") if key in scrapable_prefixes and value: identifiers[key] = value return identifiers def _extract_tag_value(tags_list: List[str], namespace: str) -> Optional[str]: """Get first tag value for a namespace (e.g., artist:, title:).""" ns = namespace.lower() for tag in tags_list: if not isinstance(tag, str) or ":" not in tag: continue prefix, _, value = tag.partition(":") if prefix.strip().lower() != ns: continue candidate = value.strip() if candidate: return candidate return None def _scrape_url_metadata( url: str, ) -> Tuple[Optional[str], List[str], List[Tuple[str, str]], List[Dict[str, Any]]]: """Scrape metadata from a URL using yt-dlp. 
Returns: (title, tags, formats, playlist_items) tuple where: - title: Video/content title - tags: List of extracted tags (both namespaced and freeform) - formats: List of (display_label, format_id) tuples - playlist_items: List of playlist entry dicts (empty if not a playlist) """ try: import json as json_module try: from SYS.metadata import extract_ytdlp_tags except ImportError: extract_ytdlp_tags = None # Build yt-dlp command with playlist support # IMPORTANT: Do NOT use --flat-playlist! It strips metadata like artist, album, uploader, genre # Without it, yt-dlp gives us full metadata in an 'entries' array within a single JSON object # This ensures we get album-level metadata from sources like BandCamp, YouTube Music, etc. cmd = [ "yt-dlp", "-j", # Output JSON "--no-warnings", "--playlist-items", "1-10", # Get first 10 items if it's a playlist (provides entries) "-f", "best", url, ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode != 0: log(f"yt-dlp error: {result.stderr}", file=sys.stderr) return None, [], [], [] # Parse JSON output - WITHOUT --flat-playlist, we get ONE JSON object with 'entries' array # This gives us full metadata instead of flat format lines = result.stdout.strip().split("\n") if not lines or not lines[0]: log("yt-dlp returned empty output", file=sys.stderr) return None, [], [], [] # Parse the single JSON object try: data = json_module.loads(lines[0]) except json_module.JSONDecodeError as e: log(f"Failed to parse yt-dlp JSON: {e}", file=sys.stderr) return None, [], [], [] # Extract title - use the main title title = data.get("title", "Unknown") # Determine if this is a playlist/album (has entries array) # is_playlist = 'entries' in data and isinstance(data.get('entries'), list) # Extract tags and playlist items tags = [] playlist_items = [] # IMPORTANT: Extract album/playlist-level tags FIRST (before processing entries) # This ensures we get metadata about the collection, not just individual tracks if extract_ytdlp_tags: album_tags = extract_ytdlp_tags(data) tags.extend(album_tags) # Case 1: Entries are nested in the main object (standard playlist structure) if "entries" in data and isinstance(data.get("entries"), list): entries = data["entries"] # Build playlist items with title and duration for idx, entry in enumerate(entries, 1): if isinstance(entry, dict): item_title = entry.get("title", entry.get("id", f"Track {idx}")) item_duration = entry.get("duration", 0) playlist_items.append( { "index": idx, "id": entry.get("id", f"track_{idx}"), "title": item_title, "duration": item_duration, "url": entry.get("url") or entry.get("webpage_url", ""), } ) # Extract tags from each entry and merge (but don't duplicate album-level tags) # Only merge entry tags that are multi-value prefixes (not single-value like title:, artist:, etc.) 
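                    # Worked example of this merge rule (values are illustrative only):
                    #   album-level tags collected above: ["artist:Some Band", "album:Some Album"]
                    #   entry-level tags for one track:   ["artist:Some Band", "genre:ambient"]
                    #   -> "artist:Some Band" is skipped because the artist namespace is
                    #      single-valued and already present; "genre:ambient" is merged.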
if extract_ytdlp_tags: entry_tags = extract_ytdlp_tags(entry) # Single-value namespaces that should not be duplicated from entries single_value_namespaces = { "title", "artist", "album", "creator", "channel", "release_date", "upload_date", "license", "location", } for tag in entry_tags: # Extract the namespace (part before the colon) tag_namespace = tag.split(":", 1)[0].lower( ) if ":" in tag else None # Skip if this namespace already exists in tags (from album level) if tag_namespace and tag_namespace in single_value_namespaces: # Check if any tag with this namespace already exists in tags already_has_namespace = any( t.split(":", 1)[0].lower() == tag_namespace for t in tags if ":" in t ) if already_has_namespace: continue # Skip this tag, keep the album-level one if tag not in tags: # Avoid exact duplicates tags.append(tag) # Case 2: Playlist detected by playlist_count field (BandCamp albums, etc.) # These need a separate call with --flat-playlist to get the actual entries elif (data.get("playlist_count") or 0) > 0 and "entries" not in data: try: # Make a second call with --flat-playlist to get the actual tracks flat_cmd = [ "yt-dlp", "-j", "--no-warnings", "--flat-playlist", "-f", "best", url ] flat_result = subprocess.run( flat_cmd, capture_output=True, text=True, timeout=30 ) if flat_result.returncode == 0: flat_lines = flat_result.stdout.strip().split("\n") # With --flat-playlist, each line is a separate track JSON object # (not nested in a playlist container), so process ALL lines for idx, line in enumerate(flat_lines, 1): if line.strip().startswith("{"): try: entry = json_module.loads(line) item_title = entry.get( "title", entry.get("id", f"Track {idx}") ) item_duration = entry.get("duration", 0) playlist_items.append( { "index": idx, "id": entry.get("id", f"track_{idx}"), "title": item_title, "duration": item_duration, "url": entry.get("url") or entry.get("webpage_url", ""), } ) except json_module.JSONDecodeError: pass except Exception as e: pass # Silently ignore if we can't get playlist entries # Fallback: if still no tags detected, get from first item if not tags and extract_ytdlp_tags: tags = extract_ytdlp_tags(data) # Extract formats from the main data object formats = [] if "formats" in data: formats = _extract_url_formats(data.get("formats", [])) # Deduplicate tags by namespace to prevent duplicate title:, artist:, etc. try: from SYS.metadata import dedup_tags_by_namespace as _dedup if _dedup: tags = _dedup(tags, keep_first=True) except Exception: pass # If dedup fails, return tags as-is return title, tags, formats, playlist_items except subprocess.TimeoutExpired: log("yt-dlp timeout (>30s)", file=sys.stderr) return None, [], [], [] except Exception as e: log(f"URL scraping error: {e}", file=sys.stderr) return None, [], [], [] def _extract_url_formats(formats: list) -> List[Tuple[str, str]]: """Extract best formats from yt-dlp formats list. Returns list of (display_label, format_id) tuples. 
""" try: video_formats = {} # {resolution: format_data} audio_formats = {} # {quality_label: format_data} for fmt in formats: vcodec = fmt.get("vcodec", "none") acodec = fmt.get("acodec", "none") height = fmt.get("height") ext = fmt.get("ext", "unknown") format_id = fmt.get("format_id", "") tbr = fmt.get("tbr", 0) abr = fmt.get("abr", 0) # Video format if vcodec and vcodec != "none" and height: if height < 480: continue res_key = f"{height}p" if res_key not in video_formats or tbr > video_formats[res_key].get( "tbr", 0): video_formats[res_key] = { "label": f"{height}p ({ext})", "format_id": format_id, "tbr": tbr, } # Audio-only format elif acodec and acodec != "none" and (not vcodec or vcodec == "none"): audio_key = f"audio_{abr}" if audio_key not in audio_formats or abr > audio_formats[audio_key].get( "abr", 0): audio_formats[audio_key] = { "label": f"audio ({ext})", "format_id": format_id, "abr": abr, } result = [] # Add video formats in descending resolution order for res in sorted(video_formats.keys(), key=lambda x: int(x.replace("p", "")), reverse=True): fmt = video_formats[res] result.append((fmt["label"], fmt["format_id"])) # Add best audio format if audio_formats: best_audio = max(audio_formats.values(), key=lambda x: x.get("abr", 0)) result.append((best_audio["label"], best_audio["format_id"])) return result except Exception as e: log(f"Error extracting formats: {e}", file=sys.stderr) return [] def _scrape_isbn_metadata(isbn: str) -> List[str]: if _ol_scrape_isbn_metadata is None: log("OpenLibrary scraper unavailable", file=sys.stderr) return [] try: return list(_ol_scrape_isbn_metadata(isbn)) except Exception as e: log(f"ISBN scraping error: {e}", file=sys.stderr) return [] def _scrape_openlibrary_metadata(olid: str) -> List[str]: if _ol_scrape_openlibrary_metadata is None: log("OpenLibrary scraper unavailable", file=sys.stderr) return [] try: return list(_ol_scrape_openlibrary_metadata(olid)) except Exception as e: log(f"OpenLibrary scraping error: {e}", file=sys.stderr) return [] def _perform_scraping(tags_list: List[str]) -> List[str]: """Perform scraping based on identifiers in tags. Priority order: 1. openlibrary: (preferred - more complete metadata) 2. isbn_10 or isbn (fallback) """ identifiers = _extract_scrapable_identifiers(tags_list) if not identifiers: log("No scrapable identifiers found (openlibrary, ISBN, musicbrainz, imdb)") return [] log(f"Found scrapable identifiers: {', '.join(identifiers.keys())}") new_tags = [] # Prefer OpenLibrary over ISBN (more complete metadata) if "openlibrary" in identifiers: olid = identifiers["openlibrary"] if olid: log(f"Scraping OpenLibrary: {olid}") new_tags.extend(_scrape_openlibrary_metadata(olid)) elif "isbn_13" in identifiers or "isbn_10" in identifiers or "isbn" in identifiers: isbn = identifiers.get("isbn_13") or identifiers.get( "isbn_10" ) or identifiers.get("isbn") if isbn: log(f"Scraping ISBN: {isbn}") new_tags.extend(_scrape_isbn_metadata(isbn)) existing_tags_lower = {tag.lower() for tag in tags_list} scraped_unique = [] seen = set() for tag in new_tags: tag_lower = tag.lower() if tag_lower not in existing_tags_lower and tag_lower not in seen: scraped_unique.append(tag) seen.add(tag_lower) if scraped_unique: log(f"Added {len(scraped_unique)} new tag(s) from scraping") return scraped_unique def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Get tags from Hydrus, local sidecar, or URL metadata. 
Usage: get-tag [-query "hash:"] [--store ] [--emit] get-tag -scrape Options: -query "hash:": Override hash to use instead of result's hash --store : Store result to this key for pipeline --emit: Emit result without interactive prompt (quiet mode) -scrape : Scrape metadata from URL or provider name (itunes, openlibrary, googlebooks, imdb) """ args_list = [str(arg) for arg in (args or [])] raw_args = list(args_list) # Support numeric selection tokens (e.g., "@1" leading to argument "1") without treating # them as hash overrides. This lets users pick from the most recent table overlay/results. if len(args_list) == 1: token = args_list[0] if not token.startswith("-") and token.isdigit(): try: idx = int(token) - 1 items_pool = ctx.get_last_result_items() if 0 <= idx < len(items_pool): result = items_pool[idx] args_list = [] debug( f"[get_tag] Resolved numeric selection arg {token} -> last_result_items[{idx}]" ) else: debug( f"[get_tag] Numeric selection arg {token} out of range (items={len(items_pool)})" ) except Exception as exc: debug( f"[get_tag] Failed to resolve numeric selection arg {token}: {exc}" ) # Helper to get field from both dict and object def get_field(obj: Any, field: str, default: Any = None) -> Any: if isinstance(obj, dict): return obj.get(field, default) else: return getattr(obj, field, default) # Parse arguments using shared parser parsed_args = parse_cmdlet_args(args_list, CMDLET) # Detect if -scrape flag was provided without a value (parse_cmdlet_args skips missing values) scrape_flag_present = any( str(arg).lower() in {"-scrape", "--scrape"} for arg in args_list ) # Extract values query_raw = parsed_args.get("query") hash_override = sh.parse_single_hash_query(query_raw) if query_raw and not hash_override: log("Invalid -query value (expected hash:)", file=sys.stderr) return 1 store_key = parsed_args.get("store") emit_requested = parsed_args.get("emit", False) scrape_url = parsed_args.get("scrape") scrape_requested = scrape_flag_present or scrape_url is not None # Convenience: `-scrape` with no value defaults to `ytdlp` (store-backed URL scrape). if scrape_flag_present and (scrape_url is None or str(scrape_url).strip() == ""): scrape_url = "ytdlp" scrape_requested = True if scrape_requested and (scrape_url is None or str(scrape_url).strip() == ""): log("-scrape requires a URL or provider name", file=sys.stderr) return 1 # Handle URL or provider scraping mode if scrape_requested and scrape_url: import json as json_module if str(scrape_url).strip().lower() == "ytdlp": # Scrape metadata from the selected item's URL via yt-dlp (no download), # then OVERWRITE all existing tags (including title:). # # This mode requires a store-backed item (hash + store). # # NOTE: We intentionally do not reuse _scrape_url_metadata() here because it # performs namespace deduplication that would collapse multi-valued tags. file_hash = normalize_hash(hash_override) or normalize_hash( get_field(result, "hash", None) ) store_name = get_field(result, "store", None) subject_path = ( get_field(result, "path", None) or get_field(result, "target", None) or get_field(result, "filename", None) ) item_title = ( get_field(result, "title", None) or get_field(result, "name", None) or get_field(result, "filename", None) ) # Only run overwrite-apply when the item is store-backed. # If this is a URL-only PipeObject, fall through to provider mode below. 
if (file_hash and store_name and str(file_hash).strip().lower() != "unknown" and str(store_name).strip().upper() not in {"PATH", "URL"}): try: from Store import Store storage = Store(config) backend = storage[str(store_name)] except Exception as exc: log( f"Failed to resolve store backend '{store_name}': {exc}", file=sys.stderr ) return 1 candidate_urls = _resolve_candidate_urls_for_item( result, backend, file_hash, config ) scrape_target = _pick_supported_ytdlp_url(candidate_urls) if not scrape_target: log( "No yt-dlp-supported source URL found for this item (Hydrus /get_files/file URLs are ignored). ", file=sys.stderr, ) log( "Add the original page URL to the file (e.g. via add-url), then retry get-tag -scrape.", file=sys.stderr, ) return 1 info = _scrape_ytdlp_info(scrape_target) if not info: log( "yt-dlp could not extract metadata for this URL (unsupported or failed)", file=sys.stderr, ) return 1 try: from SYS.metadata import extract_ytdlp_tags except Exception: extract_ytdlp_tags = None # type: ignore[assignment] # Prefer the top-level metadata, but if this is a playlist container, use # the first entry for per-item fields like subtitles. info_for_subs = info entries = info.get("entries") if isinstance(info, dict) else None if isinstance(entries, list) and entries: first = entries[0] if isinstance(first, dict): info_for_subs = first tags: List[str] = [] if extract_ytdlp_tags: try: tags.extend(extract_ytdlp_tags(info)) except Exception: pass # Subtitle availability tags try: tags.extend( _extract_subtitle_tags( info_for_subs if isinstance(info_for_subs, dict) else {} ) ) except Exception: pass # Ensure we actually have something to apply. tags = _dedup_tags_preserve_order(tags) if not tags: log("No tags extracted from yt-dlp metadata", file=sys.stderr) return 1 # Full overwrite: delete all existing tags, then add the new set. 
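                # The overwrite below is a three-step sequence against the store backend:
                #   1) get_tag()    - snapshot the item's current tags
                #   2) delete_tag() - best-effort removal of that snapshot (failures are
                #                     only logged via debug)
                #   3) add_tag()    - apply the freshly scraped set
                # so the end state mirrors the yt-dlp metadata instead of merging with
                # whatever tags were already stored.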
try: existing_tags, _src = backend.get_tag(file_hash, config=config) except Exception: existing_tags = [] try: if existing_tags: backend.delete_tag( file_hash, list(existing_tags), config=config ) except Exception as exc: debug(f"[get_tag] ytdlp overwrite: delete_tag failed: {exc}") try: backend.add_tag(file_hash, list(tags), config=config) except Exception as exc: log(f"Failed to apply yt-dlp tags: {exc}", file=sys.stderr) return 1 # Show updated tags try: updated_tags, _src = backend.get_tag(file_hash, config=config) except Exception: updated_tags = tags if not updated_tags: updated_tags = tags _emit_tags_as_table( tags_list=list(updated_tags), file_hash=file_hash, store=str(store_name), service_name=None, config=config, item_title=str(item_title or "ytdlp"), path=str(subject_path) if subject_path else None, subject={ "hash": file_hash, "store": str(store_name), "path": str(subject_path) if subject_path else None, "title": item_title, "extra": { "applied_provider": "ytdlp", "scrape_url": scrape_target }, }, ) return 0 if scrape_url.startswith("http://") or scrape_url.startswith("https://"): # URL scraping (existing behavior) title, tags, formats, playlist_items = _scrape_url_metadata(scrape_url) if not tags: log("No tags extracted from URL", file=sys.stderr) return 1 output = { "title": title, "tag": tags, "formats": [(label, fmt_id) for label, fmt_id in formats], "playlist_items": playlist_items, } print(json_module.dumps(output, ensure_ascii=False)) return 0 # Provider scraping (e.g., itunes, imdb) provider = get_metadata_provider(scrape_url, config) if provider is None: log(f"Unknown metadata provider: {scrape_url}", file=sys.stderr) return 1 # Prefer identifier tags (ISBN/OLID/etc.) when available; fallback to title/filename. # IMPORTANT: do not rely on `result.tag` for this because it can be stale (cached on # the piped PipeObject). Always prefer the current store-backed tags when possible. identifier_tags: List[str] = [] file_hash_for_scrape = normalize_hash(hash_override) or normalize_hash( get_field(result, "hash", None) ) store_for_scrape = get_field(result, "store", None) if file_hash_for_scrape and store_for_scrape: try: from Store import Store storage = Store(config) backend = storage[str(store_for_scrape)] current_tags, _src = backend.get_tag(file_hash_for_scrape, config=config) if isinstance(current_tags, (list, tuple, set)) and current_tags: identifier_tags = [ str(t) for t in current_tags if isinstance(t, (str, bytes)) ] except Exception: # Fall back to whatever is present on the piped result if store lookup fails. pass # Fall back to tags carried on the result (may be stale). if not identifier_tags: result_tags = get_field(result, "tag", None) if isinstance(result_tags, list): identifier_tags = [ str(t) for t in result_tags if isinstance(t, (str, bytes)) ] # As a last resort, try local sidecar only when the item is not store-backed. 
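        # Example .tag sidecar layout consulted by this fallback (contents are
        # illustrative only):
        #
        #     hash:0123abcd...
        #     title:Some Book
        #     isbn_13:9780000000000
        #     url:https://example.org/source-page
        #     freeform tag
        #
        # Identifier namespaces such as isbn_13/openlibrary are what
        # _extract_scrapable_identifiers() picks up a few lines below.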
if not identifier_tags and (not file_hash_for_scrape or not store_for_scrape): file_path = ( get_field(result, "target", None) or get_field(result, "path", None) or get_field(result, "filename", None) ) if (isinstance(file_path, str) and file_path and not file_path.lower().startswith( ("http://", "https://"))): try: media_path = Path(str(file_path)) if media_path.exists(): tags_from_sidecar = read_sidecar(media_path) if isinstance(tags_from_sidecar, list): identifier_tags = [ str(t) for t in tags_from_sidecar if isinstance(t, (str, bytes)) ] except Exception: pass title_from_tags = _extract_tag_value(identifier_tags, "title") artist_from_tags = _extract_tag_value(identifier_tags, "artist") identifiers = _extract_scrapable_identifiers(identifier_tags) identifier_query: Optional[str] = None if identifiers: if provider.name in {"openlibrary", "googlebooks", "google"}: identifier_query = ( identifiers.get("isbn_13") or identifiers.get("isbn_10") or identifiers.get("isbn") or identifiers.get("openlibrary") ) elif provider.name == "imdb": identifier_query = identifiers.get("imdb") elif provider.name == "itunes": identifier_query = identifiers.get("musicbrainz") or identifiers.get( "musicbrainzalbum" ) # Determine query from identifier first, else title on the result or filename title_hint = ( title_from_tags or get_field(result, "title", None) or get_field(result, "name", None) ) if not title_hint: file_path = get_field(result, "path", None) or get_field(result, "filename", None) if file_path: title_hint = Path(str(file_path)).stem artist_hint = ( artist_from_tags or get_field(result, "artist", None) or get_field(result, "uploader", None) ) if not artist_hint: meta_field = get_field(result, "metadata", None) if isinstance(meta_field, dict): meta_artist = meta_field.get("artist") or meta_field.get("uploader") if meta_artist: artist_hint = str(meta_artist) combined_query: Optional[str] = None if (not identifier_query and title_hint and artist_hint and provider.name in {"itunes", "musicbrainz"}): if provider.name == "musicbrainz": combined_query = f'recording:"{title_hint}" AND artist:"{artist_hint}"' else: combined_query = f"{title_hint} {artist_hint}" # yt-dlp isn't a search provider; it requires a URL. url_hint: Optional[str] = None if provider.name == "ytdlp": raw_url = ( get_field(result, "url", None) or get_field(result, "source_url", None) or get_field(result, "target", None) ) if isinstance(raw_url, list) and raw_url: raw_url = raw_url[0] if isinstance(raw_url, str) and raw_url.strip().startswith(("http://", "https://")): url_hint = raw_url.strip() query_hint = url_hint or identifier_query or combined_query or title_hint if not query_hint: log( "No title or identifier available to search for metadata", file=sys.stderr ) return 1 if identifier_query: log(f"Using identifier for metadata search: {identifier_query}") elif combined_query: log(f"Using title+artist for metadata search: {title_hint} - {artist_hint}") else: log(f"Using title for metadata search: {query_hint}") items = provider.search(query_hint, limit=10) if not items: log("No metadata results found", file=sys.stderr) return 1 # For yt-dlp, emit tags directly (there is no meaningful multi-result selection step). 
if provider.name == "ytdlp": try: tags = [str(t) for t in provider.to_tags(items[0]) if t is not None] except Exception: tags = [] if not tags: log("No tags extracted from yt-dlp metadata", file=sys.stderr) return 1 _emit_tags_as_table( tags_list=list(tags), file_hash=None, store="url", service_name=None, config=config, item_title=str(items[0].get("title") or "ytdlp"), path=None, subject={ "provider": "ytdlp", "url": str(query_hint) }, ) return 0 from SYS.result_table import ResultTable table = ResultTable(f"Metadata: {provider.name}") table.set_table(f"metadata.{provider.name}") table.set_source_command("get-tag", []) selection_payload = [] hash_for_payload = normalize_hash(hash_override) or normalize_hash( get_field(result, "hash", None) ) store_for_payload = get_field(result, "store", None) # Preserve a consistent path field when present so selecting a metadata row # keeps referring to the original file. path_for_payload = ( get_field(result, "path", None) or get_field(result, "target", None) or get_field(result, "filename", None) ) for idx, item in enumerate(items): tags = _filter_scraped_tags(provider.to_tags(item)) row = table.add_row() row.add_column("Title", item.get("title", "")) row.add_column("Artist", item.get("artist", "")) row.add_column("Album", item.get("album", "")) row.add_column("Year", item.get("year", "")) payload = { "tag": tags, "provider": provider.name, "title": item.get("title"), "artist": item.get("artist"), "album": item.get("album"), "year": item.get("year"), "hash": hash_for_payload, "store": store_for_payload, "path": path_for_payload, "extra": { "tag": tags, "provider": provider.name, }, } selection_payload.append(payload) table.set_row_selection_args(idx, [str(idx + 1)]) # Store an overlay so that a subsequent `@N` selects from THIS metadata table, # not from the previous searchable table. ctx.set_last_result_table_overlay(table, selection_payload) ctx.set_current_stage_table(table) return 0 # If -scrape was requested but no URL, that's an error if scrape_requested and not scrape_url: log("-scrape requires a URL argument", file=sys.stderr) return 1 # Handle @N selection which creates a list - extract the first item if isinstance(result, list) and len(result) > 0: result = result[0] # If the current result already carries a tag list (e.g. a selected metadata # row from get-tag -scrape itunes), APPLY those tags to the file in the store. result_provider = get_field(result, "provider", None) result_tags = get_field(result, "tag", None) if result_provider and isinstance(result_tags, list) and result_tags: file_hash = normalize_hash(hash_override) or normalize_hash( get_field(result, "hash", None) ) store_name = get_field(result, "store", None) subject_path = ( get_field(result, "path", None) or get_field(result, "target", None) or get_field(result, "filename", None) ) if not file_hash or not store_name: log( "Selected metadata row is missing hash/store; cannot apply tags", file=sys.stderr ) _emit_tags_as_table( tags_list=[str(t) for t in result_tags if t is not None], file_hash=file_hash, store=str(store_name or "local"), service_name=None, config=config, item_title=str(get_field(result, "title", None) or result_provider), path=str(subject_path) if subject_path else None, subject=result, ) _emit_tag_payload( str(result_provider), [str(t) for t in result_tags if t is not None], hash_value=file_hash, ) return 0 # Apply tags to the store backend (no sidecar writing here). 
if str(result_provider).strip().lower() == "ytdlp": apply_tags = [str(t) for t in result_tags if t is not None] else: apply_tags = _filter_scraped_tags( [str(t) for t in result_tags if t is not None] ) if not apply_tags: log( "No applicable scraped tags to apply (title:/artist:/source: are skipped)", file=sys.stderr, ) return 0 try: from Store import Store storage = Store(config) backend = storage[str(store_name)] ok = bool(backend.add_tag(file_hash, apply_tags, config=config)) if not ok: log(f"Failed to apply tags to store '{store_name}'", file=sys.stderr) except Exception as exc: log(f"Failed to apply tags: {exc}", file=sys.stderr) return 1 # Show updated tags after applying. try: updated_tags, _src = backend.get_tag(file_hash, config=config) except Exception: updated_tags = apply_tags if not updated_tags: updated_tags = apply_tags _emit_tags_as_table( tags_list=list(updated_tags), file_hash=file_hash, store=str(store_name), service_name=None, config=config, item_title=str( get_field(result, "title", None) or get_field(result, "name", None) or str(result_provider) ), path=str(subject_path) if subject_path else None, subject={ "hash": file_hash, "store": str(store_name), "path": str(subject_path) if subject_path else None, "title": get_field(result, "title", None) or get_field(result, "name", None), "extra": { "applied_provider": str(result_provider) }, }, ) _emit_tag_payload( str(store_name), list(updated_tags), hash_value=file_hash, extra={"applied_provider": str(result_provider)}, ) return 0 hash_from_result = normalize_hash(get_field(result, "hash", None)) file_hash = hash_override or hash_from_result # Only use emit mode if explicitly requested with --emit flag, not just because we're in a pipeline # This allows interactive REPL to work even in pipelines emit_mode = emit_requested or bool(store_key) store_label = store_key.strip() if store_key and store_key.strip() else None # Get hash and store from result store_name = get_field(result, "store") if not file_hash: log("No hash available in result", file=sys.stderr) return 1 if not store_name: log("No store specified in result", file=sys.stderr) return 1 # Get tags using storage backend try: from Store import Store storage = Store(config) backend = storage[store_name] current, source = backend.get_tag(file_hash, config=config) if not current: log("No tags found", file=sys.stderr) return 1 service_name = "" except KeyError: log(f"Store '{store_name}' not found", file=sys.stderr) return 1 except Exception as exc: log(f"Failed to get tags: {exc}", file=sys.stderr) return 1 # Always output to ResultTable (pipeline mode only) # Extract title for table header item_title = ( get_field(result, "title", None) or get_field(result, "name", None) or get_field(result, "filename", None) ) # Build a subject payload representing the file whose tags are being shown subject_store = get_field(result, "store", None) or store_name subject_path = ( get_field(result, "path", None) or get_field(result, "target", None) or get_field(result, "filename", None) ) subject_payload: Dict[str, Any] = { "tag": list(current), "title": item_title, "name": item_title, "store": subject_store, "service_name": service_name, "extra": { "tag": list(current), }, } if file_hash: subject_payload["hash"] = file_hash if subject_path: try: subject_payload["path"] = str(subject_path) except Exception: pass _emit_tags_as_table( current, file_hash=file_hash, store=subject_store, service_name=service_name if source == "hydrus" else None, config=config, item_title=item_title, 
        path=str(subject_path) if subject_path else None,
        subject=subject_payload,
    )

    # If emit requested or a store key was provided, also emit the structured payload
    if emit_mode:
        _emit_tag_payload(
            source,
            current,
            hash_value=file_hash,
            store_label=store_label,
        )

    return 0


_SCRAPE_CHOICES = []
try:
    _SCRAPE_CHOICES = sorted(list_metadata_providers().keys())
except Exception:
    _SCRAPE_CHOICES = [
        "itunes",
        "openlibrary",
        "googlebooks",
        "google",
        "musicbrainz",
        "imdb",
    ]

# Special scrape mode: pull tags from an item's URL via yt-dlp (no download)
if "ytdlp" not in _SCRAPE_CHOICES:
    _SCRAPE_CHOICES.append("ytdlp")
_SCRAPE_CHOICES = sorted(_SCRAPE_CHOICES)


class Get_Tag(Cmdlet):
    """Class-based get-tag cmdlet with self-registration."""

    def __init__(self) -> None:
        """Initialize get-tag cmdlet."""
        super().__init__(
            name="get-tag",
            summary="Get tag values from Hydrus or local sidecar metadata",
            usage=(
                'get-tag [-query "hash:<hash>"] [--store <key>] '
                '[--emit] [-scrape <url|provider>]'
            ),
            alias=[],
            arg=[
                SharedArgs.QUERY,
                CmdletArg(
                    name="-store",
                    type="string",
                    description="Store result to this key for pipeline",
                    alias="store",
                ),
                CmdletArg(
                    name="-emit",
                    type="flag",
                    description="Emit result without interactive prompt (quiet mode)",
                    alias="emit-only",
                ),
                CmdletArg(
                    name="-scrape",
                    type="string",
                    description=(
                        "Scrape metadata from URL/provider, or use 'ytdlp' to scrape "
                        "from the item's URL and overwrite tags"
                    ),
                    required=False,
                    choices=_SCRAPE_CHOICES,
                ),
            ],
            detail=[
                "- Retrieves tags for a file from:",
                "    Hydrus: Using file hash if available",
                "    Local: From sidecar files or local library database",
                "- Options:",
                '    -query: Override hash to look up in Hydrus (use: -query "hash:<hash>")',
                "    -store: Store result to key for downstream pipeline",
                "    -emit: Quiet mode (no interactive selection)",
                "    -scrape: Scrape metadata from URL or metadata provider",
            ],
            exec=self.run,
        )
        self.register()

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Execute get-tag cmdlet."""
        return _run(result, args, config)


# Create and register the cmdlet
CMDLET = Get_Tag()
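

# ---------------------------------------------------------------------------
# Minimal smoke-check sketch for the pure tag helpers defined above. This is an
# illustrative addition rather than part of the cmdlet contract: it only touches
# helpers with no Hydrus/store dependencies and assumes the project imports at
# the top of this module resolve in the current environment.
if __name__ == "__main__":
    _sample = [
        "Title:Example Item",
        "title:Example Item",          # case-insensitive duplicate, dropped by dedup
        "isbn-13:978-0-00-000000-0",   # normalized to isbn_13 with hyphens stripped
        "artist:Example Artist",
    ]
    _deduped = _dedup_tags_preserve_order(_sample)
    print("deduped:    ", _deduped)
    print("identifiers:", _extract_scrapable_identifiers(_deduped))
    print("title:      ", _extract_tag_value(_deduped, "title"))
    print("summary:    ", _summarize_tags(_deduped))
    print(
        "subs tags:  ",
        _extract_subtitle_tags({"subtitles": {"en": []}, "automatic_captions": {"es": []}}),
    )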