"""Download media files using yt-dlp with support for direct file downloads.
Lean, focused downloader without event infrastructure overhead.
- yt-dlp integration for streaming sites
- Direct file download fallback for PDFs, images, documents
- Tag extraction via metadata.extract_ytdlp_tags()
- Logging via helper.logger.log()
"""
from __future__ import annotations
import hashlib
import json
import random
import re
import string
import subprocess
import sys
import time
import traceback
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
import httpx
from helper.logger import log, debug
from .utils import ensure_directory, sha256_file
from .http_client import HTTPClient
from models import DownloadError, DownloadOptions, DownloadMediaResult, DebugLogger, ProgressBar
from hydrus_health_check import get_cookies_file_path
try:
import yt_dlp # type: ignore
from yt_dlp.extractor import gen_extractors # type: ignore
except Exception as exc:
yt_dlp = None # type: ignore
YTDLP_IMPORT_ERROR = exc
else:
YTDLP_IMPORT_ERROR = None
try:
from metadata import extract_ytdlp_tags
except ImportError:
extract_ytdlp_tags = None
_EXTRACTOR_CACHE: List[Any] | None = None
def _ensure_yt_dlp_ready() -> None:
"""Verify yt-dlp is available, raise if not."""
if yt_dlp is not None:
return
detail = str(YTDLP_IMPORT_ERROR or "yt-dlp is not installed")
raise DownloadError(f"yt-dlp module not available: {detail}")
def _progress_callback(status: Dict[str, Any]) -> None:
"""Simple progress callback using logger."""
event = status.get("status")
if event == "downloading":
percent = status.get("_percent_str", "?")
speed = status.get("_speed_str", "?")
eta = status.get("_eta_str", "?")
# Print progress to stdout with carriage return to update in place
sys.stdout.write(f"\r[download] {percent} at {speed} ETA {eta} ")
sys.stdout.flush()
elif event == "finished":
# Clear the progress line
sys.stdout.write("\r" + " " * 70 + "\r")
sys.stdout.flush()
# Log finished message (visible)
debug(f"✓ Download finished: {status.get('filename')}")
elif event in ("postprocessing", "processing"):
debug(f"Post-processing: {status.get('postprocessor')}")
def is_url_supported_by_ytdlp(url: str) -> bool:
"""Check if URL is supported by yt-dlp."""
if yt_dlp is None:
return False
global _EXTRACTOR_CACHE
if _EXTRACTOR_CACHE is None:
try:
            _EXTRACTOR_CACHE = list(gen_extractors())
except Exception:
_EXTRACTOR_CACHE = []
for extractor in _EXTRACTOR_CACHE:
try:
if not extractor.suitable(url):
continue
except Exception:
continue
name = getattr(extractor, "IE_NAME", "")
if name.lower() == "generic":
continue
return True
return False
def list_formats(url: str, no_playlist: bool = False, playlist_items: Optional[str] = None) -> Optional[List[Dict[str, Any]]]:
"""Get list of available formats for a URL using yt-dlp.
Args:
url: URL to get formats for
no_playlist: If True, ignore playlists and list formats for single video
playlist_items: If specified, only list formats for these playlist items (e.g., "1,3,5-8")
    Returns:
        List of format dictionaries with keys: format_id, format, resolution, fps, vcodec, acodec, filesize, etc.
        Returns None if format listing fails.
    Raises:
        DownloadError: If yt-dlp is not available.
    """
_ensure_yt_dlp_ready()
try:
ydl_opts = {
"quiet": True,
"no_warnings": True,
"socket_timeout": 30,
}
# Add no_playlist option if specified
if no_playlist:
ydl_opts["noplaylist"] = True
# Add playlist_items filter if specified
if playlist_items:
ydl_opts["playlist_items"] = playlist_items
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
debug(f"Fetching format list for: {url}")
            info = ydl.extract_info(url, download=False)
            if not isinstance(info, dict):
                log("No metadata returned", file=sys.stderr)
                return None
            formats = info.get("formats", [])
