"""Generic file/stream downloader. Supports: - Direct HTTP file URLs (PDFs, images, documents; non-yt-dlp) - Piped provider items (uses provider.download when available) - Streaming sites via yt-dlp (YouTube, Bandcamp, etc.) """ from __future__ import annotations import sys import re from pathlib import Path from typing import Any, Dict, List, Optional, Sequence from urllib.parse import urlparse from contextlib import AbstractContextManager, nullcontext from API.HTTP import _download_direct_file from SYS.models import DownloadError, DownloadOptions, DownloadMediaResult from SYS.logger import log, debug from SYS.pipeline_progress import PipelineProgress from SYS.result_table import Table from SYS.rich_display import stderr_console as get_stderr_console from SYS import pipeline as pipeline_context from SYS.metadata import normalize_urls as normalize_url_list from tool.ytdlp import ( YtDlpTool, _best_subtitle_sidecar, _SUBTITLE_EXTS, _download_with_timeout, _format_chapters_note, _read_text_file, is_url_supported_by_ytdlp, is_browseable_format, format_for_table_selection, list_formats, probe_url, ) from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs QueryArg = sh.QueryArg parse_cmdlet_args = sh.parse_cmdlet_args register_url_with_local_library = sh.register_url_with_local_library coerce_to_pipe_object = sh.coerce_to_pipe_object get_field = sh.get_field resolve_target_dir = sh.resolve_target_dir coerce_to_path = sh.coerce_to_path build_pipeline_preview = sh.build_pipeline_preview class Download_File(Cmdlet): """Class-based download-file cmdlet - direct HTTP downloads.""" def __init__(self) -> None: """Initialize download-file cmdlet.""" super().__init__( name="download-file", summary="Download files or streaming media", usage= "download-file [-path DIR] [options] OR @N | download-file [-path DIR|DIR] [options]", alias=["dl-file", "download-http"], arg=[ SharedArgs.URL, SharedArgs.PROVIDER, SharedArgs.PATH, SharedArgs.QUERY, QueryArg( "clip", key="clip", aliases=["range", "section", "sections"], type="string", required=False, description=( "Clip time ranges via -query keyed fields (e.g. clip:1m-2m or clip:00:01-00:10). " "Comma-separated values supported." ), query_only=True, ), CmdletArg( name="item", type="string", description="Item selection for playlists/formats", ), ], detail=[ "Download files directly via HTTP or streaming media via yt-dlp.", "For Internet Archive item pages (archive.org/details/...), shows a selectable file/format list; pick with @N to download.", ], exec=self.run, ) self.register() def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Main execution method.""" debug(f"[download-file] run invoked with args: {list(args)}") return self._run_impl(result, args, config) def _process_explicit_urls( self, *, raw_urls: Sequence[str], final_output_dir: Path, config: Dict[str, Any], quiet_mode: bool, registry: Dict[str, Any], progress: PipelineProgress, context_items: Sequence[Any] = (), ) -> tuple[int, Optional[int]]: downloaded_count = 0 SearchResult = registry.get("SearchResult") get_provider = registry.get("get_provider") match_provider_name_for_url = registry.get("match_provider_name_for_url") for url in raw_urls: try: debug(f"Processing URL: {url}") # Check providers first provider_name = None if match_provider_name_for_url: try: provider_name = match_provider_name_for_url(str(url)) except Exception: pass provider = None if provider_name and get_provider: provider = get_provider(provider_name, config) if provider: debug(f"Provider {provider_name} claimed {url}") try: # Try generic handle_url handled = False if hasattr(provider, "handle_url"): try: handled, path = provider.handle_url(str(url), output_dir=final_output_dir) if handled: extra_meta = None title_hint = None tags_hint: Optional[List[str]] = None media_kind_hint = None path_value: Optional[Any] = path if isinstance(path, dict): provider_action = str( path.get("action") or path.get("provider_action") or "" ).strip().lower() if provider_action == "download_items" or bool(path.get("download_items")): request_metadata = path.get("metadata") or path.get("full_metadata") or {} if not isinstance(request_metadata, dict): request_metadata = {} magnet_id = path.get("magnet_id") or request_metadata.get("magnet_id") if magnet_id is not None: request_metadata.setdefault("magnet_id", magnet_id) if SearchResult is None: debug("Provider download_items requested but SearchResult unavailable") continue sr = SearchResult( table=str(provider_name), title=str(path.get("title") or path.get("name") or f"{provider_name} item"), path=str(path.get("path") or path.get("url") or url), full_metadata=request_metadata, ) downloaded_extra = self._download_provider_items( provider=provider, provider_name=str(provider_name), search_result=sr, output_dir=final_output_dir, progress=progress, quiet_mode=quiet_mode, config=config, ) if downloaded_extra: downloaded_count += int(downloaded_extra) continue path_value = path.get("path") or path.get("file_path") extra_meta = path.get("metadata") or path.get("full_metadata") title_hint = path.get("title") or path.get("name") media_kind_hint = path.get("media_kind") tags_val = path.get("tags") or path.get("tag") if isinstance(tags_val, (list, tuple, set)): tags_hint = [str(t) for t in tags_val if t] elif isinstance(tags_val, str) and tags_val.strip(): tags_hint = [str(tags_val).strip()] if path_value: p_val = Path(str(path_value)) if not title_hint and isinstance(extra_meta, dict): title_hint = extra_meta.get("title") or extra_meta.get("name") self._emit_local_file( downloaded_path=p_val, source=str(url), title_hint=str(title_hint) if title_hint else p_val.stem, tags_hint=tags_hint, media_kind_hint=str(media_kind_hint) if media_kind_hint else "file", full_metadata=extra_meta, progress=progress, config=config, provider_hint=provider_name ) downloaded_count += 1 else: debug(f"Provider {provider_name} handled URL without file output") continue except Exception as e: debug(f"Provider {provider_name} handle_url error: {e}") # Try generic download_url if not already handled if not handled and hasattr(provider, "download_url"): res = provider.download_url(str(url), final_output_dir) if res: # Standardize result: can be Path, tuple(Path, Info), or dict with "path" p_val = None extra_meta = None if isinstance(res, (str, Path)): p_val = Path(res) elif isinstance(res, tuple) and len(res) > 0: p_val = Path(res[0]) if len(res) > 1 and isinstance(res[1], dict): extra_meta = res[1] elif isinstance(res, dict): path_candidate = res.get("path") or res.get("file_path") if path_candidate: p_val = Path(path_candidate) extra_meta = res if p_val: self._emit_local_file( downloaded_path=p_val, source=str(url), title_hint=p_val.stem, tags_hint=None, media_kind_hint=extra_meta.get("media_kind") if extra_meta else "file", full_metadata=extra_meta, provider_hint=provider_name, progress=progress, config=config, ) downloaded_count += 1 continue except Exception as e: log(f"Provider {provider_name} error handling {url}: {e}", file=sys.stderr) pass if not handled: debug(f"Provider {provider_name} matched URL but failed to download. Skipping direct fallback to avoid landing pages.") continue # Direct Download Fallback result_obj = _download_direct_file( str(url), final_output_dir, quiet=quiet_mode, pipeline_progress=progress, ) downloaded_path = self._path_from_download_result(result_obj) self._emit_local_file( downloaded_path=downloaded_path, source=str(url), title_hint=downloaded_path.stem, tags_hint=[f"title:{downloaded_path.stem}"], media_kind_hint="file", full_metadata=None, progress=progress, config=config, ) downloaded_count += 1 debug("✓ Downloaded and emitted") except DownloadError as e: log(f"Download failed for {url}: {e}", file=sys.stderr) except Exception as e: log(f"Error processing {url}: {e}", file=sys.stderr) return downloaded_count, None def _normalize_provider_key(self, value: Optional[Any]) -> Optional[str]: if value is None: return None try: normalized = str(value).strip() except Exception: return None if not normalized: return None if "." in normalized: normalized = normalized.split(".", 1)[0] return normalized.lower() def _provider_key_from_item(self, item: Any) -> Optional[str]: table_hint = get_field(item, "table") key = self._normalize_provider_key(table_hint) if key: return key provider_hint = get_field(item, "provider") key = self._normalize_provider_key(provider_hint) if key: return key return self._normalize_provider_key(get_field(item, "source")) def _expand_provider_items( self, *, piped_items: Sequence[Any], registry: Dict[str, Any], config: Dict[str, Any], ) -> List[Any]: get_search_provider = registry.get("get_search_provider") expanded_items: List[Any] = [] for item in piped_items: try: provider_key = self._provider_key_from_item(item) provider = get_search_provider(provider_key, config) if provider_key and get_search_provider else None # Generic hook: If provider has expand_item(item), use it. if provider and hasattr(provider, "expand_item") and callable(provider.expand_item): try: sub_items = provider.expand_item(item) if sub_items: expanded_items.extend(sub_items) continue except Exception as e: debug(f"Provider {provider_key} expand_item failed: {e}") expanded_items.append(item) except Exception: expanded_items.append(item) return expanded_items def _process_provider_items(self, *, piped_items: Sequence[Any], final_output_dir: Path, config: Dict[str, Any], quiet_mode: bool, registry: Dict[str, Any], progress: PipelineProgress, ) -> tuple[int, int]: downloaded_count = 0 queued_magnet_submissions = 0 get_search_provider = registry.get("get_search_provider") SearchResult = registry.get("SearchResult") expanded_items = self._expand_provider_items( piped_items=piped_items, registry=registry, config=config ) total_items = len(expanded_items) processed_items = 0 debug(f"[download-file] Processing {total_items} piped item(s)...") try: if total_items: progress.set_percent(0) except Exception: pass for idx, item in enumerate(expanded_items, 1): try: label = "item" table = get_field(item, "table") title = get_field(item, "title") target = get_field(item, "path") or get_field(item, "url") debug(f"[download-file] Item {idx}/{total_items}: {title or target or 'unnamed'}") media_kind = get_field(item, "media_kind") tags_val = get_field(item, "tag") tags_list: Optional[List[str]] if isinstance(tags_val, (list, set)): tags_list = sorted([str(t) for t in tags_val if t]) else: tags_list = None full_metadata = get_field(item, "full_metadata") if ((not full_metadata) and isinstance(item, dict) and isinstance(item.get("extra"), dict)): extra_md = item["extra"].get("full_metadata") if isinstance(extra_md, dict): full_metadata = extra_md try: label = title or target label = str(label or "item").strip() if total_items: pct = int(round((processed_items / max(1, total_items)) * 100)) progress.set_percent(pct) progress.set_status( f"downloading {processed_items + 1}/{total_items}: {label}" ) except Exception: pass transfer_label = label # If this looks like a provider item and providers are available, prefer provider.download() downloaded_path: Optional[Path] = None attempted_provider_download = False provider_sr = None provider_obj = None provider_key = self._provider_key_from_item(item) if provider_key and get_search_provider and SearchResult: # Reuse helper to derive the provider key from table/provider/source hints. provider_obj = get_search_provider(provider_key, config) if provider_obj is not None and getattr(provider_obj, "prefers_transfer_progress", False): try: progress.begin_transfer(label=transfer_label, total=None) except Exception: pass if provider_obj is not None: attempted_provider_download = True sr = SearchResult( table=str(table), title=str(title or "Unknown"), path=str(target or ""), tag=set(tags_list) if tags_list else set(), media_kind=str(media_kind or "file"), full_metadata=full_metadata if isinstance(full_metadata, dict) else {}, ) debug( f"[download-file] Downloading provider item via {table}: {sr.title}" ) # Preserve provider structure when possible (AllDebrid folders -> subfolders). output_dir = final_output_dir # Generic: allow provider to strict output_dir? # Using default output_dir for now. downloaded_path = provider_obj.download(sr, output_dir) provider_sr = sr debug(f"[download-file] Provider download result: {downloaded_path}") if downloaded_path is None: try: downloaded_extra = self._download_provider_items( provider=provider_obj, provider_name=str(provider_key), search_result=sr, output_dir=output_dir, progress=progress, quiet_mode=quiet_mode, config=config, ) except Exception: downloaded_extra = 0 if downloaded_extra: downloaded_count += int(downloaded_extra) continue # Fallback: if we have a direct HTTP URL and no provider successfully handled it if (downloaded_path is None and not attempted_provider_download and isinstance(target, str) and target.startswith("http")): debug( f"[download-file] Provider item looks like direct URL, downloading: {target}" ) suggested_name = str(title).strip() if title is not None else None result_obj = _download_direct_file( target, final_output_dir, quiet=quiet_mode, suggested_filename=suggested_name, pipeline_progress=progress, ) downloaded_path = coerce_to_path(result_obj) if downloaded_path is None: log( f"Cannot download item (no provider handler / unsupported target): {title or target}", file=sys.stderr, ) continue # Allow providers to add/enrich tags and metadata during download. if provider_sr is not None: try: sr_md = getattr(provider_sr, "full_metadata", None) if isinstance(sr_md, dict) and sr_md: debug(f"[download-file] Syncing full_metadata from provider_sr (keys={list(sr_md.keys())})") full_metadata = sr_md except Exception: pass try: if isinstance(full_metadata, dict): t = str(full_metadata.get("title") or "").strip() if t: title = t except Exception: pass # Prefer tags from the search result object if the provider mutated them during download. try: sr_tags = getattr(provider_sr, "tag", None) if isinstance(sr_tags, (set, list)) and sr_tags: debug(f"[download-file] Syncing tags_list from provider_sr (count={len(sr_tags)})") # Re-sync tags_list with the potentially enriched provider_sr.tag tags_list = sorted([str(t) for t in sr_tags if t]) except Exception: pass self._emit_local_file( downloaded_path=downloaded_path, source=str(target) if target else None, title_hint=str(title) if title else downloaded_path.stem, tags_hint=tags_list, media_kind_hint=str(media_kind) if media_kind else None, full_metadata=full_metadata if isinstance(full_metadata, dict) else None, progress=progress, config=config, provider_hint=provider_key ) downloaded_count += 1 except DownloadError as e: log(f"Download failed: {e}", file=sys.stderr) except Exception as e: log(f"Error downloading item: {e}", file=sys.stderr) finally: if provider_obj is not None and getattr(provider_obj, "prefers_transfer_progress", False): try: progress.finish_transfer(label=transfer_label) except Exception: pass processed_items += 1 try: pct = int(round((processed_items / max(1, total_items)) * 100)) progress.set_percent(pct) if processed_items >= total_items: progress.clear_status() except Exception: pass return downloaded_count, queued_magnet_submissions def _download_provider_items( self, *, provider: Any, provider_name: str, search_result: Any, output_dir: Path, progress: PipelineProgress, quiet_mode: bool, config: Dict[str, Any], ) -> int: if provider is None or not hasattr(provider, "download_items"): return 0 def _on_emit(path: Path, file_url: str, relpath: str, metadata: Dict[str, Any]) -> None: title_hint = None try: title_hint = metadata.get("name") or relpath except Exception: title_hint = relpath title_hint = title_hint or (Path(path).name if path else "download") self._emit_local_file( downloaded_path=path, source=file_url, title_hint=title_hint, tags_hint=None, media_kind_hint="file", full_metadata=metadata if isinstance(metadata, dict) else None, progress=progress, config=config, provider_hint=provider_name, ) try: downloaded_count = provider.download_items( search_result, output_dir, emit=_on_emit, progress=progress, quiet_mode=quiet_mode, path_from_result=coerce_to_path, config=config, ) except TypeError: downloaded_count = provider.download_items( search_result, output_dir, emit=_on_emit, progress=progress, quiet_mode=quiet_mode, path_from_result=coerce_to_path, ) except Exception as exc: log(f"Provider {provider_name} download_items error: {exc}", file=sys.stderr) return 0 try: return int(downloaded_count or 0) except Exception: return 0 def _emit_local_file( self, *, downloaded_path: Path, source: Optional[str], title_hint: Optional[str], tags_hint: Optional[List[str]], media_kind_hint: Optional[str], full_metadata: Optional[Dict[str, Any]], progress: PipelineProgress, config: Dict[str, Any], provider_hint: Optional[str] = None, ) -> None: title_val = (title_hint or downloaded_path.stem or "Unknown").strip() or downloaded_path.stem hash_value = sha256_file(downloaded_path) notes: Optional[Dict[str, str]] = None try: if isinstance(full_metadata, dict): subtitles = full_metadata.get("_tidal_lyrics_subtitles") if isinstance(subtitles, str) and subtitles.strip(): notes = {"lyric": subtitles} except Exception: notes = None tag: List[str] = [] if tags_hint: tag.extend([str(t) for t in tags_hint if t]) if not any(str(t).lower().startswith("title:") for t in tag): tag.insert(0, f"title:{title_val}") payload: Dict[str, Any] = { "path": str(downloaded_path), "hash": hash_value, "title": title_val, "action": "cmdlet:download-file", "download_mode": "file", "store": "local", "media_kind": media_kind_hint or "file", "tag": tag, } if provider_hint: payload["provider"] = str(provider_hint) if full_metadata: payload["metadata"] = full_metadata if notes: payload["notes"] = notes if source and str(source).startswith("http"): payload["url"] = source elif source: payload["source_url"] = source pipeline_context.emit(payload) def _maybe_render_download_details(self, *, config: Dict[str, Any]) -> None: try: stage_ctx = pipeline_context.get_stage_context() except Exception: stage_ctx = None is_last_stage = (stage_ctx is None) or bool(getattr(stage_ctx, "is_last_stage", False)) if not is_last_stage: return try: quiet_mode = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False except Exception: quiet_mode = False if quiet_mode: return emitted_items: List[Any] = [] try: emitted_items = list(getattr(stage_ctx, "emits", None) or []) if stage_ctx is not None else [] except Exception: emitted_items = [] if not emitted_items: return # Stop the live pipeline progress UI before rendering the details panel. try: live_progress = pipeline_context.get_live_progress() except Exception: live_progress = None if live_progress is not None: try: pipe_idx = getattr(stage_ctx, "pipe_index", None) if stage_ctx is not None else None if isinstance(pipe_idx, int): live_progress.finish_pipe(int(pipe_idx), force_complete=True) except Exception: pass try: live_progress.stop() except Exception: pass try: if hasattr(pipeline_context, "set_live_progress"): pipeline_context.set_live_progress(None) except Exception: pass try: subject = emitted_items[0] if len(emitted_items) == 1 else list(emitted_items) # Use helper to display items and make them @-selectable from ._shared import display_and_persist_items display_and_persist_items(list(emitted_items), title="Result", subject=subject) except Exception: pass # Prevent CLI from printing a redundant table after the detail panels. try: if stage_ctx is not None: stage_ctx.emits = [] except Exception: pass @staticmethod def _load_provider_registry() -> Dict[str, Any]: """Lightweight accessor for provider helpers without hard dependencies.""" try: from ProviderCore import registry as provider_registry # type: ignore from ProviderCore.base import SearchResult # type: ignore return { "get_provider": getattr(provider_registry, "get_provider", None), "get_search_provider": getattr(provider_registry, "get_search_provider", None), "match_provider_name_for_url": getattr(provider_registry, "match_provider_name_for_url", None), "SearchResult": SearchResult, } except Exception: return { "get_provider": None, "get_search_provider": None, "match_provider_name_for_url": None, "SearchResult": None, } # === Streaming helpers (yt-dlp) === @staticmethod def _filter_supported_urls(raw_urls: Sequence[str]) -> tuple[List[str], List[str]]: supported = [url for url in (raw_urls or []) if is_url_supported_by_ytdlp(url)] unsupported = list(set(raw_urls or []) - set(supported or [])) return supported, unsupported @staticmethod def _match_provider_urls( raw_urls: Sequence[str], registry: Dict[str, Any], ) -> Dict[str, str]: matches: Dict[str, str] = {} if not raw_urls: return matches match_provider_name_for_url = registry.get("match_provider_name_for_url") if not callable(match_provider_name_for_url): return matches for url in raw_urls: try: url_str = str(url or "").strip() except Exception: continue if not url_str: continue try: provider_name = match_provider_name_for_url(url_str) except Exception: provider_name = None if provider_name: matches[url_str] = str(provider_name).strip().lower() return matches def _parse_query_keyed_spec(self, query_spec: Optional[str]) -> Dict[str, List[str]]: if not query_spec: return {} try: keyed = self._parse_keyed_csv_spec(str(query_spec), default_key="hash") if not keyed: return {} def _alias(src: str, dest: str) -> None: try: values = keyed.get(src) except Exception: values = None if not values: return try: keyed.setdefault(dest, []).extend(list(values)) except Exception: pass try: keyed.pop(src, None) except Exception: pass for src in ("range", "ranges", "section", "sections"): _alias(src, "clip") for src in ("fmt", "f"): _alias(src, "format") for src in ("aud", "a"): _alias(src, "audio") return keyed except Exception: return {} @staticmethod def _extract_hash_override(query_spec: Optional[str], query_keyed: Dict[str, List[str]]) -> Optional[str]: try: hash_values = query_keyed.get("hash", []) if isinstance(query_keyed, dict) else [] hash_candidate = hash_values[-1] if hash_values else None if hash_candidate: return sh.parse_single_hash_query(f"hash:{hash_candidate}") try: has_non_hash_keys = bool( query_keyed and isinstance(query_keyed, dict) and any(k for k in query_keyed.keys() if str(k).strip().lower() != "hash") ) except Exception: has_non_hash_keys = False if has_non_hash_keys: return None return sh.parse_single_hash_query(str(query_spec)) if query_spec else None except Exception: return None def _parse_clip_ranges_and_apply_items( self, *, clip_spec: Optional[str], query_keyed: Dict[str, List[str]], parsed: Dict[str, Any], query_spec: Optional[str], ) -> tuple[Optional[List[tuple[int, int]]], bool, List[str]]: clip_ranges: Optional[List[tuple[int, int]]] = None clip_values: List[str] = [] item_values: List[str] = [] def _uniq(values: Sequence[str]) -> List[str]: seen: set[str] = set() out: List[str] = [] for v in values: key = str(v) if key in seen: continue seen.add(key) out.append(v) return out if clip_spec: keyed = self._parse_keyed_csv_spec(str(clip_spec), default_key="clip") clip_values.extend(keyed.get("clip", []) or []) item_values.extend(keyed.get("item", []) or []) if query_keyed: clip_values.extend(query_keyed.get("clip", []) or []) item_values.extend(query_keyed.get("item", []) or []) clip_values = _uniq(clip_values) item_values = _uniq(item_values) if item_values and not parsed.get("item"): parsed["item"] = ",".join([v for v in item_values if v]) if clip_values: clip_ranges = self._parse_time_ranges(",".join([v for v in clip_values if v])) if not clip_ranges: bad_spec = clip_spec or query_spec log(f"Invalid clip format: {bad_spec}", file=sys.stderr) return None, True, clip_values return clip_ranges, False, clip_values @staticmethod def _init_storage(config: Dict[str, Any]) -> tuple[Optional[Any], bool]: # Cache storage object in config to avoid excessive DB initialization in loops if isinstance(config, dict) and "_storage_cache" in config: cached = config["_storage_cache"] if isinstance(cached, tuple) and len(cached) == 2: return cached # type: ignore storage = None hydrus_available = True try: from Store import Store from API.HydrusNetwork import is_hydrus_available debug("[download-file] Initializing storage interface...") storage = Store(config=config or {}, suppress_debug=True) hydrus_available = bool(is_hydrus_available(config or {})) # If any Hydrus store backend was successfully initialized in the Store # registry, consider Hydrus available even if the global probe failed. try: from Store.HydrusNetwork import HydrusNetwork as _HydrusStoreClass for bn in storage.list_backends(): try: backend = storage[bn] if isinstance(backend, _HydrusStoreClass): hydrus_available = True break except Exception: continue except Exception: pass if isinstance(config, dict): config["_storage_cache"] = (storage, hydrus_available) except Exception as e: debug(f"[download-file] Storage initialization error: {e}") storage = None return storage, hydrus_available @staticmethod def _cookiefile_str(ytdlp_tool: YtDlpTool) -> Optional[str]: try: cookie_path = ytdlp_tool.resolve_cookiefile() if cookie_path is not None and cookie_path.is_file(): return str(cookie_path) except Exception: pass return None def _list_formats_cached( self, u: str, *, playlist_items_value: Optional[str], formats_cache: Dict[str, Optional[List[Dict[str, Any]]]], ytdlp_tool: YtDlpTool, ) -> Optional[List[Dict[str, Any]]]: key = f"{u}||{playlist_items_value or ''}" if key in formats_cache: return formats_cache[key] fmts = list_formats( u, no_playlist=False, playlist_items=playlist_items_value, cookiefile=self._cookiefile_str(ytdlp_tool), ) formats_cache[key] = fmts return fmts def _is_browseable_format(self, fmt: Any) -> bool: """Check if format is user-browseable. Delegates to ytdlp helper.""" return is_browseable_format(fmt) def _format_id_for_query_index( self, query_format: str, url: str, formats_cache: Dict[str, Optional[List[Dict[str, Any]]]], ytdlp_tool: YtDlpTool, ) -> Optional[str]: import re if not query_format or not re.match(r"^\s*#?\d+\s*$", str(query_format)): return None try: s_val = str(query_format).strip() idx = int(s_val.lstrip("#")) except Exception: raise ValueError(f"Invalid format index: {query_format}") fmts = self._list_formats_cached( url, playlist_items_value=None, formats_cache=formats_cache, ytdlp_tool=ytdlp_tool, ) if not fmts: raise ValueError("Unable to list formats for the URL; cannot resolve numeric format index") # Prioritize exact format_id match if it's a numeric string that happens to be an ID # (e.g. YouTube's 251 for opus). if s_val and not s_val.startswith("#"): if any(str(f.get("format_id", "")) == s_val for f in fmts): return s_val candidate_formats = [f for f in fmts if self._is_browseable_format(f)] filtered_formats = candidate_formats if candidate_formats else list(fmts) if not filtered_formats: raise ValueError("No formats available for selection") if idx <= 0 or idx > len(filtered_formats): raise ValueError(f"Format index {idx} out of range (1..{len(filtered_formats)})") chosen = filtered_formats[idx - 1] selection_format_id = str(chosen.get("format_id") or "").strip() if not selection_format_id: raise ValueError("Selected format has no format_id") try: vcodec = str(chosen.get("vcodec", "none")) acodec = str(chosen.get("acodec", "none")) if vcodec != "none" and acodec == "none": selection_format_id = f"{selection_format_id}+ba" except Exception: pass return selection_format_id @staticmethod def _canonicalize_url_for_storage(*, requested_url: str, ytdlp_tool: YtDlpTool, playlist_items: Optional[str]) -> str: if playlist_items: debug(f"[download-file] Skipping canonicalization for playlist item(s): {playlist_items}") return str(requested_url) try: cf = None try: cookie_path = ytdlp_tool.resolve_cookiefile() if cookie_path is not None and cookie_path.is_file(): cf = str(cookie_path) except Exception: cf = None debug(f"[download-file] Canonicalizing URL: {requested_url}") pr = probe_url(requested_url, no_playlist=False, timeout_seconds=15, cookiefile=cf) if isinstance(pr, dict): for key in ("webpage_url", "original_url", "url", "requested_url"): value = pr.get(key) if isinstance(value, str) and value.strip(): canon = value.strip() if canon != requested_url: debug(f"[download-file] Resolved canonical URL: {requested_url} -> {canon}") return canon except Exception as e: debug(f"[download-file] Canonicalization error for {requested_url}: {e}") return str(requested_url) def _preflight_url_duplicate( self, *, storage: Any, hydrus_available: bool, final_output_dir: Path, candidate_url: Optional[str] = None, extra_urls: Optional[List[str]] = None, **kwargs: Any, ) -> bool: to_check = [] if candidate_url: to_check.append(str(candidate_url)) if extra_urls: to_check.extend([str(u) for u in extra_urls if u]) # De-duplicate needles to avoid redundant DB searches. seen = set() unique_to_check = [] for u in to_check: if u not in seen: unique_to_check.append(u) seen.add(u) return sh.check_url_exists_in_storage( urls=unique_to_check, storage=storage, hydrus_available=hydrus_available, final_output_dir=final_output_dir ) def _preflight_url_duplicates_bulk( self, *, urls: List[str], storage: Any, hydrus_available: bool, final_output_dir: Path, **kwargs: Any, ) -> bool: if not urls: return True unique_urls = [] seen = set() for u in urls: if u and u not in seen: unique_urls.append(u) seen.add(u) return sh.check_url_exists_in_storage( urls=unique_urls, storage=storage, hydrus_available=hydrus_available, final_output_dir=final_output_dir ) def _maybe_show_playlist_table(self, *, url: str, ytdlp_tool: YtDlpTool) -> bool: ctx = pipeline_context.get_stage_context() if ctx is not None and getattr(ctx, "total_stages", 0) > 1: return False try: cf = self._cookiefile_str(ytdlp_tool) pr = probe_url(url, no_playlist=False, timeout_seconds=15, cookiefile=cf) except Exception: pr = None if not isinstance(pr, dict): return False entries = pr.get("entries") if not isinstance(entries, list) or len(entries) <= 1: return False extractor_name = "" try: extractor_name = str(pr.get("extractor") or pr.get("extractor_key") or "").strip().lower() except Exception: extractor_name = "" table_type: Optional[str] = None if "bandcamp" in extractor_name: table_type = "bandcamp" elif "youtube" in extractor_name: table_type = "youtube" max_rows = 200 display_entries = entries[:max_rows] def _entry_to_url(entry: Any) -> Optional[str]: if not isinstance(entry, dict): return None for key in ("webpage_url", "original_url", "url"): v = entry.get(key) if isinstance(v, str) and v.strip(): s_val = v.strip() try: if urlparse(s_val).scheme in {"http", "https"}: return s_val except Exception: return s_val entry_id = entry.get("id") if isinstance(entry_id, str) and entry_id.strip(): extractor_name_inner = str(pr.get("extractor") or pr.get("extractor_key") or "").lower() if "youtube" in extractor_name_inner: return f"https://www.youtube.com/watch?v={entry_id.strip()}" return None table = Table() safe_url = str(url or "").strip() table.title = f'download-file -url "{safe_url}"' if safe_url else "download-file" if table_type: try: table.set_table(table_type) except Exception: table.table = table_type table.set_source_command("download-file", []) try: table._perseverance(True) except Exception: pass results_list: List[Dict[str, Any]] = [] for idx, entry in enumerate(display_entries, 1): title = None uploader = None duration = None entry_url = _entry_to_url(entry) try: if isinstance(entry, dict): title = entry.get("title") uploader = entry.get("uploader") or pr.get("uploader") duration = entry.get("duration") except Exception: pass row: Dict[str, Any] = { "table": "download-file", "title": str(title or f"Item {idx}"), "detail": str(uploader or ""), "media_kind": "playlist-item", "playlist_index": idx, "_selection_args": (["-url", str(entry_url)] if entry_url else ["-url", str(url), "-item", str(idx)]), "url": entry_url, "target": entry_url, "columns": [ ("#", str(idx)), ("Title", str(title or "")), ("Duration", str(duration or "")), ("Uploader", str(uploader or "")), ], } results_list.append(row) table.add_result(row) pipeline_context.set_current_stage_table(table) pipeline_context.set_last_result_table(table, results_list) try: suspend = getattr(pipeline_context, "suspend_live_progress", None) cm: AbstractContextManager[Any] = nullcontext() if callable(suspend): maybe_cm = suspend() if maybe_cm is not None: cm = maybe_cm # type: ignore[assignment] with cm: get_stderr_console().print(table) except Exception: pass setattr(table, "_rendered_by_cmdlet", True) return True def _maybe_show_format_table_for_single_url( self, *, mode: str, clip_spec: Any, clip_values: Sequence[str], playlist_items: Optional[str], ytdl_format: Any, supported_url: Sequence[str], playlist_selection_handled: bool, ytdlp_tool: YtDlpTool, formats_cache: Dict[str, Optional[List[Dict[str, Any]]]], storage: Any, hydrus_available: bool, final_output_dir: Path, args: Sequence[str], skip_preflight: bool = False, ) -> Optional[int]: try: ctx = pipeline_context.get_stage_context() if ctx is not None and getattr(ctx, "total_stages", 0) > 1: # In pipelines, skip interactive format tables; require explicit -query format. return None except Exception: pass if ( mode != "audio" and not clip_spec and not clip_values and not playlist_items and not ytdl_format and len(supported_url) == 1 and not playlist_selection_handled ): url = supported_url[0] canonical_url = self._canonicalize_url_for_storage( requested_url=url, ytdlp_tool=ytdlp_tool, playlist_items=playlist_items, ) if not skip_preflight: if not self._preflight_url_duplicate( storage=storage, hydrus_available=hydrus_available, final_output_dir=final_output_dir, candidate_url=canonical_url, extra_urls=[url], ): log(f"Skipping download: {url}", file=sys.stderr) return 0 formats = self._list_formats_cached( url, playlist_items_value=None, formats_cache=formats_cache, ytdlp_tool=ytdlp_tool, ) if formats and len(formats) > 1: candidate_formats = [f for f in formats if self._is_browseable_format(f)] filtered_formats = candidate_formats if candidate_formats else list(formats) debug(f"Formatlist: showing {len(filtered_formats)} formats (raw={len(formats)})") base_cmd = f'download-file "{url}"' remaining_args = [arg for arg in args if arg not in [url] and not arg.startswith("-")] if remaining_args: base_cmd += " " + " ".join(remaining_args) table = Table(title=f"Available formats for {url}", max_columns=10, preserve_order=True) table.set_table("ytdlp.formatlist") table.set_source_command("download-file", [url]) debug(f"[ytdlp.formatlist] Displaying format selection table for {url}") debug("[ytdlp.formatlist] Provider: ytdlp (routing to download-file via TABLE_AUTO_STAGES)") results_list: List[Dict[str, Any]] = [] for idx, fmt in enumerate(filtered_formats, 1): resolution = fmt.get("resolution", "") ext = fmt.get("ext", "") vcodec = fmt.get("vcodec", "none") acodec = fmt.get("acodec", "none") filesize = fmt.get("filesize") filesize_approx = fmt.get("filesize_approx") format_id = fmt.get("format_id", "") selection_format_id = format_id try: if vcodec != "none" and acodec == "none" and format_id: selection_format_id = f"{format_id}+ba" except Exception: selection_format_id = format_id # Use ytdlp helper to format for table format_dict = format_for_table_selection( fmt, url, idx, selection_format_id=selection_format_id, ) # Add base command for display format_dict["cmd"] = base_cmd def _merge_query_args(selection_args: List[str], query_value: str) -> List[str]: if not query_value: return selection_args merged = list(selection_args or []) if "-query" in merged: idx_query = merged.index("-query") if idx_query + 1 < len(merged): existing = str(merged[idx_query + 1] or "").strip() merged[idx_query + 1] = f"{existing},{query_value}" if existing else query_value else: merged.append(query_value) else: merged.extend(["-query", query_value]) return merged # Append clip values to selection args if needed selection_args: List[str] = list(format_dict.get("_selection_args") or []) try: if (not clip_spec) and clip_values: clip_query = f"clip:{','.join([v for v in clip_values if v])}" selection_args = _merge_query_args(selection_args, clip_query) except Exception: pass format_dict["_selection_args"] = selection_args # Also update in full_metadata for provider registration format_dict["full_metadata"]["_selection_args"] = selection_args results_list.append(format_dict) table.add_result(format_dict) try: suspend = getattr(pipeline_context, "suspend_live_progress", None) cm: AbstractContextManager[Any] = nullcontext() if callable(suspend): maybe_cm = suspend() if maybe_cm is not None: cm = maybe_cm # type: ignore[assignment] with cm: get_stderr_console().print(table) except Exception: pass setattr(table, "_rendered_by_cmdlet", True) pipeline_context.set_current_stage_table(table) pipeline_context.set_last_result_table(table, results_list) debug(f"[ytdlp.formatlist] Format table registered with {len(results_list)} formats") debug( f"[ytdlp.formatlist] When user selects @N, will invoke: download-file {url} -query 'format:'" ) log("", file=sys.stderr) return 0 return None def _download_supported_urls( self, *, supported_url: Sequence[str], ytdlp_tool: YtDlpTool, args: Sequence[str], config: Dict[str, Any], final_output_dir: Path, mode: str, clip_spec: Any, clip_ranges: Optional[List[tuple[int, int]]], query_hash_override: Optional[str], embed_chapters: bool, write_sub: bool, quiet_mode: bool, playlist_items: Optional[str], ytdl_format: Any, skip_per_url_preflight: bool, forced_single_format_id: Optional[str], forced_single_format_for_batch: bool, formats_cache: Dict[str, Optional[List[Dict[str, Any]]]], storage: Any, hydrus_available: bool, download_timeout_seconds: int, ) -> int: downloaded_count = 0 downloaded_pipe_objects: List[Dict[str, Any]] = [] pipe_seq = 0 clip_sections_spec = self._build_clip_sections_spec(clip_ranges) if clip_sections_spec: try: debug(f"Clip sections spec: {clip_sections_spec}") except Exception: pass for url in supported_url: try: debug(f"[download-file] Processing URL in loop: {url}") debug(f"[download-file] ytdl_format parameter passed in: {ytdl_format}") canonical_url = url if not skip_per_url_preflight or clip_ranges: canonical_url = self._canonicalize_url_for_storage( requested_url=url, ytdlp_tool=ytdlp_tool, playlist_items=playlist_items, ) if not skip_per_url_preflight: debug(f"[download-file] Running duplicate preflight for: {canonical_url}") if not self._preflight_url_duplicate( storage=storage, hydrus_available=hydrus_available, final_output_dir=final_output_dir, candidate_url=canonical_url, extra_urls=[url], ): log(f"Skipping download (duplicate found): {url}", file=sys.stderr) continue PipelineProgress(pipeline_context).begin_steps(2) actual_format = ytdl_format actual_playlist_items = playlist_items if playlist_items and not ytdl_format: import re if re.search(r"[^0-9,-]", playlist_items): actual_format = playlist_items actual_playlist_items = None if mode == "audio" and not actual_format: actual_format = "bestaudio" if mode == "video" and not actual_format: configured = (ytdlp_tool.default_format("video") or "").strip() if configured and configured != "bestvideo+bestaudio/best": actual_format = configured forced_single_applied = False if ( forced_single_format_for_batch and forced_single_format_id and not ytdl_format and not actual_playlist_items ): actual_format = forced_single_format_id forced_single_applied = True if ( actual_format and isinstance(actual_format, str) and mode != "audio" and "+" not in actual_format and "/" not in actual_format and "[" not in actual_format and actual_format not in {"best", "bv", "ba", "b"} and not forced_single_applied ): try: formats = self._list_formats_cached( url, playlist_items_value=actual_playlist_items, formats_cache=formats_cache, ytdlp_tool=ytdlp_tool, ) if formats: fmt_match = next((f for f in formats if str(f.get("format_id", "")) == actual_format), None) if fmt_match: vcodec = str(fmt_match.get("vcodec", "none")) acodec = str(fmt_match.get("acodec", "none")) if vcodec != "none" and acodec == "none": debug(f"Selected video-only format {actual_format}; using {actual_format}+ba for audio") actual_format = f"{actual_format}+ba" except Exception as e: pass attempted_single_format_fallback = False while True: try: opts = DownloadOptions( url=url, mode=mode, output_dir=final_output_dir, ytdl_format=actual_format, cookies_path=ytdlp_tool.resolve_cookiefile(), clip_sections=clip_sections_spec, playlist_items=actual_playlist_items, quiet=quiet_mode, no_playlist=False, embed_chapters=embed_chapters, write_sub=write_sub, ) PipelineProgress(pipeline_context).step("downloading") debug(f"Starting download for {url} (format: {actual_format or 'default'}) with {download_timeout_seconds}s activity timeout...") result_obj = _download_with_timeout(opts, timeout_seconds=download_timeout_seconds, config=config) debug(f"Download completed for {url}, building pipe object...") break except DownloadError as e: cause = getattr(e, "__cause__", None) detail = "" try: detail = str(cause or "") except Exception: detail = "" if ("requested format is not available" in (detail or "").lower()) and mode != "audio": if ( forced_single_format_for_batch and forced_single_format_id and not ytdl_format and not actual_playlist_items and not attempted_single_format_fallback ): attempted_single_format_fallback = True actual_format = forced_single_format_id debug(f"Only one format available (playlist preflight); retrying with: {actual_format}") continue formats = self._list_formats_cached( url, playlist_items_value=actual_playlist_items, formats_cache=formats_cache, ytdlp_tool=ytdlp_tool, ) if ( (not attempted_single_format_fallback) and isinstance(formats, list) and len(formats) == 1 and isinstance(formats[0], dict) ): only = formats[0] fallback_format = str(only.get("format_id") or "").strip() selection_format_id = fallback_format try: vcodec = str(only.get("vcodec", "none")) acodec = str(only.get("acodec", "none")) if vcodec != "none" and acodec == "none" and fallback_format: selection_format_id = f"{fallback_format}+ba" except Exception: selection_format_id = fallback_format if selection_format_id: attempted_single_format_fallback = True actual_format = selection_format_id debug(f"Only one format available; retrying with: {actual_format}") continue if formats: formats_to_show = formats table = Table(title=f"Available formats for {url}", max_columns=10, preserve_order=True) table.set_table("ytdlp.formatlist") table.set_source_command("download-file", [url]) results_list: List[Dict[str, Any]] = [] for idx, fmt in enumerate(formats_to_show, 1): resolution = fmt.get("resolution", "") ext = fmt.get("ext", "") vcodec = fmt.get("vcodec", "none") acodec = fmt.get("acodec", "none") filesize = fmt.get("filesize") filesize_approx = fmt.get("filesize_approx") format_id = fmt.get("format_id", "") selection_format_id = format_id try: if vcodec != "none" and acodec == "none" and format_id: selection_format_id = f"{format_id}+ba" except Exception: selection_format_id = format_id size_str = "" size_prefix = "" size_bytes = filesize if not size_bytes: size_bytes = filesize_approx if size_bytes: size_prefix = "~" try: if isinstance(size_bytes, (int, float)) and size_bytes > 0: size_mb = float(size_bytes) / (1024 * 1024) size_str = f"{size_prefix}{size_mb:.1f}MB" except Exception: size_str = "" desc_parts: List[str] = [] if resolution and resolution != "audio only": desc_parts.append(str(resolution)) if ext: desc_parts.append(str(ext).upper()) if vcodec != "none": desc_parts.append(f"v:{vcodec}") if acodec != "none": desc_parts.append(f"a:{acodec}") if size_str: desc_parts.append(size_str) format_desc = " | ".join(desc_parts) format_dict: Dict[str, Any] = { "table": "download-file", "title": f"Format {format_id}", "url": url, "target": url, "detail": format_desc, "media_kind": "format", "columns": [ ("ID", format_id), ("Resolution", resolution or "N/A"), ("Ext", ext), ("Size", size_str or ""), ("Video", vcodec), ("Audio", acodec), ], "full_metadata": { "format_id": format_id, "url": url, "item_selector": selection_format_id, }, "_selection_args": ["-query", f"format:{selection_format_id}"], } results_list.append(format_dict) table.add_result(format_dict) pipeline_context.set_current_stage_table(table) pipeline_context.set_last_result_table(table, results_list) try: suspend = getattr(pipeline_context, "suspend_live_progress", None) cm: AbstractContextManager[Any] = nullcontext() if callable(suspend): maybe_cm = suspend() if maybe_cm is not None: cm = maybe_cm # type: ignore[assignment] with cm: get_stderr_console().print(table) except Exception: pass PipelineProgress(pipeline_context).step("awaiting selection") log("Requested format is not available; select a working format with @N", file=sys.stderr) return 0 raise results_to_emit: List[Any] = [] if isinstance(result_obj, list): results_to_emit = list(result_obj) else: paths = getattr(result_obj, "paths", None) if isinstance(paths, list) and paths: for p in paths: try: p_path = Path(p) except Exception: continue try: if p_path.suffix.lower() in _SUBTITLE_EXTS: continue except Exception: pass if not p_path.exists() or p_path.is_dir(): continue try: hv = sha256_file(p_path) except Exception: hv = None results_to_emit.append( DownloadMediaResult( path=p_path, info=getattr(result_obj, "info", {}) or {}, tag=list(getattr(result_obj, "tag", []) or []), source_url=getattr(result_obj, "source_url", None) or opts.url, hash_value=hv, ) ) else: results_to_emit = [result_obj] pipe_objects: List[Dict[str, Any]] = [] for downloaded in results_to_emit: po = self._build_pipe_object(downloaded, url, opts) pipe_seq += 1 try: po.setdefault("pipe_index", pipe_seq) except Exception: pass try: info = downloaded.info if isinstance(getattr(downloaded, "info", None), dict) else {} except Exception: info = {} chapters_text = _format_chapters_note(info) if embed_chapters else None if chapters_text: notes = po.get("notes") if not isinstance(notes, dict): notes = {} notes.setdefault("chapters", chapters_text) po["notes"] = notes if write_sub: try: media_path = Path(str(po.get("path") or "")) except Exception: media_path = None if media_path is not None and media_path.exists() and media_path.is_file(): sub_path = _best_subtitle_sidecar(media_path) if sub_path is not None: sub_text = _read_text_file(sub_path) if sub_text: notes = po.get("notes") if not isinstance(notes, dict): notes = {} notes["sub"] = sub_text po["notes"] = notes try: sub_path.unlink() except Exception: pass pipe_objects.append(po) try: if clip_ranges and len(pipe_objects) == len(clip_ranges): source_hash = query_hash_override or self._find_existing_hash_for_url( storage, canonical_url, hydrus_available=hydrus_available, ) self._apply_clip_decorations(pipe_objects, clip_ranges, source_king_hash=source_hash) except Exception: pass debug(f"Emitting {len(pipe_objects)} result(s) to pipeline...") PipelineProgress(pipeline_context).step("finalized") stage_ctx = pipeline_context.get_stage_context() emit_enabled = bool(stage_ctx is not None) for pipe_obj_dict in pipe_objects: if emit_enabled: pipeline_context.emit(pipe_obj_dict) if pipe_obj_dict.get("url"): pipe_obj = coerce_to_pipe_object(pipe_obj_dict) register_url_with_local_library(pipe_obj, config) try: downloaded_pipe_objects.append(pipe_obj_dict) except Exception: pass downloaded_count += len(pipe_objects) debug("✓ Downloaded and emitted") except DownloadError as e: log(f"Download failed for {url}: {e}", file=sys.stderr) except Exception as e: log(f"Error processing {url}: {e}", file=sys.stderr) if downloaded_count > 0: debug(f"✓ Successfully processed {downloaded_count} URL(s)") return 0 log("No downloads completed", file=sys.stderr) return 1 def _run_streaming_urls( self, *, streaming_urls: List[str], args: Sequence[str], config: Dict[str, Any], parsed: Dict[str, Any], ) -> int: try: debug("Starting streaming download handler") ytdlp_tool = YtDlpTool(config) raw_url = list(streaming_urls) supported_url, unsupported_list = self._filter_supported_urls(raw_url) if not supported_url: log("No yt-dlp-supported url to download", file=sys.stderr) return 1 if unsupported_list: debug(f"Skipping {len(unsupported_list)} unsupported url (use direct HTTP mode)") final_output_dir = resolve_target_dir(parsed, config) if not final_output_dir: return 1 debug(f"Output directory: {final_output_dir}") progress = PipelineProgress(pipeline_context) try: # If we are already in a pipeline stage, the parent UI is already handling progress. # Calling ensure_local_ui can cause re-initialization hangs on some platforms. if pipeline_context.get_stage_context() is None: debug("[download-file] Initializing local UI...") progress.ensure_local_ui( label="download-file", total_items=len(supported_url), items_preview=supported_url, ) else: debug("[download-file] Skipping local UI: running inside pipeline stage") try: progress.begin_pipe( total_items=len(supported_url), items_preview=supported_url, ) except Exception as err: debug(f"[download-file] PipelineProgress begin_pipe error: {err}") except Exception as e: debug(f"[download-file] PipelineProgress update error: {e}") debug("[download-file] Parsing clip and query specs...") clip_spec = parsed.get("clip") query_spec = parsed.get("query") query_keyed = self._parse_query_keyed_spec(str(query_spec) if query_spec is not None else None) query_hash_override = self._extract_hash_override(str(query_spec) if query_spec is not None else None, query_keyed) embed_chapters = True write_sub = True query_format: Optional[str] = None try: fmt_values = query_keyed.get("format", []) if isinstance(query_keyed, dict) else [] fmt_candidate = fmt_values[-1] if fmt_values else None if fmt_candidate is not None: query_format = str(fmt_candidate).strip() except Exception: query_format = None query_audio: Optional[bool] = None try: audio_values = query_keyed.get("audio", []) if isinstance(query_keyed, dict) else [] audio_candidate = audio_values[-1] if audio_values else None if audio_candidate is not None: s_val = str(audio_candidate).strip().lower() if s_val in {"1", "true", "t", "yes", "y", "on"}: query_audio = True elif s_val in {"0", "false", "f", "no", "n", "off"}: query_audio = False elif s_val: query_audio = True except Exception: query_audio = None query_wants_audio = False if query_format: try: query_wants_audio = str(query_format).strip().lower() == "audio" except Exception: query_wants_audio = False if query_audio is not None: wants_audio = bool(query_audio) else: wants_audio = bool(query_wants_audio) mode = "audio" if wants_audio else "video" clip_ranges, clip_invalid, clip_values = self._parse_clip_ranges_and_apply_items( clip_spec=str(clip_spec) if clip_spec is not None else None, query_keyed=query_keyed, parsed=parsed, query_spec=str(query_spec) if query_spec is not None else None, ) if clip_invalid: return 1 if clip_ranges: try: debug(f"Clip ranges: {clip_ranges}") except Exception: pass quiet_mode = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False storage, hydrus_available = self._init_storage(config if isinstance(config, dict) else {}) formats_cache: Dict[str, Optional[List[Dict[str, Any]]]] = {} playlist_items = str(parsed.get("item")) if parsed.get("item") else None ytdl_format = None height_selector = None if query_format and not query_wants_audio: try: # Check if this looks like a YouTube format ID (used when selecting from format table) # Format IDs are typically 3 digits and come from user selections # Only treat as height if it looks like a resolution (ends with 'p' or is 1080+) is_likely_format_id = ( len(str(query_format).strip()) == 3 and str(query_format).strip().isdigit() ) if not is_likely_format_id: height_selector = ytdlp_tool.resolve_height_selector(query_format) except Exception: height_selector = None if query_wants_audio: # Explicit audio request should map to best-audio-only selector ytdl_format = "ba" elif height_selector: ytdl_format = height_selector elif query_format: # Use query_format as literal format ID (e.g., from table selection like '251') ytdl_format = query_format playlist_selection_handled = False if len(supported_url) == 1 and not playlist_items: candidate_url = supported_url[0] # If query_format is provided and numeric, resolve it now. if query_format and not query_wants_audio and not ytdl_format: try: idx_fmt = self._format_id_for_query_index(query_format, candidate_url, formats_cache, ytdlp_tool) if idx_fmt: ytdl_format = idx_fmt except ValueError as e: # Fallback: Treat as literal format if resolution fails or it's not a valid row index. debug(f"Format resolution for '{query_format}' failed ({e}); treating as literal.") ytdl_format = query_format if not ytdl_format: debug(f"[download-file] Checking for playlist at {candidate_url}...") if self._maybe_show_playlist_table(url=candidate_url, ytdlp_tool=ytdlp_tool): playlist_selection_handled = True # ... (existing logging code) ... return 0 skip_per_url_preflight = False try: skip_preflight_override = bool(config.get("_skip_url_preflight")) if isinstance(config, dict) else False except Exception: skip_preflight_override = False if skip_preflight_override: skip_per_url_preflight = True elif len(supported_url) > 1: if not self._preflight_url_duplicates_bulk( storage=storage, hydrus_available=hydrus_available, final_output_dir=final_output_dir, urls=list(supported_url), ): return 0 skip_per_url_preflight = True forced_single_format_id: Optional[str] = None forced_single_format_for_batch = False if len(supported_url) > 1 and not playlist_items and not ytdl_format: try: sample_url = str(supported_url[0]) fmts = self._list_formats_cached( sample_url, playlist_items_value=None, formats_cache=formats_cache, ytdlp_tool=ytdlp_tool, ) if isinstance(fmts, list) and len(fmts) == 1 and isinstance(fmts[0], dict): only_id = str(fmts[0].get("format_id") or "").strip() if only_id: forced_single_format_id = only_id forced_single_format_for_batch = True debug( f"Playlist format preflight: only one format available; using {forced_single_format_id} for all items" ) except Exception: forced_single_format_id = None forced_single_format_for_batch = False debug("[download-file] Checking if format table should be shown...") early_ret = self._maybe_show_format_table_for_single_url( mode=mode, clip_spec=clip_spec, clip_values=clip_values, playlist_items=playlist_items, ytdl_format=ytdl_format, supported_url=supported_url, playlist_selection_handled=playlist_selection_handled, ytdlp_tool=ytdlp_tool, formats_cache=formats_cache, storage=storage, hydrus_available=hydrus_available, final_output_dir=final_output_dir, args=args, skip_preflight=skip_preflight_override, ) if early_ret is not None: return int(early_ret) timeout_seconds = 300 try: override = config.get("_pipeobject_timeout_seconds") if isinstance(config, dict) else None if override is not None: timeout_seconds = max(1, int(override)) except Exception: timeout_seconds = 300 debug(f"[download-file] Proceeding to final download call for {len(supported_url)} URL(s)...") return self._download_supported_urls( supported_url=supported_url, ytdlp_tool=ytdlp_tool, args=args, config=config, final_output_dir=final_output_dir, mode=mode, clip_spec=clip_spec, clip_ranges=clip_ranges, query_hash_override=query_hash_override, embed_chapters=embed_chapters, write_sub=write_sub, quiet_mode=quiet_mode, playlist_items=playlist_items, ytdl_format=ytdl_format, skip_per_url_preflight=skip_per_url_preflight, forced_single_format_id=forced_single_format_id, forced_single_format_for_batch=forced_single_format_for_batch, formats_cache=formats_cache, storage=storage, hydrus_available=hydrus_available, download_timeout_seconds=timeout_seconds, ) except Exception as e: log(f"Error in streaming download handler: {e}", file=sys.stderr) return 1 def _parse_time_ranges(self, spec: str) -> List[tuple[int, int]]: def _to_seconds(ts: str) -> Optional[int]: ts = str(ts).strip() if not ts: return None try: unit_match = re.fullmatch(r"(?i)\s*(?:(?P\d+)h)?\s*(?:(?P\d+)m)?\s*(?:(?P\d+(?:\.\d+)?)s)?\s*", ts) except Exception: unit_match = None if unit_match and unit_match.group(0).strip() and any(unit_match.group(g) for g in ("h", "m", "s")): try: hours = int(unit_match.group("h") or 0) minutes = int(unit_match.group("m") or 0) seconds = float(unit_match.group("s") or 0) total = (hours * 3600) + (minutes * 60) + seconds return int(total) except Exception: return None if ":" in ts: parts = [p.strip() for p in ts.split(":")] if len(parts) == 2: hh_s = "0" mm_s, ss_s = parts elif len(parts) == 3: hh_s, mm_s, ss_s = parts else: return None try: hours = int(hh_s) minutes = int(mm_s) seconds = float(ss_s) total = (hours * 3600) + (minutes * 60) + seconds return int(total) except Exception: return None try: return int(float(ts)) except Exception: return None ranges: List[tuple[int, int]] = [] if not spec: return ranges for piece in str(spec).split(","): piece = piece.strip() if not piece: continue if "-" not in piece: return [] start_s, end_s = [p.strip() for p in piece.split("-", 1)] start = _to_seconds(start_s) end = _to_seconds(end_s) if start is None or end is None or start >= end: return [] ranges.append((start, end)) return ranges @staticmethod def _parse_keyed_csv_spec(spec: str, *, default_key: str) -> Dict[str, List[str]]: out: Dict[str, List[str]] = {} if not isinstance(spec, str): spec = str(spec) text = spec.strip() if not text: return out active = (default_key or "").strip().lower() or "clip" key_pattern = re.compile(r"^([A-Za-z_][A-Za-z0-9_-]*)\s*:\s*(.*)$") for raw_piece in text.split(","): piece = raw_piece.strip() if not piece: continue m = key_pattern.match(piece) if m: active = (m.group(1) or "").strip().lower() or active value = (m.group(2) or "").strip() if value: out.setdefault(active, []).append(value) continue out.setdefault(active, []).append(piece) return out def _build_clip_sections_spec(self, clip_ranges: Optional[List[tuple[int, int]]]) -> Optional[str]: ranges: List[str] = [] if clip_ranges: for start_s, end_s in clip_ranges: ranges.append(f"{start_s}-{end_s}") return ",".join(ranges) if ranges else None def _build_pipe_object(self, download_result: Any, url: str, opts: DownloadOptions) -> Dict[str, Any]: info: Dict[str, Any] = download_result.info if isinstance(download_result.info, dict) else {} media_path = Path(download_result.path) hash_value = download_result.hash_value or sha256_file(media_path) title = info.get("title") or media_path.stem tag = list(download_result.tag or []) if title and f"title:{title}" not in tag: tag.insert(0, f"title:{title}") final_url = None try: page_url = info.get("webpage_url") or info.get("original_url") or info.get("url") if page_url: final_url = str(page_url) except Exception: final_url = None if not final_url and url: final_url = str(url) return { "path": str(media_path), "hash": hash_value, "title": title, "url": final_url, "tag": tag, "action": "cmdlet:download-file", "is_temp": True, "ytdl_format": getattr(opts, "ytdl_format", None), "store": getattr(opts, "storage_name", None) or getattr(opts, "storage_location", None) or "PATH", "media_kind": "video" if opts.mode == "video" else "audio", } @staticmethod def download_streaming_url_as_pipe_objects( url: str, config: Dict[str, Any], *, mode_hint: Optional[str] = None, ytdl_format_hint: Optional[str] = None, ) -> List[Dict[str, Any]]: """Download a yt-dlp-supported URL and return PipeObject-style dict(s). This is a lightweight helper intended for cmdlets that need to expand streaming URLs into local files without re-implementing yt-dlp glue. """ url_str = str(url or "").strip() if not url_str: return [] if not is_url_supported_by_ytdlp(url_str): return [] try: from SYS.config import resolve_output_dir out_dir = resolve_output_dir(config) if out_dir is None: return [] except Exception: return [] cookies_path = None try: cookie_candidate = YtDlpTool(config).resolve_cookiefile() if cookie_candidate is not None and cookie_candidate.is_file(): cookies_path = cookie_candidate except Exception: cookies_path = None quiet_download = False try: quiet_download = bool((config or {}).get("_quiet_background_output")) except Exception: quiet_download = False mode = str(mode_hint or "").strip().lower() if mode_hint else "" if mode not in {"audio", "video"}: mode = "video" try: cf = ( str(cookies_path) if cookies_path is not None and cookies_path.is_file() else None ) fmts_probe = list_formats( url_str, no_playlist=False, playlist_items=None, cookiefile=cf, ) if isinstance(fmts_probe, list) and fmts_probe: has_video = False for f in fmts_probe: if not isinstance(f, dict): continue vcodec = str(f.get("vcodec", "none") or "none").strip().lower() if vcodec and vcodec != "none": has_video = True break mode = "video" if has_video else "audio" except Exception: mode = "video" fmt_hint = str(ytdl_format_hint).strip() if ytdl_format_hint else "" chosen_format: Optional[str] if fmt_hint: chosen_format = fmt_hint else: chosen_format = None if mode == "audio": chosen_format = "bestaudio/best" opts = DownloadOptions( url=url_str, mode=mode, output_dir=Path(out_dir), cookies_path=cookies_path, ytdl_format=chosen_format, quiet=quiet_download, embed_chapters=True, write_sub=True, ) try: result_obj = _download_with_timeout(opts, timeout_seconds=300, config=config) except Exception as exc: log(f"[download-file] Download failed for {url_str}: {exc}", file=sys.stderr) return [] results: List[Any] if isinstance(result_obj, list): results = list(result_obj) else: paths = getattr(result_obj, "paths", None) if isinstance(paths, list) and paths: results = [] for p in paths: try: p_path = Path(p) except Exception: continue if not p_path.exists() or p_path.is_dir(): continue try: hv = sha256_file(p_path) except Exception: hv = None try: results.append( DownloadMediaResult( path=p_path, info=getattr(result_obj, "info", {}) or {}, tag=list(getattr(result_obj, "tag", []) or []), source_url=getattr(result_obj, "source_url", None) or url_str, hash_value=hv, ) ) except Exception: continue else: results = [result_obj] out: List[Dict[str, Any]] = [] for downloaded in results: try: info = ( downloaded.info if isinstance(getattr(downloaded, "info", None), dict) else {} ) except Exception: info = {} try: media_path = Path(str(getattr(downloaded, "path", "") or "")) except Exception: continue if not media_path.exists() or media_path.is_dir(): continue try: hash_value = getattr(downloaded, "hash_value", None) or sha256_file(media_path) except Exception: hash_value = None title = None try: title = info.get("title") except Exception: title = None title = title or media_path.stem tags = list(getattr(downloaded, "tag", []) or []) if title and f"title:{title}" not in tags: tags.insert(0, f"title:{title}") final_url = None try: page_url = info.get("webpage_url") or info.get("original_url") or info.get("url") if page_url: final_url = str(page_url) except Exception: final_url = None if not final_url: final_url = url_str po: Dict[str, Any] = { "path": str(media_path), "hash": hash_value, "title": title, "url": final_url, "tag": tags, "action": "cmdlet:download-file", "is_temp": True, "ytdl_format": getattr(opts, "ytdl_format", None), "store": getattr(opts, "storage_name", None) or getattr(opts, "storage_location", None) or "PATH", "media_kind": "video" if opts.mode == "video" else "audio", } try: chapters_text = _format_chapters_note(info) except Exception: chapters_text = None if chapters_text: notes = po.get("notes") if not isinstance(notes, dict): notes = {} notes.setdefault("chapters", chapters_text) po["notes"] = notes try: sub_path = _best_subtitle_sidecar(media_path) except Exception: sub_path = None if sub_path is not None: sub_text = _read_text_file(sub_path) if sub_text: notes = po.get("notes") if not isinstance(notes, dict): notes = {} notes["sub"] = sub_text po["notes"] = notes try: sub_path.unlink() except Exception: pass out.append(po) return out @staticmethod def _normalize_hash_hex(value: Optional[str]) -> Optional[str]: if not value or not isinstance(value, str): return None candidate = value.strip().lower() if len(candidate) == 64 and all(c in "0123456789abcdef" for c in candidate): return candidate return None @classmethod def _extract_hash_from_search_hit(cls, hit: Any) -> Optional[str]: if not isinstance(hit, dict): return None for key in ("hash", "hash_hex", "file_hash", "hydrus_hash"): v = hit.get(key) normalized = cls._normalize_hash_hex(str(v) if v is not None else None) if normalized: return normalized return None @classmethod def _find_existing_hash_for_url( cls, storage: Any, canonical_url: str, *, hydrus_available: bool ) -> Optional[str]: if storage is None or not canonical_url: return None try: from Store.HydrusNetwork import HydrusNetwork except Exception: HydrusNetwork = None # type: ignore try: backend_names = list(storage.list_searchable_backends() or []) except Exception: backend_names = [] for backend_name in backend_names: try: backend = storage[backend_name] except Exception: continue try: if str(backend_name).strip().lower() == "temp": continue except Exception: pass try: if HydrusNetwork is not None and isinstance(backend, HydrusNetwork) and not hydrus_available: continue except Exception: pass try: hits = backend.search(f"url:{canonical_url}", limit=5) or [] except Exception: hits = [] for hit in hits: extracted = cls._extract_hash_from_search_hit(hit) if extracted: return extracted return None @staticmethod def _format_timecode(seconds: int, *, force_hours: bool) -> str: total = max(0, int(seconds)) minutes, secs = divmod(total, 60) hours, minutes = divmod(minutes, 60) if force_hours: return f"{hours:02d}:{minutes:02d}:{secs:02d}" return f"{minutes:02d}:{secs:02d}" @classmethod def _format_clip_range(cls, start_s: int, end_s: int) -> str: force_hours = bool(start_s >= 3600 or end_s >= 3600) return f"{cls._format_timecode(start_s, force_hours=force_hours)}-{cls._format_timecode(end_s, force_hours=force_hours)}" @classmethod def _apply_clip_decorations( cls, pipe_objects: List[Dict[str, Any]], clip_ranges: List[tuple[int, int]], *, source_king_hash: Optional[str] ) -> None: if not pipe_objects or len(pipe_objects) != len(clip_ranges): return for po, (start_s, end_s) in zip(pipe_objects, clip_ranges): clip_range = cls._format_clip_range(start_s, end_s) clip_tag = f"clip:{clip_range}" po["title"] = clip_tag tags = po.get("tag") if not isinstance(tags, list): tags = [] tags = [t for t in tags if not str(t).strip().lower().startswith("title:")] tags = [t for t in tags if not str(t).strip().lower().startswith("relationship:")] tags.insert(0, f"title:{clip_tag}") if clip_tag not in tags: tags.append(clip_tag) po["tag"] = tags if len(pipe_objects) < 2: return hashes: List[str] = [] for po in pipe_objects: h_val = cls._normalize_hash_hex(str(po.get("hash") or "")) hashes.append(h_val or "") king_hash = cls._normalize_hash_hex(source_king_hash) if source_king_hash else None if not king_hash: king_hash = hashes[0] if hashes and hashes[0] else None if not king_hash: return alt_hashes: List[str] = [h for h in hashes if h and h != king_hash] if not alt_hashes: return for po in pipe_objects: po["relationships"] = {"king": [king_hash], "alt": list(alt_hashes)} def _run_impl( self, result: Any, args: Sequence[str], config: Dict[str, Any] ) -> int: """Main download implementation for direct HTTP files.""" progress = PipelineProgress(pipeline_context) prev_progress = None had_progress_key = False try: debug("Starting download-file") # Allow providers to tap into the active PipelineProgress (optional). try: if isinstance(config, dict): had_progress_key = "_pipeline_progress" in config prev_progress = config.get("_pipeline_progress") config["_pipeline_progress"] = progress except Exception: pass # Parse arguments parsed = parse_cmdlet_args(args, self) # Resolve URLs from -url or positional arguments url_candidates = parsed.get("url") or [ a for a in parsed.get("args", []) if isinstance(a, str) and ( a.startswith("http") or "://" in a or ":" in a or "🧲" in a and not a.startswith("-") ) ] raw_url = normalize_url_list(url_candidates) quiet_mode = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False # Fallback to piped items if no explicit URLs provided piped_items = [] if not raw_url: if isinstance(result, list): piped_items = list(result) elif result is not None: piped_items = [result] # Handle TABLE_AUTO_STAGES routing: if a piped item has _selection_args, # re-invoke download-file with those args instead of processing the PipeObject itself. if piped_items and not raw_url: selection_runs: List[List[str]] = [] residual_items: List[Any] = [] def _looks_like_url(value: Any) -> bool: try: s_val = str(value or "").strip().lower() except Exception: return False return s_val.startswith(("http://", "https://", "magnet:", "torrent:", "alldebrid:", "alldebrid🧲")) def _extract_selection_args(item: Any) -> tuple[Optional[List[str]], Optional[str]]: selection_args: Optional[List[str]] = None item_url: Optional[str] = None if isinstance(item, dict): selection_args = item.get("_selection_args") or item.get("selection_args") item_url = item.get("url") or item.get("path") or item.get("target") md = item.get("metadata") or item.get("full_metadata") if isinstance(md, dict): selection_args = selection_args or md.get("_selection_args") or md.get("selection_args") item_url = item_url or md.get("url") or md.get("source_url") extra = item.get("extra") if isinstance(extra, dict): selection_args = selection_args or extra.get("_selection_args") or extra.get("selection_args") item_url = item_url or extra.get("url") or extra.get("source_url") else: item_url = getattr(item, "url", None) or getattr(item, "path", None) or getattr(item, "target", None) md = getattr(item, "metadata", None) if isinstance(md, dict): selection_args = md.get("_selection_args") or md.get("selection_args") item_url = item_url or md.get("url") or md.get("source_url") extra = getattr(item, "extra", None) if isinstance(extra, dict): selection_args = selection_args or extra.get("_selection_args") or extra.get("selection_args") item_url = item_url or extra.get("url") or extra.get("source_url") if isinstance(selection_args, (list, tuple)): normalized_args = [str(arg) for arg in selection_args if arg is not None] elif selection_args is not None: normalized_args = [str(selection_args)] else: normalized_args = None if item_url and not _looks_like_url(item_url): item_url = None return normalized_args, item_url def _selection_args_have_url(args_list: Sequence[str]) -> bool: for idx, arg in enumerate(args_list): low = str(arg or "").strip().lower() if low in {"-url", "--url"}: return True if _looks_like_url(arg): return True return False for item in piped_items: handled = False try: normalized_args, item_url = _extract_selection_args(item) if normalized_args: if _selection_args_have_url(normalized_args): selection_runs.append(list(normalized_args)) handled = True elif item_url: selection_runs.append([str(item_url)] + list(normalized_args)) handled = True except Exception as e: debug(f"[ytdlp] Error handling selection args: {e}") handled = False if not handled: residual_items.append(item) if selection_runs: selection_urls: List[str] = [] def _extract_urls_from_args(args_list: Sequence[str]) -> List[str]: urls: List[str] = [] idx = 0 while idx < len(args_list): token = str(args_list[idx] or "") low = token.strip().lower() if low in {"-url", "--url"} and idx + 1 < len(args_list): candidate = str(args_list[idx + 1] or "").strip() if _looks_like_url(candidate): urls.append(candidate) idx += 2 continue if _looks_like_url(token): urls.append(token.strip()) idx += 1 return urls for run_args in selection_runs: for u in _extract_urls_from_args(run_args): if u not in selection_urls: selection_urls.append(u) original_skip_preflight = None original_timeout = None original_skip_direct = None try: if isinstance(config, dict): original_skip_preflight = config.get("_skip_url_preflight") original_timeout = config.get("_pipeobject_timeout_seconds") original_skip_direct = config.get("_skip_direct_on_streaming_failure") except Exception: original_skip_preflight = None original_timeout = None try: if selection_urls: # Skip Duplicate Preflight on selection re-entry: # User has already seen the table/status and explicitly selected an item. # Skipping this reduces DB load and latency. if isinstance(config, dict): config["_skip_url_preflight"] = True config["_skip_direct_on_streaming_failure"] = True if isinstance(config, dict) and config.get("_pipeobject_timeout_seconds") is None: # Use a generous default for individual items config["_pipeobject_timeout_seconds"] = 600 successes = 0 failures = 0 last_code = 0 total_selection = len(selection_runs) debug(f"[download-file] Processing {total_selection} selected item(s) from table...") for idx, run_args in enumerate(selection_runs, 1): debug(f"[download-file] Item {idx}/{total_selection}: {run_args}") debug("[download-file] Re-invoking download-file for selected item...") exit_code = self._run_impl(None, run_args, config) if exit_code == 0: successes += 1 else: failures += 1 last_code = exit_code piped_items = residual_items if not piped_items: if successes > 0: return 0 return last_code or 1 finally: try: if isinstance(config, dict): if original_skip_preflight is None: config.pop("_skip_url_preflight", None) else: config["_skip_url_preflight"] = original_skip_preflight if original_timeout is None: config.pop("_pipeobject_timeout_seconds", None) else: config["_pipeobject_timeout_seconds"] = original_timeout if original_skip_direct is None: config.pop("_skip_direct_on_streaming_failure", None) else: config["_skip_direct_on_streaming_failure"] = original_skip_direct except Exception: pass had_piped_input = False try: if isinstance(result, list): had_piped_input = bool(result) else: had_piped_input = bool(result) except Exception: had_piped_input = False # UX: In piped mode, allow a single positional arg to be the destination directory. # Example: @1-4 | download-file "C:\\Users\\Me\\Downloads\\yoyo" if (had_piped_input and raw_url and len(raw_url) == 1 and (not parsed.get("path"))): candidate = str(raw_url[0] or "").strip() low = candidate.lower() looks_like_url = low.startswith(( "http://", "https://", "ftp://", "magnet:", "torrent:", "alldebrid:", "alldebrid🧲" )) looks_like_provider = ( ":" in candidate and not candidate.startswith(( "http:", "https:", "ftp:", "ftps:", "file:", "alldebrid:" )) ) looks_like_windows_path = ( (len(candidate) >= 2 and candidate[1] == ":") or candidate.startswith("\\\\") or candidate.startswith("\\") or candidate.endswith(("\\", "/")) ) if (not looks_like_url) and ( not looks_like_provider) and looks_like_windows_path: parsed["path"] = candidate raw_url = [] piped_items = self._collect_piped_items_if_no_urls(result, raw_url) if not raw_url and not piped_items: log("No url or piped items to download", file=sys.stderr) return 1 registry = self._load_provider_registry() # Provider-pre-check (e.g. Internet Archive format picker) picker_result = self._maybe_show_provider_picker( raw_urls=raw_url, piped_items=piped_items, parsed=parsed, config=config, registry=registry, ) if picker_result is not None: return int(picker_result) provider_url_matches = self._match_provider_urls(raw_url, registry) streaming_candidates = [ url for url in raw_url if provider_url_matches.get(str(url).strip()) == "ytdlp" ] supported_streaming, unsupported_streaming = self._filter_supported_urls(streaming_candidates) matched_ytdlp = bool(streaming_candidates) streaming_exit_code: Optional[int] = None streaming_downloaded = 0 if supported_streaming: debug(f"[download-file] Using ytdlp provider for {len(supported_streaming)} URL(s)") streaming_exit_code = self._run_streaming_urls( streaming_urls=supported_streaming, args=args, config=config, parsed=parsed, ) if streaming_exit_code == 0: streaming_downloaded += 1 # Only remove URLs from further processing when streaming succeeded. raw_url = [u for u in raw_url if u not in supported_streaming] if not raw_url and not unsupported_streaming: piped_items = [] if not raw_url and not piped_items: return int(streaming_exit_code or 0) else: try: skip_direct = bool(config.get("_skip_direct_on_streaming_failure")) if isinstance(config, dict) else False except Exception: skip_direct = False if matched_ytdlp: skip_direct = True if skip_direct: raw_url = [u for u in raw_url if u not in supported_streaming] if not raw_url and not piped_items: return int(streaming_exit_code or 1) # Re-check picker if partial processing occurred picker_result = self._maybe_show_provider_picker( raw_urls=raw_url, piped_items=piped_items, parsed=parsed, config=config, registry=registry, ) if picker_result is not None: return int(picker_result) # Get output directory final_output_dir = resolve_target_dir(parsed, config) if not final_output_dir: return 1 debug(f"Output directory: {final_output_dir}") # If the caller isn't running the shared pipeline Live progress UI (e.g. direct # cmdlet execution), start a minimal local pipeline progress panel so downloads # show consistent, Rich-formatted progress (like download-media). total_items = max(1, len(raw_url or []) + len(piped_items or [])) preview = build_pipeline_preview(raw_url, piped_items) progress.ensure_local_ui( label="download-file", total_items=total_items, items_preview=preview ) downloaded_count = 0 urls_downloaded, early_exit = self._process_explicit_urls( raw_urls=raw_url, final_output_dir=final_output_dir, config=config, quiet_mode=quiet_mode, registry=registry, progress=progress, context_items=(result if isinstance(result, list) else ([result] if result else [])), ) downloaded_count += int(urls_downloaded) if early_exit is not None: return int(early_exit) provider_downloaded, magnet_submissions = self._process_provider_items( piped_items=piped_items, final_output_dir=final_output_dir, config=config, quiet_mode=quiet_mode, registry=registry, progress=progress, ) downloaded_count += provider_downloaded if downloaded_count > 0 or streaming_downloaded > 0 or magnet_submissions > 0: # Render detail panels for downloaded items when download-file is the last stage. self._maybe_render_download_details(config=config) msg = f"✓ Successfully processed {downloaded_count} file(s)" if magnet_submissions: msg += f" and queued {magnet_submissions} magnet(s)" debug(msg) return 0 if streaming_exit_code is not None: return int(streaming_exit_code) log("No downloads completed", file=sys.stderr) return 1 except Exception as e: log(f"Error in download-file: {e}", file=sys.stderr) return 1 finally: try: if isinstance(config, dict): if had_progress_key: config["_pipeline_progress"] = prev_progress else: config.pop("_pipeline_progress", None) except Exception: pass progress.close_local_ui(force_complete=True) def _maybe_show_provider_picker( self, *, raw_urls: Sequence[str], piped_items: Sequence[Any], parsed: Dict[str, Any], config: Dict[str, Any], registry: Dict[str, Any], ) -> Optional[int]: """Generic hook for providers to show a selection table (e.g. Internet Archive format picker).""" total_inputs = len(raw_urls or []) + len(piped_items or []) if total_inputs != 1: return None target_url = None if raw_urls: target_url = str(raw_urls[0]) elif piped_items: target_url = str(get_field(piped_items[0], "path") or get_field(piped_items[0], "url") or "") if not target_url: return None match_provider_name_for_url = registry.get("match_provider_name_for_url") get_provider = registry.get("get_provider") provider_name = None if match_provider_name_for_url: try: provider_name = match_provider_name_for_url(target_url) except Exception: pass if provider_name and get_provider: provider = get_provider(provider_name, config) if provider and hasattr(provider, "maybe_show_picker"): try: quiet_mode = bool(config.get("_quiet_background_output")) res = provider.maybe_show_picker( url=target_url, item=piped_items[0] if piped_items else None, parsed=parsed, config=config, quiet_mode=quiet_mode, ) if res is not None: return int(res) except Exception as e: debug(f"Provider {provider_name} picker error: {e}") return None # Module-level singleton registration CMDLET = Download_File()