"""PodcastIndex.org provider: feed search, episode listing and episode download."""

from __future__ import annotations

import hashlib
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from ProviderCore.base import Provider, SearchResult
from SYS.logger import log


def _get_podcastindex_credentials(config: Dict[str, Any]) -> Tuple[str, str]:
    """Return the ``(key, secret)`` pair configured for PodcastIndex.

    Reads ``config["provider"]["podcastindex"]`` and accepts the spellings
    ``key``/``Key``/``api_key`` and ``secret``/``Secret``/``api_secret``.
    A missing or non-dict section yields ``("", "")`` instead of raising.
    """
    provider = config.get("provider")
    if not isinstance(provider, dict):
        return "", ""

    entry = provider.get("podcastindex")
    if not isinstance(entry, dict):
        return "", ""

    key = entry.get("key") or entry.get("Key") or entry.get("api_key")
    secret = entry.get("secret") or entry.get("Secret") or entry.get("api_secret")
    return str(key or "").strip(), str(secret or "").strip()


class PodcastIndex(Provider):
    """Search provider for PodcastIndex.org."""

    @staticmethod
    def _format_duration(value: Any) -> str:
        """Render a duration as ``1h2m3s`` / ``2m3s`` / ``3s``.

        Accepts a number of seconds (int/float), a digit-only string, or a
        ``MM:SS`` / ``HH:MM:SS`` clock string. Anything else is returned as
        its stripped string form (``""`` for ``None``).
        """

        def _to_seconds(v: Any) -> Optional[int]:
            if isinstance(v, (int, float)):
                try:
                    return max(0, int(v))
                except (ValueError, OverflowError):
                    # NaN / infinity cannot be converted to int.
                    return None
            if not isinstance(v, str):
                return None

            text = v.strip()
            if not text:
                return None

            if text.isdigit():
                fields = [text]
            elif ":" in text:
                # Accept common clock formats too.
                fields = [p.strip() for p in text.split(":") if p.strip()]
                if len(fields) not in (2, 3) or not all(p.isdigit() for p in fields):
                    return None
            else:
                return None

            # str.isdigit() accepts characters (e.g. superscript digits) that
            # int() rejects, so the conversion itself must be guarded.
            try:
                numbers = [int(p) for p in fields]
            except ValueError:
                return None

            total_seconds = 0
            for number in numbers:
                total_seconds = total_seconds * 60 + number
            return max(0, total_seconds)

        total = _to_seconds(value)
        if total is None:
            return "" if value is None else str(value).strip()

        hours, remainder = divmod(total, 3600)
        minutes, seconds = divmod(remainder, 60)
        if hours > 0:
            return f"{hours:d}h{minutes:d}m{seconds:d}s"
        if minutes > 0:
            return f"{minutes:d}m{seconds:d}s"
        return f"{seconds:d}s"

    @staticmethod
    def _format_bytes(value: Any) -> str:
        """Render a byte count with a 1024-based unit (``B`` .. ``TB``).

        Returns ``""`` when *value* is not convertible to an int or is <= 0.
        Byte values are printed as integers, larger units with one decimal.
        """
        try:
            n = int(value)
        except (TypeError, ValueError, OverflowError):
            return ""
        if n <= 0:
            return ""

        units = ["B", "KB", "MB", "GB", "TB"]
        size = float(n)
        unit = units[0]
        for candidate in units:
            unit = candidate
            # Stop at the first unit that fits, or at the largest unit.
            if size < 1024.0 or candidate == units[-1]:
                break
            size /= 1024.0

        if unit == "B":
            return f"{int(size)}{unit}"
        return f"{size:.1f}{unit}"

    @staticmethod
    def _format_date_from_epoch(value: Any) -> str:
        """Format a positive Unix timestamp as a UTC ``YYYY-MM-DD`` string.

        Returns ``""`` for ``None``, non-numeric, non-positive or
        out-of-range values.
        """
        if value is None:
            return ""
        try:
            ts = int(value)
            if ts <= 0:
                return ""
            # Timezone-aware replacement for the deprecated utcfromtimestamp().
            return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
        except (TypeError, ValueError, OverflowError, OSError):
            return ""

    @staticmethod
    def _extract_episode_categories(ep: Dict[str, Any]) -> List[str]:
        """Return the episode's category labels, stripped and de-duplicated.

        Looks at ``ep["categories"]`` (falling back to ``ep["category"]``),
        which may be a dict (values are used), a list, or a single string.
        Non-string entries are ignored; duplicates are detected
        case-insensitively and the first spelling wins.
        """
        cats = ep.get("categories") or ep.get("category")
        if isinstance(cats, dict):
            candidates = list(cats.values())
        elif isinstance(cats, list):
            candidates = cats
        elif isinstance(cats, str):
            candidates = [cats]
        else:
            candidates = []

        # Keep the table readable.
        unique: List[str] = []
        seen = set()
        for raw in candidates:
            if not isinstance(raw, str):
                continue
            label = raw.strip()
            if not label:
                continue
            folded = label.lower()
            if folded in seen:
                continue
            seen.add(folded)
            unique.append(label)
        return unique

    @staticmethod
    def _looks_like_episode(item: Any) -> bool:
        """Return True when *item* is a dict row carrying an http(s) enclosure URL.

        The row must have a dict ``full_metadata``. The enclosure URL is read
        from the metadata (``enclosureUrl`` / ``enclosure_url``) or, failing
        that, from the row itself (``enclosureUrl`` / ``url``).
        """
        if not isinstance(item, dict):
            return False
        md = item.get("full_metadata")
        if not isinstance(md, dict):
            return False

        enc = md.get("enclosureUrl") or md.get("enclosure_url")
        if isinstance(enc, str) and enc.strip().startswith("http"):
            return True

        # Some pipelines may flatten episode fields.
        flat = item.get("enclosureUrl") or item.get("url")
        return isinstance(flat, str) and flat.strip().startswith("http")

    @staticmethod
    def _compute_sha256(filepath: Path) -> str:
        """Return the hex SHA-256 digest of *filepath*, read in 1 MiB chunks."""
        digest = hashlib.sha256()
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        return digest.hexdigest()

    @staticmethod
    def _parse_size_bytes(value: Any) -> Optional[int]:
        """Return *value* as an int when its string form is all digits, else None."""
        text = str(value or "")
        if not text.isdigit():
            return None
        try:
            return int(text)
        except ValueError:
            # isdigit() is broader than what int() accepts.
            return None

    @staticmethod
    def _resolve_downloaded_path(result_obj: Any) -> Optional[Path]:
        """Return the local path reported by a download result, if any.

        Uses the first non-None attribute among ``filepath``, ``file_path``
        and ``path``. Returns None when no attribute is set or it is empty,
        so callers never probe a bogus ``Path("None")`` or ``Path("")``.
        """
        candidate = None
        for attr in ("filepath", "file_path", "path"):
            try:
                candidate = getattr(result_obj, attr, None)
            except Exception:
                # A property on the result object may raise; treat as missing.
                candidate = None
            if candidate is not None:
                break
        if candidate is None:
            return None
        try:
            text = str(candidate)
            return Path(text) if text else None
        except Exception:
            return None

    def selector(
        self,
        selected_items: List[Any],
        *,
        ctx: Any,
        stage_is_last: bool = True,
        **_kwargs: Any,
    ) -> bool:
        """Act on rows the user selected from a PodcastIndex table.

        Does nothing (returns False) unless this is the last stage and at
        least one item is selected. If every item looks like an episode the
        episodes are downloaded; otherwise the selection is treated as a
        podcast and expanded into its episode list. Returns the handler's
        result.

        NOTE(review): presumably True means "selection handled" — confirm
        against the ``Provider.selector`` contract.
        """
        if not stage_is_last or not selected_items:
            return False

        # Episode selection (terminal): download episodes to temp/output dir.
        if all(self._looks_like_episode(x) for x in selected_items):
            return self._handle_episode_download_selection(selected_items, ctx)

        # Podcast selection (terminal): expand into episode list.
        return self._handle_podcast_expand_selection(selected_items, ctx)

    def _build_episode_result(
        self,
        ep: Dict[str, Any],
        feed_title: str,
        feed_md: Dict[str, Any],
    ) -> Optional[SearchResult]:
        """Build the table row for one episode dict, or None if it has no URL."""
        ep_title = str(ep.get("title") or "").strip() or "Unknown"
        enc_url = str(ep.get("enclosureUrl") or "").strip()
        page_url = str(ep.get("link") or "").strip()
        audio_url = enc_url or page_url
        if not audio_url:
            return None

        duration = ep.get("duration")
        size_bytes = ep.get("enclosureLength") or ep.get("enclosure_length")
        published = ep.get("datePublished") or ep.get("datePublishedPretty")
        # Fall back to the raw value when it is not an epoch timestamp.
        published_text = (
            self._format_date_from_epoch(published) or str(published or "").strip()
        )

        return SearchResult(
            table="podcastindex",
            title=ep_title,
            path=audio_url,
            detail=feed_title,
            media_kind="audio",
            size_bytes=self._parse_size_bytes(size_bytes),
            columns=[
                ("Title", ep_title),
                ("Date", published_text),
                ("Duration", self._format_duration(duration)),
                ("Size", self._format_bytes(size_bytes)),
                ("Url", audio_url),
            ],
            full_metadata={
                **dict(ep),
                "_feed": dict(feed_md) if isinstance(feed_md, dict) else {},
            },
        )

    def _handle_podcast_expand_selection(self, selected_items: List[Any], ctx: Any) -> bool:
        """Expand the first selected podcast row into a table of its episodes.

        Returns False when there is no dict item or credentials are missing.
        Otherwise returns True, including when the episode lookup or the
        display imports fail (lookup failures are logged to stderr).
        """
        chosen: List[Dict[str, Any]] = [
            x for x in (selected_items or []) if isinstance(x, dict)
        ]
        if not chosen:
            return False

        key, secret = _get_podcastindex_credentials(self.config or {})
        if not key or not secret:
            return False

        # Resolve feed id/url from the selected podcast row.
        item0 = chosen[0]
        raw_md = item0.get("full_metadata")
        feed_md: Dict[str, Any] = raw_md if isinstance(raw_md, dict) else {}
        feed_title = (
            str(item0.get("title") or feed_md.get("title") or "Podcast").strip()
            or "Podcast"
        )
        try:
            feed_id = int(feed_md.get("id")) if feed_md.get("id") is not None else None
        except (TypeError, ValueError):
            feed_id = None
        feed_url = str(feed_md.get("url") or item0.get("path") or "").strip()

        if not feed_id and not feed_url:
            # Nothing to query with; avoid a pointless request for an empty URL.
            log("[podcastindex] episode lookup failed: no feed id or url", file=sys.stderr)
            return True

        try:
            from API.podcastindex import PodcastIndexClient

            client = PodcastIndexClient(key, secret)
            if feed_id:
                episodes = client.episodes_byfeedid(feed_id, max_results=200)
            else:
                episodes = client.episodes_byfeedurl(feed_url, max_results=200)
        except Exception as exc:
            log(f"[podcastindex] episode lookup failed: {exc}", file=sys.stderr)
            return True

        try:
            from SYS.result_table import ResultTable
            from SYS.rich_display import stdout_console
        except Exception:
            return True

        table = ResultTable(f"PodcastIndex Episodes: {feed_title}").set_preserve_order(True)
        table.set_table("podcastindex.episodes")
        try:
            table.set_value_case("lower")
        except Exception:
            # Best effort: the call is optional for the table.
            pass

        results_payload: List[Dict[str, Any]] = []
        for ep in episodes or []:
            if not isinstance(ep, dict):
                continue
            row = self._build_episode_result(ep, feed_title, feed_md)
            if row is None:
                continue
            table.add_result(row)
            results_payload.append(row.to_dict())

        # Publishing to the pipeline context and printing are both best effort.
        try:
            ctx.set_last_result_table(table, results_payload)
            ctx.set_current_stage_table(table)
        except Exception:
            pass

        try:
            stdout_console().print()
            stdout_console().print(table)
        except Exception:
            pass

        return True

    def _handle_episode_download_selection(self, selected_items: List[Any], ctx: Any) -> bool:
        """Download the selected episodes and publish one payload per file.

        Returns False when credentials are missing, True otherwise. Items
        without an http(s) URL, failed downloads and downloads whose local
        file cannot be found are skipped; download errors are logged to
        stderr.
        """
        key, secret = _get_podcastindex_credentials(self.config or {})
        if not key or not secret:
            return False

        try:
            from SYS.config import resolve_output_dir

            output_dir = resolve_output_dir(self.config or {})
        except Exception:
            output_dir = Path.home() / "Downloads"

        try:
            output_dir = Path(output_dir).expanduser()
            output_dir.mkdir(parents=True, exist_ok=True)
        except Exception:
            # Best effort: the downloader reports its own error if the
            # directory is unusable.
            pass

        try:
            from SYS.download import _download_direct_file
        except Exception:
            return True

        payloads: List[Dict[str, Any]] = []
        for item in selected_items:
            if not isinstance(item, dict):
                continue

            raw_md = item.get("full_metadata")
            md: Dict[str, Any] = raw_md if isinstance(raw_md, dict) else {}
            enc_url = str(
                md.get("enclosureUrl") or item.get("url") or item.get("path") or ""
            ).strip()
            if not enc_url or not enc_url.startswith("http"):
                continue

            title_hint = (
                str(item.get("title") or md.get("title") or "episode").strip()
                or "episode"
            )

            try:
                result_obj = _download_direct_file(
                    enc_url,
                    Path(output_dir),
                    quiet=False,
                    suggested_filename=title_hint,
                )
            except Exception as exc:
                log(f"[podcastindex] download failed: {exc}", file=sys.stderr)
                continue

            local_path = self._resolve_downloaded_path(result_obj)
            if local_path is None or not local_path.exists():
                continue

            try:
                sha256 = self._compute_sha256(local_path)
            except Exception:
                # A hash failure must not discard a completed download.
                sha256 = ""

            tags: List[str] = [f"title:{title_hint}"]
            tags.extend(f"tag:{c}" for c in self._extract_episode_categories(md)[:10])

            payload: Dict[str, Any] = {
                "path": str(local_path),
                "hash": sha256,
                "title": title_hint,
                "action": "provider:podcastindex.selector",
                "download_mode": "file",
                "store": "local",
                "media_kind": "audio",
                "tag": tags,
                "provider": "podcastindex",
                "url": enc_url,
            }
            if md:
                payload["full_metadata"] = dict(md)
            payloads.append(payload)

        downloaded = len(payloads)

        try:
            if payloads and hasattr(ctx, "set_last_result_items_only"):
                ctx.set_last_result_items_only(payloads)
        except Exception:
            pass

        if downloaded <= 0:
            return True

        try:
            from SYS.rich_display import stdout_console

            stdout_console().print(f"Downloaded {downloaded} episode(s) -> {output_dir}")
        except Exception:
            pass

        return True

    def validate(self) -> bool:
        """Return True when both the API key and secret are configured."""
        key, secret = _get_podcastindex_credentials(self.config or {})
        return bool(key and secret)

    def search(
        self,
        query: str,
        limit: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[SearchResult]:
        """Search PodcastIndex feeds by term and return up to *limit* rows.

        *filters* and extra keyword arguments are accepted but unused.
        Returns an empty list when credentials are missing, when the client
        call fails (logged to stderr), or when it returns nothing.
        """
        _ = filters
        _ = kwargs

        key, secret = _get_podcastindex_credentials(self.config or {})
        if not key or not secret:
            return []

        try:
            from API.podcastindex import PodcastIndexClient

            client = PodcastIndexClient(key, secret)
            feeds = client.search_byterm(query, max_results=limit)
        except Exception as exc:
            log(f"[podcastindex] search failed: {exc}", file=sys.stderr)
            return []

        if not feeds:
            return []

        try:
            max_items = max(0, int(limit))
        except (TypeError, ValueError):
            # Fall back to the signature default for an unusable limit.
            max_items = 10

        results: List[SearchResult] = []
        for feed in feeds[:max_items]:
            if not isinstance(feed, dict):
                continue

            title = str(feed.get("title") or "").strip() or "Unknown"
            author = str(feed.get("author") or feed.get("ownerName") or "").strip()
            feed_url = str(feed.get("url") or "").strip()
            site_url = str(feed.get("link") or "").strip()
            language = str(feed.get("language") or "").strip()

            episode_count_val = feed.get("episodeCount")
            episode_count = ""
            if episode_count_val is not None:
                try:
                    episode_count = str(int(episode_count_val))
                except (TypeError, ValueError, OverflowError):
                    episode_count = str(episode_count_val).strip()

            path = feed_url or site_url or str(feed.get("id") or "").strip()

            columns = [
                ("Title", title),
                ("Author", author),
                ("Episodes", episode_count),
                ("Lang", language),
                ("Feed", feed_url),
            ]

            results.append(
                SearchResult(
                    table="podcastindex",
                    title=title,
                    path=path,
                    detail=author,
                    media_kind="audio",
                    columns=columns,
                    full_metadata=dict(feed),
                )
            )

        return results