from __future__ import annotations
import hashlib
import json
import os
import random
import re
import string
import subprocess
import sys
import threading
import time
import traceback
from contextlib import AbstractContextManager, nullcontext
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence, cast
from urllib.parse import urlparse
from SYS import pipeline as pipeline_context
from SYS.logger import debug, log
from SYS.models import (
DebugLogger,
DownloadError,
DownloadMediaResult,
DownloadOptions,
ProgressBar,
)
from SYS.pipeline_progress import PipelineProgress
from SYS.utils import ensure_directory, sha256_file
try:
import yt_dlp # type: ignore
from yt_dlp.extractor import gen_extractors # type: ignore
except Exception as exc: # pragma: no cover - handled at runtime
yt_dlp = None # type: ignore
gen_extractors = None # type: ignore
YTDLP_IMPORT_ERROR = exc
else:
YTDLP_IMPORT_ERROR = None
_EXTRACTOR_CACHE: List[Any] | None = None
def _get_nested(config: Dict[str, Any], *path: str) -> Any:
cur: Any = config
for key in path:
if not isinstance(cur, dict):
return None
cur = cur.get(key)
return cur
def _parse_csv_list(value: Any) -> Optional[List[str]]:
if value is None:
return None
if isinstance(value, list):
out: List[str] = []
for item in value:
s = str(item).strip()
if s:
out.append(s)
return out or None
s = str(value).strip()
if not s:
return None
# allow either JSON-ish list strings or simple comma-separated values
if s.startswith("[") and s.endswith("]"):
s = s[1:-1]
parts = [p.strip() for p in s.split(",")]
parts = [p for p in parts if p]
return parts or None
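# Illustrative behaviour of _parse_csv_list (comment sketch, not executed at import time):
#
#     _parse_csv_list("res:1080, res:720")      # -> ["res:1080", "res:720"]
#     _parse_csv_list("[res:1080, res:720]")    # -> ["res:1080", "res:720"]
#     _parse_csv_list(["res:1080", "", "res"])  # -> ["res:1080", "res"]
#     _parse_csv_list("")                       # -> None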
def ensure_yt_dlp_ready() -> None:
"""Verify yt-dlp is importable, raising DownloadError if missing."""
if yt_dlp is not None:
return
detail = str(YTDLP_IMPORT_ERROR or "yt-dlp is not installed")
raise DownloadError(f"yt-dlp module not available: {detail}")
def _get_extractors() -> List[Any]:
global _EXTRACTOR_CACHE
if _EXTRACTOR_CACHE is not None:
return _EXTRACTOR_CACHE
ensure_yt_dlp_ready()
if gen_extractors is None:
_EXTRACTOR_CACHE = []
return _EXTRACTOR_CACHE
try:
_EXTRACTOR_CACHE = [ie for ie in gen_extractors()]
except Exception:
_EXTRACTOR_CACHE = []
return _EXTRACTOR_CACHE
def is_url_supported_by_ytdlp(url: str) -> bool:
"""Return True if yt-dlp has a non-generic extractor for the URL."""
if not url or not isinstance(url, str):
return False
if YTDLP_IMPORT_ERROR is not None:
return False
try:
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
return False
except Exception:
return False
try:
for extractor in _get_extractors():
try:
if not extractor.suitable(url):
continue
except Exception:
continue
name = getattr(extractor, "IE_NAME", "").lower()
if name == "generic":
continue
return True
except Exception:
return False
return False
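# Example (illustrative): the check rejects non-URLs and URLs that only the "generic"
# extractor would claim, so plain file links fall through to other download tools.
#
#     is_url_supported_by_ytdlp("https://www.youtube.com/watch?v=dQw4w9WgXcQ")  # True (when yt-dlp imports)
#     is_url_supported_by_ytdlp("not a url")                                    # False
#     is_url_supported_by_ytdlp("https://example.com/file.zip")                 # False (generic extractor only)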
def list_formats(
url: str,
*,
no_playlist: bool = False,
playlist_items: Optional[str] = None,
cookiefile: Optional[str] = None,
) -> Optional[List[Dict[str, Any]]]:
"""Get available formats for a URL.
Returns a list of format dicts or None if unsupported or probing fails.
"""
if not is_url_supported_by_ytdlp(url):
return None
ensure_yt_dlp_ready()
assert yt_dlp is not None
ydl_opts: Dict[str, Any] = {
"quiet": True,
"no_warnings": True,
"skip_download": True,
"noprogress": True,
}
if cookiefile:
ydl_opts["cookiefile"] = str(cookiefile)
if no_playlist:
ydl_opts["noplaylist"] = True
if playlist_items:
ydl_opts["playlist_items"] = str(playlist_items)
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(url, download=False)
except Exception as exc:
debug(f"yt-dlp format probe failed for {url}: {exc}")
return None
if not isinstance(info, dict):
return None
formats = info.get("formats")
if not isinstance(formats, list):
return None
out: List[Dict[str, Any]] = []
for fmt in formats:
if isinstance(fmt, dict):
out.append(fmt)
return out
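# Example (illustrative sketch): the format dicts come straight from yt-dlp, so keys
# such as "format_id", "ext" and "height" follow its schema; None means the URL was
# unsupported or the probe failed.
#
#     fmts = list_formats("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
#     for f in fmts or []:
#         print(f.get("format_id"), f.get("ext"), f.get("height"))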
def probe_url(
url: str,
no_playlist: bool = False,
timeout_seconds: int = 15,
*,
cookiefile: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Probe URL metadata without downloading.
Returns None if unsupported, errors, or times out.
"""
if not is_url_supported_by_ytdlp(url):
return None
result_container: List[Optional[Any]] = [None, None] # [result, error]
def _do_probe() -> None:
try:
ensure_yt_dlp_ready()
assert yt_dlp is not None
ydl_opts: Dict[str, Any] = {
"quiet": True,
"no_warnings": True,
"socket_timeout": 10,
"retries": 2,
"skip_download": True,
"extract_flat": "in_playlist",
"noprogress": True,
}
if cookiefile:
ydl_opts["cookiefile"] = str(cookiefile)
if no_playlist:
ydl_opts["noplaylist"] = True
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(url, download=False)
if not isinstance(info, dict):
result_container[0] = None
return
webpage_url = info.get("webpage_url") or info.get("original_url") or info.get("url")
result_container[0] = {
"extractor": info.get("extractor", ""),
"title": info.get("title", ""),
"entries": info.get("entries", []),
"duration": info.get("duration"),
"uploader": info.get("uploader"),
"description": info.get("description"),
"requested_url": url,
"webpage_url": webpage_url,
"url": webpage_url or url,
}
except Exception as exc:
debug(f"Probe error for {url}: {exc}")
result_container[1] = exc
thread = threading.Thread(target=_do_probe, daemon=False)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
debug(f"Probe timeout for {url} (>={timeout_seconds}s), proceeding without probe")
return None
if result_container[1] is not None:
return None
return cast(Optional[Dict[str, Any]], result_container[0])
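# Example (illustrative): probe_url returns the trimmed mapping built above, not the raw
# yt-dlp info dict, so only those keys are guaranteed to be present.
#
#     meta = probe_url("https://www.youtube.com/watch?v=dQw4w9WgXcQ", timeout_seconds=10)
#     if meta is not None:
#         print(meta["title"], meta["duration"], meta["webpage_url"])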
@dataclass(slots=True)
class YtDlpDefaults:
"""User-tunable defaults for yt-dlp behavior.
Recommended config.conf keys (top-level dotted keys):
- ytdlp.video_format="bestvideo+bestaudio/best"
- ytdlp.audio_format="251/140/bestaudio"
- ytdlp.format_sort="res:2160,res:1440,res:1080,res:720,res"
Cookies:
- cookies="C:\\path\\cookies.txt" (already supported by config.resolve_cookies_path)
"""
video_format: str = "bestvideo+bestaudio/best"
audio_format: str = "251/140/bestaudio"
format_sort: Optional[List[str]] = None
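# Any of the following config shapes resolve to the same defaults (see
# YtDlpTool._load_defaults below); illustrative sketch, not an exhaustive schema:
#
#     {"ytdlp_video_format": "bestvideo+bestaudio/best"}                 # flat key
#     {"ytdlp": {"video_format": "bestvideo+bestaudio/best"}}            # nested block
#     {"tool": {"ytdlp": {"video_format": "bestvideo+bestaudio/best"}}}  # tool.* block
#     {"ytdlp": {"format": {"video": "bestvideo+bestaudio/best",
#                           "sort": "res:2160,res:1080,res"}}}           # dotted-style nesting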
class YtDlpTool:
"""Centralizes yt-dlp defaults and translation helpers.
This is intentionally small and dependency-light so cmdlets can use it without
forcing a full refactor.
"""
    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        *,
        script_dir: Optional[Path] = None,
    ) -> None:
        self._config: Dict[str, Any] = dict(config or {})
# `resolve_cookies_path` expects the app root so it can fall back to ./cookies.txt.
# This file lives under ./tool/, so default to the parent directory.
self._script_dir = script_dir or Path(__file__).resolve().parent.parent
self.defaults = self._load_defaults()
self._cookiefile: Optional[Path] = self._init_cookiefile()
def _init_cookiefile(self) -> Optional[Path]:
"""Resolve cookies once at tool init (yt-dlp is the primary consumer)."""
try:
from SYS.config import resolve_cookies_path
resolved = resolve_cookies_path(self._config, script_dir=self._script_dir)
if resolved is not None and resolved.is_file():
return resolved
except Exception:
pass
return None
def _load_defaults(self) -> YtDlpDefaults:
cfg = self._config
# NOTE: `YtDlpDefaults` is a slots dataclass. Referencing defaults via
# `YtDlpDefaults.video_format` yields a `member_descriptor`, not the
# default string value. Use an instance for fallback defaults.
_fallback_defaults = YtDlpDefaults()
tool_block = _get_nested(cfg, "tool", "ytdlp")
if not isinstance(tool_block, dict):
tool_block = {}
        ytdlp_block = cfg.get("ytdlp")
        if not isinstance(ytdlp_block, dict):
            ytdlp_block = {}
# Accept both nested and flat styles.
video_format = (
tool_block.get("video_format") or tool_block.get("format")
or ytdlp_block.get("video_format") or ytdlp_block.get("video")
or ytdlp_block.get("format_video") or cfg.get("ytdlp_video_format")
)
audio_format = (
tool_block.get("audio_format") or ytdlp_block.get("audio_format")
or ytdlp_block.get("audio") or ytdlp_block.get("format_audio")
or cfg.get("ytdlp_audio_format")
)
# Also accept dotted keys written as nested dicts: ytdlp.format.video, ytdlp.format.audio
nested_video = _get_nested(cfg, "ytdlp", "format", "video")
nested_audio = _get_nested(cfg, "ytdlp", "format", "audio")
        fmt_sort_val = (
            tool_block.get("format_sort") or ytdlp_block.get("format_sort")
            or ytdlp_block.get("formatSort") or cfg.get("ytdlp_format_sort")
            or _get_nested(cfg, "ytdlp", "format", "sort")
        )
fmt_sort = _parse_csv_list(fmt_sort_val)
defaults = YtDlpDefaults(
video_format=str(
nested_video or video_format or _fallback_defaults.video_format
),
audio_format=str(
nested_audio or audio_format or _fallback_defaults.audio_format
),
format_sort=fmt_sort,
)
return defaults
def resolve_cookiefile(self) -> Optional[Path]:
return self._cookiefile
def default_format(self, mode: str) -> str:
m = str(mode or "").lower().strip()
if m == "audio":
return self.defaults.audio_format
return self.defaults.video_format
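    # Example (illustrative): configured formats win, everything else falls back to
    # the YtDlpDefaults instance built in _load_defaults.
    #
    #     tool = YtDlpTool(config={"ytdlp": {"video_format": "best"}})
    #     tool.default_format("video")  # -> "best"
    #     tool.default_format("audio")  # -> "251/140/bestaudio" (fallback default)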
def build_ytdlp_options(self, opts: DownloadOptions) -> Dict[str, Any]:
"""Translate DownloadOptions into yt-dlp API options."""
ensure_directory(opts.output_dir)
outtmpl = str((opts.output_dir / "%(title)s.%(ext)s").resolve())
        base_options: Dict[str, Any] = {
"outtmpl": outtmpl,
"quiet": True,
"no_warnings": True,
"noprogress": True,
"socket_timeout": 30,
"retries": 10,
"fragment_retries": 10,
"http_chunk_size": 10_485_760,
"restrictfilenames": True,
}
try:
repo_root = Path(__file__).resolve().parents[1]
bundled_ffmpeg_dir = repo_root / "MPV" / "ffmpeg" / "bin"
if bundled_ffmpeg_dir.exists():
base_options.setdefault("ffmpeg_location", str(bundled_ffmpeg_dir))
except Exception:
pass
try:
if os.name == "nt":
base_options.setdefault("file_access_retries", 40)
except Exception:
pass
if opts.cookies_path and opts.cookies_path.is_file():
base_options["cookiefile"] = str(opts.cookies_path)
else:
cookiefile = self.resolve_cookiefile()
if cookiefile is not None and cookiefile.is_file():
base_options["cookiefile"] = str(cookiefile)
if opts.no_playlist:
base_options["noplaylist"] = True
fmt = opts.ytdl_format or self.default_format(opts.mode)
base_options["format"] = fmt
if opts.mode == "audio":
base_options["postprocessors"] = [{
"key": "FFmpegExtractAudio"
}]
else:
format_sort = self.defaults.format_sort or [
"res:4320",
"res:2880",
"res:2160",
"res:1440",
"res:1080",
"res:720",
"res",
]
base_options["format_sort"] = format_sort
if getattr(opts, "embed_chapters", False):
pps = base_options.get("postprocessors")
if not isinstance(pps, list):
pps = []
            already_has_metadata = any(
                isinstance(pp, dict) and str(pp.get("key") or "") == "FFmpegMetadata"
                for pp in pps
            )
if not already_has_metadata:
pps.append(
{
"key": "FFmpegMetadata",
"add_metadata": True,
"add_chapters": True,
"add_infojson": "if_exists",
}
)
base_options["postprocessors"] = pps
if opts.mode != "audio":
base_options.setdefault("merge_output_format", "mkv")
if getattr(opts, "write_sub", False):
base_options["writesubtitles"] = True
base_options["writeautomaticsub"] = True
base_options["subtitlesformat"] = "vtt"
if opts.clip_sections:
sections: List[str] = []
def _secs_to_hms(seconds: float) -> str:
total = max(0, int(seconds))
minutes, secs = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
for section_range in str(opts.clip_sections).split(","):
section_range = section_range.strip()
if not section_range:
continue
try:
start_s_raw, end_s_raw = section_range.split("-", 1)
start_s = float(start_s_raw.strip())
end_s = float(end_s_raw.strip())
if start_s >= end_s:
continue
sections.append(f"*{_secs_to_hms(start_s)}-{_secs_to_hms(end_s)}")
except (ValueError, AttributeError):
continue
if sections:
base_options["download_sections"] = sections
# Clipped outputs should begin with a keyframe; otherwise players (notably mpv)
# can show audio before video or a black screen until the next keyframe.
# yt-dlp implements this by forcing keyframes at cut points.
base_options["force_keyframes_at_cuts"] = True
debug(f"Download sections configured: {', '.join(sections)}")
if opts.playlist_items:
base_options["playlist_items"] = opts.playlist_items
if not opts.quiet:
debug(f"yt-dlp: mode={opts.mode}, format={base_options.get('format')}")
return base_options
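    # Example (illustrative sketch; DownloadOptions field names are assumed from how they
    # are read in this module, e.g. opts.url / opts.output_dir / opts.mode):
    #
    #     opts = DownloadOptions(url="https://example.com/watch?v=...",
    #                            output_dir=Path("downloads"), mode="video")
    #     ydl_opts = YtDlpTool().build_ytdlp_options(opts)
    #     ydl_opts["format"]   # resolved format string (opts.ytdl_format or the mode default)
    #     ydl_opts["outtmpl"]  # "<output_dir>/%(title)s.%(ext)s"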
def build_yt_dlp_cli_args(
self,
*,
url: str,
output_dir: Optional[Path] = None,
ytdl_format: Optional[str] = None,
playlist_items: Optional[str] = None,
no_playlist: bool = False,
quiet: bool = True,
extra_args: Optional[Sequence[str]] = None,
) -> List[str]:
"""Build a yt-dlp command line (argv list).
This is primarily for debug output or subprocess execution.
"""
argv: List[str] = ["yt-dlp"]
if quiet:
argv.extend(["--quiet", "--no-warnings"])
argv.append("--no-progress")
cookiefile = self.resolve_cookiefile()
if cookiefile is not None:
argv.extend(["--cookies", str(cookiefile)])
if no_playlist:
argv.append("--no-playlist")
if playlist_items:
argv.extend(["--playlist-items", str(playlist_items)])
fmt = (ytdl_format or "").strip()
if fmt:
# Use long form to avoid confusion with app-level flags.
argv.extend(["--format", fmt])
if self.defaults.format_sort:
for sort_key in self.defaults.format_sort:
argv.extend(["-S", sort_key])
if output_dir is not None:
outtmpl = str((output_dir / "%(title)s.%(ext)s").resolve())
argv.extend(["-o", outtmpl])
if extra_args:
argv.extend([str(a) for a in extra_args if str(a).strip()])
argv.append(str(url))
return argv
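    # Example (illustrative): with quiet=True and no cookies, format, or playlist options,
    # the resulting argv is roughly:
    #
    #     YtDlpTool().build_yt_dlp_cli_args(url="https://example.com/v", output_dir=Path("dl"))
    #     # -> ["yt-dlp", "--quiet", "--no-warnings", "--no-progress",
    #     #     "-o", "<abs path>/dl/%(title)s.%(ext)s", "https://example.com/v"]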
def debug_print_cli(self, argv: Sequence[str]) -> None:
try:
debug("yt-dlp argv: " + " ".join(str(a) for a in argv))
except Exception:
pass
# Progress + utility helpers for yt-dlp driven downloads (previously in cmdlet/download_media).
_YTDLP_PROGRESS_BAR = ProgressBar()
_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {}
_SUBTITLE_EXTS = (".vtt", ".srt", ".ass", ".ssa", ".lrc")
def _progress_label(status: Dict[str, Any]) -> str:
info_dict = status.get("info_dict") if isinstance(status.get("info_dict"), dict) else {}
candidates = [
status.get("filename"),
info_dict.get("_filename"),
info_dict.get("filepath"),
info_dict.get("title"),
info_dict.get("id"),
]
for cand in candidates:
if not cand:
continue
try:
name = Path(str(cand)).name
except Exception:
name = str(cand)
label = str(name or "").strip()
if label:
return label
return "download"
def _live_ui_and_pipe_index() -> tuple[Optional[Any], int]:
ui = None
try:
ui = pipeline_context.get_live_progress() if hasattr(pipeline_context, "get_live_progress") else None
except Exception:
ui = None
pipe_idx: int = 0
try:
stage_ctx = pipeline_context.get_stage_context() if hasattr(pipeline_context, "get_stage_context") else None
maybe_idx = getattr(stage_ctx, "pipe_index", None) if stage_ctx is not None else None
if isinstance(maybe_idx, int):
pipe_idx = int(maybe_idx)
except Exception:
pipe_idx = 0
return ui, pipe_idx
def _begin_live_steps(total_steps: int) -> None:
ui, pipe_idx = _live_ui_and_pipe_index()
if ui is None:
return
try:
begin = getattr(ui, "begin_pipe_steps", None)
if callable(begin):
begin(int(pipe_idx), total_steps=int(total_steps))
except Exception:
return
def _step(text: str) -> None:
ui, pipe_idx = _live_ui_and_pipe_index()
if ui is None:
return
try:
adv = getattr(ui, "advance_pipe_step", None)
if callable(adv):
adv(int(pipe_idx), str(text))
except Exception:
return
def _set_pipe_percent(percent: int) -> None:
ui, pipe_idx = _live_ui_and_pipe_index()
if ui is None:
return
try:
set_pct = getattr(ui, "set_pipe_percent", None)
if callable(set_pct):
set_pct(int(pipe_idx), int(percent))
except Exception:
return
def _format_chapters_note(info: Dict[str, Any]) -> Optional[str]:
"""Format yt-dlp chapter metadata into a stable, note-friendly text."""
try:
chapters = info.get("chapters")
except Exception:
chapters = None
if not isinstance(chapters, list) or not chapters:
return None
rows: List[tuple[int, Optional[int], str]] = []
max_t = 0
for ch in chapters:
if not isinstance(ch, dict):
continue
start_raw = ch.get("start_time")
end_raw = ch.get("end_time")
title_raw = ch.get("title") or ch.get("name") or ch.get("chapter")
try:
if start_raw is None:
continue
start_s = int(float(start_raw))
except Exception:
continue
end_s: Optional[int] = None
try:
if end_raw is not None:
end_s = int(float(end_raw))
except Exception:
end_s = None
title = str(title_raw).strip() if title_raw is not None else ""
rows.append((start_s, end_s, title))
try:
max_t = max(max_t, start_s, end_s or 0)
except Exception:
max_t = max(max_t, start_s)
if not rows:
return None
force_hours = bool(max_t >= 3600)
def _tc(seconds: int) -> str:
total = max(0, int(seconds))
minutes, secs = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
if force_hours:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
lines: List[str] = []
for start_s, end_s, title in sorted(
rows, key=lambda r: (r[0], r[1] if r[1] is not None else 10**9, r[2])
):
if end_s is not None and end_s > start_s:
prefix = f"{_tc(start_s)}-{_tc(end_s)}"
else:
prefix = _tc(start_s)
line = f"{prefix} {title}".strip()
if line:
lines.append(line)
text = "\n".join(lines).strip()
return text or None
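# Example (illustrative): given
#     info = {"chapters": [{"start_time": 0, "end_time": 75, "title": "Intro"},
#                          {"start_time": 75, "title": "Main"}]}
# _format_chapters_note(info) returns:
#
#     00:00-01:15 Intro
#     01:15 Main
#
# Hour digits are only included once any timestamp reaches 3600 seconds.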
def _best_subtitle_sidecar(media_path: Path) -> Optional[Path]:
"""Find the most likely subtitle sidecar file for a downloaded media file."""
try:
base_dir = media_path.parent
stem = media_path.stem
if not stem:
return None
candidates: List[Path] = []
for p in base_dir.glob(stem + ".*"):
try:
if not p.is_file():
continue
except Exception:
continue
if p.suffix.lower() in _SUBTITLE_EXTS:
candidates.append(p)
preferred_order = [".vtt", ".srt", ".ass", ".ssa", ".lrc"]
for ext in preferred_order:
for p in candidates:
if p.suffix.lower() == ext:
return p
return candidates[0] if candidates else None
except Exception:
return None
def _read_text_file(path: Path) -> Optional[str]:
try:
return path.read_text(encoding="utf-8", errors="ignore")
except Exception:
return None
def _download_with_sections_via_cli(
url: str,
ytdl_options: Dict[str, Any],
sections: List[str],
quiet: bool = False,
) -> tuple[Optional[str], Dict[str, Any]]:
sections_list = ytdl_options.get("download_sections", [])
if not sections_list:
return "", {}
    seed = url + str(time.time()) + "".join(random.choices(string.ascii_letters, k=10))
    session_id = hashlib.md5(seed.encode()).hexdigest()[:12]
first_section_info = None
total_sections = len(sections_list)
for section_idx, section in enumerate(sections_list, 1):
try:
if total_sections > 0:
pct = 50 + int(((section_idx - 1) / max(1, total_sections)) * 49)
_set_pipe_percent(pct)
except Exception:
pass
base_outtmpl = ytdl_options.get("outtmpl", "%(title)s.%(ext)s")
output_dir_path = Path(base_outtmpl).parent
filename_tmpl = f"{session_id}_{section_idx}"
if base_outtmpl.endswith(".%(ext)s"):
filename_tmpl += ".%(ext)s"
section_outtmpl = str(output_dir_path / filename_tmpl)
if section_idx == 1:
metadata_cmd = ["yt-dlp", "--dump-json", "--skip-download"]
if ytdl_options.get("cookiefile"):
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
metadata_cmd.extend(["--cookies", cookies_path])
if ytdl_options.get("noplaylist"):
metadata_cmd.append("--no-playlist")
metadata_cmd.append(url)
try:
meta_result = subprocess.run(metadata_cmd, capture_output=True, text=True)
if meta_result.returncode == 0 and meta_result.stdout:
try:
info_dict = json.loads(meta_result.stdout.strip())
first_section_info = info_dict
if not quiet:
debug(f"Extracted title from metadata: {info_dict.get('title')}")
except json.JSONDecodeError:
if not quiet:
debug("Could not parse JSON metadata")
except Exception as exc:
if not quiet:
debug(f"Error extracting metadata: {exc}")
cmd = ["yt-dlp"]
if quiet:
cmd.append("--quiet")
cmd.append("--no-warnings")
cmd.append("--no-progress")
cmd.extend(["--postprocessor-args", "ffmpeg:-hide_banner -loglevel error"])
if ytdl_options.get("ffmpeg_location"):
try:
cmd.extend(["--ffmpeg-location", str(ytdl_options["ffmpeg_location"])])
except Exception:
pass
if ytdl_options.get("format"):
cmd.extend(["-f", ytdl_options["format"]])
if ytdl_options.get("merge_output_format"):
cmd.extend(["--merge-output-format", str(ytdl_options["merge_output_format"])])
postprocessors = ytdl_options.get("postprocessors")
want_add_metadata = bool(ytdl_options.get("addmetadata"))
want_embed_chapters = bool(ytdl_options.get("embedchapters"))
if isinstance(postprocessors, list):
for pp in postprocessors:
if not isinstance(pp, dict):
continue
if str(pp.get("key") or "") == "FFmpegMetadata":
want_add_metadata = True
if bool(pp.get("add_chapters", True)):
want_embed_chapters = True
if want_add_metadata:
cmd.append("--add-metadata")
if want_embed_chapters:
cmd.append("--embed-chapters")
if ytdl_options.get("writesubtitles"):
cmd.append("--write-sub")
cmd.append("--write-auto-sub")
cmd.extend(["--sub-format", "vtt"])
if ytdl_options.get("force_keyframes_at_cuts"):
cmd.append("--force-keyframes-at-cuts")
cmd.extend(["-o", section_outtmpl])
if ytdl_options.get("cookiefile"):
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
cmd.extend(["--cookies", cookies_path])
if ytdl_options.get("noplaylist"):
cmd.append("--no-playlist")
cmd.extend(["--download-sections", section])
cmd.append(url)
if not quiet:
debug(f"Running yt-dlp for section: {section}")
try:
if quiet:
subprocess.run(cmd, check=True, capture_output=True, text=True)
else:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as exc:
stderr_text = exc.stderr or ""
tail = "\n".join(stderr_text.splitlines()[-12:]).strip()
details = f"\n{tail}" if tail else ""
raise DownloadError(f"yt-dlp failed for section {section} (exit {exc.returncode}){details}") from exc
except Exception as exc:
raise DownloadError(f"yt-dlp failed for section {section}: {exc}") from exc
try:
_set_pipe_percent(99)
except Exception:
pass
return session_id, first_section_info or {}
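# Illustrative naming convention used above: each section is written as
#
#     <output_dir>/<session_id>_<n>.<ext>        # e.g. "a1b2c3d4e5f6_1.mkv"
#
# and the fallback branch in download_media() later renames each file to
# "<sha256><suffix>" once it has been hashed.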
def _iter_download_entries(info: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
queue: List[Dict[str, Any]] = [info]
seen: set[int] = set()
while queue:
current = queue.pop(0)
obj_id = id(current)
if obj_id in seen:
continue
seen.add(obj_id)
        entries = current.get("entries")
        if isinstance(entries, list):
            for entry in entries:
                # Skip non-dict entries so later .get() calls cannot fail on them.
                if isinstance(entry, dict):
                    queue.append(entry)
if current.get("requested_downloads") or not entries:
yield current
def _candidate_paths(entry: Dict[str, Any], output_dir: Path) -> Iterator[Path]:
requested = entry.get("requested_downloads")
if isinstance(requested, list):
for item in requested:
if isinstance(item, dict):
fp = item.get("filepath") or item.get("_filename")
if fp:
yield Path(fp)
for key in ("filepath", "_filename", "filename"):
value = entry.get(key)
if value:
yield Path(value)
if entry.get("filename"):
yield output_dir / entry["filename"]
def _resolve_entry_and_path(info: Dict[str, Any], output_dir: Path) -> tuple[Dict[str, Any], Path]:
for entry in _iter_download_entries(info):
for candidate in _candidate_paths(entry, output_dir):
if candidate.is_file():
return entry, candidate
if not candidate.is_absolute():
maybe = output_dir / candidate
if maybe.is_file():
return entry, maybe
raise FileNotFoundError("yt-dlp did not report a downloaded media file")
def _resolve_entries_and_paths(info: Dict[str, Any], output_dir: Path) -> List[tuple[Dict[str, Any], Path]]:
resolved: List[tuple[Dict[str, Any], Path]] = []
seen: set[str] = set()
for entry in _iter_download_entries(info):
chosen: Optional[Path] = None
for candidate in _candidate_paths(entry, output_dir):
if candidate.is_file():
chosen = candidate
break
if not candidate.is_absolute():
maybe = output_dir / candidate
if maybe.is_file():
chosen = maybe
break
if chosen is None:
continue
key = str(chosen.resolve())
if key in seen:
continue
seen.add(key)
resolved.append((entry, chosen))
return resolved
def _extract_sha256(info: Dict[str, Any]) -> Optional[str]:
    for payload in [info] + list(info.get("entries") or []):
if not isinstance(payload, dict):
continue
hashes = payload.get("hashes")
if isinstance(hashes, dict):
for key in ("sha256", "sha-256", "sha_256"):
if key in hashes and isinstance(hashes[key], str) and hashes[key].strip():
return hashes[key].strip()
for key in ("sha256", "sha-256", "sha_256"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return None
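# Example (illustrative): either shape short-circuits the fallback file hashing.
#
#     _extract_sha256({"sha256": "e3b0c44298fc1c14"})               # flat key
#     _extract_sha256({"hashes": {"sha-256": "e3b0c44298fc1c14"}})  # nested "hashes" dict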
def _progress_callback(status: Dict[str, Any]) -> None:
label = _progress_label(status)
event = status.get("status")
downloaded = status.get("downloaded_bytes")
total = status.get("total_bytes") or status.get("total_bytes_estimate")
pipeline = PipelineProgress(pipeline_context)
live_ui, _ = pipeline.ui_and_pipe_index()
use_live = live_ui is not None
def _total_bytes(value: Any) -> Optional[int]:
try:
if isinstance(value, (int, float)) and value > 0:
return int(value)
except Exception:
pass
return None
if event == "downloading":
if use_live:
try:
if not _YTDLP_TRANSFER_STATE.get(label, {}).get("started"):
pipeline.begin_transfer(label=label, total=_total_bytes(total))
_YTDLP_TRANSFER_STATE[label] = {"started": True}
pipeline.update_transfer(
label=label,
completed=int(downloaded) if downloaded is not None else None,
total=_total_bytes(total),
)
except Exception:
pass
else:
_YTDLP_PROGRESS_BAR.update(
downloaded=int(downloaded) if downloaded is not None else None,
total=int(total) if total is not None else None,
label=label,
file=sys.stderr,
)
elif event == "finished":
if use_live:
try:
if _YTDLP_TRANSFER_STATE.get(label, {}).get("started"):
pipeline.finish_transfer(label=label)
except Exception:
pass
_YTDLP_TRANSFER_STATE.pop(label, None)
else:
_YTDLP_PROGRESS_BAR.finish()
elif event in ("postprocessing", "processing"):
return
try:
from SYS.metadata import extract_ytdlp_tags
except ImportError:
extract_ytdlp_tags = None # type: ignore
def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] = None) -> Any:
"""Download streaming media exclusively via yt-dlp."""
try:
netloc = urlparse(opts.url).netloc.lower()
except Exception:
netloc = ""
if "gofile.io" in netloc:
msg = "GoFile links are currently unsupported"
if not opts.quiet:
debug(msg)
if debug_logger is not None:
debug_logger.write_record("gofile-unsupported", {"url": opts.url})
raise DownloadError(msg)
ytdlp_supported = is_url_supported_by_ytdlp(opts.url)
if not ytdlp_supported:
msg = "URL not supported by yt-dlp; try download-file for manual downloads"
if not opts.quiet:
log(msg)
if debug_logger is not None:
debug_logger.write_record("ytdlp-unsupported", {"url": opts.url})
raise DownloadError(msg)
if opts.playlist_items:
debug(
f"Skipping probe for playlist (item selection: {opts.playlist_items}), proceeding with download"
)
probe_result = {"url": opts.url}
else:
probe_cookiefile = None
try:
if opts.cookies_path and opts.cookies_path.is_file():
probe_cookiefile = str(opts.cookies_path)
except Exception:
probe_cookiefile = None
probe_result = probe_url(opts.url, no_playlist=opts.no_playlist, timeout_seconds=15, cookiefile=probe_cookiefile)
if probe_result is None:
msg = "yt-dlp could not detect media for this URL; use download-file for direct downloads"
if not opts.quiet:
log(msg)
if debug_logger is not None:
debug_logger.write_record("ytdlp-skip-no-media", {"url": opts.url})
raise DownloadError(msg)
ensure_yt_dlp_ready()
ytdlp_tool = YtDlpTool()
ytdl_options = ytdlp_tool.build_ytdlp_options(opts)
hooks = ytdl_options.get("progress_hooks")
if not isinstance(hooks, list):
hooks = []
ytdl_options["progress_hooks"] = hooks
if _progress_callback not in hooks:
hooks.append(_progress_callback)
if not opts.quiet:
debug(f"Starting yt-dlp download: {opts.url}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-start", {"url": opts.url})
assert yt_dlp is not None
try:
if not opts.quiet:
if ytdl_options.get("download_sections"):
debug(f"[yt-dlp] download_sections: {ytdl_options['download_sections']}")
debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}")
session_id = None
first_section_info = {}
if ytdl_options.get("download_sections"):
live_ui, _ = PipelineProgress(pipeline_context).ui_and_pipe_index()
quiet_sections = bool(opts.quiet) or (live_ui is not None)
session_id, first_section_info = _download_with_sections_via_cli(
opts.url,
ytdl_options,
ytdl_options.get("download_sections", []),
quiet=quiet_sections,
)
info = None
else:
with yt_dlp.YoutubeDL(ytdl_options) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(opts.url, download=True)
except Exception as exc:
log(f"yt-dlp failed: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "yt-dlp", "error": str(exc), "traceback": traceback.format_exc()},
)
raise DownloadError("yt-dlp download failed") from exc
if info is None:
try:
time.sleep(0.5)
files = sorted(opts.output_dir.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True)
if not files:
raise FileNotFoundError(f"No files found in {opts.output_dir}")
if opts.clip_sections and session_id:
section_pattern = re.compile(rf"^{re.escape(session_id)}_(\d+)")
matching_files = [f for f in files if section_pattern.search(f.name)]
if matching_files:
def extract_section_num(path: Path) -> int:
match = section_pattern.search(path.name)
return int(match.group(1)) if match else 999
matching_files.sort(key=extract_section_num)
debug(f"Found {len(matching_files)} section file(s) matching pattern")
by_index: Dict[int, List[Path]] = {}
for f in matching_files:
m = section_pattern.search(f.name)
if not m:
continue
try:
n = int(m.group(1))
except Exception:
continue
by_index.setdefault(n, []).append(f)
renamed_media_files: List[Path] = []
for sec_num in sorted(by_index.keys()):
group = by_index.get(sec_num) or []
if not group:
continue
def _is_subtitle(p: Path) -> bool:
try:
return p.suffix.lower() in _SUBTITLE_EXTS
except Exception:
return False
media_candidates = [p for p in group if not _is_subtitle(p)]
subtitle_candidates = [p for p in group if _is_subtitle(p)]
media_file: Optional[Path] = None
for cand in media_candidates:
try:
if cand.suffix.lower() in {".json", ".info.json"}:
continue
except Exception:
pass
media_file = cand
break
if media_file is None and media_candidates:
media_file = media_candidates[0]
if media_file is None:
continue
try:
media_hash = sha256_file(media_file)
except Exception as exc:
debug(f"Failed to hash section media file {media_file.name}: {exc}")
renamed_media_files.append(media_file)
continue
prefix = f"{session_id}_{sec_num}"
def _tail(name: str) -> str:
try:
if name.startswith(prefix):
return name[len(prefix):]
except Exception:
pass
try:
return Path(name).suffix
except Exception:
return ""
try:
new_media_name = f"{media_hash}{_tail(media_file.name)}"
new_media_path = opts.output_dir / new_media_name
if new_media_path.exists() and new_media_path != media_file:
debug(f"File with hash {media_hash} already exists, using existing file.")
try:
media_file.unlink()
except OSError:
pass
else:
media_file.rename(new_media_path)
debug(f"Renamed section file: {media_file.name} -> {new_media_name}")
renamed_media_files.append(new_media_path)
except Exception as exc:
debug(f"Failed to rename section media file {media_file.name}: {exc}")
renamed_media_files.append(media_file)
new_media_path = media_file
for sub_file in subtitle_candidates:
try:
new_sub_name = f"{media_hash}{_tail(sub_file.name)}"
new_sub_path = opts.output_dir / new_sub_name
if new_sub_path.exists() and new_sub_path != sub_file:
try:
sub_file.unlink()
except OSError:
pass
else:
sub_file.rename(new_sub_path)
debug(f"Renamed section file: {sub_file.name} -> {new_sub_name}")
except Exception as exc:
debug(f"Failed to rename section subtitle file {sub_file.name}: {exc}")
media_path = renamed_media_files[0] if renamed_media_files else matching_files[0]
media_paths = renamed_media_files if renamed_media_files else None
if not opts.quiet:
count = len(media_paths) if isinstance(media_paths, list) else 1
debug(f"✓ Downloaded {count} section media file(s) (session: {session_id})")
else:
media_path = files[0]
media_paths = None
if not opts.quiet:
debug(f"✓ Downloaded section file (pattern not found): {media_path.name}")
else:
media_path = files[0]
media_paths = None
if not opts.quiet:
debug(f"✓ Downloaded: {media_path.name}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-file-found", {"path": str(media_path)})
except Exception as exc:
log(f"Error finding downloaded file: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record("exception", {"phase": "find-file", "error": str(exc)})
raise DownloadError(str(exc)) from exc
file_hash = sha256_file(media_path)
tags = []
title = ""
if first_section_info:
title = first_section_info.get("title", "")
if title:
tags.append(f"title:{title}")
debug(f"Added title tag for section download: {title}")
if first_section_info:
info_dict = first_section_info
else:
info_dict = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")}
return DownloadMediaResult(path=media_path, info=info_dict, tag=tags, source_url=opts.url, hash_value=file_hash, paths=media_paths)
if not isinstance(info, dict):
log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr)
raise DownloadError("Unexpected yt-dlp response type")
info_dict: Dict[str, Any] = cast(Dict[str, Any], info)
if debug_logger is not None:
debug_logger.write_record("ytdlp-info", {"keys": sorted(info_dict.keys()), "is_playlist": bool(info_dict.get("entries"))})
if info_dict.get("entries") and not opts.no_playlist:
resolved = _resolve_entries_and_paths(info_dict, opts.output_dir)
if resolved:
results: List[DownloadMediaResult] = []
for entry, media_path in resolved:
hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
if not hash_value:
try:
hash_value = sha256_file(media_path)
except OSError:
hash_value = None
tags: List[str] = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(entry)
except Exception as exc:
log(f"Error extracting tags: {exc}", file=sys.stderr)
source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url") or opts.url
results.append(
DownloadMediaResult(
path=media_path,
info=entry,
tag=tags,
source_url=source_url,
hash_value=hash_value,
)
)
if not opts.quiet:
debug(f"✓ Downloaded playlist items: {len(results)}")
return results
try:
entry, media_path = _resolve_entry_and_path(info_dict, opts.output_dir)
except FileNotFoundError as exc:
log(f"Error: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record("exception", {"phase": "resolve-path", "error": str(exc)})
raise DownloadError(str(exc)) from exc
if debug_logger is not None:
debug_logger.write_record("resolved-media", {"path": str(media_path), "entry_keys": sorted(entry.keys())})
hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
if not hash_value:
try:
hash_value = sha256_file(media_path)
except OSError as exc:
if debug_logger is not None:
debug_logger.write_record("hash-error", {"path": str(media_path), "error": str(exc)})
tags = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(entry)
except Exception as exc:
log(f"Error extracting tags: {exc}", file=sys.stderr)
source_url = entry.get("webpage_url") or entry.get("original_url") or entry.get("url")
if not opts.quiet:
debug(f"✓ Downloaded: {media_path.name} ({len(tags)} tags)")
if debug_logger is not None:
debug_logger.write_record(
"downloaded",
{
"path": str(media_path),
"tag_count": len(tags),
"source_url": source_url,
"sha256": hash_value,
},
)
return DownloadMediaResult(path=media_path, info=entry, tag=tags, source_url=source_url, hash_value=hash_value)
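# Example (illustrative sketch; DownloadOptions fields assumed from their usage in this module):
#
#     opts = DownloadOptions(url="https://www.youtube.com/watch?v=dQw4w9WgXcQ",
#                            output_dir=Path("downloads"), mode="video")
#     result = download_media(opts)
#     # result is a DownloadMediaResult (or a list of them for playlist downloads);
#     # path, tag, hash_value and info are populated as shown above.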
def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) -> Any:
    """Run download_media in a worker thread, raising DownloadError on timeout."""
    result_container: List[Optional[Any]] = [None, None]  # [result, error]
def _do_download() -> None:
try:
result_container[0] = download_media(opts)
except Exception as exc:
result_container[1] = exc
thread = threading.Thread(target=_do_download, daemon=False)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
raise DownloadError(f"Download timeout after {timeout_seconds} seconds for {opts.url}")
if result_container[1] is not None:
raise cast(Exception, result_container[1])
if result_container[0] is None:
raise DownloadError(f"Download failed for {opts.url}")
return cast(Any, result_container[0])
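# Example (illustrative): bound a potentially slow download to five minutes.
#
#     result = _download_with_timeout(opts, timeout_seconds=300)
#     # raises DownloadError if the worker thread is still alive after the timeout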