2025-12-29 19:00:00 -08:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from importlib import import_module
|
2026-01-01 20:37:27 -08:00
|
|
|
from types import ModuleType
|
2025-12-29 19:00:00 -08:00
|
|
|
from typing import Any, Dict, List, Optional
|
2026-01-31 19:57:09 -08:00
|
|
|
import logging
|
|
|
|
|
logger = logging.getLogger(__name__)
|
2025-12-29 19:00:00 -08:00
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
from .config import get_local_storage_path
|
|
|
|
|
except Exception:
|
|
|
|
|
get_local_storage_path = None # type: ignore
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _should_hide_db_args(config: Optional[Dict[str, Any]]) -> bool:
    """Report whether DB-dependent arguments should be hidden.

    Returns True only when ``config`` is a dict, the storage-path resolver was
    importable, and that resolver yields a falsy path for ``config``. Every
    other situation (non-dict config, resolver unavailable, resolver raising)
    returns False.
    """
    # Without a dict config or a resolver there is nothing to decide on.
    if not isinstance(config, dict) or get_local_storage_path is None:
        return False

    try:
        has_storage_path = bool(get_local_storage_path(config))
    except Exception:
        # A failing resolver never hides arguments.
        return False
    return not has_storage_path
|
|
|
|
|
|
|
|
|
|
|
2026-01-01 20:37:27 -08:00
|
|
|
# Cached ``cmdlet`` package; remains None until an import succeeds.
_cmdlet_pkg: Optional[ModuleType] = None


def _get_cmdlet_package() -> Optional[ModuleType]:
    """Return the ``cmdlet`` package, importing it on first use.

    A successful import is cached in the module-level ``_cmdlet_pkg``. When
    the import raises, the failure is logged with its traceback and None is
    returned; nothing is cached, so the next call tries the import again.
    """
    global _cmdlet_pkg

    if _cmdlet_pkg is None:
        try:
            _cmdlet_pkg = import_module("cmdlet")
        except Exception as exc:
            # The assignment above never happened, so the cache is still None.
            logger.exception("Failed to import cmdlet package: %s", exc)
    return _cmdlet_pkg
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_registry() -> Dict[str, Any]:
    """Return the ``REGISTRY`` attribute of the cmdlet package.

    Yields an empty dict when the package could not be imported, when it has
    no ``REGISTRY`` attribute, or when that attribute is falsy.
    """
    package = _get_cmdlet_package()
    registry = None if package is None else getattr(package, "REGISTRY", None)
    return registry or {}
|
2025-12-29 19:00:00 -08:00
|
|
|
|
|
|
|
|
|
2026-01-11 00:39:17 -08:00
|
|
|
def ensure_registry_loaded(force: bool = False) -> None:
    """Ask the cmdlet package to load its modules into REGISTRY.

    Delegates to ``cmdlet.ensure_cmdlet_modules_loaded(force=force)`` when the
    package imports and exposes that callable. Any exception raised by the
    loader is logged with its traceback and not propagated.
    """
    package = _get_cmdlet_package()
    loader = None if package is None else getattr(package, "ensure_cmdlet_modules_loaded", None)
    if not callable(loader):
        return

    try:
        loader(force=force)
    except Exception as exc:
        logger.exception("ensure_registry_loaded: ensure_cmdlet_modules_loaded failed: %s", exc)
