Files
Medios-Macina/SYS/cmdlet_spec.py

450 lines
14 KiB
Python
Raw Permalink Normal View History

2026-02-11 19:06:38 -08:00
from __future__ import annotations
import sys
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Set
from SYS.logger import log
@dataclass
class CmdletArg:
    """Describe one cmdlet argument: its name, value kind, and lookup metadata."""

    # Canonical argument name; every flag spelling is derived from it.
    name: str
    # Value kind; the value "flag" additionally yields negation spellings
    # in to_flags().
    type: str = "string"
    required: bool = False
    description: str = ""
    # Optional list of accepted values.
    choices: List[str] = field(default_factory=list)
    # Optional short name, exposed as a single-dash flag by to_flags().
    alias: str = ""
    # Optional callable applied to raw values by resolve().
    handler: Optional[Any] = None
    variadic: bool = False
    usage: str = ""
    requires_db: bool = False
    # Field name (and alternates) used when the value comes from a parsed
    # query string rather than from an explicit flag.
    query_key: Optional[str] = None
    query_aliases: List[str] = field(default_factory=list)
    query_only: bool = False

    def resolve(self, value: Any) -> Any:
        """Return *value* passed through ``handler``, or unchanged.

        The handler is only invoked when it is set and callable; any other
        handler value leaves *value* untouched.
        """
        converter = self.handler
        if converter is None or not callable(converter):
            return value
        return converter(value)

    def to_flags(self) -> tuple[str, ...]:
        """Return every flag spelling for this argument, in a fixed order.

        The order is: ``--name``, ``-name``, ``-alias`` (when an alias is
        set), then, for ``type == "flag"`` only, the negation forms
        ``--no-name``, ``-noname`` and ``-nalias`` (when an alias is set).
        """
        has_alias = bool(self.alias)

        variants = [f"--{self.name}", f"-{self.name}"]
        if has_alias:
            variants.append(f"-{self.alias}")

        if self.type == "flag":
            variants.extend((f"--no-{self.name}", f"-no{self.name}"))
            if has_alias:
                variants.append(f"-n{self.alias}")

        return tuple(variants)
def QueryArg(
    name: str,
    *,
    key: Optional[str] = None,
    aliases: Optional[Sequence[str]] = None,
    type: str = "string",
    required: bool = False,
    description: str = "",
    choices: Optional[Sequence[str]] = None,
    handler: Optional[Any] = None,
    query_only: bool = True,
) -> CmdletArg:
    """Build a CmdletArg whose value can be filled in from `-query` fields.

    Args:
        name: Argument name.
        key: Query field to read; falls back to *name* when empty.
        aliases: Alternate query field names; blank entries are dropped.
        type: Value kind; an empty value becomes ``"string"``.
        required: Whether the argument is mandatory.
        description: Help text.
        choices: Accepted values, copied into a new list.
        handler: Optional callable stored on the argument.
        query_only: Stored as-is (coerced to bool) on the argument.

    Returns:
        A ``CmdletArg`` whose ``query_key`` and ``query_aliases`` are
        stripped and lower-cased. ``query_key`` is ``None`` when the
        effective key is blank.
    """
    # Normalise the lookup key once; a blank key means "no query mapping".
    normalized_key = str(key or name).strip().lower()

    normalized_aliases = []
    for candidate in aliases or []:
        text = str(candidate).strip()
        if text:
            normalized_aliases.append(text.lower())

    return CmdletArg(
        name=str(name),
        type=str(type or "string"),
        required=bool(required),
        description=str(description or ""),
        choices=list(choices or []),
        handler=handler,
        query_key=normalized_key or None,
        query_aliases=normalized_aliases,
        query_only=bool(query_only),
    )
