# Files
# Medios-Macina/cmdnat/status.py
#
# 253 lines
# 11 KiB
# Python
# Raw Permalink Normal View History
#
# 2026-01-11 00:39:17 -08:00
from __future__ import annotations
import sys
import shutil
from typing import Any, Dict, List, Optional, Sequence, Tuple
from datetime import datetime
from cmdlet._shared import Cmdlet, CmdletArg
from SYS import pipeline as ctx
from SYS.result_table import ResultTable
from SYS.logger import log, set_debug, debug
from SYS.rich_display import stdout_console
# Cmdlet descriptor for the ".status" command; it declares an empty argument
# list. The handler is attached separately via ``CMDLET.exec``.
CMDLET = Cmdlet(
    name=".status",
    summary="Check and display service/provider status",
    usage=".status",
    arg=[],
)
def _upper(value: Any) -> str:
text = "" if value is None else str(value)
return text.upper()
def _add_startup_check(
    table: ResultTable,
    status: str,
    name: str,
    *,
    provider: str = "",
    store: str = "",
    files: int | str | None = None,
    detail: str = "",
) -> None:
    """Append one status row to *table*.

    The row gets six columns, in this order: STATUS, NAME, PROVIDER, STORE,
    FILES and DETAIL. Every value except FILES is upper-cased via ``_upper``.

    Args:
        table: Table receiving the new row (must offer ``add_row()``).
        status: Status label for the check.
        name: Name of the checked component.
        provider: Provider key; empty when not applicable.
        store: Store type; empty when not applicable.
        files: File count; rendered as an empty cell when ``None``.
        detail: Free-form detail text.
    """
    row = table.add_row()
    upper_cased = (
        ("STATUS", status),
        ("NAME", name),
        ("PROVIDER", provider or ""),
        ("STORE", store or ""),
    )
    for header, raw in upper_cased:
        row.add_column(header, _upper(raw))
    # FILES is the only column that keeps its original casing.
    row.add_column("FILES", str(files) if files is not None else "")
    row.add_column("DETAIL", _upper(detail or ""))
def _has_store_subtype(cfg: dict, subtype: str) -> bool:
store_cfg = cfg.get("store")
if not isinstance(store_cfg, dict):
return False
bucket = store_cfg.get(subtype)
if not isinstance(bucket, dict):
return False
return any(isinstance(v, dict) and bool(v) for v in bucket.values())
def _has_provider(cfg: dict, name: str) -> bool:
provider_cfg = cfg.get("provider")
if not isinstance(provider_cfg, dict):
return False
block = provider_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def _has_tool(cfg: dict, name: str) -> bool:
tool_cfg = cfg.get("tool")
if not isinstance(tool_cfg, dict):
return False
block = tool_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def _ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]:
try:
from API.HTTP import HTTPClient
with HTTPClient(timeout=timeout, retries=1) as client:
resp = client.get(url, allow_redirects=True)
code = int(getattr(resp, "status_code", 0) or 0)
ok = 200 <= code < 500
return ok, f"{url} (HTTP {code})"
except Exception as exc:
return False, f"{url} ({type(exc).__name__})"
def _provider_display_name(key: str) -> str:
k = (key or "").strip()
low = k.lower()
if low == "openlibrary": return "OpenLibrary"
if low == "alldebrid": return "AllDebrid"
if low == "youtube": return "YouTube"
return k[:1].upper() + k[1:] if k else "Provider"
def _default_provider_ping_targets(provider_key: str) -> list[str]:
prov = (provider_key or "").strip().lower()
if prov == "openlibrary": return ["https://openlibrary.org"]
if prov == "youtube": return ["https://www.youtube.com"]
if prov == "bandcamp": return ["https://bandcamp.com"]
if prov == "libgen":
try:
from Provider.libgen import MIRRORS
return [str(x).rstrip("/") + "/json.php" for x in (MIRRORS or []) if str(x).strip()]
except ImportError: return []
return []
def _ping_first(urls: list[str]) -> tuple[bool, str]:
for u in urls:
ok, detail = _ping_url(u)
if ok: return True, detail
if urls:
ok, detail = _ping_url(urls[0])
return ok, detail
return False, "No ping target"
def _check_mpv(table: ResultTable) -> None:
    """Add the MPV row: ENABLED when the MPV wrapper imports and constructs."""
    try:
        from MPV.mpv_ipc import MPV

        MPV()
        mpv_path = shutil.which("mpv")
        _add_startup_check(table, "ENABLED", "MPV", detail=mpv_path or "Available")
    except Exception as exc:
        _add_startup_check(table, "DISABLED", "MPV", detail=str(exc))


