2092 lines
75 KiB
Python
2092 lines
75 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import random
|
|
import re
|
|
import string
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterator, List, Optional, Sequence, cast
|
|
from urllib.parse import urlparse
|
|
|
|
from SYS import pipeline as pipeline_context
|
|
from SYS.logger import debug, log
|
|
from SYS.models import (
|
|
DebugLogger,
|
|
DownloadError,
|
|
DownloadMediaResult,
|
|
DownloadOptions,
|
|
ProgressBar,
|
|
)
|
|
from SYS.pipeline_progress import PipelineProgress
|
|
from SYS.utils import ensure_directory, sha256_file
|
|
from SYS.yt_metadata import extract_ytdlp_tags
|
|
|
|
# Mutable registry of per-download transfer state, keyed by a string id.
# NOTE(review): the producers/consumers are not visible in this chunk —
# presumably yt-dlp progress hooks write here; confirm before relying on shape.
_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {}
|
|
|
|
|
|
# yt-dlp is an optional dependency: defer failure until a download is actually
# attempted (see ensure_yt_dlp_ready) instead of crashing at import time.
try:
    import yt_dlp  # type: ignore
    from yt_dlp.extractor import gen_extractors  # type: ignore
except Exception as exc:  # pragma: no cover - handled at runtime
    yt_dlp = None  # type: ignore
    gen_extractors = None  # type: ignore
    # Keep the original import error so callers can surface a useful message.
    YTDLP_IMPORT_ERROR: Optional[Exception] = exc
else:
    YTDLP_IMPORT_ERROR = None
|
|
|
|
# Lazily-built cache of yt-dlp extractor instances (see _get_extractors).
_EXTRACTOR_CACHE: List[Any] | None = None

# Patterns for domain extraction from yt-dlp regexes
# 1) Alternation group followed by \.tld e.g. (?:youtube|youtu|youtube-nocookie)\.com
ALT_GROUP_TLD = re.compile(r'\((?:\?:)?([^\)]+)\)\\\.(?P<tld>[A-Za-z0-9.+-]+)')
# 2) Literal domain pieces like youtube\.com or youtu\.be (not preceded by a group)
LITERAL_DOMAIN = re.compile(r'(?<!\()(?<!\|)(?<!:)([A-Za-z0-9][A-Za-z0-9_-]{0,})\\\.([A-Za-z0-9.+-]+)')
# 3) Partial domain tokens that appear alone (e.g., zhihu) — treat as zhihu.com fallback
PARTIAL_TOKEN = re.compile(r'(?<![A-Za-z0-9_-])([A-Za-z0-9][A-Za-z0-9_-]{1,})(?=(?:\\?[/\)\$]|\\\.|$))')

# Flattened set of domains yt-dlp can handle (built once by _build_supported_domains).
_SUPPORTED_DOMAINS: set[str] | None = None
|
|
|
|
|
|
def normalize_patterns(valid_url) -> List[str]:
    """Coerce an extractor's _VALID_URL attribute into a list of pattern strings.

    Accepts a single string, a list/tuple (non-string entries dropped), or any
    falsy/unknown value (which yields an empty list).
    """
    if isinstance(valid_url, str):
        return [valid_url] if valid_url else []
    if isinstance(valid_url, (list, tuple)):
        return [entry for entry in valid_url if isinstance(entry, str)]
    return []
|
|
|
|
|
|
def extract_from_pattern(pat: str) -> set[str]:
    """Pull candidate domain names out of one yt-dlp _VALID_URL regex string."""
    found: set[str] = set()

    # 1) Alternation groups followed by an escaped TLD,
    #    e.g. (?:youtube|youtu)\.com -> youtube.com, youtu.com
    for group, tld in ALT_GROUP_TLD.findall(pat):
        for piece in group.split('|'):
            # Strip stray (?:www\.)? fragments that occasionally appear inside
            # the alternation itself.
            piece = re.sub(r'\(\?:www\\\.\)\?', '', piece.strip())
            if piece:
                found.add(f"{piece}.{tld}".lower())

    # 2) Literal escaped domains such as youtube\.com
    found.update(f"{name}.{tld}".lower() for name, tld in LITERAL_DOMAIN.findall(pat))

    # 3) Fallback only when nothing matched above: bare tokens (e.g. 'zhihu')
    #    are assumed to be .com domains.
    if not found:
        non_domain_words = {"https", "http", "www", "com", "net", "org"}
        for token in PARTIAL_TOKEN.findall(pat):
            if len(token) <= 2:
                continue
            if token.lower() in non_domain_words:
                continue
            found.add(f"{token.lower()}.com")

    return found
|
|
|
|
|
|
def extract_domains(valid_url) -> set[str]:
    """Extract every domain implied by an extractor's _VALID_URL value."""
    collected: set[str] = set()
    for pattern in normalize_patterns(valid_url):
        collected.update(extract_from_pattern(pattern))

    # Drop degenerate entries where the label equals the TLD (e.g. 'com.com').
    result: set[str] = set()
    for domain in collected:
        labels = domain.split('.')
        if len(labels) >= 2 and labels[-2] == labels[-1]:
            continue
        result.add(domain)
    return result
|
|
|
|
|
|
def _build_supported_domains() -> set[str]:
    """Build (once) and return the set of domains yt-dlp can handle.

    The result is memoized in the module-level _SUPPORTED_DOMAINS.  Returns an
    empty set when yt-dlp failed to import.
    """
    global _SUPPORTED_DOMAINS
    if _SUPPORTED_DOMAINS is not None:
        return _SUPPORTED_DOMAINS

    _SUPPORTED_DOMAINS = set()
    if gen_extractors is None:
        # yt-dlp unavailable: nothing is supported.
        return _SUPPORTED_DOMAINS

    try:
        for e in gen_extractors():
            name = getattr(e, "IE_NAME", "").lower()
            if name == "generic":
                # The generic extractor matches nearly anything; excluding it
                # keeps the domain set meaningful.
                continue
            regex = getattr(e, "_VALID_URL", None)
            domains = extract_domains(regex)
            _SUPPORTED_DOMAINS.update(domains)
    except Exception:
        # Best effort: a misbehaving extractor must not break discovery.
        pass
    return _SUPPORTED_DOMAINS
|
|
|
|
|
|
def _get_nested(config: Dict[str, Any], *path: str) -> Any:
|
|
cur: Any = config
|
|
for key in path:
|
|
if not isinstance(cur, dict):
|
|
return None
|
|
cur = cur.get(key)
|
|
return cur
|
|
|
|
|
|
def _parse_csv_list(value: Any) -> Optional[List[str]]:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, list):
|
|
out: List[str] = []
|
|
for item in value:
|
|
s = str(item).strip()
|
|
if s:
|
|
out.append(s)
|
|
return out or None
|
|
s = str(value).strip()
|
|
if not s:
|
|
return None
|
|
# allow either JSON-ish list strings or simple comma-separated values
|
|
if s.startswith("[") and s.endswith("]"):
|
|
s = s[1:-1]
|
|
parts = [p.strip() for p in s.split(",")]
|
|
parts = [p for p in parts if p]
|
|
return parts or None
|
|
|
|
|
|
# Cached result of _has_browser_cookie_database (None = not probed yet).
_BROWSER_COOKIES_AVAILABLE: Optional[bool] = None
# Ensures the "no browser cookies found" warning is logged at most once.
_BROWSER_COOKIE_WARNING_EMITTED = False
|
|
|
|
|
|
def _browser_cookie_candidate_paths() -> List[Path]:
|
|
try:
|
|
home = Path.home()
|
|
except Exception:
|
|
home = Path.cwd()
|
|
|
|
candidates: List[Path] = []
|
|
if os.name == "nt":
|
|
for env_value in (os.getenv("LOCALAPPDATA"), os.getenv("APPDATA")):
|
|
if not env_value:
|
|
continue
|
|
base_path = Path(env_value)
|
|
if not base_path:
|
|
continue
|
|
candidates.extend([
|
|
base_path / "Google" / "Chrome" / "User Data" / "Default" / "Cookies",
|
|
base_path / "Chromium" / "User Data" / "Default" / "Cookies",
|
|
base_path / "BraveSoftware" / "Brave-Browser" / "User Data" / "Default" / "Cookies",
|
|
])
|
|
else:
|
|
candidates.extend([
|
|
home / ".config" / "google-chrome" / "Default" / "Cookies",
|
|
home / ".config" / "chromium" / "Default" / "Cookies",
|
|
home / ".config" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies",
|
|
])
|
|
if sys.platform == "darwin":
|
|
candidates.extend([
|
|
home / "Library" / "Application Support" / "Google" / "Chrome" / "Default" / "Cookies",
|
|
home / "Library" / "Application Support" / "Chromium" / "Default" / "Cookies",
|
|
home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies",
|
|
])
|
|
return candidates
|
|
|
|
|
|
def _has_browser_cookie_database() -> bool:
    """Report (and cache) whether any known browser cookie DB exists on disk."""
    global _BROWSER_COOKIES_AVAILABLE
    if _BROWSER_COOKIES_AVAILABLE is not None:
        return _BROWSER_COOKIES_AVAILABLE

    found = False
    for candidate in _browser_cookie_candidate_paths():
        try:
            if candidate.is_file():
                found = True
                break
        except Exception:
            # Unreadable path (permissions, detached drive): treat as absent.
            continue

    _BROWSER_COOKIES_AVAILABLE = found
    return found
