2025-12-16 23:23:43 -08:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-01-01 20:37:27 -08:00
|
|
|
import hashlib
|
|
|
|
|
import json
|
2025-12-20 23:57:44 -08:00
|
|
|
import os
|
2026-01-01 20:37:27 -08:00
|
|
|
import random
|
|
|
|
|
import re
|
|
|
|
|
import string
|
|
|
|
|
import subprocess
|
|
|
|
|
import sys
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
import traceback
|
|
|
|
|
from contextlib import AbstractContextManager, nullcontext
|
2025-12-16 23:23:43 -08:00
|
|
|
from dataclasses import dataclass
|
|
|
|
|
from pathlib import Path
|
2026-01-01 20:37:27 -08:00
|
|
|
from typing import Any, Dict, Iterator, List, Optional, Sequence, cast
|
|
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
|
|
|
|
from SYS import pipeline as pipeline_context
|
|
|
|
|
from SYS.logger import debug, log
|
|
|
|
|
from SYS.models import (
|
|
|
|
|
DebugLogger,
|
|
|
|
|
DownloadError,
|
|
|
|
|
DownloadMediaResult,
|
|
|
|
|
DownloadOptions,
|
|
|
|
|
ProgressBar,
|
|
|
|
|
)
|
|
|
|
|
from SYS.pipeline_progress import PipelineProgress
|
|
|
|
|
from SYS.utils import ensure_directory, sha256_file
|
2025-12-16 23:23:43 -08:00
|
|
|
|
2026-01-05 07:51:19 -08:00
|
|
|
# Per-URL transfer bookkeeping shared by yt-dlp progress hooks.
# NOTE(review): this name is assigned again further down the module (near the
# progress helpers); the later assignment replaces this dict at import time —
# confirm which definition is the intended one.
_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {}
|
|
|
|
|
|
|
|
|
|
|
2026-01-01 20:37:27 -08:00
|
|
|
# yt-dlp is an optional dependency: keep the module importable without it.
# On failure, the import error is preserved in YTDLP_IMPORT_ERROR so
# ensure_yt_dlp_ready() can raise a useful DownloadError later.
try:
    import yt_dlp  # type: ignore
    from yt_dlp.extractor import gen_extractors  # type: ignore
except Exception as exc:  # pragma: no cover - handled at runtime
    yt_dlp = None  # type: ignore
    gen_extractors = None  # type: ignore
    YTDLP_IMPORT_ERROR = exc
else:
    YTDLP_IMPORT_ERROR = None

