"""MPV file metadata aggregation helpers.

The entry point is :func:`build_mpv_file_state`, which combines a request
payload with local sidecar files, remote metadata and (when configured) the
Hydrus client API into a JSON-friendly dictionary.
"""
from __future__ import annotations

import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence
from urllib.parse import parse_qs, urlparse, unquote

from config import get_hydrus_url
from helper.utils import sha256_file, unique_preserve_order
from helper.hydrus import HydrusClient, HydrusRequestError
import metadata

# Clip ranges are written as "<start>-<end>" using whole numbers.
_CLIP_RANGE_RE = re.compile(r"^(\d+)-(\d+)$")

# Timeout used for remote lookups when the payload supplies none (or garbage).
_DEFAULT_REMOTE_TIMEOUT = 45.0


class MPVFileError(RuntimeError):
    """Raised when we cannot construct an MPV file snapshot."""


@dataclass(slots=True)
class DebridMagnet:
    """Represents a magnet result from AllDebrid search.

    This class matches the structure expected by the TUI (like Hydrus results)
    with title, target, media_kind attributes for compatibility.
    """

    magnet_id: str
    title: str
    size: int
    status_code: int
    status_text: str
    progress: float
    downloaded: int
    seeders: int
    dl_speed: int
    tag_summary: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None  # Complete magnet file metadata from AllDebrid API

    @property
    def target(self) -> str:
        """Return the target URI for this magnet (used by TUI for access operations)."""
        return f"alldebrid://{self.magnet_id}"

    @property
    def media_kind(self) -> str:
        """Return media kind for display."""
        return "magnet"

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for metadata display.

        ``progress`` is rendered as a percentage string with one decimal;
        ``tag_summary`` and ``metadata`` are intentionally not included.
        """
        return {
            "magnet_id": self.magnet_id,
            "title": self.title,
            "size": self.size,
            "status_code": self.status_code,
            "status_text": self.status_text,
            "progress": f"{self.progress:.1f}%",
            "downloaded": self.downloaded,
            "seeders": self.seeders,
            "dl_speed": self.dl_speed,
        }


@dataclass(slots=True)
class HydrusSettings:
    """Resolved Hydrus connection settings for one build request."""

    base_url: Optional[str]
    access_key: Optional[str]
    timeout: float
    prefer_service_name: Optional[str]
    include_relationships: bool

    def as_metadata_options(self) -> Dict[str, Any]:
        """Return the ``options`` mapping passed to the metadata fetchers.

        ``prefer_service_name`` is only included when it is set.
        """
        options: Dict[str, Any] = {
            "timeout": self.timeout,
            "include_relationships": self.include_relationships,
        }
        if self.prefer_service_name:
            options["prefer_service_name"] = self.prefer_service_name
        return options


@dataclass(slots=True)
class MPVfile:
    """Mutable snapshot of everything known about one file."""

    path: Optional[str] = None
    filename: Optional[str] = None
    type: str = "unknown"
    hash: Optional[str] = None
    local_path: Optional[str] = None
    mpv_metadata: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    remote_metadata: Optional[Dict[str, Any]] = None
    relationships: Optional[Dict[str, Any]] = None
    relationship_metadata: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    original_tags: Dict[str, str] = field(default_factory=dict)
    known_urls: List[str] = field(default_factory=list)
    title: Optional[str] = None
    source_url: Optional[str] = None
    clip_time: Optional[str] = None
    duration: Optional[float] = None
    filesize_mb: Optional[float] = None
    is_video: bool = False
    is_audio: bool = False
    is_deleted: Optional[bool] = None
    is_local: Optional[bool] = None
    has_current_file_service: Optional[bool] = None
    tag_service_key: Optional[str] = None
    swap_recommended: bool = False
    warnings: List[str] = field(default_factory=list)
    # New relationship fields for menu
    king: Optional[str] = None
    alts: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the snapshot as a plain dict with empty values removed.

        Entries equal to ``None``, ``[]``, ``{}`` or ``""`` are dropped for
        terser payloads; ``False`` and ``0`` are kept.
        """
        payload: Dict[str, Any] = {
            "path": self.path,
            "filename": self.filename,
            "type": self.type,
            "hash": self.hash,
            "local_path": self.local_path,
            "mpv_metadata": self.mpv_metadata,
            "metadata": self.metadata,
            "remote_metadata": self.remote_metadata,
            "relationships": self.relationships,
            "relationship_metadata": self.relationship_metadata,
            "tags": self.tags,
            "original_tags": self.original_tags,
            "known_urls": self.known_urls,
            "title": self.title,
            "source_url": self.source_url,
            "clip_time": self.clip_time,
            "duration": self.duration,
            "filesize_mb": self.filesize_mb,
            "is_video": self.is_video,
            "is_audio": self.is_audio,
            "is_deleted": self.is_deleted,
            "is_local": self.is_local,
            "has_current_file_service": self.has_current_file_service,
            "tag_service_key": self.tag_service_key,
            "swap_recommended": self.swap_recommended,
            "warnings": self.warnings,
            # relationship summary fields for easier Lua consumption
            "king": self.king,
            "alts": self.alts,
        }
        return {
            key: value
            for key, value in payload.items()
            if value not in (None, [], {}, "")
        }