|
|
|
|
|
|
def _browser_cookie_path_for(browser_name: str) -> Optional[Path]:
|
|
"""Return the cookie DB Path for a specific browser if present, else None.
|
|
|
|
Supported browsers (case-insensitive): "chrome", "chromium", "brave".
|
|
"""
|
|
name = str(browser_name or "").strip().lower()
|
|
if not name:
|
|
return None
|
|
|
|
try:
|
|
home = Path.home()
|
|
except Exception:
|
|
home = Path.cwd()
|
|
|
|
# Windows
|
|
if os.name == "nt":
|
|
for env_value in (os.getenv("LOCALAPPDATA"), os.getenv("APPDATA")):
|
|
if not env_value:
|
|
continue
|
|
base = Path(env_value)
|
|
if name in ("chrome", "google-chrome"):
|
|
p = base / "Google" / "Chrome" / "User Data" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
if name == "chromium":
|
|
p = base / "Chromium" / "User Data" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
if name in ("brave", "brave-browser"):
|
|
p = base / "BraveSoftware" / "Brave-Browser" / "User Data" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
|
|
# *nix and macOS
|
|
if sys.platform == "darwin":
|
|
if name in ("chrome", "google-chrome"):
|
|
p = home / "Library" / "Application Support" / "Google" / "Chrome" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
if name == "chromium":
|
|
p = home / "Library" / "Application Support" / "Chromium" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
if name in ("brave", "brave-browser"):
|
|
p = home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
|
|
# Linux and other
|
|
if name in ("chrome", "google-chrome"):
|
|
p = home / ".config" / "google-chrome" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
if name == "chromium":
|
|
p = home / ".config" / "chromium" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
if name in ("brave", "brave-browser"):
|
|
p = home / ".config" / "BraveSoftware" / "Brave-Browser" / "Default" / "Cookies"
|
|
if p.is_file():
|
|
return p
|
|
|
|
return None
|
|
|
|
|
|
def _add_browser_cookies_if_available(options: Dict[str, Any], preferred_browser: Optional[str] = None) -> None:
    """Mutate *options* to pull cookies from an installed browser, if any.

    Tries *preferred_browser* first (when given), then auto-detects in the
    order chrome -> chromium -> brave.  Emits a one-time warning when nothing
    usable is found.
    """
    global _BROWSER_COOKIE_WARNING_EMITTED

    # Honor the explicitly requested browser when its cookie DB exists.
    if preferred_browser:
        try:
            if _browser_cookie_path_for(preferred_browser) is not None:
                options["cookiesfrombrowser"] = [preferred_browser]
                return
            if not _BROWSER_COOKIE_WARNING_EMITTED:
                log(f"Requested browser cookie DB '{preferred_browser}' not found; falling back to autodetect.")
                _BROWSER_COOKIE_WARNING_EMITTED = True
        except Exception:
            pass

    # Auto-detect in common order (chrome/chromium/brave).
    for candidate in ("chrome", "chromium", "brave"):
        try:
            if _browser_cookie_path_for(candidate) is not None:
                options["cookiesfrombrowser"] = [candidate]
                return
        except Exception:
            continue

    # Nothing found anywhere: warn once per process.
    if not _BROWSER_COOKIE_WARNING_EMITTED:
        log(
            "Browser cookie extraction skipped because no Chrome-compatible cookie database was found. "
            "Provide a cookies file via config or --cookies if authentication is required."
        )
        _BROWSER_COOKIE_WARNING_EMITTED = True
|
|
|
|
|
|
def ensure_yt_dlp_ready() -> None:
    """Verify yt-dlp is importable, raising DownloadError if missing."""
    if yt_dlp is None:
        # Surface the original import failure for a useful error message.
        detail = str(YTDLP_IMPORT_ERROR or "yt-dlp is not installed")
        raise DownloadError(f"yt-dlp module not available: {detail}")
|
|
|
|
|
|
def _get_extractors() -> List[Any]:
    """Return (and memoize) the full list of yt-dlp extractor instances."""
    global _EXTRACTOR_CACHE
    if _EXTRACTOR_CACHE is not None:
        return _EXTRACTOR_CACHE

    # Raises DownloadError when yt-dlp is not importable at all.
    ensure_yt_dlp_ready()

    if gen_extractors is None:
        _EXTRACTOR_CACHE = []
    else:
        try:
            _EXTRACTOR_CACHE = list(gen_extractors())
        except Exception:
            _EXTRACTOR_CACHE = []
    return _EXTRACTOR_CACHE
|
|
|
|
|
|
def is_url_supported_by_ytdlp(url: str) -> bool:
    """Return True if yt-dlp has a non-generic extractor for the URL.

    A URL counts as supported when its host matches, exactly or as a
    subdomain, any domain harvested from yt-dlp's extractor regexes.
    Returns False for non-strings, malformed URLs, or when yt-dlp failed
    to import.
    """
    if not url or not isinstance(url, str):
        return False

    if YTDLP_IMPORT_ERROR is not None:
        # yt-dlp unavailable: nothing is supported.
        return False

    try:
        # Parse once; the original parsed the same URL twice in two
        # back-to-back try blocks.
        parsed = urlparse(url)
        if not parsed.scheme or not parsed.netloc:
            return False
        domain = parsed.netloc.lower()
        supported = _build_supported_domains()
        for base in supported:
            # Exact match or subdomain (e.g. www.youtube.com vs youtube.com).
            if domain == base or domain.endswith("." + base):
                return True
    except Exception:
        return False

    return False
|
|
|
|
|
|
# (monotonic timestamp, formats) cache for list_formats, keyed by an md5 of
# the url plus the probe options; entries expire after 5 minutes.
_FORMATS_CACHE: Dict[str, tuple[float, List[Dict[str, Any]]]] = {}
|
|
|
|
def list_formats(
    url: str,
    *,
    no_playlist: bool = False,
    playlist_items: Optional[str] = None,
    cookiefile: Optional[str] = None,
    timeout_seconds: int = 20,
) -> Optional[List[Dict[str, Any]]]:
    """Get available formats for a URL.

    Runs the yt-dlp metadata probe on a daemon worker thread and abandons it
    after *timeout_seconds* so a hung extractor cannot stall the caller.
    Successful results are cached for 5 minutes.

    Args:
        url: Media page URL; must be supported by a non-generic extractor.
        no_playlist: Pass yt-dlp's noplaylist option.
        playlist_items: yt-dlp playlist_items selector (e.g. "1-3").
        cookiefile: Explicit cookies file; otherwise browser cookies are
            attempted on a best-effort basis.
        timeout_seconds: Wall-clock budget for the probe.

    Returns a list of format dicts or None if unsupported or probing fails.
    """

    if not is_url_supported_by_ytdlp(url):
        return None

    # Cache format probes to avoid redundant network hits
    cache_key = hashlib.md5(f"{url}|{no_playlist}|{playlist_items}|{cookiefile}".encode()).hexdigest()
    now = time.monotonic()
    if cache_key in _FORMATS_CACHE:
        ts, result = _FORMATS_CACHE[cache_key]
        if now - ts < 300:  # 5 minute cache for formats
            return result

    # Shared slot the worker thread writes into: [result, error].
    result_container: List[Optional[Any]] = [None, None]  # [result, error]

    def _do_list() -> None:
        try:
            ensure_yt_dlp_ready()
            assert yt_dlp is not None

            ydl_opts: Dict[str, Any] = {
                "quiet": True,
                "no_warnings": True,
                "skip_download": True,
                "noprogress": True,
                # Keep the per-socket timeout well under the thread timeout.
                "socket_timeout": min(10, max(1, int(timeout_seconds))),
                "retries": 2,
            }

            if cookiefile:
                ydl_opts["cookiefile"] = str(cookiefile)
            else:
                # Best effort attempt to use browser cookies if no file is explicitly passed
                _add_browser_cookies_if_available(ydl_opts)

            if no_playlist:
                ydl_opts["noplaylist"] = True
            if playlist_items:
                ydl_opts["playlist_items"] = str(playlist_items)

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[arg-type]
                info = ydl.extract_info(url, download=False)

            if not isinstance(info, dict):
                result_container[0] = None
                return

            formats = info.get("formats")
            if not isinstance(formats, list):
                result_container[0] = None
                return

            # Keep only well-formed dict entries.
            out: List[Dict[str, Any]] = []
            for fmt in formats:
                if isinstance(fmt, dict):
                    out.append(fmt)
            result_container[0] = out
        except Exception as exc:
            debug(f"yt-dlp format probe failed for {url}: {exc}")
            result_container[1] = exc

    # Use daemon=True so a hung thread doesn't block process exit
    thread = threading.Thread(target=_do_list, daemon=True)
    thread.start()
    thread.join(timeout=max(1, int(timeout_seconds)))

    if thread.is_alive():
        # The worker is abandoned; daemon status lets the process exit anyway.
        debug(f"yt-dlp format probe timed out for {url} (>={timeout_seconds}s)")
        return None

    if result_container[1] is not None:
        return None

    # Only cache successful probes.
    if result_container[0] is not None:
        _FORMATS_CACHE[cache_key] = (now, cast(List[Dict[str, Any]], result_container[0]))

    return cast(Optional[List[Dict[str, Any]]], result_container[0])
