"""Download media from url using yt-dlp (streaming sites only).
Focused cmdlet for video/audio downloads from yt-dlp-supported sites:
- YouTube, Twitch, Dailymotion, Vimeo, etc.
- No direct file downloads (use download-file for that)
- Playlist detection with item selection
- Clip extraction (time ranges)
- Format selection and audio/video modes
- Tags extraction and metadata integration
"""
from __future__ import annotations
import glob  # noqa: F401
import hashlib
import json
import os
import random
import re
import string
import subprocess
import sys
import tempfile
import time
import traceback
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence
from urllib.parse import urljoin, urlparse
import httpx
from SYS.logger import log, debug
from SYS.utils import ensure_directory, sha256_file
from API.HTTP import HTTPClient
from models import DownloadError, DownloadOptions, DownloadMediaResult, DebugLogger, ProgressBar
import pipeline as pipeline_context
from result_table import ResultTable
from tool.ytdlp import YtDlpTool
from . import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
create_pipe_object_result = sh.create_pipe_object_result
parse_cmdlet_args = sh.parse_cmdlet_args
register_url_with_local_library = sh.register_url_with_local_library
coerce_to_pipe_object = sh.coerce_to_pipe_object
get_field = sh.get_field
# Minimal inlined helpers from helper/download.py (is_url_supported_by_ytdlp, list_formats)
try:
import yt_dlp # type: ignore
from yt_dlp.extractor import gen_extractors # type: ignore
except Exception as exc:
yt_dlp = None # type: ignore
YTDLP_IMPORT_ERROR = exc
else:
YTDLP_IMPORT_ERROR = None
try:
from metadata import extract_ytdlp_tags
except ImportError:
extract_ytdlp_tags = None
_EXTRACTOR_CACHE: List[Any] | None = None
# Reused progress formatter for yt-dlp callbacks (stderr only).
_YTDLP_PROGRESS_BAR = ProgressBar()
_SUBTITLE_EXTS = (".vtt", ".srt", ".ass", ".ssa", ".lrc")
def _format_chapters_note(info: Dict[str, Any]) -> Optional[str]:
"""Format yt-dlp chapter metadata into a stable, note-friendly text.
Output is one chapter per line, e.g.:
00:00 Intro
01:23-02:10 Topic name
"""
try:
chapters = info.get("chapters")
except Exception:
chapters = None
if not isinstance(chapters, list) or not chapters:
return None
rows: List[tuple[int, Optional[int], str]] = []
max_t = 0
for ch in chapters:
if not isinstance(ch, dict):
continue
start_raw = ch.get("start_time")
end_raw = ch.get("end_time")
title_raw = ch.get("title") or ch.get("name") or ch.get("chapter")
try:
start_s = int(float(start_raw))
except Exception:
continue
end_s: Optional[int] = None
try:
if end_raw is not None:
end_s = int(float(end_raw))
except Exception:
end_s = None
title = str(title_raw).strip() if title_raw is not None else ""
rows.append((start_s, end_s, title))
try:
max_t = max(max_t, start_s, end_s or 0)
except Exception:
max_t = max(max_t, start_s)
if not rows:
return None
force_hours = bool(max_t >= 3600)
def _tc(seconds: int) -> str:
total = max(0, int(seconds))
minutes, secs = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
if force_hours:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
lines: List[str] = []
for start_s, end_s, title in sorted(rows, key=lambda r: (r[0], r[1] if r[1] is not None else 10**9, r[2])):
if end_s is not None and end_s > start_s:
prefix = f"{_tc(start_s)}-{_tc(end_s)}"
else:
prefix = _tc(start_s)
line = f"{prefix} {title}".strip()
if line:
lines.append(line)
text = "\n".join(lines).strip()
return text or None
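# Illustrative sketch (not executed): given a typical yt-dlp info dict such as
#   info = {"chapters": [{"start_time": 0, "title": "Intro"},
#                        {"start_time": 83, "end_time": 130, "title": "Topic name"}]}
# _format_chapters_note(info) would return "00:00 Intro\n01:23-02:10 Topic name";
# once any timestamp reaches 3600s, every timecode switches to HH:MM:SS.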
def _best_subtitle_sidecar(media_path: Path) -> Optional[Path]:
"""Find the most likely subtitle sidecar file for a downloaded media file."""
try:
base_dir = media_path.parent
stem = media_path.stem
if not stem:
return None
candidates: List[Path] = []
for p in base_dir.glob(stem + ".*"):
try:
if not p.is_file():
continue
except Exception:
continue
if p.suffix.lower() in _SUBTITLE_EXTS:
candidates.append(p)
if not candidates:
return None
def _rank(path: Path) -> tuple[int, int, float, str]:
name = path.name.lower()
lang_rank = 0 if ".en." in name or name.endswith(".en" + path.suffix.lower()) else 1
ext = path.suffix.lower()
ext_rank_map = {".vtt": 0, ".srt": 1, ".ass": 2, ".ssa": 3, ".lrc": 4}
ext_rank = ext_rank_map.get(ext, 9)
try:
mtime = float(path.stat().st_mtime)
except Exception:
mtime = 0.0
return (lang_rank, ext_rank, -mtime, name)
candidates.sort(key=_rank)
return candidates[0]
except Exception:
return None
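# Illustrative sketch: for media_path Path("clip.mkv") with sidecars
# "clip.en.vtt" and "clip.srt" in the same directory, _rank prefers the
# English .vtt file (lang_rank 0, ext_rank 0), so "clip.en.vtt" is returned.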
def _read_text_file(path: Path, *, max_bytes: int = 1_500_000) -> Optional[str]:
try:
data = path.read_bytes()
except Exception:
return None
if not data:
return None
if len(data) > max_bytes:
data = data[:max_bytes]
try:
return data.decode("utf-8", errors="replace")
except Exception:
try:
return data.decode(errors="replace")
except Exception:
return None
def _ensure_yt_dlp_ready() -> None:
if yt_dlp is not None:
return
detail = str(YTDLP_IMPORT_ERROR or "yt-dlp is not installed")
raise DownloadError(f"yt-dlp module not available: {detail}")
def is_url_supported_by_ytdlp(url: str) -> bool:
if yt_dlp is None:
return False
global _EXTRACTOR_CACHE
if _EXTRACTOR_CACHE is None:
try:
_EXTRACTOR_CACHE = [ie for ie in gen_extractors()] # type: ignore[arg-type]
except Exception:
_EXTRACTOR_CACHE = []
for extractor in _EXTRACTOR_CACHE:
try:
if not extractor.suitable(url):
continue
except Exception:
continue
name = getattr(extractor, "IE_NAME", "")
if name.lower() == "generic":
continue
return True
return False
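# Illustrative sketch: a YouTube watch URL matches a dedicated extractor and
# returns True, while a plain PDF link only matches yt-dlp's "generic"
# extractor, which is deliberately excluded here, so it returns False.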
def list_formats(url: str, no_playlist: bool = False, playlist_items: Optional[str] = None) -> Optional[List[Dict[str, Any]]]:
_ensure_yt_dlp_ready()
try:
assert yt_dlp is not None
ydl_opts: Dict[str, Any] = {"quiet": True, "no_warnings": True, "socket_timeout": 30}
if no_playlist:
ydl_opts["noplaylist"] = True
if playlist_items:
ydl_opts["playlist_items"] = playlist_items
debug(f"Fetching format list for: {url}")
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(url, download=False)
if not isinstance(info, dict):
log("No formats available", file=sys.stderr)
return None
formats = info.get("formats") or []
if not isinstance(formats, list) or not formats:
log("No formats available", file=sys.stderr)
return None
result_formats: List[Dict[str, Any]] = []
for fmt in formats:
if not isinstance(fmt, dict):
continue
result_formats.append(
{
"format_id": fmt.get("format_id", ""),
"format": fmt.get("format", ""),
"ext": fmt.get("ext", ""),
"resolution": fmt.get("resolution", ""),
"width": fmt.get("width"),
"height": fmt.get("height"),
"fps": fmt.get("fps"),
"vcodec": fmt.get("vcodec", "none"),
"acodec": fmt.get("acodec", "none"),
"filesize": fmt.get("filesize"),
"abr": fmt.get("abr"),
"tbr": fmt.get("tbr"),
}
)
debug(f"Found {len(result_formats)} available formats")
return result_formats or None
except Exception as e:
log(f"✗ Error fetching formats: {e}", file=sys.stderr)
return None
def _pick_best_audio_format_id(formats: List[Dict[str, Any]]) -> Optional[str]:
audio_only: List[Dict[str, Any]] = []
for fmt in formats:
if not isinstance(fmt, dict):
continue
format_id = str(fmt.get("format_id") or "").strip()
if not format_id:
continue
vcodec = str(fmt.get("vcodec") or "none").lower()
acodec = str(fmt.get("acodec") or "none").lower()
if vcodec != "none":
continue
if not acodec or acodec == "none":
continue
audio_only.append(fmt)
if not audio_only:
return None
def score(f: Dict[str, Any]) -> tuple[float, float]:
tbr = f.get("tbr")
abr = f.get("abr")
bitrate = 0.0
for candidate in (tbr, abr):
try:
if candidate is not None:
bitrate = max(bitrate, float(candidate))
except Exception:
pass
size = 0.0
try:
fs = f.get("filesize")
if fs is not None:
size = float(fs)
except Exception:
pass
return (bitrate, size)
best = max(audio_only, key=score)
best_id = str(best.get("format_id") or "").strip()
return best_id or None
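# Illustrative sketch (hypothetical format dicts): given
#   [{"format_id": "140", "vcodec": "none", "acodec": "mp4a.40.2", "abr": 128},
#    {"format_id": "251", "vcodec": "none", "acodec": "opus", "tbr": 160},
#    {"format_id": "137", "vcodec": "avc1", "acodec": "none"}]
# the video-only "137" is filtered out and "251" wins on bitrate (160 > 128).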
def _download_with_sections_via_cli(url: str, ytdl_options: Dict[str, Any], sections: List[str], quiet: bool = False) -> tuple[Optional[str], Dict[str, Any]]:
    sections_list = sections or ytdl_options.get("download_sections", [])
    if not sections_list:
        return None, {}
session_id = hashlib.md5((url + str(time.time()) + ''.join(random.choices(string.ascii_letters, k=10))).encode()).hexdigest()[:12]
first_section_info = None
for section_idx, section in enumerate(sections_list, 1):
base_outtmpl = ytdl_options.get("outtmpl", "%(title)s.%(ext)s")
output_dir_path = Path(base_outtmpl).parent
filename_tmpl = f"{session_id}_{section_idx}"
if base_outtmpl.endswith(".%(ext)s"):
filename_tmpl += ".%(ext)s"
section_outtmpl = str(output_dir_path / filename_tmpl)
if section_idx == 1:
metadata_cmd = ["yt-dlp", "--dump-json", "--skip-download"]
if ytdl_options.get("cookiefile"):
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
metadata_cmd.extend(["--cookies", cookies_path])
if ytdl_options.get("noplaylist"):
metadata_cmd.append("--no-playlist")
metadata_cmd.append(url)
try:
meta_result = subprocess.run(metadata_cmd, capture_output=True, text=True)
if meta_result.returncode == 0 and meta_result.stdout:
try:
info_dict = json.loads(meta_result.stdout.strip())
first_section_info = info_dict
if not quiet:
debug(f"Extracted title from metadata: {info_dict.get('title')}")
except json.JSONDecodeError:
if not quiet:
debug("Could not parse JSON metadata")
except Exception as e:
if not quiet:
debug(f"Error extracting metadata: {e}")
cmd = ["yt-dlp"]
if ytdl_options.get("format"):
cmd.extend(["-f", ytdl_options["format"]])
if ytdl_options.get("merge_output_format"):
cmd.extend(["--merge-output-format", str(ytdl_options["merge_output_format"])])
# For CLI downloads, infer chapter/metadata embedding from either legacy flags
# or explicit FFmpegMetadata postprocessor entries.
postprocessors = ytdl_options.get("postprocessors")
want_add_metadata = bool(ytdl_options.get("addmetadata"))
want_embed_chapters = bool(ytdl_options.get("embedchapters"))
if isinstance(postprocessors, list):
for pp in postprocessors:
if not isinstance(pp, dict):
continue
if str(pp.get("key") or "") == "FFmpegMetadata":
want_add_metadata = True
if bool(pp.get("add_chapters", True)):
want_embed_chapters = True
if want_add_metadata:
cmd.append("--add-metadata")
if want_embed_chapters:
cmd.append("--embed-chapters")
if ytdl_options.get("writesubtitles"):
cmd.append("--write-sub")
cmd.append("--write-auto-sub")
cmd.extend(["--sub-format", "vtt"])
if ytdl_options.get("force_keyframes_at_cuts"):
cmd.extend(["--force-keyframes-at-cuts"]) if ytdl_options.get("force_keyframes_at_cuts") else None
cmd.extend(["-o", section_outtmpl])
if ytdl_options.get("cookiefile"):
cookies_path = ytdl_options["cookiefile"].replace("\\", "/")
cmd.extend(["--cookies", cookies_path])
if ytdl_options.get("noplaylist"):
cmd.append("--no-playlist")
# Apply clip/section selection
cmd.extend(["--download-sections", section])
cmd.append(url)
if not quiet:
debug(f"Running yt-dlp for section: {section}")
try:
subprocess.run(cmd, check=True)
except Exception as exc:
if not quiet:
debug(f"yt-dlp error for section {section}: {exc}")
return session_id, first_section_info or {}
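# Illustrative sketch: section files land next to the normal output template as
# "{session_id}_1.<ext>", "{session_id}_2.<ext>", ... and the caller later
# renames them to hash-based names; the returned tuple is
# (session_id, metadata_from_the_first_probe or {}).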
def _build_ytdlp_options(opts: DownloadOptions) -> Dict[str, Any]:
ensure_directory(opts.output_dir)
outtmpl = str((opts.output_dir / "%(title)s.%(ext)s").resolve())
base_options: Dict[str, Any] = {
"outtmpl": outtmpl,
"quiet": True,
"no_warnings": True,
"noprogress": True,
"socket_timeout": 30,
"retries": 10,
"fragment_retries": 10,
"http_chunk_size": 10_485_760,
"restrictfilenames": True,
}
# Prefer the bundled ffmpeg shipped with the repo (used for merges/remux/postproc).
try:
repo_root = Path(__file__).resolve().parents[1]
bundled_ffmpeg_dir = repo_root / "MPV" / "ffmpeg" / "bin"
if bundled_ffmpeg_dir.exists():
base_options.setdefault("ffmpeg_location", str(bundled_ffmpeg_dir))
except Exception:
pass
# On Windows, AV/indexers can transiently lock files at the end of a download.
# yt-dlp uses file_access_retries for renames (e.g. .part -> final). Default is low.
try:
if os.name == "nt":
base_options.setdefault("file_access_retries", 40)
except Exception:
pass
# Always show a progress bar. The hook prints to stderr so piped stdout stays clean.
base_options["progress_hooks"] = [_progress_callback]
if opts.cookies_path and opts.cookies_path.is_file():
base_options["cookiefile"] = str(opts.cookies_path)
if opts.no_playlist:
base_options["noplaylist"] = True
if opts.mode == "audio":
base_options["format"] = opts.ytdl_format or "251/140/bestaudio"
base_options["postprocessors"] = [{"key": "FFmpegExtractAudio"}]
else:
base_options["format"] = opts.ytdl_format or "bestvideo+bestaudio/best"
base_options["format_sort"] = ["res:4320", "res:2880", "res:2160", "res:1440", "res:1080", "res:720", "res"]
# Optional yt-dlp features
if getattr(opts, "embed_chapters", False):
# Prefer explicit FFmpegMetadata PP so chapter embedding runs even when
# we already specified other postprocessors (e.g. FFmpegExtractAudio).
pps = base_options.get("postprocessors")
if not isinstance(pps, list):
pps = []
already_has_metadata = any(
isinstance(pp, dict) and str(pp.get("key") or "") == "FFmpegMetadata" for pp in pps
)
if not already_has_metadata:
pps.append(
{
"key": "FFmpegMetadata",
"add_metadata": True,
"add_chapters": True,
"add_infojson": "if_exists",
}
)
base_options["postprocessors"] = pps
# Chapter embedding is most reliable in mkv/mp4 containers.
# When merging separate video+audio streams, prefer mkv so mpv sees chapters.
if opts.mode != "audio":
base_options.setdefault("merge_output_format", "mkv")
if getattr(opts, "write_sub", False):
base_options["writesubtitles"] = True
base_options["writeautomaticsub"] = True
base_options["subtitlesformat"] = "vtt"
if opts.clip_sections:
sections: List[str] = []
def _secs_to_hms(seconds: float) -> str:
total = max(0, int(seconds))
minutes, secs = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
for section_range in str(opts.clip_sections).split(","):
section_range = section_range.strip()
if not section_range:
continue
try:
start_s_raw, end_s_raw = section_range.split("-", 1)
start_s = float(start_s_raw.strip())
end_s = float(end_s_raw.strip())
if start_s >= end_s:
continue
sections.append(f"*{_secs_to_hms(start_s)}-{_secs_to_hms(end_s)}")
except (ValueError, AttributeError):
continue
if sections:
base_options["download_sections"] = sections
debug(f"Download sections configured: {', '.join(sections)}")
if opts.playlist_items:
base_options["playlist_items"] = opts.playlist_items
if not opts.quiet:
debug(f"yt-dlp: mode={opts.mode}, format={base_options.get('format')}")
return base_options
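# Illustrative sketch: opts.clip_sections = "10-30,65-90" becomes
#   base_options["download_sections"] = ["*00:00:10-00:00:30", "*00:01:05-00:01:30"]
# which matches yt-dlp's --download-sections syntax (the leading "*" selects
# a raw time range rather than a chapter-title regex).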
def _iter_download_entries(info: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
queue: List[Dict[str, Any]] = [info]
seen: set[int] = set()
while queue:
current = queue.pop(0)
obj_id = id(current)
if obj_id in seen:
continue
seen.add(obj_id)
entries = current.get("entries")
if isinstance(entries, list):
for entry in entries:
queue.append(entry)
if current.get("requested_downloads") or not entries:
yield current
def _candidate_paths(entry: Dict[str, Any], output_dir: Path) -> Iterator[Path]:
requested = entry.get("requested_downloads")
if isinstance(requested, list):
for item in requested:
if isinstance(item, dict):
fp = item.get("filepath") or item.get("_filename")
if fp:
yield Path(fp)
for key in ("filepath", "_filename", "filename"):
value = entry.get(key)
if value:
yield Path(value)
if entry.get("filename"):
yield output_dir / entry["filename"]
def _resolve_entry_and_path(info: Dict[str, Any], output_dir: Path) -> tuple[Dict[str, Any], Path]:
for entry in _iter_download_entries(info):
for candidate in _candidate_paths(entry, output_dir):
if candidate.is_file():
return entry, candidate
if not candidate.is_absolute():
maybe = output_dir / candidate
if maybe.is_file():
return entry, maybe
raise FileNotFoundError("yt-dlp did not report a downloaded media file")
def _resolve_entries_and_paths(info: Dict[str, Any], output_dir: Path) -> List[tuple[Dict[str, Any], Path]]:
resolved: List[tuple[Dict[str, Any], Path]] = []
seen: set[str] = set()
for entry in _iter_download_entries(info):
chosen: Optional[Path] = None
for candidate in _candidate_paths(entry, output_dir):
if candidate.is_file():
chosen = candidate
break
if not candidate.is_absolute():
maybe = output_dir / candidate
if maybe.is_file():
chosen = maybe
break
if chosen is None:
continue
key = str(chosen.resolve())
if key in seen:
continue
seen.add(key)
resolved.append((entry, chosen))
return resolved
def _extract_sha256(info: Dict[str, Any]) -> Optional[str]:
for payload in [info] + info.get("entries", []):
if not isinstance(payload, dict):
continue
hashes = payload.get("hashes")
if isinstance(hashes, dict):
for key in ("sha256", "sha-256", "sha_256"):
if key in hashes and isinstance(hashes[key], str) and hashes[key].strip():
return hashes[key].strip()
for key in ("sha256", "sha-256", "sha_256"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return None
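# Illustrative sketch: some extractors expose checksums either nested, e.g.
#   {"hashes": {"sha256": "..."}}, or flat, e.g. {"sha256": "..."}; the first
# non-empty match wins, and a None result makes callers hash the file on disk.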
def _get_libgen_download_url(libgen_url: str) -> Optional[str]:
try:
        import requests
parsed = urlparse(libgen_url)
if 'libgen' not in parsed.netloc.lower():
return None
if '/file.php' not in parsed.path.lower():
return None
session = requests.Session()
session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})
debug(f"Following LibGen redirect chain for: {libgen_url}")
try:
response = session.get(libgen_url, timeout=10, allow_redirects=True)
final_url = response.url
try:
try:
from lxml import html as lxml_html
except ImportError:
lxml_html = None
if lxml_html is not None:
doc = lxml_html.fromstring(response.content)
for a in doc.xpath("//a[@href]"):
href = str(a.get("href") or "").strip()
if href and "get.php" in href.lower():
return urljoin(final_url, href)
else:
for m in re.finditer(
r"href=[\"\']([^\"\']+)[\"\']",
response.text or "",
flags=re.IGNORECASE,
):
href = str(m.group(1) or "").strip()
if href and "get.php" in href.lower():
return urljoin(final_url, href)
except Exception:
pass
if final_url != libgen_url:
debug(f"LibGen resolved to mirror: {final_url}")
return final_url
except requests.RequestException as e:
log(f"Error following LibGen redirects: {e}", file=sys.stderr)
try:
response = session.head(libgen_url, allow_redirects=True, timeout=10)
if response.url != libgen_url:
return response.url
            except Exception:
pass
return None
except Exception as e:
log(f"Error resolving LibGen URL: {e}", file=sys.stderr)
return None
def _progress_callback(status: Dict[str, Any]) -> None:
"""Simple progress callback using logger."""
event = status.get("status")
if event == "downloading":
# Always print progress to stderr so piped stdout remains clean.
percent = status.get("_percent_str")
downloaded = status.get("downloaded_bytes")
total = status.get("total_bytes") or status.get("total_bytes_estimate")
speed = status.get("_speed_str")
eta = status.get("_eta_str")
try:
line = _YTDLP_PROGRESS_BAR.format_progress(
percent_str=str(percent) if percent is not None else None,
downloaded=int(downloaded) if downloaded is not None else None,
total=int(total) if total is not None else None,
speed_str=str(speed) if speed is not None else None,
eta_str=str(eta) if eta is not None else None,
)
except Exception:
pct = str(percent) if percent is not None else "?"
spd = str(speed) if speed is not None else "?"
et = str(eta) if eta is not None else "?"
line = f"[download] {pct} at {spd} ETA {et}"
sys.stderr.write("\r" + line + " ")
sys.stderr.flush()
elif event == "finished":
# Clear the in-place progress line.
sys.stderr.write("\r" + (" " * 140) + "\r")
sys.stderr.write("\n")
sys.stderr.flush()
elif event in ("postprocessing", "processing"):
return
def _download_direct_file(
url: str,
output_dir: Path,
debug_logger: Optional[DebugLogger] = None,
quiet: bool = False,
) -> DownloadMediaResult:
"""Download a direct file (PDF, image, document, etc.) without yt-dlp."""
ensure_directory(output_dir)
    from urllib.parse import unquote, parse_qs
# Extract filename from URL
parsed_url = urlparse(url)
url_path = parsed_url.path
# Try to get filename from query parameters first (for LibGen and similar services)
# e.g., ?filename=Book+Title.pdf or &download=filename.pdf
filename = None
if parsed_url.query:
query_params = parse_qs(parsed_url.query)
for param_name in ('filename', 'download', 'file', 'name'):
if param_name in query_params and query_params[param_name]:
filename = query_params[param_name][0]
filename = unquote(filename)
break
# If not found in query params, extract from URL path
if not filename or not filename.strip():
filename = url_path.split("/")[-1] if url_path else ""
filename = unquote(filename)
# Remove query strings from filename if any
if "?" in filename:
filename = filename.split("?")[0]
# Try to get real filename from Content-Disposition header (HEAD request)
try:
with HTTPClient(timeout=10.0) as client:
response = client._request("HEAD", url, follow_redirects=True)
content_disposition = response.headers.get("content-disposition", "")
if content_disposition:
# Extract filename from Content-Disposition header
# Format: attachment; filename="filename.pdf" or filename=filename.pdf
match = re.search(r'filename\*?=(?:"([^"]*)"|([^;\s]*))', content_disposition)
if match:
extracted_name = match.group(1) or match.group(2)
if extracted_name:
filename = unquote(extracted_name)
if not quiet:
debug(f"Filename from Content-Disposition: {filename}")
except Exception as e:
if not quiet:
log(f"Could not get filename from headers: {e}", file=sys.stderr)
# Fallback if we still don't have a good filename
if not filename or "." not in filename:
filename = "downloaded_file.bin"
file_path = output_dir / filename
progress_bar = ProgressBar()
if not quiet:
debug(f"Direct download: {filename}")
try:
start_time = time.time()
downloaded_bytes = [0]
total_bytes = [0]
last_progress_time = [start_time]
def progress_callback(bytes_downloaded: int, content_length: int) -> None:
downloaded_bytes[0] = bytes_downloaded
total_bytes[0] = content_length
now = time.time()
if now - last_progress_time[0] >= 0.5 and total_bytes[0] > 0:
elapsed = now - start_time
percent = (bytes_downloaded / content_length) * 100 if content_length > 0 else 0
speed = bytes_downloaded / elapsed if elapsed > 0 else 0
eta_seconds = (content_length - bytes_downloaded) / speed if speed > 0 else 0
speed_str = progress_bar.format_bytes(speed) + "/s"
minutes, seconds = divmod(int(eta_seconds), 60)
hours, minutes = divmod(minutes, 60)
eta_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
progress_line = progress_bar.format_progress(
percent_str=f"{percent:.1f}%",
downloaded=bytes_downloaded,
total=content_length,
speed_str=speed_str,
eta_str=eta_str,
)
sys.stderr.write("\r" + progress_line + " ")
sys.stderr.flush()
last_progress_time[0] = now
with HTTPClient(timeout=30.0) as client:
client.download(url, str(file_path), progress_callback=progress_callback)
# Clear progress line after completion.
sys.stderr.write("\r" + (" " * 140) + "\r")
sys.stderr.write("\n")
sys.stderr.flush()
# For direct file downloads, create minimal info dict without filename as title
# This prevents creating duplicate title: tags when filename gets auto-generated
# We'll add title back later only if we couldn't extract meaningful tags
info = {
"id": filename.rsplit(".", 1)[0],
"ext": filename.rsplit(".", 1)[1] if "." in filename else "bin",
"webpage_url": url,
}
hash_value = None
try:
hash_value = sha256_file(file_path)
except Exception:
pass
tags = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(info)
except Exception as e:
log(f"Error extracting tags: {e}", file=sys.stderr)
# Only use filename as a title tag if we couldn't extract any meaningful tags
# This prevents duplicate title: tags when the filename could be mistaken for metadata
if not any(t.startswith('title:') for t in tags):
# Re-extract tags with filename as title only if needed
info['title'] = filename
tags = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(info)
except Exception as e:
log(f"Error extracting tags with filename: {e}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"direct-file-downloaded",
{"url": url, "path": str(file_path), "hash": hash_value},
)
return DownloadMediaResult(
path=file_path,
info=info,
tag=tags,
source_url=url,
hash_value=hash_value,
)
except (httpx.HTTPError, httpx.RequestError) as exc:
log(f"Download error: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "direct-file", "url": url, "error": str(exc)},
)
raise DownloadError(f"Failed to download {url}: {exc}") from exc
except Exception as exc:
log(f"Error downloading file: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{
"phase": "direct-file",
"url": url,
"error": str(exc),
"traceback": traceback.format_exc(),
},
)
raise DownloadError(f"Error downloading file: {exc}") from exc
def probe_url(url: str, no_playlist: bool = False, timeout_seconds: int = 15, *, cookiefile: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Probe URL to extract metadata WITHOUT downloading.
Args:
url: URL to probe
no_playlist: If True, ignore playlists and probe only the single video
timeout_seconds: Max seconds to wait for probe (default 15s)
Returns:
Dict with keys: extractor, title, entries (if playlist), duration, etc.
Returns None if not supported by yt-dlp or on timeout.
"""
if not is_url_supported_by_ytdlp(url):
return None
# Wrap probe in timeout to prevent hanging on large playlists
import threading
from typing import cast
result_container: List[Optional[Any]] = [None, None] # [result, error]
def _do_probe() -> None:
try:
_ensure_yt_dlp_ready()
assert yt_dlp is not None
# Extract info without downloading
# Use extract_flat='in_playlist' to get full metadata for playlist items
ydl_opts = {
"quiet": True, # Suppress all output
"no_warnings": True,
"socket_timeout": 10,
"retries": 2, # Reduce retries for faster timeout
"skip_download": True, # Don't actually download
"extract_flat": "in_playlist", # Get playlist with metadata for each entry
"noprogress": True, # No progress bars
}
if cookiefile:
ydl_opts["cookiefile"] = str(cookiefile)
# Add no_playlist option if specified
if no_playlist:
ydl_opts["noplaylist"] = True
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(url, download=False)
if not isinstance(info, dict):
result_container[0] = None
return
# Extract relevant fields
webpage_url = info.get("webpage_url") or info.get("original_url") or info.get("url")
result_container[0] = {
"extractor": info.get("extractor", ""),
"title": info.get("title", ""),
"entries": info.get("entries", []), # Will be populated if playlist
"duration": info.get("duration"),
"uploader": info.get("uploader"),
"description": info.get("description"),
# Keep both the requested and canonical URL forms; callers should prefer webpage_url.
"requested_url": url,
"webpage_url": webpage_url,
}
except Exception as exc:
log(f"Probe error for {url}: {exc}")
result_container[1] = exc
thread = threading.Thread(target=_do_probe, daemon=False)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
# Probe timed out - return None to fall back to direct download
debug(f"Probe timeout for {url} (>={timeout_seconds}s), proceeding with download")
return None
if result_container[1] is not None:
# Probe error - return None to proceed anyway
return None
return cast(Optional[Dict[str, Any]], result_container[0])
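# Illustrative usage (not executed; the URL is a placeholder):
#   meta = probe_url("https://www.youtube.com/watch?v=example", no_playlist=True)
#   if meta and not meta["entries"]:
#       print(meta["title"], meta["duration"])
# A None return means the URL is unsupported, the probe errored, or it timed out.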
def download_media(
opts: DownloadOptions,
*,
debug_logger: Optional[DebugLogger] = None,
) -> Any:
"""Download media from URL using yt-dlp or direct HTTP download.
Args:
opts: DownloadOptions with url, mode, output_dir, etc.
debug_logger: Optional debug logger for troubleshooting
Returns:
DownloadMediaResult with path, info, tags, hash
Raises:
DownloadError: If download fails
"""
# Handle LibGen url specially
# file.php redirects to mirrors, get.php is direct from modern API
if 'libgen' in opts.url.lower():
if '/get.php' in opts.url.lower():
# Modern API get.php links are direct downloads from mirrors (not file redirects)
if not opts.quiet:
log(f"Detected LibGen get.php URL, downloading directly...")
if debug_logger is not None:
debug_logger.write_record("libgen-direct", {"url": opts.url})
return _download_direct_file(opts.url, opts.output_dir, debug_logger, quiet=opts.quiet)
elif '/file.php' in opts.url.lower():
# Old-style file.php redirects to mirrors, we need to resolve
if not opts.quiet:
log(f"Detected LibGen file.php URL, resolving to actual mirror...")
            actual_url = _get_libgen_download_url(opts.url)
            if actual_url and actual_url != opts.url:
                if not opts.quiet:
                    log(f"Resolved LibGen URL to mirror: {actual_url}")
                if debug_logger is not None:
                    debug_logger.write_record("libgen-resolved", {"original": opts.url, "resolved": actual_url})
                opts.url = actual_url
                # After resolution this is typically an onion link or direct file.
                # Skip yt-dlp (it won't support onion/mirrors); download directly.
                return _download_direct_file(opts.url, opts.output_dir, debug_logger, quiet=opts.quiet)
            else:
                if not opts.quiet:
                    log("Could not resolve LibGen URL, trying direct download anyway", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record("libgen-resolve-failed", {"url": opts.url})
return _download_direct_file(opts.url, opts.output_dir, debug_logger, quiet=opts.quiet)
# Handle GoFile shares with a dedicated resolver before yt-dlp/direct fallbacks
try:
netloc = urlparse(opts.url).netloc.lower()
except Exception:
netloc = ""
if "gofile.io" in netloc:
msg = "GoFile links are currently unsupported"
if not opts.quiet:
debug(msg)
if debug_logger is not None:
debug_logger.write_record("gofile-unsupported", {"url": opts.url})
raise DownloadError(msg)
# Determine if yt-dlp should be used
ytdlp_supported = is_url_supported_by_ytdlp(opts.url)
if ytdlp_supported:
# Skip probe for playlists with item selection (probe can hang on large playlists)
# Just proceed straight to download which will handle item selection
if opts.playlist_items:
debug(f"Skipping probe for playlist (item selection: {opts.playlist_items}), proceeding with download")
probe_result = {"url": opts.url} # Minimal probe result
else:
probe_cookiefile = None
try:
if opts.cookies_path and opts.cookies_path.is_file():
probe_cookiefile = str(opts.cookies_path)
except Exception:
probe_cookiefile = None
probe_result = probe_url(opts.url, no_playlist=opts.no_playlist, timeout_seconds=15, cookiefile=probe_cookiefile)
if probe_result is None:
if not opts.quiet:
log(f"URL supported by yt-dlp but no media detected, falling back to direct download: {opts.url}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-skip-no-media", {"url": opts.url})
return _download_direct_file(opts.url, opts.output_dir, debug_logger, quiet=opts.quiet)
else:
if not opts.quiet:
log(f"URL not supported by yt-dlp, trying direct download: {opts.url}")
if debug_logger is not None:
debug_logger.write_record("direct-file-attempt", {"url": opts.url})
return _download_direct_file(opts.url, opts.output_dir, debug_logger, quiet=opts.quiet)
_ensure_yt_dlp_ready()
ytdl_options = _build_ytdlp_options(opts)
if not opts.quiet:
debug(f"Starting yt-dlp download: {opts.url}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-start", {"url": opts.url})
assert yt_dlp is not None
try:
# Debug: show what options we're using
if not opts.quiet:
if ytdl_options.get("download_sections"):
debug(f"[yt-dlp] download_sections: {ytdl_options['download_sections']}")
debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}")
# Use subprocess when download_sections are present (Python API doesn't support them properly)
session_id = None
first_section_info = {}
if ytdl_options.get("download_sections"):
session_id, first_section_info = _download_with_sections_via_cli(opts.url, ytdl_options, ytdl_options.get("download_sections", []), quiet=opts.quiet)
info = None
else:
with yt_dlp.YoutubeDL(ytdl_options) as ydl: # type: ignore[arg-type]
info = ydl.extract_info(opts.url, download=True)
except Exception as exc:
log(f"yt-dlp failed: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{
"phase": "yt-dlp",
"error": str(exc),
"traceback": traceback.format_exc(),
},
)
raise DownloadError("yt-dlp download failed") from exc
# If we used subprocess, we need to find the file manually
if info is None:
# Find files created/modified during this download (after we started)
# Look for files matching the expected output template pattern
try:
# Get the expected filename pattern from outtmpl
# For sections: "C:\path\{session_id}.section_1_of_3.ext", etc.
# For non-sections: "C:\path\title.ext"
# Wait a moment to ensure files are fully written
time.sleep(0.5)
# List all files in output_dir, sorted by modification time
files = sorted(opts.output_dir.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True)
if not files:
raise FileNotFoundError(f"No files found in {opts.output_dir}")
# If we downloaded sections, look for files with the session_id pattern
if opts.clip_sections and session_id:
# Pattern: "{session_id}_1.ext", "{session_id}_2.ext", etc.
section_pattern = re.compile(rf'^{re.escape(session_id)}_(\d+)\.')
matching_files = [f for f in files if section_pattern.search(f.name)]
if matching_files:
# Sort by section number to ensure correct order
def extract_section_num(path: Path) -> int:
match = section_pattern.search(path.name)
return int(match.group(1)) if match else 999
matching_files.sort(key=extract_section_num)
debug(f"Found {len(matching_files)} section file(s) matching pattern")
# Now rename section files to use hash-based names
# This ensures unique filenames for each section content
renamed_files = []
for idx, section_file in enumerate(matching_files, 1):
try:
# Calculate hash for the file
file_hash = sha256_file(section_file)
ext = section_file.suffix
new_name = f"{file_hash}{ext}"
new_path = opts.output_dir / new_name
if new_path.exists() and new_path != section_file:
# If file with same hash exists, use it and delete the temp one
debug(f"File with hash {file_hash} already exists, using existing file.")
try:
section_file.unlink()
except OSError:
pass
renamed_files.append(new_path)
else:
section_file.rename(new_path)
debug(f"Renamed section file: {section_file.name}{new_name}")
renamed_files.append(new_path)
except Exception as e:
debug(f"Failed to process section file {section_file.name}: {e}")
renamed_files.append(section_file)
media_path = renamed_files[0]
media_paths = renamed_files
if not opts.quiet:
debug(f"✓ Downloaded {len(media_paths)} section file(s) (session: {session_id})")
else:
# Fallback to most recent file if pattern not found
media_path = files[0]
media_paths = None
if not opts.quiet:
debug(f"✓ Downloaded section file (pattern not found): {media_path.name}")
else:
# No sections, just take the most recent file
media_path = files[0]
media_paths = None
if not opts.quiet:
debug(f"✓ Downloaded: {media_path.name}")
if debug_logger is not None:
debug_logger.write_record("ytdlp-file-found", {"path": str(media_path)})
except Exception as exc:
log(f"Error finding downloaded file: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "find-file", "error": str(exc)},
)
raise DownloadError(str(exc)) from exc
# Create result with minimal data extracted from filename
file_hash = sha256_file(media_path)
# For section downloads, create tags with the title and build proper info dict
tags = []
title = ''
if first_section_info:
title = first_section_info.get('title', '')
if title:
tags.append(f'title:{title}')
debug(f"Added title tag for section download: {title}")
# Build info dict - always use extracted title if available, not hash
if first_section_info:
info_dict = first_section_info
else:
info_dict = {
"id": media_path.stem,
"title": title or media_path.stem,
"ext": media_path.suffix.lstrip(".")
}
return DownloadMediaResult(
path=media_path,
info=info_dict,
tag=tags,
source_url=opts.url,
hash_value=file_hash,
paths=media_paths, # Include all section files if present
)
if not isinstance(info, dict):
log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr)
raise DownloadError("Unexpected yt-dlp response type")
info_dict: Dict[str, Any] = info
if debug_logger is not None:
debug_logger.write_record(
"ytdlp-info",
{
"keys": sorted(info_dict.keys()),
"is_playlist": bool(info_dict.get("entries")),
},
)
# Playlist/album handling: resolve ALL downloaded entries and return multiple results.
# The cmdlet will emit one PipeObject per downloaded file.
if info_dict.get("entries") and not opts.no_playlist:
resolved = _resolve_entries_and_paths(info_dict, opts.output_dir)
if resolved:
results: List[DownloadMediaResult] = []
for entry, media_path in resolved:
hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
if not hash_value:
try:
hash_value = sha256_file(media_path)
except OSError:
hash_value = None
tags: List[str] = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(entry)
except Exception as e:
log(f"Error extracting tags: {e}", file=sys.stderr)
source_url = (
entry.get("webpage_url")
or entry.get("original_url")
or entry.get("url")
or opts.url
)
results.append(
DownloadMediaResult(
path=media_path,
info=entry,
tag=tags,
source_url=source_url,
hash_value=hash_value,
)
)
if not opts.quiet:
debug(f"✓ Downloaded playlist items: {len(results)}")
return results
try:
entry, media_path = _resolve_entry_and_path(info_dict, opts.output_dir)
except FileNotFoundError as exc:
log(f"Error: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "resolve-path", "error": str(exc)},
)
raise DownloadError(str(exc)) from exc
if debug_logger is not None:
debug_logger.write_record(
"resolved-media",
{"path": str(media_path), "entry_keys": sorted(entry.keys())},
)
# Extract hash from metadata or compute
hash_value = _extract_sha256(entry) or _extract_sha256(info_dict)
if not hash_value:
try:
hash_value = sha256_file(media_path)
except OSError as exc:
if debug_logger is not None:
debug_logger.write_record(
"hash-error",
{"path": str(media_path), "error": str(exc)},
)
# Extract tags using metadata.py
tags = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(entry)
except Exception as e:
log(f"Error extracting tags: {e}", file=sys.stderr)
source_url = (
entry.get("webpage_url")
or entry.get("original_url")
or entry.get("url")
)
if not opts.quiet:
debug(f"✓ Downloaded: {media_path.name} ({len(tags)} tags)")
if debug_logger is not None:
debug_logger.write_record(
"downloaded",
{
"path": str(media_path),
"tag_count": len(tags),
"source_url": source_url,
"sha256": hash_value,
},
)
return DownloadMediaResult(
path=media_path,
info=entry,
tag=tags,
source_url=source_url,
hash_value=hash_value,
)
# Timeout handler to prevent yt-dlp hangs
def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) -> Any:
"""Download with timeout protection.
Args:
opts: DownloadOptions
timeout_seconds: Max seconds to wait (default 300s = 5 min)
Returns:
DownloadMediaResult or List[DownloadMediaResult]
Raises:
DownloadError: If timeout exceeded
"""
import threading
from typing import cast
result_container: List[Optional[Any]] = [None, None] # [result, error]
def _do_download() -> None:
try:
result_container[0] = download_media(opts)
except Exception as e:
result_container[1] = e
thread = threading.Thread(target=_do_download, daemon=False)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
# Thread still running - timeout
raise DownloadError(f"Download timeout after {timeout_seconds} seconds for {opts.url}")
if result_container[1] is not None:
raise cast(Exception, result_container[1])
if result_container[0] is None:
raise DownloadError(f"Download failed for {opts.url}")
return cast(Any, result_container[0])
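# Illustrative usage (not executed): wrap a potentially slow extraction with a
# hard cap, e.g. result = _download_with_timeout(opts, timeout_seconds=600).
# Note the worker thread is non-daemon, so a timed-out download may keep
# running in the background even after DownloadError is raised.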
class Download_Media(Cmdlet):
"""Class-based download-media cmdlet - yt-dlp only, streaming sites."""
def __init__(self) -> None:
"""Initialize download-media cmdlet."""
super().__init__(
name="download-media",
summary="Download media from streaming sites (YouTube, Twitch, etc.)",
usage="download-media <url> [options] or search-file | download-media [options]",
alias=[""],
arg=[
SharedArgs.URL,
CmdletArg(name="audio", type="flag", alias="a", description="Download audio only"),
CmdletArg(name="format", type="string", alias="fmt", description="Explicit yt-dlp format selector"),
CmdletArg(name="clip", type="string", description="Extract time range: MM:SS-MM:SS"),
CmdletArg(name="item", type="string", description="Item selection for playlists/formats"),
SharedArgs.PATH
],
detail=["Download media from streaming sites using yt-dlp.", "For direct file downloads, use download-file."],
exec=self.run,
)
self.register()
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Main execution method."""
stage_ctx = pipeline_context.get_stage_context()
in_pipeline = stage_ctx is not None and getattr(stage_ctx, "total_stages", 1) > 1
if in_pipeline and isinstance(config, dict):
config["_quiet_background_output"] = True
return self._run_impl(result, args, config)
def _run_impl(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Main download implementation for yt-dlp-supported url."""
try:
debug("Starting download-media")
ytdlp_tool = YtDlpTool(config)
# Parse arguments
parsed = parse_cmdlet_args(args, self)
# Extract options
raw_url = parsed.get("url", [])
if isinstance(raw_url, str):
raw_url = [raw_url]
# Allow a single quoted argument containing multiple URLs separated by commas.
# Example: download-media "https://a,https://b" -audio
expanded_urls: List[str] = []
for u in (raw_url or []):
if u is None:
continue
s = str(u).strip()
if not s:
continue
if "," in s:
parts = [p.strip() for p in s.split(",")]
expanded_urls.extend([p for p in parts if p])
else:
expanded_urls.append(s)
if expanded_urls:
raw_url = expanded_urls
# If no url provided via args, try to extract from piped result
if not raw_url and result:
# Handle single result or list of results
results_to_check = result if isinstance(result, list) else [result]
for item in results_to_check:
# Try to get URL from various possible fields
url = get_field(item, "url") or get_field(item, "target")
if url:
raw_url.append(url)
            # Filter to yt-dlp-supported URLs only
            supported_url = [
                url for url in raw_url
                if is_url_supported_by_ytdlp(url)
            ]
            if not supported_url:
                log("No yt-dlp-supported URLs to download", file=sys.stderr)
                return 1
            # Log unsupported URLs if any
            unsupported = set(raw_url) - set(supported_url)
            if unsupported:
                debug(f"Skipping {len(unsupported)} unsupported URL(s) (use download-file for direct downloads)")
# Get output directory
final_output_dir = self._resolve_output_dir(parsed, config)
if not final_output_dir:
return 1
debug(f"Output directory: {final_output_dir}")
# Get other options
clip_spec = parsed.get("clip")
# Always enable chapters + subtitles so downstream pipes (e.g. mpv) can consume them.
embed_chapters = True
write_sub = True
mode = "audio" if parsed.get("audio") else "video"
# Parse clip range(s) if specified
clip_ranges: Optional[List[tuple[int, int]]] = None
if clip_spec:
clip_ranges = self._parse_time_ranges(str(clip_spec))
if not clip_ranges:
log(f"Invalid clip format: {clip_spec}", file=sys.stderr)
return 1
quiet_mode = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False
storage = None
hydrus_available = True
try:
from Store import Store
storage = Store(config=config or {}, suppress_debug=True)
from API.HydrusNetwork import is_hydrus_available
hydrus_available = bool(is_hydrus_available(config or {}))
except Exception:
storage = None
def _preflight_url_duplicate(candidate_url: str, extra_urls: Optional[Sequence[str]] = None) -> bool:
# NOTE: download-media sets _quiet_background_output=True when running in a pipeline to
# reduce background noise. URL de-dup is interactive and must still run in pipelines.
if storage is None:
debug("Preflight URL check skipped: storage unavailable")
return True
debug(f"Preflight URL check: candidate={candidate_url}")
try:
from metadata import normalize_urls
except Exception:
normalize_urls = None # type: ignore[assignment]
needles: List[str] = []
if normalize_urls is not None:
for raw in [candidate_url, *(list(extra_urls) if extra_urls else [])]:
try:
needles.extend(normalize_urls(raw))
except Exception:
continue
# Fallback: always have at least one needle
if not needles:
needles = [str(candidate_url)]
# Deduplicate needles (preserve order)
seen_needles: List[str] = []
for needle in needles:
if needle and needle not in seen_needles:
seen_needles.append(needle)
needles = seen_needles
try:
debug(f"Preflight URL needles: {needles}")
except Exception:
pass
url_matches: List[Dict[str, Any]] = []
try:
from Store.HydrusNetwork import HydrusNetwork
# Avoid searching the temp/download directory backend during dedup.
# We only want to warn about duplicates in real stores.
backend_names_all = storage.list_searchable_backends()
backend_names: List[str] = []
skipped: List[str] = []
for backend_name in backend_names_all:
try:
backend = storage[backend_name]
except Exception:
continue
try:
if str(backend_name).strip().lower() == "temp":
skipped.append(backend_name)
continue
except Exception:
pass
# Heuristic: if a Folder backend points at the configured temp output dir, skip it.
try:
backend_location = getattr(backend, "_location", None)
if backend_location and final_output_dir:
backend_path = Path(str(backend_location)).expanduser().resolve()
temp_path = Path(str(final_output_dir)).expanduser().resolve()
if backend_path == temp_path:
skipped.append(backend_name)
continue
except Exception:
pass
backend_names.append(backend_name)
try:
if skipped:
debug(f"Preflight backends: {backend_names} (skipped temp: {skipped})")
else:
debug(f"Preflight backends: {backend_names}")
except Exception:
pass
for backend_name in backend_names:
backend = storage[backend_name]
if isinstance(backend, HydrusNetwork) and not hydrus_available:
continue
backend_hits: List[Dict[str, Any]] = []
for needle in needles:
try:
backend_hits = backend.search(f"url:{needle}", limit=25) or []
if backend_hits:
break
except Exception:
continue
if backend_hits:
url_matches.extend([dict(x) if isinstance(x, dict) else {"title": str(x)} for x in backend_hits])
if len(url_matches) >= 25:
url_matches = url_matches[:25]
break
except Exception:
url_matches = []
if not url_matches:
debug("Preflight URL check: no matches")
return True
table = ResultTable(f"URL already exists ({len(url_matches)} match(es))")
results_list: List[Dict[str, Any]] = []
for item in url_matches:
if "title" not in item:
item["title"] = item.get("name") or item.get("target") or item.get("path") or "Result"
table.add_result(item)
results_list.append(item)
pipeline_context.set_current_stage_table(table)
pipeline_context.set_last_result_table(table, results_list)
print(f"\n{table}")
response = input("Continue anyway? (y/n): ").strip().lower()
if response not in {"y", "yes"}:
return False
return True
def _canonicalize_url_for_storage(requested_url: str) -> str:
# Prefer yt-dlp's canonical webpage URL (e.g. strips timestamps/redirects).
# Fall back to the requested URL if probing fails.
# Important: when playlist item selection is used, avoid probing (can hang on large playlists).
if playlist_items:
return str(requested_url)
try:
cf = None
try:
cookie_path = ytdlp_tool.resolve_cookiefile()
if cookie_path is not None and cookie_path.is_file():
cf = str(cookie_path)
except Exception:
cf = None
pr = probe_url(requested_url, no_playlist=False, timeout_seconds=15, cookiefile=cf)
if isinstance(pr, dict):
for key in ("webpage_url", "original_url", "url", "requested_url"):
value = pr.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
except Exception:
pass
return str(requested_url)
# Check if we need to show format selection
playlist_items = str(parsed.get("item")) if parsed.get("item") else None
ytdl_format = parsed.get("format")
playlist_selection_handled = False
def _parse_at_selection(choice: str, *, max_index: int) -> Optional[List[int]]:
"""Parse @ selection syntax (@2, @2-5, @{1,3,5}, @2,5,7) into 1-based indices."""
raw = str(choice or "").strip()
if not raw:
return None
if raw.lower() in {"q", "quit", "cancel"}:
return None
if raw == "@*" or raw == "*":
return list(range(1, max_index + 1))
if raw.startswith("@"):
raw = raw[1:].strip()
if raw.startswith("{") and raw.endswith("}"):
raw = raw[1:-1].strip()
if not raw:
return None
indices: set[int] = set()
for part in raw.split(","):
part = part.strip()
if not part:
continue
if "-" in part:
left, right = [p.strip() for p in part.split("-", 1)]
if not left or not right:
return None
try:
start = int(left)
end = int(right)
except ValueError:
return None
if start < 1 or end < 1:
return None
if end < start:
start, end = end, start
for i in range(start, end + 1):
if 1 <= i <= max_index:
indices.add(i)
else:
try:
i = int(part)
except ValueError:
return None
if 1 <= i <= max_index:
indices.add(i)
if not indices:
return None
return sorted(indices)
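            # Illustrative sketch (with max_index=10):
            #   "@2"     -> [2]
            #   "@2-5"   -> [2, 3, 4, 5]
            #   "@{1,3}" -> [1, 3]
            #   "@*"     -> [1, 2, ..., 10]
            #   "q" or out-of-range input -> None (treated as cancel by callers)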
def _maybe_prompt_playlist_items(url: str) -> Optional[Dict[str, Any]]:
"""If URL appears to be a playlist/channel/collection, prompt user for @ selection.
Returns:
- None if URL is not a playlist-like multi-entry page (or probe fails)
- Dict with keys:
- cancel: bool
- playlist_items: Optional[str] (None means download all)
- selected_urls: Optional[List[str]] (expanded per-entry urls when available)
"""
try:
cf = None
try:
cookie_path = ytdlp_tool.resolve_cookiefile()
if cookie_path is not None and cookie_path.is_file():
cf = str(cookie_path)
except Exception:
cf = None
pr = probe_url(url, no_playlist=False, timeout_seconds=15, cookiefile=cf)
except Exception:
pr = None
if not isinstance(pr, dict):
return None
entries = pr.get("entries")
if not isinstance(entries, list) or len(entries) <= 1:
return None
# Display table (limit rows to keep output reasonable)
max_rows = 200
display_entries = entries[:max_rows]
total = len(entries)
def _entry_to_url(entry: Any) -> Optional[str]:
if not isinstance(entry, dict):
return None
# Prefer explicit absolute URLs when present
for key in ("webpage_url", "original_url", "url"):
v = entry.get(key)
if isinstance(v, str) and v.strip():
s = v.strip()
try:
if urlparse(s).scheme in {"http", "https"}:
return s
except Exception:
return s
# Best-effort YouTube fallback from id
entry_id = entry.get("id")
if isinstance(entry_id, str) and entry_id.strip():
extractor_name = str(pr.get("extractor") or pr.get("extractor_key") or "").lower()
if "youtube" in extractor_name:
return f"https://www.youtube.com/watch?v={entry_id.strip()}"
return None
table = ResultTable()
table.title = f"Playlist items ({total}{' shown ' + str(len(display_entries)) if total > max_rows else ''})"
table.set_source_command("download-media", [url])
try:
table.set_preserve_order(True)
except Exception:
pass
results_list: List[Dict[str, Any]] = []
for idx, entry in enumerate(display_entries, 1):
title = None
uploader = None
duration = None
try:
if isinstance(entry, dict):
title = entry.get("title")
uploader = entry.get("uploader") or pr.get("uploader")
duration = entry.get("duration")
except Exception:
pass
row: Dict[str, Any] = {
"table": "download-media",
"title": str(title or f"Item {idx}"),
"detail": str(uploader or ""),
"media_kind": "playlist-item",
"playlist_index": idx,
"columns": [
("#", str(idx)),
("Title", str(title or "")),
("Duration", str(duration or "")),
("Uploader", str(uploader or "")),
],
}
results_list.append(row)
table.add_result(row)
pipeline_context.set_current_stage_table(table)
pipeline_context.set_last_result_table(table, results_list)
print(f"\n{table}")
choice = input("Select items to download (@N, @2-5, @{1,3}, @*, or 'q' to cancel): ").strip()
if not choice or choice.lower() in {"q", "quit", "cancel"}:
return {"cancel": True, "playlist_items": None, "selected_urls": []}
if choice.strip() == "@*" or choice.strip() == "*":
# @* means all entries, not just displayed rows.
selected_urls: List[str] = []
for entry in entries:
u = _entry_to_url(entry)
if u and u not in selected_urls:
selected_urls.append(u)
# Only expand when we can derive URLs for all entries; otherwise fall back to yt-dlp playlist handling.
if len(selected_urls) == len(entries):
return {"cancel": False, "playlist_items": None, "selected_urls": selected_urls}
return {"cancel": False, "playlist_items": None, "selected_urls": []}
parsed_indices = _parse_at_selection(choice, max_index=len(display_entries))
if not parsed_indices:
log("Invalid selection. Use @N, @2-5, @{1,3}, or @*", file=sys.stderr)
return {"cancel": True, "playlist_items": None, "selected_urls": []}
selected_urls: List[str] = []
for i in parsed_indices:
try:
entry = display_entries[i - 1]
except Exception:
continue
u = _entry_to_url(entry)
if u and u not in selected_urls:
selected_urls.append(u)
# If we can expand per-entry URLs, return them.
if selected_urls and len(selected_urls) == len(parsed_indices):
return {"cancel": False, "playlist_items": None, "selected_urls": selected_urls}
# yt-dlp accepts comma-separated 1-based indices for playlist_items
return {"cancel": False, "playlist_items": ",".join(str(i) for i in parsed_indices), "selected_urls": []}
# Playlist/multi-entry detection: if the URL has multiple items and the user didn't
# specify -item, prompt for @ selection (supports @* for all).
if len(supported_url) == 1 and not playlist_items and not ytdl_format:
candidate_url = supported_url[0]
selection_info = _maybe_prompt_playlist_items(candidate_url)
if selection_info is not None:
playlist_selection_handled = True
if bool(selection_info.get("cancel")):
return 0
selected_urls = selection_info.get("selected_urls")
if isinstance(selected_urls, list) and selected_urls:
# Expand playlist/channel URL into per-entry URLs so that de-dup preflight
# and downloads operate per file.
supported_url = selected_urls
playlist_items = None
else:
playlist_items = selection_info.get("playlist_items")
# If no -item, no explicit -format specified, and single URL, show the format table.
# Do NOT stop to show formats when -audio is used (auto-pick) or when -clip is used.
if (
mode != "audio"
and not clip_spec
and not playlist_items
and not ytdl_format
and len(supported_url) == 1
and not playlist_selection_handled
):
url = supported_url[0]
canonical_url = _canonicalize_url_for_storage(url)
if not _preflight_url_duplicate(canonical_url, extra_urls=[url]):
log(f"Skipping download: {url}", file=sys.stderr)
return 0
formats = list_formats(url, no_playlist=False)
if formats and len(formats) > 1:
# Filter formats: multiple videos (640x+, one per resolution tier) + 1 best audio
video_formats = []
audio_formats = []
for fmt in formats:
width = fmt.get("width") or 0
height = fmt.get("height") or 0
vcodec = fmt.get("vcodec", "none")
acodec = fmt.get("acodec", "none")
# Classify as video or audio
if vcodec != "none" and acodec == "none" and width >= 640:
video_formats.append(fmt)
elif acodec != "none" and vcodec == "none":
audio_formats.append(fmt)
# Group videos by resolution and select best format per resolution
filtered_formats = []
if video_formats:
# Group by height (resolution tier)
from collections import defaultdict
by_resolution = defaultdict(list)
for f in video_formats:
height = f.get("height") or 0
by_resolution[height].append(f)
# For each resolution, prefer AV1, then highest bitrate
for height in sorted(by_resolution.keys(), reverse=True):
candidates = by_resolution[height]
av1_formats = [f for f in candidates if "av01" in f.get("vcodec", "")]
if av1_formats:
best = max(av1_formats, key=lambda f: f.get("tbr") or 0)
else:
best = max(candidates, key=lambda f: f.get("tbr") or 0)
filtered_formats.append(best)
# Select best audio: highest bitrate (any format)
if audio_formats:
best_audio = max(audio_formats, key=lambda f: f.get("tbr") or f.get("abr") or 0)
filtered_formats.append(best_audio)
if not filtered_formats:
# Fallback to all formats if filtering resulted in nothing
filtered_formats = formats
debug(f"Filtered to {len(filtered_formats)} formats from {len(formats)} total")
# Show format selection table
log(f"Available formats for {url}:", file=sys.stderr)
log("", file=sys.stderr)
# Build the base command that will be replayed with @N selection
# Include any additional args from the original command
base_cmd = f'download-media "{url}"'
# Carry over any extra positional args from the original invocation
# (best-effort heuristic: flags are dropped here, but their values are not).
remaining_args = [arg for arg in args if arg != url and not arg.startswith('-')]
if remaining_args:
base_cmd += ' ' + ' '.join(remaining_args)
# Create result table for display
table = ResultTable()
table.title = f"Available formats for {url}"
table.set_source_command("download-media", [url])
# Collect results for table
results_list = []
# Emit format results for selection
for idx, fmt in enumerate(filtered_formats, 1):
resolution = fmt.get("resolution", "")
ext = fmt.get("ext", "")
vcodec = fmt.get("vcodec", "none")
acodec = fmt.get("acodec", "none")
filesize = fmt.get("filesize")
format_id = fmt.get("format_id", "")
# If the chosen format is video-only (no audio stream), automatically
# request best audio too so the resulting file has sound.
selection_format_id = format_id
try:
if vcodec != "none" and acodec == "none" and format_id:
selection_format_id = f"{format_id}+ba"
except Exception:
selection_format_id = format_id
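# "<id>+ba" is yt-dlp format-selection syntax: fetch stream <id> and
# merge it with the best audio stream, e.g. "137" becomes "137+ba".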
# Format size
size_str = ""
if filesize:
size_mb = filesize / (1024 * 1024)
size_str = f"{size_mb:.1f}MB"
# Build format description
desc_parts = []
if resolution and resolution != "audio only":
desc_parts.append(resolution)
if ext:
desc_parts.append(ext.upper())
if vcodec != "none":
desc_parts.append(f"v:{vcodec}")
if acodec != "none":
desc_parts.append(f"a:{acodec}")
if size_str:
desc_parts.append(size_str)
format_desc = " | ".join(desc_parts)
# Build format dict for emission and table
format_dict = {
"table": "download-media",
"title": f"Format {format_id}",
"url": url,
"target": url,
"detail": format_desc,
"annotations": [ext, resolution] if resolution else [ext],
"media_kind": "format",
"cmd": base_cmd,
"columns": [
("#", str(idx)),
("ID", format_id),
("Resolution", resolution or "N/A"),
("Ext", ext),
("Video", vcodec),
("Audio", acodec),
("Size", size_str or "N/A"),
],
"full_metadata": {
"format_id": format_id,
"url": url,
"item_selector": selection_format_id,
},
"_selection_args": ["-format", selection_format_id]
}
# Add to results list and table (don't emit - formats should wait for @N selection)
results_list.append(format_dict)
table.add_result(format_dict)
# The pipeline runner renders the table via set_current_stage_table;
# set_last_result_table keeps the rows addressable for @N selection.
pipeline_context.set_current_stage_table(table)
pipeline_context.set_last_result_table(table, results_list)
log(f"", file=sys.stderr)
log(f"Use: @N | download-media to select and download format", file=sys.stderr)
return 0
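# Typical follow-up (row number hypothetical): `@2 | download-media`
# replays the base command with the row's _selection_args, i.e.
# `-format <selected format_id>`.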
# Download each URL
downloaded_count = 0
clip_sections_spec = self._build_clip_sections_spec(clip_ranges)
for url in supported_url:
try:
debug(f"Processing: {url}")
canonical_url = _canonicalize_url_for_storage(url)
# Preflight: warn if URL already exists in storage backends.
if not _preflight_url_duplicate(canonical_url, extra_urls=[url]):
log(f"Skipping download: {url}", file=sys.stderr)
continue
# If playlist_items is specified but looks like a format ID (e.g. from table selection),
# treat it as a format selector instead of playlist items.
# This handles the case where @N selection passes -item <format_id>
actual_format = ytdl_format
actual_playlist_items = playlist_items
if playlist_items and not ytdl_format:
# Heuristic: if it contains non-numeric chars (excluding ranges/commas)
# it is likely a format ID (e.g. '140-drc', 'best', '137+140')
if re.search(r'[^0-9,-]', playlist_items):
actual_format = playlist_items
actual_playlist_items = None
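# Examples: "140-drc", "best", and "137+140" contain characters outside
# [0-9,-] and are treated as format selectors; "1,3-5" remains a
# playlist-items spec.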
# For -audio, default to yt-dlp's built-in bestaudio selector.
# This should *not* require interactive format picking.
if mode == "audio" and not actual_format:
actual_format = "bestaudio"
# If no explicit format is provided for video mode, allow a config override.
if mode == "video" and not actual_format:
configured = (ytdlp_tool.default_format("video") or "").strip()
if configured and configured != "bestvideo+bestaudio/best":
actual_format = configured
# If a single format id was chosen and it is video-only, auto-merge best audio.
if (
actual_format
and isinstance(actual_format, str)
and mode != "audio"
and "+" not in actual_format
and "/" not in actual_format
and "[" not in actual_format
and actual_format not in {"best", "bv", "ba", "b"}
):
try:
formats = list_formats(url, no_playlist=False, playlist_items=actual_playlist_items)
if formats:
fmt_match = next(
(f for f in formats if str(f.get("format_id", "")) == actual_format),
None,
)
if fmt_match:
vcodec = str(fmt_match.get("vcodec", "none"))
acodec = str(fmt_match.get("acodec", "none"))
if vcodec != "none" and acodec == "none":
debug(
f"Selected video-only format {actual_format}; using {actual_format}+ba for audio"
)
actual_format = f"{actual_format}+ba"
except Exception:
pass
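# Illustrative case (format ids vary by site): if the probe reports
# format "137" with vcodec=avc1 and acodec=none, the request is
# rewritten to "137+ba" so the merged output has sound.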
opts = DownloadOptions(
url=url,
mode=mode,
output_dir=final_output_dir,
ytdl_format=actual_format,
cookies_path=ytdlp_tool.resolve_cookiefile(),
clip_sections=clip_sections_spec,
playlist_items=actual_playlist_items,
quiet=quiet_mode,
no_playlist=False,
embed_chapters=embed_chapters,
write_sub=write_sub,
)
# Use timeout wrapper to prevent hanging
debug(f"Starting download with 5-minute timeout...")
result_obj = _download_with_timeout(opts, timeout_seconds=300)
debug(f"Download completed, building pipe object...")
# Expand result set:
# - playlists return a list
# - section clips return a single DownloadMediaResult with `paths` populated
results_to_emit: List[Any] = []
if isinstance(result_obj, list):
results_to_emit = list(result_obj)
else:
paths = getattr(result_obj, "paths", None)
if isinstance(paths, list) and paths:
# Create one DownloadMediaResult per section file
for p in paths:
try:
p_path = Path(p)
except Exception:
continue
if not p_path.exists() or p_path.is_dir():
continue
try:
hv = sha256_file(p_path)
except Exception:
hv = None
results_to_emit.append(
DownloadMediaResult(
path=p_path,
info=getattr(result_obj, "info", {}) or {},
tag=list(getattr(result_obj, "tag", []) or []),
source_url=getattr(result_obj, "source_url", None) or opts.url,
hash_value=hv,
)
)
else:
results_to_emit = [result_obj]
# Build PipeObjects first so we can attach cross-clip relationships.
pipe_objects: List[Dict[str, Any]] = []
for downloaded in results_to_emit:
po = self._build_pipe_object(downloaded, url, opts)
# Attach chapter timestamps for downstream consumers (e.g., mpv scripts)
# even if container embedding fails.
try:
info = downloaded.info if isinstance(getattr(downloaded, "info", None), dict) else {}
except Exception:
info = {}
chapters_text = _format_chapters_note(info) if embed_chapters else None
if chapters_text:
notes = po.get("notes")
if not isinstance(notes, dict):
notes = {}
notes.setdefault("chapters", chapters_text)
po["notes"] = notes
if write_sub:
try:
media_path = Path(str(po.get("path") or ""))
except Exception:
media_path = None
if media_path is not None and media_path.exists() and media_path.is_file():
sub_path = _best_subtitle_sidecar(media_path)
if sub_path is not None:
sub_text = _read_text_file(sub_path)
if sub_text:
notes = po.get("notes")
if not isinstance(notes, dict):
notes = {}
notes["sub"] = sub_text
po["notes"] = notes
pipe_objects.append(po)
# If this is a clip download, decorate titles/tags so the title: tag is clip-based.
# Relationship tags are only added when multiple clips exist.
try:
if clip_ranges and len(pipe_objects) == len(clip_ranges):
source_hash = self._find_existing_hash_for_url(storage, canonical_url, hydrus_available=hydrus_available)
self._apply_clip_decorations(pipe_objects, clip_ranges, source_king_hash=source_hash)
except Exception:
pass
debug(f"Emitting {len(pipe_objects)} result(s) to pipeline...")
stage_ctx = pipeline_context.get_stage_context()
emit_enabled = bool(stage_ctx is not None and not getattr(stage_ctx, "is_last_stage", False))
for pipe_obj_dict in pipe_objects:
# Only emit when there is a downstream stage.
# This keeps `download-media` from producing a result table when run standalone.
if emit_enabled:
pipeline_context.emit(pipe_obj_dict)
# Automatically register url with local library
if pipe_obj_dict.get("url"):
pipe_obj = coerce_to_pipe_object(pipe_obj_dict)
register_url_with_local_library(pipe_obj, config)
downloaded_count += len(pipe_objects)
debug("✓ Downloaded and emitted")
except DownloadError as e:
# Special-case yt-dlp format errors: show a selectable format list table so
# the user can pick a working format_id and continue the pipeline via @N.
cause = getattr(e, "__cause__", None)
detail = ""
try:
detail = str(cause or "")
except Exception:
detail = ""
if "requested format is not available" in (detail or "").lower() and mode != "audio":
formats = list_formats(url, no_playlist=False, playlist_items=actual_playlist_items)
if formats:
formats_to_show = formats
table = ResultTable()
table.title = f"Available formats for {url}"
table.set_source_command("download-media", [str(a) for a in (args or [])])
results_list: List[Dict[str, Any]] = []
for idx, fmt in enumerate(formats_to_show, 1):
resolution = fmt.get("resolution", "")
ext = fmt.get("ext", "")
vcodec = fmt.get("vcodec", "none")
acodec = fmt.get("acodec", "none")
filesize = fmt.get("filesize")
format_id = fmt.get("format_id", "")
selection_format_id = format_id
try:
if vcodec != "none" and acodec == "none" and format_id:
selection_format_id = f"{format_id}+ba"
except Exception:
selection_format_id = format_id
size_str = ""
if filesize:
try:
size_mb = float(filesize) / (1024 * 1024)
size_str = f"{size_mb:.1f}MB"
except Exception:
size_str = ""
desc_parts: List[str] = []
if resolution and resolution != "audio only":
desc_parts.append(str(resolution))
if ext:
desc_parts.append(str(ext).upper())
if vcodec != "none":
desc_parts.append(f"v:{vcodec}")
if acodec != "none":
desc_parts.append(f"a:{acodec}")
if size_str:
desc_parts.append(size_str)
format_desc = " | ".join(desc_parts)
format_dict: Dict[str, Any] = {
"table": "download-media",
"title": f"Format {format_id}",
"url": url,
"target": url,
"detail": format_desc,
"media_kind": "format",
"columns": [
("#", str(idx)),
("ID", format_id),
("Resolution", resolution or "N/A"),
("Ext", ext),
("Video", vcodec),
("Audio", acodec),
("Size", size_str or "N/A"),
],
"full_metadata": {
"format_id": format_id,
"url": url,
"item_selector": selection_format_id,
},
"_selection_args": ["-format", selection_format_id],
}
results_list.append(format_dict)
table.add_result(format_dict)
pipeline_context.set_current_stage_table(table)
pipeline_context.set_last_result_table(table, results_list)
# Returning 0 with no emits lets the CLI pause the pipeline for @N selection.
log("Requested format is not available; select a working format with @N", file=sys.stderr)
return 0
log(f"Download failed for {url}: {e}", file=sys.stderr)
except Exception as e:
log(f"Error processing {url}: {e}", file=sys.stderr)
if downloaded_count > 0:
debug(f"✓ Successfully processed {downloaded_count} URL(s)")
return 0
log("No downloads completed", file=sys.stderr)
return 1
except Exception as e:
log(f"Error in download-media: {e}", file=sys.stderr)
return 1
def _resolve_output_dir(self, parsed: Dict[str, Any], config: Dict[str, Any]) -> Optional[Path]:
"""Resolve the output directory.
Rules:
- If user passes `-path`, use that directory (override).
- Otherwise default to a temp directory (config["temp"] if present, else OS temp).
"""
# Priority 1: explicit output directory override
path_override = parsed.get("path")
if path_override:
try:
candidate = Path(str(path_override)).expanduser()
# If user passed a file path, treat its parent as output dir.
if candidate.suffix:
candidate = candidate.parent
candidate.mkdir(parents=True, exist_ok=True)
debug(f"Using output directory override: {candidate}")
return candidate
except Exception as e:
log(f"Invalid -path output directory: {e}", file=sys.stderr)
return None
# Priority 2: config-provided temp/output directory
try:
temp_value = (config or {}).get("temp") if isinstance(config, dict) else None
except Exception:
temp_value = None
if temp_value:
try:
candidate = Path(str(temp_value)).expanduser()
candidate.mkdir(parents=True, exist_ok=True)
debug(f"Using config temp directory: {candidate}")
return candidate
except Exception as e:
log(f"Cannot use configured temp directory '{temp_value}': {e}", file=sys.stderr)
return None
# Priority 3: OS temp fallback
try:
candidate = Path(tempfile.gettempdir()) / "Medios-Macina"
candidate.mkdir(parents=True, exist_ok=True)
debug(f"Using OS temp directory: {candidate}")
return candidate
except Exception as e:
log(f"Cannot create OS temp directory: {e}", file=sys.stderr)
return None
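# Resolution examples (paths hypothetical):
#   -path C:/out/video.mp4        -> C:/out (file paths collapse to parent)
#   config = {"temp": "D:/dl"}    -> D:/dl
#   neither present               -> <OS temp>/Medios-Macina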
def _parse_time_ranges(self, spec: str) -> List[tuple[int, int]]:
"""Parse clip specs into a list of (start_seconds, end_seconds).
Supported inputs:
- "MM:SS-MM:SS"
- "HH:MM:SS-HH:MM:SS"
- seconds: "280-300"
- multiple ranges separated by commas: "4:40-5:00,5:15-5:25"
"""
def _to_seconds(ts: str) -> Optional[int]:
ts = str(ts).strip()
if not ts:
return None
if ":" in ts:
parts = [p.strip() for p in ts.split(":")]
if len(parts) == 2:
hh_s = "0"
mm_s, ss_s = parts
elif len(parts) == 3:
hh_s, mm_s, ss_s = parts
else:
return None
try:
hours = int(hh_s)
minutes = int(mm_s)
seconds = float(ss_s)
total = (hours * 3600) + (minutes * 60) + seconds
return int(total)
except Exception:
return None
try:
return int(float(ts))
except Exception:
return None
ranges: List[tuple[int, int]] = []
if not spec:
return ranges
for piece in str(spec).split(","):
piece = piece.strip()
if not piece:
continue
if "-" not in piece:
return []
start_s, end_s = [p.strip() for p in piece.split("-", 1)]
start = _to_seconds(start_s)
end = _to_seconds(end_s)
if start is None or end is None or start >= end:
return []
ranges.append((start, end))
return ranges
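# Example: _parse_time_ranges("4:40-5:00,315-325") -> [(280, 300), (315, 325)].
# Any malformed or inverted piece invalidates the whole spec and yields [].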
def _build_clip_sections_spec(
self,
clip_ranges: Optional[List[tuple[int, int]]],
) -> Optional[str]:
"""Convert parsed clip range into downloader spec (seconds)."""
ranges: List[str] = []
if clip_ranges:
for start_s, end_s in clip_ranges:
ranges.append(f"{start_s}-{end_s}")
return ",".join(ranges) if ranges else None
def _build_pipe_object(self, download_result: Any, url: str, opts: DownloadOptions) -> Dict[str, Any]:
"""Create a PipeObject-compatible dict from a DownloadMediaResult."""
info: Dict[str, Any] = download_result.info if isinstance(download_result.info, dict) else {}
media_path = Path(download_result.path)
hash_value = download_result.hash_value or self._compute_file_hash(media_path)
title = info.get("title") or media_path.stem
tag = list(download_result.tag or [])
# Add title tag for searchability
if title and f"title:{title}" not in tag:
tag.insert(0, f"title:{title}")
# Store the canonical URL for de-dup/search purposes.
# Prefer yt-dlp's webpage_url, and do not mix in the raw requested URL (which may contain timestamps).
final_url = None
try:
page_url = info.get("webpage_url") or info.get("original_url") or info.get("url")
if page_url:
final_url = str(page_url)
except Exception:
final_url = None
if not final_url and url:
final_url = str(url)
# Construct canonical PipeObject dict: hash, store, path, url, title, tags
# Prefer explicit backend names (storage_name/storage_location). If none, default to PATH
# which indicates the file is available at a filesystem path and hasn't been added to a backend yet.
return {
"path": str(media_path),
"hash": hash_value,
"title": title,
"url": final_url,
"tag": tag,
"action": "cmdlet:download-media",
"is_temp": True,
# download_mode removed (deprecated), keep media_kind
"store": getattr(opts, "storage_name", None) or getattr(opts, "storage_location", None) or "PATH",
"media_kind": "video" if opts.mode == "video" else "audio",
}
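# Illustrative result (values hypothetical):
#   {"path": "C:/temp/clip.mp4", "hash": "<sha256>", "title": "My Video",
#    "url": "https://example.com/watch?v=...", "tag": ["title:My Video"],
#    "action": "cmdlet:download-media", "is_temp": True,
#    "store": "PATH", "media_kind": "video"}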
@staticmethod
def _normalise_hash_hex(value: Optional[str]) -> Optional[str]:
if not value or not isinstance(value, str):
return None
candidate = value.strip().lower()
if len(candidate) == 64 and all(c in "0123456789abcdef" for c in candidate):
return candidate
return None
@classmethod
def _extract_hash_from_search_hit(cls, hit: Any) -> Optional[str]:
if not isinstance(hit, dict):
return None
for key in ("hash", "hash_hex", "file_hash", "hydrus_hash"):
v = hit.get(key)
normalized = cls._normalise_hash_hex(str(v) if v is not None else None)
if normalized:
return normalized
return None
@classmethod
def _find_existing_hash_for_url(
cls,
storage: Any,
canonical_url: str,
*,
hydrus_available: bool,
) -> Optional[str]:
"""Best-effort lookup of an existing stored item hash by url:<canonical_url>.
Used to make the stored source video the king for multi-clip relationships.
"""
if storage is None or not canonical_url:
return None
try:
from Store.HydrusNetwork import HydrusNetwork
except Exception:
HydrusNetwork = None # type: ignore
try:
backend_names = list(storage.list_searchable_backends() or [])
except Exception:
backend_names = []
for backend_name in backend_names:
try:
backend = storage[backend_name]
except Exception:
continue
try:
if str(backend_name).strip().lower() == "temp":
continue
except Exception:
pass
try:
if HydrusNetwork is not None and isinstance(backend, HydrusNetwork) and not hydrus_available:
continue
except Exception:
pass
try:
hits = backend.search(f"url:{canonical_url}", limit=5) or []
except Exception:
hits = []
for hit in hits:
extracted = cls._extract_hash_from_search_hit(hit)
if extracted:
return extracted
return None
@staticmethod
def _format_timecode(seconds: int, *, force_hours: bool) -> str:
total = max(0, int(seconds))
minutes, secs = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
if force_hours:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
@classmethod
def _format_clip_range(cls, start_s: int, end_s: int) -> str:
force_hours = bool(start_s >= 3600 or end_s >= 3600)
return f"{cls._format_timecode(start_s, force_hours=force_hours)}-{cls._format_timecode(end_s, force_hours=force_hours)}"
@classmethod
def _apply_clip_decorations(
cls,
pipe_objects: List[Dict[str, Any]],
clip_ranges: List[tuple[int, int]],
*,
source_king_hash: Optional[str],
) -> None:
"""Apply clip:{range} tags/titles and relationship metadata for multi-clip downloads.
- Sets the clip title (and title: tag) to exactly `clip:{range}`.
- Adds `clip:{range}` tag.
- Sets `relationships` on each emitted item (king hash first, then alt hashes)
so downstream can persist relationships into a DB/API without storing relationship tags.
"""
if not pipe_objects or len(pipe_objects) != len(clip_ranges):
return
# Always apply clip titles/tags (even for a single clip).
for po, (start_s, end_s) in zip(pipe_objects, clip_ranges):
clip_range = cls._format_clip_range(start_s, end_s)
clip_tag = f"clip:{clip_range}"
# Title: make it generic/consistent for clips.
po["title"] = clip_tag
tags = po.get("tag")
if not isinstance(tags, list):
tags = []
# Replace any existing title: tags with title:<clip_tag>
tags = [t for t in tags if not str(t).strip().lower().startswith("title:")]
# Relationships must not be stored as tags.
tags = [t for t in tags if not str(t).strip().lower().startswith("relationship:")]
tags.insert(0, f"title:{clip_tag}")
# Ensure clip tag exists
if clip_tag not in tags:
tags.append(clip_tag)
po["tag"] = tags
# Relationship tagging only makes sense when multiple clips exist.
if len(pipe_objects) < 2:
return
hashes: List[str] = []
for po in pipe_objects:
h = cls._normalise_hash_hex(str(po.get("hash") or ""))
hashes.append(h or "")
# Determine king: prefer an existing source video hash if present; else first clip becomes king.
king_hash = cls._normalise_hash_hex(source_king_hash) if source_king_hash else None
if not king_hash:
king_hash = hashes[0] if hashes and hashes[0] else None
if not king_hash:
return
alt_hashes: List[str] = [h for h in hashes if h and h != king_hash]
if not alt_hashes:
return
# Carry relationship metadata through the pipeline without using tags;
# each PipeObject gets its own copy so downstream mutation stays isolated.
for po in pipe_objects:
po["relationships"] = {"king": [king_hash], "alt": list(alt_hashes)}
def _compute_file_hash(self, filepath: Path) -> str:
"""Compute the SHA-256 hash of a file (hashlib is imported at module scope)."""
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
# Module-level singleton registration
CMDLET = Download_Media()