if not formats:
log("No formats available", file=sys.stderr)
return None
# Parse and extract relevant format info
result_formats = []
for fmt in formats:
format_info = {
"format_id": fmt.get("format_id", ""),
"format": fmt.get("format", ""),
"ext": fmt.get("ext", ""),
"resolution": fmt.get("resolution", ""),
"width": fmt.get("width"),
"height": fmt.get("height"),
"fps": fmt.get("fps"),
"vcodec": fmt.get("vcodec", "none"),
"acodec": fmt.get("acodec", "none"),
"filesize": fmt.get("filesize"),
"tbr": fmt.get("tbr"), # Total bitrate
}
result_formats.append(format_info)
debug(f"Found {len(result_formats)} available formats")
return result_formats
except Exception as e:
log(f"✗ Error fetching formats: {e}", file=sys.stderr)
return None
def _download_with_sections_via_cli(url: str, ytdl_options: Dict[str, Any], sections: List[str]) -> tuple[str, Dict[str, Any]]:
"""Download each section separately so merge-file can combine them.
yt-dlp with multiple --download-sections args merges them into one file.
We need separate files for merge-file, so download each section individually.
Uses hash-based filenames for sections (not title-based) to prevent yt-dlp from
thinking sections are already downloaded. The title is extracted and stored in tags.
Returns:
(session_id, first_section_info_dict) - session_id for finding files, info dict for metadata extraction
"""
    sections_list = sections or ytdl_options.get("download_sections", [])
if not sections_list:
return "", {}
# Generate a unique hash-based ID for this download session
# This ensures different videos/downloads don't have filename collisions
session_id = hashlib.md5(
(url + str(time.time()) + ''.join(random.choices(string.ascii_letters, k=10))).encode()
).hexdigest()[:12]
first_section_info = None
title_from_first = None
# Download each section separately with unique output template using session ID
for section_idx, section in enumerate(sections_list, 1):
# Build unique output template for this section using session-based filename
# e.g., "{session_id}_{section_idx}.ext" - simple and unique per section
base_outtmpl = ytdl_options.get("outtmpl", "%(title)s.%(ext)s")
output_dir_path = Path(base_outtmpl).parent
# Use session_id + section index for temp filename
# e.g., "/path/{session_id}_1.%(ext)s"
filename_tmpl = f"{session_id}_{section_idx}"
if base_outtmpl.endswith(".%(ext)s"):
filename_tmpl += ".%(ext)s"
# Use Path to handle separators correctly for the OS
section_outtmpl = str(output_dir_path / filename_tmpl)
# For the first section, extract metadata first (separate call)
if section_idx == 1:
metadata_cmd = ["yt-dlp", "--dump-json", "--skip-download"]
if ytdl_options.get("cookiefile"):
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
metadata_cmd.extend(["--cookies", cookies_path])
if ytdl_options.get("noplaylist"):
metadata_cmd.append("--no-playlist")
metadata_cmd.append(url)
try:
meta_result = subprocess.run(metadata_cmd, capture_output=True, text=True)
if meta_result.returncode == 0 and meta_result.stdout:
try:
info_dict = json.loads(meta_result.stdout.strip())
first_section_info = info_dict
title_from_first = info_dict.get('title')
debug(f"Extracted title from metadata: {title_from_first}")
except json.JSONDecodeError:
debug("Could not parse JSON metadata")
except Exception as e:
debug(f"Error extracting metadata: {e}")
# Build yt-dlp command for downloading this section
cmd = ["yt-dlp"]
# Add format
if ytdl_options.get("format"):
cmd.extend(["-f", ytdl_options["format"]])
# Add ONLY this section (not all sections)
cmd.extend(["--download-sections", section])
# Add force-keyframes-at-cuts if specified
if ytdl_options.get("force_keyframes_at_cuts"):
cmd.append("--force-keyframes-at-cuts")
# Add output template for this section
cmd.extend(["-o", section_outtmpl])
# Add cookies file if present
if ytdl_options.get("cookiefile"):
# Convert backslashes to forward slashes for better compatibility
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
cmd.extend(["--cookies", cookies_path])
# Add no-playlist if specified
if ytdl_options.get("noplaylist"):
cmd.append("--no-playlist")
# Add the URL
cmd.append(url)
debug(f"Running yt-dlp for section {section_idx}/{len(sections_list)}: {section}")
debug(f"Command: {' '.join(cmd)}")
# Run the subprocess - don't capture output so progress is shown
try:
result = subprocess.run(cmd)
if result.returncode != 0:
raise DownloadError(f"yt-dlp subprocess failed for section {section_idx} with code {result.returncode}")
except Exception as exc:
raise DownloadError(f"yt-dlp subprocess error for section {section_idx}: {exc}") from exc
return session_id, first_section_info or {}
def _build_ytdlp_options(opts: DownloadOptions) -> Dict[str, Any]:
"""Build yt-dlp download options."""
ensure_directory(opts.output_dir)
    # Build output template
    # When downloading sections, _download_with_sections_via_cli overrides this
    # with per-section "{session_id}_{N}.%(ext)s" templates.
    outtmpl = str((opts.output_dir / "%(title)s.%(ext)s").resolve())