|
|
|
|
|
|
# (monotonic timestamp, probe result) cache for probe_url, keyed by an md5 of
# the url plus the probe options; entries expire after 60 seconds.
_PROBE_CACHE: Dict[str, tuple[float, Dict[str, Any]]] = {}
|
|
|
|
def probe_url(
    url: str,
    no_playlist: bool = False,
    timeout_seconds: int = 15,
    *,
    cookiefile: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Probe URL metadata without downloading.

    Runs yt-dlp extract_info (with extract_flat for playlists) on a daemon
    thread with a hard timeout; results are cached for 60 seconds.

    Returns None if unsupported, errors, or times out.
    """

    if not is_url_supported_by_ytdlp(url):
        return None

    # Simple in-memory cache to avoid duplicate probes for the same URL/options in a short window.
    cache_key = hashlib.md5(f"{url}|{no_playlist}|{cookiefile}".encode()).hexdigest()
    now = time.monotonic()
    if cache_key in _PROBE_CACHE:
        ts, result = _PROBE_CACHE[cache_key]
        if now - ts < 60:  # 60 second cache
            return result

    # Shared slot the worker thread writes into: [result, error].
    result_container: List[Optional[Any]] = [None, None]  # [result, error]

    def _do_probe() -> None:
        try:
            debug(f"[probe] Starting probe for {url}")
            ensure_yt_dlp_ready()

            assert yt_dlp is not None
            ydl_opts: Dict[str, Any] = {
                "quiet": True,
                "no_warnings": True,
                "socket_timeout": 10,
                "retries": 2,
                "skip_download": True,
                # Don't resolve every playlist entry; keeps probes fast.
                "extract_flat": "in_playlist",
                "noprogress": True,
            }

            if cookiefile:
                ydl_opts["cookiefile"] = str(cookiefile)
            else:
                # Best effort fallback
                _add_browser_cookies_if_available(ydl_opts)

            if no_playlist:
                ydl_opts["noplaylist"] = True

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[arg-type]
                debug(f"[probe] ytdlp extract_info (download=False) start: {url}")
                info = ydl.extract_info(url, download=False)
                debug(f"[probe] ytdlp extract_info (download=False) done: {url}")

            if not isinstance(info, dict):
                result_container[0] = None
                return

            # Prefer the canonical page URL over whatever the caller passed.
            webpage_url = info.get("webpage_url") or info.get("original_url") or info.get("url")

            result_container[0] = {
                "extractor": info.get("extractor", ""),
                "title": info.get("title", ""),
                "entries": info.get("entries", []),
                "duration": info.get("duration"),
                "uploader": info.get("uploader"),
                "description": info.get("description"),
                "requested_url": url,
                "webpage_url": webpage_url,
                "url": webpage_url or url,
            }
        except Exception as exc:
            debug(f"Probe error for {url}: {exc}")
            result_container[1] = exc

    # Use daemon=True so a hung probe doesn't block the process
    thread = threading.Thread(target=_do_probe, daemon=True)
    thread.start()
    thread.join(timeout=timeout_seconds)

    if thread.is_alive():
        debug(f"Probe timeout for {url} (>={timeout_seconds}s), proceeding without probe")
        return None

    if result_container[1] is not None:
        return None

    # Only cache successful probes.
    if result_container[0] is not None:
        _PROBE_CACHE[cache_key] = (now, cast(Dict[str, Any], result_container[0]))

    return cast(Optional[Dict[str, Any]], result_container[0])
|
|
|
|
|
|
def is_browseable_format(fmt: Any) -> bool:
    """Check if a format is user-browseable (not storyboard, metadata, etc).

    Used by the ytdlp format selector to filter out non-downloadable formats.
    Returns False for:
    - Non-dict input or a missing/blank format_id
    - MHTML / JSON sidecar metadata
    - Storyboard/thumbnail formats (by note or an 'sb*' id)
    - Formats that carry neither audio nor video

    Args:
        fmt: Format dict from yt-dlp with keys like format_id, ext, vcodec,
            acodec, format_note

    Returns:
        bool: True if format is suitable for browsing/selection
    """
    if not isinstance(fmt, dict):
        return False

    fid = str(fmt.get("format_id") or "").strip()
    if not fid:
        return False

    # Sidecar/metadata pseudo-formats are never downloadable media.
    if str(fmt.get("ext") or "").strip().lower() in {"mhtml", "json"}:
        return False

    # Storyboards are thumbnail sprite sheets, flagged via the note text
    # or an id beginning with "sb".
    if "storyboard" in str(fmt.get("format_note") or "").lower():
        return False
    if fid.lower().startswith("sb"):
        return False

    # At least one of audio/video must be present.
    has_video = str(fmt.get("vcodec", "none")) != "none"
    has_audio = str(fmt.get("acodec", "none")) != "none"
    return has_video or has_audio
|
|
|
|
|
|
def format_for_table_selection(
    fmt: Dict[str, Any],
    url: str,
    index: int,
    *,
    selection_format_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Format a yt-dlp format dict into a table result row for selection.

    Shapes a single entry from list_formats() for the ResultTable system so a
    user pick can be routed to download-file with -query "format:<id>".

    Args:
        fmt: Format dict from yt-dlp
        url: The URL this format came from
        index: Row number for display (1-indexed)
        selection_format_id: Override format_id for selection (e.g., with +ba suffix)

    Returns:
        dict: Format result row with _selection_args for table system
    """
    fid = fmt.get("format_id", "")
    resolution = fmt.get("resolution", "")
    ext = fmt.get("ext", "")
    vcodec = fmt.get("vcodec", "none")
    acodec = fmt.get("acodec", "none")

    # Video-only streams get "+ba" appended so yt-dlp merges in best audio.
    if selection_format_id is None:
        selection_format_id = fid
        try:
            if fid and vcodec != "none" and acodec == "none":
                selection_format_id = f"{fid}+ba"
        except Exception:
            pass

    # Human-readable size from the exact or approximate byte count.
    size_str = ""
    raw_size = fmt.get("filesize") or fmt.get("filesize_approx")
    try:
        if isinstance(raw_size, (int, float)) and raw_size > 0:
            size_str = f"{float(raw_size) / (1024 * 1024):.1f}MB"
    except Exception:
        pass

    # One-line description: resolution | EXT | v:codec | a:codec | size
    desc_parts: List[str] = []
    if resolution and resolution != "audio only":
        desc_parts.append(resolution)
    if ext:
        desc_parts.append(str(ext).upper())
    if vcodec != "none":
        desc_parts.append(f"v:{vcodec}")
    if acodec != "none":
        desc_parts.append(f"a:{acodec}")
    if size_str:
        desc_parts.append(size_str)

    return {
        "table": "download-file",
        "title": f"Format {fid}",
        "url": url,
        "target": url,
        "detail": " | ".join(desc_parts),
        "annotations": [ext, resolution] if resolution else [ext],
        "media_kind": "format",
        "columns": [
            ("ID", fid),
            ("Resolution", resolution or "N/A"),
            ("Ext", ext),
            ("Size", size_str or ""),
            ("Video", vcodec),
            ("Audio", acodec),
        ],
        "full_metadata": {
            "format_id": fid,
            "url": url,
            "item_selector": selection_format_id,
            "_selection_args": ["-query", f"format:{selection_format_id}"],
        },
        "_selection_args": ["-query", f"format:{selection_format_id}"],
    }
|
|
|
|
|
|
@dataclass(slots=True)
class YtDlpDefaults:
    """User-tunable defaults for yt-dlp behavior.

    Recommended config.conf keys (top-level dotted keys):
    - format="best|1080|720|640|audio"
    - ytdlp.format_sort="res:2160,res:1440,res:1080,res:720,res"

    Cookies:
    - cookies="C:\\path\\cookies.txt" (already supported by config.resolve_cookies_path)
    - cookies_from_browser="auto|none|chrome|brave|chromium"
    """

    # Unified format preference: "best", a bare height like "1080", or "audio".
    format: str = "best"
    # Selector used for video downloads when `format` is "best" or blank.
    video_format: str = "bestvideo+bestaudio/best"
    # Selector used for audio-only downloads.
    audio_format: str = "251/140/bestaudio"
    # Optional yt-dlp format_sort list (None = use the built-in fallback).
    format_sort: Optional[List[str]] = None
    # Browser to pull cookies from ("auto"/"none"/specific), or None if unset.
    cookies_from_browser: Optional[str] = None
|
|
|
|
|
|
class YtDlpTool:
|
|
"""Centralizes yt-dlp defaults and translation helpers.
|
|
|
|
This is intentionally small and dependency-light so cmdlets can use it without
|
|
forcing a full refactor.
|
|
"""
|
|
|
|
    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        *,
        script_dir: Optional[Path] = None
    ) -> None:
        """Build the tool from a config mapping.

        Args:
            config: Parsed application configuration (copied defensively).
            script_dir: App root override used for cookies resolution.
        """
        # Defensive copy so later caller-side mutation cannot change behavior.
        self._config: Dict[str, Any] = dict(config or {})
        # `resolve_cookies_path` expects the app root so it can fall back to ./cookies.txt.
        # This file lives under ./tool/, so default to the parent directory.
        self._script_dir = script_dir or Path(__file__).resolve().parent.parent
        self.defaults = self._load_defaults()
        self._cookiefile: Optional[Path] = self._init_cookiefile()
|
|
|
|
    def _init_cookiefile(self) -> Optional[Path]:
        """Resolve cookies once at tool init (yt-dlp is the primary consumer)."""
        try:
            from SYS.config import resolve_cookies_path

            resolved = resolve_cookies_path(self._config, script_dir=self._script_dir)
            # Only keep a cookies file that actually exists on disk.
            if resolved is not None and resolved.is_file():
                return resolved
        except Exception:
            # Best effort: a missing config helper or unreadable path simply
            # means "no cookies file".
            pass
        return None
