"""Smart downloader front-door. Currently focused on Internet Archive item pages: - Takes a piped InternetArchive search-provider row (table=internetarchive) or an archive.org details URL - Displays a selectable table of available files/formats (PDF/ZIP/OCR/etc) - Selecting a row via @N expands to download-file This enables: search-provider -provider internetarchive "..." @3 # shows formats table @2 | add-file ... # downloads selected file then pipes to add-file """ from __future__ import annotations import re import sys from typing import Any, Dict, List, Sequence, cast from urllib.parse import quote from SYS.logger import log, debug import pipeline as pipeline_context from result_table import ResultTable from . import _shared as sh Cmdlet = sh.Cmdlet SharedArgs = sh.SharedArgs parse_cmdlet_args = sh.parse_cmdlet_args get_field = sh.get_field def _extract_ia_identifier(text: str) -> str: s = str(text or "").strip() if not s: return "" # https://archive.org/details/ m = re.search(r"archive\.org/(?:details|download)/([^/?#\s]+)", s, flags=re.IGNORECASE) if m: return str(m.group(1) or "").strip() # internetarchive: if s.lower().startswith("internetarchive:"): return s.split(":", 1)[-1].strip() return "" class Download_Data(Cmdlet): def __init__(self) -> None: super().__init__( name="download-data", summary="List downloadable files/formats for provider items (e.g., Internet Archive)", usage="download-data OR @N | download-data (provider item), then select a file with @N", alias=[], arg=[SharedArgs.URL], detail=[ "For Internet Archive item pages, shows a selectable list of available files (PDF/ZIP/OCR/etc).", "Select a file row with @N to run download-file on that direct URL.", ], exec=self.run, ) self.register() def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: try: # parse_cmdlet_args typing varies across cmdlets; keep runtime behavior. parsed = parse_cmdlet_args(args, cast(Any, self)) except Exception: parsed = {} raw_urls = parsed.get("url", []) if isinstance(raw_urls, str): raw_urls = [raw_urls] url_arg = str(raw_urls[0]).strip() if raw_urls else "" piped_items: List[Any] = [] if isinstance(result, list): piped_items = list(result) elif result is not None: piped_items = [result] # Prefer piped item target if present. target = "" if piped_items: target = str(get_field(piped_items[0], "path") or get_field(piped_items[0], "url") or "").strip() if not target: target = url_arg table_name = "" try: table_name = str(get_field(piped_items[0], "table") or "").strip().lower() if piped_items else "" except Exception: table_name = "" identifier = "" if piped_items: md = get_field(piped_items[0], "full_metadata") if isinstance(md, dict): identifier = str(md.get("identifier") or "").strip() if not identifier: identifier = _extract_ia_identifier(target) if table_name == "internetarchive" or ("archive.org" in target.lower() and identifier): return self._run_internetarchive(piped_items[0] if piped_items else None, identifier=identifier) log("download-data: unsupported target (currently only Internet Archive item pages are supported)", file=sys.stderr) return 1 @staticmethod def _run_internetarchive(item: Any, *, identifier: str) -> int: try: from Provider.internetarchive import _ia as _ia_loader except Exception as exc: log(f"download-data: Internet Archive provider unavailable: {exc}", file=sys.stderr) return 1 def _is_ia_metadata_file(f: Dict[str, Any]) -> bool: try: source = str(f.get("source") or "").strip().lower() fmt = str(f.get("format") or "").strip().lower() except Exception: source = "" fmt = "" if source == "metadata": return True if fmt in {"metadata", "archive bittorrent"}: return True if fmt.startswith("thumbnail"): return True return False ia = None try: ia = _ia_loader() except Exception as exc: log(f"download-data: Internet Archive module unavailable: {exc}", file=sys.stderr) return 1 try: get_item = getattr(ia, "get_item", None) if not callable(get_item): raise Exception("internetarchive.get_item is not available") ia_item = cast(Any, get_item(str(identifier))) except Exception as exc: log(f"download-data: Internet Archive item lookup failed: {exc}", file=sys.stderr) return 1 files: List[Dict[str, Any]] = [] try: raw_files = getattr(ia_item, "files", None) if isinstance(raw_files, list): for f in raw_files: if isinstance(f, dict): files.append(f) except Exception: files = [] if not files: try: for f in ia_item.get_files(): name = getattr(f, "name", None) if not name and isinstance(f, dict): name = f.get("name") if not name: continue files.append( { "name": str(name), "size": getattr(f, "size", None), "format": getattr(f, "format", None), "source": getattr(f, "source", None), } ) except Exception: files = [] if not files: log("download-data: Internet Archive item has no files", file=sys.stderr) return 1 # Prefer non-metadata files for the picker. candidates = [f for f in files if not _is_ia_metadata_file(f)] if not candidates: candidates = list(files) def _key(f: Dict[str, Any]) -> tuple[str, str]: fmt = str(f.get("format") or "").strip().lower() name = str(f.get("name") or "").strip().lower() return (fmt, name) candidates.sort(key=_key) title = "" try: title = str(get_field(item, "title") or "").strip() except Exception: title = "" table_title = f"Internet Archive: {title}".strip().rstrip(":") if not title: table_title = f"Internet Archive: {identifier}".strip().rstrip(":") table = ResultTable(table_title).set_preserve_order(True) table.set_table("internetarchive.formats") # Selecting a row should expand to `download-file `. table.set_source_command("download-file", []) rows: List[Dict[str, Any]] = [] for f in candidates: name = str(f.get("name") or "").strip() if not name: continue fmt = str(f.get("format") or "").strip() src = str(f.get("source") or "").strip() size_val: Any = f.get("size") try: size_val = int(size_val) if size_val not in (None, "") else "" except Exception: # Keep as-is; ResultTable will stringify. pass direct_url = f"https://archive.org/download/{identifier}/{quote(name, safe='')}" row_item: Dict[str, Any] = { "table": "internetarchive", "title": fmt or name, "path": direct_url, "url": direct_url, "columns": [ ("Format", fmt), ("Name", name), ("Size", size_val), ("Source", src), ], # Used by @N expansion: download-file "_selection_args": [direct_url], "full_metadata": { "identifier": identifier, "name": name, "format": fmt, "source": src, "size": f.get("size"), }, } rows.append(row_item) table.add_result(row_item) if not rows: log("download-data: no downloadable files found for this item", file=sys.stderr) return 1 try: pipeline_context.set_last_result_table(table, rows, subject=item) pipeline_context.set_current_stage_table(table) except Exception as exc: debug(f"[download-data] Failed to register result table: {exc}") return 0 CMDLET = Download_Data()