From cc715e1fef1cfc958617f441d3ff6fbb2944992c Mon Sep 17 00:00:00 2001 From: Nose Date: Wed, 11 Feb 2026 16:29:02 -0800 Subject: [PATCH] kl --- Provider/Tidal.py | 146 +++++++++++++++++++++++++++++++++++++++++----- cmdnat/pipe.py | 6 ++ 2 files changed, 137 insertions(+), 15 deletions(-) diff --git a/Provider/Tidal.py b/Provider/Tidal.py index 161fdba..cbe2e70 100644 --- a/Provider/Tidal.py +++ b/Provider/Tidal.py @@ -16,7 +16,7 @@ from API.Tidal import ( extract_artists, stringify, ) -from ProviderCore.base import Provider, SearchResult, parse_inline_query_arguments +from ProviderCore.base import Provider, SearchResult from cmdlet._shared import get_field from SYS import pipeline as pipeline_context from SYS.logger import debug, log @@ -98,7 +98,34 @@ class Tidal(Provider): def prefers_transfer_progress(self) -> bool: return True - def _get_view(self, query: str) -> str: + @staticmethod + def _normalize_query_filters(filters: Optional[Dict[str, Any]]) -> Dict[str, str]: + """Normalize cmdlet-provided filters / inline args. + + The search-file cmdlet calls `provider.extract_query_arguments()` and then + passes the extracted args back in as `filters=`. We treat those as + first-class query arguments. + """ + + out: Dict[str, str] = {} + if not isinstance(filters, dict): + return out + for k, v in filters.items(): + key = str(k or "").strip().lower() + if not key: + continue + val = str(v or "").strip() + if not val: + continue + out[key] = val + return out + + def _get_view(self, query: str, filters: Optional[Dict[str, Any]] = None) -> str: + # If filters/inline args specify the view, that wins. + inline_args = self._normalize_query_filters(filters) + if inline_args: + return self._determine_view(str(query or "").strip(), inline_args) + text = str(query or "").strip() if not text: return "track" @@ -109,12 +136,12 @@ class Tidal(Provider): return "track" def get_table_type(self, query: str, filters: Optional[Dict[str, Any]] = None) -> str: - view = self._get_view(query) + view = self._get_view(query, filters) return f"tidal.{view}" def get_table_metadata(self, query: str, filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: meta = super().get_table_metadata(query, filters) - meta["view"] = self._get_view(query) + meta["view"] = self._get_view(query, filters) return meta def __init__(self, config: Optional[Dict[str, Any]] = None) -> None: @@ -127,12 +154,71 @@ class Tidal(Provider): self.api_clients = [TidalApiClient(base_url=url, timeout=self.api_timeout) for url in self.api_urls] def extract_query_arguments(self, query: str) -> Tuple[str, Dict[str, Any]]: - normalized, parsed = parse_inline_query_arguments(query) - filtered: Dict[str, Any] = {} - for key, value in parsed.items(): + """Parse inline `key:value` query arguments. + + Unlike the generic parser in ProviderCore, this supports multi-word + values (e.g. `artist:elliott smith`). + + Returns: + (normalized_free_text_query, parsed_args) + """ + + cleaned = str(query or "").strip() + if not cleaned: + return "", {} + + segments: List[str] = [] + for chunk in _DELIMITERS_RE.split(cleaned): + chunk = chunk.strip() + if not chunk: + continue + if ":" in chunk: + for sub in _SEGMENT_BOUNDARY_RE.split(chunk): + part = sub.strip() + if part: + segments.append(part) + else: + segments.append(chunk) + + parsed_args: Dict[str, Any] = {} + free_text: List[str] = [] + for segment in segments: + # Support both key:value and key=value. + sep_index = segment.find(":") + if sep_index < 0: + sep_index = segment.find("=") + + if sep_index <= 0: + free_text.append(segment) + continue + + key = segment[:sep_index].strip().lower() + value = segment[sep_index + 1 :].strip().strip('"').strip("'") + if not key or not value: + free_text.append(segment) + continue + if key in self.QUERY_ARG_CHOICES: - filtered[key] = value - return normalized, filtered + parsed_args[key] = value + else: + # Unknown key: keep it in the free text so it isn't silently lost. + free_text.append(segment) + + normalized = " ".join(part for part in free_text if part).strip() + + # If the query was *only* structured args (no free text), provide a + # human-friendly query string for table titles (avoid falling back to '*'). + if not normalized and parsed_args: + for preferred in ("artist", "album", "track", "title", "playlist", "video"): + val = str(parsed_args.get(preferred) or "").strip() + if val: + normalized = val + break + if not normalized: + # Last resort: join all values. + normalized = " ".join(str(v) for v in parsed_args.values() if str(v).strip()).strip() + + return normalized, parsed_args def validate(self) -> bool: return bool(self.api_urls) @@ -148,14 +234,44 @@ class Tidal(Provider): return [] normalized_query, inline_args = self.extract_query_arguments(query) raw_query = str(query or "").strip() - search_query = normalized_query or raw_query - if not search_query and inline_args: - search_query = " ".join(f"{k}:{v}" for k, v in inline_args.items()) - if not search_query: + search_query = (normalized_query or raw_query).strip() + + # Merge cmdlet-provided filters with inline args. + merged_args: Dict[str, str] = {} + merged_args.update(self._normalize_query_filters(filters)) + for k, v in (inline_args or {}).items(): + key = str(k or "").strip().lower() + val = str(v or "").strip() + if key and val: + merged_args[key] = val + + # Best-effort: if the cmdlet split a multi-word value (e.g. artist:elliott smith + # -> filters={'artist': 'elliott'}, query='smith'), stitch it back together. + if merged_args.get("artist") and search_query and search_query not in {"*", ""}: + candidate = merged_args.get("artist", "") + if candidate: + low_candidate = candidate.lower() + low_query = search_query.lower() + if low_query and low_query not in low_candidate: + # Only append when it looks like plain text (not another structured segment). + if ":" not in search_query and "=" not in search_query: + merged_args["artist"] = f"{candidate} {search_query}".strip() + + # Determine view from merged args (preferred), otherwise from the query text. + view = self._determine_view(search_query, merged_args) if merged_args else self._determine_view(search_query, inline_args) + + # Build API params. Prefer structured args when present; the backend only accepts + # one of s/a/v/p. + structured_query = " ".join( + f"{k}:{v}" for k, v in merged_args.items() if k in self.QUERY_ARG_CHOICES and str(v).strip() + ).strip() + params_source = structured_query or search_query + if not params_source and merged_args: + params_source = " ".join(f"{k}:{v}" for k, v in merged_args.items() if str(v).strip()).strip() + if not params_source: return [] - view = self._determine_view(search_query, inline_args) - params = self._build_search_params(search_query) + params = self._build_search_params(params_source) if not params: return [] diff --git a/cmdnat/pipe.py b/cmdnat/pipe.py index a8aff56..9118ab3 100644 --- a/cmdnat/pipe.py +++ b/cmdnat/pipe.py @@ -734,6 +734,12 @@ def _is_probable_ytdl_url(url: str) -> bool: if "tidal.com" in lower and "/manifest" in lower: return False + # Exclude Tidal CDN direct media URLs (these are already-resolved streams and do + # not need ytdl-hook; wrapping them in memory:// M3U is safe and lets us carry + # per-item titles into MPV's playlist UI). + if "audio.tidal.com" in lower or "/mediatracks/" in lower: + return False + # Exclude AllDebrid protected links if "alldebrid.com/f/" in lower: return False