Files
Medios-Macina/CLI.py
2026-01-23 17:12:15 -08:00

2464 lines
94 KiB
Python

from __future__ import annotations
"""Medeia-Macina CLI.
This module intentionally uses a class-based architecture:
- no legacy procedural entrypoints
- no compatibility shims
- all REPL/pipeline/cmdlet execution state lives on objects
"""
# When running the CLI directly (not via the 'mm' launcher), honor the
# repository config `debug` flag by enabling `MM_DEBUG` so import-time
# diagnostics and bootstrap debug output are visible without setting the
# environment variable manually.
import os
from pathlib import Path
if not os.environ.get("MM_DEBUG"):
try:
# Check database first
db_path = Path(__file__).resolve().parent / "medios.db"
if db_path.exists():
import sqlite3
with sqlite3.connect(str(db_path), timeout=30.0) as conn:
cur = conn.cursor()
# Check for global debug key
cur.execute("SELECT value FROM config WHERE key = 'debug' AND category = 'global'")
row = cur.fetchone()
if row:
val = str(row[0]).strip().lower()
if val in ("1", "true", "yes", "on"):
os.environ["MM_DEBUG"] = "1"
except Exception:
pass
import json
import shlex
import sys
import threading
import time
import uuid
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, cast, Callable
import typer
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.document import Document
from prompt_toolkit.styles import Style
from rich.console import Console
from rich.layout import Layout
from rich.panel import Panel
from rich.markdown import Markdown
from rich.bar import Bar
from rich.table import Table as RichTable
from SYS.rich_display import (
stderr_console,
stdout_console,
)
def _install_rich_traceback(*, show_locals: bool = False) -> None:
"""Install Rich traceback handler as the default excepthook.
This keeps uncaught exceptions readable in the terminal.
"""
try:
from rich.traceback import install as rich_traceback_install
rich_traceback_install(show_locals=bool(show_locals))
except Exception:
# Fall back to the standard Python traceback if Rich isn't available.
return
# Default to Rich tracebacks for the whole process.
_install_rich_traceback(show_locals=False)
from SYS.logger import debug, set_debug
from SYS.worker_manager import WorkerManager
from SYS.cmdlet_catalog import (
get_cmdlet_arg_choices,
get_cmdlet_arg_flags,
get_cmdlet_metadata,
import_cmd_module,
list_cmdlet_metadata,
list_cmdlet_names,
)
from SYS.config import load_config
from SYS.result_table import Table
from SYS.worker import WorkerManagerRegistry, WorkerStages, WorkerOutputMirror, WorkerStageSession
from SYS.pipeline import PipelineExecutor
from ProviderCore.registry import provider_inline_query_choices
# Selection parsing and REPL lexer moved to SYS.cli_parsing
from SYS.cli_parsing import Lexer, DRIVE_RE, KEY_PREFIX_RE, TOKEN_PATTERN, SELECTION_RANGE_RE, SelectionSyntax, SelectionFilterSyntax
# SelectionFilterSyntax moved to SYS.cli_parsing (imported above)
class _OldWorkerStages:
"""Factory methods for stage/pipeline worker sessions."""
@staticmethod
def _start_worker_session(
worker_manager: Optional[WorkerManager],
*,
worker_type: str,
title: str,
description: str,
pipe_text: str,
config: Optional[Dict[str,
Any]],
completion_label: str,
error_label: str,
skip_logging_for: Optional[Set[str]] = None,
session_worker_ids: Optional[Set[str]] = None,
) -> Optional[WorkerStageSession]:
if worker_manager is None:
return None
if skip_logging_for and worker_type in skip_logging_for:
return None
safe_type = worker_type or "cmd"
worker_id = f"cli_{safe_type[:8]}_{uuid.uuid4().hex[:6]}"
try:
tracked = worker_manager.track_worker(
worker_id,
worker_type=worker_type,
title=title,
description=description or "(no args)",
pipe=pipe_text,
)
if not tracked:
return None
except Exception as exc:
print(f"[worker] Failed to track {worker_type}: {exc}", file=sys.stderr)
return None
if session_worker_ids is not None:
session_worker_ids.add(worker_id)
logging_enabled = False
try:
handler = worker_manager.enable_logging_for_worker(worker_id)
logging_enabled = handler is not None
except Exception:
logging_enabled = False
orig_stdout = sys.stdout
orig_stderr = sys.stderr
stdout_proxy = WorkerOutputMirror(
orig_stdout,
worker_manager,
worker_id,
"stdout"
)
stderr_proxy = WorkerOutputMirror(
orig_stderr,
worker_manager,
worker_id,
"stderr"
)
sys.stdout = stdout_proxy
sys.stderr = stderr_proxy
if isinstance(config, dict):
config["_current_worker_id"] = worker_id
try:
worker_manager.log_step(worker_id, f"Started {worker_type}")
except Exception:
pass
return WorkerStageSession(
manager=worker_manager,
worker_id=worker_id,
orig_stdout=orig_stdout,
orig_stderr=orig_stderr,
stdout_proxy=stdout_proxy,
stderr_proxy=stderr_proxy,
config=config,
logging_enabled=logging_enabled,
completion_label=completion_label,
error_label=error_label,
)
@classmethod
def begin_stage(
cls,
worker_manager: Optional[WorkerManager],
*,
cmd_name: str,
stage_tokens: Sequence[str],
config: Optional[Dict[str,
Any]],
command_text: str,
) -> Optional[WorkerStageSession]:
description = " ".join(stage_tokens[1:]
) if len(stage_tokens) > 1 else "(no args)"
session_worker_ids = None
if isinstance(config, dict):
session_worker_ids = config.get("_session_worker_ids")
return cls._start_worker_session(
worker_manager,
worker_type=cmd_name,
title=f"{cmd_name} stage",
description=description,
pipe_text=command_text,
config=config,
completion_label="Stage completed",
error_label="Stage error",
skip_logging_for={".worker",
"worker",
"workers"},
session_worker_ids=session_worker_ids,
)
@classmethod
def begin_pipeline(
cls,
worker_manager: Optional[WorkerManager],
*,
pipeline_text: str,
config: Optional[Dict[str,
Any]],
) -> Optional[WorkerStageSession]:
session_worker_ids: Set[str] = set()
if isinstance(config, dict):
config["_session_worker_ids"] = session_worker_ids
return cls._start_worker_session(
worker_manager,
worker_type="pipeline",
title="Pipeline run",
description=pipeline_text,
pipe_text=pipeline_text,
config=config,
completion_label="Pipeline completed",
error_label="Pipeline error",
session_worker_ids=session_worker_ids,
)
class CmdletIntrospection:
@staticmethod
def cmdlet_names(force: bool = False) -> List[str]:
try:
return list_cmdlet_names(force=force) or []
except Exception:
return []
@staticmethod
def cmdlet_args(cmd_name: str,
config: Optional[Dict[str,
Any]] = None) -> List[str]:
try:
return get_cmdlet_arg_flags(cmd_name, config=config) or []
except Exception:
return []
@staticmethod
def store_choices(config: Dict[str, Any], force: bool = False) -> List[str]:
try:
# Use the cached startup check from SharedArgs
from cmdlet._shared import SharedArgs
return SharedArgs.get_store_choices(config, force=force)
except Exception:
return []
@classmethod
def arg_choices(cls,
*,
cmd_name: str,
arg_name: str,
config: Dict[str,
Any],
force: bool = False) -> List[str]:
try:
normalized_arg = (arg_name or "").lstrip("-").strip().lower()
if normalized_arg in ("storage", "store"):
# Use cached/lightweight names for completions to avoid instantiating backends
# (instantiating backends may perform heavy initialization).
backends = cls.store_choices(config, force=False)
if backends:
return backends
if normalized_arg == "provider":
canonical_cmd = (cmd_name or "").replace("_", "-").lower()
try:
from ProviderCore.registry import list_search_providers, list_file_providers
except Exception:
list_search_providers = None # type: ignore
list_file_providers = None # type: ignore
provider_choices: List[str] = []
if canonical_cmd in {"add-file"} and list_file_providers is not None:
providers = list_file_providers(config) or {}
available = [
name for name, is_ready in providers.items() if is_ready
]
return sorted(available) if available else sorted(providers.keys())
if list_search_providers is not None:
providers = list_search_providers(config) or {}
available = [
name for name, is_ready in providers.items() if is_ready
]
provider_choices = sorted(available) if available else sorted(
providers.keys()
)
if provider_choices:
return provider_choices
if normalized_arg == "scrape":
try:
from Provider.metadata_provider import list_metadata_providers
meta_providers = list_metadata_providers(config) or {}
if meta_providers:
return sorted(meta_providers.keys())
except Exception:
pass
return get_cmdlet_arg_choices(cmd_name, arg_name) or []
except Exception:
return []
class CmdletCompleter(Completer):
"""Prompt-toolkit completer for the Medeia cmdlet REPL."""
def __init__(self, *, config_loader: "ConfigLoader") -> None:
self._config_loader = config_loader
self.cmdlet_names = CmdletIntrospection.cmdlet_names()
@staticmethod
def _used_arg_logicals(
cmd_name: str,
stage_tokens: List[str],
config: Dict[str,
Any]
) -> Set[str]:
"""Return logical argument names already used in this cmdlet stage.
Example: if the user has typed `download-file -url ...`, then `url`
is considered used and should not be suggested again (even as `--url`).
"""
arg_flags = CmdletIntrospection.cmdlet_args(cmd_name, config)
allowed = {a.lstrip("-").strip().lower()
for a in arg_flags if a}
if not allowed:
return set()
used: Set[str] = set()
for tok in stage_tokens[1:]:
if not tok or not tok.startswith("-"):
continue
if tok in {"-",
"--"}:
continue
# Handle common `-arg=value` form.
raw = tok.split("=", 1)[0]
logical = raw.lstrip("-").strip().lower()
if logical and logical in allowed:
used.add(logical)
return used
@staticmethod
def _flag_value(tokens: Sequence[str], *flags: str) -> Optional[str]:
want = {str(f).strip().lower() for f in flags if str(f).strip()}
if not want:
return None
for idx, tok in enumerate(tokens):
low = str(tok or "").strip().lower()
if "=" in low:
head, _ = low.split("=", 1)
if head in want:
return tok.split("=", 1)[1]
if low in want and idx + 1 < len(tokens):
return tokens[idx + 1]
return None
def get_completions(
self,
document: Document,
complete_event
): # type: ignore[override]
# Refresh cmdlet names from introspection to pick up dynamic updates
self.cmdlet_names = CmdletIntrospection.cmdlet_names(force=True)
text = document.text_before_cursor
tokens = text.split()
ends_with_space = bool(text) and text[-1].isspace()
last_pipe = -1
for idx, tok in enumerate(tokens):
if tok == "|":
last_pipe = idx
stage_tokens = tokens[last_pipe + 1:] if last_pipe >= 0 else tokens
if not stage_tokens:
for cmd in self.cmdlet_names:
yield Completion(cmd, start_position=0)
return
if len(stage_tokens) == 1:
current = stage_tokens[0].lower()
if ends_with_space:
cmd_name = current.replace("_", "-")
config = self._config_loader.load()
if cmd_name == "help":
for cmd in self.cmdlet_names:
yield Completion(cmd, start_position=0)
return
if cmd_name not in self.cmdlet_names:
return
arg_names = CmdletIntrospection.cmdlet_args(cmd_name, config)
seen_logicals: Set[str] = set()
for arg in arg_names:
arg_low = arg.lower()
if arg_low.startswith("--"):
continue
logical = arg.lstrip("-").lower()
if logical in seen_logicals:
continue
yield Completion(arg, start_position=0)
seen_logicals.add(logical)
yield Completion("-help", start_position=0)
return
for cmd in self.cmdlet_names:
if cmd.startswith(current):
yield Completion(cmd, start_position=-len(current))
for keyword in ("help", "exit", "quit"):
if keyword.startswith(current):
yield Completion(keyword, start_position=-len(current))
return
cmd_name = stage_tokens[0].replace("_", "-").lower()
if ends_with_space:
current_token = ""
prev_token = stage_tokens[-1].lower()
else:
current_token = stage_tokens[-1].lower()
prev_token = stage_tokens[-2].lower() if len(stage_tokens) > 1 else ""
config = self._config_loader.load()
provider_name = None
if cmd_name == "search-file":
provider_name = self._flag_value(stage_tokens, "-provider", "--provider")
if (
cmd_name == "search-file"
and provider_name
and not ends_with_space
and ":" in current_token
and not current_token.startswith("-")
):
# Allow quoted tokens like "system:g
quote_prefix = current_token[0] if current_token[:1] in {"'", '"'} else ""
inline_token = current_token[1:] if quote_prefix else current_token
if inline_token.endswith(quote_prefix) and len(inline_token) > 1:
inline_token = inline_token[:-1]
# Allow comma-separated inline specs; operate on the last segment only.
if "," in inline_token:
inline_token = inline_token.split(",")[-1].lstrip()
if ":" not in inline_token:
return
field, partial = inline_token.split(":", 1)
field = field.strip().lower()
partial_lower = partial.strip().lower()
inline_choices = provider_inline_query_choices(provider_name, field, config)
if inline_choices:
filtered = (
[c for c in inline_choices if partial_lower in str(c).lower()]
if partial_lower
else list(inline_choices)
)
for choice in (filtered or inline_choices):
# Replace only the partial after the colon; keep the field prefix and quotes as typed.
start_pos = -len(partial)
suggestion = str(choice)
yield Completion(suggestion, start_position=start_pos)
return
choices = CmdletIntrospection.arg_choices(
cmd_name=cmd_name,
arg_name=prev_token,
config=config,
force=True
)
if choices:
choice_list = choices
normalized_prev = prev_token.lstrip("-").strip().lower()
if normalized_prev == "provider" and current_token:
current_lower = current_token.lower()
filtered = [c for c in choices if current_lower in c.lower()]
if filtered:
choice_list = filtered
for choice in choice_list:
yield Completion(choice, start_position=-len(current_token))
# Example: if the user has typed `download-file -url ...`, then `url`
# is considered used and should not be suggested again (even as `--url`).
return
arg_names = CmdletIntrospection.cmdlet_args(cmd_name, config)
used_logicals = self._used_arg_logicals(cmd_name, stage_tokens, config)
logical_seen: Set[str] = set()
for arg in arg_names:
arg_low = arg.lower()
prefer_single_dash = current_token in {"",
"-"}
if prefer_single_dash and arg_low.startswith("--"):
continue
logical = arg.lstrip("-").lower()
if logical in used_logicals:
continue
if prefer_single_dash and logical in logical_seen:
continue
if arg_low.startswith(current_token):
yield Completion(arg, start_position=-len(current_token))
if prefer_single_dash:
logical_seen.add(logical)
if cmd_name in self.cmdlet_names:
if current_token.startswith("--"):
if "--help".startswith(current_token):
yield Completion("--help", start_position=-len(current_token))
else:
if "-help".startswith(current_token):
yield Completion("-help", start_position=-len(current_token))
class ConfigLoader:
def __init__(self, *, root: Path) -> None:
self._root = root
def load(self) -> Dict[str, Any]:
try:
return deepcopy(load_config())
except Exception:
return {}
class CmdletHelp:
@staticmethod
def show_cmdlet_list() -> None:
try:
metadata = list_cmdlet_metadata() or {}
from rich.box import SIMPLE
from rich.panel import Panel
from rich.table import Table as RichTable
table = RichTable(
show_header=True,
header_style="bold",
box=SIMPLE,
expand=True
)
table.add_column("Cmdlet", no_wrap=True)
table.add_column("Aliases")
table.add_column("Args")
table.add_column("Summary")
for cmd_name in sorted(metadata.keys()):
info = metadata[cmd_name]
aliases = info.get("aliases", [])
args = info.get("args", [])
summary = info.get("summary") or ""
alias_str = ", ".join(
[str(a) for a in (aliases or []) if str(a).strip()]
)
arg_names = [
a.get("name") for a in (args or [])
if isinstance(a, dict) and a.get("name")
]
args_str = ", ".join([str(a) for a in arg_names if str(a).strip()])
table.add_row(str(cmd_name), alias_str, args_str, str(summary))
stdout_console().print(Panel(table, title="Cmdlets", expand=False))
except Exception as exc:
from rich.panel import Panel
from rich.text import Text
stderr_console().print(
Panel(Text(f"Error: {exc}"),
title="Error",
expand=False)
)
@staticmethod
def show_cmdlet_help(cmd_name: str) -> None:
try:
meta = get_cmdlet_metadata(cmd_name)
if meta:
CmdletHelp._print_metadata(cmd_name, meta)
return
print(f"Unknown command: {cmd_name}\n")
except Exception as exc:
print(f"Error: {exc}\n")
@staticmethod
def _print_metadata(cmd_name: str, data: Any) -> None:
d = data.to_dict() if hasattr(data, "to_dict") else data
if not isinstance(d, dict):
from rich.panel import Panel
from rich.text import Text
stderr_console().print(
Panel(
Text(f"Invalid metadata for {cmd_name}"),
title="Error",
expand=False
)
)
return
name = d.get("name", cmd_name)
summary = d.get("summary", "")
usage = d.get("usage", "")
description = d.get("description", "")
args = d.get("args", [])
details = d.get("details", [])
from rich.box import SIMPLE
from rich.console import Group
from rich.panel import Panel
from rich.table import Table as RichTable
from rich.text import Text
header = Text.assemble((str(name), "bold"))
synopsis = Text(str(usage or name))
stdout_console().print(
Panel(Group(header,
synopsis),
title="Help",
expand=False)
)
if summary or description:
desc_bits: List[Text] = []
if summary:
desc_bits.append(Text(str(summary)))
if description:
desc_bits.append(Text(str(description)))
stdout_console().print(
Panel(Group(*desc_bits),
title="Description",
expand=False)
)
if args and isinstance(args, list):
param_table = RichTable(
show_header=True,
header_style="bold",
box=SIMPLE,
expand=True
)
param_table.add_column("Arg", no_wrap=True)
param_table.add_column("Type", no_wrap=True)
param_table.add_column("Required", no_wrap=True)
param_table.add_column("Description")
for arg in args:
if isinstance(arg, dict):
name_str = arg.get("name", "?")
typ = arg.get("type", "string")
required = bool(arg.get("required", False))
desc = arg.get("description", "")
else:
name_str = getattr(arg, "name", "?")
typ = getattr(arg, "type", "string")
required = bool(getattr(arg, "required", False))
desc = getattr(arg, "description", "")
param_table.add_row(
f"-{name_str}",
str(typ),
"yes" if required else "no",
str(desc or "")
)
stdout_console().print(Panel(param_table, title="Parameters", expand=False))
if details:
stdout_console().print(
Panel(
Group(*[Text(str(x)) for x in details]),
title="Remarks",
expand=False
)
)
class CmdletExecutor:
def __init__(self, *, config_loader: ConfigLoader) -> None:
self._config_loader = config_loader
@staticmethod
def _get_table_title_for_command(
cmd_name: str,
emitted_items: Optional[List[Any]] = None,
cmd_args: Optional[List[str]] = None,
) -> str:
title_map = {
"search-file": "Results",
"search_file": "Results",
"download-data": "Downloads",
"download_data": "Downloads",
"download-file": "Downloads",
"download_file": "Downloads",
"get-tag": "Tags",
"get_tag": "Tags",
"get-file": "Results",
"get_file": "Results",
"add-tags": "Results",
"add_tags": "Results",
"delete-tag": "Results",
"delete_tag": "Results",
"add-url": "Results",
"add_url": "Results",
"get-url": "url",
"get_url": "url",
"delete-url": "Results",
"delete_url": "Results",
"get-note": "Notes",
"get_note": "Notes",
"add-note": "Results",
"add_note": "Results",
"delete-note": "Results",
"delete_note": "Results",
"get-relationship": "Relationships",
"get_relationship": "Relationships",
"add-relationship": "Results",
"add_relationship": "Results",
"add-file": "Results",
"add_file": "Results",
"delete-file": "Results",
"delete_file": "Results",
"get-metadata": None,
"get_metadata": None,
}
mapped = title_map.get(cmd_name, "Results")
if mapped is not None:
return mapped
if emitted_items:
first = emitted_items[0]
try:
if isinstance(first, dict) and first.get("title"):
return str(first.get("title"))
if hasattr(first, "title") and getattr(first, "title"):
return str(getattr(first, "title"))
except Exception:
pass
return "Results"
def execute(self, cmd_name: str, args: List[str]) -> None:
from SYS import pipeline as ctx
from cmdlet import REGISTRY
# REPL guard: stage-local selection tables should not leak across independent
# commands. @ selection can always re-seed from the last result table.
try:
if hasattr(ctx, "set_current_stage_table"):
ctx.set_current_stage_table(None)
except Exception:
pass
cmd_fn = REGISTRY.get(cmd_name)
if not cmd_fn:
# Lazy-import module and register its CMDLET.
try:
mod = import_cmd_module(cmd_name)
data = getattr(mod, "CMDLET", None) if mod else None
if data and hasattr(data, "exec") and callable(getattr(data, "exec")):
run_fn = getattr(data, "exec")
REGISTRY[cmd_name] = run_fn
cmd_fn = run_fn
except Exception:
cmd_fn = None
if not cmd_fn:
print(f"Unknown command: {cmd_name}\n")
return
config = self._config_loader.load()
# ------------------------------------------------------------------
# Single-command Live pipeline progress (match REPL behavior)
# ------------------------------------------------------------------
progress_ui = None
pipe_idx: Optional[int] = None
def _maybe_start_single_live_progress(
*,
cmd_name_norm: str,
filtered_args: List[str],
piped_input: Any,
config: Any,
) -> None:
nonlocal progress_ui, pipe_idx
# Keep behavior consistent with pipeline runner exclusions.
# Some commands render their own Rich UI (tables/panels) and don't
# play nicely with Live cursor control.
if cmd_name_norm in {
"get-relationship",
"get-rel",
".pipe",
".mpv",
".matrix",
".telegram",
"telegram",
"delete-file",
"del-file",
}:
return
# add-file directory selector mode: show only the selection table, no Live progress.
if cmd_name_norm in {"add-file",
"add_file"}:
try:
from pathlib import Path as _Path
toks = list(filtered_args or [])
i = 0
while i < len(toks):
t = str(toks[i])
low = t.lower().strip()
if low in {"-path",
"--path",
"-p"} and i + 1 < len(toks):
nxt = str(toks[i + 1])
if nxt and ("," not in nxt):
p = _Path(nxt)
if p.exists() and p.is_dir():
return
i += 2
continue
i += 1
except Exception:
pass
try:
quiet_mode = (
bool(config.get("_quiet_background_output"))
if isinstance(config,
dict) else False
)
except Exception:
quiet_mode = False
if quiet_mode:
return
try:
import sys as _sys
if not bool(getattr(_sys.stderr, "isatty", lambda: False)()):
return
except Exception:
return
try:
from SYS.models import PipelineLiveProgress
progress_ui = PipelineLiveProgress([cmd_name_norm], enabled=True)
progress_ui.start()
try:
if hasattr(ctx, "set_live_progress"):
ctx.set_live_progress(progress_ui)
except Exception:
pass
pipe_idx = 0
# Estimate per-item task count for the single pipe.
total_items = 1
preview_items: Optional[List[Any]] = None
try:
if isinstance(piped_input, list):
total_items = max(1, int(len(piped_input)))
preview_items = list(piped_input)
elif piped_input is not None:
total_items = 1
preview_items = [piped_input]
else:
preview: List[Any] = []
toks = list(filtered_args or [])
i = 0
while i < len(toks):
t = str(toks[i])
low = t.lower().strip()
if (cmd_name_norm in {"add-file",
"add_file"} and low in {"-path",
"--path",
"-p"}
and i + 1 < len(toks)):
nxt = str(toks[i + 1])
if nxt:
if "," in nxt:
parts = [
p.strip().strip("\"'")
for p in nxt.split(",")
]
parts = [p for p in parts if p]
if parts:
preview.extend(parts)
i += 2
continue
else:
preview.append(nxt)
i += 2
continue
if low in {"-url",
"--url"} and i + 1 < len(toks):
nxt = str(toks[i + 1])
if nxt and not nxt.startswith("-"):
preview.append(nxt)
i += 2
continue
if (not t.startswith("-")) and ("://" in low
or low.startswith(
("magnet:",
"torrent:"))):
preview.append(t)
i += 1
preview_items = preview if preview else None
total_items = max(1, int(len(preview)) if preview else 1)
except Exception:
total_items = 1
preview_items = None
try:
progress_ui.begin_pipe(
0,
total_items=int(total_items),
items_preview=preview_items
)
except Exception:
pass
except Exception:
progress_ui = None
pipe_idx = None
filtered_args: List[str] = []
selected_indices: List[int] = []
select_all = False
selection_filters: List[List[Tuple[str, str]]] = []
value_flags: Set[str] = set()
try:
meta = get_cmdlet_metadata(cmd_name)
raw = meta.get("raw") if isinstance(meta, dict) else None
arg_specs = getattr(raw, "arg", None) if raw is not None else None
if isinstance(arg_specs, list):
for spec in arg_specs:
spec_type = str(getattr(spec,
"type",
"string") or "string").strip().lower()
if spec_type == "flag":
continue
spec_name = str(getattr(spec, "name", "") or "")
canonical = spec_name.lstrip("-").strip()
if not canonical:
continue
value_flags.add(f"-{canonical}".lower())
value_flags.add(f"--{canonical}".lower())
alias = str(getattr(spec, "alias", "") or "").strip()
if alias:
value_flags.add(f"-{alias}".lower())
except Exception:
value_flags = set()
for i, arg in enumerate(args):
if isinstance(arg, str) and arg.startswith("@"): # selection candidate
prev = str(args[i - 1]).lower() if i > 0 else ""
if prev in value_flags:
filtered_args.append(arg)
continue
# Universal selection filter: @"COL:expr" (quotes may be stripped by tokenization)
filter_spec = SelectionFilterSyntax.parse(arg)
if filter_spec is not None:
selection_filters.append(filter_spec)
continue
if arg.strip() == "@*":
select_all = True
continue
selection = SelectionSyntax.parse(arg)
if selection is not None:
zero_based = sorted(idx - 1 for idx in selection)
for idx in zero_based:
if idx not in selected_indices:
selected_indices.append(idx)
continue
filtered_args.append(arg)
continue
filtered_args.append(str(arg))
# IMPORTANT: Do not implicitly feed the previous command's results into
# a new command unless the user explicitly selected items via @ syntax.
# Piping should require `|` (or an explicit @ selection).
piped_items = ctx.get_last_result_items()
result: Any = None
effective_selected_indices: List[int] = []
if piped_items and (select_all or selected_indices or selection_filters):
candidate_idxs = list(range(len(piped_items)))
for spec in selection_filters:
candidate_idxs = [
i for i in candidate_idxs
if SelectionFilterSyntax.matches(piped_items[i], spec)
]
if select_all:
effective_selected_indices = list(candidate_idxs)
elif selected_indices:
effective_selected_indices = [
candidate_idxs[i] for i in selected_indices
if 0 <= i < len(candidate_idxs)
]
else:
effective_selected_indices = list(candidate_idxs)
result = [piped_items[i] for i in effective_selected_indices]
worker_manager = WorkerManagerRegistry.ensure(config)
stage_session = WorkerStages.begin_stage(
worker_manager,
cmd_name=cmd_name,
stage_tokens=[cmd_name,
*filtered_args],
config=config,
command_text=" ".join([cmd_name,
*filtered_args]).strip() or cmd_name,
)
stage_worker_id = stage_session.worker_id if stage_session else None
# Start live progress after we know the effective cmd + args + piped input.
cmd_norm = str(cmd_name or "").replace("_", "-").strip().lower()
_maybe_start_single_live_progress(
cmd_name_norm=cmd_norm or str(cmd_name or "").strip().lower(),
filtered_args=filtered_args,
piped_input=result,
config=config,
)
on_emit = None
if progress_ui is not None and pipe_idx is not None:
_ui = progress_ui
def _on_emit(obj: Any, _progress=_ui) -> None:
try:
_progress.on_emit(0, obj)
except Exception:
pass
on_emit = _on_emit
pipeline_ctx = ctx.PipelineStageContext(
stage_index=0,
total_stages=1,
pipe_index=pipe_idx if pipe_idx is not None else 0,
worker_id=stage_worker_id,
on_emit=on_emit,
)
ctx.set_stage_context(pipeline_ctx)
stage_status = "completed"
stage_error = ""
ctx.set_last_selection(effective_selected_indices)
try:
try:
if hasattr(ctx, "set_current_cmdlet_name"):
ctx.set_current_cmdlet_name(cmd_name)
except Exception:
pass
try:
if hasattr(ctx, "set_current_stage_text"):
raw_stage = ""
try:
raw_stage = (
ctx.get_current_command_text("")
if hasattr(ctx,
"get_current_command_text") else ""
)
except Exception:
raw_stage = ""
if raw_stage:
ctx.set_current_stage_text(raw_stage)
else:
ctx.set_current_stage_text(
" ".join([cmd_name,
*filtered_args]).strip() or cmd_name
)
except Exception:
pass
ret_code = cmd_fn(result, filtered_args, config)
if getattr(pipeline_ctx, "emits", None):
emits = list(pipeline_ctx.emits)
# Shared `-path` behavior: if the cmdlet emitted temp/PATH file artifacts,
# move them to the user-specified destination and update emitted paths.
try:
from cmdlet import _shared as sh
emits = sh.apply_output_path_from_pipeobjects(
cmd_name=cmd_name,
args=filtered_args,
emits=emits
)
try:
pipeline_ctx.emits = list(emits)
except Exception:
pass
except Exception:
pass
# Detect format-selection emits and skip printing (user selects with @N).
is_format_selection = False
if emits:
first_emit = emits[0]
if isinstance(first_emit, dict) and "format_id" in first_emit:
is_format_selection = True
if is_format_selection:
ctx.set_last_result_items_only(emits)
else:
table_title = self._get_table_title_for_command(
cmd_name,
emits,
filtered_args
)
selectable_commands = {
"search-file",
"download-data",
"download-file",
"search_file",
"download_data",
"download_file",
".config",
".worker",
}
display_only_commands = {
"get-url",
"get_url",
"get-note",
"get_note",
"get-relationship",
"get_relationship",
"get-file",
"get_file",
"get-metadata",
"get_metadata",
}
self_managing_commands = {
"get-tag",
"get_tag",
"tags",
"get-metadata",
"get_metadata",
"search-file",
"search_file",
}
if cmd_name in self_managing_commands:
table = (
ctx.get_display_table()
if hasattr(ctx, "get_display_table") else None
)
if table is None:
table = ctx.get_last_result_table()
if table is None:
table = Table(table_title)
for emitted in emits:
table.add_result(emitted)
else:
table = Table(table_title)
for emitted in emits:
table.add_result(emitted)
if cmd_name in selectable_commands:
table.set_source_command(cmd_name, filtered_args)
ctx.set_last_result_table(table, emits)
ctx.set_current_stage_table(None)
elif cmd_name in display_only_commands:
ctx.set_last_result_items_only(emits)
else:
ctx.set_last_result_items_only(emits)
# Stop Live progress before printing tables.
if progress_ui is not None:
try:
if pipe_idx is not None:
progress_ui.finish_pipe(
int(pipe_idx),
force_complete=(stage_status == "completed")
)
except Exception:
pass
try:
progress_ui.complete_all_pipes()
except Exception:
pass
try:
progress_ui.stop()
except Exception:
pass
try:
if hasattr(ctx, "set_live_progress"):
ctx.set_live_progress(None)
except Exception:
pass
progress_ui = None
pipe_idx = None
if not getattr(table, "_rendered_by_cmdlet", False):
stdout_console().print()
stdout_console().print(table)
# If the cmdlet produced a current-stage table without emits (e.g. format selection),
# render it here for parity with REPL pipeline runner.
if (not getattr(pipeline_ctx,
"emits",
None)) and hasattr(ctx,
"get_current_stage_table"):
try:
stage_table = ctx.get_current_stage_table()
except Exception:
stage_table = None
if stage_table is not None:
try:
already_rendered = bool(
getattr(stage_table,
"_rendered_by_cmdlet",
False)
)
except Exception:
already_rendered = False
if already_rendered:
return
if progress_ui is not None:
try:
if pipe_idx is not None:
progress_ui.finish_pipe(
int(pipe_idx),
force_complete=(stage_status == "completed")
)
except Exception:
pass
try:
progress_ui.complete_all_pipes()
except Exception:
pass
try:
progress_ui.stop()
except Exception:
pass
try:
if hasattr(ctx, "set_live_progress"):
ctx.set_live_progress(None)
except Exception:
pass
progress_ui = None
pipe_idx = None
stdout_console().print()
stdout_console().print(stage_table)
if ret_code != 0:
stage_status = "failed"
stage_error = f"exit code {ret_code}"
# No print here - we want to keep output clean and avoid redundant "exit code" notices.
except Exception as exc:
stage_status = "failed"
stage_error = f"{type(exc).__name__}: {exc}"
print(f"[error] {type(exc).__name__}: {exc}\n")
finally:
if progress_ui is not None:
try:
if pipe_idx is not None:
progress_ui.finish_pipe(
int(pipe_idx),
force_complete=(stage_status == "completed")
)
except Exception:
pass
try:
progress_ui.complete_all_pipes()
except Exception:
pass
try:
progress_ui.stop()
except Exception:
pass
try:
if hasattr(ctx, "set_live_progress"):
ctx.set_live_progress(None)
except Exception:
pass
# Do not keep stage tables around after a single command; it can cause
# later @ selections to bind to stale tables (e.g. old add-file scans).
try:
if hasattr(ctx, "set_current_stage_table"):
ctx.set_current_stage_table(None)
except Exception:
pass
try:
if hasattr(ctx, "clear_current_cmdlet_name"):
ctx.clear_current_cmdlet_name()
except Exception:
pass
try:
if hasattr(ctx, "clear_current_stage_text"):
ctx.clear_current_stage_text()
except Exception:
pass
ctx.clear_last_selection()
if stage_session:
stage_session.close(status=stage_status, error_msg=stage_error)
console = Console()
class CLI:
"""Main CLI application object."""
ROOT = Path(__file__).resolve().parent
def __init__(self) -> None:
self._config_loader = ConfigLoader(root=self.ROOT)
# Optional dependency auto-install for configured tools (best-effort).
try:
from SYS.optional_deps import maybe_auto_install_configured_tools
maybe_auto_install_configured_tools(self._config_loader.load())
except Exception:
pass
# Initialize the store choices cache at startup (filters disabled stores)
try:
from cmdlet._shared import SharedArgs
config = self._config_loader.load()
SharedArgs._refresh_store_choices_cache(config)
except Exception:
pass
self._cmdlet_executor = CmdletExecutor(config_loader=self._config_loader)
self._pipeline_executor = PipelineExecutor(config_loader=self._config_loader)
@staticmethod
def parse_selection_syntax(token: str) -> Optional[Set[int]]:
return SelectionSyntax.parse(token)
@classmethod
def get_store_choices(cls) -> List[str]:
loader = ConfigLoader(root=cls.ROOT)
return CmdletIntrospection.store_choices(loader.load())
def build_app(self) -> typer.Typer:
app = typer.Typer(help="Medeia-Macina CLI")
def _validate_pipeline_option(
ctx: typer.Context,
param: typer.CallbackParam,
value: str
):
try:
from SYS.cli_syntax import validate_pipeline_text
syntax_error = validate_pipeline_text(value)
if syntax_error:
raise typer.BadParameter(syntax_error.message)
except typer.BadParameter:
raise
except Exception:
pass
return value
@app.command("pipeline")
def pipeline(
command: str = typer.Option(
...,
"--pipeline",
"-p",
help="Pipeline command string to execute",
callback=_validate_pipeline_option,
),
seeds_json: Optional[str] = typer.Option(
None,
"--seeds-json",
"-s",
help="JSON string of seed items"
),
) -> None:
from SYS import pipeline as ctx
config = self._config_loader.load()
debug_enabled = bool(config.get("debug", False))
set_debug(debug_enabled)
if seeds_json:
try:
seeds = json.loads(seeds_json)
if not isinstance(seeds, list):
seeds = [seeds]
ctx.set_last_result_items_only(seeds)
except Exception as exc:
print(f"Error parsing seeds JSON: {exc}")
return
try:
from SYS.cli_syntax import validate_pipeline_text
syntax_error = validate_pipeline_text(command)
if syntax_error:
print(syntax_error.message, file=sys.stderr)
return
except Exception:
pass
try:
tokens = shlex.split(command)
except ValueError as exc:
print(f"Syntax error: {exc}", file=sys.stderr)
return
if not tokens:
return
self._pipeline_executor.execute_tokens(tokens)
@app.command("repl")
def repl() -> None:
self.run_repl()
@app.callback(invoke_without_command=True)
def main_callback(ctx: typer.Context) -> None:
if ctx.invoked_subcommand is None:
self.run_repl()
_ = (pipeline, repl, main_callback)
# Dynamically register all cmdlets as top-level Typer commands so users can
# invoke `mm <cmdlet> [args]` directly from the shell. We use Click/Typer
# context settings to allow arbitrary flags and options to pass through to
# the cmdlet system without Typer trying to parse them.
try:
names = list_cmdlet_names()
skip = {"pipeline", "repl"}
for nm in names:
if not nm or nm in skip:
continue
# create a scoped handler to capture the command name
def _make_handler(cmd_name: str):
@app.command(
cmd_name,
context_settings={
"ignore_unknown_options": True,
"allow_extra_args": True,
},
)
def _handler(ctx: typer.Context):
try:
args = list(ctx.args or [])
except Exception:
args = []
self._cmdlet_executor.execute(cmd_name, args)
return _handler
_make_handler(nm)
except Exception:
# Don't let failure to register dynamic commands break startup
pass
return app
def run(self) -> None:
# Ensure Rich tracebacks are active even when invoking subcommands.
try:
config = self._config_loader.load()
debug_enabled = bool(config.get("debug",
False)
) if isinstance(config,
dict) else False
except Exception:
debug_enabled = False
set_debug(debug_enabled)
_install_rich_traceback(show_locals=debug_enabled)
self.build_app()()
def run_repl(self) -> None:
# console = Console(width=100)
# Valid Rich rainbow colors
RAINBOW = [
"red",
"dark_orange",
"yellow",
"green",
"blue",
"purple",
"magenta",
]
def rainbow_pillar(colors, height=21, bar_width=36):
table = RichTable.grid(padding=0)
table.add_column(no_wrap=True)
for i in range(height):
color = colors[i % len(colors)]
table.add_row(Bar(size=1, begin=0, end=1, width=bar_width, color=color))
return table
# Build root layout
root = Layout(name="root")
root.split_row(
Layout(name="left",
ratio=2),
Layout(name="center",
ratio=8),
Layout(name="right",
ratio=2),
)
# Left pillar → forward rainbow
root["left"].update(
Panel(rainbow_pillar(RAINBOW,
height=21,
bar_width=36),
title="DELTA")
)
# Right pillar → reverse rainbow
root["right"].update(
Panel(
rainbow_pillar(list(reversed(RAINBOW)),
height=21,
bar_width=36),
title="LAMBDA"
)
)
# Center content
center_md = Markdown(
"""
# ****************** Medios Macina ******************
take what you want | keep what you like | share what you love
_____________________________________________________________
_____________________________________________________________
_____________________________________________________________
For suddenly you may be let loose from the net, and thrown out to sea.
Waving around clutching at gnats, unable to lift the heavy anchor. Lost
and without a map, forgotten things from the past by distracting wind storms.
_____________________________________________________________
_____________________________________________________________
_____________________________________________________________
Light shines a straight path to the golden shores.
Come to love it when others take what you share, as there is no greater joy
"""
)
root["center"].update(Panel(center_md, title="KAPPA", height=21))
console.print(root)
prompt_text = "<🜂🜄|🜁🜃>"
startup_table = Table(
"*********<IGNITIO>*********<NOUSEMPEH>*********<RUGRAPOG>*********<OMEGHAU>*********"
)
startup_table._interactive(True)._perseverance(True)
startup_table.set_value_case("upper")
def _upper(value: Any) -> str:
text = "" if value is None else str(value)
return text.upper()
def _add_startup_check(
status: str,
name: str,
*,
provider: str = "",
store: str = "",
files: int | str | None = None,
detail: str = "",
) -> None:
row = startup_table.add_row()
row.add_column("STATUS", _upper(status))
row.add_column("NAME", _upper(name))
row.add_column("PROVIDER", _upper(provider or ""))
row.add_column("STORE", _upper(store or ""))
row.add_column("FILES", "" if files is None else str(files))
row.add_column("DETAIL", _upper(detail or ""))
def _has_store_subtype(cfg: dict, subtype: str) -> bool:
store_cfg = cfg.get("store")
if not isinstance(store_cfg, dict):
return False
bucket = store_cfg.get(subtype)
if not isinstance(bucket, dict):
return False
return any(isinstance(v, dict) and bool(v) for v in bucket.values())
def _has_provider(cfg: dict, name: str) -> bool:
provider_cfg = cfg.get("provider")
if not isinstance(provider_cfg, dict):
return False
block = provider_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def _has_tool(cfg: dict, name: str) -> bool:
tool_cfg = cfg.get("tool")
if not isinstance(tool_cfg, dict):
return False
block = tool_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def _ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]:
try:
from API.HTTP import HTTPClient
with HTTPClient(timeout=timeout, retries=1) as client:
resp = client.get(url, allow_redirects=True)
code = int(getattr(resp, "status_code", 0) or 0)
ok = 200 <= code < 500
return ok, f"{url} (HTTP {code})"
except Exception as exc:
return False, f"{url} ({type(exc).__name__})"
config = self._config_loader.load()
debug_enabled = bool(config.get("debug", False))
set_debug(debug_enabled)
_install_rich_traceback(show_locals=debug_enabled)
_add_startup_check("ENABLED" if debug_enabled else "DISABLED", "DEBUGGING")
try:
try:
from MPV.mpv_ipc import MPV
import shutil
MPV()
mpv_path = shutil.which("mpv")
_add_startup_check("ENABLED", "MPV", detail=mpv_path or "Available")
except Exception as exc:
_add_startup_check("DISABLED", "MPV", detail=str(exc))
store_registry = None
if config:
try:
from Store import Store as StoreRegistry
store_registry = StoreRegistry(config=config, suppress_debug=True)
except Exception:
store_registry = None
if _has_store_subtype(config, "hydrusnetwork"):
store_cfg = config.get("store")
hydrus_cfg = (
store_cfg.get("hydrusnetwork",
{}) if isinstance(store_cfg,
dict) else {}
)
if isinstance(hydrus_cfg, dict):
for instance_name, instance_cfg in hydrus_cfg.items():
if not isinstance(instance_cfg, dict):
continue
name_key = str(instance_cfg.get("NAME") or instance_name)
url_val = str(instance_cfg.get("URL") or "").strip()
ok = bool(
store_registry
and store_registry.is_available(name_key)
)
status = "ENABLED" if ok else "DISABLED"
if ok:
total = None
try:
if store_registry:
backend = store_registry[name_key]
total = getattr(backend, "total_count", None)
if total is None:
getter = getattr(
backend,
"get_total_count",
None
)
if callable(getter):
total = getter()
except Exception:
total = None
detail = url_val
files = total if isinstance(
total,
int
) and total >= 0 else None
else:
err = None
if store_registry:
err = store_registry.get_backend_error(
instance_name
) or store_registry.get_backend_error(name_key)
detail = (url_val + (" - " if url_val else "")
) + (err or "Unavailable")
files = None
_add_startup_check(
status,
name_key,
store="hydrusnetwork",
files=files,
detail=detail
)
provider_cfg = config.get("provider"
) if isinstance(config,
dict) else None
if isinstance(provider_cfg, dict) and provider_cfg:
from Provider.metadata_provider import list_metadata_providers
from ProviderCore.registry import (
list_file_providers,
list_providers,
list_search_providers,
)
provider_availability = list_providers(config) or {}
search_availability = list_search_providers(config) or {}
file_availability = list_file_providers(config) or {}
meta_availability = list_metadata_providers(config) or {}
def _provider_display_name(key: str) -> str:
k = (key or "").strip()
low = k.lower()
if low == "openlibrary":
return "OpenLibrary"
if low == "alldebrid":
return "AllDebrid"
if low == "youtube":
return "YouTube"
return k[:1].upper() + k[1:] if k else "Provider"
already_checked = {"matrix"}
def _default_provider_ping_targets(provider_key: str) -> list[str]:
prov = (provider_key or "").strip().lower()
if prov == "openlibrary":
return ["https://openlibrary.org"]
if prov == "youtube":
return ["https://www.youtube.com"]
if prov == "bandcamp":
return ["https://bandcamp.com"]
if prov == "libgen":
from Provider.libgen import MIRRORS
mirrors = [
str(x).rstrip("/") for x in (MIRRORS or [])
if str(x).strip()
]
return [m + "/json.php" for m in mirrors]
return []
def _ping_first(urls: list[str]) -> tuple[bool, str]:
for u in urls:
ok, detail = _ping_url(u)
if ok:
return True, detail
if urls:
ok, detail = _ping_url(urls[0])
return ok, detail
return False, "No ping target"
for provider_name in provider_cfg.keys():
prov = str(provider_name or "").strip().lower()
if not prov or prov in already_checked:
continue
display = _provider_display_name(prov)
if prov == "alldebrid":
try:
from Provider.alldebrid import _get_debrid_api_key
from API.alldebrid import AllDebridClient
api_key = _get_debrid_api_key(config)
if not api_key:
_add_startup_check(
"DISABLED",
display,
provider=prov,
detail="Not configured"
)
else:
client = AllDebridClient(api_key)
base_url = str(
getattr(client,
"base_url",
"") or ""
).strip()
_add_startup_check(
"ENABLED",
display,
provider=prov,
detail=base_url or "Connected",
)
except Exception as exc:
_add_startup_check(
"DISABLED",
display,
provider=prov,
detail=str(exc)
)
continue
is_known = False
ok_val: Optional[bool] = None
if prov in provider_availability:
is_known = True
ok_val = bool(provider_availability.get(prov))
elif prov in search_availability:
is_known = True
ok_val = bool(search_availability.get(prov))
elif prov in file_availability:
is_known = True
ok_val = bool(file_availability.get(prov))
elif prov in meta_availability:
is_known = True
ok_val = bool(meta_availability.get(prov))
if not is_known:
_add_startup_check(
"UNKNOWN",
display,
provider=prov,
detail="Not registered"
)
else:
detail = "Configured" if ok_val else "Not configured"
ping_targets = _default_provider_ping_targets(prov)
if ping_targets:
ping_ok, ping_detail = _ping_first(ping_targets)
if ok_val:
detail = ping_detail
else:
detail = (
(detail + " | " +
ping_detail) if ping_detail else detail
)
_add_startup_check(
"ENABLED" if ok_val else "DISABLED",
display,
provider=prov,
detail=detail,
)
already_checked.add(prov)
default_search_providers = [
"openlibrary",
"libgen",
"youtube",
"bandcamp"
]
for prov in default_search_providers:
if prov in already_checked:
continue
display = _provider_display_name(prov)
ok_val = (
bool(search_availability.get(prov))
if prov in search_availability else False
)
ping_targets = _default_provider_ping_targets(prov)
ping_ok, ping_detail = (
_ping_first(ping_targets) if ping_targets else (False, "No ping target")
)
detail = ping_detail or (
"Available" if ok_val else "Unavailable"
)
if not ok_val:
detail = "Unavailable" + (
f" | {ping_detail}" if ping_detail else ""
)
_add_startup_check(
"ENABLED" if (ok_val and ping_ok) else "DISABLED",
display,
provider=prov,
detail=detail,
)
already_checked.add(prov)
if "0x0" not in already_checked:
ok_val = (
bool(file_availability.get("0x0"))
if "0x0" in file_availability else False
)
ping_ok, ping_detail = _ping_url("https://0x0.st")
detail = ping_detail
if not ok_val:
detail = "Unavailable" + (
f" | {ping_detail}" if ping_detail else ""
)
_add_startup_check(
"ENABLED" if (ok_val and ping_ok) else "DISABLED",
"0x0",
provider="0x0",
detail=detail,
)
if _has_provider(config, "matrix"):
try:
from Provider.matrix import Matrix
provider = Matrix(config)
matrix_conf = (
config.get("provider",
{}).get("matrix",
{}) if isinstance(config,
dict) else {}
)
homeserver = str(matrix_conf.get("homeserver") or "").strip()
room_id = str(matrix_conf.get("room_id") or "").strip()
if homeserver and not homeserver.startswith("http"):
homeserver = f"https://{homeserver}"
target = homeserver.rstrip("/")
if room_id:
target = (
target + (" " if target else "")
) + f"room:{room_id}"
_add_startup_check(
"ENABLED" if provider.validate() else "DISABLED",
"Matrix",
provider="matrix",
detail=target or
("Connected" if provider.validate() else "Not configured"),
)
except Exception as exc:
_add_startup_check(
"DISABLED",
"Matrix",
provider="matrix",
detail=str(exc)
)
if _has_store_subtype(config, "debrid"):
try:
from SYS.config import get_debrid_api_key
from API.alldebrid import AllDebridClient
api_key = get_debrid_api_key(config)
if not api_key:
_add_startup_check(
"DISABLED",
"Debrid",
store="debrid",
detail="Not configured"
)
else:
client = AllDebridClient(api_key)
base_url = str(getattr(client,
"base_url",
"") or "").strip()
_add_startup_check(
"ENABLED",
"Debrid",
store="debrid",
detail=base_url or "Connected"
)
except Exception as exc:
_add_startup_check(
"DISABLED",
"Debrid",
store="debrid",
detail=str(exc)
)
try:
from tool.ytdlp import YtDlpTool
cookiefile = YtDlpTool(config).resolve_cookiefile()
if cookiefile is not None:
_add_startup_check("FOUND", "Cookies", detail=str(cookiefile))
else:
_add_startup_check("MISSING", "Cookies", detail="Not found")
except Exception as exc:
_add_startup_check("ERROR", "Cookies", detail=str(exc))
# Tool checks (configured via [tool=...])
if _has_tool(config, "florencevision"):
try:
tool_cfg = config.get("tool")
fv_cfg = tool_cfg.get("florencevision") if isinstance(tool_cfg, dict) else None
enabled = bool(fv_cfg.get("enabled")) if isinstance(fv_cfg, dict) else False
if not enabled:
_add_startup_check(
"DISABLED",
"FlorenceVision",
provider="tool",
detail="Not enabled",
)
else:
from SYS.optional_deps import florencevision_missing_modules
missing = florencevision_missing_modules()
if missing:
_add_startup_check(
"DISABLED",
"FlorenceVision",
provider="tool",
detail="Missing: " + ", ".join(missing),
)
else:
_add_startup_check(
"ENABLED",
"FlorenceVision",
provider="tool",
detail="Ready",
)
except Exception as exc:
_add_startup_check(
"DISABLED",
"FlorenceVision",
provider="tool",
detail=str(exc),
)
if startup_table.rows:
stdout_console().print()
stdout_console().print(startup_table)
except Exception as exc:
if debug_enabled:
debug(f"⚠ Could not check service availability: {exc}")
style = Style.from_dict(
{
"cmdlet": "#ffffff",
"argument": "#3b8eea",
"value": "#9a3209",
"string": "#6d0d93",
"pipe": "#4caf50",
"selection_at": "#f1c40f",
"selection_range": "#4caf50",
"bottom-toolbar": "noreverse",
}
)
class ToolbarState:
text: str = ""
last_update_time: float = 0.0
clear_timer: Optional[threading.Timer] = None
toolbar_state = ToolbarState()
session: Optional[PromptSession] = None
def get_toolbar() -> Optional[str]:
if not toolbar_state.text or not toolbar_state.text.strip():
return None
if time.time() - toolbar_state.last_update_time > 3:
toolbar_state.text = ""
return None
return toolbar_state.text
def update_toolbar(text: str) -> None:
nonlocal session
text = text.strip()
toolbar_state.text = text
toolbar_state.last_update_time = time.time()
if toolbar_state.clear_timer:
toolbar_state.clear_timer.cancel()
toolbar_state.clear_timer = None
if text:
def clear_toolbar() -> None:
toolbar_state.text = ""
toolbar_state.clear_timer = None
if session is not None and hasattr(
session,
"app") and session.app.is_running:
session.app.invalidate()
toolbar_state.clear_timer = threading.Timer(3.0, clear_toolbar)
toolbar_state.clear_timer.daemon = True
toolbar_state.clear_timer.start()
if session is not None and hasattr(session,
"app") and session.app.is_running:
session.app.invalidate()
self._pipeline_executor.set_toolbar_output(update_toolbar)
completer = CmdletCompleter(config_loader=self._config_loader)
session = PromptSession(
completer=cast(Any,
completer),
lexer=MedeiaLexer(),
style=style,
bottom_toolbar=get_toolbar,
refresh_interval=0.5,
)
while True:
try:
user_input = session.prompt(prompt_text).strip()
except (EOFError, KeyboardInterrupt):
print("He who is victorious through deceit is defeated by the truth.")
break
if not user_input:
continue
low = user_input.lower()
if low in {"exit",
"quit",
"q"}:
print("He who is victorious through deceit is defeated by the truth.")
break
if low in {"help",
"?"}:
CmdletHelp.show_cmdlet_list()
continue
pipeline_ctx_ref = None
try:
from SYS import pipeline as ctx
ctx.set_current_command_text(user_input)
pipeline_ctx_ref = ctx
except Exception:
pipeline_ctx_ref = None
try:
from SYS.cli_syntax import validate_pipeline_text
syntax_error = validate_pipeline_text(user_input)
if syntax_error:
print(syntax_error.message, file=sys.stderr)
continue
except Exception:
pass
try:
tokens = shlex.split(user_input)
except ValueError as exc:
print(f"Syntax error: {exc}", file=sys.stderr)
continue
if not tokens:
continue
if len(tokens) == 1 and tokens[0] == "@,,":
try:
from SYS import pipeline as ctx
if ctx.restore_next_result_table():
last_table = (
ctx.get_display_table()
if hasattr(ctx,
"get_display_table") else None
)
if last_table is None:
last_table = ctx.get_last_result_table()
if last_table:
stdout_console().print()
ctx.set_current_stage_table(last_table)
stdout_console().print(last_table)
else:
items = ctx.get_last_result_items()
if items:
ctx.set_current_stage_table(None)
print(
f"Restored {len(items)} items (no table format available)"
)
else:
print("No forward history available", file=sys.stderr)
except Exception as exc:
print(f"Error restoring next table: {exc}", file=sys.stderr)
continue
if len(tokens) == 1 and tokens[0] == "@..":
try:
from SYS import pipeline as ctx
if ctx.restore_previous_result_table():
last_table = (
ctx.get_display_table()
if hasattr(ctx,
"get_display_table") else None
)
if last_table is None:
last_table = ctx.get_last_result_table()
# Auto-refresh search-file tables when navigating back,
# so row payloads (titles/tags) reflect latest store state.
try:
src_cmd = (
getattr(last_table,
"source_command",
None) if last_table else None
)
if (isinstance(src_cmd,
str)
and src_cmd.lower().replace("_",
"-") == "search-file"):
src_args = (
getattr(last_table,
"source_args",
None) if last_table else None
)
base_args = list(src_args
) if isinstance(src_args,
list) else []
cleaned_args = [
str(a) for a in base_args if str(a).strip().lower()
not in {"--refresh", "-refresh"}
]
if hasattr(ctx, "set_current_command_text"):
try:
title_text = (
getattr(last_table,
"title",
None) if last_table else None
)
if isinstance(title_text,
str) and title_text.strip():
ctx.set_current_command_text(
title_text.strip()
)
else:
ctx.set_current_command_text(
" ".join(
["search-file",
*cleaned_args]
).strip()
)
except Exception:
pass
try:
self._cmdlet_executor.execute(
"search-file",
cleaned_args + ["--refresh"]
)
finally:
if hasattr(ctx, "clear_current_command_text"):
try:
ctx.clear_current_command_text()
except Exception:
pass
continue
except Exception as exc:
print(
f"Error refreshing search-file table: {exc}",
file=sys.stderr
)
if last_table:
stdout_console().print()
ctx.set_current_stage_table(last_table)
stdout_console().print(last_table)
else:
items = ctx.get_last_result_items()
if items:
ctx.set_current_stage_table(None)
print(
f"Restored {len(items)} items (no table format available)"
)
else:
print("No previous result table in history")
else:
print("Result table history is empty")
except Exception as exc:
print(f"Error restoring previous result table: {exc}")
continue
try:
if "|" in tokens or (tokens and tokens[0].startswith("@")):
self._pipeline_executor.execute_tokens(tokens)
else:
cmd_name = tokens[0].replace("_", "-").lower()
is_help = any(
arg in {"-help",
"--help",
"-h"} for arg in tokens[1:]
)
if is_help:
CmdletHelp.show_cmdlet_help(cmd_name)
else:
self._cmdlet_executor.execute(cmd_name, tokens[1:])
finally:
if pipeline_ctx_ref:
pipeline_ctx_ref.clear_current_command_text()
_PTK_Lexer = object # type: ignore
# Expose a stable name used by the rest of the module
Lexer = _PTK_Lexer
class MedeiaLexer(Lexer):
def lex_document(self, document: "Document") -> Callable[[int], List[Tuple[str, str]]]: # type: ignore[override]
def get_line(lineno: int) -> List[Tuple[str, str]]:
"""Return token list for a single input line (used by prompt-toolkit)."""
line = document.lines[lineno]
tokens: List[Tuple[str, str]] = []
# Using TOKEN_PATTERN precompiled at module scope.
is_cmdlet = True
def _emit_keyed_value(word: str) -> bool:
"""Emit `key:` prefixes (comma-separated) as argument tokens.
Designed for values like:
clip:3m4s-3m14s,1h22m-1h33m,item:2-3
Avoids special-casing URLs (://) and Windows drive paths (C:\\...).
Returns True if it handled the token.
"""
if not word or ":" not in word:
return False
# Avoid URLs and common scheme patterns.
if "://" in word:
return False
# Avoid Windows drive paths (e.g., C:\\foo or D:/bar)
if DRIVE_RE.match(word):
return False
parts = word.split(",")
handled_any = False
for i, part in enumerate(parts):
if i > 0:
tokens.append(("class:value", ","))
if part == "":
continue
m = KEY_PREFIX_RE.match(part)
if m:
tokens.append(("class:argument", m.group(1)))
if m.group(2):
tokens.append(("class:value", m.group(2)))
handled_any = True
else:
tokens.append(("class:value", part))
handled_any = True
return handled_any
for match in TOKEN_PATTERN.finditer(line):
ws, pipe, quote, word = match.groups()
if ws:
tokens.append(("", ws))
continue
if pipe:
tokens.append(("class:pipe", pipe))
is_cmdlet = True
continue
if quote:
# If the quoted token contains a keyed spec (clip:/item:/hash:),
# highlight the `key:` portion in argument-blue even inside quotes.
if len(quote) >= 2 and quote[0] == quote[-1] and quote[0] in ('"', "'"):
q = quote[0]
inner = quote[1:-1]
start_index = len(tokens)
if _emit_keyed_value(inner):
# _emit_keyed_value already appended tokens for inner; insert opening quote
# before that chunk, then add the closing quote.
tokens.insert(start_index, ("class:string", q))
tokens.append(("class:string", q))
is_cmdlet = False
continue
tokens.append(("class:string", quote))
is_cmdlet = False
continue
if not word:
continue
if word.startswith("@"): # selection tokens
rest = word[1:]
if rest and SELECTION_RANGE_RE.fullmatch(rest):
tokens.append(("class:selection_at", "@"))
tokens.append(("class:selection_range", rest))
is_cmdlet = False
continue
if rest and ":" in rest:
tokens.append(("class:selection_at", "@"))
tokens.append(("class:selection_filter", rest))
is_cmdlet = False
continue
if rest == "":
tokens.append(("class:selection_at", "@"))
is_cmdlet = False
continue
if is_cmdlet:
tokens.append(("class:cmdlet", word))
is_cmdlet = False
elif word.startswith("-"):
tokens.append(("class:argument", word))
else:
if not _emit_keyed_value(word):
tokens.append(("class:value", word))
return tokens
return get_line
if __name__ == "__main__":
CLI().run()