from __future__ import annotations import hashlib import json import os import random import re import string import subprocess import sys import threading import time import traceback from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterator, List, Optional, Sequence, cast from urllib.parse import urlparse from SYS import pipeline as pipeline_context from SYS.logger import debug, log from SYS.models import ( DebugLogger, DownloadError, DownloadMediaResult, DownloadOptions, ProgressBar, ) from SYS.pipeline_progress import PipelineProgress from SYS.utils import ensure_directory, sha256_file from SYS.yt_metadata import extract_ytdlp_tags _YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {} try: import yt_dlp # type: ignore from yt_dlp.extractor import gen_extractors # type: ignore except Exception as exc: # pragma: no cover - handled at runtime yt_dlp = None # type: ignore gen_extractors = None # type: ignore YTDLP_IMPORT_ERROR: Optional[Exception] = exc else: YTDLP_IMPORT_ERROR = None _EXTRACTOR_CACHE: List[Any] | None = None # Patterns for domain extraction from yt-dlp regexes # 1) Alternation group followed by \.tld e.g. (?:youtube|youtu|youtube-nocookie)\.com ALT_GROUP_TLD = re.compile(r'\((?:\?:)?([^\)]+)\)\\\.(?P[A-Za-z0-9.+-]+)') # 2) Literal domain pieces like youtube\.com or youtu\.be (not preceded by a group) LITERAL_DOMAIN = re.compile(r'(? List[str]: if not valid_url: return [] if isinstance(valid_url, str): return [valid_url] if isinstance(valid_url, (list, tuple)): return [p for p in valid_url if isinstance(p, str)] return [] def extract_from_pattern(pat: str) -> set[str]: domains = set() # 1) Alternation groups followed by .tld for alt_group, tld in ALT_GROUP_TLD.findall(pat): # alt_group like "youtube|youtu|youtube-nocookie" for alt in alt_group.split('|'): alt = alt.strip() # remove any non-domain tokens like (?:www\.)? 
if present inside alt (rare) alt = re.sub(r'\(\?:www\\\.\)\?', '', alt) if alt: domains.add(f"{alt}.{tld}".lower()) # 2) Literal domain matches (youtube\.com) for name, tld in LITERAL_DOMAIN.findall(pat): domains.add(f"{name}.{tld}".lower()) # 3) Partial tokens fallback (only if we didn't already capture domains) # This helps when regexes contain plain tokens like 'zhihu' or 'vimeo' without .com if not domains: for token in PARTIAL_TOKEN.findall(pat): # ignore common regex words that are not domains if len(token) <= 2: continue # avoid tokens that are clearly regex constructs if token.lower() in {"https", "http", "www", "com", "net", "org"}: continue domains.add(f"{token.lower()}.com") return domains def extract_domains(valid_url) -> set[str]: patterns = normalize_patterns(valid_url) all_domains = set() for pat in patterns: all_domains |= extract_from_pattern(pat) # final cleanup: remove obvious junk like 'com.com' if present cleaned = set() for d in all_domains: # drop duplicates where left side equals tld (e.g., com.com) parts = d.split('.') if len(parts) >= 2 and parts[-2] == parts[-1]: continue cleaned.add(d) return cleaned def _build_supported_domains() -> set[str]: global _SUPPORTED_DOMAINS if _SUPPORTED_DOMAINS is not None: return _SUPPORTED_DOMAINS _SUPPORTED_DOMAINS = set() if gen_extractors is None: return _SUPPORTED_DOMAINS try: for e in gen_extractors(): name = getattr(e, "IE_NAME", "").lower() if name == "generic": continue regex = getattr(e, "_VALID_URL", None) domains = extract_domains(regex) _SUPPORTED_DOMAINS.update(domains) except Exception: from SYS.logger import logger logger.exception("Failed to build supported domains from yt-dlp extractors") return _SUPPORTED_DOMAINS def _get_nested(config: Dict[str, Any], *path: str) -> Any: cur: Any = config for key in path: if not isinstance(cur, dict): return None cur = cur.get(key) return cur def _parse_csv_list(value: Any) -> Optional[List[str]]: if value is None: return None if isinstance(value, list): 
out: List[str] = [] for item in value: s = str(item).strip() if s: out.append(s) return out or None s = str(value).strip() if not s: return None # allow either JSON-ish list strings or simple comma-separated values if s.startswith("[") and s.endswith("]"): s = s[1:-1] parts = [p.strip() for p in s.split(",")] parts = [p for p in parts if p] return parts or None _BROWSER_COOKIES_AVAILABLE: Optional[bool] = None _BROWSER_COOKIE_WARNING_EMITTED = False def _browser_cookie_candidate_paths() -> List[Path]: try: home = Path.home() except Exception: home = Path.cwd() candidates: List[Path] = [] if os.name == "nt": for env_value in (os.getenv("LOCALAPPDATA"), os.getenv("APPDATA")): if not env_value: continue base_path = Path(env_value) if not base_path: continue candidates.extend([ base_path / "Google" / "Chrome" / "User Data" / "Default" / "Cookies", base_path / "Chromium" / "User Data" / "Default" / "Cookies", base_path / "BraveSoftware" / "Brave-Browser" / "User Data" / "Default" / "Cookies", ]) else: candidates.extend([ home / ".config" / "google-chrome" / "Default" / "Cookies", home / ".config" / "chromium" / "Default" / "Cookies", home / ".config" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies", ]) if sys.platform == "darwin": candidates.extend([ home / "Library" / "Application Support" / "Google" / "Chrome" / "Default" / "Cookies", home / "Library" / "Application Support" / "Chromium" / "Default" / "Cookies", home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies", ]) return candidates def _has_browser_cookie_database() -> bool: global _BROWSER_COOKIES_AVAILABLE if _BROWSER_COOKIES_AVAILABLE is not None: return _BROWSER_COOKIES_AVAILABLE for path in _browser_cookie_candidate_paths(): try: if path.is_file(): _BROWSER_COOKIES_AVAILABLE = True return True except Exception: continue _BROWSER_COOKIES_AVAILABLE = False return False def _browser_cookie_path_for(browser_name: str) -> Optional[Path]: """Return the 
cookie DB Path for a specific browser if present, else None. Supported browsers (case-insensitive): "chrome", "chromium", "brave". """ name = str(browser_name or "").strip().lower() if not name: return None try: home = Path.home() except Exception: home = Path.cwd() # Windows if os.name == "nt": for env_value in (os.getenv("LOCALAPPDATA"), os.getenv("APPDATA")): if not env_value: continue base = Path(env_value) if name in ("chrome", "google-chrome"): p = base / "Google" / "Chrome" / "User Data" / "Default" / "Cookies" if p.is_file(): return p if name == "chromium": p = base / "Chromium" / "User Data" / "Default" / "Cookies" if p.is_file(): return p if name in ("brave", "brave-browser"): p = base / "BraveSoftware" / "Brave-Browser" / "User Data" / "Default" / "Cookies" if p.is_file(): return p # *nix and macOS if sys.platform == "darwin": if name in ("chrome", "google-chrome"): p = home / "Library" / "Application Support" / "Google" / "Chrome" / "Default" / "Cookies" if p.is_file(): return p if name == "chromium": p = home / "Library" / "Application Support" / "Chromium" / "Default" / "Cookies" if p.is_file(): return p if name in ("brave", "brave-browser"): p = home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies" if p.is_file(): return p # Linux and other if name in ("chrome", "google-chrome"): p = home / ".config" / "google-chrome" / "Default" / "Cookies" if p.is_file(): return p if name == "chromium": p = home / ".config" / "chromium" / "Default" / "Cookies" if p.is_file(): return p if name in ("brave", "brave-browser"): p = home / ".config" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies" if p.is_file(): return p return None def _add_browser_cookies_if_available(options: Dict[str, Any], preferred_browser: Optional[str] = None) -> None: global _BROWSER_COOKIE_WARNING_EMITTED # If a preferred browser is specified, try to use it if available if preferred_browser: try: if 
_browser_cookie_path_for(preferred_browser) is not None: options["cookiesfrombrowser"] = [preferred_browser] return else: if not _BROWSER_COOKIE_WARNING_EMITTED: log(f"Requested browser cookie DB '{preferred_browser}' not found; falling back to autodetect.") _BROWSER_COOKIE_WARNING_EMITTED = True except Exception: from SYS.logger import logger logger.exception("Failed to check browser cookie path for preferred browser '%s'", preferred_browser) # Auto-detect in common order (chrome/chromium/brave) for candidate in ("chrome", "chromium", "brave"): try: if _browser_cookie_path_for(candidate) is not None: options["cookiesfrombrowser"] = [candidate] return except Exception: from SYS.logger import logger logger.exception("Error while checking cookie path for candidate browser '%s'", candidate) continue if not _BROWSER_COOKIE_WARNING_EMITTED: log( "Browser cookie extraction skipped because no Chrome-compatible cookie database was found. " "Provide a cookies file via config or --cookies if authentication is required." 
) _BROWSER_COOKIE_WARNING_EMITTED = True def ensure_yt_dlp_ready() -> None: """Verify yt-dlp is importable, raising DownloadError if missing.""" if yt_dlp is not None: return detail = str(YTDLP_IMPORT_ERROR or "yt-dlp is not installed") raise DownloadError(f"yt-dlp module not available: {detail}") def _get_extractors() -> List[Any]: global _EXTRACTOR_CACHE if _EXTRACTOR_CACHE is not None: return _EXTRACTOR_CACHE ensure_yt_dlp_ready() if gen_extractors is None: _EXTRACTOR_CACHE = [] return _EXTRACTOR_CACHE try: _EXTRACTOR_CACHE = [ie for ie in gen_extractors()] except Exception: _EXTRACTOR_CACHE = [] return _EXTRACTOR_CACHE def is_url_supported_by_ytdlp(url: str) -> bool: """Return True if yt-dlp has a non-generic extractor for the URL.""" if not url or not isinstance(url, str): return False if YTDLP_IMPORT_ERROR is not None: return False try: parsed = urlparse(url) if not parsed.scheme or not parsed.netloc: return False except Exception: return False try: parsed = urlparse(url) domain = parsed.netloc.lower() if not domain: return False supported = _build_supported_domains() for base in supported: if domain == base or domain.endswith("." + base): return True except Exception: return False return False _FORMATS_CACHE: Dict[str, tuple[float, List[Dict[str, Any]]]] = {} def list_formats( url: str, *, no_playlist: bool = False, playlist_items: Optional[str] = None, cookiefile: Optional[str] = None, timeout_seconds: int = 20, ) -> Optional[List[Dict[str, Any]]]: """Get available formats for a URL. Returns a list of format dicts or None if unsupported or probing fails. 
""" if not is_url_supported_by_ytdlp(url): return None # Cache format probes to avoid redundant network hits cache_key = hashlib.md5(f"{url}|{no_playlist}|{playlist_items}|{cookiefile}".encode()).hexdigest() now = time.monotonic() if cache_key in _FORMATS_CACHE: ts, result = _FORMATS_CACHE[cache_key] if now - ts < 300: # 5 minute cache for formats return result result_container: List[Optional[Any]] = [None, None] # [result, error] def _do_list() -> None: try: ensure_yt_dlp_ready() assert yt_dlp is not None ydl_opts: Dict[str, Any] = { "quiet": True, "no_warnings": True, "skip_download": True, "noprogress": True, "socket_timeout": min(10, max(1, int(timeout_seconds))), "retries": 2, } if cookiefile: ydl_opts["cookiefile"] = str(cookiefile) else: # Best effort attempt to use browser cookies if no file is explicitly passed _add_browser_cookies_if_available(ydl_opts) if no_playlist: ydl_opts["noplaylist"] = True if playlist_items: ydl_opts["playlist_items"] = str(playlist_items) with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type] info = ydl.extract_info(url, download=False) if not isinstance(info, dict): result_container[0] = None return formats = info.get("formats") if not isinstance(formats, list): result_container[0] = None return out: List[Dict[str, Any]] = [] for fmt in formats: if isinstance(fmt, dict): out.append(fmt) result_container[0] = out except Exception as exc: debug(f"yt-dlp format probe failed for {url}: {exc}") result_container[1] = exc # Use daemon=True so a hung thread doesn't block process exit thread = threading.Thread(target=_do_list, daemon=True) thread.start() thread.join(timeout=max(1, int(timeout_seconds))) if thread.is_alive(): debug(f"yt-dlp format probe timed out for {url} (>={timeout_seconds}s)") return None if result_container[1] is not None: return None if result_container[0] is not None: _FORMATS_CACHE[cache_key] = (now, cast(List[Dict[str, Any]], result_container[0])) return cast(Optional[List[Dict[str, Any]]], 
result_container[0]) _PROBE_CACHE: Dict[str, tuple[float, Dict[str, Any]]] = {} def probe_url( url: str, no_playlist: bool = False, timeout_seconds: int = 15, *, cookiefile: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """Probe URL metadata without downloading. Returns None if unsupported, errors, or times out. """ if not is_url_supported_by_ytdlp(url): return None # Simple in-memory cache to avoid duplicate probes for the same URL/options in a short window. cache_key = hashlib.md5(f"{url}|{no_playlist}|{cookiefile}".encode()).hexdigest() now = time.monotonic() if cache_key in _PROBE_CACHE: ts, result = _PROBE_CACHE[cache_key] if now - ts < 60: # 60 second cache return result result_container: List[Optional[Any]] = [None, None] # [result, error] def _do_probe() -> None: try: debug(f"[probe] Starting probe for {url}") ensure_yt_dlp_ready() assert yt_dlp is not None ydl_opts: Dict[str, Any] = { "quiet": True, "no_warnings": True, "socket_timeout": 10, "retries": 2, "skip_download": True, "extract_flat": "in_playlist", "noprogress": True, } if cookiefile: ydl_opts["cookiefile"] = str(cookiefile) else: # Best effort fallback _add_browser_cookies_if_available(ydl_opts) if no_playlist: ydl_opts["noplaylist"] = True with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type] debug(f"[probe] ytdlp extract_info (download=False) start: {url}") info = ydl.extract_info(url, download=False) debug(f"[probe] ytdlp extract_info (download=False) done: {url}") if not isinstance(info, dict): result_container[0] = None return webpage_url = info.get("webpage_url") or info.get("original_url") or info.get("url") result_container[0] = { "extractor": info.get("extractor", ""), "title": info.get("title", ""), "entries": info.get("entries", []), "duration": info.get("duration"), "uploader": info.get("uploader"), "description": info.get("description"), "requested_url": url, "webpage_url": webpage_url, "url": webpage_url or url, } except Exception as exc: debug(f"Probe error for 
{url}: {exc}") result_container[1] = exc # Use daemon=True so a hung probe doesn't block the process thread = threading.Thread(target=_do_probe, daemon=True) thread.start() thread.join(timeout=timeout_seconds) if thread.is_alive(): debug(f"Probe timeout for {url} (>={timeout_seconds}s), proceeding without probe") return None if result_container[1] is not None: return None if result_container[0] is not None: _PROBE_CACHE[cache_key] = (now, cast(Dict[str, Any], result_container[0])) return cast(Optional[Dict[str, Any]], result_container[0]) def is_browseable_format(fmt: Any) -> bool: """Check if a format is user-browseable (not storyboard, metadata, etc). Used by the ytdlp format selector to filter out non-downloadable formats. Returns False for: - MHTML, JSON sidecar metadata - Storyboard/thumbnail formats - Audio-only or video-only when both available Args: fmt: Format dict from yt-dlp with keys like format_id, ext, vcodec, acodec, format_note Returns: bool: True if format is suitable for browsing/selection """ if not isinstance(fmt, dict): return False format_id = str(fmt.get("format_id") or "").strip() if not format_id: return False # Filter out metadata/sidecar formats ext = str(fmt.get("ext") or "").strip().lower() if ext in {"mhtml", "json"}: return False # Filter out storyboard/thumbnail formats note = str(fmt.get("format_note") or "").lower() if "storyboard" in note: return False if format_id.lower().startswith("sb"): return False # Filter out formats with no audio and no video vcodec = str(fmt.get("vcodec", "none")) acodec = str(fmt.get("acodec", "none")) return not (vcodec == "none" and acodec == "none") def format_for_table_selection( fmt: Dict[str, Any], url: str, index: int, *, selection_format_id: Optional[str] = None, ) -> Dict[str, Any]: """Format a yt-dlp format dict into a table result row for selection. 
This helper formats a single format from list_formats() into the shape expected by the ResultTable system, ready for user selection and routing to download-file with -query "format:". Args: fmt: Format dict from yt-dlp url: The URL this format came from index: Row number for display (1-indexed) selection_format_id: Override format_id for selection (e.g., with +ba suffix) Returns: dict: Format result row with _selection_args for table system Example: fmts = list_formats("https://youtube.com/watch?v=abc") browseable = [f for f in fmts if is_browseable_format(f)] results = [format_for_table_selection(f, url, i+1) for i, f in enumerate(browseable)] """ format_id = fmt.get("format_id", "") resolution = fmt.get("resolution", "") ext = fmt.get("ext", "") vcodec = fmt.get("vcodec", "none") acodec = fmt.get("acodec", "none") filesize = fmt.get("filesize") filesize_approx = fmt.get("filesize_approx") # If not provided, compute selection format ID (add +ba for video-only) if selection_format_id is None: selection_format_id = format_id try: if vcodec != "none" and acodec == "none" and format_id: selection_format_id = f"{format_id}+ba" except Exception: from SYS.logger import logger logger.exception("Failed to compute selection_format_id for format: %s", fmt) # Format file size size_str = "" size_prefix = "" size_bytes = filesize or filesize_approx try: if isinstance(size_bytes, (int, float)) and size_bytes > 0: size_mb = float(size_bytes) / (1024 * 1024) size_str = f"{size_prefix}{size_mb:.1f}MB" except Exception: from SYS.logger import logger logger.exception("Failed to compute size string for format: %s", fmt) # Build description desc_parts: List[str] = [] if resolution and resolution != "audio only": desc_parts.append(resolution) if ext: desc_parts.append(str(ext).upper()) if vcodec != "none": desc_parts.append(f"v:{vcodec}") if acodec != "none": desc_parts.append(f"a:{acodec}") if size_str: desc_parts.append(size_str) format_desc = " | ".join(desc_parts) # Build table row 
return { "table": "download-file", "title": f"Format {format_id}", "url": url, "target": url, "detail": format_desc, "annotations": [ext, resolution] if resolution else [ext], "media_kind": "format", "columns": [ ("ID", format_id), ("Resolution", resolution or "N/A"), ("Ext", ext), ("Size", size_str or ""), ("Video", vcodec), ("Audio", acodec), ], "full_metadata": { "format_id": format_id, "url": url, "item_selector": selection_format_id, "_selection_args": ["-query", f"format:{selection_format_id}"], }, "_selection_args": ["-query", f"format:{selection_format_id}"], } @dataclass(slots=True) class YtDlpDefaults: """User-tunable defaults for yt-dlp behavior. Recommended config.conf keys (top-level dotted keys): - format="best|1080|720|640|audio" - ytdlp.format_sort="res:2160,res:1440,res:1080,res:720,res" Cookies: - cookies="C:\\path\\cookies.txt" (already supported by config.resolve_cookies_path) - cookies_from_browser="auto|none|chrome|brave|chromium" """ format: str = "best" video_format: str = "bestvideo+bestaudio/best" audio_format: str = "251/140/bestaudio" format_sort: Optional[List[str]] = None cookies_from_browser: Optional[str] = None class YtDlpTool: """Centralizes yt-dlp defaults and translation helpers. This is intentionally small and dependency-light so cmdlets can use it without forcing a full refactor. """ def __init__( self, config: Optional[Dict[str, Any]] = None, *, script_dir: Optional[Path] = None ) -> None: self._config: Dict[str, Any] = dict(config or {}) # `resolve_cookies_path` expects the app root so it can fall back to ./cookies.txt. # This file lives under ./tool/, so default to the parent directory. 
self._script_dir = script_dir or Path(__file__).resolve().parent.parent self.defaults = self._load_defaults() self._cookiefile: Optional[Path] = self._init_cookiefile() def _init_cookiefile(self) -> Optional[Path]: """Resolve cookies once at tool init (yt-dlp is the primary consumer).""" try: from SYS.config import resolve_cookies_path resolved = resolve_cookies_path(self._config, script_dir=self._script_dir) if resolved is not None and resolved.is_file(): return resolved except Exception: from SYS.logger import logger logger.exception("Failed to initialize cookiefile using resolve_cookies_path") return None def resolve_height_selector(self, format_str: Optional[str]) -> Optional[str]: """Resolve numeric heights (720, 1080p) to yt-dlp height selectors. Examples: "720" -> "bv*[height<=720]+ba" "1080p" -> "bv*[height<=1080]+ba" """ if not format_str or not isinstance(format_str, str): return None s = format_str.strip().lower() if not s: return None # Strip trailing 'p' if present (e.g. 720p -> 720) if s.endswith('p'): s = s[:-1] if s.isdigit(): height = int(s) if height >= 144: return f"bv*[height<={height}]+ba" return None def _load_defaults(self) -> YtDlpDefaults: cfg = self._config # NOTE: `YtDlpDefaults` is a slots dataclass. Referencing defaults via # `YtDlpDefaults.video_format` yields a `member_descriptor`, not the # default string value. Use an instance for fallback defaults. _fallback_defaults = YtDlpDefaults() tool_block = _get_nested(cfg, "tool", "ytdlp") if not isinstance(tool_block, dict): tool_block = {} ytdlp_block = cfg.get("ytdlp") if isinstance(cfg.get("ytdlp"), dict) else {} if not isinstance(ytdlp_block, dict): ytdlp_block = {} # Accept both nested and flat styles. 
video_format = ( tool_block.get("video_format") or tool_block.get("format") or ytdlp_block.get("video_format") or ytdlp_block.get("video") or ytdlp_block.get("format_video") or cfg.get("ytdlp_video_format") ) audio_format = ( tool_block.get("audio_format") or ytdlp_block.get("audio_format") or ytdlp_block.get("audio") or ytdlp_block.get("format_audio") or cfg.get("ytdlp_audio_format") ) # Also accept dotted keys written as nested dicts: ytdlp.format.video, ytdlp.format.audio nested_video = _get_nested(cfg, "ytdlp", "format", "video") nested_audio = _get_nested(cfg, "ytdlp", "format", "audio") fmt_sort_val = ( tool_block.get("format_sort") or ytdlp_block.get("format_sort") or ytdlp_block.get("formatSort") or cfg.get("ytdlp_format_sort") or _get_nested(cfg, "ytdlp", "format", "sort") ) fmt_sort = _parse_csv_list(fmt_sort_val) # Cookie source preference: allow forcing a browser DB or 'auto'/'none' cookies_pref = ( tool_block.get("cookies_from_browser") or tool_block.get("cookiesfrombrowser") or ytdlp_block.get("cookies_from_browser") or ytdlp_block.get("cookiesfrombrowser") or cfg.get("ytdlp_cookies_from_browser") or _get_nested(cfg, "ytdlp", "cookies_from_browser") ) # Unified format preference: prefer explicit 'format' key but accept legacy keys format_pref = ( tool_block.get("format") or tool_block.get("video_format") or ytdlp_block.get("format") or ytdlp_block.get("video_format") or cfg.get("ytdlp_format") or cfg.get("ytdlp_video_format") or _get_nested(cfg, "ytdlp", "format") ) defaults = YtDlpDefaults( format=str(format_pref).strip() if format_pref else "best", video_format=str( nested_video or video_format or _fallback_defaults.video_format ), audio_format=str( nested_audio or audio_format or _fallback_defaults.audio_format ), format_sort=fmt_sort, cookies_from_browser=(str(cookies_pref).strip() if cookies_pref else None), ) return defaults def resolve_cookiefile(self) -> Optional[Path]: return self._cookiefile def default_format(self, mode: str) -> str: 
"""Determine the final yt-dlp format string. Priority: - If caller explicitly requested audio mode (mode == 'audio'), return audio format. - If configured default format is 'audio', return audio format. - If configured default is 'best' or blank, return video_format. - Otherwise return the configured format value (e.g., '720'). """ m = str(mode or "").lower().strip() if m == "audio": return self.defaults.audio_format cfg = (str(self.defaults.format or "")).strip() lc = cfg.lower() if lc == "audio": return self.defaults.audio_format if not cfg or lc == "best": return self.defaults.video_format return cfg def build_ytdlp_options(self, opts: DownloadOptions) -> Dict[str, Any]: """Translate DownloadOptions into yt-dlp API options.""" ensure_directory(opts.output_dir) outtmpl = str((opts.output_dir / "%(title)s.%(ext)s").resolve()) base_options: Dict[str, Any] = { "outtmpl": outtmpl, "quiet": True, "no_warnings": True, "noprogress": True, "socket_timeout": 30, "retries": 10, "fragment_retries": 10, "http_chunk_size": 10_485_760, "restrictfilenames": True, "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36", "referer": "https://www.youtube.com/", } base_options.setdefault( "http_headers", { "User-Agent": base_options.get("user_agent"), "Referer": base_options.get("referer"), }, ) try: repo_root = Path(__file__).resolve().parents[1] bundled_ffmpeg_dir = repo_root / "MPV" / "ffmpeg" / "bin" if bundled_ffmpeg_dir.exists(): base_options.setdefault("ffmpeg_location", str(bundled_ffmpeg_dir)) except Exception: from SYS.logger import logger logger.exception("Failed to inspect bundled ffmpeg directory") try: if os.name == "nt": base_options.setdefault("file_access_retries", 40) except Exception: from SYS.logger import logger logger.exception("Failed to set Windows-specific yt-dlp options") if opts.cookies_path and opts.cookies_path.is_file(): base_options["cookiefile"] = str(opts.cookies_path) else: 
cookiefile = self.resolve_cookiefile() if cookiefile is not None and cookiefile.is_file(): base_options["cookiefile"] = str(cookiefile) else: # Respect configured browser cookie preference if provided; otherwise fall back to auto-detect. pref = (self.defaults.cookies_from_browser or "").lower().strip() if pref: if pref in {"none", "off", "false"}: # Explicitly disabled pass elif pref in {"auto", "detect"}: _add_browser_cookies_if_available(base_options) else: # Try the preferred browser first; fall back to auto-detect if not present _add_browser_cookies_if_available(base_options, preferred_browser=pref) else: # Add browser cookies support "just in case" if no file found (best effort) _add_browser_cookies_if_available(base_options) # Special handling for format keywords explicitly passed in via options if opts.ytdl_format == "audio": try: opts = opts._replace(mode="audio", ytdl_format=None) except Exception: try: import dataclasses as _dc opts = _dc.replace(opts, mode="audio", ytdl_format=None) except Exception: from SYS.logger import logger logger.exception("Failed to set opts mode to audio via dataclasses.replace") elif opts.ytdl_format == "video": try: opts = opts._replace(mode="video", ytdl_format=None) except Exception: try: import dataclasses as _dc opts = _dc.replace(opts, mode="video", ytdl_format=None) except Exception: from SYS.logger import logger logger.exception("Failed to set opts mode to video via dataclasses.replace") if opts.no_playlist: base_options["noplaylist"] = True # If no explicit format was provided, honor the configured default format ytdl_format = opts.ytdl_format if not ytdl_format: configured_format = (str(self.defaults.format or "")).strip() if configured_format: if configured_format.lower() == "audio": # Default to audio-only downloads try: opts = opts._replace(mode="audio") except Exception: try: import dataclasses as _dc opts = _dc.replace(opts, mode="audio") except Exception: from SYS.logger import logger logger.exception("Failed to 
set opts mode to audio via dataclasses.replace (configured default)") ytdl_format = None else: # Leave ytdl_format None so that default_format(opts.mode) # returns the configured format literally (e.g., '720') and # we don't auto-convert it to an internal selector. pass if ytdl_format and opts.mode != "audio": # Don't resolve 3-digit format IDs (like 251, 249, 140 from YouTube format tables) as heights # YouTube format IDs are typically 2-3 digits representing specific codec/quality combinations # Height selectors come from user input like "720" or "1080p" is_likely_format_id = ( isinstance(ytdl_format, str) and len(ytdl_format.strip()) == 3 and ytdl_format.strip().isdigit() ) if not is_likely_format_id: resolved = self.resolve_height_selector(ytdl_format) if resolved: ytdl_format = resolved fmt = ytdl_format or self.default_format(opts.mode) base_options["format"] = fmt if opts.mode == "audio": base_options["postprocessors"] = [{ "key": "FFmpegExtractAudio" }] if opts.mode != "audio": format_sort = self.defaults.format_sort or [ "res:4320", "res:2880", "res:2160", "res:1440", "res:1080", "res:720", "res", ] base_options["format_sort"] = format_sort if getattr(opts, "embed_chapters", False): pps = base_options.get("postprocessors") if not isinstance(pps, list): pps = [] already_has_metadata = any( isinstance(pp, dict) and str(pp.get("key") or "") == "FFmpegMetadata" for pp in pps ) if not already_has_metadata: pps.append( { "key": "FFmpegMetadata", "add_metadata": True, "add_chapters": True, "add_infojson": "if_exists", } ) base_options["postprocessors"] = pps if opts.mode != "audio": base_options.setdefault("merge_output_format", "mkv") if getattr(opts, "write_sub", False): base_options["writesubtitles"] = True base_options["writeautomaticsub"] = True base_options["subtitlesformat"] = "vtt" if opts.clip_sections: sections: List[str] = [] def _secs_to_hms(seconds: float) -> str: total = max(0, int(seconds)) minutes, secs = divmod(total, 60) hours, minutes = 
divmod(minutes, 60) return f"{hours:02d}:{minutes:02d}:{secs:02d}" for section_range in str(opts.clip_sections).split(","): section_range = section_range.strip() if not section_range: continue try: start_s_raw, end_s_raw = section_range.split("-", 1) start_s = float(start_s_raw.strip()) end_s = float(end_s_raw.strip()) if start_s >= end_s: continue sections.append(f"*{_secs_to_hms(start_s)}-{_secs_to_hms(end_s)}") except (ValueError, AttributeError): continue if sections: base_options["download_sections"] = sections # Clipped outputs should begin with a keyframe; otherwise players (notably mpv) # can show audio before video or a black screen until the next keyframe. # yt-dlp implements this by forcing keyframes at cut points. base_options["force_keyframes_at_cuts"] = True debug(f"Download sections configured: {', '.join(sections)}") if opts.playlist_items: base_options["playlist_items"] = opts.playlist_items if not opts.quiet: debug(f"yt-dlp: mode={opts.mode}, format={base_options.get('format')}, cookiefile={base_options.get('cookiefile')}") return base_options def build_yt_dlp_cli_args( self, *, url: str, output_dir: Optional[Path] = None, ytdl_format: Optional[str] = None, playlist_items: Optional[str] = None, no_playlist: bool = False, quiet: bool = True, extra_args: Optional[Sequence[str]] = None, ) -> List[str]: """Build a yt-dlp command line (argv list). This is primarily for debug output or subprocess execution. """ argv: List[str] = ["yt-dlp"] if quiet: argv.extend(["--quiet", "--no-warnings"]) argv.append("--no-progress") cookiefile = self.resolve_cookiefile() if cookiefile is not None: argv.extend(["--cookies", str(cookiefile)]) if no_playlist: argv.append("--no-playlist") if playlist_items: argv.extend(["--playlist-items", str(playlist_items)]) fmt = (ytdl_format or "").strip() if fmt: # Use long form to avoid confusion with app-level flags. 
argv.extend(["--format", fmt]) if self.defaults.format_sort: for sort_key in self.defaults.format_sort: argv.extend(["-S", sort_key]) if output_dir is not None: outtmpl = str((output_dir / "%(title)s.%(ext)s").resolve()) argv.extend(["-o", outtmpl]) if extra_args: argv.extend([str(a) for a in extra_args if str(a).strip()]) argv.append(str(url)) return argv def debug_print_cli(self, argv: Sequence[str]) -> None: try: debug("yt-dlp argv: " + " ".join(str(a) for a in argv)) except Exception: from SYS.logger import logger logger.exception("Failed to debug-print yt-dlp CLI arguments") def config_schema() -> List[Dict[str, Any]]: """Return a schema describing editable YT-DLP tool defaults for the config UI.""" format_choices = [ "best", "1080", "720", "640", "audio", ] # Offer browser choices depending on what's present on the host system browser_choices = ["auto", "none"] for b in ("chrome", "chromium", "brave"): try: if _browser_cookie_path_for(b) is not None: browser_choices.append(b) except Exception: from SYS.logger import logger logger.exception("Error while checking cookie path for browser '%s'", b) continue return [ { "key": "format", "label": "Default format", "default": YtDlpDefaults.format, "choices": format_choices, }, { "key": "cookies", "label": "Cookie file (path)", "default": "", }, { "key": "cookies_from_browser", "label": "Browser cookie source (used if no cookie file)", "default": "auto", "choices": browser_choices, }, ] # Progress + utility helpers for yt-dlp driven downloads (previously in cmdlet/download_media). 
# Shared module-level progress state for the yt-dlp progress-hook machinery.
_YTDLP_PROGRESS_BAR = ProgressBar()
# Guards _YTDLP_PROGRESS_LAST_ACTIVITY between the download thread and the watchdog.
_YTDLP_PROGRESS_ACTIVITY_LOCK = threading.Lock()
_YTDLP_PROGRESS_LAST_ACTIVITY = 0.0
# Sidecar subtitle extensions recognized when pairing files with downloaded media.
_SUBTITLE_EXTS = (".vtt", ".srt", ".ass", ".ssa", ".lrc")


def _progress_label(status: Optional[Dict[str, Any]]) -> str:
    """Derive a short display label (filename/title/id) from a yt-dlp status dict."""
    if not status:
        return "unknown"
    raw_info = status.get("info_dict")
    info_dict = raw_info if isinstance(raw_info, dict) else {}
    # Preference order: explicit filename, then info-dict paths, then title/id.
    candidates = [
        status.get("filename"),
        info_dict.get("_filename"),
        info_dict.get("filepath"),
        info_dict.get("title"),
        info_dict.get("id"),
    ]
    for cand in candidates:
        if not cand:
            continue
        try:
            name = Path(str(cand)).name
        except Exception:
            name = str(cand)
        label = str(name or "").strip()
        if label:
            return label
    return "download"


def _record_progress_activity(timestamp: Optional[float] = None) -> None:
    """Record the (monotonic) time of the most recent download progress event."""
    global _YTDLP_PROGRESS_LAST_ACTIVITY
    with _YTDLP_PROGRESS_ACTIVITY_LOCK:
        _YTDLP_PROGRESS_LAST_ACTIVITY = timestamp if timestamp is not None else time.monotonic()


def _get_last_progress_activity() -> float:
    """Return the monotonic timestamp of the last recorded progress event."""
    with _YTDLP_PROGRESS_ACTIVITY_LOCK:
        return _YTDLP_PROGRESS_LAST_ACTIVITY


def _clear_progress_activity() -> None:
    """Reset the activity timestamp (0.0 means "no activity recorded")."""
    _record_progress_activity(0.0)


def _live_ui_and_pipe_index() -> tuple[Optional[Any], int]:
    """Best-effort lookup of the live progress UI and current pipeline pipe index."""
    ui = None
    try:
        ui = pipeline_context.get_live_progress() if hasattr(pipeline_context, "get_live_progress") else None
    except Exception:
        ui = None
    pipe_idx: int = 0
    try:
        stage_ctx = pipeline_context.get_stage_context() if hasattr(pipeline_context, "get_stage_context") else None
        maybe_idx = getattr(stage_ctx, "pipe_index", None) if stage_ctx is not None else None
        if isinstance(maybe_idx, int):
            pipe_idx = int(maybe_idx)
    except Exception:
        pipe_idx = 0
    return ui, pipe_idx


def _begin_live_steps(total_steps: int) -> None:
    """Announce a step count for the current pipe on the live UI, if one exists."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        begin = getattr(ui, "begin_pipe_steps", None)
        if callable(begin):
            begin(int(pipe_idx), total_steps=int(total_steps))
    except Exception:
        return


