This commit is contained in:
nose
2025-12-03 15:18:57 -08:00
parent 89aa24961b
commit 5e4df11dbf
12 changed files with 1953 additions and 346 deletions
@@ -8,7 +8,12 @@ Lean, focused downloader without event infrastructure overhead.
"""
from __future__ import annotations
import glob # noqa: F401
import hashlib
import json # noqa: F401
import random
import re
import string
import subprocess
import sys
import time
@@ -157,31 +162,72 @@ def list_formats(url: str, no_playlist: bool = False, playlist_items: Optional[s
return None
def _download_with_sections_via_cli(url: str, ytdl_options: Dict[str, Any], sections: List[str]) -> tuple[Optional[str], Dict[str, Any]]:
"""Download each section separately so merge-file can combine them.
yt-dlp with multiple --download-sections args merges them into one file.
We need separate files for merge-file, so download each section individually.
Uses hash-based filenames for sections (not title-based) to prevent yt-dlp from
thinking sections are already downloaded. The title is extracted and stored in tags.
Returns:
(session_id, first_section_info_dict) - session_id for finding files, info dict for metadata extraction
"""
sections_list = ytdl_options.get("download_sections", [])
if not sections_list:
return "", {}
# Generate a unique hash-based ID for this download session
# This ensures different videos/downloads don't have filename collisions
session_id = hashlib.md5(
(url + str(time.time()) + ''.join(random.choices(string.ascii_letters, k=10))).encode()
).hexdigest()[:12]
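# Illustrative only: a two-section download produces temp names such as
# "a1b2c3d4e5f6_1.webm" and "a1b2c3d4e5f6_2.webm"; the 12-char prefix is
# hypothetical and depends on the url, wall-clock time, and the random suffix.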
first_section_info = None
title_from_first = None
# Download each section separately with unique output template using session ID
for section_idx, section in enumerate(sections_list, 1):
# Build unique output template for this section using session-based filename
# e.g., "{session_id}_{section_idx}.ext" - simple and unique per section
base_outtmpl = ytdl_options.get("outtmpl", "%(title)s.%(ext)s")
output_dir_path = Path(base_outtmpl).parent
# Use session_id + section index for temp filename
# e.g., "/path/{session_id}_1.%(ext)s"
filename_tmpl = f"{session_id}_{section_idx}"
filename_tmpl += ".%(ext)s"
# Use Path to handle separators correctly for the OS
section_outtmpl = str(output_dir_path / filename_tmpl)
# For the first section, extract metadata first (separate call)
if section_idx == 1:
metadata_cmd = ["yt-dlp", "--dump-json", "--skip-download"]
if ytdl_options.get("cookiefile"):
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
metadata_cmd.extend(["--cookies", cookies_path])
if ytdl_options.get("noplaylist"):
metadata_cmd.append("--no-playlist")
metadata_cmd.append(url)
try:
meta_result = subprocess.run(metadata_cmd, capture_output=True, text=True)
if meta_result.returncode == 0 and meta_result.stdout:
try:
info_dict = json.loads(meta_result.stdout.strip())
first_section_info = info_dict
title_from_first = info_dict.get('title')
debug(f"Extracted title from metadata: {title_from_first}")
except json.JSONDecodeError:
debug("Could not parse JSON metadata")
except Exception as e:
debug(f"Error extracting metadata: {e}")
# Build yt-dlp command for downloading this section
cmd = ["yt-dlp"]
# Add format
@@ -212,14 +258,18 @@ def _download_with_sections_via_cli(url: str, ytdl_options: Dict[str, Any], sect
cmd.append(url)
debug(f"Running yt-dlp for section {section_idx}/{len(sections_list)}: {section}")
debug(f"Command: {' '.join(cmd)}")
# Run the subprocess - don't capture output so progress is shown
try:
result = subprocess.run(cmd)
if result.returncode != 0:
raise DownloadError(f"yt-dlp subprocess failed for section {section_idx} with code {result.returncode}")
except Exception as exc:
raise DownloadError(f"yt-dlp subprocess error for section {section_idx}: {exc}") from exc
return session_id, first_section_info or {}
def _build_ytdlp_options(opts: DownloadOptions) -> Dict[str, Any]:
@@ -296,8 +346,8 @@ def _build_ytdlp_options(opts: DownloadOptions) -> Dict[str, Any]:
# Pass each section as a separate element in the list (yt-dlp expects multiple --download-sections args)
base_options["download_sections"] = sections
debug(f"Download sections configured: {', '.join(sections)}")
# Note: Not using --force-keyframes-at-cuts to avoid re-encoding
# This may result in less precise cuts but faster downloads
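# If frame-accurate boundaries matter more than speed, the old behaviour can
# be restored with base_options["force_keyframes_at_cuts"] = True (the
# --force-keyframes-at-cuts flag), at the cost of re-encoding around each cut.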
# Add playlist items selection if provided
if opts.playlist_items:
@@ -751,8 +801,10 @@ def download_media(
debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}")
# Use subprocess when download_sections are present (Python API doesn't support them properly)
session_id = None
first_section_info = {}
if ytdl_options.get("download_sections"):
session_id, first_section_info = _download_with_sections_via_cli(opts.url, ytdl_options, ytdl_options.get("download_sections", []))
info = None
else:
with yt_dlp.YoutubeDL(ytdl_options) as ydl: # type: ignore[arg-type]
@@ -780,7 +832,7 @@ def download_media(
import re
# Get the expected filename pattern from outtmpl
# For sections: "C:\path\title.section_1_of_3.ext", "C:\path\title.section_2_of_3.ext", etc.
# For sections: "C:\path\{session_id}.section_1_of_3.ext", etc.
# For non-sections: "C:\path\title.ext"
# Wait a moment to ensure files are fully written
@@ -791,10 +843,10 @@ def download_media(
if not files:
raise FileNotFoundError(f"No files found in {opts.output_dir}")
# If we downloaded sections, look for files with the session_id pattern
if opts.clip_sections and session_id:
# Pattern: "{session_id}_1.ext", "{session_id}_2.ext", etc.
section_pattern = re.compile(rf'^{re.escape(session_id)}_(\d+)\.')
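# Illustrative matches for a hypothetical session id "a1b2c3d4e5f6":
#   "a1b2c3d4e5f6_1.webm" -> section 1, "a1b2c3d4e5f6_10.m4a" -> section 10
# The ^ anchor keeps files from other sessions (or other videos) out.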
matching_files = [f for f in files if section_pattern.search(f.name)]
if matching_files:
@@ -804,13 +856,44 @@ def download_media(
return int(match.group(1)) if match else 999
matching_files.sort(key=extract_section_num)
debug(f"Found {len(matching_files)} section file(s) matching pattern")
# Now rename section files to use hash-based names
# This ensures unique filenames for each section content
renamed_files = []
for idx, section_file in enumerate(matching_files, 1):
try:
# Calculate hash for the file
file_hash = sha256_file(section_file)
ext = section_file.suffix
new_name = f"{file_hash}{ext}"
new_path = opts.output_dir / new_name
if new_path.exists() and new_path != section_file:
# If file with same hash exists, use it and delete the temp one
debug(f"File with hash {file_hash} already exists, using existing file.")
try:
section_file.unlink()
except OSError:
pass
renamed_files.append(new_path)
else:
section_file.rename(new_path)
debug(f"Renamed section file: {section_file.name}{new_name}")
renamed_files.append(new_path)
except Exception as e:
debug(f"Failed to process section file {section_file.name}: {e}")
renamed_files.append(section_file)
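# Content-addressed names mean identical section bytes collapse to a single
# file, so re-downloading the same clip is effectively idempotent.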
media_path = renamed_files[0]
media_paths = renamed_files
debug(f"✓ Downloaded {len(media_paths)} section file(s) (session: {session_id})")
else:
# Fallback to most recent file if pattern not found
media_path = files[0]
media_paths = None
debug(f"✓ Downloaded section file (pattern not found): {media_path.name}")
else:
# No sections, just take the most recent file
media_path = files[0]
@@ -830,10 +913,30 @@ def download_media(
# Create result with minimal data extracted from filename
file_hash = sha256_file(media_path)
# For section downloads, create tags with the title and build proper info dict
tags = []
title = ''
if first_section_info:
title = first_section_info.get('title', '')
if title:
tags.append(f'title:{title}')
debug(f"Added title tag for section download: {title}")
# Build info dict - always use extracted title if available, not hash
if first_section_info:
info_dict = first_section_info
else:
info_dict = {
"id": media_path.stem,
"title": title or media_path.stem,
"ext": media_path.suffix.lstrip(".")
}
return DownloadMediaResult(
path=media_path,
info={"id": media_path.stem, "title": media_path.stem, "ext": media_path.suffix.lstrip(".")},
tags=[],
info=info_dict,
tags=tags,
source_url=opts.url,
hash_value=file_hash,
paths=media_paths, # Include all section files if present