327 lines
11 KiB
Python
327 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Sequence
|
|
|
|
from SYS.logger import debug
|
|
from SYS.utils import ensure_directory
|
|
from models import DownloadOptions
|
|
|
|
|
|
def _get_nested(config: Dict[str, Any], *path: str) -> Any:
|
|
cur: Any = config
|
|
for key in path:
|
|
if not isinstance(cur, dict):
|
|
return None
|
|
cur = cur.get(key)
|
|
return cur
|
|
|
|
|
|
def _parse_csv_list(value: Any) -> Optional[List[str]]:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, list):
|
|
out: List[str] = []
|
|
for item in value:
|
|
s = str(item).strip()
|
|
if s:
|
|
out.append(s)
|
|
return out or None
|
|
s = str(value).strip()
|
|
if not s:
|
|
return None
|
|
# allow either JSON-ish list strings or simple comma-separated values
|
|
if s.startswith("[") and s.endswith("]"):
|
|
s = s[1:-1]
|
|
parts = [p.strip() for p in s.split(",")]
|
|
parts = [p for p in parts if p]
|
|
return parts or None
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class YtDlpDefaults:
|
|
"""User-tunable defaults for yt-dlp behavior.
|
|
|
|
Recommended config.conf keys (top-level dotted keys):
|
|
- ytdlp.video_format="bestvideo+bestaudio/best"
|
|
- ytdlp.audio_format="251/140/bestaudio"
|
|
- ytdlp.format_sort="res:2160,res:1440,res:1080,res:720,res"
|
|
|
|
Cookies:
|
|
- cookies="C:\\path\\cookies.txt" (already supported by config.resolve_cookies_path)
|
|
"""
|
|
|
|
video_format: str = "bestvideo+bestaudio/best"
|
|
audio_format: str = "251/140/bestaudio"
|
|
format_sort: Optional[List[str]] = None
|
|
|
|
|
|
class YtDlpTool:
|
|
"""Centralizes yt-dlp defaults and translation helpers.
|
|
|
|
This is intentionally small and dependency-light so cmdlets can use it without
|
|
forcing a full refactor.
|
|
"""
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None, *, script_dir: Optional[Path] = None) -> None:
|
|
self._config: Dict[str, Any] = dict(config or {})
|
|
# `resolve_cookies_path` expects the app root so it can fall back to ./cookies.txt.
|
|
# This file lives under ./tool/, so default to the parent directory.
|
|
self._script_dir = script_dir or Path(__file__).resolve().parent.parent
|
|
self.defaults = self._load_defaults()
|
|
self._cookiefile: Optional[Path] = self._init_cookiefile()
|
|
|
|
def _init_cookiefile(self) -> Optional[Path]:
|
|
"""Resolve cookies once at tool init (yt-dlp is the primary consumer)."""
|
|
try:
|
|
from config import resolve_cookies_path
|
|
|
|
resolved = resolve_cookies_path(self._config, script_dir=self._script_dir)
|
|
if resolved is not None and resolved.is_file():
|
|
return resolved
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def _load_defaults(self) -> YtDlpDefaults:
|
|
cfg = self._config
|
|
|
|
# NOTE: `YtDlpDefaults` is a slots dataclass. Referencing defaults via
|
|
# `YtDlpDefaults.video_format` yields a `member_descriptor`, not the
|
|
# default string value. Use an instance for fallback defaults.
|
|
_fallback_defaults = YtDlpDefaults()
|
|
|
|
tool_block = _get_nested(cfg, "tool", "ytdlp")
|
|
if not isinstance(tool_block, dict):
|
|
tool_block = {}
|
|
|
|
ytdlp_block = cfg.get("ytdlp") if isinstance(cfg.get("ytdlp"), dict) else {}
|
|
if not isinstance(ytdlp_block, dict):
|
|
ytdlp_block = {}
|
|
|
|
# Accept both nested and flat styles.
|
|
video_format = (
|
|
tool_block.get("video_format")
|
|
or tool_block.get("format")
|
|
or ytdlp_block.get("video_format")
|
|
or ytdlp_block.get("video")
|
|
or ytdlp_block.get("format_video")
|
|
or cfg.get("ytdlp_video_format")
|
|
)
|
|
audio_format = (
|
|
tool_block.get("audio_format")
|
|
or ytdlp_block.get("audio_format")
|
|
or ytdlp_block.get("audio")
|
|
or ytdlp_block.get("format_audio")
|
|
or cfg.get("ytdlp_audio_format")
|
|
)
|
|
|
|
# Also accept dotted keys written as nested dicts: ytdlp.format.video, ytdlp.format.audio
|
|
nested_video = _get_nested(cfg, "ytdlp", "format", "video")
|
|
nested_audio = _get_nested(cfg, "ytdlp", "format", "audio")
|
|
|
|
fmt_sort_val = (
|
|
tool_block.get("format_sort")
|
|
or ytdlp_block.get("format_sort")
|
|
or ytdlp_block.get("formatSort")
|
|
or cfg.get("ytdlp_format_sort")
|
|
or _get_nested(cfg, "ytdlp", "format", "sort")
|
|
)
|
|
fmt_sort = _parse_csv_list(fmt_sort_val)
|
|
|
|
defaults = YtDlpDefaults(
|
|
video_format=str(nested_video or video_format or _fallback_defaults.video_format),
|
|
audio_format=str(nested_audio or audio_format or _fallback_defaults.audio_format),
|
|
format_sort=fmt_sort,
|
|
)
|
|
|
|
return defaults
|
|
|
|
def resolve_cookiefile(self) -> Optional[Path]:
|
|
return self._cookiefile
|
|
|
|
def default_format(self, mode: str) -> str:
|
|
m = str(mode or "").lower().strip()
|
|
if m == "audio":
|
|
return self.defaults.audio_format
|
|
return self.defaults.video_format
|
|
|
|
def build_ytdlp_options(self, opts: DownloadOptions) -> Dict[str, Any]:
|
|
"""Translate DownloadOptions into yt-dlp API options."""
|
|
ensure_directory(opts.output_dir)
|
|
outtmpl = str((opts.output_dir / "%(title)s.%(ext)s").resolve())
|
|
base_options: Dict[str, Any] = {
|
|
"outtmpl": outtmpl,
|
|
"quiet": True,
|
|
"no_warnings": True,
|
|
"noprogress": True,
|
|
"socket_timeout": 30,
|
|
"retries": 10,
|
|
"fragment_retries": 10,
|
|
"http_chunk_size": 10_485_760,
|
|
"restrictfilenames": True,
|
|
}
|
|
|
|
try:
|
|
repo_root = Path(__file__).resolve().parents[1]
|
|
bundled_ffmpeg_dir = repo_root / "MPV" / "ffmpeg" / "bin"
|
|
if bundled_ffmpeg_dir.exists():
|
|
base_options.setdefault("ffmpeg_location", str(bundled_ffmpeg_dir))
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
if os.name == "nt":
|
|
base_options.setdefault("file_access_retries", 40)
|
|
except Exception:
|
|
pass
|
|
|
|
if opts.cookies_path and opts.cookies_path.is_file():
|
|
base_options["cookiefile"] = str(opts.cookies_path)
|
|
else:
|
|
cookiefile = self.resolve_cookiefile()
|
|
if cookiefile is not None and cookiefile.is_file():
|
|
base_options["cookiefile"] = str(cookiefile)
|
|
|
|
if opts.no_playlist:
|
|
base_options["noplaylist"] = True
|
|
|
|
fmt = opts.ytdl_format or self.default_format(opts.mode)
|
|
base_options["format"] = fmt
|
|
|
|
if opts.mode == "audio":
|
|
base_options["postprocessors"] = [{"key": "FFmpegExtractAudio"}]
|
|
else:
|
|
format_sort = self.defaults.format_sort or [
|
|
"res:4320",
|
|
"res:2880",
|
|
"res:2160",
|
|
"res:1440",
|
|
"res:1080",
|
|
"res:720",
|
|
"res",
|
|
]
|
|
base_options["format_sort"] = format_sort
|
|
|
|
if getattr(opts, "embed_chapters", False):
|
|
pps = base_options.get("postprocessors")
|
|
if not isinstance(pps, list):
|
|
pps = []
|
|
already_has_metadata = any(
|
|
isinstance(pp, dict) and str(pp.get("key") or "") == "FFmpegMetadata" for pp in pps
|
|
)
|
|
if not already_has_metadata:
|
|
pps.append(
|
|
{
|
|
"key": "FFmpegMetadata",
|
|
"add_metadata": True,
|
|
"add_chapters": True,
|
|
"add_infojson": "if_exists",
|
|
}
|
|
)
|
|
base_options["postprocessors"] = pps
|
|
|
|
if opts.mode != "audio":
|
|
base_options.setdefault("merge_output_format", "mkv")
|
|
|
|
if getattr(opts, "write_sub", False):
|
|
base_options["writesubtitles"] = True
|
|
base_options["writeautomaticsub"] = True
|
|
base_options["subtitlesformat"] = "vtt"
|
|
|
|
if opts.clip_sections:
|
|
sections: List[str] = []
|
|
|
|
def _secs_to_hms(seconds: float) -> str:
|
|
total = max(0, int(seconds))
|
|
minutes, secs = divmod(total, 60)
|
|
hours, minutes = divmod(minutes, 60)
|
|
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
|
|
|
for section_range in str(opts.clip_sections).split(","):
|
|
section_range = section_range.strip()
|
|
if not section_range:
|
|
continue
|
|
try:
|
|
start_s_raw, end_s_raw = section_range.split("-", 1)
|
|
start_s = float(start_s_raw.strip())
|
|
end_s = float(end_s_raw.strip())
|
|
if start_s >= end_s:
|
|
continue
|
|
sections.append(f"*{_secs_to_hms(start_s)}-{_secs_to_hms(end_s)}")
|
|
except (ValueError, AttributeError):
|
|
continue
|
|
|
|
if sections:
|
|
base_options["download_sections"] = sections
|
|
# Clipped outputs should begin with a keyframe; otherwise players (notably mpv)
|
|
# can show audio before video or a black screen until the next keyframe.
|
|
# yt-dlp implements this by forcing keyframes at cut points.
|
|
base_options["force_keyframes_at_cuts"] = True
|
|
debug(f"Download sections configured: {', '.join(sections)}")
|
|
|
|
if opts.playlist_items:
|
|
base_options["playlist_items"] = opts.playlist_items
|
|
|
|
if not opts.quiet:
|
|
debug(f"yt-dlp: mode={opts.mode}, format={base_options.get('format')}")
|
|
|
|
return base_options
|
|
|
|
def build_yt_dlp_cli_args(
|
|
self,
|
|
*,
|
|
url: str,
|
|
output_dir: Optional[Path] = None,
|
|
ytdl_format: Optional[str] = None,
|
|
playlist_items: Optional[str] = None,
|
|
no_playlist: bool = False,
|
|
quiet: bool = True,
|
|
extra_args: Optional[Sequence[str]] = None,
|
|
) -> List[str]:
|
|
"""Build a yt-dlp command line (argv list).
|
|
|
|
This is primarily for debug output or subprocess execution.
|
|
"""
|
|
argv: List[str] = ["yt-dlp"]
|
|
if quiet:
|
|
argv.extend(["--quiet", "--no-warnings"])
|
|
argv.append("--no-progress")
|
|
|
|
cookiefile = self.resolve_cookiefile()
|
|
if cookiefile is not None:
|
|
argv.extend(["--cookies", str(cookiefile)])
|
|
|
|
if no_playlist:
|
|
argv.append("--no-playlist")
|
|
if playlist_items:
|
|
argv.extend(["--playlist-items", str(playlist_items)])
|
|
|
|
fmt = (ytdl_format or "").strip()
|
|
if fmt:
|
|
# Use long form to avoid confusion with app-level flags.
|
|
argv.extend(["--format", fmt])
|
|
|
|
if self.defaults.format_sort:
|
|
for sort_key in self.defaults.format_sort:
|
|
argv.extend(["-S", sort_key])
|
|
|
|
if output_dir is not None:
|
|
outtmpl = str((output_dir / "%(title)s.%(ext)s").resolve())
|
|
argv.extend(["-o", outtmpl])
|
|
|
|
if extra_args:
|
|
argv.extend([str(a) for a in extra_args if str(a).strip()])
|
|
|
|
argv.append(str(url))
|
|
return argv
|
|
|
|
def debug_print_cli(self, argv: Sequence[str]) -> None:
|
|
try:
|
|
debug("yt-dlp argv: " + " ".join(str(a) for a in argv))
|
|
except Exception:
|
|
pass
|