"""Generic file downloader. Supports: - Direct HTTP file URLs (PDFs, images, documents; non-yt-dlp) - Piped provider items (uses provider.download when available) No streaming site logic; use download-media for yt-dlp/streaming. """ from __future__ import annotations import sys from pathlib import Path from typing import Any, Dict, List, Optional, Sequence from urllib.parse import urlparse from SYS.download import DownloadError, _download_direct_file from SYS.logger import log, debug import pipeline as pipeline_context from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs parse_cmdlet_args = sh.parse_cmdlet_args register_url_with_local_library = sh.register_url_with_local_library coerce_to_pipe_object = sh.coerce_to_pipe_object get_field = sh.get_field class Download_File(Cmdlet): """Class-based download-file cmdlet - direct HTTP downloads.""" def __init__(self) -> None: """Initialize download-file cmdlet.""" super().__init__( name="download-file", summary="Download files via HTTP or provider handlers", usage="download-file [-path DIR] [options] OR @N | download-file [-path DIR] [options]", alias=["dl-file", "download-http"], arg=[ SharedArgs.URL, SharedArgs.PATH, # Prefer -path for output directory to match other cmdlets; keep -output for backwards compatibility. CmdletArg(name="-output", type="string", alias="o", description="(deprecated) Output directory (use -path instead)"), ], detail=["Download files directly via HTTP without yt-dlp processing.", "For streaming sites, use download-media."], exec=self.run, ) self.register() def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Main execution method.""" return self._run_impl(result, args, config) def _run_impl(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Main download implementation for direct HTTP files.""" try: debug("Starting download-file") # Parse arguments parsed = parse_cmdlet_args(args, self) # Extract explicit URL args (if any) raw_url = parsed.get("url", []) if isinstance(raw_url, str): raw_url = [raw_url] # If no URL args were provided, fall back to piped results (provider items) piped_items: List[Any] = [] if not raw_url: if isinstance(result, list): piped_items = result elif result: piped_items = [result] if not raw_url and not piped_items: log("No url or piped items to download", file=sys.stderr) return 1 # Get output directory final_output_dir = self._resolve_output_dir(parsed, config) if not final_output_dir: return 1 debug(f"Output directory: {final_output_dir}") # Download each URL and/or provider item downloaded_count = 0 quiet_mode = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False # Provider lookup is optional; keep import local to avoid overhead if unused get_search_provider = None SearchResult = None try: from ProviderCore.registry import get_search_provider as _get_search_provider, SearchResult as _SearchResult get_search_provider = _get_search_provider SearchResult = _SearchResult except Exception: get_search_provider = None SearchResult = None def _emit_local_file(downloaded_path: Path, source: Optional[str], title_hint: Optional[str], tags_hint: Optional[List[str]], media_kind_hint: Optional[str], full_metadata: Optional[Dict[str, Any]], provider_hint: Optional[str] = None) -> None: title_val = (title_hint or downloaded_path.stem or "Unknown").strip() or downloaded_path.stem hash_value = self._compute_file_hash(downloaded_path) tag: List[str] = [] if tags_hint: tag.extend([str(t) for t in tags_hint if t]) if not any(str(t).lower().startswith("title:") for t in tag): tag.insert(0, f"title:{title_val}") payload: Dict[str, Any] = { "path": str(downloaded_path), "hash": hash_value, "title": title_val, "action": "cmdlet:download-file", "download_mode": "file", "store": "local", "media_kind": media_kind_hint or "file", "tag": tag, } if provider_hint: payload["provider"] = str(provider_hint) if full_metadata: payload["full_metadata"] = full_metadata if source and str(source).startswith("http"): payload["url"] = source elif source: payload["source_url"] = source pipeline_context.emit(payload) # Automatically register url with local library if payload.get("url"): pipe_obj = coerce_to_pipe_object(payload) register_url_with_local_library(pipe_obj, config) # 1) Explicit URL downloads for url in raw_url: try: debug(f"Processing URL: {url}") # Telegram message URLs are not direct files; route through the provider. try: parsed = urlparse(str(url)) host = (parsed.hostname or "").lower().strip() except Exception: host = "" is_telegram = host in {"t.me", "telegram.me"} or host.endswith(".t.me") if is_telegram and SearchResult: try: from ProviderCore.registry import get_provider as _get_provider except Exception: _get_provider = None if _get_provider is None: raise DownloadError("Telegram provider registry not available") provider = _get_provider("telegram", config) if provider is None: raise DownloadError("Telegram provider not configured or not available (check telethon/app_id/api_hash)") sr = SearchResult(table="telegram", title=str(url), path=str(url), full_metadata={}) downloaded_path = None telegram_info: Optional[Dict[str, Any]] = None if hasattr(provider, "download_url"): try: downloaded_path, telegram_info = provider.download_url(str(url), final_output_dir) # type: ignore[attr-defined] except Exception as exc: raise DownloadError(str(exc)) else: downloaded_path = provider.download(sr, final_output_dir) if not downloaded_path: raise DownloadError("Telegram download returned no file") channel = "" post = None if isinstance(telegram_info, dict): try: chat_info = telegram_info.get("chat") if isinstance(telegram_info.get("chat"), dict) else {} msg_info = telegram_info.get("message") if isinstance(telegram_info.get("message"), dict) else {} channel = str(chat_info.get("title") or chat_info.get("username") or "").strip() post = msg_info.get("id") except Exception: channel = "" post = None title_hint = None tags_hint: List[str] = [] if channel: tags_hint.append(f"channel:{channel}") if post is not None: tags_hint.append(f"post:{post}") if channel and post is not None: title_hint = f"{channel} {post}" elif post is not None: title_hint = f"post:{post}" else: title_hint = downloaded_path.stem _emit_local_file( downloaded_path=downloaded_path, source=str(url), title_hint=title_hint, tags_hint=tags_hint, media_kind_hint="file", full_metadata=telegram_info, provider_hint="telegram", ) downloaded_count += 1 debug("✓ Downloaded via Telegram provider and emitted") continue result_obj = _download_direct_file(url, final_output_dir, quiet=quiet_mode) file_path = None if hasattr(result_obj, "path"): file_path = getattr(result_obj, "path") elif isinstance(result_obj, dict): file_path = result_obj.get("path") if not file_path: file_path = str(result_obj) downloaded_path = Path(str(file_path)) _emit_local_file( downloaded_path=downloaded_path, source=url, title_hint=downloaded_path.stem, tags_hint=[f"title:{downloaded_path.stem}"], media_kind_hint="file", full_metadata=None, ) downloaded_count += 1 debug("✓ Downloaded and emitted") except DownloadError as e: log(f"Download failed for {url}: {e}", file=sys.stderr) except Exception as e: log(f"Error processing {url}: {e}", file=sys.stderr) # 2) Provider item downloads (piped results) # Expand provider "folder" rows into their contained files when possible (e.g., AllDebrid magnets). expanded_items: List[Any] = [] for item in piped_items: try: table = get_field(item, "table") media_kind = get_field(item, "media_kind") full_metadata = get_field(item, "full_metadata") target = get_field(item, "path") or get_field(item, "url") if str(table or "").lower() == "alldebrid" and str(media_kind or "").lower() == "folder": magnet_id = None if isinstance(full_metadata, dict): magnet_id = full_metadata.get("magnet_id") if magnet_id is None and isinstance(target, str) and target.lower().startswith("alldebrid:magnet:"): try: magnet_id = int(target.split(":")[-1]) except Exception: magnet_id = None if magnet_id is not None and get_search_provider is not None: provider = get_search_provider("alldebrid", config) if provider is not None: try: files = provider.search("*", limit=10_000, filters={"view": "files", "magnet_id": int(magnet_id)}) except Exception: files = [] # If the magnet isn't ready, provider.search returns a single not-ready folder row. if files and len(files) == 1 and getattr(files[0], "media_kind", "") == "folder": detail = getattr(files[0], "detail", "") log(f"[download-file] AllDebrid magnet {magnet_id} not ready ({detail or 'unknown'})", file=sys.stderr) else: for sr in files: expanded_items.append(sr.to_dict() if hasattr(sr, "to_dict") else sr) continue expanded_items.append(item) except Exception: expanded_items.append(item) for item in expanded_items: try: table = get_field(item, "table") title = get_field(item, "title") target = get_field(item, "path") or get_field(item, "url") media_kind = get_field(item, "media_kind") tags_val = get_field(item, "tag") tags_list: Optional[List[str]] if isinstance(tags_val, list): tags_list = [str(t) for t in tags_val if t] else: tags_list = None full_metadata = get_field(item, "full_metadata") if (not full_metadata) and isinstance(item, dict) and isinstance(item.get("extra"), dict): extra_md = item["extra"].get("full_metadata") if isinstance(extra_md, dict): full_metadata = extra_md # If this looks like a provider item and providers are available, prefer provider.download() downloaded_path: Optional[Path] = None attempted_provider_download = False if table and get_search_provider and SearchResult: provider = get_search_provider(str(table), config) if provider is not None: attempted_provider_download = True sr = SearchResult( table=str(table), title=str(title or "Unknown"), path=str(target or ""), full_metadata=full_metadata if isinstance(full_metadata, dict) else {}, ) debug(f"[download-file] Downloading provider item via {table}: {sr.title}") downloaded_path = provider.download(sr, final_output_dir) # OpenLibrary: if provider download failed, do NOT try to download the OpenLibrary page HTML. if downloaded_path is None and attempted_provider_download and str(table or "").lower() == "openlibrary": availability = None reason = None if isinstance(full_metadata, dict): availability = full_metadata.get("availability") reason = full_metadata.get("availability_reason") msg = "[download-file] OpenLibrary item not downloadable" if availability or reason: msg += f" (availability={availability or ''} reason={reason or ''})" log(msg, file=sys.stderr) # Fallback: run a LibGen title search so the user can pick an alternative source. try: title_text = str(title or "").strip() if not title_text and isinstance(full_metadata, dict): title_text = str(full_metadata.get("title") or "").strip() if title_text: log(f"[download-file] Not available on OpenLibrary; searching LibGen for: {title_text}", file=sys.stderr) from cmdlet.search_provider import CMDLET as _SEARCH_PROVIDER_CMDLET # Use plain title text (LibGen mirrors can be finicky with fielded query prefixes). fallback_query = title_text exec_fn = getattr(_SEARCH_PROVIDER_CMDLET, "exec", None) if not callable(exec_fn): log("[download-file] search-provider cmdlet unavailable; cannot run LibGen fallback search", file=sys.stderr) continue ret = exec_fn( None, ["-provider", "libgen", "-query", fallback_query], config, ) # download-file is treated as an action command by the pipeline printer. # Promote the search-provider table to a display overlay so it renders. try: table = pipeline_context.get_last_result_table() items = pipeline_context.get_last_result_items() if table is not None: pipeline_context.set_last_result_table_overlay(table, items) except Exception: pass try: return int(ret) # type: ignore[arg-type] except Exception: return 1 except Exception: pass continue # Fallback: if we have a direct HTTP URL, download it directly if downloaded_path is None and isinstance(target, str) and target.startswith("http"): # Guard: provider landing pages (e.g. LibGen ads.php) are HTML, not files. # Never download these as "files". if str(table or "").lower() == "libgen": low = target.lower() if ("/ads.php" in low) or ("/file.php" in low) or ("/index.php" in low): log("[download-file] Refusing to download LibGen landing page (expected provider to resolve file link)", file=sys.stderr) continue debug(f"[download-file] Provider item looks like direct URL, downloading: {target}") # Use provider title as filename hint so multiple items don't overwrite as downloaded_file.bin suggested_name = str(title).strip() if title is not None else None result_obj = _download_direct_file( target, final_output_dir, quiet=quiet_mode, suggested_filename=suggested_name, ) file_path = None if hasattr(result_obj, "path"): file_path = getattr(result_obj, "path") elif isinstance(result_obj, dict): file_path = result_obj.get("path") if not file_path: file_path = str(result_obj) downloaded_path = Path(str(file_path)) if downloaded_path is None: log(f"Cannot download item (no provider handler / unsupported target): {title or target}", file=sys.stderr) continue _emit_local_file( downloaded_path=downloaded_path, source=str(target) if target else None, title_hint=str(title) if title else downloaded_path.stem, tags_hint=tags_list, media_kind_hint=str(media_kind) if media_kind else None, full_metadata=full_metadata if isinstance(full_metadata, dict) else None, ) downloaded_count += 1 except DownloadError as e: log(f"Download failed: {e}", file=sys.stderr) except Exception as e: log(f"Error downloading item: {e}", file=sys.stderr) if downloaded_count > 0: debug(f"✓ Successfully processed {downloaded_count} file(s)") return 0 log("No downloads completed", file=sys.stderr) return 1 except Exception as e: log(f"Error in download-file: {e}", file=sys.stderr) return 1 def _resolve_output_dir(self, parsed: Dict[str, Any], config: Dict[str, Any]) -> Optional[Path]: """Resolve the output directory from storage location or config.""" output_dir_arg = parsed.get("path") or parsed.get("output") if output_dir_arg: try: out_path = Path(str(output_dir_arg)).expanduser() out_path.mkdir(parents=True, exist_ok=True) return out_path except Exception as e: log(f"Cannot use output directory {output_dir_arg}: {e}", file=sys.stderr) return None storage_location = parsed.get("storage") # Priority 1: --storage flag if storage_location: try: return SharedArgs.resolve_storage(storage_location) except Exception as e: log(f"Invalid storage location: {e}", file=sys.stderr) return None # Priority 2: Config default output/temp directory try: from config import resolve_output_dir final_output_dir = resolve_output_dir(config) except Exception: final_output_dir = Path.home() / "Downloads" debug(f"Using default directory: {final_output_dir}") # Ensure directory exists try: final_output_dir.mkdir(parents=True, exist_ok=True) except Exception as e: log(f"Cannot create output directory {final_output_dir}: {e}", file=sys.stderr) return None return final_output_dir def _compute_file_hash(self, filepath: Path) -> str: """Compute SHA256 hash of a file.""" import hashlib sha256_hash = hashlib.sha256() with open(filepath, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() # Module-level singleton registration CMDLET = Download_File()