# Listing metadata: 489 lines, 16 KiB, Python.
from __future__ import annotations
|
|
|
|
import re
|
|
import sys
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from API.hifi import HifiApiClient
|
|
from ProviderCore.base import Provider, SearchResult
|
|
from SYS.logger import log
|
|
|
|
# Fallback API base URL(s) for when the provider config names none.
DEFAULT_API_URLS = ("https://tidal-api.binimum.org",)

# Query prefixes (``artist:``, ``album:``, ...) and the search parameter each
# one is sent as. ``song``, ``track`` and ``title`` all map to the same
# parameter.
_KEY_TO_PARAM: Dict[str, str] = dict(
    album="al",
    artist="a",
    playlist="p",
    video="v",
    song="s",
    track="s",
    title="s",
)

# Characters that separate independent parts of a query.
_DELIMITERS_RE = re.compile(r"[;,]")
# Zero-width boundary just before each ``word:`` prefix, so that
# ``artist:x album:y`` splits into two segments.
_SEGMENT_BOUNDARY_RE = re.compile(r"(?=\b\w+\s*:)")
|
|
|
|
|
|
class HIFI(Provider):
    """Provider that targets the HiFi-RestAPI (Tidal proxy) search endpoint.

    The CLI can supply a list of fail-over URLs via ``provider.hifi.api_urls`` or
    ``provider.hifi.api_url`` in the config. When not configured, it defaults to
    https://tidal-api.binimum.org.

    Search results are addressed as ``hifi://track/<id>``. ``selector`` later
    looks each selected track up and, when a manifest path can be resolved,
    uses that path instead.
    """

    # Timeout (seconds) used when the config value is missing or unusable.
    _DEFAULT_TIMEOUT = 10.0
    # Matches the ``hifi://track/<id>`` paths produced by ``_item_to_result``.
    _TRACK_PATH_RE = re.compile(r"^hifi://track/(\d+)")

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Resolve the API URLs and timeout, then build one client per URL."""
        super().__init__(config)
        self.api_urls = self._resolve_api_urls()
        self.api_timeout = self._resolve_timeout()
        self.api_clients = [
            HifiApiClient(base_url=url, timeout=self.api_timeout)
            for url in self.api_urls
        ]

    def validate(self) -> bool:
        """Return True when at least one API base URL is available."""
        return bool(self.api_urls)

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **_kwargs: Any,
    ) -> List[SearchResult]:
        """Search for tracks, trying each configured API URL in order.

        Args:
            query: Free text and/or ``key:value`` segments (see
                ``_build_search_params``).
            limit: Maximum number of results; non-positive yields ``[]``.
            filters: Accepted for interface compatibility; not used here.

        Returns:
            Up to ``limit`` results, or ``[]`` when the query is empty or no
            server returned a usable payload.
        """
        if limit <= 0:
            return []
        params = self._build_search_params(query)
        if not params:
            return []

        payload: Optional[Dict[str, Any]] = None
        for base in self.api_urls:
            endpoint = f"{base.rstrip('/')}/search/"
            client = self._get_api_client_for_base(base)
            if client is None:
                continue
            try:
                candidate = client.search(params)
            except Exception as exc:
                log(f"[hifi] Search failed for {endpoint}: {exc}", file=sys.stderr)
                continue
            # Only a non-empty mapping counts as an answer; anything else
            # falls through to the next fail-over URL.
            if isinstance(candidate, dict) and candidate:
                payload = candidate
                break

        if payload is None:
            return []

        results: List[SearchResult] = []
        for item in self._extract_track_items(payload.get("data") or {}):
            result = self._item_to_result(item)
            if result is None:
                continue
            results.append(result)
            if len(results) >= limit:
                break
        return results

    def _get_api_client_for_base(self, base_url: str) -> Optional[HifiApiClient]:
        """Return the client whose ``base_url`` matches, ignoring trailing slashes."""
        wanted = base_url.rstrip("/")
        for client in self.api_clients:
            # str() tolerates a client that exposes base_url as a non-str
            # object; a missing/None value simply never matches.
            candidate = str(getattr(client, "base_url", "") or "").rstrip("/")
            if candidate and candidate == wanted:
                return client
        return None

    @staticmethod
    def _coerce_track_id(value: Any) -> Optional[int]:
        """Convert a raw identifier to ``int``; return None when that is not possible."""
        if value is None or isinstance(value, bool):
            return None
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    def _extract_track_items(self, data: Any) -> List[Dict[str, Any]]:
        """Collect track dicts from a search payload, de-duplicated by track id.

        ``data`` may be a bare list of items or a dict carrying any of
        ``items``, ``tracks.items`` and ``topHits`` (only hits whose type is
        ``TRACKS``). Entries without an integer ``id``/``trackId`` are dropped;
        the first occurrence of each id wins and order is preserved.
        """
        candidates: List[Any] = []
        if isinstance(data, list):
            candidates.extend(data)
        elif isinstance(data, dict):
            direct = data.get("items")
            if isinstance(direct, list):
                candidates.extend(direct)

            tracks_section = data.get("tracks")
            if isinstance(tracks_section, dict):
                track_items = tracks_section.get("items")
                if isinstance(track_items, list):
                    candidates.extend(track_items)

            top_hits = data.get("topHits")
            if isinstance(top_hits, list):
                for hit in top_hits:
                    if not isinstance(hit, dict):
                        continue
                    if str(hit.get("type") or "").upper() != "TRACKS":
                        continue
                    candidates.append(hit.get("value"))

        seen: set[int] = set()
        unique: List[Dict[str, Any]] = []
        for item in candidates:
            if not isinstance(item, dict):
                continue
            track_id = self._coerce_track_id(item.get("id") or item.get("trackId"))
            if track_id is None or track_id in seen:
                continue
            seen.add(track_id)
            unique.append(item)
        return unique

    def _resolve_api_urls(self) -> List[str]:
        """Return the configured base URLs without trailing slashes or duplicates.

        ``api_urls`` is preferred; ``api_url`` is consulted when ``api_urls``
        yields no usable entry. Each may be a string or a list/tuple of
        strings. Falls back to ``DEFAULT_API_URLS`` when nothing is configured.
        """
        urls: List[str] = []
        for key in ("api_urls", "api_url"):
            raw = self.config.get(key)
            if isinstance(raw, str):
                candidates = [raw]
            elif isinstance(raw, (list, tuple)):
                candidates = [entry for entry in raw if isinstance(entry, str)]
            else:
                candidates = []
            for candidate in candidates:
                url = candidate.strip().rstrip("/")
                if url and url not in urls:
                    urls.append(url)
            if urls:
                break
        return urls or list(DEFAULT_API_URLS)

    def _resolve_timeout(self) -> float:
        """Return the configured ``timeout`` in seconds, or the default if unusable."""
        try:
            timeout = float(self.config.get("timeout", self._DEFAULT_TIMEOUT))
        except (TypeError, ValueError, OverflowError):
            return self._DEFAULT_TIMEOUT
        # The chained comparison is False for NaN, so this also rejects NaN
        # along with zero, negative and infinite values.
        if not 0 < timeout < float("inf"):
            return self._DEFAULT_TIMEOUT
        return timeout

    def _build_search_params(self, query: str) -> Dict[str, str]:
        """Translate a user query into search parameters.

        The query is split on ``;`` and ``,`` and then before every ``word:``
        prefix. Segments whose prefix is a key of ``_KEY_TO_PARAM`` become the
        mapped parameter (surrounding quotes stripped, empty values ignored).
        Every other segment, including ``word:`` text with an unrecognised
        prefix such as ``Re: Stacks``, is treated as free text, which fills
        ``s`` unless an explicit song/track/title value already did.

        Note: delimiters inside quoted values are not protected, so
        ``artist:"A, B"`` is still split at the comma.

        Returns:
            The parameter dict; empty only when ``query`` is blank.
        """
        cleaned = str(query or "").strip()
        if not cleaned:
            return {}

        key_values: Dict[str, str] = {}
        free_text: List[str] = []
        for chunk in _DELIMITERS_RE.split(cleaned):
            for segment in _SEGMENT_BOUNDARY_RE.split(chunk):
                segment = segment.strip()
                if not segment:
                    continue
                key, separator, value = segment.partition(":")
                key = key.strip().lower()
                if not separator or key not in _KEY_TO_PARAM:
                    # Keep unrecognised "word:" text instead of discarding it.
                    free_text.append(segment)
                    continue
                value = value.strip().strip('"').strip("'")
                if value:
                    key_values[key] = value

        params: Dict[str, str] = {
            _KEY_TO_PARAM[key]: value for key, value in key_values.items()
        }

        general = " ".join(free_text)
        if general:
            params.setdefault("s", general)
        elif not params:
            # Nothing usable was parsed (e.g. "artist:"); send the raw query.
            params["s"] = cleaned
        return params

    @staticmethod
    def _format_duration(seconds: Any) -> str:
        """Format a duration in seconds as ``M:SS``; return "" if invalid or negative."""
        try:
            total = int(seconds)
        except (TypeError, ValueError, OverflowError):
            return ""
        if total < 0:
            return ""
        minutes, secs = divmod(total, 60)
        return f"{minutes}:{secs:02d}"

    @staticmethod
    def _stringify(value: Any) -> str:
        """Return ``value`` as stripped text; any falsy value becomes ""."""
        return str(value or "").strip()

    @staticmethod
    def _extract_artists(item: Dict[str, Any]) -> List[str]:
        """Return unique artist names from ``artists``, else from ``artist``.

        ``artists`` is read as a list of dicts with a ``name`` key. ``artist``
        is only consulted when that yields nothing, and may be such a dict or
        a plain string.
        """
        names: List[str] = []
        artists = item.get("artists")
        if isinstance(artists, list):
            for artist in artists:
                if not isinstance(artist, dict):
                    continue
                name = str(artist.get("name") or "").strip()
                if name and name not in names:
                    names.append(name)
        if names:
            return names

        primary = item.get("artist")
        if isinstance(primary, dict):
            name = str(primary.get("name") or "").strip()
        elif isinstance(primary, str):
            name = primary.strip()
        else:
            name = ""
        if name:
            names.append(name)
        return names

    def _item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]:
        """Convert one track dict into a ``SearchResult``.

        Returns None when the item is not a dict, has no title, or has no
        integer ``id`` (``trackId`` is used when ``id`` is absent).
        """
        if not isinstance(item, dict):
            return None

        title = str(item.get("title") or "").strip()
        if not title:
            return None

        identifier = item.get("id")
        if identifier is None:
            # _extract_track_items accepts trackId as well; stay consistent.
            identifier = item.get("trackId")
        if identifier is None:
            return None
        try:
            track_id = int(identifier)
        except (TypeError, ValueError):
            return None

        # Avoid tidal.com URLs entirely; selection will resolve to a decoded MPD.
        path = f"hifi://track/{track_id}"

        artist_display = ", ".join(self._extract_artists(item))

        album = item.get("album")
        album_title = ""
        if isinstance(album, dict):
            album_title = str(album.get("title") or "").strip()

        detail = " | ".join(part for part in (artist_display, album_title) if part)

        duration_text = self._format_duration(item.get("duration"))
        audio_quality = str(item.get("audioQuality") or "").strip()

        labelled = (
            ("Title", title),
            ("Album", album_title),
            ("Artist", artist_display),
            ("Duration", duration_text),
            ("Quality", audio_quality),
        )
        columns: List[Tuple[str, str]] = [
            (label, value) for label, value in labelled if value
        ]

        tags = {"tidal"}
        if audio_quality:
            tags.add(f"quality:{audio_quality.lower()}")
        metadata = item.get("mediaMetadata")
        if isinstance(metadata, dict):
            tag_values = metadata.get("tags")
            if isinstance(tag_values, (list, tuple)):
                for tag in tag_values:
                    if isinstance(tag, str) and tag.strip():
                        tags.add(tag.strip().lower())

        return SearchResult(
            table="hifi",
            title=title,
            path=path,
            detail=detail,
            annotations=["tidal"],
            media_kind="audio",
            tag=tags,
            columns=columns,
            full_metadata=item,
        )

    @staticmethod
    def _selection_payload(item: Any) -> Dict[str, Any]:
        """Return a dict view of a selected item (a dict or a result-like object)."""
        if isinstance(item, dict):
            return item

        payload: Any = {}
        try:
            to_dict = getattr(item, "to_dict", None)
            if callable(to_dict):
                payload = to_dict()
        except Exception:
            payload = {}
        if isinstance(payload, dict) and payload:
            return payload

        # No usable to_dict(): fall back to the attributes we know how to read.
        try:
            return {
                "title": getattr(item, "title", None),
                "path": getattr(item, "path", None),
                "url": getattr(item, "url", None),
                "full_metadata": getattr(item, "full_metadata", None),
            }
        except Exception:
            return {}

    def _extract_track_selection_context(
        self, selected_items: List[Any]
    ) -> List[Tuple[int, str, str]]:
        """Return ``(track_id, title, path)`` for each distinct selected track.

        The id is read from ``trackId``/``id`` in the item's ``full_metadata``
        (or in the item itself), then from the item's ``id``, and finally
        parsed out of a ``hifi://track/<id>`` path or url. Items without an
        integer id are skipped, as are repeats of an id already seen.
        """
        contexts: List[Tuple[int, str, str]] = []
        seen_ids: set[int] = set()
        for item in selected_items or []:
            payload = self._selection_payload(item)

            full_metadata = payload.get("full_metadata")
            meta = full_metadata if isinstance(full_metadata, dict) else payload

            raw_id = meta.get("trackId") or meta.get("id") or payload.get("id")
            if raw_id is None:
                for locator in (payload.get("path"), payload.get("url")):
                    match = self._TRACK_PATH_RE.match(str(locator or "").strip())
                    if match:
                        raw_id = match.group(1)
                        break
            if raw_id is None:
                continue
            try:
                track_id = int(raw_id)
            except (TypeError, ValueError):
                continue
            if track_id in seen_ids:
                continue
            seen_ids.add(track_id)

            title = (
                payload.get("title")
                or meta.get("title")
                or payload.get("name")
                or payload.get("path")
                or payload.get("url")
                or f"Track {track_id}"
            )
            path = (
                payload.get("path")
                or payload.get("url")
                or f"hifi://track/{track_id}"
            )
            contexts.append((track_id, str(title).strip(), str(path).strip()))
        return contexts

    def _fetch_track_details(self, track_id: int) -> Optional[Dict[str, Any]]:
        """Return the ``data`` dict of the first API that answers the track lookup.

        Returns None for non-positive ids or when every configured URL fails.
        """
        if track_id <= 0:
            return None
        for base in self.api_urls:
            endpoint = f"{base.rstrip('/')}/track/"
            client = self._get_api_client_for_base(base)
            if client is None:
                continue
            try:
                payload = client.track(track_id)
            except Exception as exc:
                log(f"[hifi] Track lookup failed for {endpoint}: {exc}", file=sys.stderr)
                continue
            data = payload.get("data") if isinstance(payload, dict) else None
            if isinstance(data, dict):
                return data
        return None

    def _build_track_columns(self, detail: Dict[str, Any], track_id: int) -> List[Tuple[str, str]]:
        """Return the non-empty ``(label, value)`` columns for a track detail row.

        ``Track ID`` is always first because ``track_id`` is always rendered.
        """
        fields = (
            ("Quality", "audioQuality"),
            ("Mode", "audioMode"),
            ("Asset", "assetPresentation"),
            ("Manifest Type", "manifestMimeType"),
            ("Manifest Hash", "manifestHash"),
            ("Bit Depth", "bitDepth"),
            ("Sample Rate", "sampleRate"),
        )
        values: List[Tuple[str, str]] = [("Track ID", str(track_id))]
        values.extend(
            (label, self._stringify(detail.get(key))) for label, key in fields
        )
        return [(name, value) for name, value in values if value]

    def selector(
        self,
        selected_items: List[Any],
        *,
        ctx: Any,
        stage_is_last: bool = True,
        **_kwargs: Any,
    ) -> bool:
        """Show a detail table for the selected tracks.

        Only acts when this is the last pipeline stage. Each selected track is
        looked up via the API; tracks whose lookup fails are left out.

        Returns:
            True when a table was built (even if publishing or printing it
            failed); False when there was nothing to show or the display
            modules could not be imported.
        """
        if not stage_is_last:
            return False

        contexts = self._extract_track_selection_context(selected_items)
        if not contexts:
            return False

        track_details: List[Tuple[int, str, Dict[str, Any]]] = []
        for track_id, title, _path in contexts:
            detail = self._fetch_track_details(track_id)
            if detail:
                track_details.append((track_id, title, detail))
        if not track_details:
            return False

        try:
            from SYS.rich_display import stdout_console
            from SYS.result_table import ResultTable
        except Exception:
            return False

        # Imported once up front; without it the hifi:// path is kept.
        try:
            from cmdlet._shared import resolve_tidal_manifest_path
        except Exception:
            resolve_tidal_manifest_path = None

        table = ResultTable("HIFI Track").set_preserve_order(True)
        table.set_table("hifi.track")
        results_payload: List[Dict[str, Any]] = []
        for track_id, title, detail in track_details:
            fallback_path = f"hifi://track/{track_id}"

            # Decode the DASH MPD manifest to a local file and use it as the selectable/playable path.
            manifest_path = None
            if resolve_tidal_manifest_path is not None:
                try:
                    manifest_path = resolve_tidal_manifest_path(
                        {"full_metadata": detail, "path": fallback_path}
                    )
                except Exception as exc:
                    log(
                        f"[hifi] Manifest resolution failed for track {track_id}: {exc}",
                        file=sys.stderr,
                    )
            resolved_path = str(manifest_path) if manifest_path else fallback_path

            artist_display = ", ".join(self._extract_artists(detail))
            album = detail.get("album")
            if isinstance(album, dict):
                album_title = self._stringify(album.get("title"))
            else:
                album_title = self._stringify(album)

            # Track ID stays first; Artist and Album follow it in that order.
            columns = self._build_track_columns(detail, track_id)
            extras = [
                (label, value)
                for label, value in (("Artist", artist_display), ("Album", album_title))
                if value
            ]
            columns[1:1] = extras

            result = SearchResult(
                table="hifi.track",
                title=title,
                path=resolved_path,
                detail=f"id:{track_id}",
                annotations=["tidal", "track"],
                media_kind="audio",
                columns=columns,
                full_metadata=detail,
            )
            table.add_result(result)
            try:
                results_payload.append(result.to_dict())
            except Exception:
                results_payload.append(
                    {
                        "table": "hifi.track",
                        "title": result.title,
                        "path": result.path,
                    }
                )

        # Publishing and printing are best-effort, but failures are reported.
        try:
            ctx.set_last_result_table(table, results_payload)
            ctx.set_current_stage_table(table)
        except Exception as exc:
            log(f"[hifi] Failed to publish track table: {exc}", file=sys.stderr)

        try:
            console = stdout_console()
            console.print()
            console.print(table)
        except Exception as exc:
            log(f"[hifi] Failed to print track table: {exc}", file=sys.stderr)

        return True