base_options: Dict[str, Any] = {
"outtmpl": outtmpl,
"quiet": True,
"no_warnings": True,
"noprogress": True,
"socket_timeout": 30,
"retries": 10,
"fragment_retries": 10,
"http_chunk_size": 10_485_760,
"restrictfilenames": True,
"progress_hooks": [_progress_callback],
}
if opts.cookies_path and opts.cookies_path.is_file():
base_options["cookiefile"] = str(opts.cookies_path)
else:
# Check global cookies file
global_cookies = get_cookies_file_path()
if global_cookies:
base_options["cookiefile"] = global_cookies
else:
# Fallback to browser cookies
base_options["cookiesfrombrowser"] = ("chrome",)
# Add no-playlist option if specified (for single video from playlist URLs)
if opts.no_playlist:
base_options["noplaylist"] = True
# Configure based on mode
if opts.mode == "audio":
base_options["format"] = opts.ytdl_format or "251/140/bestaudio"
base_options["postprocessors"] = [{"key": "FFmpegExtractAudio"}]
else: # video
base_options["format"] = opts.ytdl_format or "bestvideo+bestaudio/best"
base_options["format_sort"] = [
"res:4320", "res:2880", "res:2160", "res:1440", "res:1080", "res:720", "res"
]
# Add clip sections if provided (yt-dlp will download only these sections)
if opts.clip_sections:
# Parse section ranges like "48-65,120-152,196-205" (seconds)
# and convert to yt-dlp format: "*HH:MM:SS-HH:MM:SS,*HH:MM:SS-HH:MM:SS"
sections = []
for section_range in opts.clip_sections.split(','):
try:
start_str, end_str = section_range.strip().split('-')
start_sec = float(start_str)
end_sec = float(end_str)
# Convert seconds to HH:MM:SS format
def sec_to_hhmmss(seconds):
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
start_time = sec_to_hhmmss(start_sec)
end_time = sec_to_hhmmss(end_sec)
sections.append(f"*{start_time}-{end_time}")
            except (ValueError, AttributeError):
                debug(f"Skipping malformed section range: {section_range!r}")