|
2025-12-29 19:00:00 -08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _normalize_mod_name(mod_name: str) -> str:
    """Turn a command/module name into an importable module name.

    Surrounding whitespace is trimmed, leading dots are removed and hyphens
    become underscores. A falsy ``mod_name`` yields an empty string.
    """
    trimmed = (mod_name or "").strip()
    # lstrip is a no-op when there is no leading dot, so no guard is needed.
    return trimmed.lstrip(".").replace("-", "_")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def import_cmd_module(mod_name: str):
    """Import a cmdlet/native module from cmdnat or cmdlet packages.

    The name is normalized with ``_normalize_mod_name`` and then tried as
    ``cmdnat.<name>``, ``cmdlet.<name>`` and finally as a bare ``<name>``.

    Args:
        mod_name: Command or module name; whitespace, leading dots and
            hyphens are normalized before importing.

    Returns:
        The first candidate module that imports successfully, or None when
        the normalized name is empty or every candidate fails. Import
        failures are logged and never raised to the caller.
    """
    normalized = _normalize_mod_name(mod_name)
    if not normalized:
        return None

    for package in ("cmdnat", "cmdlet", None):
        # Resolved before the try block so every handler reports the candidate
        # that actually failed rather than a name left over from the previous
        # iteration.
        qualified = f"{package}.{normalized}" if package else normalized
        try:
            # When attempting a bare import (package is None), prefer the
            # repo-local `MPV` package for the `mpv` module name so we don't
            # accidentally import the third-party `mpv` package (python-mpv),
            # which can raise OSError if system libmpv is missing.
            if package is None and normalized == "mpv":
                qualified = "MPV"
                try:
                    return import_module(qualified)
                except ModuleNotFoundError:
                    # Local MPV package not present; fall back to the normal
                    # bare import.
                    qualified = normalized

            return import_module(qualified)
        except ModuleNotFoundError as exc:
            # `exc.name` is the module that could not be found. When it is the
            # candidate itself (or one of its parent packages) the module is
            # simply not available under this prefix; try the next one.
            # Anything else means the candidate exists but one of its own
            # imports is missing, which is worth surfacing.
            missing = exc.name or ""
            if missing and missing != qualified and not qualified.startswith(f"{missing}."):
                logger.warning("Optional module %s failed to import: %s", qualified, exc)
            continue
        except (ImportError, OSError) as exc:
            # Some native/binary-backed packages (e.g., mpv) raise
            # ImportError/OSError on systems missing shared libraries. These
            # are optional; log a short warning but avoid spamming the console
            # with a full traceback.
            logger.warning("Optional module %s failed to import: %s", qualified, exc)
            continue
        except Exception:
            # Unexpected errors should be loud and include a traceback to aid
            # debugging.
            logger.exception("Unexpected error importing module %s", qualified)
            continue

    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _normalize_arg(arg: Any) -> Dict[str, Any]:
    """Convert a CmdletArg/dict into a plain metadata dict.

    Args:
        arg: Either a dict or an object whose attributes carry the argument
            metadata (``name``, ``type``, ``required``, ``description``,
            ``choices``, ``alias``, ``variadic``, ``requires_db``).

    Returns:
        A dict with exactly those eight keys. ``name`` has leading dashes
        removed, ``required``/``requires_db`` are coerced to bool, and a
        falsy ``choices`` becomes an empty list. Missing fields fall back to
        the same defaults for both input shapes.
    """
    # Pick one accessor up front so dicts and objects share a single code path
    # and cannot drift apart.
    if isinstance(arg, dict):
        def read(key: str, default: Any) -> Any:
            return arg.get(key, default)
    else:
        def read(key: str, default: Any) -> Any:
            return getattr(arg, key, default)

    return {
        # `or ""` keeps an explicit None name from turning into the literal
        # string "None" (previously only the object path guarded this).
        "name": str(read("name", "") or "").lstrip("-"),
        "type": read("type", "string"),
        "required": bool(read("required", False)),
        "description": read("description", ""),
        "choices": read("choices", []) or [],
        "alias": read("alias", ""),
        "variadic": read("variadic", False),
        "requires_db": bool(read("requires_db", False)),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_cmdlet_metadata(
    cmd_name: str, config: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]]:
    """Return normalized metadata for a cmdlet, if available (aliases supported).

    The ``CMDLET`` object is taken from the module imported for ``cmd_name``;
    when that yields nothing, the registry entry for the hyphenated,
    lower-cased name is consulted and ``CMDLET`` is read from the module that
    owns the registered callable. Returns None when no ``CMDLET`` is found.

    The result has the keys ``name``, ``aliases``, ``usage``, ``summary``,
    ``details``, ``args``, ``examples`` and ``raw``. Arguments flagged
    ``requires_db`` are dropped when ``_should_hide_db_args(config)`` is true.
    """
    ensure_registry_loaded()

    module = import_cmd_module(cmd_name.replace("-", "_"))
    data = getattr(module, "CMDLET", None) if module else None

    if data is None:
        # Fall back to the module owning the registered callable (this is how
        # alias lookups resolve).
        try:
            handler = _get_registry().get(cmd_name.replace("_", "-").lower())
            owner_name = getattr(handler, "__module__", "") if handler else ""
            if owner_name:
                data = getattr(import_module(owner_name), "CMDLET", None)
        except Exception as exc:
            logger.exception("Registry fallback failed while resolving cmdlet %s: %s", cmd_name, exc)
            data = None

    if not data:
        return None

    # Dict view used whenever `data` does not expose a field as an attribute.
    fallback: Dict[str, Any] = {}
    if hasattr(data, "to_dict"):
        fallback = data.to_dict()
    elif isinstance(data, dict):
        fallback = data

    def field(key: str, default: Any) -> Any:
        """Read ``key`` from ``data`` as an attribute, else from the dict view."""
        return getattr(data, key, fallback.get(key, default))

    name = field("name", cmd_name) or cmd_name
    aliases = field("alias", []) or []
    usage = field("usage", "")
    summary = field("summary", "")
    details = field("detail", []) or []
    args = [_normalize_arg(raw_arg) for raw_arg in field("arg", []) or []]

    # `examples` wins; the singular `example` is only used when it is empty.
    example_items = field("examples", []) or field("example", []) or []
    stripped = (str(item or "").strip() for item in example_items)
    examples = [text for text in stripped if text]

    if _should_hide_db_args(config):
        args = [entry for entry in args if not entry.get("requires_db")]

    return {
        "name": str(name).replace("_", "-").lower(),
        "aliases": [str(alias).replace("_", "-").lower() for alias in aliases if alias],
        "usage": usage,
        "summary": summary,
        "details": details,
        "args": args,
        "examples": examples,
        "raw": data,
    }