class SharedArgs:
    """Registry of CmdletArg definitions that several cmdlets share.

    Definitions are upper-case class attributes; ``get`` looks them up by
    name. The list of available store names is cached on the class in
    ``_cached_available_stores`` the first time it is requested.
    """

    STORE = CmdletArg(
        "store",
        type="enum",
        choices=[],
        description="Selects store",
        query_key="store",
    )
    URL = CmdletArg("url", type="string", description="http parser")
    PROVIDER = CmdletArg("provider", type="string", description="selects provider")

    @staticmethod
    def get_store_choices(config: Optional[Dict[str, Any]] = None, force: bool = False) -> List[str]:
        """Return the cached store names, refreshing the cache when needed.

        Args:
            config: Configuration passed to the refresh; ignored when a
                cached value exists and *force* is false.
            force: When true, always refresh and also instantiate the store
                registry. When false, refresh only if nothing is cached
                yet, and skip registry instantiation.

        Returns:
            The cached list, or an empty list when the cache is empty.
        """
        already_cached = hasattr(SharedArgs, "_cached_available_stores")
        if force or not already_cached:
            SharedArgs._refresh_store_choices_cache(config, skip_instantiation=not force)
        return SharedArgs._cached_available_stores or []

    @staticmethod
    def _refresh_store_choices_cache(config: Optional[Dict[str, Any]] = None, skip_instantiation: bool = False) -> None:
        """Recompute ``_cached_available_stores``; never raises.

        The configured backend names are stored first. Unless
        *skip_instantiation* is true, the store registry is then
        instantiated and its backend list replaces the cache when it is
        non-empty. Any failure leaves the cache as an empty list or as the
        last value successfully stored.
        """
        try:
            if config is None:
                try:
                    from SYS.config import load_config

                    config = load_config()
                except Exception:
                    # Without a configuration there is nothing to list.
                    SharedArgs._cached_available_stores = []
                    return

            try:
                from Store.registry import list_configured_backend_names

                configured = list_configured_backend_names(config) or []
            except Exception:
                configured = []
            SharedArgs._cached_available_stores = configured

            if not skip_instantiation:
                try:
                    from Store.registry import Store as StoreRegistry

                    backends = StoreRegistry(config=config, suppress_debug=True).list_backends()
                    if backends:
                        SharedArgs._cached_available_stores = backends
                except Exception:
                    # Keep the configured names when the registry fails.
                    pass
        except Exception:
            SharedArgs._cached_available_stores = []

    LOCATION = CmdletArg(
        name="location",
        type="enum",
        choices=["hydrus", "0x0"],
        required=True,
        description="Destination location",
    )
    DELETE = CmdletArg(
        name="delete",
        type="flag",
        description="Delete the file after successful operation.",
    )
    ARTIST = CmdletArg(
        name="artist",
        type="string",
        description="Filter by artist name (case-insensitive, partial match).",
    )
    ALBUM = CmdletArg(
        name="album",
        type="string",
        description="Filter by album name (case-insensitive, partial match).",
    )
    TRACK = CmdletArg(
        name="track",
        type="string",
        description="Filter by track title (case-insensitive, partial match).",
    )
    LIBRARY = CmdletArg(
        name="library",
        type="string",
        choices=["hydrus", "local", "soulseek", "libgen", "ftp"],
        description="Search library or source location.",
    )
    TIMEOUT = CmdletArg(
        name="timeout",
        type="integer",
        description="Search or operation timeout in seconds.",
    )
    LIMIT = CmdletArg(
        name="limit",
        type="integer",
        description="Maximum number of results to return.",
    )
    PATH = CmdletArg(
        name="path",
        type="string",
        description="File or directory path.",
    )
    QUERY = CmdletArg(
        name="query",
        type="string",
        description="Unified query string (e.g., hash:<sha256>, hash:{<h1>,<h2>}).",
    )
    REASON = CmdletArg(
        name="reason",
        type="string",
        description="Reason or explanation for the operation.",
    )
    ARCHIVE = CmdletArg(
        name="archive",
        type="flag",
        description="Archive the URL to Wayback Machine, Archive.today, and Archive.ph (requires URL argument in cmdlet).",
        alias="arch",
    )

    @staticmethod
    def resolve_storage(
        storage_value: Optional[str],
        default: Optional[Path] = None,
    ) -> Path:
        """Return *default* when given, otherwise the system temp directory.

        *storage_value* is accepted for interface compatibility but does
        not influence the result.
        """
        if default is None:
            return Path(tempfile.gettempdir())
        return default

    @classmethod
    def get(cls, name: str) -> Optional[CmdletArg]:
        """Look up a shared argument by name, case-insensitively.

        Returns ``None`` when no class attribute matches the upper-cased
        name.
        """
        try:
            attribute = name.upper()
            return getattr(cls, attribute)
        except AttributeError:
            return None
