diff --git a/CLI.py b/CLI.py index 139bc46..6a46cf9 100644 --- a/CLI.py +++ b/CLI.py @@ -2556,8 +2556,11 @@ class PipelineExecutor: # Auto-insert downloader stages for provider tables. try: - current_table = ctx.get_current_stage_table( - ) or ctx.get_last_result_table() + current_table = ctx.get_current_stage_table() + if current_table is None and hasattr(ctx, "get_display_table"): + current_table = ctx.get_display_table() + if current_table is None: + current_table = ctx.get_last_result_table() except Exception: current_table = None table_type = ( @@ -2584,6 +2587,9 @@ class PipelineExecutor: "libgen"}: print("Auto-piping selection to download-file") stages.append(["download-file"]) + elif isinstance(table_type, str) and table_type.startswith("metadata."): + print("Auto-applying metadata selection via get-tag") + stages.append(["get-tag"]) else: first_cmd = stages[0][0] if stages and stages[0] else None if table_type == "soulseek" and first_cmd not in ( @@ -2636,6 +2642,13 @@ class PipelineExecutor: ): print("Auto-inserting download-file after Libgen selection") stages.insert(0, ["download-file"]) + if isinstance(table_type, str) and table_type.startswith("metadata.") and first_cmd not in ( + "get-tag", + "get_tag", + ".pipe", + ): + print("Auto-inserting get-tag after metadata selection") + stages.insert(0, ["get-tag"]) return True, piped_result else: @@ -3691,6 +3704,90 @@ class PipelineExecutor: pass if not stages and piped_result is not None: + # Special-case: selecting metadata rows (e.g., get-tag -scrape) should + # immediately apply tags to the target item instead of just echoing a + # selection table. + try: + items = piped_result if isinstance(piped_result, list) else [piped_result] + applied_any = False + from cmdlet._shared import normalize_hash # type: ignore + from cmdlet.get_tag import _filter_scraped_tags, _emit_tags_as_table # type: ignore + from Store import Store # type: ignore + cfg_loader = ConfigLoader(root=Path.cwd()) + config = cfg_loader.load() + + for item in items: + if not isinstance(item, dict): + continue + provider = item.get("provider") + tags = item.get("tag") + if not provider or not isinstance(tags, list) or not tags: + continue + + file_hash = normalize_hash( + item.get("hash") + or item.get("hash_hex") + or item.get("file_hash") + or item.get("sha256") + ) + store_name = item.get("store") or item.get("storage") + subject_path = ( + item.get("path") + or item.get("target") + or item.get("filename") + ) + + if str(provider).strip().lower() == "ytdlp": + apply_tags = [str(t) for t in tags if t is not None] + else: + apply_tags = _filter_scraped_tags([str(t) for t in tags if t is not None]) + + if not apply_tags: + continue + + if store_name and file_hash: + try: + backend = Store(config)[str(store_name)] + backend.add_tag(file_hash, apply_tags, config=config) + try: + updated_tags, _src = backend.get_tag(file_hash, config=config) + except Exception: + updated_tags = apply_tags + _emit_tags_as_table( + tags_list=list(updated_tags or apply_tags), + file_hash=file_hash, + store=str(store_name), + service_name=None, + config=config, + item_title=str(item.get("title") or provider), + path=str(subject_path) if subject_path else None, + subject=item, + ) + applied_any = True + continue + except Exception: + pass + + # No store/hash: just emit the tags to the pipeline/view. + _emit_tags_as_table( + tags_list=list(apply_tags), + file_hash=file_hash, + store=str(store_name or "local"), + service_name=None, + config=config, + item_title=str(item.get("title") or provider), + path=str(subject_path) if subject_path else None, + subject=item, + ) + applied_any = True + + if applied_any: + # Selection handled; skip default selection echo. + return + except Exception: + # Fall back to default selection rendering on any failure. + pass + table = ResultTable("Selection Result") items = piped_result if isinstance(piped_result, list) else [piped_result] diff --git a/Provider/HIFI.py b/Provider/HIFI.py new file mode 100644 index 0000000..af4578e --- /dev/null +++ b/Provider/HIFI.py @@ -0,0 +1,434 @@ +from __future__ import annotations + +import re +import sys +from typing import Any, Dict, List, Optional, Tuple + +import httpx + +from ProviderCore.base import Provider, SearchResult +from SYS.logger import log + +DEFAULT_API_URLS = ( + "https://tidal-api.binimum.org", +) + +_KEY_TO_PARAM: Dict[str, str] = { + "album": "al", + "artist": "a", + "playlist": "p", + "video": "v", + "song": "s", + "track": "s", + "title": "s", +} + +_DELIMITERS_RE = re.compile(r"[;,]") +_SEGMENT_BOUNDARY_RE = re.compile(r"(?=\b\w+\s*:)") + + +class HIFI(Provider): + """Provider that targets the HiFi-RestAPI (Tidal proxy) search endpoint. + + The CLI can supply a list of fail-over URLs via ``provider.hifi.api_urls`` or + ``provider.hifi.api_url`` in the config. When not configured, it defaults to + https://tidal-api.binimum.org. + """ + + def __init__(self, config: Optional[Dict[str, Any]] = None) -> None: + super().__init__(config) + self.api_urls = self._resolve_api_urls() + + def validate(self) -> bool: + return bool(self.api_urls) + + def search( + self, + query: str, + limit: int = 50, + filters: Optional[Dict[str, Any]] = None, + **_kwargs: Any, + ) -> List[SearchResult]: + if limit <= 0: + return [] + params = self._build_search_params(query) + if not params: + return [] + + payload: Optional[Dict[str, Any]] = None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/search/" + try: + resp = httpx.get(endpoint, params=params, timeout=10.0) + resp.raise_for_status() + payload = resp.json() + break + except Exception as exc: + log(f"[hifi] Search failed for {endpoint}: {exc}", file=sys.stderr) + continue + + if not payload: + return [] + + data = payload.get("data") or {} + items = data.get("items") or [] + results: List[SearchResult] = [] + for item in items: + if limit and len(results) >= limit: + break + result = self._item_to_result(item) + if result is not None: + results.append(result) + + return results[:limit] + + def _resolve_api_urls(self) -> List[str]: + urls: List[str] = [] + raw = self.config.get("api_urls") + if raw is None: + raw = self.config.get("api_url") + if isinstance(raw, (list, tuple)): + urls.extend(str(item).strip() for item in raw if isinstance(item, str)) + elif isinstance(raw, str): + urls.append(raw.strip()) + cleaned = [u.rstrip("/") for u in urls if isinstance(u, str) and u.strip()] + if not cleaned: + cleaned = [DEFAULT_API_URLS[0]] + return cleaned + + def _build_search_params(self, query: str) -> Dict[str, str]: + cleaned = str(query or "").strip() + if not cleaned: + return {} + + segments: List[str] = [] + for chunk in _DELIMITERS_RE.split(cleaned): + chunk = chunk.strip() + if not chunk: + continue + if ":" in chunk: + for sub in _SEGMENT_BOUNDARY_RE.split(chunk): + part = sub.strip() + if part: + segments.append(part) + else: + segments.append(chunk) + + key_values: Dict[str, str] = {} + free_text: List[str] = [] + for segment in segments: + if ":" not in segment: + free_text.append(segment) + continue + key, value = segment.split(":", 1) + key = key.strip().lower() + value = value.strip().strip('"').strip("'") + if value: + key_values[key] = value + + params: Dict[str, str] = {} + for key, value in key_values.items(): + if not value: + continue + mapped = _KEY_TO_PARAM.get(key) + if mapped: + params[mapped] = value + + general = " ".join(part for part in free_text if part).strip() + if general: + params.setdefault("s", general) + elif not params: + params["s"] = cleaned + return params + + @staticmethod + def _format_duration(seconds: Any) -> str: + try: + total = int(seconds) + if total < 0: + return "" + except Exception: + return "" + minutes, secs = divmod(total, 60) + return f"{minutes}:{secs:02d}" + + @staticmethod + def _stringify(value: Any) -> str: + text = str(value or "").strip() + return text + + @staticmethod + def _extract_artists(item: Dict[str, Any]) -> List[str]: + names: List[str] = [] + artists = item.get("artists") + if isinstance(artists, list): + for artist in artists: + if isinstance(artist, dict): + name = str(artist.get("name") or "").strip() + if name and name not in names: + names.append(name) + if not names: + primary = item.get("artist") + if isinstance(primary, dict): + name = str(primary.get("name") or "").strip() + if name: + names.append(name) + return names + + def _item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]: + if not isinstance(item, dict): + return None + + title = str(item.get("title") or "").strip() + if not title: + return None + + identifier = item.get("id") + if identifier is None: + return None + try: + track_id = int(identifier) + except (TypeError, ValueError): + return None + + # Avoid tidal.com URLs entirely; selection will resolve to a decoded MPD. + path = f"hifi://track/{track_id}" + + artists = self._extract_artists(item) + artist_display = ", ".join(artists) + + album = item.get("album") + album_title = "" + if isinstance(album, dict): + album_title = str(album.get("title") or "").strip() + + detail_parts: List[str] = [] + if artist_display: + detail_parts.append(artist_display) + if album_title: + detail_parts.append(album_title) + detail = " | ".join(detail_parts) + + columns: List[tuple[str, str]] = [] + if artist_display: + columns.append(("Artist", artist_display)) + if album_title: + columns.append(("Album", album_title)) + duration_text = self._format_duration(item.get("duration")) + if duration_text: + columns.append(("Duration", duration_text)) + audio_quality = str(item.get("audioQuality") or "").strip() + if audio_quality: + columns.append(("Quality", audio_quality)) + + tags = {"tidal"} + if audio_quality: + tags.add(f"quality:{audio_quality.lower()}") + metadata = item.get("mediaMetadata") + if isinstance(metadata, dict): + tag_values = metadata.get("tags") or [] + for tag in tag_values: + if isinstance(tag, str) and tag.strip(): + tags.add(tag.strip().lower()) + + return SearchResult( + table="hifi", + title=title, + path=path, + detail=detail, + annotations=["tidal"], + media_kind="audio", + tag=tags, + columns=columns, + full_metadata=item, + ) + + def _extract_track_selection_context( + self, selected_items: List[Any] + ) -> List[Tuple[int, str, str]]: + contexts: List[Tuple[int, str, str]] = [] + seen_ids: set[int] = set() + for item in selected_items or []: + payload: Dict[str, Any] = {} + if isinstance(item, dict): + payload = item + else: + try: + payload = ( + item.to_dict() + if hasattr(item, "to_dict") + and callable(getattr(item, "to_dict")) + else {} + ) + except Exception: + payload = {} + if not payload: + try: + payload = { + "title": getattr(item, "title", None), + "path": getattr(item, "path", None), + "url": getattr(item, "url", None), + "full_metadata": getattr(item, "full_metadata", None), + } + except Exception: + payload = {} + + meta = ( + payload.get("full_metadata") + if isinstance(payload.get("full_metadata"), dict) + else payload + ) + if not isinstance(meta, dict): + meta = {} + raw_id = meta.get("trackId") or meta.get("id") or payload.get("id") + if raw_id is None: + continue + try: + track_id = int(raw_id) + except (TypeError, ValueError): + continue + if track_id in seen_ids: + continue + seen_ids.add(track_id) + + title = ( + payload.get("title") + or meta.get("title") + or payload.get("name") + or payload.get("path") + or payload.get("url") + ) + if not title: + title = f"Track {track_id}" + path = ( + payload.get("path") + or payload.get("url") + or f"hifi://track/{track_id}" + ) + contexts.append((track_id, str(title).strip(), str(path).strip())) + return contexts + + def _fetch_track_details(self, track_id: int) -> Optional[Dict[str, Any]]: + if track_id <= 0: + return None + params = {"id": str(track_id)} + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/track/" + try: + resp = httpx.get(endpoint, params=params, timeout=10.0) + resp.raise_for_status() + payload = resp.json() + data = payload.get("data") + if isinstance(data, dict): + return data + except Exception as exc: + log(f"[hifi] Track lookup failed for {endpoint}: {exc}", file=sys.stderr) + continue + return None + + def _build_track_columns(self, detail: Dict[str, Any], track_id: int) -> List[Tuple[str, str]]: + values: List[Tuple[str, str]] = [ + ("Track ID", str(track_id)), + ("Quality", self._stringify(detail.get("audioQuality"))), + ("Mode", self._stringify(detail.get("audioMode"))), + ("Asset", self._stringify(detail.get("assetPresentation"))), + ("Manifest Type", self._stringify(detail.get("manifestMimeType"))), + ("Manifest Hash", self._stringify(detail.get("manifestHash"))), + ("Bit Depth", self._stringify(detail.get("bitDepth"))), + ("Sample Rate", self._stringify(detail.get("sampleRate"))), + ] + return [(name, value) for name, value in values if value] + + def selector( + self, + selected_items: List[Any], + *, + ctx: Any, + stage_is_last: bool = True, + **_kwargs: Any, + ) -> bool: + if not stage_is_last: + return False + + contexts = self._extract_track_selection_context(selected_items) + if not contexts: + return False + + track_details: List[Tuple[int, str, str, Dict[str, Any]]] = [] + for track_id, title, path in contexts: + detail = self._fetch_track_details(track_id) + if detail: + track_details.append((track_id, title, path, detail)) + + if not track_details: + return False + + try: + from SYS.rich_display import stdout_console + from SYS.result_table import ResultTable + except Exception: + return False + + table = ResultTable("HIFI Track").set_preserve_order(True) + table.set_table("hifi.track") + results_payload: List[Dict[str, Any]] = [] + for track_id, title, path, detail in track_details: + # Decode the DASH MPD manifest to a local file and use it as the selectable/playable path. + try: + from cmdlet._shared import resolve_tidal_manifest_path + + manifest_path = resolve_tidal_manifest_path( + {"full_metadata": detail, "path": f"hifi://track/{track_id}"} + ) + except Exception: + manifest_path = None + + resolved_path = str(manifest_path) if manifest_path else f"hifi://track/{track_id}" + + artists = self._extract_artists(detail) + artist_display = ", ".join(artists) if artists else "" + columns = self._build_track_columns(detail, track_id) + if artist_display: + columns.insert(1, ("Artist", artist_display)) + album = detail.get("album") + if isinstance(album, dict): + album_title = self._stringify(album.get("title")) + else: + album_title = self._stringify(detail.get("album")) + if album_title: + insert_pos = 2 if artist_display else 1 + columns.insert(insert_pos, ("Album", album_title)) + + result = SearchResult( + table="hifi.track", + title=title, + path=resolved_path, + detail=f"id:{track_id}", + annotations=["tidal", "track"], + media_kind="audio", + columns=columns, + full_metadata=detail, + ) + table.add_result(result) + try: + results_payload.append(result.to_dict()) + except Exception: + results_payload.append({ + "table": "hifi.track", + "title": result.title, + "path": result.path, + }) + + try: + ctx.set_last_result_table(table, results_payload) + ctx.set_current_stage_table(table) + except Exception: + pass + + try: + stdout_console().print() + stdout_console().print(table) + except Exception: + pass + + return True \ No newline at end of file diff --git a/Provider/metadata_provider.py b/Provider/metadata_provider.py index 9856ebb..2a1b0d3 100644 --- a/Provider/metadata_provider.py +++ b/Provider/metadata_provider.py @@ -8,8 +8,13 @@ import requests import sys import json import subprocess +try: # Optional dependency for IMDb scraping + from imdbinfo.services import search_title # type: ignore +except ImportError: # pragma: no cover - optional + search_title = None # type: ignore[assignment] from SYS.logger import log, debug +from SYS.metadata import imdb_tag try: # Optional dependency import musicbrainzngs # type: ignore @@ -607,6 +612,139 @@ class MusicBrainzMetadataProvider(MetadataProvider): return tags +class ImdbMetadataProvider(MetadataProvider): + """Metadata provider for IMDb titles (movies/series/episodes).""" + + @property + def name(self) -> str: # type: ignore[override] + return "imdb" + + @staticmethod + def _extract_imdb_id(text: str) -> str: + raw = str(text or "").strip() + if not raw: + return "" + + # Exact tt123 pattern + m = re.search(r"(tt\d+)", raw, re.IGNORECASE) + if m: + imdb_id = m.group(1).lower() + return imdb_id if imdb_id.startswith("tt") else f"tt{imdb_id}" + + # Bare numeric IDs (e.g., "0118883") + if raw.isdigit() and len(raw) >= 6: + return f"tt{raw}" + + # Last-resort: extract first digit run + m_digits = re.search(r"(\d{6,})", raw) + if m_digits: + return f"tt{m_digits.group(1)}" + + return "" + + def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + q = (query or "").strip() + if not q: + return [] + + imdb_id = self._extract_imdb_id(q) + if imdb_id: + try: + data = imdb_tag(imdb_id) + raw_tags = data.get("tag") if isinstance(data, dict) else [] + title = None + year = None + if isinstance(raw_tags, list): + for tag in raw_tags: + if not isinstance(tag, str): + continue + if tag.startswith("title:"): + title = tag.split(":", 1)[1] + elif tag.startswith("year:"): + year = tag.split(":", 1)[1] + return [ + { + "title": title or imdb_id, + "artist": "", + "album": "", + "year": str(year or ""), + "provider": self.name, + "imdb_id": imdb_id, + "raw": data, + } + ] + except Exception as exc: + log(f"IMDb lookup failed: {exc}", file=sys.stderr) + return [] + + if search_title is None: + log("imdbinfo is not installed; skipping IMDb scrape", file=sys.stderr) + return [] + + try: + search_result = search_title(q) + titles = getattr(search_result, "titles", None) or [] + except Exception as exc: + log(f"IMDb search failed: {exc}", file=sys.stderr) + return [] + + items: List[Dict[str, Any]] = [] + for entry in titles[:limit]: + imdb_id = self._extract_imdb_id( + getattr(entry, "imdb_id", None) + or getattr(entry, "imdbId", None) + or getattr(entry, "id", None) + ) + title = getattr(entry, "title", "") or getattr(entry, "title_localized", "") + year = str(getattr(entry, "year", "") or "")[:4] + kind = getattr(entry, "kind", "") or "" + rating = getattr(entry, "rating", None) + items.append( + { + "title": title, + "artist": "", + "album": kind, + "year": year, + "provider": self.name, + "imdb_id": imdb_id, + "kind": kind, + "rating": rating, + "raw": entry, + } + ) + return items + + def to_tags(self, item: Dict[str, Any]) -> List[str]: + imdb_id = self._extract_imdb_id( + item.get("imdb_id") or item.get("id") or item.get("imdb") or "" + ) + try: + if imdb_id: + data = imdb_tag(imdb_id) + raw_tags = data.get("tag") if isinstance(data, dict) else [] + tags = [t for t in raw_tags if isinstance(t, str)] + if tags: + return tags + except Exception as exc: + log(f"IMDb tag extraction failed: {exc}", file=sys.stderr) + + tags = super().to_tags(item) + if imdb_id: + tags.append(f"imdb:{imdb_id}") + seen: set[str] = set() + deduped: List[str] = [] + for t in tags: + s = str(t or "").strip() + if not s: + continue + k = s.lower() + if k in seen: + continue + seen.add(k) + deduped.append(s) + return deduped + + class YtdlpMetadataProvider(MetadataProvider): """Metadata provider that extracts tags from a supported URL using yt-dlp. @@ -764,6 +902,7 @@ _METADATA_PROVIDERS: Dict[str, "google": GoogleBooksMetadataProvider, "isbnsearch": ISBNsearchMetadataProvider, "musicbrainz": MusicBrainzMetadataProvider, + "imdb": ImdbMetadataProvider, "ytdlp": YtdlpMetadataProvider, } diff --git a/ProviderCore/registry.py b/ProviderCore/registry.py index 9747818..1db4dc0 100644 --- a/ProviderCore/registry.py +++ b/ProviderCore/registry.py @@ -26,6 +26,7 @@ from Provider.zeroxzero import ZeroXZero from Provider.loc import LOC from Provider.internetarchive import InternetArchive from Provider.podcastindex import PodcastIndex +from Provider.HIFI import HIFI _PROVIDERS: Dict[str, Type[Provider]] = { @@ -34,6 +35,7 @@ _PROVIDERS: Dict[str, "libgen": Libgen, "openlibrary": OpenLibrary, "internetarchive": InternetArchive, + "hifi": HIFI, "soulseek": Soulseek, "bandcamp": Bandcamp, "youtube": YouTube, diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index 21d11d5..a3d1aab 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -2,9 +2,13 @@ from __future__ import annotations +import base64 +import hashlib import json +import re import shutil import sys +import tempfile from collections.abc import Iterable as IterableABC from SYS.logger import log @@ -53,14 +57,14 @@ class CmdletArg: """Resolve/process the argument value using the handler if available. Args: - value: The raw argument value to process + value: The raw argument value to process Returns: - Processed value from handler, or original value if no handler + Processed value from handler, or original value if no handler Example: - # For STORAGE arg with a handler - storage_path = SharedArgs.STORAGE.resolve('local') # Returns Path.home() / "Videos" + # For STORAGE arg with a handler + storage_path = SharedArgs.STORAGE.resolve('local') # Returns Path.home() / "Videos" """ if self.handler is not None and callable(self.handler): return self.handler(value) @@ -2435,3 +2439,224 @@ def register_url_with_local_library( return True # url already existed except Exception: return False + + +def resolve_tidal_manifest_path(item: Any) -> Optional[str]: + """Persist the Tidal manifest from search results and return a local path.""" + + metadata = None + if isinstance(item, dict): + metadata = item.get("full_metadata") or item.get("metadata") + else: + metadata = getattr(item, "full_metadata", None) or getattr(item, "metadata", None) + + if not isinstance(metadata, dict): + return None + + existing_path = metadata.get("_tidal_manifest_path") + if existing_path: + try: + resolved = Path(str(existing_path)) + if resolved.is_file(): + return str(resolved) + except Exception: + pass + + existing_url = metadata.get("_tidal_manifest_url") + if existing_url and isinstance(existing_url, str): + candidate = existing_url.strip() + if candidate: + return candidate + + raw_manifest = metadata.get("manifest") + if not raw_manifest: + # When piping directly from the HIFI search table, we may only have a track id. + # Fetch track details from the proxy so downstream stages can decode the manifest. + try: + already = bool(metadata.get("_tidal_track_details_fetched")) + except Exception: + already = False + + track_id = metadata.get("trackId") or metadata.get("id") + if track_id is None: + try: + if isinstance(item, dict): + candidate_path = item.get("path") or item.get("url") + else: + candidate_path = getattr(item, "path", None) or getattr(item, "url", None) + except Exception: + candidate_path = None + + if candidate_path: + m = re.search( + r"hifi:(?://)?track[\\/](\d+)", + str(candidate_path), + flags=re.IGNORECASE, + ) + if m: + track_id = m.group(1) + + if (not already) and track_id is not None: + try: + track_int = int(track_id) + except Exception: + track_int = None + + if track_int and track_int > 0: + try: + import httpx + + resp = httpx.get( + "https://tidal-api.binimum.org/track/", + params={"id": str(track_int)}, + timeout=10.0, + ) + resp.raise_for_status() + payload = resp.json() + data = payload.get("data") if isinstance(payload, dict) else None + if isinstance(data, dict) and data: + try: + metadata.update(data) + except Exception: + pass + try: + metadata["_tidal_track_details_fetched"] = True + except Exception: + pass + except Exception: + pass + + raw_manifest = metadata.get("manifest") + if not raw_manifest: + return None + + manifest_str = "".join(str(raw_manifest or "").split()) + if not manifest_str: + return None + + manifest_bytes: bytes + try: + manifest_bytes = base64.b64decode(manifest_str, validate=True) + except Exception: + try: + manifest_bytes = base64.b64decode(manifest_str, validate=False) + except Exception: + try: + manifest_bytes = manifest_str.encode("utf-8") + except Exception: + return None + + if not manifest_bytes: + return None + + head = (manifest_bytes[:1024] or b"").lstrip() + if head.startswith((b"{", b"[")): + try: + text = manifest_bytes.decode("utf-8", errors="ignore") + payload = json.loads(text) + urls = payload.get("urls") or [] + selected_url = None + for candidate in urls: + if isinstance(candidate, str): + candidate = candidate.strip() + if candidate: + selected_url = candidate + break + if selected_url: + try: + metadata["_tidal_manifest_url"] = selected_url + except Exception: + pass + try: + log( + f"[hifi] Resolved JSON manifest for track {metadata.get('trackId') or metadata.get('id')} to {selected_url}", + file=sys.stderr, + ) + except Exception: + pass + return selected_url + try: + metadata["_tidal_manifest_error"] = "JSON manifest contained no urls" + except Exception: + pass + log( + f"[hifi] JSON manifest for track {metadata.get('trackId') or metadata.get('id')} had no playable urls", + file=sys.stderr, + ) + except Exception as exc: + try: + metadata["_tidal_manifest_error"] = ( + f"Failed to parse JSON manifest: {exc}" + ) + except Exception: + pass + log( + f"[hifi] Failed to parse JSON manifest for track {metadata.get('trackId') or metadata.get('id')}: {exc}", + file=sys.stderr, + ) + return None + + looks_like_mpd = ( + head.startswith(b" placeholder, resolve it to a decoded MPD first. + try: + if isinstance(media_path_or_url, Path): + mp_url = str(media_path_or_url) + if mp_url.lower().startswith("hifi:"): + manifest_path = sh.resolve_tidal_manifest_path(item) + if not manifest_path: + try: + meta = getattr(item, "full_metadata", None) + if isinstance(meta, dict) and meta.get("_tidal_manifest_error"): + log(str(meta.get("_tidal_manifest_error")), file=sys.stderr) + except Exception: + pass + log("HIFI selection has no playable DASH MPD manifest.", file=sys.stderr) + failures += 1 + continue + media_path_or_url = Path(manifest_path) + pipe_obj.path = str(media_path_or_url) + elif isinstance(media_path_or_url, str): + if str(media_path_or_url).strip().lower().startswith("hifi:"): + manifest_path = sh.resolve_tidal_manifest_path(item) + if not manifest_path: + try: + meta = getattr(item, "full_metadata", None) + if isinstance(meta, dict) and meta.get("_tidal_manifest_error"): + log(str(meta.get("_tidal_manifest_error")), file=sys.stderr) + except Exception: + pass + log("HIFI selection has no playable DASH MPD manifest.", file=sys.stderr) + failures += 1 + continue + media_path_or_url = Path(manifest_path) + pipe_obj.path = str(media_path_or_url) + except Exception: + pass + + manifest_source: Optional[Union[str, Path]] = None + tidal_metadata = None + try: + if isinstance(item, dict): + tidal_metadata = item.get("full_metadata") or item.get("metadata") + else: + tidal_metadata = ( + getattr(item, "full_metadata", None) + or getattr(item, "metadata", None) + ) + except Exception: + tidal_metadata = None + + if not tidal_metadata and isinstance(pipe_obj.extra, dict): + tidal_metadata = pipe_obj.extra.get("full_metadata") or pipe_obj.extra.get("metadata") + + if isinstance(tidal_metadata, dict): + manifest_source = ( + tidal_metadata.get("_tidal_manifest_path") + or tidal_metadata.get("_tidal_manifest_url") + ) + if not manifest_source: + if isinstance(media_path_or_url, Path): + manifest_source = media_path_or_url + elif isinstance(media_path_or_url, str): + if media_path_or_url.lower().endswith(".mpd"): + manifest_source = media_path_or_url + + if manifest_source: + downloaded, tmp_dir = self._download_manifest_with_ffmpeg(manifest_source) + if downloaded is None: + failures += 1 + continue + media_path_or_url = str(downloaded) + pipe_obj.path = str(downloaded) + pipe_obj.is_temp = True + delete_after_item = True + if tmp_dir is not None: + temp_dir_to_cleanup = tmp_dir + is_url_target = isinstance( media_path_or_url, str @@ -2016,10 +2092,159 @@ class Add_File(Cmdlet): # Call download-media with the URL in args return dl_cmdlet.run(None, dl_args, config) + @staticmethod + def _download_manifest_with_ffmpeg(source: Union[str, Path]) -> Tuple[Optional[Path], Optional[Path]]: + """Run ffmpeg on the manifest or stream URL and return a local file path for ingestion.""" + import subprocess + + ffmpeg_bin = shutil.which("ffmpeg") + if not ffmpeg_bin: + log("ffmpeg not found on PATH; cannot download HIFI manifest.", file=sys.stderr) + return None, None + + tmp_dir = Path(tempfile.mkdtemp(prefix="medeia_hifi_mpd_")) + stream_mp4 = tmp_dir / "stream.mp4" + + input_target: Optional[str] = None + if isinstance(source, Path): + input_target = str(source) + elif isinstance(source, str): + candidate = source.strip() + if candidate.lower().startswith("file://"): + try: + from urllib.parse import unquote, urlparse + + parsed = urlparse(candidate) + raw_path = unquote(parsed.path or "") + raw_path = raw_path.lstrip("/") + candidate = raw_path + except Exception: + pass + input_target = candidate + + if not input_target: + return None, None + + try: + subprocess.run( + [ + ffmpeg_bin, + "-hide_banner", + "-loglevel", + "error", + "-y", + "-protocol_whitelist", + "file,https,tcp,tls,crypto,data", + "-i", + input_target, + "-c", + "copy", + str(stream_mp4), + ], + check=True, + capture_output=True, + text=True, + ) + except subprocess.CalledProcessError as exc: + err = (exc.stderr or "").strip() + if err: + log(f"ffmpeg manifest download failed: {err}", file=sys.stderr) + else: + log(f"ffmpeg manifest download failed (exit {exc.returncode})", file=sys.stderr) + return None, tmp_dir + except Exception as exc: + log(f"ffmpeg manifest download failed: {exc}", file=sys.stderr) + return None, tmp_dir + + codec = None + ffprobe_bin = shutil.which("ffprobe") + if ffprobe_bin: + try: + probe = subprocess.run( + [ + ffprobe_bin, + "-v", + "error", + "-select_streams", + "a:0", + "-show_entries", + "stream=codec_name", + "-of", + "default=nw=1:nk=1", + str(stream_mp4), + ], + capture_output=True, + text=True, + check=True, + ) + codec = (probe.stdout or "").strip().lower() or None + except Exception: + codec = None + + ext = None + if codec == "flac": + ext = "flac" + elif codec == "aac": + ext = "m4a" + elif codec == "mp3": + ext = "mp3" + elif codec == "opus": + ext = "opus" + else: + ext = "mka" + + audio_out = tmp_dir / f"audio.{ext}" + try: + subprocess.run( + [ + ffmpeg_bin, + "-hide_banner", + "-loglevel", + "error", + "-y", + "-i", + str(stream_mp4), + "-vn", + "-c:a", + "copy", + str(audio_out), + ], + check=True, + capture_output=True, + text=True, + ) + if audio_out.exists(): + return audio_out, tmp_dir + except subprocess.CalledProcessError as exc: + err = (exc.stderr or "").strip() + if err: + log(f"ffmpeg audio extract failed: {err}", file=sys.stderr) + except Exception: + pass + + if stream_mp4.exists(): + return stream_mp4, tmp_dir + return None, tmp_dir + @staticmethod def _get_url(result: Any, pipe_obj: models.PipeObject) -> List[str]: from SYS.metadata import normalize_urls + # If this is a HIFI selection, we only support the decoded MPD (never tidal.com URLs). + is_hifi = False + try: + if isinstance(result, dict): + is_hifi = str(result.get("table") or result.get("provider") or "").strip().lower().startswith("hifi") + else: + is_hifi = str(getattr(result, "table", "") or getattr(result, "provider", "")).strip().lower().startswith("hifi") + except Exception: + is_hifi = False + try: + if not is_hifi: + is_hifi = str(getattr(pipe_obj, "path", "") or "").strip().lower().startswith("hifi:") + except Exception: + pass + # Prefer explicit PipeObject.url if present urls: List[str] = [] try: @@ -2043,6 +2268,13 @@ class Add_File(Cmdlet): if not urls: urls = normalize_urls(extract_url_from_result(result)) + # If this is a Tidal/HIFI selection with a decodable manifest, do NOT fall back to + # tidal.com track URLs. The only supported target is the decoded local MPD. + manifest_path = sh.resolve_tidal_manifest_path(result) + if manifest_path: + return [manifest_path] + if is_hifi: + return [] return urls @staticmethod diff --git a/cmdlet/get_note.py b/cmdlet/get_note.py index 9878d9f..9a953e1 100644 --- a/cmdlet/get_note.py +++ b/cmdlet/get_note.py @@ -1,12 +1,13 @@ from __future__ import annotations from pathlib import Path -from typing import Any, Dict, Optional, Sequence +from typing import Any, Dict, List, Optional, Sequence import sys from SYS.logger import log from SYS import pipeline as ctx +from SYS.result_table import ResultTable from . import _shared as sh Cmdlet = sh.Cmdlet @@ -99,6 +100,8 @@ class Get_Note(Cmdlet): store_registry = Store(config) any_notes = False + display_items: List[Dict[str, Any]] = [] + note_table: Optional[ResultTable] = None for res in results: if not isinstance(res, dict): @@ -145,6 +148,13 @@ class Get_Note(Cmdlet): continue any_notes = True + if note_table is None: + note_table = ( + ResultTable("note") + .set_table("note") + .set_value_case("preserve") + .set_preserve_order(True) + ) # Emit each note as its own row so CLI renders a proper note table for k in sorted(notes.keys(), key=lambda x: str(x).lower()): v = notes.get(k) @@ -152,23 +162,27 @@ class Get_Note(Cmdlet): # Keep payload small for IPC/pipes. raw_text = raw_text[:999] preview = " ".join(raw_text.replace("\r", "").split("\n")) - ctx.emit( - { - "store": store_name, - "hash": resolved_hash, - "note_name": str(k), - "note_text": raw_text, - "columns": [ - ("Name", - str(k)), - ("Text", - preview.strip()), - ], - } - ) + payload: Dict[str, Any] = { + "store": store_name, + "hash": resolved_hash, + "note_name": str(k), + "note_text": raw_text, + "columns": [ + ("Name", + str(k)), + ("Text", + preview.strip()), + ], + } + display_items.append(payload) + if note_table is not None: + note_table.add_result(payload) + ctx.emit(payload) if not any_notes: ctx.emit("No notes found.") + elif note_table is not None: + ctx.set_last_result_table(note_table, display_items, subject=result) return 0 diff --git a/cmdlet/get_tag.py b/cmdlet/get_tag.py index c0e524f..a997f60 100644 --- a/cmdlet/get_tag.py +++ b/cmdlet/get_tag.py @@ -1118,7 +1118,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: -query "hash:": Override hash to use instead of result's hash --store : Store result to this key for pipeline --emit: Emit result without interactive prompt (quiet mode) - -scrape : Scrape metadata from URL or provider name (itunes, openlibrary, googlebooks) + -scrape : Scrape metadata from URL or provider name (itunes, openlibrary, googlebooks, imdb) """ args_list = [str(arg) for arg in (args or [])] raw_args = list(args_list) @@ -1367,7 +1367,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: print(json_module.dumps(output, ensure_ascii=False)) return 0 - # Provider scraping (e.g., itunes) + # Provider scraping (e.g., itunes, imdb) provider = get_metadata_provider(scrape_url, config) if provider is None: log(f"Unknown metadata provider: {scrape_url}", file=sys.stderr) @@ -1447,6 +1447,8 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: identifiers.get("isbn_13") or identifiers.get("isbn_10") or identifiers.get("isbn") or identifiers.get("openlibrary") ) + elif provider.name == "imdb": + identifier_query = identifiers.get("imdb") elif provider.name == "itunes": identifier_query = identifiers.get("musicbrainz") or identifiers.get( "musicbrainzalbum" @@ -1557,6 +1559,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: from SYS.result_table import ResultTable table = ResultTable(f"Metadata: {provider.name}") + table.set_table(f"metadata.{provider.name}") table.set_source_command("get-tag", []) selection_payload = [] hash_for_payload = normalize_hash(hash_override) or normalize_hash( @@ -1601,10 +1604,10 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: selection_payload.append(payload) table.set_row_selection_args(idx, [str(idx + 1)]) + # Store an overlay so that a subsequent `@N` selects from THIS metadata table, + # not from the previous searchable table. ctx.set_last_result_table_overlay(table, selection_payload) ctx.set_current_stage_table(table) - # Preserve items for @ selection and downstream pipes without emitting duplicates - ctx.set_last_result_items_only(selection_payload) return 0 # If -scrape was requested but no URL, that's an error @@ -1653,6 +1656,11 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: path=str(subject_path) if subject_path else None, subject=result, ) + _emit_tag_payload( + str(result_provider), + [str(t) for t in result_tags if t is not None], + hash_value=file_hash, + ) return 0 # Apply tags to the store backend (no sidecar writing here). @@ -1716,6 +1724,12 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: }, }, ) + _emit_tag_payload( + str(store_name), + list(updated_tags), + hash_value=file_hash, + extra={"applied_provider": str(result_provider)}, + ) return 0 hash_from_result = normalize_hash(get_field(result, "hash", None)) @@ -1825,7 +1839,14 @@ _SCRAPE_CHOICES = [] try: _SCRAPE_CHOICES = sorted(list_metadata_providers().keys()) except Exception: - _SCRAPE_CHOICES = ["itunes", "openlibrary", "googlebooks", "google", "musicbrainz"] + _SCRAPE_CHOICES = [ + "itunes", + "openlibrary", + "googlebooks", + "google", + "musicbrainz", + "imdb", + ] # Special scrape mode: pull tags from an item's URL via yt-dlp (no download) if "ytdlp" not in _SCRAPE_CHOICES: diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index 0483639..c84273d 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -62,7 +62,7 @@ class search_file(Cmdlet): "provider", type="string", description= - "External provider name: bandcamp, libgen, soulseek, youtube, alldebrid, loc, internetarchive", + "External provider name: bandcamp, libgen, soulseek, youtube, alldebrid, loc, internetarchive, hifi", ), CmdletArg( "open", diff --git a/cmdnat/pipe.py b/cmdnat/pipe.py index c09236f..3cc3709 100644 --- a/cmdnat/pipe.py +++ b/cmdnat/pipe.py @@ -7,7 +7,7 @@ import re import subprocess from urllib.parse import urlparse, parse_qs from pathlib import Path -from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args +from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args, resolve_tidal_manifest_path from SYS.logger import debug, get_thread_stream, is_debug_enabled, set_debug, set_thread_stream from SYS.result_table import ResultTable from MPV.mpv_ipc import MPV @@ -723,6 +723,28 @@ def _get_playable_path( "none"}: path = None + manifest_path = resolve_tidal_manifest_path(item) + if manifest_path: + path = manifest_path + else: + # If this is a hifi:// placeholder and we couldn't resolve a manifest, do not fall back. + try: + if isinstance(path, str) and path.strip().lower().startswith("hifi:"): + try: + meta = None + if isinstance(item, dict): + meta = item.get("full_metadata") or item.get("metadata") + else: + meta = getattr(item, "full_metadata", None) or getattr(item, "metadata", None) + if isinstance(meta, dict) and meta.get("_tidal_manifest_error"): + print(str(meta.get("_tidal_manifest_error")), file=sys.stderr) + except Exception: + pass + print("HIFI selection has no playable DASH MPD manifest.", file=sys.stderr) + return None + except Exception: + pass + if title is not None and not isinstance(title, str): title = str(title) @@ -885,6 +907,25 @@ def _queue_items( target, title = result + # MPD/DASH playback requires ffmpeg protocol whitelist (file + https + crypto etc). + # Set it via IPC before loadfile so the currently running MPV can play the manifest. + try: + target_str = str(target or "") + if re.search(r"\.mpd($|\?)", target_str.lower()): + _send_ipc_command( + { + "command": [ + "set_property", + "options/demuxer-lavf-o", + "protocol_whitelist=file,https,tcp,tls,crypto,data", + ], + "request_id": 198, + }, + silent=True, + ) + except Exception: + pass + # If the target is an AllDebrid protected file URL, unlock it to a direct link for MPV. try: if isinstance(target, str): @@ -1894,6 +1935,27 @@ def _start_mpv( "--ytdl-format=bestvideo[height<=?1080]+bestaudio/best[height<=?1080]", ] + # If we are going to play a DASH MPD, allow ffmpeg to fetch https segments referenced by the manifest. + try: + needs_mpd_whitelist = False + for it in items or []: + mpd = resolve_tidal_manifest_path(it) + candidate = mpd + if not candidate: + if isinstance(it, dict): + candidate = it.get("path") or it.get("url") + else: + candidate = getattr(it, "path", None) or getattr(it, "url", None) + if candidate and re.search(r"\.mpd($|\?)", str(candidate).lower()): + needs_mpd_whitelist = True + break + if needs_mpd_whitelist: + extra_args.append( + "--demuxer-lavf-o=protocol_whitelist=file,https,tcp,tls,crypto,data" + ) + except Exception: + pass + # Optional: borderless window (useful for uosc-like overlay UI without fullscreen). if start_opts and start_opts.get("borderless"): extra_args.append("--border=no")