from __future__ import annotations from pathlib import Path import sys from typing import Any, Dict, Iterable, List, Optional from ProviderCore.base import Provider, SearchResult from ProviderCore.download import sanitize_filename from SYS.logger import log def _get_debrid_api_key(config: Dict[str, Any]) -> Optional[str]: """Read AllDebrid API key from config. Preferred formats: - config.conf provider block: [provider=alldebrid] api_key=... -> config["provider"]["alldebrid"]["api_key"] - store-style debrid block: config["store"]["debrid"]["all-debrid"]["api_key"] Falls back to some legacy keys if present. """ # 1) provider block: [provider=alldebrid] provider = config.get("provider") if isinstance(provider, dict): entry = provider.get("alldebrid") if isinstance(entry, dict): for k in ("api_key", "apikey", "API_KEY", "APIKEY"): val = entry.get(k) if isinstance(val, str) and val.strip(): return val.strip() if isinstance(entry, str) and entry.strip(): return entry.strip() # 2) store.debrid block (canonical for debrid store configuration) try: from config import get_debrid_api_key key = get_debrid_api_key(config, service="All-debrid") return key.strip() if key else None except Exception: pass # Legacy fallback (kept permissive so older configs still work) for legacy_key in ("alldebrid_api_key", "AllDebrid", "all_debrid_api_key"): val = config.get(legacy_key) if isinstance(val, str) and val.strip(): return val.strip() return None class AllDebrid(Provider): """Search provider for AllDebrid account content. This provider lists and searches the files/magnets already present in the user's AllDebrid account. Query behavior: - "*" / "all" / "list": list recent files from ready magnets - otherwise: substring match on file name OR magnet name, or exact magnet id """ def validate(self) -> bool: # Consider "available" when configured; actual API connectivity can vary. return bool(_get_debrid_api_key(self.config or {})) def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]: """Download an AllDebrid SearchResult into output_dir. AllDebrid magnet file listings often provide links that require an API "unlock" step to produce a true direct-download URL. Without unlocking, callers may download a small HTML/redirect page instead of file bytes. This is used by the download-file cmdlet when a provider item is piped. """ try: api_key = _get_debrid_api_key(self.config or {}) if not api_key: return None target = str(getattr(result, "path", "") or "").strip() if not target.startswith(("http://", "https://")): return None try: from API.alldebrid import AllDebridClient client = AllDebridClient(api_key) except Exception as exc: log(f"[alldebrid] Failed to init client: {exc}", file=sys.stderr) return None # Quiet mode when download-file is mid-pipeline. quiet = bool(self.config.get("_quiet_background_output")) if isinstance(self.config, dict) else False unlocked_url = target try: unlocked = client.unlock_link(target) if isinstance(unlocked, str) and unlocked.strip().startswith(("http://", "https://")): unlocked_url = unlocked.strip() except Exception as exc: # Fall back to the raw link, but warn. log(f"[alldebrid] Failed to unlock link: {exc}", file=sys.stderr) # Prefer provider title as the output filename. suggested = sanitize_filename(str(getattr(result, "title", "") or "").strip()) suggested_name = suggested if suggested else None try: from SYS.download import _download_direct_file dl_res = _download_direct_file( unlocked_url, Path(output_dir), quiet=quiet, suggested_filename=suggested_name, ) downloaded_path = getattr(dl_res, "path", None) if downloaded_path is None: return None downloaded_path = Path(str(downloaded_path)) # Guard: if we got an HTML error/redirect page, treat as failure. try: if downloaded_path.exists(): size = downloaded_path.stat().st_size if size > 0 and size <= 250_000 and downloaded_path.suffix.lower() not in (".html", ".htm"): head = downloaded_path.read_bytes()[:512] try: text = head.decode("utf-8", errors="ignore").lower() except Exception: text = "" if " Iterable[Dict[str, Any]]: """Flatten AllDebrid magnet file tree into file dicts, preserving relative paths. API commonly returns: - file: {n: name, s: size, l: link} - folder: {n: name, e: [sub_items]} This flattener attaches a best-effort relative path to each yielded file node as `_relpath` using POSIX separators (e.g., "Season 1/E01.mkv"). Some call sites in this repo also expect {name, size, link}, so we accept both. """ prefix = list(_prefix or []) if not items: return if isinstance(items, dict): items = [items] if not isinstance(items, list): return for node in items: if not isinstance(node, dict): continue children = node.get('e') or node.get('children') if isinstance(children, list): folder_name = node.get('n') or node.get('name') next_prefix = prefix if isinstance(folder_name, str) and folder_name.strip(): next_prefix = prefix + [folder_name.strip()] yield from AllDebrid._flatten_files(children, _prefix=next_prefix) continue name = node.get('n') or node.get('name') link = node.get('l') or node.get('link') if isinstance(name, str) and name.strip() and isinstance(link, str) and link.strip(): rel_parts = prefix + [name.strip()] relpath = "/".join([p for p in rel_parts if p]) enriched = dict(node) enriched["_relpath"] = relpath yield enriched def search( self, query: str, limit: int = 50, filters: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[SearchResult]: q = (query or "").strip() if not q: return [] api_key = _get_debrid_api_key(self.config or {}) if not api_key: return [] view = None if isinstance(filters, dict): view = str(filters.get("view") or "").strip().lower() or None view = view or "folders" try: from API.alldebrid import AllDebridClient client = AllDebridClient(api_key) except Exception as exc: log(f"[alldebrid] Failed to init client: {exc}", file=sys.stderr) return [] q_lower = q.lower() needle = "" if q_lower in {"*", "all", "list"} else q_lower # Second-stage: list files for a specific magnet id. if view == "files": magnet_id_val = None if isinstance(filters, dict): magnet_id_val = filters.get("magnet_id") if magnet_id_val is None: magnet_id_val = kwargs.get("magnet_id") try: magnet_id = int(magnet_id_val) except Exception: return [] magnet_status: Dict[str, Any] = {} try: magnet_status = client.magnet_status(magnet_id) except Exception: magnet_status = {} magnet_name = str(magnet_status.get('filename') or magnet_status.get('name') or magnet_status.get('hash') or f"magnet-{magnet_id}") status_code = magnet_status.get('statusCode') status_text = str(magnet_status.get('status') or "").strip() or "unknown" ready = status_code == 4 or bool(magnet_status.get('ready')) if not ready: return [ SearchResult( table="alldebrid", title=magnet_name, path=f"alldebrid:magnet:{magnet_id}", detail=status_text, annotations=["folder", "not-ready"], media_kind="folder", tag={"alldebrid", "folder", str(magnet_id), "not-ready"}, columns=[ ("Folder", magnet_name), ("ID", str(magnet_id)), ("Status", status_text), ("Ready", "no"), ], full_metadata={"magnet": magnet_status, "magnet_id": magnet_id}, ) ] try: files_result = client.magnet_links([magnet_id]) magnet_files = files_result.get(str(magnet_id), {}) if isinstance(files_result, dict) else {} file_tree = magnet_files.get('files', []) if isinstance(magnet_files, dict) else [] except Exception as exc: log(f"[alldebrid] Failed to list files for magnet {magnet_id}: {exc}", file=sys.stderr) file_tree = [] results: List[SearchResult] = [] for file_node in self._flatten_files(file_tree): file_name = str(file_node.get('n') or file_node.get('name') or '').strip() file_url = str(file_node.get('l') or file_node.get('link') or '').strip() relpath = str(file_node.get('_relpath') or file_name or '').strip() file_size = file_node.get('s') or file_node.get('size') if not file_name or not file_url: continue if needle and needle not in file_name.lower(): continue size_bytes: Optional[int] = None try: if isinstance(file_size, (int, float)): size_bytes = int(file_size) elif isinstance(file_size, str) and file_size.isdigit(): size_bytes = int(file_size) except Exception: size_bytes = None results.append( SearchResult( table="alldebrid", title=file_name, path=file_url, detail=magnet_name, annotations=["file"], media_kind="file", size_bytes=size_bytes, tag={"alldebrid", "file", str(magnet_id)}, columns=[ ("File", file_name), ("Folder", magnet_name), ("ID", str(magnet_id)), ], full_metadata={ "magnet": magnet_status, "magnet_id": magnet_id, "magnet_name": magnet_name, "relpath": relpath, "file": file_node, }, ) ) if len(results) >= max(1, limit): break return results # Default: folders view (magnets) try: magnets = client.magnet_list() or [] except Exception as exc: log(f"[alldebrid] Failed to list account magnets: {exc}", file=sys.stderr) return [] wanted_id: Optional[int] = None if needle.isdigit(): try: wanted_id = int(needle) except Exception: wanted_id = None results: List[SearchResult] = [] for magnet in magnets: if not isinstance(magnet, dict): continue try: magnet_id = int(magnet.get('id')) except Exception: continue magnet_name = str(magnet.get('filename') or magnet.get('name') or magnet.get('hash') or f"magnet-{magnet_id}") magnet_name_lower = magnet_name.lower() status_text = str(magnet.get('status') or "").strip() or "unknown" status_code = magnet.get('statusCode') ready = status_code == 4 or bool(magnet.get('ready')) if wanted_id is not None: if magnet_id != wanted_id: continue elif needle and (needle not in magnet_name_lower): continue size_bytes: Optional[int] = None try: size_val = magnet.get('size') if isinstance(size_val, (int, float)): size_bytes = int(size_val) elif isinstance(size_val, str) and size_val.isdigit(): size_bytes = int(size_val) except Exception: size_bytes = None results.append( SearchResult( table="alldebrid", title=magnet_name, path=f"alldebrid:magnet:{magnet_id}", detail=status_text, annotations=["folder"], media_kind="folder", size_bytes=size_bytes, tag={"alldebrid", "folder", str(magnet_id)} | ({"ready"} if ready else {"not-ready"}), columns=[ ("Folder", magnet_name), ("ID", str(magnet_id)), ("Status", status_text), ("Ready", "yes" if ready else "no"), ], full_metadata={"magnet": magnet, "magnet_id": magnet_id}, ) ) if len(results) >= max(1, limit): break return results