# Medios-Macina/tool/ytdlp.py
# pyright: reportUnusedFunction=false
from __future__ import annotations
import hashlib
import json
import os
import random
import re
import string
import subprocess
import sys
import threading
import time
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence, cast
from urllib.parse import urlparse
from SYS import pipeline as pipeline_context
from SYS.logger import debug, log
from SYS.models import (
DebugLogger,
DownloadError,
DownloadMediaResult,
DownloadOptions,
ProgressBar,
)
from SYS.pipeline_progress import PipelineProgress
from SYS.utils import ensure_directory, sha256_file
from SYS.yt_metadata import extract_ytdlp_tags
_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {}
try:
import yt_dlp # type: ignore
from yt_dlp.extractor import gen_extractors # type: ignore
except Exception as exc: # pragma: no cover - handled at runtime
yt_dlp = None # type: ignore
gen_extractors = None # type: ignore
YTDLP_IMPORT_ERROR: Optional[Exception] = exc
else:
YTDLP_IMPORT_ERROR = None
_EXTRACTOR_CACHE: List[Any] | None = None
# Patterns for domain extraction from yt-dlp regexes
# 1) Alternation group followed by \.tld e.g. (?:youtube|youtu|youtube-nocookie)\.com
ALT_GROUP_TLD = re.compile(r'\((?:\?:)?([^\)]+)\)\\\.(?P<tld>[A-Za-z0-9.+-]+)')
# 2) Literal domain pieces like youtube\.com or youtu\.be (not preceded by a group)
LITERAL_DOMAIN = re.compile(r'(?<!\()(?<!\|)(?<!:)([A-Za-z0-9][A-Za-z0-9_-]{0,})\\\.([A-Za-z0-9.+-]+)')
# 3) Partial domain tokens that appear alone (e.g., zhihu) — treat as zhihu.com fallback
PARTIAL_TOKEN = re.compile(r'(?<![A-Za-z0-9_-])([A-Za-z0-9][A-Za-z0-9_-]{1,})(?=(?:\\?[/\)\$]|\\\.|$))')
_SUPPORTED_DOMAINS: set[str] | None = None
def normalize_patterns(valid_url) -> List[str]:
if not valid_url:
return []
if isinstance(valid_url, str):
return [valid_url]
if isinstance(valid_url, (list, tuple)):
return [p for p in valid_url if isinstance(p, str)]
return []
def extract_from_pattern(pat: str) -> set[str]:
domains = set()
# 1) Alternation groups followed by .tld
for alt_group, tld in ALT_GROUP_TLD.findall(pat):
# alt_group like "youtube|youtu|youtube-nocookie"
for alt in alt_group.split('|'):
alt = alt.strip()
# remove any non-domain tokens like (?:www\.)? if present inside alt (rare)
alt = re.sub(r'\(\?:www\\\.\)\?', '', alt)
if alt:
domains.add(f"{alt}.{tld}".lower())
# 2) Literal domain matches (youtube\.com)
for name, tld in LITERAL_DOMAIN.findall(pat):
domains.add(f"{name}.{tld}".lower())
# 3) Partial tokens fallback (only if we didn't already capture domains)
# This helps when regexes contain plain tokens like 'zhihu' or 'vimeo' without .com
if not domains:
for token in PARTIAL_TOKEN.findall(pat):
# ignore common regex words that are not domains
if len(token) <= 2:
continue
# avoid tokens that are clearly regex constructs
if token.lower() in {"https", "http", "www", "com", "net", "org"}:
continue
domains.add(f"{token.lower()}.com")
return domains
def extract_domains(valid_url) -> set[str]:
patterns = normalize_patterns(valid_url)
all_domains = set()
for pat in patterns:
all_domains |= extract_from_pattern(pat)
# final cleanup: remove obvious junk like 'com.com' if present
cleaned = set()
for d in all_domains:
# drop duplicates where left side equals tld (e.g., com.com)
parts = d.split('.')
if len(parts) >= 2 and parts[-2] == parts[-1]:
continue
cleaned.add(d)
return cleaned
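# Illustrative example (hypothetical pattern, not taken from yt-dlp itself): given a
# _VALID_URL like r'https?://(?:www\.)?(?:youtube|youtu|youtube-nocookie)\.com/watch',
# extract_from_pattern() expands the alternation group against the trailing TLD and
# returns {"youtube.com", "youtu.com", "youtube-nocookie.com"}; extract_domains()
# then unions these sets across all patterns of an extractor's _VALID_URL.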
def _build_supported_domains() -> set[str]:
global _SUPPORTED_DOMAINS
if _SUPPORTED_DOMAINS is not None:
return _SUPPORTED_DOMAINS
_SUPPORTED_DOMAINS = set()
if gen_extractors is None:
return _SUPPORTED_DOMAINS
try:
for e in gen_extractors():
name = getattr(e, "IE_NAME", "").lower()
if name == "generic":
continue
regex = getattr(e, "_VALID_URL", None)
domains = extract_domains(regex)
_SUPPORTED_DOMAINS.update(domains)
except Exception:
from SYS.logger import logger
logger.exception("Failed to build supported domains from yt-dlp extractors")
return _SUPPORTED_DOMAINS
def _get_nested(config: Dict[str, Any], *path: str) -> Any:
cur: Any = config
for key in path:
if not isinstance(cur, dict):
return None
cur = cur.get(key)
return cur
def _parse_csv_list(value: Any) -> Optional[List[str]]:
if value is None:
return None
if isinstance(value, list):
out: List[str] = []
for item in value:
s = str(item).strip()
if s:
out.append(s)
return out or None
s = str(value).strip()
if not s:
return None
# allow either JSON-ish list strings or simple comma-separated values
if s.startswith("[") and s.endswith("]"):
s = s[1:-1]
parts = [p.strip() for p in s.split(",")]
parts = [p for p in parts if p]
return parts or None
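# For example, _parse_csv_list("res:1080, res:720") -> ["res:1080", "res:720"],
# _parse_csv_list(["a", "", "b"]) -> ["a", "b"], and _parse_csv_list("") -> None.
# Bracketed strings are only split on commas; embedded quotes are kept as-is.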
_BROWSER_COOKIES_AVAILABLE: Optional[bool] = None
_BROWSER_COOKIE_WARNING_EMITTED = False
def _browser_cookie_candidate_paths() -> List[Path]:
try:
home = Path.home()
except Exception:
home = Path.cwd()
candidates: List[Path] = []
if os.name == "nt":
for env_value in (os.getenv("LOCALAPPDATA"), os.getenv("APPDATA")):
if not env_value:
continue
base_path = Path(env_value)
if not base_path:
continue
candidates.extend([
base_path / "Google" / "Chrome" / "User Data" / "Default" / "Cookies",
base_path / "Chromium" / "User Data" / "Default" / "Cookies",
base_path / "BraveSoftware" / "Brave-Browser" / "User Data" / "Default" / "Cookies",
])
else:
candidates.extend([
home / ".config" / "google-chrome" / "Default" / "Cookies",
home / ".config" / "chromium" / "Default" / "Cookies",
home / ".config" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies",
])
if sys.platform == "darwin":
candidates.extend([
home / "Library" / "Application Support" / "Google" / "Chrome" / "Default" / "Cookies",
home / "Library" / "Application Support" / "Chromium" / "Default" / "Cookies",
home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies",
])
return candidates
def _has_browser_cookie_database() -> bool:
global _BROWSER_COOKIES_AVAILABLE
if _BROWSER_COOKIES_AVAILABLE is not None:
return _BROWSER_COOKIES_AVAILABLE
for path in _browser_cookie_candidate_paths():
try:
if path.is_file():
_BROWSER_COOKIES_AVAILABLE = True
return True
except Exception:
continue
_BROWSER_COOKIES_AVAILABLE = False
return False
def _browser_cookie_path_for(browser_name: str) -> Optional[Path]:
"""Return the cookie DB Path for a specific browser if present, else None.
Supported browsers (case-insensitive): "chrome", "chromium", "brave".
"""
name = str(browser_name or "").strip().lower()
if not name:
return None
try:
home = Path.home()
except Exception:
home = Path.cwd()
# Windows
if os.name == "nt":
for env_value in (os.getenv("LOCALAPPDATA"), os.getenv("APPDATA")):
if not env_value:
continue
base = Path(env_value)
if name in ("chrome", "google-chrome"):
p = base / "Google" / "Chrome" / "User Data" / "Default" / "Cookies"
if p.is_file():
return p
if name == "chromium":
p = base / "Chromium" / "User Data" / "Default" / "Cookies"
if p.is_file():
return p
if name in ("brave", "brave-browser"):
p = base / "BraveSoftware" / "Brave-Browser" / "User Data" / "Default" / "Cookies"
if p.is_file():
return p
# *nix and macOS
if sys.platform == "darwin":
if name in ("chrome", "google-chrome"):
p = home / "Library" / "Application Support" / "Google" / "Chrome" / "Default" / "Cookies"
if p.is_file():
return p
if name == "chromium":
p = home / "Library" / "Application Support" / "Chromium" / "Default" / "Cookies"
if p.is_file():
return p
if name in ("brave", "brave-browser"):
p = home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies"
if p.is_file():
return p
# Linux and other
if name in ("chrome", "google-chrome"):
p = home / ".config" / "google-chrome" / "Default" / "Cookies"
if p.is_file():
return p
if name == "chromium":
p = home / ".config" / "chromium" / "Default" / "Cookies"
if p.is_file():
return p
if name in ("brave", "brave-browser"):
p = home / ".config" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies"
if p.is_file():
return p
return None
def _add_browser_cookies_if_available(options: Dict[str, Any], preferred_browser: Optional[str] = None) -> None:
global _BROWSER_COOKIE_WARNING_EMITTED
# If a preferred browser is specified, try to use it if available
if preferred_browser:
try:
if _browser_cookie_path_for(preferred_browser) is not None:
options["cookiesfrombrowser"] = [preferred_browser]
return
else:
if not _BROWSER_COOKIE_WARNING_EMITTED:
log(f"Requested browser cookie DB '{preferred_browser}' not found; falling back to autodetect.")
_BROWSER_COOKIE_WARNING_EMITTED = True
except Exception:
from SYS.logger import logger
logger.exception("Failed to check browser cookie path for preferred browser '%s'", preferred_browser)
# Auto-detect in common order (chrome/chromium/brave)
for candidate in ("chrome", "chromium", "brave"):
try:
if _browser_cookie_path_for(candidate) is not None:
options["cookiesfrombrowser"] = [candidate]
return
except Exception:
from SYS.logger import logger
logger.exception("Error while checking cookie path for candidate browser '%s'", candidate)
continue
if not _BROWSER_COOKIE_WARNING_EMITTED:
log(
"Browser cookie extraction skipped because no Chrome-compatible cookie database was found. "
"Provide a cookies file via config or --cookies if authentication is required."
)
_BROWSER_COOKIE_WARNING_EMITTED = True
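# Usage sketch (illustrative): _add_browser_cookies_if_available(ydl_opts, preferred_browser="brave")
# sets ydl_opts["cookiesfrombrowser"] = ["brave"] when a Brave cookie DB is found; otherwise it
# falls back to the chrome/chromium/brave autodetect order and emits a one-time warning if no
# Chrome-compatible cookie database exists.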
def ensure_yt_dlp_ready() -> None:
"""Verify yt-dlp is importable, raising DownloadError if missing."""
if yt_dlp is not None:
return
detail = str(YTDLP_IMPORT_ERROR or "yt-dlp is not installed")
raise DownloadError(f"yt-dlp module not available: {detail}")
def _get_extractors() -> List[Any]:
global _EXTRACTOR_CACHE
if _EXTRACTOR_CACHE is not None:
return _EXTRACTOR_CACHE
ensure_yt_dlp_ready()
if gen_extractors is None:
_EXTRACTOR_CACHE = []
return _EXTRACTOR_CACHE
try:
_EXTRACTOR_CACHE = [ie for ie in gen_extractors()]
except Exception:
_EXTRACTOR_CACHE = []
return _EXTRACTOR_CACHE
def is_url_supported_by_ytdlp(url: str) -> bool:
"""Return True if yt-dlp has a non-generic extractor for the URL."""
if not url or not isinstance(url, str):
return False
if YTDLP_IMPORT_ERROR is not None:
return False
try:
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
return False
except Exception:
return False
try:
domain = parsed.netloc.lower()
if not domain:
return False
supported = _build_supported_domains()
for base in supported:
if domain == base or domain.endswith("." + base):
return True
except Exception:
return False
return False
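# Illustrative behaviour: is_url_supported_by_ytdlp("https://www.youtube.com/watch?v=x") is True
# when a non-generic extractor advertises youtube.com (subdomains match via the ".domain" suffix
# check), while strings without a scheme/netloc or unknown hosts return False. This relies on the
# best-effort domain table built above, so an unusual _VALID_URL pattern may be missed.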
_FORMATS_CACHE: Dict[str, tuple[float, List[Dict[str, Any]]]] = {}
def list_formats(
url: str,
*,
no_playlist: bool = False,
playlist_items: Optional[str] = None,
cookiefile: Optional[str] = None,
timeout_seconds: int = 20,
) -> Optional[List[Dict[str, Any]]]:
"""Get available formats for a URL.
Returns a list of format dicts or None if unsupported or probing fails.
"""
if not is_url_supported_by_ytdlp(url):
return None
# Cache format probes to avoid redundant network hits
cache_key = hashlib.md5(f"{url}|{no_playlist}|{playlist_items}|{cookiefile}".encode()).hexdigest()
now = time.monotonic()
if cache_key in _FORMATS_CACHE:
ts, result = _FORMATS_CACHE[cache_key]
if now - ts < 300: # 5 minute cache for formats
return result
result_container: List[Optional[Any]] = [None, None] # [result, error]
def _do_list() -> None:
try:
ensure_yt_dlp_ready()
assert yt_dlp is not None
ydl_opts: Dict[str, Any] = {
"quiet": True,
"no_warnings": True,
"skip_download": True,
"noprogress": True,
"socket_timeout": min(10, max(1, int(timeout_seconds))),
"retries": 2,
}
if cookiefile:
ydl_opts["cookiefile"] = str(cookiefile)
else:
# Best effort attempt to use browser cookies if no file is explicitly passed
_add_browser_cookies_if_available(ydl_opts)
if no_playlist:
ydl_opts["noplaylist"] = True
if playlist_items:
ydl_opts["playlist_items"] = str(playlist_items)
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(url, download=False)
if not isinstance(info, dict):
result_container[0] = None
return
formats = info.get("formats")
if not isinstance(formats, list):
result_container[0] = None
return
out: List[Dict[str, Any]] = []
for fmt in formats:
if isinstance(fmt, dict):
out.append(fmt)
result_container[0] = out
except Exception as exc:
debug(f"yt-dlp format probe failed for {url}: {exc}")
result_container[1] = exc
# Use daemon=True so a hung thread doesn't block process exit
thread = threading.Thread(target=_do_list, daemon=True)
thread.start()
thread.join(timeout=max(1, int(timeout_seconds)))
if thread.is_alive():
debug(f"yt-dlp format probe timed out for {url} (>={timeout_seconds}s)")
return None
if result_container[1] is not None:
return None
if result_container[0] is not None:
_FORMATS_CACHE[cache_key] = (now, cast(List[Dict[str, Any]], result_container[0]))
return cast(Optional[List[Dict[str, Any]]], result_container[0])
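# Minimal usage sketch (assumes a URL yt-dlp can probe within the timeout):
#   fmts = list_formats("https://www.youtube.com/watch?v=...", no_playlist=True)
#   if fmts is not None:
#       fmts = [f for f in fmts if is_browseable_format(f)]
# Results are memoised per (url, no_playlist, playlist_items, cookiefile) for 5 minutes.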
_PROBE_CACHE: Dict[str, tuple[float, Dict[str, Any]]] = {}
def probe_url(
url: str,
no_playlist: bool = False,
timeout_seconds: int = 15,
*,
cookiefile: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Probe URL metadata without downloading.
Returns None if unsupported, errors, or times out.
"""
if not is_url_supported_by_ytdlp(url):
return None
# Simple in-memory cache to avoid duplicate probes for the same URL/options in a short window.
cache_key = hashlib.md5(f"{url}|{no_playlist}|{cookiefile}".encode()).hexdigest()
now = time.monotonic()
if cache_key in _PROBE_CACHE:
ts, result = _PROBE_CACHE[cache_key]
if now - ts < 60: # 60 second cache
return result
result_container: List[Optional[Any]] = [None, None] # [result, error]
def _do_probe() -> None:
try:
debug(f"[probe] Starting probe for {url}")
ensure_yt_dlp_ready()
assert yt_dlp is not None
ydl_opts: Dict[str, Any] = {
"quiet": True,
"no_warnings": True,
"socket_timeout": 10,
"retries": 2,
"skip_download": True,
"extract_flat": "in_playlist",
"noprogress": True,
}
if cookiefile:
ydl_opts["cookiefile"] = str(cookiefile)
else:
# Best effort fallback
_add_browser_cookies_if_available(ydl_opts)
if no_playlist:
ydl_opts["noplaylist"] = True
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type]
debug(f"[probe] ytdlp extract_info (download=False) start: {url}")
info = ydl.extract_info(url, download=False)
debug(f"[probe] ytdlp extract_info (download=False) done: {url}")
if not isinstance(info, dict):
result_container[0] = None
return
webpage_url = info.get("webpage_url") or info.get("original_url") or info.get("url")
result_container[0] = {
"extractor": info.get("extractor", ""),
"title": info.get("title", ""),
"entries": info.get("entries", []),
"duration": info.get("duration"),
"uploader": info.get("uploader"),
"description": info.get("description"),
"requested_url": url,
"webpage_url": webpage_url,
"url": webpage_url or url,
}
except Exception as exc:
debug(f"Probe error for {url}: {exc}")
result_container[1] = exc
# Use daemon=True so a hung probe doesn't block the process
thread = threading.Thread(target=_do_probe, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
debug(f"Probe timeout for {url} (>={timeout_seconds}s), proceeding without probe")
return None
if result_container[1] is not None:
return None
if result_container[0] is not None:
_PROBE_CACHE[cache_key] = (now, cast(Dict[str, Any], result_container[0]))
return cast(Optional[Dict[str, Any]], result_container[0])
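# Usage sketch (illustrative URL): probe_url("https://vimeo.com/12345", no_playlist=True)
# returns a small dict (extractor, title, entries, duration, uploader, description, and the
# requested/webpage URLs) or None on error or timeout; repeated calls with the same URL and
# options within 60 seconds are served from _PROBE_CACHE.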
def is_browseable_format(fmt: Any) -> bool:
"""Check if a format is user-browseable (not storyboard, metadata, etc).
Used by the ytdlp format selector to filter out non-downloadable formats.
Returns False for:
- MHTML, JSON sidecar metadata
- Storyboard/thumbnail formats
- Formats with neither a video nor an audio stream
Args:
fmt: Format dict from yt-dlp with keys like format_id, ext, vcodec, acodec, format_note
Returns:
bool: True if format is suitable for browsing/selection
"""
if not isinstance(fmt, dict):
return False
format_id = str(fmt.get("format_id") or "").strip()
if not format_id:
return False
# Filter out metadata/sidecar formats
ext = str(fmt.get("ext") or "").strip().lower()
if ext in {"mhtml", "json"}:
return False
# Filter out storyboard/thumbnail formats
note = str(fmt.get("format_note") or "").lower()
if "storyboard" in note:
return False
if format_id.lower().startswith("sb"):
return False
# Filter out formats with no audio and no video
vcodec = str(fmt.get("vcodec", "none"))
acodec = str(fmt.get("acodec", "none"))
return not (vcodec == "none" and acodec == "none")
def format_for_table_selection(
fmt: Dict[str, Any],
url: str,
index: int,
*,
selection_format_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Format a yt-dlp format dict into a table result row for selection.
This helper formats a single format from list_formats() into the shape
expected by the ResultTable system, ready for user selection and routing
to download-file with -query "format:<id>".
Args:
fmt: Format dict from yt-dlp
url: The URL this format came from
index: Row number for display (1-indexed)
selection_format_id: Override format_id for selection (e.g., with +ba suffix)
Returns:
dict: Format result row with _selection_args for table system
Example:
fmts = list_formats("https://youtube.com/watch?v=abc")
browseable = [f for f in fmts if is_browseable_format(f)]
results = [format_for_table_selection(f, url, i+1) for i, f in enumerate(browseable)]
"""
format_id = fmt.get("format_id", "")
resolution = fmt.get("resolution", "")
ext = fmt.get("ext", "")
vcodec = fmt.get("vcodec", "none")
acodec = fmt.get("acodec", "none")
filesize = fmt.get("filesize")
filesize_approx = fmt.get("filesize_approx")
# If not provided, compute selection format ID (add +ba for video-only)
if selection_format_id is None:
selection_format_id = format_id
try:
if vcodec != "none" and acodec == "none" and format_id:
selection_format_id = f"{format_id}+ba"
except Exception:
from SYS.logger import logger
logger.exception("Failed to compute selection_format_id for format: %s", fmt)
# Format file size
size_str = ""
size_prefix = ""
size_bytes = filesize or filesize_approx
try:
if isinstance(size_bytes, (int, float)) and size_bytes > 0:
size_mb = float(size_bytes) / (1024 * 1024)
size_str = f"{size_prefix}{size_mb:.1f}MB"
except Exception:
from SYS.logger import logger
logger.exception("Failed to compute size string for format: %s", fmt)
# Build description
desc_parts: List[str] = []
if resolution and resolution != "audio only":
desc_parts.append(resolution)
if ext:
desc_parts.append(str(ext).upper())
if vcodec != "none":
desc_parts.append(f"v:{vcodec}")
if acodec != "none":
desc_parts.append(f"a:{acodec}")
if size_str:
desc_parts.append(size_str)
format_desc = " | ".join(desc_parts)
# Build table row
return {
"table": "download-file",
"title": f"Format {format_id}",
"url": url,
"target": url,
"detail": format_desc,
"annotations": [ext, resolution] if resolution else [ext],
"media_kind": "format",
"columns": [
("ID", format_id),
("Resolution", resolution or "N/A"),
("Ext", ext),
("Size", size_str or ""),
("Video", vcodec),
("Audio", acodec),
],
"full_metadata": {
"format_id": format_id,
"url": url,
"item_selector": selection_format_id,
"_selection_args": ["-query", f"format:{selection_format_id}"],
},
"_selection_args": ["-query", f"format:{selection_format_id}"],
}
@dataclass(slots=True)
class YtDlpDefaults:
"""User-tunable defaults for yt-dlp behavior.
Recommended config.conf keys (top-level dotted keys):
- format="best|1080|720|640|audio"
- ytdlp.format_sort="res:2160,res:1440,res:1080,res:720,res"
Cookies:
- cookies="C:\\path\\cookies.txt" (already supported by config.resolve_cookies_path)
- cookies_from_browser="auto|none|chrome|brave|chromium"
"""
format: str = "best"
video_format: str = "bestvideo+bestaudio/best"
audio_format: str = "251/140/bestaudio"
format_sort: Optional[List[str]] = None
cookies_from_browser: Optional[str] = None
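# Illustrative config.conf fragment matching the keys documented above (values are examples only):
#   format="720"
#   ytdlp.format_sort="res:1080,res:720,res"
#   cookies="C:\\path\\cookies.txt"
#   cookies_from_browser="chrome"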
class YtDlpTool:
"""Centralizes yt-dlp defaults and translation helpers.
This is intentionally small and dependency-light so cmdlets can use it without
forcing a full refactor.
"""
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
*,
script_dir: Optional[Path] = None,
) -> None:
self._config: Dict[str, Any] = dict(config or {})
# `resolve_cookies_path` expects the app root so it can fall back to ./cookies.txt.
# This file lives under ./tool/, so default to the parent directory.
self._script_dir = script_dir or Path(__file__).resolve().parent.parent
self.defaults = self._load_defaults()
self._cookiefile: Optional[Path] = self._init_cookiefile()
def _init_cookiefile(self) -> Optional[Path]:
"""Resolve cookies once at tool init (yt-dlp is the primary consumer)."""
try:
from SYS.config import resolve_cookies_path
resolved = resolve_cookies_path(self._config, script_dir=self._script_dir)
if resolved is not None and resolved.is_file():
return resolved
except Exception:
from SYS.logger import logger
logger.exception("Failed to initialize cookiefile using resolve_cookies_path")
return None
def resolve_height_selector(self, format_str: Optional[str]) -> Optional[str]:
"""Resolve numeric heights (720, 1080p) to yt-dlp height selectors.
Examples:
"720" -> "bv*[height<=720]+ba"
"1080p" -> "bv*[height<=1080]+ba"
"""
if not format_str or not isinstance(format_str, str):
return None
s = format_str.strip().lower()
if not s:
return None
# Strip trailing 'p' if present (e.g. 720p -> 720)
if s.endswith('p'):
s = s[:-1]
if s.isdigit():
height = int(s)
if height >= 144:
return f"bv*[height<={height}]+ba"
return None
def _load_defaults(self) -> YtDlpDefaults:
cfg = self._config
# NOTE: `YtDlpDefaults` is a slots dataclass. Referencing defaults via
# `YtDlpDefaults.video_format` yields a `member_descriptor`, not the
# default string value. Use an instance for fallback defaults.
_fallback_defaults = YtDlpDefaults()
tool_block = _get_nested(cfg, "tool", "ytdlp")
if not isinstance(tool_block, dict):
tool_block = {}
ytdlp_block = cfg.get("ytdlp") if isinstance(cfg.get("ytdlp"),
dict) else {}
if not isinstance(ytdlp_block, dict):
ytdlp_block = {}
# Accept both nested and flat styles.
video_format = (
tool_block.get("video_format") or tool_block.get("format")
or ytdlp_block.get("video_format") or ytdlp_block.get("video")
or ytdlp_block.get("format_video") or cfg.get("ytdlp_video_format")
)
audio_format = (
tool_block.get("audio_format") or ytdlp_block.get("audio_format")
or ytdlp_block.get("audio") or ytdlp_block.get("format_audio")
or cfg.get("ytdlp_audio_format")
)
# Also accept dotted keys written as nested dicts: ytdlp.format.video, ytdlp.format.audio
nested_video = _get_nested(cfg, "ytdlp", "format", "video")
nested_audio = _get_nested(cfg, "ytdlp", "format", "audio")
fmt_sort_val = (
tool_block.get("format_sort") or ytdlp_block.get("format_sort")
or ytdlp_block.get("formatSort") or cfg.get("ytdlp_format_sort")
or _get_nested(cfg, "ytdlp", "format", "sort")
)
fmt_sort = _parse_csv_list(fmt_sort_val)
# Cookie source preference: allow forcing a browser DB or 'auto'/'none'
cookies_pref = (
tool_block.get("cookies_from_browser")
or tool_block.get("cookiesfrombrowser")
or ytdlp_block.get("cookies_from_browser")
or ytdlp_block.get("cookiesfrombrowser")
or cfg.get("ytdlp_cookies_from_browser")
or _get_nested(cfg, "ytdlp", "cookies_from_browser")
)
# Unified format preference: prefer explicit 'format' key but accept legacy keys
format_pref = (
tool_block.get("format")
or tool_block.get("video_format")
or ytdlp_block.get("format")
or ytdlp_block.get("video_format")
or cfg.get("ytdlp_format")
or cfg.get("ytdlp_video_format")
or _get_nested(cfg, "ytdlp", "format")
)
defaults = YtDlpDefaults(
format=str(format_pref).strip() if format_pref else "best",
video_format=str(
nested_video or video_format or _fallback_defaults.video_format
),
audio_format=str(
nested_audio or audio_format or _fallback_defaults.audio_format
),
format_sort=fmt_sort,
cookies_from_browser=(str(cookies_pref).strip() if cookies_pref else None),
)
return defaults
def resolve_cookiefile(self) -> Optional[Path]:
return self._cookiefile
def default_format(self, mode: str) -> str:
"""Determine the final yt-dlp format string.
Priority:
- If caller explicitly requested audio mode (mode == 'audio'), return audio format.
- If configured default format is 'audio', return audio format.
- If configured default is 'best' or blank, return video_format.
- Otherwise return the configured format value (e.g., '720').
"""
m = str(mode or "").lower().strip()
if m == "audio":
return self.defaults.audio_format
cfg = (str(self.defaults.format or "")).strip()
lc = cfg.lower()
if lc == "audio":
return self.defaults.audio_format
if not cfg or lc == "best":
return self.defaults.video_format
return cfg
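# For example, with format="audio" configured, default_format("video") returns audio_format;
# with format="720" it returns the literal string "720"; with format="best" (or blank) it
# returns video_format. An explicit mode == "audio" always wins and returns audio_format.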
def build_ytdlp_options(self, opts: DownloadOptions) -> Dict[str, Any]:
"""Translate DownloadOptions into yt-dlp API options."""
ensure_directory(opts.output_dir)
outtmpl = str((opts.output_dir / "%(title)s.%(ext)s").resolve())
base_options: Dict[str, Any] = {
"outtmpl": outtmpl,
"quiet": True,
"no_warnings": True,
"noprogress": True,
"socket_timeout": 30,
"retries": 10,
"fragment_retries": 10,
"http_chunk_size": 10_485_760,
"restrictfilenames": True,
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36",
"referer": "https://www.youtube.com/",
}
base_options.setdefault(
"http_headers",
{
"User-Agent": base_options.get("user_agent"),
"Referer": base_options.get("referer"),
},
)
try:
repo_root = Path(__file__).resolve().parents[1]
bundled_ffmpeg_dir = repo_root / "MPV" / "ffmpeg" / "bin"
if bundled_ffmpeg_dir.exists():
base_options.setdefault("ffmpeg_location", str(bundled_ffmpeg_dir))
except Exception:
from SYS.logger import logger
logger.exception("Failed to inspect bundled ffmpeg directory")
try:
if os.name == "nt":
base_options.setdefault("file_access_retries", 40)
except Exception:
from SYS.logger import logger
logger.exception("Failed to set Windows-specific yt-dlp options")
if opts.cookies_path and opts.cookies_path.is_file():
base_options["cookiefile"] = str(opts.cookies_path)
else:
cookiefile = self.resolve_cookiefile()
if cookiefile is not None and cookiefile.is_file():
base_options["cookiefile"] = str(cookiefile)
else:
# Respect configured browser cookie preference if provided; otherwise fall back to auto-detect.
pref = (self.defaults.cookies_from_browser or "").lower().strip()
if pref:
if pref in {"none", "off", "false"}:
# Explicitly disabled
pass
elif pref in {"auto", "detect"}:
_add_browser_cookies_if_available(base_options)
else:
# Try the preferred browser first; fall back to auto-detect if not present
_add_browser_cookies_if_available(base_options, preferred_browser=pref)
else:
# Add browser cookies support "just in case" if no file found (best effort)
_add_browser_cookies_if_available(base_options)
# Special handling for format keywords explicitly passed in via options
if opts.ytdl_format == "audio":
try:
import dataclasses as _dc
opts = _dc.replace(opts, mode="audio", ytdl_format=None)
except Exception:
from SYS.logger import logger
logger.exception("Failed to set opts mode to audio via dataclasses.replace")
elif opts.ytdl_format == "video":
try:
import dataclasses as _dc
opts = _dc.replace(opts, mode="video", ytdl_format=None)
except Exception:
from SYS.logger import logger
logger.exception("Failed to set opts mode to video via dataclasses.replace")
if opts.no_playlist:
base_options["noplaylist"] = True
# If no explicit format was provided, honor the configured default format
ytdl_format = opts.ytdl_format
if not ytdl_format:
configured_format = (str(self.defaults.format or "")).strip()
if configured_format:
if configured_format.lower() == "audio":
# Default to audio-only downloads
try:
import dataclasses as _dc
opts = _dc.replace(opts, mode="audio")
except Exception:
from SYS.logger import logger
logger.exception("Failed to set opts mode to audio via dataclasses.replace (configured default)")
ytdl_format = None
else:
# Leave ytdl_format None so that default_format(opts.mode)
# returns the configured format literally (e.g., '720') and
# we don't auto-convert it to an internal selector.
pass
if ytdl_format and opts.mode != "audio":
# Don't resolve bare 3-digit values as heights: YouTube format IDs (e.g. 251, 249, 140)
# are typically 2-3 digits, so a 3-digit numeric string is treated as a format ID here.
# Longer or suffixed user input (e.g. "1080", "720p") still resolves to a height selector.
is_likely_format_id = (
isinstance(ytdl_format, str) and
len(ytdl_format.strip()) == 3 and
ytdl_format.strip().isdigit()
)
if not is_likely_format_id:
resolved = self.resolve_height_selector(ytdl_format)
if resolved:
ytdl_format = resolved
fmt = ytdl_format or self.default_format(opts.mode)
base_options["format"] = fmt
if opts.mode == "audio":
base_options["postprocessors"] = [{
"key": "FFmpegExtractAudio"
}]
if opts.mode != "audio":
format_sort = self.defaults.format_sort or [
"res:4320",
"res:2880",
"res:2160",
"res:1440",
"res:1080",
"res:720",
"res",
]
base_options["format_sort"] = format_sort
if getattr(opts, "embed_chapters", False):
pps = base_options.get("postprocessors")
if not isinstance(pps, list):
pps = []
already_has_metadata = any(
isinstance(pp, dict) and str(pp.get("key") or "") == "FFmpegMetadata"
for pp in pps
)
if not already_has_metadata:
pps.append(
{
"key": "FFmpegMetadata",
"add_metadata": True,
"add_chapters": True,
"add_infojson": "if_exists",
}
)
base_options["postprocessors"] = pps
if opts.mode != "audio":
base_options.setdefault("merge_output_format", "mkv")
if getattr(opts, "write_sub", False):
base_options["writesubtitles"] = True
base_options["writeautomaticsub"] = True
base_options["subtitlesformat"] = "vtt"
if opts.clip_sections:
sections: List[str] = []
def _secs_to_hms(seconds: float) -> str:
total = max(0, int(seconds))
minutes, secs = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
for section_range in str(opts.clip_sections).split(","):
section_range = section_range.strip()
if not section_range:
continue
try:
start_s_raw, end_s_raw = section_range.split("-", 1)
start_s = float(start_s_raw.strip())
end_s = float(end_s_raw.strip())
if start_s >= end_s:
continue
sections.append(f"*{_secs_to_hms(start_s)}-{_secs_to_hms(end_s)}")
except (ValueError, AttributeError):
continue
if sections:
base_options["download_sections"] = sections
# Clipped outputs should begin with a keyframe; otherwise players (notably mpv)
# can show audio before video or a black screen until the next keyframe.
# yt-dlp implements this by forcing keyframes at cut points.
base_options["force_keyframes_at_cuts"] = True
debug(f"Download sections configured: {', '.join(sections)}")
if opts.playlist_items:
base_options["playlist_items"] = opts.playlist_items
if not opts.quiet:
debug(f"yt-dlp: mode={opts.mode}, format={base_options.get('format')}, cookiefile={base_options.get('cookiefile')}")
return base_options
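# Illustrative call sequence (mirrors download_media below):
#   tool = YtDlpTool(config)
#   ydl_opts = tool.build_ytdlp_options(opts)      # cookies, format, sections, subtitles, ...
#   with yt_dlp.YoutubeDL(ydl_opts) as ydl:
#       info = ydl.extract_info(opts.url, download=True)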
def build_yt_dlp_cli_args(
self,
*,
url: str,
output_dir: Optional[Path] = None,
ytdl_format: Optional[str] = None,
playlist_items: Optional[str] = None,
no_playlist: bool = False,
quiet: bool = True,
extra_args: Optional[Sequence[str]] = None,
) -> List[str]:
"""Build a yt-dlp command line (argv list).
This is primarily for debug output or subprocess execution.
"""
argv: List[str] = ["yt-dlp"]
if quiet:
argv.extend(["--quiet", "--no-warnings"])
argv.append("--no-progress")
cookiefile = self.resolve_cookiefile()
if cookiefile is not None:
argv.extend(["--cookies", str(cookiefile)])
if no_playlist:
argv.append("--no-playlist")
if playlist_items:
argv.extend(["--playlist-items", str(playlist_items)])
fmt = (ytdl_format or "").strip()
if fmt:
# Use long form to avoid confusion with app-level flags.
argv.extend(["--format", fmt])
if self.defaults.format_sort:
for sort_key in self.defaults.format_sort:
argv.extend(["-S", sort_key])
if output_dir is not None:
outtmpl = str((output_dir / "%(title)s.%(ext)s").resolve())
argv.extend(["-o", outtmpl])
if extra_args:
argv.extend([str(a) for a in extra_args if str(a).strip()])
argv.append(str(url))
return argv
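# Illustrative output (the actual cookie path and output template depend on resolve_cookiefile()
# and output_dir; values here are examples only):
#   tool.build_yt_dlp_cli_args(url="https://example.com/v", output_dir=Path("downloads"),
#                              ytdl_format="bestvideo+bestaudio/best", no_playlist=True)
#   -> ["yt-dlp", "--quiet", "--no-warnings", "--no-progress", "--no-playlist",
#       "--format", "bestvideo+bestaudio/best", "-o", ".../downloads/%(title)s.%(ext)s",
#       "https://example.com/v"]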
def debug_print_cli(self, argv: Sequence[str]) -> None:
try:
debug("yt-dlp argv: " + " ".join(str(a) for a in argv))
except Exception:
from SYS.logger import logger
logger.exception("Failed to debug-print yt-dlp CLI arguments")
def config_schema() -> List[Dict[str, Any]]:
"""Return a schema describing editable YT-DLP tool defaults for the config UI."""
format_choices = [
"best",
"1080",
"720",
"640",
"audio",
]
# Offer browser choices depending on what's present on the host system
browser_choices = ["auto", "none"]
for b in ("chrome", "chromium", "brave"):
try:
if _browser_cookie_path_for(b) is not None:
browser_choices.append(b)
except Exception:
from SYS.logger import logger
logger.exception("Error while checking cookie path for browser '%s'", b)
continue
return [
{
"key": "format",
"label": "Default format",
"default": YtDlpDefaults.format,
"choices": format_choices,
},
{
"key": "cookies",
"label": "Cookie file (path)",
"default": "",
},
{
"key": "cookies_from_browser",
"label": "Browser cookie source (used if no cookie file)",
"default": "auto",
"choices": browser_choices,
},
]
# Progress + utility helpers for yt-dlp driven downloads (previously in cmdlet/download_media).
_YTDLP_PROGRESS_BAR = ProgressBar()
_YTDLP_PROGRESS_ACTIVITY_LOCK = threading.Lock()
_YTDLP_PROGRESS_LAST_ACTIVITY = 0.0
_SUBTITLE_EXTS = (".vtt", ".srt", ".ass", ".ssa", ".lrc")
def _progress_label(status: Optional[Dict[str, Any]]) -> str:
if not status:
return "unknown"
raw_info = status.get("info_dict")
info_dict = raw_info if isinstance(raw_info, dict) else {}
candidates = [
status.get("filename"),
info_dict.get("_filename"),
info_dict.get("filepath"),
info_dict.get("title"),
info_dict.get("id"),
]
for cand in candidates:
if not cand:
continue
try:
name = Path(str(cand)).name
except Exception:
name = str(cand)
label = str(name or "").strip()
if label:
return label
return "download"
def _record_progress_activity(timestamp: Optional[float] = None) -> None:
global _YTDLP_PROGRESS_LAST_ACTIVITY
with _YTDLP_PROGRESS_ACTIVITY_LOCK:
_YTDLP_PROGRESS_LAST_ACTIVITY = timestamp if timestamp is not None else time.monotonic()
def _get_last_progress_activity() -> float:
with _YTDLP_PROGRESS_ACTIVITY_LOCK:
return _YTDLP_PROGRESS_LAST_ACTIVITY
def _clear_progress_activity() -> None:
_record_progress_activity(0.0)
def _live_ui_and_pipe_index() -> tuple[Optional[Any], int]:
ui = None
try:
ui = pipeline_context.get_live_progress() if hasattr(pipeline_context, "get_live_progress") else None
except Exception:
ui = None
pipe_idx: int = 0
try:
stage_ctx = pipeline_context.get_stage_context() if hasattr(pipeline_context, "get_stage_context") else None
maybe_idx = getattr(stage_ctx, "pipe_index", None) if stage_ctx is not None else None
if isinstance(maybe_idx, int):
pipe_idx = int(maybe_idx)
except Exception:
pipe_idx = 0
return ui, pipe_idx
def _begin_live_steps(total_steps: int) -> None:
ui, pipe_idx = _live_ui_and_pipe_index()
if ui is None:
return
try:
begin = getattr(ui, "begin_pipe_steps", None)
if callable(begin):
begin(int(pipe_idx), total_steps=int(total_steps))
except Exception:
return
def _step(text: str) -> None:
ui, pipe_idx = _live_ui_and_pipe_index()
if ui is None:
return
try:
adv = getattr(ui, "advance_pipe_step", None)
if callable(adv):
adv(int(pipe_idx), str(text))
except Exception:
return
def _set_pipe_percent(percent: int) -> None:
ui, pipe_idx = _live_ui_and_pipe_index()
if ui is None:
return
try:
set_pct = getattr(ui, "set_pipe_percent", None)
if callable(set_pct):
set_pct(int(pipe_idx), int(percent))
except Exception:
return
def _format_chapters_note(info: Dict[str, Any]) -> Optional[str]:
"""Format yt-dlp chapter metadata into a stable, note-friendly text."""
try:
chapters = info.get("chapters")
except Exception:
chapters = None
if not isinstance(chapters, list) or not chapters:
return None
rows: List[tuple[int, Optional[int], str]] = []
max_t = 0
for ch in chapters:
if not isinstance(ch, dict):
continue
start_raw = ch.get("start_time")
end_raw = ch.get("end_time")
title_raw = ch.get("title") or ch.get("name") or ch.get("chapter")
try:
if start_raw is None:
continue
start_s = int(float(start_raw))
except Exception:
continue
end_s: Optional[int] = None
try:
if end_raw is not None:
end_s = int(float(end_raw))
except Exception:
end_s = None
title = str(title_raw).strip() if title_raw is not None else ""
rows.append((start_s, end_s, title))
try:
max_t = max(max_t, start_s, end_s or 0)
except Exception:
max_t = max(max_t, start_s)
if not rows:
return None
force_hours = bool(max_t >= 3600)
def _tc(seconds: int) -> str:
total = max(0, int(seconds))
minutes, secs = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
if force_hours:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
lines: List[str] = []
for start_s, end_s, title in sorted(
rows, key=lambda r: (r[0], r[1] if r[1] is not None else 10**9, r[2])
):
if end_s is not None and end_s > start_s:
prefix = f"{_tc(start_s)}-{_tc(end_s)}"
else:
prefix = _tc(start_s)
line = f"{prefix} {title}".strip()
if line:
lines.append(line)
text = "\n".join(lines).strip()
return text or None
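# Illustrative input/output: chapters like
#   [{"start_time": 0, "title": "Intro"}, {"start_time": 75, "end_time": 130, "title": "Main"}]
# produce the note text "00:00 Intro\n01:15-02:10 Main"; HH:MM:SS is used once any timestamp
# reaches one hour.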
def _best_subtitle_sidecar(media_path: Path) -> Optional[Path]:
"""Find the most likely subtitle sidecar file for a downloaded media file."""
try:
base_dir = media_path.parent
stem = media_path.stem
if not stem:
return None
candidates: List[Path] = []
for p in base_dir.glob(stem + ".*"):
try:
if not p.is_file():
continue
except Exception:
continue
if p.suffix.lower() in _SUBTITLE_EXTS:
candidates.append(p)
preferred_order = [".vtt", ".srt", ".ass", ".ssa", ".lrc"]
for ext in preferred_order:
for p in candidates:
if p.suffix.lower() == ext:
return p
return candidates[0] if candidates else None
except Exception:
return None
def _read_text_file(path: Path) -> Optional[str]:
try:
return path.read_text(encoding="utf-8", errors="ignore")
except Exception:
return None
def _download_with_sections_via_cli(
url: str,
ytdl_options: Dict[str, Any],
sections: List[str],
quiet: bool = False,
) -> tuple[Optional[str], Dict[str, Any]]:
sections_list = ytdl_options.get("download_sections", [])
if not sections_list:
return "", {}
pipeline = PipelineProgress(pipeline_context)
class _SectionProgressSimulator:
def __init__(self, start_pct: int, max_pct: int, interval: float = 0.5) -> None:
self._start_pct = max(0, min(int(start_pct), 99))
self._max_pct = max(self._start_pct, min(int(max_pct), 98))
self._interval = max(0.1, float(interval))
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
def _run(self) -> None:
current = self._start_pct
while not self._stop_event.wait(self._interval):
if current < self._max_pct:
current += 1
try:
_set_pipe_percent(current)
except Exception:
from SYS.logger import logger
logger.exception("Failed to set pipeline percent to %d", current)
def start(self) -> None:
if self._thread is not None or self._start_pct >= self._max_pct:
return
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop_event.set()
if self._thread is not None:
self._thread.join(timeout=0.5)
self._thread = None
try:
_set_pipe_percent(self._max_pct)
except Exception:
from SYS.logger import logger
logger.exception("Failed to set pipeline percent to max %d", self._max_pct)
session_id = hashlib.md5((url + str(time.time()) + "".join(random.choices(string.ascii_letters, k=10))).encode()).hexdigest()[:12]
first_section_info = None
total_sections = len(sections_list)
try:
for section_idx, section in enumerate(sections_list, 1):
display_pct = 50
if total_sections > 0:
display_pct = 50 + int(((section_idx - 1) / max(1, total_sections)) * 49)
try:
_set_pipe_percent(display_pct)
except Exception:
from SYS.logger import logger
logger.exception("Failed to set pipeline percent to display_pct %d for section %d", display_pct, section_idx)
pipeline.set_status(f"Downloading & clipping clip section {section_idx}/{total_sections}")
base_outtmpl = ytdl_options.get("outtmpl", "%(title)s.%(ext)s")
output_dir_path = Path(base_outtmpl).parent
filename_tmpl = f"{session_id}_{section_idx}"
if base_outtmpl.endswith(".%(ext)s"):
filename_tmpl += ".%(ext)s"
section_outtmpl = str(output_dir_path / filename_tmpl)
if section_idx == 1:
metadata_cmd = ["yt-dlp", "--dump-json", "--skip-download"]
if ytdl_options.get("cookiefile"):
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
metadata_cmd.extend(["--cookies", cookies_path])
if ytdl_options.get("noplaylist"):
metadata_cmd.append("--no-playlist")
metadata_cmd.append(url)
try:
meta_result = subprocess.run(metadata_cmd, capture_output=True, text=True)
if meta_result.returncode == 0 and meta_result.stdout:
try:
info_dict = json.loads(meta_result.stdout.strip())
first_section_info = info_dict
if not quiet:
debug(f"Extracted title from metadata: {info_dict.get('title')}")
except json.JSONDecodeError:
if not quiet:
debug("Could not parse JSON metadata")
except Exception as exc:
if not quiet:
debug(f"Error extracting metadata: {exc}")
cmd = ["yt-dlp"]
if quiet:
cmd.append("--quiet")
cmd.append("--no-warnings")
cmd.append("--no-progress")
cmd.extend(["--postprocessor-args", "ffmpeg:-hide_banner -loglevel error"])
if ytdl_options.get("ffmpeg_location"):
try:
cmd.extend(["--ffmpeg-location", str(ytdl_options["ffmpeg_location"])])
except Exception:
from SYS.logger import logger
logger.exception("Failed to append ffmpeg_location CLI option")
if ytdl_options.get("format"):
cmd.extend(["-f", ytdl_options["format"]])
if ytdl_options.get("merge_output_format"):
cmd.extend(["--merge-output-format", str(ytdl_options["merge_output_format"])])
postprocessors = ytdl_options.get("postprocessors")
want_add_metadata = bool(ytdl_options.get("addmetadata"))
want_embed_chapters = bool(ytdl_options.get("embedchapters"))
if isinstance(postprocessors, list):
for pp in postprocessors:
if not isinstance(pp, dict):
continue
if str(pp.get("key") or "") == "FFmpegMetadata":
want_add_metadata = True
if bool(pp.get("add_chapters", True)):
want_embed_chapters = True
if want_add_metadata:
cmd.append("--add-metadata")
if want_embed_chapters:
cmd.append("--embed-chapters")
if ytdl_options.get("writesubtitles"):
cmd.append("--write-sub")
cmd.append("--write-auto-sub")
cmd.extend(["--sub-format", "vtt"])
if ytdl_options.get("force_keyframes_at_cuts"):
cmd.append("--force-keyframes-at-cuts")
cmd.extend(["-o", section_outtmpl])
if ytdl_options.get("cookiefile"):
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
cmd.extend(["--cookies", cookies_path])
if ytdl_options.get("noplaylist"):
cmd.append("--no-playlist")
cmd.extend(["--download-sections", section])
cmd.append(url)
if not quiet:
debug(f"Running yt-dlp for section: {section}")
progress_end_pct = min(display_pct + 45, 98)
simulator = _SectionProgressSimulator(display_pct, progress_end_pct)
simulator.start()
try:
if quiet:
subprocess.run(cmd, check=True, capture_output=True, text=True)
else:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as exc:
stderr_text = exc.stderr or ""
tail = "\n".join(stderr_text.splitlines()[-12:]).strip()
details = f"\n{tail}" if tail else ""
raise DownloadError(f"yt-dlp failed for section {section} (exit {exc.returncode}){details}") from exc
except Exception as exc:
raise DownloadError(f"yt-dlp failed for section {section}: {exc}") from exc
finally:
simulator.stop()
finally:
pipeline.clear_status()
try:
_set_pipe_percent(99)
except Exception:
from SYS.logger import logger
logger.exception("Failed to set pipeline percent to 99 at end of multi-section job")
return session_id, first_section_info or {}
def _iter_download_entries(info: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
queue: List[Dict[str, Any]] = [info]
seen: set[int] = set()
while queue:
current = queue.pop(0)
obj_id = id(current)
if obj_id in seen:
continue
seen.add(obj_id)
entries = current.get("entries")
if isinstance(entries, list):
for entry in entries:
queue.append(entry)
if current.get("requested_downloads") or not entries:
yield current
def _candidate_paths(entry: Dict[str, Any], output_dir: Path) -> Iterator[Path]:
requested = entry.get("requested_downloads")
if isinstance(requested, list):
for item in requested:
if isinstance(item, dict):
fp = item.get("filepath") or item.get("_filename")
if fp:
yield Path(fp)
for key in ("filepath", "_filename", "filename"):
value = entry.get(key)
if value:
yield Path(value)
if entry.get("filename"):
yield output_dir / entry["filename"]
def _resolve_entry_and_path(info: Dict[str, Any], output_dir: Path) -> tuple[Dict[str, Any], Path]:
for entry in _iter_download_entries(info):
for candidate in _candidate_paths(entry, output_dir):
if candidate.is_file():
return entry, candidate
if not candidate.is_absolute():
maybe = output_dir / candidate
if maybe.is_file():
return entry, maybe
raise FileNotFoundError("yt-dlp did not report a downloaded media file")
def _resolve_entries_and_paths(info: Dict[str, Any], output_dir: Path) -> List[tuple[Dict[str, Any], Path]]:
resolved: List[tuple[Dict[str, Any], Path]] = []
seen: set[str] = set()
for entry in _iter_download_entries(info):
chosen: Optional[Path] = None
for candidate in _candidate_paths(entry, output_dir):
if candidate.is_file():
chosen = candidate
break
if not candidate.is_absolute():
maybe = output_dir / candidate
if maybe.is_file():
chosen = maybe
break
if chosen is None:
continue
key = str(chosen.resolve())
if key in seen:
continue
seen.add(key)
resolved.append((entry, chosen))
return resolved
def _extract_sha256(info: Dict[str, Any]) -> Optional[str]:
for payload in [info] + (info.get("entries") or []):  # "entries" may be absent or None for single videos
if not isinstance(payload, dict):
continue
hashes = payload.get("hashes")
if isinstance(hashes, dict):
for key in ("sha256", "sha-256", "sha_256"):
if key in hashes and isinstance(hashes[key], str) and hashes[key].strip():
return hashes[key].strip()
for key in ("sha256", "sha-256", "sha_256"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return None
def _progress_callback(status: Dict[str, Any]) -> None:
label = _progress_label(status)
event = status.get("status")
downloaded = status.get("downloaded_bytes")
total = status.get("total_bytes") or status.get("total_bytes_estimate")
if event == "downloading":
_record_progress_activity()
pipeline = PipelineProgress(pipeline_context)
live_ui, _ = pipeline.ui_and_pipe_index()
use_live = live_ui is not None
def _total_bytes(value: Any) -> Optional[int]:
try:
if isinstance(value, (int, float)) and value > 0:
return int(value)
except Exception:
from SYS.logger import logger
logger.exception("Failed to interpret total bytes value: %r", value)
return None
if event == "downloading":
if use_live:
try:
if not _YTDLP_TRANSFER_STATE.get(label, {}).get("started"):
pipeline.begin_transfer(label=label, total=_total_bytes(total))
_YTDLP_TRANSFER_STATE[label] = {"started": True}
pipeline.update_transfer(
label=label,
completed=int(downloaded) if downloaded is not None else None,
total=_total_bytes(total),
)
except Exception:
from SYS.logger import logger
logger.exception("Failed to update pipeline transfer for label '%s'", label)
else:
_YTDLP_PROGRESS_BAR.update(
downloaded=int(downloaded) if downloaded is not None else None,
total=int(total) if total is not None else None,
label=label,
file=sys.stderr,
)
elif event == "finished":
if use_live:
try:
if _YTDLP_TRANSFER_STATE.get(label, {}).get("started"):
pipeline.finish_transfer(label=label)
except Exception:
from SYS.logger import logger
logger.exception("Failed to finish pipeline transfer for label '%s'", label)
_YTDLP_TRANSFER_STATE.pop(label, None)
else:
_YTDLP_PROGRESS_BAR.finish()
elif event in ("postprocessing", "processing"):
return
try:
from SYS.metadata import extract_ytdlp_tags
except ImportError:
extract_ytdlp_tags = None # type: ignore
def _is_http_403(exc: Exception) -> bool:
msg_parts: list[str] = []
try:
msg_parts.append(str(exc))
except Exception:
pass
try:
cause = getattr(exc, "__cause__", None)
if cause is not None:
msg_parts.append(str(cause))
except Exception:
pass
try:
context = getattr(exc, "__context__", None)
if context is not None:
msg_parts.append(str(context))
except Exception:
pass
for msg in msg_parts:
if "HTTP Error 403" in msg or "403: Forbidden" in msg or "403 Forbidden" in msg:
return True
return False
def download_media(opts: DownloadOptions, *, config: Optional[Dict[str, Any]] = None, debug_logger: Optional[DebugLogger] = None) -> Any:
"""Download streaming media exclusively via yt-dlp.
Optional `config` dict may be provided so tool defaults (e.g., cookies, default
format) are applied when constructing the YtDlpTool instance.
"""
debug(f"[download_media] start: {opts.url}")
try:
netloc = urlparse(opts.url).netloc.lower()
except Exception:
netloc = ""
if "gofile.io" in netloc:
msg = "GoFile links are currently unsupported"
if not opts.quiet:
debug(msg)
if debug_logger is not None:
debug_logger.write_record("gofile-unsupported", {"url": opts.url})
raise DownloadError(msg)
ytdlp_supported = is_url_supported_by_ytdlp(opts.url)
if not ytdlp_supported:
msg = "URL not supported by yt-dlp; try download-file for manual downloads"
if not opts.quiet:
log(msg)
if debug_logger is not None:
debug_logger.write_record("ytdlp-unsupported", {"url": opts.url})
raise DownloadError(msg)
if opts.playlist_items:
debug(
f"Skipping probe for playlist (item selection: {opts.playlist_items}), proceeding with download"
)
probe_result: Optional[Dict[str, Any]] = {"url": opts.url}
else:
probe_cookiefile = None
try:
if opts.cookies_path and opts.cookies_path.is_file():
probe_cookiefile = str(opts.cookies_path)
except Exception:
probe_cookiefile = None
probe_result = probe_url(opts.url, no_playlist=opts.no_playlist, timeout_seconds=15, cookiefile=probe_cookiefile)
if probe_result is None:
msg = "yt-dlp could not detect media for this URL; use download-file for direct downloads"
if not opts.quiet:
log(msg)
if debug_logger is not None:
debug_logger.write_record("ytdlp-skip-no-media", {"url": opts.url})
raise DownloadError(msg)
ensure_yt_dlp_ready()
# Use provided config when available so user tool settings are honored
ytdlp_tool = YtDlpTool(config or {})
ytdl_options = ytdlp_tool.build_ytdlp_options(opts)
hooks = ytdl_options.get("progress_hooks")
if not isinstance(hooks, list):
hooks = []
ytdl_options["progress_hooks"] = hooks
if _progress_callback not in hooks:
hooks.append(_progress_callback)
if not opts.quiet:
debug(f"Starting yt-dlp download: {opts.url}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-start", {"url": opts.url})
assert yt_dlp is not None
info: Optional[Dict[str, Any]] = None
session_id = None
first_section_info: Dict[str, Any] = {}
try:
if not opts.quiet:
if ytdl_options.get("download_sections"):
debug(f"[yt-dlp] download_sections: {ytdl_options['download_sections']}")
debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}")
if ytdl_options.get("download_sections"):
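            # Section downloads go through the yt-dlp CLI helper, which returns a
            # session id used below to locate the per-section files on disk.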
live_ui, _ = PipelineProgress(pipeline_context).ui_and_pipe_index()
quiet_sections = bool(opts.quiet) or (live_ui is not None)
session_id, first_section_info = _download_with_sections_via_cli(
opts.url,
ytdl_options,
ytdl_options.get("download_sections", []),
quiet=quiet_sections,
)
info = None
else:
with yt_dlp.YoutubeDL(ytdl_options) as ydl: # type: ignore[arg-type]
info = cast(Dict[str, Any], ydl.extract_info(opts.url, download=True))
except Exception as exc:
retry_attempted = False
if _is_http_403(exc) and not ytdl_options.get("download_sections"):
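            # Fallback for HTTP 403: retry once without the configured cookie file,
            # using browser cookies (when available) and the android/web player
            # clients, before treating the download as failed.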
retry_attempted = True
try:
if not opts.quiet:
debug("yt-dlp hit HTTP 403; retrying with browser cookies + android/web player client")
fallback_options = dict(ytdl_options)
fallback_options.pop("cookiefile", None)
_add_browser_cookies_if_available(fallback_options)
extractor_args = fallback_options.get("extractor_args")
if not isinstance(extractor_args, dict):
extractor_args = {}
youtube_args = extractor_args.get("youtube")
if not isinstance(youtube_args, dict):
youtube_args = {}
if "player_client" not in youtube_args:
youtube_args["player_client"] = ["android", "web"]
extractor_args["youtube"] = youtube_args
fallback_options["extractor_args"] = extractor_args
with yt_dlp.YoutubeDL(fallback_options) as ydl: # type: ignore[arg-type]
info = cast(Dict[str, Any], ydl.extract_info(opts.url, download=True))
except Exception as exc2:
log(f"yt-dlp failed: {exc2}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "yt-dlp", "error": str(exc2), "traceback": traceback.format_exc()},
)
raise DownloadError("yt-dlp download failed") from exc2
if not retry_attempted:
log(f"yt-dlp failed: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "yt-dlp", "error": str(exc), "traceback": traceback.format_exc()},
)
raise DownloadError("yt-dlp download failed") from exc
if info is None:
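        # CLI/section downloads do not return an info dict, so locate the freshly
        # written files on disk (newest first) and build the result from them.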
try:
time.sleep(0.5)
files = sorted(opts.output_dir.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True)
if not files:
raise FileNotFoundError(f"No files found in {opts.output_dir}")
if opts.clip_sections and session_id:
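                # Section files are written with a "<session_id>_<index>" prefix;
                # use it to collect the files belonging to this download.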
section_pattern = re.compile(rf"^{re.escape(session_id)}_(\d+)")
matching_files = [f for f in files if section_pattern.search(f.name)]
if matching_files:
def extract_section_num(path: Path) -> int:
match = section_pattern.search(path.name)
return int(match.group(1)) if match else 999
matching_files.sort(key=extract_section_num)
debug(f"Found {len(matching_files)} section file(s) matching pattern")
by_index: Dict[int, List[Path]] = {}
for f in matching_files:
m = section_pattern.search(f.name)
if not m:
continue
try:
n = int(m.group(1))
except Exception:
continue
by_index.setdefault(n, []).append(f)
renamed_media_files: List[Path] = []
for sec_num in sorted(by_index.keys()):
group = by_index.get(sec_num) or []
if not group:
continue
def _is_subtitle(p: Path) -> bool:
try:
return p.suffix.lower() in _SUBTITLE_EXTS
except Exception:
return False
media_candidates = [p for p in group if not _is_subtitle(p)]
subtitle_candidates = [p for p in group if _is_subtitle(p)]
media_file: Optional[Path] = None
for cand in media_candidates:
try:
if cand.suffix.lower() in {".json", ".info.json"}:
continue
except Exception:
from SYS.logger import logger
logger.exception("Failed to inspect candidate suffix for %s", cand)
media_file = cand
break
if media_file is None and media_candidates:
media_file = media_candidates[0]
if media_file is None:
continue
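                        # Rename this section's files to a content-addressed name
                        # (sha256 of the media file plus the original name tail) so
                        # repeated downloads of the same clip deduplicate on disk.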
try:
media_hash = sha256_file(media_file)
except Exception as exc:
debug(f"Failed to hash section media file {media_file.name}: {exc}")
renamed_media_files.append(media_file)
continue
prefix = f"{session_id}_{sec_num}"
def _tail(name: str) -> str:
try:
if name.startswith(prefix):
return name[len(prefix):]
except Exception:
from SYS.logger import logger
logger.exception("Failed to check name prefix for '%s'", name)
try:
return Path(name).suffix
except Exception:
from SYS.logger import logger
logger.exception("Failed to obtain suffix for name '%s'", name)
return ""
try:
new_media_name = f"{media_hash}{_tail(media_file.name)}"
new_media_path = opts.output_dir / new_media_name
if new_media_path.exists() and new_media_path != media_file:
debug(f"File with hash {media_hash} already exists, using existing file.")
try:
media_file.unlink()
except OSError:
from SYS.logger import logger
logger.exception("Failed to unlink duplicate media file %s", media_file)
else:
media_file.rename(new_media_path)
debug(f"Renamed section file: {media_file.name} -> {new_media_name}")
renamed_media_files.append(new_media_path)
except Exception as exc:
debug(f"Failed to rename section media file {media_file.name}: {exc}")
renamed_media_files.append(media_file)
new_media_path = media_file
for sub_file in subtitle_candidates:
try:
new_sub_name = f"{media_hash}{_tail(sub_file.name)}"
new_sub_path = opts.output_dir / new_sub_name
if new_sub_path.exists() and new_sub_path != sub_file:
try:
sub_file.unlink()
except OSError:
pass
else:
sub_file.rename(new_sub_path)
debug(f"Renamed section file: {sub_file.name} -> {new_sub_name}")
except Exception as exc:
debug(f"Failed to rename section subtitle file {sub_file.name}: {exc}")
media_path = renamed_media_files[0] if renamed_media_files else matching_files[0]
media_paths = renamed_media_files if renamed_media_files else None
if not opts.quiet:
count = len(media_paths) if isinstance(media_paths, list) else 1
debug(f"✓ Downloaded {count} section media file(s) (session: {session_id})")
else:
media_path = files[0]
media_paths = None
if not opts.quiet:
debug(f"✓ Downloaded section file (pattern not found): {media_path.name}")
else:
media_path = files[0]
media_paths = None
if not opts.quiet:
debug(f"✓ Downloaded: {media_path.name}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-file-found", {"path": str(media_path)})
except Exception as exc:
log(f"Error finding downloaded file: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record("exception", {"phase": "find-file", "error": str(exc)})
raise DownloadError(str(exc)) from exc
file_hash = sha256_file(media_path)
section_tags: List[str] = []
title = ""
if first_section_info:
title = first_section_info.get("title", "")
if title:
section_tags.append(f"title:{title}")
debug(f"Added title tag for section download: {title}")
if first_section_info:
info_dict_sec = first_section_info
else:
info_dict_sec = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")}
return DownloadMediaResult(path=media_path, info=info_dict_sec, tag=section_tags, source_url=opts.url, hash_value=file_hash, paths=media_paths)
if not isinstance(info, dict):
log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr)
raise DownloadError("Unexpected yt-dlp response type")
info_dict: Dict[str, Any] = cast(Dict[str, Any], info)
if debug_logger is not None:
debug_logger.write_record("ytdlp-info", {"keys": sorted(info_dict.keys()), "is_playlist": bool(info_dict.get("entries"))})
if info_dict.get("entries") and not opts.no_playlist:
resolved = _resolve_entries_and_paths(info_dict, opts.output_dir)
if resolved:
results: List[DownloadMediaResult] = []
for entry, media_path in resolved:
hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
if not hash_value:
try:
hash_value = sha256_file(media_path)
except OSError:
hash_value = None
tags: List[str] = []
if extract_ytdlp_tags is not None:
try:
tags = extract_ytdlp_tags(entry)
except Exception as exc:
log(f"Error extracting tags: {exc}", file=sys.stderr)
source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url") or opts.url
results.append(
DownloadMediaResult(
path=media_path,
info=entry,
tag=tags,
source_url=source_url,
hash_value=hash_value,
)
)
if not opts.quiet:
debug(f"✓ Downloaded playlist items: {len(results)}")
return results
try:
entry, media_path = _resolve_entry_and_path(info_dict, opts.output_dir)
except FileNotFoundError as exc:
log(f"Error: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record("exception", {"phase": "resolve-path", "error": str(exc)})
raise DownloadError(str(exc)) from exc
if debug_logger is not None:
debug_logger.write_record("resolved-media", {"path": str(media_path), "entry_keys": sorted(entry.keys())})
hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
if not hash_value:
try:
hash_value = sha256_file(media_path)
except OSError as exc:
if debug_logger is not None:
debug_logger.write_record("hash-error", {"path": str(media_path), "error": str(exc)})
tags_res: List[str] = []
if extract_ytdlp_tags is not None:
try:
tags_res = extract_ytdlp_tags(entry)
except Exception as exc:
log(f"Error extracting tags: {exc}", file=sys.stderr)
source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url")
if not opts.quiet:
debug(f"✓ Downloaded: {media_path.name} ({len(tags_res)} tags)")
if debug_logger is not None:
debug_logger.write_record(
"downloaded",
{
"path": str(media_path),
"tag_count": len(tags_res),
"source_url": source_url,
"sha256": hash_value,
},
)
return DownloadMediaResult(path=media_path, info=entry, tag=tags_res, source_url=source_url, hash_value=hash_value)
def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300, config: Optional[Dict[str, Any]] = None) -> Any:
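    """Run download_media on a worker thread with activity and wall-clock timeouts.

    Sketch of a typical call (argument values are illustrative, not prescriptive):

        result = _download_with_timeout(opts, timeout_seconds=300, config=config)
    """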
result_container: List[Optional[Any]] = [None, None]
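    # result_container[0] receives the download result; result_container[1] holds
    # any exception raised inside the worker thread.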
def _do_download() -> None:
try:
result_container[0] = download_media(opts, config=config)
except Exception as exc:
result_container[1] = exc
# Use daemon=True so a hung download doesn't block process exit if the wall timeout hits.
thread = threading.Thread(target=_do_download, daemon=True)
thread.start()
start_time = time.monotonic()
# We use two timeouts:
# 1. Activity timeout (no progress updates for X seconds)
# 2. Hard wall-clock timeout (total time for this URL)
# The wall-clock timeout is slightly larger than the activity timeout
# to allow for slow-but-steady progress, up to a hard cap (e.g. 10 minutes).
wall_timeout = max(timeout_seconds * 2, 600)
_record_progress_activity(start_time)
try:
while thread.is_alive():
thread.join(1)
if not thread.is_alive():
break
now = time.monotonic()
# Check activity timeout
last_activity = _get_last_progress_activity()
if last_activity <= 0:
last_activity = start_time
if now - last_activity > timeout_seconds:
raise DownloadError(f"Download activity timeout after {timeout_seconds} seconds for {opts.url}")
# Check hard wall-clock timeout
if now - start_time > wall_timeout:
raise DownloadError(f"Download hard timeout after {wall_timeout} seconds for {opts.url}")
finally:
_clear_progress_activity()
if result_container[1] is not None:
raise cast(Exception, result_container[1])
if result_container[0] is None:
raise DownloadError(f"Download failed for {opts.url}")
return cast(Any, result_container[0])