154 lines
6.0 KiB
Python
154 lines
6.0 KiB
Python
from __future__ import annotations
|
|
|
|
from importlib import import_module
|
|
from typing import Any, Dict, List, Sequence
|
|
import sys
|
|
|
|
from SYS.logger import log
|
|
from . import _shared as sh
|
|
|
|
Cmdlet = sh.Cmdlet
|
|
CmdletArg = sh.CmdletArg
|
|
SharedArgs = sh.SharedArgs
|
|
|
|
|
|
class File(Cmdlet):
|
|
"""Unified file command: file -add|-delete|-get|-merge|..."""
|
|
|
|
_ACTION_FLAGS = {
|
|
"add": {"-add", "--add"},
|
|
"delete": {"-delete", "--delete", "-del", "--del"},
|
|
"get": {"-get", "--get"},
|
|
"merge": {"-merge", "--merge"},
|
|
"download": {"-download", "--download", "-dl", "--dl"},
|
|
"convert": {"-convert", "--convert"},
|
|
"trim": {"-trim", "--trim"},
|
|
"archive": {"-archive", "--archive"},
|
|
"screenshot": {"-screenshot", "--screenshot", "-screen-shot", "--screen-shot", "-shot", "--shot"},
|
|
}
|
|
|
|
_ACTION_MODULE = {
|
|
"add": "cmdlet.file.add",
|
|
"delete": "cmdlet.file.delete",
|
|
"get": "cmdlet.file.get",
|
|
"merge": "cmdlet.file.merge",
|
|
"download": "cmdlet.file.download",
|
|
"search": "cmdlet.file.search",
|
|
"convert": "cmdlet.file.convert",
|
|
"trim": "cmdlet.file.trim",
|
|
"archive": "cmdlet.file.archive",
|
|
"screenshot": "cmdlet.file.screenshot",
|
|
}
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__(
|
|
name="file",
|
|
summary="Manage file operations with one command",
|
|
usage='file -query <query> [args] | file (-add|-delete|-get|-merge|-download|-convert|-trim|-archive|-screenshot) [args]',
|
|
arg=[
|
|
SharedArgs.QUERY,
|
|
SharedArgs.PLUGIN,
|
|
SharedArgs.INSTANCE,
|
|
SharedArgs.PATH,
|
|
CmdletArg("-add", type="flag", required=False, description="Run add-file"),
|
|
CmdletArg("-delete", type="flag", required=False, description="Run delete-file", alias="del"),
|
|
CmdletArg("-get", type="flag", required=False, description="Run get-file"),
|
|
CmdletArg("-merge", type="flag", required=False, description="Run merge-file"),
|
|
CmdletArg("-download", type="flag", required=False, description="Run download-file", alias="dl"),
|
|
CmdletArg("-convert", type="flag", required=False, description="Run convert-file"),
|
|
CmdletArg("-trim", type="flag", required=False, description="Run trim-file"),
|
|
CmdletArg("-archive", type="flag", required=False, description="Run archive-file"),
|
|
CmdletArg("-screenshot", type="flag", required=False, description="Run screen-shot", alias="shot"),
|
|
],
|
|
detail=[
|
|
"- Use -query to run search-file through the unified file command.",
|
|
"- Otherwise, exactly one non-search action flag is required.",
|
|
"- Remaining args are passed through to the selected file cmdlet.",
|
|
"- Examples: file -query ..., file -add ..., file -delete ...",
|
|
],
|
|
exec=self.run,
|
|
)
|
|
self.register()
|
|
|
|
@staticmethod
|
|
def _has_query_arg(args: Sequence[str]) -> bool:
|
|
query_flags = {"-query", "--query"}
|
|
for token in args or []:
|
|
text = str(token or "").strip().lower()
|
|
if text in query_flags:
|
|
return True
|
|
if any(text.startswith(f"{flag}=") for flag in query_flags):
|
|
return True
|
|
return False
|
|
|
|
@classmethod
|
|
def _extract_action(cls, args: Sequence[str]) -> tuple[str | None, List[str], List[str]]:
|
|
matched_actions: List[str] = []
|
|
passthrough: List[str] = []
|
|
|
|
for token in args or []:
|
|
text = str(token or "")
|
|
lower = text.strip().lower()
|
|
matched = None
|
|
for action_name, variants in cls._ACTION_FLAGS.items():
|
|
if lower in variants:
|
|
matched = action_name
|
|
break
|
|
if matched:
|
|
matched_actions.append(matched)
|
|
continue
|
|
passthrough.append(text)
|
|
|
|
unique_actions: List[str] = []
|
|
for action in matched_actions:
|
|
if action not in unique_actions:
|
|
unique_actions.append(action)
|
|
|
|
if not unique_actions and cls._has_query_arg(passthrough):
|
|
return "search", passthrough, unique_actions
|
|
|
|
if len(unique_actions) != 1:
|
|
return None, passthrough, unique_actions
|
|
return unique_actions[0], passthrough, unique_actions
|
|
|
|
@classmethod
|
|
def _dispatch(cls, action: str, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
|
|
module_name = cls._ACTION_MODULE.get(action)
|
|
if not module_name:
|
|
log(f"file: unsupported action '{action}'", file=sys.stderr)
|
|
return 1
|
|
|
|
module = import_module(module_name)
|
|
|
|
cmdlet_obj = getattr(module, "CMDLET", None)
|
|
if cmdlet_obj is not None:
|
|
exec_fn = getattr(cmdlet_obj, "exec", None)
|
|
if callable(exec_fn):
|
|
return int(exec_fn(result, args, config))
|
|
|
|
fallback_run = getattr(module, "_run", None)
|
|
if callable(fallback_run):
|
|
return int(fallback_run(result, args, config))
|
|
|
|
log(f"file: cannot dispatch action '{action}' via module '{module_name}'", file=sys.stderr)
|
|
return 1
|
|
|
|
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
|
|
action, passthrough_args, seen = self._extract_action(args)
|
|
|
|
if action is None:
|
|
if not seen:
|
|
log(
|
|
"file: missing action; use -query for search or choose exactly one of -add, -delete, -get, -merge, -download, -convert, -trim, -archive, -screenshot",
|
|
file=sys.stderr,
|
|
)
|
|
else:
|
|
rendered = ", ".join(f"-{name}" for name in seen)
|
|
log(f"file: conflicting actions ({rendered}); choose exactly one", file=sys.stderr)
|
|
return 1
|
|
|
|
return self._dispatch(action, result, passthrough_args, config)
|
|
|
|
|
|
CMDLET = File()
|