Files
Medios-Macina/Provider/alldebrid.py

477 lines
18 KiB
Python

from __future__ import annotations
from pathlib import Path
import sys
from typing import Any, Dict, Iterable, List, Optional
from ProviderCore.base import Provider, SearchResult
from ProviderCore.download import sanitize_filename
from SYS.logger import log
def _get_debrid_api_key(config: Dict[str, Any]) -> Optional[str]:
    """Read the AllDebrid API key from config.

    Sources are tried in order; the first non-empty string wins:

    1. Provider block in config.conf::

           [provider=alldebrid]
           api_key=...

       i.e. ``config["provider"]["alldebrid"]["api_key"]``. The spellings
       ``apikey`` / ``API_KEY`` / ``APIKEY`` are accepted too, and the
       ``alldebrid`` entry may itself be the key string.
    2. Store-style debrid block
       (``config["store"]["debrid"]["all-debrid"]["api_key"]``), resolved
       through ``SYS.config.get_debrid_api_key``.
    3. Legacy top-level keys ``alldebrid_api_key``, ``AllDebrid`` and
       ``all_debrid_api_key``.

    Args:
        config: Parsed application configuration mapping.

    Returns:
        The API key with surrounding whitespace removed, or ``None`` when no
        source provides one.
    """
    # 1) provider block: [provider=alldebrid]
    provider = config.get("provider")
    if isinstance(provider, dict):
        entry = provider.get("alldebrid")
        if isinstance(entry, dict):
            for k in ("api_key", "apikey", "API_KEY", "APIKEY"):
                val = entry.get(k)
                if isinstance(val, str) and val.strip():
                    return val.strip()
        if isinstance(entry, str) and entry.strip():
            return entry.strip()

    # 2) store.debrid block (canonical for debrid store configuration)
    try:
        from SYS.config import get_debrid_api_key

        key = get_debrid_api_key(config, service="All-debrid")
    except Exception:
        # Deliberately best-effort: the helper module may be missing or may
        # reject this config shape; either way the legacy keys still apply.
        key = None
    # Only return on an actual hit. Returning None here would make the legacy
    # fallback below unreachable whenever SYS.config is importable.
    if isinstance(key, str) and key.strip():
        return key.strip()

    # 3) Legacy fallback (kept permissive so older configs still work)
    for legacy_key in ("alldebrid_api_key", "AllDebrid", "all_debrid_api_key"):
        val = config.get(legacy_key)
        if isinstance(val, str) and val.strip():
            return val.strip()
    return None