def _step(text: str) -> None:
    """Advance the live UI's current pipe step with a label, if a UI exists."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        adv = getattr(ui, "advance_pipe_step", None)
        if callable(adv):
            adv(int(pipe_idx), str(text))
    except Exception:
        return


def _set_pipe_percent(percent: int) -> None:
    """Set the current pipe's completion percentage on the live UI, if one exists."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        set_pct = getattr(ui, "set_pipe_percent", None)
        if callable(set_pct):
            set_pct(int(pipe_idx), int(percent))
    except Exception:
        return


def _format_chapters_note(info: Dict[str, Any]) -> Optional[str]:
    """Format yt-dlp chapter metadata into a stable, note-friendly text."""
    try:
        chapters = info.get("chapters")
    except Exception:
        chapters = None
    if not isinstance(chapters, list) or not chapters:
        return None
    # rows: (start seconds, optional end seconds, title); max_t drives HH:MM:SS vs MM:SS.
    rows: List[tuple[int, Optional[int], str]] = []
    max_t = 0
    for ch in chapters:
        if not isinstance(ch, dict):
            continue
        start_raw = ch.get("start_time")
        end_raw = ch.get("end_time")
        title_raw = ch.get("title") or ch.get("name") or ch.get("chapter")
        try:
            if start_raw is None:
                continue
            start_s = int(float(start_raw))
        except Exception:
            continue
        end_s: Optional[int] = None
        try:
            if end_raw is not None:
                end_s = int(float(end_raw))
        except Exception:
            end_s = None
        title = str(title_raw).strip() if title_raw is not None else ""
        rows.append((start_s, end_s, title))
        try:
            max_t = max(max_t, start_s, end_s or 0)
        except Exception:
            max_t = max(max_t, start_s)
    if not rows:
        return None
    # Use HH:MM:SS everywhere once any timestamp reaches an hour, for column alignment.
    force_hours = bool(max_t >= 3600)

    def _tc(seconds: int) -> str:
        # Render a timecode; clamps negatives to zero.
        total = max(0, int(seconds))
        minutes, secs = divmod(total, 60)
        hours, minutes = divmod(minutes, 60)
        if force_hours:
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"
        return f"{minutes:02d}:{secs:02d}"

    lines: List[str] = []
    # Sort by start, then end (None sorts last via sentinel), then title — deterministic output.
    for start_s, end_s, title in sorted(
        rows, key=lambda r: (r[0], r[1] if r[1] is not None else 10**9, r[2])
    ):
        if end_s is not None and end_s > start_s:
            prefix = f"{_tc(start_s)}-{_tc(end_s)}"
        else:
            prefix = _tc(start_s)
        line = f"{prefix} {title}".strip()
        if line:
            lines.append(line)
    text = "\n".join(lines).strip()
    return text or None
