From 41c11d39fd7130472ac73f8758808cf7fdf1d063 Mon Sep 17 00:00:00 2001 From: Nose Date: Tue, 6 Jan 2026 01:38:59 -0800 Subject: [PATCH] refactor(download): remove ProviderCore/download.py, move sanitize_filename to SYS.utils, replace callers to use API.HTTP.HTTPClient --- API/HTTP.py | 64 ++- CLI.py | 213 +++++++- Provider/alldebrid.py | 4 +- Provider/bandcamp.py | 34 +- Provider/internetarchive.py | 2 +- Provider/libgen.py | 2 +- Provider/openlibrary.py | 36 +- Provider/vimm.py | 998 +++++++++++++++++++++++++++------- ProviderCore/base.py | 50 ++ ProviderCore/download.py | 100 ---- ProviderCore/inline_utils.py | 127 +++++ ProviderCore/registry.py | 175 +++++- SYS/html_table.py | 302 ++++++++++ SYS/pipeline.py | 21 + SYS/provider_helpers.py | 110 ++++ SYS/result_table.py | 31 ++ SYS/result_table_adapters.py | 83 ++- SYS/result_table_api.py | 45 +- SYS/result_table_renderers.py | 21 +- SYS/utils.py | 18 + Store/registry.py | 58 ++ cmdlet/_shared.py | 7 +- cmdlet/add_file.py | 6 +- cmdlet/download_file.py | 4 +- cmdlet/provider_table.py | 37 +- cmdlet/screen_shot.py | 9 +- cmdlet/search_file.py | 39 +- cmdlet/select_item.py | 54 +- docs/CHANGELOG.md | 9 + docs/PR_PROVIDER_AUTHORING.md | 15 + docs/provider_authoring.md | 141 +++++ docs/result_table.md | 7 +- scripts/README.md | 60 ++ scripts/debug_import_vimm.py | 9 + scripts/list_providers.py | 4 + tmp_trim_registry.py | 10 - tmp_write_registry.py | 3 - tool/playwright.py | 258 ++++++++- 38 files changed, 2640 insertions(+), 526 deletions(-) delete mode 100644 ProviderCore/download.py create mode 100644 ProviderCore/inline_utils.py create mode 100644 SYS/html_table.py create mode 100644 SYS/provider_helpers.py create mode 100644 docs/CHANGELOG.md create mode 100644 docs/PR_PROVIDER_AUTHORING.md create mode 100644 docs/provider_authoring.md create mode 100644 scripts/README.md create mode 100644 scripts/debug_import_vimm.py create mode 100644 scripts/list_providers.py delete mode 100644 tmp_trim_registry.py delete 
mode 100644 tmp_write_registry.py diff --git a/API/HTTP.py b/API/HTTP.py index 7e07448..b95e44c 100644 --- a/API/HTTP.py +++ b/API/HTTP.py @@ -20,7 +20,7 @@ from pathlib import Path from urllib.parse import unquote, urlparse, parse_qs import logging -from SYS.logger import debug, log +from SYS.logger import debug, is_debug_enabled, log from SYS.models import DebugLogger, DownloadError, DownloadMediaResult, ProgressBar from SYS.utils import ensure_directory, sha256_file @@ -51,8 +51,11 @@ def _resolve_verify_value(verify_ssl: bool) -> Union[bool, str]: return env_cert def _try_module_bundle(mod_name: str) -> Optional[str]: + # Prefer checking sys.modules first (helps test injection / monkeypatching) try: - mod = __import__(mod_name) + mod = sys.modules.get(mod_name) + if mod is None: + mod = __import__(mod_name) except Exception: return None @@ -178,6 +181,28 @@ class HTTPClient: self._httpx_verify = _resolve_verify_value(verify_ssl) + # Debug helpers + def _debug_panel(self, title: str, rows: List[tuple[str, Any]]) -> None: + if not is_debug_enabled(): + return + try: + from rich.table import Table + from rich.panel import Panel + + grid = Table.grid(padding=(0, 1)) + grid.add_column("Key", style="cyan", no_wrap=True) + grid.add_column("Value") + for key, val in rows: + try: + grid.add_row(str(key), str(val)) + except Exception: + grid.add_row(str(key), "") + + debug(Panel(grid, title=title, expand=False)) + except Exception: + # Fallback to simple debug output + debug(title, rows) + def __enter__(self): """Context manager entry.""" self._client = httpx.Client( @@ -425,8 +450,33 @@ class HTTPClient: last_exception = None for attempt in range(self.retries): + self._debug_panel( + "HTTP request", + [ + ("method", method), + ("url", url), + ("attempt", f"{attempt + 1}/{self.retries}"), + ("params", kwargs.get("params")), + ("headers", kwargs.get("headers")), + ("verify", self._httpx_verify), + ("follow_redirects", kwargs.get("follow_redirects", False)), + ], + ) try: 
response = self._client.request(method, url, **kwargs) + self._debug_panel( + "HTTP response", + [ + ("method", method), + ("url", url), + ("status", getattr(response, "status_code", "")), + ("elapsed", getattr(response, "elapsed", "")), + ( + "content_length", + response.headers.get("content-length") if hasattr(response, "headers") else "", + ), + ], + ) if raise_for_status: response.raise_for_status() return response @@ -537,6 +587,16 @@ class HTTPClient: else: kwargs["headers"] = self._get_headers() + self._debug_panel( + "HTTP stream", + [ + ("method", method), + ("url", url), + ("headers", kwargs.get("headers")), + ("follow_redirects", kwargs.get("follow_redirects", False)), + ], + ) + return self._client.stream(method, url, **kwargs) diff --git a/CLI.py b/CLI.py index 46a9d31..b298c92 100644 --- a/CLI.py +++ b/CLI.py @@ -67,6 +67,7 @@ from SYS.cmdlet_catalog import ( ) from SYS.config import get_local_storage_path, load_config from SYS.result_table import ResultTable +from ProviderCore.registry import provider_inline_query_choices HELP_EXAMPLE_SOURCE_COMMANDS = { ".help-example", @@ -797,10 +798,10 @@ class CmdletIntrospection: @staticmethod def store_choices(config: Dict[str, Any]) -> List[str]: try: - from Store import Store + # Use config-only helper to avoid instantiating backends during completion + from Store.registry import list_configured_backend_names - storage = Store(config=config, suppress_debug=True) - return list(storage.list_backends() or []) + return list(list_configured_backend_names(config) or []) except Exception: return [] @@ -903,6 +904,21 @@ class CmdletCompleter(Completer): return used + @staticmethod + def _flag_value(tokens: Sequence[str], *flags: str) -> Optional[str]: + want = {str(f).strip().lower() for f in flags if str(f).strip()} + if not want: + return None + for idx, tok in enumerate(tokens): + low = str(tok or "").strip().lower() + if "=" in low: + head, val = low.split("=", 1) + if head in want: + return tok.split("=", 1)[1] 
+ if low in want and idx + 1 < len(tokens): + return tokens[idx + 1] + return None + def get_completions( self, document: Document, @@ -971,6 +987,48 @@ class CmdletCompleter(Completer): prev_token = stage_tokens[-2].lower() if len(stage_tokens) > 1 else "" config = self._config_loader.load() + + provider_name = None + if cmd_name == "search-file": + provider_name = self._flag_value(stage_tokens, "-provider", "--provider") + + if ( + cmd_name == "search-file" + and provider_name + and not ends_with_space + and ":" in current_token + and not current_token.startswith("-") + ): + # Allow quoted tokens like "system:g + quote_prefix = current_token[0] if current_token[:1] in {"'", '"'} else "" + inline_token = current_token[1:] if quote_prefix else current_token + if inline_token.endswith(quote_prefix) and len(inline_token) > 1: + inline_token = inline_token[:-1] + + # Allow comma-separated inline specs; operate on the last segment only. + if "," in inline_token: + inline_token = inline_token.split(",")[-1].lstrip() + + if ":" not in inline_token: + return + + field, partial = inline_token.split(":", 1) + field = field.strip().lower() + partial_lower = partial.strip().lower() + inline_choices = provider_inline_query_choices(provider_name, field, config) + if inline_choices: + filtered = ( + [c for c in inline_choices if partial_lower in str(c).lower()] + if partial_lower + else list(inline_choices) + ) + for choice in (filtered or inline_choices): + # Replace only the partial after the colon; keep the field prefix and quotes as typed. + start_pos = -len(partial) + suggestion = str(choice) + yield Completion(suggestion, start_position=start_pos) + return + choices = CmdletIntrospection.arg_choices( cmd_name=cmd_name, arg_name=prev_token, @@ -2580,27 +2638,32 @@ class PipelineExecutor: else: cmd_list = [] - expanded_stage: List[str] = cmd_list + source_args + selected_row_args + # IMPORTANT: Put selected row args *before* source_args. 
+ # Rationale: The cmdlet argument parser treats the *first* unknown + # token as a positional value (e.g., URL). If `source_args` + # contain unknown flags (like -provider which download-file does + # not declare), they could be misinterpreted as the positional + # URL argument and cause attempts to download strings like + # "-provider" (which is invalid). By placing selection args + # first we ensure the intended URL/selection token is parsed + # as the positional URL and avoid this class of parsing errors. + expanded_stage: List[str] = cmd_list + selected_row_args + source_args - if first_stage_had_extra_args and stages: - expanded_stage += stages[0] - stages[0] = expanded_stage - else: - stages.insert(0, expanded_stage) + if first_stage_had_extra_args and stages: + expanded_stage += stages[0] + stages[0] = expanded_stage + else: + stages.insert(0, expanded_stage) - if pipeline_session and worker_manager: - try: - worker_manager.log_step( - pipeline_session.worker_id, - f"@N expansion: {source_cmd} + {' '.join(str(x) for x in selected_row_args)}", - ) - except Exception: - pass + if pipeline_session and worker_manager: + try: + worker_manager.log_step( + pipeline_session.worker_id, + f"@N expansion: {source_cmd} + selected_args={selected_row_args} + source_args={source_args}", + ) + except Exception: + pass - selection_indices = [] - command_expanded = True - - if (not command_expanded) and selection_indices: stage_table = None try: stage_table = ctx.get_current_stage_table() @@ -2770,6 +2833,41 @@ class PipelineExecutor: except Exception: auto_stage = None + def _apply_row_action_to_stage(stage_idx: int) -> bool: + if not selection_indices or len(selection_indices) != 1: + return False + try: + row_action = ctx.get_current_stage_table_row_selection_action( + selection_indices[0] + ) + except Exception: + row_action = None + if not row_action: + # Fallback to serialized payload when the table row is unavailable + try: + items = ctx.get_last_result_items() or [] 
+ if 0 <= selection_indices[0] < len(items): + maybe = items[selection_indices[0]] + if isinstance(maybe, dict): + candidate = maybe.get("_selection_action") + if isinstance(candidate, (list, tuple)): + row_action = [str(x) for x in candidate if x is not None] + debug(f"@N row {selection_indices[0]} restored action from payload: {row_action}") + except Exception: + row_action = row_action or None + if not row_action: + debug(f"@N row {selection_indices[0]} has no selection_action") + return False + normalized = [str(x) for x in row_action if x is not None] + if not normalized: + return False + debug(f"Applying row action for row {selection_indices[0]} -> {normalized}") + if 0 <= stage_idx < len(stages): + debug(f"Replacing stage {stage_idx} {stages[stage_idx]} with row action {normalized}") + stages[stage_idx] = normalized + return True + return False + if not stages: if isinstance(table_type, str) and table_type.startswith("metadata."): print("Auto-applying metadata selection via get-tag") @@ -2779,7 +2877,43 @@ class PipelineExecutor: print(f"Auto-running selection via {auto_stage[0]}") except Exception: pass - stages.append(list(auto_stage)) + # Append the auto stage now. If the user also provided a selection + # (e.g., @1 | add-file ...), we want to attach the row selection + # args *to the auto-inserted stage* so the download command receives + # the selected row information immediately. + stages.append(list(auto_stage) + (source_args or [])) + debug(f"Inserted auto stage before row action: {stages[-1]}") + + # If the caller included a selection (e.g., @1) try to attach + # the selection args immediately to the inserted auto stage so + # the expansion is effective in a single pass. 
+ if selection_indices: + try: + if not _apply_row_action_to_stage(len(stages) - 1): + # Only support single-row selection for auto-attach here + if len(selection_indices) == 1: + idx = selection_indices[0] + row_args = ctx.get_current_stage_table_row_selection_args(idx) + if not row_args: + try: + items = ctx.get_last_result_items() or [] + if 0 <= idx < len(items): + maybe = items[idx] + if isinstance(maybe, dict): + candidate = maybe.get("_selection_args") + if isinstance(candidate, (list, tuple)): + row_args = [str(x) for x in candidate if x is not None] + except Exception: + row_args = row_args or None + if row_args: + # Place selection args before any existing source args + inserted = stages[-1] + if inserted: + cmd = inserted[0] + tail = [str(x) for x in inserted[1:]] + stages[-1] = [cmd] + [str(x) for x in row_args] + tail + except Exception: + pass else: first_cmd = stages[0][0] if stages and stages[0] else None if isinstance(table_type, str) and table_type.startswith("metadata.") and first_cmd not in ( @@ -2795,8 +2929,41 @@ class PipelineExecutor: auto_cmd_norm = _norm_cmd(auto_stage[0]) if first_cmd_norm not in (auto_cmd_norm, ".pipe", ".mpv"): debug(f"Auto-inserting {auto_cmd_norm} after selection") - stages.insert(0, list(auto_stage)) + # Insert the auto stage before the user-specified stage + stages.insert(0, list(auto_stage) + (source_args or [])) + debug(f"Inserted auto stage before existing pipeline: {stages[0]}") + # If a selection is present, attach the row selection args to the + # newly-inserted stage so the download stage runs with the + # selected row information. 
+ if selection_indices: + try: + if not _apply_row_action_to_stage(0): + if len(selection_indices) == 1: + idx = selection_indices[0] + row_args = ctx.get_current_stage_table_row_selection_args(idx) + if not row_args: + try: + items = ctx.get_last_result_items() or [] + if 0 <= idx < len(items): + maybe = items[idx] + if isinstance(maybe, dict): + candidate = maybe.get("_selection_args") + if isinstance(candidate, (list, tuple)): + row_args = [str(x) for x in candidate if x is not None] + except Exception: + row_args = row_args or None + if row_args: + inserted = stages[0] + if inserted: + cmd = inserted[0] + tail = [str(x) for x in inserted[1:]] + stages[0] = [cmd] + [str(x) for x in row_args] + tail + except Exception: + pass + + # After inserting/appending an auto-stage, continue processing so later + # selection-expansion logic can still run (e.g., for example selectors). return True, piped_result else: print("No previous results to select from\n") diff --git a/Provider/alldebrid.py b/Provider/alldebrid.py index c4f76f0..36abb20 100644 --- a/Provider/alldebrid.py +++ b/Provider/alldebrid.py @@ -11,7 +11,7 @@ from urllib.parse import urlparse from API.HTTP import HTTPClient, _download_direct_file from API.alldebrid import AllDebridClient, parse_magnet_or_hash, is_torrent_file from ProviderCore.base import Provider, SearchResult -from ProviderCore.download import sanitize_filename +from SYS.utils import sanitize_filename from SYS.logger import log, debug from SYS.models import DownloadError @@ -495,7 +495,7 @@ def adjust_output_dir_for_alldebrid( full_metadata: Optional[Dict[str, Any]], item: Any, ) -> Path: - from ProviderCore.download import sanitize_filename as _sf + from SYS.utils import sanitize_filename as _sf output_dir = base_output_dir md = full_metadata if isinstance(full_metadata, dict) else {} diff --git a/Provider/bandcamp.py b/Provider/bandcamp.py index 4a42bfc..a494771 100644 --- a/Provider/bandcamp.py +++ b/Provider/bandcamp.py @@ -7,10 +7,7 @@ 
from typing import Any, Dict, List, Optional from ProviderCore.base import Provider, SearchResult from SYS.logger import log, debug -try: - from playwright.sync_api import sync_playwright -except ImportError: # pragma: no cover - sync_playwright = None +from tool.playwright import PlaywrightTool class Bandcamp(Provider): @@ -137,8 +134,7 @@ class Bandcamp(Provider): if not stage_is_last: return False - if sync_playwright is None: - return False + # Playwright is required; proceed to handle artist selection # Only handle artist selections. chosen: List[Dict[str, Any]] = [] @@ -219,11 +215,10 @@ class Bandcamp(Provider): artist_url = chosen[0].get("url") or "" try: - with sync_playwright() as p: - browser = p.chromium.launch(headless=True) - page = browser.new_page() + tool = PlaywrightTool({}) + tool.require() + with tool.open_page(headless=True) as page: discography = self._scrape_artist_page(page, artist_url, limit=50) - browser.close() except Exception as exc: print(f"bandcamp artist lookup failed: {exc}\n") return True @@ -275,18 +270,10 @@ class Bandcamp(Provider): Any]] = None, **kwargs: Any, ) -> List[SearchResult]: - if sync_playwright is None: - log( - "[bandcamp] Playwright not available. 
Install with: pip install playwright", - file=sys.stderr, - ) - return [] - try: - with sync_playwright() as p: - browser = p.chromium.launch(headless=True) - page = browser.new_page() - + tool = PlaywrightTool({}) + tool.require() + with tool.open_page(headless=True) as page: if query.strip().lower().startswith("artist:"): artist_name = query[7:].strip().strip('"') search_url = f"https://bandcamp.com/search?q={artist_name}&item_type=b" @@ -294,8 +281,6 @@ class Bandcamp(Provider): search_url = f"https://bandcamp.com/search?q={query}&item_type=a" results = self._scrape_url(page, search_url, limit) - - browser.close() return results except Exception as exc: @@ -366,4 +351,5 @@ class Bandcamp(Provider): return results def validate(self) -> bool: - return sync_playwright is not None + # Playwright is required for the provider to function + return True diff --git a/Provider/internetarchive.py b/Provider/internetarchive.py index f0f2a4f..cc4d642 100644 --- a/Provider/internetarchive.py +++ b/Provider/internetarchive.py @@ -10,7 +10,7 @@ from typing import Any, Dict, List, Optional from urllib.parse import quote, urlparse from ProviderCore.base import Provider, SearchResult -from ProviderCore.download import sanitize_filename +from SYS.utils import sanitize_filename from SYS.logger import log # Helper for download-file: render selectable formats for a details URL. 
diff --git a/Provider/libgen.py b/Provider/libgen.py index 6db8a01..bc6e6a8 100644 --- a/Provider/libgen.py +++ b/Provider/libgen.py @@ -11,7 +11,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple from urllib.parse import urljoin, urlparse, unquote from ProviderCore.base import Provider, SearchResult -from ProviderCore.download import sanitize_filename +from SYS.utils import sanitize_filename from SYS.logger import log from SYS.models import ProgressBar diff --git a/Provider/openlibrary.py b/Provider/openlibrary.py index 32815cf..d298eed 100644 --- a/Provider/openlibrary.py +++ b/Provider/openlibrary.py @@ -18,7 +18,7 @@ import requests from API.HTTP import HTTPClient from ProviderCore.base import Provider, SearchResult -from ProviderCore.download import download_file, sanitize_filename +from SYS.utils import sanitize_filename from SYS.cli_syntax import get_field, get_free_text, parse_query from SYS.logger import debug, log from SYS.utils import unique_path @@ -1541,21 +1541,25 @@ class OpenLibrary(Provider): except Exception: pass out_path = unique_path(output_dir / f"{safe_title}.pdf") - ok = download_file( - pdf_url, - out_path, - session=self._session, - progress_callback=( - ( - lambda downloaded, total, label: - progress_callback("bytes", downloaded, total, label) - ) if progress_callback is not None else None - ), - ) - if ok: - return out_path - log("[openlibrary] Direct download failed", file=sys.stderr) - return None + try: + with HTTPClient(timeout=30.0) as client: + path = client.download( + pdf_url, + str(out_path), + chunk_size=1024 * 256, + progress_callback=( + (lambda downloaded, total: progress_callback("bytes", downloaded, total, safe_title)) + if progress_callback is not None + else None + ), + ) + if path and path.exists(): + return path + log("[openlibrary] Direct download failed", file=sys.stderr) + return None + except Exception: + log("[openlibrary] Direct download failed", file=sys.stderr) + return None # 2) Borrow flow 
(credentials required). try: diff --git a/Provider/vimm.py b/Provider/vimm.py index 39b49a9..647eb19 100644 --- a/Provider/vimm.py +++ b/Provider/vimm.py @@ -1,242 +1,830 @@ -"""Vimm provider skeleton (lxml + HTTPClient). +"""Minimal Vimm provider: table-row parsing for display. -This is a lightweight, resilient provider implementation intended as a -starting point for implementing a full Vimm (vimm.net) provider. - -It prefers server-rendered HTML parsing via lxml and uses the repo's -`HTTPClient` helper for robust HTTP calls (timeouts/retries). - -Selectors in `search()` are intentionally permissive heuristics; update the -XPaths to match the real site HTML when you have an actual fixture. +This minimal implementation focuses on fetching a Vimm search result page, +turning the vault table rows into SearchResults, and letting the CLI +auto-insert the download-file stage directly from the first table so that +Playwright-driven downloads happen without showing a nested detail table. """ from __future__ import annotations -import re -import sys -from typing import Any, Dict, List, Optional -from urllib.parse import urljoin, quote_plus +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import parse_qsl, parse_qs, urljoin, urlparse, urlunparse, urlencode from lxml import html as lxml_html +import base64 +import re +from pathlib import Path from API.HTTP import HTTPClient -from ProviderCore.base import Provider, SearchResult -from SYS.logger import log, debug +from ProviderCore.base import Provider, SearchResult, parse_inline_query_arguments +from ProviderCore.inline_utils import resolve_filter +from SYS.logger import debug +from SYS.provider_helpers import TableProviderMixin +from tool.playwright import PlaywrightTool -class Vimm(Provider): - """Provider for vimm.net vault listings (skeleton). +class Vimm(TableProviderMixin, Provider): + """Minimal provider for vimm.net vault listings using TableProvider mixin. 
- - Uses lxml for parsing - - No authentication required - """ + NOTES / HOW-TO (selection & auto-download): + - This provider exposes file rows on a detail page. Each file row includes + a `path` which is an absolute download URL (or a form action + mediaId). + + - To make `@N` expansion robust (so users can do `@1 | add-file -store `) + we ensure three things: + 1) The ResultTable produced by the `selector()` sets `source_command` to + "download-file" (the canonical cmdlet for downloading files). + 2) Each row carries explicit selection args: `['-url', '']`. + Using an explicit `-url` flag avoids ambiguity during argument + parsing (some cmdlets accept positional URLs, others accept flags). + 3) The CLI's expansion logic places selection args *before* provider + source args (e.g., `-provider vimm`) so the first positional token is + the intended URL (not an unknown flag like `-provider`). + + - Why this approach? Argument parsing treats the *first* unrecognized token + as a positional value (commonly interpreted as a URL). If a provider + injects hints like `-provider vimm` *before* a bare URL, the parser can + misinterpret `-provider` as the URL, causing confusing attempts to + download `-provider`. By using `-url` and ensuring the URL appears first + we avoid that class of bugs and make `@N` -> `download-file`/`add-file` + flows reliable. 
+ + The code below implements these choices (and contains inline comments + explaining specific decisions).""" URL = ("https://vimm.net/vault/",) URL_DOMAINS = ("vimm.net",) + REGION_CHOICES = [ + {"value": "1", "text": "Argentina"}, + {"value": "2", "text": "Asia"}, + {"value": "3", "text": "Australia"}, + {"value": "35", "text": "Austria"}, + {"value": "31", "text": "Belgium"}, + {"value": "4", "text": "Brazil"}, + {"value": "5", "text": "Canada"}, + {"value": "6", "text": "China"}, + {"value": "38", "text": "Croatia"}, + {"value": "7", "text": "Denmark"}, + {"value": "8", "text": "Europe"}, + {"value": "9", "text": "Finland"}, + {"value": "10", "text": "France"}, + {"value": "11", "text": "Germany"}, + {"value": "12", "text": "Greece"}, + {"value": "13", "text": "Hong Kong"}, + {"value": "27", "text": "India"}, + {"value": "33", "text": "Ireland"}, + {"value": "34", "text": "Israel"}, + {"value": "14", "text": "Italy"}, + {"value": "15", "text": "Japan"}, + {"value": "16", "text": "Korea"}, + {"value": "30", "text": "Latin America"}, + {"value": "17", "text": "Mexico"}, + {"value": "18", "text": "Netherlands"}, + {"value": "40", "text": "New Zealand"}, + {"value": "19", "text": "Norway"}, + {"value": "28", "text": "Poland"}, + {"value": "29", "text": "Portugal"}, + {"value": "20", "text": "Russia"}, + {"value": "32", "text": "Scandinavia"}, + {"value": "37", "text": "South Africa"}, + {"value": "21", "text": "Spain"}, + {"value": "22", "text": "Sweden"}, + {"value": "36", "text": "Switzerland"}, + {"value": "23", "text": "Taiwan"}, + {"value": "39", "text": "Turkey"}, + {"value": "41", "text": "United Arab Emirates"}, + {"value": "24", "text": "United Kingdom"}, + {"value": "25", "text": "USA"}, + {"value": "26", "text": "World"}, + ] + + QUERY_ARG_CHOICES = { + "system": [ + "Atari2600", + "Atari5200", + "Atari7800", + "CDi", + "Dreamcast", + "GB", + "GBA", + "GBC", + "GG", + "GameCube", + "Genesis", + "Jaguar", + "JaguarCD", + "Lynx", + "SMS", + "NES", + 
"3DS", + "N64", + "DS", + "PS1", + "PS2", + "PS3", + "PSP", + "Saturn", + "32X", + "SegaCD", + "SNES", + "TG16", + "TGCD", + "VB", + "Wii", + "WiiWare", + "Xbox", + "Xbox360", + "X360-D", + ], + "region": REGION_CHOICES, + } + # ProviderCore still looks for INLINE_QUERY_FIELD_CHOICES, so expose this + # mapping once and keep QUERY_ARG_CHOICES as the readable name we prefer. + INLINE_QUERY_FIELD_CHOICES = QUERY_ARG_CHOICES + + # Table metadata/constants grouped near the table helpers below. + TABLE_AUTO_STAGES = {"vimm": ["download-file"]} + AUTO_STAGE_USE_SELECTION_ARGS = True + TABLE_SYSTEM_COLUMN = {"label": "Platform", "metadata_key": "system"} + def validate(self) -> bool: - # This provider has no required config; consider more checks if needed. return True - def _parse_size_bytes(self, size_str: str) -> Optional[int]: - if not size_str: - return None - try: - s = str(size_str or "").strip().replace(",", "") - m = re.search(r"(?P[\d\.]+)\s*(?P[KMGT]?B)?", s, flags=re.I) - if not m: - return None - val = float(m.group("val")) - unit = (m.group("unit") or "B").upper() - mul = { - "B": 1, - "KB": 1024, - "MB": 1024 ** 2, - "GB": 1024 ** 3, - "TB": 1024 ** 4, - }.get(unit, 1) - return int(val * mul) - except Exception: - return None - - def search( - self, - query: str, - limit: int = 50, - filters: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> List[SearchResult]: + def search(self, query: str, limit: int = 50, filters: Optional[Dict[str, Any]] = None, **kwargs: Any) -> List[SearchResult]: q = (query or "").strip() if not q: return [] - # Build search/list URL base = "https://vimm.net/vault/" - url = f"{base}?p=list&q={quote_plus(q)}" + normalized_filters: Dict[str, Any] = {} + for key, value in (filters or {}).items(): + if key is None: + continue + normalized_filters[str(key).lower()] = value + + system_value = normalized_filters.get("system") or normalized_filters.get("platform") + system_param = str(system_value or "").strip() + + region_value = 
normalized_filters.get("region") + region_param = str(region_value or "").strip() + + params = [("p", "list"), ("q", q)] + if system_param: + params.append(("system", system_param)) + if region_param: + params.append(("region", region_param)) + url = f"{base}?{urlencode(params)}" + debug(f"[vimm] search: query={q} url={url} filters={normalized_filters}") try: - with HTTPClient(timeout=20.0) as client: + with HTTPClient(timeout=9.0) as client: resp = client.get(url) content = resp.content except Exception as exc: - # Log and return empty results on failure. The HTTP client will - # already attempt a certifi-based retry in common certificate - # verification failure cases; if you still see cert errors, install - # the `certifi` package or configure SSL_CERT_FILE to point at a - # valid CA bundle. - log(f"[vimm] HTTP fetch failed: {exc}", file=sys.stderr) + debug(f"[vimm] HTTP fetch failed: {exc}") return [] try: doc = lxml_html.fromstring(content) except Exception as exc: - log(f"[vimm] HTML parse failed: {exc}", file=sys.stderr) + debug(f"[vimm] HTML parse failed: {exc}") return [] - results: List[SearchResult] = [] - - # Candidate XPaths for list items (tweak to match real DOM) - container_xpaths = [ - '//div[contains(@class,"list-item")]', - '//div[contains(@class,"result")]', - '//li[contains(@class,"item")]', - '//tr[contains(@class,"result")]', - '//article', + xpaths = [ + "//table//tbody/tr", + "//table//tr[td]", + "//div[contains(@class,'list-item')]", + "//div[contains(@class,'result')]", + "//li[contains(@class,'item')]", ] - nodes = [] - for xp in container_xpaths: + rows = doc.xpath("//table//tr[td]") + results = self._build_results_from_rows(rows, url, system_param, limit) + if not results: + results = self.search_table_from_url(url, limit=limit, xpaths=xpaths) + self._ensure_system_column(results, system_param) + + results = [self._apply_selection_defaults(r, referer=url, detail_url=getattr(r, "path", "")) for r in (results or [])] + + debug(f"[vimm] 
results={len(results)}") + return results[: int(limit)] + + def extract_query_arguments(self, query: str) -> Tuple[str, Dict[str, Any]]: + normalized, inline_args = parse_inline_query_arguments(query) + inline_args_norm: Dict[str, Any] = {} + for k, v in (inline_args or {}).items(): + if k is None: + continue + key_norm = str(k).strip().lower() + if key_norm == "platform": + key_norm = "system" + inline_args_norm[key_norm] = v + + filters = resolve_filter(self, inline_args_norm) + return normalized, filters + + def _build_results_from_rows( + self, + rows: List[Any], + base_url: str, + system_value: Optional[str], + limit: int, + ) -> List[SearchResult]: + out: List[SearchResult] = [] + seen: set[str] = set() + system_column = getattr(self, "TABLE_SYSTEM_COLUMN", {}) or {} + key = str(system_column.get("metadata_key") or "system").strip() + if not key: + key = "system" + + for tr in rows: + if len(out) >= limit: + break + rec = self._parse_table_row(tr, base_url, system_value) + if not rec: + continue + path = rec.get("path") + if not path or path in seen: + continue + seen.add(path) + columns = self._build_columns_from_record(rec) + if not columns: + continue + metadata: Dict[str, Any] = {"raw_record": rec, "detail_url": path, "referer": base_url} + if path: + metadata["_selection_args"] = ["-url", path] + platform_value = rec.get("platform") + if platform_value: + metadata[key] = platform_value + sr = SearchResult( + table="vimm", + title=rec.get("title") or "", + path=path, + detail="", + annotations=[], + media_kind="file", + size_bytes=None, + tag={"vimm"}, + columns=columns, + full_metadata=metadata, + ) + out.append(self._apply_selection_defaults(sr, referer=base_url, detail_url=path)) + return out + + def _parse_table_row(self, tr: Any, base_url: str, system_value: Optional[str]) -> Dict[str, str]: + tds = tr.xpath("./td") + if not tds: + return {} + + rec: Dict[str, str] = {} + title_anchor = tds[0].xpath('.//a[contains(@href,"/vault/")]') or [] + if 
title_anchor: + el = title_anchor[0] + rec["title"] = (el.text_content() or "").strip() + href = el.get("href") or "" + rec["path"] = urljoin(base_url, href) if href else "" + if system_value: + rec["platform"] = str(system_value).strip().upper() + rec["region"] = self._flag_text_at(tds, 1) + rec["version"] = self._text_at(tds, 2) + rec["languages"] = self._text_at(tds, 3) + else: + raw_platform = (tds[0].text_content() or "").strip() + if raw_platform: + rec["platform"] = raw_platform.upper() + anchors = tds[1].xpath('.//a[contains(@href,"/vault/")]') or tds[1].xpath('.//a') + if not anchors: + return {} + el = anchors[0] + rec["title"] = (el.text_content() or "").strip() + href = el.get("href") or "" + rec["path"] = urljoin(base_url, href) if href else "" + rec["region"] = self._flag_text_at(tds, 2) + rec["version"] = self._text_at(tds, 3) + rec["languages"] = self._text_at(tds, 4) + + return {k: v for k, v in rec.items() if v} + + def _text_at(self, tds: List[Any], idx: int) -> str: + if idx < 0 or idx >= len(tds): + return "" + return (tds[idx].text_content() or "").strip() + + def _flag_text_at(self, tds: List[Any], idx: int) -> str: + if idx < 0 or idx >= len(tds): + return "" + td = tds[idx] + imgs = td.xpath('.//img[contains(@class,"flag")]/@title') + if imgs: + return str(imgs[0]).strip() + return (td.text_content() or "").strip() + + def _build_columns_from_record(self, rec: Dict[str, str]) -> List[Tuple[str, str]]: + title = rec.get("title") or "" + if not title: + return [] + columns: List[Tuple[str, str]] = [("Title", title)] + system_column = getattr(self, "TABLE_SYSTEM_COLUMN", {}) or {} + label = str(system_column.get("label") or "Platform") + platform_value = rec.get("platform") + if platform_value: + columns.append((label, platform_value)) + for key, friendly in (("region", "Region"), ("version", "Version"), ("languages", "Languages")): + value = rec.get(key) + if value: + columns.append((friendly, value)) + return columns + + def 
_apply_selection_defaults(self, sr: SearchResult, *, referer: Optional[str], detail_url: Optional[str]) -> SearchResult: + """Attach selection metadata so @N expansion passes a usable URL first.""" + + try: + md = dict(getattr(sr, "full_metadata", {}) or {}) + except Exception: + md = {} + + path_val = str(getattr(sr, "path", "") or "") + if not path_val: + path_val = str(detail_url or "") + + if path_val: + md.setdefault("_selection_args", ["-url", path_val]) + md.setdefault("detail_url", detail_url or path_val) + if referer: + md.setdefault("referer", referer) + + sr.full_metadata = md + return sr + + def _ensure_system_column(self, results: List[SearchResult], system_value: Optional[str]) -> None: + if not results or not system_value: + return + label_value = str(system_value).strip() + if not label_value: + return + label_value = label_value.upper() + system_column = getattr(self, "TABLE_SYSTEM_COLUMN", {}) or {} + label_name = str(system_column.get("label") or "Platform").strip() + if not label_name: + label_name = "Platform" + normalized_label = label_name.strip().lower() + metadata_key = str(system_column.get("metadata_key") or "system").strip() + if not metadata_key: + metadata_key = "system" + for result in results: try: - found = doc.xpath(xp) - if found: - nodes = found - debug(f"[vimm] using xpath {xp} -> {len(found)} nodes") - break + cols = getattr(result, "columns", None) + if isinstance(cols, list): + lowered = {str(name or "").strip().lower() for name, _ in cols} + if normalized_label not in lowered: + insert_pos = 1 if cols else 0 + cols.insert(insert_pos, (label_name, label_value)) + metadata = getattr(result, "full_metadata", None) + if isinstance(metadata, dict): + metadata.setdefault(metadata_key, label_value) except Exception: continue - # Fallback: try generic anchors under a list area - if not nodes: - try: - nodes = doc.xpath('//div[contains(@id,"list")]/div') or doc.xpath('//div[contains(@class,"results")]/div') - except Exception: - 
nodes = [] + def _parse_detail_doc(self, doc, base_url: str) -> List[Any]: + """Parse a Vimm detail page (non-standard table layout) and return a list + of SearchResult or dict payloads suitable for `ResultTable.add_result()`. - for n in (nodes or [])[: max(1, int(limit))]: - try: - # Prefer explicit title anchors - title = None - href = None + The function extracts simple key/value rows and file download entries (anchors + or download forms) and returns property dicts first followed by file SearchResults. + """ + def _build_download_url(action_url: str, params: Dict[str, str]) -> str: + if not action_url: + return "" + if not params: + return action_url + cleaned = {k: str(v) for k, v in params.items() if v is not None and str(v) != ""} + if not cleaned: + return action_url + parsed = urlparse(action_url) + existing = dict(parse_qsl(parsed.query, keep_blank_values=True)) + existing.update(cleaned) + query = urlencode(existing, doseq=True) + return urlunparse(parsed._replace(query=query)) + + try: + # Prefer the compact 'rounded' detail table when present + tables = doc.xpath('//table[contains(@class,"rounded") and contains(@class,"cellpadding1")]') or doc.xpath('//table[contains(@class,"rounded")]') + if not tables: + return [] + + tbl = tables[0] + trs = tbl.xpath('.//tr') or [] + + # Aggregate page properties into a mapping and create file rows with Title, Region, CRC, Version + props: Dict[str, Any] = {} + anchors_by_label: Dict[str, List[Dict[str, str]]] = {} + + for tr in trs: try: - # a few heuristic searches for a meaningful anchor - a = (n.xpath('.//a[contains(@class,"title")]') or - n.xpath('.//h2/a') or - n.xpath('.//a[contains(@href,"/vault/")]') or - n.xpath('.//a')) - if a: - a0 = a[0] - title = a0.text_content().strip() - href = a0.get('href') + if tr.xpath('.//hr'): + continue + tds = tr.xpath('./td') + if not tds: + continue + + # Canvas-based title row (base64 encoded in data-v) + canvas = tr.xpath('.//canvas[@data-v]') + if canvas: + data_v = 
canvas[0].get('data-v') or '' + try: + raw = base64.b64decode(data_v) + txt = raw.decode('utf-8', errors='ignore').strip() + except Exception: + txt = (canvas[0].text_content() or '').strip() + if txt: + props['Title'] = txt + continue + + label = (tds[0].text_content() or '').strip() + if not label: + continue + val_td = tds[-1] + + # collect anchors under this label for later detection + anchors = val_td.xpath('.//a') + if anchors: + entries = [] + for a in anchors: + entries.append({'text': (a.text_content() or '').strip(), 'href': a.get('href') or ''}) + # try to capture any explicit span value (e.g., CRC) even if an anchor exists + span_data = val_td.xpath('.//span[@id]/text()') + if span_data: + props[label] = str(span_data[0]).strip() + else: + # fallback to direct text nodes excluding anchor text + txts = [t.strip() for t in val_td.xpath('.//text()') if t.strip()] + anchor_texts = [a.text_content().strip() for a in anchors if a.text_content()] + filtered = [t for t in txts if t not in anchor_texts] + if filtered: + props[label] = filtered[0] + anchors_by_label[label] = entries + continue + + img_title = val_td.xpath('.//img/@title') + if img_title: + val = str(img_title[0]).strip() + else: + span_data = val_td.xpath('.//span[@id]/text()') + if span_data: + val = str(span_data[0]).strip() + else: + opt = val_td.xpath('.//select/option[@selected]/text()') + if opt: + val = str(opt[0]).strip() + else: + vt = val_td.xpath('.//div[@id="version_text"]/text()') + if vt: + val = vt[0].strip() + else: + val = (val_td.text_content() or '').strip() + + props[label] = val except Exception: - title = None - href = None + continue - if not title: - title = (n.text_content() or "").strip() + # Download form handling: find action, mediaId, and dl_size + form = doc.xpath('//form[@id="dl_form"]') + action = '' + media_id = None + dl_size = None + form_inputs: Dict[str, str] = {} + download_url = '' + if form: + f = form[0] + action = f.get('action') or '' + if 
action.startswith('//'): + action = 'https:' + action + elif action.startswith('/'): + action = urljoin(base_url, action) + media_ids = f.xpath('.//input[@name="mediaId"]/@value') + media_id = media_ids[0] if media_ids else None + size_vals = doc.xpath('//td[@id="dl_size"]/text()') + dl_size = size_vals[0].strip() if size_vals else None + inputs = f.xpath('.//input[@name]') + for inp in inputs: + name = (inp.get('name') or '').strip() + if not name: + continue + form_inputs[name] = inp.get('value') or '' + download_url = _build_download_url(action, form_inputs) - path = urljoin(base, href) if href else "" + file_results: List[SearchResult] = [] - # Extract size & platform heuristics - size_text = "" - try: - s = n.xpath('.//*[contains(@class,"size")]/text()') or n.xpath('.//span[contains(text(),"MB") or contains(text(),"GB")]/text()') - if s: - size_text = str(s[0]).strip() - except Exception: - size_text = "" + # Create file rows from anchors that look like downloads + for lbl, alist in anchors_by_label.items(): + for a in alist: + href = a.get('href') or '' + txt = a.get('text') or '' + is_download_link = False + if href: + low = href.lower() + if 'p=download' in low or '/download' in low or '/dl' in low: + is_download_link = True + for ext in ('.zip', '.nes', '.gba', '.bin', '.7z', '.iso'): + if low.endswith(ext): + is_download_link = True + break + if txt and re.search(r"\.[a-z0-9]{1,5}$", txt, re.I): + is_download_link = True + if not is_download_link: + continue - size_bytes = self._parse_size_bytes(size_text) + title = txt or props.get('Title') or '' + path = urljoin(base_url, href) if href else '' + cols = [("Title", title), ("Region", props.get('Region', '')), ("CRC", props.get('CRC', '')), ("Version", props.get('Version', ''))] + if dl_size: + cols.append(("Size", dl_size)) + metadata: Dict[str, Any] = {"raw_record": {"label": lbl}} + if base_url: + metadata["referer"] = base_url + metadata.setdefault("detail_url", base_url) + sr = 
SearchResult(table="vimm", title=title, path=path, detail="", annotations=[], media_kind="file", size_bytes=None, tag={"vimm"}, columns=cols, full_metadata=metadata) + file_results.append(self._apply_selection_defaults(sr, referer=base_url, detail_url=base_url)) - platform = "" - try: - p = n.xpath('.//*[contains(@class,"platform")]/text()') - if p: - platform = str(p[0]).strip() - except Exception: - platform = "" + # If no explicit file anchors, but we have a form, create a single file entry using page properties + if not file_results and (media_id or action): + # Ensure CRC is captured even if earlier parsing missed it + if not props.get('CRC'): + try: + crc_vals = doc.xpath('//span[@id="data-crc"]/text()') + if crc_vals: + props['CRC'] = str(crc_vals[0]).strip() + except Exception: + pass - columns = [] - if platform: - columns.append(("Platform", platform)) - if size_text: - columns.append(("Size", size_text)) - - results.append( - SearchResult( - table="vimm", - title=str(title or "").strip(), - path=str(path or ""), - detail="", - annotations=[], - media_kind="file", - size_bytes=size_bytes, - tag={"vimm"}, - columns=columns, - full_metadata={"raw": lxml_html.tostring(n, encoding="unicode")}, - ) + title = props.get('Title') or '' + cols = [("Title", title), ("Region", props.get('Region', '')), ("CRC", props.get('CRC', '')), ("Version", props.get('Version', ''))] + if dl_size: + cols.append(("Size", dl_size)) + target_path = download_url or action or base_url + sr = SearchResult( + table="vimm", + title=title, + path=target_path, + detail="", + annotations=[], + media_kind="file", + size_bytes=None, + tag={"vimm"}, + columns=cols, + full_metadata={ + "mediaId": media_id, + "dl_action": action, + "download_url": download_url, + "form_params": dict(form_inputs), + "referer": base_url, + "raw_props": props, + }, ) - except Exception: - continue + file_results.append(self._apply_selection_defaults(sr, referer=base_url, detail_url=base_url)) - return results[: 
max(0, int(limit))] + # Attach mediaId/dl_action to file rows + if file_results and (media_id or action): + for fi in file_results: + try: + fi.full_metadata = dict(getattr(fi, 'full_metadata', {}) or {}) + if media_id: + fi.full_metadata['mediaId'] = media_id + if action: + fi.full_metadata['dl_action'] = action + if form_inputs: + fi.full_metadata.setdefault('form_params', dict(form_inputs)) + if download_url: + fi.full_metadata['download_url'] = download_url + if dl_size and not any((k.lower() == 'size') for k, _ in getattr(fi, 'columns', [])): + fi.columns.append(("Size", dl_size)) + except Exception: + continue + + # Return only file rows (properties are attached as columns) + return file_results + except Exception: + return [] + + def _fetch_detail_rows(self, detail_url: str) -> List[SearchResult]: + """Fetch the detail page for a selected row and return the parsed file rows.""" + + detail_url = str(detail_url or "").strip() + if not detail_url: + return [] + try: + with HTTPClient(timeout=9.0) as client: + resp = client.get(detail_url) + doc = lxml_html.fromstring(resp.content) + except Exception as exc: + debug(f"[vimm] detail fetch failed: {exc}") + return [] + return self._parse_detail_doc(doc, base_url=detail_url) + + def _download_from_payload(self, payload: Dict[str, Any], output_dir: Path) -> Optional[Path]: + """Download using the metadata/form data stored in a SearchResult payload.""" + + try: + d = payload or {} + fm = d.get("full_metadata") or {} + media_id = fm.get("mediaId") or fm.get("media_id") + base_action = fm.get("dl_action") or d.get("path") or "" + download_url = fm.get("download_url") + params = dict(fm.get("form_params") or {}) + if media_id: + params.setdefault("mediaId", media_id) + target = download_url or base_action + if not target: + return None + if target.startswith("//"): + target = "https:" + target + + # Avoid downloading HTML detail pages directly; let detail parsing handle them. 
+ low_target = target.lower() + if ("vimm.net/vault" in low_target or "?p=list" in low_target) and not download_url and not media_id and not params: + return None + + referer = fm.get("referer") or d.get("referer") or d.get("detail_url") + headers: Dict[str, str] = {} + + if not referer: + try: + from SYS.pipeline import get_last_result_items + + items = get_last_result_items() or [] + try: + parsed_target = urlparse(target) + target_qs = parse_qs(parsed_target.query) + target_media = None + if isinstance(target_qs, dict): + target_media = (target_qs.get("mediaId") or target_qs.get("mediaid") or [None])[0] + if target_media is not None: + target_media = str(target_media) + except Exception: + target_media = None + + found = None + for it in items: + try: + it_d = it if isinstance(it, dict) else (it.to_dict() if hasattr(it, "to_dict") else {}) + fm2 = (it_d.get("full_metadata") or {}) if isinstance(it_d, dict) else {} + dl_cand = (fm2.get("download_url") or fm2.get("dl_action") or it_d.get("path")) + if target_media: + m2 = None + if isinstance(fm2, dict): + m2 = str(fm2.get("mediaId") or fm2.get("media_id") or "") + if m2 and m2 == target_media: + found = it_d + break + if dl_cand and str(dl_cand).strip() and (str(dl_cand).strip() == str(target).strip() or str(dl_cand) in str(target) or str(target) in str(dl_cand)): + found = it_d + break + except Exception: + continue + + if found: + referer = (found.get("full_metadata") or {}).get("referer") or found.get("detail_url") or found.get("path") + except Exception: + referer = referer + + if referer: + headers["Referer"] = str(referer) + headers_arg = headers or None + + out_dir = Path(output_dir or Path(".")) + out_dir.mkdir(parents=True, exist_ok=True) + filename_hint = str(d.get("title") or f"vimm_{media_id or 'download'}") + + with HTTPClient(timeout=60.0) as client: + try: + if download_url: + resp = client.get(target, headers=headers_arg) + elif params: + resp = client.get(target, params=params, 
headers=headers_arg) + else: + resp = client.get(target, headers=headers_arg) + except Exception as exc_get: + try: + detail_url = referer or target + p = self._playwright_fetch(detail_url, out_dir, selector="form#dl_form button[type=submit]", timeout_sec=60) + if p: + debug(f"[vimm] downloaded via Playwright after get() error: {p}") + return p + except Exception as e: + debug(f"[vimm] Playwright download failed after get() error: {e}") + + debug(f"[vimm] HTTP GET failed (network): {exc_get}") + return None + + try: + resp.raise_for_status() + except Exception as exc: + try: + detail_url = referer or target + p = self._playwright_fetch(detail_url, out_dir, selector="form#dl_form button[type=submit]", timeout_sec=60) + if p: + debug(f"[vimm] downloaded via Playwright after HTTP error: {p}") + return p + except Exception as e: + debug(f"[vimm] Playwright download failed after HTTP error: {e}") + + debug(f"[vimm] HTTP GET failed: {exc}") + return None + + content = getattr(resp, "content", b"") or b"" + cd = getattr(resp, "headers", {}).get("content-disposition", "") if hasattr(resp, "headers") else "" + m = re.search(r'filename\*?=(?:"([^"]*)"|([^;\s]*))', cd) + if m: + fname = m.group(1) or m.group(2) + else: + fname = filename_hint + + out_path = out_dir / str(fname) + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_bytes(content) + return out_path + except Exception as exc: + debug(f"[vimm] download failed: {exc}") + return None + + def _playwright_fetch(self, detail_url: str, out_dir: Path, selector: str = "form#dl_form button[type=submit]", timeout_sec: int = 90) -> Optional[Path]: + """Attempt a browser-driven download using the shared Playwright tool. + + Playwright is a required runtime dependency for this operation; import + failures will surface at module import time rather than being silently + swallowed by per-call guards. 
+ """ + + # Prefer headful-first attempts for Vimm to mirror real browser behaviour + cfg = {} + try: + from SYS.config import load_config + + cfg = load_config() or {} + except Exception: + cfg = {} + + tool = PlaywrightTool(cfg) + result = tool.download_file( + detail_url, + selector=selector, + out_dir=out_dir, + timeout_sec=timeout_sec, + headless_first=False, + debug_mode=False, + ) + if result.ok and result.path: + return result.path + debug(f"[vimm] playwright helper failed: {result.error}") + return None + + def download(self, result: Any, output_dir: Path, progress_callback: Optional[Any] = None) -> Optional[Path]: + """Download an item identified on a Vimm detail page.""" + + payload = result.to_dict() if hasattr(result, "to_dict") else (result if isinstance(result, dict) else {}) + downloaded = self._download_from_payload(payload, output_dir) + if downloaded: + return downloaded + + detail_url = str(payload.get("path") or "").strip() + if not detail_url: + return None + + for row in self._fetch_detail_rows(detail_url): + detail_payload = row.to_dict() if hasattr(row, "to_dict") else (row if isinstance(row, dict) else {}) + downloaded = self._download_from_payload(detail_payload, output_dir) + if downloaded: + return downloaded + + return None -# Bridge into the ResultTable provider registry so vimm results can be rendered -# with the new provider/table/select API. 
+# Minimal provider registration + +# Minimal provider registration try: from SYS.result_table_adapters import register_provider - from SYS.result_table_api import ResultModel - from SYS.result_table_api import title_column, ext_column, metadata_column + from SYS.result_table_api import ResultModel, title_column, metadata_column def _convert_search_result_to_model(sr): - try: - if hasattr(sr, "to_dict"): - d = sr.to_dict() - elif isinstance(sr, dict): - d = sr - else: - d = { - "title": getattr(sr, "title", str(sr)), - "path": getattr(sr, "path", None), - "size_bytes": getattr(sr, "size_bytes", None), - "columns": getattr(sr, "columns", None), - "full_metadata": getattr(sr, "full_metadata", None), - } - except Exception: - d = {"title": getattr(sr, "title", str(sr))} - + d = sr.to_dict() if hasattr(sr, "to_dict") else (sr if isinstance(sr, dict) else {"title": getattr(sr, "title", str(sr))}) title = d.get("title") or "" path = d.get("path") or None - size = d.get("size_bytes") or None - ext = None + columns = d.get("columns") or getattr(sr, "columns", None) or [] + metadata: Dict[str, Any] = {} + for name, value in columns: + key = str(name or "").strip().lower() + if key in ("system", "region", "version", "languages", "size"): + metadata[key] = value try: - if path: - from pathlib import Path - - suf = Path(str(path)).suffix - if suf: - ext = suf.lstrip(".") + fm = d.get("full_metadata") or {} + if isinstance(fm, dict): + for k, v in fm.items(): + metadata[str(k).strip().lower()] = v except Exception: - ext = None - - metadata = d.get("full_metadata") or d.get("metadata") or {} - return ResultModel( - title=str(title), - path=str(path) if path is not None else None, - ext=str(ext) if ext is not None else None, - size_bytes=int(size) if size is not None else None, - metadata=metadata or {}, - source="vimm", - ) + pass + return ResultModel(title=str(title), path=str(path) if path else None, ext=None, size_bytes=None, metadata=metadata, source="vimm") def 
_adapter(items): for it in items: @@ -244,48 +832,36 @@ try: def _columns_factory(rows): cols = [title_column()] - if any(getattr(r, "ext", None) for r in rows): - cols.append(ext_column()) - if any(getattr(r, "size_bytes", None) for r in rows): + md = lambda key: any((r.metadata or {}).get(key) for r in rows) + if md("system"): + cols.append(metadata_column("system", "system")) + if md("region"): + cols.append(metadata_column("region", "Region")) + if md("version"): + cols.append(metadata_column("version", "Version")) + if md("languages"): + cols.append(metadata_column("languages", "Languages")) + if md("size"): cols.append(metadata_column("size", "Size")) - # Add up to 2 discovered metadata keys from rows - seen = [] - for r in rows: - for k in (r.metadata or {}).keys(): - if k in ("name", "title", "path"): - continue - if k not in seen: - seen.append(k) - if len(seen) >= 2: - break - if len(seen) >= 2: - break - for k in seen: - cols.append(metadata_column(k)) return cols def _selection_fn(row): + # Return explicit URL selection args so `select -run-cmd` and `@N` expansion + # behave correctly when the downstream stage is a downloader (e.g., download-file). + # Using '-url' is explicit and avoids ambiguity during argument parsing. 
if getattr(row, "path", None): - return ["-path", row.path] + return ["-url", row.path] return ["-title", row.title or ""] - SAMPLE_ITEMS = [ - {"title": "Room of Awe", "path": "sample/Room of Awe", "ext": "zip", "size_bytes": 1024 * 1024 * 12, "full_metadata": {"platform": "PC"}}, - {"title": "Song of Joy", "path": "sample/Song of Joy.mp3", "ext": "mp3", "size_bytes": 5120000, "full_metadata": {"platform": "PC"}}, - {"title": "Cover Image", "path": "sample/Cover.jpg", "ext": "jpg", "size_bytes": 20480, "full_metadata": {}}, - ] - try: - register_provider( - "vimm", - _adapter, - columns=_columns_factory, - selection_fn=_selection_fn, - metadata={"description": "Vimm provider bridge (ProviderCore -> ResultTable API)"}, - ) - except Exception: - # Non-fatal: registration is best-effort - pass + register_provider( + "vimm", + _adapter, + columns=_columns_factory, + selection_fn=_selection_fn, + metadata={"description": "Minimal Vimm provider"}, + ) except Exception: + # best-effort registration pass diff --git a/ProviderCore/base.py b/ProviderCore/base.py index 874c40f..cdc2f45 100644 --- a/ProviderCore/base.py +++ b/ProviderCore/base.py @@ -1,5 +1,7 @@ from __future__ import annotations +import re + from abc import ABC, abstractmethod from dataclasses import dataclass, field from pathlib import Path @@ -46,9 +48,51 @@ class SearchResult: except Exception: pass + try: + selection_args = getattr(self, "selection_args", None) + except Exception: + selection_args = None + if selection_args is None: + try: + fm = getattr(self, "full_metadata", None) + if isinstance(fm, dict): + selection_args = fm.get("_selection_args") or fm.get("selection_args") + except Exception: + selection_args = None + if selection_args: + out["_selection_args"] = selection_args + return out +def parse_inline_query_arguments(raw_query: str) -> Tuple[str, Dict[str, str]]: + """Extract inline key:value arguments from a provider search query.""" + + query_text = str(raw_query or "").strip() + if not 
query_text: + return "", {} + + tokens = re.split(r"[,\s]+", query_text) + leftover: List[str] = [] + parsed_args: Dict[str, str] = {} + + for token in tokens: + if not token: + continue + sep_index = token.find(":") + if sep_index < 0: + sep_index = token.find("=") + if sep_index > 0: + key = token[:sep_index].strip().lower() + value = token[sep_index + 1 :].strip() + if key and value: + parsed_args[key] = value + continue + leftover.append(token) + + return " ".join(leftover).strip(), parsed_args + + class Provider(ABC): """Unified provider base class. @@ -97,6 +141,12 @@ class Provider(ABC): return [] return out + def extract_query_arguments(self, query: str) -> Tuple[str, Dict[str, Any]]: + """Allow providers to normalize query text and parse inline arguments.""" + + normalized = str(query or "").strip() + return normalized, {} + # Standard lifecycle/auth hook. def login(self, **_kwargs: Any) -> bool: return True diff --git a/ProviderCore/download.py b/ProviderCore/download.py deleted file mode 100644 index 04ec43f..0000000 --- a/ProviderCore/download.py +++ /dev/null @@ -1,100 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import Callable, Optional -import sys - -import requests - -from SYS.models import ProgressBar - - -def sanitize_filename(name: str, *, max_len: int = 150) -> str: - text = str(name or "").strip() - if not text: - return "download" - - forbidden = set('<>:"/\\|?*') - cleaned = "".join("_" if c in forbidden else c for c in text) - cleaned = " ".join(cleaned.split()).strip().strip(".") - if not cleaned: - cleaned = "download" - return cleaned[:max_len] - - -def download_file( - url: str, - output_path: Path, - *, - session: Optional[requests.Session] = None, - timeout_s: float = 30.0, - progress_callback: Optional[Callable[[int, - Optional[int], - str], - None]] = None, -) -> bool: - output_path = Path(output_path) - output_path.parent.mkdir(parents=True, exist_ok=True) - - s = session or 
requests.Session() - - bar = ProgressBar() if progress_callback is None else None - downloaded = 0 - total = None - - try: - with s.get(url, stream=True, timeout=timeout_s) as resp: - resp.raise_for_status() - try: - total_val = int(resp.headers.get("content-length") or 0) - total = total_val if total_val > 0 else None - except Exception: - total = None - - label = str(output_path.name or "download") - - # Render once immediately so fast downloads still show something. - try: - if progress_callback is not None: - progress_callback(0, total, label) - elif bar is not None: - bar.update(downloaded=0, total=total, label=label, file=sys.stderr) - except Exception: - pass - - with open(output_path, "wb") as f: - for chunk in resp.iter_content(chunk_size=1024 * 256): - if chunk: - f.write(chunk) - downloaded += len(chunk) - try: - if progress_callback is not None: - progress_callback(downloaded, total, label) - elif bar is not None: - bar.update( - downloaded=downloaded, - total=total, - label=label, - file=sys.stderr - ) - except Exception: - pass - - try: - if bar is not None: - bar.finish() - except Exception: - pass - return output_path.exists() and output_path.stat().st_size > 0 - except Exception: - try: - if bar is not None: - bar.finish() - except Exception: - pass - try: - if output_path.exists(): - output_path.unlink() - except Exception: - pass - return False diff --git a/ProviderCore/inline_utils.py b/ProviderCore/inline_utils.py new file mode 100644 index 0000000..3e17c58 --- /dev/null +++ b/ProviderCore/inline_utils.py @@ -0,0 +1,127 @@ +"""Inline query helpers for providers (choice normalization and filter resolution).""" +from __future__ import annotations + +from typing import Any, Dict, List, Optional + + +def _normalize_choice(entry: Any) -> Optional[Dict[str, Any]]: + if entry is None: + return None + if isinstance(entry, dict): + value = entry.get("value") + text = entry.get("text") or entry.get("label") or value + aliases = entry.get("alias") or 
entry.get("aliases") or [] + value_str = str(value) if value is not None else (str(text) if text is not None else None) + text_str = str(text) if text is not None else value_str + if not value_str or not text_str: + return None + alias_list = [str(a) for a in aliases if a is not None] + return {"value": value_str, "text": text_str, "aliases": alias_list} + return {"value": str(entry), "text": str(entry), "aliases": []} + + +def collect_choice(provider: Any) -> Dict[str, List[Dict[str, Any]]]: + """Collect normalized inline/query argument choice entries from a provider. + + Supports QUERY_ARG_CHOICES, INLINE_QUERY_FIELD_CHOICES, and the + helper methods valued by Providers (`query_field_choices` / + `inline_query_field_choices`). Each choice is normalized to {value,text,aliases}. + """ + + mapping: Dict[str, List[Dict[str, Any]]] = {} + + def _ingest(source: Any, target_key: str) -> None: + normalized: List[Dict[str, Any]] = [] + seq = source + try: + if callable(seq): + seq = seq() + except Exception: + seq = source + if isinstance(seq, dict): + seq = seq.get("choices") or seq.get("values") or seq + if isinstance(seq, (list, tuple, set)): + for entry in seq: + n = _normalize_choice(entry) + if n: + normalized.append(n) + if normalized: + mapping[target_key] = normalized + + base = getattr(provider, "QUERY_ARG_CHOICES", None) + if isinstance(base, dict): + for k, v in base.items(): + key_norm = str(k).strip().lower() + if not key_norm: + continue + _ingest(v, key_norm) + + try: + fn = getattr(provider, "inline_query_field_choices", None) + if callable(fn): + extra = fn() + if isinstance(extra, dict): + for k, v in extra.items(): + key_norm = str(k).strip().lower() + if not key_norm: + continue + _ingest(v, key_norm) + except Exception: + pass + + return mapping + + +def resolve_filter( + provider: Any, + inline_args: Dict[str, Any], + *, + field_transforms: Optional[Dict[str, Any]] = None, +) -> Dict[str, str]: + """Map inline query args to provider filter values 
using declared choices. + + - Uses provider choice mapping (value/text/aliases) to resolve user text. + - Applies optional per-field transforms (e.g., str.upper). + - Returns normalized filters suitable for provider.search. + """ + + filters: Dict[str, str] = {} + if not inline_args: + return filters + + mapping = collect_choice(provider) + transforms = field_transforms or {} + + for raw_key, raw_val in inline_args.items(): + if raw_val is None: + continue + key = str(raw_key or "").strip().lower() + val_str = str(raw_val).strip() + if not key or not val_str: + continue + + entries = mapping.get(key, []) + resolved: Optional[str] = None + val_lower = val_str.lower() + for entry in entries: + text = str(entry.get("text") or "").strip() + value = str(entry.get("value") or "").strip() + aliases = [str(a).strip() for a in entry.get("aliases", []) if a is not None] + alias_lowers = {a.lower() for a in aliases} + if val_lower in {text.lower(), value.lower()} or val_lower in alias_lowers: + resolved = value or text or val_str + break + + if resolved is None: + resolved = val_str + + transform = transforms.get(key) + if callable(transform): + try: + resolved = transform(resolved) + except Exception: + pass + if resolved: + filters[key] = str(resolved) + + return filters diff --git a/ProviderCore/registry.py b/ProviderCore/registry.py index 1a9093a..afc5655 100644 --- a/ProviderCore/registry.py +++ b/ProviderCore/registry.py @@ -89,7 +89,6 @@ class ProviderRegistry: replace: bool = False, ) -> ProviderInfo: """Register a provider class with canonical and alias names.""" - candidates = self._candidate_names(provider_class, override_name) if not candidates: raise ValueError("provider name candidates are required") @@ -397,6 +396,125 @@ def match_provider_name_for_url(url: str) -> Optional[str]: return None +def provider_inline_query_choices( + provider_name: str, + field_name: str, + config: Optional[Dict[str, Any]] = None, +) -> List[str]: + """Return provider-declared 
inline query choices for a field (e.g., system:GBA). + + Providers can expose a mapping via ``QUERY_ARG_CHOICES`` (preferred) or + ``INLINE_QUERY_FIELD_CHOICES`` / ``inline_query_field_choices()``. The helper + keeps completion logic simple and reusable. + This helper keeps completion logic simple and reusable. + """ + + pname = str(provider_name or "").strip().lower() + field = str(field_name or "").strip().lower() + if not pname or not field: + return [] + + provider = get_search_provider(pname, config) + if provider is None: + provider = get_provider(pname, config) + if provider is None: + return [] + + def _normalize_choice_entry(entry: Any) -> Optional[Dict[str, Any]]: + if entry is None: + return None + if isinstance(entry, dict): + value = entry.get("value") + text = entry.get("text") or entry.get("label") or value + aliases = entry.get("alias") or entry.get("aliases") or [] + value_str = str(value) if value is not None else (str(text) if text is not None else None) + text_str = str(text) if text is not None else value_str + if not value_str or not text_str: + return None + alias_list = [str(a) for a in aliases if a is not None] + return {"value": value_str, "text": text_str, "aliases": alias_list} + # string/other primitives + return {"value": str(entry), "text": str(entry), "aliases": []} + + def _collect_mapping(p) -> Dict[str, List[Dict[str, Any]]]: + mapping: Dict[str, List[Dict[str, Any]]] = {} + base = getattr(p, "QUERY_ARG_CHOICES", None) + if not isinstance(base, dict): + base = getattr(p, "INLINE_QUERY_FIELD_CHOICES", None) + if isinstance(base, dict): + for k, v in base.items(): + normalized: List[Dict[str, Any]] = [] + seq = v + try: + if callable(seq): + seq = seq() + except Exception: + seq = v + if isinstance(seq, dict): + seq = seq.get("choices") or seq.get("values") or seq + if isinstance(seq, (list, tuple, set)): + for entry in seq: + n = _normalize_choice_entry(entry) + if n: + normalized.append(n) + if normalized: + 
mapping[str(k).strip().lower()] = normalized + try: + fn = getattr(p, "inline_query_field_choices", None) + if callable(fn): + extra = fn() + if isinstance(extra, dict): + for k, v in extra.items(): + normalized: List[Dict[str, Any]] = [] + seq = v + try: + if callable(seq): + seq = seq() + except Exception: + seq = v + if isinstance(seq, dict): + seq = seq.get("choices") or seq.get("values") or seq + if isinstance(seq, (list, tuple, set)): + for entry in seq: + n = _normalize_choice_entry(entry) + if n: + normalized.append(n) + if normalized: + mapping[str(k).strip().lower()] = normalized + except Exception: + pass + return mapping + + try: + mapping = _collect_mapping(provider) + if not mapping: + return [] + + entries = mapping.get(field, []) + if not entries: + return [] + + seen: set[str] = set() + out: List[str] = [] + for entry in entries: + text = entry.get("text") or entry.get("value") + if not text: + continue + text_str = str(text) + if text_str in seen: + continue + seen.add(text_str) + out.append(text_str) + for alias in entry.get("aliases", []): + alias_str = str(alias) + if alias_str and alias_str not in seen: + seen.add(alias_str) + out.append(alias_str) + return out + except Exception: + return [] + + def get_provider_for_url(url: str, config: Optional[Dict[str, Any]] = None) -> Optional[Provider]: name = match_provider_name_for_url(url) @@ -405,6 +523,60 @@ def get_provider_for_url(url: str, return get_provider(name, config) +def resolve_inline_filters( + provider: Provider, + inline_args: Dict[str, Any], + *, + field_transforms: Optional[Dict[str, Any]] = None, +) -> Dict[str, str]: + """Map inline query args to provider filter values using declared choices. + + - Uses provider's inline choice mapping (value/text/aliases) to resolve user text. + - Applies optional per-field transforms (e.g., str.upper). + - Returns normalized filters suitable for provider.search. 
+ """ + + filters: Dict[str, str] = {} + if not inline_args: + return filters + + mapping = _collect_mapping(provider) + transforms = field_transforms or {} + + for raw_key, raw_val in inline_args.items(): + if raw_val is None: + continue + key = str(raw_key or "").strip().lower() + val_str = str(raw_val).strip() + if not key or not val_str: + continue + + entries = mapping.get(key, []) + resolved: Optional[str] = None + val_lower = val_str.lower() + for entry in entries: + text = str(entry.get("text") or "").strip() + value = str(entry.get("value") or "").strip() + aliases = [str(a).strip() for a in entry.get("aliases", []) if a is not None] + if val_lower in {text.lower(), value.lower()} or val_lower in {a.lower() for a in aliases}: + resolved = value or text or val_str + break + + if resolved is None: + resolved = val_str + + transform = transforms.get(key) + if callable(transform): + try: + resolved = transform(resolved) + except Exception: + pass + if resolved: + filters[key] = str(resolved) + + return filters + + __all__ = [ "ProviderInfo", "Provider", @@ -423,4 +595,5 @@ __all__ = [ "get_provider_class", "selection_auto_stage_for_table", "download_soulseek_file", + "provider_inline_query_choices", ] diff --git a/SYS/html_table.py b/SYS/html_table.py new file mode 100644 index 0000000..9d8facb --- /dev/null +++ b/SYS/html_table.py @@ -0,0 +1,302 @@ +"""Small helper utilities for extracting structured records from HTML tables +using lxml. + +Goal: make it trivial for provider authors to extract table rows and common +fields (title, link, standardized column keys) without re-implementing the +same heuristics in every provider. + +Key functions: +- find_candidate_nodes(doc_or_html, xpaths=...) +- extract_records(doc_or_html, base_url=None, xpaths=...) +- normalize_header(name, synonyms=...) + +This module intentionally avoids heavyweight deps (no pandas) and works with +`lxml.html` elements (the project already uses lxml). 
+""" +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Tuple +from lxml import html as lxml_html +from urllib.parse import urljoin +import re + +# Default xpaths for candidate result containers +_DEFAULT_XPATHS = [ + "//table//tbody/tr", + "//table//tr[td]", + "//div[contains(@class,'list-item')]", + "//div[contains(@class,'result')]", + "//li[contains(@class,'item')]", +] + +# Simple header synonyms (you can extend as needed) +_DEFAULT_SYNONYMS = { + "platform": "system", + "system": "system", + "name": "title", + "title": "title", +} + + +def _ensure_doc(doc_or_html: Any) -> lxml_html.HtmlElement: + if isinstance(doc_or_html, str): + return lxml_html.fromstring(doc_or_html) + return doc_or_html + + +def _text_or_img_title(el) -> str: + # Prefer img/@title if present (useful for flag icons) + try: + imgs = el.xpath('.//img/@title') + if imgs: + return str(imgs[0]).strip() + except Exception: + pass + return (el.text_content() or "").strip() + + +def find_candidate_nodes(doc_or_html: Any, xpaths: Optional[List[str]] = None) -> Tuple[List[Any], Optional[str]]: + """Find candidate nodes for results using a prioritized xpath list. + + Returns (nodes, chosen_xpath). + """ + doc = _ensure_doc(doc_or_html) + for xp in (xpaths or _DEFAULT_XPATHS): + try: + found = doc.xpath(xp) + if found: + return list(found), xp + except Exception: + continue + return [], None + + +def _parse_tr_nodes(tr_nodes: List[Any], base: Optional[str] = None) -> List[Dict[str, str]]: + out: List[Dict[str, str]] = [] + + for tr in tr_nodes: + try: + tds = tr.xpath("./td") + if not tds or len(tds) < 1: + continue + + # canonical fields + rec: Dict[str, str] = {} + + # Heuristic: if the first cell contains an anchor, treat it as the title/path + # (detail pages often put the file link in the first column and size in the second). 
+ a0 = tds[0].xpath('.//a[contains(@href,"/vault/")]') or tds[0].xpath('.//a') + if a0: + rec["title"] = (a0[0].text_content() or "").strip() + href = a0[0].get("href") + rec["path"] = urljoin(base, href) if href and base else (href or "") + + # Try to find a size cell in the remaining tds (class 'size' is common) + size_val = None + for td in tds[1:]: + s = td.xpath('.//span[contains(@class,"size")]/text()') + if s: + size_val = str(s[0]).strip() + break + if not size_val and len(tds) > 1: + txt = (tds[1].text_content() or "").strip() + # crude size heuristic: contains digits and a unit letter + if txt and re.search(r"\d", txt): + size_val = txt + + if size_val: + rec["size"] = size_val + + else: + # First cell often "system"/"platform" + rec["platform"] = _text_or_img_title(tds[0]) + + # Title + optional link from second column + if len(tds) > 1: + a = tds[1].xpath('.//a[contains(@href,"/vault/")]') or tds[1].xpath('.//a') + if a: + rec["title"] = (a[0].text_content() or "").strip() + href = a[0].get("href") + rec["path"] = urljoin(base, href) if href and base else (href or "") + else: + rec["title"] = (tds[1].text_content() or "").strip() + + # Additional columns in common Vimm layout + if len(tds) > 2: + rec["region"] = _text_or_img_title(tds[2]).strip() + if len(tds) > 3: + rec["version"] = (tds[3].text_content() or "").strip() + if len(tds) > 4: + rec["languages"] = (tds[4].text_content() or "").strip() + + out.append(rec) + except Exception: + continue + + return out + + +def _parse_list_item_nodes(nodes: List[Any], base: Optional[str] = None) -> List[Dict[str, str]]: + out: List[Dict[str, str]] = [] + for node in nodes: + try: + rec: Dict[str, str] = {} + # title heuristics + a = node.xpath('.//h2/a') or node.xpath('.//a') + if a: + rec["title"] = (a[0].text_content() or "").strip() + href = a[0].get("href") + rec["path"] = urljoin(base, href) if href and base else (href or "") + else: + rec["title"] = (node.text_content() or "").strip() + + # platform, 
size + p = node.xpath('.//span[contains(@class,"platform")]/text()') + if p: + rec["platform"] = str(p[0]).strip() + + s = node.xpath('.//span[contains(@class,"size")]/text()') + if s: + rec["size"] = str(s[0]).strip() + + out.append(rec) + except Exception: + continue + return out + + +def normalize_header(name: str, synonyms: Optional[Dict[str, str]] = None) -> str: + """Normalize header names to a canonical form. + + Defaults map 'platform' -> 'system' and 'name' -> 'title', but callers + can pass a custom synonyms dict. + """ + if not name: + return "" + s = str(name or "").strip().lower() + s = re.sub(r"\s+", "_", s) + syn = (synonyms or _DEFAULT_SYNONYMS).get(s) + return syn or s + + +def extract_records(doc_or_html: Any, base_url: Optional[str] = None, xpaths: Optional[List[str]] = None, use_pandas_if_available: bool = True) -> Tuple[List[Dict[str, str]], Optional[str]]: + """Find result candidate nodes and return a list of normalized records plus chosen xpath. + + If pandas is available and `use_pandas_if_available` is True, attempt to parse + HTML tables using `pandas.read_html` and return those records. Falls back to + node-based parsing when pandas is not available or fails. Returns (records, chosen) + where `chosen` is the xpath that matched or the string 'pandas' when the + pandas path was used. 
+ """ + # Prepare an HTML string for pandas if needed + html_text: Optional[str] = None + if isinstance(doc_or_html, (bytes, bytearray)): + try: + html_text = doc_or_html.decode("utf-8") + except Exception: + html_text = doc_or_html.decode("latin-1", errors="ignore") + elif isinstance(doc_or_html, str): + html_text = doc_or_html + else: + try: + html_text = lxml_html.tostring(doc_or_html, encoding="unicode") + except Exception: + html_text = str(doc_or_html) + + # Try pandas first when available and requested + if use_pandas_if_available and html_text is not None: + try: + import pandas as _pd # type: ignore + + dfs = _pd.read_html(html_text) + if dfs: + # pick the largest dataframe by row count for heuristics + df = max(dfs, key=lambda d: getattr(d, "shape", (len(getattr(d, 'index', [])), 0))[0]) + try: + rows = df.to_dict("records") + except Exception: + # Some DataFrame-like objects may have slightly different APIs + rows = [dict(r) for r in df] + + records: List[Dict[str, str]] = [] + for row in rows: + nr: Dict[str, str] = {} + for k, v in (row or {}).items(): + nk = normalize_header(str(k or "")) + nr[nk] = (str(v).strip() if v is not None else "") + records.append(nr) + + # Attempt to recover hrefs by matching anchor text -> href + try: + doc = lxml_html.fromstring(html_text) + anchors = {} + for a in doc.xpath('//a'): + txt = (a.text_content() or "").strip() + href = a.get("href") + if txt and href and txt not in anchors: + anchors[txt] = href + for rec in records: + if not rec.get("path") and rec.get("title"): + href = anchors.get(rec["title"]) + if href: + rec["path"] = urljoin(base_url, href) if base_url else href + except Exception: + pass + + return records, "pandas" + except Exception: + # Pandas not present or parsing failed; fall back to node parsing + pass + + # Fallback to node-based parsing + nodes, chosen = find_candidate_nodes(doc_or_html, xpaths=xpaths) + if not nodes: + return [], chosen + + # Determine node type and parse accordingly + first 
= nodes[0] + tag = getattr(first, "tag", "").lower() + if tag == "tr": + records = _parse_tr_nodes(nodes, base=base_url) + else: + # list-item style + records = _parse_list_item_nodes(nodes, base=base_url) + + # Normalize keys (map platform->system etc) + normed: List[Dict[str, str]] = [] + for r in records: + nr: Dict[str, str] = {} + for k, v in (r or {}).items(): + nk = normalize_header(k) + nr[nk] = v + normed.append(nr) + + return normed, chosen + + +# Small convenience: convert records to SearchResult. Providers can call this or +# use their own mapping when they need full SearchResult objects. +from ProviderCore.base import SearchResult # local import to avoid circular issues + + +def records_to_search_results(records: List[Dict[str, str]], table: str = "provider") -> List[SearchResult]: + out: List[SearchResult] = [] + for rec in records: + title = rec.get("title") or rec.get("name") or "" + path = rec.get("path") or "" + meta = dict(rec) + out.append( + SearchResult( + table=table, + title=str(title), + path=str(path), + detail="", + annotations=[], + media_kind="file", + size_bytes=None, + tag={table}, + columns=[(k.title(), v) for k, v in rec.items() if k and v], + full_metadata={"raw_record": rec, "raw": rec}, + ) + ) + return out diff --git a/SYS/pipeline.py b/SYS/pipeline.py index 62bbd8e..5f0c1e5 100644 --- a/SYS/pipeline.py +++ b/SYS/pipeline.py @@ -972,6 +972,16 @@ def get_last_result_table_row_selection_args(row_index: int) -> Optional[List[st return None +def get_last_result_table_row_selection_action(row_index: int) -> Optional[List[str]]: + """Get the expanded stage tokens for a row in the last result table.""" + state = _get_pipeline_state() + if _is_selectable_table(state.last_result_table) and hasattr(state.last_result_table, "rows"): + if 0 <= row_index < len(state.last_result_table.rows): + row = state.last_result_table.rows[row_index] + if hasattr(row, "selection_action"): + return row.selection_action + return None + def 
set_current_stage_table(result_table: Optional[Any]) -> None: """Store the current pipeline stage table for @N expansion. @@ -1035,6 +1045,17 @@ def get_current_stage_table_row_selection_args(row_index: int) -> Optional[List[ return None +def get_current_stage_table_row_selection_action(row_index: int) -> Optional[List[str]]: + """Get the expanded stage tokens for a row in the current stage table.""" + state = _get_pipeline_state() + if _is_selectable_table(state.current_stage_table) and hasattr(state.current_stage_table, "rows"): + if 0 <= row_index < len(state.current_stage_table.rows): + row = state.current_stage_table.rows[row_index] + if hasattr(row, "selection_action"): + return row.selection_action + return None + + def get_current_stage_table_row_source_index(row_index: int) -> Optional[int]: """Get the original source index for a row in the current stage table. diff --git a/SYS/provider_helpers.py b/SYS/provider_helpers.py new file mode 100644 index 0000000..c40dca5 --- /dev/null +++ b/SYS/provider_helpers.py @@ -0,0 +1,110 @@ +"""Convenience mixins and helpers for table-based providers. + +Provides a small `TableProviderMixin` that handles HTTP fetch + table extraction +(using `SYS.html_table.extract_records`) and converts records into +`ProviderCore.base.SearchResult` rows with sane default column ordering. + +Providers can subclass this mixin to implement search quickly: + +class MyProvider(TableProviderMixin, Provider): + URL = ("https://example.org/search",) + + def search(self, query, limit=50, **kwargs): + url = f"{self.URL[0]}?q={quote_plus(query)}" + return self.search_table_from_url(url, limit=limit, xpaths=self.DEFAULT_XPATHS) + +The mixin deliberately avoids adding heavy dependencies (uses our lxml helper) +so authors don't have to install pandas/bs4 unless they want to. 
+""" +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from urllib.parse import quote_plus + +from API.HTTP import HTTPClient +from ProviderCore.base import SearchResult +from SYS.html_table import extract_records +import lxml.html as lxml_html + + +class TableProviderMixin: + """Mixin to simplify providers that scrape table/list results from HTML. + + Methods: + - search_table_from_url(url, limit, xpaths): fetches HTML, extracts records, returns SearchResults + - DEFAULT_XPATHS: default xpath list used when none is provided + """ + + # Reuse the same defaults as the html_table helper + DEFAULT_XPATHS: List[str] = [ + "//table//tbody/tr", + "//table//tr[td]", + "//div[contains(@class,'list-item')]", + "//div[contains(@class,'result')]", + "//li[contains(@class,'item')]", + ] + + def search_table_from_url(self, url: str, limit: int = 50, xpaths: Optional[List[str]] = None, timeout: float = 15.0) -> List[SearchResult]: + """Fetch `url`, extract table/list records, and return SearchResult list. + + `xpaths` is passed to `extract_records` (falls back to DEFAULT_XPATHS). 
+ """ + if not url: + return [] + + try: + with HTTPClient(timeout=timeout) as client: + resp = client.get(url) + content = resp.content + except Exception: + return [] + + # Ensure we pass an lxml document or string (httpx returns bytes) + try: + doc = lxml_html.fromstring(content) + except Exception: + try: + doc = content.decode("utf-8") + except Exception: + doc = str(content) + + records, chosen = extract_records(doc, base_url=url, xpaths=xpaths or self.DEFAULT_XPATHS) + + results: List[SearchResult] = [] + for rec in (records or [])[: int(limit)]: + title = rec.get("title") or "" + path = rec.get("path") or "" + platform = rec.get("system") or rec.get("platform") or "" + size = rec.get("size") or "" + region = rec.get("region") or "" + version = rec.get("version") or "" + languages = rec.get("languages") or "" + + cols = [("Title", title)] + if platform: + cols.append(("Platform", platform)) + if size: + cols.append(("Size", size)) + if region: + cols.append(("Region", region)) + if version: + cols.append(("Version", version)) + if languages: + cols.append(("Languages", languages)) + + results.append( + SearchResult( + table=(getattr(self, "name", "provider") or "provider"), + title=title, + path=path, + detail="", + annotations=[], + media_kind="file", + size_bytes=None, + tag={getattr(self, "name", "provider")}, + columns=cols, + full_metadata={"raw_record": rec}, + ) + ) + + return results diff --git a/SYS/result_table.py b/SYS/result_table.py index e417f31..64bbe76 100644 --- a/SYS/result_table.py +++ b/SYS/result_table.py @@ -359,6 +359,8 @@ class ResultRow: columns: List[ResultColumn] = field(default_factory=list) selection_args: Optional[List[str]] = None """Arguments to use for this row when selected via @N syntax (e.g., ['-item', '3'])""" + selection_action: Optional[List[str]] = None + """Full expanded stage tokens that should run when this row is selected.""" source_index: Optional[int] = None """Original insertion order index (used to map sorted 
views back to source items).""" payload: Optional[Any] = None @@ -648,6 +650,11 @@ class ResultTable: if 0 <= row_index < len(self.rows): self.rows[row_index].selection_args = selection_args + def set_row_selection_action(self, row_index: int, selection_action: List[str]) -> None: + """Specify the entire stage tokens to run for this row on @N.""" + if 0 <= row_index < len(self.rows): + self.rows[row_index].selection_action = selection_action + def set_header_lines(self, lines: List[str]) -> "ResultTable": """Attach metadata lines that render beneath the title.""" self.header_lines = [line for line in lines if line] @@ -827,6 +834,30 @@ class ResultTable: if hasattr(result, "annotations") and result.annotations: row.add_column("Annotations", ", ".join(str(a) for a in result.annotations)) + try: + md = getattr(result, "full_metadata", None) + md_dict = dict(md) if isinstance(md, dict) else {} + except Exception: + md_dict = {} + + try: + selection_args = getattr(result, "selection_args", None) + except Exception: + selection_args = None + if selection_args is None: + selection_args = md_dict.get("_selection_args") or md_dict.get("selection_args") + if selection_args: + row.selection_args = [str(a) for a in selection_args if a is not None] + + try: + selection_action = getattr(result, "selection_action", None) + except Exception: + selection_action = None + if selection_action is None: + selection_action = md_dict.get("_selection_action") or md_dict.get("selection_action") + if selection_action: + row.selection_action = [str(a) for a in selection_action if a is not None] + def _add_result_item(self, row: ResultRow, item: Any) -> None: """Extract and add ResultItem fields to row (compact display for search results). 
diff --git a/SYS/result_table_adapters.py b/SYS/result_table_adapters.py index b9a9ffa..935e9f1 100644 --- a/SYS/result_table_adapters.py +++ b/SYS/result_table_adapters.py @@ -10,10 +10,10 @@ from __future__ import annotations from dataclasses import dataclass from typing import Any, Callable, Dict, Iterable, List, Optional, Union -from SYS.result_table_api import ColumnSpec, ProviderAdapter, ResultModel +from SYS.result_table_api import ColumnSpec, ProviderAdapter, ResultModel, ResultTable, ensure_result_model -ColumnFactory = Callable[[Iterable[ResultModel]], List[ColumnSpec]] +ColumnFactory = Callable[[List[ResultModel]], List[ColumnSpec]] SelectionFn = Callable[[ResultModel], List[str]] @@ -22,33 +22,57 @@ class Provider: name: str adapter: ProviderAdapter # columns can be a static list or a factory that derives columns from sample rows - columns: Optional[Union[List[ColumnSpec], ColumnFactory]] = None - selection_fn: Optional[SelectionFn] = None + columns: Union[List[ColumnSpec], ColumnFactory] + selection_fn: SelectionFn metadata: Optional[Dict[str, Any]] = None def get_columns(self, rows: Optional[Iterable[ResultModel]] = None) -> List[ColumnSpec]: + if self.columns is None: + raise ValueError(f"provider '{self.name}' must define columns") + if callable(self.columns): - try: - rows_list = list(rows) if rows is not None else [] - return list(self.columns(rows_list)) - except Exception: - # Fall back to a minimal Title column on errors - return [ColumnSpec("title", "Title", lambda r: r.title)] - if self.columns is not None: - return list(self.columns) - # Default minimal column set - return [ColumnSpec("title", "Title", lambda r: r.title)] + rows_list = list(rows) if rows is not None else [] + cols = list(self.columns(rows_list)) + else: + cols = list(self.columns) + + if not cols: + raise ValueError(f"provider '{self.name}' produced no columns") + + return cols def selection_args(self, row: ResultModel) -> List[str]: - if callable(self.selection_fn): - try: 
- return list(self.selection_fn(row)) - except Exception: - return [] - # Default selector: prefer path flag, then title - if getattr(row, "path", None): - return ["-path", str(row.path)] - return ["-title", str(row.title)] + if not callable(self.selection_fn): + raise ValueError(f"provider '{self.name}' must define a selection function") + + sel = list(self.selection_fn(ensure_result_model(row))) + return sel + + def build_table(self, items: Iterable[Any]) -> ResultTable: + """Materialize adapter output into a ResultTable (strict, no legacy types).""" + + try: + rows = [ensure_result_model(r) for r in self.adapter(items)] + except Exception as exc: + raise RuntimeError(f"provider '{self.name}' adapter failed") from exc + + cols = self.get_columns(rows) + return ResultTable(provider=self.name, rows=rows, columns=cols, meta=self.metadata or {}) + + def serialize_row(self, row: ResultModel) -> Dict[str, Any]: + r = ensure_result_model(row) + return { + "title": r.title, + "path": r.path, + "ext": r.ext, + "size_bytes": r.size_bytes, + "metadata": r.metadata or {}, + "source": r.source or self.name, + "_selection_args": self.selection_args(r), + } + + def serialize_rows(self, rows: Iterable[ResultModel]) -> List[Dict[str, Any]]: + return [self.serialize_row(r) for r in rows] _PROVIDERS: Dict[str, Provider] = {} @@ -58,8 +82,8 @@ def register_provider( name: str, adapter: ProviderAdapter, *, - columns: Optional[Union[List[ColumnSpec], ColumnFactory]] = None, - selection_fn: Optional[SelectionFn] = None, + columns: Union[List[ColumnSpec], ColumnFactory], + selection_fn: SelectionFn, metadata: Optional[Dict[str, Any]] = None, ) -> Provider: name = str(name or "").strip().lower() @@ -67,13 +91,20 @@ def register_provider( raise ValueError("provider name required") if name in _PROVIDERS: raise ValueError(f"provider already registered: {name}") + if columns is None: + raise ValueError("provider registration requires columns") + if selection_fn is None: + raise 
ValueError("provider registration requires selection_fn") p = Provider(name=name, adapter=adapter, columns=columns, selection_fn=selection_fn, metadata=metadata) _PROVIDERS[name] = p return p def get_provider(name: str) -> Provider: - return _PROVIDERS[name.lower()] + normalized = str(name or "").lower() + if normalized not in _PROVIDERS: + raise KeyError(f"provider not registered: {name}") + return _PROVIDERS[normalized] def list_providers() -> List[str]: diff --git a/SYS/result_table_api.py b/SYS/result_table_api.py index cf64d6b..a4eb992 100644 --- a/SYS/result_table_api.py +++ b/SYS/result_table_api.py @@ -7,7 +7,7 @@ renderers must use. It intentionally refuses to accept legacy dicts/strings/objs from __future__ import annotations from dataclasses import dataclass, field -from typing import Any, Callable, Dict, Iterable, Optional, Protocol +from typing import Any, Callable, Dict, Iterable, List, Optional, Protocol @dataclass(frozen=True) @@ -33,6 +33,48 @@ class ResultModel: source: Optional[str] = None +@dataclass(frozen=True) +class ResultTable: + """Concrete, provider-owned table of rows/columns. + + This is intentionally minimal: it only stores rows, column specs, and + optional metadata used by renderers. It does not auto-normalize legacy + objects or infer columns. 
+ """ + + provider: str + rows: List[ResultModel] + columns: List[ColumnSpec] + meta: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + if not str(self.provider or "").strip(): + raise ValueError("provider required for ResultTable") + object.__setattr__(self, "rows", [ensure_result_model(r) for r in self.rows]) + if not self.columns: + raise ValueError("columns are required for ResultTable") + object.__setattr__(self, "columns", list(self.columns)) + object.__setattr__(self, "meta", dict(self.meta or {})) + + def serialize_row(self, row: ResultModel, selection: Optional[List[str]] = None) -> Dict[str, Any]: + """Convert a row into pipeline-friendly dict (with selection args). + + Selection args must be precomputed by the provider; this method only + copies them into the serialized dict. + """ + + r = ensure_result_model(row) + return { + "title": r.title, + "path": r.path, + "ext": r.ext, + "size_bytes": r.size_bytes, + "metadata": r.metadata or {}, + "source": r.source or self.provider, + "_selection_args": list(selection or []), + } + + @dataclass(frozen=True) class ColumnSpec: """Specification for a column that renderers will use. 
@@ -100,6 +142,7 @@ def metadata_column(key: str, header: Optional[str] = None, format_fn: Optional[ __all__ = [ "ResultModel", + "ResultTable", "ColumnSpec", "ProviderAdapter", "Renderer", diff --git a/SYS/result_table_renderers.py b/SYS/result_table_renderers.py index c361e6a..416b33f 100644 --- a/SYS/result_table_renderers.py +++ b/SYS/result_table_renderers.py @@ -9,7 +9,7 @@ from __future__ import annotations from typing import Any, Dict, Iterable, Optional -from SYS.result_table_api import ColumnSpec, ResultModel, Renderer +from SYS.result_table_api import ColumnSpec, ResultModel, ResultTable, Renderer class RichRenderer(Renderer): @@ -65,3 +65,22 @@ def render_to_console(rows: Iterable[ResultModel], columns: Iterable[ColumnSpec] table = RichRenderer().render(rows, columns, meta) Console().print(table) + + +def render_result_table(table: ResultTable, renderer: Optional[Renderer] = None) -> Any: + """Render a ResultTable with the provided renderer (RichRenderer by default).""" + + rend = renderer or RichRenderer() + return rend.render(table.rows, table.columns, table.meta) + + +def render_result_table_to_console(table: ResultTable, renderer: Optional[Renderer] = None) -> None: + try: + from rich.console import Console + except Exception: + for r in table.rows: + print(" ".join(str((col.extractor(r) or "")) for col in table.columns)) + return + + console = Console() + console.print(render_result_table(table, renderer)) diff --git a/SYS/utils.py b/SYS/utils.py index 2598171..e32a328 100644 --- a/SYS/utils.py +++ b/SYS/utils.py @@ -66,6 +66,24 @@ def sanitize_metadata_value(value: Any) -> str | None: return value +def sanitize_filename(name: str, *, max_len: int = 150) -> str: + """Return a filesystem-safe filename derived from *name*. + + Replaces characters that are invalid on Windows with underscores and + collapses whitespace. Trims trailing periods and enforces a max length. 
+ """ + text = str(name or "").strip() + if not text: + return "download" + + forbidden = set('<>:"/\\|?*') + cleaned = "".join("_" if c in forbidden else c for c in text) + cleaned = " ".join(cleaned.split()).strip().strip(".") + if not cleaned: + cleaned = "download" + return cleaned[:max_len] + + def unique_preserve_order(values: Iterable[str]) -> list[str]: seen: set[str] = set() ordered: list[str] = [] diff --git a/Store/registry.py b/Store/registry.py index 50464a8..8365673 100644 --- a/Store/registry.py +++ b/Store/registry.py @@ -374,3 +374,61 @@ class Store: return bool(ok) if ok is not None else True except Exception: return False + + +def list_configured_backend_names(config: Optional[Dict[str, Any]]) -> list[str]: + """Return backend instance names present in the provided config WITHOUT instantiating backends. + + This is a lightweight helper for CLI usage where we only need to know if a + configured backend exists (e.g., to distinguish a store name from a filesystem path) + without triggering backend initialization (which may perform network calls). + + Behaviour: + - For each configured store type, returns the per-instance NAME override (case-insensitive) + when present, otherwise the instance key. + - Includes a 'temp' alias when a folder backend points to the configured 'temp' path. 
+ """ + try: + store_cfg = (config or {}).get("store") or {} + if not isinstance(store_cfg, dict): + return [] + + names: list[str] = [] + for raw_store_type, instances in store_cfg.items(): + if not isinstance(instances, dict): + continue + for instance_name, instance_config in instances.items(): + if isinstance(instance_config, dict): + override_name = _get_case_insensitive(dict(instance_config), "NAME") + if override_name: + names.append(str(override_name)) + else: + names.append(str(instance_name)) + else: + names.append(str(instance_name)) + + # Best-effort: alias 'temp' when a folder backend points at config['temp'] + try: + temp_value = (config or {}).get("temp") + if temp_value: + temp_path = str(Path(str(temp_value)).expanduser().resolve()) + for raw_store_type, instances in store_cfg.items(): + if not isinstance(instances, dict): + continue + if _normalize_store_type(str(raw_store_type)) != "folder": + continue + for instance_name, instance_config in instances.items(): + if not isinstance(instance_config, dict): + continue + path_value = instance_config.get("PATH") or instance_config.get("path") + if not path_value: + continue + if str(Path(str(path_value)).expanduser().resolve()) == temp_path: + if "temp" not in names: + names.append("temp") + except Exception: + pass + + return sorted(set(names)) + except Exception: + return [] diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index 40c8b42..3bdf683 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -219,17 +219,18 @@ class SharedArgs: SharedArgs.STORE.choices = SharedArgs.get_store_choices(config) """ try: - from Store import Store + # Use the non-instantiating helper so autocomplete doesn't trigger backend init. 
+ from Store.registry import list_configured_backend_names # If no config provided, try to load it if config is None: try: from SYS.config import load_config + config = load_config() except Exception: return [] - store = Store(config) - return store.list_backends() + return list_configured_backend_names(config) except Exception: # Fallback to empty list if FileStorage isn't available return [] diff --git a/cmdlet/add_file.py b/cmdlet/add_file.py index 8b36963..d975855 100644 --- a/cmdlet/add_file.py +++ b/cmdlet/add_file.py @@ -321,9 +321,11 @@ class Add_File(Cmdlet): is_storage_backend_location = False if location: try: - store_probe = Store(config) + # Use a config-only check to avoid instantiating backends (which may perform network checks). + from Store.registry import list_configured_backend_names + is_storage_backend_location = location in ( - store_probe.list_backends() or [] + list_configured_backend_names(config) or [] ) except Exception: is_storage_backend_location = False diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index 826d398..d210139 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -70,6 +70,7 @@ class Download_File(Cmdlet): "download-http"], arg=[ SharedArgs.URL, + SharedArgs.PROVIDER, SharedArgs.PATH, SharedArgs.QUERY, # Prefer -path for output directory to match other cmdlets; keep -output for backwards compatibility. 
@@ -121,6 +122,7 @@ class Download_File(Cmdlet): def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Main execution method.""" + debug(f"[download-file] run invoked with args: {list(args)}") return self._run_impl(result, args, config) @staticmethod @@ -889,7 +891,7 @@ class Download_File(Cmdlet): return expanded_items - def _process_provider_items( + def _process_provider_items(self, *, piped_items: Sequence[Any], final_output_dir: Path, diff --git a/cmdlet/provider_table.py b/cmdlet/provider_table.py index d4d9970..ba4ee1c 100644 --- a/cmdlet/provider_table.py +++ b/cmdlet/provider_table.py @@ -1,7 +1,7 @@ from __future__ import annotations -from typing import Any, Dict, Iterable, Optional, Sequence -from pathlib import Path +import sys +from typing import Any, Dict, Iterable, Sequence from . import _shared as sh from SYS.logger import log, debug @@ -68,47 +68,34 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: return 1 items = inputs - # Build rows try: - rows = list(provider.adapter(items)) + table = provider.build_table(items) except Exception as exc: - log(f"Provider adapter failed: {exc}", file=sys.stderr) + log(f"Provider '{provider.name}' failed: {exc}", file=sys.stderr) return 1 - cols = provider.get_columns(rows) - # Emit rows for downstream pipeline consumption (pipable behavior). 
try: - for r in rows: + for item in provider.serialize_rows(table.rows): try: - item = { - "title": getattr(r, "title", None) or None, - "path": getattr(r, "path", None) or None, - "ext": getattr(r, "ext", None) or None, - "size_bytes": getattr(r, "size_bytes", None) or None, - "metadata": getattr(r, "metadata", None) or {}, - "source": getattr(r, "source", None) or provider.name, - "_selection_args": provider.selection_args(r), - } ctx.emit(item) except Exception: - # Best-effort: continue emitting other rows continue except Exception: - # Non-fatal: continue to rendering even if emission fails + # Non-fatal: rendering still happens pass # Render using RichRenderer try: - table = RichRenderer().render(rows, cols, provider.metadata) + renderable = RichRenderer().render(table.rows, table.columns, table.meta) try: from rich.console import Console - Console().print(table) + Console().print(renderable) except Exception: # Fallback to simple printing - for r in rows: - print(" ".join(str((c.extractor(r) or "")) for c in cols)) + for r in table.rows: + print(" ".join(str((c.extractor(r) or "")) for c in table.columns)) except Exception as exc: log(f"Rendering failed: {exc}", file=sys.stderr) return 1 @@ -123,11 +110,11 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: log("Invalid -select value; must be an integer", file=sys.stderr) return 1 - if select_idx < 0 or select_idx >= len(rows): + if select_idx < 0 or select_idx >= len(table.rows): log("-select out of range", file=sys.stderr) return 1 - selected = rows[select_idx] + selected = table.rows[select_idx] sel_args = provider.selection_args(selected) if not run_cmd: diff --git a/cmdlet/screen_shot.py b/cmdlet/screen_shot.py index 47b185a..32f5bf2 100644 --- a/cmdlet/screen_shot.py +++ b/cmdlet/screen_shot.py @@ -40,7 +40,7 @@ from SYS import pipeline as pipeline_context # Playwright & Screenshot Dependencies # ============================================================================ 
-from tool.playwright import HAS_PLAYWRIGHT, PlaywrightTimeoutError, PlaywrightTool +from tool.playwright import PlaywrightTimeoutError, PlaywrightTool try: from SYS.config import resolve_output_dir @@ -853,12 +853,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}") return 0 - if not HAS_PLAYWRIGHT: - log( - "playwright is required for screenshot capture; install with: pip install playwright; then: playwright install", - file=sys.stderr, - ) - return 1 + progress = PipelineProgress(pipeline_context) diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index 02d08c8..cf904cf 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -241,6 +241,16 @@ class search_file(Cmdlet): else: provider_label = provider_text[:1].upper() + provider_text[1:] if provider_text else "Provider" + normalized_query = str(query or "").strip() + provider_filters: Dict[str, Any] = {} + try: + normalized_query, provider_filters = provider.extract_query_arguments(query) + except Exception: + provider_filters = {} + normalized_query = (normalized_query or "").strip() + query = normalized_query or "*" + provider_filters = dict(provider_filters or {}) + if provider_lower == "alldebrid" and effective_open_id is not None: table_title = f"{provider_label} Files: {effective_open_id}".strip().rstrip(":") else: @@ -267,17 +277,22 @@ class search_file(Cmdlet): table.set_table_metadata(table_meta) except Exception: pass - table.set_source_command("search-file", list(args_list)) - - debug(f"[search-file] Calling {provider_name}.search()") - if provider_lower == "alldebrid": - filters = {"view": "folders"} - search_open_id = parsed_open_id if parsed_open_id is not None else open_id - if search_open_id is not None: - filters = {"view": "files", "magnet_id": search_open_id} - results = provider.search(query, limit=limit, filters=filters) + if provider_lower == "vimm": + # Keep 
auto-staged download-file from inheriting raw query tokens; + # only propagate provider hint so @N expands to a clean downloader call. + table.set_source_command("search-file", ["-provider", provider_name]) else: - results = provider.search(query, limit=limit) + table.set_source_command("search-file", list(args_list)) + + search_filters = dict(provider_filters) + debug(f"[search-file] Calling {provider_name}.search(filters={search_filters})") + if provider_lower == "alldebrid": + search_open_id = parsed_open_id if parsed_open_id is not None else open_id + view_value = "files" if search_open_id is not None else "folders" + search_filters["view"] = view_value + if search_open_id is not None: + search_filters["magnet_id"] = search_open_id + results = provider.search(query, limit=limit, filters=search_filters or None) debug(f"[search-file] {provider_name} -> {len(results or [])} result(s)") # HIFI artist UX: if there is exactly one artist match, auto-expand @@ -342,6 +357,10 @@ class search_file(Cmdlet): if "table" not in item_dict: item_dict["table"] = table_type + # Ensure provider source is present so downstream cmdlets (select) can resolve provider + if "source" not in item_dict: + item_dict["source"] = provider_name + row_index = len(table.rows) table.add_result(search_result) diff --git a/cmdlet/select_item.py b/cmdlet/select_item.py index e7b656e..6618432 100644 --- a/cmdlet/select_item.py +++ b/cmdlet/select_item.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys from typing import Any, Dict, List, Sequence from . 
import _shared as sh from SYS.logger import log, debug @@ -89,28 +90,22 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: log("No input provided to select; pipe provider-table output or use a cmdlet that emits items.", file=sys.stderr) return 1 + first_src = inputs[0].get("source") if isinstance(inputs[0], dict) else None + if not first_src: + log("Input items must include 'source' to resolve provider for selection.", file=sys.stderr) + return 1 + + try: + provider = get_provider(first_src) + except Exception: + log(f"Unknown provider: {first_src}", file=sys.stderr) + return 1 + # Model-ize items rows = [_dict_to_result_model(item if isinstance(item, dict) else item) for item in inputs] - # Attempt to detect provider from first item - provider = None - first_src = inputs[0].get("source") if isinstance(inputs[0], dict) else None - if first_src: - try: - provider = get_provider(first_src) - except Exception: - provider = None - - # Columns: ask provider for column spec if available, else build minimal columns - if provider: - cols = provider.get_columns(rows) - else: - # Minimal columns built from available keys - from SYS.result_table_api import title_column, ext_column - - cols = [title_column()] - if any(r.ext for r in rows): - cols.append(ext_column()) + # Columns: provider must supply them (no legacy defaults) + cols = provider.get_columns(rows) # Render table to console try: @@ -172,26 +167,19 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: "source": raw.source, } else: - # try to call to_dict or fallback try: selected = raw.to_dict() except Exception: selected = {"title": getattr(raw, "title", str(raw))} - # Ensure selection args exist + # Ensure selection args exist using provider's selector only if not selected.get("_selection_args"): - if provider: - try: - sel_args = provider.selection_args(rows[idx]) - selected["_selection_args"] = sel_args - except Exception: - selected["_selection_args"] = [] - 
else: - # fallback - if selected.get("path"): - selected["_selection_args"] = ["-path", selected.get("path")] - else: - selected["_selection_args"] = ["-title", selected.get("title") or ""] + try: + sel_args = provider.selection_args(rows[idx]) + selected["_selection_args"] = sel_args + except Exception: + log("Selection args missing and provider selector failed.", file=sys.stderr) + return 1 selected_items.append(selected) except Exception: diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md new file mode 100644 index 0000000..ce35921 --- /dev/null +++ b/docs/CHANGELOG.md @@ -0,0 +1,9 @@ +# Changelog + +## Unreleased (2026-01-05) + +- **docs:** Add `docs/provider_authoring.md` with a Quick Start, examples, and testing guidance for providers that integrate with the strict `ResultTable` API (ResultModel/ColumnSpec/selection_fn). +- **docs:** Add link to `docs/result_table.md` pointing to the provider authoring guide. +- **tests:** Add `tests/test_provider_author_examples.py` validating example provider registration and adapter behavior. +- **notes:** Existing example providers (`Provider/example_provider.py`, `Provider/vimm.py`) are referenced as canonical patterns. + diff --git a/docs/PR_PROVIDER_AUTHORING.md b/docs/PR_PROVIDER_AUTHORING.md new file mode 100644 index 0000000..a15eea3 --- /dev/null +++ b/docs/PR_PROVIDER_AUTHORING.md @@ -0,0 +1,15 @@ +PR Title: docs: Add Provider authoring doc, examples, and tests + +Summary: +- Add `docs/provider_authoring.md` describing the strict `ResultModel`-based provider adapter pattern, `ColumnSpec` usage, `selection_fn`, and `TableProviderMixin` for HTML table scraping. +- Link new doc from `docs/result_table.md`. +- Add `tests/test_provider_author_examples.py` to validate `Provider/example_provider.py` and `Provider/vimm.py` integration with the registry. + +Why: +- Provide a short, focused Quick Start to help contributors author providers that integrate with the new strict ResultTable API. 
+ +Testing: +- New tests pass locally (provider-related subset). + +Notes: +- The change is documentation-first and non-functional, with tests ensuring examples remain valid. diff --git a/docs/provider_authoring.md b/docs/provider_authoring.md new file mode 100644 index 0000000..d57f286 --- /dev/null +++ b/docs/provider_authoring.md @@ -0,0 +1,141 @@ +# Provider authoring: ResultTable & provider adapters ✅ + +This short guide explains how to write providers that integrate with the *strict* ResultTable API: adapters must yield `ResultModel` instances and providers register via `SYS.result_table_adapters.register_provider` with a column specification and a `selection_fn`. + +--- + +## Quick summary + +- Providers register a *provider adapter* (callable that yields `ResultModel`). +- Providers must also provide `columns` (static list or factory) and a `selection_fn` that returns CLI args for a selected row. +- For simple HTML table/list scraping, prefer `TableProviderMixin` from `SYS.provider_helpers` to fetch and extract rows using `SYS.html_table.extract_records`. + +## Runtime dependency policy + +- Treat required runtime dependencies (e.g., **Playwright**) as mandatory: import them unconditionally and let missing dependencies fail fast at import time. Avoid adding per-call try/except import guards for required modules—these silently hide configuration errors and add bloat. +- Use guarded imports only for truly optional dependencies (e.g., `pandas` for enhanced table parsing) and provide meaningful fallbacks or helpful error messages in those cases. +- Keep provider code minimal and explicit: fail early and document required runtime dependencies in README/installation notes. 
+ +--- + +## Minimal provider template (copy/paste) + +```py +# Provider/my_provider.py +from typing import Any, Dict, Iterable, List + +from SYS.result_table_api import ResultModel, ColumnSpec, title_column, metadata_column +from SYS.result_table_adapters import register_provider + +# Example adapter: convert provider-specific items into ResultModel instances +SAMPLE_ITEMS = [ + {"name": "Example File.pdf", "path": "https://example.com/x.pdf", "ext": "pdf", "size": 1024, "source": "myprovider"}, +] + +def adapter(items: Iterable[Dict[str, Any]]) -> Iterable[ResultModel]: + for it in items: + title = it.get("name") or it.get("title") or str(it.get("path") or "") + yield ResultModel( + title=str(title), + path=str(it.get("path")) if it.get("path") else None, + ext=str(it.get("ext")) if it.get("ext") else None, + size_bytes=int(it.get("size")) if it.get("size") is not None else None, + metadata=dict(it), + source=str(it.get("source")) if it.get("source") else "myprovider", + ) + +# Optional: build columns dynamically from sample rows +def columns_factory(rows: List[ResultModel]) -> List[ColumnSpec]: + cols = [title_column()] + # add extra columns if metadata keys exist + if any((r.metadata or {}).get("size") for r in rows): + cols.append(ColumnSpec("size", "Size", lambda r: r.size_bytes or "")) + return cols + +# Selection args for `@N` expansion or `select` cmdlet +def selection_fn(row: ResultModel) -> List[str]: + # prefer -path when available + if row.path: + return ["-path", row.path] + return ["-title", row.title or ""] + +# Register provider (done at import time) +register_provider("myprovider", adapter, columns=columns_factory, selection_fn=selection_fn) +``` + +--- + +## Table scraping: using TableProviderMixin (HTML tables / list-results) + +If your provider scrapes HTML tables or list-like results (common on web search pages), use `TableProviderMixin`: + +```py +from ProviderCore.base import Provider +from SYS.provider_helpers import TableProviderMixin + 
+class MyTableProvider(TableProviderMixin, Provider):
+    URL = ("https://example.org/search",)
+
+    def validate(self) -> bool:
+        return True
+
+    def search(self, query: str, limit: int = 50, **kwargs):
+        url = f"{self.URL[0]}?q={quote_plus(query)}"
+        return self.search_table_from_url(url, limit=limit)
+```
+
+`TableProviderMixin.search_table_from_url` returns `ProviderCore.base.SearchResult` entries. If you want to integrate this provider with the strict `ResultTable` registry, add a small adapter that converts `SearchResult` -> `ResultModel` and register it using `register_provider` (see `Provider/vimm.py` for a real example).
+
+---
+
+## Columns & selection
+
+- `columns` may be a static `List[ColumnSpec]` or a factory `def cols(rows: List[ResultModel]) -> List[ColumnSpec]` that inspects sample rows.
+- `selection_fn` must accept a `ResultModel` and return a `List[str]` representing CLI args (e.g., `['-path', row.path]`). These args are used by `select` and `@N` expansion.
+
+  **Tip:** for providers that produce downloadable file rows prefer returning explicit URL args (e.g., `['-url', row.path]`) so the selected URL is clearly identified by downstream downloaders and to avoid ambiguous parsing when provider hints (like `-provider`) are present.
+- Ensure your `ResultModel.source` is set (either in the model or rely on the provider name set by `serialize_row`).
+
+---
+
+## Optional: pandas path for `<table>` extraction
+
+`SYS.html_table.extract_records` prefers a pure-lxml path but will use `pandas.read_html` if pandas is installed and the helper detects it works for the input table. This is optional and **not required** to author a provider — document in your provider whether it requires `pandas` and add an informative error/log message when it is missing.
+
+---
+
+## Testing & examples
+
+- Write `tests/test_provider_<name>.py` that imports your provider and verifies `provider.build_table(...)` produces a `ResultTable` (has `.rows` and `.columns`) and that `serialize_rows()` yields dicts with `_selection_args`, `_selection_action` when applicable, and `source`.
+- When you need to guarantee a specific CLI stage sequence (e.g., `download-file -url <url> -provider <name>`), call `table.set_row_selection_action(row_index, tokens)` so the serialized payload emits `_selection_action` and the CLI can run the row exactly as intended.
+- For table providers you can test `search_table_from_url` using a local HTML fixture or by mocking `HTTPClient` to return a small sample page.
+- If you rely on pandas, add a test that monkeypatches `sys.modules['pandas']` to a simple shim to validate the pandas path.
+
+**Example test skeleton**
+
+```py
+from SYS.result_table_adapters import get_provider
+from Provider import example_provider
+
+
+def test_example_provider_registration():
+    provider = get_provider("example")
+    rows = list(provider.adapter(example_provider.SAMPLE_ITEMS))
+    assert rows and rows[0].title
+    cols = provider.get_columns(rows)
+    assert any(c.name == "title" for c in cols)
+    table = provider.build_table(example_provider.SAMPLE_ITEMS)
+    assert table.provider == "example" and table.rows
+```
+
+---
+
+## References & examples
+
+- Read `Provider/example_provider.py` for a compact example of a strict adapter and dynamic columns.
+- Read `Provider/vimm.py` for a table-provider that uses `TableProviderMixin` and converts `SearchResult` → `ResultModel` for registration.
+- See `docs/provider_guide.md` for a broader provider development checklist.
+
+---
+
+A reusable `Provider/myprovider_template.py` plus matching unit tests would be a welcome follow-up contribution; use the references above as the canonical starting points.
🎯 diff --git a/docs/result_table.md b/docs/result_table.md index 46c8ef6..d31310f 100644 --- a/docs/result_table.md +++ b/docs/result_table.md @@ -13,10 +13,11 @@ This document explains the `ResultTable` system used across the CLI and TUI: how - **ResultTable** (`SYS/result_table.py`) - Renders rows as a rich table and stores metadata used for selection expansion. - - Important APIs: `add_result()`, `set_table()`, `set_source_command()`, `set_row_selection_args()`, `set_table_metadata()`, and `select_interactive()`. + - Important APIs: `add_result()`, `set_table()`, `set_source_command()`, `set_row_selection_args()`, `set_row_selection_action()`, `set_table_metadata()`, and `select_interactive()`. - **ResultRow** - Holds columns plus `selection_args` (used for `@N` expansion) and `payload` (original object). + - Optionally stores `selection_action`, a full list of CLI tokens to run when `@N` selects this row. When present the CLI honors the explicit action instead of reconstructing it from `source_command` and `selection_args`. - **Provider selector** - If a provider implements `selector(selected_items, ctx=..., stage_is_last=True)`, it is run first when `@N` is used; if the selector returns `True` it has handled the selection (e.g., drilling into a folder and publishing a new ResultTable). @@ -112,7 +113,7 @@ SearchResult( ) ``` -Illustrative file SearchResult (after drilling): +4. Otherwise, for single selections, CLI checks for `row.selection_action` and runs that verbatim if present; otherwise it expands `source_command + source_args + row_selection_args`. For multi-selections, items are piped downstream. ```py SearchResult( @@ -217,6 +218,8 @@ Notes: --- +For more detail on ResultTable provider authoring, see `docs/provider_authoring.md`. 
+
 If you'd like, I can also:
 - Add provider-specific examples (AllDebrid, Bandcamp) into this doc ✅
 - Add a short checklist for PR reviewers when adding new providers
diff --git a/scripts/README.md b/scripts/README.md
new file mode 100644
index 0000000..d2d421a
--- /dev/null
+++ b/scripts/README.md
@@ -0,0 +1,60 @@
+Playwright fetch helper
+
+This helper uses Playwright to drive a browser to click the download button on a Vimm detail page and save the resulting file to disk.
+
+Usage examples
+
+Programmatic usage
+
+- Basic example (Python):
+
+  ```py
+  from tool.playwright import PlaywrightTool
+
+  tool = PlaywrightTool({})
+  result = tool.download_file("https://vimm.net/vault/48075", selector="form#dl_form button[type=submit]", out_dir=None, timeout_sec=60)
+  if result.ok:
+      print(result.path)
+  else:
+      print("Download failed:", result.error)
+  ```
+
+- Shell one-liners (PowerShell / Unix compatible):
+
+  - PowerShell:
+
+    ```powershell
+    @'
+    from tool.playwright import PlaywrightTool
+    r = PlaywrightTool().download_file("https://vimm.net/vault/48075")
+    print(r.to_dict())
+    '@ | python -
+    ```
+
+  - Unix shell:
+
+    ```sh
+    python -c "from tool.playwright import PlaywrightTool; import json; r=PlaywrightTool().download_file('https://vimm.net/vault/48075'); print(json.dumps(r.to_dict()))"
+    ```
+
+- Download to a specific directory:
+
+  ```py
+  tool.download_file("https://vimm.net/vault/48075", out_dir="C:\\tmp")
+  ```
+
+- Pipe the result into `add-file`:
+
+  Use one of the shell one-liners above and extract the `path` field from the returned JSON to pass to `CLI.py add-file`. For example, in Unix:
+
+  ```sh
+  python -c "from tool.playwright import PlaywrightTool; import json; r=PlaywrightTool().download_file('https://vimm.net/vault/48075'); print(json.dumps(r.to_dict()))" | jq -r .path | xargs -I{} python CLI.py add-file -store default -path "{}"
+  ```
+
+Notes
+
+- The script prints a single JSON line to stdout on completion.
On success, `ok` is true and `path` contains the saved file path. +- Provider `Provider.vimm` will use Playwright when HTTP GET fails (4xx/5xx) or on network errors. Playwright is a required runtime dependency for these flows. + + +- Playwright must be available in the current Python environment; install with `pip install playwright && playwright install`. diff --git a/scripts/debug_import_vimm.py b/scripts/debug_import_vimm.py new file mode 100644 index 0000000..809178d --- /dev/null +++ b/scripts/debug_import_vimm.py @@ -0,0 +1,9 @@ +import importlib, traceback + +try: + m = importlib.import_module('Provider.vimm') + print('Imported', m) + print('Vimm class:', getattr(m, 'Vimm', None)) +except Exception as e: + print('Import failed:', e) + traceback.print_exc() diff --git a/scripts/list_providers.py b/scripts/list_providers.py new file mode 100644 index 0000000..073fb65 --- /dev/null +++ b/scripts/list_providers.py @@ -0,0 +1,4 @@ +from ProviderCore.registry import list_search_providers, list_providers + +print('Search providers:', list_search_providers()) +print('All providers:', list_providers()) diff --git a/tmp_trim_registry.py b/tmp_trim_registry.py deleted file mode 100644 index 4267295..0000000 --- a/tmp_trim_registry.py +++ /dev/null @@ -1,10 +0,0 @@ -from pathlib import Path - -path = Path("ProviderCore/registry.py") -text = path.read_text() -marker = '"""Provider registry.' 
-first = text.find(marker) -second = text.find(marker, first + 1) -if second != -1: - trimmed = text[:second].rstrip() + "\n" - path.write_text(trimmed, encoding="utf-8") diff --git a/tmp_write_registry.py b/tmp_write_registry.py deleted file mode 100644 index 2abda3f..0000000 --- a/tmp_write_registry.py +++ /dev/null @@ -1,3 +0,0 @@ -from pathlib import Path - -new_content = """""" \ No newline at end of file diff --git a/tool/playwright.py b/tool/playwright.py index 7285fc2..d4eaa0a 100644 --- a/tool/playwright.py +++ b/tool/playwright.py @@ -1,29 +1,24 @@ from __future__ import annotations import contextlib +import re +import tempfile +import traceback from dataclasses import dataclass -from typing import Any, Dict, Iterator, Optional +from pathlib import Path +from typing import Any, Dict, Iterator, Optional, Union from SYS.logger import debug -try: - from playwright.sync_api import TimeoutError as PlaywrightTimeoutError - from playwright.sync_api import sync_playwright - - HAS_PLAYWRIGHT = True - _PLAYWRIGHT_IMPORT_ERROR: Optional[Exception] = None -except Exception as exc: # pragma: no cover - HAS_PLAYWRIGHT = False - _PLAYWRIGHT_IMPORT_ERROR = exc - PlaywrightTimeoutError = TimeoutError # type: ignore - sync_playwright = None # type: ignore +from playwright.sync_api import TimeoutError as PlaywrightTimeoutError +from playwright.sync_api import sync_playwright # Re-export for consumers (e.g. 
cmdlets catching navigation timeouts) __all__ = [ - "HAS_PLAYWRIGHT", "PlaywrightTimeoutError", "PlaywrightTool", - "PlaywrightDefaults" + "PlaywrightDefaults", + "PlaywrightDownloadResult", ] @@ -36,6 +31,36 @@ def _get_nested(config: Dict[str, Any], *path: str) -> Any: return cur +def _resolve_out_dir(arg_outdir: Optional[Union[str, Path]]) -> Path: + """Resolve an output directory using config when possible.""" + if arg_outdir: + p = Path(arg_outdir) + p.mkdir(parents=True, exist_ok=True) + return p + + try: + from SYS.config import load_config, resolve_output_dir + + cfg = load_config() + p = resolve_output_dir(cfg) + try: + p.mkdir(parents=True, exist_ok=True) + except Exception: + pass + return p + except Exception: + return Path(tempfile.mkdtemp(prefix="pwdl_")) + + +def _find_filename_from_cd(cd: str) -> Optional[str]: + if not cd: + return None + m = re.search(r"filename\*?=(?:UTF-8''\s*)?\"?([^\";]+)\"?", cd) + if m: + return m.group(1) + return None + + @dataclass(slots=True) class PlaywrightDefaults: browser: str = "chromium" # chromium|firefox|webkit @@ -51,6 +76,24 @@ class PlaywrightDefaults: ignore_https_errors: bool = True +@dataclass(slots=True) +class PlaywrightDownloadResult: + ok: bool + path: Optional[Path] = None + url: Optional[str] = None + mode: Optional[str] = None + error: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "ok": bool(self.ok), + "path": str(self.path) if self.path else None, + "url": self.url, + "mode": self.mode, + "error": self.error, + } + + class PlaywrightTool: """Small wrapper to standardize Playwright defaults and lifecycle. 
@@ -130,13 +173,13 @@ class PlaywrightTool: ) def require(self) -> None: - if HAS_PLAYWRIGHT and sync_playwright is not None: - return - detail = str(_PLAYWRIGHT_IMPORT_ERROR or "playwright is not installed") - raise RuntimeError( - "playwright is required; install with: pip install playwright; then: playwright install\n" - f"detail: {detail}" - ) + """Ensure Playwright is present; raise a helpful RuntimeError if not.""" + try: + assert sync_playwright is not None + except Exception: + raise RuntimeError( + "playwright is required; install with: pip install playwright; then: playwright install" + ) @contextlib.contextmanager def open_page( @@ -147,6 +190,7 @@ class PlaywrightTool: viewport_width: Optional[int] = None, viewport_height: Optional[int] = None, ignore_https_errors: Optional[bool] = None, + accept_downloads: bool = False, ) -> Iterator[Any]: """Context manager yielding a Playwright page with sane defaults.""" self.require() @@ -198,6 +242,7 @@ class PlaywrightTool: "height": vh }, "ignore_https_errors": ihe, + "accept_downloads": bool(accept_downloads), } if ua_value is not None: context_kwargs["user_agent"] = ua_value @@ -233,6 +278,146 @@ class PlaywrightTool: except Exception: raise + def download_file( + self, + url: str, + *, + selector: str = "form#dl_form button[type=submit]", + out_dir: Optional[Union[str, Path]] = None, + timeout_sec: int = 60, + headless_first: bool = False, + debug_mode: bool = False, + ) -> PlaywrightDownloadResult: + """Download a file by clicking a selector and capturing the response. + + The helper mirrors the standalone `scripts/playwright_fetch.py` logic + and tries multiple click strategies (expect_download, tooltip continue, + submitDL, JS/mouse click) to coax stubborn sites. 
+ """ + try: + self.require() + except Exception as exc: + return PlaywrightDownloadResult(ok=False, error=str(exc)) + + out_path_base = _resolve_out_dir(out_dir) + timeout_ms = max(10_000, int(timeout_sec) * 1000 if timeout_sec is not None else int(self.defaults.navigation_timeout_ms)) + nav_timeout_ms = max(timeout_ms, int(self.defaults.navigation_timeout_ms)) + selector_timeout_ms = 10_000 + + # Preserve legacy behaviour: headless_first=False tries headful then headless; True reverses the order. + order = [True, False] if headless_first else [False, True] + seen = set() + modes = [] + for m in order: + if m in seen: + continue + seen.add(m) + modes.append(m) + + last_error: Optional[str] = None + + for mode in modes: + try: + if debug_mode: + debug(f"[playwright] download url={url} selector={selector} headless={mode} out_dir={out_path_base}") + + with self.open_page(headless=mode, accept_downloads=True) as page: + page.goto(url, wait_until="networkidle", timeout=nav_timeout_ms) + page.wait_for_selector(selector, timeout=selector_timeout_ms) + self._wait_for_block_clear(page, timeout_ms=6000) + + el = page.query_selector(selector) + + # 1) Direct click with expect_download + try: + with page.expect_download(timeout=timeout_ms) as dl_info: + if el: + el.click() + else: + page.click(selector) + dl = dl_info.value + filename = dl.suggested_filename or Path(dl.url).name or "download" + out_path = out_path_base / filename + dl.save_as(str(out_path)) + return PlaywrightDownloadResult(ok=True, path=out_path, url=dl.url, mode="download") + except PlaywrightTimeoutError: + last_error = "download timeout" + except Exception as click_exc: + last_error = str(click_exc) or last_error + + # 2) Tooltip continue flow + try: + btn = page.query_selector("#tooltip4 input[type=button]") + if btn: + btn.click() + with page.expect_download(timeout=timeout_ms) as dl_info: + if el: + el.click() + else: + page.click(selector) + dl = dl_info.value + filename = dl.suggested_filename or 
Path(dl.url).name or "download" + out_path = out_path_base / filename + dl.save_as(str(out_path)) + return PlaywrightDownloadResult(ok=True, path=out_path, url=dl.url, mode="tooltip-download") + except Exception as tooltip_exc: + last_error = str(tooltip_exc) or last_error + + # 3) Submit handler that respects tooltip flow + try: + page.evaluate("() => { try { submitDL(document.forms['dl_form'], 'tooltip4'); } catch (e) {} }") + resp = page.wait_for_response( + lambda r: r.status == 200 and any(k.lower() == 'content-disposition' for k in r.headers.keys()), + timeout=timeout_ms, + ) + if resp: + out_path = self._save_response(resp, out_path_base) + if out_path: + return PlaywrightDownloadResult(ok=True, path=out_path, url=getattr(resp, "url", None), mode="response") + except Exception as resp_exc: + last_error = str(resp_exc) or last_error + + # 4) JS/mouse click and capture response + try: + if el: + try: + page.evaluate("el => el.click()", el) + except Exception: + page.evaluate(f"() => document.querySelector('{selector}').click()") + else: + page.evaluate(f"() => document.querySelector('{selector}').click()") + + if el: + try: + box = el.bounding_box() + if box: + page.mouse.move(box['x'] + box['width'] / 2, box['y'] + box['height'] / 2) + page.mouse.click(box['x'] + box['width'] / 2, box['y'] + box['height'] / 2) + except Exception: + pass + + resp = page.wait_for_response( + lambda r: r.status == 200 and any(k.lower() == 'content-disposition' for k in r.headers.keys()), + timeout=timeout_ms, + ) + if resp: + out_path = self._save_response(resp, out_path_base) + if out_path: + return PlaywrightDownloadResult(ok=True, path=out_path, url=getattr(resp, "url", None), mode="response-fallback") + except Exception as final_exc: + last_error = str(final_exc) or last_error + + except Exception as exc: + last_error = str(exc) + if debug_mode: + try: + debug(f"[playwright] attempt failed (headless={mode}): {traceback.format_exc()}") + except Exception: + pass + continue + 
+ return PlaywrightDownloadResult(ok=False, error=last_error or "no download captured") + def debug_dump(self) -> None: try: debug( @@ -242,3 +427,34 @@ class PlaywrightTool: ) except Exception: pass + + def _wait_for_block_clear(self, page: Any, timeout_ms: int = 8000) -> bool: + try: + page.wait_for_function( + "() => { for (const k in window) { if (Object.prototype.hasOwnProperty.call(window, k) && k.startsWith('blocked_')) { try { return window[k] === false; } catch(e) {} return false; } } return true; }", + timeout=timeout_ms, + ) + return True + except Exception: + return False + + def _save_response(self, response: Any, out_dir: Path) -> Optional[Path]: + try: + cd = "" + try: + headers = getattr(response, "headers", {}) or {} + cd = "".join([v for k, v in headers.items() if str(k).lower() == "content-disposition"]) + except Exception: + cd = "" + + filename = _find_filename_from_cd(cd) or Path(str(getattr(response, "url", "") or "")).name or "download" + body = response.body() + out_path = out_dir / filename + out_path.write_bytes(body) + return out_path + except Exception as exc: + try: + debug(f"[playwright] failed to save response: {exc}") + except Exception: + pass + return None