def _load_store_registry(config: Dict[str, Any]) -> Any:
    """Build the store registry, or return ``None`` when that fails."""
    try:
        from Store import Store as StoreRegistry

        return StoreRegistry(config=config, suppress_debug=True)
    except Exception as exc:
        # Best effort: the store rows then report "Unavailable".
        debug(f"Store registry unavailable: {exc}")
        return None


def _check_hydrus(table: ResultTable, config: Dict[str, Any], store_registry: Any) -> None:
    """Add one row per configured ``hydrusnetwork`` store instance."""
    if not _has_store_subtype(config, "hydrusnetwork"):
        return
    instances = config.get("store", {}).get("hydrusnetwork", {})
    for instance_name, instance_cfg in instances.items():
        if not isinstance(instance_cfg, dict):
            continue
        name_key = str(instance_cfg.get("NAME") or instance_name)
        url = str(instance_cfg.get("URL") or "").strip()
        available = bool(store_registry and store_registry.is_available(name_key))
        files = None
        detail = url
        if available:
            try:
                backend = store_registry[name_key]
                files = getattr(backend, "total_count", None)
                if files is None and hasattr(backend, "get_total_count"):
                    files = backend.get_total_count()
            except Exception as exc:
                # The file count is optional; the row is still reported.
                debug(f"Hydrus file count unavailable for {name_key}: {exc}")
        else:
            err = store_registry.get_backend_error(instance_name) if store_registry else None
            detail = f"{url} - {err or 'Unavailable'}"
        _add_startup_check(
            table,
            "ENABLED" if available else "DISABLED",
            name_key,
            store="hydrusnetwork",
            files=files,
            detail=detail,
        )


def _check_alldebrid(table: ResultTable, config: Dict[str, Any], display: str, prov: str) -> None:
    """Add the AllDebrid row based on whether an API key is configured."""
    try:
        from Provider.alldebrid import _get_debrid_api_key
        from API.alldebrid import AllDebridClient

        api_key = _get_debrid_api_key(config)
        if not api_key:
            _add_startup_check(table, "DISABLED", display, provider=prov, detail="Not configured")
        else:
            client = AllDebridClient(api_key)
            _add_startup_check(
                table,
                "ENABLED",
                display,
                provider=prov,
                detail=getattr(client, "base_url", "Connected"),
            )
    except Exception as exc:
        _add_startup_check(table, "DISABLED", display, provider=prov, detail=str(exc))


def _check_providers(table: ResultTable, config: Dict[str, Any]) -> None:
    """Add one row per key of the ``provider`` config section, except Matrix."""
    provider_cfg = config.get("provider", {})
    if not (isinstance(provider_cfg, dict) and provider_cfg):
        return

    from ProviderCore.registry import list_providers, list_search_providers, list_file_providers
    from Provider.metadata_provider import list_metadata_providers

    registries = (
        list_providers(config) or {},
        list_search_providers(config) or {},
        list_file_providers(config) or {},
        list_metadata_providers(config) or {},
    )
    # Matrix gets its own dedicated row, so it is skipped here.
    already = {"matrix"}
    for pname in provider_cfg:
        prov = str(pname).lower()
        if prov in already:
            continue
        display = _provider_display_name(prov)
        if prov == "alldebrid":
            _check_alldebrid(table, config, display, prov)
            already.add(prov)
            continue
        if not any(prov in registry for registry in registries):
            _add_startup_check(table, "UNKNOWN", display, provider=prov, detail="Not registered")
        else:
            enabled = any(registry.get(prov) for registry in registries)
            detail = "Configured" if enabled else "Not configured"
            ping_targets = _default_provider_ping_targets(prov)
            if ping_targets:
                # Only the ping detail is shown; the status comes from the registries.
                _, ping_detail = _ping_first(ping_targets)
                detail = ping_detail if enabled else f"{detail} | {ping_detail}"
            _add_startup_check(
                table,
                "ENABLED" if enabled else "DISABLED",
                display,
                provider=prov,
                detail=detail,
            )
        already.add(prov)


