From 275f18cb31c27528b6c4a60d9192b73b3b75066a Mon Sep 17 00:00:00 2001 From: Nose Date: Sun, 11 Jan 2026 14:46:41 -0800 Subject: [PATCH] f --- API/Tidal.py | 284 +++++ API/base.py | 50 + API/hifi.py | 94 -- API/loc.py | 23 +- API/podcastindex.py | 22 +- CLI.py | 22 +- Provider/HIFI.py | 166 +-- Provider/Tidal.py | 2119 +++++++++++++++++++++++++++++++++ Provider/metadata_provider.py | 14 +- Provider/soulseek.py | 2 +- Provider/tidal_shared.py | 109 -- ProviderCore/base.py | 2 +- ProviderCore/registry.py | 20 +- cmdlet/_shared.py | 14 +- cmdlet/add_file.py | 8 +- cmdlet/download_file.py | 28 +- cmdlet/search_file.py | 18 +- cmdnat/pipe.py | 6 +- search_results.txt | 134 +++ 19 files changed, 2741 insertions(+), 394 deletions(-) create mode 100644 API/Tidal.py create mode 100644 API/base.py delete mode 100644 API/hifi.py create mode 100644 Provider/Tidal.py delete mode 100644 Provider/tidal_shared.py create mode 100644 search_results.txt diff --git a/API/Tidal.py b/API/Tidal.py new file mode 100644 index 0000000..aba691c --- /dev/null +++ b/API/Tidal.py @@ -0,0 +1,284 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Set + +from .base import API, ApiError + +DEFAULT_BASE_URL = "https://tidal-api.binimum.org" + + +def stringify(value: Any) -> str: + """Helper to ensure we have a stripped string or empty.""" + return str(value or "").strip() + + +def extract_artists(item: Dict[str, Any]) -> List[str]: + """Extract list of artist names from a Tidal-style metadata dict.""" + names: List[str] = [] + artists = item.get("artists") + if isinstance(artists, list): + for artist in artists: + if isinstance(artist, dict): + name = stringify(artist.get("name")) + if name and name not in names: + names.append(name) + if not names: + primary = item.get("artist") + if isinstance(primary, dict): + name = stringify(primary.get("name")) + if name: + names.append(name) + return names + + +def build_track_tags(metadata: Dict[str, Any]) -> Set[str]: + 
"""Create a set of searchable tags from track metadata.""" + tags: Set[str] = {"tidal"} + + audio_quality = stringify(metadata.get("audioQuality")) + if audio_quality: + tags.add(f"quality:{audio_quality.lower()}") + + media_md = metadata.get("mediaMetadata") + if isinstance(media_md, dict): + tag_values = media_md.get("tags") or [] + for tag in tag_values: + if isinstance(tag, str): + candidate = tag.strip() + if candidate: + tags.add(candidate.lower()) + + title_text = stringify(metadata.get("title")) + if title_text: + tags.add(f"title:{title_text}") + + artists = extract_artists(metadata) + for artist in artists: + artist_clean = stringify(artist) + if artist_clean: + tags.add(f"artist:{artist_clean}") + + album_title = "" + album_obj = metadata.get("album") + if isinstance(album_obj, dict): + album_title = stringify(album_obj.get("title")) + else: + album_title = stringify(metadata.get("album")) + if album_title: + tags.add(f"album:{album_title}") + + track_no_val = metadata.get("trackNumber") or metadata.get("track_number") + if track_no_val is not None: + try: + track_int = int(track_no_val) + if track_int > 0: + tags.add(f"track:{track_int}") + except Exception: + track_text = stringify(track_no_val) + if track_text: + tags.add(f"track:{track_text}") + + return tags + + +def parse_track_item(item: Dict[str, Any]) -> Dict[str, Any]: + """Parse raw Tidal track data into a clean, flat dictionary. + + Extracts core fields: id, title, duration, Track:, url, artist name, and album title. 
+ """ + if not isinstance(item, dict): + return {} + + # Handle the "data" wrapper if present + data = item.get("data") if isinstance(item.get("data"), dict) else item + + artist_name = "" + artist_obj = data.get("artist") + if isinstance(artist_obj, dict): + artist_name = stringify(artist_obj.get("name")) + if not artist_name: + artists = extract_artists(data) + if artists: + artist_name = artists[0] + + album_title = "" + album_obj = data.get("album") + if isinstance(album_obj, dict): + album_title = stringify(album_obj.get("title")) + if not album_title and isinstance(data.get("album"), str): + album_title = stringify(data.get("album")) + + return { + "id": data.get("id"), + "title": stringify(data.get("title")), + "duration": data.get("duration"), + "Track:": data.get("trackNumber"), + "url": stringify(data.get("url")), + "artist": artist_name, + "album": album_title, + } + + +def coerce_duration_seconds(value: Any) -> Optional[int]: + """Attempt to extracts seconds from various Tidal duration formats.""" + candidates = [value] + try: + if isinstance(value, dict): + for key in ( + "duration", + "durationSeconds", + "duration_sec", + "duration_ms", + "durationMillis", + ): + if key in value: + candidates.append(value.get(key)) + except Exception: + pass + + for cand in candidates: + try: + if cand is None: + continue + text = str(cand).strip() + if text.lower().endswith("ms"): + text = text[:-2].strip() + num = float(text) + if num <= 0: + continue + if num > 10_000: + # Suspect milliseconds + num = num / 1000.0 + return int(round(num)) + except Exception: + continue + return None + + +class TidalApiError(ApiError): + """Raised when the Tidal API returns an error or malformed response.""" + + +class Tidal(API): + """Client for the Tidal (Tidal) API endpoints. + + This client communicates with the configured Tidal backend to retrieve + track metadata, manifests, search results, and lyrics. 
+ """ + + def __init__(self, base_url: str = DEFAULT_BASE_URL, *, timeout: float = 10.0) -> None: + super().__init__(base_url, timeout) + + def search(self, params: Dict[str, str]) -> Dict[str, Any]: + usable = {k: v for k, v in (params or {}).items() if v} + search_keys = [key for key in ("s", "a", "v", "p") if usable.get(key)] + if not search_keys: + raise TidalApiError("One of s/a/v/p is required for /search/") + if len(search_keys) > 1: + first = search_keys[0] + usable = {first: usable[first]} + return self._get_json("search/", params=usable) + + def track(self, track_id: int, *, quality: Optional[str] = None) -> Dict[str, Any]: + try: + track_int = int(track_id) + except Exception as exc: + raise TidalApiError(f"track_id must be int-compatible: {exc}") from exc + if track_int <= 0: + raise TidalApiError("track_id must be positive") + + p: Dict[str, Any] = {"id": track_int} + if quality: + p["quality"] = str(quality) + return self._get_json("track/", params=p) + + def info(self, track_id: int) -> Dict[str, Any]: + """Fetch and parse core track metadata (id, title, artist, album, duration, etc).""" + try: + track_int = int(track_id) + except Exception as exc: + raise TidalApiError(f"track_id must be int-compatible: {exc}") from exc + if track_int <= 0: + raise TidalApiError("track_id must be positive") + + raw = self._get_json("info/", params={"id": track_int}) + return parse_track_item(raw) + + def album(self, album_id: int) -> Dict[str, Any]: + """Fetch album details, including track list when provided by the backend.""" + try: + album_int = int(album_id) + except Exception as exc: + raise TidalApiError(f"album_id must be int-compatible: {exc}") from exc + if album_int <= 0: + raise TidalApiError("album_id must be positive") + + return self._get_json("album/", params={"id": album_int}) + + def lyrics(self, track_id: int) -> Dict[str, Any]: + """Fetch lyrics (including subtitles/LRC) for a track.""" + try: + track_int = int(track_id) + except Exception as exc: 
+ raise TidalApiError(f"track_id must be int-compatible: {exc}") from exc + if track_int <= 0: + raise TidalApiError("track_id must be positive") + + return self._get_json("lyrics/", params={"id": track_int}) + + def get_full_track_metadata(self, track_id: int) -> Dict[str, Any]: + """ + Orchestrate fetching all details for a track: + 1. Base info (/info/) + 2. Playback/Quality info (/track/) + 3. Lyrics (/lyrics/) + 4. Derived tags + """ + try: + track_int = int(track_id) + except Exception as exc: + raise TidalApiError(f"track_id must be int-compatible: {exc}") from exc + + # 1. Fetch info (metadata) - fetch raw to ensure all fields are available for merging + info_resp = self._get_json("info/", params={"id": track_int}) + info_data = info_resp.get("data") if isinstance(info_resp, dict) else info_resp + if not isinstance(info_data, dict) or "id" not in info_data: + info_data = info_resp if isinstance(info_resp, dict) and "id" in info_resp else {} + + # 2. Fetch track (manifest/bit depth) + track_resp = self.track(track_id) + # Note: track() method in this class currently returns raw JSON, so we handle it similarly. + track_data = track_resp.get("data") if isinstance(track_resp, dict) else track_resp + if not isinstance(track_data, dict) or "id" not in track_data: + track_data = track_resp if isinstance(track_resp, dict) and "id" in track_resp else {} + + # 3. 
Fetch lyrics + lyrics_data = {} + try: + lyr_resp = self.lyrics(track_id) + lyrics_data = lyr_resp.get("lyrics") or lyr_resp if isinstance(lyr_resp, dict) else {} + except Exception: + pass + + # Merged data for tags and parsing + merged_md = {} + if isinstance(info_data, dict): + merged_md.update(info_data) + if isinstance(track_data, dict): + merged_md.update(track_data) + + # Derived tags and normalized/parsed info + tags = build_track_tags(merged_md) + parsed_info = parse_track_item(merged_md) + + # Structure for return + return { + "metadata": merged_md, + "parsed": parsed_info, + "tags": list(tags), + "lyrics": lyrics_data, + } + + +# Legacy alias for TidalApiClient +TidalApiClient = Tidal diff --git a/API/base.py b/API/base.py new file mode 100644 index 0000000..0567c25 --- /dev/null +++ b/API/base.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import json +from typing import Any, Dict, Optional + +from .HTTP import HTTPClient + + +class ApiError(Exception): + """Base exception for API errors.""" + pass + + +class API: + """Base class for API clients using the internal HTTPClient.""" + + def __init__(self, base_url: str, timeout: float = 10.0) -> None: + self.base_url = str(base_url or "").rstrip("/") + self.timeout = float(timeout) + + def _get_json( + self, + path: str, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + url = f"{self.base_url}/{str(path or '').lstrip('/')}" + try: + with HTTPClient(timeout=self.timeout, headers=headers) as client: + response = client.get(url, params=params, allow_redirects=True) + response.raise_for_status() + return response.json() + except Exception as exc: + raise ApiError(f"API request failed for {url}: {exc}") from exc + + def _post_json( + self, + path: str, + json_data: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + url = 
f"{self.base_url}/{str(path or '').lstrip('/')}" + try: + with HTTPClient(timeout=self.timeout, headers=headers) as client: + response = client.post(url, json=json_data, params=params, allow_redirects=True) + response.raise_for_status() + return response.json() + except Exception as exc: + raise ApiError(f"API request failed for {url}: {exc}") from exc diff --git a/API/hifi.py b/API/hifi.py deleted file mode 100644 index 10e8956..0000000 --- a/API/hifi.py +++ /dev/null @@ -1,94 +0,0 @@ -from __future__ import annotations - -from typing import Any, Dict, Optional - -from .HTTP import HTTPClient - -DEFAULT_BASE_URL = "https://tidal-api.binimum.org" - - -class HifiApiError(Exception): - """Raised when the HiFi API returns an error or malformed response.""" - - -class HifiApiClient: - """Lightweight client for the hifi-api endpoints. - - Supported endpoints: - - GET /search/ with exactly one of s, a, v, p - - GET /track/ with id (and optional quality) - - GET /info/ with id - - GET /album/ with id - - GET /lyrics/ with id - """ - - def __init__(self, base_url: str = DEFAULT_BASE_URL, *, timeout: float = 10.0) -> None: - self.base_url = str(base_url or DEFAULT_BASE_URL).rstrip("/") - self.timeout = float(timeout) - - def _get_json(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: - url = f"{self.base_url}/{str(path or '').lstrip('/')}" - with HTTPClient(timeout=self.timeout) as client: - response = client.get(url, params=params, allow_redirects=True) - response.raise_for_status() - try: - return response.json() - except Exception as exc: # pragma: no cover - defensive - raise HifiApiError(f"Invalid JSON response from {url}: {exc}") from exc - - def search(self, params: Dict[str, str]) -> Dict[str, Any]: - usable = {k: v for k, v in (params or {}).items() if v} - search_keys = [key for key in ("s", "a", "v", "p") if usable.get(key)] - if not search_keys: - raise HifiApiError("One of s/a/v/p is required for /search/") - if len(search_keys) > 1: 
- first = search_keys[0] - usable = {first: usable[first]} - return self._get_json("search/", params=usable) - - def track(self, track_id: int, *, quality: Optional[str] = None) -> Dict[str, Any]: - try: - track_int = int(track_id) - except Exception as exc: - raise HifiApiError(f"track_id must be int-compatible: {exc}") from exc - if track_int <= 0: - raise HifiApiError("track_id must be positive") - - params: Dict[str, Any] = {"id": track_int} - if quality: - params["quality"] = str(quality) - return self._get_json("track/", params=params) - - def info(self, track_id: int) -> Dict[str, Any]: - try: - track_int = int(track_id) - except Exception as exc: - raise HifiApiError(f"track_id must be int-compatible: {exc}") from exc - if track_int <= 0: - raise HifiApiError("track_id must be positive") - - return self._get_json("info/", params={"id": track_int}) - - def album(self, album_id: int) -> Dict[str, Any]: - """Fetch album details, including track list when provided by the backend.""" - - try: - album_int = int(album_id) - except Exception as exc: - raise HifiApiError(f"album_id must be int-compatible: {exc}") from exc - if album_int <= 0: - raise HifiApiError("album_id must be positive") - - return self._get_json("album/", params={"id": album_int}) - - def lyrics(self, track_id: int) -> Dict[str, Any]: - """Fetch lyrics (including subtitles/LRC) for a track.""" - - try: - track_int = int(track_id) - except Exception as exc: - raise HifiApiError(f"track_id must be int-compatible: {exc}") from exc - if track_int <= 0: - raise HifiApiError("track_id must be positive") - - return self._get_json("lyrics/", params={"id": track_int}) diff --git a/API/loc.py b/API/loc.py index 833e21c..bd6effe 100644 --- a/API/loc.py +++ b/API/loc.py @@ -15,31 +15,18 @@ from __future__ import annotations import json from typing import Any, Dict, Optional -from API.HTTP import HTTPClient +from .base import API, ApiError -class LOCError(Exception): +class LOCError(ApiError): pass -class 
LOCClient: +class LOCClient(API): """Minimal client for the public LoC JSON API.""" - BASE_URL = "https://www.loc.gov" - - def __init__(self, *, timeout: float = 20.0): - self.timeout = float(timeout) - - def _get_json(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]: - url = self.BASE_URL.rstrip("/") + "/" + str(path or "").lstrip("/") - try: - with HTTPClient(timeout=self.timeout) as client: - resp = client.get(url, params=params) - resp.raise_for_status() - # httpx.Response.json() exists but keep decoding consistent - return json.loads(resp.content.decode("utf-8")) - except Exception as exc: - raise LOCError(str(exc)) from exc + def __init__(self, *, base_url: str = "https://www.loc.gov", timeout: float = 20.0): + super().__init__(base_url=base_url, timeout=timeout) def search_chronicling_america( self, diff --git a/API/podcastindex.py b/API/podcastindex.py index 96fc47a..7a642a5 100644 --- a/API/podcastindex.py +++ b/API/podcastindex.py @@ -16,10 +16,10 @@ import json import time from typing import Any, Dict, List, Optional -from .HTTP import HTTPClient +from .base import API, ApiError -class PodcastIndexError(Exception): +class PodcastIndexError(ApiError): pass @@ -55,41 +55,31 @@ def build_auth_headers( } -class PodcastIndexClient: - BASE_URL = "https://api.podcastindex.org/api/1.0" - +class PodcastIndexClient(API): def __init__( self, api_key: str, api_secret: str, *, + base_url: str = "https://api.podcastindex.org/api/1.0", user_agent: str = "downlow/1.0", timeout: float = 30.0, ): + super().__init__(base_url=base_url, timeout=timeout) self.api_key = str(api_key or "").strip() self.api_secret = str(api_secret or "").strip() self.user_agent = str(user_agent or "downlow/1.0") - self.timeout = float(timeout) if not self.api_key or not self.api_secret: raise PodcastIndexError("PodcastIndex api key/secret are required") def _get(self, path: str, *, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: - url = self.BASE_URL.rstrip("/") + "/" + 
str(path or "").lstrip("/") headers = build_auth_headers( self.api_key, self.api_secret, user_agent=self.user_agent, ) - - with HTTPClient(timeout=self.timeout, headers=headers) as client: - response = client.get(url, params=params) - response.raise_for_status() - - try: - return json.loads(response.content.decode("utf-8")) - except Exception as exc: - raise PodcastIndexError(f"Invalid JSON response: {exc}") + return self._get_json(path, params=params, headers=headers) def search_byterm(self, query: str, *, max_results: int = 10) -> List[Dict[str, Any]]: q = str(query or "").strip() diff --git a/CLI.py b/CLI.py index d160359..f3e6154 100644 --- a/CLI.py +++ b/CLI.py @@ -2234,7 +2234,7 @@ class PipelineExecutor: # Prefer an explicit provider hint from table metadata when available. # This keeps @N selectors working even when row payloads don't carry a - # provider key (or when they carry a table-type like hifi.album). + # provider key (or when they carry a table-type like tidal.album). try: meta = ( current_table.get_table_metadata() @@ -2264,7 +2264,7 @@ class PipelineExecutor: get_provider = None # type: ignore is_known_provider_name = None # type: ignore - # If we have a table-type like "hifi.album", also try its provider prefix ("hifi") + # If we have a table-type like "tidal.album", also try its provider prefix ("tidal") # when that prefix is a registered provider name. if is_known_provider_name is not None: try: @@ -2498,7 +2498,7 @@ class PipelineExecutor: # Selection should operate on the *currently displayed* selectable table. # Some navigation flows (e.g. @.. back) can show a display table without # updating current_stage_table. Provider selectors rely on current_stage_table - # to detect table type (e.g. hifi.album -> tracks), so sync it here. + # to detect table type (e.g. tidal.album -> tracks), so sync it here. 
display_table = None try: display_table = ( @@ -2722,7 +2722,7 @@ class PipelineExecutor: return False, None # Provider selection expansion (non-terminal): allow certain provider tables - # (e.g. hifi.album) to expand to multiple downstream items when the user + # (e.g. tidal.album) to expand to multiple downstream items when the user # pipes into another stage (e.g. @N | .mpv or @N | add-file). table_type_hint = None try: @@ -2734,11 +2734,11 @@ class PipelineExecutor: except Exception: table_type_hint = None - if stages and isinstance(table_type_hint, str) and table_type_hint.strip().lower() == "hifi.album": + if stages and isinstance(table_type_hint, str) and table_type_hint.strip().lower() == "tidal.album": try: from ProviderCore.registry import get_provider - prov = get_provider("hifi", config) + prov = get_provider("tidal", config) except Exception: prov = None @@ -2780,7 +2780,7 @@ class PipelineExecutor: if track_items: filtered = track_items - table_type_hint = "hifi.track" + table_type_hint = "tidal.track" if PipelineExecutor._maybe_run_class_selector( ctx, @@ -2891,7 +2891,7 @@ class PipelineExecutor: # (e.g., @1 | add-file ...), we want to attach the row selection # args *to the auto-inserted stage* so the download command receives # the selected row information immediately. - stages.append(list(auto_stage) + (source_args or [])) + stages.append(list(auto_stage)) debug(f"Inserted auto stage before row action: {stages[-1]}") # If the caller included a selection (e.g., @1) try to attach @@ -2940,7 +2940,9 @@ class PipelineExecutor: if first_cmd_norm not in (auto_cmd_norm, ".pipe", ".mpv"): debug(f"Auto-inserting {auto_cmd_norm} after selection") # Insert the auto stage before the user-specified stage - stages.insert(0, list(auto_stage) + (source_args or [])) + # Note: Do NOT append source_args here - they are search tokens from + # the previous stage and should not be passed to the downloader. 
+ stages.insert(0, list(auto_stage)) debug(f"Inserted auto stage before existing pipeline: {stages[0]}") # If a selection is present, attach the row selection args to the @@ -3278,7 +3280,7 @@ class PipelineExecutor: stage_table = ctx.get_current_stage_table() # Selection should operate on the table the user sees. # If a display overlay table exists, force it as the current-stage table - # so provider selectors (e.g. hifi.album -> tracks) behave consistently. + # so provider selectors (e.g. tidal.album -> tracks) behave consistently. try: if display_table is not None and hasattr(ctx, "set_current_stage_table"): ctx.set_current_stage_table(display_table) diff --git a/Provider/HIFI.py b/Provider/HIFI.py index 5ea5b92..23ebe79 100644 --- a/Provider/HIFI.py +++ b/Provider/HIFI.py @@ -12,19 +12,18 @@ from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple from urllib.parse import urlparse -from API.hifi import HifiApiClient -from ProviderCore.base import Provider, SearchResult, parse_inline_query_arguments -from Provider.tidal_shared import ( +from API.Tidal import ( + HifiApiClient, build_track_tags, coerce_duration_seconds, extract_artists, stringify, ) +from ProviderCore.base import Provider, SearchResult, parse_inline_query_arguments from SYS import pipeline as pipeline_context from SYS.logger import debug, log URL_API = ( - "https://tidal-api.binimum.org", "https://triton.squid.wtf", "https://wolf.qqdl.site", "https://maus.qqdl.site", @@ -33,6 +32,7 @@ URL_API = ( "https://hund.qqdl.site", "https://tidal.kinoplus.online", "https://tidal-api.binimum.org", + "https://tidal-api.binimum.org", ) @@ -64,7 +64,7 @@ def _format_total_seconds(seconds: Any) -> str: return f"{mins}:{secs:02d}" -class HIFI(Provider): +class Tidal(Provider): TABLE_AUTO_STAGES = { "hifi.track": ["download-file"], @@ -237,6 +237,16 @@ class HIFI(Provider): except Exception: return "", None + scheme = str(parsed.scheme or "").lower().strip() + if scheme == "hifi": + 
# Handle hifi://view/id + view = str(parsed.netloc or "").lower().strip() + path_parts = [p for p in (parsed.path or "").split("/") if p] + identifier = None + if path_parts: + identifier = self._parse_int(path_parts[0]) + return view, identifier + parts = [segment for segment in (parsed.path or "").split("/") if segment] if not parts: return "", None @@ -248,7 +258,7 @@ class HIFI(Provider): return "", None view = parts[idx].lower() - if view not in {"album", "track"}: + if view not in {"album", "track", "artist"}: return "", None for segment in parts[idx + 1:]: @@ -279,6 +289,7 @@ class HIFI(Provider): annotations=["tidal", "track"], media_kind="audio", full_metadata=dict(detail) if isinstance(detail, dict) else {}, + selection_args=["-url", f"hifi://track/{track_id}"], ) def _extract_artist_selection_context(self, selected_items: List[Any]) -> List[Tuple[int, str]]: @@ -802,6 +813,11 @@ class HIFI(Provider): full_metadata=md, ) + @staticmethod + def url_patterns() -> List[str]: + """Return URL prefixes handled by this provider.""" + return ["hifi://", "tidal.com"] + @staticmethod def _find_ffmpeg() -> Optional[str]: exe = shutil.which("ffmpeg") @@ -1113,34 +1129,28 @@ class HIFI(Provider): if isinstance(getattr(result, "full_metadata", None), dict): md = dict(getattr(result, "full_metadata") or {}) - if not md.get("manifest"): - track_id = self._extract_track_id_from_result(result) - if track_id: - detail = self._fetch_track_details(track_id) - if isinstance(detail, dict) and detail: - try: - md.update(detail) - except Exception: - md = detail + track_id = self._extract_track_id_from_result(result) + if track_id: + # Multi-part enrichment from API: metadata, tags, and lyrics. + full_data = self._fetch_all_track_data(track_id) + if isinstance(full_data, dict): + # 1. Update metadata + api_md = full_data.get("metadata") + if isinstance(api_md, dict): + md.update(api_md) + + # 2. 
Update tags (re-sync result.tag so cmdlet sees them) + api_tags = full_data.get("tags") + if isinstance(api_tags, list) and api_tags: + result.tag = set(api_tags) - # Best-effort: fetch synced lyric subtitles for MPV (LRC). - try: - track_id_for_lyrics = self._extract_track_id_from_result(result) - except Exception: - track_id_for_lyrics = None - if track_id_for_lyrics and not md.get("_tidal_lyrics_subtitles"): - lyr = self._fetch_track_lyrics(track_id_for_lyrics) - if isinstance(lyr, dict) and lyr: - try: - md.setdefault("lyrics", lyr) - except Exception: - pass - try: - subtitles = lyr.get("subtitles") + # 3. Handle lyrics + lyrics = full_data.get("lyrics") + if isinstance(lyrics, dict) and lyrics: + md.setdefault("lyrics", lyrics) + subtitles = lyrics.get("subtitles") if isinstance(subtitles, str) and subtitles.strip(): md["_tidal_lyrics_subtitles"] = subtitles.strip() - except Exception: - pass # Ensure downstream cmdlets see our enriched metadata. try: @@ -1665,6 +1675,7 @@ class HIFI(Provider): tag=tags, columns=columns, full_metadata=full_md, + selection_args=["-url", path], ) if url_value: try: @@ -1739,66 +1750,34 @@ class HIFI(Provider): return contexts def _fetch_track_details(self, track_id: int) -> Optional[Dict[str, Any]]: - if track_id <= 0: - return None - - info_data = self._fetch_track_info(track_id) - - for base in self.api_urls: - endpoint = f"{base.rstrip('/')}/track/" - try: - client = self._get_api_client_for_base(base) - payload = client.track(track_id) if client else None - data = payload.get("data") if isinstance(payload, dict) else None - if isinstance(data, dict): - merged: Dict[str, Any] = {} - if isinstance(info_data, dict): - merged.update(info_data) - merged.update(data) - return merged - except Exception as exc: - log(f"[hifi] Track lookup failed for {endpoint}: {exc}", file=sys.stderr) - continue - return None + """Legacy wrapper returning just metadata from the consolidated API call.""" + res = self._fetch_all_track_data(track_id) 
+ return res.get("metadata") if res else None def _fetch_track_info(self, track_id: int) -> Optional[Dict[str, Any]]: + """Legacy wrapper; now part of _fetch_all_track_data.""" + return self._fetch_track_details(track_id) + + def _fetch_all_track_data(self, track_id: int) -> Optional[Dict[str, Any]]: + """Fetch full track details including metadata, tags, and lyrics from the API.""" if track_id <= 0: return None for base in self.api_urls: - endpoint = f"{base.rstrip('/')}/info/" try: client = self._get_api_client_for_base(base) - payload = client.info(track_id) if client else None - data = payload.get("data") if isinstance(payload, dict) else None - if isinstance(data, dict): - return data + if not client: + continue + # This method in the API client handles merging info+track and building tags. + return client.get_full_track_metadata(track_id) except Exception as exc: - debug(f"[hifi] Info lookup failed for {endpoint}: {exc}") + debug(f"[hifi] Full track fetch failed for {base}: {exc}") continue return None def _fetch_track_lyrics(self, track_id: int) -> Optional[Dict[str, Any]]: - if track_id <= 0: - return None - for base in self.api_urls: - endpoint = f"{base.rstrip('/')}/lyrics/" - try: - client = self._get_api_client_for_base(base) - payload = client.lyrics(track_id) if client else None - if not isinstance(payload, dict): - continue - - lyrics_obj = payload.get("lyrics") - if isinstance(lyrics_obj, dict) and lyrics_obj: - return lyrics_obj - - data_obj = payload.get("data") - if isinstance(data_obj, dict) and data_obj: - return data_obj - except Exception as exc: - debug(f"[hifi] Lyrics lookup failed for {endpoint}: {exc}") - continue - return None + """Legacy wrapper returning just lyrics from the consolidated API call.""" + res = self._fetch_all_track_data(track_id) + return res.get("lyrics") if res else None def _build_track_columns(self, detail: Dict[str, Any], track_id: int) -> List[Tuple[str, str]]: values: List[Tuple[str, str]] = [ @@ -1816,8 +1795,8 
@@ class HIFI(Provider): def _build_track_tags(self, metadata: Dict[str, Any]) -> set[str]: return build_track_tags(metadata) + @staticmethod def selection_auto_stage( - self, table_type: str, stage_args: Optional[Sequence[str]] = None, ) -> Optional[List[str]]: @@ -2004,17 +1983,11 @@ class HIFI(Provider): return True + # Optimization: If we are selecting tracks, do NOT force a "Detail View" (resolving manifest) here. + # This allows batch selection to flow immediately to `download-file` (via TABLE_AUTO_STAGES) + # or other downstream cmdlets. The download logic (HIFI.download) handles manifest resolution locally. if table_type == "hifi.track" or (is_generic_hifi and any(str(get_field(i, "path")).startswith("hifi://track/") for i in selected_items)): - try: - meta = ( - current_table.get_table_metadata() - if current_table is not None and hasattr(current_table, "get_table_metadata") - else {} - ) - except Exception: - meta = {} - if isinstance(meta, dict) and meta.get("resolved_manifest"): - return False + return False contexts = self._extract_track_selection_context(selected_items) try: @@ -2047,17 +2020,7 @@ class HIFI(Provider): pass results_payload: List[Dict[str, Any]] = [] for track_id, title, path, detail in track_details: - # Decode the DASH MPD manifest to a local file and use it as the selectable/playable path. 
- try: - from cmdlet._shared import resolve_tidal_manifest_path - - manifest_path = resolve_tidal_manifest_path( - {"full_metadata": detail, "path": f"hifi://track/{track_id}"} - ) - except Exception: - manifest_path = None - - resolved_path = str(manifest_path) if manifest_path else f"hifi://track/{track_id}" + resolved_path = f"hifi://track/{track_id}" artists = self._extract_artists(detail) artist_display = ", ".join(artists) if artists else "" @@ -2086,6 +2049,7 @@ class HIFI(Provider): columns=columns, full_metadata=detail, tag=tags, + selection_args=["-url", resolved_path], ) if url_value: try: diff --git a/Provider/Tidal.py b/Provider/Tidal.py new file mode 100644 index 0000000..3d99102 --- /dev/null +++ b/Provider/Tidal.py @@ -0,0 +1,2119 @@ +from __future__ import annotations + +import os +import random +import re +import shutil +import string +import subprocess +import time +import sys +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Tuple +from urllib.parse import urlparse + +from API.Tidal import Tidal as tidalApiClient +from ProviderCore.base import Provider, SearchResult, parse_inline_query_arguments +from API.Tidal import ( + build_track_tags, + coerce_duration_seconds, + extract_artists, + stringify, +) +from SYS import pipeline as pipeline_context +from SYS.logger import debug, log + +URL_API = ( + "https://tidal-api.binimum.org", + "https://triton.squid.wtf", + "https://wolf.qqdl.site", + "https://maus.qqdl.site", + "https://vogel.qqdl.site", + "https://katze.qqdl.site", + "https://hund.qqdl.site", + "https://tidal.kinoplus.online", + "https://tidal-api.binimum.org", +) + + + + +_KEY_TO_PARAM: Dict[str, str] = { + "album": "al", + "artist": "a", + "playlist": "p", + "video": "v", + "song": "s", + "track": "s", + "title": "s", +} + +_DELIMITERS_RE = re.compile(r"[;,]") +_SEGMENT_BOUNDARY_RE = re.compile(r"(?=\b\w+\s*:)") + + +def _format_total_seconds(seconds: Any) -> str: + try: + total = int(seconds) + except 
Exception: + return "" + if total <= 0: + return "" + mins = total // 60 + secs = total % 60 + return f"{mins}:{secs:02d}" + + +class Tidal(Provider): + PROVIDER_NAME = "tidal" + PROVIDER_ALIASES = ("tidal",) + + TABLE_AUTO_STAGES = { + "tidal.track": ["download-file"], + } + QUERY_ARG_CHOICES = { + "artist": (), + "album": (), + "track": (), + "title": (), + "playlist": (), + "video": (), + } + INLINE_QUERY_FIELD_CHOICES = QUERY_ARG_CHOICES + URL_DOMAINS = ( + "tidal.com", + "listen.tidal.com", + ) + URL = URL_DOMAINS + """Provider that targets the Tidal-RestAPI (Tidal proxy) search endpoint. + + The CLI can supply a list of fail-over URLs via ``provider.tidal.api_urls`` or + ``provider.tidal.api_url`` in the config. When not configured, it defaults to + https://tidal-api.binimum.org. + """ + + def __init__(self, config: Optional[Dict[str, Any]] = None) -> None: + super().__init__(config) + self.api_urls = self._resolve_api_urls() + try: + self.api_timeout = float(self.config.get("timeout", 10.0)) + except Exception: + self.api_timeout = 10.0 + self.api_clients = [tidalApiClient(base_url=url, timeout=self.api_timeout) for url in self.api_urls] + + def extract_query_arguments(self, query: str) -> Tuple[str, Dict[str, Any]]: + normalized, parsed = parse_inline_query_arguments(query) + filtered: Dict[str, Any] = {} + for key, value in parsed.items(): + if key in self.QUERY_ARG_CHOICES: + filtered[key] = value + return normalized, filtered + + def validate(self) -> bool: + return bool(self.api_urls) + + def search( + self, + query: str, + limit: int = 50, + filters: Optional[Dict[str, Any]] = None, + **_kwargs: Any, + ) -> List[SearchResult]: + if limit <= 0: + return [] + normalized_query, inline_args = self.extract_query_arguments(query) + raw_query = str(query or "").strip() + search_query = normalized_query or raw_query + if not search_query and inline_args: + search_query = " ".join(f"{k}:{v}" for k, v in inline_args.items()) + if not search_query: + return [] + 
+ view = self._determine_view(search_query, inline_args) + params = self._build_search_params(search_query) + if not params: + return [] + + payload: Optional[Dict[str, Any]] = None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/search/" + try: + client = self._get_api_client_for_base(base) + payload = client.search(params) if client else None + if payload is not None: + break + except Exception as exc: + log(f"[tidal] Search failed for {endpoint}: {exc}", file=sys.stderr) + continue + + if not payload: + return [] + + data = payload.get("data") or {} + if view == "artist": + items = self._extract_artist_items(data) + else: + items = self._extract_track_items(data) + results: List[SearchResult] = [] + for item in items: + if limit and len(results) >= limit: + break + if view == "artist": + result = self._artist_item_to_result(item) + else: + result = self._item_to_result(item) + if result is not None: + results.append(result) + + return results[:limit] + + @staticmethod + def _get_view_from_query(query: str) -> str: + text = str(query or "").strip() + if not text: + return "track" + if re.search(r"\bartist\s*:", text, flags=re.IGNORECASE): + return "artist" + if re.search(r"\balbum\s*:", text, flags=re.IGNORECASE): + return "album" + return "track" + + def _determine_view(self, query: str, inline_args: Dict[str, Any]) -> str: + if inline_args: + if "artist" in inline_args: + return "artist" + if "album" in inline_args: + return "album" + if "track" in inline_args or "title" in inline_args: + return "track" + if "video" in inline_args or "playlist" in inline_args: + return "track" + return self._get_view_from_query(query) + + @staticmethod + def _safe_filename(value: Any, *, fallback: str = "tidal") -> str: + text = str(value or "").strip() + if not text: + return fallback + text = re.sub(r"[<>:\"/\\|?*\x00-\x1f]", "_", text) + text = re.sub(r"\s+", " ", text).strip().strip(". 
") + return text[:120] if text else fallback + + @staticmethod + def _parse_track_id(value: Any) -> Optional[int]: + if value is None: + return None + try: + track_id = int(value) + except Exception: + return None + return track_id if track_id > 0 else None + + def _extract_track_id_from_result(self, result: SearchResult) -> Optional[int]: + md = getattr(result, "full_metadata", None) + if isinstance(md, dict): + track_id = self._parse_track_id(md.get("trackId") or md.get("id")) + if track_id: + return track_id + + path = str(getattr(result, "path", "") or "").strip() + if path: + m = re.search(r"tidal:(?://)?track[\\/](\d+)", path, flags=re.IGNORECASE) + if m: + return self._parse_track_id(m.group(1)) + return None + + @staticmethod + def _parse_int(value: Any) -> Optional[int]: + if value is None: + return None + try: + num = int(value) + except Exception: + return None + return num if num > 0 else None + + def _parse_tidal_url(self, url: str) -> Tuple[str, Optional[int]]: + try: + parsed = urlparse(str(url)) + except Exception: + return "", None + + parts = [segment for segment in (parsed.path or "").split("/") if segment] + if not parts: + return "", None + + idx = 0 + if parts[0].lower() == "browse": + idx = 1 + if idx >= len(parts): + return "", None + + view = parts[idx].lower() + if view not in {"album", "track"}: + return "", None + + for segment in parts[idx + 1:]: + identifier = self._parse_int(segment) + if identifier is not None: + return view, identifier + return view, None + + def _track_detail_to_result(self, detail: Optional[Dict[str, Any]], track_id: int) -> SearchResult: + if isinstance(detail, dict): + candidate = self._item_to_result(detail) + if candidate is not None: + try: + candidate.full_metadata = dict(detail) + except Exception: + pass + return candidate + + title = f"Track {track_id}" + if isinstance(detail, dict): + title = self._stringify(detail.get("title")) or title + + return SearchResult( + table="tidal.track", + title=title, + 
path=f"tidal://track/{track_id}", + detail=f"id:{track_id}", + annotations=["tidal", "track"], + media_kind="audio", + full_metadata=dict(detail) if isinstance(detail, dict) else {}, + ) + + def _extract_artist_selection_context(self, selected_items: List[Any]) -> List[Tuple[int, str]]: + contexts: List[Tuple[int, str]] = [] + seen: set[int] = set() + + for item in selected_items or []: + payload: Dict[str, Any] = {} + if isinstance(item, dict): + payload = item + else: + try: + payload = item.to_dict() if hasattr(item, "to_dict") and callable(getattr(item, "to_dict")) else {} + except Exception: + payload = {} + if not payload: + try: + payload = { + "title": getattr(item, "title", None), + "path": getattr(item, "path", None), + "full_metadata": getattr(item, "full_metadata", None), + } + except Exception: + payload = {} + + meta = payload.get("full_metadata") if isinstance(payload.get("full_metadata"), dict) else payload + if not isinstance(meta, dict): + meta = {} + + artist_id = self._parse_int(meta.get("artistId") or meta.get("id") or payload.get("artistId") or payload.get("id")) + if not artist_id: + # Try to parse from path. 
+ raw_path = str(payload.get("path") or "").strip() + if raw_path: + m = re.search(r"tidal:(?://)?artist[\\/](\d+)", raw_path, flags=re.IGNORECASE) + if m: + artist_id = self._parse_int(m.group(1)) + + if not artist_id or artist_id in seen: + continue + seen.add(artist_id) + + name = ( + payload.get("title") + or meta.get("name") + or meta.get("title") + or payload.get("name") + ) + name_text = str(name or "").strip() or f"Artist {artist_id}" + contexts.append((artist_id, name_text)) + + return contexts + + def _extract_album_selection_context(self, selected_items: List[Any]) -> List[Tuple[Optional[int], str, str]]: + """Return (album_id, album_title, artist_name) for selected album rows.""" + + contexts: List[Tuple[Optional[int], str, str]] = [] + seen_ids: set[int] = set() + seen_keys: set[str] = set() + + for item in selected_items or []: + payload: Dict[str, Any] = {} + if isinstance(item, dict): + payload = item + else: + try: + payload = item.to_dict() if hasattr(item, "to_dict") and callable(getattr(item, "to_dict")) else {} + except Exception: + payload = {} + if not payload: + try: + payload = { + "title": getattr(item, "title", None), + "path": getattr(item, "path", None), + "full_metadata": getattr(item, "full_metadata", None), + } + except Exception: + payload = {} + + meta = payload.get("full_metadata") if isinstance(payload.get("full_metadata"), dict) else payload + if not isinstance(meta, dict): + meta = {} + + album_title = self._stringify(payload.get("title") or meta.get("title") or meta.get("name")) + if not album_title: + album_title = self._stringify(meta.get("album") or meta.get("albumTitle")) + if not album_title: + continue + + artist_name = self._stringify(meta.get("_artist_name") or meta.get("artist") or meta.get("artistName")) + if not artist_name: + # Some album payloads include nested artist objects. 
+ artist_obj = meta.get("artist") + if isinstance(artist_obj, dict): + artist_name = self._stringify(artist_obj.get("name")) + + # Prefer albumId when available; some payloads carry both id/albumId. + album_id = self._parse_int(meta.get("albumId") or meta.get("id")) + + if not album_id: + raw_path = self._stringify(payload.get("path")) + if raw_path: + m = re.search(r"tidal:(?://)?album[\\/](\d+)", raw_path, flags=re.IGNORECASE) + if m: + album_id = self._parse_int(m.group(1)) + + if album_id: + if album_id in seen_ids: + continue + seen_ids.add(album_id) + else: + key = f"{album_title.lower()}::{artist_name.lower()}" + if key in seen_keys: + continue + seen_keys.add(key) + + contexts.append((album_id, album_title, artist_name)) + + return contexts + + def _track_matches_artist(self, track: Dict[str, Any], *, artist_id: Optional[int], artist_name: str) -> bool: + if not isinstance(track, dict): + return False + wanted = str(artist_name or "").strip().lower() + + primary = track.get("artist") + if isinstance(primary, dict): + if artist_id and self._parse_int(primary.get("id")) == artist_id: + return True + name = str(primary.get("name") or "").strip().lower() + if wanted and name == wanted: + return True + + artists = track.get("artists") + if isinstance(artists, list): + for a in artists: + if not isinstance(a, dict): + continue + if artist_id and self._parse_int(a.get("id")) == artist_id: + return True + name = str(a.get("name") or "").strip().lower() + if wanted and name == wanted: + return True + + # Fallback: string-match extracted display. 
+ if wanted: + try: + names = [n.lower() for n in self._extract_artists(track)] + except Exception: + names = [] + return wanted in names + + return False + + def _albums_for_artist(self, *, artist_id: Optional[int], artist_name: str, limit: int = 200) -> List[SearchResult]: + name = str(artist_name or "").strip() + if not name: + return [] + + payload: Optional[Dict[str, Any]] = None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/search/" + try: + client = self._get_api_client_for_base(base) + payload = client.search({"s": name}) if client else None + if payload is not None: + break + except Exception as exc: + log(f"[tidal] Album lookup failed for {endpoint}: {exc}", file=sys.stderr) + continue + + if not payload: + return [] + + data = payload.get("data") or {} + tracks = self._extract_track_items(data) + if not tracks: + return [] + + albums_by_id: Dict[int, Dict[str, Any]] = {} + albums_by_key: Dict[str, Dict[str, Any]] = {} + for track in tracks: + if not self._track_matches_artist(track, artist_id=artist_id, artist_name=name): + continue + album = track.get("album") + if not isinstance(album, dict): + continue + # Prefer albumId when available; some payloads carry both id/albumId. 
+ album_id = self._parse_int(album.get("albumId") or album.get("id")) + title = self._stringify(album.get("title")) + if not title: + continue + if album_id: + albums_by_id.setdefault(album_id, album) + continue + key = f"{title.lower()}::{name.lower()}" + albums_by_key.setdefault(key, album) + + album_items: List[Dict[str, Any]] = list(albums_by_id.values()) + list(albums_by_key.values()) + results: List[SearchResult] = [] + for album in album_items: + if limit and len(results) >= limit: + break + res = self._album_item_to_result(album, artist_name=name) + if res is not None: + results.append(res) + return results + + def _tracks_for_album(self, *, album_id: Optional[int], album_title: str, artist_name: str = "", limit: int = 200) -> List[SearchResult]: + title = str(album_title or "").strip() + if not title: + return [] + + def _norm_album(text: str) -> str: + # Normalize album titles for matching across punctuation/case/spacing. + # Example: "either/or" vs "Either Or" or "Either/Or (Expanded Edition)". + s = str(text or "").strip().lower() + if not s: + return "" + s = re.sub(r"&", " and ", s) + s = re.sub(r"[^a-z0-9]+", "", s) + return s + + search_text = title + artist_text = str(artist_name or "").strip() + if artist_text: + # The proxy only supports s/a/v/p. Use a combined s= query to bias results + # toward the target album's tracks. + search_text = f"{artist_text} {title}".strip() + + # Prefer /album when we have a numeric album id. + # The proxy returns the album payload including a full track list in `data.items`. + # When this endpoint is available, it is authoritative for an album id, so we do + # not apply additional title/artist filtering. 
+ if album_id: + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/album/" + try: + client = self._get_api_client_for_base(base) + album_payload = client.album(int(album_id)) if client else None + except Exception as exc: + log(f"[tidal] Album lookup failed for {endpoint}: {exc}", file=sys.stderr) + continue + + if not isinstance(album_payload, dict) or not album_payload: + continue + + try: + album_data = album_payload.get("data") + album_tracks = self._extract_track_items(album_data if album_data is not None else album_payload) + except Exception: + album_tracks = [] + + if not album_tracks: + # Try the next configured base URL (some backends return an error-shaped + # JSON object with 200, or omit tracks for certain ids). + continue + + ordered: List[Tuple[int, int, Dict[str, Any]]] = [] + for tr in album_tracks: + if not isinstance(tr, dict): + continue + disc_val = self._parse_int(tr.get("volumeNumber") or tr.get("discNumber") or 0) or 0 + track_val = self._parse_int(tr.get("trackNumber") or 0) or 0 + ordered.append((disc_val, track_val, tr)) + + ordered.sort(key=lambda t: (t[0], t[1])) + try: + debug(f"tidal album endpoint tracks: album_id={album_id} extracted={len(album_tracks)}") + except Exception: + pass + + results: List[SearchResult] = [] + for _disc, _track, tr in ordered: + if limit and len(results) >= limit: + break + res = self._item_to_result(tr) + if res is not None: + results.append(res) + if results: + return results + + # Reduce punctuation in the raw search string to improve /search/ recall. 
+ try: + search_text = re.sub(r"[/\\]+", " ", search_text) + search_text = re.sub(r"\s+", " ", search_text).strip() + except Exception: + pass + + payload: Optional[Dict[str, Any]] = None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/search/" + try: + client = self._get_api_client_for_base(base) + payload = client.search({"s": search_text}) if client else None + if payload is not None: + break + except Exception as exc: + log(f"[tidal] Track lookup failed for {endpoint}: {exc}", file=sys.stderr) + continue + + if not payload: + return [] + + data = payload.get("data") or {} + tracks = self._extract_track_items(data) + if not tracks: + return [] + + try: + debug(f"tidal album search tracks: album_id={album_id} extracted={len(tracks)} query={repr(search_text)}") + except Exception: + pass + + wanted_album = title.lower() + wanted_album_norm = _norm_album(title) + wanted_artist = artist_text.lower() + seen_ids: set[int] = set() + candidates: List[Tuple[int, int, Dict[str, Any]]] = [] + + for track in tracks: + if not isinstance(track, dict): + continue + tid = self._parse_int(track.get("id") or track.get("trackId")) + if not tid or tid in seen_ids: + continue + + album = track.get("album") + album_ok = False + if isinstance(album, dict): + if album_id and self._parse_int(album.get("albumId") or album.get("id")) == album_id: + album_ok = True + else: + at = self._stringify(album.get("title")).lower() + if at: + if at == wanted_album: + album_ok = True + else: + at_norm = _norm_album(at) + if wanted_album_norm and at_norm and ( + at_norm == wanted_album_norm + or wanted_album_norm in at_norm + or at_norm in wanted_album_norm): + album_ok = True + else: + # If album is not a dict, fall back to string compare. 
+ at = self._stringify(track.get("album")).lower() + if at: + if at == wanted_album: + album_ok = True + else: + at_norm = _norm_album(at) + if wanted_album_norm and at_norm and ( + at_norm == wanted_album_norm + or wanted_album_norm in at_norm + or at_norm in wanted_album_norm): + album_ok = True + if not album_ok: + continue + + if wanted_artist: + if not self._track_matches_artist(track, artist_id=None, artist_name=artist_name): + continue + seen_ids.add(tid) + + disc_val = self._parse_int(track.get("volumeNumber") or track.get("discNumber") or 0) or 0 + track_val = self._parse_int(track.get("trackNumber") or 0) or 0 + candidates.append((disc_val, track_val, track)) + + candidates.sort(key=lambda t: (t[0], t[1])) + + # If strict matching found nothing, relax title matching (substring) while still + # keeping artist filtering when available. + if not candidates: + for track in tracks: + if not isinstance(track, dict): + continue + tid = self._parse_int(track.get("id") or track.get("trackId")) + if not tid or tid in seen_ids: + continue + + album = track.get("album") + if isinstance(album, dict): + at = self._stringify(album.get("title")).lower() + else: + at = self._stringify(track.get("album")).lower() + + if not at: + continue + at_norm = _norm_album(at) + if wanted_album_norm and at_norm: + if not (wanted_album_norm in at_norm or at_norm in wanted_album_norm): + continue + else: + if wanted_album not in at: + continue + if wanted_artist: + if not self._track_matches_artist(track, artist_id=None, artist_name=artist_name): + continue + + seen_ids.add(tid) + disc_val = self._parse_int(track.get("volumeNumber") or track.get("discNumber") or 0) or 0 + track_val = self._parse_int(track.get("trackNumber") or 0) or 0 + candidates.append((disc_val, track_val, track)) + + candidates.sort(key=lambda t: (t[0], t[1])) + + try: + debug(f"tidal album search tracks: album_id={album_id} matched={len(candidates)} title={repr(title)} artist={repr(artist_name)}") + except 
Exception: + pass + + results: List[SearchResult] = [] + for _disc, _track, track in candidates: + if limit and len(results) >= limit: + break + res = self._item_to_result(track) + if res is not None: + results.append(res) + + return results + + def _present_album_tracks( + self, + track_results: List[SearchResult], + *, + album_id: Optional[int], + album_title: str, + artist_name: str, + ) -> None: + if not track_results: + return + + try: + from SYS.rich_display import stdout_console + from SYS.result_table import ResultTable + except Exception: + return + + label = album_title or "Album" + if artist_name: + label = f"{artist_name} - {label}" + + table = ResultTable(f"Tidal Tracks: {label}").set_preserve_order(True) + table.set_table("tidal.track") + try: + table.set_table_metadata( + { + "provider": "tidal", + "view": "track", + "album_id": album_id, + "album_title": album_title, + "artist_name": artist_name, + } + ) + except Exception: + pass + + results_payload: List[Dict[str, Any]] = [] + for result in track_results: + table.add_result(result) + try: + results_payload.append(result.to_dict()) + except Exception: + results_payload.append( + { + "table": getattr(result, "table", "tidal.track"), + "title": getattr(result, "title", ""), + "path": getattr(result, "path", ""), + } + ) + + pipeline_context.set_last_result_table(table, results_payload) + pipeline_context.set_current_stage_table(table) + + try: + stdout_console().print() + stdout_console().print(table) + except Exception: + pass + + def _album_item_to_result(self, album: Dict[str, Any], *, artist_name: str) -> Optional[SearchResult]: + if not isinstance(album, dict): + return None + title = self._stringify(album.get("title")) + if not title: + return None + # Prefer albumId when available; some payloads carry both id/albumId. 
+ album_id = self._parse_int(album.get("albumId") or album.get("id")) + path = f"tidal://album/{album_id}" if album_id else f"tidal://album/{self._safe_filename(title)}" + + columns: List[tuple[str, str]] = [("Album", title)] + if artist_name: + columns.append(("Artist", str(artist_name))) + + # Album stats (best-effort): show track count and total duration when available. + track_count = self._parse_int(album.get("numberOfTracks") or album.get("trackCount") or album.get("tracks") or 0) + if track_count: + columns.append(("Tracks", str(track_count))) + total_time = _format_total_seconds(album.get("duration") or album.get("durationSeconds") or album.get("duration_sec") or 0) + if total_time: + columns.append(("Total", total_time)) + + release_date = self._stringify(album.get("releaseDate") or album.get("release_date") or album.get("date")) + if release_date: + columns.append(("Release", release_date)) + + # Preserve the original album payload but add a hint for downstream. + md: Dict[str, Any] = dict(album) + if artist_name and "_artist_name" not in md: + md["_artist_name"] = artist_name + + return SearchResult( + table="tidal.album", + title=title, + path=path, + detail="album", + annotations=["tidal", "album"], + media_kind="audio", + columns=columns, + full_metadata=md, + ) + + @staticmethod + def _find_ffmpeg() -> Optional[str]: + exe = shutil.which("ffmpeg") + if exe: + return exe + try: + repo_root = Path(__file__).resolve().parents[1] + bundled = repo_root / "MPV" / "ffmpeg" / "bin" / "ffmpeg.exe" + if bundled.is_file(): + return str(bundled) + except Exception: + pass + return None + + @staticmethod + def _find_ffprobe() -> Optional[str]: + exe = shutil.which("ffprobe") + if exe: + return exe + try: + repo_root = Path(__file__).resolve().parents[1] + bundled = repo_root / "MPV" / "ffmpeg" / "bin" / "ffprobe.exe" + if bundled.is_file(): + return str(bundled) + except Exception: + pass + return None + + def _probe_audio_codec(self, input_ref: str) -> 
Optional[str]: + """Best-effort probe for primary audio codec name (lowercase).""" + + candidate = str(input_ref or "").strip() + if not candidate: + return None + + ffprobe_path = self._find_ffprobe() + if ffprobe_path: + cmd = [ + ffprobe_path, + "-v", + "error", + "-select_streams", + "a:0", + "-show_entries", + "stream=codec_name", + "-of", + "default=nw=1:nk=1", + candidate, + ] + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + check=False, + ) + if proc.returncode == 0: + codec = str(proc.stdout or "").strip().lower() + if codec: + return codec + except Exception: + pass + + # Fallback: parse `ffmpeg -i` stream info. + ffmpeg_path = self._find_ffmpeg() + if not ffmpeg_path: + return None + try: + proc = subprocess.run( + [ffmpeg_path, "-hide_banner", "-i", candidate], + capture_output=True, + text=True, + check=False, + ) + text = (proc.stderr or "") + "\n" + (proc.stdout or "") + m = re.search(r"Audio:\s*([A-Za-z0-9_]+)", text) + if m: + return str(m.group(1)).strip().lower() + except Exception: + pass + return None + + @staticmethod + def _preferred_audio_suffix(codec: Optional[str], metadata: Optional[Dict[str, Any]] = None) -> str: + c = str(codec or "").strip().lower() + if c == "flac": + return ".flac" + if c in {"aac", "alac"}: + return ".m4a" + # Default to Matroska Audio for unknown / uncommon codecs. 
+ return ".mka" + + @staticmethod + def _has_nonempty_file(path: Path) -> bool: + try: + return path.is_file() and path.stat().st_size > 0 + except Exception: + return False + + def _ffmpeg_demux_to_audio( + self, + *, + input_ref: str, + output_path: Path, + lossless_fallback: bool = True, + progress: Optional[Any] = None, + transfer_label: Optional[str] = None, + duration_seconds: Optional[int] = None, + audio_quality: Optional[str] = None, + ) -> Optional[Path]: + ffmpeg_path = self._find_ffmpeg() + if not ffmpeg_path: + debug("[tidal] ffmpeg not found; cannot materialize audio from MPD") + return None + + if self._has_nonempty_file(output_path): + return output_path + + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + except Exception: + pass + + protocol_whitelist = "file,https,http,tcp,tls,crypto,data" + + label = str(transfer_label or output_path.name or "tidal") + + def _estimate_total_bytes() -> Optional[int]: + try: + dur = int(duration_seconds) if duration_seconds is not None else None + except Exception: + dur = None + if not dur or dur <= 0: + return None + + qual = str(audio_quality or "").strip().lower() + # Rough per-quality bitrate guess (bytes/sec). 
+ if qual in {"hi_res", + "hi_res_lossless", + "hires", + "hi-res", + "master", + "mqa"}: + bps = 4_608_000 # ~24-bit/96k stereo + elif qual in {"lossless", + "flac"}: + bps = 1_411_200 # 16-bit/44.1k stereo + else: + bps = 320_000 # kbps for compressed + + try: + return int((bps / 8.0) * dur) + except Exception: + return None + + est_total_bytes = _estimate_total_bytes() + + def _update_transfer(total_bytes_val: Optional[int]) -> None: + if progress is None: + return + try: + progress.update_transfer( + label=label, + completed=int(total_bytes_val) if total_bytes_val is not None else None, + total=est_total_bytes, + ) + except Exception: + pass + + def _run(cmd: List[str], *, target_path: Optional[Path] = None) -> bool: + cmd_progress = list(cmd) + # Enable ffmpeg progress output for live byte updates. + cmd_progress.insert(1, "-progress") + cmd_progress.insert(2, "pipe:1") + cmd_progress.insert(3, "-nostats") + + try: + proc = subprocess.Popen( + cmd_progress, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + except Exception as exc: + debug(f"[tidal] ffmpeg invocation failed: {exc}") + return False + + last_bytes = None + try: + while True: + line = proc.stdout.readline() if proc.stdout else "" + if not line: + if proc.poll() is not None: + break + time.sleep(0.05) + continue + + if "=" not in line: + continue + key, val = line.strip().split("=", 1) + if key == "total_size": + try: + last_bytes = int(val) + _update_transfer(last_bytes) + except Exception: + pass + elif key == "out_time_ms": + # Map out_time_ms to byte estimate when total_size missing. 
+ try: + if est_total_bytes and val.isdigit(): + ms = int(val) + dur_ms = (duration_seconds or 0) * 1000 + if dur_ms > 0: + pct = min(1.0, max(0.0, ms / dur_ms)) + approx = int(est_total_bytes * pct) + _update_transfer(approx) + except Exception: + pass + + proc.wait() + finally: + if last_bytes is not None: + _update_transfer(last_bytes) + + check_path = target_path or output_path + if proc.returncode == 0 and self._has_nonempty_file(check_path): + return True + + try: + stderr_text = proc.stderr.read() if proc.stderr else "" + if stderr_text: + debug(f"[tidal] ffmpeg failed: {stderr_text.strip()}") + except Exception: + pass + return False + + # Prefer remux (fast, no transcode). + cmd_copy = [ + ffmpeg_path, + "-y", + "-hide_banner", + "-loglevel", + "error", + "-protocol_whitelist", + protocol_whitelist, + "-i", + str(input_ref), + "-vn", + "-c", + "copy", + str(output_path), + ] + if _run(cmd_copy): + return output_path + + if not lossless_fallback: + return None + + # Fallback: decode/transcode to FLAC to guarantee a supported file. + flac_path = ( + output_path + if output_path.suffix.lower() == ".flac" + else output_path.with_suffix(".flac") + ) + if self._has_nonempty_file(flac_path): + return flac_path + + # Avoid leaving a partial FLAC behind if we're transcoding into the final name. + tmp_flac_path = flac_path + if flac_path == output_path: + tmp_flac_path = output_path.with_name(f"{output_path.stem}.tmp{output_path.suffix}") + + cmd_flac = [ + ffmpeg_path, + "-y", + "-hide_banner", + "-loglevel", + "error", + "-protocol_whitelist", + protocol_whitelist, + "-i", + str(input_ref), + "-vn", + "-c:a", + "flac", + str(tmp_flac_path), + ] + if _run(cmd_flac, target_path=tmp_flac_path) and self._has_nonempty_file(tmp_flac_path): + if tmp_flac_path != flac_path: + try: + tmp_flac_path.replace(flac_path) + except Exception: + # If rename fails, still return the temp file. 
+ return tmp_flac_path + return flac_path + return None + + def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]: + """Materialize a playable audio file from a Tidal DASH manifest.""" + + try: + output_dir.mkdir(parents=True, exist_ok=True) + except Exception: + pass + + raw_path = str(getattr(result, "path", "") or "").strip() + + md: Dict[str, Any] = {} + if isinstance(getattr(result, "full_metadata", None), dict): + md = dict(getattr(result, "full_metadata") or {}) + + if not md.get("manifest"): + track_id = self._extract_track_id_from_result(result) + if track_id: + detail = self._fetch_track_details(track_id) + if isinstance(detail, dict) and detail: + try: + md.update(detail) + except Exception: + md = detail + + # Best-effort: fetch synced lyric subtitles for MPV (LRC). + try: + track_id_for_lyrics = self._extract_track_id_from_result(result) + except Exception: + track_id_for_lyrics = None + if track_id_for_lyrics and not md.get("_tidal_lyrics_subtitles"): + lyr = self._fetch_track_lyrics(track_id_for_lyrics) + if isinstance(lyr, dict) and lyr: + try: + md.setdefault("lyrics", lyr) + except Exception: + pass + try: + subtitles = lyr.get("subtitles") + if isinstance(subtitles, str) and subtitles.strip(): + md["_tidal_lyrics_subtitles"] = subtitles.strip() + except Exception: + pass + + # Ensure downstream cmdlets see our enriched metadata. 
+ try: + if isinstance(getattr(result, "full_metadata", None), dict): + result.full_metadata.update(md) + else: + result.full_metadata = md + except Exception: + pass + + try: + from cmdlet._shared import resolve_tidal_manifest_path + except Exception: + return None + + resolved = resolve_tidal_manifest_path({"full_metadata": md, "path": raw_path, "title": getattr(result, "title", "")}) + if not resolved: + return None + + resolved_text = str(resolved).strip() + if not resolved_text: + return None + + track_id = self._extract_track_id_from_result(result) + title_part = self._safe_filename(getattr(result, "title", None), fallback="tidal") + hash_part = self._safe_filename(md.get("manifestHash"), fallback="") + + stem_parts = ["tidal"] + if track_id: + stem_parts.append(str(track_id)) + if hash_part: + stem_parts.append(hash_part[:12]) + if title_part: + stem_parts.append(title_part) + stem = "-".join([p for p in stem_parts if p])[:180].rstrip("- ") + + codec = self._probe_audio_codec(resolved_text) + suffix = self._preferred_audio_suffix(codec, md) + + # If resolve_tidal_manifest_path returned a URL, prefer feeding it directly to ffmpeg. + if resolved_text.lower().startswith("http"): + out_file = output_dir / f"{stem}{suffix}" + materialized = self._ffmpeg_demux_to_audio( + input_ref=resolved_text, + output_path=out_file, + progress=self.config.get("_pipeline_progress") if isinstance(self.config, dict) else None, + transfer_label=title_part or getattr(result, "title", None), + duration_seconds=self._coerce_duration_seconds(md), + audio_quality=md.get("audioQuality") if isinstance(md, dict) else None, + ) + if materialized is not None: + return materialized + + # As a fallback, try downloading the URL directly if it looks like a file. 
+ try: + import httpx + + resp = httpx.get(resolved_text, timeout=float(getattr(self, "api_timeout", 10.0))) + resp.raise_for_status() + content = resp.content + direct_path = output_dir / f"{stem}.bin" + with open(direct_path, "wb") as fh: + fh.write(content) + return direct_path + except Exception: + return None + + try: + source_path = Path(resolved_text) + except Exception: + return None + + if source_path.is_file() and source_path.suffix.lower() == ".mpd": + # Materialize audio from the local MPD. + out_file = output_dir / f"{stem}{suffix}" + materialized = self._ffmpeg_demux_to_audio( + input_ref=str(source_path), + output_path=out_file, + progress=self.config.get("_pipeline_progress") if isinstance(self.config, dict) else None, + transfer_label=title_part or getattr(result, "title", None), + duration_seconds=self._coerce_duration_seconds(md), + audio_quality=md.get("audioQuality") if isinstance(md, dict) else None, + ) + if materialized is not None: + return materialized + return None + + # If we somehow got a local audio file already, copy it to output_dir. + if source_path.is_file() and source_path.suffix.lower() in {".m4a", ".mp3", ".flac", ".wav", ".mka", ".mp4"}: + dest = output_dir / f"{stem}{source_path.suffix.lower()}" + if self._has_nonempty_file(dest): + return dest + try: + shutil.copyfile(source_path, dest) + return dest + except Exception: + return None + + # As a last resort, attempt to treat the local path as an ffmpeg input. 
+ out_file = output_dir / f"{stem}{suffix}" + materialized = self._ffmpeg_demux_to_audio( + input_ref=resolved_text, + output_path=out_file, + progress=self.config.get("_pipeline_progress") if isinstance(self.config, dict) else None, + transfer_label=title_part or getattr(result, "title", None), + duration_seconds=self._coerce_duration_seconds(md), + audio_quality=md.get("audioQuality") if isinstance(md, dict) else None, + ) + return materialized + + def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]: + view, identifier = self._parse_tidal_url(url) + if not view: + return False, None + + if view == "track": + if not identifier or output_dir is None: + return False, None + + try: + detail = self._fetch_track_details(identifier) + except Exception: + detail = None + + result = self._track_detail_to_result(detail, identifier) + try: + downloaded = self.download(result, output_dir) + except Exception: + return False, None + + if downloaded: + return True, downloaded + return False, None + + if view == "album": + if not identifier: + return False, None + + try: + track_results = self._tracks_for_album( + album_id=identifier, + album_title="", + artist_name="", + limit=200, + ) + except Exception: + return False, None + + if not track_results: + return False, None + + album_title = "" + artist_name = "" + metadata = getattr(track_results[0], "full_metadata", None) + if isinstance(metadata, dict): + album_obj = metadata.get("album") + if isinstance(album_obj, dict): + album_title = self._stringify(album_obj.get("title")) + else: + album_title = self._stringify(album_obj or metadata.get("album")) + artists = self._extract_artists(metadata) + if artists: + artist_name = artists[0] + + if not album_title: + album_title = f"Album {identifier}" + + self._present_album_tracks( + track_results, + album_id=identifier, + album_title=album_title, + artist_name=artist_name, + ) + return True, None + + return False, None + + def 
_get_api_client_for_base(self, base_url: str) -> Optional[tidalApiClient]: + base = base_url.rstrip("/") + for client in self.api_clients: + if getattr(client, "base_url", "").rstrip("/") == base: + return client + return None + + def _extract_track_items(self, data: Any) -> List[Dict[str, Any]]: + if isinstance(data, list): + items: List[Dict[str, Any]] = [] + for item in data: + if not isinstance(item, dict): + continue + # Some endpoints return wrapper objects like {"item": {...}}. + nested = item.get("item") + if isinstance(nested, dict): + items.append(nested) + continue + nested = item.get("track") + if isinstance(nested, dict): + items.append(nested) + continue + items.append(item) + return items + if not isinstance(data, dict): + return [] + + items: List[Dict[str, Any]] = [] + direct = data.get("items") + if isinstance(direct, list): + for item in direct: + if not isinstance(item, dict): + continue + nested = item.get("item") + if isinstance(nested, dict): + items.append(nested) + continue + nested = item.get("track") + if isinstance(nested, dict): + items.append(nested) + continue + items.append(item) + + tracks_section = data.get("tracks") + if isinstance(tracks_section, dict): + track_items = tracks_section.get("items") + if isinstance(track_items, list): + for item in track_items: + if not isinstance(item, dict): + continue + nested = item.get("item") + if isinstance(nested, dict): + items.append(nested) + continue + nested = item.get("track") + if isinstance(nested, dict): + items.append(nested) + continue + items.append(item) + + top_hits = data.get("topHits") + if isinstance(top_hits, list): + for hit in top_hits: + if not isinstance(hit, dict): + continue + hit_type = str(hit.get("type") or "").upper() + if hit_type != "TRACKS" and hit_type != "TRACK": + continue + value = hit.get("value") + if isinstance(value, dict): + items.append(value) + + seen: set[int] = set() + deduped: List[Dict[str, Any]] = [] + for item in items: + track_id = 
item.get("id") or item.get("trackId") + if track_id is None: + continue + try: + track_int = int(track_id) + except Exception: + track_int = None + if track_int is None or track_int in seen: + continue + seen.add(track_int) + deduped.append(item) + + return deduped + + def _resolve_api_urls(self) -> List[str]: + urls: List[str] = [] + raw = self.config.get("api_urls") + if raw is None: + raw = self.config.get("api_url") + if isinstance(raw, (list, tuple)): + urls.extend(str(item).strip() for item in raw if isinstance(item, str)) + elif isinstance(raw, str): + urls.append(raw.strip()) + cleaned = [u.rstrip("/") for u in urls if isinstance(u, str) and u.strip()] + if not cleaned: + cleaned = [URL_API[0]] + return cleaned + + def _build_search_params(self, query: str) -> Dict[str, str]: + cleaned = str(query or "").strip() + if not cleaned: + return {} + + segments: List[str] = [] + for chunk in _DELIMITERS_RE.split(cleaned): + chunk = chunk.strip() + if not chunk: + continue + if ":" in chunk: + for sub in _SEGMENT_BOUNDARY_RE.split(chunk): + part = sub.strip() + if part: + segments.append(part) + else: + segments.append(chunk) + + key_values: Dict[str, str] = {} + free_text: List[str] = [] + for segment in segments: + if ":" not in segment: + free_text.append(segment) + continue + key, value = segment.split(":", 1) + key = key.strip().lower() + value = value.strip().strip('"').strip("'") + if value: + key_values[key] = value + + # The proxy API only accepts exactly one of s/a/v/p. If the user mixes + # free text with a structured key (e.g. artist:foo bar), treat the free + # text as part of the same query instead of creating an additional key. + mapped_values: Dict[str, List[str]] = {} + for key, value in key_values.items(): + if not value: + continue + mapped = _KEY_TO_PARAM.get(key) + if not mapped: + continue + mapped_values.setdefault(mapped, []).append(value) + + # Choose the search key in priority order. 
+ chosen_key = None + for candidate in ("a", "v", "p", "s"): + if mapped_values.get(candidate): + chosen_key = candidate + break + if chosen_key is None: + chosen_key = "s" + + chosen_parts: List[str] = [] + chosen_parts.extend(mapped_values.get(chosen_key, [])) + + # If the user provided free text and a structured key (like artist:), + # fold it into the chosen key instead of forcing a second key. + extra = " ".join(part for part in free_text if part).strip() + if extra: + chosen_parts.append(extra) + + chosen_value = " ".join(p for p in chosen_parts if p).strip() + if not chosen_value: + chosen_value = cleaned + + return {chosen_key: chosen_value} if chosen_value else {} + + def _extract_artist_items(self, data: Any) -> List[Dict[str, Any]]: + if isinstance(data, list): + return [item for item in data if isinstance(item, dict)] + if not isinstance(data, dict): + return [] + + items: List[Dict[str, Any]] = [] + direct = data.get("items") + if isinstance(direct, list): + items.extend(item for item in direct if isinstance(item, dict)) + + artists_section = data.get("artists") + if isinstance(artists_section, dict): + artist_items = artists_section.get("items") + if isinstance(artist_items, list): + items.extend(item for item in artist_items if isinstance(item, dict)) + + top_hits = data.get("topHits") + if isinstance(top_hits, list): + for hit in top_hits: + if not isinstance(hit, dict): + continue + hit_type = str(hit.get("type") or "").upper() + if hit_type != "ARTISTS" and hit_type != "ARTIST": + continue + value = hit.get("value") + if isinstance(value, dict): + items.append(value) + + seen: set[int] = set() + deduped: List[Dict[str, Any]] = [] + for item in items: + raw_id = item.get("id") or item.get("artistId") + if raw_id is None: + continue + try: + artist_int = int(raw_id) + except Exception: + artist_int = None + if artist_int is None or artist_int in seen: + continue + seen.add(artist_int) + deduped.append(item) + + return deduped + + def 
_artist_item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]: + if not isinstance(item, dict): + return None + + name = str(item.get("name") or item.get("title") or "").strip() + if not name: + return None + + raw_id = item.get("id") or item.get("artistId") + if raw_id is None: + return None + try: + artist_id = int(raw_id) + except (TypeError, ValueError): + return None + + path = f"tidal://artist/{artist_id}" + + columns: List[tuple[str, str]] = [("Artist", name), ("Artist ID", str(artist_id))] + popularity = self._stringify(item.get("popularity")) + if popularity: + columns.append(("Popularity", popularity)) + + return SearchResult( + table="tidal.artist", + title=name, + path=path, + detail="tidal.artist", + annotations=["tidal", "artist"], + media_kind="audio", + columns=columns, + full_metadata=item, + ) + + @staticmethod + def _format_duration(seconds: Any) -> str: + try: + total = int(seconds) + if total < 0: + return "" + except Exception: + return "" + minutes, secs = divmod(total, 60) + return f"{minutes}:{secs:02d}" + + @staticmethod + def _coerce_duration_seconds(value: Any) -> Optional[int]: + return coerce_duration_seconds(value) + + @staticmethod + def _stringify(value: Any) -> str: + return stringify(value) + + @staticmethod + def _extract_artists(item: Dict[str, Any]) -> List[str]: + return extract_artists(item) + + def _item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]: + if not isinstance(item, dict): + return None + + title = str(item.get("title") or "").strip() + if not title: + return None + + identifier = item.get("id") + if identifier is None: + return None + try: + track_id = int(identifier) + except (TypeError, ValueError): + return None + + # Avoid tidal.com URLs entirely; selection will resolve to a decoded MPD. 
+ path = f"tidal://track/{track_id}" + + artists = self._extract_artists(item) + artist_display = ", ".join(artists) + + album = item.get("album") + album_title = "" + if isinstance(album, dict): + album_title = str(album.get("title") or "").strip() + + detail_parts: List[str] = [] + if artist_display: + detail_parts.append(artist_display) + if album_title: + detail_parts.append(album_title) + detail = " | ".join(detail_parts) + + columns: List[tuple[str, str]] = [] + if title: + columns.append(("Title", title)) + disc_no = self._stringify(item.get("volumeNumber") or item.get("discNumber") or item.get("disc_number")) + track_no = self._stringify(item.get("trackNumber") or item.get("track_number")) + if disc_no: + columns.append(("Disc #", disc_no)) + if track_no: + columns.append(("Track #", track_no)) + if album_title: + columns.append(("Album", album_title)) + if artist_display: + columns.append(("Artist", artist_display)) + duration_text = self._format_duration(item.get("duration")) + if duration_text: + columns.append(("Duration", duration_text)) + audio_quality = str(item.get("audioQuality") or "").strip() + if audio_quality: + columns.append(("Quality", audio_quality)) + + # IMPORTANT: do not retain a shared reference to the raw API dict. + # Downstream playback (MPV) mutates metadata to cache the decoded Tidal + # manifest path/URL. If multiple results share the same dict reference, + # they can incorrectly collapse to a single playable target. 
+ full_md: Dict[str, Any] = dict(item) + url_value = self._stringify(full_md.get("url")) + if url_value: + full_md["url"] = url_value + + tags = self._build_track_tags(full_md) + + result = SearchResult( + table="tidal.track", + title=title, + path=path, + detail="tidal.track", + annotations=["tidal", "track"], + media_kind="audio", + tag=tags, + columns=columns, + full_metadata=full_md, + ) + if url_value: + try: + result.url = url_value + except Exception: + pass + return result + + def _extract_track_selection_context( + self, selected_items: List[Any] + ) -> List[Tuple[int, str, str]]: + contexts: List[Tuple[int, str, str]] = [] + seen_ids: set[int] = set() + for item in selected_items or []: + payload: Dict[str, Any] = {} + if isinstance(item, dict): + payload = item + else: + try: + payload = ( + item.to_dict() + if hasattr(item, "to_dict") + and callable(getattr(item, "to_dict")) + else {} + ) + except Exception: + payload = {} + if not payload: + try: + payload = { + "title": getattr(item, "title", None), + "path": getattr(item, "path", None), + "url": getattr(item, "url", None), + "full_metadata": getattr(item, "full_metadata", None), + } + except Exception: + payload = {} + + meta = ( + payload.get("full_metadata") + if isinstance(payload.get("full_metadata"), dict) + else payload + ) + if not isinstance(meta, dict): + meta = {} + raw_id = meta.get("trackId") or meta.get("id") or payload.get("id") + if raw_id is None: + continue + try: + track_id = int(raw_id) + except (TypeError, ValueError): + continue + if track_id in seen_ids: + continue + seen_ids.add(track_id) + + title = ( + payload.get("title") + or meta.get("title") + or payload.get("name") + or payload.get("path") + or payload.get("url") + ) + if not title: + title = f"Track {track_id}" + path = ( + payload.get("path") + or payload.get("url") + or f"tidal://track/{track_id}" + ) + contexts.append((track_id, str(title).strip(), str(path).strip())) + return contexts + + def 
_fetch_track_details(self, track_id: int) -> Optional[Dict[str, Any]]: + if track_id <= 0: + return None + + info_data = self._fetch_track_info(track_id) + + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/track/" + try: + client = self._get_api_client_for_base(base) + payload = client.track(track_id) if client else None + data = payload.get("data") if isinstance(payload, dict) else None + if isinstance(data, dict): + merged: Dict[str, Any] = {} + if isinstance(info_data, dict): + merged.update(info_data) + merged.update(data) + return merged + except Exception as exc: + log(f"[tidal] Track lookup failed for {endpoint}: {exc}", file=sys.stderr) + continue + return None + + def _fetch_track_info(self, track_id: int) -> Optional[Dict[str, Any]]: + if track_id <= 0: + return None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/info/" + try: + client = self._get_api_client_for_base(base) + payload = client.info(track_id) if client else None + data = payload.get("data") if isinstance(payload, dict) else None + if isinstance(data, dict): + return data + except Exception as exc: + debug(f"[tidal] Info lookup failed for {endpoint}: {exc}") + continue + return None + + def _fetch_track_lyrics(self, track_id: int) -> Optional[Dict[str, Any]]: + if track_id <= 0: + return None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/lyrics/" + try: + client = self._get_api_client_for_base(base) + payload = client.lyrics(track_id) if client else None + if not isinstance(payload, dict): + continue + + lyrics_obj = payload.get("lyrics") + if isinstance(lyrics_obj, dict) and lyrics_obj: + return lyrics_obj + + data_obj = payload.get("data") + if isinstance(data_obj, dict) and data_obj: + return data_obj + except Exception as exc: + debug(f"[tidal] Lyrics lookup failed for {endpoint}: {exc}") + continue + return None + + def _build_track_columns(self, detail: Dict[str, Any], track_id: int) -> List[Tuple[str, str]]: + values: List[Tuple[str, str]] 
= [ + ("Track ID", str(track_id)), + ("Quality", self._stringify(detail.get("audioQuality"))), + ("Mode", self._stringify(detail.get("audioMode"))), + ("Asset", self._stringify(detail.get("assetPresentation"))), + ("Manifest Type", self._stringify(detail.get("manifestMimeType"))), + ("Manifest Hash", self._stringify(detail.get("manifestHash"))), + ("Bit Depth", self._stringify(detail.get("bitDepth"))), + ("Sample Rate", self._stringify(detail.get("sampleRate"))), + ] + return [(name, value) for name, value in values if value] + + def _build_track_tags(self, metadata: Dict[str, Any]) -> set[str]: + return build_track_tags(metadata) + + def selection_auto_stage( + self, + table_type: str, + stage_args: Optional[Sequence[str]] = None, + ) -> Optional[List[str]]: + """Determine if selection should auto-run download-file.""" + t = str(table_type or "").strip().lower() + + # Explicit track tables always auto-download. + if t == "tidal.track": + return ["download-file"] + + # For the generic "tidal" table (first-stage search results), + # only auto-download if we're selecting track items. + # Otherwise, let selector() handle navigation (artist -> album -> track). + if t == "tidal": + # If we can't see the items yet, we have to guess. + # Default to None so selector() gets a chance to run first. 
+ return None + + return super().selection_auto_stage(table_type, stage_args) + + def selector( + self, + selected_items: List[Any], + *, + ctx: Any, + stage_is_last: bool = True, + **_kwargs: Any, + ) -> bool: + if not stage_is_last: + return False + + try: + current_table = ctx.get_current_stage_table() + except Exception: + current_table = None + if current_table is None: + try: + current_table = ctx.get_last_result_table() + except Exception: + current_table = None + table_type = str( + current_table.table + if current_table and hasattr(current_table, "table") + else "" + ).strip().lower() + + try: + debug( + f"[tidal.selector] table_type={table_type} stage_is_last={stage_is_last} selected_count={len(selected_items) if selected_items else 0}" + ) + except Exception: + pass + + # Unified selection logic: detect artist/album/track by inspecting path or metadata + # when the table name is just the generic "tidal" (from search-file). + is_generic_tidal = (table_type == "tidal") + + # Artist selection: selecting @N should open an albums list. 
+ if table_type == "tidal.artist" or (is_generic_tidal and any(str(get_field(i, "path")).startswith("tidal://artist/") for i in selected_items)): + contexts = self._extract_artist_selection_context(selected_items) + try: + debug(f"[tidal.selector] artist contexts={len(contexts)}") + except Exception: + pass + if not contexts: + return False + + artist_id, artist_name = contexts[0] + album_results = self._albums_for_artist(artist_id=artist_id, artist_name=artist_name, limit=200) + if not album_results: + try: + from SYS.rich_display import stdout_console + stdout_console().print(f"[bold yellow][tidal] No albums found for {artist_name}[/]") + except Exception: + log(f"[tidal] No albums found for {artist_name}") + return True + + try: + from SYS.rich_display import stdout_console + from SYS.result_table import ResultTable + except Exception: + return False + + table = ResultTable(f"Tidal Albums: {artist_name}").set_preserve_order(False) + table.set_table("tidal.album") + try: + table.set_table_metadata({"provider": "tidal", "view": "album", "artist_id": artist_id, "artist_name": artist_name}) + except Exception: + pass + + results_payload: List[Dict[str, Any]] = [] + for res in album_results: + table.add_result(res) + try: + results_payload.append(res.to_dict()) + except Exception: + results_payload.append({"table": "tidal", "title": getattr(res, "title", ""), "path": getattr(res, "path", "")}) + + try: + ctx.set_last_result_table(table, results_payload) + ctx.set_current_stage_table(table) + except Exception: + pass + + try: + suppress = bool(getattr(ctx, "_suppress_provider_selector_print", False)) + except Exception: + suppress = False + + if not suppress: + try: + stdout_console().print() + stdout_console().print(table) + except Exception: + pass + + return True + + # Album selection: selecting @N should open the track list for that album. 
+ if table_type == "tidal.album" or (is_generic_tidal and any(str(get_field(i, "path")).startswith("tidal://album/") for i in selected_items)): + contexts = self._extract_album_selection_context(selected_items) + try: + debug(f"[tidal.selector] album contexts={len(contexts)}") + except Exception: + pass + if not contexts: + return False + + album_id, album_title, artist_name = contexts[0] + track_results = self._tracks_for_album(album_id=album_id, album_title=album_title, artist_name=artist_name, limit=200) + if not track_results: + return False + + try: + from SYS.rich_display import stdout_console + from SYS.result_table import ResultTable + except Exception: + return False + + label = album_title + if artist_name: + label = f"{artist_name} - {album_title}" + # Preserve album order (disc/track) rather than sorting by title. + table = ResultTable(f"Tidal Tracks: {label}").set_preserve_order(True) + table.set_table("tidal.track") + try: + table.set_table_metadata( + { + "provider": "tidal", + "view": "track", + "album_id": album_id, + "album_title": album_title, + "artist_name": artist_name, + } + ) + except Exception: + pass + + results_payload: List[Dict[str, Any]] = [] + for res in track_results: + table.add_result(res) + try: + results_payload.append(res.to_dict()) + except Exception: + results_payload.append({"table": "tidal", "title": getattr(res, "title", ""), "path": getattr(res, "path", "")}) + + try: + ctx.set_last_result_table(table, results_payload) + ctx.set_current_stage_table(table) + except Exception: + pass + + try: + suppress = bool(getattr(ctx, "_suppress_provider_selector_print", False)) + except Exception: + suppress = False + + if not suppress: + try: + stdout_console().print() + stdout_console().print(table) + except Exception: + pass + + return True + + if table_type == "tidal.track" or (is_generic_tidal and any(str(get_field(i, "path")).startswith("tidal://track/") for i in selected_items)): + try: + meta = ( + 
current_table.get_table_metadata() + if current_table is not None and hasattr(current_table, "get_table_metadata") + else {} + ) + except Exception: + meta = {} + if isinstance(meta, dict) and meta.get("resolved_manifest"): + return False + + contexts = self._extract_track_selection_context(selected_items) + try: + debug(f"[tidal.selector] track contexts={len(contexts)}") + except Exception: + pass + if not contexts: + return False + + track_details: List[Tuple[int, str, str, Dict[str, Any]]] = [] + for track_id, title, path in contexts: + detail = self._fetch_track_details(track_id) + if detail: + track_details.append((track_id, title, path, detail)) + + if not track_details: + return False + + try: + from SYS.rich_display import stdout_console + from SYS.result_table import ResultTable + except Exception: + return False + + table = ResultTable("Tidal Track").set_preserve_order(True) + table.set_table("tidal.track") + try: + table.set_table_metadata({"provider": "tidal", "view": "track", "resolved_manifest": True}) + except Exception: + pass + results_payload: List[Dict[str, Any]] = [] + for track_id, title, path, detail in track_details: + # Decode the DASH MPD manifest to a local file and use it as the selectable/playable path. 
+ try: + from cmdlet._shared import resolve_tidal_manifest_path + + manifest_path = resolve_tidal_manifest_path( + {"full_metadata": detail, "path": f"tidal://track/{track_id}"} + ) + except Exception: + manifest_path = None + + resolved_path = str(manifest_path) if manifest_path else f"tidal://track/{track_id}" + + artists = self._extract_artists(detail) + artist_display = ", ".join(artists) if artists else "" + columns = self._build_track_columns(detail, track_id) + if artist_display: + columns.insert(1, ("Artist", artist_display)) + album = detail.get("album") + if isinstance(album, dict): + album_title = self._stringify(album.get("title")) + else: + album_title = self._stringify(detail.get("album")) + if album_title: + insert_pos = 2 if artist_display else 1 + columns.insert(insert_pos, ("Album", album_title)) + + tags = self._build_track_tags(detail) + url_value = self._stringify(detail.get("url")) + + result = SearchResult( + table="tidal.track", + title=title, + path=resolved_path, + detail=f"id:{track_id}", + annotations=["tidal", "track"], + media_kind="audio", + columns=columns, + full_metadata=detail, + tag=tags, + ) + if url_value: + try: + result.url = url_value + except Exception: + pass + table.add_result(result) + try: + results_payload.append(result.to_dict()) + except Exception: + results_payload.append({ + "table": "tidal.track", + "title": result.title, + "path": result.path, + }) + + try: + ctx.set_last_result_table(table, results_payload) + ctx.set_current_stage_table(table) + except Exception: + pass + + try: + stdout_console().print() + stdout_console().print(table) + except Exception: + pass + + return True \ No newline at end of file diff --git a/Provider/metadata_provider.py b/Provider/metadata_provider.py index d78a3e4..0471031 100644 --- a/Provider/metadata_provider.py +++ b/Provider/metadata_provider.py @@ -12,10 +12,10 @@ import subprocess from API.HTTP import HTTPClient from ProviderCore.base import SearchResult try: - from 
Provider.HIFI import HIFI + from Provider.Tidal import Tidal except ImportError: # pragma: no cover - optional - HIFI = None -from Provider.tidal_shared import ( + Tidal = None +from API.Tidal import ( build_track_tags, extract_artists, stringify, @@ -1426,17 +1426,17 @@ except Exception: # Registry --------------------------------------------------------------- class TidalMetadataProvider(MetadataProvider): - """Metadata provider that reuses the HIFI search provider for tidal info.""" + """Metadata provider that reuses the Tidal search provider for tidal info.""" @property def name(self) -> str: # type: ignore[override] return "tidal" def __init__(self, config: Optional[Dict[str, Any]] = None) -> None: - if HIFI is None: - raise RuntimeError("HIFI provider unavailable for tidal metadata") + if Tidal is None: + raise RuntimeError("Tidal provider unavailable for tidal metadata") super().__init__(config) - self._provider = HIFI(self.config) + self._provider = Tidal(self.config) def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: normalized = str(query or "").strip() diff --git a/Provider/soulseek.py b/Provider/soulseek.py index ec38226..054cfc9 100644 --- a/Provider/soulseek.py +++ b/Provider/soulseek.py @@ -236,7 +236,7 @@ class Soulseek(Provider): if not stage_is_last: return False - # If we wanted to handle drill-down (like HIFI.py) we would: + # If we wanted to handle drill-down (like Tidal.py) we would: # 1. Fetch more data (e.g. user shares) # 2. Create a new ResultTable # 3. 
ctx.set_current_stage_table(new_table) diff --git a/Provider/tidal_shared.py b/Provider/tidal_shared.py deleted file mode 100644 index 44b3a5d..0000000 --- a/Provider/tidal_shared.py +++ /dev/null @@ -1,109 +0,0 @@ -from __future__ import annotations - -from typing import Any, Dict, List, Optional, Set - - -def stringify(value: Any) -> str: - text = str(value or "").strip() - return text - - -def extract_artists(item: Dict[str, Any]) -> List[str]: - names: List[str] = [] - artists = item.get("artists") - if isinstance(artists, list): - for artist in artists: - if isinstance(artist, dict): - name = stringify(artist.get("name")) - if name and name not in names: - names.append(name) - if not names: - primary = item.get("artist") - if isinstance(primary, dict): - name = stringify(primary.get("name")) - if name: - names.append(name) - return names - - -def build_track_tags(metadata: Dict[str, Any]) -> Set[str]: - tags: Set[str] = {"tidal"} - - audio_quality = stringify(metadata.get("audioQuality")) - if audio_quality: - tags.add(f"quality:{audio_quality.lower()}") - - media_md = metadata.get("mediaMetadata") - if isinstance(media_md, dict): - tag_values = media_md.get("tags") or [] - for tag in tag_values: - if isinstance(tag, str): - candidate = tag.strip() - if candidate: - tags.add(candidate.lower()) - - title_text = stringify(metadata.get("title")) - if title_text: - tags.add(f"title:{title_text}") - - artists = extract_artists(metadata) - for artist in artists: - artist_clean = stringify(artist) - if artist_clean: - tags.add(f"artist:{artist_clean}") - - album_title = "" - album_obj = metadata.get("album") - if isinstance(album_obj, dict): - album_title = stringify(album_obj.get("title")) - else: - album_title = stringify(metadata.get("album")) - if album_title: - tags.add(f"album:{album_title}") - - track_no_val = metadata.get("trackNumber") or metadata.get("track_number") - if track_no_val is not None: - try: - track_int = int(track_no_val) - if track_int > 0: - 
tags.add(f"track:{track_int}") - except Exception: - track_text = stringify(track_no_val) - if track_text: - tags.add(f"track:{track_text}") - - return tags - - -def coerce_duration_seconds(value: Any) -> Optional[int]: - candidates = [value] - try: - if isinstance(value, dict): - for key in ( - "duration", - "durationSeconds", - "duration_sec", - "duration_ms", - "durationMillis", - ): - if key in value: - candidates.append(value.get(key)) - except Exception: - pass - - for cand in candidates: - try: - if cand is None: - continue - text = str(cand).strip() - if text.lower().endswith("ms"): - text = text[:-2].strip() - num = float(text) - if num <= 0: - continue - if num > 10_000: - num = num / 1000.0 - return int(round(num)) - except Exception: - continue - return None diff --git a/ProviderCore/base.py b/ProviderCore/base.py index 48d47ce..4a173dd 100644 --- a/ProviderCore/base.py +++ b/ProviderCore/base.py @@ -131,7 +131,7 @@ class Provider(ABC): # # Example: # TABLE_AUTO_STAGES = {"youtube": ["download-file"]} - # TABLE_AUTO_PREFIXES = {"hifi": ["download-file"]} # matches hifi.* + # TABLE_AUTO_PREFIXES = {"tidal": ["download-file"]} # matches tidal.* TABLE_AUTO_STAGES: Dict[str, Sequence[str]] = {} TABLE_AUTO_PREFIXES: Dict[str, Sequence[str]] = {} AUTO_STAGE_USE_SELECTION_ARGS: bool = False diff --git a/ProviderCore/registry.py b/ProviderCore/registry.py index 8c6999c..c05839b 100644 --- a/ProviderCore/registry.py +++ b/ProviderCore/registry.py @@ -69,11 +69,11 @@ class ProviderRegistry: if override_name: _add(override_name) else: + # Use class name as the primary canonical name + _add(getattr(provider_class, "__name__", None)) _add(getattr(provider_class, "PROVIDER_NAME", None)) _add(getattr(provider_class, "NAME", None)) - _add(getattr(provider_class, "__name__", None)) - for alias in getattr(provider_class, "PROVIDER_ALIASES", ()) or (): _add(alias) @@ -193,9 +193,23 @@ class ProviderRegistry: def has_name(self, name: str) -> bool: return self.get(name) is 
not None + def _sync_subclasses(self) -> None: + """Walk all Provider subclasses in memory and register them.""" + def _walk(cls: Type[Provider]) -> None: + for sub in cls.__subclasses__(): + if sub in {SearchProvider, FileProvider}: + _walk(sub) + continue + try: + self.register(sub) + except Exception: + pass + _walk(sub) + _walk(Provider) REGISTRY = ProviderRegistry("Provider") REGISTRY.discover() +REGISTRY._sync_subclasses() def register_provider( @@ -382,7 +396,7 @@ def match_provider_name_for_url(url: str) -> Optional[str]: dom = dom_raw.lower() if not dom: continue - if dom.startswith("magnet:") or dom.startswith("http://") or dom.startswith("https://"): + if "://" in dom or dom.startswith("magnet:"): if raw_url_lower.startswith(dom): return info.canonical_name continue diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index 4989d96..67e55df 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -2499,7 +2499,7 @@ def resolve_tidal_manifest_path(item: Any) -> Optional[str]: raw_manifest = metadata.get("manifest") if not raw_manifest: - # When piping directly from the HIFI search table, we may only have a track id. + # When piping directly from the Tidal search table, we may only have a track id. # Fetch track details from the proxy so downstream stages can decode the manifest. 
try: already = bool(metadata.get("_tidal_track_details_fetched")) @@ -2518,7 +2518,7 @@ def resolve_tidal_manifest_path(item: Any) -> Optional[str]: if candidate_path: m = re.search( - r"hifi:(?://)?track[\\/](\d+)", + r"tidal:(?://)?track[\\/](\d+)", str(candidate_path), flags=re.IGNORECASE, ) @@ -2626,7 +2626,7 @@ def resolve_tidal_manifest_path(item: Any) -> Optional[str]: except Exception: pass log( - f"[hifi] JSON manifest for track {metadata.get('trackId') or metadata.get('id')} had no playable urls", + f"[tidal] JSON manifest for track {metadata.get('trackId') or metadata.get('id')} had no playable urls", file=sys.stderr, ) except Exception as exc: @@ -2637,7 +2637,7 @@ def resolve_tidal_manifest_path(item: Any) -> Optional[str]: except Exception: pass log( - f"[hifi] Failed to parse JSON manifest for track {metadata.get('trackId') or metadata.get('id')}: {exc}", + f"[tidal] Failed to parse JSON manifest for track {metadata.get('trackId') or metadata.get('id')}: {exc}", file=sys.stderr, ) return None @@ -2658,7 +2658,7 @@ def resolve_tidal_manifest_path(item: Any) -> Optional[str]: pass try: log( - f"[hifi] Decoded manifest is not an MPD XML for track {metadata.get('trackId') or metadata.get('id')} (mime {manifest_mime or 'unknown'})", + f"[tidal] Decoded manifest is not an MPD XML for track {metadata.get('trackId') or metadata.get('id')} (mime {manifest_mime or 'unknown'})", file=sys.stderr, ) except Exception: @@ -2681,13 +2681,13 @@ def resolve_tidal_manifest_path(item: Any) -> Optional[str]: # Persist as .mpd for DASH manifests. 
ext = "mpd" - manifest_dir = Path(tempfile.gettempdir()) / "medeia" / "hifi" + manifest_dir = Path(tempfile.gettempdir()) / "medeia" / "tidal" try: manifest_dir.mkdir(parents=True, exist_ok=True) except Exception: pass - filename = f"hifi-{track_safe}-{identifier_safe[:24]}.{ext}" + filename = f"tidal-{track_safe}-{identifier_safe[:24]}.{ext}" target_path = manifest_dir / filename try: with open(target_path, "wb") as fh: diff --git a/cmdlet/add_file.py b/cmdlet/add_file.py index a4cd2a7..3ea7321 100644 --- a/cmdlet/add_file.py +++ b/cmdlet/add_file.py @@ -1050,7 +1050,7 @@ class Add_File(Cmdlet): "https://", "magnet:", "torrent:", - "hifi:", + "tidal:", "hydrus:")): log( "add-file ingests local files only. Use download-file first.", @@ -1067,7 +1067,7 @@ class Add_File(Cmdlet): "https://", "magnet:", "torrent:", - "hifi:", + "tidal:", "hydrus:")): log( "add-file ingests local files only. Use download-file first.", @@ -1088,7 +1088,7 @@ class Add_File(Cmdlet): "https://", "magnet:", "torrent:", - "hifi:", + "tidal:", "hydrus:")): log( "add-file ingests local files only. Use download-file first.", @@ -1214,7 +1214,7 @@ class Add_File(Cmdlet): "https://", "magnet:", "torrent:", - "hifi:", + "tidal:", "hydrus:")): log("add-file ingests local files only.", file=sys.stderr) return False diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index 4ff3e17..05eedfc 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -931,7 +931,8 @@ class Download_File(Cmdlet): pass transfer_label = label - if str(table or "").lower() == "hifi": + table_type = str(table or "").lower() + if table_type == "tidal" or table_type.startswith("tidal."): try: progress.begin_transfer(label=transfer_label, total=None) except Exception: @@ -943,7 +944,9 @@ class Download_File(Cmdlet): provider_sr = None provider_obj = None if table and get_search_provider and SearchResult: - provider_obj = get_search_provider(str(table), config) + # Strip sub-table suffix (e.g. 
tidal.track -> tidal) to find the provider key + provider_key = str(table).split(".")[0] + provider_obj = get_search_provider(provider_key, config) if provider_obj is not None: attempted_provider_download = True sr = SearchResult( @@ -1160,14 +1163,7 @@ class Download_File(Cmdlet): pass # Allow providers to add/enrich tags and metadata during download. - if str(table or "").lower() == "libgen" and provider_sr is not None: - try: - sr_tags = getattr(provider_sr, "tag", None) - if tags_list is None and isinstance(sr_tags, set) and sr_tags: - tags_list = sorted([str(t) for t in sr_tags if t]) - except Exception: - pass - + if provider_sr is not None: try: sr_md = getattr(provider_sr, "full_metadata", None) if isinstance(sr_md, dict) and sr_md: @@ -1183,6 +1179,15 @@ class Download_File(Cmdlet): except Exception: pass + # Prefer tags from the search result object if the provider mutated them during download. + try: + sr_tags = getattr(provider_sr, "tag", None) + if isinstance(sr_tags, (set, list)) and sr_tags: + # Re-sync tags_list with the potentially enriched provider_sr.tag + tags_list = sorted([str(t) for t in sr_tags if t]) + except Exception: + pass + self._emit_local_file( downloaded_path=downloaded_path, source=str(target) if target else None, @@ -1201,7 +1206,8 @@ class Download_File(Cmdlet): except Exception as e: log(f"Error downloading item: {e}", file=sys.stderr) finally: - if str(table or "").lower() == "hifi": + table_type = str(table or "").lower() + if table_type == "tidal" or table_type.startswith("tidal."): try: progress.finish_transfer(label=transfer_label) except Exception: diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index 4afabd7..ea9a26f 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -66,8 +66,8 @@ class search_file(Cmdlet): CmdletArg( "provider", type="string", - description= - "External provider name: bandcamp, libgen, soulseek, youtube, alldebrid, loc, internetarchive, hifi", + description="External provider 
name (e.g., tidal, youtube, soulseek, etc)",
+            choices=["bandcamp", "libgen", "soulseek", "youtube", "alldebrid", "loc", "internetarchive", "tidal"],
         ),
         CmdletArg(
             "open",
@@ -116,7 +116,7 @@ class search_file(Cmdlet):
         return ext[:5]
 
     @staticmethod
-    def _get_hifi_view_from_query(query: str) -> str:
+    def _get_tidal_view_from_query(query: str) -> str:
         text = str(query or "").strip()
         if not text:
             return "track"
@@ -303,10 +303,10 @@ class search_file(Cmdlet):
         preserve_order = provider_lower in {"youtube", "openlibrary", "loc", "torrent"}
         table_type = provider_name
         table_meta: Dict[str, Any] = {"provider": provider_name}
-        if provider_lower == "hifi":
-            view = self._get_hifi_view_from_query(query)
+        if provider_lower == "tidal":
+            view = self._get_tidal_view_from_query(query)
             table_meta["view"] = view
-            table_type = f"hifi.{view}"
+            table_type = f"tidal.{view}"
         elif provider_lower == "internetarchive":
             # Internet Archive search results are effectively folders (items); selecting @N
             # should open a list of downloadable files for the chosen item.
@@ -339,10 +339,10 @@ class search_file(Cmdlet):
         results = provider.search(query, limit=limit, filters=search_filters or None)
         debug(f"[search-file] {provider_name} -> {len(results or [])} result(s)")
 
-        # HIFI artist UX: if there is exactly one artist match, auto-expand
+        # Tidal artist UX: if there is exactly one artist match, auto-expand
         # directly to albums without requiring an explicit @1 selection.
if ( - provider_lower == "hifi" + provider_lower == "tidal" and table_meta.get("view") == "artist" and isinstance(results, list) and len(results) == 1 @@ -372,7 +372,7 @@ class search_file(Cmdlet): if album_results: results = album_results - table_type = "hifi.album" + table_type = "tidal.album" table.set_table(table_type) table_meta["view"] = "album" try: diff --git a/cmdnat/pipe.py b/cmdnat/pipe.py index cfad3a3..dc81473 100644 --- a/cmdnat/pipe.py +++ b/cmdnat/pipe.py @@ -790,9 +790,9 @@ def _get_playable_path( if manifest_path: path = manifest_path else: - # If this is a hifi:// placeholder and we couldn't resolve a manifest, do not fall back. + # If this is a tidal:// placeholder and we couldn't resolve a manifest, do not fall back. try: - if isinstance(path, str) and path.strip().lower().startswith("hifi:"): + if isinstance(path, str) and path.strip().lower().startswith("tidal:"): try: meta = None if isinstance(item, dict): @@ -803,7 +803,7 @@ def _get_playable_path( print(str(meta.get("_tidal_manifest_error")), file=sys.stderr) except Exception: pass - print("HIFI selection has no playable DASH MPD manifest.", file=sys.stderr) + print("Tidal selection has no playable DASH MPD manifest.", file=sys.stderr) return None except Exception: pass diff --git a/search_results.txt b/search_results.txt new file mode 100644 index 0000000..8190310 --- /dev/null +++ b/search_results.txt @@ -0,0 +1,134 @@ + DEBUG: Calling hifi.search(filters={'artist': 'bonobo'}) + DEBUG: +┌─────────────────────────────── HTTP request ────────────────────────────────┐ +│ method GET │ +│ url https://triton.squid.wtf/search/ │ +│ attempt 1/3 │ +│ params {'s': '*'} │ +│ headers {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) │ +│ AppleWebKit/537.36'} │ +│ verify C:\Users\Admin\AppData\Local\Programs\Python\Python313\Li… │ +│ follow_redirects True │ +└─────────────────────────────────────────────────────────────────────────────┘ + DEBUG: +┌───────────────── HTTP response 
─────────────────┐ +│ method GET │ +│ url https://triton.squid.wtf/search/ │ +│ status 200 │ +│ elapsed 0:00:00.592893 │ +│ content_length None │ +└─────────────────────────────────────────────────┘ + DEBUG: hifi -> 25 result(s) + +┌────────────────────────────────── Hifi: * ──────────────────────────────────┐ +│ │ +│ # TITLE DISC # TRACK # ALBUM ARTIST DURATI… QUALITY │ +│ ───────────────────────────────────────────────────────────────────────── │ +│ 1 (god 1 5 'n sync *nsync 4:43 lossle… │ +│ must │ +│ have │ +│ spent) │ +│ a │ +│ little │ +│ more │ +│ time on │ +│ you │ +│ 2 (god 1 4 the *nsync 4:01 lossle… │ +│ must essent… │ +│ have *nsync │ +│ spent) │ +│ a │ +│ little │ +│ more │ +│ time on │ +│ you │ +│ 3 ***** 1 15 the eminem, 4:51 lossle… │ +│ please marsha… dr. │ +│ ii mathers dre, │ +│ lp snoop │ +│ dogg, │ +│ nate │ +│ dogg, │ +│ xzibit │ +│ 4 *****s… 1 1 shut up jessie 2:23 lossle… │ +│ up***** reyez, │ +│ big │ +│ sean │ +│ 5 ***fla… 1 11 beyoncé beyonc… 4:11 lossle… │ +│ (feat. chimam… │ +│ chimam… ngozi │ +│ ngozi adichie │ +│ adichi… │ +│ 6 **jean… 1 1 jeans jessie 3:15 lossle… │ +│ reyez, │ +│ miguel │ +│ 7 *equip 1 2 you'll hot 2:46 lossle… │ +│ sungla… be fine mullig… │ +│ 8 better 1 1 better *nsync, 3:37 lossle… │ +│ place place justin │ +│ (from (from timber… │ +│ trolls trolls │ +│ band band │ +│ togeth… togeth… │ +│ 9 bring 1 8 blaque blaque, 3:38 lossle… │ +│ it all *nsync │ +│ to me │ +│ (feat. │ +│ *nsync) │ +│ 10 bye bye 1 9 the *nsync 3:20 lossle… │ +│ bye essent… │ +│ *nsync │ +│ 11 girlfr… 1 4 celebr… *nsync 4:14 lossle… │ +│ 12 girlfr… 1 16 the *nsync, 4:45 lossle… │ +│ (feat. 
essent… nelly │ +│ nelly) *nsync │ +│ 13 gone 1 6 celebr… *nsync 4:52 lossle… │ +│ 14 here we 1 3 'n sync *nsync 3:36 lossle… │ +│ go │ +│ 15 i want 1 2 the *nsync 3:20 lossle… │ +│ you essent… │ +│ back *nsync │ +│ 16 it 1 5 no *nsync 3:26 lossle… │ +│ makes strings │ +│ me ill attach… │ +│ 17 it's 1 2 no *nsync 3:12 lossle… │ +│ gonna strings │ +│ be me attach… │ +│ 18 just 1 4 no *nsync 4:09 lossle… │ +│ got strings │ +│ paid attach… │ +│ 19 merry 1 4 home *nsync 4:15 lossle… │ +│ christ… for │ +│ happy christ… │ +│ holida… │ +│ 20 no 1 7 no *nsync 3:49 lossle… │ +│ strings strings │ +│ attach… attach… │ +│ 21 pop 1 1 celebr… *nsync 3:58 lossle… │ +│ 22 space 1 3 no *nsync, 4:22 lossle… │ +│ cowboy strings lisa │ +│ (yippi… attach… "left │ +│ (feat. eye" │ +│ lisa lopes │ +│ "left │ +│ eye" │ +│ lopes) │ +│ 23 tearin' 1 3 the *nsync 3:29 lossle… │ +│ up my essent… │ +│ heart *nsync │ +│ 24 thinki… 1 5 the *nsync 3:58 lossle… │ +│ of you essent… │ +│ (i *nsync │ +│ drive │ +│ myself │ +│ crazy) │ +│ 25 this i 1 6 no *nsync 4:45 lossle… │ +│ promise strings │ +│ you attach… │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ +Usage: CLI.py [OPTIONS] COMMAND [ARGS]... +Try 'CLI.py --help' for help. +┌─ Error ─────────────────────────────────────────────────────────────────────┐ +│ No such command '@1'. │ +└─────────────────────────────────────────────────────────────────────────────┘