# Medios-Macina/cmdlet/get_tag.py
"""Get tags from Hydrus or local sidecar metadata.
This cmdlet retrieves tags for a selected result, supporting both:
- Hydrus Network (for files with hash)
- Local sidecar files (.tag)
In interactive mode: navigate with numbers, add/delete tags
In pipeline mode: display tags as read-only table, emit as structured JSON
"""
from __future__ import annotations
import sys
from SYS.logger import log, debug
try:
from Provider.openlibrary import OpenLibrary
_ol_scrape_isbn_metadata = OpenLibrary.scrape_isbn_metadata
_ol_scrape_openlibrary_metadata = OpenLibrary.scrape_openlibrary_metadata
except Exception:
_ol_scrape_isbn_metadata = None # type: ignore[assignment]
_ol_scrape_openlibrary_metadata = None # type: ignore[assignment]
from Provider.metadata_provider import get_metadata_provider, list_metadata_providers
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import pipeline as ctx
from API import HydrusNetwork
from API.folder import read_sidecar, write_sidecar, find_sidecar, API_folder_store
from . import _shared as sh
normalize_hash = sh.normalize_hash
looks_like_hash = sh.looks_like_hash
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
parse_cmdlet_args = sh.parse_cmdlet_args
get_field = sh.get_field
from config import get_local_storage_path
try:
from metadata import extract_title
except ImportError:
extract_title = None
def _dedup_tags_preserve_order(tags: List[str]) -> List[str]:
"""Deduplicate tags case-insensitively while preserving order."""
out: List[str] = []
seen: set[str] = set()
for t in tags or []:
if not isinstance(t, str):
continue
s = t.strip()
if not s:
continue
key = s.lower()
if key in seen:
continue
seen.add(key)
out.append(s)
return out
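# Illustrative example (not executed at import time): deduplication is
# case-insensitive, but the first-seen casing and ordering are preserved.
#   >>> _dedup_tags_preserve_order(["Title:Foo", "title:foo", " genre:rock ", ""])
#   ['Title:Foo', 'genre:rock']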
def _extract_subtitle_tags(info: Dict[str, Any]) -> List[str]:
"""Extract subtitle availability tags from a yt-dlp info dict.
Produces multi-valued tags so languages can coexist:
- subs:<lang>
- subs_auto:<lang>
"""
def _langs(value: Any) -> List[str]:
if not isinstance(value, dict):
return []
langs: List[str] = []
for k in value.keys():
if not isinstance(k, str):
continue
lang = k.strip().lower()
if lang:
langs.append(lang)
return sorted(set(langs))
out: List[str] = []
for lang in _langs(info.get("subtitles")):
out.append(f"subs:{lang}")
for lang in _langs(info.get("automatic_captions")):
out.append(f"subs_auto:{lang}")
return out
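# Illustrative example (not executed at import time): an info dict with manual
# English subtitles plus auto captions in English and Spanish produces one
# multi-valued tag per language.
#   >>> _extract_subtitle_tags({"subtitles": {"en": []}, "automatic_captions": {"es": [], "EN": []}})
#   ['subs:en', 'subs_auto:en', 'subs_auto:es']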
def _scrape_ytdlp_info(url: str) -> Optional[Dict[str, Any]]:
"""Fetch a yt-dlp info dict without downloading media."""
if not isinstance(url, str) or not url.strip():
return None
url = url.strip()
# Prefer the Python module when available (faster, avoids shell quoting issues).
try:
import yt_dlp # type: ignore
opts: Any = {
"quiet": True,
"no_warnings": True,
"skip_download": True,
"noprogress": True,
"socket_timeout": 15,
"retries": 1,
"playlist_items": "1-10",
}
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=False)
return info if isinstance(info, dict) else None
except Exception:
pass
# Fallback to yt-dlp CLI if the module isn't available.
try:
import json as json_module
cmd = [
"yt-dlp",
"-J",
"--no-warnings",
"--skip-download",
"--playlist-items",
"1-10",
url,
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return None
payload = (result.stdout or "").strip()
if not payload:
return None
data = json_module.loads(payload)
return data if isinstance(data, dict) else None
except Exception:
return None
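# Illustrative usage (assumes network access and that yt-dlp is installed,
# either as the Python module or on PATH; the URL is hypothetical):
#   info = _scrape_ytdlp_info("https://example.com/watch?v=abc")
#   if info:
#       print(info.get("title"), len(info.get("entries") or []))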
def _resolve_candidate_urls_for_item(
result: Any,
backend: Any,
file_hash: str,
config: Dict[str, Any],
) -> List[str]:
"""Get candidate URLs from backend and/or piped result."""
try:
from metadata import normalize_urls
except Exception:
normalize_urls = None # type: ignore[assignment]
urls: List[str] = []
# 1) Backend URL association (best source of truth)
try:
backend_urls = backend.get_url(file_hash, config=config)
if backend_urls:
if normalize_urls:
urls.extend(normalize_urls(backend_urls))
else:
urls.extend([str(u).strip() for u in backend_urls if isinstance(u, str) and str(u).strip()])
except Exception:
pass
# 2) Backend metadata url field
try:
meta = backend.get_metadata(file_hash, config=config)
if isinstance(meta, dict) and meta.get("url"):
if normalize_urls:
urls.extend(normalize_urls(meta.get("url")))
else:
raw = meta.get("url")
if isinstance(raw, list):
urls.extend([str(u).strip() for u in raw if isinstance(u, str) and str(u).strip()])
elif isinstance(raw, str) and raw.strip():
urls.append(raw.strip())
except Exception:
pass
# 3) Piped result fields
def _get(obj: Any, key: str, default: Any = None) -> Any:
if isinstance(obj, dict):
return obj.get(key, default)
return getattr(obj, key, default)
for key in ("url", "webpage_url", "source_url", "target"):
val = _get(result, key, None)
if not val:
continue
if normalize_urls:
urls.extend(normalize_urls(val))
continue
if isinstance(val, str) and val.strip():
urls.append(val.strip())
elif isinstance(val, list):
urls.extend([str(u).strip() for u in val if isinstance(u, str) and str(u).strip()])
meta_field = _get(result, "metadata", None)
if isinstance(meta_field, dict) and meta_field.get("url"):
val = meta_field.get("url")
if normalize_urls:
urls.extend(normalize_urls(val))
elif isinstance(val, list):
urls.extend([str(u).strip() for u in val if isinstance(u, str) and str(u).strip()])
elif isinstance(val, str) and val.strip():
urls.append(val.strip())
# Dedup
return _dedup_tags_preserve_order(urls)
def _pick_supported_ytdlp_url(urls: List[str]) -> Optional[str]:
"""Pick the first URL that looks supported by yt-dlp (best effort)."""
if not urls:
return None
def _is_hydrus_file_url(u: str) -> bool:
text = str(u or "").strip().lower()
if not text:
return False
# Hydrus-local file URLs are retrievable blobs, not original source pages.
# yt-dlp generally can't extract meaningful metadata from these.
return ("/get_files/file" in text) and ("hash=" in text)
http_urls: List[str] = []
for u in urls:
text = str(u or "").strip()
if text.lower().startswith(("http://", "https://")):
http_urls.append(text)
# Prefer non-Hydrus URLs for yt-dlp scraping.
candidates = [u for u in http_urls if not _is_hydrus_file_url(u)]
if not candidates:
return None
# Prefer a true support check when the Python module is available.
try:
from SYS.download import is_url_supported_by_ytdlp
for text in candidates:
try:
if is_url_supported_by_ytdlp(text):
return text
except Exception:
continue
except Exception:
pass
# Fallback: use the first non-Hydrus http(s) URL and let extraction decide.
return candidates[0] if candidates else None
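# Illustrative example (not executed at import time): Hydrus /get_files/file
# URLs are blob endpoints, so the original source page wins.
#   >>> _pick_supported_ytdlp_url([
#   ...     "http://127.0.0.1:45869/get_files/file?hash=abc123",
#   ...     "https://example.com/video/1",
#   ... ])
#   'https://example.com/video/1'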
# NOTE: These module-level aliases are re-defined as wrapper functions of the
# same names later in this file; the later definitions take precedence.
_scrape_isbn_metadata = _ol_scrape_isbn_metadata  # type: ignore[assignment]
_scrape_openlibrary_metadata = _ol_scrape_openlibrary_metadata  # type: ignore[assignment]
# Tag item for ResultTable display and piping
from dataclasses import dataclass
@dataclass
class TagItem:
"""Tag item for display in ResultTable and piping to other cmdlet.
Allows tags to be selected and piped like:
- delete-tag @{3,4,9} (delete tags at indices 3, 4, 9)
- add-tag @"namespace:value" (add this tag)
"""
tag_name: str
tag_index: int # 1-based index for user reference
hash: Optional[str] = None
store: str = "hydrus"
service_name: Optional[str] = None
path: Optional[str] = None
def __post_init__(self):
# Make ResultTable happy by adding standard fields
# NOTE: Don't set 'title' - we want only the tag column in ResultTable
self.detail = f"Tag #{self.tag_index}"
self.target = self.tag_name
self.media_kind = "tag"
def to_dict(self) -> Dict[str, Any]:
"""Convert to dict for JSON serialization."""
return {
"tag_name": self.tag_name,
"tag_index": self.tag_index,
"hash": self.hash,
"store": self.store,
"path": self.path,
"service_name": self.service_name,
}
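# Illustrative example (not executed at import time; the hash value is hypothetical):
#   >>> item = TagItem(tag_name="creator:beethoven", tag_index=3, hash="deadbeef", store="hydrus")
#   >>> (item.to_dict()["tag_name"], item.detail, item.media_kind)
#   ('creator:beethoven', 'Tag #3', 'tag')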
def _emit_tags_as_table(
tags_list: List[str],
file_hash: Optional[str],
store: str = "hydrus",
service_name: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
item_title: Optional[str] = None,
path: Optional[str] = None,
subject: Optional[Any] = None,
) -> None:
"""Emit tags as TagItem objects and display via ResultTable.
This replaces _print_tag_list to make tags pipe-able.
Stores the table in ctx._LAST_RESULT_TABLE for downstream @ selection.
"""
from result_table import ResultTable
# Create ResultTable with just tag column (no title)
# Keep the title stable and avoid including hash fragments.
table_title = "tag"
if item_title:
table_title = f"tag: {item_title}"
table = ResultTable(table_title, max_columns=1)
table.set_source_command("get-tag", [])
# Create TagItem for each tag
tag_items = []
for idx, tag_name in enumerate(tags_list, start=1):
tag_item = TagItem(
tag_name=tag_name,
tag_index=idx,
hash=file_hash,
store=store,
service_name=service_name,
path=path,
)
tag_items.append(tag_item)
table.add_result(tag_item)
# Also emit to pipeline for downstream processing
ctx.emit(tag_item)
# Store the table and items in history so @.. works to go back
# Use overlay mode so it doesn't push the previous search to history stack
# This makes get-tag behave like a transient view
try:
ctx.set_last_result_table_overlay(table, tag_items, subject)
except AttributeError:
ctx.set_last_result_table(table, tag_items, subject)
# Note: CLI will handle displaying the table via ResultTable formatting
def _filter_scraped_tags(tags: List[str]) -> List[str]:
"""Filter out tags we don't want to import from scraping."""
blocked = {"title", "artist", "source"}
out: List[str] = []
seen: set[str] = set()
for t in tags:
if not t:
continue
s = str(t).strip()
if not s:
continue
ns = s.split(":", 1)[0].strip().lower() if ":" in s else ""
if ns in blocked:
continue
key = s.lower()
if key in seen:
continue
seen.add(key)
out.append(s)
return out
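# Illustrative example (not executed at import time): blocked single-value
# namespaces (title:, artist:, source:) are dropped and the remainder is
# de-duplicated case-insensitively.
#   >>> _filter_scraped_tags(["title:Foo", "genre:rock", "Genre:Rock", "publisher:acme"])
#   ['genre:rock', 'publisher:acme']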
def _summarize_tags(tags_list: List[str], limit: int = 8) -> str:
"""Create a summary of tags for display."""
shown = [t for t in tags_list[:limit] if t]
summary = ", ".join(shown)
remaining = max(0, len(tags_list) - len(shown))
if remaining > 0:
summary = f"{summary} (+{remaining} more)" if summary else f"(+{remaining} more)"
if len(summary) > 200:
summary = summary[:197] + "..."
return summary
def _extract_title_from(tags_list: List[str]) -> Optional[str]:
"""Extract title from tags list."""
if extract_title:
try:
return extract_title(tags_list)
except Exception:
pass
for t in tags_list:
if isinstance(t, str) and t.lower().startswith("title:"):
val = t.split(":", 1)[1].strip()
if val:
return val
return None
def _rename_file_if_title_tag(media: Optional[Path], tags_added: List[str]) -> bool:
"""Rename a local file if title: tag was added.
Returns True if file was renamed, False otherwise.
"""
if not media or not tags_added:
return False
# Check if any of the added tags is a title: tag
title_value = None
for tag in tags_added:
if isinstance(tag, str):
lower_tag = tag.lower()
if lower_tag.startswith("title:"):
title_value = tag.split(":", 1)[1].strip()
break
if not title_value:
return False
try:
# Get current file path
file_path = media
if not file_path.exists():
return False
# Parse file path
dir_path = file_path.parent
old_name = file_path.name
# Get file extension
suffix = file_path.suffix or ''
# Sanitize title for use as filename
import re
safe_title = re.sub(r'[<>:"/\\|?*]', '', title_value).strip()
if not safe_title:
return False
new_name = safe_title + suffix
new_file_path = dir_path / new_name
if new_file_path == file_path:
return False
# Build sidecar paths BEFORE renaming the file
old_sidecar = Path(str(file_path) + '.tag')
new_sidecar = Path(str(new_file_path) + '.tag')
# Rename file
try:
file_path.rename(new_file_path)
log(f"Renamed file: {old_name}{new_name}")
# Rename .tag sidecar if it exists
if old_sidecar.exists():
try:
old_sidecar.rename(new_sidecar)
log(f"Renamed sidecar: {old_name}.tag → {new_name}.tag")
except Exception as e:
log(f"Failed to rename sidecar: {e}", file=sys.stderr)
return True
except Exception as e:
log(f"Failed to rename file: {e}", file=sys.stderr)
return False
except Exception as e:
log(f"Error during file rename: {e}", file=sys.stderr)
return False
def _apply_result_updates_from_tags(result: Any, tag_list: List[str]) -> None:
"""Update result object with title and tag summary from tags."""
try:
new_title = _extract_title_from(tag_list)
if new_title:
setattr(result, "title", new_title)
setattr(result, "tag_summary", _summarize_tags(tag_list))
except Exception:
pass
def _handle_title_rename(old_path: Path, tags_list: List[str]) -> Optional[Path]:
"""If a title: tag is present, rename the file and its .tag sidecar to match.
Returns the new path if renamed, otherwise returns None.
"""
# Extract title from tags
new_title = None
for tag in tags_list:
if isinstance(tag, str) and tag.lower().startswith('title:'):
new_title = tag.split(':', 1)[1].strip()
break
if not new_title or not old_path.exists():
return None
try:
# Build new filename with same extension
old_name = old_path.name
old_suffix = old_path.suffix
# Create new filename: title + extension
new_name = f"{new_title}{old_suffix}"
new_path = old_path.parent / new_name
# Don't rename if already the same name
if new_path == old_path:
return None
# Rename the main file
if new_path.exists():
log(f"Warning: Target filename already exists: {new_name}", file=sys.stderr)
return None
old_path.rename(new_path)
log(f"Renamed file: {old_name}{new_name}", file=sys.stderr)
# Rename the .tag sidecar if it exists
old_tags_path = old_path.parent / (old_name + '.tag')
if old_tags_path.exists():
new_tags_path = old_path.parent / (new_name + '.tag')
if new_tags_path.exists():
log(f"Warning: Target sidecar already exists: {new_tags_path.name}", file=sys.stderr)
else:
old_tags_path.rename(new_tags_path)
log(f"Renamed sidecar: {old_tags_path.name}{new_tags_path.name}", file=sys.stderr)
return new_path
except Exception as exc:
log(f"Warning: Failed to rename file: {exc}", file=sys.stderr)
return None
def _read_sidecar_fallback(p: Path) -> tuple[Optional[str], List[str], List[str]]:
"""Fallback sidecar reader if metadata module unavailable.
Format:
- Lines with "hash:" prefix: file hash
- Lines with "url:" or "url:" prefix: url
- Lines with "relationship:" prefix: ignored (internal relationships)
- Lines with "key:", "namespace:value" format: treated as namespace tags
- Plain lines without colons: freeform tags
Excluded namespaces (treated as metadata, not tags): hash, url, url, relationship
"""
try:
raw = p.read_text(encoding="utf-8", errors="ignore")
except OSError:
return None, [], []
t: List[str] = []
u: List[str] = []
h: Optional[str] = None
# Namespaces to exclude from tags
    excluded_namespaces = {"hash", "url", "relationship"}
for line in raw.splitlines():
s = line.strip()
if not s:
continue
low = s.lower()
# Check if this is a hash line
if low.startswith("hash:"):
h = s.split(":", 1)[1].strip() if ":" in s else h
# Check if this is a URL line
elif low.startswith("url:") or low.startswith("url:"):
val = s.split(":", 1)[1].strip() if ":" in s else ""
if val:
u.append(val)
# Check if this is an excluded namespace
elif ":" in s:
namespace = s.split(":", 1)[0].strip().lower()
if namespace not in excluded_namespaces:
# Include as namespace tag (e.g., "title: The Freemasons")
t.append(s)
else:
# Plain text without colon = freeform tag
t.append(s)
return h, t, u
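# Illustrative usage (assumes a hypothetical sidecar "video.mp4.tag" whose lines
# look like "hash:<sha256>", "url:https://example.com/page", "title:My Video",
# plus plain freeform tags):
#   file_hash, tags, urls = _read_sidecar_fallback(Path("video.mp4.tag"))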
def _write_sidecar(p: Path, media: Path, tag_list: List[str], url: List[str], hash_in_sidecar: Optional[str]) -> Path:
"""Write tags to sidecar file and handle title-based renaming.
Returns the new media path if renamed, otherwise returns the original media path.
"""
success = write_sidecar(media, tag_list, url, hash_in_sidecar)
if success:
_apply_result_updates_from_tags(None, tag_list)
# Check if we should rename the file based on title tag
new_media = _handle_title_rename(media, tag_list)
if new_media:
return new_media
return media
# Fallback writer
ordered = [s for s in tag_list if s and s.strip()]
lines = []
if hash_in_sidecar:
lines.append(f"hash:{hash_in_sidecar}")
lines.extend(ordered)
for u in url:
lines.append(f"url:{u}")
try:
p.write_text("\n".join(lines) + "\n", encoding="utf-8")
# Check if we should rename the file based on title tag
new_media = _handle_title_rename(media, tag_list)
if new_media:
return new_media
return media
except OSError as exc:
log(f"Failed to write sidecar: {exc}", file=sys.stderr)
return media
def _emit_tag_payload(source: str, tags_list: List[str], *, hash_value: Optional[str], extra: Optional[Dict[str, Any]] = None, store_label: Optional[str] = None) -> int:
"""Emit tag values as structured payload to pipeline."""
payload: Dict[str, Any] = {
"source": source,
"tag": list(tags_list),
"count": len(tags_list),
}
if hash_value:
payload["hash"] = hash_value
if extra:
for key, value in extra.items():
if value is not None:
payload[key] = value
label = None
if store_label:
label = store_label
elif ctx.get_stage_context() is not None:
label = "tag"
if label:
ctx.store_value(label, payload)
# Emit individual TagItem objects so they can be selected by bare index
# When in pipeline, emit individual TagItem objects
if ctx.get_stage_context() is not None:
for idx, tag_name in enumerate(tags_list, start=1):
tag_item = TagItem(
tag_name=tag_name,
tag_index=idx,
hash=hash_value,
store=source,
service_name=None
)
ctx.emit(tag_item)
else:
# When not in pipeline, just emit the payload
ctx.emit(payload)
return 0
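# Illustrative payload shape stored/emitted above (hedged; the exact keys depend
# on the arguments actually passed):
#   {"source": "hydrus", "tag": ["title:foo", "genre:rock"], "count": 2, "hash": "<sha256>"}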
def _extract_scrapable_identifiers(tags_list: List[str]) -> Dict[str, str]:
"""Extract scrapable identifiers from tags."""
identifiers = {}
scrapable_prefixes = {
'openlibrary', 'isbn', 'isbn_10', 'isbn_13',
'musicbrainz', 'musicbrainzalbum', 'imdb', 'tmdb', 'tvdb'
}
for tag in tags_list:
if not isinstance(tag, str) or ':' not in tag:
continue
parts = tag.split(':', 1)
if len(parts) != 2:
continue
key_raw = parts[0].strip().lower()
key = key_raw.replace('-', '_')
if key == 'isbn10':
key = 'isbn_10'
elif key == 'isbn13':
key = 'isbn_13'
value = parts[1].strip()
# Normalize ISBN values by removing hyphens for API friendliness
if key.startswith('isbn'):
value = value.replace('-', '')
if key in scrapable_prefixes and value:
identifiers[key] = value
return identifiers
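# Illustrative example (not executed at import time): namespace keys are
# normalised (isbn-13 -> isbn_13) and ISBN values lose their hyphens.
#   >>> _extract_scrapable_identifiers(["ISBN-13:978-0-306-40615-7", "openlibrary:OL7353617M", "title:Foo"])
#   {'isbn_13': '9780306406157', 'openlibrary': 'OL7353617M'}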
def _extract_tag_value(tags_list: List[str], namespace: str) -> Optional[str]:
"""Get first tag value for a namespace (e.g., artist:, title:)."""
ns = namespace.lower()
for tag in tags_list:
if not isinstance(tag, str) or ':' not in tag:
continue
prefix, _, value = tag.partition(':')
if prefix.strip().lower() != ns:
continue
candidate = value.strip()
if candidate:
return candidate
return None
def _scrape_url_metadata(url: str) -> Tuple[Optional[str], List[str], List[Tuple[str, str]], List[Dict[str, Any]]]:
"""Scrape metadata from a URL using yt-dlp.
Returns:
(title, tags, formats, playlist_items) tuple where:
- title: Video/content title
- tags: List of extracted tags (both namespaced and freeform)
- formats: List of (display_label, format_id) tuples
- playlist_items: List of playlist entry dicts (empty if not a playlist)
"""
try:
import json as json_module
try:
from metadata import extract_ytdlp_tags
except ImportError:
extract_ytdlp_tags = None
# Build yt-dlp command with playlist support
# IMPORTANT: Do NOT use --flat-playlist! It strips metadata like artist, album, uploader, genre
# Without it, yt-dlp gives us full metadata in an 'entries' array within a single JSON object
# This ensures we get album-level metadata from sources like BandCamp, YouTube Music, etc.
cmd = [
"yt-dlp",
"-j", # Output JSON
"--no-warnings",
"--playlist-items", "1-10", # Get first 10 items if it's a playlist (provides entries)
"-f", "best",
url
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
log(f"yt-dlp error: {result.stderr}", file=sys.stderr)
return None, [], [], []
# Parse JSON output - WITHOUT --flat-playlist, we get ONE JSON object with 'entries' array
# This gives us full metadata instead of flat format
lines = result.stdout.strip().split('\n')
if not lines or not lines[0]:
log("yt-dlp returned empty output", file=sys.stderr)
return None, [], [], []
# Parse the single JSON object
try:
data = json_module.loads(lines[0])
except json_module.JSONDecodeError as e:
log(f"Failed to parse yt-dlp JSON: {e}", file=sys.stderr)
return None, [], [], []
# Extract title - use the main title
title = data.get('title', 'Unknown')
# Determine if this is a playlist/album (has entries array)
# is_playlist = 'entries' in data and isinstance(data.get('entries'), list)
# Extract tags and playlist items
tags = []
playlist_items = []
# IMPORTANT: Extract album/playlist-level tags FIRST (before processing entries)
# This ensures we get metadata about the collection, not just individual tracks
if extract_ytdlp_tags:
album_tags = extract_ytdlp_tags(data)
tags.extend(album_tags)
# Case 1: Entries are nested in the main object (standard playlist structure)
if 'entries' in data and isinstance(data.get('entries'), list):
entries = data['entries']
# Build playlist items with title and duration
for idx, entry in enumerate(entries, 1):
if isinstance(entry, dict):
item_title = entry.get('title', entry.get('id', f'Track {idx}'))
item_duration = entry.get('duration', 0)
playlist_items.append({
'index': idx,
'id': entry.get('id', f'track_{idx}'),
'title': item_title,
'duration': item_duration,
'url': entry.get('url') or entry.get('webpage_url', ''),
})
# Extract tags from each entry and merge (but don't duplicate album-level tags)
# Only merge entry tags that are multi-value prefixes (not single-value like title:, artist:, etc.)
if extract_ytdlp_tags:
entry_tags = extract_ytdlp_tags(entry)
# Single-value namespaces that should not be duplicated from entries
single_value_namespaces = {'title', 'artist', 'album', 'creator', 'channel', 'release_date', 'upload_date', 'license', 'location'}
for tag in entry_tags:
# Extract the namespace (part before the colon)
tag_namespace = tag.split(':', 1)[0].lower() if ':' in tag else None
# Skip if this namespace already exists in tags (from album level)
if tag_namespace and tag_namespace in single_value_namespaces:
# Check if any tag with this namespace already exists in tags
already_has_namespace = any(
t.split(':', 1)[0].lower() == tag_namespace
for t in tags if ':' in t
)
if already_has_namespace:
continue # Skip this tag, keep the album-level one
if tag not in tags: # Avoid exact duplicates
tags.append(tag)
# Case 2: Playlist detected by playlist_count field (BandCamp albums, etc.)
# These need a separate call with --flat-playlist to get the actual entries
elif (data.get('playlist_count') or 0) > 0 and 'entries' not in data:
try:
# Make a second call with --flat-playlist to get the actual tracks
flat_cmd = [
"yt-dlp",
"-j",
"--no-warnings",
"--flat-playlist",
"-f", "best",
url
]
flat_result = subprocess.run(flat_cmd, capture_output=True, text=True, timeout=30)
if flat_result.returncode == 0:
flat_lines = flat_result.stdout.strip().split('\n')
# With --flat-playlist, each line is a separate track JSON object
# (not nested in a playlist container), so process ALL lines
for idx, line in enumerate(flat_lines, 1):
if line.strip().startswith('{'):
try:
entry = json_module.loads(line)
item_title = entry.get('title', entry.get('id', f'Track {idx}'))
item_duration = entry.get('duration', 0)
playlist_items.append({
'index': idx,
'id': entry.get('id', f'track_{idx}'),
'title': item_title,
'duration': item_duration,
'url': entry.get('url') or entry.get('webpage_url', ''),
})
except json_module.JSONDecodeError:
pass
            except Exception:
                pass  # Silently ignore if we can't get playlist entries
        # Fallback: if still no tags were detected, extract them from the top-level data object
if not tags and extract_ytdlp_tags:
tags = extract_ytdlp_tags(data)
# Extract formats from the main data object
formats = []
if 'formats' in data:
formats = _extract_url_formats(data.get('formats', []))
# Deduplicate tags by namespace to prevent duplicate title:, artist:, etc.
try:
from metadata import dedup_tags_by_namespace as _dedup
if _dedup:
tags = _dedup(tags, keep_first=True)
except Exception:
pass # If dedup fails, return tags as-is
return title, tags, formats, playlist_items
except subprocess.TimeoutExpired:
log("yt-dlp timeout (>30s)", file=sys.stderr)
return None, [], [], []
except Exception as e:
log(f"URL scraping error: {e}", file=sys.stderr)
return None, [], [], []
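# Illustrative usage (assumes network access and the yt-dlp CLI on PATH; the
# URL is hypothetical):
#   title, tags, formats, playlist_items = _scrape_url_metadata("https://example.com/album/1")
#   for label, fmt_id in formats:
#       print(label, fmt_id)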
def _extract_url_formats(formats: list) -> List[Tuple[str, str]]:
"""Extract best formats from yt-dlp formats list.
Returns list of (display_label, format_id) tuples.
"""
try:
video_formats = {} # {resolution: format_data}
audio_formats = {} # {quality_label: format_data}
for fmt in formats:
vcodec = fmt.get('vcodec', 'none')
acodec = fmt.get('acodec', 'none')
height = fmt.get('height')
ext = fmt.get('ext', 'unknown')
format_id = fmt.get('format_id', '')
tbr = fmt.get('tbr', 0)
abr = fmt.get('abr', 0)
# Video format
if vcodec and vcodec != 'none' and height:
if height < 480:
continue
res_key = f"{height}p"
if res_key not in video_formats or tbr > video_formats[res_key].get('tbr', 0):
video_formats[res_key] = {
'label': f"{height}p ({ext})",
'format_id': format_id,
'tbr': tbr,
}
# Audio-only format
elif acodec and acodec != 'none' and (not vcodec or vcodec == 'none'):
audio_key = f"audio_{abr}"
if audio_key not in audio_formats or abr > audio_formats[audio_key].get('abr', 0):
audio_formats[audio_key] = {
'label': f"audio ({ext})",
'format_id': format_id,
'abr': abr,
}
result = []
# Add video formats in descending resolution order
for res in sorted(video_formats.keys(), key=lambda x: int(x.replace('p', '')), reverse=True):
fmt = video_formats[res]
result.append((fmt['label'], fmt['format_id']))
# Add best audio format
if audio_formats:
best_audio = max(audio_formats.values(), key=lambda x: x.get('abr', 0))
result.append((best_audio['label'], best_audio['format_id']))
return result
except Exception as e:
log(f"Error extracting formats: {e}", file=sys.stderr)
return []
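# Illustrative example (not executed at import time): sub-480p video formats
# are skipped, the highest bitrate per resolution wins, and the best audio-only
# format is appended last.
#   >>> _extract_url_formats([
#   ...     {"vcodec": "avc1", "acodec": "none", "height": 1080, "ext": "mp4", "format_id": "137", "tbr": 4500},
#   ...     {"vcodec": "avc1", "acodec": "none", "height": 360, "ext": "mp4", "format_id": "134", "tbr": 700},
#   ...     {"vcodec": "none", "acodec": "opus", "ext": "webm", "format_id": "251", "abr": 160},
#   ... ])
#   [('1080p (mp4)', '137'), ('audio (webm)', '251')]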
def _scrape_isbn_metadata(isbn: str) -> List[str]:
if _ol_scrape_isbn_metadata is None:
log("OpenLibrary scraper unavailable", file=sys.stderr)
return []
try:
return list(_ol_scrape_isbn_metadata(isbn))
except Exception as e:
log(f"ISBN scraping error: {e}", file=sys.stderr)
return []
def _scrape_openlibrary_metadata(olid: str) -> List[str]:
if _ol_scrape_openlibrary_metadata is None:
log("OpenLibrary scraper unavailable", file=sys.stderr)
return []
try:
return list(_ol_scrape_openlibrary_metadata(olid))
except Exception as e:
log(f"OpenLibrary scraping error: {e}", file=sys.stderr)
return []
def _perform_scraping(tags_list: List[str]) -> List[str]:
"""Perform scraping based on identifiers in tags.
Priority order:
1. openlibrary: (preferred - more complete metadata)
2. isbn_10 or isbn (fallback)
"""
identifiers = _extract_scrapable_identifiers(tags_list)
if not identifiers:
log("No scrapable identifiers found (openlibrary, ISBN, musicbrainz, imdb)")
return []
log(f"Found scrapable identifiers: {', '.join(identifiers.keys())}")
new_tags = []
# Prefer OpenLibrary over ISBN (more complete metadata)
if 'openlibrary' in identifiers:
olid = identifiers['openlibrary']
if olid:
log(f"Scraping OpenLibrary: {olid}")
new_tags.extend(_scrape_openlibrary_metadata(olid))
elif 'isbn_13' in identifiers or 'isbn_10' in identifiers or 'isbn' in identifiers:
isbn = identifiers.get('isbn_13') or identifiers.get('isbn_10') or identifiers.get('isbn')
if isbn:
log(f"Scraping ISBN: {isbn}")
new_tags.extend(_scrape_isbn_metadata(isbn))
existing_tags_lower = {tag.lower() for tag in tags_list}
scraped_unique = []
seen = set()
for tag in new_tags:
tag_lower = tag.lower()
if tag_lower not in existing_tags_lower and tag_lower not in seen:
scraped_unique.append(tag)
seen.add(tag_lower)
if scraped_unique:
log(f"Added {len(scraped_unique)} new tag(s) from scraping")
return scraped_unique
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Get tags from Hydrus, local sidecar, or URL metadata.
Usage:
get-tag [-query "hash:<sha256>"] [--store <key>] [--emit]
get-tag -scrape <url|provider>
Options:
-query "hash:<sha256>": Override hash to use instead of result's hash
--store <key>: Store result to this key for pipeline
--emit: Emit result without interactive prompt (quiet mode)
-scrape <url|provider>: Scrape metadata from URL or provider name (itunes, openlibrary, googlebooks)
"""
args_list = [str(arg) for arg in (args or [])]
raw_args = list(args_list)
# Support numeric selection tokens (e.g., "@1" leading to argument "1") without treating
# them as hash overrides. This lets users pick from the most recent table overlay/results.
if len(args_list) == 1:
token = args_list[0]
if not token.startswith("-") and token.isdigit():
try:
idx = int(token) - 1
items_pool = ctx.get_last_result_items()
if 0 <= idx < len(items_pool):
result = items_pool[idx]
args_list = []
debug(f"[get_tag] Resolved numeric selection arg {token} -> last_result_items[{idx}]")
else:
debug(f"[get_tag] Numeric selection arg {token} out of range (items={len(items_pool)})")
except Exception as exc:
debug(f"[get_tag] Failed to resolve numeric selection arg {token}: {exc}")
# Helper to get field from both dict and object
def get_field(obj: Any, field: str, default: Any = None) -> Any:
if isinstance(obj, dict):
return obj.get(field, default)
else:
return getattr(obj, field, default)
# Parse arguments using shared parser
parsed_args = parse_cmdlet_args(args_list, CMDLET)
# Detect if -scrape flag was provided without a value (parse_cmdlet_args skips missing values)
scrape_flag_present = any(str(arg).lower() in {"-scrape", "--scrape"} for arg in args_list)
# Extract values
query_raw = parsed_args.get("query")
hash_override = sh.parse_single_hash_query(query_raw)
if query_raw and not hash_override:
log("Invalid -query value (expected hash:<sha256>)", file=sys.stderr)
return 1
store_key = parsed_args.get("store")
emit_requested = parsed_args.get("emit", False)
scrape_url = parsed_args.get("scrape")
scrape_requested = scrape_flag_present or scrape_url is not None
# Convenience: `-scrape` with no value defaults to `ytdlp` (store-backed URL scrape).
if scrape_flag_present and (scrape_url is None or str(scrape_url).strip() == ""):
scrape_url = "ytdlp"
scrape_requested = True
if scrape_requested and (scrape_url is None or str(scrape_url).strip() == ""):
log("-scrape requires a URL or provider name", file=sys.stderr)
return 1
# Handle URL or provider scraping mode
if scrape_requested and scrape_url:
import json as json_module
if str(scrape_url).strip().lower() == "ytdlp":
# Scrape metadata from the selected item's URL via yt-dlp (no download),
# then OVERWRITE all existing tags (including title:).
#
# This mode requires a store-backed item (hash + store).
#
# NOTE: We intentionally do not reuse _scrape_url_metadata() here because it
# performs namespace deduplication that would collapse multi-valued tags.
file_hash = normalize_hash(hash_override) or normalize_hash(get_field(result, "hash", None))
store_name = get_field(result, "store", None)
subject_path = get_field(result, "path", None) or get_field(result, "target", None) or get_field(result, "filename", None)
item_title = get_field(result, "title", None) or get_field(result, "name", None) or get_field(result, "filename", None)
# Only run overwrite-apply when the item is store-backed.
# If this is a URL-only PipeObject, fall through to provider mode below.
if file_hash and store_name and str(file_hash).strip().lower() != "unknown" and str(store_name).strip().upper() not in {"PATH", "URL"}:
try:
from Store import Store
storage = Store(config)
backend = storage[str(store_name)]
except Exception as exc:
log(f"Failed to resolve store backend '{store_name}': {exc}", file=sys.stderr)
return 1
candidate_urls = _resolve_candidate_urls_for_item(result, backend, file_hash, config)
scrape_target = _pick_supported_ytdlp_url(candidate_urls)
if not scrape_target:
log(
"No yt-dlp-supported source URL found for this item (Hydrus /get_files/file URLs are ignored). ",
file=sys.stderr,
)
log(
"Add the original page URL to the file (e.g. via add-url), then retry get-tag -scrape.",
file=sys.stderr,
)
return 1
info = _scrape_ytdlp_info(scrape_target)
if not info:
log("yt-dlp could not extract metadata for this URL (unsupported or failed)", file=sys.stderr)
return 1
try:
from metadata import extract_ytdlp_tags
except Exception:
extract_ytdlp_tags = None # type: ignore[assignment]
# Prefer the top-level metadata, but if this is a playlist container, use
# the first entry for per-item fields like subtitles.
info_for_subs = info
entries = info.get("entries") if isinstance(info, dict) else None
if isinstance(entries, list) and entries:
first = entries[0]
if isinstance(first, dict):
info_for_subs = first
tags: List[str] = []
if extract_ytdlp_tags:
try:
tags.extend(extract_ytdlp_tags(info))
except Exception:
pass
# Subtitle availability tags
try:
tags.extend(_extract_subtitle_tags(info_for_subs if isinstance(info_for_subs, dict) else {}))
except Exception:
pass
# Ensure we actually have something to apply.
tags = _dedup_tags_preserve_order(tags)
if not tags:
log("No tags extracted from yt-dlp metadata", file=sys.stderr)
return 1
# Full overwrite: delete all existing tags, then add the new set.
try:
existing_tags, _src = backend.get_tag(file_hash, config=config)
except Exception:
existing_tags = []
try:
if existing_tags:
backend.delete_tag(file_hash, list(existing_tags), config=config)
except Exception as exc:
debug(f"[get_tag] ytdlp overwrite: delete_tag failed: {exc}")
try:
backend.add_tag(file_hash, list(tags), config=config)
except Exception as exc:
log(f"Failed to apply yt-dlp tags: {exc}", file=sys.stderr)
return 1
# Show updated tags
try:
updated_tags, _src = backend.get_tag(file_hash, config=config)
except Exception:
updated_tags = tags
if not updated_tags:
updated_tags = tags
_emit_tags_as_table(
tags_list=list(updated_tags),
file_hash=file_hash,
store=str(store_name),
service_name=None,
config=config,
item_title=str(item_title or "ytdlp"),
path=str(subject_path) if subject_path else None,
subject={
"hash": file_hash,
"store": str(store_name),
"path": str(subject_path) if subject_path else None,
"title": item_title,
"extra": {"applied_provider": "ytdlp", "scrape_url": scrape_target},
},
)
return 0
if scrape_url.startswith("http://") or scrape_url.startswith("https://"):
# URL scraping (existing behavior)
title, tags, formats, playlist_items = _scrape_url_metadata(scrape_url)
if not tags:
log("No tags extracted from URL", file=sys.stderr)
return 1
output = {
"title": title,
"tag": tags,
"formats": [(label, fmt_id) for label, fmt_id in formats],
"playlist_items": playlist_items,
}
print(json_module.dumps(output, ensure_ascii=False))
return 0
# Provider scraping (e.g., itunes)
provider = get_metadata_provider(scrape_url, config)
if provider is None:
log(f"Unknown metadata provider: {scrape_url}", file=sys.stderr)
return 1
# Prefer identifier tags (ISBN/OLID/etc.) when available; fallback to title/filename.
# IMPORTANT: do not rely on `result.tag` for this because it can be stale (cached on
# the piped PipeObject). Always prefer the current store-backed tags when possible.
identifier_tags: List[str] = []
file_hash_for_scrape = normalize_hash(hash_override) or normalize_hash(get_field(result, "hash", None))
store_for_scrape = get_field(result, "store", None)
if file_hash_for_scrape and store_for_scrape:
try:
from Store import Store
storage = Store(config)
backend = storage[str(store_for_scrape)]
current_tags, _src = backend.get_tag(file_hash_for_scrape, config=config)
if isinstance(current_tags, (list, tuple, set)) and current_tags:
identifier_tags = [str(t) for t in current_tags if isinstance(t, (str, bytes))]
except Exception:
# Fall back to whatever is present on the piped result if store lookup fails.
pass
# Fall back to tags carried on the result (may be stale).
if not identifier_tags:
result_tags = get_field(result, "tag", None)
if isinstance(result_tags, list):
identifier_tags = [str(t) for t in result_tags if isinstance(t, (str, bytes))]
# As a last resort, try local sidecar only when the item is not store-backed.
if not identifier_tags and (not file_hash_for_scrape or not store_for_scrape):
file_path = get_field(result, "target", None) or get_field(result, "path", None) or get_field(result, "filename", None)
if isinstance(file_path, str) and file_path and not file_path.lower().startswith(("http://", "https://")):
try:
media_path = Path(str(file_path))
if media_path.exists():
tags_from_sidecar = read_sidecar(media_path)
if isinstance(tags_from_sidecar, list):
identifier_tags = [str(t) for t in tags_from_sidecar if isinstance(t, (str, bytes))]
except Exception:
pass
title_from_tags = _extract_tag_value(identifier_tags, "title")
artist_from_tags = _extract_tag_value(identifier_tags, "artist")
identifiers = _extract_scrapable_identifiers(identifier_tags)
identifier_query: Optional[str] = None
if identifiers:
if provider.name in {"openlibrary", "googlebooks", "google"}:
identifier_query = identifiers.get("isbn_13") or identifiers.get("isbn_10") or identifiers.get("isbn") or identifiers.get("openlibrary")
elif provider.name == "itunes":
identifier_query = identifiers.get("musicbrainz") or identifiers.get("musicbrainzalbum")
# Determine query from identifier first, else title on the result or filename
title_hint = title_from_tags or get_field(result, "title", None) or get_field(result, "name", None)
if not title_hint:
file_path = get_field(result, "path", None) or get_field(result, "filename", None)
if file_path:
title_hint = Path(str(file_path)).stem
artist_hint = artist_from_tags or get_field(result, "artist", None) or get_field(result, "uploader", None)
if not artist_hint:
meta_field = get_field(result, "metadata", None)
if isinstance(meta_field, dict):
meta_artist = meta_field.get("artist") or meta_field.get("uploader")
if meta_artist:
artist_hint = str(meta_artist)
combined_query: Optional[str] = None
if not identifier_query and title_hint and artist_hint and provider.name in {"itunes", "musicbrainz"}:
if provider.name == "musicbrainz":
combined_query = f'recording:"{title_hint}" AND artist:"{artist_hint}"'
else:
combined_query = f"{title_hint} {artist_hint}"
# yt-dlp isn't a search provider; it requires a URL.
url_hint: Optional[str] = None
if provider.name == "ytdlp":
raw_url = get_field(result, "url", None) or get_field(result, "source_url", None) or get_field(result, "target", None)
if isinstance(raw_url, list) and raw_url:
raw_url = raw_url[0]
if isinstance(raw_url, str) and raw_url.strip().startswith(("http://", "https://")):
url_hint = raw_url.strip()
query_hint = url_hint or identifier_query or combined_query or title_hint
if not query_hint:
log("No title or identifier available to search for metadata", file=sys.stderr)
return 1
if identifier_query:
log(f"Using identifier for metadata search: {identifier_query}")
elif combined_query:
log(f"Using title+artist for metadata search: {title_hint} - {artist_hint}")
else:
log(f"Using title for metadata search: {query_hint}")
items = provider.search(query_hint, limit=10)
if not items:
log("No metadata results found", file=sys.stderr)
return 1
# For yt-dlp, emit tags directly (there is no meaningful multi-result selection step).
if provider.name == "ytdlp":
try:
tags = [str(t) for t in provider.to_tags(items[0]) if t is not None]
except Exception:
tags = []
if not tags:
log("No tags extracted from yt-dlp metadata", file=sys.stderr)
return 1
_emit_tags_as_table(
tags_list=list(tags),
file_hash=None,
store="url",
service_name=None,
config=config,
item_title=str(items[0].get("title") or "ytdlp"),
path=None,
subject={"provider": "ytdlp", "url": str(query_hint)},
)
return 0
from result_table import ResultTable
table = ResultTable(f"Metadata: {provider.name}")
table.set_source_command("get-tag", [])
selection_payload = []
hash_for_payload = normalize_hash(hash_override) or normalize_hash(get_field(result, "hash", None))
store_for_payload = get_field(result, "store", None)
# Preserve a consistent path field when present so selecting a metadata row
# keeps referring to the original file.
path_for_payload = get_field(result, "path", None) or get_field(result, "target", None) or get_field(result, "filename", None)
for idx, item in enumerate(items):
tags = _filter_scraped_tags(provider.to_tags(item))
row = table.add_row()
row.add_column("Title", item.get("title", ""))
row.add_column("Artist", item.get("artist", ""))
row.add_column("Album", item.get("album", ""))
row.add_column("Year", item.get("year", ""))
payload = {
"tag": tags,
"provider": provider.name,
"title": item.get("title"),
"artist": item.get("artist"),
"album": item.get("album"),
"year": item.get("year"),
"hash": hash_for_payload,
"store": store_for_payload,
"path": path_for_payload,
"extra": {
"tag": tags,
"provider": provider.name,
},
}
selection_payload.append(payload)
table.set_row_selection_args(idx, [str(idx + 1)])
ctx.set_last_result_table_overlay(table, selection_payload)
ctx.set_current_stage_table(table)
# Preserve items for @ selection and downstream pipes without emitting duplicates
ctx.set_last_result_items_only(selection_payload)
return 0
# If -scrape was requested but no URL, that's an error
if scrape_requested and not scrape_url:
log("-scrape requires a URL argument", file=sys.stderr)
return 1
# Handle @N selection which creates a list - extract the first item
if isinstance(result, list) and len(result) > 0:
result = result[0]
# If the current result already carries a tag list (e.g. a selected metadata
# row from get-tag -scrape itunes), APPLY those tags to the file in the store.
result_provider = get_field(result, "provider", None)
result_tags = get_field(result, "tag", None)
if result_provider and isinstance(result_tags, list) and result_tags:
file_hash = normalize_hash(hash_override) or normalize_hash(get_field(result, "hash", None))
store_name = get_field(result, "store", None)
subject_path = get_field(result, "path", None) or get_field(result, "target", None) or get_field(result, "filename", None)
if not file_hash or not store_name:
log("Selected metadata row is missing hash/store; cannot apply tags", file=sys.stderr)
_emit_tags_as_table(
tags_list=[str(t) for t in result_tags if t is not None],
file_hash=file_hash,
store=str(store_name or "local"),
service_name=None,
config=config,
item_title=str(get_field(result, "title", None) or result_provider),
path=str(subject_path) if subject_path else None,
subject=result,
)
return 0
# Apply tags to the store backend (no sidecar writing here).
if str(result_provider).strip().lower() == "ytdlp":
apply_tags = [str(t) for t in result_tags if t is not None]
else:
apply_tags = _filter_scraped_tags([str(t) for t in result_tags if t is not None])
if not apply_tags:
log("No applicable scraped tags to apply (title:/artist:/source: are skipped)", file=sys.stderr)
return 0
try:
from Store import Store
storage = Store(config)
backend = storage[str(store_name)]
ok = bool(backend.add_tag(file_hash, apply_tags, config=config))
if not ok:
log(f"Failed to apply tags to store '{store_name}'", file=sys.stderr)
except Exception as exc:
log(f"Failed to apply tags: {exc}", file=sys.stderr)
return 1
# Show updated tags after applying.
try:
updated_tags, _src = backend.get_tag(file_hash, config=config)
except Exception:
updated_tags = apply_tags
if not updated_tags:
updated_tags = apply_tags
_emit_tags_as_table(
tags_list=list(updated_tags),
file_hash=file_hash,
store=str(store_name),
service_name=None,
config=config,
item_title=str(get_field(result, "title", None) or get_field(result, "name", None) or str(result_provider)),
path=str(subject_path) if subject_path else None,
subject={
"hash": file_hash,
"store": str(store_name),
"path": str(subject_path) if subject_path else None,
"title": get_field(result, "title", None) or get_field(result, "name", None),
"extra": {"applied_provider": str(result_provider)},
},
)
return 0
hash_from_result = normalize_hash(get_field(result, "hash", None))
file_hash = hash_override or hash_from_result
# Only use emit mode if explicitly requested with --emit flag, not just because we're in a pipeline
# This allows interactive REPL to work even in pipelines
emit_mode = emit_requested or bool(store_key)
store_label = (store_key.strip() if store_key and store_key.strip() else None)
# Get hash and store from result
store_name = get_field(result, "store")
if not file_hash:
log("No hash available in result", file=sys.stderr)
return 1
if not store_name:
log("No store specified in result", file=sys.stderr)
return 1
# Get tags using storage backend
try:
from Store import Store
storage = Store(config)
backend = storage[store_name]
current, source = backend.get_tag(file_hash, config=config)
if not current:
log("No tags found", file=sys.stderr)
return 1
service_name = ""
except KeyError:
log(f"Store '{store_name}' not found", file=sys.stderr)
return 1
except Exception as exc:
log(f"Failed to get tags: {exc}", file=sys.stderr)
return 1
# Always output to ResultTable (pipeline mode only)
# Extract title for table header
item_title = get_field(result, "title", None) or get_field(result, "name", None) or get_field(result, "filename", None)
# Build a subject payload representing the file whose tags are being shown
subject_store = get_field(result, "store", None) or store_name
subject_path = get_field(result, "path", None) or get_field(result, "target", None) or get_field(result, "filename", None)
subject_payload: Dict[str, Any] = {
"tag": list(current),
"title": item_title,
"name": item_title,
"store": subject_store,
"service_name": service_name,
"extra": {
"tag": list(current),
},
}
if file_hash:
subject_payload["hash"] = file_hash
if subject_path:
try:
subject_payload["path"] = str(subject_path)
except Exception:
pass
_emit_tags_as_table(
current,
file_hash=file_hash,
store=subject_store,
service_name=service_name if source == "hydrus" else None,
config=config,
item_title=item_title,
path=str(subject_path) if subject_path else None,
subject=subject_payload,
)
# If emit requested or store key provided, emit payload
if emit_mode:
_emit_tag_payload(source, current, hash_value=file_hash, store_label=store_label)
return 0
_SCRAPE_CHOICES = []
try:
_SCRAPE_CHOICES = sorted(list_metadata_providers().keys())
except Exception:
_SCRAPE_CHOICES = ["itunes", "openlibrary", "googlebooks", "google", "musicbrainz"]
# Special scrape mode: pull tags from an item's URL via yt-dlp (no download)
if "ytdlp" not in _SCRAPE_CHOICES:
_SCRAPE_CHOICES.append("ytdlp")
_SCRAPE_CHOICES = sorted(_SCRAPE_CHOICES)
class Get_Tag(Cmdlet):
"""Class-based get-tag cmdlet with self-registration."""
def __init__(self) -> None:
"""Initialize get-tag cmdlet."""
super().__init__(
name="get-tag",
summary="Get tag values from Hydrus or local sidecar metadata",
usage="get-tag [-query \"hash:<sha256>\"] [--store <key>] [--emit] [-scrape <url|provider>]",
alias=[],
arg=[
SharedArgs.QUERY,
CmdletArg(
name="-store",
type="string",
description="Store result to this key for pipeline",
alias="store"
),
CmdletArg(
name="-emit",
type="flag",
description="Emit result without interactive prompt (quiet mode)",
alias="emit-only"
),
CmdletArg(
name="-scrape",
type="string",
description="Scrape metadata from URL/provider, or use 'ytdlp' to scrape from the item's URL and overwrite tags",
required=False,
choices=_SCRAPE_CHOICES,
)
],
detail=[
"- Retrieves tags for a file from:",
" Hydrus: Using file hash if available",
" Local: From sidecar files or local library database",
"- Options:",
" -query: Override hash to look up in Hydrus (use: -query \"hash:<sha256>\")",
" -store: Store result to key for downstream pipeline",
" -emit: Quiet mode (no interactive selection)",
" -scrape: Scrape metadata from URL or metadata provider",
],
exec=self.run,
)
self.register()
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Execute get-tag cmdlet."""
return _run(result, args, config)
# Create and register the cmdlet
CMDLET = Get_Tag()
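# Illustrative CLI usage (hedged; assumes a result has been selected upstream,
# e.g. via @N from a prior search):
#   get-tag                          # show the selected item's tags as a result table
#   get-tag -query "hash:<sha256>"   # look up tags for an explicit hash
#   get-tag -scrape itunes           # search a metadata provider, then pick a row to apply
#   get-tag -scrape ytdlp            # re-scrape the item's source URL and overwrite its tags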