def _check_matrix(table: ResultTable, config: Dict[str, Any]) -> None:
    """Add the Matrix row when a ``matrix`` provider block is configured."""
    if not _has_provider(config, "matrix"):
        return
    try:
        from Provider.matrix import Matrix

        matrix = Matrix(config)
        matrix_cfg = config.get("provider", {}).get("matrix", {})
        homeserver = str(matrix_cfg.get("homeserver") or "").strip()
        room_id = str(matrix_cfg.get("room_id") or "").strip()
        detail = f"{homeserver} room:{room_id}"
        _add_startup_check(
            table,
            "ENABLED" if matrix.validate() else "DISABLED",
            "Matrix",
            provider="matrix",
            detail=detail,
        )
    except Exception as exc:
        _add_startup_check(table, "DISABLED", "Matrix", provider="matrix", detail=str(exc))


def _check_folders(table: ResultTable, config: Dict[str, Any], store_registry: Any) -> None:
    """Add one row per configured ``folder`` store instance."""
    if not _has_store_subtype(config, "folder"):
        return
    folders = config.get("store", {}).get("folder", {})
    for instance_name, instance_cfg in folders.items():
        if not isinstance(instance_cfg, dict):
            continue
        name_key = str(instance_cfg.get("NAME") or instance_name)
        path = str(instance_cfg.get("PATH") or instance_cfg.get("path") or "").strip()
        available = bool(store_registry and store_registry.is_available(name_key))
        if not available:
            err = store_registry.get_backend_error(instance_name) if store_registry else None
            _add_startup_check(
                table,
                "ERROR",
                name_key,
                store="folder",
                detail=f"{path} - {err or 'Unavailable'}",
            )
            continue
        try:
            backend = store_registry[name_key]
            scan_ok = getattr(backend, "scan_ok", True)
            scan_detail = getattr(backend, "scan_detail", "Up to date")
            stats = getattr(backend, "scan_stats", {})
            files = int(stats.get("files_total_db", 0)) if stats else None
        except Exception as exc:
            # A broken backend yields an ERROR row rather than aborting the
            # remaining folder rows and the checks that follow.
            _add_startup_check(table, "ERROR", name_key, store="folder", detail=f"{path} - {exc}")
            continue
        _add_startup_check(
            table,
            "SCANNED" if scan_ok else "ERROR",
            name_key,
            store="folder",
            files=files,
            detail=f"{path} - {scan_detail}",
        )


def _check_cookies(table: ResultTable, config: Dict[str, Any]) -> None:
    """Add the Cookies row from the cookie file ``YtDlpTool`` resolves."""
    try:
        from tool.ytdlp import YtDlpTool

        cookiefile = YtDlpTool(config).resolve_cookiefile()
        _add_startup_check(
            table,
            "FOUND" if cookiefile else "MISSING",
            "Cookies",
            detail=str(cookiefile) if cookiefile else "Not found",
        )
    except Exception as exc:
        # Best effort: no row is added when the tool cannot be loaded.
        debug(f"Cookie check skipped: {exc}")


def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int:
    """Build the status table and publish it as the current stage table.

    Rows are added in this order: debugging flag, MPV, Hydrus stores,
    providers, Matrix, folder stores, cookies. An unexpected exception in any
    section stops the remaining sections and is logged via ``debug``; the rows
    collected so far are still published.

    Args:
        result: Not used by this function.
        args: Not used by this function.
        config: Application configuration mapping.

    Returns:
        Always ``0``.
    """
    startup_table = ResultTable(
        "*********<IGNITIO>*********<NOUSEMPEH>*********<RUGRAPOG>*********<OMEGHAU>*********"
    )
    startup_table.set_no_choice(True).set_preserve_order(True)
    startup_table.set_value_case("upper")

    debug_enabled = bool(config.get("debug", False))
    _add_startup_check(startup_table, "ENABLED" if debug_enabled else "DISABLED", "DEBUGGING")

    try:
        _check_mpv(startup_table)
        store_registry = _load_store_registry(config)
        _check_hydrus(startup_table, config, store_registry)
        _check_providers(startup_table, config)
        _check_matrix(startup_table, config)
        _check_folders(startup_table, config, store_registry)
        _check_cookies(startup_table, config)
    except Exception as exc:
        debug(f"Status check failed: {exc}")

    if startup_table.rows:
        # Mark as rendered to prevent CLI.py from auto-printing it to stdout
        # (avoiding duplication in TUI logs, while keeping it in TUI Results)
        setattr(startup_table, "_rendered_by_cmdlet", True)
        ctx.set_current_stage_table(startup_table)
    return 0
# Attach _run as the exec handler of the ".status" cmdlet descriptor.
CMDLET.exec = _run