Files
Medios-Macina/cmdnat/_status_shared.py
T
2026-05-14 20:47:20 -07:00

256 lines
7.9 KiB
Python

from __future__ import annotations
from typing import Any
from SYS.result_table import Table
def upper_text(value: Any) -> str:
text = "" if value is None else str(value)
return text.upper()
def add_startup_check(
table: Table,
status: str,
name: str,
*,
provider: str = "",
instance: str = "",
files: int | str | None = None,
detail: str = "",
) -> None:
row = table.add_row()
row.add_column("STATUS", upper_text(status))
row.add_column("NAME", upper_text(name))
row.add_column("PLUGIN", upper_text(provider or ""))
row.add_column("INSTANCE", upper_text(instance or ""))
row.add_column("FILES", "" if files is None else str(files))
row.add_column("DETAIL", upper_text(detail or ""))
def _provider_config_map(config: dict) -> dict[str, Any]:
if not isinstance(config, dict):
return {}
provider_cfg = config.get("plugin")
if not isinstance(provider_cfg, dict):
provider_cfg = config.get("provider")
return provider_cfg if isinstance(provider_cfg, dict) else {}
def _iter_registered_plugin_infos() -> tuple[Any, ...]:
try:
from ProviderCore.registry import REGISTRY
return tuple(
sorted(
REGISTRY.iter_plugins(),
key=lambda info: str(
getattr(info, "canonical_name", "") or ""
).lower(),
)
)
except Exception:
return ()
def _extract_configured_instance_names(raw_entry: Any) -> list[str]:
if not isinstance(raw_entry, dict) or not raw_entry:
return []
if not all(isinstance(value, dict) for value in raw_entry.values()):
return []
names: list[str] = []
for key in raw_entry.keys():
name = str(key or "").strip()
if not name or name.lower() == "default":
continue
names.append(name)
return names
def _resolve_startup_instance_text(
plugin: Any,
summary: dict[str, Any],
configured_entry: Any,
) -> str:
instance_text = str(summary.get("instance") or "").strip()
if instance_text:
return instance_text
raw_instances = summary.get("instances")
if isinstance(raw_instances, (list, tuple, set)):
values = [str(value).strip() for value in raw_instances if str(value).strip()]
if values:
return ", ".join(values)
elif raw_instances is not None:
instance_text = str(raw_instances).strip()
if instance_text:
return instance_text
try:
configured_instances = plugin.configured_instances() if plugin is not None else []
except Exception:
configured_instances = []
if configured_instances:
return ", ".join(str(value).strip() for value in configured_instances if str(value).strip())
return ", ".join(_extract_configured_instance_names(configured_entry))
def has_store_subtype(cfg: dict, subtype: str) -> bool:
store_cfg = cfg.get("store")
if not isinstance(store_cfg, dict):
return False
bucket = store_cfg.get(subtype)
if not isinstance(bucket, dict):
return False
return any(isinstance(value, dict) and bool(value) for value in bucket.values())
def has_provider(cfg: dict, name: str) -> bool:
provider_cfg = cfg.get("plugin")
if not isinstance(provider_cfg, dict):
provider_cfg = cfg.get("provider")
if not isinstance(provider_cfg, dict):
return False
block = provider_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def has_tool(cfg: dict, name: str) -> bool:
tool_cfg = cfg.get("tool")
if not isinstance(tool_cfg, dict):
return False
block = tool_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]:
try:
from API.HTTP import HTTPClient
with HTTPClient(timeout=timeout, retries=1) as client:
response = client.get(url, allow_redirects=True)
code = int(getattr(response, "status_code", 0) or 0)
ok = 200 <= code < 500
return ok, f"{url} (HTTP {code})"
except Exception as exc:
import httpx as _httpx
if isinstance(exc, _httpx.TimeoutException):
return False, f"{url} (timeout)"
return False, f"{url} ({type(exc).__name__})"
def provider_display_name(key: str) -> str:
label = (key or "").strip().lower()
if not label:
return "Plugin"
# Preserve expected brand casing for common providers.
display_overrides = {
"youtube": "YouTube",
"openlibrary": "OpenLibrary",
"podcastindex": "PodcastIndex",
}
if label in display_overrides:
return display_overrides[label]
return label[:1].upper() + label[1:]
def default_provider_ping_targets(key: str) -> list[str]:
"""Return default health-check URLs for known providers."""
label = (key or "").strip().lower()
defaults = {
"bandcamp": ["https://bandcamp.com"],
"youtube": ["https://www.youtube.com"],
"openlibrary": ["https://openlibrary.org"],
"podcastindex": ["https://podcastindex.org"],
"loc": ["https://www.loc.gov"],
}
return list(defaults.get(label, []))
def ping_first(urls: list[str]) -> tuple[bool, str]:
for url in urls:
ok, detail = ping_url(url)
if ok:
return True, detail
if urls:
return ping_url(urls[0])
return False, "No ping target"
def collect_plugin_startup_checks(config: dict) -> list[dict[str, Any]]:
provider_cfg = _provider_config_map(config)
checks: list[dict[str, Any]] = []
seen_plugin_keys: set[str] = set()
for info in _iter_registered_plugin_infos():
plugin_key = str(getattr(info, "canonical_name", "") or "").strip().lower()
if not plugin_key:
continue
seen_plugin_keys.add(plugin_key)
plugin = None
summary: dict[str, Any]
display_name = provider_display_name(plugin_key)
configured_entry: Any = None
try:
plugin = info.plugin_class(config)
configured_entry = plugin.plugin_config_root()
summary = plugin.status_summary()
except Exception as exc:
summary = {
"status": "DISABLED",
"name": display_name,
"plugin": plugin_key,
"detail": str(exc),
}
status = str(summary.get("status") or "UNKNOWN").strip().upper() or "UNKNOWN"
name = str(summary.get("name") or display_name)
detail = str(summary.get("detail") or "").strip()
if detail.lower() == "configured" and not configured_entry:
detail = "Available"
if not detail:
if status == "ENABLED":
detail = "Configured" if configured_entry else "Available"
else:
detail = "Not configured" if not configured_entry else "Unavailable"
checks.append(
{
"status": status,
"name": name,
"plugin": str(summary.get("plugin") or plugin_key),
"instance": _resolve_startup_instance_text(
plugin,
summary,
configured_entry if configured_entry else provider_cfg.get(plugin_key),
),
"detail": detail,
"files": summary.get("files"),
}
)
for plugin_name, raw_entry in sorted(provider_cfg.items()):
plugin_key = str(plugin_name or "").strip().lower()
if not plugin_key or plugin_key in seen_plugin_keys:
continue
checks.append(
{
"status": "UNKNOWN",
"name": provider_display_name(plugin_key),
"plugin": plugin_key,
"instance": ", ".join(_extract_configured_instance_names(raw_entry)),
"detail": "Not registered",
"files": None,
}
)
return checks