This commit is contained in:
2026-01-07 05:09:59 -08:00
parent edc33f4528
commit f0799191ff
10 changed files with 956 additions and 353 deletions

View File

@@ -10,14 +10,34 @@ import time
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urlparse
from API.hifi import HifiApiClient
from ProviderCore.base import Provider, SearchResult
from ProviderCore.base import Provider, SearchResult, parse_inline_query_arguments
from Provider.tidal_shared import (
build_track_tags,
coerce_duration_seconds,
extract_artists,
stringify,
)
from SYS import pipeline as pipeline_context
from SYS.logger import debug, log
DEFAULT_API_URLS = (
URL_API = (
"https://tidal-api.binimum.org",
"https://triton.squid.wtf",
"https://wolf.qqdl.site",
"https://maus.qqdl.site",
"https://vogel.qqdl.site",
"https://katze.qqdl.site",
"https://hund.qqdl.site",
"https://tidal.kinoplus.online",
"https://tidal-api.binimum.org",
)
_KEY_TO_PARAM: Dict[str, str] = {
"album": "al",
"artist": "a",
@@ -49,6 +69,20 @@ class HIFI(Provider):
TABLE_AUTO_STAGES = {
"hifi.track": ["download-file"],
}
QUERY_ARG_CHOICES = {
"album": (),
"artist": (),
"playlist": (),
"track": (),
"title": (),
"video": (),
}
INLINE_QUERY_FIELD_CHOICES = QUERY_ARG_CHOICES
URL_DOMAINS = (
"tidal.com",
"listen.tidal.com",
)
URL = URL_DOMAINS
"""Provider that targets the HiFi-RestAPI (Tidal proxy) search endpoint.
The CLI can supply a list of fail-over URLs via ``provider.hifi.api_urls`` or
@@ -65,6 +99,14 @@ class HIFI(Provider):
self.api_timeout = 10.0
self.api_clients = [HifiApiClient(base_url=url, timeout=self.api_timeout) for url in self.api_urls]
def extract_query_arguments(self, query: str) -> Tuple[str, Dict[str, Any]]:
normalized, parsed = parse_inline_query_arguments(query)
filtered: Dict[str, Any] = {}
for key, value in parsed.items():
if key in self.QUERY_ARG_CHOICES:
filtered[key] = value
return normalized, filtered
def validate(self) -> bool:
return bool(self.api_urls)
@@ -77,8 +119,16 @@ class HIFI(Provider):
) -> List[SearchResult]:
if limit <= 0:
return []
view = self._get_view_from_query(query)
params = self._build_search_params(query)
normalized_query, inline_args = self.extract_query_arguments(query)
raw_query = str(query or "").strip()
search_query = normalized_query or raw_query
if not search_query and inline_args:
search_query = " ".join(f"{k}:{v}" for k, v in inline_args.items())
if not search_query:
return []
view = self._determine_view(search_query, inline_args)
params = self._build_search_params(search_query)
if not params:
return []
@@ -126,6 +176,18 @@ class HIFI(Provider):
return "album"
return "track"
def _determine_view(self, query: str, inline_args: Dict[str, Any]) -> str:
if inline_args:
if "artist" in inline_args:
return "artist"
if "album" in inline_args:
return "album"
if "track" in inline_args or "title" in inline_args:
return "track"
if "video" in inline_args or "playlist" in inline_args:
return "track"
return self._get_view_from_query(query)
@staticmethod
def _safe_filename(value: Any, *, fallback: str = "hifi") -> str:
text = str(value or "").strip()
@@ -169,6 +231,56 @@ class HIFI(Provider):
return None
return num if num > 0 else None
def _parse_tidal_url(self, url: str) -> Tuple[str, Optional[int]]:
try:
parsed = urlparse(str(url))
except Exception:
return "", None
parts = [segment for segment in (parsed.path or "").split("/") if segment]
if not parts:
return "", None
idx = 0
if parts[0].lower() == "browse":
idx = 1
if idx >= len(parts):
return "", None
view = parts[idx].lower()
if view not in {"album", "track"}:
return "", None
for segment in parts[idx + 1:]:
identifier = self._parse_int(segment)
if identifier is not None:
return view, identifier
return view, None
def _track_detail_to_result(self, detail: Optional[Dict[str, Any]], track_id: int) -> SearchResult:
if isinstance(detail, dict):
candidate = self._item_to_result(detail)
if candidate is not None:
try:
candidate.full_metadata = dict(detail)
except Exception:
pass
return candidate
title = f"Track {track_id}"
if isinstance(detail, dict):
title = self._stringify(detail.get("title")) or title
return SearchResult(
table="hifi",
title=title,
path=f"hifi://track/{track_id}",
detail=f"id:{track_id}",
annotations=["tidal", "track"],
media_kind="audio",
full_metadata=dict(detail) if isinstance(detail, dict) else {},
)
def _extract_artist_selection_context(self, selected_items: List[Any]) -> List[Tuple[int, str]]:
contexts: List[Tuple[int, str]] = []
seen: set[int] = set()
@@ -589,6 +701,65 @@ class HIFI(Provider):
return results
def _present_album_tracks(
self,
track_results: List[SearchResult],
*,
album_id: Optional[int],
album_title: str,
artist_name: str,
) -> None:
if not track_results:
return
try:
from SYS.rich_display import stdout_console
from SYS.result_table import ResultTable
except Exception:
return
label = album_title or "Album"
if artist_name:
label = f"{artist_name} - {label}"
table = ResultTable(f"HIFI Tracks: {label}").set_preserve_order(True)
table.set_table("hifi.track")
try:
table.set_table_metadata(
{
"provider": "hifi",
"view": "track",
"album_id": album_id,
"album_title": album_title,
"artist_name": artist_name,
}
)
except Exception:
pass
results_payload: List[Dict[str, Any]] = []
for result in track_results:
table.add_result(result)
try:
results_payload.append(result.to_dict())
except Exception:
results_payload.append(
{
"table": getattr(result, "table", "hifi.track"),
"title": getattr(result, "title", ""),
"path": getattr(result, "path", ""),
}
)
pipeline_context.set_last_result_table(table, results_payload)
pipeline_context.set_current_stage_table(table)
try:
stdout_console().print()
stdout_console().print(table)
except Exception:
pass
def _album_item_to_result(self, album: Dict[str, Any], *, artist_name: str) -> Optional[SearchResult]:
if not isinstance(album, dict):
return None
@@ -1080,6 +1251,73 @@ class HIFI(Provider):
)
return materialized
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]:
view, identifier = self._parse_tidal_url(url)
if not view:
return False, None
if view == "track":
if not identifier or output_dir is None:
return False, None
try:
detail = self._fetch_track_details(identifier)
except Exception:
detail = None
result = self._track_detail_to_result(detail, identifier)
try:
downloaded = self.download(result, output_dir)
except Exception:
return False, None
if downloaded:
return True, downloaded
return False, None
if view == "album":
if not identifier:
return False, None
try:
track_results = self._tracks_for_album(
album_id=identifier,
album_title="",
artist_name="",
limit=200,
)
except Exception:
return False, None
if not track_results:
return False, None
album_title = ""
artist_name = ""
metadata = getattr(track_results[0], "full_metadata", None)
if isinstance(metadata, dict):
album_obj = metadata.get("album")
if isinstance(album_obj, dict):
album_title = self._stringify(album_obj.get("title"))
else:
album_title = self._stringify(album_obj or metadata.get("album"))
artists = self._extract_artists(metadata)
if artists:
artist_name = artists[0]
if not album_title:
album_title = f"Album {identifier}"
self._present_album_tracks(
track_results,
album_id=identifier,
album_title=album_title,
artist_name=artist_name,
)
return True, None
return False, None
def _get_api_client_for_base(self, base_url: str) -> Optional[HifiApiClient]:
base = base_url.rstrip("/")
for client in self.api_clients:
@@ -1180,7 +1418,7 @@ class HIFI(Provider):
urls.append(raw.strip())
cleaned = [u.rstrip("/") for u in urls if isinstance(u, str) and u.strip()]
if not cleaned:
cleaned = [DEFAULT_API_URLS[0]]
cleaned = [URL_API[0]]
return cleaned
def _build_search_params(self, query: str) -> Dict[str, str]:
@@ -1342,58 +1580,15 @@ class HIFI(Provider):
@staticmethod
def _coerce_duration_seconds(value: Any) -> Optional[int]:
candidates = []
candidates.append(value)
try:
if isinstance(value, dict):
for key in ("duration",
"durationSeconds",
"duration_sec",
"duration_ms",
"durationMillis"):
if key in value:
candidates.append(value.get(key))
except Exception:
pass
for cand in candidates:
try:
if cand is None:
continue
if isinstance(cand, str) and cand.strip().endswith("ms"):
cand = cand.strip()[:-2]
v = float(cand)
if v <= 0:
continue
if v > 10_000: # treat as milliseconds
v = v / 1000.0
return int(round(v))
except Exception:
continue
return None
return coerce_duration_seconds(value)
@staticmethod
def _stringify(value: Any) -> str:
text = str(value or "").strip()
return text
return stringify(value)
@staticmethod
def _extract_artists(item: Dict[str, Any]) -> List[str]:
names: List[str] = []
artists = item.get("artists")
if isinstance(artists, list):
for artist in artists:
if isinstance(artist, dict):
name = str(artist.get("name") or "").strip()
if name and name not in names:
names.append(name)
if not names:
primary = item.get("artist")
if isinstance(primary, dict):
name = str(primary.get("name") or "").strip()
if name:
names.append(name)
return names
return extract_artists(item)
def _item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]:
if not isinstance(item, dict):
@@ -1619,52 +1814,7 @@ class HIFI(Provider):
return [(name, value) for name, value in values if value]
def _build_track_tags(self, metadata: Dict[str, Any]) -> set[str]:
tags: set[str] = {"tidal"}
audio_quality = self._stringify(metadata.get("audioQuality"))
if audio_quality:
tags.add(f"quality:{audio_quality.lower()}")
media_md = metadata.get("mediaMetadata")
if isinstance(media_md, dict):
tag_values = media_md.get("tags") or []
for tag in tag_values:
if isinstance(tag, str):
candidate = tag.strip()
if candidate:
tags.add(candidate.lower())
title_text = self._stringify(metadata.get("title"))
if title_text:
tags.add(f"title:{title_text}")
artists = self._extract_artists(metadata)
for artist in artists:
artist_clean = self._stringify(artist)
if artist_clean:
tags.add(f"artist:{artist_clean}")
album_title = ""
album_obj = metadata.get("album")
if isinstance(album_obj, dict):
album_title = self._stringify(album_obj.get("title"))
else:
album_title = self._stringify(metadata.get("album"))
if album_title:
tags.add(f"album:{album_title}")
track_no_val = metadata.get("trackNumber") or metadata.get("track_number")
if track_no_val is not None:
try:
track_int = int(track_no_val)
if track_int > 0:
tags.add(f"track:{track_int}")
except Exception:
track_text = self._stringify(track_no_val)
if track_text:
tags.add(f"track:{track_text}")
return tags
return build_track_tags(metadata)
def selector(
self,