|
|
|
|
def resolve_height_selector(self, format_str: Optional[str]) -> Optional[str]:
|
|
"""Resolve numeric heights (720, 1080p) to yt-dlp height selectors.
|
|
|
|
Examples:
|
|
"720" -> "bv*[height<=720]+ba"
|
|
"1080p" -> "bv*[height<=1080]+ba"
|
|
"""
|
|
if not format_str or not isinstance(format_str, str):
|
|
return None
|
|
|
|
s = format_str.strip().lower()
|
|
if not s:
|
|
return None
|
|
|
|
# Strip trailing 'p' if present (e.g. 720p -> 720)
|
|
if s.endswith('p'):
|
|
s = s[:-1]
|
|
if s.isdigit():
|
|
height = int(s)
|
|
if height >= 144:
|
|
return f"bv*[height<={height}]+ba"
|
|
return None
|
|
|
|
    def _load_defaults(self) -> YtDlpDefaults:
        """Assemble YtDlpDefaults from the many accepted config spellings.

        Each setting is resolved through an or-chain, so the FIRST truthy key
        wins: tool.ytdlp.* keys, then flat ytdlp.* keys, then legacy
        ytdlp_* top-level keys, then nested dotted forms.
        """
        cfg = self._config

        # NOTE: `YtDlpDefaults` is a slots dataclass. Referencing defaults via
        # `YtDlpDefaults.video_format` yields a `member_descriptor`, not the
        # default string value. Use an instance for fallback defaults.
        _fallback_defaults = YtDlpDefaults()

        tool_block = _get_nested(cfg, "tool", "ytdlp")
        if not isinstance(tool_block, dict):
            tool_block = {}

        ytdlp_block = cfg.get("ytdlp") if isinstance(cfg.get("ytdlp"), dict) else {}
        if not isinstance(ytdlp_block, dict):
            ytdlp_block = {}

        # Accept both nested and flat styles.
        video_format = (
            tool_block.get("video_format") or tool_block.get("format")
            or ytdlp_block.get("video_format") or ytdlp_block.get("video")
            or ytdlp_block.get("format_video") or cfg.get("ytdlp_video_format")
        )
        audio_format = (
            tool_block.get("audio_format") or ytdlp_block.get("audio_format")
            or ytdlp_block.get("audio") or ytdlp_block.get("format_audio")
            or cfg.get("ytdlp_audio_format")
        )

        # Also accept dotted keys written as nested dicts: ytdlp.format.video, ytdlp.format.audio
        nested_video = _get_nested(cfg, "ytdlp", "format", "video")
        nested_audio = _get_nested(cfg, "ytdlp", "format", "audio")

        fmt_sort_val = (
            tool_block.get("format_sort") or ytdlp_block.get("format_sort")
            or ytdlp_block.get("formatSort") or cfg.get("ytdlp_format_sort")
            or _get_nested(cfg, "ytdlp", "format", "sort")
        )
        # Normalize CSV / JSON-ish / list values into a list of strings.
        fmt_sort = _parse_csv_list(fmt_sort_val)

        # Cookie source preference: allow forcing a browser DB or 'auto'/'none'
        cookies_pref = (
            tool_block.get("cookies_from_browser")
            or tool_block.get("cookiesfrombrowser")
            or ytdlp_block.get("cookies_from_browser")
            or ytdlp_block.get("cookiesfrombrowser")
            or cfg.get("ytdlp_cookies_from_browser")
            or _get_nested(cfg, "ytdlp", "cookies_from_browser")
        )

        # Unified format preference: prefer explicit 'format' key but accept legacy keys
        format_pref = (
            tool_block.get("format")
            or tool_block.get("video_format")
            or ytdlp_block.get("format")
            or ytdlp_block.get("video_format")
            or cfg.get("ytdlp_format")
            or cfg.get("ytdlp_video_format")
            or _get_nested(cfg, "ytdlp", "format")
        )

        defaults = YtDlpDefaults(
            format=str(format_pref).strip() if format_pref else "best",
            video_format=str(
                nested_video or video_format or _fallback_defaults.video_format
            ),
            audio_format=str(
                nested_audio or audio_format or _fallback_defaults.audio_format
            ),
            format_sort=fmt_sort,
            cookies_from_browser=(str(cookies_pref).strip() if cookies_pref else None),
        )

        return defaults
|
|
|
|
|
|
|
|
    def resolve_cookiefile(self) -> Optional[Path]:
        """Return the cookies file resolved at init time (None if unavailable)."""
        return self._cookiefile
|
|
|
|
def default_format(self, mode: str) -> str:
|
|
"""Determine the final yt-dlp format string.
|
|
|
|
Priority:
|
|
- If caller explicitly requested audio mode (mode == 'audio'), return audio format.
|
|
- If configured default format is 'audio', return audio format.
|
|
- If configured default is 'best' or blank, return video_format.
|
|
- Otherwise return the configured format value (e.g., '720').
|
|
"""
|
|
m = str(mode or "").lower().strip()
|
|
if m == "audio":
|
|
return self.defaults.audio_format
|
|
|
|
cfg = (str(self.defaults.format or "")).strip()
|
|
lc = cfg.lower()
|
|
if lc == "audio":
|
|
return self.defaults.audio_format
|
|
if not cfg or lc == "best":
|
|
return self.defaults.video_format
|
|
return cfg
|
|
|
|
    def build_ytdlp_options(self, opts: DownloadOptions) -> Dict[str, Any]:
        """Translate DownloadOptions into yt-dlp API options.

        Resolution order for cookies: explicit opts.cookies_path, then the
        cookies file resolved at init, then browser cookies per the configured
        preference (or auto-detect).  Format resolution: explicit opts value
        (with 'audio'/'video' keywords rewriting opts.mode), then the
        configured default, finally default_format(opts.mode).
        """
        ensure_directory(opts.output_dir)
        outtmpl = str((opts.output_dir / "%(title)s.%(ext)s").resolve())
        base_options: Dict[str, Any] = {
            "outtmpl": outtmpl,
            "quiet": True,
            "no_warnings": True,
            "noprogress": True,
            "socket_timeout": 30,
            "retries": 10,
            "fragment_retries": 10,
            "http_chunk_size": 10_485_760,
            "restrictfilenames": True,
        }

        # Prefer a repo-bundled ffmpeg when present (./MPV/ffmpeg/bin).
        try:
            repo_root = Path(__file__).resolve().parents[1]
            bundled_ffmpeg_dir = repo_root / "MPV" / "ffmpeg" / "bin"
            if bundled_ffmpeg_dir.exists():
                base_options.setdefault("ffmpeg_location", str(bundled_ffmpeg_dir))
        except Exception:
            pass

        # Windows file locking (AV scanners, indexer) needs extra retries.
        try:
            if os.name == "nt":
                base_options.setdefault("file_access_retries", 40)
        except Exception:
            pass

        if opts.cookies_path and opts.cookies_path.is_file():
            base_options["cookiefile"] = str(opts.cookies_path)
        else:
            cookiefile = self.resolve_cookiefile()
            if cookiefile is not None and cookiefile.is_file():
                base_options["cookiefile"] = str(cookiefile)
            else:
                # Respect configured browser cookie preference if provided; otherwise fall back to auto-detect.
                pref = (self.defaults.cookies_from_browser or "").lower().strip()
                if pref:
                    if pref in {"none", "off", "false"}:
                        # Explicitly disabled
                        pass
                    elif pref in {"auto", "detect"}:
                        _add_browser_cookies_if_available(base_options)
                    else:
                        # Try the preferred browser first; fall back to auto-detect if not present
                        _add_browser_cookies_if_available(base_options, preferred_browser=pref)
                else:
                    # Add browser cookies support "just in case" if no file found (best effort)
                    _add_browser_cookies_if_available(base_options)

        # Special handling for format keywords explicitly passed in via options.
        # opts may be a NamedTuple (_replace) or a dataclass (dataclasses.replace);
        # both paths are attempted.
        if opts.ytdl_format == "audio":
            try:
                opts = opts._replace(mode="audio", ytdl_format=None)
            except Exception:
                try:
                    import dataclasses as _dc

                    opts = _dc.replace(opts, mode="audio", ytdl_format=None)
                except Exception:
                    pass
        elif opts.ytdl_format == "video":
            try:
                opts = opts._replace(mode="video", ytdl_format=None)
            except Exception:
                try:
                    import dataclasses as _dc

                    opts = _dc.replace(opts, mode="video", ytdl_format=None)
                except Exception:
                    pass

        if opts.no_playlist:
            base_options["noplaylist"] = True

        # If no explicit format was provided, honor the configured default format
        ytdl_format = opts.ytdl_format
        if not ytdl_format:
            configured_format = (str(self.defaults.format or "")).strip()
            if configured_format:
                if configured_format.lower() == "audio":
                    # Default to audio-only downloads
                    try:
                        opts = opts._replace(mode="audio")
                    except Exception:
                        try:
                            import dataclasses as _dc

                            opts = _dc.replace(opts, mode="audio")
                        except Exception:
                            pass
                    ytdl_format = None
                else:
                    # Leave ytdl_format None so that default_format(opts.mode)
                    # returns the configured format literally (e.g., '720') and
                    # we don't auto-convert it to an internal selector.
                    pass

        # Bare heights like "720" become bv*[height<=720]+ba for video modes.
        if ytdl_format and opts.mode != "audio":
            resolved = self.resolve_height_selector(ytdl_format)
            if resolved:
                ytdl_format = resolved

        fmt = ytdl_format or self.default_format(opts.mode)
        base_options["format"] = fmt

        if opts.mode == "audio":
            base_options["postprocessors"] = [{
                "key": "FFmpegExtractAudio"
            }]

        if opts.mode != "audio":
            # Prefer configured sort order; otherwise bias toward high resolutions.
            format_sort = self.defaults.format_sort or [
                "res:4320",
                "res:2880",
                "res:2160",
                "res:1440",
                "res:1080",
                "res:720",
                "res",
            ]
            base_options["format_sort"] = format_sort

        if getattr(opts, "embed_chapters", False):
            # Append an FFmpegMetadata postprocessor unless one is already queued.
            pps = base_options.get("postprocessors")
            if not isinstance(pps, list):
                pps = []
            already_has_metadata = any(
                isinstance(pp, dict) and str(pp.get("key") or "") == "FFmpegMetadata"
                for pp in pps
            )
            if not already_has_metadata:
                pps.append(
                    {
                        "key": "FFmpegMetadata",
                        "add_metadata": True,
                        "add_chapters": True,
                        "add_infojson": "if_exists",
                    }
                )
            base_options["postprocessors"] = pps

        if opts.mode != "audio":
            base_options.setdefault("merge_output_format", "mkv")

        if getattr(opts, "write_sub", False):
            base_options["writesubtitles"] = True
            base_options["writeautomaticsub"] = True
            base_options["subtitlesformat"] = "vtt"

        if opts.clip_sections:
            # Parse "start-end,start-end" (seconds) into yt-dlp section specs.
            sections: List[str] = []

            def _secs_to_hms(seconds: float) -> str:
                # Clamp negatives to 0 and format as HH:MM:SS.
                total = max(0, int(seconds))
                minutes, secs = divmod(total, 60)
                hours, minutes = divmod(minutes, 60)
                return f"{hours:02d}:{minutes:02d}:{secs:02d}"

            for section_range in str(opts.clip_sections).split(","):
                section_range = section_range.strip()
                if not section_range:
                    continue
                try:
                    start_s_raw, end_s_raw = section_range.split("-", 1)
                    start_s = float(start_s_raw.strip())
                    end_s = float(end_s_raw.strip())
                    if start_s >= end_s:
                        # Ignore empty/inverted ranges.
                        continue
                    sections.append(f"*{_secs_to_hms(start_s)}-{_secs_to_hms(end_s)}")
                except (ValueError, AttributeError):
                    continue

            if sections:
                base_options["download_sections"] = sections
                # Clipped outputs should begin with a keyframe; otherwise players (notably mpv)
                # can show audio before video or a black screen until the next keyframe.
                # yt-dlp implements this by forcing keyframes at cut points.
                base_options["force_keyframes_at_cuts"] = True
                debug(f"Download sections configured: {', '.join(sections)}")

        if opts.playlist_items:
            base_options["playlist_items"] = opts.playlist_items

        if not opts.quiet:
            debug(f"yt-dlp: mode={opts.mode}, format={base_options.get('format')}")

        return base_options