if sections:
# Pass each section as a separate element in the list (yt-dlp expects multiple --download-sections args)
base_options["download_sections"] = sections
debug(f"Download sections configured: {', '.join(sections)}")
# Note: Not using --force-keyframes-at-cuts to avoid re-encoding
# This may result in less precise cuts but faster downloads
# Add playlist items selection if provided
if opts.playlist_items:
base_options["playlist_items"] = opts.playlist_items
debug(f"yt-dlp: mode={opts.mode}, format={base_options.get('format')}")
return base_options
def _iter_download_entries(info: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
"""Iterate through download entries, handling playlists."""
queue: List[Dict[str, Any]] = [info]
seen: set[int] = set()
while queue:
current = queue.pop(0)
obj_id = id(current)
if obj_id in seen:
continue
seen.add(obj_id)
entries = current.get("entries")
if isinstance(entries, list):
for entry in entries:
if isinstance(entry, dict):
queue.append(entry)
if current.get("requested_downloads") or not entries:
yield current
def _candidate_paths(entry: Dict[str, Any], output_dir: Path) -> Iterator[Path]:
"""Get candidate file paths for downloaded media."""
requested = entry.get("requested_downloads")
if isinstance(requested, list):
for item in requested:
if isinstance(item, dict):
for key in ("filepath", "_filename", "filename"):
value = item.get(key)
if value:
yield Path(value)
for key in ("filepath", "_filename", "filename"):
value = entry.get(key)
if value:
yield Path(value)
if entry.get("filename"):
yield output_dir / entry["filename"]
def _resolve_entry_and_path(info: Dict[str, Any], output_dir: Path) -> tuple[Dict[str, Any], Path]:
"""Find downloaded file in yt-dlp metadata."""
for entry in _iter_download_entries(info):
for candidate in _candidate_paths(entry, output_dir):
if candidate.is_file():
return entry, candidate
if not candidate.is_absolute():
resolved = output_dir / candidate
if resolved.is_file():
return entry, resolved
raise FileNotFoundError("yt-dlp did not report a downloaded media file")
def _extract_sha256(info: Dict[str, Any]) -> Optional[str]:
"""Extract SHA256 hash from yt-dlp metadata."""
    for payload in [info] + (info.get("entries") or []):
if not isinstance(payload, dict):
continue
hashes = payload.get("hashes")
if isinstance(hashes, dict):
for key in ("sha256", "sha-256", "sha_256"):
value = hashes.get(key)
if isinstance(value, str) and value.strip():
return value.strip().lower()
for key in ("sha256", "sha-256", "sha_256"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip().lower()
return None
def _get_libgen_download_url(libgen_url: str) -> Optional[str]:
"""Extract the actual download link from LibGen redirect URL.
LibGen URLs like https://libgen.gl/file.php?id=123456 redirect to
actual mirror URLs. This follows the redirect chain to get the real file.
Args:
libgen_url: LibGen file.php URL
Returns:
Actual download URL or None if extraction fails
"""
try:
import requests
from urllib.parse import urlparse
# Check if this is a LibGen URL
parsed = urlparse(libgen_url)
if 'libgen' not in parsed.netloc.lower():
return None
if '/file.php' not in parsed.path.lower():
return None
# LibGen redirects to actual mirrors, follow redirects to get final URL
session = requests.Session()
session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
debug(f"Following LibGen redirect chain for: {libgen_url}")
# First, get the page and look for direct download link
try:
response = session.get(libgen_url, timeout=10, allow_redirects=True)
final_url = response.url
# Try to find actual download link in the page
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(response.content, 'html.parser')
# Look for download links - LibGen typically has forms with download buttons
# Look for all links and forms that might lead to download
for link in soup.find_all('a'):
href = link.get('href')
if href and isinstance(href, str):
# Look for direct file links or get.php redirects
if 'get.php' in href.lower() or href.endswith(('.pdf', '.epub', '.djvu', '.mobi')):
download_url = href if href.startswith('http') else urljoin(final_url, href)
debug(f"Found download link: {download_url}")
return download_url
except ImportError:
pass # BeautifulSoup not available
# If we followed redirects successfully, return the final URL
# This handles cases where libgen redirects to a direct download mirror
if final_url != libgen_url:
debug(f"LibGen resolved to mirror: {final_url}")
return final_url
except requests.RequestException as e:
log(f"Error following LibGen redirects: {e}", file=sys.stderr)
# Try head request as fallback
try:
response = session.head(libgen_url, allow_redirects=True, timeout=10)
if response.url != libgen_url:
debug(f"LibGen HEAD resolved to: {response.url}")
return response.url
            except Exception:
pass
return None
except Exception as e:
log(f"Error resolving LibGen URL: {e}", file=sys.stderr)
return None
def _download_direct_file(
url: str,
output_dir: Path,
debug_logger: Optional[DebugLogger] = None,
) -> DownloadMediaResult:
"""Download a direct file (PDF, image, document, etc.) without yt-dlp."""
ensure_directory(output_dir)
    from urllib.parse import parse_qs, unquote, urlparse
# Extract filename from URL
parsed_url = urlparse(url)
url_path = parsed_url.path
# Try to get filename from query parameters first (for LibGen and similar services)
# e.g., ?filename=Book+Title.pdf or &download=filename.pdf
filename = None
if parsed_url.query:
query_params = parse_qs(parsed_url.query)
for param_name in ('filename', 'download', 'file', 'name'):
if param_name in query_params and query_params[param_name]:
filename = query_params[param_name][0]
filename = unquote(filename)
break
# If not found in query params, extract from URL path
if not filename or not filename.strip():
filename = url_path.split("/")[-1] if url_path else ""
filename = unquote(filename)
# Remove query strings from filename if any
if "?" in filename:
filename = filename.split("?")[0]
# Try to get real filename from Content-Disposition header (HEAD request)
try:
with HTTPClient(timeout=10.0) as client:
response = client._request("HEAD", url, follow_redirects=True)
content_disposition = response.headers.get("content-disposition", "")
if content_disposition:
# Extract filename from Content-Disposition header
# Format: attachment; filename="filename.pdf" or filename=filename.pdf
match = re.search(r'filename\*?=(?:"([^"]*)"|([^;\s]*))', content_disposition)
if match:
extracted_name = match.group(1) or match.group(2)
if extracted_name:
filename = unquote(extracted_name)
debug(f"Filename from Content-Disposition: {filename}")
except Exception as e:
log(f"Could not get filename from headers: {e}", file=sys.stderr)
# Fallback if we still don't have a good filename
if not filename or "." not in filename:
filename = "downloaded_file.bin"
file_path = output_dir / filename
progress_bar = ProgressBar()
debug(f"Direct download: {filename}")
try:
start_time = time.time()
downloaded_bytes = [0]
total_bytes = [0]
last_progress_time = [start_time]
def progress_callback(bytes_downloaded: int, content_length: int) -> None:
downloaded_bytes[0] = bytes_downloaded
total_bytes[0] = content_length
now = time.time()
if now - last_progress_time[0] >= 0.5 and total_bytes[0] > 0:
elapsed = now - start_time
percent = (bytes_downloaded / content_length) * 100 if content_length > 0 else 0
speed = bytes_downloaded / elapsed if elapsed > 0 else 0
eta_seconds = (content_length - bytes_downloaded) / speed if speed > 0 else 0
speed_str = progress_bar.format_bytes(speed) + "/s"
minutes, seconds = divmod(int(eta_seconds), 60)
hours, minutes = divmod(minutes, 60)
eta_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
progress_line = progress_bar.format_progress(
percent_str=f"{percent:.1f}%",
downloaded=bytes_downloaded,
total=content_length,
speed_str=speed_str,
eta_str=eta_str,
)
debug(progress_line)
last_progress_time[0] = now
with HTTPClient(timeout=30.0) as client:
client.download(url, str(file_path), progress_callback=progress_callback)
elapsed = time.time() - start_time
avg_speed_str = progress_bar.format_bytes(downloaded_bytes[0] / elapsed if elapsed > 0 else 0) + "/s"
debug(f"✓ Downloaded in {elapsed:.1f}s at {avg_speed_str}")
# For direct file downloads, create minimal info dict without filename as title
# This prevents creating duplicate title: tags when filename gets auto-generated
# We'll add title back later only if we couldn't extract meaningful tags
info = {
"id": filename.rsplit(".", 1)[0],
"ext": filename.rsplit(".", 1)[1] if "." in filename else "bin",
"webpage_url": url,
}
hash_value = None
try:
hash_value = sha256_file(file_path)
except Exception:
pass
tags = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(info)
except Exception as e:
log(f"Error extracting tags: {e}", file=sys.stderr)
# Only use filename as a title tag if we couldn't extract any meaningful tags
# This prevents duplicate title: tags when the filename could be mistaken for metadata
if not any(t.startswith('title:') for t in tags):
# Re-extract tags with filename as title only if needed
info['title'] = filename
tags = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(info)
except Exception as e:
log(f"Error extracting tags with filename: {e}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"direct-file-downloaded",
{"url": url, "path": str(file_path), "hash": hash_value},
)
return DownloadMediaResult(
path=file_path,
info=info,
tags=tags,
source_url=url,
hash_value=hash_value,
)
except (httpx.HTTPError, httpx.RequestError) as exc:
log(f"Download error: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "direct-file", "url": url, "error": str(exc)},
)
raise DownloadError(f"Failed to download {url}: {exc}") from exc
except Exception as exc:
log(f"Error downloading file: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{
"phase": "direct-file",
"url": url,
"error": str(exc),
"traceback": traceback.format_exc(),
},
)
raise DownloadError(f"Error downloading file: {exc}") from exc
def probe_url(url: str, no_playlist: bool = False) -> Optional[Dict[str, Any]]:
"""Probe URL to extract metadata WITHOUT downloading.
Args:
url: URL to probe
no_playlist: If True, ignore playlists and probe only the single video
Returns:
Dict with keys: extractor, title, entries (if playlist), duration, etc.
Returns None if not supported by yt-dlp.
"""
if not is_url_supported_by_ytdlp(url):
return None
_ensure_yt_dlp_ready()
assert yt_dlp is not None
try:
# Extract info without downloading
# Use extract_flat='in_playlist' to get full metadata for playlist items
ydl_opts = {
"quiet": True, # Suppress all output
"no_warnings": True,
"socket_timeout": 10,
"retries": 3,
"skip_download": True, # Don't actually download
"extract_flat": "in_playlist", # Get playlist with metadata for each entry
"noprogress": True, # No progress bars
}
# Add cookies if available
global_cookies = get_cookies_file_path()
if global_cookies:
ydl_opts["cookiefile"] = global_cookies
# Add no_playlist option if specified
if no_playlist:
ydl_opts["noplaylist"] = True
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(url, download=False)
if not isinstance(info, dict):
return None
# Extract relevant fields
return {
"extractor": info.get("extractor", ""),
"title": info.get("title", ""),
"entries": info.get("entries", []), # Will be populated if playlist
"duration": info.get("duration"),
"uploader": info.get("uploader"),
"description": info.get("description"),
"url": url,
}
except Exception as exc:
log(f"Probe failed for {url}: {exc}")
return None
def download_media(
opts: DownloadOptions,
*,
debug_logger: Optional[DebugLogger] = None,
) -> DownloadMediaResult:
"""Download media from URL using yt-dlp or direct HTTP download.
Args:
opts: DownloadOptions with url, mode, output_dir, etc.
debug_logger: Optional debug logger for troubleshooting
Returns:
DownloadMediaResult with path, info, tags, hash
Raises:
DownloadError: If download fails
"""
# Handle LibGen URLs specially
# file.php redirects to mirrors, get.php is direct from modern API
if 'libgen' in opts.url.lower():
if '/get.php' in opts.url.lower():
# Modern API get.php links are direct downloads from mirrors (not file redirects)
log(f"Detected LibGen get.php URL, downloading directly...")
if debug_logger is not None:
debug_logger.write_record("libgen-direct", {"url": opts.url})
return _download_direct_file(opts.url, opts.output_dir, debug_logger)
        elif '/file.php' in opts.url.lower():
            # Old-style file.php redirects to mirrors, we need to resolve
            log("Detected LibGen file.php URL, resolving to actual mirror...")
            actual_url = _get_libgen_download_url(opts.url)
            if actual_url and actual_url != opts.url:
                log(f"Resolved LibGen URL to mirror: {actual_url}")
                # Record before overwriting opts.url so "original" stays accurate
                if debug_logger is not None:
                    debug_logger.write_record("libgen-resolved", {"original": opts.url, "resolved": actual_url})
                opts.url = actual_url
                # After resolution this is typically an onion link or a direct file;
                # skip yt-dlp (it won't support onion/mirror hosts) and go direct.
                return _download_direct_file(opts.url, opts.output_dir, debug_logger)
else:
log(f"Could not resolve LibGen URL, trying direct download anyway", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record("libgen-resolve-failed", {"url": opts.url})
return _download_direct_file(opts.url, opts.output_dir, debug_logger)
# Try yt-dlp first if URL is supported
if not is_url_supported_by_ytdlp(opts.url):
log(f"URL not supported by yt-dlp, trying direct download: {opts.url}")
if debug_logger is not None:
debug_logger.write_record("direct-file-attempt", {"url": opts.url})
return _download_direct_file(opts.url, opts.output_dir, debug_logger)
_ensure_yt_dlp_ready()
ytdl_options = _build_ytdlp_options(opts)
debug(f"Starting yt-dlp download: {opts.url}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-start", {"url": opts.url})
assert yt_dlp is not None
try:
# Debug: show what options we're using
if ytdl_options.get("download_sections"):
debug(f"[yt-dlp] download_sections: {ytdl_options['download_sections']}")
debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}")
# Use subprocess when download_sections are present (Python API doesn't support them properly)
session_id = None
first_section_info = {}
if ytdl_options.get("download_sections"):
session_id, first_section_info = _download_with_sections_via_cli(opts.url, ytdl_options, ytdl_options.get("download_sections", []))
info = None
else:
with yt_dlp.YoutubeDL(ytdl_options) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(opts.url, download=True)
except Exception as exc:
log(f"yt-dlp failed: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{
"phase": "yt-dlp",
"error": str(exc),
"traceback": traceback.format_exc(),
},
)
raise DownloadError("yt-dlp download failed") from exc
# If we used subprocess, we need to find the file manually
if info is None:
# Find files created/modified during this download (after we started)
# Look for files matching the expected output template pattern
try:
            # Expected filenames: sections use "{session_id}_{N}.ext"; plain
            # downloads use the "%(title)s.%(ext)s" template.
            # Wait a moment to ensure files are fully written.
            time.sleep(0.5)
