112 lines
3.2 KiB
Python
112 lines
3.2 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from SYS.result_table import Table
|
|
|
|
|
|
def upper_text(value: Any) -> str:
    """Return ``value`` as an upper-cased string.

    ``None`` is rendered as the empty string; every other object is
    converted with ``str()`` before being upper-cased.
    """
    if value is None:
        return ""
    return str(value).upper()
|
|
|
|
|
|
def add_startup_check(
    table: Table,
    status: str,
    name: str,
    *,
    provider: str = "",
    store: str = "",
    files: int | str | None = None,
    detail: str = "",
) -> None:
    """Append one startup-check row to ``table``.

    A new row is obtained from ``table.add_row()`` and filled, in order, with
    the columns STATUS, NAME, PROVIDER, STORE, FILES and DETAIL.

    Args:
        table: Result table that receives the new row.
        status: Check status; upper-cased via ``upper_text``.
        name: Check name; upper-cased via ``upper_text``.
        provider: Optional provider label; falsy values become "".
        store: Optional store label; falsy values become "".
        files: Optional file count or text; written with ``str()`` and NOT
            upper-cased. ``None`` becomes "".
        detail: Optional free-form detail; falsy values become "".
    """
    row = table.add_row()
    put = row.add_column

    put("STATUS", upper_text(status))
    put("NAME", upper_text(name))
    # Falsy optional labels collapse to an empty cell.
    put("PROVIDER", upper_text(provider) if provider else "")
    put("STORE", upper_text(store) if store else "")
    # FILES is the only column written without upper-casing.
    put("FILES", str(files) if files is not None else "")
    put("DETAIL", upper_text(detail) if detail else "")
|
|
|
|
|
|
def has_store_subtype(cfg: dict, subtype: str) -> bool:
    """Report whether ``cfg["store"][subtype]`` has a usable entry.

    Returns True only when ``cfg["store"]`` is a dict, ``cfg["store"][subtype]``
    is a dict, and at least one of its values is a non-empty dict. Any other
    shape yields False.
    """
    stores = cfg.get("store")
    entries = stores.get(subtype) if isinstance(stores, dict) else None
    if not isinstance(entries, dict):
        return False

    for entry in entries.values():
        if isinstance(entry, dict) and entry:
            return True
    return False
|
|
|
|
|
|
def has_provider(cfg: dict, name: str) -> bool:
    """Report whether ``cfg["provider"]`` has a non-empty dict for ``name``.

    ``name`` is converted with ``str()``, stripped and lower-cased before the
    lookup. The keys stored in the config are used as they are.
    """
    providers = cfg.get("provider")
    if isinstance(providers, dict):
        lookup_key = str(name).strip().lower()
        entry = providers.get(lookup_key)
        return bool(entry) if isinstance(entry, dict) else False
    return False
|
|
|
|
|
|
def has_tool(cfg: dict, name: str) -> bool:
    """Report whether ``cfg["tool"]`` has a non-empty dict for ``name``.

    ``name`` is converted with ``str()``, stripped and lower-cased before the
    lookup. The keys stored in the config are used as they are.
    """
    tools = cfg.get("tool")
    if not isinstance(tools, dict):
        return False

    lookup_key = str(name).strip().lower()
    settings = tools.get(lookup_key)
    if not isinstance(settings, dict):
        return False
    return bool(settings)
|
|
|
|
|
|
def ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]:
    """Send a GET request to ``url`` and report whether it answered.

    Args:
        url: Address to request.
        timeout: Timeout value handed to ``HTTPClient``.

    Returns:
        ``(ok, detail)``. ``ok`` is True when the response status code is in
        the range 200-499, so 4xx answers still count as reachable. ``detail``
        is ``"<url> (HTTP <code>)"`` for a response, ``"<url> (timeout)"`` for
        an ``httpx.TimeoutException``, and ``"<url> (<ExceptionName>)"`` for
        any other exception.
    """
    try:
        # The import sits inside the try, so an import failure is also
        # reported through the generic handler below.
        from API.HTTP import HTTPClient

        with HTTPClient(timeout=timeout, retries=1) as http:
            reply = http.get(url, allow_redirects=True)
            # A missing or falsy status_code is treated as 0.
            raw_status = getattr(reply, "status_code", 0)
            status = int(raw_status or 0)
            reachable = 200 <= status < 500
            return reachable, f"{url} (HTTP {status})"
    except httpx.TimeoutException:
        return False, f"{url} (timeout)"
    except Exception as error:
        return False, f"{url} ({type(error).__name__})"
|
|
|
|
|
|
def provider_display_name(key: str) -> str:
    """Return a human-friendly display name for a provider key.

    The key is stripped of surrounding whitespace. Three providers have a
    fixed spelling (matched case-insensitively): OpenLibrary, AllDebrid and
    YouTube. Any other non-empty key is returned with only its first
    character upper-cased. An empty or falsy key yields ``"Provider"``.
    """
    cleaned = (key or "").strip()

    fixed_spellings = {
        "openlibrary": "OpenLibrary",
        "alldebrid": "AllDebrid",
        "youtube": "YouTube",
    }
    branded = fixed_spellings.get(cleaned.lower())
    if branded is not None:
        return branded

    if not cleaned:
        return "Provider"
    return cleaned[:1].upper() + cleaned[1:]
|
|
|
|
|
|
def default_provider_ping_targets(provider_key: str) -> list[str]:
    """Return the default URLs to ping for a provider.

    The key is stripped and lower-cased before matching. ``openlibrary``,
    ``youtube`` and ``bandcamp`` each map to a single fixed URL. ``libgen``
    maps to ``<mirror>/json.php`` for every non-blank entry of
    ``Provider.libgen.MIRRORS``, or to an empty list when that module cannot
    be imported. Every other key yields an empty list.
    """
    normalized = (provider_key or "").strip().lower()

    if normalized == "libgen":
        try:
            from Provider.libgen import MIRRORS

            endpoints = []
            for mirror in MIRRORS or []:
                mirror_text = str(mirror)
                if mirror_text.strip():
                    endpoints.append(mirror_text.rstrip("/") + "/json.php")
            return endpoints
        except ImportError:
            return []

    fixed_targets = {
        "openlibrary": "https://openlibrary.org",
        "youtube": "https://www.youtube.com",
        "bandcamp": "https://bandcamp.com",
    }
    site = fixed_targets.get(normalized)
    # A fresh list is built on every call so callers may mutate the result.
    return [site] if site is not None else []
|
|
|
|
|
|
def ping_first(urls: list[str]) -> tuple[bool, str]:
    """Ping ``urls`` in order and return the first successful result.

    Each URL is pinged at most once. The failure detail of the first target
    is remembered during the pass, so no second request is needed when every
    target fails.

    Args:
        urls: Candidate URLs, tried in the given order. Any iterable works;
            it is consumed in a single pass.

    Returns:
        ``(True, detail)`` for the first URL that ``ping_url`` reports as ok.
        ``(False, detail)`` carrying the first URL's failure detail when all
        targets fail. ``(False, "No ping target")`` when ``urls`` is empty.
    """
    first_failure_detail = None

    for url in urls:
        ok, detail = ping_url(url)
        if ok:
            return True, detail
        if first_failure_detail is None:
            # Keep the first target's failure so it can be reported without
            # issuing another network request.
            first_failure_detail = detail

    if first_failure_detail is not None:
        return False, first_failure_detail
    return False, "No ping target"