from __future__ import annotations

import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from API.hifi import HifiApiClient
from ProviderCore.base import Provider, SearchResult
from SYS.logger import debug, log

# Base URLs tried when the config supplies no ``api_urls`` / ``api_url``.
DEFAULT_API_URLS = (
    "https://tidal-api.binimum.org",
)

# Maps the ``key:value`` prefixes accepted in a query to the search
# endpoint's query-parameter names.
_KEY_TO_PARAM: Dict[str, str] = {
    "album": "al",
    "artist": "a",
    "playlist": "p",
    "video": "v",
    "song": "s",
    "track": "s",
    "title": "s",
}

# Separators between query chunks, e.g. ``artist:foo, album:bar``.
_DELIMITERS_RE = re.compile(r"[;,]")
# Zero-width split point in front of every ``word:`` token, so that
# ``artist:foo album:bar`` yields two segments.
_SEGMENT_BOUNDARY_RE = re.compile(r"(?=\b\w+\s*:)")


class HIFI(Provider):
    """Provider that targets the HiFi-RestAPI (Tidal proxy) search endpoint.

    The CLI can supply a list of fail-over URLs via ``provider.hifi.api_urls`` or
    ``provider.hifi.api_url`` in the config. When not configured, it defaults to
    https://tidal-api.binimum.org.
    """

    TABLE_AUTO_PREFIXES = {
        "hifi": ["download-file"],
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Resolve the API base URLs and build one API client per URL.

        Reads ``timeout`` from ``self.config`` (seconds); any value that cannot
        be converted to ``float`` falls back to 10.0.
        """
        super().__init__(config)
        self.api_urls = self._resolve_api_urls()
        try:
            self.api_timeout = float(self.config.get("timeout", 10.0))
        except Exception:
            self.api_timeout = 10.0
        self.api_clients = [
            HifiApiClient(base_url=url, timeout=self.api_timeout)
            for url in self.api_urls
        ]

    def validate(self) -> bool:
        """Return True when at least one API base URL is available."""
        return bool(self.api_urls)

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **_kwargs: Any,
    ) -> List[SearchResult]:
        """Search for tracks and return at most ``limit`` results.

        Each configured base URL is tried in order; the first one that returns
        a non-``None`` payload wins. ``filters`` and extra keyword arguments
        are accepted but not used by this method.
        """
        if limit <= 0:
            return []
        params = self._build_search_params(query)
        if not params:
            return []

        payload: Optional[Dict[str, Any]] = None
        for base in self.api_urls:
            endpoint = f"{base.rstrip('/')}/search/"
            try:
                client = self._get_api_client_for_base(base)
                payload = client.search(params) if client else None
                if payload is not None:
                    break
            except Exception as exc:
                log(f"[hifi] Search failed for {endpoint}: {exc}", file=sys.stderr)
                continue

        # A non-dict payload has no "data" section we know how to read.
        if not payload or not isinstance(payload, dict):
            return []

        data = payload.get("data") or {}
        results: List[SearchResult] = []
        for item in self._extract_track_items(data):
            if len(results) >= limit:
                break
            result = self._item_to_result(item)
            if result is not None:
                results.append(result)
        return results[:limit]

    @staticmethod
    def _safe_filename(value: Any, *, fallback: str = "hifi") -> str:
        """Turn ``value`` into a filesystem-safe name of at most 120 characters.

        Reserved and control characters become ``_``, whitespace runs collapse
        to one space, and leading/trailing dots and spaces are removed.
        ``fallback`` is returned when nothing usable remains.
        """
        text = str(value or "").strip()
        if not text:
            return fallback
        text = re.sub(r"[<>:\"/\\|?*\x00-\x1f]", "_", text)
        text = re.sub(r"\s+", " ", text).strip().strip(". ")
        return text[:120] if text else fallback

    @staticmethod
    def _parse_track_id(value: Any) -> Optional[int]:
        """Return ``value`` as a positive int, or None when it is not one."""
        if value is None:
            return None
        try:
            track_id = int(value)
        except Exception:
            return None
        return track_id if track_id > 0 else None

    def _extract_track_id_from_result(self, result: SearchResult) -> Optional[int]:
        """Find the track id in a result's metadata, else in its path.

        Metadata keys ``trackId`` then ``id`` are consulted first; otherwise a
        ``hifi:track/<id>`` or ``hifi://track/<id>`` path is parsed.
        """
        md = getattr(result, "full_metadata", None)
        if isinstance(md, dict):
            track_id = self._parse_track_id(md.get("trackId") or md.get("id"))
            if track_id:
                return track_id

        path = str(getattr(result, "path", "") or "").strip()
        if path:
            m = re.search(r"hifi:(?://)?track[\\/](\d+)", path, flags=re.IGNORECASE)
            if m:
                return self._parse_track_id(m.group(1))
        return None

    @staticmethod
    def _find_tool(name: str) -> Optional[str]:
        """Locate executable ``name`` on PATH, else the bundled Windows build.

        The bundled copy is looked up at ``<repo>/MPV/ffmpeg/bin/<name>.exe``,
        where ``<repo>`` is the parent of this file's directory.
        """
        exe = shutil.which(name)
        if exe:
            return exe
        try:
            repo_root = Path(__file__).resolve().parents[1]
            bundled = repo_root / "MPV" / "ffmpeg" / "bin" / f"{name}.exe"
            if bundled.is_file():
                return str(bundled)
        except Exception:
            pass
        return None

    @staticmethod
    def _find_ffmpeg() -> Optional[str]:
        """Return the path of an ``ffmpeg`` executable, or None."""
        return HIFI._find_tool("ffmpeg")

    @staticmethod
    def _find_ffprobe() -> Optional[str]:
        """Return the path of an ``ffprobe`` executable, or None."""
        return HIFI._find_tool("ffprobe")

    def _probe_audio_codec(self, input_ref: str) -> Optional[str]:
        """Best-effort probe for primary audio codec name (lowercase)."""
        candidate = str(input_ref or "").strip()
        if not candidate:
            return None

        ffprobe_path = self._find_ffprobe()
        if ffprobe_path:
            cmd = [
                ffprobe_path,
                "-v",
                "error",
                "-select_streams",
                "a:0",
                "-show_entries",
                "stream=codec_name",
                "-of",
                "default=nw=1:nk=1",
                candidate,
            ]
            try:
                proc = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    check=False,
                )
                if proc.returncode == 0:
                    codec = str(proc.stdout or "").strip().lower()
                    if codec:
                        return codec
            except Exception:
                pass

        # Fallback: parse `ffmpeg -i` stream info.
        ffmpeg_path = self._find_ffmpeg()
        if not ffmpeg_path:
            return None
        try:
            proc = subprocess.run(
                [ffmpeg_path, "-hide_banner", "-i", candidate],
                capture_output=True,
                text=True,
                check=False,
            )
            text = (proc.stderr or "") + "\n" + (proc.stdout or "")
            m = re.search(r"Audio:\s*([A-Za-z0-9_]+)", text)
            if m:
                return str(m.group(1)).strip().lower()
        except Exception:
            pass
        return None

    @staticmethod
    def _preferred_audio_suffix(
        codec: Optional[str], metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """Pick the output file suffix for ``codec``.

        ``metadata`` is accepted for interface compatibility but is not read.
        """
        c = str(codec or "").strip().lower()
        if c == "flac":
            return ".flac"
        if c in {"aac", "alac"}:
            return ".m4a"
        # Default to Matroska Audio for unknown / uncommon codecs.
        return ".mka"

    @staticmethod
    def _has_nonempty_file(path: Path) -> bool:
        """Return True when ``path`` is an existing file larger than 0 bytes."""
        try:
            return path.is_file() and path.stat().st_size > 0
        except Exception:
            return False

    def _ffmpeg_demux_to_audio(
        self,
        *,
        input_ref: str,
        output_path: Path,
        lossless_fallback: bool = True,
    ) -> Optional[Path]:
        """Write the audio of ``input_ref`` to ``output_path`` using ffmpeg.

        A stream copy (remux) is attempted first. If it fails and
        ``lossless_fallback`` is true, the input is transcoded to FLAC next to
        ``output_path`` instead.

        Returns the path of the file that was produced (which may differ from
        ``output_path`` after the FLAC fallback), an already existing non-empty
        target, or None when ffmpeg is unavailable or every attempt failed.
        """
        ffmpeg_path = self._find_ffmpeg()
        if not ffmpeg_path:
            debug("[hifi] ffmpeg not found; cannot materialize audio from MPD")
            return None

        if self._has_nonempty_file(output_path):
            return output_path

        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
        except Exception:
            pass

        protocol_whitelist = "file,https,http,tcp,tls,crypto,data"
        common_args = [
            ffmpeg_path,
            "-y",
            "-hide_banner",
            "-loglevel",
            "error",
            "-protocol_whitelist",
            protocol_whitelist,
            "-i",
            str(input_ref),
            "-vn",
        ]

        def _discard(path: Path) -> None:
            # Best-effort removal of a partial output left by a failed run.
            try:
                path.unlink()
            except OSError:
                pass

        def _run(codec_args: List[str], target: Path, label: str) -> bool:
            cmd = [*common_args, *codec_args, str(target)]
            try:
                proc = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    check=False,
                )
            except Exception as exc:
                debug(f"[hifi] {label} invocation failed: {exc}")
                _discard(target)
                return False
            if proc.returncode == 0 and self._has_nonempty_file(target):
                return True
            if proc.stderr:
                debug(f"[hifi] {label} failed: {proc.stderr.strip()}")
            # Without this, a truncated file would satisfy the "already
            # materialized" checks below and on every later call.
            _discard(target)
            return False

        # Prefer remux (fast, no transcode).
        if _run(["-c", "copy"], output_path, "ffmpeg"):
            return output_path

        if not lossless_fallback:
            return None

        # Fallback: decode/transcode to FLAC to guarantee a supported file.
        flac_path = (
            output_path
            if output_path.suffix.lower() == ".flac"
            else output_path.with_suffix(".flac")
        )
        if self._has_nonempty_file(flac_path):
            return flac_path

        # Avoid leaving a partial FLAC behind if we're transcoding into the final name.
        tmp_flac_path = flac_path
        if flac_path == output_path:
            tmp_flac_path = output_path.with_name(
                f"{output_path.stem}.tmp{output_path.suffix}"
            )

        if not _run(["-c:a", "flac"], tmp_flac_path, "ffmpeg flac fallback"):
            return None

        if tmp_flac_path != flac_path:
            try:
                tmp_flac_path.replace(flac_path)
            except Exception:
                # If rename fails, still return the temp file.
                return tmp_flac_path
        return flac_path

    def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]:
        """Materialize a playable audio file from a Tidal DASH manifest.

        Missing track details and lyrics are fetched and merged into the
        result's ``full_metadata`` before the manifest is resolved. Returns the
        path of the produced file, or None when nothing could be produced.
        """
        try:
            output_dir.mkdir(parents=True, exist_ok=True)
        except Exception:
            pass

        raw_path = str(getattr(result, "path", "") or "").strip()

        md: Dict[str, Any] = {}
        if isinstance(getattr(result, "full_metadata", None), dict):
            md = dict(getattr(result, "full_metadata") or {})

        if not md.get("manifest"):
            track_id = self._extract_track_id_from_result(result)
            if track_id:
                detail = self._fetch_track_details(track_id)
                if isinstance(detail, dict) and detail:
                    try:
                        md.update(detail)
                    except Exception:
                        md = detail

        # Best-effort: fetch synced lyric subtitles for MPV (LRC).
        try:
            track_id_for_lyrics = self._extract_track_id_from_result(result)
        except Exception:
            track_id_for_lyrics = None
        if track_id_for_lyrics and not md.get("_tidal_lyrics_subtitles"):
            lyr = self._fetch_track_lyrics(track_id_for_lyrics)
            if isinstance(lyr, dict) and lyr:
                try:
                    md.setdefault("lyrics", lyr)
                except Exception:
                    pass
                try:
                    subtitles = lyr.get("subtitles")
                    if isinstance(subtitles, str) and subtitles.strip():
                        md["_tidal_lyrics_subtitles"] = subtitles.strip()
                except Exception:
                    pass

        # Ensure downstream cmdlets see our enriched metadata.
        try:
            if isinstance(getattr(result, "full_metadata", None), dict):
                result.full_metadata.update(md)
            else:
                result.full_metadata = md
        except Exception:
            pass

        try:
            from cmdlet._shared import resolve_tidal_manifest_path
        except Exception:
            return None

        resolved = resolve_tidal_manifest_path(
            {"full_metadata": md, "path": raw_path, "title": getattr(result, "title", "")}
        )
        if not resolved:
            return None
        resolved_text = str(resolved).strip()
        if not resolved_text:
            return None

        # Output stem: hifi-<track id>-<first 12 chars of manifest hash>-<title>.
        track_id = self._extract_track_id_from_result(result)
        title_part = self._safe_filename(getattr(result, "title", None), fallback="hifi")
        hash_part = self._safe_filename(md.get("manifestHash"), fallback="")
        stem_parts = ["hifi"]
        if track_id:
            stem_parts.append(str(track_id))
        if hash_part:
            stem_parts.append(hash_part[:12])
        if title_part:
            stem_parts.append(title_part)
        stem = "-".join([p for p in stem_parts if p])[:180].rstrip("- ")

        codec = self._probe_audio_codec(resolved_text)
        suffix = self._preferred_audio_suffix(codec, md)

        # If resolve_tidal_manifest_path returned a URL, prefer feeding it directly to ffmpeg.
        if resolved_text.lower().startswith("http"):
            out_file = output_dir / f"{stem}{suffix}"
            materialized = self._ffmpeg_demux_to_audio(
                input_ref=resolved_text, output_path=out_file
            )
            if materialized is not None:
                return materialized

            # As a fallback, try downloading the URL directly if it looks like a file.
            try:
                import httpx

                resp = httpx.get(
                    resolved_text, timeout=float(getattr(self, "api_timeout", 10.0))
                )
                resp.raise_for_status()
                content = resp.content
                direct_path = output_dir / f"{stem}.bin"
                with open(direct_path, "wb") as fh:
                    fh.write(content)
                return direct_path
            except Exception as exc:
                debug(f"[hifi] direct download failed: {exc}")
                return None

        try:
            source_path = Path(resolved_text)
        except Exception:
            return None

        if source_path.is_file() and source_path.suffix.lower() == ".mpd":
            # Materialize audio from the local MPD.
            out_file = output_dir / f"{stem}{suffix}"
            return self._ffmpeg_demux_to_audio(
                input_ref=str(source_path), output_path=out_file
            )

        # If we somehow got a local audio file already, copy it to output_dir.
        if source_path.is_file() and source_path.suffix.lower() in {
            ".m4a",
            ".mp3",
            ".flac",
            ".wav",
            ".mka",
            ".mp4",
        }:
            dest = output_dir / f"{stem}{source_path.suffix.lower()}"
            if self._has_nonempty_file(dest):
                return dest
            try:
                shutil.copyfile(source_path, dest)
                return dest
            except Exception:
                return None

        # As a last resort, attempt to treat the local path as an ffmpeg input.
        out_file = output_dir / f"{stem}{suffix}"
        return self._ffmpeg_demux_to_audio(input_ref=resolved_text, output_path=out_file)

    def _get_api_client_for_base(self, base_url: str) -> Optional[HifiApiClient]:
        """Return the client whose ``base_url`` equals ``base_url``, ignoring trailing slashes."""
        base = base_url.rstrip("/")
        for client in self.api_clients:
            if getattr(client, "base_url", "").rstrip("/") == base:
                return client
        return None

    def _extract_track_items(self, data: Any) -> List[Dict[str, Any]]:
        """Collect track dicts from a search response's ``data`` section.

        A list is returned as-is (dict entries only). For a dict, entries are
        gathered from ``items``, ``tracks.items`` and ``topHits`` entries of
        type ``TRACKS``, then de-duplicated by integer track id in first-seen
        order. Entries without an integer ``id`` / ``trackId`` are dropped.
        """
        if isinstance(data, list):
            return [item for item in data if isinstance(item, dict)]
        if not isinstance(data, dict):
            return []

        items: List[Dict[str, Any]] = []

        direct = data.get("items")
        if isinstance(direct, list):
            items.extend(item for item in direct if isinstance(item, dict))

        tracks_section = data.get("tracks")
        if isinstance(tracks_section, dict):
            track_items = tracks_section.get("items")
            if isinstance(track_items, list):
                items.extend(item for item in track_items if isinstance(item, dict))

        top_hits = data.get("topHits")
        if isinstance(top_hits, list):
            for hit in top_hits:
                if not isinstance(hit, dict):
                    continue
                hit_type = str(hit.get("type") or "").upper()
                if hit_type != "TRACKS":
                    continue
                value = hit.get("value")
                if isinstance(value, dict):
                    items.append(value)

        seen: set[int] = set()
        deduped: List[Dict[str, Any]] = []
        for item in items:
            track_id = item.get("id") or item.get("trackId")
            if track_id is None:
                continue
            try:
                track_int = int(track_id)
            except Exception:
                continue
            if track_int in seen:
                continue
            seen.add(track_int)
            deduped.append(item)
        return deduped

    def _resolve_api_urls(self) -> List[str]:
        """Return the configured API base URLs without trailing slashes.

        ``api_urls`` is read first, then ``api_url``; either may be a string or
        a list/tuple of strings. ``DEFAULT_API_URLS`` is used when neither
        yields a non-empty URL.
        """
        urls: List[str] = []
        raw = self.config.get("api_urls")
        if raw is None:
            raw = self.config.get("api_url")
        if isinstance(raw, (list, tuple)):
            urls.extend(str(item).strip() for item in raw if isinstance(item, str))
        elif isinstance(raw, str):
            urls.append(raw.strip())
        cleaned = [u.rstrip("/") for u in urls if isinstance(u, str) and u.strip()]
        if not cleaned:
            cleaned = list(DEFAULT_API_URLS)
        return cleaned

    def _build_search_params(self, query: str) -> Dict[str, str]:
        """Translate a free-form query into search endpoint parameters.

        ``key:value`` segments whose key appears in ``_KEY_TO_PARAM`` become
        the mapped parameter; other keyed segments are ignored. Text without a
        key is joined into ``s`` unless a keyed segment already set it. If
        nothing produced a parameter, the whole query is sent as ``s``.
        """
        cleaned = str(query or "").strip()
        if not cleaned:
            return {}

        segments: List[str] = []
        for chunk in _DELIMITERS_RE.split(cleaned):
            chunk = chunk.strip()
            if not chunk:
                continue
            if ":" in chunk:
                for sub in _SEGMENT_BOUNDARY_RE.split(chunk):
                    part = sub.strip()
                    if part:
                        segments.append(part)
            else:
                segments.append(chunk)

        key_values: Dict[str, str] = {}
        free_text: List[str] = []
        for segment in segments:
            if ":" not in segment:
                free_text.append(segment)
                continue
            key, value = segment.split(":", 1)
            key = key.strip().lower()
            value = value.strip().strip('"').strip("'")
            if value:
                key_values[key] = value

        params: Dict[str, str] = {}
        for key, value in key_values.items():
            if not value:
                continue
            mapped = _KEY_TO_PARAM.get(key)
            if mapped:
                params[mapped] = value

        general = " ".join(part for part in free_text if part).strip()
        if general:
            params.setdefault("s", general)
        elif not params:
            params["s"] = cleaned
        return params

    @staticmethod
    def _format_duration(seconds: Any) -> str:
        """Format a second count as ``M:SS``; return "" for negative or non-integer input."""
        try:
            total = int(seconds)
            if total < 0:
                return ""
        except Exception:
            return ""
        minutes, secs = divmod(total, 60)
        return f"{minutes}:{secs:02d}"

    @staticmethod
    def _stringify(value: Any) -> str:
        """Return ``value`` as stripped text; falsy values become ""."""
        return str(value or "").strip()

    @staticmethod
    def _extract_artists(item: Dict[str, Any]) -> List[str]:
        """Return unique artist names from ``artists``, else the single ``artist``."""
        names: List[str] = []
        artists = item.get("artists")
        if isinstance(artists, list):
            for artist in artists:
                if isinstance(artist, dict):
                    name = str(artist.get("name") or "").strip()
                    if name and name not in names:
                        names.append(name)
        if not names:
            primary = item.get("artist")
            if isinstance(primary, dict):
                name = str(primary.get("name") or "").strip()
                if name:
                    names.append(name)
        return names

    def _item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]:
        """Convert one API track dict into a ``SearchResult``.

        Returns None when the item is not a dict, has no title, or has no
        integer ``id`` / ``trackId``.
        """
        if not isinstance(item, dict):
            return None

        title = str(item.get("title") or "").strip()
        if not title:
            return None

        # Accept the same id keys that _extract_track_items dedupes on.
        identifier = item.get("id")
        if identifier is None:
            identifier = item.get("trackId")
        if identifier is None:
            return None
        try:
            track_id = int(identifier)
        except (TypeError, ValueError):
            return None

        # Avoid tidal.com URLs entirely; selection will resolve to a decoded MPD.
        path = f"hifi://track/{track_id}"

        artists = self._extract_artists(item)
        artist_display = ", ".join(artists)

        album = item.get("album")
        album_title = ""
        if isinstance(album, dict):
            album_title = str(album.get("title") or "").strip()

        detail_parts: List[str] = []
        if artist_display:
            detail_parts.append(artist_display)
        if album_title:
            detail_parts.append(album_title)
        detail = " | ".join(detail_parts)

        columns: List[tuple[str, str]] = []
        if title:
            columns.append(("Title", title))
        if album_title:
            columns.append(("Album", album_title))
        if artist_display:
            columns.append(("Artist", artist_display))
        duration_text = self._format_duration(item.get("duration"))
        if duration_text:
            columns.append(("Duration", duration_text))
        audio_quality = str(item.get("audioQuality") or "").strip()
        if audio_quality:
            columns.append(("Quality", audio_quality))

        tags = {"tidal"}
        if audio_quality:
            tags.add(f"quality:{audio_quality.lower()}")
        metadata = item.get("mediaMetadata")
        if isinstance(metadata, dict):
            tag_values = metadata.get("tags") or []
            if isinstance(tag_values, str):
                # A bare string is one tag, not a sequence of characters.
                tag_values = [tag_values]
            elif not isinstance(tag_values, (list, tuple, set, frozenset)):
                tag_values = []
            for tag in tag_values:
                if isinstance(tag, str) and tag.strip():
                    tags.add(tag.strip().lower())

        return SearchResult(
            table="hifi",
            title=title,
            path=path,
            detail=detail,
            annotations=["tidal"],
            media_kind="audio",
            tag=tags,
            columns=columns,
            full_metadata=item,
        )

    def _extract_track_selection_context(
        self, selected_items: List[Any]
    ) -> List[Tuple[int, str, str]]:
        """Reduce selected rows to unique ``(track_id, title, path)`` tuples.

        Each item may be a dict, an object exposing ``to_dict()``, or an object
        with ``title`` / ``path`` / ``url`` / ``full_metadata`` attributes.
        Items without an integer track id are skipped.
        """
        contexts: List[Tuple[int, str, str]] = []
        seen_ids: set[int] = set()

        for item in selected_items or []:
            payload: Dict[str, Any] = {}
            if isinstance(item, dict):
                payload = item
            else:
                try:
                    payload = (
                        item.to_dict()
                        if hasattr(item, "to_dict") and callable(getattr(item, "to_dict"))
                        else {}
                    )
                except Exception:
                    payload = {}
                # Also covers to_dict() returning something that is not a dict.
                if not isinstance(payload, dict) or not payload:
                    try:
                        payload = {
                            "title": getattr(item, "title", None),
                            "path": getattr(item, "path", None),
                            "url": getattr(item, "url", None),
                            "full_metadata": getattr(item, "full_metadata", None),
                        }
                    except Exception:
                        payload = {}

            meta = (
                payload.get("full_metadata")
                if isinstance(payload.get("full_metadata"), dict)
                else payload
            )
            if not isinstance(meta, dict):
                meta = {}

            raw_id = meta.get("trackId") or meta.get("id") or payload.get("id")
            if raw_id is None:
                continue
            try:
                track_id = int(raw_id)
            except (TypeError, ValueError):
                continue
            if track_id in seen_ids:
                continue
            seen_ids.add(track_id)

            title = (
                payload.get("title")
                or meta.get("title")
                or payload.get("name")
                or payload.get("path")
                or payload.get("url")
            )
            if not title:
                title = f"Track {track_id}"
            path = (
                payload.get("path")
                or payload.get("url")
                or f"hifi://track/{track_id}"
            )
            contexts.append((track_id, str(title).strip(), str(path).strip()))
        return contexts

    def _fetch_track_details(self, track_id: int) -> Optional[Dict[str, Any]]:
        """Return the ``data`` dict of the first base URL that answers the track lookup."""
        if track_id <= 0:
            return None
        for base in self.api_urls:
            endpoint = f"{base.rstrip('/')}/track/"
            try:
                client = self._get_api_client_for_base(base)
                payload = client.track(track_id) if client else None
                data = payload.get("data") if isinstance(payload, dict) else None
                if isinstance(data, dict):
                    return data
            except Exception as exc:
                log(f"[hifi] Track lookup failed for {endpoint}: {exc}", file=sys.stderr)
                continue
        return None

    def _fetch_track_lyrics(self, track_id: int) -> Optional[Dict[str, Any]]:
        """Return the first non-empty ``lyrics`` (else ``data``) dict from the lyrics lookup."""
        if track_id <= 0:
            return None
        for base in self.api_urls:
            endpoint = f"{base.rstrip('/')}/lyrics/"
            try:
                client = self._get_api_client_for_base(base)
                payload = client.lyrics(track_id) if client else None
                if not isinstance(payload, dict):
                    continue
                lyrics_obj = payload.get("lyrics")
                if isinstance(lyrics_obj, dict) and lyrics_obj:
                    return lyrics_obj
                data_obj = payload.get("data")
                if isinstance(data_obj, dict) and data_obj:
                    return data_obj
            except Exception as exc:
                debug(f"[hifi] Lyrics lookup failed for {endpoint}: {exc}")
                continue
        return None

    def _build_track_columns(
        self, detail: Dict[str, Any], track_id: int
    ) -> List[Tuple[str, str]]:
        """Build the display columns for a track, omitting empty values."""
        values: List[Tuple[str, str]] = [
            ("Track ID", str(track_id)),
            ("Quality", self._stringify(detail.get("audioQuality"))),
            ("Mode", self._stringify(detail.get("audioMode"))),
            ("Asset", self._stringify(detail.get("assetPresentation"))),
            ("Manifest Type", self._stringify(detail.get("manifestMimeType"))),
            ("Manifest Hash", self._stringify(detail.get("manifestHash"))),
            ("Bit Depth", self._stringify(detail.get("bitDepth"))),
            ("Sample Rate", self._stringify(detail.get("sampleRate"))),
        ]
        return [(name, value) for name, value in values if value]

    def selector(
        self,
        selected_items: List[Any],
        *,
        ctx: Any,
        stage_is_last: bool = True,
        **_kwargs: Any,
    ) -> bool:
        """Expand selected search rows into a ``hifi.track`` detail table.

        Returns False (nothing handled) when this is not the last stage, when
        the current table is already a resolved ``hifi.track`` table, or when
        no track details could be fetched. Otherwise the detail table is stored
        on ``ctx``, printed, and True is returned.
        """
        if not stage_is_last:
            return False

        try:
            current_table = ctx.get_current_stage_table()
        except Exception:
            current_table = None
        table_type = (
            current_table.table
            if current_table and hasattr(current_table, "table")
            else None
        )
        if isinstance(table_type, str) and table_type.strip().lower() == "hifi.track":
            try:
                meta = (
                    current_table.get_table_metadata()
                    if current_table is not None
                    and hasattr(current_table, "get_table_metadata")
                    else {}
                )
            except Exception:
                meta = {}
            # Rows of a resolved table are already final; do not expand again.
            if isinstance(meta, dict) and meta.get("resolved_manifest"):
                return False

        contexts = self._extract_track_selection_context(selected_items)
        if not contexts:
            return False

        track_details: List[Tuple[int, str, str, Dict[str, Any]]] = []
        for track_id, title, path in contexts:
            detail = self._fetch_track_details(track_id)
            if detail:
                track_details.append((track_id, title, path, detail))
        if not track_details:
            return False

        try:
            from SYS.rich_display import stdout_console
            from SYS.result_table import ResultTable
        except Exception:
            return False

        # Optional: without the resolver, rows keep their hifi:// placeholder path.
        try:
            from cmdlet._shared import resolve_tidal_manifest_path
        except Exception:
            resolve_tidal_manifest_path = None

        table = ResultTable("HIFI Track").set_preserve_order(True)
        table.set_table("hifi.track")
        try:
            table.set_table_metadata(
                {"provider": "hifi", "view": "track", "resolved_manifest": True}
            )
        except Exception:
            pass

        results_payload: List[Dict[str, Any]] = []
        for track_id, title, _path, detail in track_details:
            # Decode the DASH MPD manifest to a local file and use it as the
            # selectable/playable path.
            manifest_path = None
            if resolve_tidal_manifest_path is not None:
                try:
                    manifest_path = resolve_tidal_manifest_path(
                        {"full_metadata": detail, "path": f"hifi://track/{track_id}"}
                    )
                except Exception:
                    manifest_path = None
            resolved_path = (
                str(manifest_path) if manifest_path else f"hifi://track/{track_id}"
            )

            artists = self._extract_artists(detail)
            artist_display = ", ".join(artists) if artists else ""
            columns = self._build_track_columns(detail, track_id)
            if artist_display:
                columns.insert(1, ("Artist", artist_display))

            album = detail.get("album")
            if isinstance(album, dict):
                album_title = self._stringify(album.get("title"))
            else:
                album_title = self._stringify(detail.get("album"))
            if album_title:
                insert_pos = 2 if artist_display else 1
                columns.insert(insert_pos, ("Album", album_title))

            result = SearchResult(
                table="hifi",
                title=title,
                path=resolved_path,
                detail=f"id:{track_id}",
                annotations=["tidal", "track"],
                media_kind="audio",
                columns=columns,
                full_metadata=detail,
            )
            table.add_result(result)
            try:
                results_payload.append(result.to_dict())
            except Exception:
                results_payload.append(
                    {
                        "table": "hifi.track",
                        "title": result.title,
                        "path": result.path,
                    }
                )

        try:
            ctx.set_last_result_table(table, results_payload)
            ctx.set_current_stage_table(table)
        except Exception:
            pass

        try:
            stdout_console().print()
            stdout_console().print(table)
        except Exception:
            pass

        return True