def _normalise_string_list(values: Optional[Iterable[Any]]) -> List[str]:
    """Return the stripped, non-empty, de-duplicated strings from *values*.

    Order of first appearance is preserved and ``None`` items are skipped.
    A bare string is treated as a single item rather than being iterated
    character by character.
    """
    if not values:
        return []
    if isinstance(values, str):
        values = [values]
    seen: set[str] = set()
    result: List[str] = []
    for value in values:
        if value is None:
            continue
        text = str(value).strip()
        if not text or text in seen:
            continue
        seen.add(text)
        result.append(text)
    return result


def _looks_like_hash(value: Optional[str]) -> bool:
    """Return True when *value* is 64 hexadecimal characters (ignoring case/outer whitespace)."""
    if not value:
        return False
    candidate = value.strip().lower()
    return len(candidate) == 64 and all(ch in "0123456789abcdef" for ch in candidate)


def _normalise_hash(value: Any) -> Optional[str]:
    """Return *value* stripped and lowercased when it looks like a hash, else ``None``."""
    if isinstance(value, str) and _looks_like_hash(value):
        return value.strip().lower()
    return None


def _collect_hashes(obj: Any, acc: set[str]) -> None:
    """Recursively add every hash-like string found inside *obj* to *acc*.

    Dict values and list/tuple/set items are traversed; dict keys are not.
    """
    if isinstance(obj, dict):
        for child in obj.values():
            _collect_hashes(child, acc)
    elif isinstance(obj, (list, tuple, set)):
        for child in obj:
            _collect_hashes(child, acc)
    else:
        normalised = _normalise_hash(obj)
        if normalised:
            acc.add(normalised)


def _extract_alt_tags(meta: Dict[str, Any]) -> List[str]:
    """Collect string tags from one file-metadata entry.

    Looks under ``tags`` (or ``service_keys_to_statuses_to_tags``). For each
    service dict, ``storage_tags`` is preferred; otherwise any list found
    directly under the service dict is used.
    """
    tags: List[str] = []
    tag_root = meta.get("tags") or meta.get("service_keys_to_statuses_to_tags") or {}
    if not isinstance(tag_root, dict):
        return tags
    for service_dict in tag_root.values():
        if not isinstance(service_dict, dict):
            continue
        storage = service_dict.get("storage_tags")
        buckets = storage if isinstance(storage, dict) else service_dict
        for vals in buckets.values():
            if isinstance(vals, list):
                tags.extend(t for t in vals if isinstance(t, str))
    return tags


def _split_title_and_tags(all_tags: List[str]) -> tuple[Optional[str], List[str]]:
    """Return ``(title, cleaned_tags)`` for a list of raw tags.

    The first ``title:`` tag supplies the title. ``title:`` tags are kept with
    their namespace; every other namespaced tag has its namespace stripped.
    """
    title_val: Optional[str] = None
    cleaned: List[str] = []
    for tag in all_tags:
        if not isinstance(tag, str):
            continue
        if tag.startswith("title:"):
            if title_val is None:
                title_val = tag.split(":", 1)[1]
            cleaned.append(tag)  # keep namespaced title
        elif ":" in tag:
            cleaned.append(tag.split(":", 1)[1])
        else:
            cleaned.append(tag)
    return title_val, cleaned


