from __future__ import annotations

import sys
from typing import Any, Dict, Iterable, List, Optional

from ProviderCore.base import Provider, SearchResult
from SYS.provider_helpers import TableProviderMixin
from SYS.logger import log


def _format_duration(duration: Any) -> str:
    """Return *duration* (seconds) as ``M:SS``, or ``""`` when missing/zero."""
    if not duration:
        return ""
    return f"{int(duration // 60)}:{int(duration % 60):02d}"


def _format_views(view_count: Any) -> str:
    """Return *view_count* with thousands separators, or ``""`` when missing/zero."""
    if not view_count:
        return ""
    return f"{view_count:,}"


def _entry_to_result(video_data: Dict[str, Any]) -> Optional[SearchResult]:
    """Build a SearchResult from one yt_dlp search entry.

    Returns None when the entry carries neither a URL nor a video id, because
    no usable download target can be derived from it.
    """
    video_id = video_data.get("id") or ""
    url = video_data.get("url") or (
        f"https://youtube.com/watch?v={video_id}" if video_id else ""
    )
    if not url:
        return None

    # Use ``or`` rather than a .get() default: the key may be present with an
    # explicit None value, which would otherwise render as "None".
    title = video_data.get("title") or "Unknown"
    uploader = (
        video_data.get("uploader") or video_data.get("channel") or "Unknown"
    )
    duration = video_data.get("duration") or 0
    view_count = video_data.get("view_count") or 0

    duration_str = _format_duration(duration)
    views_str = _format_views(view_count)

    # Only keep annotations that actually carry information; avoids a bare
    # " views" label when the view count is unknown.
    annotations = [
        text
        for text in (duration_str, f"{views_str} views" if views_str else "")
        if text
    ]

    return SearchResult(
        table="youtube",
        title=title,
        path=url,
        detail=f"By: {uploader}",
        annotations=annotations,
        media_kind="video",
        columns=[
            ("Title", title),
            ("Uploader", uploader),
            ("Duration", duration_str),
            ("Views", views_str),
        ],
        full_metadata={
            "video_id": video_id,
            "uploader": uploader,
            "duration": duration,
            "view_count": view_count,
            # Selection metadata for table system and @N expansion
            "_selection_args": ["-url", url],
        },
    )