|
|
|
|
|
|
|
|
|
|
|
2026-01-11 00:39:17 -08:00
|
|
|
def list_cmdlet_metadata(
    force: bool = False, config: Optional[Dict[str, Any]] = None
) -> Dict[str, Dict[str, Any]]:
    """Collect metadata for all registered cmdlet keyed by canonical name.

    Every registry key is resolved through ``get_cmdlet_metadata``. Keys that
    resolve to the same canonical name are merged into one entry: aliases are
    unioned (a registry key differing from the canonical name counts as an
    alias), the first non-empty usage/summary/details/args/raw is kept, and
    examples are appended without duplicates. Keys with no metadata get an
    empty placeholder entry.
    """
    ensure_registry_loaded(force=force)

    collected: Dict[str, Dict[str, Any]] = {}
    for reg_name in _get_registry():
        meta = get_cmdlet_metadata(reg_name, config=config)
        derived_name = str(reg_name).replace("_", "-").lower()

        if not meta:
            # Nothing known about this key: record a placeholder, but never
            # overwrite an entry that was already collected.
            collected.setdefault(
                derived_name,
                {
                    "name": derived_name,
                    "aliases": [],
                    "usage": "",
                    "summary": "",
                    "details": [],
                    "args": [],
                    "examples": [],
                    "raw": None,
                },
            )
            continue

        canonical = meta.get("name", derived_name)
        entry = collected.get(canonical)
        if entry is None:
            entry = {
                "name": canonical,
                "aliases": [],
                "usage": "",
                "summary": "",
                "details": [],
                "args": [],
                "examples": meta.get("examples", []),
                "raw": meta.get("raw"),
            }

        alias_pool = set(entry.get("aliases", [])) | set(meta.get("aliases", []))
        if canonical != reg_name:
            alias_pool.add(reg_name)
        entry["aliases"] = sorted(a for a in alias_pool if a and a != canonical)

        # First non-empty value wins for the descriptive fields.
        for key in ("usage", "summary", "details", "args"):
            if not entry.get(key) and meta.get(key):
                entry[key] = meta[key]

        merged_examples: List[str] = list(entry.get("examples", []) or [])
        for key in ("examples", "example"):
            for candidate in meta.get(key, []) or []:
                if candidate not in merged_examples:
                    merged_examples.append(candidate)
        entry["examples"] = merged_examples

        if not entry.get("raw"):
            entry["raw"] = meta.get("raw")

        collected[canonical] = entry

    return collected
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_cmdlet_names(
    include_aliases: bool = True,
    force: bool = False,
    config: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """Return sorted cmdlet names (optionally including aliases).

    Args:
        include_aliases: When true, every alias is listed next to the
            canonical names.
        force: Forwarded to ``list_cmdlet_metadata`` to force a registry
            reload.
        config: Forwarded to ``list_cmdlet_metadata``.

    Returns:
        A sorted list of unique, non-empty names.
    """
    # list_cmdlet_metadata() loads the registry itself; calling
    # ensure_registry_loaded() here as well made force=True reload every
    # cmdlet module twice.
    entries = list_cmdlet_metadata(force=force, config=config)

    names = set()
    for meta in entries.values():
        names.add(meta.get("name", ""))
        if include_aliases:
            names.update(meta.get("aliases", []))
    return sorted(n for n in names if n)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_cmdlet_arg_flags(cmd_name: str, config: Optional[Dict[str, Any]] = None) -> List[str]:
    """Return flag variants for cmdlet arguments (e.g., -name/--name).

    Each named argument contributes its single-dash and double-dash form, in
    declaration order and without duplicates. An unknown cmdlet yields an
    empty list.
    """
    meta = get_cmdlet_metadata(cmd_name, config=config)
    if not meta:
        return []

    # A dict keeps first-seen order while discarding repeats.
    ordered: Dict[str, None] = {}
    for arg in meta.get("args", []):
        bare = str(arg.get("name") or "").strip().lstrip("-")
        if bare:
            ordered.setdefault(f"-{bare}", None)
            ordered.setdefault(f"--{bare}", None)
    return list(ordered)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _matrix_room_ids(matrix_conf: Any) -> List[str]:
    """Extract the configured room IDs from the matrix provider config."""
    raw = None
    for key in ("room", "room_id", "rooms", "room_ids"):
        if key in matrix_conf:
            raw = matrix_conf.get(key)
            break

    try:
        if isinstance(raw, (list, tuple, set)):
            return [str(v).strip() for v in raw if str(v).strip()]

        text = str(raw or "").strip()
        if not text:
            return []

        import re

        # A single string may hold several IDs separated by commas/whitespace.
        return [p.strip() for p in re.split(r"[,\s]+", text) if p and p.strip()]
    except Exception:
        logger.exception("Failed to parse matrix room ids from config: %r", raw)
        return []


def _matrix_room_labels(config: Dict[str, Any], matrix_conf: Any, room_ids: List[str]) -> List[str]:
    """Resolve display labels for ``room_ids`` via Provider.matrix ([] on failure)."""
    if not (matrix_conf.get("homeserver") and matrix_conf.get("access_token")):
        # Without credentials there is nothing to query.
        return []

    try:
        from Provider.matrix import Matrix
    except Exception as exc:
        logger.exception("Failed to import Matrix provider or initialize: %s", exc)
        return []

    try:
        rooms = Matrix(config).list_rooms(room_ids=room_ids)
        labels: List[str] = []
        for room in rooms or []:
            name = str(room.get("name") or "").strip()
            rid = str(room.get("room_id") or "").strip()
            label = name or rid
            # A room with neither a name nor an id is not a usable choice.
            if label:
                labels.append(label)
        return labels
    except Exception as exc:
        logger.exception("Matrix provider failed while listing rooms: %s", exc)
        return []


def _matrix_room_choices(config: Optional[Dict[str, Any]]) -> List[str]:
    """Build the dynamic choice list for the Matrix ``-room`` argument."""
    if config is None:
        try:
            from SYS.config import load_config

            config = load_config()
        except Exception as exc:
            logger.exception("Failed to load config for matrix default choices: %s", exc)
    config = config or {}

    matrix_conf: Any = {}
    try:
        providers = config.get("provider") or {}
        matrix_conf = providers.get("matrix") or {}
    except Exception as exc:
        logger.exception("Failed to read matrix provider config: %s", exc)
        matrix_conf = {}

    if not callable(getattr(matrix_conf, "get", None)):
        # A scalar or list under provider.matrix would otherwise raise
        # AttributeError on the lookups below.
        logger.warning(
            "Ignoring matrix provider config of unexpected type %s",
            type(matrix_conf).__name__,
        )
        return []

    room_ids = _matrix_room_ids(matrix_conf)
    if not room_ids:
        return []

    try:
        labels = _matrix_room_labels(config, matrix_conf, room_ids)
    except Exception as exc:
        logger.exception("Failed to resolve matrix rooms: %s", exc)
        labels = []

    # Fallback: return raw ids as choices
    return labels or room_ids


def get_cmdlet_arg_choices(
    cmd_name: str, arg_name: str, config: Optional[Dict[str, Any]] = None
) -> List[str]:
    """Return declared choices for a cmdlet argument.

    Special-cases dynamic choices for certain arguments (e.g., Matrix -room)
    which may be populated from configuration or provider queries.

    Args:
        cmd_name: Cmdlet name or alias.
        arg_name: Argument name; leading dashes are ignored.
        config: Optional configuration dict. For the Matrix ``-room``
            argument a missing config is loaded via ``SYS.config.load_config``.

    Returns:
        For Matrix ``-room``: resolved room labels, or the configured room
        IDs when labels cannot be resolved. Otherwise the static ``choices``
        declared for the argument, or an empty list when the cmdlet or
        argument is unknown.
    """
    meta = get_cmdlet_metadata(cmd_name, config=config)
    if not meta:
        return []

    target = arg_name.lstrip("-")
    canonical = str(meta.get("name") or cmd_name).replace("_", "-")

    # Dynamic handling for Matrix room choices
    if target == "room" and canonical in (".matrix", "matrix"):
        dynamic = _matrix_room_choices(config)
        if dynamic:
            return dynamic

    # Default static choices from metadata
    for arg in meta.get("args", []):
        if arg.get("name") == target:
            return list(arg.get("choices", []) or [])
    return []
|