from __future__ import annotations import re import sys from typing import Any, Dict, List, Optional, Tuple from API.hifi import HifiApiClient from ProviderCore.base import Provider, SearchResult from SYS.logger import log DEFAULT_API_URLS = ( "https://tidal-api.binimum.org", ) _KEY_TO_PARAM: Dict[str, str] = { "album": "al", "artist": "a", "playlist": "p", "video": "v", "song": "s", "track": "s", "title": "s", } _DELIMITERS_RE = re.compile(r"[;,]") _SEGMENT_BOUNDARY_RE = re.compile(r"(?=\b\w+\s*:)") class HIFI(Provider): """Provider that targets the HiFi-RestAPI (Tidal proxy) search endpoint. The CLI can supply a list of fail-over URLs via ``provider.hifi.api_urls`` or ``provider.hifi.api_url`` in the config. When not configured, it defaults to https://tidal-api.binimum.org. """ def __init__(self, config: Optional[Dict[str, Any]] = None) -> None: super().__init__(config) self.api_urls = self._resolve_api_urls() try: self.api_timeout = float(self.config.get("timeout", 10.0)) except Exception: self.api_timeout = 10.0 self.api_clients = [HifiApiClient(base_url=url, timeout=self.api_timeout) for url in self.api_urls] def validate(self) -> bool: return bool(self.api_urls) def search( self, query: str, limit: int = 50, filters: Optional[Dict[str, Any]] = None, **_kwargs: Any, ) -> List[SearchResult]: if limit <= 0: return [] params = self._build_search_params(query) if not params: return [] payload: Optional[Dict[str, Any]] = None for base in self.api_urls: endpoint = f"{base.rstrip('/')}/search/" try: client = self._get_api_client_for_base(base) payload = client.search(params) if client else None if payload is not None: break except Exception as exc: log(f"[hifi] Search failed for {endpoint}: {exc}", file=sys.stderr) continue if not payload: return [] data = payload.get("data") or {} items = self._extract_track_items(data) results: List[SearchResult] = [] for item in items: if limit and len(results) >= limit: break result = self._item_to_result(item) if result is not None: results.append(result) return results[:limit] def _get_api_client_for_base(self, base_url: str) -> Optional[HifiApiClient]: base = base_url.rstrip("/") for client in self.api_clients: if getattr(client, "base_url", "").rstrip("/") == base: return client return None def _extract_track_items(self, data: Any) -> List[Dict[str, Any]]: if isinstance(data, list): return [item for item in data if isinstance(item, dict)] if not isinstance(data, dict): return [] items: List[Dict[str, Any]] = [] direct = data.get("items") if isinstance(direct, list): items.extend(item for item in direct if isinstance(item, dict)) tracks_section = data.get("tracks") if isinstance(tracks_section, dict): track_items = tracks_section.get("items") if isinstance(track_items, list): items.extend(item for item in track_items if isinstance(item, dict)) top_hits = data.get("topHits") if isinstance(top_hits, list): for hit in top_hits: if not isinstance(hit, dict): continue hit_type = str(hit.get("type") or "").upper() if hit_type != "TRACKS": continue value = hit.get("value") if isinstance(value, dict): items.append(value) seen: set[int] = set() deduped: List[Dict[str, Any]] = [] for item in items: track_id = item.get("id") or item.get("trackId") try: track_int = int(track_id) except Exception: track_int = None if track_int is None or track_int in seen: continue seen.add(track_int) deduped.append(item) return deduped def _resolve_api_urls(self) -> List[str]: urls: List[str] = [] raw = self.config.get("api_urls") if raw is None: raw = self.config.get("api_url") if isinstance(raw, (list, tuple)): urls.extend(str(item).strip() for item in raw if isinstance(item, str)) elif isinstance(raw, str): urls.append(raw.strip()) cleaned = [u.rstrip("/") for u in urls if isinstance(u, str) and u.strip()] if not cleaned: cleaned = [DEFAULT_API_URLS[0]] return cleaned def _build_search_params(self, query: str) -> Dict[str, str]: cleaned = str(query or "").strip() if not cleaned: return {} segments: List[str] = [] for chunk in _DELIMITERS_RE.split(cleaned): chunk = chunk.strip() if not chunk: continue if ":" in chunk: for sub in _SEGMENT_BOUNDARY_RE.split(chunk): part = sub.strip() if part: segments.append(part) else: segments.append(chunk) key_values: Dict[str, str] = {} free_text: List[str] = [] for segment in segments: if ":" not in segment: free_text.append(segment) continue key, value = segment.split(":", 1) key = key.strip().lower() value = value.strip().strip('"').strip("'") if value: key_values[key] = value params: Dict[str, str] = {} for key, value in key_values.items(): if not value: continue mapped = _KEY_TO_PARAM.get(key) if mapped: params[mapped] = value general = " ".join(part for part in free_text if part).strip() if general: params.setdefault("s", general) elif not params: params["s"] = cleaned return params @staticmethod def _format_duration(seconds: Any) -> str: try: total = int(seconds) if total < 0: return "" except Exception: return "" minutes, secs = divmod(total, 60) return f"{minutes}:{secs:02d}" @staticmethod def _stringify(value: Any) -> str: text = str(value or "").strip() return text @staticmethod def _extract_artists(item: Dict[str, Any]) -> List[str]: names: List[str] = [] artists = item.get("artists") if isinstance(artists, list): for artist in artists: if isinstance(artist, dict): name = str(artist.get("name") or "").strip() if name and name not in names: names.append(name) if not names: primary = item.get("artist") if isinstance(primary, dict): name = str(primary.get("name") or "").strip() if name: names.append(name) return names def _item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]: if not isinstance(item, dict): return None title = str(item.get("title") or "").strip() if not title: return None identifier = item.get("id") if identifier is None: return None try: track_id = int(identifier) except (TypeError, ValueError): return None # Avoid tidal.com URLs entirely; selection will resolve to a decoded MPD. path = f"hifi://track/{track_id}" artists = self._extract_artists(item) artist_display = ", ".join(artists) album = item.get("album") album_title = "" if isinstance(album, dict): album_title = str(album.get("title") or "").strip() detail_parts: List[str] = [] if artist_display: detail_parts.append(artist_display) if album_title: detail_parts.append(album_title) detail = " | ".join(detail_parts) columns: List[tuple[str, str]] = [] if title: columns.append(("Title", title)) if album_title: columns.append(("Album", album_title)) if artist_display: columns.append(("Artist", artist_display)) duration_text = self._format_duration(item.get("duration")) if duration_text: columns.append(("Duration", duration_text)) audio_quality = str(item.get("audioQuality") or "").strip() if audio_quality: columns.append(("Quality", audio_quality)) tags = {"tidal"} if audio_quality: tags.add(f"quality:{audio_quality.lower()}") metadata = item.get("mediaMetadata") if isinstance(metadata, dict): tag_values = metadata.get("tags") or [] for tag in tag_values: if isinstance(tag, str) and tag.strip(): tags.add(tag.strip().lower()) return SearchResult( table="hifi", title=title, path=path, detail=detail, annotations=["tidal"], media_kind="audio", tag=tags, columns=columns, full_metadata=item, ) def _extract_track_selection_context( self, selected_items: List[Any] ) -> List[Tuple[int, str, str]]: contexts: List[Tuple[int, str, str]] = [] seen_ids: set[int] = set() for item in selected_items or []: payload: Dict[str, Any] = {} if isinstance(item, dict): payload = item else: try: payload = ( item.to_dict() if hasattr(item, "to_dict") and callable(getattr(item, "to_dict")) else {} ) except Exception: payload = {} if not payload: try: payload = { "title": getattr(item, "title", None), "path": getattr(item, "path", None), "url": getattr(item, "url", None), "full_metadata": getattr(item, "full_metadata", None), } except Exception: payload = {} meta = ( payload.get("full_metadata") if isinstance(payload.get("full_metadata"), dict) else payload ) if not isinstance(meta, dict): meta = {} raw_id = meta.get("trackId") or meta.get("id") or payload.get("id") if raw_id is None: continue try: track_id = int(raw_id) except (TypeError, ValueError): continue if track_id in seen_ids: continue seen_ids.add(track_id) title = ( payload.get("title") or meta.get("title") or payload.get("name") or payload.get("path") or payload.get("url") ) if not title: title = f"Track {track_id}" path = ( payload.get("path") or payload.get("url") or f"hifi://track/{track_id}" ) contexts.append((track_id, str(title).strip(), str(path).strip())) return contexts def _fetch_track_details(self, track_id: int) -> Optional[Dict[str, Any]]: if track_id <= 0: return None for base in self.api_urls: endpoint = f"{base.rstrip('/')}/track/" try: client = self._get_api_client_for_base(base) payload = client.track(track_id) if client else None data = payload.get("data") if isinstance(payload, dict) else None if isinstance(data, dict): return data except Exception as exc: log(f"[hifi] Track lookup failed for {endpoint}: {exc}", file=sys.stderr) continue return None def _build_track_columns(self, detail: Dict[str, Any], track_id: int) -> List[Tuple[str, str]]: values: List[Tuple[str, str]] = [ ("Track ID", str(track_id)), ("Quality", self._stringify(detail.get("audioQuality"))), ("Mode", self._stringify(detail.get("audioMode"))), ("Asset", self._stringify(detail.get("assetPresentation"))), ("Manifest Type", self._stringify(detail.get("manifestMimeType"))), ("Manifest Hash", self._stringify(detail.get("manifestHash"))), ("Bit Depth", self._stringify(detail.get("bitDepth"))), ("Sample Rate", self._stringify(detail.get("sampleRate"))), ] return [(name, value) for name, value in values if value] def selector( self, selected_items: List[Any], *, ctx: Any, stage_is_last: bool = True, **_kwargs: Any, ) -> bool: if not stage_is_last: return False contexts = self._extract_track_selection_context(selected_items) if not contexts: return False track_details: List[Tuple[int, str, str, Dict[str, Any]]] = [] for track_id, title, path in contexts: detail = self._fetch_track_details(track_id) if detail: track_details.append((track_id, title, path, detail)) if not track_details: return False try: from SYS.rich_display import stdout_console from SYS.result_table import ResultTable except Exception: return False table = ResultTable("HIFI Track").set_preserve_order(True) table.set_table("hifi.track") results_payload: List[Dict[str, Any]] = [] for track_id, title, path, detail in track_details: # Decode the DASH MPD manifest to a local file and use it as the selectable/playable path. try: from cmdlet._shared import resolve_tidal_manifest_path manifest_path = resolve_tidal_manifest_path( {"full_metadata": detail, "path": f"hifi://track/{track_id}"} ) except Exception: manifest_path = None resolved_path = str(manifest_path) if manifest_path else f"hifi://track/{track_id}" artists = self._extract_artists(detail) artist_display = ", ".join(artists) if artists else "" columns = self._build_track_columns(detail, track_id) if artist_display: columns.insert(1, ("Artist", artist_display)) album = detail.get("album") if isinstance(album, dict): album_title = self._stringify(album.get("title")) else: album_title = self._stringify(detail.get("album")) if album_title: insert_pos = 2 if artist_display else 1 columns.insert(insert_pos, ("Album", album_title)) result = SearchResult( table="hifi.track", title=title, path=resolved_path, detail=f"id:{track_id}", annotations=["tidal", "track"], media_kind="audio", columns=columns, full_metadata=detail, ) table.add_result(result) try: results_payload.append(result.to_dict()) except Exception: results_payload.append({ "table": "hifi.track", "title": result.title, "path": result.path, }) try: ctx.set_last_result_table(table, results_payload) ctx.set_current_stage_table(table) except Exception: pass try: stdout_console().print() stdout_console().print(table) except Exception: pass return True