# Medios-Macina/cmdlet/download_file.py
"""Generic file downloader.
2025-12-11 12:47:30 -08:00
2025-12-11 23:21:45 -08:00
Supports:
- Direct HTTP file URLs (PDFs, images, documents; non-yt-dlp)
- Piped provider items (uses provider.download when available)
2025-12-11 12:47:30 -08:00
2025-12-11 23:21:45 -08:00
No streaming site logic; use download-media for yt-dlp/streaming.
2025-12-11 12:47:30 -08:00
"""
from __future__ import annotations

import hashlib
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

import pipeline as pipeline_context
from SYS.download import DownloadError, _download_direct_file
from SYS.logger import log, debug

from . import _shared as sh

Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
parse_cmdlet_args = sh.parse_cmdlet_args
register_url_with_local_library = sh.register_url_with_local_library
coerce_to_pipe_object = sh.coerce_to_pipe_object
get_field = sh.get_field
class Download_File(Cmdlet):
    """Class-based download-file cmdlet - direct HTTP downloads.

    Downloads explicit URL arguments and/or piped provider items into a
    resolved output directory, emits a pipeline payload per downloaded file,
    and registers HTTP sources with the local library.
    """

    def __init__(self) -> None:
        """Initialize download-file cmdlet and register it with the cmdlet registry."""
        super().__init__(
            name="download-file",
            summary="Download files via HTTP or provider handlers",
            usage="download-file <url> [options] OR @N | download-file [options]",
            alias=["dl-file", "download-http"],
            arg=[
                CmdletArg(name="output", type="string", alias="o", description="Output directory (overrides defaults)"),
                SharedArgs.URL,
            ],
            detail=[
                "Download files directly via HTTP without yt-dlp processing.",
                "For streaming sites, use download-media.",
            ],
            exec=self.run,
        )
        self.register()

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Entry point for the cmdlet.

        Args:
            result: Piped value(s) from the previous pipeline stage (or None).
            args: Raw cmdlet argument tokens.
            config: Mutable cmdlet/pipeline configuration mapping.

        Returns:
            0 when at least one download completed, 1 otherwise.
        """
        stage_ctx = pipeline_context.get_stage_context()
        # Suppress noisy progress output when other pipeline stages follow us.
        in_pipeline = stage_ctx is not None and getattr(stage_ctx, "total_stages", 1) > 1
        if in_pipeline and isinstance(config, dict):
            config["_quiet_background_output"] = True
        return self._run_impl(result, args, config)

    @staticmethod
    def _extract_download_path(result_obj: Any) -> Path:
        """Normalize a ``_download_direct_file`` result to a ``Path``.

        The helper accepts an object with a ``path`` attribute, a dict with a
        ``"path"`` key, or anything stringifiable (previously this stanza was
        duplicated inline in two places).
        """
        file_path = None
        if hasattr(result_obj, "path"):
            file_path = getattr(result_obj, "path")
        elif isinstance(result_obj, dict):
            file_path = result_obj.get("path")
        if not file_path:
            file_path = str(result_obj)
        return Path(str(file_path))

    def _run_impl(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Main download implementation for direct HTTP files.

        Handles two input modes: explicit URL arguments, and (when no URLs
        were given) piped provider items. Returns 0 if anything downloaded.
        """
        try:
            debug("Starting download-file")
            # Parse arguments
            parsed = parse_cmdlet_args(args, self)

            # Extract explicit URL args (if any)
            raw_url = parsed.get("url", [])
            if isinstance(raw_url, str):
                raw_url = [raw_url]

            # If no URL args were provided, fall back to piped results (provider items)
            piped_items: List[Any] = []
            if not raw_url:
                if isinstance(result, list):
                    piped_items = result
                elif result:
                    piped_items = [result]
            if not raw_url and not piped_items:
                log("No url or piped items to download", file=sys.stderr)
                return 1

            # Get output directory
            final_output_dir = self._resolve_output_dir(parsed, config)
            if not final_output_dir:
                return 1
            debug(f"Output directory: {final_output_dir}")

            # Download each URL and/or provider item
            downloaded_count = 0
            quiet_mode = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False

            # Provider lookup is optional; keep import local to avoid overhead if unused
            get_search_provider = None
            SearchResult = None
            try:
                from ProviderCore.registry import get_search_provider as _get_search_provider, SearchResult as _SearchResult
                get_search_provider = _get_search_provider
                SearchResult = _SearchResult
            except Exception:
                # Providers unavailable; the direct-HTTP fallback below still works.
                pass

            def _emit_local_file(
                downloaded_path: Path,
                source: Optional[str],
                title_hint: Optional[str],
                tags_hint: Optional[List[str]],
                media_kind_hint: Optional[str],
                full_metadata: Optional[Dict[str, Any]],
            ) -> None:
                """Emit a pipeline payload for a downloaded file and register its URL."""
                title_val = (title_hint or downloaded_path.stem or "Unknown").strip() or downloaded_path.stem
                hash_value = self._compute_file_hash(downloaded_path)
                tag: List[str] = []
                if tags_hint:
                    tag.extend([str(t) for t in tags_hint if t])
                # Ensure exactly one leading title: tag without clobbering caller-supplied ones.
                if not any(str(t).lower().startswith("title:") for t in tag):
                    tag.insert(0, f"title:{title_val}")
                payload: Dict[str, Any] = {
                    "path": str(downloaded_path),
                    "hash": hash_value,
                    "title": title_val,
                    "action": "cmdlet:download-file",
                    "download_mode": "file",
                    "store": "local",
                    "media_kind": media_kind_hint or "file",
                    "tag": tag,
                }
                if full_metadata:
                    payload["full_metadata"] = full_metadata
                # HTTP(S) sources become "url"; anything else is kept as "source_url".
                if source and str(source).startswith("http"):
                    payload["url"] = source
                elif source:
                    payload["source_url"] = source
                pipeline_context.emit(payload)
                # Automatically register url with local library
                if payload.get("url"):
                    pipe_obj = coerce_to_pipe_object(payload)
                    register_url_with_local_library(pipe_obj, config)

            # 1) Explicit URL downloads
            for url in raw_url:
                try:
                    debug(f"Processing URL: {url}")
                    result_obj = _download_direct_file(url, final_output_dir, quiet=quiet_mode)
                    downloaded_path = self._extract_download_path(result_obj)
                    _emit_local_file(
                        downloaded_path=downloaded_path,
                        source=url,
                        title_hint=downloaded_path.stem,
                        tags_hint=[f"title:{downloaded_path.stem}"],
                        media_kind_hint="file",
                        full_metadata=None,
                    )
                    downloaded_count += 1
                    debug("✓ Downloaded and emitted")
                except DownloadError as e:
                    log(f"Download failed for {url}: {e}", file=sys.stderr)
                except Exception as e:
                    log(f"Error processing {url}: {e}", file=sys.stderr)

            # 2) Provider item downloads (piped results)
            for item in piped_items:
                try:
                    table = get_field(item, "table")
                    title = get_field(item, "title")
                    target = get_field(item, "path") or get_field(item, "url")
                    media_kind = get_field(item, "media_kind")
                    tags_val = get_field(item, "tag")
                    tags_list: Optional[List[str]]
                    if isinstance(tags_val, list):
                        tags_list = [str(t) for t in tags_val if t]
                    else:
                        tags_list = None
                    full_metadata = get_field(item, "full_metadata")
                    # Some producers nest metadata under item["extra"]; fall back to it.
                    if (not full_metadata) and isinstance(item, dict) and isinstance(item.get("extra"), dict):
                        extra_md = item["extra"].get("full_metadata")
                        if isinstance(extra_md, dict):
                            full_metadata = extra_md

                    # If this looks like a provider item and providers are available, prefer provider.download()
                    downloaded_path: Optional[Path] = None
                    attempted_provider_download = False
                    if table and get_search_provider and SearchResult:
                        provider = get_search_provider(str(table), config)
                        if provider is not None:
                            attempted_provider_download = True
                            sr = SearchResult(
                                table=str(table),
                                title=str(title or "Unknown"),
                                path=str(target or ""),
                                full_metadata=full_metadata if isinstance(full_metadata, dict) else {},
                            )
                            debug(f"[download-file] Downloading provider item via {table}: {sr.title}")
                            downloaded_path = provider.download(sr, final_output_dir)

                    # OpenLibrary: if provider download failed, do NOT try to download the OpenLibrary page HTML.
                    if downloaded_path is None and attempted_provider_download and str(table or "").lower() == "openlibrary":
                        availability = None
                        reason = None
                        if isinstance(full_metadata, dict):
                            availability = full_metadata.get("availability")
                            reason = full_metadata.get("availability_reason")
                        msg = "[download-file] OpenLibrary item not downloadable"
                        if availability or reason:
                            msg += f" (availability={availability or ''} reason={reason or ''})"
                        log(msg, file=sys.stderr)
                        # Fallback: run a LibGen title search so the user can pick an alternative source.
                        try:
                            title_text = str(title or "").strip()
                            if not title_text and isinstance(full_metadata, dict):
                                title_text = str(full_metadata.get("title") or "").strip()
                            if title_text:
                                log(f"[download-file] Not available on OpenLibrary; searching LibGen for: {title_text}", file=sys.stderr)
                                from cmdlet.search_provider import CMDLET as _SEARCH_PROVIDER_CMDLET
                                # Use plain title text (LibGen mirrors can be finicky with fielded query prefixes).
                                fallback_query = title_text
                                ret = _SEARCH_PROVIDER_CMDLET.exec(
                                    None,
                                    ["-provider", "libgen", "-query", fallback_query],
                                    config,
                                )
                                # download-file is treated as an action command by the pipeline printer.
                                # Promote the search-provider table to a display overlay so it renders.
                                try:
                                    overlay_table = pipeline_context.get_last_result_table()
                                    overlay_items = pipeline_context.get_last_result_items()
                                    if overlay_table is not None:
                                        pipeline_context.set_last_result_table_overlay(overlay_table, overlay_items)
                                except Exception:
                                    pass
                                return int(ret)
                        except Exception:
                            # Best-effort fallback; fall through to skip this item.
                            pass
                        continue

                    # Fallback: if we have a direct HTTP URL, download it directly
                    if downloaded_path is None and isinstance(target, str) and target.startswith("http"):
                        # Guard: provider landing pages (e.g. LibGen ads.php) are HTML, not files.
                        # Never download these as "files".
                        if str(table or "").lower() == "libgen":
                            low = target.lower()
                            if ("/ads.php" in low) or ("/file.php" in low) or ("/index.php" in low):
                                log("[download-file] Refusing to download LibGen landing page (expected provider to resolve file link)", file=sys.stderr)
                                continue
                        debug(f"[download-file] Provider item looks like direct URL, downloading: {target}")
                        result_obj = _download_direct_file(target, final_output_dir, quiet=quiet_mode)
                        downloaded_path = self._extract_download_path(result_obj)

                    if downloaded_path is None:
                        log(f"Cannot download item (no provider handler / unsupported target): {title or target}", file=sys.stderr)
                        continue
                    _emit_local_file(
                        downloaded_path=downloaded_path,
                        source=str(target) if target else None,
                        title_hint=str(title) if title else downloaded_path.stem,
                        tags_hint=tags_list,
                        media_kind_hint=str(media_kind) if media_kind else None,
                        full_metadata=full_metadata if isinstance(full_metadata, dict) else None,
                    )
                    downloaded_count += 1
                except DownloadError as e:
                    log(f"Download failed: {e}", file=sys.stderr)
                except Exception as e:
                    log(f"Error downloading item: {e}", file=sys.stderr)

            if downloaded_count > 0:
                debug(f"✓ Successfully processed {downloaded_count} file(s)")
                return 0
            log("No downloads completed", file=sys.stderr)
            return 1
        except Exception as e:
            log(f"Error in download-file: {e}", file=sys.stderr)
            return 1

    def _resolve_output_dir(self, parsed: Dict[str, Any], config: Dict[str, Any]) -> Optional[Path]:
        """Resolve the output directory from -output, -storage, or config defaults.

        Returns the directory (created if necessary), or None on failure
        (the failure is logged to stderr).
        """
        # Priority 0: explicit -output argument.
        output_dir_arg = parsed.get("output")
        if output_dir_arg:
            try:
                out_path = Path(str(output_dir_arg)).expanduser()
                out_path.mkdir(parents=True, exist_ok=True)
                return out_path
            except Exception as e:
                log(f"Cannot use output directory {output_dir_arg}: {e}", file=sys.stderr)
                return None

        storage_location = parsed.get("storage")
        # Priority 1: --storage flag
        if storage_location:
            try:
                return SharedArgs.resolve_storage(storage_location)
            except Exception as e:
                log(f"Invalid storage location: {e}", file=sys.stderr)
                return None

        # Priority 2: Config default output/temp directory
        try:
            from config import resolve_output_dir
            final_output_dir = resolve_output_dir(config)
        except Exception:
            final_output_dir = Path.home() / "Downloads"
        debug(f"Using default directory: {final_output_dir}")
        # Ensure directory exists
        try:
            final_output_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            log(f"Cannot create output directory {final_output_dir}: {e}", file=sys.stderr)
            return None
        return final_output_dir

    def _compute_file_hash(self, filepath: Path) -> str:
        """Compute the SHA256 hex digest of a file, reading in 4 KiB chunks."""
        sha256_hash = hashlib.sha256()
        with open(filepath, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()
# Module-level singleton registration
# Instantiation has a side effect: __init__ calls self.register(), adding the
# cmdlet to the registry as soon as this module is imported.
CMDLET = Download_File()