@dataclass
class Cmdlet:
    """Hold a cmdlet's metadata, argument specs, and optional callable."""

    name: str
    summary: str
    usage: str
    # Alternate names under which the cmdlet is registered.
    alias: List[str] = field(default_factory=list)
    # Argument specifications for this cmdlet.
    arg: List[CmdletArg] = field(default_factory=list)
    detail: List[str] = field(default_factory=list)
    examples: List[str] = field(default_factory=list)
    # Callable handed to the registry by register(); None disables registration.
    exec: Optional[Callable[[Any, Sequence[str], Dict[str, Any]], int]] = field(default=None)

    def _collect_names(self) -> List[str]:
        """Return the cmdlet name plus its aliases, without duplicates.

        Names are compared case-insensitively with ``_`` treated as ``-``;
        the first spelling encountered is the one kept. An ``aliases``
        attribute, if one was set on the instance, is included after
        ``alias``.
        """
        candidates: List[str] = [self.name] if self.name else []
        candidates += [entry for entry in (self.alias or []) if entry]
        candidates += [entry for entry in (getattr(self, "aliases", None) or []) if entry]

        unique: List[str] = []
        seen_keys: Set[str] = set()
        for candidate in candidates:
            normalized = candidate.replace("_", "-").lower()
            if normalized not in seen_keys:
                seen_keys.add(normalized)
                unique.append(candidate)
        return unique

    def register(self) -> "Cmdlet":
        """Register ``exec`` under every collected name and return ``self``.

        Nothing is registered when ``exec`` is not callable, when the
        ``cmdlet`` registry module cannot be imported, or when there are no
        names to register.
        """
        if not callable(self.exec):
            return self

        try:
            from cmdlet import register_callable as _register_callable
        except Exception:
            return self

        registered_names = self._collect_names()
        if registered_names:
            _register_callable(registered_names, self.exec)
        return self

    def get_flags(self, arg_name: str) -> set[str]:
        """Return the single-dash and double-dash spellings of *arg_name*."""
        single_dash = f"-{arg_name}"
        double_dash = f"--{arg_name}"
        return {single_dash, double_dash}

    def build_flag_registry(self) -> Dict[str, set[str]]:
        """Map each argument name in ``arg`` to its flag spellings."""
        registry: Dict[str, set[str]] = {}
        for spec in self.arg:
            registry[spec.name] = self.get_flags(spec.name)
        return registry
def _append_variadic_value(result: Dict[str, Any], name: str, value: Any) -> None:
    """Append *value* to the list under *name*, creating or wrapping it first."""
    if name not in result:
        result[name] = []
    elif not isinstance(result[name], list):
        result[name] = [result[name]]
    result[name].append(value)