|
|
|
|
def build_yt_dlp_cli_args(
    self,
    *,
    url: str,
    output_dir: Optional[Path] = None,
    ytdl_format: Optional[str] = None,
    playlist_items: Optional[str] = None,
    no_playlist: bool = False,
    quiet: bool = True,
    extra_args: Optional[Sequence[str]] = None,
) -> List[str]:
    """Build a yt-dlp command line (argv list).

    This is primarily for debug output or subprocess execution.
    """
    args: List[str] = ["yt-dlp"]

    if quiet:
        args += ["--quiet", "--no-warnings", "--no-progress"]

    cookie_path = self.resolve_cookiefile()
    if cookie_path is not None:
        args += ["--cookies", str(cookie_path)]

    if no_playlist:
        args.append("--no-playlist")
    if playlist_items:
        args += ["--playlist-items", str(playlist_items)]

    # Use long form to avoid confusion with app-level flags.
    chosen_format = (ytdl_format or "").strip()
    if chosen_format:
        args += ["--format", chosen_format]

    for sort_key in (self.defaults.format_sort or []):
        args += ["-S", sort_key]

    if output_dir is not None:
        args += ["-o", str((output_dir / "%(title)s.%(ext)s").resolve())]

    if extra_args:
        args += [str(a) for a in extra_args if str(a).strip()]

    args.append(str(url))
    return args
|
|
|
|
def debug_print_cli(self, argv: Sequence[str]) -> None:
    """Emit the full yt-dlp argv to the debug log; never raises."""
    try:
        rendered = " ".join(str(part) for part in argv)
        debug("yt-dlp argv: " + rendered)
    except Exception:
        pass
|
|
|
|
|
|
def config_schema() -> List[Dict[str, Any]]:
    """Return a schema describing editable YT-DLP tool defaults for the config UI."""
    # Preset format options shown in the dropdown.
    format_choices = ["best", "1080", "720", "640", "audio"]

    # Offer browser choices depending on what's present on the host system.
    browser_choices = ["auto", "none"]
    for browser in ("chrome", "chromium", "brave"):
        try:
            if _browser_cookie_path_for(browser) is not None:
                browser_choices.append(browser)
        except Exception:
            continue

    return [
        {
            "key": "format",
            "label": "Default format",
            "default": YtDlpDefaults.format,
            "choices": format_choices,
        },
        {
            "key": "cookies",
            "label": "Cookie file (path)",
            "default": "",
        },
        {
            "key": "cookies_from_browser",
            "label": "Browser cookie source (used if no cookie file)",
            "default": "auto",
            "choices": browser_choices,
        },
    ]
|
|
|
|
# Progress + utility helpers for yt-dlp driven downloads (previously in cmdlet/download_media).
# Fallback stderr progress bar, used when no live pipeline UI is attached.
_YTDLP_PROGRESS_BAR = ProgressBar()
# Guards _YTDLP_PROGRESS_LAST_ACTIVITY, which is written from yt-dlp's
# progress-hook thread and read by the watchdog in _download_with_timeout.
_YTDLP_PROGRESS_ACTIVITY_LOCK = threading.Lock()
# Monotonic timestamp of the most recent progress event (0.0 means "none").
_YTDLP_PROGRESS_LAST_ACTIVITY = 0.0
# Subtitle sidecar extensions recognized next to downloaded media files.
_SUBTITLE_EXTS = (".vtt", ".srt", ".ass", ".ssa", ".lrc")
|
|
|
|
|
|
def _progress_label(status: Optional[Dict[str, Any]]) -> str:
|
|
if not status:
|
|
return "unknown"
|
|
raw_info = status.get("info_dict")
|
|
info_dict = raw_info if isinstance(raw_info, dict) else {}
|
|
|
|
candidates = [
|
|
status.get("filename"),
|
|
info_dict.get("_filename"),
|
|
info_dict.get("filepath"),
|
|
info_dict.get("title"),
|
|
info_dict.get("id"),
|
|
]
|
|
|
|
for cand in candidates:
|
|
if not cand:
|
|
continue
|
|
try:
|
|
name = Path(str(cand)).name
|
|
except Exception:
|
|
name = str(cand)
|
|
label = str(name or "").strip()
|
|
if label:
|
|
return label
|
|
|
|
return "download"
|
|
|
|
|
|
def _record_progress_activity(timestamp: Optional[float] = None) -> None:
    """Record the time of the most recent download activity.

    With no argument, stamps the current monotonic clock; an explicit
    timestamp overrides it (0.0 is used to mean "cleared").
    """
    global _YTDLP_PROGRESS_LAST_ACTIVITY
    with _YTDLP_PROGRESS_ACTIVITY_LOCK:
        _YTDLP_PROGRESS_LAST_ACTIVITY = (
            time.monotonic() if timestamp is None else timestamp
        )
|
|
|
|
|
|
def _get_last_progress_activity() -> float:
    """Return the monotonic timestamp of the last recorded progress event."""
    _YTDLP_PROGRESS_ACTIVITY_LOCK.acquire()
    try:
        return _YTDLP_PROGRESS_LAST_ACTIVITY
    finally:
        _YTDLP_PROGRESS_ACTIVITY_LOCK.release()
|
|
|
|
|
|
def _clear_progress_activity() -> None:
    """Reset the activity marker to 0.0, i.e. "no activity observed"."""
    _record_progress_activity(0.0)
|
|
|
|
|
|
def _live_ui_and_pipe_index() -> tuple[Optional[Any], int]:
    """Return (live progress UI or None, current pipe index, defaulting to 0).

    Both lookups are best-effort: any failure in the pipeline context yields
    (None, 0) rather than raising.
    """
    try:
        getter = getattr(pipeline_context, "get_live_progress", None)
        ui = getter() if callable(getter) else None
    except Exception:
        ui = None

    index = 0
    try:
        ctx_getter = getattr(pipeline_context, "get_stage_context", None)
        stage = ctx_getter() if callable(ctx_getter) else None
        candidate = getattr(stage, "pipe_index", None) if stage is not None else None
        if isinstance(candidate, int):
            index = int(candidate)
    except Exception:
        index = 0

    return ui, index
|
|
|
|
|
|
def _begin_live_steps(total_steps: int) -> None:
    """Announce a step count to the live UI for the current pipe, if any."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        hook = getattr(ui, "begin_pipe_steps", None)
        if callable(hook):
            hook(int(pipe_idx), total_steps=int(total_steps))
    except Exception:
        pass
|
|
|
|
|
|
def _step(text: str) -> None:
    """Advance the live UI's step display for the current pipe, if any."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        advance = getattr(ui, "advance_pipe_step", None)
        if callable(advance):
            advance(int(pipe_idx), str(text))
    except Exception:
        pass
