Files
Medios-Macina/ProviderCore/commands.py
T

99 lines
3.0 KiB
Python

from __future__ import annotations
from importlib import import_module
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, Sequence
CmdletFn = Callable[[Any, Sequence[str], Dict[str, Any]], int]
def iter_command_objects(module: Any) -> list[Any]:
objects: list[Any] = []
many = getattr(module, "COMMANDS", None)
if isinstance(many, (list, tuple)):
for item in many:
if item is not None:
objects.append(item)
single = getattr(module, "COMMAND", None)
if single is not None:
objects.append(single)
legacy = getattr(module, "CMDLET", None)
if legacy is not None:
objects.append(legacy)
deduped: list[Any] = []
seen: set[int] = set()
for item in objects:
marker = id(item)
if marker in seen:
continue
seen.add(marker)
deduped.append(item)
return deduped
def get_primary_command_object(module: Any) -> Any:
commands = iter_command_objects(module)
return commands[0] if commands else None
def _register_command_object(cmdlet_obj: Any, registry: Dict[str, CmdletFn]) -> None:
run_fn = getattr(cmdlet_obj, "exec", None) if hasattr(cmdlet_obj, "exec") else None
if not callable(run_fn):
return
name = getattr(cmdlet_obj, "name", None)
if name:
registry[str(name).replace("_", "-").lower()] = run_fn
aliases: list[str] = []
if hasattr(cmdlet_obj, "alias") and getattr(cmdlet_obj, "alias"):
aliases.extend(getattr(cmdlet_obj, "alias") or [])
if hasattr(cmdlet_obj, "aliases") and getattr(cmdlet_obj, "aliases"):
aliases.extend(getattr(cmdlet_obj, "aliases") or [])
for alias in aliases:
text = str(alias or "").strip()
if text:
registry[text.replace("_", "-").lower()] = run_fn
def iter_plugin_command_module_names() -> list[str]:
try:
repo_root = Path(__file__).resolve().parent.parent
except Exception:
return []
plugins_dir = repo_root / "plugins"
if not plugins_dir.is_dir():
return []
module_names: list[str] = []
for entry in sorted(plugins_dir.iterdir(), key=lambda path: path.name.lower()):
if not entry.is_dir() or entry.name.startswith("."):
continue
if not (entry / "__init__.py").is_file():
continue
if (entry / "commands.py").is_file() or (entry / "commands" / "__init__.py").is_file():
module_names.append(f"plugins.{entry.name}.commands")
return module_names
def register_plugin_commands(registry: Dict[str, CmdletFn]) -> None:
for module_name in iter_plugin_command_module_names():
try:
module = import_module(module_name)
for cmdlet_obj in iter_command_objects(module):
_register_command_object(cmdlet_obj, registry)
except Exception as exc:
import sys
print(
f"Error importing plugin command '{module_name}': {exc}",
file=sys.stderr,
)
continue