def parse_cmdlet_args(
    args: Sequence[str],
    cmdlet_spec: Dict[str, Any] | Cmdlet,
) -> Dict[str, Any]:
    """Parse command-line style tokens against a cmdlet's argument specs.

    Each spec in ``cmdlet_spec.arg`` is read by attribute (``name``,
    ``type``, ``alias``, ``variadic``, ``query_key``, ``query_aliases``,
    ``query_only``). Tokens are matched case-insensitively against the bare
    name, ``-name``, ``--name`` and ``-alias``. When an alias collides with
    another argument's name, the argument name wins.

    Parsing rules:
        * ``type == "flag"`` arguments are stored as ``True``.
        * Other arguments take the following token as their value, unless
          that token is missing or starts with ``-``, in which case the
          flag is ignored.
        * Variadic arguments accumulate their values in a list.
        * Unrecognised tokens fill positional arguments in declaration
          order; a variadic positional absorbs all remaining ones.
        * ``-hash`` / ``--hash`` are logged as unsupported and skipped
          unless a spec declares them.
        * After token parsing, arguments with a ``query_key`` that are
          still unset are filled from the fields of the parsed ``query``
          value, passed through ``spec.resolve`` when that succeeds.

    Args:
        args: Raw argument tokens.
        cmdlet_spec: Object exposing an ``arg`` list or tuple of specs.

    Returns:
        Mapping of argument name (leading dashes stripped) to parsed value.

    Raises:
        TypeError: If *cmdlet_spec* has no list/tuple ``arg`` attribute
            (this includes a plain ``dict``).
    """
    result: Dict[str, Any] = {}

    arg_specs_raw = getattr(cmdlet_spec, "arg", None)
    if not isinstance(arg_specs_raw, (list, tuple)):
        raise TypeError(
            f"Expected cmdlet-like object with an 'arg' list, got {type(cmdlet_spec).__name__}"
        )

    positional_args: List[Any] = []
    query_mapped_args: List[Any] = []
    arg_spec_map: Dict[str, str] = {}  # lower-cased token -> canonical name
    arg_spec_by_canonical: Dict[str, Any] = {}  # lower-cased name -> spec

    for spec in arg_specs_raw:
        name = getattr(spec, "name", None)
        if not name:
            continue

        # Specs are duck-typed, so attribute access is best-effort: a spec
        # that fails here is simply not query-mapped.
        try:
            if getattr(spec, "query_key", None):
                query_mapped_args.append(spec)
        except Exception:
            pass

        name_str = str(name)
        canonical_name = name_str.lstrip("-")
        canonical_key = canonical_name.lower()

        # Query-only arguments get neither flags nor a positional slot.
        try:
            if bool(getattr(spec, "query_only", False)):
                continue
        except Exception:
            pass

        arg_spec_by_canonical[canonical_key] = spec
        if "-" not in name_str:
            positional_args.append(spec)

        arg_spec_map[canonical_key] = canonical_name
        arg_spec_map[f"-{canonical_name}".lower()] = canonical_name
        arg_spec_map[f"--{canonical_name}".lower()] = canonical_name

        # Accept the short alias too. setdefault (rather than assignment)
        # keeps any real argument name that uses the same spelling in charge.
        alias = getattr(spec, "alias", "")
        if isinstance(alias, str):
            alias = alias.strip().lstrip("-")
            if alias:
                arg_spec_map.setdefault(f"-{alias}".lower(), canonical_name)

    i = 0
    positional_index = 0
    while i < len(args):
        token = str(args[i])
        token_lower = token.lower()

        if token_lower in {"-hash", "--hash"} and token_lower not in arg_spec_map:
            # Logging is best-effort; the flag is dropped either way.
            try:
                log(
                    'Legacy flag -hash is no longer supported. Use: -query "hash:<sha256>"',
                    file=sys.stderr,
                )
            except Exception:
                pass
            i += 1
            continue

        if token_lower in arg_spec_map:
            canonical_name = arg_spec_map[token_lower]
            spec = arg_spec_by_canonical.get(canonical_name.lower())
            is_flag = bool(spec and str(getattr(spec, "type", "")).lower() == "flag")

            if is_flag:
                result[canonical_name] = True
                i += 1
                continue

            has_value = i + 1 < len(args) and not str(args[i + 1]).startswith("-")
            if not has_value:
                # No usable value follows; ignore the flag itself.
                i += 1
                continue

            value = args[i + 1]
            if bool(spec and getattr(spec, "variadic", False)):
                _append_variadic_value(result, canonical_name, value)
            else:
                result[canonical_name] = value
            i += 2
            continue

        if positional_index < len(positional_args):
            positional_spec = positional_args[positional_index]
            canonical_name = str(getattr(positional_spec, "name", "")).lstrip("-")
            if bool(getattr(positional_spec, "variadic", False)):
                # A variadic positional keeps its slot and absorbs the rest.
                _append_variadic_value(result, canonical_name, token)
            else:
                result[canonical_name] = token
                positional_index += 1

        # Tokens beyond the last positional slot are dropped.
        i += 1

    raw_query = result.get("query")
    if not query_mapped_args or raw_query is None:
        return result

    try:
        from SYS.cli_syntax import parse_query as _parse_query

        parsed_query = _parse_query(str(raw_query))
        fields = parsed_query.get("fields", {}) if isinstance(parsed_query, dict) else {}
        norm_fields = (
            {str(k).strip().lower(): v for k, v in fields.items()}
            if isinstance(fields, dict)
            else {}
        )
    except Exception:
        # An unparseable query simply contributes no fields.
        norm_fields = {}

    for spec in query_mapped_args:
        canonical_name = str(getattr(spec, "name", "") or "").lstrip("-")
        if not canonical_name:
            continue
        # Explicit flag/positional values win over query fields.
        if canonical_name in result and result.get(canonical_name) not in (None, ""):
            continue

        try:
            key = str(getattr(spec, "query_key", "") or "").strip().lower()
            aliases = getattr(spec, "query_aliases", None)
            alias_list = [str(a).strip().lower() for a in (aliases or []) if str(a).strip()]
        except Exception:
            key = ""
            alias_list = []

        # Field keys are lower-cased above, so the name fallback must be too.
        candidates = [k for k in [key, canonical_name.lower()] + alias_list if k]
        val = None
        for candidate in candidates:
            if candidate in norm_fields:
                val = norm_fields.get(candidate)
                break
        if val is None:
            continue

        try:
            result[canonical_name] = spec.resolve(val)
        except Exception:
            # Fall back to the raw field value when the spec cannot resolve it.
            result[canonical_name] = val

    return result