This commit is contained in:
2026-02-11 16:29:02 -08:00
parent 33d7461db6
commit cc715e1fef
2 changed files with 137 additions and 15 deletions

View File

@@ -16,7 +16,7 @@ from API.Tidal import (
extract_artists,
stringify,
)
from ProviderCore.base import Provider, SearchResult, parse_inline_query_arguments
from ProviderCore.base import Provider, SearchResult
from cmdlet._shared import get_field
from SYS import pipeline as pipeline_context
from SYS.logger import debug, log
@@ -98,7 +98,34 @@ class Tidal(Provider):
def prefers_transfer_progress(self) -> bool:
return True
def _get_view(self, query: str) -> str:
@staticmethod
def _normalize_query_filters(filters: Optional[Dict[str, Any]]) -> Dict[str, str]:
"""Normalize cmdlet-provided filters / inline args.
The search-file cmdlet calls `provider.extract_query_arguments()` and then
passes the extracted args back in as `filters=`. We treat those as
first-class query arguments.
"""
out: Dict[str, str] = {}
if not isinstance(filters, dict):
return out
for k, v in filters.items():
key = str(k or "").strip().lower()
if not key:
continue
val = str(v or "").strip()
if not val:
continue
out[key] = val
return out
def _get_view(self, query: str, filters: Optional[Dict[str, Any]] = None) -> str:
# If filters/inline args specify the view, that wins.
inline_args = self._normalize_query_filters(filters)
if inline_args:
return self._determine_view(str(query or "").strip(), inline_args)
text = str(query or "").strip()
if not text:
return "track"
@@ -109,12 +136,12 @@ class Tidal(Provider):
return "track"
def get_table_type(self, query: str, filters: Optional[Dict[str, Any]] = None) -> str:
view = self._get_view(query)
view = self._get_view(query, filters)
return f"tidal.{view}"
def get_table_metadata(self, query: str, filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
meta = super().get_table_metadata(query, filters)
meta["view"] = self._get_view(query)
meta["view"] = self._get_view(query, filters)
return meta
def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
@@ -127,12 +154,71 @@ class Tidal(Provider):
self.api_clients = [TidalApiClient(base_url=url, timeout=self.api_timeout) for url in self.api_urls]
def extract_query_arguments(self, query: str) -> Tuple[str, Dict[str, Any]]:
normalized, parsed = parse_inline_query_arguments(query)
filtered: Dict[str, Any] = {}
for key, value in parsed.items():
"""Parse inline `key:value` query arguments.
Unlike the generic parser in ProviderCore, this supports multi-word
values (e.g. `artist:elliott smith`).
Returns:
(normalized_free_text_query, parsed_args)
"""
cleaned = str(query or "").strip()
if not cleaned:
return "", {}
segments: List[str] = []
for chunk in _DELIMITERS_RE.split(cleaned):
chunk = chunk.strip()
if not chunk:
continue
if ":" in chunk:
for sub in _SEGMENT_BOUNDARY_RE.split(chunk):
part = sub.strip()
if part:
segments.append(part)
else:
segments.append(chunk)
parsed_args: Dict[str, Any] = {}
free_text: List[str] = []
for segment in segments:
# Support both key:value and key=value.
sep_index = segment.find(":")
if sep_index < 0:
sep_index = segment.find("=")
if sep_index <= 0:
free_text.append(segment)
continue
key = segment[:sep_index].strip().lower()
value = segment[sep_index + 1 :].strip().strip('"').strip("'")
if not key or not value:
free_text.append(segment)
continue
if key in self.QUERY_ARG_CHOICES:
filtered[key] = value
return normalized, filtered
parsed_args[key] = value
else:
# Unknown key: keep it in the free text so it isn't silently lost.
free_text.append(segment)
normalized = " ".join(part for part in free_text if part).strip()
# If the query was *only* structured args (no free text), provide a
# human-friendly query string for table titles (avoid falling back to '*').
if not normalized and parsed_args:
for preferred in ("artist", "album", "track", "title", "playlist", "video"):
val = str(parsed_args.get(preferred) or "").strip()
if val:
normalized = val
break
if not normalized:
# Last resort: join all values.
normalized = " ".join(str(v) for v in parsed_args.values() if str(v).strip()).strip()
return normalized, parsed_args
def validate(self) -> bool:
return bool(self.api_urls)
@@ -148,14 +234,44 @@ class Tidal(Provider):
return []
normalized_query, inline_args = self.extract_query_arguments(query)
raw_query = str(query or "").strip()
search_query = normalized_query or raw_query
if not search_query and inline_args:
search_query = " ".join(f"{k}:{v}" for k, v in inline_args.items())
if not search_query:
search_query = (normalized_query or raw_query).strip()
# Merge cmdlet-provided filters with inline args.
merged_args: Dict[str, str] = {}
merged_args.update(self._normalize_query_filters(filters))
for k, v in (inline_args or {}).items():
key = str(k or "").strip().lower()
val = str(v or "").strip()
if key and val:
merged_args[key] = val
# Best-effort: if the cmdlet split a multi-word value (e.g. artist:elliott smith
# -> filters={'artist': 'elliott'}, query='smith'), stitch it back together.
if merged_args.get("artist") and search_query and search_query not in {"*", ""}:
candidate = merged_args.get("artist", "")
if candidate:
low_candidate = candidate.lower()
low_query = search_query.lower()
if low_query and low_query not in low_candidate:
# Only append when it looks like plain text (not another structured segment).
if ":" not in search_query and "=" not in search_query:
merged_args["artist"] = f"{candidate} {search_query}".strip()
# Determine view from merged args (preferred), otherwise from the query text.
view = self._determine_view(search_query, merged_args) if merged_args else self._determine_view(search_query, inline_args)
# Build API params. Prefer structured args when present; the backend only accepts
# one of s/a/v/p.
structured_query = " ".join(
f"{k}:{v}" for k, v in merged_args.items() if k in self.QUERY_ARG_CHOICES and str(v).strip()
).strip()
params_source = structured_query or search_query
if not params_source and merged_args:
params_source = " ".join(f"{k}:{v}" for k, v in merged_args.items() if str(v).strip()).strip()
if not params_source:
return []
view = self._determine_view(search_query, inline_args)
params = self._build_search_params(search_query)
params = self._build_search_params(params_source)
if not params:
return []