Optional[Path]: """Find the most likely subtitle sidecar file for a downloaded media file.""" try: base_dir = media_path.parent stem = media_path.stem if not stem: return None candidates: List[Path] = [] for p in base_dir.glob(stem + ".*"): try: if not p.is_file(): continue except Exception: continue if p.suffix.lower() in _SUBTITLE_EXTS: candidates.append(p) preferred_order = [".vtt", ".srt", ".ass", ".ssa", ".lrc"] for ext in preferred_order: for p in candidates: if p.suffix.lower() == ext: return p return candidates[0] if candidates else None except Exception: return None def _read_text_file(path: Path) -> Optional[str]: try: return path.read_text(encoding="utf-8", errors="ignore") except Exception: return None def _download_with_sections_via_cli( url: str, ytdl_options: Dict[str, Any], sections: List[str], quiet: bool = False, ) -> tuple[Optional[str], Dict[str, Any]]: sections_list = ytdl_options.get("download_sections", []) if not sections_list: return "", {} pipeline = PipelineProgress(pipeline_context) class _SectionProgressSimulator: def __init__(self, start_pct: int, max_pct: int, interval: float = 0.5) -> None: self._start_pct = max(0, min(int(start_pct), 99)) self._max_pct = max(self._start_pct, min(int(max_pct), 98)) self._interval = max(0.1, float(interval)) self._stop_event = threading.Event() self._thread: Optional[threading.Thread] = None def _run(self) -> None: current = self._start_pct while not self._stop_event.wait(self._interval): if current < self._max_pct: current += 1 try: _set_pipe_percent(current) except Exception: from SYS.logger import logger logger.exception("Failed to set pipeline percent to %d", current) def start(self) -> None: if self._thread is not None or self._start_pct >= self._max_pct: return self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def stop(self) -> None: self._stop_event.set() if self._thread is not None: self._thread.join(timeout=0.5) self._thread = None try: 
_set_pipe_percent(self._max_pct) except Exception: from SYS.logger import logger logger.exception("Failed to set pipeline percent to max %d", self._max_pct) session_id = hashlib.md5((url + str(time.time()) + "".join(random.choices(string.ascii_letters, k=10))).encode()).hexdigest()[:12] first_section_info = None total_sections = len(sections_list) try: for section_idx, section in enumerate(sections_list, 1): display_pct = 50 if total_sections > 0: display_pct = 50 + int(((section_idx - 1) / max(1, total_sections)) * 49) try: _set_pipe_percent(display_pct) except Exception: from SYS.logger import logger logger.exception("Failed to set pipeline percent to display_pct %d for section %d", display_pct, section_idx) pipeline.set_status(f"Downloading & clipping clip section {section_idx}/{total_sections}") base_outtmpl = ytdl_options.get("outtmpl", "%(title)s.%(ext)s") output_dir_path = Path(base_outtmpl).parent filename_tmpl = f"{session_id}_{section_idx}" if base_outtmpl.endswith(".%(ext)s"): filename_tmpl += ".%(ext)s" section_outtmpl = str(output_dir_path / filename_tmpl) if section_idx == 1: metadata_cmd = ["yt-dlp", "--dump-json", "--skip-download"] if ytdl_options.get("cookiefile"): cookies_path = ytdl_options["cookiefile"].replace("\\", "/") metadata_cmd.extend(["--cookies", cookies_path]) if ytdl_options.get("noplaylist"): metadata_cmd.append("--no-playlist") metadata_cmd.append(url) try: meta_result = subprocess.run(metadata_cmd, capture_output=True, text=True) if meta_result.returncode == 0 and meta_result.stdout: try: info_dict = json.loads(meta_result.stdout.strip()) first_section_info = info_dict if not quiet: debug(f"Extracted title from metadata: {info_dict.get('title')}") except json.JSONDecodeError: if not quiet: debug("Could not parse JSON metadata") except Exception as exc: if not quiet: debug(f"Error extracting metadata: {exc}") cmd = ["yt-dlp"] if quiet: cmd.append("--quiet") cmd.append("--no-warnings") cmd.append("--no-progress") 
cmd.extend(["--postprocessor-args", "ffmpeg:-hide_banner -loglevel error"]) if ytdl_options.get("ffmpeg_location"): try: cmd.extend(["--ffmpeg-location", str(ytdl_options["ffmpeg_location"])]) except Exception: from SYS.logger import logger logger.exception("Failed to append ffmpeg_location CLI option") if ytdl_options.get("format"): cmd.extend(["-f", ytdl_options["format"]]) if ytdl_options.get("merge_output_format"): cmd.extend(["--merge-output-format", str(ytdl_options["merge_output_format"])]) postprocessors = ytdl_options.get("postprocessors") want_add_metadata = bool(ytdl_options.get("addmetadata")) want_embed_chapters = bool(ytdl_options.get("embedchapters")) if isinstance(postprocessors, list): for pp in postprocessors: if not isinstance(pp, dict): continue if str(pp.get("key") or "") == "FFmpegMetadata": want_add_metadata = True if bool(pp.get("add_chapters", True)): want_embed_chapters = True if want_add_metadata: cmd.append("--add-metadata") if want_embed_chapters: cmd.append("--embed-chapters") if ytdl_options.get("writesubtitles"): cmd.append("--write-sub") cmd.append("--write-auto-sub") cmd.extend(["--sub-format", "vtt"]) if ytdl_options.get("force_keyframes_at_cuts"): cmd.append("--force-keyframes-at-cuts") cmd.extend(["-o", section_outtmpl]) if ytdl_options.get("cookiefile"): cookies_path = ytdl_options["cookiefile"].replace("\\", "/") cmd.extend(["--cookies", cookies_path]) if ytdl_options.get("noplaylist"): cmd.append("--no-playlist") cmd.extend(["--download-sections", section]) cmd.append(url) if not quiet: debug(f"Running yt-dlp for section: {section}") progress_end_pct = min(display_pct + 45, 98) simulator = _SectionProgressSimulator(display_pct, progress_end_pct) simulator.start() try: if quiet: subprocess.run(cmd, check=True, capture_output=True, text=True) else: subprocess.run(cmd, check=True) except subprocess.CalledProcessError as exc: stderr_text = exc.stderr or "" tail = "\n".join(stderr_text.splitlines()[-12:]).strip() details = 
f"\n{tail}" if tail else "" raise DownloadError(f"yt-dlp failed for section {section} (exit {exc.returncode}){details}") from exc except Exception as exc: raise DownloadError(f"yt-dlp failed for section {section}: {exc}") from exc finally: simulator.stop() finally: pipeline.clear_status() try: _set_pipe_percent(99) except Exception: from SYS.logger import logger logger.exception("Failed to set pipeline percent to 99 at end of multi-section job") return session_id, first_section_info or {} def _iter_download_entries(info: Dict[str, Any]) -> Iterator[Dict[str, Any]]: queue: List[Dict[str, Any]] = [info] seen: set[int] = set() while queue: current = queue.pop(0) obj_id = id(current) if obj_id in seen: continue seen.add(obj_id) entries = current.get("entries") if isinstance(entries, list): for entry in entries: queue.append(entry) if current.get("requested_downloads") or not entries: yield current def _candidate_paths(entry: Dict[str, Any], output_dir: Path) -> Iterator[Path]: requested = entry.get("requested_downloads") if isinstance(requested, list): for item in requested: if isinstance(item, dict): fp = item.get("filepath") or item.get("_filename") if fp: yield Path(fp) for key in ("filepath", "_filename", "filename"): value = entry.get(key) if value: yield Path(value) if entry.get("filename"): yield output_dir / entry["filename"] def _resolve_entry_and_path(info: Dict[str, Any], output_dir: Path) -> tuple[Dict[str, Any], Path]: for entry in _iter_download_entries(info): for candidate in _candidate_paths(entry, output_dir): if candidate.is_file(): return entry, candidate if not candidate.is_absolute(): maybe = output_dir / candidate if maybe.is_file(): return entry, maybe raise FileNotFoundError("yt-dlp did not report a downloaded media file") def _resolve_entries_and_paths(info: Dict[str, Any], output_dir: Path) -> List[tuple[Dict[str, Any], Path]]: resolved: List[tuple[Dict[str, Any], Path]] = [] seen: set[str] = set() for entry in _iter_download_entries(info): 
chosen: Optional[Path] = None for candidate in _candidate_paths(entry, output_dir): if candidate.is_file(): chosen = candidate break if not candidate.is_absolute(): maybe = output_dir / candidate if maybe.is_file(): chosen = maybe break if chosen is None: continue key = str(chosen.resolve()) if key in seen: continue seen.add(key) resolved.append((entry, chosen)) return resolved def _extract_sha256(info: Dict[str, Any]) -> Optional[str]: for payload in [info] + info.get("entries", []): if not isinstance(payload, dict): continue hashes = payload.get("hashes") if isinstance(hashes, dict): for key in ("sha256", "sha-256", "sha_256"): if key in hashes and isinstance(hashes[key], str) and hashes[key].strip(): return hashes[key].strip() for key in ("sha256", "sha-256", "sha_256"): value = payload.get(key) if isinstance(value, str) and value.strip(): return value.strip() return None def _progress_callback(status: Dict[str, Any]) -> None: label = _progress_label(status) event = status.get("status") downloaded = status.get("downloaded_bytes") total = status.get("total_bytes") or status.get("total_bytes_estimate") if event == "downloading": _record_progress_activity() pipeline = PipelineProgress(pipeline_context) live_ui, _ = pipeline.ui_and_pipe_index() use_live = live_ui is not None def _total_bytes(value: Any) -> Optional[int]: try: if isinstance(value, (int, float)) and value > 0: return int(value) except Exception: from SYS.logger import logger logger.exception("Failed to interpret total bytes value: %r", value) return None if event == "downloading": if use_live: try: if not _YTDLP_TRANSFER_STATE.get(label, {}).get("started"): pipeline.begin_transfer(label=label, total=_total_bytes(total)) _YTDLP_TRANSFER_STATE[label] = {"started": True} pipeline.update_transfer( label=label, completed=int(downloaded) if downloaded is not None else None, total=_total_bytes(total), ) except Exception: from SYS.logger import logger logger.exception("Failed to update pipeline transfer for 
label '%s'", label) else: _YTDLP_PROGRESS_BAR.update( downloaded=int(downloaded) if downloaded is not None else None, total=int(total) if total is not None else None, label=label, file=sys.stderr, ) elif event == "finished": if use_live: try: if _YTDLP_TRANSFER_STATE.get(label, {}).get("started"): pipeline.finish_transfer(label=label) except Exception: from SYS.logger import logger logger.exception("Failed to finish pipeline transfer for label '%s'", label) _YTDLP_TRANSFER_STATE.pop(label, None) else: _YTDLP_PROGRESS_BAR.finish() elif event in ("postprocessing", "processing"): return try: from SYS.metadata import extract_ytdlp_tags except ImportError: extract_ytdlp_tags = None # type: ignore def _is_http_403(exc: Exception) -> bool: msg_parts: list[str] = [] try: msg_parts.append(str(exc)) except Exception: pass try: cause = getattr(exc, "__cause__", None) if cause is not None: msg_parts.append(str(cause)) except Exception: pass try: context = getattr(exc, "__context__", None) if context is not None: msg_parts.append(str(context)) except Exception: pass for msg in msg_parts: if "HTTP Error 403" in msg or "403: Forbidden" in msg or "403 Forbidden" in msg: return True return False def download_media(opts: DownloadOptions, *, config: Optional[Dict[str, Any]] = None, debug_logger: Optional[DebugLogger] = None) -> Any: """Download streaming media exclusively via yt-dlp. Optional `config` dict may be provided so tool defaults (e.g., cookies, default format) are applied when constructing the YtDlpTool instance. 
""" debug(f"[download_media] start: {opts.url}") try: netloc = urlparse(opts.url).netloc.lower() except Exception: netloc = "" if "gofile.io" in netloc: msg = "GoFile links are currently unsupported" if not opts.quiet: debug(msg) if debug_logger is not None: debug_logger.write_record("gofile-unsupported", {"url": opts.url}) raise DownloadError(msg) ytdlp_supported = is_url_supported_by_ytdlp(opts.url) if not ytdlp_supported: msg = "URL not supported by yt-dlp; try download-file for manual downloads" if not opts.quiet: log(msg) if debug_logger is not None: debug_logger.write_record("ytdlp-unsupported", {"url": opts.url}) raise DownloadError(msg) if opts.playlist_items: debug( f"Skipping probe for playlist (item selection: {opts.playlist_items}), proceeding with download" ) probe_result: Optional[Dict[str, Any]] = {"url": opts.url} else: probe_cookiefile = None try: if opts.cookies_path and opts.cookies_path.is_file(): probe_cookiefile = str(opts.cookies_path) except Exception: probe_cookiefile = None probe_result = probe_url(opts.url, no_playlist=opts.no_playlist, timeout_seconds=15, cookiefile=probe_cookiefile) if probe_result is None: msg = "yt-dlp could not detect media for this URL; use download-file for direct downloads" if not opts.quiet: log(msg) if debug_logger is not None: debug_logger.write_record("ytdlp-skip-no-media", {"url": opts.url}) raise DownloadError(msg) ensure_yt_dlp_ready() # Use provided config when available so user tool settings are honored ytdlp_tool = YtDlpTool(config or {}) ytdl_options = ytdlp_tool.build_ytdlp_options(opts) hooks = ytdl_options.get("progress_hooks") if not isinstance(hooks, list): hooks = [] ytdl_options["progress_hooks"] = hooks if _progress_callback not in hooks: hooks.append(_progress_callback) if not opts.quiet: debug(f"Starting yt-dlp download: {opts.url}") if debug_logger is not None: debug_logger.write_record("ytdlp-start", {"url": opts.url}) assert yt_dlp is not None info: Optional[Dict[str, Any]] = None 
session_id = None first_section_info: Dict[str, Any] = {} try: if not opts.quiet: if ytdl_options.get("download_sections"): debug(f"[yt-dlp] download_sections: {ytdl_options['download_sections']}") debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}") if ytdl_options.get("download_sections"): live_ui, _ = PipelineProgress(pipeline_context).ui_and_pipe_index() quiet_sections = bool(opts.quiet) or (live_ui is not None) session_id, first_section_info = _download_with_sections_via_cli( opts.url, ytdl_options, ytdl_options.get("download_sections", []), quiet=quiet_sections, ) info = None else: with yt_dlp.YoutubeDL(ytdl_options) as ydl: # type: ignore[arg-type] info = ydl.extract_info(opts.url, download=True) except Exception as exc: retry_attempted = False if _is_http_403(exc) and not ytdl_options.get("download_sections"): retry_attempted = True try: if not opts.quiet: debug("yt-dlp hit HTTP 403; retrying with browser cookies + android/web player client") fallback_options = dict(ytdl_options) fallback_options.pop("cookiefile", None) _add_browser_cookies_if_available(fallback_options) extractor_args = fallback_options.get("extractor_args") if not isinstance(extractor_args, dict): extractor_args = {} youtube_args = extractor_args.get("youtube") if not isinstance(youtube_args, dict): youtube_args = {} if "player_client" not in youtube_args: youtube_args["player_client"] = ["android", "web"] extractor_args["youtube"] = youtube_args fallback_options["extractor_args"] = extractor_args with yt_dlp.YoutubeDL(fallback_options) as ydl: # type: ignore[arg-type] info = ydl.extract_info(opts.url, download=True) except Exception as exc2: log(f"yt-dlp failed: {exc2}", file=sys.stderr) if debug_logger is not None: debug_logger.write_record( "exception", {"phase": "yt-dlp", "error": str(exc2), "traceback": traceback.format_exc()}, ) raise DownloadError("yt-dlp download failed") from exc2 if not retry_attempted: log(f"yt-dlp failed: {exc}", 
file=sys.stderr) if debug_logger is not None: debug_logger.write_record( "exception", {"phase": "yt-dlp", "error": str(exc), "traceback": traceback.format_exc()}, ) raise DownloadError("yt-dlp download failed") from exc if info is None: try: time.sleep(0.5) files = sorted(opts.output_dir.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True) if not files: raise FileNotFoundError(f"No files found in {opts.output_dir}") if opts.clip_sections and session_id: section_pattern = re.compile(rf"^{re.escape(session_id)}_(\d+)") matching_files = [f for f in files if section_pattern.search(f.name)] if matching_files: def extract_section_num(path: Path) -> int: match = section_pattern.search(path.name) return int(match.group(1)) if match else 999 matching_files.sort(key=extract_section_num) debug(f"Found {len(matching_files)} section file(s) matching pattern") by_index: Dict[int, List[Path]] = {} for f in matching_files: m = section_pattern.search(f.name) if not m: continue try: n = int(m.group(1)) except Exception: continue by_index.setdefault(n, []).append(f) renamed_media_files: List[Path] = [] for sec_num in sorted(by_index.keys()): group = by_index.get(sec_num) or [] if not group: continue def _is_subtitle(p: Path) -> bool: try: return p.suffix.lower() in _SUBTITLE_EXTS except Exception: return False media_candidates = [p for p in group if not _is_subtitle(p)] subtitle_candidates = [p for p in group if _is_subtitle(p)] media_file: Optional[Path] = None for cand in media_candidates: try: if cand.suffix.lower() in {".json", ".info.json"}: continue except Exception: from SYS.logger import logger logger.exception("Failed to inspect candidate suffix for %s", cand) media_file = cand break if media_file is None and media_candidates: media_file = media_candidates[0] if media_file is None: continue try: media_hash = sha256_file(media_file) except Exception as exc: debug(f"Failed to hash section media file {media_file.name}: {exc}") renamed_media_files.append(media_file) 
continue prefix = f"{session_id}_{sec_num}" def _tail(name: str) -> str: try: if name.startswith(prefix): return name[len(prefix):] except Exception: from SYS.logger import logger logger.exception("Failed to check name prefix for '%s'", name) try: return Path(name).suffix except Exception: from SYS.logger import logger logger.exception("Failed to obtain suffix for name '%s'", name) return "" try: new_media_name = f"{media_hash}{_tail(media_file.name)}" new_media_path = opts.output_dir / new_media_name if new_media_path.exists() and new_media_path != media_file: debug(f"File with hash {media_hash} already exists, using existing file.") try: media_file.unlink() except OSError: from SYS.logger import logger logger.exception("Failed to unlink duplicate media file %s", media_file) else: media_file.rename(new_media_path) debug(f"Renamed section file: {media_file.name} -> {new_media_name}") renamed_media_files.append(new_media_path) except Exception as exc: debug(f"Failed to rename section media file {media_file.name}: {exc}") renamed_media_files.append(media_file) new_media_path = media_file for sub_file in subtitle_candidates: try: new_sub_name = f"{media_hash}{_tail(sub_file.name)}" new_sub_path = opts.output_dir / new_sub_name if new_sub_path.exists() and new_sub_path != sub_file: try: sub_file.unlink() except OSError: pass else: sub_file.rename(new_sub_path) debug(f"Renamed section file: {sub_file.name} -> {new_sub_name}") except Exception as exc: debug(f"Failed to rename section subtitle file {sub_file.name}: {exc}") media_path = renamed_media_files[0] if renamed_media_files else matching_files[0] media_paths = renamed_media_files if renamed_media_files else None if not opts.quiet: count = len(media_paths) if isinstance(media_paths, list) else 1 debug(f"✓ Downloaded {count} section media file(s) (session: {session_id})") else: media_path = files[0] media_paths = None if not opts.quiet: debug(f"✓ Downloaded section file (pattern not found): {media_path.name}") else: 
media_path = files[0] media_paths = None if not opts.quiet: debug(f"✓ Downloaded: {media_path.name}") if debug_logger is not None: debug_logger.write_record("ytdlp-file-found", {"path": str(media_path)}) except Exception as exc: log(f"Error finding downloaded file: {exc}", file=sys.stderr) if debug_logger is not None: debug_logger.write_record("exception", {"phase": "find-file", "error": str(exc)}) raise DownloadError(str(exc)) from exc file_hash = sha256_file(media_path) section_tags: List[str] = [] title = "" if first_section_info: title = first_section_info.get("title", "") if title: section_tags.append(f"title:{title}") debug(f"Added title tag for section download: {title}") if first_section_info: info_dict_sec = first_section_info else: info_dict_sec = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")} return DownloadMediaResult(path=media_path, info=info_dict_sec, tag=section_tags, source_url=opts.url, hash_value=file_hash, paths=media_paths) if not isinstance(info, dict): log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr) raise DownloadError("Unexpected yt-dlp response type") info_dict: Dict[str, Any] = cast(Dict[str, Any], info) if debug_logger is not None: debug_logger.write_record("ytdlp-info", {"keys": sorted(info_dict.keys()), "is_playlist": bool(info_dict.get("entries"))}) if info_dict.get("entries") and not opts.no_playlist: resolved = _resolve_entries_and_paths(info_dict, opts.output_dir) if resolved: results: List[DownloadMediaResult] = [] for entry, media_path in resolved: hash_value = _extract_sha256(entry) or _extract_sha256(info_dict) if not hash_value: try: hash_value = sha256_file(media_path) except OSError: hash_value = None tags: List[str] = [] if extract_ytdlp_tags is not None: try: tags = extract_ytdlp_tags(entry) except Exception as exc: log(f"Error extracting tags: {exc}", file=sys.stderr) source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url") or 
opts.url results.append( DownloadMediaResult( path=media_path, info=entry, tag=tags, source_url=source_url, hash_value=hash_value, ) ) if not opts.quiet: debug(f"✓ Downloaded playlist items: {len(results)}") return results try: entry, media_path = _resolve_entry_and_path(info_dict, opts.output_dir) except FileNotFoundError as exc: log(f"Error: {exc}", file=sys.stderr) if debug_logger is not None: debug_logger.write_record("exception", {"phase": "resolve-path", "error": str(exc)}) raise DownloadError(str(exc)) from exc if debug_logger is not None: debug_logger.write_record("resolved-media", {"path": str(media_path), "entry_keys": sorted(entry.keys())}) hash_value = _extract_sha256(entry) or _extract_sha256(info_dict) if not hash_value: try: hash_value = sha256_file(media_path) except OSError as exc: if debug_logger is not None: debug_logger.write_record("hash-error", {"path": str(media_path), "error": str(exc)}) tags_res: List[str] = [] if extract_ytdlp_tags is not None: try: tags_res = extract_ytdlp_tags(entry) except Exception as exc: log(f"Error extracting tags: {exc}", file=sys.stderr) source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url") if not opts.quiet: debug(f"✓ Downloaded: {media_path.name} ({len(tags_res)} tags)") if debug_logger is not None: debug_logger.write_record( "downloaded", { "path": str(media_path), "tag_count": len(tags_res), "source_url": source_url, "sha256": hash_value, }, ) return DownloadMediaResult(path=media_path, info=entry, tag=tags_res, source_url=source_url, hash_value=hash_value) def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300, config: Optional[Dict[str, Any]] = None) -> Any: import threading from typing import cast result_container: List[Optional[Any]] = [None, None] def _do_download() -> None: try: result_container[0] = download_media(opts, config=config) except Exception as exc: result_container[1] = exc # Use daemon=True so a hung download doesn't block process exit 
def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300, config: Optional[Dict[str, Any]] = None) -> Any:
    """Run download_media on a worker thread, enforcing activity and wall-clock timeouts.

    Raises DownloadError when no progress is seen for `timeout_seconds`, when a
    hard cap (double the activity timeout, floored at 10 minutes) elapses, or
    when the download yields no result; re-raises any worker exception.
    """
    outcome: List[Optional[Any]] = [None, None]  # [result, exception]

    def _worker() -> None:
        try:
            outcome[0] = download_media(opts, config=config)
        except Exception as exc:
            outcome[1] = exc

    # daemon=True: a hung download must not block process exit if the wall timeout hits.
    worker = threading.Thread(target=_worker, daemon=True)
    worker.start()
    started = time.monotonic()
    # Two timeouts:
    #   1. Activity timeout — no progress updates for `timeout_seconds`.
    #   2. Hard wall-clock timeout — total time allowed for this URL; larger
    #      than the activity timeout so slow-but-steady progress can finish.
    hard_limit = max(timeout_seconds * 2, 600)
    _record_progress_activity(started)
    try:
        while worker.is_alive():
            worker.join(1)
            if not worker.is_alive():
                break
            now = time.monotonic()
            last = _get_last_progress_activity()
            if last <= 0:
                last = started
            if now - last > timeout_seconds:
                raise DownloadError(f"Download activity timeout after {timeout_seconds} seconds for {opts.url}")
            if now - started > hard_limit:
                raise DownloadError(f"Download hard timeout after {hard_limit} seconds for {opts.url}")
    finally:
        _clear_progress_activity()
    if outcome[1] is not None:
        raise cast(Exception, outcome[1])
    if outcome[0] is None:
        raise DownloadError(f"Download failed for {opts.url}")
    return cast(Any, outcome[0])