class AllDebrid(Provider):
    """Search provider for AllDebrid account content.

    This provider lists and searches the magnets (and their files) already
    present in the user's AllDebrid account.

    Views (selected through ``filters["view"]``):
    - "folders" (default): one result per magnet in the account.
    - "files": the files of a single magnet, identified by ``magnet_id``
      (taken from ``filters`` or from keyword arguments).

    Query behavior:
    - "*" / "all" / "list": no filtering, list everything in the view
    - otherwise: case-insensitive substring match on the magnet name (folders
      view) or file name (files view); in the folders view an all-digit query
      additionally matches the magnet with exactly that id.
    """

    # Downloads at or below this size are sniffed for an HTML error page.
    _HTML_GUARD_MAX_BYTES = 250_000
    # Number of leading bytes inspected when sniffing for HTML.
    _HTML_SNIFF_BYTES = 512

    def validate(self) -> bool:
        """Return True when an API key is configured.

        Only configuration is checked; no request is made, because actual API
        connectivity can vary.
        """
        return bool(_get_debrid_api_key(self.config or {}))

    def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]:
        """Download an AllDebrid SearchResult into output_dir.

        AllDebrid magnet file listings often provide links that require an API
        "unlock" step to produce a true direct-download URL. Without unlocking,
        callers may download a small HTML/redirect page instead of file bytes.

        This is used by the download-file cmdlet when a provider item is piped.

        Args:
            result: Search result whose ``path`` is an http(s) link and whose
                ``title`` is preferred as the output filename.
            output_dir: Directory the file is written into.

        Returns:
            Path of the downloaded file, or ``None`` when no API key is
            configured, ``result.path`` is not an http(s) URL, the client
            cannot be created, the download fails, or the downloaded payload
            looks like an HTML page. Failures are logged to stderr; this
            method does not raise.
        """
        try:
            api_key = _get_debrid_api_key(self.config or {})
            if not api_key:
                return None

            target = str(getattr(result, "path", "") or "").strip()
            if not target.startswith(("http://", "https://")):
                return None

            try:
                from API.alldebrid import AllDebridClient

                client = AllDebridClient(api_key)
            except Exception as exc:
                log(f"[alldebrid] Failed to init client: {exc}", file=sys.stderr)
                return None

            cfg = self.config if isinstance(self.config, dict) else {}
            # Quiet mode when download-file is mid-pipeline.
            quiet = bool(cfg.get("_quiet_background_output"))
            pipe_progress = cfg.get("_pipeline_progress")

            unlocked_url = target
            try:
                unlocked = client.unlock_link(target)
                if isinstance(unlocked, str) and unlocked.strip().startswith(
                    ("http://", "https://")
                ):
                    unlocked_url = unlocked.strip()
            except Exception as exc:
                # Fall back to the raw link, but warn.
                log(f"[alldebrid] Failed to unlock link: {exc}", file=sys.stderr)

            # Prefer provider title as the output filename.
            suggested = sanitize_filename(
                str(getattr(result, "title", "") or "").strip()
            )
            suggested_name = suggested if suggested else None

            from SYS.download import _download_direct_file

            dl_res = _download_direct_file(
                unlocked_url,
                Path(output_dir),
                quiet=quiet,
                suggested_filename=suggested_name,
                pipeline_progress=pipe_progress,
            )
            downloaded_path = getattr(dl_res, "path", None)
            if downloaded_path is None:
                return None
            downloaded_path = Path(str(downloaded_path))

            # Guard: if we got an HTML error/redirect page, treat as failure.
            if self._is_html_error_page(downloaded_path):
                try:
                    downloaded_path.unlink()
                except OSError:
                    # Best-effort cleanup; the failure is reported regardless.
                    pass
                log(
                    "[alldebrid] Download returned HTML page (not file bytes). Try again or check AllDebrid link status.",
                    file=sys.stderr,
                )
                return None

            return downloaded_path if downloaded_path.exists() else None
        except Exception as exc:
            # Boundary for the download cmdlet: report instead of raising.
            log(f"[alldebrid] Download failed: {exc}", file=sys.stderr)
            return None

    @staticmethod
    def _is_html_error_page(path: Path) -> bool:
        """Return True when a small non-.html file starts with HTML markup.

        Only files that exist, are non-empty, are at most
        ``_HTML_GUARD_MAX_BYTES`` bytes and do not carry an ``.html``/``.htm``
        suffix are inspected. Just the first ``_HTML_SNIFF_BYTES`` bytes are
        read. Any filesystem error is treated as "not HTML".
        """
        try:
            if not path.exists():
                return False
            if path.suffix.lower() in (".html", ".htm"):
                # A file that is meant to be HTML is not an error page.
                return False
            size = path.stat().st_size
            if size <= 0 or size > AllDebrid._HTML_GUARD_MAX_BYTES:
                return False
            with path.open("rb") as handle:
                head = handle.read(AllDebrid._HTML_SNIFF_BYTES)
        except OSError:
            return False
        text = head.decode("utf-8", errors="ignore").lower()
        return "<html" in text or "<!doctype html" in text

    @staticmethod
    def _coerce_size(value: Any) -> Optional[int]:
        """Convert an API size field (number or digit string) to int, else None."""
        try:
            if isinstance(value, (int, float)):
                return int(value)
            if isinstance(value, str) and value.isdigit():
                return int(value)
        except (ValueError, OverflowError):
            # e.g. NaN / infinity, or digit characters int() cannot parse.
            pass
        return None

    @staticmethod
    def _flatten_files(
        items: Any,
        *,
        _prefix: Optional[List[str]] = None,
    ) -> Iterable[Dict[str, Any]]:
        """Flatten AllDebrid magnet file tree into file dicts, preserving relative paths.

        API commonly returns:
        - file:   {n: name, s: size, l: link}
        - folder: {n: name, e: [sub_items]}

        This flattener attaches a best-effort relative path to each yielded file node
        as `_relpath` using POSIX separators (e.g., "Season 1/E01.mkv").

        Some call sites in this repo also expect {name, size, link}, so we accept both.

        Args:
            items: A node dict or a list of node dicts; anything else yields
                nothing.
            _prefix: Folder names leading to ``items``; used by the recursion.

        Yields:
            A shallow copy of every file node that has a non-blank string name
            and link, with ``_relpath`` added. Non-dict nodes are skipped.
        """
        prefix = list(_prefix or [])
        if not items:
            return
        if isinstance(items, dict):
            items = [items]
        if not isinstance(items, list):
            return

        for node in items:
            if not isinstance(node, dict):
                continue

            children = node.get("e") or node.get("children")
            if isinstance(children, list):
                folder_name = node.get("n") or node.get("name")
                next_prefix = prefix
                if isinstance(folder_name, str) and folder_name.strip():
                    next_prefix = prefix + [folder_name.strip()]
                yield from AllDebrid._flatten_files(children, _prefix=next_prefix)
                continue

            name = node.get("n") or node.get("name")
            link = node.get("l") or node.get("link")
            if (
                isinstance(name, str)
                and name.strip()
                and isinstance(link, str)
                and link.strip()
            ):
                rel_parts = prefix + [name.strip()]
                enriched = dict(node)
                enriched["_relpath"] = "/".join(p for p in rel_parts if p)
                yield enriched

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[SearchResult]:
        """Search the account's magnets, or the files of one magnet.

        Args:
            query: Search text. "*", "all" and "list" (case-insensitive)
                disable filtering. A blank query returns no results.
            limit: Maximum number of results; values below 1 are treated as 1.
            filters: Optional mapping. ``view`` selects "folders" (default) or
                "files"; ``magnet_id`` selects the magnet for the files view.
            **kwargs: ``magnet_id`` is read from here when ``filters`` does
                not provide it.

        Returns:
            Matching results. An empty list when no API key is configured, the
            client cannot be created, the magnet id for the files view is
            missing or not an integer, or the account listing fails.
        """
        q = (query or "").strip()
        if not q:
            return []

        api_key = _get_debrid_api_key(self.config or {})
        if not api_key:
            return []

        view = None
        if isinstance(filters, dict):
            view = str(filters.get("view") or "").strip().lower() or None
        view = view or "folders"

        try:
            from API.alldebrid import AllDebridClient

            client = AllDebridClient(api_key)
        except Exception as exc:
            log(f"[alldebrid] Failed to init client: {exc}", file=sys.stderr)
            return []

        q_lower = q.lower()
        needle = "" if q_lower in {"*", "all", "list"} else q_lower

        # Second-stage: list files for a specific magnet id.
        if view == "files":
            magnet_id_val = None
            if isinstance(filters, dict):
                magnet_id_val = filters.get("magnet_id")
            if magnet_id_val is None:
                magnet_id_val = kwargs.get("magnet_id")
            try:
                magnet_id = int(magnet_id_val)
            except (TypeError, ValueError, OverflowError):
                return []
            return self._search_files(client, magnet_id, needle, limit)

        # Default: folders view (magnets)
        return self._search_folders(client, needle, limit)

    def _search_files(
        self,
        client: Any,
        magnet_id: int,
        needle: str,
        limit: int,
    ) -> List[SearchResult]:
        """List the files of one magnet, filtered by file-name substring.

        When the magnet is not ready (status lookup failed, or neither
        ``statusCode == 4`` nor a truthy ``ready`` flag is reported) a single
        "not-ready" folder result is returned instead of files.
        """
        try:
            magnet_status = client.magnet_status(magnet_id)
        except Exception:
            magnet_status = {}
        if not isinstance(magnet_status, dict):
            # Guard against an unexpected payload; handled like a failed lookup.
            magnet_status = {}

        magnet_name = str(
            magnet_status.get("filename")
            or magnet_status.get("name")
            or magnet_status.get("hash")
            or f"magnet-{magnet_id}"
        )
        status_code = magnet_status.get("statusCode")
        status_text = str(magnet_status.get("status") or "").strip() or "unknown"
        ready = status_code == 4 or bool(magnet_status.get("ready"))

        if not ready:
            return [
                SearchResult(
                    table="alldebrid",
                    title=magnet_name,
                    path=f"alldebrid:magnet:{magnet_id}",
                    detail=status_text,
                    annotations=["folder", "not-ready"],
                    media_kind="folder",
                    tag={"alldebrid", "folder", str(magnet_id), "not-ready"},
                    columns=[
                        ("Folder", magnet_name),
                        ("ID", str(magnet_id)),
                        ("Status", status_text),
                        ("Ready", "no"),
                    ],
                    full_metadata={
                        "magnet": magnet_status,
                        "magnet_id": magnet_id,
                    },
                )
            ]

        try:
            files_result = client.magnet_links([magnet_id])
            magnet_files = (
                files_result.get(str(magnet_id), {})
                if isinstance(files_result, dict)
                else {}
            )
            file_tree = (
                magnet_files.get("files", [])
                if isinstance(magnet_files, dict)
                else []
            )
        except Exception as exc:
            log(
                f"[alldebrid] Failed to list files for magnet {magnet_id}: {exc}",
                file=sys.stderr,
            )
            file_tree = []

        cap = max(1, limit)
        results: List[SearchResult] = []
        for file_node in self._flatten_files(file_tree):
            file_name = str(file_node.get("n") or file_node.get("name") or "").strip()
            file_url = str(file_node.get("l") or file_node.get("link") or "").strip()
            relpath = str(file_node.get("_relpath") or file_name or "").strip()
            if not file_name or not file_url:
                continue
            if needle and needle not in file_name.lower():
                continue

            results.append(
                SearchResult(
                    table="alldebrid",
                    title=file_name,
                    path=file_url,
                    detail=magnet_name,
                    annotations=["file"],
                    media_kind="file",
                    size_bytes=self._coerce_size(
                        file_node.get("s") or file_node.get("size")
                    ),
                    tag={"alldebrid", "file", str(magnet_id)},
                    columns=[
                        ("File", file_name),
                        ("Folder", magnet_name),
                        ("ID", str(magnet_id)),
                    ],
                    full_metadata={
                        "magnet": magnet_status,
                        "magnet_id": magnet_id,
                        "magnet_name": magnet_name,
                        "relpath": relpath,
                        "file": file_node,
                    },
                )
            )
            if len(results) >= cap:
                break
        return results

    def _search_folders(
        self,
        client: Any,
        needle: str,
        limit: int,
    ) -> List[SearchResult]:
        """List account magnets matching ``needle`` by name or exact id.

        An all-digit needle matches the magnet with that id as well as any
        magnet whose name contains the digits; the exact-id match is ordered
        first so id lookups still get the magnet as the first result.
        """
        try:
            magnets = client.magnet_list() or []
        except Exception as exc:
            log(f"[alldebrid] Failed to list account magnets: {exc}", file=sys.stderr)
            return []

        wanted_id: Optional[int] = None
        if needle.isdigit():
            try:
                wanted_id = int(needle)
            except ValueError:
                # isdigit() accepts characters (e.g. superscripts) int() rejects.
                wanted_id = None

        cap = max(1, limit)
        exact_hits: List[SearchResult] = []
        name_hits: List[SearchResult] = []
        for magnet in magnets:
            if not isinstance(magnet, dict):
                continue
            try:
                magnet_id = int(magnet.get("id"))
            except (TypeError, ValueError, OverflowError):
                continue

            magnet_name = str(
                magnet.get("filename")
                or magnet.get("name")
                or magnet.get("hash")
                or f"magnet-{magnet_id}"
            )
            id_match = wanted_id is not None and magnet_id == wanted_id
            # A numeric query such as "2024" must still be able to match names.
            if needle and not id_match and needle not in magnet_name.lower():
                continue

            status_text = str(magnet.get("status") or "").strip() or "unknown"
            status_code = magnet.get("statusCode")
            ready = status_code == 4 or bool(magnet.get("ready"))

            bucket = exact_hits if id_match else name_hits
            bucket.append(
                SearchResult(
                    table="alldebrid",
                    title=magnet_name,
                    path=f"alldebrid:magnet:{magnet_id}",
                    detail=status_text,
                    annotations=["folder"],
                    media_kind="folder",
                    size_bytes=self._coerce_size(magnet.get("size")),
                    tag={"alldebrid", "folder", str(magnet_id)}
                    | ({"ready"} if ready else {"not-ready"}),
                    columns=[
                        ("Folder", magnet_name),
                        ("ID", str(magnet_id)),
                        ("Status", status_text),
                        ("Ready", "yes" if ready else "no"),
                    ],
                    full_metadata={
                        "magnet": magnet,
                        "magnet_id": magnet_id,
                    },
                )
            )
            # Without an id lookup nothing later can outrank earlier hits, so
            # stop as soon as the cap is reached. With an id lookup the scan
            # continues so the exact match is found wherever it sits.
            if wanted_id is None and len(name_hits) >= cap:
                break

        return (exact_hits + name_hits)[:cap]