From 3acf21a67330e340aab2108868da14470450ae23 Mon Sep 17 00:00:00 2001 From: Nose Date: Sat, 3 Jan 2026 21:23:55 -0800 Subject: [PATCH] h --- API/hifi.py | 13 + CLI.py | 163 +++- MPV/portable_config/script-opts/medeia.conf | 2 +- Provider/HIFI.py | 807 +++++++++++++++++++- Provider/openlibrary.py | 19 +- SYS/pipeline.py | 98 ++- cmdlet/download_file.py | 2 +- cmdlet/search_file.py | 43 ++ cmdnat/pipe.py | 17 +- cmdnat/table.py | 123 +++ 10 files changed, 1244 insertions(+), 43 deletions(-) create mode 100644 cmdnat/table.py diff --git a/API/hifi.py b/API/hifi.py index c04004b..10e8956 100644 --- a/API/hifi.py +++ b/API/hifi.py @@ -18,6 +18,7 @@ class HifiApiClient: - GET /search/ with exactly one of s, a, v, p - GET /track/ with id (and optional quality) - GET /info/ with id + - GET /album/ with id - GET /lyrics/ with id """ @@ -68,6 +69,18 @@ class HifiApiClient: return self._get_json("info/", params={"id": track_int}) + def album(self, album_id: int) -> Dict[str, Any]: + """Fetch album details, including track list when provided by the backend.""" + + try: + album_int = int(album_id) + except Exception as exc: + raise HifiApiError(f"album_id must be int-compatible: {exc}") from exc + if album_int <= 0: + raise HifiApiError("album_id must be positive") + + return self._get_json("album/", params={"id": album_int}) + def lyrics(self, track_id: int) -> Dict[str, Any]: """Fetch lyrics (including subtitles/LRC) for a track.""" diff --git a/CLI.py b/CLI.py index d5ac921..4768bf8 100644 --- a/CLI.py +++ b/CLI.py @@ -2168,6 +2168,20 @@ class PipelineExecutor: table if current_table and hasattr(current_table, "table") else None ) + + # Prefer an explicit provider hint from table metadata when available. + # This keeps @N selectors working even when row payloads don't carry a + # provider key (or when they carry a table-type like hifi.album). + try: + meta = ( + current_table.get_table_metadata() + if current_table is not None and hasattr(current_table, "get_table_metadata") + else getattr(current_table, "table_metadata", None) + ) + except Exception: + meta = None + if isinstance(meta, dict): + _add(meta.get("provider")) except Exception: pass @@ -2187,6 +2201,23 @@ class PipelineExecutor: get_provider = None # type: ignore is_known_provider_name = None # type: ignore + # If we have a table-type like "hifi.album", also try its provider prefix ("hifi") + # when that prefix is a registered provider name. + if is_known_provider_name is not None: + try: + for key in list(candidates): + if not isinstance(key, str): + continue + if "." not in key: + continue + if is_known_provider_name(key): + continue + prefix = str(key).split(".", 1)[0].strip().lower() + if prefix and is_known_provider_name(prefix): + _add(prefix) + except Exception: + pass + if get_provider is not None: for key in candidates: try: @@ -2401,15 +2432,37 @@ class PipelineExecutor: if not selection_indices: return True, None + # Selection should operate on the *currently displayed* selectable table. + # Some navigation flows (e.g. @.. back) can show a display table without + # updating current_stage_table. Provider selectors rely on current_stage_table + # to detect table type (e.g. hifi.album -> tracks), so sync it here. + display_table = None try: - if not ctx.get_current_stage_table_source_command(): - display_table = ( - ctx.get_display_table() if hasattr(ctx, - "get_display_table") else None + display_table = ( + ctx.get_display_table() if hasattr(ctx, "get_display_table") else None + ) + except Exception: + display_table = None + + current_stage_table = None + try: + current_stage_table = ( + ctx.get_current_stage_table() + if hasattr(ctx, "get_current_stage_table") else None + ) + except Exception: + current_stage_table = None + + try: + if display_table is not None and hasattr(ctx, "set_current_stage_table"): + ctx.set_current_stage_table(display_table) + elif current_stage_table is None and hasattr(ctx, "set_current_stage_table"): + last_table = ( + ctx.get_last_result_table() + if hasattr(ctx, "get_last_result_table") else None ) - table_for_stage = display_table or ctx.get_last_result_table() - if table_for_stage: - ctx.set_current_stage_table(table_for_stage) + if last_table is not None: + ctx.set_current_stage_table(last_table) except Exception: pass @@ -2600,6 +2653,67 @@ class PipelineExecutor: print("No items matched selection in pipeline\n") return False, None + # Provider selection expansion (non-terminal): allow certain provider tables + # (e.g. hifi.album) to expand to multiple downstream items when the user + # pipes into another stage (e.g. @N | .mpv or @N | add-file). + table_type_hint = None + try: + table_type_hint = ( + stage_table.table + if stage_table is not None and hasattr(stage_table, "table") + else None + ) + except Exception: + table_type_hint = None + + if stages and isinstance(table_type_hint, str) and table_type_hint.strip().lower() == "hifi.album": + try: + from ProviderCore.registry import get_provider + + prov = get_provider("hifi", config) + except Exception: + prov = None + + if prov is not None and hasattr(prov, "_extract_album_selection_context") and hasattr(prov, "_tracks_for_album"): + try: + album_contexts = prov._extract_album_selection_context(filtered) # type: ignore[attr-defined] + except Exception: + album_contexts = [] + + track_items: List[Any] = [] + seen_track_ids: set[int] = set() + for album_id, album_title, artist_name in album_contexts or []: + try: + track_results = prov._tracks_for_album( # type: ignore[attr-defined] + album_id=album_id, + album_title=album_title, + artist_name=artist_name, + limit=500, + ) + except Exception: + track_results = [] + for tr in track_results or []: + try: + md = getattr(tr, "full_metadata", None) + tid = None + if isinstance(md, dict): + raw_id = md.get("trackId") or md.get("id") + try: + tid = int(raw_id) if raw_id is not None else None + except Exception: + tid = None + if tid is not None: + if tid in seen_track_ids: + continue + seen_track_ids.add(tid) + except Exception: + pass + track_items.append(tr) + + if track_items: + filtered = track_items + table_type_hint = "hifi.track" + if PipelineExecutor._maybe_run_class_selector( ctx, config, @@ -2634,11 +2748,20 @@ class PipelineExecutor: current_table = ctx.get_last_result_table() except Exception: current_table = None - table_type = ( - current_table.table - if current_table and hasattr(current_table, - "table") else None - ) + table_type = None + try: + if isinstance(table_type_hint, str) and table_type_hint.strip(): + table_type = table_type_hint + else: + table_type = ( + current_table.table + if current_table and hasattr(current_table, "table") else None + ) + except Exception: + table_type = ( + current_table.table + if current_table and hasattr(current_table, "table") else None + ) def _norm_cmd(name: Any) -> str: return str(name or "").replace("_", "-").strip().lower() @@ -2981,11 +3104,27 @@ class PipelineExecutor: display_table = None stage_table = ctx.get_current_stage_table() + # Selection should operate on the table the user sees. + # If a display overlay table exists, force it as the current-stage table + # so provider selectors (e.g. hifi.album -> tracks) behave consistently. + try: + if display_table is not None and hasattr(ctx, "set_current_stage_table"): + ctx.set_current_stage_table(display_table) + stage_table = display_table + except Exception: + pass + if not stage_table and display_table is not None: stage_table = display_table if not stage_table: stage_table = ctx.get_last_result_table() + try: + if hasattr(ctx, "debug_table_state"): + ctx.debug_table_state(f"selection {selection_token}") + except Exception: + pass + if display_table is not None and stage_table is display_table: items_list = ctx.get_last_result_items() or [] else: diff --git a/MPV/portable_config/script-opts/medeia.conf b/MPV/portable_config/script-opts/medeia.conf index 0cdb96d..4077249 100644 --- a/MPV/portable_config/script-opts/medeia.conf +++ b/MPV/portable_config/script-opts/medeia.conf @@ -1,2 +1,2 @@ # Medeia MPV script options -store=default +store=tutorial diff --git a/Provider/HIFI.py b/Provider/HIFI.py index e5fd819..67ba6ca 100644 --- a/Provider/HIFI.py +++ b/Provider/HIFI.py @@ -29,10 +29,22 @@ _DELIMITERS_RE = re.compile(r"[;,]") _SEGMENT_BOUNDARY_RE = re.compile(r"(?=\b\w+\s*:)") +def _format_total_seconds(seconds: Any) -> str: + try: + total = int(seconds) + except Exception: + return "" + if total <= 0: + return "" + mins = total // 60 + secs = total % 60 + return f"{mins}:{secs:02d}" + + class HIFI(Provider): - TABLE_AUTO_PREFIXES = { - "hifi": ["download-file"], + TABLE_AUTO_STAGES = { + "hifi.track": ["download-file"], } """Provider that targets the HiFi-RestAPI (Tidal proxy) search endpoint. @@ -62,6 +74,7 @@ class HIFI(Provider): ) -> List[SearchResult]: if limit <= 0: return [] + view = self._get_view_from_query(query) params = self._build_search_params(query) if not params: return [] @@ -82,17 +95,34 @@ class HIFI(Provider): return [] data = payload.get("data") or {} - items = self._extract_track_items(data) + if view == "artist": + items = self._extract_artist_items(data) + else: + items = self._extract_track_items(data) results: List[SearchResult] = [] for item in items: if limit and len(results) >= limit: break - result = self._item_to_result(item) + if view == "artist": + result = self._artist_item_to_result(item) + else: + result = self._item_to_result(item) if result is not None: results.append(result) return results[:limit] + @staticmethod + def _get_view_from_query(query: str) -> str: + text = str(query or "").strip() + if not text: + return "track" + if re.search(r"\bartist\s*:", text, flags=re.IGNORECASE): + return "artist" + if re.search(r"\balbum\s*:", text, flags=re.IGNORECASE): + return "album" + return "track" + @staticmethod def _safe_filename(value: Any, *, fallback: str = "hifi") -> str: text = str(value or "").strip() @@ -126,6 +156,478 @@ class HIFI(Provider): return self._parse_track_id(m.group(1)) return None + @staticmethod + def _parse_int(value: Any) -> Optional[int]: + if value is None: + return None + try: + num = int(value) + except Exception: + return None + return num if num > 0 else None + + def _extract_artist_selection_context(self, selected_items: List[Any]) -> List[Tuple[int, str]]: + contexts: List[Tuple[int, str]] = [] + seen: set[int] = set() + + for item in selected_items or []: + payload: Dict[str, Any] = {} + if isinstance(item, dict): + payload = item + else: + try: + payload = item.to_dict() if hasattr(item, "to_dict") and callable(getattr(item, "to_dict")) else {} + except Exception: + payload = {} + if not payload: + try: + payload = { + "title": getattr(item, "title", None), + "path": getattr(item, "path", None), + "full_metadata": getattr(item, "full_metadata", None), + } + except Exception: + payload = {} + + meta = payload.get("full_metadata") if isinstance(payload.get("full_metadata"), dict) else payload + if not isinstance(meta, dict): + meta = {} + + artist_id = self._parse_int(meta.get("artistId") or meta.get("id") or payload.get("artistId") or payload.get("id")) + if not artist_id: + # Try to parse from path. + raw_path = str(payload.get("path") or "").strip() + if raw_path: + m = re.search(r"hifi:(?://)?artist[\\/](\d+)", raw_path, flags=re.IGNORECASE) + if m: + artist_id = self._parse_int(m.group(1)) + + if not artist_id or artist_id in seen: + continue + seen.add(artist_id) + + name = ( + payload.get("title") + or meta.get("name") + or meta.get("title") + or payload.get("name") + ) + name_text = str(name or "").strip() or f"Artist {artist_id}" + contexts.append((artist_id, name_text)) + + return contexts + + def _extract_album_selection_context(self, selected_items: List[Any]) -> List[Tuple[Optional[int], str, str]]: + """Return (album_id, album_title, artist_name) for selected album rows.""" + + contexts: List[Tuple[Optional[int], str, str]] = [] + seen_ids: set[int] = set() + seen_keys: set[str] = set() + + for item in selected_items or []: + payload: Dict[str, Any] = {} + if isinstance(item, dict): + payload = item + else: + try: + payload = item.to_dict() if hasattr(item, "to_dict") and callable(getattr(item, "to_dict")) else {} + except Exception: + payload = {} + if not payload: + try: + payload = { + "title": getattr(item, "title", None), + "path": getattr(item, "path", None), + "full_metadata": getattr(item, "full_metadata", None), + } + except Exception: + payload = {} + + meta = payload.get("full_metadata") if isinstance(payload.get("full_metadata"), dict) else payload + if not isinstance(meta, dict): + meta = {} + + album_title = self._stringify(payload.get("title") or meta.get("title") or meta.get("name")) + if not album_title: + album_title = self._stringify(meta.get("album") or meta.get("albumTitle")) + if not album_title: + continue + + artist_name = self._stringify(meta.get("_artist_name") or meta.get("artist") or meta.get("artistName")) + if not artist_name: + # Some album payloads include nested artist objects. + artist_obj = meta.get("artist") + if isinstance(artist_obj, dict): + artist_name = self._stringify(artist_obj.get("name")) + + # Prefer albumId when available; some payloads carry both id/albumId. + album_id = self._parse_int(meta.get("albumId") or meta.get("id")) + + if not album_id: + raw_path = self._stringify(payload.get("path")) + if raw_path: + m = re.search(r"hifi:(?://)?album[\\/](\d+)", raw_path, flags=re.IGNORECASE) + if m: + album_id = self._parse_int(m.group(1)) + + if album_id: + if album_id in seen_ids: + continue + seen_ids.add(album_id) + else: + key = f"{album_title.lower()}::{artist_name.lower()}" + if key in seen_keys: + continue + seen_keys.add(key) + + contexts.append((album_id, album_title, artist_name)) + + return contexts + + def _track_matches_artist(self, track: Dict[str, Any], *, artist_id: Optional[int], artist_name: str) -> bool: + if not isinstance(track, dict): + return False + wanted = str(artist_name or "").strip().lower() + + primary = track.get("artist") + if isinstance(primary, dict): + if artist_id and self._parse_int(primary.get("id")) == artist_id: + return True + name = str(primary.get("name") or "").strip().lower() + if wanted and name == wanted: + return True + + artists = track.get("artists") + if isinstance(artists, list): + for a in artists: + if not isinstance(a, dict): + continue + if artist_id and self._parse_int(a.get("id")) == artist_id: + return True + name = str(a.get("name") or "").strip().lower() + if wanted and name == wanted: + return True + + # Fallback: string-match extracted display. + if wanted: + try: + names = [n.lower() for n in self._extract_artists(track)] + except Exception: + names = [] + return wanted in names + + return False + + def _albums_for_artist(self, *, artist_id: Optional[int], artist_name: str, limit: int = 200) -> List[SearchResult]: + name = str(artist_name or "").strip() + if not name: + return [] + + payload: Optional[Dict[str, Any]] = None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/search/" + try: + client = self._get_api_client_for_base(base) + payload = client.search({"s": name}) if client else None + if payload is not None: + break + except Exception as exc: + log(f"[hifi] Album lookup failed for {endpoint}: {exc}", file=sys.stderr) + continue + + if not payload: + return [] + + data = payload.get("data") or {} + tracks = self._extract_track_items(data) + if not tracks: + return [] + + albums_by_id: Dict[int, Dict[str, Any]] = {} + albums_by_key: Dict[str, Dict[str, Any]] = {} + for track in tracks: + if not self._track_matches_artist(track, artist_id=artist_id, artist_name=name): + continue + album = track.get("album") + if not isinstance(album, dict): + continue + # Prefer albumId when available; some payloads carry both id/albumId. + album_id = self._parse_int(album.get("albumId") or album.get("id")) + title = self._stringify(album.get("title")) + if not title: + continue + if album_id: + albums_by_id.setdefault(album_id, album) + continue + key = f"{title.lower()}::{name.lower()}" + albums_by_key.setdefault(key, album) + + album_items: List[Dict[str, Any]] = list(albums_by_id.values()) + list(albums_by_key.values()) + results: List[SearchResult] = [] + for album in album_items: + if limit and len(results) >= limit: + break + res = self._album_item_to_result(album, artist_name=name) + if res is not None: + results.append(res) + return results + + def _tracks_for_album(self, *, album_id: Optional[int], album_title: str, artist_name: str = "", limit: int = 200) -> List[SearchResult]: + title = str(album_title or "").strip() + if not title: + return [] + + def _norm_album(text: str) -> str: + # Normalize album titles for matching across punctuation/case/spacing. + # Example: "either/or" vs "Either Or" or "Either/Or (Expanded Edition)". + s = str(text or "").strip().lower() + if not s: + return "" + s = re.sub(r"&", " and ", s) + s = re.sub(r"[^a-z0-9]+", "", s) + return s + + search_text = title + artist_text = str(artist_name or "").strip() + if artist_text: + # The proxy only supports s/a/v/p. Use a combined s= query to bias results + # toward the target album's tracks. + search_text = f"{artist_text} {title}".strip() + + # Prefer /album when we have a numeric album id. + # The proxy returns the album payload including a full track list in `data.items`. + # When this endpoint is available, it is authoritative for an album id, so we do + # not apply additional title/artist filtering. + if album_id: + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/album/" + try: + client = self._get_api_client_for_base(base) + album_payload = client.album(int(album_id)) if client else None + except Exception as exc: + log(f"[hifi] Album lookup failed for {endpoint}: {exc}", file=sys.stderr) + continue + + if not isinstance(album_payload, dict) or not album_payload: + continue + + try: + album_data = album_payload.get("data") + album_tracks = self._extract_track_items(album_data if album_data is not None else album_payload) + except Exception: + album_tracks = [] + + if not album_tracks: + # Try the next configured base URL (some backends return an error-shaped + # JSON object with 200, or omit tracks for certain ids). + continue + + ordered: List[Tuple[int, int, Dict[str, Any]]] = [] + for tr in album_tracks: + if not isinstance(tr, dict): + continue + disc_val = self._parse_int(tr.get("volumeNumber") or tr.get("discNumber") or 0) or 0 + track_val = self._parse_int(tr.get("trackNumber") or 0) or 0 + ordered.append((disc_val, track_val, tr)) + + ordered.sort(key=lambda t: (t[0], t[1])) + try: + debug(f"hifi album endpoint tracks: album_id={album_id} extracted={len(album_tracks)}") + except Exception: + pass + + results: List[SearchResult] = [] + for _disc, _track, tr in ordered: + if limit and len(results) >= limit: + break + res = self._item_to_result(tr) + if res is not None: + results.append(res) + if results: + return results + + # Reduce punctuation in the raw search string to improve /search/ recall. + try: + search_text = re.sub(r"[/\\]+", " ", search_text) + search_text = re.sub(r"\s+", " ", search_text).strip() + except Exception: + pass + + payload: Optional[Dict[str, Any]] = None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/search/" + try: + client = self._get_api_client_for_base(base) + payload = client.search({"s": search_text}) if client else None + if payload is not None: + break + except Exception as exc: + log(f"[hifi] Track lookup failed for {endpoint}: {exc}", file=sys.stderr) + continue + + if not payload: + return [] + + data = payload.get("data") or {} + tracks = self._extract_track_items(data) + if not tracks: + return [] + + try: + debug(f"hifi album search tracks: album_id={album_id} extracted={len(tracks)} query={repr(search_text)}") + except Exception: + pass + + wanted_album = title.lower() + wanted_album_norm = _norm_album(title) + wanted_artist = artist_text.lower() + seen_ids: set[int] = set() + candidates: List[Tuple[int, int, Dict[str, Any]]] = [] + + for track in tracks: + if not isinstance(track, dict): + continue + tid = self._parse_int(track.get("id") or track.get("trackId")) + if not tid or tid in seen_ids: + continue + + album = track.get("album") + album_ok = False + if isinstance(album, dict): + if album_id and self._parse_int(album.get("albumId") or album.get("id")) == album_id: + album_ok = True + else: + at = self._stringify(album.get("title")).lower() + if at: + if at == wanted_album: + album_ok = True + else: + at_norm = _norm_album(at) + if wanted_album_norm and at_norm and ( + at_norm == wanted_album_norm + or wanted_album_norm in at_norm + or at_norm in wanted_album_norm): + album_ok = True + else: + # If album is not a dict, fall back to string compare. + at = self._stringify(track.get("album")).lower() + if at: + if at == wanted_album: + album_ok = True + else: + at_norm = _norm_album(at) + if wanted_album_norm and at_norm and ( + at_norm == wanted_album_norm + or wanted_album_norm in at_norm + or at_norm in wanted_album_norm): + album_ok = True + if not album_ok: + continue + + if wanted_artist: + if not self._track_matches_artist(track, artist_id=None, artist_name=artist_name): + continue + seen_ids.add(tid) + + disc_val = self._parse_int(track.get("volumeNumber") or track.get("discNumber") or 0) or 0 + track_val = self._parse_int(track.get("trackNumber") or 0) or 0 + candidates.append((disc_val, track_val, track)) + + candidates.sort(key=lambda t: (t[0], t[1])) + + # If strict matching found nothing, relax title matching (substring) while still + # keeping artist filtering when available. + if not candidates: + for track in tracks: + if not isinstance(track, dict): + continue + tid = self._parse_int(track.get("id") or track.get("trackId")) + if not tid or tid in seen_ids: + continue + + album = track.get("album") + if isinstance(album, dict): + at = self._stringify(album.get("title")).lower() + else: + at = self._stringify(track.get("album")).lower() + + if not at: + continue + at_norm = _norm_album(at) + if wanted_album_norm and at_norm: + if not (wanted_album_norm in at_norm or at_norm in wanted_album_norm): + continue + else: + if wanted_album not in at: + continue + if wanted_artist: + if not self._track_matches_artist(track, artist_id=None, artist_name=artist_name): + continue + + seen_ids.add(tid) + disc_val = self._parse_int(track.get("volumeNumber") or track.get("discNumber") or 0) or 0 + track_val = self._parse_int(track.get("trackNumber") or 0) or 0 + candidates.append((disc_val, track_val, track)) + + candidates.sort(key=lambda t: (t[0], t[1])) + + try: + debug(f"hifi album search tracks: album_id={album_id} matched={len(candidates)} title={repr(title)} artist={repr(artist_name)}") + except Exception: + pass + + results: List[SearchResult] = [] + for _disc, _track, track in candidates: + if limit and len(results) >= limit: + break + res = self._item_to_result(track) + if res is not None: + results.append(res) + + return results + + def _album_item_to_result(self, album: Dict[str, Any], *, artist_name: str) -> Optional[SearchResult]: + if not isinstance(album, dict): + return None + title = self._stringify(album.get("title")) + if not title: + return None + # Prefer albumId when available; some payloads carry both id/albumId. + album_id = self._parse_int(album.get("albumId") or album.get("id")) + path = f"hifi://album/{album_id}" if album_id else f"hifi://album/{self._safe_filename(title)}" + + columns: List[tuple[str, str]] = [("Album", title)] + if artist_name: + columns.append(("Artist", str(artist_name))) + + # Album stats (best-effort): show track count and total duration when available. + track_count = self._parse_int(album.get("numberOfTracks") or album.get("trackCount") or album.get("tracks") or 0) + if track_count: + columns.append(("Tracks", str(track_count))) + total_time = _format_total_seconds(album.get("duration") or album.get("durationSeconds") or album.get("duration_sec") or 0) + if total_time: + columns.append(("Total", total_time)) + + release_date = self._stringify(album.get("releaseDate") or album.get("release_date") or album.get("date")) + if release_date: + columns.append(("Release", release_date)) + + # Preserve the original album payload but add a hint for downstream. + md: Dict[str, Any] = dict(album) + if artist_name and "_artist_name" not in md: + md["_artist_name"] = artist_name + + return SearchResult( + table="hifi", + title=title, + path=path, + detail="album", + annotations=["tidal", "album"], + media_kind="audio", + columns=columns, + full_metadata=md, + ) + @staticmethod def _find_ffmpeg() -> Optional[str]: exe = shutil.which("ffmpeg") @@ -475,20 +977,56 @@ class HIFI(Provider): def _extract_track_items(self, data: Any) -> List[Dict[str, Any]]: if isinstance(data, list): - return [item for item in data if isinstance(item, dict)] + items: List[Dict[str, Any]] = [] + for item in data: + if not isinstance(item, dict): + continue + # Some endpoints return wrapper objects like {"item": {...}}. + nested = item.get("item") + if isinstance(nested, dict): + items.append(nested) + continue + nested = item.get("track") + if isinstance(nested, dict): + items.append(nested) + continue + items.append(item) + return items if not isinstance(data, dict): return [] items: List[Dict[str, Any]] = [] direct = data.get("items") if isinstance(direct, list): - items.extend(item for item in direct if isinstance(item, dict)) + for item in direct: + if not isinstance(item, dict): + continue + nested = item.get("item") + if isinstance(nested, dict): + items.append(nested) + continue + nested = item.get("track") + if isinstance(nested, dict): + items.append(nested) + continue + items.append(item) tracks_section = data.get("tracks") if isinstance(tracks_section, dict): track_items = tracks_section.get("items") if isinstance(track_items, list): - items.extend(item for item in track_items if isinstance(item, dict)) + for item in track_items: + if not isinstance(item, dict): + continue + nested = item.get("item") + if isinstance(nested, dict): + items.append(nested) + continue + nested = item.get("track") + if isinstance(nested, dict): + items.append(nested) + continue + items.append(item) top_hits = data.get("topHits") if isinstance(top_hits, list): @@ -563,20 +1101,121 @@ class HIFI(Provider): if value: key_values[key] = value - params: Dict[str, str] = {} + # The proxy API only accepts exactly one of s/a/v/p. If the user mixes + # free text with a structured key (e.g. artist:foo bar), treat the free + # text as part of the same query instead of creating an additional key. + mapped_values: Dict[str, List[str]] = {} for key, value in key_values.items(): if not value: continue mapped = _KEY_TO_PARAM.get(key) - if mapped: - params[mapped] = value + if not mapped: + continue + mapped_values.setdefault(mapped, []).append(value) - general = " ".join(part for part in free_text if part).strip() - if general: - params.setdefault("s", general) - elif not params: - params["s"] = cleaned - return params + # Choose the search key in priority order. + chosen_key = None + for candidate in ("a", "v", "p", "s"): + if mapped_values.get(candidate): + chosen_key = candidate + break + if chosen_key is None: + chosen_key = "s" + + chosen_parts: List[str] = [] + chosen_parts.extend(mapped_values.get(chosen_key, [])) + + # If the user provided free text and a structured key (like artist:), + # fold it into the chosen key instead of forcing a second key. + extra = " ".join(part for part in free_text if part).strip() + if extra: + chosen_parts.append(extra) + + chosen_value = " ".join(p for p in chosen_parts if p).strip() + if not chosen_value: + chosen_value = cleaned + + return {chosen_key: chosen_value} if chosen_value else {} + + def _extract_artist_items(self, data: Any) -> List[Dict[str, Any]]: + if isinstance(data, list): + return [item for item in data if isinstance(item, dict)] + if not isinstance(data, dict): + return [] + + items: List[Dict[str, Any]] = [] + direct = data.get("items") + if isinstance(direct, list): + items.extend(item for item in direct if isinstance(item, dict)) + + artists_section = data.get("artists") + if isinstance(artists_section, dict): + artist_items = artists_section.get("items") + if isinstance(artist_items, list): + items.extend(item for item in artist_items if isinstance(item, dict)) + + top_hits = data.get("topHits") + if isinstance(top_hits, list): + for hit in top_hits: + if not isinstance(hit, dict): + continue + hit_type = str(hit.get("type") or "").upper() + if hit_type != "ARTISTS": + continue + value = hit.get("value") + if isinstance(value, dict): + items.append(value) + + seen: set[int] = set() + deduped: List[Dict[str, Any]] = [] + for item in items: + raw_id = item.get("id") or item.get("artistId") + if raw_id is None: + continue + try: + artist_int = int(raw_id) + except Exception: + artist_int = None + if artist_int is None or artist_int in seen: + continue + seen.add(artist_int) + deduped.append(item) + + return deduped + + def _artist_item_to_result(self, item: Dict[str, Any]) -> Optional[SearchResult]: + if not isinstance(item, dict): + return None + + name = str(item.get("name") or item.get("title") or "").strip() + if not name: + return None + + raw_id = item.get("id") or item.get("artistId") + if raw_id is None: + return None + try: + artist_id = int(raw_id) + except (TypeError, ValueError): + return None + + path = f"hifi://artist/{artist_id}" + + columns: List[tuple[str, str]] = [("Artist", name), ("Artist ID", str(artist_id))] + popularity = self._stringify(item.get("popularity")) + if popularity: + columns.append(("Popularity", popularity)) + + return SearchResult( + table="hifi", + title=name, + path=path, + detail="artist", + annotations=["tidal", "artist"], + media_kind="audio", + columns=columns, + full_metadata=item, + ) @staticmethod def _format_duration(seconds: Any) -> str: @@ -649,6 +1288,12 @@ class HIFI(Provider): columns: List[tuple[str, str]] = [] if title: columns.append(("Title", title)) + disc_no = self._stringify(item.get("volumeNumber") or item.get("discNumber") or item.get("disc_number")) + track_no = self._stringify(item.get("trackNumber") or item.get("track_number")) + if disc_no: + columns.append(("Disc #", disc_no)) + if track_no: + columns.append(("Track #", track_no)) if album_title: columns.append(("Album", album_title)) if artist_display: @@ -670,6 +1315,12 @@ class HIFI(Provider): if isinstance(tag, str) and tag.strip(): tags.add(tag.strip().lower()) + # IMPORTANT: do not retain a shared reference to the raw API dict. + # Downstream playback (MPV) mutates metadata to cache the decoded Tidal + # manifest path/URL. If multiple results share the same dict reference, + # they can incorrectly collapse to a single playable target. + full_md: Dict[str, Any] = dict(item) + return SearchResult( table="hifi", title=title, @@ -679,7 +1330,7 @@ class HIFI(Provider): media_kind="audio", tag=tags, columns=columns, - full_metadata=item, + full_metadata=full_md, ) def _extract_track_selection_context( @@ -814,11 +1465,133 @@ class HIFI(Provider): current_table = ctx.get_current_stage_table() except Exception: current_table = None + if current_table is None: + try: + current_table = ctx.get_last_result_table() + except Exception: + current_table = None table_type = ( current_table.table if current_table and hasattr(current_table, "table") else None ) + + # Artist selection: selecting @N should open an albums list. + if isinstance(table_type, str) and table_type.strip().lower() == "hifi.artist": + contexts = self._extract_artist_selection_context(selected_items) + if not contexts: + return False + + artist_id, artist_name = contexts[0] + album_results = self._albums_for_artist(artist_id=artist_id, artist_name=artist_name, limit=200) + if not album_results: + return False + + try: + from SYS.rich_display import stdout_console + from SYS.result_table import ResultTable + except Exception: + return False + + table = ResultTable(f"HIFI Albums: {artist_name}").set_preserve_order(False) + table.set_table("hifi.album") + try: + table.set_table_metadata({"provider": "hifi", "view": "album", "artist_id": artist_id, "artist_name": artist_name}) + except Exception: + pass + + results_payload: List[Dict[str, Any]] = [] + for res in album_results: + table.add_result(res) + try: + results_payload.append(res.to_dict()) + except Exception: + results_payload.append({"table": "hifi", "title": getattr(res, "title", ""), "path": getattr(res, "path", "")}) + + try: + ctx.set_last_result_table(table, results_payload) + ctx.set_current_stage_table(table) + except Exception: + pass + + try: + suppress = bool(getattr(ctx, "_suppress_provider_selector_print", False)) + except Exception: + suppress = False + + if not suppress: + try: + stdout_console().print() + stdout_console().print(table) + except Exception: + pass + + return True + + # Album selection: selecting @N should open the track list for that album. + if isinstance(table_type, str) and table_type.strip().lower() == "hifi.album": + contexts = self._extract_album_selection_context(selected_items) + if not contexts: + return False + + album_id, album_title, artist_name = contexts[0] + track_results = self._tracks_for_album(album_id=album_id, album_title=album_title, artist_name=artist_name, limit=200) + if not track_results: + return False + + try: + from SYS.rich_display import stdout_console + from SYS.result_table import ResultTable + except Exception: + return False + + label = album_title + if artist_name: + label = f"{artist_name} - {album_title}" + # Preserve album order (disc/track) rather than sorting by title. + table = ResultTable(f"HIFI Tracks: {label}").set_preserve_order(True) + table.set_table("hifi.track") + try: + table.set_table_metadata( + { + "provider": "hifi", + "view": "track", + "album_id": album_id, + "album_title": album_title, + "artist_name": artist_name, + } + ) + except Exception: + pass + + results_payload: List[Dict[str, Any]] = [] + for res in track_results: + table.add_result(res) + try: + results_payload.append(res.to_dict()) + except Exception: + results_payload.append({"table": "hifi", "title": getattr(res, "title", ""), "path": getattr(res, "path", "")}) + + try: + ctx.set_last_result_table(table, results_payload) + ctx.set_current_stage_table(table) + except Exception: + pass + + try: + suppress = bool(getattr(ctx, "_suppress_provider_selector_print", False)) + except Exception: + suppress = False + + if not suppress: + try: + stdout_console().print() + stdout_console().print(table) + except Exception: + pass + + return True + if isinstance(table_type, str) and table_type.strip().lower() == "hifi.track": try: meta = ( diff --git a/Provider/openlibrary.py b/Provider/openlibrary.py index 47d85d6..32815cf 100644 --- a/Provider/openlibrary.py +++ b/Provider/openlibrary.py @@ -780,9 +780,9 @@ class OpenLibrary(Provider): elif isinstance(collection, str): values = [collection.strip().lower()] - if any(v in {"inlibrary", - "printdisabled", - "lendinglibrary"} for v in values): + # Treat borrowable as "inlibrary" (and keep "lendinglibrary" as a safe alias). + # IMPORTANT: do NOT treat "printdisabled" alone as borrowable. + if any(v in {"inlibrary", "lendinglibrary"} for v in values): return True, "archive-collection" return False, "archive-not-lendable" except Exception: @@ -1312,11 +1312,11 @@ class OpenLibrary(Provider): if lendable_local: return "borrow", reason_local, archive_id_local, "" - # Not lendable: check whether it's directly downloadable (public domain uploads, etc.). + # OpenLibrary API can be a false-negative; fall back to Archive metadata. try: - can_direct, pdf_url = self._archive_check_direct_download(archive_id_local) - if can_direct and pdf_url: - return "download", reason_local, archive_id_local, str(pdf_url) + lendable2, reason2 = self._archive_is_lendable(archive_id_local) + if lendable2: + return "borrow", reason2 or reason_local, archive_id_local, "" except Exception: pass @@ -1401,6 +1401,11 @@ class OpenLibrary(Provider): if 0 <= idx < len(availability_rows): availability, availability_reason, archive_id, direct_url = availability_rows[idx] + # UX requirement: OpenLibrary provider should ONLY show borrowable books. + # Ignore printdisabled-only and non-borrow items. + if availability != "borrow": + continue + # Patch the display column. for idx, (name, _val) in enumerate(columns): if name == "Avail": diff --git a/SYS/pipeline.py b/SYS/pipeline.py index 0ed8430..62bbd8e 100644 --- a/SYS/pipeline.py +++ b/SYS/pipeline.py @@ -10,7 +10,7 @@ from dataclasses import dataclass, field from contextvars import ContextVar from typing import Any, Dict, List, Optional, Sequence from SYS.models import PipelineStageContext -from SYS.logger import log +from SYS.logger import log, debug, is_debug_enabled def set_live_progress(progress_ui: Any) -> None: @@ -696,8 +696,11 @@ def restore_previous_result_table() -> bool: # If an underlying table exists, we're done. # Otherwise, fall through to history restore so @.. actually returns to the last table. if state.last_result_table is not None: + # Ensure subsequent @N selection uses the table the user sees. + state.current_stage_table = state.last_result_table return True if not state.result_table_history: + state.current_stage_table = state.last_result_table return True if not state.result_table_history: @@ -723,6 +726,15 @@ def restore_previous_result_table() -> bool: state.display_table = None state.display_subject = None + # Sync current stage table to the restored view so provider selectors run + # against the correct table type. + state.current_stage_table = state.last_result_table + + try: + debug_table_state("restore_previous_result_table") + except Exception: + pass + return True @@ -740,8 +752,11 @@ def restore_next_result_table() -> bool: # If an underlying table exists, we're done. # Otherwise, fall through to forward restore when available. if state.last_result_table is not None: + # Ensure subsequent @N selection uses the table the user sees. + state.current_stage_table = state.last_result_table return True if not state.result_table_forward: + state.current_stage_table = state.last_result_table return True if not state.result_table_forward: @@ -769,6 +784,15 @@ def restore_next_result_table() -> bool: state.display_table = None state.display_subject = None + # Sync current stage table to the restored view so provider selectors run + # against the correct table type. + state.current_stage_table = state.last_result_table + + try: + debug_table_state("restore_next_result_table") + except Exception: + pass + return True @@ -819,6 +843,78 @@ def get_last_result_items() -> List[Any]: return [] +def debug_table_state(label: str = "") -> None: + """Dump pipeline table and item-buffer state (debug-only). + + Useful for diagnosing cases where `@N` selection appears to act on a different + table than the one currently displayed. + """ + + if not is_debug_enabled(): + return + + state = _get_pipeline_state() + + def _tbl(name: str, t: Any) -> None: + if t is None: + debug(f"[table] {name}: None") + return + try: + table_type = getattr(t, "table", None) + except Exception: + table_type = None + try: + title = getattr(t, "title", None) + except Exception: + title = None + try: + src_cmd = getattr(t, "source_command", None) + except Exception: + src_cmd = None + try: + src_args = getattr(t, "source_args", None) + except Exception: + src_args = None + try: + no_choice = bool(getattr(t, "no_choice", False)) + except Exception: + no_choice = False + try: + preserve_order = bool(getattr(t, "preserve_order", False)) + except Exception: + preserve_order = False + try: + row_count = len(getattr(t, "rows", []) or []) + except Exception: + row_count = 0 + try: + meta = ( + t.get_table_metadata() if hasattr(t, "get_table_metadata") else getattr(t, "table_metadata", None) + ) + except Exception: + meta = None + meta_keys = list(meta.keys()) if isinstance(meta, dict) else [] + + debug( + f"[table] {name}: id={id(t)} class={type(t).__name__} title={repr(title)} table={repr(table_type)} rows={row_count} " + f"source={repr(src_cmd)} source_args={repr(src_args)} no_choice={no_choice} preserve_order={preserve_order} meta_keys={meta_keys}" + ) + + if label: + debug(f"[table] state: {label}") + _tbl("display_table", getattr(state, "display_table", None)) + _tbl("current_stage_table", getattr(state, "current_stage_table", None)) + _tbl("last_result_table", getattr(state, "last_result_table", None)) + + try: + debug( + f"[table] buffers: display_items={len(state.display_items or [])} last_result_items={len(state.last_result_items or [])} " + f"history={len(state.result_table_history or [])} forward={len(state.result_table_forward or [])} last_selection={list(state.last_selection or [])}" + ) + except Exception: + pass + + def get_last_selectable_result_items() -> List[Any]: """Get items from the last *selectable* result table, ignoring display-only items. diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index bb1ebf6..a0dda28 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -205,7 +205,7 @@ class Download_File(Cmdlet): elif isinstance(collection, str): values = [collection.strip().lower()] if collection.strip() else [] - lendable = any(v in {"inlibrary", "printdisabled", "lendinglibrary"} for v in values) + lendable = any(v in {"inlibrary", "lendinglibrary"} for v in values) except Exception: lendable = False diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index 1ab7dbf..ed25e56 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -280,6 +280,49 @@ class search_file(Cmdlet): results = provider.search(query, limit=limit) debug(f"[search-file] {provider_name} -> {len(results or [])} result(s)") + # HIFI artist UX: if there is exactly one artist match, auto-expand + # directly to albums without requiring an explicit @1 selection. + if ( + provider_lower == "hifi" + and table_meta.get("view") == "artist" + and isinstance(results, list) + and len(results) == 1 + ): + try: + artist_res = results[0] + artist_name = str(getattr(artist_res, "title", "") or "").strip() + artist_md = getattr(artist_res, "full_metadata", None) + artist_id = None + if isinstance(artist_md, dict): + raw_id = artist_md.get("artistId") or artist_md.get("id") + try: + artist_id = int(raw_id) if raw_id is not None else None + except Exception: + artist_id = None + + album_results = [] + if hasattr(provider, "_albums_for_artist") and callable(getattr(provider, "_albums_for_artist")): + try: + album_results = provider._albums_for_artist( # type: ignore[attr-defined] + artist_id=artist_id, + artist_name=artist_name, + limit=max(int(limit or 0), 200), + ) + except Exception: + album_results = [] + + if album_results: + results = album_results + table_type = "hifi.album" + table.set_table(table_type) + table_meta["view"] = "album" + try: + table.set_table_metadata(table_meta) + except Exception: + pass + except Exception: + pass + if not results: log(f"No results found for query: {query}", file=sys.stderr) if db is not None: diff --git a/cmdnat/pipe.py b/cmdnat/pipe.py index 4b294c5..cfad3a3 100644 --- a/cmdnat/pipe.py +++ b/cmdnat/pipe.py @@ -1109,18 +1109,27 @@ def _queue_items( } _send_ipc_command(ytdl_cmd, silent=True) + # For memory:// M3U payloads (used to carry titles), use loadlist so mpv parses + # the content as a playlist and does not expose #EXTINF lines as entries. + command_name = "loadfile" + try: + if isinstance(target_to_send, str) and target_to_send.startswith("memory://") and "#EXTM3U" in target_to_send: + command_name = "loadlist" + except Exception: + pass + cmd = { - "command": ["loadfile", + "command": [command_name, target_to_send, mode], "request_id": 200 } try: - debug(f"Sending MPV loadfile: {target_to_send} mode={mode}") + debug(f"Sending MPV {command_name}: {target_to_send} mode={mode}") resp = _send_ipc_command(cmd, silent=True) - debug(f"MPV loadfile response: {resp}") + debug(f"MPV {command_name} response: {resp}") except Exception as e: - debug(f"Exception sending loadfile to MPV: {e}", file=sys.stderr) + debug(f"Exception sending {command_name} to MPV: {e}", file=sys.stderr) resp = None if resp is None: diff --git a/cmdnat/table.py b/cmdnat/table.py new file mode 100644 index 0000000..53720b3 --- /dev/null +++ b/cmdnat/table.py @@ -0,0 +1,123 @@ +from typing import Any, Dict, Sequence + +from cmdlet._shared import Cmdlet, CmdletArg +from SYS.logger import log + + +def _run(piped_result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: + # Debug utility: dump current pipeline table state (display/current/last + buffers) + try: + from SYS import pipeline as ctx + except Exception as exc: + log(f"Failed to import pipeline context: {exc}") + return 1 + + state = None + try: + state = ctx.get_pipeline_state() if hasattr(ctx, "get_pipeline_state") else None + except Exception: + state = None + + def _summarize_table(name: str, t: Any) -> None: + if t is None: + print(f"{name}: None") + return + try: + table_type = getattr(t, "table", None) + except Exception: + table_type = None + try: + title = getattr(t, "title", None) + except Exception: + title = None + try: + src_cmd = getattr(t, "source_command", None) + except Exception: + src_cmd = None + try: + src_args = getattr(t, "source_args", None) + except Exception: + src_args = None + try: + no_choice = bool(getattr(t, "no_choice", False)) + except Exception: + no_choice = False + try: + preserve_order = bool(getattr(t, "preserve_order", False)) + except Exception: + preserve_order = False + try: + row_count = len(getattr(t, "rows", []) or []) + except Exception: + row_count = 0 + try: + meta = ( + t.get_table_metadata() if hasattr(t, "get_table_metadata") else getattr(t, "table_metadata", None) + ) + except Exception: + meta = None + meta_keys = list(meta.keys()) if isinstance(meta, dict) else [] + + print( + f"{name}: id={id(t)} class={type(t).__name__} title={repr(title)} table={repr(table_type)} rows={row_count} " + f"source={repr(src_cmd)} source_args={repr(src_args)} no_choice={no_choice} preserve_order={preserve_order} meta_keys={meta_keys}" + ) + + label = "" + try: + label = str(args[0]) if args else "" + if label: + print(f"Table State: {label}") + else: + print("Table State") + except Exception: + print("Table State") + + try: + _summarize_table("display_table", getattr(state, "display_table", None) if state is not None else None) + _summarize_table("current_stage_table", getattr(state, "current_stage_table", None) if state is not None else None) + _summarize_table("last_result_table", getattr(state, "last_result_table", None) if state is not None else None) + + display_items = getattr(state, "display_items", None) if state is not None else None + last_result_items = getattr(state, "last_result_items", None) if state is not None else None + hist = getattr(state, "result_table_history", None) if state is not None else None + fwd = getattr(state, "result_table_forward", None) if state is not None else None + last_sel = getattr(state, "last_selection", None) if state is not None else None + + print( + "buffers: " + f"display_items={len(display_items or [])} " + f"last_result_items={len(last_result_items or [])} " + f"history={len(hist or [])} " + f"forward={len(fwd or [])} " + f"last_selection={list(last_sel or [])}" + ) + except Exception as exc: + log(f"Failed to summarize table state: {exc}") + return 1 + + # If debug logging is enabled, also emit the richer debug dump. + try: + if hasattr(ctx, "debug_table_state"): + ctx.debug_table_state(label or ".table") + except Exception: + pass + + return 0 + + +CMDLET = Cmdlet( + name=".table", + summary="Dump pipeline table state for debugging", + usage=".table [label]", + arg=[ + CmdletArg( + name="label", + type="string", + description="Optional label to include in the dump", + required=False, + ), + ], +) + +CMDLET.exec = _run