Files
Medios-Macina/cmdlet/download_file.py
Nose ef01ca03a0
Some checks failed
smoke-mm / Install & smoke test mm --help (push) Has been cancelled
Migrate imports to SYS package (pipeline/result_table) and update related imports
2025-12-29 23:28:15 -08:00

1414 lines
59 KiB
Python

"""Generic file downloader.
Supports:
- Direct HTTP file URLs (PDFs, images, documents; non-yt-dlp)
- Piped provider items (uses provider.download when available)
No streaming site logic; use download-media for yt-dlp/streaming.
"""
from __future__ import annotations
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
from urllib.parse import urlparse
from SYS.download import DownloadError, _download_direct_file
from SYS.logger import log, debug
from SYS.pipeline_progress import PipelineProgress
from SYS import pipeline as pipeline_context
from . import _shared as sh
# Re-export the shared cmdlet plumbing under short local names; the rest of
# this module uses these instead of the `sh.` prefix.
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
parse_cmdlet_args = sh.parse_cmdlet_args
register_url_with_local_library = sh.register_url_with_local_library
coerce_to_pipe_object = sh.coerce_to_pipe_object
get_field = sh.get_field
class Download_File(Cmdlet):
    """Class-based download-file cmdlet - direct HTTP downloads."""

    def __init__(self) -> None:
        """Initialize download-file cmdlet.

        Declares the cmdlet metadata (name, usage, aliases, arguments, help
        detail) and registers the instance with the cmdlet registry.
        """
        argument_spec = [
            SharedArgs.URL,
            SharedArgs.PATH,
            # Prefer -path for output directory to match other cmdlets; keep -output for backwards compatibility.
            CmdletArg(
                name="-output",
                type="string",
                alias="o",
                description="(deprecated) Output directory (use -path instead)",
            ),
        ]
        detail_lines = [
            "Download files directly via HTTP without yt-dlp processing.",
            "For streaming sites, use download-media.",
            "For Internet Archive item pages (archive.org/details/...), shows a selectable file list; pick with @N to download.",
        ]
        super().__init__(
            name="download-file",
            summary="Download files via HTTP or provider handlers",
            usage="download-file <url> [-path DIR] [options] OR @N | download-file [-path DIR|DIR] [options]",
            alias=["dl-file", "download-http"],
            arg=argument_spec,
            detail=detail_lines,
            exec=self.run,
        )
        self.register()
    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Main execution method.

        Thin entry point required by the Cmdlet contract; all real work is
        delegated to `_run_impl`. Returns its exit code (0 = success).
        """
        return self._run_impl(result, args, config)
@staticmethod
def _normalize_urls(parsed: Dict[str, Any]) -> List[str]:
raw_url = parsed.get("url", [])
if isinstance(raw_url, str):
raw_url = [raw_url]
expanded_urls: List[str] = []
for u in raw_url or []:
if u is None:
continue
s = str(u).strip()
if not s:
continue
if "," in s:
parts = [p.strip() for p in s.split(",")]
expanded_urls.extend([p for p in parts if p])
else:
expanded_urls.append(s)
return expanded_urls
@staticmethod
def _collect_piped_items_if_no_urls(result: Any,
raw_urls: Sequence[str]) -> List[Any]:
if raw_urls:
return []
if isinstance(result, list):
return list(result)
if result:
return [result]
return []
@staticmethod
def _safe_total_items(raw_urls: Sequence[str], piped_items: Sequence[Any]) -> int:
try:
return int(len(raw_urls or []) + len(piped_items or []))
except Exception:
return 1
@staticmethod
def _build_preview(
raw_urls: Sequence[str],
piped_items: Sequence[Any],
total_items: int
) -> List[Any]:
try:
preview: List[Any] = []
preview.extend(list(raw_urls or [])[:max(0, total_items)])
if len(preview) < total_items:
preview.extend(
list(piped_items or [])[:max(0,
total_items - len(preview))]
)
return preview
except Exception:
return []
@staticmethod
def _load_provider_registry() -> Dict[str, Any]:
try:
from ProviderCore.registry import (
get_search_provider as _get_search_provider,
get_provider as _get_provider,
match_provider_name_for_url as _match_provider_name_for_url,
SearchResult as _SearchResult,
)
return {
"get_search_provider": _get_search_provider,
"get_provider": _get_provider,
"match_provider_name_for_url": _match_provider_name_for_url,
"SearchResult": _SearchResult,
}
except Exception:
return {
"get_search_provider": None,
"get_provider": None,
"match_provider_name_for_url": None,
"SearchResult": None,
}
    @staticmethod
    def _maybe_show_internetarchive_formats(
        *,
        raw_urls: Sequence[str],
        piped_items: Sequence[Any],
        parsed: Dict[str, Any],
        config: Dict[str, Any],
        quiet_mode: bool,
    ) -> Optional[int]:
        """If the input is an IA item page, show a selectable formats table.

        Returns an exit code when handled; otherwise None.
        """
        # Never interact in quiet/background mode.
        if quiet_mode:
            return None
        try:
            total_inputs = int(len(raw_urls or []) + len(piped_items or []))
        except Exception:
            total_inputs = 0
        # The picker only makes sense for exactly one input; batches fall through.
        if total_inputs != 1:
            return None
        item = piped_items[0] if piped_items else None
        # Resolve the target URL: piped item's path/url first, else the raw URL.
        target = ""
        if item is not None:
            try:
                target = str(get_field(item, "path") or get_field(item, "url") or "").strip()
            except Exception:
                target = ""
        if not target and raw_urls:
            target = str(raw_urls[0]).strip()
        if not target:
            return None
        try:
            from Provider import internetarchive as ia
        except Exception:
            # IA provider not installed -> let the normal download path handle it.
            return None
        # Resolve the IA identifier: prefer piped metadata, else parse the URL.
        identifier = ""
        try:
            md = get_field(item, "full_metadata") if item is not None else None
            if isinstance(md, dict):
                identifier = str(md.get("identifier") or "").strip()
        except Exception:
            identifier = ""
        if not identifier:
            try:
                identifier = str(ia.extract_identifier(target) or "").strip()
            except Exception:
                identifier = ""
        if not identifier:
            return None
        # Only show picker for item pages (details); direct download URLs should download immediately.
        try:
            if not ia.is_details_url(target):
                return None
        except Exception:
            return None
        try:
            files = ia.list_download_files(identifier)
        except Exception as exc:
            log(
                f"download-file: Internet Archive lookup failed: {exc}",
                file=sys.stderr
            )
            return 1
        if not files:
            log(
                "download-file: Internet Archive item has no downloadable files",
                file=sys.stderr
            )
            return 1
        title = ""
        try:
            title = str(get_field(item, "title") or "").strip() if item is not None else ""
        except Exception:
            title = ""
        table_title = (
            f"Internet Archive: {title}".strip().rstrip(":")
            if title else f"Internet Archive: {identifier}"
        )
        try:
            from SYS.result_table import ResultTable
        except Exception as exc:
            log(f"download-file: ResultTable unavailable: {exc}", file=sys.stderr)
            return 1
        # Carry the chosen output dir into the @N re-invocation of download-file.
        base_args: List[str] = []
        out_arg = parsed.get("path") or parsed.get("output")
        if out_arg:
            base_args.extend(["-path", str(out_arg)])
        table = ResultTable(table_title).set_preserve_order(True)
        table.set_table("internetarchive.formats")
        table.set_source_command("download-file", base_args)
        # Build one selectable row per downloadable file.
        rows: List[Dict[str, Any]] = []
        for f in files:
            name = str(f.get("name") or "").strip()
            if not name:
                continue
            fmt = str(f.get("format") or "").strip()
            src = str(f.get("source") or "").strip()
            direct_url = str(f.get("direct_url") or "").strip()
            if not direct_url:
                # Rows without a direct download URL are not actionable.
                continue
            size_val: Any = f.get("size")
            try:
                size_val = int(size_val) if size_val not in (None, "") else ""
            except Exception:
                pass
            row_item: Dict[str, Any] = {
                "table": "internetarchive",
                "title": fmt or name,
                "path": direct_url,
                "url": direct_url,
                "columns": [
                    ("Format", fmt),
                    ("Name", name),
                    ("Size", size_val),
                    ("Source", src),
                ],
                # Selecting @N re-runs download-file with this direct URL.
                "_selection_args": [direct_url],
                "full_metadata": {
                    "identifier": identifier,
                    "name": name,
                    "format": fmt,
                    "source": src,
                    "size": f.get("size"),
                },
            }
            rows.append(row_item)
            table.add_result(row_item)
        if not rows:
            log(
                "download-file: no downloadable files found for this item",
                file=sys.stderr
            )
            return 1
        # Publish the table so the shell renders it and @N selection works.
        try:
            pipeline_context.set_last_result_table(table, rows, subject=item)
            pipeline_context.set_current_stage_table(table)
        except Exception:
            pass
        log(
            "Internet Archive item detected: select a file with @N to download",
            file=sys.stderr
        )
        return 0
@staticmethod
def _openlibrary_edition_id_from_url(u: str) -> str:
try:
p = urlparse(str(u))
parts = [x for x in (p.path or "").split("/") if x]
except Exception:
parts = []
# /books/OL35443598M/...
if len(parts) >= 2 and str(parts[0]).lower() == "books":
return str(parts[1]).strip()
return ""
@staticmethod
def _title_hint_from_url_slug(u: str) -> str:
try:
p = urlparse(str(u))
parts = [x for x in (p.path or "").split("/") if x]
slug = parts[-1] if parts else ""
except Exception:
slug = ""
slug = (slug or "").strip().replace("_", " ")
return slug or "OpenLibrary"
@staticmethod
def _path_from_download_result(result_obj: Any) -> Path:
file_path = None
if hasattr(result_obj, "path"):
file_path = getattr(result_obj, "path")
elif isinstance(result_obj, dict):
file_path = result_obj.get("path")
if not file_path:
file_path = str(result_obj)
return Path(str(file_path))
def _emit_local_file(
self,
*,
downloaded_path: Path,
source: Optional[str],
title_hint: Optional[str],
tags_hint: Optional[List[str]],
media_kind_hint: Optional[str],
full_metadata: Optional[Dict[str,
Any]],
progress: PipelineProgress,
config: Dict[str,
Any],
provider_hint: Optional[str] = None,
) -> None:
title_val = (title_hint or downloaded_path.stem
or "Unknown").strip() or downloaded_path.stem
hash_value = self._compute_file_hash(downloaded_path)
tag: List[str] = []
if tags_hint:
tag.extend([str(t) for t in tags_hint if t])
if not any(str(t).lower().startswith("title:") for t in tag):
tag.insert(0, f"title:{title_val}")
payload: Dict[str,
Any] = {
"path": str(downloaded_path),
"hash": hash_value,
"title": title_val,
"action": "cmdlet:download-file",
"download_mode": "file",
"store": "local",
"media_kind": media_kind_hint or "file",
"tag": tag,
}
if provider_hint:
payload["provider"] = str(provider_hint)
if full_metadata:
payload["full_metadata"] = full_metadata
if source and str(source).startswith("http"):
payload["url"] = source
elif source:
payload["source_url"] = source
pipeline_context.emit(payload)
# When running with a local progress UI (standalone cmdlet), ensure
# the pipe advances on emit.
progress.on_emit(payload)
# Automatically register url with local library
if payload.get("url"):
pipe_obj = coerce_to_pipe_object(payload)
register_url_with_local_library(pipe_obj, config)
    def _process_explicit_urls(
        self,
        *,
        raw_urls: Sequence[str],
        final_output_dir: Path,
        config: Dict[str, Any],
        quiet_mode: bool,
        registry: Dict[str, Any],
        progress: PipelineProgress,
    ) -> tuple[int, Optional[int]]:
        """Download every explicitly given URL.

        Routing order per URL: Telegram message URLs -> Telegram provider;
        URLs matched to a known provider (OpenLibrary special-cased, then a
        generic ``download_url``/``download`` attempt); everything else ->
        direct HTTP download. Errors are logged per URL and do not stop the
        loop.

        Returns:
            (downloaded_count, early_exit): ``early_exit`` is an exit code
            when a handler requests immediate termination (the OpenLibrary ->
            LibGen fallback search), otherwise None.
        """
        downloaded_count = 0
        SearchResult = registry.get("SearchResult")
        get_provider = registry.get("get_provider")
        match_provider_name_for_url = registry.get("match_provider_name_for_url")
        for url in raw_urls:
            try:
                debug(f"Processing URL: {url}")
                # Telegram message URLs are not direct files; route through the provider.
                try:
                    parsed_url = urlparse(str(url))
                    host = (parsed_url.hostname or "").lower().strip()
                except Exception:
                    host = ""
                is_telegram = host in {"t.me", "telegram.me"} or host.endswith(".t.me")
                if is_telegram and SearchResult:
                    try:
                        from ProviderCore.registry import get_provider as _get_provider
                    except Exception:
                        _get_provider = None
                    if _get_provider is None:
                        raise DownloadError("Telegram provider registry not available")
                    provider = _get_provider("telegram", config)
                    if provider is None:
                        raise DownloadError(
                            "Telegram provider not configured or not available (check telethon/app_id/api_hash)"
                        )
                    sr = SearchResult(
                        table="telegram",
                        title=str(url),
                        path=str(url),
                        full_metadata={}
                    )
                    downloaded_path = None
                    telegram_info: Optional[Dict[str, Any]] = None
                    # Prefer download_url (also returns chat/message info); fall
                    # back to the generic download(SearchResult) entry point.
                    if hasattr(provider, "download_url"):
                        try:
                            downloaded_path, telegram_info = provider.download_url(str(url), final_output_dir)  # type: ignore[attr-defined]
                        except Exception as exc:
                            raise DownloadError(str(exc))
                    else:
                        downloaded_path = provider.download(sr, final_output_dir)
                    if not downloaded_path:
                        raise DownloadError("Telegram download returned no file")
                    # Derive channel/post tags and a title from the returned info.
                    channel = ""
                    post = None
                    if isinstance(telegram_info, dict):
                        try:
                            chat_info_raw = telegram_info.get("chat")
                            msg_info_raw = telegram_info.get("message")
                            chat_info: Dict[str, Any] = (
                                chat_info_raw if isinstance(chat_info_raw, dict) else {}
                            )
                            msg_info: Dict[str, Any] = (
                                msg_info_raw if isinstance(msg_info_raw, dict) else {}
                            )
                            channel = str(
                                chat_info.get("title") or chat_info.get("username") or ""
                            ).strip()
                            post = msg_info.get("id")
                        except Exception:
                            channel = ""
                            post = None
                    title_hint = None
                    tg_tags: List[str] = []
                    if channel:
                        tg_tags.append(f"channel:{channel}")
                    if post is not None:
                        tg_tags.append(f"post:{post}")
                    if channel and post is not None:
                        title_hint = f"{channel} {post}"
                    elif post is not None:
                        title_hint = f"post:{post}"
                    else:
                        title_hint = downloaded_path.stem
                    self._emit_local_file(
                        downloaded_path=downloaded_path,
                        source=str(url),
                        title_hint=title_hint,
                        tags_hint=tg_tags,
                        media_kind_hint="file",
                        full_metadata=telegram_info,
                        provider_hint="telegram",
                        progress=progress,
                        config=config,
                    )
                    downloaded_count += 1
                    debug("✓ Downloaded via Telegram provider and emitted")
                    continue
                # Provider URL routing (e.g. OpenLibrary book pages).
                provider_name = None
                if match_provider_name_for_url is not None:
                    try:
                        provider_name = match_provider_name_for_url(str(url))
                    except Exception:
                        provider_name = None
                # Heuristic: LibGen often uses landing pages like edition.php/file.php.
                # These should never be treated as direct file URLs.
                if not provider_name:
                    try:
                        p = urlparse(str(url))
                        h = (p.hostname or "").strip().lower()
                        path = (p.path or "").strip().lower()
                        if "libgen" in h and any(x in path for x in (
                                "/edition.php",
                                "/file.php",
                                "/ads.php",
                                "/get.php",
                                "/series.php", )):
                            provider_name = "libgen"
                    except Exception:
                        pass
                if provider_name and get_provider is not None and SearchResult is not None:
                    # OpenLibrary URLs should be handled by the OpenLibrary provider.
                    if provider_name == "openlibrary":
                        provider = get_provider("openlibrary", config)
                        if provider is None:
                            raise DownloadError(
                                "OpenLibrary provider not configured or not available"
                            )
                        edition_id = self._openlibrary_edition_id_from_url(str(url))
                        title_hint = self._title_hint_from_url_slug(str(url))
                        sr = SearchResult(
                            table="openlibrary",
                            title=title_hint,
                            path=str(url),
                            media_kind="book",
                            full_metadata={
                                "openlibrary_id": edition_id,
                            },
                        )
                        downloaded_path = None
                        try:
                            ui, _pipe_idx = progress.ui_and_pipe_index()
                            progress_cb = None
                            if ui is not None:
                                # High-level steps for OpenLibrary borrow/download flow.
                                progress.begin_steps(5)

                                def _progress(
                                    kind: str,
                                    done: int,
                                    total: Optional[int],
                                    label: str
                                ) -> None:
                                    # kind:
                                    # - "step": advance step text
                                    # - "pages": update pipe percent/status
                                    # - "bytes": update transfer bar
                                    if kind == "step":
                                        progress.step(label)
                                        return
                                    if kind == "pages":
                                        t = int(total) if isinstance(total, int) else 0
                                        d = int(done) if isinstance(done, int) else 0
                                        if t > 0:
                                            # Clamp done into [0, t] before computing percent.
                                            pct = int(
                                                round(
                                                    (max(0, min(d, t)) / max(1, t)) * 100.0
                                                )
                                            )
                                            progress.set_percent(pct)
                                            progress.set_status(
                                                f"downloading pages {d}/{t}"
                                            )
                                        else:
                                            progress.set_status(
                                                f"downloading pages {d}"
                                            )
                                        return
                                    if kind == "bytes":
                                        try:
                                            lbl = str(label or "download")
                                        except Exception:
                                            lbl = "download"
                                        progress.begin_transfer(label=lbl, total=total)
                                        progress.update_transfer(
                                            label=lbl,
                                            completed=done,
                                            total=total
                                        )
                                        try:
                                            # Close the transfer bar once complete.
                                            if (isinstance(total, int) and total > 0
                                                    and int(done) >= int(total)):
                                                progress.finish_transfer(label=lbl)
                                        except Exception:
                                            pass
                                        return

                                progress_cb = _progress
                            downloaded_path = provider.download(
                                sr,
                                final_output_dir,
                                progress_callback=progress_cb
                            )  # type: ignore[call-arg]
                        except Exception as exc:
                            raise DownloadError(str(exc))
                        # Clear long-running status line after the download attempt.
                        progress.clear_status()
                        if downloaded_path:
                            tags_hint: Optional[List[str]] = None
                            try:
                                sr_tags = getattr(sr, "tag", None)
                                if isinstance(sr_tags, set) and sr_tags:
                                    tags_hint = sorted([str(t) for t in sr_tags if t])
                            except Exception:
                                tags_hint = None
                            self._emit_local_file(
                                downloaded_path=Path(downloaded_path),
                                source=str(url),
                                title_hint=title_hint,
                                tags_hint=tags_hint,
                                media_kind_hint="book",
                                full_metadata=sr.full_metadata,
                                provider_hint="openlibrary",
                                progress=progress,
                                config=config,
                            )
                            downloaded_count += 1
                            continue
                        # If OpenLibrary can't provide it (not lendable, no creds, etc), auto-search LibGen.
                        try:
                            fallback_query = str(title_hint or "").strip()
                            if fallback_query:
                                log(
                                    f"[download-file] Not available on OpenLibrary; searching LibGen for: {fallback_query}",
                                    file=sys.stderr,
                                )
                                from cmdlet.search_provider import CMDLET as _SEARCH_PROVIDER_CMDLET
                                exec_fn = getattr(_SEARCH_PROVIDER_CMDLET, "exec", None)
                                if callable(exec_fn):
                                    ret = exec_fn(
                                        None,
                                        [
                                            "-provider",
                                            "libgen",
                                            "-query",
                                            fallback_query
                                        ],
                                        config,
                                    )
                                    # Promote the search results to a display overlay so they render.
                                    try:
                                        table = pipeline_context.get_last_result_table()
                                        items = pipeline_context.get_last_result_items()
                                        if table is not None:
                                            pipeline_context.set_last_result_table_overlay(
                                                table,
                                                items
                                            )
                                    except Exception:
                                        pass
                                    # Early exit: the caller returns the search's exit code.
                                    try:
                                        return downloaded_count, int(ret)  # type: ignore[arg-type]
                                    except Exception:
                                        return downloaded_count, 1
                        except Exception:
                            pass
                        log(
                            "[download-file] OpenLibrary URL could not be downloaded",
                            file=sys.stderr,
                        )
                        continue
                    # Generic provider URL handler (if a provider implements `download_url`).
                    provider = get_provider(provider_name, config)
                    if provider is not None and hasattr(provider, "download_url"):
                        try:
                            downloaded_path = provider.download_url(
                                str(url),
                                final_output_dir
                            )  # type: ignore[attr-defined]
                        except Exception as exc:
                            raise DownloadError(str(exc))
                        if downloaded_path:
                            self._emit_local_file(
                                downloaded_path=Path(downloaded_path),
                                source=str(url),
                                title_hint=Path(str(downloaded_path)).stem,
                                tags_hint=None,
                                media_kind_hint="file",
                                full_metadata=None,
                                provider_hint=str(provider_name),
                                progress=progress,
                                config=config,
                            )
                            downloaded_count += 1
                            continue
                    # Otherwise, try provider.download(SearchResult) with the URL as the target.
                    if provider is not None:
                        sr_obj = None
                        try:
                            sr_obj = SearchResult(
                                table=str(provider_name),
                                title=str(url),
                                path=str(url),
                                full_metadata={},
                            )
                            downloaded_path = provider.download(
                                sr_obj,
                                final_output_dir
                            )  # type: ignore[call-arg]
                        except Exception:
                            downloaded_path = None
                        # Refuse to fall back to direct-download for LibGen landing pages.
                        # This prevents saving HTML (e.g. edition.php) as a bogus file.
                        if (not downloaded_path) and str(provider_name).lower() == "libgen":
                            raise DownloadError(
                                "LibGen URL did not resolve to a downloadable file"
                            )
                        if downloaded_path:
                            emit_tags: Optional[List[str]] = None
                            full_md: Optional[Dict[str, Any]] = None
                            title_hint = Path(str(downloaded_path)).stem
                            media_kind_hint = "file"
                            # LibGen results are books; harvest tags/metadata the
                            # provider attached to the SearchResult during download.
                            if str(provider_name).lower() == "libgen" and sr_obj is not None:
                                media_kind_hint = "book"
                                try:
                                    sr_tags = getattr(sr_obj, "tag", None)
                                    if isinstance(sr_tags, set) and sr_tags:
                                        emit_tags = sorted(
                                            [str(t) for t in sr_tags if t]
                                        )
                                except Exception:
                                    emit_tags = None
                                try:
                                    sr_full_md = getattr(sr_obj, "full_metadata", None)
                                    if isinstance(sr_full_md, dict):
                                        full_md = sr_full_md
                                        t = str(sr_full_md.get("title") or "").strip()
                                        if t:
                                            title_hint = t
                                except Exception:
                                    full_md = None
                            self._emit_local_file(
                                downloaded_path=Path(downloaded_path),
                                source=str(url),
                                title_hint=title_hint,
                                tags_hint=emit_tags,
                                media_kind_hint=media_kind_hint,
                                full_metadata=full_md,
                                provider_hint=str(provider_name),
                                progress=progress,
                                config=config,
                            )
                            downloaded_count += 1
                            continue
                # Last resort: plain direct HTTP download.
                result_obj = _download_direct_file(
                    str(url),
                    final_output_dir,
                    quiet=quiet_mode,
                    pipeline_progress=progress,
                )
                downloaded_path = self._path_from_download_result(result_obj)
                self._emit_local_file(
                    downloaded_path=downloaded_path,
                    source=str(url),
                    title_hint=downloaded_path.stem,
                    tags_hint=[f"title:{downloaded_path.stem}"],
                    media_kind_hint="file",
                    full_metadata=None,
                    progress=progress,
                    config=config,
                )
                downloaded_count += 1
                debug("✓ Downloaded and emitted")
            except DownloadError as e:
                # Per-URL failure: log and move on to the next URL.
                log(f"Download failed for {url}: {e}", file=sys.stderr)
            except Exception as e:
                log(f"Error processing {url}: {e}", file=sys.stderr)
        return downloaded_count, None
    def _expand_provider_items(
        self,
        *,
        piped_items: Sequence[Any],
        registry: Dict[str, Any],
        config: Dict[str, Any],
    ) -> List[Any]:
        """Expand composite piped items into downloadable leaf items.

        Currently only AllDebrid "folder" rows are expanded: the magnet's file
        list is fetched via the provider's search and substituted for the
        folder row. All other items (and any item whose expansion fails) pass
        through unchanged.
        """
        get_search_provider = registry.get("get_search_provider")
        expanded_items: List[Any] = []
        for item in piped_items:
            try:
                table = get_field(item, "table")
                media_kind = get_field(item, "media_kind")
                full_metadata = get_field(item, "full_metadata")
                target = get_field(item, "path") or get_field(item, "url")
                if (str(table or "").lower() == "alldebrid"
                        and str(media_kind or "").lower() == "folder"):
                    # Resolve the magnet id from metadata or an
                    # "alldebrid:magnet:<id>" pseudo-target.
                    magnet_id = None
                    if isinstance(full_metadata, dict):
                        magnet_id = full_metadata.get("magnet_id")
                    if (magnet_id is None and isinstance(target, str)
                            and target.lower().startswith("alldebrid:magnet:")):
                        try:
                            magnet_id = int(target.split(":")[-1])
                        except Exception:
                            magnet_id = None
                    if magnet_id is not None and get_search_provider is not None:
                        provider = get_search_provider("alldebrid", config)
                        if provider is not None:
                            try:
                                files = provider.search(
                                    "*",
                                    limit=10_000,
                                    filters={
                                        "view": "files",
                                        "magnet_id": int(magnet_id)
                                    },
                                )
                            except Exception:
                                files = []
                            # If the magnet isn't ready, provider.search returns a single not-ready folder row.
                            if (files and len(files) == 1
                                    and getattr(files[0], "media_kind", "") == "folder"):
                                detail = getattr(files[0], "detail", "")
                                log(
                                    f"[download-file] AllDebrid magnet {magnet_id} not ready ({detail or 'unknown'})",
                                    file=sys.stderr,
                                )
                            else:
                                for sr in files:
                                    expanded_items.append(
                                        sr.to_dict() if hasattr(sr, "to_dict") else sr
                                    )
                            # Folder row handled (expanded or reported); skip it.
                            continue
                expanded_items.append(item)
            except Exception:
                # Never drop an item on expansion failure; pass it through as-is.
                expanded_items.append(item)
        return expanded_items
    def _process_provider_items(
        self,
        *,
        piped_items: Sequence[Any],
        final_output_dir: Path,
        config: Dict[str, Any],
        quiet_mode: bool,
        registry: Dict[str, Any],
        progress: PipelineProgress,
    ) -> int:
        """Download piped provider items and emit each downloaded file.

        Items are first expanded (AllDebrid folders -> file rows). For each
        item: prefer the provider's download(); OpenLibrary failures trigger a
        LibGen fallback search (which exits the whole cmdlet with the search's
        code); otherwise a direct HTTP URL is downloaded directly. Returns the
        number of files downloaded.
        """
        downloaded_count = 0
        get_search_provider = registry.get("get_search_provider")
        SearchResult = registry.get("SearchResult")
        expanded_items = self._expand_provider_items(
            piped_items=piped_items,
            registry=registry,
            config=config
        )
        for item in expanded_items:
            try:
                # Pull the common fields off the piped item (dict or object).
                table = get_field(item, "table")
                title = get_field(item, "title")
                target = get_field(item, "path") or get_field(item, "url")
                media_kind = get_field(item, "media_kind")
                tags_val = get_field(item, "tag")
                tags_list: Optional[List[str]]
                if isinstance(tags_val, list):
                    tags_list = [str(t) for t in tags_val if t]
                else:
                    tags_list = None
                full_metadata = get_field(item, "full_metadata")
                # Some emitters nest full_metadata under item["extra"].
                if ((not full_metadata) and isinstance(item, dict)
                        and isinstance(item.get("extra"), dict)):
                    extra_md = item["extra"].get("full_metadata")
                    if isinstance(extra_md, dict):
                        full_metadata = extra_md
                # If this looks like a provider item and providers are available, prefer provider.download()
                downloaded_path: Optional[Path] = None
                attempted_provider_download = False
                provider_sr = None
                if table and get_search_provider and SearchResult:
                    provider = get_search_provider(str(table), config)
                    if provider is not None:
                        attempted_provider_download = True
                        sr = SearchResult(
                            table=str(table),
                            title=str(title or "Unknown"),
                            path=str(target or ""),
                            full_metadata=full_metadata if isinstance(full_metadata, dict) else {},
                        )
                        debug(
                            f"[download-file] Downloading provider item via {table}: {sr.title}"
                        )
                        # Preserve provider structure when possible (AllDebrid folders -> subfolders).
                        output_dir = final_output_dir
                        try:
                            if str(table).strip().lower() == "alldebrid":
                                from ProviderCore.download import sanitize_filename as _sf
                                md = full_metadata if isinstance(full_metadata, dict) else {}
                                magnet_name = None
                                if isinstance(md, dict):
                                    magnet_name = md.get("magnet_name") or md.get("folder")
                                if not magnet_name:
                                    magnet_name = (
                                        str(get_field(item, "detail") or "").strip() or None
                                    )
                                magnet_dir_name = _sf(str(magnet_name)) if magnet_name else ""
                                # If user already chose -path that ends with the magnet folder name,
                                # don't create a duplicate nested folder.
                                try:
                                    base_tail = str(Path(output_dir).name or "")
                                except Exception:
                                    base_tail = ""
                                base_tail_norm = _sf(base_tail).lower() if base_tail.strip() else ""
                                magnet_dir_norm = magnet_dir_name.lower() if magnet_dir_name else ""
                                if magnet_dir_name and (not base_tail_norm
                                                        or base_tail_norm != magnet_dir_norm):
                                    output_dir = Path(output_dir) / magnet_dir_name
                                relpath = None
                                if isinstance(md, dict):
                                    relpath = md.get("relpath")
                                    if not relpath and isinstance(md.get("file"), dict):
                                        relpath = md["file"].get("_relpath")
                                if relpath:
                                    parts = [
                                        p for p in str(relpath).replace("\\", "/").split("/")
                                        if p and p not in {".", ".."}
                                    ]
                                    # If the provider relpath already includes the magnet folder name as a
                                    # root directory (common), strip it to prevent double nesting.
                                    if magnet_dir_name and parts:
                                        try:
                                            if _sf(parts[0]).lower() == magnet_dir_norm:
                                                parts = parts[1:]
                                        except Exception:
                                            pass
                                    # relpath includes the filename; only join parent directories.
                                    for part in parts[:-1]:
                                        output_dir = Path(output_dir) / _sf(part)
                                try:
                                    Path(output_dir).mkdir(parents=True, exist_ok=True)
                                except Exception:
                                    output_dir = final_output_dir
                        except Exception:
                            output_dir = final_output_dir
                        downloaded_path = provider.download(sr, output_dir)
                        provider_sr = sr
                # OpenLibrary: if provider download failed, do NOT try to download the OpenLibrary page HTML.
                if (downloaded_path is None and attempted_provider_download
                        and str(table or "").lower() == "openlibrary"):
                    availability = None
                    reason = None
                    if isinstance(full_metadata, dict):
                        availability = full_metadata.get("availability")
                        reason = full_metadata.get("availability_reason")
                    msg = "[download-file] OpenLibrary item not downloadable"
                    if availability or reason:
                        msg += f" (availability={availability or ''} reason={reason or ''})"
                    log(msg, file=sys.stderr)
                    # Fallback: run a LibGen title search so the user can pick an alternative source.
                    try:
                        title_text = str(title or "").strip()
                        if not title_text and isinstance(full_metadata, dict):
                            title_text = str(full_metadata.get("title") or "").strip()
                        if title_text:
                            log(
                                f"[download-file] Not available on OpenLibrary; searching LibGen for: {title_text}",
                                file=sys.stderr,
                            )
                            from cmdlet.search_provider import CMDLET as _SEARCH_PROVIDER_CMDLET
                            fallback_query = title_text
                            exec_fn = getattr(_SEARCH_PROVIDER_CMDLET, "exec", None)
                            if not callable(exec_fn):
                                log(
                                    "[download-file] search-provider cmdlet unavailable; cannot run LibGen fallback search",
                                    file=sys.stderr,
                                )
                                continue
                            ret = exec_fn(
                                None,
                                ["-provider", "libgen", "-query", fallback_query],
                                config,
                            )
                            # Promote the search-provider table to a display overlay so it renders.
                            try:
                                table_obj = pipeline_context.get_last_result_table()
                                items_obj = pipeline_context.get_last_result_items()
                                if table_obj is not None:
                                    pipeline_context.set_last_result_table_overlay(
                                        table_obj,
                                        items_obj
                                    )
                            except Exception:
                                pass
                            # Exit the whole cmdlet with the search's exit code.
                            try:
                                return int(ret)  # type: ignore[arg-type]
                            except Exception:
                                return 1
                    except Exception:
                        pass
                    continue
                # Fallback: if we have a direct HTTP URL, download it directly
                if (downloaded_path is None and isinstance(target, str)
                        and target.startswith("http")):
                    # Guard: provider landing pages (e.g. LibGen ads.php) are HTML, not files.
                    # Never download these as "files".
                    if str(table or "").lower() == "libgen":
                        low = target.lower()
                        if ("/ads.php" in low) or ("/file.php" in low) or ("/index.php" in low):
                            log(
                                "[download-file] Refusing to download LibGen landing page (expected provider to resolve file link)",
                                file=sys.stderr,
                            )
                            continue
                    debug(
                        f"[download-file] Provider item looks like direct URL, downloading: {target}"
                    )
                    suggested_name = str(title).strip() if title is not None else None
                    result_obj = _download_direct_file(
                        target,
                        final_output_dir,
                        quiet=quiet_mode,
                        suggested_filename=suggested_name,
                        pipeline_progress=progress,
                    )
                    downloaded_path = self._path_from_download_result(result_obj)
                if downloaded_path is None:
                    log(
                        f"Cannot download item (no provider handler / unsupported target): {title or target}",
                        file=sys.stderr,
                    )
                    continue
                # Allow providers to add/enrich tags and metadata during download.
                if str(table or "").lower() == "libgen" and provider_sr is not None:
                    try:
                        sr_tags = getattr(provider_sr, "tag", None)
                        if tags_list is None and isinstance(sr_tags, set) and sr_tags:
                            tags_list = sorted([str(t) for t in sr_tags if t])
                    except Exception:
                        pass
                    try:
                        sr_md = getattr(provider_sr, "full_metadata", None)
                        if isinstance(sr_md, dict) and sr_md:
                            full_metadata = sr_md
                    except Exception:
                        pass
                    try:
                        # Prefer the metadata title over the piped title when present.
                        if isinstance(full_metadata, dict):
                            t = str(full_metadata.get("title") or "").strip()
                            if t:
                                title = t
                    except Exception:
                        pass
                self._emit_local_file(
                    downloaded_path=downloaded_path,
                    source=str(target) if target else None,
                    title_hint=str(title) if title else downloaded_path.stem,
                    tags_hint=tags_list,
                    media_kind_hint=str(media_kind) if media_kind else None,
                    full_metadata=full_metadata if isinstance(full_metadata, dict) else None,
                    progress=progress,
                    config=config,
                )
                downloaded_count += 1
            except DownloadError as e:
                # Per-item failure: log and continue with the next item.
                log(f"Download failed: {e}", file=sys.stderr)
            except Exception as e:
                log(f"Error downloading item: {e}", file=sys.stderr)
        return downloaded_count
    def _run_impl(
        self,
        result: Any,
        args: Sequence[str],
        config: Dict[str, Any]
    ) -> int:
        """Main download implementation for direct HTTP files.

        Flow: parse args -> normalize URLs / collect piped items -> optional
        Internet Archive format picker -> resolve output directory -> process
        explicit URLs, then piped provider items. Returns 0 when anything was
        downloaded (or the IA picker / a fallback search handled the request),
        1 otherwise.
        """
        progress = PipelineProgress(pipeline_context)
        prev_progress = None
        had_progress_key = False
        try:
            debug("Starting download-file")
            # Allow providers to tap into the active PipelineProgress (optional).
            try:
                if isinstance(config, dict):
                    had_progress_key = "_pipeline_progress" in config
                    prev_progress = config.get("_pipeline_progress")
                    config["_pipeline_progress"] = progress
            except Exception:
                pass
            # Parse arguments
            parsed = parse_cmdlet_args(args, self)
            raw_url = self._normalize_urls(parsed)
            piped_items = self._collect_piped_items_if_no_urls(result, raw_url)
            had_piped_input = False
            try:
                # NOTE(review): both branches are identical; this collapses to
                # `had_piped_input = bool(result)` — left as-is in this doc-only pass.
                if isinstance(result, list):
                    had_piped_input = bool(result)
                else:
                    had_piped_input = bool(result)
            except Exception:
                had_piped_input = False
            # UX: In piped mode, allow a single positional arg to be the destination directory.
            # Example: @1-4 | download-file "C:\\Users\\Me\\Downloads\\yoyo"
            if (had_piped_input and raw_url and len(raw_url) == 1
                    and (not parsed.get("path")) and (not parsed.get("output"))):
                candidate = str(raw_url[0] or "").strip()
                low = candidate.lower()
                looks_like_url = low.startswith(("http://", "https://", "ftp://"))
                looks_like_provider = low.startswith(
                    ("magnet:", "alldebrid:", "hydrus:", "ia:", "internetarchive:")
                )
                looks_like_windows_path = (
                    (len(candidate) >= 2 and candidate[1] == ":")
                    or candidate.startswith("\\\\") or candidate.startswith("\\")
                    or candidate.endswith(("\\", "/"))
                )
                if (not looks_like_url) and (not looks_like_provider) and looks_like_windows_path:
                    # Reinterpret the lone positional as -path and re-collect piped items.
                    parsed["path"] = candidate
                    raw_url = []
                    piped_items = self._collect_piped_items_if_no_urls(result, raw_url)
            if not raw_url and not piped_items:
                log("No url or piped items to download", file=sys.stderr)
                return 1
            quiet_mode = (
                bool(config.get("_quiet_background_output"))
                if isinstance(config, dict) else False
            )
            # A single IA details-page input shows a format picker instead of downloading.
            ia_picker_exit = self._maybe_show_internetarchive_formats(
                raw_urls=raw_url,
                piped_items=piped_items,
                parsed=parsed,
                config=config,
                quiet_mode=quiet_mode,
            )
            if ia_picker_exit is not None:
                return int(ia_picker_exit)
            # Get output directory
            final_output_dir = self._resolve_output_dir(parsed, config)
            if not final_output_dir:
                return 1
            debug(f"Output directory: {final_output_dir}")
            # If the caller isn't running the shared pipeline Live progress UI (e.g. direct
            # cmdlet execution), start a minimal local pipeline progress panel so downloads
            # show consistent, Rich-formatted progress (like download-media).
            total_items = self._safe_total_items(raw_url, piped_items)
            preview = self._build_preview(raw_url, piped_items, total_items)
            progress.ensure_local_ui(
                label="download-file",
                total_items=total_items,
                items_preview=preview
            )
            registry = self._load_provider_registry()
            downloaded_count = 0
            urls_downloaded, early_exit = self._process_explicit_urls(
                raw_urls=raw_url,
                final_output_dir=final_output_dir,
                config=config,
                quiet_mode=quiet_mode,
                registry=registry,
                progress=progress,
            )
            downloaded_count += int(urls_downloaded)
            if early_exit is not None:
                # A URL handler requested immediate exit (e.g. LibGen fallback search).
                return int(early_exit)
            downloaded_count += self._process_provider_items(
                piped_items=piped_items,
                final_output_dir=final_output_dir,
                config=config,
                quiet_mode=quiet_mode,
                registry=registry,
                progress=progress,
            )
            if downloaded_count > 0:
                debug(f"✓ Successfully processed {downloaded_count} file(s)")
                return 0
            log("No downloads completed", file=sys.stderr)
            return 1
        except Exception as e:
            log(f"Error in download-file: {e}", file=sys.stderr)
            return 1
        finally:
            # Restore the caller's _pipeline_progress (or remove ours), then
            # always tear down the local progress UI.
            try:
                if isinstance(config, dict):
                    if had_progress_key:
                        config["_pipeline_progress"] = prev_progress
                    else:
                        config.pop("_pipeline_progress", None)
            except Exception:
                pass
            progress.close_local_ui(force_complete=True)
def _resolve_output_dir(self,
parsed: Dict[str,
Any],
config: Dict[str,
Any]) -> Optional[Path]:
"""Resolve the output directory from storage location or config."""
output_dir_arg = parsed.get("path") or parsed.get("output")
if output_dir_arg:
try:
out_path = Path(str(output_dir_arg)).expanduser()
out_path.mkdir(parents=True, exist_ok=True)
return out_path
except Exception as e:
log(
f"Cannot use output directory {output_dir_arg}: {e}",
file=sys.stderr
)
return None
storage_location = parsed.get("storage")
# Priority 1: --storage flag
if storage_location:
try:
return SharedArgs.resolve_storage(storage_location)
except Exception as e:
log(f"Invalid storage location: {e}", file=sys.stderr)
return None
# Priority 2: Config default output/temp directory
try:
from SYS.config import resolve_output_dir
final_output_dir = resolve_output_dir(config)
except Exception:
final_output_dir = Path.home() / "Downloads"
debug(f"Using default directory: {final_output_dir}")
# Ensure directory exists
try:
final_output_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
log(
f"Cannot create output directory {final_output_dir}: {e}",
file=sys.stderr
)
return None
return final_output_dir
def _compute_file_hash(self, filepath: Path) -> str:
"""Compute SHA256 hash of a file."""
import hashlib
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
# Module-level singleton registration: instantiating the class registers the
# cmdlet with the registry (see __init__), and CMDLET exposes the instance
# for discovery by the cmdlet loader.
CMDLET = Download_File()