|
|
|
|
|
|
def _set_pipe_percent(percent: int) -> None:
    """Set the live UI's percent display for the current pipe, if any."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        setter = getattr(ui, "set_pipe_percent", None)
        if callable(setter):
            setter(int(pipe_idx), int(percent))
    except Exception:
        pass
|
|
|
|
|
|
def _format_chapters_note(info: Dict[str, Any]) -> Optional[str]:
|
|
"""Format yt-dlp chapter metadata into a stable, note-friendly text."""
|
|
try:
|
|
chapters = info.get("chapters")
|
|
except Exception:
|
|
chapters = None
|
|
|
|
if not isinstance(chapters, list) or not chapters:
|
|
return None
|
|
|
|
rows: List[tuple[int, Optional[int], str]] = []
|
|
max_t = 0
|
|
for ch in chapters:
|
|
if not isinstance(ch, dict):
|
|
continue
|
|
start_raw = ch.get("start_time")
|
|
end_raw = ch.get("end_time")
|
|
title_raw = ch.get("title") or ch.get("name") or ch.get("chapter")
|
|
|
|
try:
|
|
if start_raw is None:
|
|
continue
|
|
start_s = int(float(start_raw))
|
|
except Exception:
|
|
continue
|
|
|
|
end_s: Optional[int] = None
|
|
try:
|
|
if end_raw is not None:
|
|
end_s = int(float(end_raw))
|
|
except Exception:
|
|
end_s = None
|
|
|
|
title = str(title_raw).strip() if title_raw is not None else ""
|
|
rows.append((start_s, end_s, title))
|
|
try:
|
|
max_t = max(max_t, start_s, end_s or 0)
|
|
except Exception:
|
|
max_t = max(max_t, start_s)
|
|
|
|
if not rows:
|
|
return None
|
|
|
|
force_hours = bool(max_t >= 3600)
|
|
|
|
def _tc(seconds: int) -> str:
|
|
total = max(0, int(seconds))
|
|
minutes, secs = divmod(total, 60)
|
|
hours, minutes = divmod(minutes, 60)
|
|
if force_hours:
|
|
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
|
return f"{minutes:02d}:{secs:02d}"
|
|
|
|
lines: List[str] = []
|
|
for start_s, end_s, title in sorted(
|
|
rows, key=lambda r: (r[0], r[1] if r[1] is not None else 10**9, r[2])
|
|
):
|
|
if end_s is not None and end_s > start_s:
|
|
prefix = f"{_tc(start_s)}-{_tc(end_s)}"
|
|
else:
|
|
prefix = _tc(start_s)
|
|
line = f"{prefix} {title}".strip()
|
|
if line:
|
|
lines.append(line)
|
|
|
|
text = "\n".join(lines).strip()
|
|
return text or None
|
|
|
|
|
|
def _best_subtitle_sidecar(media_path: Path) -> Optional[Path]:
    """Find the most likely subtitle sidecar file for a downloaded media file.

    Scans the media file's directory for same-stem files with a known subtitle
    extension and prefers .vtt, then .srt/.ass/.ssa/.lrc. Returns None on any
    failure or when nothing matches.
    """
    try:
        stem = media_path.stem
        if not stem:
            return None

        found: List[Path] = []
        for candidate in media_path.parent.glob(stem + ".*"):
            try:
                if not candidate.is_file():
                    continue
            except Exception:
                continue
            if candidate.suffix.lower() in _SUBTITLE_EXTS:
                found.append(candidate)

        for preferred in (".vtt", ".srt", ".ass", ".ssa", ".lrc"):
            for candidate in found:
                if candidate.suffix.lower() == preferred:
                    return candidate

        return found[0] if found else None
    except Exception:
        return None
|
|
|
|
|
|
def _read_text_file(path: Path) -> Optional[str]:
|
|
try:
|
|
return path.read_text(encoding="utf-8", errors="ignore")
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _download_with_sections_via_cli(
    url: str,
    ytdl_options: Dict[str, Any],
    sections: List[str],
    quiet: bool = False,
) -> tuple[Optional[str], Dict[str, Any]]:
    """Download each requested clip section via the yt-dlp CLI.

    Runs one yt-dlp subprocess per entry in ytdl_options["download_sections"],
    writing each section to "<session_id>_<n>.<ext>" so the caller can find
    and rename the outputs afterwards. Returns (session_id, metadata), where
    metadata is the --dump-json info fetched before section 1 (or {}).

    NOTE(review): the `sections` parameter is unused — the list is re-read
    from ytdl_options["download_sections"]. The caller in download_media
    passes the same value; confirm before relying on the parameter.
    """
    sections_list = ytdl_options.get("download_sections", [])
    if not sections_list:
        # Nothing configured: empty session id, no metadata.
        return "", {}

    pipeline = PipelineProgress(pipeline_context)

    class _SectionProgressSimulator:
        # yt-dlp run as a subprocess gives us no per-section progress, so this
        # background thread fakes a slow climb from start_pct toward max_pct
        # (capped below 100 so the real completion update is visible).
        def __init__(self, start_pct: int, max_pct: int, interval: float = 0.5) -> None:
            self._start_pct = max(0, min(int(start_pct), 99))
            self._max_pct = max(self._start_pct, min(int(max_pct), 98))
            self._interval = max(0.1, float(interval))
            self._stop_event = threading.Event()
            self._thread: Optional[threading.Thread] = None

        def _run(self) -> None:
            # Tick +1% per interval until stopped or the ceiling is reached.
            current = self._start_pct
            while not self._stop_event.wait(self._interval):
                if current < self._max_pct:
                    current += 1
                try:
                    _set_pipe_percent(current)
                except Exception:
                    pass

        def start(self) -> None:
            if self._thread is not None or self._start_pct >= self._max_pct:
                return
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()

        def stop(self) -> None:
            self._stop_event.set()
            if self._thread is not None:
                self._thread.join(timeout=0.5)
                self._thread = None
            # Snap the display to the ceiling so the bar doesn't look stuck.
            try:
                _set_pipe_percent(self._max_pct)
            except Exception:
                pass

    # Random-ish short id used as a filename prefix for this batch of sections.
    session_id = hashlib.md5((url + str(time.time()) + "".join(random.choices(string.ascii_letters, k=10))).encode()).hexdigest()[:12]
    first_section_info = None

    total_sections = len(sections_list)
    try:
        for section_idx, section in enumerate(sections_list, 1):
            # Sections occupy the 50-99% band of the pipe's progress display.
            display_pct = 50
            if total_sections > 0:
                display_pct = 50 + int(((section_idx - 1) / max(1, total_sections)) * 49)
            try:
                _set_pipe_percent(display_pct)
            except Exception:
                pass

            pipeline.set_status(f"Downloading & clipping clip section {section_idx}/{total_sections}")

            # Derive a per-section output template in the same directory as
            # the configured outtmpl.
            base_outtmpl = ytdl_options.get("outtmpl", "%(title)s.%(ext)s")
            output_dir_path = Path(base_outtmpl).parent
            filename_tmpl = f"{session_id}_{section_idx}"
            if base_outtmpl.endswith(".%(ext)s"):
                filename_tmpl += ".%(ext)s"
            section_outtmpl = str(output_dir_path / filename_tmpl)

            if section_idx == 1:
                # Fetch title/metadata once (before the first section) via
                # --dump-json --skip-download.
                metadata_cmd = ["yt-dlp", "--dump-json", "--skip-download"]
                if ytdl_options.get("cookiefile"):
                    cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
                    metadata_cmd.extend(["--cookies", cookies_path])
                if ytdl_options.get("noplaylist"):
                    metadata_cmd.append("--no-playlist")
                metadata_cmd.append(url)
                try:
                    meta_result = subprocess.run(metadata_cmd, capture_output=True, text=True)
                    if meta_result.returncode == 0 and meta_result.stdout:
                        try:
                            info_dict = json.loads(meta_result.stdout.strip())
                            first_section_info = info_dict
                            if not quiet:
                                debug(f"Extracted title from metadata: {info_dict.get('title')}")
                        except json.JSONDecodeError:
                            if not quiet:
                                debug("Could not parse JSON metadata")
                except Exception as exc:
                    if not quiet:
                        debug(f"Error extracting metadata: {exc}")

            # Build the per-section download command, mirroring the relevant
            # YoutubeDL options as CLI flags.
            cmd = ["yt-dlp"]
            if quiet:
                cmd.append("--quiet")
                cmd.append("--no-warnings")
                cmd.append("--no-progress")
            cmd.extend(["--postprocessor-args", "ffmpeg:-hide_banner -loglevel error"])
            if ytdl_options.get("ffmpeg_location"):
                try:
                    cmd.extend(["--ffmpeg-location", str(ytdl_options["ffmpeg_location"])])
                except Exception:
                    pass
            if ytdl_options.get("format"):
                cmd.extend(["-f", ytdl_options["format"]])
            if ytdl_options.get("merge_output_format"):
                cmd.extend(["--merge-output-format", str(ytdl_options["merge_output_format"])])

            # Translate FFmpegMetadata postprocessor config into CLI flags.
            postprocessors = ytdl_options.get("postprocessors")
            want_add_metadata = bool(ytdl_options.get("addmetadata"))
            want_embed_chapters = bool(ytdl_options.get("embedchapters"))
            if isinstance(postprocessors, list):
                for pp in postprocessors:
                    if not isinstance(pp, dict):
                        continue
                    if str(pp.get("key") or "") == "FFmpegMetadata":
                        want_add_metadata = True
                        if bool(pp.get("add_chapters", True)):
                            want_embed_chapters = True

            if want_add_metadata:
                cmd.append("--add-metadata")
            if want_embed_chapters:
                cmd.append("--embed-chapters")
            if ytdl_options.get("writesubtitles"):
                cmd.append("--write-sub")
                cmd.append("--write-auto-sub")
                cmd.extend(["--sub-format", "vtt"])
            if ytdl_options.get("force_keyframes_at_cuts"):
                cmd.append("--force-keyframes-at-cuts")
            cmd.extend(["-o", section_outtmpl])
            if ytdl_options.get("cookiefile"):
                cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
                cmd.extend(["--cookies", cookies_path])
            if ytdl_options.get("noplaylist"):
                cmd.append("--no-playlist")

            cmd.extend(["--download-sections", section])
            cmd.append(url)
            if not quiet:
                debug(f"Running yt-dlp for section: {section}")

            # Simulate up to +45% progress while the subprocess runs.
            progress_end_pct = min(display_pct + 45, 98)
            simulator = _SectionProgressSimulator(display_pct, progress_end_pct)
            simulator.start()
            try:
                if quiet:
                    subprocess.run(cmd, check=True, capture_output=True, text=True)
                else:
                    subprocess.run(cmd, check=True)
            except subprocess.CalledProcessError as exc:
                # Surface the tail of stderr so failures are diagnosable.
                stderr_text = exc.stderr or ""
                tail = "\n".join(stderr_text.splitlines()[-12:]).strip()
                details = f"\n{tail}" if tail else ""
                raise DownloadError(f"yt-dlp failed for section {section} (exit {exc.returncode}){details}") from exc
            except Exception as exc:
                raise DownloadError(f"yt-dlp failed for section {section}: {exc}") from exc
            finally:
                simulator.stop()
    finally:
        pipeline.clear_status()

    try:
        _set_pipe_percent(99)
    except Exception:
        pass

    return session_id, first_section_info or {}
|
|
|
|
|
|
def _iter_download_entries(info: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
|
|
queue: List[Dict[str, Any]] = [info]
|
|
seen: set[int] = set()
|
|
while queue:
|
|
current = queue.pop(0)
|
|
obj_id = id(current)
|
|
if obj_id in seen:
|
|
continue
|
|
seen.add(obj_id)
|
|
entries = current.get("entries")
|
|
if isinstance(entries, list):
|
|
for entry in entries:
|
|
queue.append(entry)
|
|
if current.get("requested_downloads") or not entries:
|
|
yield current
|
|
|
|
|
|
def _candidate_paths(entry: Dict[str, Any], output_dir: Path) -> Iterator[Path]:
|
|
requested = entry.get("requested_downloads")
|
|
if isinstance(requested, list):
|
|
for item in requested:
|
|
if isinstance(item, dict):
|
|
fp = item.get("filepath") or item.get("_filename")
|
|
if fp:
|
|
yield Path(fp)
|
|
for key in ("filepath", "_filename", "filename"):
|
|
value = entry.get(key)
|
|
if value:
|
|
yield Path(value)
|
|
if entry.get("filename"):
|
|
yield output_dir / entry["filename"]
|
|
|
|
|
|
def _resolve_entry_and_path(info: Dict[str, Any], output_dir: Path) -> tuple[Dict[str, Any], Path]:
    """Return the first (entry, existing file path) pair found in *info*.

    Relative candidates are also tried anchored under output_dir.
    Raises FileNotFoundError when no candidate exists on disk.
    """
    def _existing(path: Path) -> Optional[Path]:
        # Accept the path as-is, or re-anchored under output_dir if relative.
        if path.is_file():
            return path
        if not path.is_absolute():
            anchored = output_dir / path
            if anchored.is_file():
                return anchored
        return None

    for entry in _iter_download_entries(info):
        for candidate in _candidate_paths(entry, output_dir):
            hit = _existing(candidate)
            if hit is not None:
                return entry, hit
    raise FileNotFoundError("yt-dlp did not report a downloaded media file")
|
|
|
|
|
|
def _resolve_entries_and_paths(info: Dict[str, Any], output_dir: Path) -> List[tuple[Dict[str, Any], Path]]:
    """Collect every (entry, existing file path) pair from *info*.

    Relative candidates are also tried anchored under output_dir; duplicate
    files (by resolved path) are reported only once.
    """
    pairs: List[tuple[Dict[str, Any], Path]] = []
    taken: set[str] = set()

    def _locate(candidate_entry: Dict[str, Any]) -> Optional[Path]:
        # First candidate that exists on disk wins, trying the raw path and
        # then (for relative paths) the output_dir-anchored variant.
        for candidate in _candidate_paths(candidate_entry, output_dir):
            if candidate.is_file():
                return candidate
            if not candidate.is_absolute():
                anchored = output_dir / candidate
                if anchored.is_file():
                    return anchored
        return None

    for entry in _iter_download_entries(info):
        found = _locate(entry)
        if found is None:
            continue
        marker = str(found.resolve())
        if marker in taken:
            continue
        taken.add(marker)
        pairs.append((entry, found))
    return pairs
|
|
|
|
|
|
def _extract_sha256(info: Dict[str, Any]) -> Optional[str]:
|
|
for payload in [info] + info.get("entries", []):
|
|
if not isinstance(payload, dict):
|
|
continue
|
|
hashes = payload.get("hashes")
|
|
if isinstance(hashes, dict):
|
|
for key in ("sha256", "sha-256", "sha_256"):
|
|
if key in hashes and isinstance(hashes[key], str) and hashes[key].strip():
|
|
return hashes[key].strip()
|
|
for key in ("sha256", "sha-256", "sha_256"):
|
|
value = payload.get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
return None
|
|
|
|
|
|
def _progress_callback(status: Dict[str, Any]) -> None:
    """yt-dlp progress hook: route progress to the live UI or the stderr bar.

    When a live pipeline UI is attached, download progress is forwarded as
    begin/update/finish transfer events keyed by the entry's label; otherwise
    the module-level ProgressBar renders to stderr. Every "downloading" event
    also feeds the activity watchdog used by _download_with_timeout.
    """
    label = _progress_label(status)
    event = status.get("status")
    downloaded = status.get("downloaded_bytes")
    total = status.get("total_bytes") or status.get("total_bytes_estimate")
    if event == "downloading":
        # Any byte movement counts as activity for the timeout watchdog.
        _record_progress_activity()

    pipeline = PipelineProgress(pipeline_context)
    live_ui, _ = pipeline.ui_and_pipe_index()
    use_live = live_ui is not None

    def _total_bytes(value: Any) -> Optional[int]:
        # Normalize yt-dlp's total (int/float/None) to a positive int or None.
        try:
            if isinstance(value, (int, float)) and value > 0:
                return int(value)
        except Exception:
            pass
        return None

    if event == "downloading":
        if use_live:
            try:
                # Open the transfer once per label, then stream updates.
                if not _YTDLP_TRANSFER_STATE.get(label, {}).get("started"):
                    pipeline.begin_transfer(label=label, total=_total_bytes(total))
                    _YTDLP_TRANSFER_STATE[label] = {"started": True}
                pipeline.update_transfer(
                    label=label,
                    completed=int(downloaded) if downloaded is not None else None,
                    total=_total_bytes(total),
                )
            except Exception:
                pass
        else:
            _YTDLP_PROGRESS_BAR.update(
                downloaded=int(downloaded) if downloaded is not None else None,
                total=int(total) if total is not None else None,
                label=label,
                file=sys.stderr,
            )
    elif event == "finished":
        if use_live:
            try:
                if _YTDLP_TRANSFER_STATE.get(label, {}).get("started"):
                    pipeline.finish_transfer(label=label)
            except Exception:
                pass
            # Drop the per-label state so a re-download re-opens the transfer.
            _YTDLP_TRANSFER_STATE.pop(label, None)
        else:
            _YTDLP_PROGRESS_BAR.finish()
    elif event in ("postprocessing", "processing"):
        # Post-processing emits no byte counts; nothing to display.
        return
|
|
|
|
|
|
# Optional tag extractor: degrade gracefully when SYS.metadata is unavailable.
# NOTE(review): this rebinds the `extract_ytdlp_tags` imported from
# SYS.yt_metadata at the top of the file (different module path) — confirm the
# shadowing is intentional.
try:
    from SYS.metadata import extract_ytdlp_tags
except ImportError:
    extract_ytdlp_tags = None  # type: ignore
|
|
|
|
|
|
def download_media(opts: DownloadOptions, *, config: Optional[Dict[str, Any]] = None, debug_logger: Optional[DebugLogger] = None) -> Any:
    """Download streaming media exclusively via yt-dlp.

    Optional `config` dict may be provided so tool defaults (e.g., cookies, default
    format) are applied when constructing the YtDlpTool instance.

    Returns a DownloadMediaResult, or a list of them for playlist downloads.
    Raises DownloadError for unsupported URLs, probe failures, yt-dlp errors,
    and missing output files.
    """

    debug(f"[download_media] start: {opts.url}")
    try:
        netloc = urlparse(opts.url).netloc.lower()
    except Exception:
        netloc = ""
    # GoFile is explicitly rejected up front.
    if "gofile.io" in netloc:
        msg = "GoFile links are currently unsupported"
        if not opts.quiet:
            debug(msg)
        if debug_logger is not None:
            debug_logger.write_record("gofile-unsupported", {"url": opts.url})
        raise DownloadError(msg)

    # Reject URLs no yt-dlp extractor claims.
    ytdlp_supported = is_url_supported_by_ytdlp(opts.url)
    if not ytdlp_supported:
        msg = "URL not supported by yt-dlp; try download-file for manual downloads"
        if not opts.quiet:
            log(msg)
        if debug_logger is not None:
            debug_logger.write_record("ytdlp-unsupported", {"url": opts.url})
        raise DownloadError(msg)

    if opts.playlist_items:
        # Probing a whole playlist is slow; with an explicit item selection we
        # proceed straight to the download.
        debug(
            f"Skipping probe for playlist (item selection: {opts.playlist_items}), proceeding with download"
        )
        probe_result: Optional[Dict[str, Any]] = {"url": opts.url}
    else:
        probe_cookiefile = None
        try:
            if opts.cookies_path and opts.cookies_path.is_file():
                probe_cookiefile = str(opts.cookies_path)
        except Exception:
            probe_cookiefile = None

        probe_result = probe_url(opts.url, no_playlist=opts.no_playlist, timeout_seconds=15, cookiefile=probe_cookiefile)

    if probe_result is None:
        msg = "yt-dlp could not detect media for this URL; use download-file for direct downloads"
        if not opts.quiet:
            log(msg)
        if debug_logger is not None:
            debug_logger.write_record("ytdlp-skip-no-media", {"url": opts.url})
        raise DownloadError(msg)

    ensure_yt_dlp_ready()

    # Use provided config when available so user tool settings are honored
    ytdlp_tool = YtDlpTool(config or {})
    ytdl_options = ytdlp_tool.build_ytdlp_options(opts)
    # Ensure our progress hook is registered exactly once.
    hooks = ytdl_options.get("progress_hooks")
    if not isinstance(hooks, list):
        hooks = []
    ytdl_options["progress_hooks"] = hooks
    if _progress_callback not in hooks:
        hooks.append(_progress_callback)
    if not opts.quiet:
        debug(f"Starting yt-dlp download: {opts.url}")
    if debug_logger is not None:
        debug_logger.write_record("ytdlp-start", {"url": opts.url})

    assert yt_dlp is not None
    try:
        if not opts.quiet:
            if ytdl_options.get("download_sections"):
                debug(f"[yt-dlp] download_sections: {ytdl_options['download_sections']}")
                debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}")

        session_id = None
        first_section_info: Dict[str, Any] = {}
        if ytdl_options.get("download_sections"):
            # Section/clip downloads go through the CLI path; suppress its
            # console output when quiet or when a live UI owns the terminal.
            live_ui, _ = PipelineProgress(pipeline_context).ui_and_pipe_index()
            quiet_sections = bool(opts.quiet) or (live_ui is not None)
            session_id, first_section_info = _download_with_sections_via_cli(
                opts.url,
                ytdl_options,
                ytdl_options.get("download_sections", []),
                quiet=quiet_sections,
            )
            # info stays None to signal "resolve files from disk" below.
            info = None
        else:
            with yt_dlp.YoutubeDL(ytdl_options) as ydl:  # type: ignore[arg-type]
                info = ydl.extract_info(opts.url, download=True)
    except Exception as exc:
        log(f"yt-dlp failed: {exc}", file=sys.stderr)
        if debug_logger is not None:
            debug_logger.write_record(
                "exception",
                {"phase": "yt-dlp", "error": str(exc), "traceback": traceback.format_exc()},
            )
        raise DownloadError("yt-dlp download failed") from exc

    if info is None:
        # CLI/section path: locate the downloaded file(s) on disk.
        try:
            # Brief settle time so just-written files have final names/mtimes.
            time.sleep(0.5)
            files = sorted(opts.output_dir.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True)
            if not files:
                raise FileNotFoundError(f"No files found in {opts.output_dir}")

            if opts.clip_sections and session_id:
                # Section outputs are named "<session_id>_<n>.<ext>".
                section_pattern = re.compile(rf"^{re.escape(session_id)}_(\d+)")
                matching_files = [f for f in files if section_pattern.search(f.name)]

                if matching_files:
                    def extract_section_num(path: Path) -> int:
                        match = section_pattern.search(path.name)
                        return int(match.group(1)) if match else 999

                    matching_files.sort(key=extract_section_num)
                    debug(f"Found {len(matching_files)} section file(s) matching pattern")

                    # Group media + sidecars by section number.
                    by_index: Dict[int, List[Path]] = {}
                    for f in matching_files:
                        m = section_pattern.search(f.name)
                        if not m:
                            continue
                        try:
                            n = int(m.group(1))
                        except Exception:
                            continue
                        by_index.setdefault(n, []).append(f)

                    renamed_media_files: List[Path] = []

                    for sec_num in sorted(by_index.keys()):
                        group = by_index.get(sec_num) or []
                        if not group:
                            continue

                        def _is_subtitle(p: Path) -> bool:
                            try:
                                return p.suffix.lower() in _SUBTITLE_EXTS
                            except Exception:
                                return False

                        media_candidates = [p for p in group if not _is_subtitle(p)]
                        subtitle_candidates = [p for p in group if _is_subtitle(p)]

                        # Prefer a non-JSON file as the section's media file.
                        media_file: Optional[Path] = None
                        for cand in media_candidates:
                            try:
                                if cand.suffix.lower() in {".json", ".info.json"}:
                                    continue
                            except Exception:
                                pass
                            media_file = cand
                            break
                        if media_file is None and media_candidates:
                            media_file = media_candidates[0]
                        if media_file is None:
                            continue

                        try:
                            media_hash = sha256_file(media_file)
                        except Exception as exc:
                            # Keep the original name when hashing fails.
                            debug(f"Failed to hash section media file {media_file.name}: {exc}")
                            renamed_media_files.append(media_file)
                            continue

                        prefix = f"{session_id}_{sec_num}"

                        def _tail(name: str) -> str:
                            # Everything after the session prefix (keeps the
                            # extension and any language suffix), falling back
                            # to the bare suffix.
                            try:
                                if name.startswith(prefix):
                                    return name[len(prefix):]
                            except Exception:
                                pass
                            try:
                                return Path(name).suffix
                            except Exception:
                                return ""

                        # Rename the media file to its content hash.
                        try:
                            new_media_name = f"{media_hash}{_tail(media_file.name)}"
                            new_media_path = opts.output_dir / new_media_name
                            if new_media_path.exists() and new_media_path != media_file:
                                debug(f"File with hash {media_hash} already exists, using existing file.")
                                try:
                                    media_file.unlink()
                                except OSError:
                                    pass
                            else:
                                media_file.rename(new_media_path)
                                debug(f"Renamed section file: {media_file.name} -> {new_media_name}")
                            renamed_media_files.append(new_media_path)
                        except Exception as exc:
                            debug(f"Failed to rename section media file {media_file.name}: {exc}")
                            renamed_media_files.append(media_file)
                            new_media_path = media_file

                        # Rename subtitle sidecars to match the media hash.
                        for sub_file in subtitle_candidates:
                            try:
                                new_sub_name = f"{media_hash}{_tail(sub_file.name)}"
                                new_sub_path = opts.output_dir / new_sub_name
                                if new_sub_path.exists() and new_sub_path != sub_file:
                                    try:
                                        sub_file.unlink()
                                    except OSError:
                                        pass
                                else:
                                    sub_file.rename(new_sub_path)
                                    debug(f"Renamed section file: {sub_file.name} -> {new_sub_name}")
                            except Exception as exc:
                                debug(f"Failed to rename section subtitle file {sub_file.name}: {exc}")

                    media_path = renamed_media_files[0] if renamed_media_files else matching_files[0]
                    media_paths = renamed_media_files if renamed_media_files else None
                    if not opts.quiet:
                        count = len(media_paths) if isinstance(media_paths, list) else 1
                        debug(f"✓ Downloaded {count} section media file(s) (session: {session_id})")
                else:
                    # Session pattern matched nothing; fall back to newest file.
                    media_path = files[0]
                    media_paths = None
                    if not opts.quiet:
                        debug(f"✓ Downloaded section file (pattern not found): {media_path.name}")
            else:
                # Non-section CLI download: newest file is the result.
                media_path = files[0]
                media_paths = None

                if not opts.quiet:
                    debug(f"✓ Downloaded: {media_path.name}")
            if debug_logger is not None:
                debug_logger.write_record("ytdlp-file-found", {"path": str(media_path)})
        except Exception as exc:
            log(f"Error finding downloaded file: {exc}", file=sys.stderr)
            if debug_logger is not None:
                debug_logger.write_record("exception", {"phase": "find-file", "error": str(exc)})
            raise DownloadError(str(exc)) from exc

        file_hash = sha256_file(media_path)
        section_tags: List[str] = []
        title = ""
        if first_section_info:
            title = first_section_info.get("title", "")
            if title:
                section_tags.append(f"title:{title}")
                debug(f"Added title tag for section download: {title}")

        # Use the probed metadata when available; otherwise synthesize a
        # minimal info dict from the file name.
        if first_section_info:
            info_dict_sec = first_section_info
        else:
            info_dict_sec = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")}

        return DownloadMediaResult(path=media_path, info=info_dict_sec, tag=section_tags, source_url=opts.url, hash_value=file_hash, paths=media_paths)

    if not isinstance(info, dict):
        log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr)
        raise DownloadError("Unexpected yt-dlp response type")

    info_dict: Dict[str, Any] = cast(Dict[str, Any], info)
    if debug_logger is not None:
        debug_logger.write_record("ytdlp-info", {"keys": sorted(info_dict.keys()), "is_playlist": bool(info_dict.get("entries"))})

    # Playlist download: return one result per resolved entry.
    if info_dict.get("entries") and not opts.no_playlist:
        resolved = _resolve_entries_and_paths(info_dict, opts.output_dir)
        if resolved:
            results: List[DownloadMediaResult] = []
            for entry, media_path in resolved:
                # Prefer a reported hash; hash the file ourselves otherwise.
                hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
                if not hash_value:
                    try:
                        hash_value = sha256_file(media_path)
                    except OSError:
                        hash_value = None

                tags: List[str] = []
                if extract_ytdlp_tags is not None:
                    try:
                        tags = extract_ytdlp_tags(entry)
                    except Exception as exc:
                        log(f"Error extracting tags: {exc}", file=sys.stderr)

                source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url") or opts.url

                results.append(
                    DownloadMediaResult(
                        path=media_path,
                        info=entry,
                        tag=tags,
                        source_url=source_url,
                        hash_value=hash_value,
                    )
                )

            if not opts.quiet:
                debug(f"✓ Downloaded playlist items: {len(results)}")
            return results

    # Single-item download.
    try:
        entry, media_path = _resolve_entry_and_path(info_dict, opts.output_dir)
    except FileNotFoundError as exc:
        log(f"Error: {exc}", file=sys.stderr)
        if debug_logger is not None:
            debug_logger.write_record("exception", {"phase": "resolve-path", "error": str(exc)})
        raise DownloadError(str(exc)) from exc

    if debug_logger is not None:
        debug_logger.write_record("resolved-media", {"path": str(media_path), "entry_keys": sorted(entry.keys())})

    hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
    if not hash_value:
        try:
            hash_value = sha256_file(media_path)
        except OSError as exc:
            if debug_logger is not None:
                debug_logger.write_record("hash-error", {"path": str(media_path), "error": str(exc)})

    tags_res: List[str] = []
    if extract_ytdlp_tags is not None:
        try:
            tags_res = extract_ytdlp_tags(entry)
        except Exception as exc:
            log(f"Error extracting tags: {exc}", file=sys.stderr)

    source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url")

    if not opts.quiet:
        debug(f"✓ Downloaded: {media_path.name} ({len(tags_res)} tags)")
    if debug_logger is not None:
        debug_logger.write_record(
            "downloaded",
            {
                "path": str(media_path),
                "tag_count": len(tags_res),
                "source_url": source_url,
                "sha256": hash_value,
            },
        )

    return DownloadMediaResult(path=media_path, info=entry, tag=tags_res, source_url=source_url, hash_value=hash_value)
|
|
|
|
|
|
def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300, config: Optional[Dict[str, Any]] = None) -> Any:
    """Run download_media in a worker thread with two layered timeouts.

    1. Activity timeout: fails when no progress event has been recorded for
       `timeout_seconds` (fed by _progress_callback via _record_progress_activity).
    2. Hard wall-clock timeout: max(2 * timeout_seconds, 600) seconds total.

    Returns download_media's result; re-raises any exception it produced, and
    raises DownloadError on either timeout or a missing result.
    """
    # Fix: the previous body re-imported `threading` and `typing.cast` locally;
    # both are already imported at module level, so the redundant imports were
    # removed.

    # Slot 0 holds the result, slot 1 any exception from the worker thread.
    result_container: List[Optional[Any]] = [None, None]

    def _do_download() -> None:
        try:
            result_container[0] = download_media(opts, config=config)
        except Exception as exc:
            result_container[1] = exc

    # Use daemon=True so a hung download doesn't block process exit if the wall timeout hits.
    thread = threading.Thread(target=_do_download, daemon=True)
    thread.start()
    start_time = time.monotonic()

    # We use two timeouts:
    # 1. Activity timeout (no progress updates for X seconds)
    # 2. Hard wall-clock timeout (total time for this URL)
    # The wall-clock timeout is slightly larger than the activity timeout
    # to allow for slow-but-steady progress, up to a hard cap (e.g. 10 minutes).
    wall_timeout = max(timeout_seconds * 2, 600)

    _record_progress_activity(start_time)
    try:
        while thread.is_alive():
            # Poll once per second so timeouts are checked promptly.
            thread.join(1)
            if not thread.is_alive():
                break

            now = time.monotonic()

            # Check activity timeout (0 means "cleared"; treat as start time).
            last_activity = _get_last_progress_activity()
            if last_activity <= 0:
                last_activity = start_time
            if now - last_activity > timeout_seconds:
                raise DownloadError(f"Download activity timeout after {timeout_seconds} seconds for {opts.url}")

            # Check hard wall-clock timeout
            if now - start_time > wall_timeout:
                raise DownloadError(f"Download hard timeout after {wall_timeout} seconds for {opts.url}")
    finally:
        _clear_progress_activity()

    if result_container[1] is not None:
        raise cast(Exception, result_container[1])

    if result_container[0] is None:
        raise DownloadError(f"Download failed for {opts.url}")

    return cast(Any, result_container[0])
|