Files
Medios-Macina/cmdnat/_status_shared.py
T
2026-04-26 16:49:23 -07:00

150 lines
4.4 KiB
Python

from __future__ import annotations
from typing import Any
import httpx
from SYS.result_table import Table
def upper_text(value: Any) -> str:
text = "" if value is None else str(value)
return text.upper()
def add_startup_check(
table: Table,
status: str,
name: str,
*,
provider: str = "",
store: str = "",
files: int | str | None = None,
detail: str = "",
) -> None:
row = table.add_row()
row.add_column("STATUS", upper_text(status))
row.add_column("NAME", upper_text(name))
row.add_column("PLUGIN", upper_text(provider or ""))
row.add_column("STORE", upper_text(store or ""))
row.add_column("FILES", "" if files is None else str(files))
row.add_column("DETAIL", upper_text(detail or ""))
def has_store_subtype(cfg: dict, subtype: str) -> bool:
store_cfg = cfg.get("store")
if not isinstance(store_cfg, dict):
return False
bucket = store_cfg.get(subtype)
if not isinstance(bucket, dict):
return False
return any(isinstance(value, dict) and bool(value) for value in bucket.values())
def has_provider(cfg: dict, name: str) -> bool:
provider_cfg = cfg.get("plugin")
if not isinstance(provider_cfg, dict):
provider_cfg = cfg.get("provider")
if not isinstance(provider_cfg, dict):
return False
block = provider_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def has_tool(cfg: dict, name: str) -> bool:
tool_cfg = cfg.get("tool")
if not isinstance(tool_cfg, dict):
return False
block = tool_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]:
try:
from API.HTTP import HTTPClient
with HTTPClient(timeout=timeout, retries=1) as client:
response = client.get(url, allow_redirects=True)
code = int(getattr(response, "status_code", 0) or 0)
ok = 200 <= code < 500
return ok, f"{url} (HTTP {code})"
except httpx.TimeoutException:
return False, f"{url} (timeout)"
except Exception as exc:
return False, f"{url} ({type(exc).__name__})"
def provider_display_name(key: str) -> str:
label = (key or "").strip()
return label[:1].upper() + label[1:] if label else "Plugin"
def ping_first(urls: list[str]) -> tuple[bool, str]:
for url in urls:
ok, detail = ping_url(url)
if ok:
return True, detail
if urls:
return ping_url(urls[0])
return False, "No ping target"
def collect_plugin_startup_checks(config: dict) -> list[dict[str, Any]]:
provider_cfg = None
if isinstance(config, dict):
provider_cfg = config.get("plugin")
if not isinstance(provider_cfg, dict):
provider_cfg = config.get("provider")
if not isinstance(provider_cfg, dict) or not provider_cfg:
return []
try:
from ProviderCore.registry import get_plugin_class
except Exception:
return []
checks: list[dict[str, Any]] = []
for plugin_name in provider_cfg.keys():
plugin_key = str(plugin_name or "").strip().lower()
if not plugin_key:
continue
plugin_class = None
try:
plugin_class = get_plugin_class(plugin_key)
except Exception:
plugin_class = None
if plugin_class is None:
checks.append(
{
"status": "UNKNOWN",
"name": provider_display_name(plugin_key),
"plugin": plugin_key,
"detail": "Not registered",
}
)
continue
try:
plugin = plugin_class(config)
summary = plugin.status_summary()
except Exception as exc:
summary = {
"status": "DISABLED",
"name": provider_display_name(plugin_key),
"plugin": plugin_key,
"detail": str(exc),
}
checks.append(
{
"status": str(summary.get("status") or "UNKNOWN"),
"name": str(summary.get("name") or provider_display_name(plugin_key)),
"plugin": str(summary.get("plugin") or plugin_key),
"detail": str(summary.get("detail") or ""),
"files": summary.get("files"),
}
)
return checks