This commit is contained in:
2025-12-31 05:17:37 -08:00
parent 3bbaa28fb4
commit e8842ceded
10 changed files with 1255 additions and 29 deletions

434
Provider/HIFI.py Normal file
View File

@@ -0,0 +1,434 @@
from __future__ import annotations
import re
import sys
from typing import Any, Dict, List, Optional, Tuple
import httpx
from ProviderCore.base import Provider, SearchResult
from SYS.logger import log
DEFAULT_API_URLS = (
"https://tidal-api.binimum.org",
)
_KEY_TO_PARAM: Dict[str, str] = {
"album": "al",
"artist": "a",
"playlist": "p",
"video": "v",
"song": "s",
"track": "s",
"title": "s",
}
_DELIMITERS_RE = re.compile(r"[;,]")
_SEGMENT_BOUNDARY_RE = re.compile(r"(?=\b\w+\s*:)")
class HIFI(Provider):
"""Provider that targets the HiFi-RestAPI (Tidal proxy) search endpoint.
The CLI can supply a list of fail-over URLs via ``provider.hifi.api_urls`` or
``provider.hifi.api_url`` in the config. When not configured, it defaults to
https://tidal-api.binimum.org.
"""
def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
super().__init__(config)
self.api_urls = self._resolve_api_urls()
def validate(self) -> bool:
return bool(self.api_urls)
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**_kwargs: Any,
) -> List[SearchResult]:
if limit <= 0:
return []
params = self._build_search_params(query)
if not params:
return []
payload: Optional[Dict[str, Any]] = None
for base in self.api_urls:
endpoint = f"{base.rstrip('/')}/search/"
try:
resp = httpx.get(endpoint, params=params, timeout=10.0)
resp.raise_for_status()
payload = resp.json()
break
except Exception as exc:
log(f"[hifi] Search failed for {endpoint}: {exc}", file=sys.stderr)
continue
if not payload:
return []
data = payload.get("data") or {}
items = data.get("items") or []
results: List[SearchResult] = []
for item in items:
if limit and len(results) >= limit:
break
result = self._item_to_result(item)
if result is not None:
results.append(result)
return results[:limit]
def _resolve_api_urls(self) -> List[str]:
urls: List[str] = []
raw = self.config.get("api_urls")
if raw is None:
raw = self.config.get("api_url")
if isinstance(raw, (list, tuple)):
urls.extend(str(item).strip() for item in raw if isinstance(item, str))
elif isinstance(raw, str):
urls.append(raw.strip())
cleaned = [u.rstrip("/") for u in urls if isinstance(u, str) and u.strip()]
if not cleaned:
cleaned = [DEFAULT_API_URLS[0]]
return cleaned
def _build_search_params(self, query: str) -> Dict[str, str]:
cleaned = str(query or "").strip()
if not cleaned:
return {}
segments: List[str] = []
for chunk in _DELIMITERS_RE.split(cleaned):
chunk = chunk.strip()
if not chunk:
continue
if ":" in chunk:
for sub in _SEGMENT_BOUNDARY_RE.split(chunk):
part = sub.strip()
if part:
segments.append(part)
else:
segments.append(chunk)
key_values: Dict[str, str] = {}
free_text: List[str] = []
for segment in segments:
if ":" not in segment:
free_text.append(segment)
continue
key, value = segment.split(":", 1)
key = key.strip().lower()
value = value.strip().strip('"').strip("'")
if value:
key_values[key] = value
params: Dict[str, str] = {}
for key, value in key_values.items():
if not value:
continue
mapped = _KEY_TO_PARAM.get(key)
if mapped:
params[mapped] = value
general = " ".join(part for part in free_text if part).strip()
if general:
params.setdefault("s", general)
elif not params:
params["s"] = cleaned
return params
@staticmethod
def _format_duration(seconds: Any) -> str:
try:
total = int(seconds)
if total < 0:
return ""
except Exception:
return ""
minutes, secs = divmod(total, 60)
return f"{minutes}:{secs:02d}"
@staticmethod
def _stringify(value: Any) -> str:
text = str(value or "").strip()
return text
@staticmethod
def _extract_artists(item: Dict[str, Any]) -> List[str]:
names: List[str] = []
artists = item.get("artists")
if isinstance(artists, list):
for artist in artists:
if isinstance(artist, dict):
name = str(artist.get("name") or "").strip()
if name and name not in names:
names.append(name)
if not names:
primary = item.get("artist")
if isinstance(primary, dict):
name = str(primary.get("name") or "").strip()
if name:
names.append(name)
return names
def _item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]:
if not isinstance(item, dict):
return None
title = str(item.get("title") or "").strip()
if not title:
return None
identifier = item.get("id")
if identifier is None:
return None
try:
track_id = int(identifier)
except (TypeError, ValueError):
return None
# Avoid tidal.com URLs entirely; selection will resolve to a decoded MPD.
path = f"hifi://track/{track_id}"
artists = self._extract_artists(item)
artist_display = ", ".join(artists)
album = item.get("album")
album_title = ""
if isinstance(album, dict):
album_title = str(album.get("title") or "").strip()
detail_parts: List[str] = []
if artist_display:
detail_parts.append(artist_display)
if album_title:
detail_parts.append(album_title)
detail = " | ".join(detail_parts)
columns: List[tuple[str, str]] = []
if artist_display:
columns.append(("Artist", artist_display))
if album_title:
columns.append(("Album", album_title))
duration_text = self._format_duration(item.get("duration"))
if duration_text:
columns.append(("Duration", duration_text))
audio_quality = str(item.get("audioQuality") or "").strip()
if audio_quality:
columns.append(("Quality", audio_quality))
tags = {"tidal"}
if audio_quality:
tags.add(f"quality:{audio_quality.lower()}")
metadata = item.get("mediaMetadata")
if isinstance(metadata, dict):
tag_values = metadata.get("tags") or []
for tag in tag_values:
if isinstance(tag, str) and tag.strip():
tags.add(tag.strip().lower())
return SearchResult(
table="hifi",
title=title,
path=path,
detail=detail,
annotations=["tidal"],
media_kind="audio",
tag=tags,
columns=columns,
full_metadata=item,
)
def _extract_track_selection_context(
self, selected_items: List[Any]
) -> List[Tuple[int, str, str]]:
contexts: List[Tuple[int, str, str]] = []
seen_ids: set[int] = set()
for item in selected_items or []:
payload: Dict[str, Any] = {}
if isinstance(item, dict):
payload = item
else:
try:
payload = (
item.to_dict()
if hasattr(item, "to_dict")
and callable(getattr(item, "to_dict"))
else {}
)
except Exception:
payload = {}
if not payload:
try:
payload = {
"title": getattr(item, "title", None),
"path": getattr(item, "path", None),
"url": getattr(item, "url", None),
"full_metadata": getattr(item, "full_metadata", None),
}
except Exception:
payload = {}
meta = (
payload.get("full_metadata")
if isinstance(payload.get("full_metadata"), dict)
else payload
)
if not isinstance(meta, dict):
meta = {}
raw_id = meta.get("trackId") or meta.get("id") or payload.get("id")
if raw_id is None:
continue
try:
track_id = int(raw_id)
except (TypeError, ValueError):
continue
if track_id in seen_ids:
continue
seen_ids.add(track_id)
title = (
payload.get("title")
or meta.get("title")
or payload.get("name")
or payload.get("path")
or payload.get("url")
)
if not title:
title = f"Track {track_id}"
path = (
payload.get("path")
or payload.get("url")
or f"hifi://track/{track_id}"
)
contexts.append((track_id, str(title).strip(), str(path).strip()))
return contexts
def _fetch_track_details(self, track_id: int) -> Optional[Dict[str, Any]]:
if track_id <= 0:
return None
params = {"id": str(track_id)}
for base in self.api_urls:
endpoint = f"{base.rstrip('/')}/track/"
try:
resp = httpx.get(endpoint, params=params, timeout=10.0)
resp.raise_for_status()
payload = resp.json()
data = payload.get("data")
if isinstance(data, dict):
return data
except Exception as exc:
log(f"[hifi] Track lookup failed for {endpoint}: {exc}", file=sys.stderr)
continue
return None
def _build_track_columns(self, detail: Dict[str, Any], track_id: int) -> List[Tuple[str, str]]:
values: List[Tuple[str, str]] = [
("Track ID", str(track_id)),
("Quality", self._stringify(detail.get("audioQuality"))),
("Mode", self._stringify(detail.get("audioMode"))),
("Asset", self._stringify(detail.get("assetPresentation"))),
("Manifest Type", self._stringify(detail.get("manifestMimeType"))),
("Manifest Hash", self._stringify(detail.get("manifestHash"))),
("Bit Depth", self._stringify(detail.get("bitDepth"))),
("Sample Rate", self._stringify(detail.get("sampleRate"))),
]
return [(name, value) for name, value in values if value]
def selector(
self,
selected_items: List[Any],
*,
ctx: Any,
stage_is_last: bool = True,
**_kwargs: Any,
) -> bool:
if not stage_is_last:
return False
contexts = self._extract_track_selection_context(selected_items)
if not contexts:
return False
track_details: List[Tuple[int, str, str, Dict[str, Any]]] = []
for track_id, title, path in contexts:
detail = self._fetch_track_details(track_id)
if detail:
track_details.append((track_id, title, path, detail))
if not track_details:
return False
try:
from SYS.rich_display import stdout_console
from SYS.result_table import ResultTable
except Exception:
return False
table = ResultTable("HIFI Track").set_preserve_order(True)
table.set_table("hifi.track")
results_payload: List[Dict[str, Any]] = []
for track_id, title, path, detail in track_details:
# Decode the DASH MPD manifest to a local file and use it as the selectable/playable path.
try:
from cmdlet._shared import resolve_tidal_manifest_path
manifest_path = resolve_tidal_manifest_path(
{"full_metadata": detail, "path": f"hifi://track/{track_id}"}
)
except Exception:
manifest_path = None
resolved_path = str(manifest_path) if manifest_path else f"hifi://track/{track_id}"
artists = self._extract_artists(detail)
artist_display = ", ".join(artists) if artists else ""
columns = self._build_track_columns(detail, track_id)
if artist_display:
columns.insert(1, ("Artist", artist_display))
album = detail.get("album")
if isinstance(album, dict):
album_title = self._stringify(album.get("title"))
else:
album_title = self._stringify(detail.get("album"))
if album_title:
insert_pos = 2 if artist_display else 1
columns.insert(insert_pos, ("Album", album_title))
result = SearchResult(
table="hifi.track",
title=title,
path=resolved_path,
detail=f"id:{track_id}",
annotations=["tidal", "track"],
media_kind="audio",
columns=columns,
full_metadata=detail,
)
table.add_result(result)
try:
results_payload.append(result.to_dict())
except Exception:
results_payload.append({
"table": "hifi.track",
"title": result.title,
"path": result.path,
})
try:
ctx.set_last_result_table(table, results_payload)
ctx.set_current_stage_table(table)
except Exception:
pass
try:
stdout_console().print()
stdout_console().print(table)
except Exception:
pass
return True