3102 lines
122 KiB
Python
3102 lines
122 KiB
Python
"""Generic file/stream downloader.
|
|
|
|
Supports:
|
|
- Direct HTTP file URLs (PDFs, images, documents; non-yt-dlp)
|
|
- Piped plugin items (uses plugin.download when available)
|
|
- Streaming sites via yt-dlp (YouTube, Bandcamp, etc.)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Mapping, Sequence as SequenceABC
|
|
import sys
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Sequence
|
|
from urllib.parse import urlparse
|
|
from contextlib import AbstractContextManager, nullcontext
|
|
import shutil
|
|
import webbrowser
|
|
|
|
|
|
from API.HTTP import download_direct_file
|
|
from SYS.models import DownloadError, DownloadOptions, DownloadMediaResult
|
|
from SYS.logger import log, debug_panel, is_debug_enabled
|
|
from SYS.payload_builders import build_file_result_payload, build_table_result_payload
|
|
from SYS.pipeline_progress import PipelineProgress
|
|
from SYS.result_table import Table, build_display_row
|
|
from SYS.rich_display import stderr_console as get_stderr_console
|
|
from SYS import pipeline as pipeline_context
|
|
from SYS.item_accessors import get_result_title
|
|
from rich.prompt import Prompt
|
|
# SYS.metadata import deferred: normalize_urls loaded lazily at call site to avoid
|
|
# pulling in Cryptodome (~900ms) at module import time.
|
|
from SYS.selection_builder import (
|
|
build_hash_store_selection,
|
|
extract_selection_fields,
|
|
extract_urls_from_selection_args,
|
|
selection_args_have_url,
|
|
)
|
|
from SYS.utils import sha256_file
|
|
|
|
try:
|
|
from plugins.ytdlp import YtDlpTool # type: ignore
|
|
except Exception: # pragma: no cover - optional dependency for tests/runtime wrappers
|
|
YtDlpTool = None # type: ignore
|
|
|
|
from .. import _shared as sh
|
|
|
|
# Bind the helpers from the shared cmdlet module (imported above as ``sh``) to
# module-level names so the code below can use them without the ``sh.`` prefix.
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
QueryArg = sh.QueryArg
parse_cmdlet_args = sh.parse_cmdlet_args
register_url_with_local_library = sh.register_url_with_local_library
coerce_to_pipe_object = sh.coerce_to_pipe_object
get_field = sh.get_field
resolve_target_dir = sh.resolve_target_dir
coerce_to_path = sh.coerce_to_path
build_pipeline_preview = sh.build_pipeline_preview
|
|
|
|
class Download_File(Cmdlet):
|
|
"""Class-based download-file cmdlet - direct HTTP downloads."""
|
|
|
|
def __init__(self) -> None:
    """Describe the ``download-file`` cmdlet to the base class and register it.

    The usage text, argument specs and help lines are assembled first and
    then handed to the base constructor with ``self.run`` as the executor.
    """
    usage_text = "download-file <url|path> [-plugin NAME] [-instance NAME] [-path DIR] [options] OR @N | download-file [-plugin NAME] [-instance NAME] [-path DIR] [options] OR download-file -query \"hash:<sha256>\" -instance <store> [-browser]"
    alias_names = ["dl-file", "download-http"]

    # Shared arguments come first, followed by the cmdlet-specific ones.
    arg_specs = [
        SharedArgs.URL,
        SharedArgs.PLUGIN,
        SharedArgs.INSTANCE,
        SharedArgs.PATH,
        SharedArgs.QUERY,
        CmdletArg(
            name="name",
            type="string",
            description="Output filename override for store exports.",
        ),
        CmdletArg(
            name="browser",
            type="flag",
            description="Open a backend-provided browser URL instead of exporting to disk when available.",
        ),
        QueryArg(
            "clip",
            key="clip",
            aliases=["range", "section", "sections"],
            type="string",
            required=False,
            description=(
                "Clip time ranges via -query keyed fields (e.g. clip:1m-2m or clip:00:01-00:10). "
                "Comma-separated values supported."
            ),
            query_only=True,
        ),
        CmdletArg(
            name="item",
            type="string",
            description="Item selection for playlists/formats",
        ),
    ]

    detail_lines = [
        "Download files directly via HTTP or streaming media via yt-dlp.",
        "Also exports store-backed files via hash+store selection or -query \"hash:<sha256>\" -instance <store>.",
        "Use -plugin with -instance to target a named provider config when a plugin exposes multiple instances.",
        "For Internet Archive item pages (archive.org/details/...), shows a selectable file/format list; pick with @N to download.",
    ]

    super().__init__(
        name="download-file",
        summary="Download files or streaming media",
        usage=usage_text,
        alias=alias_names,
        arg=arg_specs,
        detail=detail_lines,
        exec=self.run,
    )
    self.register()
|
|
|
|
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
|
|
"""Main execution method."""
|
|
try:
|
|
debug_panel(
|
|
"download-file",
|
|
[
|
|
("args", list(args)),
|
|
("has_piped_input", bool(result)),
|
|
],
|
|
border_style="cyan",
|
|
)
|
|
except Exception:
|
|
pass
|
|
return self._run_impl(result, args, config)
|
|
|
|
@staticmethod
def _path_from_download_result(result_obj: Any) -> Path:
    """Turn a downloader return value into a filesystem path.

    Delegates to ``coerce_to_path`` and raises ``DownloadError`` when that
    helper yields ``None``.
    """
    candidate = coerce_to_path(result_obj)
    if candidate is not None:
        return candidate
    raise DownloadError("Could not determine downloaded file path")
|
|
|
|
@staticmethod
|
|
def _selection_run_label(
|
|
run_args: Sequence[str],
|
|
*,
|
|
extra_url_prefixes: Sequence[str] = (),
|
|
) -> str:
|
|
try:
|
|
urls = extract_urls_from_selection_args(
|
|
run_args,
|
|
extra_url_prefixes=extra_url_prefixes,
|
|
)
|
|
if urls:
|
|
return str(urls[0])
|
|
except Exception:
|
|
pass
|
|
|
|
for arg in run_args:
|
|
text = str(arg or "").strip()
|
|
if text and not text.startswith("-"):
|
|
return text
|
|
return "item"
|
|
|
|
@staticmethod
|
|
def _batch_progress_state(config: Optional[Dict[str, Any]]) -> tuple[bool, int, int, str]:
|
|
if not isinstance(config, dict):
|
|
return False, 0, 0, ""
|
|
|
|
suppress_nested = bool(config.get("_download_file_suppress_nested_pipe_progress"))
|
|
if not suppress_nested:
|
|
return False, 0, 0, ""
|
|
|
|
try:
|
|
total = max(0, int(config.get("_download_file_batch_total") or 0))
|
|
except Exception:
|
|
total = 0
|
|
try:
|
|
index = max(0, int(config.get("_download_file_batch_index") or 0))
|
|
except Exception:
|
|
index = 0
|
|
try:
|
|
label = str(config.get("_download_file_batch_label") or "").strip()
|
|
except Exception:
|
|
label = ""
|
|
|
|
return True, total, index, label
|
|
|
|
@staticmethod
|
|
def _selection_url_prefixes(registry: Dict[str, Any]) -> List[str]:
|
|
loader = registry.get("list_selection_url_prefixes")
|
|
if not callable(loader):
|
|
return []
|
|
try:
|
|
values = loader() or []
|
|
except Exception:
|
|
return []
|
|
return [str(value).strip().lower() for value in values if str(value or "").strip()]
|
|
|
|
def _emit_plugin_items(
|
|
self,
|
|
*,
|
|
items: Sequence[Any],
|
|
config: Dict[str, Any],
|
|
) -> int:
|
|
emitted = 0
|
|
for item in items:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
pipeline_context.emit(item)
|
|
if item.get("url"):
|
|
try:
|
|
pipe_obj = coerce_to_pipe_object(item)
|
|
register_url_with_local_library(pipe_obj, config)
|
|
except Exception:
|
|
pass
|
|
emitted += 1
|
|
return emitted
|
|
|
|
def _consume_plugin_download_result(
|
|
self,
|
|
*,
|
|
result: Any,
|
|
config: Dict[str, Any],
|
|
) -> tuple[int, Optional[int], bool]:
|
|
if result is None:
|
|
return 0, None, False
|
|
|
|
if isinstance(result, list):
|
|
if result and all(isinstance(item, dict) for item in result):
|
|
return self._emit_plugin_items(items=result, config=config), 0, True
|
|
return 0, None, False
|
|
|
|
if not isinstance(result, dict):
|
|
return 0, None, False
|
|
|
|
action = str(
|
|
result.get("action")
|
|
or result.get("plugin_action")
|
|
or ""
|
|
).strip().lower()
|
|
|
|
if action in {"emit_items", "emit_pipe_objects"}:
|
|
items = result.get("items") or []
|
|
exit_code = result.get("exit_code")
|
|
emitted = self._emit_plugin_items(
|
|
items=items if isinstance(items, list) else [],
|
|
config=config,
|
|
)
|
|
try:
|
|
normalized_exit = int(exit_code) if exit_code is not None else 0
|
|
except Exception:
|
|
normalized_exit = 0
|
|
return emitted, normalized_exit, True
|
|
|
|
if action == "handled":
|
|
exit_code = result.get("exit_code")
|
|
try:
|
|
normalized_exit = int(exit_code) if exit_code is not None else 0
|
|
except Exception:
|
|
normalized_exit = 0
|
|
try:
|
|
downloaded = int(result.get("downloaded") or 0)
|
|
except Exception:
|
|
downloaded = 0
|
|
return downloaded, normalized_exit, True
|
|
|
|
return 0, None, False
|
|
|
|
def _process_explicit_urls(
    self,
    *,
    raw_urls: Sequence[str],
    final_output_dir: Path,
    config: Dict[str,
                 Any],
    quiet_mode: bool,
    registry: Dict[str,
                   Any],
    progress: PipelineProgress,
    parsed: Dict[str, Any],
    args: Sequence[str],
    context_items: Sequence[Any] = (),
) -> tuple[int,
           Optional[int]]:
    """Download every URL in ``raw_urls``, preferring a matching plugin.

    For each URL the method:

    1. asks the registry's ``match_plugin_name_for_url`` for a plugin name and
       resolves it with ``get_plugin``;
    2. tries the plugin's ``handle_url`` and, if that did not handle the URL,
       its ``download_url`` (after a duplicate preflight);
    3. otherwise falls back to ``download_direct_file``.

    Each successfully obtained file is emitted through ``_emit_local_file``.
    Per-URL failures are logged to stderr and do not stop the loop.

    Returns:
        ``(downloaded_count, exit_code)``. ``exit_code`` is ``None`` in the
        normal case; it is an ``int`` when a plugin result or the duplicate
        preflight supplied one (which ends processing early), and ``0`` when
        nothing was downloaded or attempted because URLs were skipped as
        duplicates.
    """
    downloaded_count = 0
    skipped_duplicate_only = 0
    attempted_download = False
    suppress_nested, batch_total, batch_index, batch_label = self._batch_progress_state(config)
    total_urls = len(raw_urls or [])

    # Only open a pipe-level progress display when this call owns it
    # (more than one URL and no enclosing batch asked to suppress it).
    try:
        if total_urls > 1 and not suppress_nested:
            progress.begin_pipe(total_items=total_urls, items_preview=list(raw_urls[:5]))
    except Exception:
        pass

    SearchResult = registry.get("SearchResult")
    get_plugin = registry.get("get_plugin")
    match_plugin_name_for_url = registry.get("match_plugin_name_for_url")

    for idx, url in enumerate(raw_urls, 1):
        try:
            # Status line: batch-level numbering wins over the local index when present.
            try:
                display_total = batch_total if batch_total > 0 else total_urls
                display_index = batch_index if batch_total > 0 else idx
                display_label = batch_label or str(url)
                if display_total > 0:
                    progress.set_status(
                        f"downloading {display_index}/{display_total}: {display_label}"
                    )
            except Exception:
                pass

            # Check providers first
            provider_name = None
            if match_plugin_name_for_url:
                try:
                    provider_name = match_plugin_name_for_url(str(url))
                except Exception:
                    pass

            provider = None
            if provider_name and get_plugin:
                provider = get_plugin(provider_name, config)

            if provider:
                try:
                    # Try generic handle_url
                    handled = False
                    if hasattr(provider, "handle_url"):
                        try:
                            handled, path = provider.handle_url(str(url), output_dir=final_output_dir)
                            if handled:
                                extra_meta = None
                                title_hint = None
                                tags_hint: Optional[List[str]] = None
                                media_kind_hint = None
                                path_value: Optional[Any] = path

                                # A dict result can either request a follow-up action or
                                # describe a downloaded file (path + metadata hints).
                                if isinstance(path, dict):
                                    plugin_action = str(
                                        path.get("action")
                                        or path.get("plugin_action")
                                        or ""
                                    ).strip().lower()
                                    if plugin_action == "download_items" or bool(path.get("download_items")):
                                        request_metadata = path.get("metadata") or path.get("full_metadata") or {}
                                        if not isinstance(request_metadata, dict):
                                            request_metadata = {}
                                        magnet_id = path.get("magnet_id") or request_metadata.get("magnet_id")
                                        if magnet_id is not None:
                                            request_metadata.setdefault("magnet_id", magnet_id)

                                        if SearchResult is None:
                                            continue

                                        sr = SearchResult(
                                            table=str(provider_name),
                                            title=str(path.get("title") or path.get("name") or f"{provider_name} item"),
                                            path=str(path.get("path") or path.get("url") or url),
                                            full_metadata=request_metadata,
                                        )
                                        downloaded_extra = self._download_provider_items(
                                            provider=provider,
                                            provider_name=str(provider_name),
                                            search_result=sr,
                                            output_dir=final_output_dir,
                                            progress=progress,
                                            quiet_mode=quiet_mode,
                                            config=config,
                                        )
                                        if downloaded_extra:
                                            downloaded_count += int(downloaded_extra)
                                            continue

                                    plugin_downloaded, plugin_exit, plugin_handled = self._consume_plugin_download_result(
                                        result=path,
                                        config=config,
                                    )
                                    if plugin_handled:
                                        downloaded_count += plugin_downloaded
                                        # A handled result that produced nothing ends the whole call
                                        # with the plugin's exit code.
                                        if plugin_exit is not None and plugin_downloaded == 0:
                                            return downloaded_count, int(plugin_exit)
                                        if plugin_downloaded:
                                            continue

                                    path_value = path.get("path") or path.get("file_path")
                                    extra_meta = path.get("metadata") or path.get("full_metadata")
                                    title_hint = path.get("title") or path.get("name")
                                    media_kind_hint = path.get("media_kind")
                                    tags_val = path.get("tags") or path.get("tag")
                                    if isinstance(tags_val, (list, tuple, set)):
                                        tags_hint = [str(t) for t in tags_val if t]
                                    elif isinstance(tags_val, str) and tags_val.strip():
                                        tags_hint = [str(tags_val).strip()]

                                if path_value:
                                    p_val = Path(str(path_value))
                                    if not title_hint and isinstance(extra_meta, dict):
                                        title_hint = extra_meta.get("title") or extra_meta.get("name")

                                    self._emit_local_file(
                                        downloaded_path=p_val,
                                        source=str(url),
                                        title_hint=str(title_hint) if title_hint else p_val.stem,
                                        tags_hint=tags_hint,
                                        media_kind_hint=str(media_kind_hint) if media_kind_hint else "file",
                                        full_metadata=extra_meta,
                                        progress=progress,
                                        config=config,
                                        provider_hint=provider_name
                                    )
                                    downloaded_count += 1
                                    continue
                        except Exception as e:
                            debug_panel(
                                "download-file provider error",
                                [
                                    ("plugin", provider_name),
                                    ("url", url),
                                    ("operation", "handle_url"),
                                    ("error", e),
                                ],
                                border_style="yellow",
                            )

                    # Try generic download_url if not already handled
                    if not handled and hasattr(provider, "download_url"):
                        parsed_for_provider = parsed
                        provider_preflight_items = self._resolve_provider_preflight_items(
                            provider,
                            url=str(url),
                            parsed=parsed,
                            args=args,
                        )
                        if provider_preflight_items:
                            provider_preflight_urls = [
                                str(item.get("url") or "").strip()
                                for item in provider_preflight_items
                                if str(item.get("url") or "").strip()
                            ]
                            provider_preflight_urls, preflight_exit, provider_skipped = self._preflight_explicit_url_duplicates(
                                raw_urls=provider_preflight_urls,
                                config=config,
                            )
                            if preflight_exit is not None:
                                return downloaded_count, int(preflight_exit)
                            if provider_skipped:
                                if not provider_preflight_urls:
                                    skipped_duplicate_only += 1
                                    continue
                                selector = self._build_provider_playlist_item_selector(
                                    provider_preflight_items,
                                    remaining_urls=provider_preflight_urls,
                                )
                                if selector:
                                    # Copy before mutating so the caller's parsed args are untouched.
                                    parsed_for_provider = dict(parsed)
                                    parsed_for_provider["item"] = selector
                        # NOTE(review): the TypeError fallback also fires when the plugin itself
                        # raises TypeError mid-download, which would run the download twice - confirm.
                        try:
                            attempted_download = True
                            res = provider.download_url(
                                str(url),
                                final_output_dir,
                                parsed=parsed_for_provider,
                                args=list(args),
                                progress=progress,
                                quiet_mode=quiet_mode,
                                context_items=list(context_items or []),
                            )
                        except TypeError:
                            attempted_download = True
                            res = provider.download_url(str(url), final_output_dir)

                        plugin_downloaded, plugin_exit, plugin_handled = self._consume_plugin_download_result(
                            result=res,
                            config=config,
                        )
                        if plugin_handled:
                            downloaded_count += plugin_downloaded
                            if plugin_exit is not None and plugin_downloaded == 0:
                                return downloaded_count, int(plugin_exit)
                            if plugin_downloaded:
                                continue

                        if res:
                            # Standardize result: can be Path, tuple(Path, Info), or dict with "path"
                            p_val = None
                            extra_meta = None
                            if isinstance(res, (str, Path)):
                                p_val = Path(res)
                            elif isinstance(res, tuple) and len(res) > 0:
                                p_val = Path(res[0])
                                if len(res) > 1 and isinstance(res[1], dict):
                                    extra_meta = res[1]
                            elif isinstance(res, dict):
                                path_candidate = res.get("path") or res.get("file_path")
                                if path_candidate:
                                    p_val = Path(path_candidate)
                                extra_meta = res

                            if p_val:
                                self._emit_local_file(
                                    downloaded_path=p_val,
                                    source=str(url),
                                    title_hint=p_val.stem,
                                    tags_hint=None,
                                    media_kind_hint=extra_meta.get("media_kind") if extra_meta else "file",
                                    full_metadata=extra_meta,
                                    provider_hint=provider_name,
                                    progress=progress,
                                    config=config,
                                )
                                downloaded_count += 1
                                continue

                except Exception as e:
                    log(f"Provider {provider_name} error handling {url}: {e}", file=sys.stderr)
                    pass

                # NOTE(review): as written, a plugin-matched URL that the plugin did not
                # handle is skipped, while a URL reported as handled without a usable
                # result falls through to the direct download below - confirm intent.
                if not handled:
                    continue

            # Direct Download Fallback
            attempted_download = True
            result_obj = download_direct_file(
                str(url),
                final_output_dir,
                quiet=quiet_mode,
                pipeline_progress=progress,
            )
            downloaded_path = self._path_from_download_result(result_obj)

            self._emit_local_file(
                downloaded_path=downloaded_path,
                source=str(url),
                title_hint=downloaded_path.stem,
                tags_hint=[f"title:{downloaded_path.stem}"],
                media_kind_hint="file",
                full_metadata=None,
                progress=progress,
                config=config,
            )
            downloaded_count += 1

        except DownloadError as e:
            log(f"Download failed for {url}: {e}", file=sys.stderr)
        except Exception as e:
            log(f"Error processing {url}: {e}", file=sys.stderr)

    # Everything was a duplicate and nothing was even attempted: report success (0)
    # rather than leaving the exit code undecided.
    if downloaded_count == 0 and skipped_duplicate_only > 0 and not attempted_download:
        return downloaded_count, 0
    return downloaded_count, None
|
|
|
|
def _normalize_provider_key(self, value: Optional[Any]) -> Optional[str]:
|
|
if value is None:
|
|
return None
|
|
try:
|
|
normalized = str(value).strip()
|
|
except Exception:
|
|
return None
|
|
if not normalized:
|
|
return None
|
|
if "." in normalized:
|
|
normalized = normalized.split(".", 1)[0]
|
|
return normalized.lower()
|
|
|
|
def _provider_key_from_item(self, item: Any) -> Optional[str]:
    """Derive the plugin key for ``item``.

    The ``table`` field is consulted first, then ``plugin``; the first one
    that normalizes to a truthy key wins. Otherwise the normalized ``source``
    field is returned as-is.
    """
    for field_name in ("table", "plugin"):
        candidate = self._normalize_provider_key(get_field(item, field_name))
        if candidate:
            return candidate
    return self._normalize_provider_key(get_field(item, "source"))
|
|
|
|
def _expand_provider_items(
|
|
self,
|
|
*,
|
|
piped_items: Sequence[Any],
|
|
registry: Dict[str,
|
|
Any],
|
|
config: Dict[str,
|
|
Any],
|
|
) -> List[Any]:
|
|
get_provider = registry.get("get_plugin")
|
|
expanded_items: List[Any] = []
|
|
|
|
for item in piped_items:
|
|
try:
|
|
provider_key = self._provider_key_from_item(item)
|
|
provider = get_provider(provider_key, config) if provider_key and get_provider else None
|
|
|
|
# Generic hook: If provider has expand_item(item), use it.
|
|
if provider and hasattr(provider, "expand_item") and callable(provider.expand_item):
|
|
try:
|
|
sub_items = provider.expand_item(item)
|
|
if sub_items:
|
|
expanded_items.extend(sub_items)
|
|
continue
|
|
except Exception as e:
|
|
debug_panel(
|
|
"download-file expand_item failed",
|
|
[
|
|
("plugin", provider_key),
|
|
("error", e),
|
|
],
|
|
border_style="yellow",
|
|
)
|
|
|
|
expanded_items.append(item)
|
|
except Exception:
|
|
expanded_items.append(item)
|
|
|
|
return expanded_items
|
|
|
|
def _process_provider_items(self,
                            *,
                            piped_items: Sequence[Any],
                            final_output_dir: Path,
                            config: Dict[str,
                                         Any],
                            quiet_mode: bool,
                            registry: Dict[str,
                                           Any],
                            progress: PipelineProgress,
                            ) -> tuple[int, int]:
    """Download piped items, preferring the owning plugin's ``download``.

    Items are first expanded with ``_expand_provider_items``. For each item the
    plugin (resolved from the item's table/plugin/source hints) is asked to
    download it; if that returns ``None`` the plugin's ``download_items`` hook
    is tried via ``_download_provider_items``. Items with no plugin and an
    ``http`` target fall back to ``download_direct_file``. Every downloaded
    file is emitted through ``_emit_local_file``; per-item errors are logged
    to stderr and do not stop the loop.

    Returns:
        ``(downloaded_count, queued_magnet_submissions)``. The second value is
        never incremented in this method, so it is always ``0`` here.
    """
    downloaded_count = 0
    queued_magnet_submissions = 0
    get_provider = registry.get("get_plugin")
    SearchResult = registry.get("SearchResult")

    expanded_items = self._expand_provider_items(
        piped_items=piped_items,
        registry=registry,
        config=config
    )

    total_items = len(expanded_items)
    processed_items = 0

    try:
        if total_items:
            progress.set_percent(0)
    except Exception:
        pass

    for idx, item in enumerate(expanded_items, 1):
        # NOTE(review): the ``finally`` clause below reads ``provider_obj`` and
        # ``transfer_label``, which are only bound part-way through this ``try``;
        # an exception raised before they are assigned would surface there as an
        # unbound-local error - confirm.
        try:
            label = "item"
            table = get_field(item, "table")
            title = get_field(item, "title")
            target = get_field(item, "path") or get_field(item, "url")

            media_kind = get_field(item, "media_kind")
            tags_val = get_field(item, "tag")
            tags_list: Optional[List[str]]
            if isinstance(tags_val, (list, set)):
                tags_list = sorted([str(t) for t in tags_val if t])
            else:
                tags_list = None

            full_metadata = get_field(item, "full_metadata")
            # Dict items may carry their metadata nested under item["extra"].
            if ((not full_metadata) and isinstance(item,
                                                   dict)
                    and isinstance(item.get("extra"),
                                   dict)):
                extra_md = item["extra"].get("full_metadata")
                if isinstance(extra_md, dict):
                    full_metadata = extra_md

            try:
                label = title or target
                label = str(label or "item").strip()
                if total_items:
                    pct = int(round((processed_items / max(1, total_items)) * 100))
                    progress.set_percent(pct)
                    progress.set_status(
                        f"downloading {processed_items + 1}/{total_items}: {label}"
                    )
            except Exception:
                pass

            transfer_label = label

            # If this looks like a plugin-owned item and a plugin is available, prefer plugin.download().
            downloaded_path: Optional[Path] = None
            attempted_provider_download = False
            provider_sr = None
            provider_obj = None
            provider_key = self._provider_key_from_item(item)
            if provider_key and get_provider and SearchResult:
                # Reuse helper to derive the plugin key from table/plugin/source hints.
                provider_obj = get_provider(provider_key, config)

            if provider_obj is not None and getattr(provider_obj, "prefers_transfer_progress", False):
                try:
                    progress.begin_transfer(label=transfer_label, total=None)
                except Exception:
                    pass

            if provider_obj is not None:
                attempted_provider_download = True
                sr = SearchResult(
                    table=str(table),
                    title=str(title or "Unknown"),
                    path=str(target or ""),
                    tag=set(tags_list) if tags_list else set(),
                    media_kind=str(media_kind or "file"),
                    full_metadata=full_metadata
                    if isinstance(full_metadata, dict) else {},
                )

                # Preserve plugin-managed output structure when a plugin encodes nested paths.
                output_dir = final_output_dir
                # Generic: allow provider to strict output_dir?
                # Using default output_dir for now.

                downloaded_path = provider_obj.download(sr, output_dir)
                provider_sr = sr

                # No single file came back: give the plugin's multi-file hook a chance.
                if downloaded_path is None:
                    try:
                        downloaded_extra = self._download_provider_items(
                            provider=provider_obj,
                            provider_name=str(provider_key),
                            search_result=sr,
                            output_dir=output_dir,
                            progress=progress,
                            quiet_mode=quiet_mode,
                            config=config,
                        )
                    except Exception:
                        downloaded_extra = 0

                    if downloaded_extra:
                        downloaded_count += int(downloaded_extra)
                        continue

            # Fallback: if we have a direct HTTP URL and no provider successfully handled it
            if (downloaded_path is None and not attempted_provider_download
                    and isinstance(target, str) and target.startswith("http")):

                suggested_name = str(title).strip() if title is not None else None
                result_obj = download_direct_file(
                    target,
                    final_output_dir,
                    quiet=quiet_mode,
                    suggested_filename=suggested_name,
                    pipeline_progress=progress,
                )
                downloaded_path = coerce_to_path(result_obj)

            if downloaded_path is None:
                log(
                    f"Cannot download item (no provider handler / unsupported target): {title or target}",
                    file=sys.stderr,
                )
                continue

            # Allow plugins to add or enrich tags and metadata during download.
            if provider_sr is not None:
                try:
                    sr_md = getattr(provider_sr, "full_metadata", None)
                    if isinstance(sr_md, dict) and sr_md:
                        full_metadata = sr_md
                except Exception:
                    pass

                try:
                    if isinstance(full_metadata, dict):
                        t = str(full_metadata.get("title") or "").strip()
                        if t:
                            title = t
                except Exception:
                    pass

                # Prefer tags from the search result object if the provider mutated them during download.
                try:
                    sr_tags = getattr(provider_sr, "tag", None)
                    if isinstance(sr_tags, (set, list)) and sr_tags:
                        # Re-sync tags_list with the potentially enriched provider_sr.tag
                        tags_list = sorted([str(t) for t in sr_tags if t])
                except Exception:
                    pass

            self._emit_local_file(
                downloaded_path=downloaded_path,
                source=str(target) if target else None,
                title_hint=str(title) if title else downloaded_path.stem,
                tags_hint=tags_list,
                media_kind_hint=str(media_kind) if media_kind else None,
                full_metadata=full_metadata if isinstance(full_metadata,
                                                          dict) else None,
                progress=progress,
                config=config,
                provider_hint=provider_key
            )
            downloaded_count += 1

        except DownloadError as e:
            log(f"Download failed: {e}", file=sys.stderr)
        except Exception as e:
            log(f"Error downloading item: {e}", file=sys.stderr)
        finally:
            # Runs for every item, including those that hit ``continue`` above,
            # so the transfer display and percentage always advance.
            if provider_obj is not None and getattr(provider_obj, "prefers_transfer_progress", False):
                try:
                    progress.finish_transfer(label=transfer_label)
                except Exception:
                    pass
            processed_items += 1
            try:
                pct = int(round((processed_items / max(1, total_items)) * 100))
                progress.set_percent(pct)
                if processed_items >= total_items:
                    progress.clear_status()
            except Exception:
                pass

    return downloaded_count, queued_magnet_submissions
|
|
|
|
def _download_provider_items(
|
|
self,
|
|
*,
|
|
provider: Any,
|
|
provider_name: str,
|
|
search_result: Any,
|
|
output_dir: Path,
|
|
progress: PipelineProgress,
|
|
quiet_mode: bool,
|
|
config: Dict[str, Any],
|
|
) -> int:
|
|
if provider is None or not hasattr(provider, "download_items"):
|
|
return 0
|
|
|
|
def _on_emit(path: Path, file_url: str, relpath: str, metadata: Dict[str, Any]) -> None:
|
|
title_hint = None
|
|
try:
|
|
title_hint = metadata.get("name") or relpath
|
|
except Exception:
|
|
title_hint = relpath
|
|
title_hint = title_hint or (Path(path).name if path else "download")
|
|
|
|
self._emit_local_file(
|
|
downloaded_path=path,
|
|
source=file_url,
|
|
title_hint=title_hint,
|
|
tags_hint=None,
|
|
media_kind_hint="file",
|
|
full_metadata=metadata if isinstance(metadata, dict) else None,
|
|
progress=progress,
|
|
config=config,
|
|
provider_hint=provider_name,
|
|
)
|
|
|
|
try:
|
|
downloaded_count = provider.download_items(
|
|
search_result,
|
|
output_dir,
|
|
emit=_on_emit,
|
|
progress=progress,
|
|
quiet_mode=quiet_mode,
|
|
path_from_result=coerce_to_path,
|
|
config=config,
|
|
)
|
|
except TypeError:
|
|
downloaded_count = provider.download_items(
|
|
search_result,
|
|
output_dir,
|
|
emit=_on_emit,
|
|
progress=progress,
|
|
quiet_mode=quiet_mode,
|
|
path_from_result=coerce_to_path,
|
|
)
|
|
except Exception as exc:
|
|
log(f"Provider {provider_name} download_items error: {exc}", file=sys.stderr)
|
|
return 0
|
|
|
|
try:
|
|
return int(downloaded_count or 0)
|
|
except Exception:
|
|
return 0
|
|
|
|
def _emit_local_file(
    self,
    *,
    downloaded_path: Path,
    source: Optional[str],
    title_hint: Optional[str],
    tags_hint: Optional[List[str]],
    media_kind_hint: Optional[str],
    full_metadata: Optional[Dict[str, Any]],
    progress: PipelineProgress,
    config: Dict[str, Any],
    provider_hint: Optional[str] = None,
) -> None:
    """Build the result payload for a downloaded file and emit it to the pipeline.

    The payload carries the path, its SHA-256 (via ``sha256_file``), a title,
    tags (a ``title:`` tag is prepended when none is present) and, when
    available, the plugin name, metadata, plugin-supplied notes and the
    source (as ``url`` for ``http...`` sources, otherwise ``source_url``).
    ``progress`` and ``config`` are accepted but not used here.
    """
    title_val = (title_hint or downloaded_path.stem or "Unknown").strip() or downloaded_path.stem
    digest = sha256_file(downloaded_path)

    # Plugins attach pre-built notes under the generic "_notes" metadata key
    # (e.g. Tidal sets {"lyric": subtitles} during download enrichment), which
    # keeps plugin-specific metadata handling inside the plugin.
    notes: Optional[Dict[str, str]] = None
    try:
        if isinstance(full_metadata, dict):
            plugin_notes = full_metadata.get("_notes")
            if isinstance(plugin_notes, dict) and plugin_notes:
                notes = {
                    str(note_key): str(note_value)
                    for note_key, note_value in plugin_notes.items()
                    if note_key and note_value
                }
    except Exception:
        notes = None

    tags: List[str] = [str(entry) for entry in tags_hint if entry] if tags_hint else []
    has_title_tag = any(str(entry).lower().startswith("title:") for entry in tags)
    if not has_title_tag:
        tags.insert(0, f"title:{title_val}")

    payload: Dict[str, Any] = {
        "path": str(downloaded_path),
        "hash": digest,
        "title": title_val,
        "action": "cmdlet:download-file",
        "download_mode": "file",
        "store": "local",
        "media_kind": media_kind_hint or "file",
        "tag": tags,
    }
    if provider_hint:
        payload["plugin"] = str(provider_hint)
    if full_metadata:
        payload["metadata"] = full_metadata
    if notes:
        payload["notes"] = notes
    if source:
        # Web sources are exposed as "url"; anything else is kept as provenance only.
        source_key = "url" if str(source).startswith("http") else "source_url"
        payload[source_key] = source

    pipeline_context.emit(payload)
|
|
|
|
@staticmethod
|
|
def _path_looks_local(value: Any) -> bool:
|
|
text = str(value or "").strip()
|
|
if not text:
|
|
return False
|
|
if text.startswith(("http://", "https://", "ftp://", "ftps://", "magnet:", "torrent:")):
|
|
return False
|
|
if len(text) >= 2 and text[1] == ":":
|
|
return True
|
|
if text.startswith(("\\", "/", ".", "~")):
|
|
return True
|
|
return Path(text).exists()
|
|
|
|
@staticmethod
def _resolve_display_title(result: Any, metadata: Optional[Dict[str, Any]]) -> str:
    """Return the first non-blank title found on ``result`` or in ``metadata``.

    Both sources are queried through ``get_result_title`` for the ``title``,
    ``name`` and ``filename`` fields; ``result`` takes precedence. An empty
    string is returned when neither yields text.
    """
    # Both lookups are performed up front, result first.
    titles = tuple(
        get_result_title(origin, "title", "name", "filename")
        for origin in (result, metadata or {})
    )
    for raw_title in titles:
        text = "" if raw_title is None else str(raw_title).strip()
        if text:
            return text
    return ""
|
|
|
|
@staticmethod
|
|
def _sanitize_export_filename(name: str) -> str:
|
|
allowed_chars: List[str] = []
|
|
for ch in str(name or ""):
|
|
if ch.isalnum() or ch in {"-", "_", " ", "."}:
|
|
allowed_chars.append(ch)
|
|
else:
|
|
allowed_chars.append(" ")
|
|
sanitized = " ".join("".join(allowed_chars).split())
|
|
return sanitized or "export"
|
|
|
|
@staticmethod
|
|
def _unique_export_path(path: Path) -> Path:
|
|
if not path.exists():
|
|
return path
|
|
stem = path.stem
|
|
suffix = path.suffix
|
|
parent = path.parent
|
|
counter = 1
|
|
while True:
|
|
candidate = parent / f"{stem} ({counter}){suffix}"
|
|
if not candidate.exists():
|
|
return candidate
|
|
counter += 1
|
|
|
|
@staticmethod
|
|
def _iter_storage_export_refs(
|
|
parsed: Dict[str, Any],
|
|
piped_items: Sequence[Any],
|
|
) -> tuple[List[Dict[str, Any]], List[Any], Optional[int]]:
|
|
refs: List[Dict[str, Any]] = []
|
|
residual_items: List[Any] = []
|
|
|
|
query_text = str(parsed.get("query") or "").strip()
|
|
query_hash: Optional[str] = None
|
|
if query_text:
|
|
query_hash = sh.parse_single_hash_query(query_text)
|
|
if query_text.lower().startswith("hash") and not query_hash:
|
|
log('Error: -query must be of the form hash:<sha256>', file=sys.stderr)
|
|
return [], list(piped_items or []), 1
|
|
|
|
explicit_store = str(parsed.get("instance") or "").strip()
|
|
if query_hash:
|
|
if not explicit_store:
|
|
log('Error: No store name provided', file=sys.stderr)
|
|
return [], list(piped_items or []), 1
|
|
refs.append(
|
|
{
|
|
"hash": query_hash,
|
|
"store": explicit_store,
|
|
"result": None,
|
|
}
|
|
)
|
|
|
|
for item in piped_items or []:
|
|
normalized_hash = sh.normalize_hash(
|
|
str(get_field(item, "hash") or get_field(item, "file_hash") or get_field(item, "hash_hex") or "")
|
|
)
|
|
store_name = str(parsed.get("instance") or get_field(item, "store") or "").strip()
|
|
if normalized_hash and store_name:
|
|
refs.append(
|
|
{
|
|
"hash": normalized_hash,
|
|
"store": store_name,
|
|
"result": item,
|
|
}
|
|
)
|
|
else:
|
|
residual_items.append(item)
|
|
|
|
return refs, residual_items, None
|
|
|
|
def _export_store_file(
    self,
    *,
    file_hash: str,
    store_name: str,
    result: Any,
    parsed: Dict[str, Any],
    config: Dict[str, Any],
    final_output_dir: Path,
) -> int:
    """Export one stored file to disk, or open it in the browser.

    Args:
        file_hash: Hash identifying the file inside the store.
        store_name: Name of the storage backend to read from.
        result: The piped item this export originated from (may be ``None``);
            only used to resolve a display title.
        parsed: Parsed cmdlet arguments; ``path``, ``name`` and ``browser``
            are read here.
        config: Runtime configuration; ``_pipeline_progress`` is forwarded to
            the direct downloader when present.
        final_output_dir: Directory the exported file is written to.

    Returns:
        ``0`` on success, ``1`` when the backend, its metadata or the file
        itself cannot be obtained, or when the browser cannot be opened.
    """
    output_path = parsed.get("path")
    explicit_output_requested = bool(output_path)
    output_name = parsed.get("name")
    browser_flag = bool(parsed.get("browser"))

    backend, _store_registry, _exc = sh.get_preferred_store_backend(
        config,
        store_name,
        suppress_debug=True,
    )
    if backend is None:
        log(f"Error: Storage backend '{store_name}' not found", file=sys.stderr)
        return 1

    metadata = backend.get_metadata(file_hash)
    if not metadata:
        log(f"Error: File metadata not found for hash {file_hash}", file=sys.stderr)
        return 1

    # Debug output is best-effort and must never fail the export.
    try:
        debug_panel(
            "download-file store export",
            [
                ("hash", file_hash),
                ("instance", store_name),
                ("output_path", output_path or "<default>"),
                ("output_name", output_name or "<auto>"),
                ("browser", browser_flag),
            ],
            border_style="blue",
        )
    except Exception:
        pass

    # The backend answers with either a local path or an http(s) URL string.
    source_path = backend.get_file(file_hash, url=browser_flag)
    download_url = None
    if isinstance(source_path, str):
        if source_path.startswith(("http://", "https://")):
            download_url = source_path
        else:
            source_path = Path(source_path)

    if download_url and (browser_flag or not explicit_output_requested):
        try:
            webbrowser.open(download_url)
        except Exception as exc:
            log(f"Error opening browser: {exc}", file=sys.stderr)
            return 1

        pipeline_context.emit(
            build_file_result_payload(
                title=self._resolve_display_title(result, metadata) or "Opened",
                hash_value=file_hash,
                store=store_name,
                url=download_url,
            )
        )
        return 0

    if download_url is None:
        if not source_path or not Path(source_path).exists():
            log(f"Error: Backend could not retrieve file for hash {file_hash}", file=sys.stderr)
            return 1

    filename = str(output_name or "").strip()
    if not filename:
        title = (
            (metadata.get("title") if isinstance(metadata, dict) else None)
            or self._resolve_display_title(result, metadata)
            or "export"
        )
        filename = self._sanitize_export_filename(str(title))

    # Normalise the extension to its dotted form *before* comparing, and
    # compare case-insensitively: otherwise "songmp3" is mistaken for already
    # carrying ".mp3" and "CLIP.MP4" gets a second ".mp4" appended.
    ext = metadata.get("ext") if isinstance(metadata, dict) else None
    ext_text = str(ext or "").strip()
    if ext_text:
        if not ext_text.startswith("."):
            ext_text = "." + ext_text
        if not filename.lower().endswith(ext_text.lower()):
            filename += ext_text

    if download_url:
        result_obj = download_direct_file(
            download_url,
            final_output_dir,
            quiet=True,
            suggested_filename=filename,
            pipeline_progress=config.get("_pipeline_progress") if isinstance(config, dict) else None,
        )
        dest_path = self._path_from_download_result(result_obj)
    else:
        # Never overwrite an existing export; pick a numbered sibling instead.
        dest_path = self._unique_export_path(final_output_dir / filename)
        shutil.copy2(Path(source_path), dest_path)

    pipeline_context.emit(
        build_file_result_payload(
            title=filename,
            hash_value=file_hash,
            store=store_name,
            path=str(dest_path),
        )
    )
    return 0
|
|
|
|
def _process_storage_items(
|
|
self,
|
|
*,
|
|
piped_items: Sequence[Any],
|
|
parsed: Dict[str, Any],
|
|
config: Dict[str, Any],
|
|
final_output_dir: Path,
|
|
) -> tuple[int, List[Any], Optional[int]]:
|
|
refs, residual_items, early_exit = self._iter_storage_export_refs(parsed, piped_items)
|
|
if early_exit is not None:
|
|
return 0, list(residual_items), early_exit
|
|
if not refs:
|
|
return 0, list(residual_items), None
|
|
|
|
successes = 0
|
|
for ref in refs:
|
|
exit_code = self._export_store_file(
|
|
file_hash=str(ref.get("hash") or ""),
|
|
store_name=str(ref.get("store") or ""),
|
|
result=ref.get("result"),
|
|
parsed=parsed,
|
|
config=config,
|
|
final_output_dir=final_output_dir,
|
|
)
|
|
if exit_code != 0:
|
|
return successes, list(residual_items), exit_code
|
|
successes += 1
|
|
|
|
return successes, list(residual_items), None
|
|
|
|
def _process_explicit_local_sources(
|
|
self,
|
|
*,
|
|
local_sources: Sequence[str],
|
|
final_output_dir: Path,
|
|
parsed: Dict[str, Any],
|
|
progress: PipelineProgress,
|
|
config: Dict[str, Any],
|
|
) -> int:
|
|
explicit_output_requested = bool(parsed.get("path"))
|
|
downloaded_count = 0
|
|
for raw_source in local_sources or []:
|
|
source_path = Path(str(raw_source or "")).expanduser()
|
|
if not source_path.exists() or not source_path.is_file():
|
|
log(f"File not found: {source_path}", file=sys.stderr)
|
|
continue
|
|
|
|
if explicit_output_requested:
|
|
destination = final_output_dir / source_path.name
|
|
destination = self._unique_export_path(destination)
|
|
shutil.copy2(source_path, destination)
|
|
emit_path = destination
|
|
else:
|
|
emit_path = source_path
|
|
|
|
self._emit_local_file(
|
|
downloaded_path=emit_path,
|
|
source=str(source_path),
|
|
title_hint=emit_path.stem,
|
|
tags_hint=None,
|
|
media_kind_hint="file",
|
|
full_metadata=None,
|
|
progress=progress,
|
|
config=config,
|
|
)
|
|
downloaded_count += 1
|
|
return downloaded_count
|
|
|
|
def _maybe_render_download_details(self, *, config: Dict[str, Any]) -> None:
    """Render the emitted results as a detail view when this is the last stage.

    Nothing happens when the stage is not the last one, when
    ``config["_quiet_background_output"]`` is truthy, or when the stage has
    emitted nothing. Otherwise the live progress display is shut down, the
    emitted items are displayed and persisted, and the stage's ``emits`` list
    is cleared. Every step is best-effort: failures are swallowed.
    """
    try:
        stage = pipeline_context.get_stage_context()
    except Exception:
        stage = None

    # A missing stage context counts as "last stage".
    if stage is not None and not bool(getattr(stage, "is_last_stage", False)):
        return

    try:
        quiet = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False
    except Exception:
        quiet = False
    if quiet:
        return

    try:
        shown: List[Any] = [] if stage is None else list(getattr(stage, "emits", None) or [])
    except Exception:
        shown = []
    if not shown:
        return
    # From here on `stage` cannot be None: only a real stage yields emits.

    # Stop the live pipeline progress UI before rendering the details panel.
    try:
        live = pipeline_context.get_live_progress()
    except Exception:
        live = None

    if live is not None:
        try:
            pipe_number = getattr(stage, "pipe_index", None)
            if isinstance(pipe_number, int):
                live.finish_pipe(int(pipe_number), force_complete=True)
        except Exception:
            pass
        try:
            live.stop()
        except Exception:
            pass
        try:
            if hasattr(pipeline_context, "set_live_progress"):
                pipeline_context.set_live_progress(None)
        except Exception:
            pass

    try:
        # Use helper to display items and make them @-selectable
        from .._shared import display_and_persist_items

        focus = shown[0] if len(shown) == 1 else list(shown)
        display_and_persist_items(list(shown), title="Result", subject=focus)
    except Exception:
        pass

    # Prevent CLI from printing a redundant table after the detail panels.
    try:
        stage.emits = []
    except Exception:
        pass
|
|
|
|
@staticmethod
|
|
def _load_provider_registry() -> Dict[str, Any]:
|
|
"""Lightweight accessor for plugin helpers without hard dependencies."""
|
|
try:
|
|
from PluginCore import registry as provider_registry # type: ignore
|
|
from PluginCore.base import SearchResult # type: ignore
|
|
|
|
return {
|
|
"get_plugin": getattr(provider_registry, "get_plugin", None),
|
|
"match_plugin_name_for_url": getattr(provider_registry, "match_plugin_name_for_url", None),
|
|
"list_selection_url_prefixes": getattr(provider_registry, "list_selection_url_prefixes", None),
|
|
"SearchResult": SearchResult,
|
|
}
|
|
except Exception:
|
|
return {
|
|
"get_plugin": None,
|
|
"match_plugin_name_for_url": None,
|
|
"list_selection_url_prefixes": None,
|
|
"SearchResult": None,
|
|
}
|
|
|
|
@classmethod
|
|
def _extract_hash_from_search_hit(cls, hit: Any) -> Optional[str]:
|
|
if not isinstance(hit, dict):
|
|
return None
|
|
for key in ("hash", "hash_hex", "file_hash", "hydrus_hash"):
|
|
v = hit.get(key)
|
|
normalized = sh.normalize_hash(str(v) if v is not None else None)
|
|
if normalized:
|
|
return normalized
|
|
return None
|
|
|
|
@staticmethod
def _iter_duplicate_tag_values(item: Any) -> List[str]:
    """Collect the tag strings carried by *item*, de-duplicated case-insensitively.

    The fields ``tags_flat``, ``tags`` and ``tag`` are inspected in that
    order. Each may be a single string, a list/tuple/set of tags, or a
    mapping. For mappings, tags are gathered from
    ``service_keys_to_statuses_to_tags``, from the mapping's own tag keys
    (``display_tags``, ``storage_tags``, ...) and from the same keys on each
    of its values. Only ``str`` and ``bytes`` values count as tags; the first
    spelling of each tag is kept.
    """
    service_tag_keys = (
        "display_tags",
        "display_friendly_tags",
        "display",
        "storage_tags",
        "statuses_to_tags",
        "tags",
    )

    def _clean(value: Any) -> str:
        # Only str/bytes are tags; anything else cleans to "".
        if isinstance(value, bytes):
            try:
                decoded = value.decode("utf-8", errors="ignore")
            except Exception:
                decoded = str(value)
        elif isinstance(value, str):
            decoded = value
        else:
            return ""
        return decoded.strip()

    def _is_plain_sequence(candidate: Any) -> bool:
        return isinstance(candidate, SequenceABC) and not isinstance(
            candidate, (str, bytes, bytearray, Mapping)
        )

    def _current_values(container: Any):
        # A bare sequence is the tag list itself; a mapping keeps its
        # current tags under the key "0" (or the integer 0).
        if _is_plain_sequence(container):
            yield from container
            return
        if not isinstance(container, Mapping):
            return
        current = container.get("0")
        if current is None:
            current = container.get(0)
        if _is_plain_sequence(current):
            yield from current

    def _service_values(service_data: Any):
        if not isinstance(service_data, Mapping):
            return
        for tag_key in service_tag_keys:
            yield from _current_values(service_data.get(tag_key))

    gathered: List[str] = []

    def _keep(values: Any) -> None:
        gathered.extend(tag for tag in map(_clean, values) if tag)

    # All three fields are read up front, then processed in order.
    field_values = [get_field(item, name) for name in ("tags_flat", "tags", "tag")]
    for raw_tags in field_values:
        if isinstance(raw_tags, str):
            _keep([raw_tags])
        elif isinstance(raw_tags, (list, tuple, set)):
            _keep(raw_tags)
        elif isinstance(raw_tags, Mapping):
            statuses_map = raw_tags.get("service_keys_to_statuses_to_tags")
            if isinstance(statuses_map, Mapping):
                for status_payload in statuses_map.values():
                    _keep(_current_values(status_payload))
            _keep(_service_values(raw_tags))
            for maybe_service in raw_tags.values():
                _keep(_service_values(maybe_service))

    # dict preserves insertion order, so the first spelling of a tag wins.
    first_spelling: Dict[str, str] = {}
    for tag in gathered:
        first_spelling.setdefault(tag.lower(), tag)
    return list(first_spelling.values())
|
|
|
|
@staticmethod
def _extract_duplicate_namespace_tags(item: Any) -> List[str]:
    """Return the item's namespaced tags (``ns:value``), excluding ``title:``.

    Tags without a colon and ``title:`` tags are dropped; the remainder is
    de-duplicated case-insensitively, keeping the first spelling and the
    original order.
    """
    kept: Dict[str, str] = {}
    for candidate in Download_File._iter_duplicate_tag_values(item):
        label = str(candidate or "").strip()
        folded = label.lower()
        if not label or ":" not in label or folded.startswith("title:"):
            continue
        kept.setdefault(folded, label)
    return list(kept.values())
|
|
|
|
@staticmethod
def _extract_duplicate_title_tag(item: Any) -> Optional[str]:
    """Return the value of the first non-empty ``title:`` tag, or ``None``."""
    for candidate in Download_File._iter_duplicate_tag_values(item):
        label = str(candidate or "").strip()
        if not label.lower().startswith("title:"):
            continue
        # Everything after the first colon is the title text.
        _prefix, _colon, title_value = label.partition(":")
        title_value = title_value.strip()
        if title_value:
            return title_value
    return None
|
|
|
|
@classmethod
def _extract_duplicate_title(cls, item: Any) -> str:
    """Pick the best display title for a duplicate match.

    Preference order: the ``title``, ``name``, ``filename`` and ``target``
    fields, then a ``title:`` tag, then a ``path`` that is not an
    http/https/file URL. When none of these is available the placeholder
    ``"(exists)"`` is returned.
    """
    for field_name in ("title", "name", "filename", "target"):
        field_text = str(get_field(item, field_name) or "").strip()
        if field_text:
            return field_text

    from_tag = cls._extract_duplicate_title_tag(item)
    if from_tag:
        return from_tag

    location = str(get_field(item, "path") or "").strip()
    looks_like_url = location.lower().startswith(("http://", "https://", "file://"))
    if location and not looks_like_url:
        return location
    return "(exists)"
|
|
|
|
@classmethod
def _has_duplicate_title(cls, item: Any) -> bool:
    """Tell whether a real title (not the ``"(exists)"`` placeholder) resolves for *item*."""
    resolved_title = cls._extract_duplicate_title(item)
    return resolved_title != "(exists)"
|
|
|
|
@staticmethod
|
|
def _normalize_duplicate_preflight_policy(value: Any) -> Optional[str]:
|
|
text = str(value or "").strip().lower()
|
|
if not text:
|
|
return "skip"
|
|
mapping = {
|
|
"i": "ignore",
|
|
"ignore": "ignore",
|
|
"s": "skip",
|
|
"skip": "skip",
|
|
"c": "cancel",
|
|
"cancel": "cancel",
|
|
}
|
|
return mapping.get(text)
|
|
|
|
@classmethod
def _build_duplicate_display_row(
    cls,
    item: Any,
    *,
    backend_name: str,
    original_url: str,
) -> Dict[str, Any]:
    """Build the table-row payload describing one existing duplicate.

    Values come from ``build_display_row`` first, then from the item's own
    fields, then from *backend_name* / derived fallbacks. When a usable hash
    and store are known the row becomes selectable and gets a
    ``hydrus://<store>/<hash>`` selection URL, which is also stored as the
    payload's ``path``.
    """
    try:
        shown = build_display_row(item, keys=["title", "store", "hash", "ext", "size"])
    except Exception:
        shown = {}

    title_text = str(shown.get("title") or cls._extract_duplicate_title(item))
    store_value = shown.get("store") or get_field(item, "store") or backend_name
    hash_value = (
        shown.get("hash")
        or get_field(item, "hash")
        or get_field(item, "file_hash")
        or get_field(item, "hash_hex")
        or ""
    )
    ext_text = str(shown.get("ext") or get_field(item, "ext") or "").strip()

    size_raw = shown.get("size")
    for size_field in ("size_bytes", "size"):
        if size_raw is None:
            size_raw = get_field(item, size_field)

    if not ext_text:
        # Fall back to the suffix of the path, title or name, in that order.
        suffix_sources = [get_field(item, name) for name in ("path", "title", "name")]
        for source in suffix_sources:
            source_text = str(source or "").strip()
            if not source_text:
                continue
            derived = Path(source_text).suffix.lstrip(".")
            if derived:
                ext_text = derived
                break

    tag_text = ", ".join(cls._extract_duplicate_namespace_tags(item))
    store_text = str(store_value or backend_name)
    file_hash_text = str(hash_value or "")

    selection_args = None
    selection_action = None
    selection_url = None
    hash_is_known = file_hash_text.strip().lower() != "unknown"
    if file_hash_text and store_text and hash_is_known:
        selection_args, selection_action = build_hash_store_selection(
            file_hash_text,
            store_text,
        )
        if selection_args and len(selection_args) >= 2:
            # The second selection argument has the form "hash:<normalized>".
            normalized_hash = str(selection_args[1]).split("hash:", 1)[-1].strip()
            if normalized_hash:
                file_hash_text = normalized_hash
                selection_url = f"hydrus://{store_text}/{normalized_hash}"

    columns: List[tuple[str, Any]] = [("Title", title_text)]
    if tag_text:
        columns.append(("Tag", tag_text))
    columns += [
        ("Store", store_text),
        ("Size", size_raw),
        ("Ext", ext_text),
        ("URL", original_url),
    ]

    # Existing item values win; only missing keys are filled in.
    metadata = dict(item) if isinstance(item, dict) else {}
    if file_hash_text:
        metadata.setdefault("hash", file_hash_text)
    if store_text:
        metadata.setdefault("store", store_text)
    if ext_text:
        metadata.setdefault("ext", ext_text)
    if size_raw is not None:
        metadata.setdefault("size", size_raw)
        metadata.setdefault("size_bytes", size_raw)
    metadata.setdefault("url", original_url)
    if selection_url:
        metadata.setdefault("selection_url", selection_url)

    payload = build_table_result_payload(
        title=title_text,
        columns=columns,
        selection_args=selection_args,
        selection_action=selection_action,
        store=store_text,
        hash=file_hash_text,
        ext=ext_text,
        size=size_raw,
        size_bytes=size_raw,
        url=original_url,
        tags_flat=metadata.get("tags_flat"),
        full_metadata=metadata,
    )
    if selection_url:
        payload["path"] = selection_url
        payload["selection_url"] = selection_url
    return payload
|
|
|
|
@classmethod
|
|
def _fetch_duplicate_metadata_for_hash(
|
|
cls,
|
|
backend: Any,
|
|
*,
|
|
backend_name: str,
|
|
file_hash: str,
|
|
) -> Dict[str, Any]:
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
|
|
fetcher = getattr(backend, "fetch_file_metadata", None)
|
|
if callable(fetcher):
|
|
try:
|
|
payload = fetcher(file_hash)
|
|
except TypeError:
|
|
try:
|
|
payload = fetcher(file_hash=file_hash)
|
|
except Exception:
|
|
payload = None
|
|
except Exception:
|
|
payload = None
|
|
|
|
if isinstance(payload, dict):
|
|
meta_list = payload.get("metadata")
|
|
if isinstance(meta_list, list) and meta_list and isinstance(meta_list[0], dict):
|
|
metadata = dict(meta_list[0])
|
|
else:
|
|
metadata = dict(payload)
|
|
|
|
metadata = cls._enrich_duplicate_metadata(
|
|
metadata,
|
|
backend,
|
|
backend_name=backend_name,
|
|
file_hash=file_hash,
|
|
)
|
|
|
|
metadata.setdefault("hash", file_hash)
|
|
metadata.setdefault("store", backend_name)
|
|
return metadata
|
|
|
|
@classmethod
|
|
def _enrich_duplicate_metadata(
|
|
cls,
|
|
metadata: Optional[Dict[str, Any]],
|
|
backend: Any,
|
|
*,
|
|
backend_name: str,
|
|
file_hash: str,
|
|
) -> Dict[str, Any]:
|
|
result = dict(metadata) if isinstance(metadata, dict) else {}
|
|
|
|
if result:
|
|
extractor = getattr(backend, "_extract_title_and_tags", None)
|
|
if callable(extractor):
|
|
file_id_value = get_field(result, "file_id") or file_hash
|
|
try:
|
|
extracted_title, extracted_tags = extractor(result, file_id_value)
|
|
except Exception:
|
|
extracted_title, extracted_tags = None, None
|
|
|
|
if not get_field(result, "tags_flat") and isinstance(extracted_tags, SequenceABC) and not isinstance(extracted_tags, (str, bytes, bytearray, Mapping)):
|
|
deduped_tags: List[str] = []
|
|
seen_tags: set[str] = set()
|
|
for raw_tag in extracted_tags:
|
|
tag_text = str(raw_tag or "").strip()
|
|
lowered = tag_text.lower()
|
|
if not tag_text or lowered in seen_tags:
|
|
continue
|
|
seen_tags.add(lowered)
|
|
deduped_tags.append(tag_text)
|
|
if deduped_tags:
|
|
result["tags_flat"] = deduped_tags
|
|
|
|
title_text = str(extracted_title or "").strip()
|
|
generic_title = f"Hydrus File {file_id_value}".strip()
|
|
if title_text and title_text != generic_title:
|
|
result.setdefault("title", title_text)
|
|
|
|
if not result:
|
|
getter = getattr(backend, "get_metadata", None)
|
|
if callable(getter):
|
|
try:
|
|
payload = getter(file_hash)
|
|
except Exception:
|
|
payload = None
|
|
if isinstance(payload, dict):
|
|
result = dict(payload)
|
|
|
|
getter = getattr(backend, "get_metadata", None)
|
|
if callable(getter) and not cls._has_duplicate_title(result):
|
|
try:
|
|
getter_payload = getter(file_hash)
|
|
except Exception:
|
|
getter_payload = None
|
|
if isinstance(getter_payload, dict):
|
|
for key, value in getter_payload.items():
|
|
current = result.get(key)
|
|
if current not in (None, "", [], {}, ()):
|
|
continue
|
|
if value in (None, "", [], {}, ()):
|
|
continue
|
|
result[key] = value
|
|
|
|
return result
|
|
|
|
@classmethod
|
|
def _fetch_duplicate_metadata_for_hashes(
|
|
cls,
|
|
backend: Any,
|
|
*,
|
|
backend_name: str,
|
|
file_hashes: Sequence[str],
|
|
) -> Dict[str, Dict[str, Any]]:
|
|
normalized_hashes: List[str] = []
|
|
seen_hashes: set[str] = set()
|
|
for raw_hash in file_hashes or []:
|
|
normalized_hash = sh.normalize_hash(str(raw_hash) if raw_hash is not None else None)
|
|
if not normalized_hash or normalized_hash in seen_hashes:
|
|
continue
|
|
seen_hashes.add(normalized_hash)
|
|
normalized_hashes.append(normalized_hash)
|
|
|
|
if not normalized_hashes:
|
|
return {}
|
|
|
|
metadata_by_hash: Dict[str, Dict[str, Any]] = {}
|
|
fetcher = getattr(backend, "fetch_files_metadata", None)
|
|
if callable(fetcher):
|
|
try:
|
|
payload = fetcher(
|
|
normalized_hashes,
|
|
include_service_keys_to_tags=True,
|
|
include_file_url=True,
|
|
include_duration=True,
|
|
include_size=True,
|
|
include_mime=True,
|
|
)
|
|
except TypeError:
|
|
try:
|
|
payload = fetcher(
|
|
file_hashes=normalized_hashes,
|
|
include_service_keys_to_tags=True,
|
|
include_file_url=True,
|
|
include_duration=True,
|
|
include_size=True,
|
|
include_mime=True,
|
|
)
|
|
except Exception:
|
|
payload = None
|
|
except Exception:
|
|
payload = None
|
|
|
|
if isinstance(payload, dict):
|
|
meta_list = payload.get("metadata")
|
|
if isinstance(meta_list, list):
|
|
for entry in meta_list:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
entry_hash = sh.normalize_hash(str(entry.get("hash") or entry.get("hash_hex") or entry.get("file_hash") or ""))
|
|
if not entry_hash:
|
|
continue
|
|
metadata_by_hash[entry_hash] = cls._enrich_duplicate_metadata(
|
|
dict(entry),
|
|
backend,
|
|
backend_name=backend_name,
|
|
file_hash=entry_hash,
|
|
)
|
|
|
|
for normalized_hash in normalized_hashes:
|
|
metadata = metadata_by_hash.get(normalized_hash)
|
|
if metadata is None:
|
|
metadata = cls._fetch_duplicate_metadata_for_hash(
|
|
backend,
|
|
backend_name=backend_name,
|
|
file_hash=normalized_hash,
|
|
)
|
|
metadata.setdefault("hash", normalized_hash)
|
|
metadata.setdefault("store", backend_name)
|
|
metadata_by_hash[normalized_hash] = metadata
|
|
|
|
return metadata_by_hash
|
|
|
|
@classmethod
|
|
def _collect_existing_url_match_refs_for_url(
|
|
cls,
|
|
storage: Any,
|
|
canonical_url: str,
|
|
*,
|
|
hydrus_available: bool,
|
|
config: Optional[Dict[str, Any]] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
if not canonical_url:
|
|
return []
|
|
if not cls._supports_storage_duplicate_lookup(canonical_url):
|
|
return []
|
|
|
|
config_dict = config if isinstance(config, dict) else {}
|
|
refs: List[Dict[str, Any]] = []
|
|
seen_pairs: set[tuple[str, str]] = set()
|
|
seen_backends: set[str] = set()
|
|
|
|
def _append_ref(backend_name: str, backend: Any, *, item: Any = None, file_hash_hint: Optional[str] = None, is_exact: bool = False) -> None:
|
|
normalized_hash = sh.normalize_hash(str(file_hash_hint) if file_hash_hint is not None else None)
|
|
if not normalized_hash:
|
|
normalized_hash = cls._extract_hash_from_search_hit(item)
|
|
pair_key = (str(backend_name or "").strip().lower(), str(normalized_hash or ""))
|
|
if pair_key in seen_pairs:
|
|
return
|
|
seen_pairs.add(pair_key)
|
|
refs.append(
|
|
{
|
|
"backend_name": str(backend_name or "").strip(),
|
|
"backend": backend,
|
|
"hash": normalized_hash,
|
|
"item": dict(item) if isinstance(item, dict) else item,
|
|
"is_exact": bool(is_exact),
|
|
}
|
|
)
|
|
|
|
def _iter_backends() -> List[tuple[str, Any]]:
|
|
backends: List[tuple[str, Any]] = []
|
|
if storage is not None:
|
|
try:
|
|
backend_names = list(storage.list_searchable_backends() or [])
|
|
except Exception:
|
|
backend_names = []
|
|
|
|
for backend_name in backend_names:
|
|
try:
|
|
backend = storage[backend_name]
|
|
except Exception:
|
|
continue
|
|
name_text = str(backend_name).strip()
|
|
if not name_text or name_text.lower() == "temp":
|
|
continue
|
|
key = name_text.lower()
|
|
if key in seen_backends:
|
|
continue
|
|
seen_backends.add(key)
|
|
backends.append((name_text, backend))
|
|
|
|
try:
|
|
registry_helpers = cls._load_provider_registry()
|
|
get_plugin = registry_helpers.get("get_plugin")
|
|
hydrus_provider = get_plugin("hydrusnetwork", config_dict) if callable(get_plugin) else None
|
|
if hydrus_provider is not None:
|
|
for backend_name, backend in hydrus_provider.iter_backends():
|
|
name_text = str(backend_name or "").strip()
|
|
if not name_text:
|
|
continue
|
|
key = name_text.lower()
|
|
if key in seen_backends:
|
|
continue
|
|
seen_backends.add(key)
|
|
backends.append((name_text, backend))
|
|
except Exception:
|
|
pass
|
|
|
|
return backends
|
|
|
|
for backend_name, backend in _iter_backends():
|
|
try:
|
|
if not hydrus_available and str(getattr(backend, "STORE_TYPE", "")).strip().lower() == "hydrusnetwork":
|
|
continue
|
|
except Exception:
|
|
pass
|
|
|
|
found_exact = False
|
|
lookup_exact = getattr(backend, "find_hashes_by_url", None)
|
|
if callable(lookup_exact):
|
|
try:
|
|
hashes = lookup_exact(canonical_url) or []
|
|
except Exception:
|
|
hashes = []
|
|
if isinstance(hashes, (list, tuple, set)):
|
|
for existing_hash in hashes:
|
|
normalized_hash = sh.normalize_hash(str(existing_hash) if existing_hash is not None else None)
|
|
if not normalized_hash:
|
|
continue
|
|
found_exact = True
|
|
_append_ref(
|
|
backend_name,
|
|
backend,
|
|
file_hash_hint=normalized_hash,
|
|
is_exact=True,
|
|
)
|
|
if found_exact:
|
|
continue
|
|
|
|
searcher = getattr(backend, "search", None)
|
|
if callable(searcher):
|
|
try:
|
|
hits = searcher(f"url:{canonical_url}", limit=5, minimal=True) or []
|
|
except Exception:
|
|
hits = []
|
|
for hit in hits:
|
|
_append_ref(backend_name, backend, item=hit)
|
|
|
|
return refs
|
|
|
|
@classmethod
|
|
def _find_existing_url_matches_for_url(
|
|
cls,
|
|
storage: Any,
|
|
canonical_url: str,
|
|
*,
|
|
hydrus_available: bool,
|
|
config: Optional[Dict[str, Any]] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
refs = cls._collect_existing_url_match_refs_for_url(
|
|
storage,
|
|
canonical_url,
|
|
hydrus_available=hydrus_available,
|
|
config=config,
|
|
)
|
|
if not refs:
|
|
return []
|
|
|
|
matches: List[Dict[str, Any]] = []
|
|
exact_hashes_by_backend: Dict[str, Dict[str, Any]] = {}
|
|
prefetched_metadata: Dict[tuple[str, str], Dict[str, Any]] = {}
|
|
|
|
for ref in refs:
|
|
if not ref.get("is_exact"):
|
|
continue
|
|
backend_name = str(ref.get("backend_name") or "").strip()
|
|
backend_key = backend_name.lower()
|
|
normalized_hash = sh.normalize_hash(str(ref.get("hash") or ""))
|
|
if not backend_key or not normalized_hash:
|
|
continue
|
|
bucket = exact_hashes_by_backend.setdefault(
|
|
backend_key,
|
|
{
|
|
"backend_name": backend_name,
|
|
"backend": ref.get("backend"),
|
|
"hashes": [],
|
|
},
|
|
)
|
|
if normalized_hash not in bucket["hashes"]:
|
|
bucket["hashes"].append(normalized_hash)
|
|
|
|
for backend_key, bucket in exact_hashes_by_backend.items():
|
|
metadata_map = cls._fetch_duplicate_metadata_for_hashes(
|
|
bucket.get("backend"),
|
|
backend_name=str(bucket.get("backend_name") or backend_key),
|
|
file_hashes=list(bucket.get("hashes") or []),
|
|
)
|
|
for normalized_hash, metadata in metadata_map.items():
|
|
prefetched_metadata[(backend_key, normalized_hash)] = metadata
|
|
|
|
for ref in refs:
|
|
backend_name = str(ref.get("backend_name") or "").strip()
|
|
backend_key = backend_name.lower()
|
|
normalized_hash = sh.normalize_hash(str(ref.get("hash") or ""))
|
|
if ref.get("is_exact") and normalized_hash:
|
|
candidate = prefetched_metadata.get((backend_key, normalized_hash))
|
|
if candidate is None:
|
|
candidate = cls._fetch_duplicate_metadata_for_hash(
|
|
ref.get("backend"),
|
|
backend_name=backend_name,
|
|
file_hash=normalized_hash,
|
|
)
|
|
else:
|
|
item = ref.get("item")
|
|
candidate = dict(item) if isinstance(item, dict) else {"hash": normalized_hash or "", "store": backend_name}
|
|
|
|
if normalized_hash:
|
|
candidate.setdefault("hash", normalized_hash)
|
|
candidate.setdefault("store", backend_name)
|
|
matches.append(
|
|
cls._build_duplicate_display_row(
|
|
candidate,
|
|
backend_name=backend_name,
|
|
original_url=canonical_url,
|
|
)
|
|
)
|
|
|
|
return matches
|
|
|
|
@classmethod
def _find_existing_hash_for_url(
    cls, storage: Any, canonical_url: str, *, hydrus_available: bool
) -> Optional[str]:
    """Return the first stored hash matching *canonical_url*, or ``None``.

    Thin convenience wrapper over ``_find_existing_hashes_for_url`` called
    with an empty config.
    """
    found = cls._find_existing_hashes_for_url(
        storage,
        canonical_url,
        hydrus_available=hydrus_available,
        config={},
    )
    if not found:
        return None
    return found[0]
|
|
|
|
@staticmethod
|
|
def _init_storage(config: Dict[str, Any]) -> tuple[Any, bool]:
|
|
"""Initialize store registry and determine whether a Hydrus backend is usable."""
|
|
storage = None
|
|
try:
|
|
from PluginCore.backend_registry import BackendRegistry as _Store
|
|
|
|
storage = _Store(config)
|
|
except Exception:
|
|
storage = None
|
|
|
|
hydrus_available = False
|
|
try:
|
|
from plugins.hydrusnetwork import api as hydrus_api
|
|
|
|
hydrus_available = bool(hydrus_api.is_hydrus_available(config))
|
|
except Exception:
|
|
hydrus_available = False
|
|
|
|
if storage is not None and not hydrus_available:
|
|
try:
|
|
backend_names = list(storage.list_backends() or [])
|
|
except Exception:
|
|
backend_names = []
|
|
for backend_name in backend_names:
|
|
try:
|
|
backend = storage[backend_name]
|
|
except Exception:
|
|
continue
|
|
if str(getattr(backend, "STORE_TYPE", "")).strip().lower() == "hydrusnetwork":
|
|
hydrus_available = True
|
|
break
|
|
|
|
return storage, hydrus_available
|
|
|
|
@staticmethod
|
|
def _supports_storage_duplicate_lookup(raw_url: str) -> bool:
|
|
text = str(raw_url or "").strip()
|
|
if not text:
|
|
return False
|
|
|
|
try:
|
|
parsed = urlparse(text)
|
|
except Exception:
|
|
parsed = None
|
|
|
|
scheme = str(getattr(parsed, "scheme", "") or "").strip().lower()
|
|
return scheme in {"http", "https", "ftp", "ftps"}
|
|
|
|
@staticmethod
|
|
def _filter_supported_urls(raw_urls: Sequence[str]) -> tuple[List[str], List[str]]:
|
|
"""Split explicit URLs into supported and unsupported buckets."""
|
|
supported: List[str] = []
|
|
unsupported: List[str] = []
|
|
for raw in raw_urls or []:
|
|
text = str(raw or "").strip()
|
|
if not text:
|
|
continue
|
|
low = text.lower()
|
|
if low.startswith(("http://", "https://", "ftp://", "ftps://", "magnet:")):
|
|
supported.append(text)
|
|
else:
|
|
unsupported.append(text)
|
|
return supported, unsupported
|
|
|
|
@staticmethod
|
|
def _canonicalize_url_for_storage(
|
|
*,
|
|
requested_url: str,
|
|
provider_name: Optional[str] = None,
|
|
provider_instance: Optional[str] = None,
|
|
provider_item: Optional[Any] = None,
|
|
) -> str:
|
|
"""Return the URL key used for duplicate preflight lookups."""
|
|
return str(requested_url or "").strip()
|
|
|
|
@staticmethod
|
|
def _preflight_url_duplicate(
|
|
*,
|
|
canonical_url: str,
|
|
storage: Any,
|
|
hydrus_available: bool,
|
|
final_output_dir: Path,
|
|
auto_continue_duplicates: bool = True,
|
|
force_prompt_in_pipeline: bool = False,
|
|
) -> bool:
|
|
"""Run duplicate URL preflight against configured storage backends."""
|
|
if not canonical_url or storage is None:
|
|
return True
|
|
return not sh.check_url_exists_in_storage(
|
|
urls=[canonical_url],
|
|
storage=storage,
|
|
hydrus_available=hydrus_available,
|
|
final_output_dir=final_output_dir,
|
|
auto_continue_duplicates=auto_continue_duplicates,
|
|
force_prompt_in_pipeline=force_prompt_in_pipeline,
|
|
)
|
|
|
|
@staticmethod
|
|
def _parse_clip_spec_to_ranges(clip_spec: Optional[str]) -> Optional[List[tuple[int, int]]]:
|
|
"""Parse clip spec strings like '2m-2m20s,5m-6m'."""
|
|
text = str(clip_spec or "").strip()
|
|
if not text:
|
|
return None
|
|
|
|
def _parse_time(value: str) -> Optional[int]:
|
|
s = str(value or "").strip().lower()
|
|
if not s:
|
|
return None
|
|
try:
|
|
if ":" in s:
|
|
parts = [int(p) for p in s.split(":")]
|
|
if len(parts) == 2:
|
|
return (parts[0] * 60) + parts[1]
|
|
if len(parts) == 3:
|
|
return (parts[0] * 3600) + (parts[1] * 60) + parts[2]
|
|
return None
|
|
total = 0
|
|
number = ""
|
|
units_seen = False
|
|
for ch in s:
|
|
if ch.isdigit():
|
|
number += ch
|
|
continue
|
|
if ch in {"h", "m", "s"} and number:
|
|
units_seen = True
|
|
val = int(number)
|
|
if ch == "h":
|
|
total += val * 3600
|
|
elif ch == "m":
|
|
total += val * 60
|
|
else:
|
|
total += val
|
|
number = ""
|
|
continue
|
|
return None
|
|
if number:
|
|
total += int(number)
|
|
if total == 0 and units_seen:
|
|
return 0
|
|
return total if total >= 0 else None
|
|
except Exception:
|
|
return None
|
|
|
|
ranges: List[tuple[int, int]] = []
|
|
for chunk in [c.strip() for c in text.split(",") if c.strip()]:
|
|
if "-" not in chunk:
|
|
return None
|
|
left, right = chunk.split("-", 1)
|
|
start = _parse_time(left)
|
|
end = _parse_time(right)
|
|
if start is None or end is None or end < start:
|
|
return None
|
|
ranges.append((start, end))
|
|
return ranges or None
|
|
|
|
def _download_supported_urls(self, **kwargs: Any) -> int:
|
|
"""Download pre-validated streaming URLs (wrapper used by tests)."""
|
|
urls = list(kwargs.get("supported_url") or [])
|
|
storage = kwargs.get("storage")
|
|
hydrus_available = bool(kwargs.get("hydrus_available"))
|
|
final_output_dir = kwargs.get("final_output_dir")
|
|
skip_preflight = bool(kwargs.get("skip_per_url_preflight"))
|
|
|
|
if not urls:
|
|
return 1
|
|
|
|
for requested_url in urls:
|
|
canonical = self._canonicalize_url_for_storage(requested_url=requested_url)
|
|
if skip_preflight:
|
|
continue
|
|
ok = self._preflight_url_duplicate(
|
|
canonical_url=canonical,
|
|
storage=storage,
|
|
hydrus_available=hydrus_available,
|
|
final_output_dir=Path(final_output_dir) if final_output_dir else Path.cwd(),
|
|
)
|
|
if not ok:
|
|
# Duplicate skip is non-fatal for the whole batch.
|
|
continue
|
|
|
|
return 0
|
|
|
|
def _maybe_show_playlist_table(self, **kwargs: Any) -> bool:
|
|
"""Compat hook used by tests; playlist table rendering is handled elsewhere."""
|
|
return False
|
|
|
|
def _maybe_show_format_table_for_single_url(self, **kwargs: Any) -> Optional[int]:
|
|
"""Compat hook used by tests; format table rendering is handled elsewhere."""
|
|
return None
|
|
|
|
def _run_streaming_urls(
    self,
    *,
    streaming_urls: Sequence[str],
    args: Sequence[str],
    config: Dict[str, Any],
    parsed: Dict[str, Any],
) -> int:
    """Legacy streaming dispatch wrapper exercised by tests.

    Initialises storage, filters ``streaming_urls`` down to supported ones,
    resolves the output directory, extracts an optional ``clip:`` token from
    ``parsed["query"]`` and forwards everything to
    ``_download_supported_urls`` in audio mode.

    Returns 1 when no URL is supported or no output directory can be
    resolved; otherwise the result of ``_download_supported_urls``.
    """
    storage, hydrus_available = self._init_storage(config)
    supported_url, _unsupported = self._filter_supported_urls(streaming_urls)
    if not supported_url:
        return 1

    final_output_dir = resolve_target_dir(parsed, config)
    if final_output_dir is None:
        return 1

    # The query is a comma-separated token list; the first "clip:" token wins.
    stripped_tokens = (piece.strip() for piece in str(parsed.get("query") or "").split(","))
    clip_spec = next(
        (
            tok.split(":", 1)[1].strip()
            for tok in stripped_tokens
            if tok and tok.lower().startswith("clip:")
        ),
        None,
    )
    clip_ranges = self._parse_clip_spec_to_ranges(clip_spec)

    ytdlp_tool = YtDlpTool(config) if callable(YtDlpTool) else None
    playlist_items = parsed.get("item")

    config_is_dict = isinstance(config, dict)
    forwarded_args = list(args)
    quiet_mode = bool(config.get("_quiet_background_output")) if config_is_dict else False
    if ytdlp_tool and hasattr(ytdlp_tool, "default_format"):
        ytdl_format = ytdlp_tool.default_format("audio")
    else:
        ytdl_format = "best"
    if config_is_dict:
        download_timeout_seconds = int(config.get("_pipeobject_timeout_seconds") or 300)
    else:
        download_timeout_seconds = 300

    return self._download_supported_urls(
        supported_url=supported_url,
        ytdlp_tool=ytdlp_tool,
        args=forwarded_args,
        config=config,
        final_output_dir=final_output_dir,
        mode="audio",
        clip_spec=clip_spec,
        clip_ranges=clip_ranges,
        query_hash_override=None,
        embed_chapters=False,
        write_sub=False,
        quiet_mode=quiet_mode,
        playlist_items=playlist_items,
        ytdl_format=ytdl_format,
        skip_per_url_preflight=False,
        forced_single_format_id=None,
        forced_single_format_for_batch=False,
        formats_cache={},
        storage=storage,
        hydrus_available=hydrus_available,
        download_timeout_seconds=download_timeout_seconds,
    )
|
|
|
|
@classmethod
def _find_existing_hashes_for_url(
    cls,
    storage: Any,
    canonical_url: str,
    *,
    hydrus_available: bool,
    config: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """Return the distinct normalized hashes already stored for a URL.

    Match references come from ``_collect_existing_url_match_refs_for_url``;
    each reference's ``hash`` is passed through ``sh.normalize_hash``. Empty
    results are dropped and first-seen order is preserved.
    """
    match_refs = cls._collect_existing_url_match_refs_for_url(
        storage,
        canonical_url,
        hydrus_available=hydrus_available,
        config=config,
    )
    # A dict keeps insertion order, giving an order-preserving de-duplication.
    unique_hashes: Dict[str, None] = {}
    for match in match_refs:
        digest = sh.normalize_hash(str(match.get("hash") or ""))
        if digest:
            unique_hashes.setdefault(digest, None)
    return list(unique_hashes)
|
|
|
|
def _preflight_explicit_url_duplicates(
|
|
self,
|
|
*,
|
|
raw_urls: Sequence[str],
|
|
config: Dict[str, Any],
|
|
) -> tuple[List[str], Optional[int], int]:
|
|
"""Return (urls_to_process, early_exit, skipped_count)."""
|
|
urls = [str(u or "").strip() for u in (raw_urls or []) if str(u or "").strip()]
|
|
if not urls:
|
|
return [], None, 0
|
|
|
|
if bool(config.get("_skip_url_preflight")):
|
|
return urls, None, 0
|
|
|
|
storage, hydrus_available = self._init_storage(config)
|
|
duplicate_refs: Dict[str, List[Dict[str, Any]]] = {}
|
|
exact_hashes_by_backend: Dict[str, Dict[str, Any]] = {}
|
|
for url in urls:
|
|
refs = self._collect_existing_url_match_refs_for_url(
|
|
storage,
|
|
url,
|
|
hydrus_available=hydrus_available,
|
|
config=config,
|
|
)
|
|
if not refs:
|
|
continue
|
|
duplicate_refs[url] = refs
|
|
for ref in refs:
|
|
if not ref.get("is_exact"):
|
|
continue
|
|
backend_name = str(ref.get("backend_name") or "").strip()
|
|
backend_key = backend_name.lower()
|
|
normalized_hash = sh.normalize_hash(str(ref.get("hash") or ""))
|
|
if not backend_key or not normalized_hash:
|
|
continue
|
|
bucket = exact_hashes_by_backend.setdefault(
|
|
backend_key,
|
|
{
|
|
"backend_name": backend_name,
|
|
"backend": ref.get("backend"),
|
|
"hashes": [],
|
|
},
|
|
)
|
|
if normalized_hash not in bucket["hashes"]:
|
|
bucket["hashes"].append(normalized_hash)
|
|
|
|
if not duplicate_refs:
|
|
return urls, None, 0
|
|
|
|
prefetched_metadata: Dict[tuple[str, str], Dict[str, Any]] = {}
|
|
for backend_key, bucket in exact_hashes_by_backend.items():
|
|
metadata_map = self._fetch_duplicate_metadata_for_hashes(
|
|
bucket.get("backend"),
|
|
backend_name=str(bucket.get("backend_name") or backend_key),
|
|
file_hashes=list(bucket.get("hashes") or []),
|
|
)
|
|
for normalized_hash, metadata in metadata_map.items():
|
|
prefetched_metadata[(backend_key, normalized_hash)] = metadata
|
|
|
|
duplicates: Dict[str, List[Dict[str, Any]]] = {}
|
|
for url, refs in duplicate_refs.items():
|
|
rows: List[Dict[str, Any]] = []
|
|
for ref in refs:
|
|
backend_name = str(ref.get("backend_name") or "").strip()
|
|
backend_key = backend_name.lower()
|
|
normalized_hash = sh.normalize_hash(str(ref.get("hash") or ""))
|
|
if ref.get("is_exact") and normalized_hash:
|
|
candidate = prefetched_metadata.get((backend_key, normalized_hash))
|
|
if candidate is None:
|
|
candidate = self._fetch_duplicate_metadata_for_hash(
|
|
ref.get("backend"),
|
|
backend_name=backend_name,
|
|
file_hash=normalized_hash,
|
|
)
|
|
else:
|
|
item = ref.get("item")
|
|
candidate = dict(item) if isinstance(item, dict) else {"hash": normalized_hash or "", "store": backend_name}
|
|
|
|
if normalized_hash:
|
|
candidate.setdefault("hash", normalized_hash)
|
|
candidate.setdefault("store", backend_name)
|
|
rows.append(
|
|
self._build_duplicate_display_row(
|
|
candidate,
|
|
backend_name=backend_name,
|
|
original_url=url,
|
|
)
|
|
)
|
|
if rows:
|
|
duplicates[url] = rows
|
|
|
|
duplicate_count = len(duplicates)
|
|
total_count = len(urls)
|
|
try:
|
|
debug_panel(
|
|
"download-file duplicate preflight",
|
|
[
|
|
("total_urls", total_count),
|
|
("duplicate_urls", duplicate_count),
|
|
],
|
|
border_style="yellow",
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
table = Table(f"Duplicate URLs detected ({duplicate_count}/{total_count})", max_columns=12)
|
|
table._interactive(False)
|
|
duplicate_rows: List[Dict[str, Any]] = []
|
|
for _url, rows in duplicates.items():
|
|
for row in rows:
|
|
payload = dict(row) if isinstance(row, dict) else {}
|
|
duplicate_rows.append(payload)
|
|
table.add_result(payload)
|
|
|
|
try:
|
|
pipeline_context.set_last_result_table_overlay(table, duplicate_rows)
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
stdin_interactive = bool(sys.stdin and sys.stdin.isatty())
|
|
except Exception:
|
|
stdin_interactive = False
|
|
|
|
suspend = getattr(pipeline_context, "suspend_live_progress", None)
|
|
cm: AbstractContextManager[Any] = nullcontext()
|
|
if callable(suspend):
|
|
try:
|
|
maybe_cm = suspend()
|
|
if maybe_cm is not None:
|
|
cm = maybe_cm # type: ignore[assignment]
|
|
except Exception:
|
|
cm = nullcontext()
|
|
|
|
policy = "skip"
|
|
with cm:
|
|
console = get_stderr_console()
|
|
try:
|
|
console.print(table)
|
|
except Exception:
|
|
pass
|
|
setattr(table, "_rendered_by_cmdlet", True)
|
|
|
|
if stdin_interactive:
|
|
while True:
|
|
try:
|
|
raw_policy = Prompt.ask(
|
|
"Duplicate URLs found. Action? [I]gnore/[S]kip/[C]ancel",
|
|
default="skip",
|
|
console=console,
|
|
)
|
|
except (EOFError, KeyboardInterrupt):
|
|
policy = "cancel"
|
|
break
|
|
|
|
normalized_policy = self._normalize_duplicate_preflight_policy(raw_policy)
|
|
if normalized_policy is not None:
|
|
policy = normalized_policy
|
|
break
|
|
|
|
try:
|
|
console.print("Please select one of: I, S, C, ignore, skip, cancel")
|
|
except Exception:
|
|
pass
|
|
else:
|
|
# Safe default in non-interactive runs: avoid redownloading known duplicates.
|
|
policy = "skip"
|
|
|
|
if policy == "cancel":
|
|
try:
|
|
pipeline_context.request_pipeline_stop(reason="duplicate-url cancelled", exit_code=0)
|
|
except Exception:
|
|
pass
|
|
return [], 0, 0
|
|
|
|
if policy == "ignore":
|
|
return urls, None, 0
|
|
|
|
filtered = [u for u in urls if u not in duplicates]
|
|
skipped = len(urls) - len(filtered)
|
|
if skipped:
|
|
try:
|
|
log(f"Skipped {skipped} duplicate URL(s); processing remaining {len(filtered)}.", file=sys.stderr)
|
|
except Exception:
|
|
pass
|
|
return filtered, None, skipped
|
|
|
|
@staticmethod
|
|
def _resolve_provider_preflight_items(
|
|
provider: Any,
|
|
*,
|
|
url: str,
|
|
parsed: Dict[str, Any],
|
|
args: Sequence[str],
|
|
) -> List[Dict[str, Any]]:
|
|
resolver = getattr(provider, "resolve_preflight_items", None)
|
|
if not callable(resolver):
|
|
return []
|
|
|
|
try:
|
|
items = resolver(url, parsed=parsed, args=list(args))
|
|
except TypeError:
|
|
try:
|
|
items = resolver(url)
|
|
except Exception:
|
|
items = None
|
|
except Exception:
|
|
items = None
|
|
|
|
if not isinstance(items, list):
|
|
return []
|
|
|
|
normalized: List[Dict[str, Any]] = []
|
|
for idx, item in enumerate(items, 1):
|
|
if not isinstance(item, dict):
|
|
continue
|
|
item_url = str(item.get("url") or "").strip()
|
|
if not item_url:
|
|
continue
|
|
playlist_index = item.get("playlist_index")
|
|
try:
|
|
playlist_index_value = int(playlist_index)
|
|
except Exception:
|
|
playlist_index_value = idx
|
|
normalized.append(
|
|
{
|
|
"url": item_url,
|
|
"playlist_index": playlist_index_value,
|
|
}
|
|
)
|
|
|
|
return normalized
|
|
|
|
@staticmethod
|
|
def _build_provider_playlist_item_selector(
|
|
items: Sequence[Dict[str, Any]],
|
|
*,
|
|
remaining_urls: Sequence[str],
|
|
) -> Optional[str]:
|
|
allowed_urls = {
|
|
str(url or "").strip() for url in (remaining_urls or []) if str(url or "").strip()
|
|
}
|
|
if not allowed_urls:
|
|
return None
|
|
|
|
selectors: List[str] = []
|
|
for idx, item in enumerate(items, 1):
|
|
item_url = str(item.get("url") or "").strip()
|
|
if not item_url or item_url not in allowed_urls:
|
|
continue
|
|
playlist_index = item.get("playlist_index")
|
|
try:
|
|
playlist_index_value = int(playlist_index)
|
|
except Exception:
|
|
playlist_index_value = idx
|
|
if playlist_index_value <= 0:
|
|
continue
|
|
selectors.append(str(playlist_index_value))
|
|
|
|
if not selectors:
|
|
return None
|
|
return ",".join(selectors)
|
|
|
|
@staticmethod
|
|
def _format_timecode(seconds: int, *, force_hours: bool) -> str:
|
|
total = max(0, int(seconds))
|
|
minutes, secs = divmod(total, 60)
|
|
hours, minutes = divmod(minutes, 60)
|
|
if force_hours:
|
|
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
|
return f"{minutes:02d}:{secs:02d}"
|
|
|
|
@staticmethod
|
|
def _rebase_subtitle_timestamp_text(text: str, offset_seconds: int) -> str:
|
|
if not text:
|
|
return text
|
|
|
|
try:
|
|
offset_value = float(offset_seconds)
|
|
except Exception:
|
|
return text
|
|
|
|
if offset_value <= 0:
|
|
return text
|
|
|
|
timestamp_re = re.compile(r"(?<!\d)(?P<ts>(?:\d{2}:)?\d{2}:\d{2}(?:[\.,]\d{1,3})?)(?!\d)")
|
|
|
|
def _shift(match: re.Match[str]) -> str:
|
|
original = str(match.group("ts") or "")
|
|
if not original:
|
|
return original
|
|
|
|
frac_sep = "."
|
|
frac_digits = 0
|
|
base = original
|
|
frac_seconds = 0.0
|
|
if "." in original:
|
|
base, frac = original.split(".", 1)
|
|
frac_sep = "."
|
|
frac_digits = len(frac)
|
|
try:
|
|
frac_seconds = float(f"0.{frac}") if frac else 0.0
|
|
except Exception:
|
|
frac_seconds = 0.0
|
|
elif "," in original:
|
|
base, frac = original.split(",", 1)
|
|
frac_sep = ","
|
|
frac_digits = len(frac)
|
|
try:
|
|
frac_seconds = float(f"0.{frac}") if frac else 0.0
|
|
except Exception:
|
|
frac_seconds = 0.0
|
|
|
|
parts = base.split(":")
|
|
if len(parts) == 3:
|
|
hours_s, minutes_s, seconds_s = parts
|
|
include_hours = True
|
|
elif len(parts) == 2:
|
|
hours_s = "0"
|
|
minutes_s, seconds_s = parts
|
|
include_hours = False
|
|
else:
|
|
return original
|
|
|
|
try:
|
|
total = (
|
|
(int(hours_s) * 3600)
|
|
+ (int(minutes_s) * 60)
|
|
+ int(seconds_s)
|
|
+ frac_seconds
|
|
+ offset_value
|
|
)
|
|
except Exception:
|
|
return original
|
|
|
|
total = max(0.0, total)
|
|
whole_seconds = int(total)
|
|
fraction = total - whole_seconds
|
|
hours, remainder = divmod(whole_seconds, 3600)
|
|
minutes, seconds = divmod(remainder, 60)
|
|
|
|
if frac_digits > 0:
|
|
scale = 10 ** frac_digits
|
|
frac_value = int(round(fraction * scale))
|
|
if frac_value >= scale:
|
|
frac_value = 0
|
|
seconds += 1
|
|
if seconds >= 60:
|
|
seconds = 0
|
|
minutes += 1
|
|
if minutes >= 60:
|
|
minutes = 0
|
|
hours += 1
|
|
frac_text = f"{frac_value:0{frac_digits}d}"
|
|
if include_hours or hours > 0:
|
|
return f"{hours:02d}:{minutes:02d}:{seconds:02d}{frac_sep}{frac_text}"
|
|
return f"{minutes:02d}:{seconds:02d}{frac_sep}{frac_text}"
|
|
|
|
if include_hours or hours > 0:
|
|
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
|
|
return f"{minutes:02d}:{seconds:02d}"
|
|
|
|
try:
|
|
return timestamp_re.sub(_shift, str(text))
|
|
except Exception:
|
|
return text
|
|
|
|
@classmethod
|
|
def _format_clip_range(cls, start_s: int, end_s: int) -> str:
|
|
force_hours = bool(start_s >= 3600 or end_s >= 3600)
|
|
return f"{cls._format_timecode(start_s, force_hours=force_hours)}-{cls._format_timecode(end_s, force_hours=force_hours)}"
|
|
|
|
@classmethod
|
|
def _apply_clip_decorations(
|
|
cls, pipe_objects: List[Dict[str, Any]], clip_ranges: List[tuple[int, int]], *, source_king_hash: Optional[str]
|
|
) -> None:
|
|
if not pipe_objects or len(pipe_objects) != len(clip_ranges):
|
|
return
|
|
|
|
for po, (start_s, end_s) in zip(pipe_objects, clip_ranges):
|
|
clip_range = cls._format_clip_range(start_s, end_s)
|
|
clip_tag = f"clip:{clip_range}"
|
|
|
|
po["title"] = clip_tag
|
|
|
|
tags = po.get("tag")
|
|
if not isinstance(tags, list):
|
|
tags = []
|
|
|
|
tags = [t for t in tags if not str(t).strip().lower().startswith("title:")]
|
|
tags = [t for t in tags if not str(t).strip().lower().startswith("relationship:")]
|
|
tags.insert(0, f"title:{clip_tag}")
|
|
|
|
if clip_tag not in tags:
|
|
tags.append(clip_tag)
|
|
|
|
po["tag"] = tags
|
|
|
|
notes = po.get("notes")
|
|
if isinstance(notes, dict):
|
|
sub_text = notes.get("sub")
|
|
if isinstance(sub_text, str) and sub_text.strip():
|
|
notes["sub"] = cls._rebase_subtitle_timestamp_text(sub_text, start_s)
|
|
po["notes"] = notes
|
|
|
|
if len(pipe_objects) < 2:
|
|
return
|
|
|
|
hashes: List[str] = []
|
|
for po in pipe_objects:
|
|
h_val = sh.normalize_hash(str(po.get("hash") or ""))
|
|
hashes.append(h_val or "")
|
|
|
|
king_hash = sh.normalize_hash(source_king_hash) if source_king_hash else None
|
|
if not king_hash:
|
|
king_hash = hashes[0] if hashes and hashes[0] else None
|
|
if not king_hash:
|
|
return
|
|
|
|
alt_hashes: List[str] = [h for h in hashes if h and h != king_hash]
|
|
if not alt_hashes:
|
|
return
|
|
|
|
for po in pipe_objects:
|
|
po["relationships"] = {"king": [king_hash], "alt": list(alt_hashes)}
|
|
|
|
def _run_impl(
    self,
    result: Any,
    args: Sequence[str],
    config: Dict[str, Any]
) -> int:
    """Main download implementation behind the download-file cmdlet.

    Flow:

    1. Publish a ``PipelineProgress`` under ``config["_pipeline_progress"]``
       (restored on exit).
    2. Resolve URLs from ``-url`` or positional arguments; fall back to piped
       items when there are none.
    3. Piped items carrying selection args are replayed through a nested
       ``_run_impl`` call each, with a set of temporary config flags that
       are restored afterwards.
    4. Give the matching provider a chance to show a picker, resolve the
       output directory, run the duplicate-URL preflight, then process local
       sources, storage items, explicit URLs and provider items in that
       order.

    Returns 0 when something was downloaded or submitted (or when every URL
    was skipped as a duplicate), an early exit code supplied by one of the
    stages, and 1 otherwise, including on any unexpected exception.
    """
    progress = PipelineProgress(pipeline_context)
    prev_progress = None
    had_progress_key = False
    try:
        # Allow providers to tap into the active PipelineProgress (optional).
        try:
            if isinstance(config, dict):
                had_progress_key = "_pipeline_progress" in config
                prev_progress = config.get("_pipeline_progress")
                config["_pipeline_progress"] = progress
        except Exception:
            pass

        # Parse arguments
        parsed = parse_cmdlet_args(args, self)
        registry = self._load_provider_registry()
        selection_url_prefixes = self._selection_url_prefixes(registry)
        explicit_input = parsed.get("url")

        # Resolve URLs from -url or positional arguments.
        # The flag exclusion is applied to the whole URL test: without the
        # parentheses `and` binds tighter than `or`, so it would only guard
        # the magnet-emoji case and "-flag:value" tokens would slip through.
        url_candidates = parsed.get("url") or [
            a for a in parsed.get("args", [])
            if isinstance(a, str)
            and not a.startswith("-")
            and (
                a.startswith("http") or "://" in a or ":" in a
                or "🧲" in a
            )
        ]
        from SYS.metadata import normalize_urls as normalize_url_list  # lazy: avoids Cryptodome at startup
        raw_url = normalize_url_list(url_candidates)
        local_source_inputs: List[str] = []
        if not raw_url and isinstance(explicit_input, str) and self._path_looks_local(explicit_input):
            local_source_inputs = [str(explicit_input)]

        quiet_mode = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False

        # Fallback to piped items if no explicit URLs provided
        piped_items = []
        if not raw_url:
            piped_items = sh.normalize_result_items(
                result,
                include_falsey_single=True,
            )

        # Handle TABLE_AUTO_STAGES routing: if a piped item has _selection_args,
        # re-invoke download-file with those args instead of processing the PipeObject itself.
        if piped_items and not raw_url:
            selection_runs: List[List[str]] = []
            residual_items: List[Any] = []

            for item in piped_items:
                handled = False
                try:
                    normalized_args, _normalized_action, item_url = extract_selection_fields(
                        item,
                        extra_url_prefixes=selection_url_prefixes,
                    )
                    if normalized_args:
                        if selection_args_have_url(
                            normalized_args,
                            extra_url_prefixes=selection_url_prefixes,
                        ):
                            selection_runs.append(list(normalized_args))
                            handled = True
                        elif item_url:
                            selection_runs.append([str(item_url)] + list(normalized_args))
                            handled = True
                except Exception as e:
                    debug_panel(
                        "download-file selection args failed",
                        [("error", e)],
                        border_style="yellow",
                    )
                    handled = False
                if not handled:
                    residual_items.append(item)

            if selection_runs:
                selection_urls: List[str] = []
                for run_args in selection_runs:
                    for u in extract_urls_from_selection_args(
                        run_args,
                        extra_url_prefixes=selection_url_prefixes,
                    ):
                        if u not in selection_urls:
                            selection_urls.append(u)

                # Config keys that are overridden while the selection runs
                # execute. A saved value of None means "pop the key on exit".
                managed_keys = (
                    "_skip_url_preflight",
                    "_pipeobject_timeout_seconds",
                    "_skip_direct_on_streaming_failure",
                    "_download_file_batch_total",
                    "_download_file_batch_index",
                    "_download_file_batch_label",
                    "_download_file_suppress_nested_pipe_progress",
                )
                saved_config: Dict[str, Any] = {}
                if isinstance(config, dict):
                    saved_config = {key: config.get(key) for key in managed_keys}

                try:
                    if isinstance(config, dict):
                        if selection_urls:
                            # Skip Duplicate Preflight on selection re-entry:
                            # User has already seen the table/status and explicitly selected an item.
                            # Skipping this reduces DB load and latency.
                            config["_skip_url_preflight"] = True
                            config["_skip_direct_on_streaming_failure"] = True
                        if config.get("_pipeobject_timeout_seconds") is None:
                            # Use a generous default for individual items
                            config["_pipeobject_timeout_seconds"] = 600

                    successes = 0
                    last_code = 0
                    total_selection = len(selection_runs)
                    preview_items = list(selection_urls[:5]) or [
                        self._selection_run_label(
                            run_args,
                            extra_url_prefixes=selection_url_prefixes,
                        )
                        for run_args in selection_runs[:5]
                    ]
                    # Progress UI is best-effort; never let it abort the batch.
                    try:
                        progress.ensure_local_ui(
                            label="download-file",
                            total_items=total_selection,
                            items_preview=preview_items,
                        )
                    except Exception:
                        pass
                    try:
                        progress.begin_pipe(
                            total_items=total_selection,
                            items_preview=preview_items,
                        )
                    except Exception:
                        pass

                    for idx, run_args in enumerate(selection_runs, 1):
                        run_label = self._selection_run_label(
                            run_args,
                            extra_url_prefixes=selection_url_prefixes,
                        )
                        try:
                            progress.set_status(
                                f"downloading {idx}/{total_selection}: {run_label}"
                            )
                        except Exception:
                            pass
                        if isinstance(config, dict):
                            config["_download_file_batch_total"] = total_selection
                            config["_download_file_batch_index"] = idx
                            config["_download_file_batch_label"] = run_label
                            config["_download_file_suppress_nested_pipe_progress"] = True
                        exit_code = self._run_impl(None, run_args, config)
                        if exit_code == 0:
                            successes += 1
                        else:
                            last_code = exit_code

                    piped_items = residual_items
                    if not piped_items:
                        if successes > 0:
                            return 0
                        return last_code or 1
                finally:
                    if isinstance(config, dict):
                        for key, previous in saved_config.items():
                            if previous is None:
                                config.pop(key, None)
                            else:
                                config[key] = previous

        try:
            had_piped_input = bool(result)
        except Exception:
            had_piped_input = False

        # UX: In piped mode, allow a single positional arg to be the destination directory.
        # Example: @1-4 | download-file "C:\\Users\\Me\\Downloads\\yoyo"
        if (had_piped_input and raw_url and len(raw_url) == 1
                and (not parsed.get("path"))):
            candidate = str(raw_url[0] or "").strip()
            low = candidate.lower()
            looks_like_url = low.startswith(
                ("http://", "https://", "ftp://", "magnet:", "torrent:")
                + tuple(selection_url_prefixes)
            )
            looks_like_provider = (
                ":" in candidate and not candidate.startswith(
                    ("http:", "https:", "ftp:", "ftps:", "file:")
                    + tuple(selection_url_prefixes)
                )
            )
            looks_like_windows_path = (
                (len(candidate) >= 2 and candidate[1] == ":")
                or candidate.startswith("\\\\") or candidate.startswith("\\")
                or candidate.endswith(("\\",
                                       "/"))
            )
            if (not looks_like_url) and (
                    not looks_like_provider) and looks_like_windows_path:
                parsed["path"] = candidate
                raw_url = []
                piped_items = self._collect_piped_items_if_no_urls(result, raw_url)

        if not raw_url and not piped_items and not local_source_inputs:
            log("No url or piped items to download", file=sys.stderr)
            return 1

        # Provider-pre-check (e.g. Internet Archive format picker).
        # Invoked once: nothing changes the inputs afterwards, so a second
        # identical call would only repeat the provider's work.
        picker_result = self._maybe_show_provider_picker(
            raw_urls=raw_url,
            piped_items=piped_items,
            parsed=parsed,
            config=config,
            registry=registry,
        )
        if picker_result is not None:
            return int(picker_result)

        # Get output directory
        final_output_dir = resolve_target_dir(parsed, config)
        if not final_output_dir:
            return 1

        try:
            debug_panel(
                "download-file plan",
                [
                    ("output_dir", final_output_dir),
                    ("remaining_urls", len(raw_url)),
                    ("piped_items", len(piped_items) if isinstance(piped_items, list) else int(bool(piped_items))),
                ],
                border_style="cyan",
            )
        except Exception:
            pass

        # If the caller isn't running the shared pipeline Live progress UI (e.g. direct
        # cmdlet execution), start a minimal local pipeline progress panel so downloads
        # show consistent, Rich-formatted progress (like download-media).
        total_items = max(1, len(raw_url or []) + len(piped_items or []))
        preview = build_pipeline_preview(raw_url, piped_items)

        progress.ensure_local_ui(
            label="download-file",
            total_items=total_items,
            items_preview=preview
        )

        raw_url, preflight_exit, skipped_dupe_count = self._preflight_explicit_url_duplicates(
            raw_urls=raw_url,
            config=config,
        )
        if preflight_exit is not None:
            return int(preflight_exit)

        downloaded_count = 0

        if local_source_inputs:
            downloaded_count += self._process_explicit_local_sources(
                local_sources=local_source_inputs,
                final_output_dir=final_output_dir,
                parsed=parsed,
                progress=progress,
                config=config,
            )

        storage_downloaded, piped_items, storage_exit = self._process_storage_items(
            piped_items=piped_items,
            parsed=parsed,
            config=config,
            final_output_dir=final_output_dir,
        )
        downloaded_count += int(storage_downloaded)
        if storage_exit is not None:
            return int(storage_exit)

        if skipped_dupe_count and not raw_url and not piped_items:
            # Everything left was a known duplicate: that is a success, not
            # a failure, whether or not anything else was downloaded.
            return 0

        urls_downloaded, early_exit = self._process_explicit_urls(
            raw_urls=raw_url,
            final_output_dir=final_output_dir,
            config=config,
            quiet_mode=quiet_mode,
            registry=registry,
            progress=progress,
            parsed=parsed,
            args=args,
            context_items=(result if isinstance(result, list) else ([result] if result else [])),
        )
        downloaded_count += int(urls_downloaded)
        if early_exit is not None:
            return int(early_exit)

        provider_downloaded, magnet_submissions = self._process_provider_items(
            piped_items=piped_items,
            final_output_dir=final_output_dir,
            config=config,
            quiet_mode=quiet_mode,
            registry=registry,
            progress=progress,
        )
        downloaded_count += provider_downloaded

        if downloaded_count > 0 or magnet_submissions > 0:
            # Render detail panels for downloaded items when download-file is the last stage.
            self._maybe_render_download_details(config=config)
            return 0

        return 1

    except Exception as e:
        log(f"Error in download-file: {e}", file=sys.stderr)
        return 1

    finally:
        # Put back whatever progress object the caller had published.
        try:
            if isinstance(config, dict):
                if had_progress_key:
                    config["_pipeline_progress"] = prev_progress
                else:
                    config.pop("_pipeline_progress", None)
        except Exception:
            pass
        progress.close_local_ui(force_complete=True)
|
|
|
|
def _maybe_show_provider_picker(
|
|
self,
|
|
*,
|
|
raw_urls: Sequence[str],
|
|
piped_items: Sequence[Any],
|
|
parsed: Dict[str, Any],
|
|
config: Dict[str, Any],
|
|
registry: Dict[str, Any],
|
|
) -> Optional[int]:
|
|
"""Generic hook for providers to show a selection table (e.g. Internet Archive format picker)."""
|
|
total_inputs = len(raw_urls or []) + len(piped_items or [])
|
|
if total_inputs != 1:
|
|
return None
|
|
|
|
target_url = None
|
|
if raw_urls:
|
|
target_url = str(raw_urls[0])
|
|
elif piped_items:
|
|
target_url = str(get_field(piped_items[0], "path") or get_field(piped_items[0], "url") or "")
|
|
|
|
if not target_url:
|
|
return None
|
|
|
|
match_provider_name_for_url = registry.get("match_plugin_name_for_url")
|
|
get_provider = registry.get("get_plugin")
|
|
|
|
provider_name = None
|
|
if match_provider_name_for_url:
|
|
try:
|
|
provider_name = match_provider_name_for_url(target_url)
|
|
except Exception:
|
|
pass
|
|
|
|
if provider_name and get_provider:
|
|
provider = get_provider(provider_name, config)
|
|
if provider and hasattr(provider, "maybe_show_picker"):
|
|
try:
|
|
quiet_mode = bool(config.get("_quiet_background_output"))
|
|
res = provider.maybe_show_picker(
|
|
url=target_url,
|
|
item=piped_items[0] if piped_items else None,
|
|
parsed=parsed,
|
|
config=config,
|
|
quiet_mode=quiet_mode,
|
|
)
|
|
if res is not None:
|
|
return int(res)
|
|
except Exception as e:
|
|
debug_panel(
|
|
"download-file picker error",
|
|
[
|
|
("plugin", provider_name),
|
|
("url", target_url),
|
|
("error", e),
|
|
],
|
|
border_style="yellow",
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
# Module-level singleton registration: one Download_File instance is created
# at import time and exposed under the name CMDLET.
CMDLET = Download_File()
|