from __future__ import annotations import sys import tempfile from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Set from SYS.logger import log @dataclass class CmdletArg: """Represents a single cmdlet argument with optional enum choices.""" name: str type: str = "string" required: bool = False description: str = "" choices: List[str] = field(default_factory=list) alias: str = "" handler: Optional[Any] = None variadic: bool = False usage: str = "" requires_db: bool = False query_key: Optional[str] = None query_aliases: List[str] = field(default_factory=list) query_only: bool = False def resolve(self, value: Any) -> Any: if self.handler is not None and callable(self.handler): return self.handler(value) return value def to_flags(self) -> tuple[str, ...]: flags = [f"--{self.name}", f"-{self.name}"] if self.alias: flags.append(f"-{self.alias}") if self.type == "flag": flags.append(f"--no-{self.name}") flags.append(f"-no{self.name}") if self.alias: flags.append(f"-n{self.alias}") return tuple(flags) def QueryArg( name: str, *, key: Optional[str] = None, aliases: Optional[Sequence[str]] = None, type: str = "string", required: bool = False, description: str = "", choices: Optional[Sequence[str]] = None, handler: Optional[Any] = None, query_only: bool = True, ) -> CmdletArg: """Create an argument that can be populated from `-query` fields.""" return CmdletArg( name=str(name), type=str(type or "string"), required=bool(required), description=str(description or ""), choices=list(choices or []), handler=handler, query_key=str(key or name).strip().lower() if str(key or name).strip() else None, query_aliases=[str(a).strip().lower() for a in (aliases or []) if str(a).strip()], query_only=bool(query_only), ) class SharedArgs: """Registry of shared CmdletArg definitions used across multiple cmdlet.""" STORE = CmdletArg( name="store", type="enum", choices=[], description="Selects store", query_key="store", ) URL = CmdletArg( name="url", type="string", description="http parser", ) PROVIDER = CmdletArg( name="provider", type="string", description="selects provider", ) @staticmethod def get_store_choices(config: Optional[Dict[str, Any]] = None, force: bool = False) -> List[str]: if not force and hasattr(SharedArgs, "_cached_available_stores"): return SharedArgs._cached_available_stores or [] if not force: SharedArgs._refresh_store_choices_cache(config, skip_instantiation=True) else: SharedArgs._refresh_store_choices_cache(config, skip_instantiation=False) return SharedArgs._cached_available_stores or [] @staticmethod def _refresh_store_choices_cache(config: Optional[Dict[str, Any]] = None, skip_instantiation: bool = False) -> None: try: if config is None: try: from SYS.config import load_config config = load_config() except Exception: SharedArgs._cached_available_stores = [] return try: from Store.registry import list_configured_backend_names SharedArgs._cached_available_stores = list_configured_backend_names(config) or [] except Exception: SharedArgs._cached_available_stores = [] if skip_instantiation: return try: from Store.registry import Store as StoreRegistry registry = StoreRegistry(config=config, suppress_debug=True) available = registry.list_backends() if available: SharedArgs._cached_available_stores = available except Exception: pass except Exception: SharedArgs._cached_available_stores = [] LOCATION = CmdletArg( "location", type="enum", choices=["hydrus", "0x0"], required=True, description="Destination location", ) DELETE = CmdletArg( "delete", type="flag", description="Delete the file after successful operation.", ) ARTIST = CmdletArg( "artist", type="string", description="Filter by artist name (case-insensitive, partial match).", ) ALBUM = CmdletArg( "album", type="string", description="Filter by album name (case-insensitive, partial match).", ) TRACK = CmdletArg( "track", type="string", description="Filter by track title (case-insensitive, partial match).", ) LIBRARY = CmdletArg( "library", type="string", choices=["hydrus", "local", "soulseek", "libgen", "ftp"], description="Search library or source location.", ) TIMEOUT = CmdletArg( "timeout", type="integer", description="Search or operation timeout in seconds.", ) LIMIT = CmdletArg( "limit", type="integer", description="Maximum number of results to return.", ) PATH = CmdletArg("path", type="string", description="File or directory path.") QUERY = CmdletArg( "query", type="string", description="Unified query string (e.g., hash:, hash:{

,

}).", ) REASON = CmdletArg( "reason", type="string", description="Reason or explanation for the operation.", ) ARCHIVE = CmdletArg( "archive", type="flag", description="Archive the URL to Wayback Machine, Archive.today, and Archive.ph (requires URL argument in cmdlet).", alias="arch", ) @staticmethod def resolve_storage( storage_value: Optional[str], default: Optional[Path] = None, ) -> Path: _ = storage_value if default is not None: return default return Path(tempfile.gettempdir()) @classmethod def get(cls, name: str) -> Optional[CmdletArg]: try: return getattr(cls, name.upper()) except AttributeError: return None @dataclass class Cmdlet: """Represents a cmdlet with metadata and arguments.""" name: str summary: str usage: str alias: List[str] = field(default_factory=list) arg: List[CmdletArg] = field(default_factory=list) detail: List[str] = field(default_factory=list) examples: List[str] = field(default_factory=list) exec: Optional[Callable[[Any, Sequence[str], Dict[str, Any]], int]] = field(default=None) def _collect_names(self) -> List[str]: names: List[str] = [] if self.name: names.append(self.name) for alias in self.alias or []: if alias: names.append(alias) for alias in getattr(self, "aliases", None) or []: if alias: names.append(alias) seen: Set[str] = set() deduped: List[str] = [] for name in names: key = name.replace("_", "-").lower() if key in seen: continue seen.add(key) deduped.append(name) return deduped def register(self) -> "Cmdlet": if not callable(self.exec): return self try: from cmdlet import register_callable as _register_callable except Exception: return self names = self._collect_names() if not names: return self _register_callable(names, self.exec) return self def get_flags(self, arg_name: str) -> set[str]: return {f"-{arg_name}", f"--{arg_name}"} def build_flag_registry(self) -> Dict[str, set[str]]: return {arg.name: self.get_flags(arg.name) for arg in self.arg} def parse_cmdlet_args( args: Sequence[str], cmdlet_spec: Dict[str, Any] | Cmdlet, ) -> Dict[str, Any]: """Parse command-line arguments based on cmdlet specification.""" result: Dict[str, Any] = {} arg_specs_raw = getattr(cmdlet_spec, "arg", None) if arg_specs_raw is None or not isinstance(arg_specs_raw, (list, tuple)): raise TypeError( f"Expected cmdlet-like object with an 'arg' list, got {type(cmdlet_spec).__name__}" ) arg_specs: List[Any] = list(arg_specs_raw) positional_args: List[CmdletArg] = [] query_mapped_args: List[CmdletArg] = [] arg_spec_map: Dict[str, str] = {} arg_spec_by_canonical: Dict[str, Any] = {} for spec in arg_specs: name = getattr(spec, "name", None) if not name: continue try: if getattr(spec, "query_key", None): query_mapped_args.append(spec) except Exception: pass name_str = str(name) canonical_name = name_str.lstrip("-") canonical_key = canonical_name.lower() try: if bool(getattr(spec, "query_only", False)): continue except Exception: pass arg_spec_by_canonical[canonical_key] = spec if "-" not in name_str: positional_args.append(spec) arg_spec_map[canonical_key] = canonical_name arg_spec_map[f"-{canonical_name}".lower()] = canonical_name arg_spec_map[f"--{canonical_name}".lower()] = canonical_name i = 0 positional_index = 0 while i < len(args): token = str(args[i]) token_lower = token.lower() if token_lower in {"-hash", "--hash"} and token_lower not in arg_spec_map: try: log( 'Legacy flag -hash is no longer supported. Use: -query "hash:"', file=sys.stderr, ) except Exception: pass i += 1 continue if token_lower in arg_spec_map: canonical_name = arg_spec_map[token_lower] spec = arg_spec_by_canonical.get(canonical_name.lower()) is_flag = bool(spec and str(getattr(spec, "type", "")).lower() == "flag") if is_flag: result[canonical_name] = True i += 1 else: if i + 1 < len(args) and not str(args[i + 1]).startswith("-"): value = args[i + 1] is_variadic = bool(spec and getattr(spec, "variadic", False)) if is_variadic: if canonical_name not in result: result[canonical_name] = [] elif not isinstance(result[canonical_name], list): result[canonical_name] = [result[canonical_name]] result[canonical_name].append(value) else: result[canonical_name] = value i += 2 else: i += 1 elif positional_index < len(positional_args): positional_spec = positional_args[positional_index] canonical_name = str(getattr(positional_spec, "name", "")).lstrip("-") is_variadic = bool(getattr(positional_spec, "variadic", False)) if is_variadic: if canonical_name not in result: result[canonical_name] = [] elif not isinstance(result[canonical_name], list): result[canonical_name] = [result[canonical_name]] result[canonical_name].append(token) i += 1 else: result[canonical_name] = token positional_index += 1 i += 1 else: i += 1 try: raw_query = result.get("query") except Exception: raw_query = None if query_mapped_args and raw_query is not None: try: from SYS.cli_syntax import parse_query as _parse_query parsed_query = _parse_query(str(raw_query)) fields = parsed_query.get("fields", {}) if isinstance(parsed_query, dict) else {} norm_fields = ( {str(k).strip().lower(): v for k, v in fields.items()} if isinstance(fields, dict) else {} ) except Exception: norm_fields = {} for spec in query_mapped_args: canonical_name = str(getattr(spec, "name", "") or "").lstrip("-") if not canonical_name: continue if canonical_name in result and result.get(canonical_name) not in (None, ""): continue try: key = str(getattr(spec, "query_key", "") or "").strip().lower() aliases = getattr(spec, "query_aliases", None) alias_list = [str(a).strip().lower() for a in (aliases or []) if str(a).strip()] except Exception: key = "" alias_list = [] candidates = [k for k in [key, canonical_name] + alias_list if k] val = None for k in candidates: if k in norm_fields: val = norm_fields.get(k) break if val is None: continue try: result[canonical_name] = spec.resolve(val) except Exception: result[canonical_name] = val return result