class MPVFileBuilder:
    """Builds an :class:`MPVfile` snapshot from a request payload and config."""

    def __init__(self, payload: Dict[str, Any], config: Dict[str, Any]):
        """Resolve settings and the file identity; no lookups happen until ``build()``."""
        self.payload = payload or {}
        self.config = config or {}
        self.state = MPVfile()
        self.hydrus_settings = self._resolve_hydrus_settings()
        self.remote_options = self._resolve_remote_options()
        # An explicit None means "not specified", matching _resolve_hydrus_settings.
        include_raw = self.payload.get("include_relationships")
        self.include_relationships = True if include_raw is None else bool(include_raw)
        self.last_url = self._normalise_url(self.payload.get("last_url"))
        self._initialise_identity()

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    def build(self) -> Dict[str, Any]:
        """Populate the state according to its type and return it as a dict.

        The king/alts summary is additionally mirrored into ``mpv_metadata``
        under the ``King`` and ``Alts`` keys for the info menu.
        """
        if self.state.type == "hydrus":
            self._populate_hydrus_by_hash()
        elif self.state.type == "local":
            self._populate_local()
        elif self.state.type == "remote":
            self._populate_remote()
        else:
            # Attempt best effort resolution even for unknown types.
            self._populate_local(best_effort=True)
        self._finalise()
        result = self.state.to_dict()
        # Append King and Alts info to mpv_metadata for info menu
        king = self.state.king
        alts = self.state.alts
        if king:
            result.setdefault("mpv_metadata", {})["King"] = king
        if alts:
            result.setdefault("mpv_metadata", {})["Alts"] = ", ".join(alts)
        return result

    # ------------------------------------------------------------------
    # configuration helpers
    # ------------------------------------------------------------------

    def _resolve_hydrus_settings(self) -> HydrusSettings:
        """Merge ``payload["hydrus"]`` overrides with config values.

        Overrides win; missing values fall back to the config. The timeout
        defaults to 60 seconds when absent or not convertible to float.
        """
        overrides = self.payload.get("hydrus")
        overrides = overrides if isinstance(overrides, dict) else {}

        base_url = overrides.get("url") or overrides.get("base_url")
        access_key = overrides.get("access_key")
        timeout_raw = overrides.get("timeout") or overrides.get("hydrus_timeout")
        prefer_service = overrides.get("prefer_service_name")
        include_relationships = overrides.get("include_relationships")

        if base_url is None:
            base_url = get_hydrus_url(self.config)
        if access_key is None:
            raw_key = self.config.get("HydrusNetwork_Access_Key")
            access_key = str(raw_key) if raw_key is not None else None
        if timeout_raw is None:
            timeout_raw = self.config.get("HydrusNetwork_Request_Timeout")
        try:
            timeout = float(timeout_raw) if timeout_raw is not None else 60.0
        except (TypeError, ValueError):
            timeout = 60.0

        if prefer_service is None:
            prefer_service = self.config.get("Hydrus_Tag_Service")
        if isinstance(prefer_service, str):
            prefer_service = prefer_service.strip() or None

        if include_relationships is None:
            include_relationships = self.payload.get("include_relationships")
        include_relationships = bool(True if include_relationships is None else include_relationships)

        base_url = base_url.strip() if isinstance(base_url, str) else None
        access_key = access_key.strip() if isinstance(access_key, str) else None
        return HydrusSettings(
            base_url=base_url or None,
            access_key=access_key or None,
            timeout=timeout,
            prefer_service_name=prefer_service,
            include_relationships=include_relationships,
        )

    def _resolve_remote_options(self) -> Dict[str, Any]:
        """Return the options for remote metadata resolution.

        Starts from a *copy* of ``payload["remote"]["options"]`` so the
        caller's payload is never mutated, then fills in default
        ``ytdlp_args`` and a ``timeout`` clamped to the 10-90 second range.
        """
        remote_payload = self.payload.get("remote")
        remote_payload = remote_payload if isinstance(remote_payload, dict) else {}
        raw_options = remote_payload.get("options")
        options: Dict[str, Any] = dict(raw_options) if isinstance(raw_options, dict) else {}

        if not options.get("ytdlp_args"):
            options["ytdlp_args"] = ["--no-playlist", "--skip-download", "--no-warnings"]

        if options.get("timeout") is None:
            try:
                requested = float(self.payload.get("remote_timeout") or _DEFAULT_REMOTE_TIMEOUT)
            except (TypeError, ValueError):
                # A malformed timeout must not abort construction of the builder.
                requested = _DEFAULT_REMOTE_TIMEOUT
            options["timeout"] = min(90.0, max(10.0, requested))
        return options

    # ------------------------------------------------------------------
    # initialisation
    # ------------------------------------------------------------------

    def _initialise_identity(self) -> None:
        """Copy identity fields from the payload into the state and classify the file."""
        s = self.state
        p = self.payload

        def _str_or_none(v: Any) -> Optional[str]:
            return str(v) if v is not None and v != "" else None

        # path and filename
        s.path = _str_or_none(p.get("path"))
        s.filename = _str_or_none(p.get("filename"))

        # mpv metadata (shallow copy so the payload is not shared)
        raw_mpv_metadata = p.get("mpv_metadata")
        s.mpv_metadata = dict(raw_mpv_metadata) if isinstance(raw_mpv_metadata, dict) else {}

        # tags (support both "tags" and legacy "existing_tags")
        s.tags = _normalise_string_list(p.get("tags") or p.get("existing_tags"))
        if s.tags:
            s.original_tags = {tag: tag for tag in s.tags}

        # known URLs + last_url
        s.known_urls = _normalise_string_list(p.get("known_urls"))
        if self.last_url and self.last_url not in s.known_urls:
            s.known_urls.append(self.last_url)

        # source URL (explicit or fallback to last_url)
        s.source_url = self._normalise_url(p.get("source_url")) or self.last_url

        # hash (only accepted when it looks like a hash)
        normalised_hash = _normalise_hash(p.get("hash"))
        if normalised_hash:
            s.hash = normalised_hash

        # local_path (non-empty string)
        local_path_override = p.get("local_path")
        if isinstance(local_path_override, str):
            lp = local_path_override.strip()
            if lp:
                s.local_path = lp

        # derive remaining fields from path/filename/type
        self._derive_filename_from_path()
        self._determine_type()

    def _derive_filename_from_path(self) -> None:
        """Fill ``state.filename`` from ``state.path`` when no filename was supplied."""
        if self.state.filename or not self.state.path:
            return
        parsed = urlparse(self.state.path)
        if parsed.scheme in ("http", "https", "ytdl") and parsed.path:
            candidate = Path(parsed.path).name
            if candidate:
                self.state.filename = candidate
        elif parsed.scheme == "file":
            decoded = self._decode_file_url(self.state.path)
            if decoded:
                self.state.filename = Path(decoded).name
        else:
            try:
                self.state.filename = Path(self.state.path).name
            except Exception:
                # Deliberate best effort: a missing filename is not fatal.
                pass

    def _determine_type(self) -> None:
        """Classify the file as ``local``, ``hydrus``, ``remote`` or ``unknown``.

        An explicit, valid ``payload["type"]`` wins; otherwise the type is
        inferred from ``state.path``.
        """
        s = self.state
        p = self.payload

        # 1) Respect explicit type when valid
        explicit = p.get("type")
        if isinstance(explicit, str):
            lowered = explicit.strip().lower()
            if lowered in {"local", "hydrus", "remote"}:
                s.type = lowered
                if lowered == "local":
                    # Keep a payload-supplied local_path when the path resolves to nothing.
                    s.local_path = self._resolve_local_path(s.path) or s.local_path
                return

        # 2) Work from path
        path = s.path or ""
        if not path:
            s.type = "unknown"
            return

        # 3) Hydrus-specific quick checks
        if self._looks_like_hydrus_url(path):
            s.type = "hydrus"
            return

        scheme = (urlparse(path).scheme or "").lower()

        # 4) scheme-based handling
        if scheme == "hydrus":
            s.type = "hydrus"
            return
        if scheme in {"http", "https", "rtmp", "rtsp", "magnet", "ytdl"}:
            # Hydrus URLs were already handled in step 3.
            s.type = "remote"
            return
        if scheme == "file":
            decoded = self._decode_file_url(path)
            if decoded:
                s.local_path = decoded
                s.type = "local"
                return

        # 5) Windows/UNC absolute paths
        if re.match(r"^[A-Za-z]:[\\/]", path) or path.startswith(("\\\\", "//")):
            s.type = "local"
            s.local_path = path
            return

        # 6) Fallback: anything else with a scheme separator is treated as remote
        if "://" in path:
            s.type = "remote"
            return

        # 7) Otherwise treat as a local path, preferring the resolved form
        s.local_path = self._resolve_local_path(path) or path
        s.type = "local"

    # ------------------------------------------------------------------
    # population helpers
    # ------------------------------------------------------------------

    def _populate_local(self, best_effort: bool = False) -> None:
        """Load sidecar tags and a hash for a local file, then enrich from Hydrus.

        With ``best_effort`` set and no hash available, a Hydrus lookup by
        ``source_url`` is attempted instead.
        """
        local_path = self.state.local_path or self._resolve_local_path(self.state.path)
        if local_path:
            self.state.local_path = local_path
            self._load_sidecar_tags(local_path)
            if not self.state.hash:
                self._compute_local_hash(local_path)

        # If Hydrus is configured and we have a hash, enrich from Hydrus;
        # otherwise keep local tags only.
        hydrus_ready = bool(self.hydrus_settings.base_url and self.hydrus_settings.access_key)
        if not hydrus_ready:
            return
        if self.state.hash:
            self._populate_hydrus_by_hash()
        elif best_effort and self.state.source_url:
            self._populate_hydrus_by_url(self.state.source_url)

    def _populate_remote(self) -> None:
        """Resolve remote metadata for the source URL, then try Hydrus by URL.

        Failures of the remote resolver are recorded as warnings.
        """
        source_url = self._normalise_url(
            self.state.source_url or self.last_url or self.state.path
        )
        if source_url:
            self.state.source_url = source_url

        remote_payload = {
            "source_url": self.state.source_url,
            "existing_tags": self.state.tags,
            "metadata": self.payload.get("remote_metadata"),
            "mpv_metadata": self.state.mpv_metadata,
            "options": self.remote_options,
        }
        try:
            remote_result = metadata.resolve_remote_metadata(remote_payload)
        except Exception as exc:  # pragma: no cover - surfaced to the caller
            self.state.warnings.append(str(exc))
            remote_result = None

        if remote_result and isinstance(remote_result, dict):
            self._merge_tags(remote_result.get("tags") or [])
            self.state.remote_metadata = remote_result.get("metadata")
            self.state.title = remote_result.get("title") or self.state.title
            self.state.duration = remote_result.get("duration") or self.state.duration
            self.state.source_url = remote_result.get("source_url") or self.state.source_url
            warnings = remote_result.get("warnings") or []
            if warnings:
                self.state.warnings.extend(warnings)

        if self.hydrus_settings.base_url and self.state.source_url:
            self._populate_hydrus_by_url(self.state.source_url)

    def _populate_hydrus_by_hash(self) -> None:
        """Fetch Hydrus metadata for the state's hash (or one embedded in the path)."""
        hash_hex = _normalise_hash(
            self.state.hash or self._extract_hash_from_path(self.state.path)
        )
        if not hash_hex:
            return
        self.state.hash = hash_hex
        if not self.hydrus_settings.base_url:
            return

        payload: Dict[str, Any] = {
            "api_url": self.hydrus_settings.base_url,
            "access_key": self.hydrus_settings.access_key or "",
            "options": self.hydrus_settings.as_metadata_options(),
            "hash": hash_hex,
        }
        try:
            result = metadata.fetch_hydrus_metadata(payload)
        except Exception as exc:  # pragma: no cover - surfaced to caller
            self.state.warnings.append(str(exc))
            return
        if not isinstance(result, dict):
            # Nothing usable came back; leave the state untouched.
            return
        self._apply_hydrus_result(result)

        # Enrich relationships using the dedicated Hydrus endpoint (robust GET)
        if self.include_relationships and self.state.hash and self.hydrus_settings.base_url:
            self._enrich_relationships_from_api(self.state.hash)

    def _populate_hydrus_by_url(self, url: str) -> None:
        """Fetch Hydrus metadata for *url*; on a match the state becomes type ``hydrus``."""
        if not self.hydrus_settings.base_url:
            return
        payload: Dict[str, Any] = {
            "api_url": self.hydrus_settings.base_url,
            "access_key": self.hydrus_settings.access_key or "",
            "options": self.hydrus_settings.as_metadata_options(),
            "url": url,
        }
        try:
            result = metadata.fetch_hydrus_metadata_by_url(payload)
        except Exception as exc:  # pragma: no cover - surfaced to caller
            self.state.warnings.append(str(exc))
            return
        if not isinstance(result, dict):
            return
        if result.get("error") == "not_found":
            self.state.warnings.extend(result.get("warnings") or [])
            return

        self._apply_hydrus_result(result)
        self.state.type = "hydrus"
        matched_url = result.get("matched_url") or result.get("url")
        if matched_url and matched_url not in self.state.known_urls:
            self.state.known_urls.append(matched_url)

        # Enrich relationships once we know the hash
        if self.include_relationships and self.state.hash and self.hydrus_settings.base_url:
            self._enrich_relationships_from_api(self.state.hash)

    # ------------------------------------------------------------------
    # state modification helpers
    # ------------------------------------------------------------------

    def _apply_hydrus_result(self, result: Dict[str, Any]) -> None:
        """Merge one Hydrus metadata result into the state.

        Truthy values in *result* replace the corresponding state fields;
        tags, known URLs and warnings are merged. ``king`` and ``alts`` are
        recomputed from the relationship data on every call.
        """
        metadata_payload = result.get("metadata")
        if isinstance(metadata_payload, dict):
            # Process mime into type for Lua
            mime = metadata_payload.get("mime")
            if isinstance(mime, str):
                for prefix in ("video", "audio", "image"):
                    if mime.startswith(f"{prefix}/"):
                        metadata_payload["type"] = prefix
                        break
                else:
                    metadata_payload["type"] = "other"
            self.state.metadata = metadata_payload
            # Do NOT overwrite MPVfile.type with metadata.type
            self._merge_known_urls(
                metadata_payload.get("known_urls") or metadata_payload.get("known_urls_set")
            )
            source_url = metadata_payload.get("original_url") or metadata_payload.get("source_url")
            if source_url and not self.state.source_url:
                self.state.source_url = self._normalise_url(source_url)
            # If file_relationships are embedded in metadata, capture them when missing
            if self.state.relationships is None:
                embedded = metadata_payload.get("file_relationships")
                if isinstance(embedded, dict) and embedded:
                    self.state.relationships = embedded

        self._merge_tags(result.get("tags") or [])

        hash_value = _normalise_hash(result.get("hash") or result.get("matched_hash"))
        if hash_value:
            self.state.hash = hash_value

        self.state.tag_service_key = result.get("tag_service_key") or self.state.tag_service_key
        self.state.duration = result.get("duration") or self.state.duration
        self.state.filesize_mb = result.get("filesize_mb") or self.state.filesize_mb
        self.state.is_video = bool(result.get("is_video") or self.state.is_video)
        self.state.is_audio = bool(result.get("is_audio") or self.state.is_audio)
        if result.get("is_deleted") is not None:
            self.state.is_deleted = bool(result.get("is_deleted"))
        if result.get("is_local") is not None:
            self.state.is_local = bool(result.get("is_local"))
        if result.get("has_current_file_service") is not None:
            self.state.has_current_file_service = bool(result.get("has_current_file_service"))

        # Consolidate relationships from explicit result or embedded metadata
        relationships_obj: Optional[Dict[str, Any]] = None
        if isinstance(result.get("relationships"), dict):
            relationships_obj = result["relationships"]
            self.state.relationships = relationships_obj
        elif isinstance(self.state.relationships, dict):
            relationships_obj = self.state.relationships

        self.state.king, self.state.alts = self._derive_king_and_alts(result, relationships_obj)

        if isinstance(result.get("relationship_metadata"), dict):
            self.state.relationship_metadata = result["relationship_metadata"]
        self.state.title = result.get("title") or self.state.title
        self.state.clip_time = result.get("clip_time") or self.state.clip_time
        if result.get("swap_recommended"):
            self.state.swap_recommended = True
        warnings = result.get("warnings") or []
        if warnings:
            self.state.warnings.extend(warnings)

    def _derive_king_and_alts(
        self,
        result: Dict[str, Any],
        relationships_obj: Optional[Dict[str, Any]],
    ) -> tuple[Optional[str], List[str]]:
        """Derive the king hash and the ordered alternates from the available data.

        Returns ``(king, alts)``. ``alts`` never contains the king, has no
        duplicates and is ordered by clip start time when
        ``relationship_metadata`` provides ``clip_time`` values of the form
        ``"<start>-<end>"``; entries without one sort last.
        """
        king: Optional[str] = None
        alts: List[str] = []
        rels = relationships_obj or {}

        # 1) Try direct king fields on relationships object (common variants)
        for key in ("king", "king_hash", "duplicate_king", "best", "best_hash"):
            val = rels.get(key)
            if isinstance(val, str):
                candidates: List[Any] = [val]
            elif isinstance(val, list):
                candidates = val
            else:
                continue
            king = next((h for h in map(_normalise_hash, candidates) if h), None)
            if king:
                break

        # 2) Extract alternates from known fields: numeric "3" (clips), or textual synonyms
        for alt_key in ("3", "alternates", "alts", "clips"):
            val = rels.get(alt_key)
            if isinstance(val, list):
                found = [h for h in map(_normalise_hash, val) if h]
            elif isinstance(val, dict):
                # some APIs might nest
                nested: set[str] = set()
                _collect_hashes(val, nested)
                found = sorted(nested)
            else:
                continue
            alts.extend(h for h in found if h != king)

        # 3) Use relationship_metadata keys as additional alternates and king hint
        rel_meta = result.get("relationship_metadata")
        clip_lookup: Dict[str, Any] = {}
        if isinstance(rel_meta, dict):
            # Index by normalised hash so lookups agree with the lowercased alts.
            for raw_key, meta in rel_meta.items():
                normalised = _normalise_hash(raw_key)
                if normalised:
                    clip_lookup.setdefault(normalised, meta)
            if not king:
                # prefer a king candidate with no clip_time
                king = next(
                    (
                        h
                        for h, meta in clip_lookup.items()
                        if isinstance(meta, dict) and not meta.get("clip_time")
                    ),
                    None,
                )
            alts.extend(h for h in clip_lookup if h != king)

        # 4) As a last resort, flatten all relationship hashes
        if not alts and relationships_obj:
            flattened: set[str] = set()
            _collect_hashes(relationships_obj, flattened)
            alts.extend(h for h in sorted(flattened) if h != king)

        # 5) Include current file when appropriate
        current = self.state.hash
        if current and current != king and current not in alts:
            alts.append(current)

        # 6) Sort alternates by clip start time when available
        def _clip_start_for(h: str) -> float:
            meta = clip_lookup.get(h)
            clip = meta.get("clip_time") if isinstance(meta, dict) else None
            if isinstance(clip, str):
                m = _CLIP_RANGE_RE.match(clip)
                if m:
                    try:
                        return float(m.group(1))
                    except ValueError:
                        return float("inf")
            return float("inf")

        # sorted() is stable and dict.fromkeys keeps the first occurrence, so
        # this de-duplicates while preserving earliest-clip-first ordering.
        alts = list(dict.fromkeys(sorted(alts, key=_clip_start_for)))
        return king, alts

    # ------------------------------------------------------------------
    # relationships enrichment (Hydrus endpoint + alt metadata)
    # ------------------------------------------------------------------

    def _enrich_relationships_from_api(self, file_hash: str) -> None:
        """Fetch relationships for the given hash and enrich state's king/alts and alt metadata.

        - Uses GET /manage_file_relationships/get_file_relationships?hash=...
        - If alts exist, batch-fetch their metadata via GET /get_files/file_metadata?hashes=[...]
        - Extracts title, duration, size, tags (cleaned: title: kept with namespace, others stripped)

        All failures are recorded in ``state.warnings`` and are non-fatal.
        """
        base_url = self.hydrus_settings.base_url or ""
        access_key = self.hydrus_settings.access_key or ""
        if not base_url:
            return

        try:
            client = HydrusClient(base_url, access_key, timeout=self.hydrus_settings.timeout)
        except Exception as exc:  # pragma: no cover - construction should rarely fail
            self.state.warnings.append(f"Hydrus client init failed: {exc}")
            return

        try:
            rel_resp = client.get_file_relationships(file_hash)
        except (HydrusRequestError, Exception) as exc:  # pragma: no cover - surfaced but non-fatal
            self.state.warnings.append(f"relationships api: {exc}")
            return

        rel_map = rel_resp.get("file_relationships") if isinstance(rel_resp, dict) else None
        rel_obj = None
        if isinstance(rel_map, dict):
            # Fall back to the first dict entry when the response is keyed differently.
            rel_obj = rel_map.get(file_hash) or next(
                (v for v in rel_map.values() if isinstance(v, dict)), None
            )
        if isinstance(rel_obj, dict):
            # Preserve the full relationships object
            self.state.relationships = rel_obj
            # Update king and alts from canonical fields
            king = _normalise_hash(rel_obj.get("king"))
            if king:
                self.state.king = king
            alts = rel_obj.get("3") or []
            if isinstance(alts, list):
                self.state.alts = [h for h in map(_normalise_hash, alts) if h]

        # Fetch alt metadata if we have alts
        if not self.state.alts:
            return
        try:
            meta_resp = client.fetch_file_metadata(
                hashes=self.state.alts,
                include_service_keys_to_tags=True,
                include_duration=True,
                include_size=True,
                include_file_urls=False,
                include_mime=False,
            )
        except (HydrusRequestError, Exception) as exc:  # pragma: no cover
            self.state.warnings.append(f"metadata api: {exc}")
            return

        if not isinstance(meta_resp, dict):
            return
        entries = meta_resp.get("metadata") or []
        if not isinstance(entries, list):
            return

        for meta in entries:
            if not isinstance(meta, dict):
                continue
            entry_hash = _normalise_hash(meta.get("hash"))
            if not entry_hash:
                continue
            title_val, tags_clean = _split_title_and_tags(_extract_alt_tags(meta))
            self.state.relationship_metadata[entry_hash] = {
                "title": title_val,
                "duration": meta.get("duration"),
                "size": meta.get("size"),
                "tags": tags_clean,
            }

    def _merge_tags(self, tags: Sequence[Any]) -> None:
        """Append new tags to the state and record them in ``original_tags``."""
        incoming = _normalise_string_list(tags)
        if not incoming:
            return
        combined = list(self.state.tags or []) + incoming
        self.state.tags = unique_preserve_order(combined)
        for tag in incoming:
            self.state.original_tags.setdefault(tag, tag)

    def _merge_known_urls(self, urls: Optional[Iterable[Any]]) -> None:
        """Append new URLs to ``state.known_urls`` without duplicates."""
        if not urls:
            return
        combined = list(self.state.known_urls or []) + _normalise_string_list(urls)
        self.state.known_urls = unique_preserve_order(combined)

    def _load_sidecar_tags(self, local_path: str) -> None:
        """Merge the first existing sidecar (``.tags`` then ``.tags.txt``) into the state.

        The sidecar name is formed by *replacing* the media file's suffix.
        A sidecar hash is only adopted when the state has no hash yet.
        """
        try:
            media_path = Path(local_path)
        except Exception:
            return
        if not media_path.exists():
            return
        for suffix in (".tags", ".tags.txt"):
            candidate = media_path.with_suffix(suffix)
            if not candidate.exists():
                continue
            hash_value, tags, known = self._read_sidecar(candidate)
            sidecar_hash = _normalise_hash(hash_value)
            if sidecar_hash and not self.state.hash:
                self.state.hash = sidecar_hash
            self._merge_tags(tags)
            self._merge_known_urls(known)
            break

    def _read_sidecar(self, sidecar_path: Path) -> tuple[Optional[str], List[str], List[str]]:
        """Parse a sidecar file into ``(hash, tags, known_urls)``.

        Lines starting with ``hash:`` set the hash (last one wins), lines
        starting with ``known_url:`` or ``url:`` add a URL, and every other
        non-blank line is a tag. Prefix matching is case-insensitive.
        An unreadable file yields ``(None, [], [])``.
        """
        try:
            raw = sidecar_path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            return None, [], []

        hash_value: Optional[str] = None
        tags: List[str] = []
        known_urls: List[str] = []
        for line in raw.splitlines():
            trimmed = line.strip()
            if not trimmed:
                continue
            lowered = trimmed.lower()
            if lowered.startswith("hash:"):
                candidate = trimmed.partition(":")[2].strip()
                if candidate:
                    hash_value = candidate
            elif lowered.startswith(("known_url:", "url:")):
                candidate = trimmed.partition(":")[2].strip()
                if candidate:
                    known_urls.append(candidate)
            else:
                tags.append(trimmed)
        return hash_value, tags, known_urls

    def _compute_local_hash(self, local_path: str) -> None:
        """Set ``state.hash`` to the file's SHA-256; an ``OSError`` becomes a warning."""
        try:
            digest = sha256_file(Path(local_path))
        except OSError as exc:
            self.state.warnings.append(f"sha256 failed: {exc}")
            return
        self.state.hash = digest.lower()

    # ------------------------------------------------------------------
    # finalisation helpers
    # ------------------------------------------------------------------

    def _finalise(self) -> None:
        """De-duplicate lists, fill in a title and sanitise fields before serialisation."""
        if self.state.tags:
            self.state.tags = unique_preserve_order(self.state.tags)
        if self.state.known_urls:
            self.state.known_urls = unique_preserve_order(self.state.known_urls)

        # Fall back to the metadata title, then the filename, when no title was found.
        if not self.state.title:
            if self.state.metadata.get("title"):
                self.state.title = str(self.state.metadata["title"]).strip()
            elif self.state.filename:
                self.state.title = self.state.filename

        if self.state.hash and not _looks_like_hash(self.state.hash):
            self.state.hash = None
        if self.state.relationship_metadata is None:
            self.state.relationship_metadata = {}
        if self.state.relationships is not None and not isinstance(self.state.relationships, dict):
            self.state.relationships = None
        if self.state.original_tags is None:
            self.state.original_tags = {}

    # ------------------------------------------------------------------
    # util helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalise_url(value: Any) -> Optional[str]:
        """Return *value* as a stripped string, or ``None`` when it is ``None`` or blank."""
        if value is None:
            return None
        return str(value).strip() or None

    @staticmethod
    def _resolve_local_path(path: Optional[str]) -> Optional[str]:
        """Return a filesystem path for *path*, decoding ``file://`` URLs.

        Anything that is not a ``file`` URL is returned unchanged.
        """
        if not path:
            return None
        if urlparse(path).scheme == "file":
            return MPVFileBuilder._decode_file_url(path)
        return path

    @staticmethod
    def _decode_file_url(value: str) -> Optional[str]:
        """Convert a ``file://`` URL into a native path; ``None`` for other schemes.

        Percent-escapes are decoded. A host other than ``localhost`` yields a
        ``//host/path`` style path. On Windows the leading slash before a
        drive letter is removed, and ``/`` is replaced by ``os.sep``.
        """
        parsed = urlparse(value)
        if parsed.scheme != "file":
            return None
        netloc = parsed.netloc or ""
        path = unquote(parsed.path or "")
        # "file://localhost/x" names a local file, not a share on host "localhost".
        if netloc and netloc.lower() != "localhost":
            path = f"//{netloc}{path}"
        if os.name == "nt" and path.startswith("/") and re.match(r"/[A-Za-z]:", path):
            path = path[1:]
        return path.replace("/", os.sep)

    def _looks_like_hydrus_url(self, url: str) -> bool:
        """Return True when *url* appears to point at a Hydrus file.

        That is: a ``hydrus://`` URL, any URL carrying a Hydrus access-key
        query parameter, or a ``/get_files/`` URL under the configured base URL.
        """
        if not url:
            return False
        if url.startswith("hydrus://"):
            return True
        if "Hydrus-Client-API-Access-Key=" in url:
            return True
        base = self.hydrus_settings.base_url
        return bool(base and url.startswith(base) and "/get_files/" in url)

    @staticmethod
    def _extract_hash_from_path(path: Optional[str]) -> Optional[str]:
        """Return the lowercased ``hash`` query value found in *path*, if any.

        The query-string value is returned without validation (callers check
        it); the regex fallback only matches 64 hexadecimal characters.
        """
        if not path:
            return None
        query = parse_qs(urlparse(path).query)
        if query.get("hash"):
            candidate = query["hash"][0].strip()
            if candidate:
                return candidate.lower()
        match = re.search(r"hash=([0-9a-fA-F]{64})", path)
        if match:
            return match.group(1).lower()
        return None


def build_mpv_file_state(payload: Dict[str, Any], config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build and return the MPV file state dictionary for *payload*.

    ``None`` is accepted for either argument and treated as an empty dict.
    """
    builder = MPVFileBuilder(payload or {}, config or {})
    return builder.build()