Files
Medios-Macina/Provider/ytdlp.py

391 lines
15 KiB
Python
Raw Normal View History

2026-01-10 17:30:18 -08:00
"""ytdlp format selector provider.
When a URL is passed through download-file, this provider displays available formats
in a table and routes format selection back to download-file with the chosen format
already specified via -format, skipping the format table on the second invocation.
This keeps format selection logic in ytdlp and leaves add-file plug-and-play.
"""
from __future__ import annotations
import sys
from typing import Any, Dict, Iterable, List, Optional
from ProviderCore.base import Provider, SearchResult
from SYS.provider_helpers import TableProviderMixin
from SYS.logger import log, debug
from tool.ytdlp import list_formats, is_url_supported_by_ytdlp
class ytdlp(TableProviderMixin, Provider):
    """ytdlp format selector and video search provider.

    DUAL FUNCTIONALITY:
        1. FORMAT SELECTION: When download-file is used with a yt-dlp supported
           URL, displays available formats in a table for user selection.
        2. SEARCH: When search-file is used with -provider ytdlp, searches
           YouTube (and other yt-dlp supported sites) for videos.

    FORMAT SELECTION USAGE:
        - User runs: download-file "https://example.com/video"
        - If URL is ytdlp-supported and no format specified, displays format table
        - User selects @N (e.g., @3 for format index 3)
        - Selection args include -format <format_id>, re-invoking download-file
        - Second download-file call sees -format and skips table, downloads directly

    SEARCH USAGE:
        - User runs: search-file -provider ytdlp "linux tutorial"
        - Shows YouTube search results as a table
        - User selects @1 to download that video
        - Selection args route to download-file for streaming download

    SELECTION FLOW (Format):
        1. download-file receives URL without -format
        2. Calls ytdlp to list formats
        3. Returns formats as ResultTable (from this provider)
        4. User selects @N
        5. Selection args: ["-format", "<format_id>"] route back to download-file
        6. Second download-file invocation with -format skips table

    SELECTION FLOW (Search):
        1. search-file lists YouTube videos via yt_dlp
        2. Returns videos as ResultTable (from this provider)
        3. User selects @N
        4. Selection args: ["-url", "<youtube_url>"] route to download-file
        5. download-file downloads the selected video

    TABLE AUTO-STAGES:
        - Format selection: ytdlp.formatlist -> download-file (with -format)
        - Video search: ytdlp.search -> download-file (with -url)

    SUPPORTED URLS:
        The ``URL`` property derives a best-effort domain list from the names
        of the extractors returned by ``yt_dlp.gen_extractors()``, always
        including the entries of ``_fallback_domains``.
    """

    # Fallback common domains, used when yt-dlp cannot be imported and always
    # included in the discovered domain list.
    _fallback_domains = [
        "youtube.com", "youtu.be",
        "bandcamp.com",
        "vimeo.com",
        "twitch.tv",
        "dailymotion.com",
        "rumble.com",
        "odysee.com",
    ]

    # Domain list computed by a fully successful extractor enumeration, kept
    # so that repeated ``URL`` accesses do not re-enumerate every extractor.
    _url_cache = None

    TABLE_AUTO_STAGES = {
        "ytdlp.formatlist": ["download-file"],
        "ytdlp.search": ["download-file"],
    }

    # Forward selection args (including -format or -url) to the next stage
    AUTO_STAGE_USE_SELECTION_ARGS = True

    @staticmethod
    def _extractor_name_to_domain(name: str) -> Optional[str]:
        """Turn an extractor name into a domain guess, or None to skip it.

        This is a heuristic: only the part before the first ':' is used
        (so 'youtube:tab' maps to the same domain as 'youtube'), names that
        already contain a dot are taken as-is, and anything else gets
        '.com' appended. Names of two characters or fewer, and the
        'generic' / 'http' extractors, are skipped.
        """
        base = name.lower().split(":", 1)[0].strip()
        if len(base) <= 2 or base in ("generic", "http"):
            return None
        # Names such as 'rtl.nl' already look like a domain; do not append '.com'.
        return base if "." in base else f"{base}.com"

    @property
    def URL(self) -> List[str]:
        """Return the list of domains this provider claims to support.

        Returns:
            A new list containing ``_fallback_domains`` plus one guessed
            domain per usable yt-dlp extractor name. If yt-dlp cannot be
            imported, only the fallback domains are returned. If extractor
            enumeration fails part-way, the domains gathered so far are
            returned and the result is not cached.
        """
        cached = type(self)._url_cache
        if cached is not None:
            # Hand out a copy so callers cannot mutate the shared cache.
            return list(cached)

        try:
            import yt_dlp  # type: ignore
        except Exception:
            return list(self._fallback_domains)

        domains = set(self._fallback_domains)
        complete = True
        try:
            for extractor in yt_dlp.gen_extractors():
                name = getattr(extractor, "IE_NAME", "")
                if not name or not isinstance(name, str):
                    continue
                domain = self._extractor_name_to_domain(name)
                if domain:
                    domains.add(domain)
        except Exception as exc:
            # Best-effort: keep whatever was collected, but do not cache it.
            complete = False
            debug(f"[ytdlp] Extractor enumeration failed; using partial domain list: {exc}")

        result = sorted(domains)
        if complete:
            type(self)._url_cache = result
        return list(result)

    @staticmethod
    def _entry_to_result(entry: Dict[str, Any]) -> SearchResult:
        """Build one ``ytdlp.search`` SearchResult row from a yt-dlp entry dict."""
        # ``or`` (rather than a .get default) also covers keys that are
        # present but set to None.
        title = entry.get("title") or "Unknown"
        video_id = entry.get("id") or ""
        url = entry.get("url") or f"https://youtube.com/watch?v={video_id}"
        uploader = entry.get("uploader") or "Unknown"
        # Raw values are passed through unchanged into full_metadata.
        duration = entry.get("duration", 0)
        view_count = entry.get("view_count", 0)

        duration_str = (
            f"{int(duration // 60)}:{int(duration % 60):02d}" if duration else ""
        )
        views_str = f"{view_count:,}" if view_count else ""

        # Only emit annotations that carry information (no bare " views").
        annotations = []
        if duration_str:
            annotations.append(duration_str)
        if views_str:
            annotations.append(f"{views_str} views")

        return SearchResult(
            table="ytdlp.search",
            title=title,
            path=url,
            detail=f"By: {uploader}",
            annotations=annotations,
            media_kind="video",
            columns=[
                ("Title", title),
                ("Uploader", uploader),
                ("Duration", duration_str),
                ("Views", views_str),
            ],
            full_metadata={
                "video_id": video_id,
                "uploader": uploader,
                "duration": duration,
                "view_count": view_count,
                # Selection metadata for table system and @N expansion
                "_selection_args": ["-url", url],
            },
        )

    def search(
        self,
        query: str,
        limit: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[SearchResult]:
        """Search for videos through yt-dlp's ``ytsearch`` pseudo-URL.

        Args:
            query: Free-text search terms. A blank query returns no results.
            limit: Maximum number of results requested and returned.
            filters: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            SearchResult rows for the ``ytdlp.search`` table, each carrying
            ``_selection_args`` of ``["-url", <url>]``. Returns an empty list
            if yt-dlp cannot be imported or the search fails for any reason.
        """
        if not query or not str(query).strip():
            return []

        try:
            import yt_dlp  # type: ignore

            ydl_opts: Dict[str, Any] = {
                "quiet": True,
                "skip_download": True,
                "extract_flat": True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[arg-type]
                info = ydl.extract_info(f"ytsearch{limit}:{query}", download=False)

            # Materialize first so slicing also works for non-list iterables.
            entries = list((info or {}).get("entries") or [])[:limit]
            return [
                self._entry_to_result(entry)
                for entry in entries
                # Skip placeholder entries instead of failing the whole search.
                if isinstance(entry, dict)
            ]
        except Exception as exc:
            debug(f"[ytdlp] yt_dlp import or search failed: {exc}")
            return []

    def validate(self) -> bool:
        """Return True when the ``yt_dlp`` package can be imported."""
        try:
            import yt_dlp  # type: ignore  # noqa: F401
            return True
        except Exception:
            return False
# Minimal provider registration for the new table system.
# Registration is best-effort: if the table API cannot be imported or a
# register_provider call raises, the failure is logged via debug() and the
# module still loads.
try:
    from SYS.result_table_adapters import register_provider
    from SYS.result_table_api import ResultModel, ColumnSpec, metadata_column, title_column

    def _collect_metadata(
        d: Dict[str, Any],
        columns: Iterable[Any],
        wanted_keys: Iterable[str],
    ) -> Dict[str, Any]:
        """Merge selected column values and all of ``full_metadata`` into one dict.

        Column names are lower-cased and stripped; only those listed in
        ``wanted_keys`` are kept. Every ``full_metadata`` entry is then copied
        in under its lower-cased, stripped key.
        """
        wanted = set(wanted_keys)
        metadata: Dict[str, Any] = {}
        for column in columns:
            try:
                name, value = column
            except (TypeError, ValueError):
                # Malformed column entry: skip it rather than lose the row.
                continue
            key = str(name or "").strip().lower()
            if key in wanted:
                metadata[key] = value
        # NOTE(review): full_metadata is applied last, so it overrides a column
        # value stored under the same key (e.g. "duration") - confirm that the
        # raw value is what the table should display.
        try:
            fm = d.get("full_metadata") or {}
            if isinstance(fm, dict):
                for k, v in fm.items():
                    metadata[str(k).strip().lower()] = v
        except Exception as exc:
            debug(f"[ytdlp] Ignoring unreadable full_metadata: {exc}")
        return metadata

    def _convert_format_result_to_model(sr: Any) -> ResultModel:
        """Convert a format row (dict or object with ``to_dict``) to a ResultModel."""
        if hasattr(sr, "to_dict"):
            d = sr.to_dict()
        elif isinstance(sr, dict):
            d = sr
        else:
            d = {}
        title = d.get("title") or f"Format {d.get('format_id', 'unknown')}"
        metadata = _collect_metadata(
            d,
            d.get("columns") or [],
            ("id", "resolution", "ext", "size", "video", "audio", "format_id"),
        )
        return ResultModel(
            title=str(title),
            path=d.get("url") or d.get("target"),
            ext=d.get("ext"),
            size_bytes=None,
            metadata=metadata,
            source="ytdlp",
        )

    def _adapter(items: Iterable[Any]) -> Iterable[ResultModel]:
        """Yield a ResultModel per format row, skipping rows that fail to convert."""
        for it in items:
            try:
                yield _convert_format_result_to_model(it)
            except Exception as exc:
                debug(f"[ytdlp] Skipping format row that could not be converted: {exc}")
                continue

    def _has_metadata(rows: List[ResultModel], key: str) -> bool:
        """Return True if any row has a non-None, non-blank value under ``key``."""
        for row in rows:
            val = (row.metadata or {}).get(key)
            if val is None:
                continue
            if isinstance(val, str) and not val.strip():
                continue
            return True
        return False

    def _columns_factory(rows: List[ResultModel]) -> List[ColumnSpec]:
        """Build format-table columns: title plus each metadata column with data."""
        cols = [title_column()]
        for key, label in (
            ("resolution", "Resolution"),
            ("ext", "Ext"),
            ("size", "Size"),
            ("video", "Video"),
            ("audio", "Audio"),
        ):
            if _has_metadata(rows, key):
                cols.append(metadata_column(key, label))
        return cols

    def _selection_fn(row: ResultModel) -> List[str]:
        """Return selection args for format selection.

        When user selects @N, these args are passed to download-file which sees
        the -format specifier and skips the format table, downloading directly.
        Explicit ``_selection_args`` / ``selection_args`` metadata wins; otherwise
        ``format_id`` (or ``id``) is turned into ``["-format", <id>]``. Returns an
        empty list when neither is available.
        """
        metadata = row.metadata or {}

        # Check for explicit selection args first
        args = metadata.get("_selection_args") or metadata.get("selection_args")
        if isinstance(args, (list, tuple)) and args:
            result_args = [str(x) for x in args if x is not None]
            debug(f"[ytdlp] Selection routed with args: {result_args}")
            return result_args

        # Fallback: use format_id
        format_id = metadata.get("format_id") or metadata.get("id")
        if format_id:
            result_args = ["-format", str(format_id)]
            debug(f"[ytdlp] Selection routed with format_id: {format_id}")
            return result_args

        debug("[ytdlp] Warning: No selection args or format_id found in row")
        return []

    register_provider(
        "ytdlp.formatlist",
        _adapter,
        columns=_columns_factory,
        selection_fn=_selection_fn,
        metadata={"description": "ytdlp format selector for streaming media"},
    )
    debug("[ytdlp] Provider registered successfully with TABLE_AUTO_STAGES routing to download-file")

    # Also register the search table
    def _convert_search_result_to_model(sr: Any) -> ResultModel:
        """Convert a search row (SearchResult-like object or dict) to a ResultModel."""
        if hasattr(sr, "to_dict"):
            d = sr.to_dict()
        elif isinstance(sr, dict):
            d = sr
        else:
            d = {"title": getattr(sr, "title", str(sr))}
        title = d.get("title") or ""
        path = d.get("path") or None
        # Fall back to the object's own ``columns`` attribute when the dict has none.
        columns = d.get("columns") or getattr(sr, "columns", None) or []
        metadata = _collect_metadata(
            d,
            columns,
            ("uploader", "duration", "views", "video_id"),
        )
        return ResultModel(
            title=str(title),
            path=str(path) if path else None,
            ext=None,
            size_bytes=None,
            metadata=metadata,
            source="ytdlp",
        )

    def _search_adapter(items: Iterable[Any]) -> Iterable[ResultModel]:
        """Yield a ResultModel per search row, skipping rows that fail to convert."""
        for it in items:
            try:
                yield _convert_search_result_to_model(it)
            except Exception as exc:
                debug(f"[ytdlp] Skipping search row that could not be converted: {exc}")
                continue

    def _search_columns_factory(rows: List[ResultModel]) -> List[ColumnSpec]:
        """Build search-table columns: title plus each metadata column with data."""
        cols = [title_column()]
        for key, label in (
            ("uploader", "Uploader"),
            ("duration", "Duration"),
            ("views", "Views"),
        ):
            if _has_metadata(rows, key):
                cols.append(metadata_column(key, label))
        return cols

    def _search_selection_fn(row: ResultModel) -> List[str]:
        """Return selection args for search results.

        When user selects @N from search results, route to download-file with
        -url. Explicit ``_selection_args`` / ``selection_args`` metadata wins,
        then the row's path; with neither, ``["-title", <title>]`` is returned.
        """
        metadata = row.metadata or {}

        # Check for explicit selection args first
        args = metadata.get("_selection_args") or metadata.get("selection_args")
        if isinstance(args, (list, tuple)) and args:
            return [str(x) for x in args if x is not None]

        # Fallback to direct URL
        if row.path:
            return ["-url", row.path]

        return ["-title", row.title or ""]

    register_provider(
        "ytdlp.search",
        _search_adapter,
        columns=_search_columns_factory,
        selection_fn=_search_selection_fn,
        metadata={"description": "ytdlp video search using yt-dlp"},
    )
    debug("[ytdlp] Search provider registered successfully")
except Exception as e:
    # best-effort registration
    debug(f"[ytdlp] Provider registration note: {e}")