# Lazily-built, memoized list of yt-dlp extractor instances (see _get_extractors).
_EXTRACTOR_CACHE: List[Any] | None = None
|
2025-12-16 23:23:43 -08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_nested(config: Dict[str, Any], *path: str) -> Any:
|
|
|
|
|
cur: Any = config
|
|
|
|
|
for key in path:
|
|
|
|
|
if not isinstance(cur, dict):
|
|
|
|
|
return None
|
|
|
|
|
cur = cur.get(key)
|
|
|
|
|
return cur
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_csv_list(value: Any) -> Optional[List[str]]:
|
|
|
|
|
if value is None:
|
|
|
|
|
return None
|
|
|
|
|
if isinstance(value, list):
|
|
|
|
|
out: List[str] = []
|
|
|
|
|
for item in value:
|
|
|
|
|
s = str(item).strip()
|
|
|
|
|
if s:
|
|
|
|
|
out.append(s)
|
|
|
|
|
return out or None
|
|
|
|
|
s = str(value).strip()
|
|
|
|
|
if not s:
|
|
|
|
|
return None
|
|
|
|
|
# allow either JSON-ish list strings or simple comma-separated values
|
|
|
|
|
if s.startswith("[") and s.endswith("]"):
|
|
|
|
|
s = s[1:-1]
|
|
|
|
|
parts = [p.strip() for p in s.split(",")]
|
|
|
|
|
parts = [p for p in parts if p]
|
|
|
|
|
return parts or None
|
|
|
|
|
|
|
|
|
|
|
2026-01-01 20:37:27 -08:00
|
|
|
def ensure_yt_dlp_ready() -> None:
    """Verify yt-dlp is importable, raising DownloadError if missing."""
    # Fast path: the module import at the top of this file succeeded.
    if yt_dlp is not None:
        return

    # Surface the original import failure (if any) in the error message.
    reason = str(YTDLP_IMPORT_ERROR or "yt-dlp is not installed")
    raise DownloadError(f"yt-dlp module not available: {reason}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_extractors() -> List[Any]:
    """Return (and memoize) yt-dlp's extractor instances.

    Yields an empty list when yt-dlp exposes no generator or the generator
    itself raises; the result (even when empty) is cached for later calls.
    """
    global _EXTRACTOR_CACHE

    if _EXTRACTOR_CACHE is not None:
        return _EXTRACTOR_CACHE

    # Raises DownloadError when yt-dlp is missing entirely.
    ensure_yt_dlp_ready()

    if gen_extractors is None:
        _EXTRACTOR_CACHE = []
    else:
        try:
            _EXTRACTOR_CACHE = list(gen_extractors())
        except Exception:
            _EXTRACTOR_CACHE = []

    return _EXTRACTOR_CACHE
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_url_supported_by_ytdlp(url: str) -> bool:
    """Return True if yt-dlp has a non-generic extractor for the URL."""
    if not url or not isinstance(url, str):
        return False

    # yt-dlp failed to import: nothing can be supported.
    if YTDLP_IMPORT_ERROR is not None:
        return False

    # Require an absolute URL carrying both a scheme and a host.
    try:
        parts = urlparse(url)
    except Exception:
        return False
    if not parts.scheme or not parts.netloc:
        return False

    try:
        for ie in _get_extractors():
            try:
                matched = ie.suitable(url)
            except Exception:
                continue
            if not matched:
                continue

            # The "generic" extractor matches almost anything and does not
            # count as real support.
            if getattr(ie, "IE_NAME", "").lower() == "generic":
                continue
            return True
    except Exception:
        return False

    return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_formats(
    url: str,
    *,
    no_playlist: bool = False,
    playlist_items: Optional[str] = None,
    cookiefile: Optional[str] = None,
) -> Optional[List[Dict[str, Any]]]:
    """Get available formats for a URL.

    Returns a list of format dicts or None if unsupported or probing fails.
    """
    # Skip URLs only the generic extractor would match.
    if not is_url_supported_by_ytdlp(url):
        return None

    ensure_yt_dlp_ready()
    assert yt_dlp is not None

    # Metadata-only probe: suppress all output and never download.
    ydl_opts: Dict[str, Any] = {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "noprogress": True,
    }

    if cookiefile:
        ydl_opts["cookiefile"] = str(cookiefile)
    if no_playlist:
        ydl_opts["noplaylist"] = True
    if playlist_items:
        ydl_opts["playlist_items"] = str(playlist_items)

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[arg-type]
            info = ydl.extract_info(url, download=False)
    except Exception as exc:
        debug(f"yt-dlp format probe failed for {url}: {exc}")
        return None

    if not isinstance(info, dict):
        return None

    formats = info.get("formats")
    if not isinstance(formats, list):
        return None

    # Keep only well-formed (dict) format entries.
    out: List[Dict[str, Any]] = []
    for fmt in formats:
        if isinstance(fmt, dict):
            out.append(fmt)

    return out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def probe_url(
    url: str,
    no_playlist: bool = False,
    timeout_seconds: int = 15,
    *,
    cookiefile: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Probe URL metadata without downloading.

    Returns None if unsupported, errors, or times out.
    """
    if not is_url_supported_by_ytdlp(url):
        return None

    # Shared slot for the worker thread: slot 0 holds the result dict,
    # slot 1 holds any exception raised during the probe.
    result_container: List[Optional[Any]] = [None, None]  # [result, error]

    def _do_probe() -> None:
        # Runs on a worker thread so the caller can enforce a wall-clock timeout.
        try:
            ensure_yt_dlp_ready()

            assert yt_dlp is not None
            ydl_opts: Dict[str, Any] = {
                "quiet": True,
                "no_warnings": True,
                "socket_timeout": 10,
                "retries": 2,
                "skip_download": True,
                # List playlist entries without resolving each one.
                "extract_flat": "in_playlist",
                "noprogress": True,
            }

            if cookiefile:
                ydl_opts["cookiefile"] = str(cookiefile)
            if no_playlist:
                ydl_opts["noplaylist"] = True

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[arg-type]
                info = ydl.extract_info(url, download=False)

            if not isinstance(info, dict):
                result_container[0] = None
                return

            # Prefer the canonical page URL when yt-dlp reports one.
            webpage_url = info.get("webpage_url") or info.get("original_url") or info.get("url")

            result_container[0] = {
                "extractor": info.get("extractor", ""),
                "title": info.get("title", ""),
                "entries": info.get("entries", []),
                "duration": info.get("duration"),
                "uploader": info.get("uploader"),
                "description": info.get("description"),
                "requested_url": url,
                "webpage_url": webpage_url,
                "url": webpage_url or url,
            }
        except Exception as exc:
            debug(f"Probe error for {url}: {exc}")
            result_container[1] = exc

    # NOTE(review): the worker is non-daemon, so a timed-out probe keeps
    # running in the background (and can delay interpreter exit) until
    # yt-dlp returns — confirm this is intentional.
    thread = threading.Thread(target=_do_probe, daemon=False)
    thread.start()
    thread.join(timeout=timeout_seconds)

    # Timeout: abandon the probe and let the caller proceed without metadata.
    if thread.is_alive():
        debug(f"Probe timeout for {url} (>={timeout_seconds}s), proceeding without probe")
        return None

    # Probe raised: treat as "no metadata available".
    if result_container[1] is not None:
        return None

    return cast(Optional[Dict[str, Any]], result_container[0])
|
|
|
|
|
|
|
|
|
|
|
2026-01-10 17:30:18 -08:00
|
|
|
def is_browseable_format(fmt: Any) -> bool:
    """Decide whether a yt-dlp format is worth showing to a user.

    Used by the ytdlp format selector to filter out non-downloadable
    entries. Rejects:
    - non-dict entries and entries without a format_id
    - MHTML/JSON sidecar metadata pseudo-formats
    - storyboard/thumbnail tracks (by note text or "sb" id prefix)
    - formats carrying neither an audio nor a video stream

    Args:
        fmt: Format dict from yt-dlp with keys like format_id, ext,
            vcodec, acodec, format_note.

    Returns:
        bool: True if the format is suitable for browsing/selection.
    """
    if not isinstance(fmt, dict):
        return False

    fid = str(fmt.get("format_id") or "").strip()
    if not fid:
        return False

    # Metadata/sidecar pseudo-formats.
    if str(fmt.get("ext") or "").strip().lower() in {"mhtml", "json"}:
        return False

    # Storyboard/thumbnail tracks.
    if "storyboard" in str(fmt.get("format_note") or "").lower():
        return False
    if fid.lower().startswith("sb"):
        return False

    # Must carry at least one real stream.
    has_video = str(fmt.get("vcodec", "none")) != "none"
    has_audio = str(fmt.get("acodec", "none")) != "none"
    return has_video or has_audio
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_for_table_selection(
    fmt: Dict[str, Any],
    url: str,
    index: int,
    *,
    selection_format_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Format a yt-dlp format dict into a table result row for selection.

    This helper formats a single format from list_formats() into the shape
    expected by the ResultTable system, ready for user selection and routing
    to download-file with -format argument.

    Args:
        fmt: Format dict from yt-dlp
        url: The URL this format came from
        index: Row number for display (1-indexed). NOTE(review): currently
            unused by the body; kept for API compatibility with callers.
        selection_format_id: Override format_id for selection (e.g., with +ba suffix)

    Returns:
        dict: Format result row with _selection_args for table system

    Example:
        fmts = list_formats("https://youtube.com/watch?v=abc")
        browseable = [f for f in fmts if is_browseable_format(f)]
        results = [format_for_table_selection(f, url, i+1) for i, f in enumerate(browseable)]
    """
    format_id = fmt.get("format_id", "")
    resolution = fmt.get("resolution", "")
    ext = fmt.get("ext", "")
    vcodec = fmt.get("vcodec", "none")
    acodec = fmt.get("acodec", "none")
    filesize = fmt.get("filesize")
    filesize_approx = fmt.get("filesize_approx")

    # If not provided, compute selection format ID (append +ba so video-only
    # formats get paired with the best audio track on download).
    if selection_format_id is None:
        selection_format_id = format_id
        try:
            if vcodec != "none" and acodec == "none" and format_id:
                selection_format_id = f"{format_id}+ba"
        except Exception:
            pass

    # Format file size. Bug fix: size_prefix was declared but never set, so
    # approximate sizes were indistinguishable from exact ones; mark sizes
    # derived from filesize_approx with a "~" prefix.
    size_str = ""
    size_prefix = "~" if (not filesize and filesize_approx) else ""
    size_bytes = filesize or filesize_approx
    try:
        if isinstance(size_bytes, (int, float)) and size_bytes > 0:
            size_mb = float(size_bytes) / (1024 * 1024)
            size_str = f"{size_prefix}{size_mb:.1f}MB"
    except Exception:
        pass

    # Build a compact human-readable description for the row.
    desc_parts: List[str] = []
    if resolution and resolution != "audio only":
        desc_parts.append(resolution)
    if ext:
        desc_parts.append(str(ext).upper())
    if vcodec != "none":
        desc_parts.append(f"v:{vcodec}")
    if acodec != "none":
        desc_parts.append(f"a:{acodec}")
    if size_str:
        desc_parts.append(size_str)
    format_desc = " | ".join(desc_parts)

    # Build table row
    return {
        "table": "download-file",
        "title": f"Format {format_id}",
        "url": url,
        "target": url,
        "detail": format_desc,
        "annotations": [ext, resolution] if resolution else [ext],
        "media_kind": "format",
        "columns": [
            ("ID", format_id),
            ("Resolution", resolution or "N/A"),
            ("Ext", ext),
            ("Size", size_str or ""),
            ("Video", vcodec),
            ("Audio", acodec),
        ],
        "full_metadata": {
            "format_id": format_id,
            "url": url,
            "item_selector": selection_format_id,
            "_selection_args": ["-format", selection_format_id],
        },
        "_selection_args": ["-format", selection_format_id],
    }
|
|
|
|
|
|
|
|
|
|
|
2025-12-16 23:23:43 -08:00
|
|
|
@dataclass(slots=True)
class YtDlpDefaults:
    """User-tunable defaults for yt-dlp behavior.

    Recommended config.conf keys (top-level dotted keys):
    - ytdlp.video_format="bestvideo+bestaudio/best"
    - ytdlp.audio_format="251/140/bestaudio"
    - ytdlp.format_sort="res:2160,res:1440,res:1080,res:720,res"

    Cookies:
    - cookies="C:\\path\\cookies.txt" (already supported by config.resolve_cookies_path)
    """

    # yt-dlp --format expression used for video-mode downloads.
    video_format: str = "bestvideo+bestaudio/best"
    # yt-dlp --format expression used for audio-only downloads.
    audio_format: str = "251/140/bestaudio"
    # Optional --format-sort keys; None means "fall back to built-in sort".
    format_sort: Optional[List[str]] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class YtDlpTool:
    """Centralizes yt-dlp defaults and translation helpers.

    This is intentionally small and dependency-light so cmdlets can use it without
    forcing a full refactor.
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        *,
        script_dir: Optional[Path] = None
    ) -> None:
        # Defensive copy so later mutations of the caller's config dict
        # don't leak into this tool.
        self._config: Dict[str, Any] = dict(config or {})
        # `resolve_cookies_path` expects the app root so it can fall back to ./cookies.txt.
        # This file lives under ./tool/, so default to the parent directory.
        self._script_dir = script_dir or Path(__file__).resolve().parent.parent
        self.defaults = self._load_defaults()
        self._cookiefile: Optional[Path] = self._init_cookiefile()

    def _init_cookiefile(self) -> Optional[Path]:
        """Resolve cookies once at tool init (yt-dlp is the primary consumer)."""
        try:
            from SYS.config import resolve_cookies_path

            resolved = resolve_cookies_path(self._config, script_dir=self._script_dir)
            if resolved is not None and resolved.is_file():
                return resolved
        except Exception:
            # Missing config module or unreadable path: run cookie-less.
            pass
        return None

    def _load_defaults(self) -> YtDlpDefaults:
        """Merge the many accepted config spellings into a YtDlpDefaults.

        Precedence (first truthy wins): nested ytdlp.format.* dicts, then
        tool.ytdlp.* keys, then flat ytdlp.* keys, then legacy top-level
        ytdlp_* keys, then the dataclass defaults.
        """
        cfg = self._config

        # NOTE: `YtDlpDefaults` is a slots dataclass. Referencing defaults via
        # `YtDlpDefaults.video_format` yields a `member_descriptor`, not the
        # default string value. Use an instance for fallback defaults.
        _fallback_defaults = YtDlpDefaults()

        tool_block = _get_nested(cfg, "tool", "ytdlp")
        if not isinstance(tool_block, dict):
            tool_block = {}

        ytdlp_block = cfg.get("ytdlp") if isinstance(cfg.get("ytdlp"), dict) else {}
        if not isinstance(ytdlp_block, dict):
            ytdlp_block = {}

        # Accept both nested and flat styles.
        video_format = (
            tool_block.get("video_format") or tool_block.get("format")
            or ytdlp_block.get("video_format") or ytdlp_block.get("video")
            or ytdlp_block.get("format_video") or cfg.get("ytdlp_video_format")
        )
        audio_format = (
            tool_block.get("audio_format") or ytdlp_block.get("audio_format")
            or ytdlp_block.get("audio") or ytdlp_block.get("format_audio")
            or cfg.get("ytdlp_audio_format")
        )

        # Also accept dotted keys written as nested dicts: ytdlp.format.video, ytdlp.format.audio
        nested_video = _get_nested(cfg, "ytdlp", "format", "video")
        nested_audio = _get_nested(cfg, "ytdlp", "format", "audio")

        fmt_sort_val = (
            tool_block.get("format_sort") or ytdlp_block.get("format_sort")
            or ytdlp_block.get("formatSort") or cfg.get("ytdlp_format_sort")
            or _get_nested(cfg, "ytdlp", "format", "sort")
        )
        fmt_sort = _parse_csv_list(fmt_sort_val)

        defaults = YtDlpDefaults(
            video_format=str(
                nested_video or video_format or _fallback_defaults.video_format
            ),
            audio_format=str(
                nested_audio or audio_format or _fallback_defaults.audio_format
            ),
            format_sort=fmt_sort,
        )

        return defaults

    def resolve_cookiefile(self) -> Optional[Path]:
        # Cookie path is resolved once at construction time (see _init_cookiefile).
        return self._cookiefile

    def default_format(self, mode: str) -> str:
        """Return the configured format expression: audio_format for "audio",
        video_format for anything else (including empty/None modes)."""
        m = str(mode or "").lower().strip()
        if m == "audio":
            return self.defaults.audio_format
        return self.defaults.video_format

    def build_ytdlp_options(self, opts: DownloadOptions) -> Dict[str, Any]:
        """Translate DownloadOptions into yt-dlp API options."""
        ensure_directory(opts.output_dir)
        outtmpl = str((opts.output_dir / "%(title)s.%(ext)s").resolve())
        base_options: Dict[str, Any] = {
            "outtmpl": outtmpl,
            "quiet": True,
            "no_warnings": True,
            "noprogress": True,
            "socket_timeout": 30,
            "retries": 10,
            "fragment_retries": 10,
            "http_chunk_size": 10_485_760,
            "restrictfilenames": True,
        }

        # Prefer the repo-bundled ffmpeg when present (MPV/ffmpeg/bin).
        try:
            repo_root = Path(__file__).resolve().parents[1]
            bundled_ffmpeg_dir = repo_root / "MPV" / "ffmpeg" / "bin"
            if bundled_ffmpeg_dir.exists():
                base_options.setdefault("ffmpeg_location", str(bundled_ffmpeg_dir))
        except Exception:
            pass

        # Windows only: retry file access — presumably to ride out transient
        # locks (AV scanners etc.); confirm against yt-dlp's option docs.
        try:
            if os.name == "nt":
                base_options.setdefault("file_access_retries", 40)
        except Exception:
            pass

        # Explicit cookies path wins; otherwise fall back to the one
        # resolved at init.
        if opts.cookies_path and opts.cookies_path.is_file():
            base_options["cookiefile"] = str(opts.cookies_path)
        else:
            cookiefile = self.resolve_cookiefile()
            if cookiefile is not None and cookiefile.is_file():
                base_options["cookiefile"] = str(cookiefile)

        if opts.no_playlist:
            base_options["noplaylist"] = True

        fmt = opts.ytdl_format or self.default_format(opts.mode)
        base_options["format"] = fmt

        if opts.mode == "audio":
            base_options["postprocessors"] = [{
                "key": "FFmpegExtractAudio"
            }]
        else:
            # Video mode: apply configured sort order, or a
            # highest-resolution-first default.
            format_sort = self.defaults.format_sort or [
                "res:4320",
                "res:2880",
                "res:2160",
                "res:1440",
                "res:1080",
                "res:720",
                "res",
            ]
            base_options["format_sort"] = format_sort

        if getattr(opts, "embed_chapters", False):
            # Add an FFmpegMetadata postprocessor unless one is already queued.
            pps = base_options.get("postprocessors")
            if not isinstance(pps, list):
                pps = []
            already_has_metadata = any(
                isinstance(pp, dict) and str(pp.get("key") or "") == "FFmpegMetadata"
                for pp in pps
            )
            if not already_has_metadata:
                pps.append(
                    {
                        "key": "FFmpegMetadata",
                        "add_metadata": True,
                        "add_chapters": True,
                        "add_infojson": "if_exists",
                    }
                )
            base_options["postprocessors"] = pps

        if opts.mode != "audio":
            base_options.setdefault("merge_output_format", "mkv")

        if getattr(opts, "write_sub", False):
            base_options["writesubtitles"] = True
            base_options["writeautomaticsub"] = True
            base_options["subtitlesformat"] = "vtt"

        if opts.clip_sections:
            # Parse "start-end" second ranges (comma-separated) into yt-dlp
            # download_sections entries of the form "*HH:MM:SS-HH:MM:SS".
            sections: List[str] = []

            def _secs_to_hms(seconds: float) -> str:
                # Clamp negatives to 0 and render HH:MM:SS.
                total = max(0, int(seconds))
                minutes, secs = divmod(total, 60)
                hours, minutes = divmod(minutes, 60)
                return f"{hours:02d}:{minutes:02d}:{secs:02d}"

            for section_range in str(opts.clip_sections).split(","):
                section_range = section_range.strip()
                if not section_range:
                    continue
                try:
                    start_s_raw, end_s_raw = section_range.split("-", 1)
                    start_s = float(start_s_raw.strip())
                    end_s = float(end_s_raw.strip())
                    # Skip empty/inverted ranges silently.
                    if start_s >= end_s:
                        continue
                    sections.append(f"*{_secs_to_hms(start_s)}-{_secs_to_hms(end_s)}")
                except (ValueError, AttributeError):
                    continue

            if sections:
                base_options["download_sections"] = sections
                # Clipped outputs should begin with a keyframe; otherwise players (notably mpv)
                # can show audio before video or a black screen until the next keyframe.
                # yt-dlp implements this by forcing keyframes at cut points.
                base_options["force_keyframes_at_cuts"] = True
                debug(f"Download sections configured: {', '.join(sections)}")

        if opts.playlist_items:
            base_options["playlist_items"] = opts.playlist_items

        if not opts.quiet:
            debug(f"yt-dlp: mode={opts.mode}, format={base_options.get('format')}")

        return base_options

    def build_yt_dlp_cli_args(
        self,
        *,
        url: str,
        output_dir: Optional[Path] = None,
        ytdl_format: Optional[str] = None,
        playlist_items: Optional[str] = None,
        no_playlist: bool = False,
        quiet: bool = True,
        extra_args: Optional[Sequence[str]] = None,
    ) -> List[str]:
        """Build a yt-dlp command line (argv list).

        This is primarily for debug output or subprocess execution.
        """
        argv: List[str] = ["yt-dlp"]
        if quiet:
            argv.extend(["--quiet", "--no-warnings"])
        argv.append("--no-progress")

        cookiefile = self.resolve_cookiefile()
        if cookiefile is not None:
            argv.extend(["--cookies", str(cookiefile)])

        if no_playlist:
            argv.append("--no-playlist")
        if playlist_items:
            argv.extend(["--playlist-items", str(playlist_items)])

        fmt = (ytdl_format or "").strip()
        if fmt:
            # Use long form to avoid confusion with app-level flags.
            argv.extend(["--format", fmt])

        if self.defaults.format_sort:
            for sort_key in self.defaults.format_sort:
                argv.extend(["-S", sort_key])

        if output_dir is not None:
            outtmpl = str((output_dir / "%(title)s.%(ext)s").resolve())
            argv.extend(["-o", outtmpl])

        if extra_args:
            # Drop empty/whitespace-only extras.
            argv.extend([str(a) for a in extra_args if str(a).strip()])

        argv.append(str(url))
        return argv

    def debug_print_cli(self, argv: Sequence[str]) -> None:
        """Best-effort debug log of a yt-dlp argv; never raises."""
        try:
            debug("yt-dlp argv: " + " ".join(str(a) for a in argv))
        except Exception:
            pass
|
2026-01-01 20:37:27 -08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# Progress + utility helpers for yt-dlp driven downloads (previously in cmdlet/download_media).
# Shared progress bar instance used by yt-dlp progress hooks.
_YTDLP_PROGRESS_BAR = ProgressBar()
# NOTE(review): this re-assigns a name already defined near the top of the
# module; the earlier dict is discarded at import time — confirm the
# duplicate definition is intentional.
_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {}
# Sidecar subtitle extensions recognized by _best_subtitle_sidecar.
_SUBTITLE_EXTS = (".vtt", ".srt", ".ass", ".ssa", ".lrc")
|
|
|
|
|
|
|
|
|
|
|
2026-01-05 07:51:19 -08:00
|
|
|
def _progress_label(status: Dict[str, Any]) -> str:
|
|
|
|
|
info_dict = status.get("info_dict") if isinstance(status.get("info_dict"), dict) else {}
|
|
|
|
|
|
|
|
|
|
candidates = [
|
|
|
|
|
status.get("filename"),
|
|
|
|
|
info_dict.get("_filename"),
|
|
|
|
|
info_dict.get("filepath"),
|
|
|
|
|
info_dict.get("title"),
|
|
|
|
|
info_dict.get("id"),
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
for cand in candidates:
|
|
|
|
|
if not cand:
|
|
|
|
|
continue
|
|
|
|
|
try:
|
|
|
|
|
name = Path(str(cand)).name
|
|
|
|
|
except Exception:
|
|
|
|
|
name = str(cand)
|
|
|
|
|
label = str(name or "").strip()
|
|
|
|
|
if label:
|
|
|
|
|
return label
|
|
|
|
|
|
|
|
|
|
return "download"
|
|
|
|
|
|
|
|
|
|
|
2026-01-01 20:37:27 -08:00
|
|
|
def _live_ui_and_pipe_index() -> tuple[Optional[Any], int]:
    """Fetch the live progress UI (if any) and the current pipe index.

    Both lookups are best-effort: any failure yields (None, 0) components.
    """
    try:
        getter = getattr(pipeline_context, "get_live_progress", None)
        ui = getter() if callable(getter) else None
    except Exception:
        ui = None

    pipe_idx: int = 0
    try:
        stage_getter = getattr(pipeline_context, "get_stage_context", None)
        stage_ctx = stage_getter() if callable(stage_getter) else None
        candidate = getattr(stage_ctx, "pipe_index", None) if stage_ctx is not None else None
        if isinstance(candidate, int):
            pipe_idx = int(candidate)
    except Exception:
        pipe_idx = 0

    return ui, pipe_idx
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _begin_live_steps(total_steps: int) -> None:
    """Announce a fixed number of pipeline steps to the live UI, if present."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        hook = getattr(ui, "begin_pipe_steps", None)
        if callable(hook):
            hook(int(pipe_idx), total_steps=int(total_steps))
    except Exception:
        # Progress reporting must never break the download itself.
        return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _step(text: str) -> None:
    """Advance the live UI's pipeline step indicator with *text*, if present."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        hook = getattr(ui, "advance_pipe_step", None)
        if callable(hook):
            hook(int(pipe_idx), str(text))
    except Exception:
        # Progress reporting must never break the download itself.
        return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _set_pipe_percent(percent: int) -> None:
    """Push a completion percentage to the live UI's pipe, if present."""
    ui, pipe_idx = _live_ui_and_pipe_index()
    if ui is None:
        return
    try:
        hook = getattr(ui, "set_pipe_percent", None)
        if callable(hook):
            hook(int(pipe_idx), int(percent))
    except Exception:
        # Progress reporting must never break the download itself.
        return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_chapters_note(info: Dict[str, Any]) -> Optional[str]:
|
|
|
|
|
"""Format yt-dlp chapter metadata into a stable, note-friendly text."""
|
|
|
|
|
try:
|
|
|
|
|
chapters = info.get("chapters")
|
|
|
|
|
except Exception:
|
|
|
|
|
chapters = None
|
|
|
|
|
|
|
|
|
|
if not isinstance(chapters, list) or not chapters:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
rows: List[tuple[int, Optional[int], str]] = []
|
|
|
|
|
max_t = 0
|
|
|
|
|
for ch in chapters:
|
|
|
|
|
if not isinstance(ch, dict):
|
|
|
|
|
continue
|
|
|
|
|
start_raw = ch.get("start_time")
|
|
|
|
|
end_raw = ch.get("end_time")
|
|
|
|
|
title_raw = ch.get("title") or ch.get("name") or ch.get("chapter")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
if start_raw is None:
|
|
|
|
|
continue
|
|
|
|
|
start_s = int(float(start_raw))
|
|
|
|
|
except Exception:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
end_s: Optional[int] = None
|
|
|
|
|
try:
|
|
|
|
|
if end_raw is not None:
|
|
|
|
|
end_s = int(float(end_raw))
|
|
|
|
|
except Exception:
|
|
|
|
|
end_s = None
|
|
|
|
|
|
|
|
|
|
title = str(title_raw).strip() if title_raw is not None else ""
|
|
|
|
|
rows.append((start_s, end_s, title))
|
|
|
|
|
try:
|
|
|
|
|
max_t = max(max_t, start_s, end_s or 0)
|
|
|
|
|
except Exception:
|
|
|
|
|
max_t = max(max_t, start_s)
|
|
|
|
|
|
|
|
|
|
if not rows:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
force_hours = bool(max_t >= 3600)
|
|
|
|
|
|
|
|
|
|
def _tc(seconds: int) -> str:
|
|
|
|
|
total = max(0, int(seconds))
|
|
|
|
|
minutes, secs = divmod(total, 60)
|
|
|
|
|
hours, minutes = divmod(minutes, 60)
|
|
|
|
|
if force_hours:
|
|
|
|
|
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
|
|
|
|
return f"{minutes:02d}:{secs:02d}"
|
|
|
|
|
|
|
|
|
|
lines: List[str] = []
|
|
|
|
|
for start_s, end_s, title in sorted(
|
|
|
|
|
rows, key=lambda r: (r[0], r[1] if r[1] is not None else 10**9, r[2])
|
|
|
|
|
):
|
|
|
|
|
if end_s is not None and end_s > start_s:
|
|
|
|
|
prefix = f"{_tc(start_s)}-{_tc(end_s)}"
|
|
|
|
|
else:
|
|
|
|
|
prefix = _tc(start_s)
|
|
|
|
|
line = f"{prefix} {title}".strip()
|
|
|
|
|
if line:
|
|
|
|
|
lines.append(line)
|
|
|
|
|
|
|
|
|
|
text = "\n".join(lines).strip()
|
|
|
|
|
return text or None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _best_subtitle_sidecar(media_path: Path) -> Optional[Path]:
    """Find the most likely subtitle sidecar file for a downloaded media file."""
    try:
        stem = media_path.stem
        if not stem:
            return None
        folder = media_path.parent

        # Collect same-stem files whose suffix is a known subtitle extension.
        matches: List[Path] = []
        for candidate in folder.glob(stem + ".*"):
            try:
                if not candidate.is_file():
                    continue
            except Exception:
                continue
            if candidate.suffix.lower() in _SUBTITLE_EXTS:
                matches.append(candidate)

        # Prefer formats in this fixed order when several sidecars exist.
        for preferred in (".vtt", ".srt", ".ass", ".ssa", ".lrc"):
            for candidate in matches:
                if candidate.suffix.lower() == preferred:
                    return candidate

        return matches[0] if matches else None
    except Exception:
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _read_text_file(path: Path) -> Optional[str]:
|
|
|
|
|
try:
|
|
|
|
|
return path.read_text(encoding="utf-8", errors="ignore")
|
|
|
|
|
except Exception:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _download_with_sections_via_cli(
    url: str,
    ytdl_options: Dict[str, Any],
    sections: List[str],
    quiet: bool = False,
) -> tuple[Optional[str], Dict[str, Any]]:
    """Download one or more clip sections via the yt-dlp CLI (one subprocess per section).

    Section downloads go through the CLI (not the Python API) so each section
    can be written to a predictable session-scoped filename that the caller
    later renames by content hash.

    Args:
        url: Source media URL.
        ytdl_options: Options dict built for the yt-dlp Python API; relevant
            keys (format, cookiefile, download_sections, ...) are translated
            into CLI flags here.
        sections: NOTE(review): this parameter is unused — the loop reads
            ``ytdl_options["download_sections"]`` instead; callers currently
            pass the same list, so behavior matches. Confirm before relying on it.
        quiet: Suppress subprocess output and debug logging.

    Returns:
        ``(session_id, first_section_info)`` — the random per-call filename
        prefix and the metadata dict probed for the first section (empty dict
        when metadata extraction failed). Returns ``("", {})`` when there are
        no sections to download.

    Raises:
        DownloadError: if any per-section yt-dlp subprocess fails.
    """
    sections_list = ytdl_options.get("download_sections", [])
    if not sections_list:
        return "", {}

    # Random, collision-resistant prefix used to name this call's output files.
    session_id = hashlib.md5((url + str(time.time()) + "".join(random.choices(string.ascii_letters, k=10))).encode()).hexdigest()[:12]
    first_section_info = None

    total_sections = len(sections_list)
    for section_idx, section in enumerate(sections_list, 1):
        # Map section progress onto the 50-99% band of the pipeline meter.
        try:
            if total_sections > 0:
                pct = 50 + int(((section_idx - 1) / max(1, total_sections)) * 49)
                _set_pipe_percent(pct)
        except Exception:
            pass

        # Each section is written as "<session_id>_<idx>.<ext>" in the same
        # directory the caller's output template points at.
        base_outtmpl = ytdl_options.get("outtmpl", "%(title)s.%(ext)s")
        output_dir_path = Path(base_outtmpl).parent
        filename_tmpl = f"{session_id}_{section_idx}"
        if base_outtmpl.endswith(".%(ext)s"):
            filename_tmpl += ".%(ext)s"
        section_outtmpl = str(output_dir_path / filename_tmpl)

        # Probe title/metadata once (first section only) with --skip-download.
        if section_idx == 1:
            metadata_cmd = ["yt-dlp", "--dump-json", "--skip-download"]
            if ytdl_options.get("cookiefile"):
                # Normalize Windows backslashes for the CLI.
                cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
                metadata_cmd.extend(["--cookies", cookies_path])
            if ytdl_options.get("noplaylist"):
                metadata_cmd.append("--no-playlist")
            metadata_cmd.append(url)
            try:
                meta_result = subprocess.run(metadata_cmd, capture_output=True, text=True)
                if meta_result.returncode == 0 and meta_result.stdout:
                    try:
                        info_dict = json.loads(meta_result.stdout.strip())
                        first_section_info = info_dict
                        if not quiet:
                            debug(f"Extracted title from metadata: {info_dict.get('title')}")
                    except json.JSONDecodeError:
                        if not quiet:
                            debug("Could not parse JSON metadata")
            except Exception as exc:
                # Metadata is best-effort; the download itself still proceeds.
                if not quiet:
                    debug(f"Error extracting metadata: {exc}")

        # Build the actual download command for this section.
        cmd = ["yt-dlp"]
        if quiet:
            cmd.append("--quiet")
            cmd.append("--no-warnings")
            cmd.append("--no-progress")
            cmd.extend(["--postprocessor-args", "ffmpeg:-hide_banner -loglevel error"])
        if ytdl_options.get("ffmpeg_location"):
            try:
                cmd.extend(["--ffmpeg-location", str(ytdl_options["ffmpeg_location"])])
            except Exception:
                pass
        if ytdl_options.get("format"):
            cmd.extend(["-f", ytdl_options["format"]])
        if ytdl_options.get("merge_output_format"):
            cmd.extend(["--merge-output-format", str(ytdl_options["merge_output_format"])])

        # Translate API-style postprocessor config into the equivalent CLI flags.
        postprocessors = ytdl_options.get("postprocessors")
        want_add_metadata = bool(ytdl_options.get("addmetadata"))
        want_embed_chapters = bool(ytdl_options.get("embedchapters"))
        if isinstance(postprocessors, list):
            for pp in postprocessors:
                if not isinstance(pp, dict):
                    continue
                if str(pp.get("key") or "") == "FFmpegMetadata":
                    want_add_metadata = True
                    if bool(pp.get("add_chapters", True)):
                        want_embed_chapters = True

        if want_add_metadata:
            cmd.append("--add-metadata")
        if want_embed_chapters:
            cmd.append("--embed-chapters")
        if ytdl_options.get("writesubtitles"):
            cmd.append("--write-sub")
            cmd.append("--write-auto-sub")
            cmd.extend(["--sub-format", "vtt"])
        if ytdl_options.get("force_keyframes_at_cuts"):
            cmd.append("--force-keyframes-at-cuts")
        cmd.extend(["-o", section_outtmpl])
        if ytdl_options.get("cookiefile"):
            cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
            cmd.extend(["--cookies", cookies_path])
        if ytdl_options.get("noplaylist"):
            cmd.append("--no-playlist")

        cmd.extend(["--download-sections", section])

        cmd.append(url)
        if not quiet:
            debug(f"Running yt-dlp for section: {section}")
        try:
            if quiet:
                # Capture output so a failure can report the stderr tail below.
                subprocess.run(cmd, check=True, capture_output=True, text=True)
            else:
                subprocess.run(cmd, check=True)
        except subprocess.CalledProcessError as exc:
            # Surface the last few stderr lines for diagnosis (quiet mode only
            # captures stderr; otherwise exc.stderr is None).
            stderr_text = exc.stderr or ""
            tail = "\n".join(stderr_text.splitlines()[-12:]).strip()
            details = f"\n{tail}" if tail else ""
            raise DownloadError(f"yt-dlp failed for section {section} (exit {exc.returncode}){details}") from exc
        except Exception as exc:
            raise DownloadError(f"yt-dlp failed for section {section}: {exc}") from exc

    try:
        _set_pipe_percent(99)
    except Exception:
        pass

    return session_id, first_section_info or {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _iter_download_entries(info: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
|
|
|
|
|
queue: List[Dict[str, Any]] = [info]
|
|
|
|
|
seen: set[int] = set()
|
|
|
|
|
while queue:
|
|
|
|
|
current = queue.pop(0)
|
|
|
|
|
obj_id = id(current)
|
|
|
|
|
if obj_id in seen:
|
|
|
|
|
continue
|
|
|
|
|
seen.add(obj_id)
|
|
|
|
|
entries = current.get("entries")
|
|
|
|
|
if isinstance(entries, list):
|
|
|
|
|
for entry in entries:
|
|
|
|
|
queue.append(entry)
|
|
|
|
|
if current.get("requested_downloads") or not entries:
|
|
|
|
|
yield current
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _candidate_paths(entry: Dict[str, Any], output_dir: Path) -> Iterator[Path]:
|
|
|
|
|
requested = entry.get("requested_downloads")
|
|
|
|
|
if isinstance(requested, list):
|
|
|
|
|
for item in requested:
|
|
|
|
|
if isinstance(item, dict):
|
|
|
|
|
fp = item.get("filepath") or item.get("_filename")
|
|
|
|
|
if fp:
|
|
|
|
|
yield Path(fp)
|
|
|
|
|
for key in ("filepath", "_filename", "filename"):
|
|
|
|
|
value = entry.get(key)
|
|
|
|
|
if value:
|
|
|
|
|
yield Path(value)
|
|
|
|
|
if entry.get("filename"):
|
|
|
|
|
yield output_dir / entry["filename"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _resolve_entry_and_path(info: Dict[str, Any], output_dir: Path) -> tuple[Dict[str, Any], Path]:
    """Return the first (entry, existing-file) pair found in a yt-dlp info dict.

    Relative candidates are also tried under *output_dir*.

    Raises:
        FileNotFoundError: when no reported candidate exists on disk.
    """
    def _first_existing(entry: Dict[str, Any]) -> Optional[Path]:
        # Accept a candidate as-is, or re-rooted under output_dir if relative.
        for candidate in _candidate_paths(entry, output_dir):
            if candidate.is_file():
                return candidate
            if not candidate.is_absolute():
                relocated = output_dir / candidate
                if relocated.is_file():
                    return relocated
        return None

    for entry in _iter_download_entries(info):
        located = _first_existing(entry)
        if located is not None:
            return entry, located
    raise FileNotFoundError("yt-dlp did not report a downloaded media file")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _resolve_entries_and_paths(info: Dict[str, Any], output_dir: Path) -> List[tuple[Dict[str, Any], Path]]:
    """Collect every (entry, existing-file) pair from a yt-dlp info dict.

    Entries whose candidates don't exist on disk are skipped; duplicate files
    (by resolved absolute path) are reported once.
    """
    pairs: List[tuple[Dict[str, Any], Path]] = []
    taken: set[str] = set()
    for entry in _iter_download_entries(info):
        located: Optional[Path] = None
        for candidate in _candidate_paths(entry, output_dir):
            if candidate.is_file():
                located = candidate
                break
            if not candidate.is_absolute():
                relocated = output_dir / candidate
                if relocated.is_file():
                    located = relocated
                    break
        if located is None:
            continue
        fingerprint = str(located.resolve())
        if fingerprint in taken:
            continue
        taken.add(fingerprint)
        pairs.append((entry, located))
    return pairs
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_sha256(info: Dict[str, Any]) -> Optional[str]:
|
|
|
|
|
for payload in [info] + info.get("entries", []):
|
|
|
|
|
if not isinstance(payload, dict):
|
|
|
|
|
continue
|
|
|
|
|
hashes = payload.get("hashes")
|
|
|
|
|
if isinstance(hashes, dict):
|
|
|
|
|
for key in ("sha256", "sha-256", "sha_256"):
|
|
|
|
|
if key in hashes and isinstance(hashes[key], str) and hashes[key].strip():
|
|
|
|
|
return hashes[key].strip()
|
|
|
|
|
for key in ("sha256", "sha-256", "sha_256"):
|
|
|
|
|
value = payload.get(key)
|
|
|
|
|
if isinstance(value, str) and value.strip():
|
|
|
|
|
return value.strip()
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _progress_callback(status: Dict[str, Any]) -> None:
    """yt-dlp progress hook.

    Routes byte-level transfer progress either to the live pipeline UI (when
    one is active) or to the plain stderr progress bar. Per-label transfer
    state is kept in the module-level ``_YTDLP_TRANSFER_STATE`` dict so the
    live-UI transfer row is opened exactly once and closed on completion.

    Args:
        status: The dict yt-dlp passes to progress hooks; relevant keys are
            ``status`` ("downloading" / "finished" / ...), ``downloaded_bytes``,
            ``total_bytes`` and ``total_bytes_estimate``.
    """
    label = _progress_label(status)
    event = status.get("status")
    downloaded = status.get("downloaded_bytes")
    # Prefer the exact total; fall back to yt-dlp's estimate.
    total = status.get("total_bytes") or status.get("total_bytes_estimate")

    pipeline = PipelineProgress(pipeline_context)
    live_ui, _ = pipeline.ui_and_pipe_index()
    use_live = live_ui is not None

    def _total_bytes(value: Any) -> Optional[int]:
        # Normalize the total (may be a float, 0, or missing) to a positive int.
        try:
            if isinstance(value, (int, float)) and value > 0:
                return int(value)
        except Exception:
            pass
        return None

    if event == "downloading":
        if use_live:
            try:
                # Open the transfer row lazily on the first event for this label.
                if not _YTDLP_TRANSFER_STATE.get(label, {}).get("started"):
                    pipeline.begin_transfer(label=label, total=_total_bytes(total))
                    _YTDLP_TRANSFER_STATE[label] = {"started": True}
                pipeline.update_transfer(
                    label=label,
                    completed=int(downloaded) if downloaded is not None else None,
                    total=_total_bytes(total),
                )
            except Exception:
                # UI failures must never abort the download itself.
                pass
        else:
            _YTDLP_PROGRESS_BAR.update(
                downloaded=int(downloaded) if downloaded is not None else None,
                total=int(total) if total is not None else None,
                label=label,
                file=sys.stderr,
            )
    elif event == "finished":
        if use_live:
            try:
                if _YTDLP_TRANSFER_STATE.get(label, {}).get("started"):
                    pipeline.finish_transfer(label=label)
            except Exception:
                pass
            # Drop the per-label state so a retry starts a fresh transfer row.
            _YTDLP_TRANSFER_STATE.pop(label, None)
        else:
            _YTDLP_PROGRESS_BAR.finish()
    elif event in ("postprocessing", "processing"):
        # Post-processing events carry no byte counts; nothing to report.
        return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
from SYS.metadata import extract_ytdlp_tags
|
|
|
|
|
except ImportError:
|
|
|
|
|
extract_ytdlp_tags = None # type: ignore
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] = None) -> Any:
    """Download streaming media exclusively via yt-dlp.

    Flow:
      1. Reject unsupported hosts (GoFile, anything yt-dlp has no extractor for).
      2. Probe the URL (skipped for explicit playlist item selections).
      3. Either run section downloads via the yt-dlp CLI (clip sections) or a
         normal download via the Python API.
      4. Locate the downloaded file(s), hash, tag, and wrap in
         DownloadMediaResult.

    Returns:
        A single DownloadMediaResult, or a list of them for playlists.

    Raises:
        DownloadError: on unsupported URLs, probe failure, download failure,
            or when the downloaded file cannot be located.
    """

    # --- Host / support gatekeeping -------------------------------------
    try:
        netloc = urlparse(opts.url).netloc.lower()
    except Exception:
        netloc = ""
    if "gofile.io" in netloc:
        msg = "GoFile links are currently unsupported"
        if not opts.quiet:
            debug(msg)
        if debug_logger is not None:
            debug_logger.write_record("gofile-unsupported", {"url": opts.url})
        raise DownloadError(msg)

    ytdlp_supported = is_url_supported_by_ytdlp(opts.url)
    if not ytdlp_supported:
        msg = "URL not supported by yt-dlp; try download-file for manual downloads"
        if not opts.quiet:
            log(msg)
        if debug_logger is not None:
            debug_logger.write_record("ytdlp-unsupported", {"url": opts.url})
        raise DownloadError(msg)

    # --- Probe (skipped when specific playlist items were requested) ----
    if opts.playlist_items:
        debug(
            f"Skipping probe for playlist (item selection: {opts.playlist_items}), proceeding with download"
        )
        probe_result = {"url": opts.url}
    else:
        probe_cookiefile = None
        try:
            if opts.cookies_path and opts.cookies_path.is_file():
                probe_cookiefile = str(opts.cookies_path)
        except Exception:
            probe_cookiefile = None

        probe_result = probe_url(opts.url, no_playlist=opts.no_playlist, timeout_seconds=15, cookiefile=probe_cookiefile)

    if probe_result is None:
        msg = "yt-dlp could not detect media for this URL; use download-file for direct downloads"
        if not opts.quiet:
            log(msg)
        if debug_logger is not None:
            debug_logger.write_record("ytdlp-skip-no-media", {"url": opts.url})
        raise DownloadError(msg)

    ensure_yt_dlp_ready()

    # --- Build options and register the progress hook -------------------
    ytdlp_tool = YtDlpTool()
    ytdl_options = ytdlp_tool.build_ytdlp_options(opts)
    hooks = ytdl_options.get("progress_hooks")
    if not isinstance(hooks, list):
        hooks = []
    ytdl_options["progress_hooks"] = hooks
    if _progress_callback not in hooks:
        hooks.append(_progress_callback)
    if not opts.quiet:
        debug(f"Starting yt-dlp download: {opts.url}")
    if debug_logger is not None:
        debug_logger.write_record("ytdlp-start", {"url": opts.url})

    assert yt_dlp is not None
    try:
        if not opts.quiet:
            if ytdl_options.get("download_sections"):
                debug(f"[yt-dlp] download_sections: {ytdl_options['download_sections']}")
                debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}")

        # Section downloads go through the CLI helper (info stays None and the
        # file-discovery branch below takes over); normal downloads use the API.
        session_id = None
        first_section_info = {}
        if ytdl_options.get("download_sections"):
            live_ui, _ = PipelineProgress(pipeline_context).ui_and_pipe_index()
            quiet_sections = bool(opts.quiet) or (live_ui is not None)
            session_id, first_section_info = _download_with_sections_via_cli(
                opts.url,
                ytdl_options,
                ytdl_options.get("download_sections", []),
                quiet=quiet_sections,
            )
            info = None
        else:
            with yt_dlp.YoutubeDL(ytdl_options) as ydl:  # type: ignore[arg-type]
                info = ydl.extract_info(opts.url, download=True)
    except Exception as exc:
        log(f"yt-dlp failed: {exc}", file=sys.stderr)
        if debug_logger is not None:
            debug_logger.write_record(
                "exception",
                {"phase": "yt-dlp", "error": str(exc), "traceback": traceback.format_exc()},
            )
        raise DownloadError("yt-dlp download failed") from exc

    # --- CLI/section path: no info dict, discover files on disk ---------
    if info is None:
        try:
            # Brief settle delay so just-written files show up in iterdir().
            time.sleep(0.5)
            files = sorted(opts.output_dir.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True)
            if not files:
                raise FileNotFoundError(f"No files found in {opts.output_dir}")

            if opts.clip_sections and session_id:
                # Section files were named "<session_id>_<n>[.<ext>]" by the CLI helper.
                section_pattern = re.compile(rf"^{re.escape(session_id)}_(\d+)")
                matching_files = [f for f in files if section_pattern.search(f.name)]

                if matching_files:
                    def extract_section_num(path: Path) -> int:
                        # Sort key: section index parsed from the filename (999 = unparsable).
                        match = section_pattern.search(path.name)
                        return int(match.group(1)) if match else 999

                    matching_files.sort(key=extract_section_num)
                    debug(f"Found {len(matching_files)} section file(s) matching pattern")

                    # Group all outputs (media + sidecars) by section index.
                    by_index: Dict[int, List[Path]] = {}
                    for f in matching_files:
                        m = section_pattern.search(f.name)
                        if not m:
                            continue
                        try:
                            n = int(m.group(1))
                        except Exception:
                            continue
                        by_index.setdefault(n, []).append(f)

                    renamed_media_files: List[Path] = []

                    for sec_num in sorted(by_index.keys()):
                        group = by_index.get(sec_num) or []
                        if not group:
                            continue

                        def _is_subtitle(p: Path) -> bool:
                            try:
                                return p.suffix.lower() in _SUBTITLE_EXTS
                            except Exception:
                                return False

                        media_candidates = [p for p in group if not _is_subtitle(p)]
                        subtitle_candidates = [p for p in group if _is_subtitle(p)]

                        # Choose the section's media file, skipping JSON sidecars.
                        # NOTE(review): ".info.json" can never equal Path.suffix
                        # (suffix is only the last component, ".json"); the
                        # first set member already covers it.
                        media_file: Optional[Path] = None
                        for cand in media_candidates:
                            try:
                                if cand.suffix.lower() in {".json", ".info.json"}:
                                    continue
                            except Exception:
                                pass
                            media_file = cand
                            break
                        if media_file is None and media_candidates:
                            media_file = media_candidates[0]
                        if media_file is None:
                            continue

                        # Rename media (and sidecars) to the content hash so
                        # identical clips de-duplicate on disk.
                        try:
                            media_hash = sha256_file(media_file)
                        except Exception as exc:
                            debug(f"Failed to hash section media file {media_file.name}: {exc}")
                            renamed_media_files.append(media_file)
                            continue

                        prefix = f"{session_id}_{sec_num}"

                        def _tail(name: str) -> str:
                            # Everything after the session prefix (keeps ".en.vtt"
                            # style multi-part suffixes); falls back to the suffix.
                            try:
                                if name.startswith(prefix):
                                    return name[len(prefix):]
                            except Exception:
                                pass
                            try:
                                return Path(name).suffix
                            except Exception:
                                return ""

                        try:
                            new_media_name = f"{media_hash}{_tail(media_file.name)}"
                            new_media_path = opts.output_dir / new_media_name
                            if new_media_path.exists() and new_media_path != media_file:
                                debug(f"File with hash {media_hash} already exists, using existing file.")
                                try:
                                    media_file.unlink()
                                except OSError:
                                    pass
                            else:
                                media_file.rename(new_media_path)
                                debug(f"Renamed section file: {media_file.name} -> {new_media_name}")
                            renamed_media_files.append(new_media_path)
                        except Exception as exc:
                            debug(f"Failed to rename section media file {media_file.name}: {exc}")
                            renamed_media_files.append(media_file)
                            new_media_path = media_file

                        # Rename subtitle sidecars to match the media hash.
                        for sub_file in subtitle_candidates:
                            try:
                                new_sub_name = f"{media_hash}{_tail(sub_file.name)}"
                                new_sub_path = opts.output_dir / new_sub_name
                                if new_sub_path.exists() and new_sub_path != sub_file:
                                    try:
                                        sub_file.unlink()
                                    except OSError:
                                        pass
                                else:
                                    sub_file.rename(new_sub_path)
                                    debug(f"Renamed section file: {sub_file.name} -> {new_sub_name}")
                            except Exception as exc:
                                debug(f"Failed to rename section subtitle file {sub_file.name}: {exc}")

                    media_path = renamed_media_files[0] if renamed_media_files else matching_files[0]
                    media_paths = renamed_media_files if renamed_media_files else None
                    if not opts.quiet:
                        count = len(media_paths) if isinstance(media_paths, list) else 1
                        debug(f"✓ Downloaded {count} section media file(s) (session: {session_id})")
                else:
                    # Session pattern not found; fall back to the newest file.
                    media_path = files[0]
                    media_paths = None
                    if not opts.quiet:
                        debug(f"✓ Downloaded section file (pattern not found): {media_path.name}")
            else:
                # Non-section CLI path: newest file in the output dir wins.
                media_path = files[0]
                media_paths = None

                if not opts.quiet:
                    debug(f"✓ Downloaded: {media_path.name}")
            if debug_logger is not None:
                debug_logger.write_record("ytdlp-file-found", {"path": str(media_path)})
        except Exception as exc:
            log(f"Error finding downloaded file: {exc}", file=sys.stderr)
            if debug_logger is not None:
                debug_logger.write_record("exception", {"phase": "find-file", "error": str(exc)})
            raise DownloadError(str(exc)) from exc

        # Build the result from the probed metadata (or a minimal stand-in).
        file_hash = sha256_file(media_path)
        tags = []
        title = ""
        if first_section_info:
            title = first_section_info.get("title", "")
            if title:
                tags.append(f"title:{title}")
                debug(f"Added title tag for section download: {title}")

        if first_section_info:
            info_dict = first_section_info
        else:
            info_dict = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")}

        return DownloadMediaResult(path=media_path, info=info_dict, tag=tags, source_url=opts.url, hash_value=file_hash, paths=media_paths)

    # --- API path: resolve file(s) from the returned info dict ----------
    if not isinstance(info, dict):
        log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr)
        raise DownloadError("Unexpected yt-dlp response type")

    info_dict: Dict[str, Any] = cast(Dict[str, Any], info)
    if debug_logger is not None:
        debug_logger.write_record("ytdlp-info", {"keys": sorted(info_dict.keys()), "is_playlist": bool(info_dict.get("entries"))})

    # Playlist: one DownloadMediaResult per resolved entry.
    if info_dict.get("entries") and not opts.no_playlist:
        resolved = _resolve_entries_and_paths(info_dict, opts.output_dir)
        if resolved:
            results: List[DownloadMediaResult] = []
            for entry, media_path in resolved:
                # Prefer an extractor-supplied digest; hash locally otherwise.
                hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
                if not hash_value:
                    try:
                        hash_value = sha256_file(media_path)
                    except OSError:
                        hash_value = None

                tags: List[str] = []
                if extract_ytdlp_tags:
                    try:
                        tags = extract_ytdlp_tags(entry)
                    except Exception as exc:
                        log(f"Error extracting tags: {exc}", file=sys.stderr)

                source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url") or opts.url

                results.append(
                    DownloadMediaResult(
                        path=media_path,
                        info=entry,
                        tag=tags,
                        source_url=source_url,
                        hash_value=hash_value,
                    )
                )

            if not opts.quiet:
                debug(f"✓ Downloaded playlist items: {len(results)}")
            return results

    # Single item (or playlist that resolved to nothing above).
    try:
        entry, media_path = _resolve_entry_and_path(info_dict, opts.output_dir)
    except FileNotFoundError as exc:
        log(f"Error: {exc}", file=sys.stderr)
        if debug_logger is not None:
            debug_logger.write_record("exception", {"phase": "resolve-path", "error": str(exc)})
        raise DownloadError(str(exc)) from exc

    if debug_logger is not None:
        debug_logger.write_record("resolved-media", {"path": str(media_path), "entry_keys": sorted(entry.keys())})

    hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
    if not hash_value:
        try:
            hash_value = sha256_file(media_path)
        except OSError as exc:
            # Best-effort: a result without a hash is still returned below.
            if debug_logger is not None:
                debug_logger.write_record("hash-error", {"path": str(media_path), "error": str(exc)})

    tags = []
    if extract_ytdlp_tags:
        try:
            tags = extract_ytdlp_tags(entry)
        except Exception as exc:
            log(f"Error extracting tags: {exc}", file=sys.stderr)

    source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url")

    if not opts.quiet:
        debug(f"✓ Downloaded: {media_path.name} ({len(tags)} tags)")
    if debug_logger is not None:
        debug_logger.write_record(
            "downloaded",
            {
                "path": str(media_path),
                "tag_count": len(tags),
                "source_url": source_url,
                "sha256": hash_value,
            },
        )

    return DownloadMediaResult(path=media_path, info=entry, tag=tags, source_url=source_url, hash_value=hash_value)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) -> Any:
    """Run ``download_media(opts)`` in a worker thread, failing after *timeout_seconds*.

    Args:
        opts: Download options forwarded to download_media.
        timeout_seconds: Maximum wall-clock time to wait for the download.

    Returns:
        Whatever download_media returned.

    Raises:
        DownloadError: on timeout, or when the download produced no result.
        Exception: re-raises whatever download_media itself raised.
    """
    # Cleanup: removed the redundant function-local ``import threading`` and
    # ``from typing import cast`` — both are already imported at module level.

    # Slot 0 holds the successful result, slot 1 a raised exception.
    result_container: List[Optional[Any]] = [None, None]

    def _do_download() -> None:
        try:
            result_container[0] = download_media(opts)
        except Exception as exc:
            result_container[1] = exc

    # NOTE(review): daemon=False means a timed-out download keeps running in
    # the background and can delay interpreter exit — presumably intentional
    # so partially-written files finish; confirm before changing.
    thread = threading.Thread(target=_do_download, daemon=False)
    thread.start()
    thread.join(timeout=timeout_seconds)

    if thread.is_alive():
        raise DownloadError(f"Download timeout after {timeout_seconds} seconds for {opts.url}")

    if result_container[1] is not None:
        raise cast(Exception, result_container[1])

    if result_container[0] is None:
        raise DownloadError(f"Download failed for {opts.url}")

    return cast(Any, result_container[0])
|