"""Get tags from Hydrus or local sidecar metadata. This cmdlet retrieves tags for a selected result, supporting both: - Hydrus Network (for files with hash_hex) - Local sidecar files (.tags) In interactive mode: navigate with numbers, add/delete tags In pipeline mode: display tags as read-only table, emit as structured JSON """ from __future__ import annotations import sys from helper.logger import log import subprocess from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple import pipeline as ctx from helper import hydrus from helper.local_library import read_sidecar, write_sidecar, find_sidecar, LocalLibraryDB from ._shared import normalize_hash, Cmdlet, CmdletArg, SharedArgs, parse_cmdlet_args from config import get_local_storage_path try: from metadata import extract_title except ImportError: extract_title = None # Tag item for ResultTable display and piping from dataclasses import dataclass @dataclass class TagItem: """Tag item for display in ResultTable and piping to other cmdlets. Allows tags to be selected and piped like: - delete-tag @{3,4,9} (delete tags at indices 3, 4, 9) - add-tag @"namespace:value" (add this tag) """ tag_name: str tag_index: int # 1-based index for user reference hash_hex: Optional[str] = None source: str = "hydrus" service_name: Optional[str] = None file_path: Optional[str] = None def __post_init__(self): # Make ResultTable happy by adding standard fields # NOTE: Don't set 'title' - we want only the tag column in ResultTable self.origin = self.source self.detail = f"Tag #{self.tag_index}" self.target = self.tag_name self.media_kind = "tag" def to_dict(self) -> Dict[str, Any]: """Convert to dict for JSON serialization.""" return { "tag_name": self.tag_name, "tag_index": self.tag_index, "hash_hex": self.hash_hex, "source": self.source, "service_name": self.service_name, } def _extract_my_tags_from_hydrus_meta(meta: Dict[str, Any], service_key: Optional[str], service_name: str) -> List[str]: """Extract current tags from Hydrus metadata dict. Prefers display_tags (includes siblings/parents, excludes deleted). Falls back to storage_tags status '0' (current). """ tags_payload = meta.get("tags") if not isinstance(tags_payload, dict): return [] svc_data = None if service_key: svc_data = tags_payload.get(service_key) if not isinstance(svc_data, dict): return [] # Prefer display_tags (Hydrus computes siblings/parents) display = svc_data.get("display_tags") if isinstance(display, list) and display: return [str(t) for t in display if isinstance(t, (str, bytes)) and str(t).strip()] # Fallback to storage_tags status '0' (current) storage = svc_data.get("storage_tags") if isinstance(storage, dict): current_list = storage.get("0") or storage.get(0) if isinstance(current_list, list): return [str(t) for t in current_list if isinstance(t, (str, bytes)) and str(t).strip()] return [] def _emit_tags_as_table( tags_list: List[str], hash_hex: Optional[str], source: str = "hydrus", service_name: Optional[str] = None, config: Dict[str, Any] = None, item_title: Optional[str] = None, file_path: Optional[str] = None ) -> None: """Emit tags as TagItem objects and display via ResultTable. This replaces _print_tag_list to make tags pipe-able. Stores the table in ctx._LAST_RESULT_TABLE for downstream @ selection. """ from result_table import ResultTable # Create ResultTable with just tag column (no title) table_title = "Tags" if item_title: table_title = f"Tags: {item_title}" if hash_hex: table_title += f" [{hash_hex[:8]}]" table = ResultTable(table_title, max_columns=1) table.set_source_command("get-tag", []) # Create TagItem for each tag tag_items = [] for idx, tag_name in enumerate(tags_list, start=1): tag_item = TagItem( tag_name=tag_name, tag_index=idx, hash_hex=hash_hex, source=source, service_name=service_name, file_path=file_path, ) tag_items.append(tag_item) table.add_result(tag_item) # Also emit to pipeline for downstream processing ctx.emit(tag_item) # Store the table and items in history so @.. works to go back # Use overlay mode so it doesn't push the previous search to history stack # This makes get-tag behave like a transient view try: ctx.set_last_result_table_overlay(table, tag_items) except AttributeError: ctx.set_last_result_table(table, tag_items) # Note: CLI will handle displaying the table via ResultTable formatting def _summarize_tags(tags_list: List[str], limit: int = 8) -> str: """Create a summary of tags for display.""" shown = [t for t in tags_list[:limit] if t] summary = ", ".join(shown) remaining = max(0, len(tags_list) - len(shown)) if remaining > 0: summary = f"{summary} (+{remaining} more)" if summary else f"(+{remaining} more)" if len(summary) > 200: summary = summary[:197] + "..." return summary def _extract_title_from(tags_list: List[str]) -> Optional[str]: """Extract title from tags list.""" if extract_title: try: return extract_title(tags_list) except Exception: pass for t in tags_list: if isinstance(t, str) and t.lower().startswith("title:"): val = t.split(":", 1)[1].strip() if val: return val return None def _rename_file_if_title_tag(media: Optional[Path], tags_added: List[str]) -> bool: """Rename a local file if title: tag was added. Returns True if file was renamed, False otherwise. """ if not media or not tags_added: return False # Check if any of the added tags is a title: tag title_value = None for tag in tags_added: if isinstance(tag, str): lower_tag = tag.lower() if lower_tag.startswith("title:"): title_value = tag.split(":", 1)[1].strip() break if not title_value: return False try: # Get current file path file_path = media if not file_path.exists(): return False # Parse file path dir_path = file_path.parent old_name = file_path.name # Get file extension suffix = file_path.suffix or '' # Sanitize title for use as filename import re safe_title = re.sub(r'[<>:"/\\|?*]', '', title_value).strip() if not safe_title: return False new_name = safe_title + suffix new_file_path = dir_path / new_name if new_file_path == file_path: return False # Build sidecar paths BEFORE renaming the file old_sidecar = Path(str(file_path) + '.tags') new_sidecar = Path(str(new_file_path) + '.tags') # Rename file try: file_path.rename(new_file_path) log(f"Renamed file: {old_name} → {new_name}") # Rename .tags sidecar if it exists if old_sidecar.exists(): try: old_sidecar.rename(new_sidecar) log(f"Renamed sidecar: {old_name}.tags → {new_name}.tags") except Exception as e: log(f"Failed to rename sidecar: {e}", file=sys.stderr) return True except Exception as e: log(f"Failed to rename file: {e}", file=sys.stderr) return False except Exception as e: log(f"Error during file rename: {e}", file=sys.stderr) return False def _apply_result_updates_from_tags(result: Any, tag_list: List[str]) -> None: """Update result object with title and tag summary from tags.""" try: new_title = _extract_title_from(tag_list) if new_title: setattr(result, "title", new_title) setattr(result, "tag_summary", _summarize_tags(tag_list)) except Exception: pass def _handle_title_rename(old_path: Path, tags_list: List[str]) -> Optional[Path]: """If a title: tag is present, rename the file and its .tags sidecar to match. Returns the new path if renamed, otherwise returns None. """ # Extract title from tags new_title = None for tag in tags_list: if isinstance(tag, str) and tag.lower().startswith('title:'): new_title = tag.split(':', 1)[1].strip() break if not new_title or not old_path.exists(): return None try: # Build new filename with same extension old_name = old_path.name old_suffix = old_path.suffix # Create new filename: title + extension new_name = f"{new_title}{old_suffix}" new_path = old_path.parent / new_name # Don't rename if already the same name if new_path == old_path: return None # Rename the main file if new_path.exists(): log(f"Warning: Target filename already exists: {new_name}", file=sys.stderr) return None old_path.rename(new_path) log(f"Renamed file: {old_name} → {new_name}", file=sys.stderr) # Rename the .tags sidecar if it exists old_tags_path = old_path.parent / (old_name + '.tags') if old_tags_path.exists(): new_tags_path = old_path.parent / (new_name + '.tags') if new_tags_path.exists(): log(f"Warning: Target sidecar already exists: {new_tags_path.name}", file=sys.stderr) else: old_tags_path.rename(new_tags_path) log(f"Renamed sidecar: {old_tags_path.name} → {new_tags_path.name}", file=sys.stderr) return new_path except Exception as exc: log(f"Warning: Failed to rename file: {exc}", file=sys.stderr) return None def _read_sidecar_fallback(p: Path) -> tuple[Optional[str], List[str], List[str]]: """Fallback sidecar reader if metadata module unavailable. Format: - Lines with "hash:" prefix: file hash - Lines with "known_url:" or "url:" prefix: URLs - Lines with "relationship:" prefix: ignored (internal relationships) - Lines with "key:", "namespace:value" format: treated as namespace tags - Plain lines without colons: freeform tags Excluded namespaces (treated as metadata, not tags): hash, known_url, url, relationship """ try: raw = p.read_text(encoding="utf-8", errors="ignore") except OSError: return None, [], [] t: List[str] = [] u: List[str] = [] h: Optional[str] = None # Namespaces to exclude from tags excluded_namespaces = {"hash", "known_url", "url", "relationship"} for line in raw.splitlines(): s = line.strip() if not s: continue low = s.lower() # Check if this is a hash line if low.startswith("hash:"): h = s.split(":", 1)[1].strip() if ":" in s else h # Check if this is a URL line elif low.startswith("known_url:") or low.startswith("url:"): val = s.split(":", 1)[1].strip() if ":" in s else "" if val: u.append(val) # Check if this is an excluded namespace elif ":" in s: namespace = s.split(":", 1)[0].strip().lower() if namespace not in excluded_namespaces: # Include as namespace tag (e.g., "title: The Freemasons") t.append(s) else: # Plain text without colon = freeform tag t.append(s) return h, t, u def _write_sidecar(p: Path, media: Path, tag_list: List[str], known_urls: List[str], hash_in_sidecar: Optional[str]) -> Path: """Write tags to sidecar file and handle title-based renaming. Returns the new media path if renamed, otherwise returns the original media path. """ success = write_sidecar(media, tag_list, known_urls, hash_in_sidecar) if success: _apply_result_updates_from_tags(None, tag_list) # Check if we should rename the file based on title tag new_media = _handle_title_rename(media, tag_list) if new_media: return new_media return media # Fallback writer ordered = [s for s in tag_list if s and s.strip()] lines = [] if hash_in_sidecar: lines.append(f"hash:{hash_in_sidecar}") lines.extend(ordered) for u in known_urls: lines.append(f"known_url:{u}") try: p.write_text("\n".join(lines) + "\n", encoding="utf-8") # Check if we should rename the file based on title tag new_media = _handle_title_rename(media, tag_list) if new_media: return new_media return media except OSError as exc: log(f"Failed to write sidecar: {exc}", file=sys.stderr) return media def _emit_tag_payload(source: str, tags_list: List[str], *, hash_value: Optional[str], extra: Optional[Dict[str, Any]] = None, store_label: Optional[str] = None) -> int: """Emit tags as structured payload to pipeline. Also emits individual tag objects to _PIPELINE_LAST_ITEMS so they can be selected by index. """ payload: Dict[str, Any] = { "source": source, "tags": list(tags_list), "count": len(tags_list), } if hash_value: payload["hash"] = hash_value if extra: for key, value in extra.items(): if value is not None: payload[key] = value label = None if store_label: label = store_label elif ctx._PIPE_ACTIVE: label = "tags" if label: ctx.store_value(label, payload) if ctx._PIPE_ACTIVE and label.lower() != "tags": ctx.store_value("tags", payload) # Emit individual TagItem objects so they can be selected by bare index # When in pipeline, emit individual TagItem objects if ctx._PIPE_ACTIVE: for idx, tag_name in enumerate(tags_list, start=1): tag_item = TagItem( tag_name=tag_name, tag_index=idx, hash_hex=hash_value, source=source, service_name=None ) ctx.emit(tag_item) else: # When not in pipeline, just emit the payload ctx.emit(payload) return 0 def _extract_scrapable_identifiers(tags_list: List[str]) -> Dict[str, str]: """Extract scrapable identifiers from tags.""" identifiers = {} scrapable_prefixes = {'openlibrary', 'isbn_10', 'isbn', 'musicbrainz', 'musicbrainzalbum', 'imdb', 'tmdb', 'tvdb'} for tag in tags_list: if not isinstance(tag, str) or ':' not in tag: continue parts = tag.split(':', 1) if len(parts) != 2: continue key = parts[0].strip().lower() value = parts[1].strip() if key in scrapable_prefixes and value: identifiers[key] = value return identifiers def _scrape_url_metadata(url: str) -> Tuple[Optional[str], List[str], List[Tuple[str, str]], List[Dict[str, Any]]]: """Scrape metadata from a URL using yt-dlp. Returns: (title, tags, formats, playlist_items) tuple where: - title: Video/content title - tags: List of extracted tags (both namespaced and freeform) - formats: List of (display_label, format_id) tuples - playlist_items: List of playlist entry dicts (empty if not a playlist) """ try: import json as json_module try: from metadata import extract_ytdlp_tags except ImportError: extract_ytdlp_tags = None # Build yt-dlp command with playlist support # IMPORTANT: Do NOT use --flat-playlist! It strips metadata like artist, album, uploader, genre # Without it, yt-dlp gives us full metadata in an 'entries' array within a single JSON object # This ensures we get album-level metadata from sources like BandCamp, YouTube Music, etc. cmd = [ "yt-dlp", "-j", # Output JSON "--no-warnings", "--playlist-items", "1-10", # Get first 10 items if it's a playlist (provides entries) "-f", "best", url ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode != 0: log(f"yt-dlp error: {result.stderr}", file=sys.stderr) return None, [], [], [] # Parse JSON output - WITHOUT --flat-playlist, we get ONE JSON object with 'entries' array # This gives us full metadata instead of flat format lines = result.stdout.strip().split('\n') if not lines or not lines[0]: log("yt-dlp returned empty output", file=sys.stderr) return None, [], [], [] # Parse the single JSON object try: data = json_module.loads(lines[0]) except json_module.JSONDecodeError as e: log(f"Failed to parse yt-dlp JSON: {e}", file=sys.stderr) return None, [], [], [] # Extract title - use the main title title = data.get('title', 'Unknown') # Determine if this is a playlist/album (has entries array) # is_playlist = 'entries' in data and isinstance(data.get('entries'), list) # Extract tags and playlist items tags = [] playlist_items = [] # IMPORTANT: Extract album/playlist-level tags FIRST (before processing entries) # This ensures we get metadata about the collection, not just individual tracks if extract_ytdlp_tags: album_tags = extract_ytdlp_tags(data) tags.extend(album_tags) # Case 1: Entries are nested in the main object (standard playlist structure) if 'entries' in data and isinstance(data.get('entries'), list): entries = data['entries'] # Build playlist items with title and duration for idx, entry in enumerate(entries, 1): if isinstance(entry, dict): item_title = entry.get('title', entry.get('id', f'Track {idx}')) item_duration = entry.get('duration', 0) playlist_items.append({ 'index': idx, 'id': entry.get('id', f'track_{idx}'), 'title': item_title, 'duration': item_duration, 'url': entry.get('url') or entry.get('webpage_url', ''), }) # Extract tags from each entry and merge (but don't duplicate album-level tags) # Only merge entry tags that are multi-value prefixes (not single-value like title:, artist:, etc.) if extract_ytdlp_tags: entry_tags = extract_ytdlp_tags(entry) # Single-value namespaces that should not be duplicated from entries single_value_namespaces = {'title', 'artist', 'album', 'creator', 'channel', 'release_date', 'upload_date', 'license', 'location'} for tag in entry_tags: # Extract the namespace (part before the colon) tag_namespace = tag.split(':', 1)[0].lower() if ':' in tag else None # Skip if this namespace already exists in tags (from album level) if tag_namespace and tag_namespace in single_value_namespaces: # Check if any tag with this namespace already exists in tags already_has_namespace = any( t.split(':', 1)[0].lower() == tag_namespace for t in tags if ':' in t ) if already_has_namespace: continue # Skip this tag, keep the album-level one if tag not in tags: # Avoid exact duplicates tags.append(tag) # Case 2: Playlist detected by playlist_count field (BandCamp albums, etc.) # These need a separate call with --flat-playlist to get the actual entries elif (data.get('playlist_count') or 0) > 0 and 'entries' not in data: try: # Make a second call with --flat-playlist to get the actual tracks flat_cmd = [ "yt-dlp", "-j", "--no-warnings", "--flat-playlist", "-f", "best", url ] flat_result = subprocess.run(flat_cmd, capture_output=True, text=True, timeout=30) if flat_result.returncode == 0: flat_lines = flat_result.stdout.strip().split('\n') # With --flat-playlist, each line is a separate track JSON object # (not nested in a playlist container), so process ALL lines for idx, line in enumerate(flat_lines, 1): if line.strip().startswith('{'): try: entry = json_module.loads(line) item_title = entry.get('title', entry.get('id', f'Track {idx}')) item_duration = entry.get('duration', 0) playlist_items.append({ 'index': idx, 'id': entry.get('id', f'track_{idx}'), 'title': item_title, 'duration': item_duration, 'url': entry.get('url') or entry.get('webpage_url', ''), }) except json_module.JSONDecodeError: pass except Exception as e: pass # Silently ignore if we can't get playlist entries # Fallback: if still no tags detected, get from first item if not tags and extract_ytdlp_tags: tags = extract_ytdlp_tags(data) # Extract formats from the main data object formats = [] if 'formats' in data: formats = _extract_url_formats(data.get('formats', [])) # Deduplicate tags by namespace to prevent duplicate title:, artist:, etc. try: from metadata import dedup_tags_by_namespace as _dedup if _dedup: tags = _dedup(tags, keep_first=True) except Exception: pass # If dedup fails, return tags as-is return title, tags, formats, playlist_items except subprocess.TimeoutExpired: log("yt-dlp timeout (>30s)", file=sys.stderr) return None, [], [], [] except Exception as e: log(f"URL scraping error: {e}", file=sys.stderr) return None, [], [], [] def _extract_url_formats(formats: list) -> List[Tuple[str, str]]: """Extract best formats from yt-dlp formats list. Returns list of (display_label, format_id) tuples. """ try: video_formats = {} # {resolution: format_data} audio_formats = {} # {quality_label: format_data} for fmt in formats: vcodec = fmt.get('vcodec', 'none') acodec = fmt.get('acodec', 'none') height = fmt.get('height') ext = fmt.get('ext', 'unknown') format_id = fmt.get('format_id', '') tbr = fmt.get('tbr', 0) abr = fmt.get('abr', 0) # Video format if vcodec and vcodec != 'none' and height: if height < 480: continue res_key = f"{height}p" if res_key not in video_formats or tbr > video_formats[res_key].get('tbr', 0): video_formats[res_key] = { 'label': f"{height}p ({ext})", 'format_id': format_id, 'tbr': tbr, } # Audio-only format elif acodec and acodec != 'none' and (not vcodec or vcodec == 'none'): audio_key = f"audio_{abr}" if audio_key not in audio_formats or abr > audio_formats[audio_key].get('abr', 0): audio_formats[audio_key] = { 'label': f"audio ({ext})", 'format_id': format_id, 'abr': abr, } result = [] # Add video formats in descending resolution order for res in sorted(video_formats.keys(), key=lambda x: int(x.replace('p', '')), reverse=True): fmt = video_formats[res] result.append((fmt['label'], fmt['format_id'])) # Add best audio format if audio_formats: best_audio = max(audio_formats.values(), key=lambda x: x.get('abr', 0)) result.append((best_audio['label'], best_audio['format_id'])) return result except Exception as e: log(f"Error extracting formats: {e}", file=sys.stderr) return [] def _scrape_isbn_metadata(isbn: str) -> List[str]: """Scrape metadata for an ISBN using Open Library API.""" new_tags = [] try: from ..helper.http_client import HTTPClient import json as json_module isbn_clean = isbn.replace('-', '').strip() url = f"https://openlibrary.org/api/books?bibkeys=ISBN:{isbn_clean}&jscmd=data&format=json" try: with HTTPClient() as client: response = client.get(url) response.raise_for_status() data = json_module.loads(response.content.decode('utf-8')) except Exception as e: log(f"Failed to fetch ISBN metadata: {e}", file=sys.stderr) return [] if not data: log(f"No ISBN metadata found for: {isbn}") return [] book_data = next(iter(data.values()), None) if not book_data: return [] if 'title' in book_data: new_tags.append(f"title:{book_data['title']}") if 'authors' in book_data and isinstance(book_data['authors'], list): for author in book_data['authors'][:3]: if 'name' in author: new_tags.append(f"author:{author['name']}") if 'publish_date' in book_data: new_tags.append(f"publish_date:{book_data['publish_date']}") if 'publishers' in book_data and isinstance(book_data['publishers'], list): for pub in book_data['publishers'][:1]: if 'name' in pub: new_tags.append(f"publisher:{pub['name']}") if 'description' in book_data: desc = book_data['description'] if isinstance(desc, dict) and 'value' in desc: desc = desc['value'] if desc: desc_str = str(desc).strip() # Include description if available (limit to 200 chars to keep it manageable) if len(desc_str) > 0: new_tags.append(f"description:{desc_str[:200]}") if 'number_of_pages' in book_data: page_count = book_data['number_of_pages'] if page_count and isinstance(page_count, int) and page_count > 0: new_tags.append(f"pages:{page_count}") if 'identifiers' in book_data and isinstance(book_data['identifiers'], dict): identifiers = book_data['identifiers'] if 'openlibrary' in identifiers: ol_ids = identifiers['openlibrary'] if isinstance(ol_ids, list) and ol_ids: new_tags.append(f"openlibrary:{ol_ids[0]}") elif isinstance(ol_ids, str): new_tags.append(f"openlibrary:{ol_ids}") if 'lccn' in identifiers: lccn_list = identifiers['lccn'] if isinstance(lccn_list, list) and lccn_list: new_tags.append(f"lccn:{lccn_list[0]}") elif isinstance(lccn_list, str): new_tags.append(f"lccn:{lccn_list}") if 'oclc' in identifiers: oclc_list = identifiers['oclc'] if isinstance(oclc_list, list) and oclc_list: new_tags.append(f"oclc:{oclc_list[0]}") elif isinstance(oclc_list, str): new_tags.append(f"oclc:{oclc_list}") if 'goodreads' in identifiers: goodreads_list = identifiers['goodreads'] if isinstance(goodreads_list, list) and goodreads_list: new_tags.append(f"goodreads:{goodreads_list[0]}") elif isinstance(goodreads_list, str): new_tags.append(f"goodreads:{goodreads_list}") if 'librarything' in identifiers: lt_list = identifiers['librarything'] if isinstance(lt_list, list) and lt_list: new_tags.append(f"librarything:{lt_list[0]}") elif isinstance(lt_list, str): new_tags.append(f"librarything:{lt_list}") if 'doi' in identifiers: doi_list = identifiers['doi'] if isinstance(doi_list, list) and doi_list: new_tags.append(f"doi:{doi_list[0]}") elif isinstance(doi_list, str): new_tags.append(f"doi:{doi_list}") if 'internet_archive' in identifiers: ia_list = identifiers['internet_archive'] if isinstance(ia_list, list) and ia_list: new_tags.append(f"internet_archive:{ia_list[0]}") elif isinstance(ia_list, str): new_tags.append(f"internet_archive:{ia_list}") log(f"Found {len(new_tags)} tag(s) from ISBN lookup") return new_tags except Exception as e: log(f"ISBN scraping error: {e}", file=sys.stderr) return [] def _scrape_openlibrary_metadata(olid: str) -> List[str]: """Scrape metadata for an OpenLibrary ID using the .json API endpoint. Fetches from https://openlibrary.org/books/{OLID}.json and extracts: - Title, authors, publish date, publishers - Description - Subjects as freeform tags (without namespace prefix) - Identifiers (ISBN, LCCN, OCLC, etc.) """ new_tags = [] try: from ..helper.http_client import HTTPClient import json as json_module # Format: OL9674499M or just 9674499M olid_clean = olid.replace('OL', '').replace('M', '') if not olid_clean.isdigit(): olid_clean = olid # Ensure we have the full OLID format for the URL if not olid.startswith('OL'): url = f"https://openlibrary.org/books/OL{olid_clean}M.json" else: url = f"https://openlibrary.org/books/{olid}.json" try: with HTTPClient() as client: response = client.get(url) response.raise_for_status() data = json_module.loads(response.content.decode('utf-8')) except Exception as e: log(f"Failed to fetch OpenLibrary metadata: {e}", file=sys.stderr) return [] if not data: log(f"No OpenLibrary metadata found for: {olid}") return [] # Add title if 'title' in data: new_tags.append(f"title:{data['title']}") # Add authors if 'authors' in data and isinstance(data['authors'], list): for author in data['authors'][:3]: if isinstance(author, dict) and 'name' in author: new_tags.append(f"author:{author['name']}") elif isinstance(author, str): new_tags.append(f"author:{author}") # Add publish date if 'publish_date' in data: new_tags.append(f"publish_date:{data['publish_date']}") # Add publishers if 'publishers' in data and isinstance(data['publishers'], list): for pub in data['publishers'][:1]: if isinstance(pub, dict) and 'name' in pub: new_tags.append(f"publisher:{pub['name']}") elif isinstance(pub, str): new_tags.append(f"publisher:{pub}") # Add description if 'description' in data: desc = data['description'] if isinstance(desc, dict) and 'value' in desc: desc = desc['value'] if desc: desc_str = str(desc).strip() if len(desc_str) > 0: new_tags.append(f"description:{desc_str[:200]}") # Add number of pages if 'number_of_pages' in data: page_count = data['number_of_pages'] if page_count and isinstance(page_count, int) and page_count > 0: new_tags.append(f"pages:{page_count}") # Add subjects as FREEFORM tags (no namespace prefix) if 'subjects' in data and isinstance(data['subjects'], list): for subject in data['subjects'][:10]: if subject and isinstance(subject, str): subject_clean = str(subject).strip() if subject_clean and subject_clean not in new_tags: new_tags.append(subject_clean) # Add identifiers if 'identifiers' in data and isinstance(data['identifiers'], dict): identifiers = data['identifiers'] if 'isbn_10' in identifiers: isbn_10_list = identifiers['isbn_10'] if isinstance(isbn_10_list, list) and isbn_10_list: new_tags.append(f"isbn_10:{isbn_10_list[0]}") elif isinstance(isbn_10_list, str): new_tags.append(f"isbn_10:{isbn_10_list}") if 'isbn_13' in identifiers: isbn_13_list = identifiers['isbn_13'] if isinstance(isbn_13_list, list) and isbn_13_list: new_tags.append(f"isbn_13:{isbn_13_list[0]}") elif isinstance(isbn_13_list, str): new_tags.append(f"isbn_13:{isbn_13_list}") if 'lccn' in identifiers: lccn_list = identifiers['lccn'] if isinstance(lccn_list, list) and lccn_list: new_tags.append(f"lccn:{lccn_list[0]}") elif isinstance(lccn_list, str): new_tags.append(f"lccn:{lccn_list}") if 'oclc_numbers' in identifiers: oclc_list = identifiers['oclc_numbers'] if isinstance(oclc_list, list) and oclc_list: new_tags.append(f"oclc:{oclc_list[0]}") elif isinstance(oclc_list, str): new_tags.append(f"oclc:{oclc_list}") if 'goodreads' in identifiers: goodreads_list = identifiers['goodreads'] if isinstance(goodreads_list, list) and goodreads_list: new_tags.append(f"goodreads:{goodreads_list[0]}") elif isinstance(goodreads_list, str): new_tags.append(f"goodreads:{goodreads_list}") log(f"Found {len(new_tags)} tag(s) from OpenLibrary lookup") return new_tags except Exception as e: log(f"OpenLibrary scraping error: {e}", file=sys.stderr) return [] def _perform_scraping(tags_list: List[str]) -> List[str]: """Perform scraping based on identifiers in tags. Priority order: 1. openlibrary: (preferred - more complete metadata) 2. isbn_10 or isbn (fallback) """ identifiers = _extract_scrapable_identifiers(tags_list) if not identifiers: log("No scrapable identifiers found (openlibrary, ISBN, musicbrainz, imdb)") return [] log(f"Found scrapable identifiers: {', '.join(identifiers.keys())}") new_tags = [] # Prefer OpenLibrary over ISBN (more complete metadata) if 'openlibrary' in identifiers: olid = identifiers['openlibrary'] if olid: log(f"Scraping OpenLibrary: {olid}") new_tags.extend(_scrape_openlibrary_metadata(olid)) elif 'isbn_10' in identifiers or 'isbn' in identifiers: isbn = identifiers.get('isbn_10') or identifiers.get('isbn') if isbn: log(f"Scraping ISBN: {isbn}") new_tags.extend(_scrape_isbn_metadata(isbn)) existing_tags_lower = {tag.lower() for tag in tags_list} scraped_unique = [] seen = set() for tag in new_tags: tag_lower = tag.lower() if tag_lower not in existing_tags_lower and tag_lower not in seen: scraped_unique.append(tag) seen.add(tag_lower) if scraped_unique: log(f"Added {len(scraped_unique)} new tag(s) from scraping") return scraped_unique def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Get tags from Hydrus, local sidecar, or URL metadata. Usage: get-tag [-hash ] [--store ] [--emit] get-tag -scrape Options: -hash : Override hash to use instead of result's hash_hex --store : Store result to this key for pipeline --emit: Emit result without interactive prompt (quiet mode) -scrape : Scrape metadata from URL (returns tags as JSON) """ # Helper to get field from both dict and object def get_field(obj: Any, field: str, default: Any = None) -> Any: if isinstance(obj, dict): return obj.get(field, default) else: return getattr(obj, field, default) # Parse arguments using shared parser parsed_args = parse_cmdlet_args(args, CMDLET) # Extract values hash_override = normalize_hash(parsed_args.get("hash")) store_key = parsed_args.get("store") emit_requested = parsed_args.get("emit", False) scrape_url = parsed_args.get("scrape") scrape_requested = scrape_url is not None # Handle URL scraping mode if scrape_requested and scrape_url: import json as json_module # Don't print debug message - output should be JSON only for programmatic consumption # logger.debug(f"Scraping URL: {scrape_url}") title, tags, formats, playlist_items = _scrape_url_metadata(scrape_url) if not tags: log("No tags extracted from URL", file=sys.stderr) return 1 # Build result object # result_obj = TagItem("url_scrape", tag_index=0, hash_hex=None, source="url", service_name=None) # result_obj.title = title or "URL Content" # Emit tags as JSON for pipeline consumption (output should be pure JSON on stdout) output = { "title": title, "tags": tags, "formats": [(label, fmt_id) for label, fmt_id in formats], "playlist_items": playlist_items, } # Use print() directly to stdout for JSON output (NOT log() which adds prefix) # This ensures the output is capturable by the download modal and other pipelines # The modal filters for lines starting with '{' so the prefix breaks parsing print(json_module.dumps(output, ensure_ascii=False)) return 0 # If -scrape was requested but no URL, that's an error if scrape_requested and not scrape_url: log("-scrape requires a URL argument", file=sys.stderr) return 1 # Handle @N selection which creates a list - extract the first item if isinstance(result, list) and len(result) > 0: result = result[0] hash_from_result = normalize_hash(get_field(result, "hash_hex", None)) hash_hex = hash_override or hash_from_result # Only use emit mode if explicitly requested with --emit flag, not just because we're in a pipeline # This allows interactive REPL to work even in pipelines emit_mode = emit_requested or bool(store_key) store_label = (store_key.strip() if store_key and store_key.strip() else None) # Check Hydrus availability hydrus_available, _ = hydrus.is_available(config) # Try to find path in result object local_path = get_field(result, "target", None) or get_field(result, "path", None) or get_field(result, "file_path", None) # Determine if local file is_local_file = False media: Optional[Path] = None if local_path and isinstance(local_path, str) and not local_path.startswith(("http://", "https://")): is_local_file = True try: media = Path(str(local_path)) except Exception: media = None # Try Hydrus first (always prioritize if available and has hash) use_hydrus = False hydrus_meta = None # Cache the metadata from first fetch client = None if hash_hex and hydrus_available: try: client = hydrus.get_client(config) payload = client.fetch_file_metadata(hashes=[str(hash_hex)], include_service_keys_to_tags=True, include_file_urls=False) items = payload.get("metadata") if isinstance(payload, dict) else None if isinstance(items, list) and items: meta = items[0] if isinstance(items[0], dict) else None # Only accept file if it has a valid file_id (not None) if isinstance(meta, dict) and meta.get("file_id") is not None: use_hydrus = True hydrus_meta = meta # Cache for tag extraction except Exception: pass # Get tags - try Hydrus first, fallback to sidecar current = [] service_name = "" service_key = None source = "unknown" if use_hydrus and hash_hex and hydrus_meta: try: # Use cached metadata from above, don't fetch again service_name = hydrus.get_tag_service_name(config) if client is None: client = hydrus.get_client(config) service_key = hydrus.get_tag_service_key(client, service_name) current = _extract_my_tags_from_hydrus_meta(hydrus_meta, service_key, service_name) source = "hydrus" except Exception as exc: log(f"Warning: Failed to extract tags from Hydrus: {exc}", file=sys.stderr) # Fallback to local sidecar or local DB if no tags if not current and is_local_file and media and media.exists(): try: # First try local library DB library_root = get_local_storage_path(config) if library_root: try: with LocalLibraryDB(library_root) as db: db_tags = db.get_tags(media) if db_tags: current = db_tags source = "local_db" except Exception as exc: log(f"[get_tag] DB lookup failed, trying sidecar: {exc}", file=sys.stderr) # Fall back to sidecar if DB didn't have tags if not current: sidecar_path = find_sidecar(media) if sidecar_path and sidecar_path.exists(): try: _, current, _ = read_sidecar(sidecar_path) except Exception: _, current, _ = _read_sidecar_fallback(sidecar_path) if current: source = "sidecar" except Exception as exc: log(f"Warning: Failed to load tags from local storage: {exc}", file=sys.stderr) # Fallback to tags in the result object if Hydrus/local lookup returned nothing if not current: # Check if result has 'tags' attribute (PipeObject) if hasattr(result, 'tags') and getattr(result, 'tags', None): current = getattr(result, 'tags') source = "pipeline_result" # Check if result is a dict with 'tags' key elif isinstance(result, dict) and 'tags' in result: tags_val = result['tags'] if isinstance(tags_val, list): current = tags_val source = "pipeline_result" source = "pipeline_result" # Error if no tags found if not current: log("No tags found", file=sys.stderr) return 1 # Always output to ResultTable (pipeline mode only) # Extract title for table header item_title = get_field(result, "title", None) or get_field(result, "name", None) or get_field(result, "filename", None) if source == "hydrus": _emit_tags_as_table(current, hash_hex=hash_hex, source="hydrus", service_name=service_name, config=config, item_title=item_title) else: _emit_tags_as_table(current, hash_hex=hash_hex, source="local", service_name=None, config=config, item_title=item_title, file_path=str(local_path) if local_path else None) # If emit requested or store key provided, emit payload if emit_mode: _emit_tag_payload(source, current, hash_value=hash_hex, store_label=store_label) return 0 CMDLET = Cmdlet( name="get-tag", summary="Get tags from Hydrus or local sidecar metadata", usage="get-tag [-hash ] [--store ] [--emit] [-scrape ]", aliases=["tags"], args=[ SharedArgs.HASH, CmdletArg( name="-store", type="string", description="Store result to this key for pipeline", alias="store" ), CmdletArg( name="-emit", type="flag", description="Emit result without interactive prompt (quiet mode)", alias="emit-only" ), CmdletArg( name="-scrape", type="string", description="Scrape metadata from URL (returns tags as JSON)", required=False ) ] )