# List all files in output_dir, sorted by modification time
            files = sorted((p for p in opts.output_dir.iterdir() if p.is_file()), key=lambda p: p.stat().st_mtime, reverse=True)
if not files:
raise FileNotFoundError(f"No files found in {opts.output_dir}")
# If we downloaded sections, look for files with the session_id pattern
if opts.clip_sections and session_id:
# Pattern: "{session_id}_1.ext", "{session_id}_2.ext", etc.
section_pattern = re.compile(rf'^{re.escape(session_id)}_(\d+)\.')
matching_files = [f for f in files if section_pattern.search(f.name)]
if matching_files:
# Sort by section number to ensure correct order
def extract_section_num(path: Path) -> int:
match = section_pattern.search(path.name)
return int(match.group(1)) if match else 999
matching_files.sort(key=extract_section_num)
debug(f"Found {len(matching_files)} section file(s) matching pattern")
# Now rename section files to use hash-based names
# This ensures unique filenames for each section content
renamed_files = []
for idx, section_file in enumerate(matching_files, 1):
try:
# Calculate hash for the file
file_hash = sha256_file(section_file)
ext = section_file.suffix
new_name = f"{file_hash}{ext}"
new_path = opts.output_dir / new_name
if new_path.exists() and new_path != section_file:
# If file with same hash exists, use it and delete the temp one
debug(f"File with hash {file_hash} already exists, using existing file.")
try:
section_file.unlink()
except OSError:
pass
renamed_files.append(new_path)
else:
section_file.rename(new_path)
debug(f"Renamed section file: {section_file.name}{new_name}")
renamed_files.append(new_path)
except Exception as e:
debug(f"Failed to process section file {section_file.name}: {e}")
renamed_files.append(section_file)
media_path = renamed_files[0]
media_paths = renamed_files
debug(f"✓ Downloaded {len(media_paths)} section file(s) (session: {session_id})")
else:
# Fallback to most recent file if pattern not found
media_path = files[0]
media_paths = None
debug(f"✓ Downloaded section file (pattern not found): {media_path.name}")
else:
# No sections, just take the most recent file
media_path = files[0]
media_paths = None
debug(f"✓ Downloaded: {media_path.name}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-file-found", {"path": str(media_path)})
except Exception as exc:
log(f"Error finding downloaded file: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "find-file", "error": str(exc)},
)
raise DownloadError(str(exc)) from exc
# Create result with minimal data extracted from filename
file_hash = sha256_file(media_path)
# For section downloads, create tags with the title and build proper info dict
tags = []
title = ''
if first_section_info:
title = first_section_info.get('title', '')
if title:
tags.append(f'title:{title}')
debug(f"Added title tag for section download: {title}")
# Build info dict - always use extracted title if available, not hash
if first_section_info:
info_dict = first_section_info
else:
info_dict = {
"id": media_path.stem,
"title": title or media_path.stem,
"ext": media_path.suffix.lstrip(".")
}
return DownloadMediaResult(
path=media_path,
info=info_dict,
tags=tags,
source_url=opts.url,
hash_value=file_hash,
paths=media_paths, # Include all section files if present
)
if not isinstance(info, dict):
log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr)
raise DownloadError("Unexpected yt-dlp response type")
info_dict: Dict[str, Any] = info
if debug_logger is not None:
debug_logger.write_record(
"ytdlp-info",
{
"keys": sorted(info_dict.keys()),
"is_playlist": bool(info_dict.get("entries")),
},
)
try:
entry, media_path = _resolve_entry_and_path(info_dict, opts.output_dir)
except FileNotFoundError as exc:
log(f"Error: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "resolve-path", "error": str(exc)},
)
raise DownloadError(str(exc)) from exc
if debug_logger is not None:
debug_logger.write_record(
"resolved-media",
{"path": str(media_path), "entry_keys": sorted(entry.keys())},
)
# Extract hash from metadata or compute
hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
if not hash_value:
try:
hash_value = sha256_file(media_path)
except OSError as exc:
if debug_logger is not None:
debug_logger.write_record(
"hash-error",
{"path": str(media_path), "error": str(exc)},
)
# Extract tags using metadata.py
tags = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(entry)
except Exception as e:
log(f"Error extracting tags: {e}", file=sys.stderr)
source_url = (
entry.get("webpage_url")
or entry.get("original_url")
or entry.get("url")
)
debug(f"✓ Downloaded: {media_path.name} ({len(tags)} tags)")
if debug_logger is not None:
debug_logger.write_record(
"downloaded",
{
"path": str(media_path),
"tag_count": len(tags),
"source_url": source_url,
"sha256": hash_value,
},
)
return DownloadMediaResult(
path=media_path,
info=entry,
tags=tags,
source_url=source_url,
hash_value=hash_value,
)
__all__ = [
"download_media",
"is_url_supported_by_ytdlp",
"DownloadError",
"DownloadOptions",
"DownloadMediaResult",
]