class YouTube(TableProviderMixin, Provider):
    """YouTube video search provider using yt_dlp.

    This provider uses the new table system (strict ResultTable adapter pattern)
    for consistent selection and auto-stage integration. It exposes videos as
    SearchResult rows with metadata enrichment for:

    - video_id: Unique YouTube video identifier
    - uploader: Channel/creator name
    - duration: Video length in seconds
    - view_count: Number of views
    - _selection_args: For @N expansion control and download-file routing

    SELECTION FLOW:
    1. User runs: search-file -provider youtube "linux tutorial"
    2. Results show video rows with uploader, duration, views
    3. User selects a video: @1
    4. Selection metadata routes to download-file with the YouTube URL
    5. download-file uses yt_dlp to download the video
    """

    TABLE_AUTO_STAGES = {
        "youtube": ["download-file"],
    }
    # If the user provides extra args on the selection stage, forward them to
    # download-file.
    AUTO_STAGE_USE_SELECTION_ARGS = True

    def search(
        self,
        query: str,
        limit: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[SearchResult]:
        """Search YouTube for *query* and return up to *limit* results.

        Args:
            query: Free-text search string.
            limit: Maximum number of results requested and returned.
            filters: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            A list of SearchResult rows. An empty list is returned (and a
            message is logged to stderr) when yt_dlp is unavailable or the
            search fails; this method does not raise.
        """
        # Use the yt_dlp Python module (installed via requirements.txt).
        try:
            import yt_dlp  # type: ignore
        except Exception:
            log("[youtube] yt_dlp import failed", file=sys.stderr)
            return []

        ydl_opts: Dict[str, Any] = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[arg-type]
                search_query = f"ytsearch{limit}:{query}"
                info = ydl.extract_info(search_query, download=False)

            # ``info`` may be None and ``entries`` may be a lazy iterable, so
            # normalise to a concrete list before slicing.
            entries = list((info or {}).get("entries") or [])

            results: List[SearchResult] = []
            for video_data in entries[:limit]:
                if not isinstance(video_data, dict):
                    continue
                result = _entry_to_result(video_data)
                if result is not None:
                    results.append(result)
            return results
        except Exception as exc:
            # Keep the provider best-effort, but report the real cause instead
            # of blaming the import.
            log(f"[youtube] search failed: {exc}", file=sys.stderr)
            return []

    def validate(self) -> bool:
        """Return True when the yt_dlp module can be imported."""
        try:
            import yt_dlp  # type: ignore  # noqa: F401

            return True
        except Exception:
            return False


# Minimal provider registration for the new table system
try:
    from SYS.result_table_adapters import register_provider
    from SYS.result_table_api import (
        ResultModel,
        ColumnSpec,
        metadata_column,
        title_column,
    )

    # Column names (lower-cased) that are copied from a row's display columns
    # into the ResultModel metadata.
    _COLUMN_METADATA_KEYS = ("uploader", "duration", "views", "video_id")

    def _convert_search_result_to_model(sr: Any) -> ResultModel:
        """Convert YouTube SearchResult to ResultModel for strict table display.

        Accepts an object exposing ``to_dict()``, a plain dict, or any other
        object (in which case only its ``title`` attribute, or its ``str()``,
        is used).
        """
        if hasattr(sr, "to_dict"):
            d = sr.to_dict()
        elif isinstance(sr, dict):
            d = sr
        else:
            d = {"title": getattr(sr, "title", str(sr))}

        title = d.get("title") or ""
        path = d.get("path") or None
        columns = d.get("columns") or getattr(sr, "columns", None) or []

        # Extract metadata from columns first ...
        metadata: Dict[str, Any] = {}
        for name, value in columns:
            key = str(name or "").strip().lower()
            if key in _COLUMN_METADATA_KEYS:
                metadata[key] = value

        # ... then overlay full_metadata, which wins on key collisions.
        fm = d.get("full_metadata") or {}
        if isinstance(fm, dict):
            for k, v in fm.items():
                metadata[str(k).strip().lower()] = v

        return ResultModel(
            title=str(title),
            path=str(path) if path else None,
            ext=None,
            size_bytes=None,
            metadata=metadata,
            source="youtube",
        )

    def _adapter(items: Iterable[Any]) -> Iterable[ResultModel]:
        """Adapter to convert SearchResults to ResultModels.

        Items that cannot be converted are skipped rather than aborting the
        whole table.
        """
        for it in items:
            try:
                yield _convert_search_result_to_model(it)
            except Exception:
                continue

    def _has_metadata(rows: List[ResultModel], key: str) -> bool:
        """Check if any row has a given metadata key with a non-empty value.

        ``None`` and blank/whitespace-only strings count as empty; every other
        value (including 0) counts as present.
        """

        def _present(row: ResultModel) -> bool:
            md = row.metadata or {}
            if key not in md:
                return False
            val = md[key]
            if val is None:
                return False
            return not (isinstance(val, str) and not val.strip())

        return any(_present(row) for row in rows)

    def _columns_factory(rows: List[ResultModel]) -> List[ColumnSpec]:
        """Build column specifications from available metadata in rows.

        The title column is always present; each optional column is added only
        when at least one row has a value for it.
        """
        cols = [title_column()]
        for key, label in (
            ("uploader", "Uploader"),
            ("duration", "Duration"),
            ("views", "Views"),
        ):
            if _has_metadata(rows, key):
                cols.append(metadata_column(key, label))
        return cols

    def _selection_fn(row: ResultModel) -> List[str]:
        """Return selection args for @N expansion and auto-download integration.

        Uses explicit -url flag to ensure the YouTube URL is properly routed to
        download-file for yt_dlp download processing.
        """
        metadata = row.metadata or {}

        # Check for explicit selection args first
        args = metadata.get("_selection_args") or metadata.get("selection_args")
        if isinstance(args, (list, tuple)) and args:
            return [str(x) for x in args if x is not None]

        # Fallback to direct URL
        if row.path:
            return ["-url", row.path]

        return ["-title", row.title or ""]

    register_provider(
        "youtube",
        _adapter,
        columns=_columns_factory,
        selection_fn=_selection_fn,
        metadata={"description": "YouTube video search using yt_dlp"},
    )
except Exception:
    # best-effort registration: the provider still works for plain search when
    # the table system is unavailable.
    pass