from __future__ import annotations """Medeia-Macina CLI. This module intentionally uses a class-based architecture: - no legacy procedural entrypoints - no compatibility shims - all REPL/pipeline/cmdlet execution state lives on objects """ # When running the CLI directly (not via the 'mm' launcher), honor the # repository config `debug` flag by enabling `MM_DEBUG` so import-time # diagnostics and bootstrap debug output are visible without setting the # environment variable manually. import os from pathlib import Path if not os.environ.get("MM_DEBUG"): try: # Check database first db_path = Path(__file__).resolve().parent / "medios.db" if db_path.exists(): import sqlite3 with sqlite3.connect(str(db_path), timeout=30.0) as conn: cur = conn.cursor() # Check for global debug key cur.execute("SELECT value FROM config WHERE key = 'debug' AND category = 'global'") row = cur.fetchone() if row: val = str(row[0]).strip().lower() if val in ("1", "true", "yes", "on"): os.environ["MM_DEBUG"] = "1" except Exception: pass import json import shlex import sys import threading import time import uuid from copy import deepcopy from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, cast, Callable import typer from prompt_toolkit import PromptSession from prompt_toolkit.completion import Completer, Completion from prompt_toolkit.document import Document from prompt_toolkit.styles import Style from rich.console import Console from rich.layout import Layout from rich.panel import Panel from rich.markdown import Markdown from rich.bar import Bar from rich.table import Table as RichTable from SYS.rich_display import ( stderr_console, stdout_console, ) def _install_rich_traceback(*, show_locals: bool = False) -> None: """Install Rich traceback handler as the default excepthook. This keeps uncaught exceptions readable in the terminal. """ try: from rich.traceback import install as rich_traceback_install rich_traceback_install(show_locals=bool(show_locals)) except Exception: # Fall back to the standard Python traceback if Rich isn't available. return # Default to Rich tracebacks for the whole process. _install_rich_traceback(show_locals=False) from SYS.logger import debug, set_debug from SYS.worker_manager import WorkerManager from SYS.cmdlet_catalog import ( get_cmdlet_arg_choices, get_cmdlet_arg_flags, get_cmdlet_metadata, import_cmd_module, list_cmdlet_metadata, list_cmdlet_names, ) from SYS.config import load_config from SYS.result_table import Table from SYS.worker import WorkerManagerRegistry, WorkerStages, WorkerOutputMirror, WorkerStageSession from SYS.pipeline import PipelineExecutor from ProviderCore.registry import provider_inline_query_choices # Selection parsing and REPL lexer moved to SYS.cli_parsing from SYS.cli_parsing import Lexer, DRIVE_RE, KEY_PREFIX_RE, TOKEN_PATTERN, SELECTION_RANGE_RE, SelectionSyntax, SelectionFilterSyntax, MedeiaLexer # SelectionFilterSyntax moved to SYS.cli_parsing (imported above) class _OldWorkerStages: """Factory methods for stage/pipeline worker sessions.""" @staticmethod def _start_worker_session( worker_manager: Optional[WorkerManager], *, worker_type: str, title: str, description: str, pipe_text: str, config: Optional[Dict[str, Any]], completion_label: str, error_label: str, skip_logging_for: Optional[Set[str]] = None, session_worker_ids: Optional[Set[str]] = None, ) -> Optional[WorkerStageSession]: if worker_manager is None: return None if skip_logging_for and worker_type in skip_logging_for: return None safe_type = worker_type or "cmd" worker_id = f"cli_{safe_type[:8]}_{uuid.uuid4().hex[:6]}" try: tracked = worker_manager.track_worker( worker_id, worker_type=worker_type, title=title, description=description or "(no args)", pipe=pipe_text, ) if not tracked: return None except Exception as exc: print(f"[worker] Failed to track {worker_type}: {exc}", file=sys.stderr) return None if session_worker_ids is not None: session_worker_ids.add(worker_id) logging_enabled = False try: handler = worker_manager.enable_logging_for_worker(worker_id) logging_enabled = handler is not None except Exception: logging_enabled = False orig_stdout = sys.stdout orig_stderr = sys.stderr stdout_proxy = WorkerOutputMirror( orig_stdout, worker_manager, worker_id, "stdout" ) stderr_proxy = WorkerOutputMirror( orig_stderr, worker_manager, worker_id, "stderr" ) sys.stdout = stdout_proxy sys.stderr = stderr_proxy if isinstance(config, dict): config["_current_worker_id"] = worker_id try: worker_manager.log_step(worker_id, f"Started {worker_type}") except Exception: pass return WorkerStageSession( manager=worker_manager, worker_id=worker_id, orig_stdout=orig_stdout, orig_stderr=orig_stderr, stdout_proxy=stdout_proxy, stderr_proxy=stderr_proxy, config=config, logging_enabled=logging_enabled, completion_label=completion_label, error_label=error_label, ) @classmethod def begin_stage( cls, worker_manager: Optional[WorkerManager], *, cmd_name: str, stage_tokens: Sequence[str], config: Optional[Dict[str, Any]], command_text: str, ) -> Optional[WorkerStageSession]: description = " ".join(stage_tokens[1:] ) if len(stage_tokens) > 1 else "(no args)" session_worker_ids = None if isinstance(config, dict): session_worker_ids = config.get("_session_worker_ids") return cls._start_worker_session( worker_manager, worker_type=cmd_name, title=f"{cmd_name} stage", description=description, pipe_text=command_text, config=config, completion_label="Stage completed", error_label="Stage error", skip_logging_for={".worker", "worker", "workers"}, session_worker_ids=session_worker_ids, ) @classmethod def begin_pipeline( cls, worker_manager: Optional[WorkerManager], *, pipeline_text: str, config: Optional[Dict[str, Any]], ) -> Optional[WorkerStageSession]: session_worker_ids: Set[str] = set() if isinstance(config, dict): config["_session_worker_ids"] = session_worker_ids return cls._start_worker_session( worker_manager, worker_type="pipeline", title="Pipeline run", description=pipeline_text, pipe_text=pipeline_text, config=config, completion_label="Pipeline completed", error_label="Pipeline error", session_worker_ids=session_worker_ids, ) class CmdletIntrospection: @staticmethod def cmdlet_names(force: bool = False) -> List[str]: try: return list_cmdlet_names(force=force) or [] except Exception: return [] @staticmethod def cmdlet_args(cmd_name: str, config: Optional[Dict[str, Any]] = None) -> List[str]: try: return get_cmdlet_arg_flags(cmd_name, config=config) or [] except Exception: return [] @staticmethod def store_choices(config: Dict[str, Any], force: bool = False) -> List[str]: try: # Use the cached startup check from SharedArgs from cmdlet._shared import SharedArgs return SharedArgs.get_store_choices(config, force=force) except Exception: return [] @classmethod def arg_choices(cls, *, cmd_name: str, arg_name: str, config: Dict[str, Any], force: bool = False) -> List[str]: try: normalized_arg = (arg_name or "").lstrip("-").strip().lower() if normalized_arg in ("storage", "store"): # Use cached/lightweight names for completions to avoid instantiating backends # (instantiating backends may perform heavy initialization). backends = cls.store_choices(config, force=False) if backends: return backends if normalized_arg == "provider": canonical_cmd = (cmd_name or "").replace("_", "-").lower() try: from ProviderCore.registry import list_search_providers, list_file_providers except Exception: list_search_providers = None # type: ignore list_file_providers = None # type: ignore provider_choices: List[str] = [] if canonical_cmd in {"add-file"} and list_file_providers is not None: providers = list_file_providers(config) or {} available = [ name for name, is_ready in providers.items() if is_ready ] return sorted(available) if available else sorted(providers.keys()) if list_search_providers is not None: providers = list_search_providers(config) or {} available = [ name for name, is_ready in providers.items() if is_ready ] provider_choices = sorted(available) if available else sorted( providers.keys() ) if provider_choices: return provider_choices if normalized_arg == "scrape": try: from Provider.metadata_provider import list_metadata_providers meta_providers = list_metadata_providers(config) or {} if meta_providers: return sorted(meta_providers.keys()) except Exception: pass return get_cmdlet_arg_choices(cmd_name, arg_name) or [] except Exception: return [] class CmdletCompleter(Completer): """Prompt-toolkit completer for the Medeia cmdlet REPL.""" def __init__(self, *, config_loader: "ConfigLoader") -> None: self._config_loader = config_loader self.cmdlet_names = CmdletIntrospection.cmdlet_names() @staticmethod def _used_arg_logicals( cmd_name: str, stage_tokens: List[str], config: Dict[str, Any] ) -> Set[str]: """Return logical argument names already used in this cmdlet stage. Example: if the user has typed `download-file -url ...`, then `url` is considered used and should not be suggested again (even as `--url`). """ arg_flags = CmdletIntrospection.cmdlet_args(cmd_name, config) allowed = {a.lstrip("-").strip().lower() for a in arg_flags if a} if not allowed: return set() used: Set[str] = set() for tok in stage_tokens[1:]: if not tok or not tok.startswith("-"): continue if tok in {"-", "--"}: continue # Handle common `-arg=value` form. raw = tok.split("=", 1)[0] logical = raw.lstrip("-").strip().lower() if logical and logical in allowed: used.add(logical) return used @staticmethod def _flag_value(tokens: Sequence[str], *flags: str) -> Optional[str]: want = {str(f).strip().lower() for f in flags if str(f).strip()} if not want: return None for idx, tok in enumerate(tokens): low = str(tok or "").strip().lower() if "=" in low: head, _ = low.split("=", 1) if head in want: return tok.split("=", 1)[1] if low in want and idx + 1 < len(tokens): return tokens[idx + 1] return None def get_completions( self, document: Document, complete_event ): # type: ignore[override] # Refresh cmdlet names from introspection to pick up dynamic updates self.cmdlet_names = CmdletIntrospection.cmdlet_names(force=True) text = document.text_before_cursor tokens = text.split() ends_with_space = bool(text) and text[-1].isspace() last_pipe = -1 for idx, tok in enumerate(tokens): if tok == "|": last_pipe = idx stage_tokens = tokens[last_pipe + 1:] if last_pipe >= 0 else tokens if not stage_tokens: for cmd in self.cmdlet_names: yield Completion(cmd, start_position=0) return if len(stage_tokens) == 1: current = stage_tokens[0].lower() if ends_with_space: cmd_name = current.replace("_", "-") config = self._config_loader.load() if cmd_name == "help": for cmd in self.cmdlet_names: yield Completion(cmd, start_position=0) return if cmd_name not in self.cmdlet_names: return arg_names = CmdletIntrospection.cmdlet_args(cmd_name, config) seen_logicals: Set[str] = set() for arg in arg_names: arg_low = arg.lower() if arg_low.startswith("--"): continue logical = arg.lstrip("-").lower() if logical in seen_logicals: continue yield Completion(arg, start_position=0) seen_logicals.add(logical) yield Completion("-help", start_position=0) return for cmd in self.cmdlet_names: if cmd.startswith(current): yield Completion(cmd, start_position=-len(current)) for keyword in ("help", "exit", "quit"): if keyword.startswith(current): yield Completion(keyword, start_position=-len(current)) return cmd_name = stage_tokens[0].replace("_", "-").lower() if ends_with_space: current_token = "" prev_token = stage_tokens[-1].lower() else: current_token = stage_tokens[-1].lower() prev_token = stage_tokens[-2].lower() if len(stage_tokens) > 1 else "" config = self._config_loader.load() provider_name = None if cmd_name == "search-file": provider_name = self._flag_value(stage_tokens, "-provider", "--provider") if ( cmd_name == "search-file" and provider_name and not ends_with_space and ":" in current_token and not current_token.startswith("-") ): # Allow quoted tokens like "system:g quote_prefix = current_token[0] if current_token[:1] in {"'", '"'} else "" inline_token = current_token[1:] if quote_prefix else current_token if inline_token.endswith(quote_prefix) and len(inline_token) > 1: inline_token = inline_token[:-1] # Allow comma-separated inline specs; operate on the last segment only. if "," in inline_token: inline_token = inline_token.split(",")[-1].lstrip() if ":" not in inline_token: return field, partial = inline_token.split(":", 1) field = field.strip().lower() partial_lower = partial.strip().lower() inline_choices = provider_inline_query_choices(provider_name, field, config) if inline_choices: filtered = ( [c for c in inline_choices if partial_lower in str(c).lower()] if partial_lower else list(inline_choices) ) for choice in (filtered or inline_choices): # Replace only the partial after the colon; keep the field prefix and quotes as typed. start_pos = -len(partial) suggestion = str(choice) yield Completion(suggestion, start_position=start_pos) return choices = CmdletIntrospection.arg_choices( cmd_name=cmd_name, arg_name=prev_token, config=config, force=True ) if choices: choice_list = choices normalized_prev = prev_token.lstrip("-").strip().lower() if normalized_prev == "provider" and current_token: current_lower = current_token.lower() filtered = [c for c in choices if current_lower in c.lower()] if filtered: choice_list = filtered for choice in choice_list: yield Completion(choice, start_position=-len(current_token)) # Example: if the user has typed `download-file -url ...`, then `url` # is considered used and should not be suggested again (even as `--url`). return arg_names = CmdletIntrospection.cmdlet_args(cmd_name, config) used_logicals = self._used_arg_logicals(cmd_name, stage_tokens, config) logical_seen: Set[str] = set() for arg in arg_names: arg_low = arg.lower() prefer_single_dash = current_token in {"", "-"} if prefer_single_dash and arg_low.startswith("--"): continue logical = arg.lstrip("-").lower() if logical in used_logicals: continue if prefer_single_dash and logical in logical_seen: continue if arg_low.startswith(current_token): yield Completion(arg, start_position=-len(current_token)) if prefer_single_dash: logical_seen.add(logical) if cmd_name in self.cmdlet_names: if current_token.startswith("--"): if "--help".startswith(current_token): yield Completion("--help", start_position=-len(current_token)) else: if "-help".startswith(current_token): yield Completion("-help", start_position=-len(current_token)) class ConfigLoader: def __init__(self, *, root: Path) -> None: self._root = root def load(self) -> Dict[str, Any]: try: return deepcopy(load_config()) except Exception: return {} class CmdletHelp: @staticmethod def show_cmdlet_list() -> None: try: metadata = list_cmdlet_metadata() or {} from rich.box import SIMPLE from rich.panel import Panel from rich.table import Table as RichTable table = RichTable( show_header=True, header_style="bold", box=SIMPLE, expand=True ) table.add_column("Cmdlet", no_wrap=True) table.add_column("Aliases") table.add_column("Args") table.add_column("Summary") for cmd_name in sorted(metadata.keys()): info = metadata[cmd_name] aliases = info.get("aliases", []) args = info.get("args", []) summary = info.get("summary") or "" alias_str = ", ".join( [str(a) for a in (aliases or []) if str(a).strip()] ) arg_names = [ a.get("name") for a in (args or []) if isinstance(a, dict) and a.get("name") ] args_str = ", ".join([str(a) for a in arg_names if str(a).strip()]) table.add_row(str(cmd_name), alias_str, args_str, str(summary)) stdout_console().print(Panel(table, title="Cmdlets", expand=False)) except Exception as exc: from rich.panel import Panel from rich.text import Text stderr_console().print( Panel(Text(f"Error: {exc}"), title="Error", expand=False) ) @staticmethod def show_cmdlet_help(cmd_name: str) -> None: try: meta = get_cmdlet_metadata(cmd_name) if meta: CmdletHelp._print_metadata(cmd_name, meta) return print(f"Unknown command: {cmd_name}\n") except Exception as exc: print(f"Error: {exc}\n") @staticmethod def _print_metadata(cmd_name: str, data: Any) -> None: d = data.to_dict() if hasattr(data, "to_dict") else data if not isinstance(d, dict): from rich.panel import Panel from rich.text import Text stderr_console().print( Panel( Text(f"Invalid metadata for {cmd_name}"), title="Error", expand=False ) ) return name = d.get("name", cmd_name) summary = d.get("summary", "") usage = d.get("usage", "") description = d.get("description", "") args = d.get("args", []) details = d.get("details", []) from rich.box import SIMPLE from rich.console import Group from rich.panel import Panel from rich.table import Table as RichTable from rich.text import Text header = Text.assemble((str(name), "bold")) synopsis = Text(str(usage or name)) stdout_console().print( Panel(Group(header, synopsis), title="Help", expand=False) ) if summary or description: desc_bits: List[Text] = [] if summary: desc_bits.append(Text(str(summary))) if description: desc_bits.append(Text(str(description))) stdout_console().print( Panel(Group(*desc_bits), title="Description", expand=False) ) if args and isinstance(args, list): param_table = RichTable( show_header=True, header_style="bold", box=SIMPLE, expand=True ) param_table.add_column("Arg", no_wrap=True) param_table.add_column("Type", no_wrap=True) param_table.add_column("Required", no_wrap=True) param_table.add_column("Description") for arg in args: if isinstance(arg, dict): name_str = arg.get("name", "?") typ = arg.get("type", "string") required = bool(arg.get("required", False)) desc = arg.get("description", "") else: name_str = getattr(arg, "name", "?") typ = getattr(arg, "type", "string") required = bool(getattr(arg, "required", False)) desc = getattr(arg, "description", "") param_table.add_row( f"-{name_str}", str(typ), "yes" if required else "no", str(desc or "") ) stdout_console().print(Panel(param_table, title="Parameters", expand=False)) if details: stdout_console().print( Panel( Group(*[Text(str(x)) for x in details]), title="Remarks", expand=False ) ) class CmdletExecutor: def __init__(self, *, config_loader: ConfigLoader) -> None: self._config_loader = config_loader @staticmethod def _get_table_title_for_command( cmd_name: str, emitted_items: Optional[List[Any]] = None, cmd_args: Optional[List[str]] = None, ) -> str: title_map = { "search-file": "Results", "search_file": "Results", "download-data": "Downloads", "download_data": "Downloads", "download-file": "Downloads", "download_file": "Downloads", "get-tag": "Tags", "get_tag": "Tags", "get-file": "Results", "get_file": "Results", "add-tags": "Results", "add_tags": "Results", "delete-tag": "Results", "delete_tag": "Results", "add-url": "Results", "add_url": "Results", "get-url": "url", "get_url": "url", "delete-url": "Results", "delete_url": "Results", "get-note": "Notes", "get_note": "Notes", "add-note": "Results", "add_note": "Results", "delete-note": "Results", "delete_note": "Results", "get-relationship": "Relationships", "get_relationship": "Relationships", "add-relationship": "Results", "add_relationship": "Results", "add-file": "Results", "add_file": "Results", "delete-file": "Results", "delete_file": "Results", "get-metadata": None, "get_metadata": None, } mapped = title_map.get(cmd_name, "Results") if mapped is not None: return mapped if emitted_items: first = emitted_items[0] try: if isinstance(first, dict) and first.get("title"): return str(first.get("title")) if hasattr(first, "title") and getattr(first, "title"): return str(getattr(first, "title")) except Exception: pass return "Results" def execute(self, cmd_name: str, args: List[str]) -> None: from SYS import pipeline as ctx from cmdlet import REGISTRY # REPL guard: stage-local selection tables should not leak across independent # commands. @ selection can always re-seed from the last result table. try: if hasattr(ctx, "set_current_stage_table"): ctx.set_current_stage_table(None) except Exception: pass cmd_fn = REGISTRY.get(cmd_name) if not cmd_fn: # Lazy-import module and register its CMDLET. try: mod = import_cmd_module(cmd_name) data = getattr(mod, "CMDLET", None) if mod else None if data and hasattr(data, "exec") and callable(getattr(data, "exec")): run_fn = getattr(data, "exec") REGISTRY[cmd_name] = run_fn cmd_fn = run_fn except Exception: cmd_fn = None if not cmd_fn: print(f"Unknown command: {cmd_name}\n") return config = self._config_loader.load() # ------------------------------------------------------------------ # Single-command Live pipeline progress (match REPL behavior) # ------------------------------------------------------------------ progress_ui = None pipe_idx: Optional[int] = None def _maybe_start_single_live_progress( *, cmd_name_norm: str, filtered_args: List[str], piped_input: Any, config: Any, ) -> None: nonlocal progress_ui, pipe_idx # Keep behavior consistent with pipeline runner exclusions. # Some commands render their own Rich UI (tables/panels) and don't # play nicely with Live cursor control. if cmd_name_norm in { "get-relationship", "get-rel", ".pipe", ".mpv", ".matrix", ".telegram", "telegram", "delete-file", "del-file", }: return # add-file directory selector mode: show only the selection table, no Live progress. if cmd_name_norm in {"add-file", "add_file"}: try: from pathlib import Path as _Path toks = list(filtered_args or []) i = 0 while i < len(toks): t = str(toks[i]) low = t.lower().strip() if low in {"-path", "--path", "-p"} and i + 1 < len(toks): nxt = str(toks[i + 1]) if nxt and ("," not in nxt): p = _Path(nxt) if p.exists() and p.is_dir(): return i += 2 continue i += 1 except Exception: pass try: quiet_mode = ( bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False ) except Exception: quiet_mode = False if quiet_mode: return try: import sys as _sys if not bool(getattr(_sys.stderr, "isatty", lambda: False)()): return except Exception: return try: from SYS.models import PipelineLiveProgress progress_ui = PipelineLiveProgress([cmd_name_norm], enabled=True) progress_ui.start() try: if hasattr(ctx, "set_live_progress"): ctx.set_live_progress(progress_ui) except Exception: pass pipe_idx = 0 # Estimate per-item task count for the single pipe. total_items = 1 preview_items: Optional[List[Any]] = None try: if isinstance(piped_input, list): total_items = max(1, int(len(piped_input))) preview_items = list(piped_input) elif piped_input is not None: total_items = 1 preview_items = [piped_input] else: preview: List[Any] = [] toks = list(filtered_args or []) i = 0 while i < len(toks): t = str(toks[i]) low = t.lower().strip() if (cmd_name_norm in {"add-file", "add_file"} and low in {"-path", "--path", "-p"} and i + 1 < len(toks)): nxt = str(toks[i + 1]) if nxt: if "," in nxt: parts = [ p.strip().strip("\"'") for p in nxt.split(",") ] parts = [p for p in parts if p] if parts: preview.extend(parts) i += 2 continue else: preview.append(nxt) i += 2 continue if low in {"-url", "--url"} and i + 1 < len(toks): nxt = str(toks[i + 1]) if nxt and not nxt.startswith("-"): preview.append(nxt) i += 2 continue if (not t.startswith("-")) and ("://" in low or low.startswith( ("magnet:", "torrent:"))): preview.append(t) i += 1 preview_items = preview if preview else None total_items = max(1, int(len(preview)) if preview else 1) except Exception: total_items = 1 preview_items = None try: progress_ui.begin_pipe( 0, total_items=int(total_items), items_preview=preview_items ) except Exception: pass except Exception: progress_ui = None pipe_idx = None filtered_args: List[str] = [] selected_indices: List[int] = [] select_all = False selection_filters: List[List[Tuple[str, str]]] = [] value_flags: Set[str] = set() try: meta = get_cmdlet_metadata(cmd_name) raw = meta.get("raw") if isinstance(meta, dict) else None arg_specs = getattr(raw, "arg", None) if raw is not None else None if isinstance(arg_specs, list): for spec in arg_specs: spec_type = str(getattr(spec, "type", "string") or "string").strip().lower() if spec_type == "flag": continue spec_name = str(getattr(spec, "name", "") or "") canonical = spec_name.lstrip("-").strip() if not canonical: continue value_flags.add(f"-{canonical}".lower()) value_flags.add(f"--{canonical}".lower()) alias = str(getattr(spec, "alias", "") or "").strip() if alias: value_flags.add(f"-{alias}".lower()) except Exception: value_flags = set() for i, arg in enumerate(args): if isinstance(arg, str) and arg.startswith("@"): # selection candidate prev = str(args[i - 1]).lower() if i > 0 else "" if prev in value_flags: filtered_args.append(arg) continue # Universal selection filter: @"COL:expr" (quotes may be stripped by tokenization) filter_spec = SelectionFilterSyntax.parse(arg) if filter_spec is not None: selection_filters.append(filter_spec) continue if arg.strip() == "@*": select_all = True continue selection = SelectionSyntax.parse(arg) if selection is not None: zero_based = sorted(idx - 1 for idx in selection) for idx in zero_based: if idx not in selected_indices: selected_indices.append(idx) continue filtered_args.append(arg) continue filtered_args.append(str(arg)) # IMPORTANT: Do not implicitly feed the previous command's results into # a new command unless the user explicitly selected items via @ syntax. # Piping should require `|` (or an explicit @ selection). piped_items = ctx.get_last_result_items() result: Any = None effective_selected_indices: List[int] = [] if piped_items and (select_all or selected_indices or selection_filters): candidate_idxs = list(range(len(piped_items))) for spec in selection_filters: candidate_idxs = [ i for i in candidate_idxs if SelectionFilterSyntax.matches(piped_items[i], spec) ] if select_all: effective_selected_indices = list(candidate_idxs) elif selected_indices: effective_selected_indices = [ candidate_idxs[i] for i in selected_indices if 0 <= i < len(candidate_idxs) ] else: effective_selected_indices = list(candidate_idxs) result = [piped_items[i] for i in effective_selected_indices] worker_manager = WorkerManagerRegistry.ensure(config) stage_session = WorkerStages.begin_stage( worker_manager, cmd_name=cmd_name, stage_tokens=[cmd_name, *filtered_args], config=config, command_text=" ".join([cmd_name, *filtered_args]).strip() or cmd_name, ) stage_worker_id = stage_session.worker_id if stage_session else None # Start live progress after we know the effective cmd + args + piped input. cmd_norm = str(cmd_name or "").replace("_", "-").strip().lower() _maybe_start_single_live_progress( cmd_name_norm=cmd_norm or str(cmd_name or "").strip().lower(), filtered_args=filtered_args, piped_input=result, config=config, ) on_emit = None if progress_ui is not None and pipe_idx is not None: _ui = progress_ui def _on_emit(obj: Any, _progress=_ui) -> None: try: _progress.on_emit(0, obj) except Exception: pass on_emit = _on_emit pipeline_ctx = ctx.PipelineStageContext( stage_index=0, total_stages=1, pipe_index=pipe_idx if pipe_idx is not None else 0, worker_id=stage_worker_id, on_emit=on_emit, ) ctx.set_stage_context(pipeline_ctx) stage_status = "completed" stage_error = "" ctx.set_last_selection(effective_selected_indices) try: try: if hasattr(ctx, "set_current_cmdlet_name"): ctx.set_current_cmdlet_name(cmd_name) except Exception: pass try: if hasattr(ctx, "set_current_stage_text"): raw_stage = "" try: raw_stage = ( ctx.get_current_command_text("") if hasattr(ctx, "get_current_command_text") else "" ) except Exception: raw_stage = "" if raw_stage: ctx.set_current_stage_text(raw_stage) else: ctx.set_current_stage_text( " ".join([cmd_name, *filtered_args]).strip() or cmd_name ) except Exception: pass ret_code = cmd_fn(result, filtered_args, config) if getattr(pipeline_ctx, "emits", None): emits = list(pipeline_ctx.emits) # Shared `-path` behavior: if the cmdlet emitted temp/PATH file artifacts, # move them to the user-specified destination and update emitted paths. try: from cmdlet import _shared as sh emits = sh.apply_output_path_from_pipeobjects( cmd_name=cmd_name, args=filtered_args, emits=emits ) try: pipeline_ctx.emits = list(emits) except Exception: pass except Exception: pass # Detect format-selection emits and skip printing (user selects with @N). is_format_selection = False if emits: first_emit = emits[0] if isinstance(first_emit, dict) and "format_id" in first_emit: is_format_selection = True if is_format_selection: ctx.set_last_result_items_only(emits) else: table_title = self._get_table_title_for_command( cmd_name, emits, filtered_args ) selectable_commands = { "search-file", "download-data", "download-file", "search_file", "download_data", "download_file", ".config", ".worker", } display_only_commands = { "get-url", "get_url", "get-note", "get_note", "get-relationship", "get_relationship", "get-file", "get_file", "get-metadata", "get_metadata", } self_managing_commands = { "get-tag", "get_tag", "tags", "get-metadata", "get_metadata", "get-url", "get_url", "search-file", "search_file", "add-file", "add_file", } if cmd_name in self_managing_commands: table = ( ctx.get_display_table() if hasattr(ctx, "get_display_table") else None ) if table is None: table = ctx.get_last_result_table() if table is None: table = Table(table_title) for emitted in emits: table.add_result(emitted) else: table = Table(table_title) for emitted in emits: table.add_result(emitted) if cmd_name in selectable_commands: table.set_source_command(cmd_name, filtered_args) ctx.set_last_result_table(table, emits) ctx.set_current_stage_table(None) elif cmd_name in display_only_commands: ctx.set_last_result_items_only(emits) else: ctx.set_last_result_items_only(emits) # Stop Live progress before printing tables. if progress_ui is not None: try: if pipe_idx is not None: progress_ui.finish_pipe( int(pipe_idx), force_complete=(stage_status == "completed") ) except Exception: pass try: progress_ui.complete_all_pipes() except Exception: pass try: progress_ui.stop() except Exception: pass try: if hasattr(ctx, "set_live_progress"): ctx.set_live_progress(None) except Exception: pass progress_ui = None pipe_idx = None if not getattr(table, "_rendered_by_cmdlet", False): stdout_console().print() stdout_console().print(table) # If the cmdlet produced a current-stage table without emits (e.g. format selection), # render it here for parity with REPL pipeline runner. if (not getattr(pipeline_ctx, "emits", None)) and hasattr(ctx, "get_current_stage_table"): try: stage_table = ctx.get_current_stage_table() except Exception: stage_table = None if stage_table is not None: try: already_rendered = bool( getattr(stage_table, "_rendered_by_cmdlet", False) ) except Exception: already_rendered = False if already_rendered: return if progress_ui is not None: try: if pipe_idx is not None: progress_ui.finish_pipe( int(pipe_idx), force_complete=(stage_status == "completed") ) except Exception: pass try: progress_ui.complete_all_pipes() except Exception: pass try: progress_ui.stop() except Exception: pass try: if hasattr(ctx, "set_live_progress"): ctx.set_live_progress(None) except Exception: pass progress_ui = None pipe_idx = None stdout_console().print() stdout_console().print(stage_table) if ret_code != 0: stage_status = "failed" stage_error = f"exit code {ret_code}" # No print here - we want to keep output clean and avoid redundant "exit code" notices. except Exception as exc: stage_status = "failed" stage_error = f"{type(exc).__name__}: {exc}" print(f"[error] {type(exc).__name__}: {exc}\n") finally: if progress_ui is not None: try: if pipe_idx is not None: progress_ui.finish_pipe( int(pipe_idx), force_complete=(stage_status == "completed") ) except Exception: pass try: progress_ui.complete_all_pipes() except Exception: pass try: progress_ui.stop() except Exception: pass try: if hasattr(ctx, "set_live_progress"): ctx.set_live_progress(None) except Exception: pass # Do not keep stage tables around after a single command; it can cause # later @ selections to bind to stale tables (e.g. old add-file scans). try: if hasattr(ctx, "set_current_stage_table"): ctx.set_current_stage_table(None) except Exception: pass try: if hasattr(ctx, "clear_current_cmdlet_name"): ctx.clear_current_cmdlet_name() except Exception: pass try: if hasattr(ctx, "clear_current_stage_text"): ctx.clear_current_stage_text() except Exception: pass ctx.clear_last_selection() if stage_session: stage_session.close(status=stage_status, error_msg=stage_error) console = Console() class CLI: """Main CLI application object.""" ROOT = Path(__file__).resolve().parent def __init__(self) -> None: self._config_loader = ConfigLoader(root=self.ROOT) # Optional dependency auto-install for configured tools (best-effort). try: from SYS.optional_deps import maybe_auto_install_configured_tools maybe_auto_install_configured_tools(self._config_loader.load()) except Exception: pass # Initialize the store choices cache at startup (filters disabled stores) try: from cmdlet._shared import SharedArgs config = self._config_loader.load() SharedArgs._refresh_store_choices_cache(config) except Exception: pass self._cmdlet_executor = CmdletExecutor(config_loader=self._config_loader) self._pipeline_executor = PipelineExecutor(config_loader=self._config_loader) @staticmethod def parse_selection_syntax(token: str) -> Optional[Set[int]]: return SelectionSyntax.parse(token) @classmethod def get_store_choices(cls) -> List[str]: loader = ConfigLoader(root=cls.ROOT) return CmdletIntrospection.store_choices(loader.load()) def build_app(self) -> typer.Typer: app = typer.Typer(help="Medeia-Macina CLI") def _validate_pipeline_option( ctx: typer.Context, param: typer.CallbackParam, value: str ): try: from SYS.cli_syntax import validate_pipeline_text syntax_error = validate_pipeline_text(value) if syntax_error: raise typer.BadParameter(syntax_error.message) except typer.BadParameter: raise except Exception: pass return value @app.command("pipeline") def pipeline( command: str = typer.Option( ..., "--pipeline", "-p", help="Pipeline command string to execute", callback=_validate_pipeline_option, ), seeds_json: Optional[str] = typer.Option( None, "--seeds-json", "-s", help="JSON string of seed items" ), ) -> None: from SYS import pipeline as ctx config = self._config_loader.load() debug_enabled = bool(config.get("debug", False)) set_debug(debug_enabled) if seeds_json: try: seeds = json.loads(seeds_json) if not isinstance(seeds, list): seeds = [seeds] ctx.set_last_result_items_only(seeds) except Exception as exc: print(f"Error parsing seeds JSON: {exc}") return try: from SYS.cli_syntax import validate_pipeline_text syntax_error = validate_pipeline_text(command) if syntax_error: print(syntax_error.message, file=sys.stderr) return except Exception: pass try: tokens = shlex.split(command) except ValueError as exc: print(f"Syntax error: {exc}", file=sys.stderr) return if not tokens: return self._pipeline_executor.execute_tokens(tokens) @app.command("repl") def repl() -> None: self.run_repl() @app.callback(invoke_without_command=True) def main_callback(ctx: typer.Context) -> None: if ctx.invoked_subcommand is None: self.run_repl() _ = (pipeline, repl, main_callback) # Dynamically register all cmdlets as top-level Typer commands so users can # invoke `mm [args]` directly from the shell. We use Click/Typer # context settings to allow arbitrary flags and options to pass through to # the cmdlet system without Typer trying to parse them. try: names = list_cmdlet_names() skip = {"pipeline", "repl"} for nm in names: if not nm or nm in skip: continue # create a scoped handler to capture the command name def _make_handler(cmd_name: str): @app.command( cmd_name, context_settings={ "ignore_unknown_options": True, "allow_extra_args": True, }, ) def _handler(ctx: typer.Context): try: args = list(ctx.args or []) except Exception: args = [] self._cmdlet_executor.execute(cmd_name, args) return _handler _make_handler(nm) except Exception: # Don't let failure to register dynamic commands break startup pass return app def run(self) -> None: # Ensure Rich tracebacks are active even when invoking subcommands. try: config = self._config_loader.load() debug_enabled = bool(config.get("debug", False) ) if isinstance(config, dict) else False except Exception: debug_enabled = False set_debug(debug_enabled) _install_rich_traceback(show_locals=debug_enabled) self.build_app()() def run_repl(self) -> None: # console = Console(width=100) # Valid Rich rainbow colors RAINBOW = [ "red", "dark_orange", "yellow", "green", "blue", "purple", "magenta", ] def rainbow_pillar(colors, height=21, bar_width=36): table = RichTable.grid(padding=0) table.add_column(no_wrap=True) for i in range(height): color = colors[i % len(colors)] table.add_row(Bar(size=1, begin=0, end=1, width=bar_width, color=color)) return table # Build root layout root = Layout(name="root") root.split_row( Layout(name="left", ratio=2), Layout(name="center", ratio=8), Layout(name="right", ratio=2), ) # Left pillar → forward rainbow root["left"].update( Panel(rainbow_pillar(RAINBOW, height=21, bar_width=36), title="DELTA") ) # Right pillar → reverse rainbow root["right"].update( Panel( rainbow_pillar(list(reversed(RAINBOW)), height=21, bar_width=36), title="LAMBDA" ) ) # Center content center_md = Markdown( """ # ****************** Medios Macina ****************** take what you want | keep what you like | share what you love _____________________________________________________________ _____________________________________________________________ _____________________________________________________________ For suddenly you may be let loose from the net, and thrown out to sea. Waving around clutching at gnats, unable to lift the heavy anchor. Lost and without a map, forgotten things from the past by distracting wind storms. _____________________________________________________________ _____________________________________________________________ _____________________________________________________________ Light shines a straight path to the golden shores. Come to love it when others take what you share, as there is no greater joy """ ) root["center"].update(Panel(center_md, title="KAPPA", height=21)) console.print(root) prompt_text = "<🜂🜄|🜁🜃>" startup_table = Table( "*********************************************" ) startup_table._interactive(True)._perseverance(True) startup_table.set_value_case("upper") def _upper(value: Any) -> str: text = "" if value is None else str(value) return text.upper() def _add_startup_check( status: str, name: str, *, provider: str = "", store: str = "", files: int | str | None = None, detail: str = "", ) -> None: row = startup_table.add_row() row.add_column("STATUS", _upper(status)) row.add_column("NAME", _upper(name)) row.add_column("PROVIDER", _upper(provider or "")) row.add_column("STORE", _upper(store or "")) row.add_column("FILES", "" if files is None else str(files)) row.add_column("DETAIL", _upper(detail or "")) def _has_store_subtype(cfg: dict, subtype: str) -> bool: store_cfg = cfg.get("store") if not isinstance(store_cfg, dict): return False bucket = store_cfg.get(subtype) if not isinstance(bucket, dict): return False return any(isinstance(v, dict) and bool(v) for v in bucket.values()) def _has_provider(cfg: dict, name: str) -> bool: provider_cfg = cfg.get("provider") if not isinstance(provider_cfg, dict): return False block = provider_cfg.get(str(name).strip().lower()) return isinstance(block, dict) and bool(block) def _has_tool(cfg: dict, name: str) -> bool: tool_cfg = cfg.get("tool") if not isinstance(tool_cfg, dict): return False block = tool_cfg.get(str(name).strip().lower()) return isinstance(block, dict) and bool(block) def _ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]: try: from API.HTTP import HTTPClient with HTTPClient(timeout=timeout, retries=1) as client: resp = client.get(url, allow_redirects=True) code = int(getattr(resp, "status_code", 0) or 0) ok = 200 <= code < 500 return ok, f"{url} (HTTP {code})" except Exception as exc: return False, f"{url} ({type(exc).__name__})" config = self._config_loader.load() debug_enabled = bool(config.get("debug", False)) set_debug(debug_enabled) _install_rich_traceback(show_locals=debug_enabled) _add_startup_check("ENABLED" if debug_enabled else "DISABLED", "DEBUGGING") try: try: from MPV.mpv_ipc import MPV import shutil MPV() mpv_path = shutil.which("mpv") _add_startup_check("ENABLED", "MPV", detail=mpv_path or "Available") except Exception as exc: _add_startup_check("DISABLED", "MPV", detail=str(exc)) store_registry = None if config: try: from Store import Store as StoreRegistry store_registry = StoreRegistry(config=config, suppress_debug=True) except Exception: store_registry = None if _has_store_subtype(config, "hydrusnetwork"): store_cfg = config.get("store") hydrus_cfg = ( store_cfg.get("hydrusnetwork", {}) if isinstance(store_cfg, dict) else {} ) if isinstance(hydrus_cfg, dict): for instance_name, instance_cfg in hydrus_cfg.items(): if not isinstance(instance_cfg, dict): continue name_key = str(instance_cfg.get("NAME") or instance_name) url_val = str(instance_cfg.get("URL") or "").strip() ok = bool( store_registry and store_registry.is_available(name_key) ) status = "ENABLED" if ok else "DISABLED" if ok: total = None try: if store_registry: backend = store_registry[name_key] total = getattr(backend, "total_count", None) if total is None: getter = getattr( backend, "get_total_count", None ) if callable(getter): total = getter() except Exception: total = None detail = url_val files = total if isinstance( total, int ) and total >= 0 else None else: err = None if store_registry: err = store_registry.get_backend_error( instance_name ) or store_registry.get_backend_error(name_key) detail = (url_val + (" - " if url_val else "") ) + (err or "Unavailable") files = None _add_startup_check( status, name_key, store="hydrusnetwork", files=files, detail=detail ) provider_cfg = config.get("provider" ) if isinstance(config, dict) else None if isinstance(provider_cfg, dict) and provider_cfg: from Provider.metadata_provider import list_metadata_providers from ProviderCore.registry import ( list_file_providers, list_providers, list_search_providers, ) provider_availability = list_providers(config) or {} search_availability = list_search_providers(config) or {} file_availability = list_file_providers(config) or {} meta_availability = list_metadata_providers(config) or {} def _provider_display_name(key: str) -> str: k = (key or "").strip() low = k.lower() if low == "openlibrary": return "OpenLibrary" if low == "alldebrid": return "AllDebrid" if low == "youtube": return "YouTube" return k[:1].upper() + k[1:] if k else "Provider" already_checked = {"matrix"} def _default_provider_ping_targets(provider_key: str) -> list[str]: prov = (provider_key or "").strip().lower() if prov == "openlibrary": return ["https://openlibrary.org"] if prov == "youtube": return ["https://www.youtube.com"] if prov == "bandcamp": return ["https://bandcamp.com"] if prov == "libgen": from Provider.libgen import MIRRORS mirrors = [ str(x).rstrip("/") for x in (MIRRORS or []) if str(x).strip() ] return [m + "/json.php" for m in mirrors] return [] def _ping_first(urls: list[str]) -> tuple[bool, str]: for u in urls: ok, detail = _ping_url(u) if ok: return True, detail if urls: ok, detail = _ping_url(urls[0]) return ok, detail return False, "No ping target" for provider_name in provider_cfg.keys(): prov = str(provider_name or "").strip().lower() if not prov or prov in already_checked: continue display = _provider_display_name(prov) if prov == "alldebrid": try: from Provider.alldebrid import _get_debrid_api_key from API.alldebrid import AllDebridClient api_key = _get_debrid_api_key(config) if not api_key: _add_startup_check( "DISABLED", display, provider=prov, detail="Not configured" ) else: client = AllDebridClient(api_key) base_url = str( getattr(client, "base_url", "") or "" ).strip() _add_startup_check( "ENABLED", display, provider=prov, detail=base_url or "Connected", ) except Exception as exc: _add_startup_check( "DISABLED", display, provider=prov, detail=str(exc) ) continue is_known = False ok_val: Optional[bool] = None if prov in provider_availability: is_known = True ok_val = bool(provider_availability.get(prov)) elif prov in search_availability: is_known = True ok_val = bool(search_availability.get(prov)) elif prov in file_availability: is_known = True ok_val = bool(file_availability.get(prov)) elif prov in meta_availability: is_known = True ok_val = bool(meta_availability.get(prov)) if not is_known: _add_startup_check( "UNKNOWN", display, provider=prov, detail="Not registered" ) else: detail = "Configured" if ok_val else "Not configured" ping_targets = _default_provider_ping_targets(prov) if ping_targets: ping_ok, ping_detail = _ping_first(ping_targets) if ok_val: detail = ping_detail else: detail = ( (detail + " | " + ping_detail) if ping_detail else detail ) _add_startup_check( "ENABLED" if ok_val else "DISABLED", display, provider=prov, detail=detail, ) already_checked.add(prov) default_search_providers = [ "openlibrary", "libgen", "youtube", "bandcamp" ] for prov in default_search_providers: if prov in already_checked: continue display = _provider_display_name(prov) ok_val = ( bool(search_availability.get(prov)) if prov in search_availability else False ) ping_targets = _default_provider_ping_targets(prov) ping_ok, ping_detail = ( _ping_first(ping_targets) if ping_targets else (False, "No ping target") ) detail = ping_detail or ( "Available" if ok_val else "Unavailable" ) if not ok_val: detail = "Unavailable" + ( f" | {ping_detail}" if ping_detail else "" ) _add_startup_check( "ENABLED" if (ok_val and ping_ok) else "DISABLED", display, provider=prov, detail=detail, ) already_checked.add(prov) if "0x0" not in already_checked: ok_val = ( bool(file_availability.get("0x0")) if "0x0" in file_availability else False ) ping_ok, ping_detail = _ping_url("https://0x0.st") detail = ping_detail if not ok_val: detail = "Unavailable" + ( f" | {ping_detail}" if ping_detail else "" ) _add_startup_check( "ENABLED" if (ok_val and ping_ok) else "DISABLED", "0x0", provider="0x0", detail=detail, ) if _has_provider(config, "matrix"): try: from Provider.matrix import Matrix provider = Matrix(config) matrix_conf = ( config.get("provider", {}).get("matrix", {}) if isinstance(config, dict) else {} ) homeserver = str(matrix_conf.get("homeserver") or "").strip() room_id = str(matrix_conf.get("room_id") or "").strip() if homeserver and not homeserver.startswith("http"): homeserver = f"https://{homeserver}" target = homeserver.rstrip("/") if room_id: target = ( target + (" " if target else "") ) + f"room:{room_id}" _add_startup_check( "ENABLED" if provider.validate() else "DISABLED", "Matrix", provider="matrix", detail=target or ("Connected" if provider.validate() else "Not configured"), ) except Exception as exc: _add_startup_check( "DISABLED", "Matrix", provider="matrix", detail=str(exc) ) if _has_store_subtype(config, "debrid"): try: from SYS.config import get_debrid_api_key from API.alldebrid import AllDebridClient api_key = get_debrid_api_key(config) if not api_key: _add_startup_check( "DISABLED", "Debrid", store="debrid", detail="Not configured" ) else: client = AllDebridClient(api_key) base_url = str(getattr(client, "base_url", "") or "").strip() _add_startup_check( "ENABLED", "Debrid", store="debrid", detail=base_url or "Connected" ) except Exception as exc: _add_startup_check( "DISABLED", "Debrid", store="debrid", detail=str(exc) ) try: from tool.ytdlp import YtDlpTool cookiefile = YtDlpTool(config).resolve_cookiefile() if cookiefile is not None: _add_startup_check("FOUND", "Cookies", detail=str(cookiefile)) else: _add_startup_check("MISSING", "Cookies", detail="Not found") except Exception as exc: _add_startup_check("ERROR", "Cookies", detail=str(exc)) # Tool checks (configured via [tool=...]) if _has_tool(config, "florencevision"): try: tool_cfg = config.get("tool") fv_cfg = tool_cfg.get("florencevision") if isinstance(tool_cfg, dict) else None enabled = bool(fv_cfg.get("enabled")) if isinstance(fv_cfg, dict) else False if not enabled: _add_startup_check( "DISABLED", "FlorenceVision", provider="tool", detail="Not enabled", ) else: from SYS.optional_deps import florencevision_missing_modules missing = florencevision_missing_modules() if missing: _add_startup_check( "DISABLED", "FlorenceVision", provider="tool", detail="Missing: " + ", ".join(missing), ) else: _add_startup_check( "ENABLED", "FlorenceVision", provider="tool", detail="Ready", ) except Exception as exc: _add_startup_check( "DISABLED", "FlorenceVision", provider="tool", detail=str(exc), ) if startup_table.rows: stdout_console().print() stdout_console().print(startup_table) except Exception as exc: if debug_enabled: debug(f"⚠ Could not check service availability: {exc}") style = Style.from_dict( { "cmdlet": "#ffffff", "argument": "#3b8eea", "value": "#9a3209", "string": "#6d0d93", "pipe": "#4caf50", "selection_at": "#f1c40f", "selection_range": "#4caf50", "bottom-toolbar": "noreverse", } ) class ToolbarState: text: str = "" last_update_time: float = 0.0 clear_timer: Optional[threading.Timer] = None toolbar_state = ToolbarState() session: Optional[PromptSession] = None def get_toolbar() -> Optional[str]: if not toolbar_state.text or not toolbar_state.text.strip(): return None if time.time() - toolbar_state.last_update_time > 3: toolbar_state.text = "" return None return toolbar_state.text def update_toolbar(text: str) -> None: nonlocal session text = text.strip() toolbar_state.text = text toolbar_state.last_update_time = time.time() if toolbar_state.clear_timer: toolbar_state.clear_timer.cancel() toolbar_state.clear_timer = None if text: def clear_toolbar() -> None: toolbar_state.text = "" toolbar_state.clear_timer = None if session is not None and hasattr( session, "app") and session.app.is_running: session.app.invalidate() toolbar_state.clear_timer = threading.Timer(3.0, clear_toolbar) toolbar_state.clear_timer.daemon = True toolbar_state.clear_timer.start() if session is not None and hasattr(session, "app") and session.app.is_running: session.app.invalidate() self._pipeline_executor.set_toolbar_output(update_toolbar) completer = CmdletCompleter(config_loader=self._config_loader) session = PromptSession( completer=cast(Any, completer), lexer=MedeiaLexer(), style=style, bottom_toolbar=get_toolbar, refresh_interval=0.5, ) while True: try: user_input = session.prompt(prompt_text).strip() except (EOFError, KeyboardInterrupt): print("He who is victorious through deceit is defeated by the truth.") break if not user_input: continue low = user_input.lower() if low in {"exit", "quit", "q"}: print("He who is victorious through deceit is defeated by the truth.") break if low in {"help", "?"}: CmdletHelp.show_cmdlet_list() continue pipeline_ctx_ref = None try: from SYS import pipeline as ctx ctx.set_current_command_text(user_input) pipeline_ctx_ref = ctx except Exception: pipeline_ctx_ref = None try: from SYS.cli_syntax import validate_pipeline_text syntax_error = validate_pipeline_text(user_input) if syntax_error: print(syntax_error.message, file=sys.stderr) continue except Exception: pass try: tokens = shlex.split(user_input) except ValueError as exc: print(f"Syntax error: {exc}", file=sys.stderr) continue if not tokens: continue if len(tokens) == 1 and tokens[0] == "@,,": try: from SYS import pipeline as ctx if ctx.restore_next_result_table(): last_table = ( ctx.get_display_table() if hasattr(ctx, "get_display_table") else None ) if last_table is None: last_table = ctx.get_last_result_table() if last_table: stdout_console().print() ctx.set_current_stage_table(last_table) stdout_console().print(last_table) else: items = ctx.get_last_result_items() if items: ctx.set_current_stage_table(None) print( f"Restored {len(items)} items (no table format available)" ) else: print("No forward history available", file=sys.stderr) except Exception as exc: print(f"Error restoring next table: {exc}", file=sys.stderr) continue if len(tokens) == 1 and tokens[0] == "@..": try: from SYS import pipeline as ctx if ctx.restore_previous_result_table(): last_table = ( ctx.get_display_table() if hasattr(ctx, "get_display_table") else None ) if last_table is None: last_table = ctx.get_last_result_table() # Auto-refresh search-file tables when navigating back, # so row payloads (titles/tags) reflect latest store state. try: src_cmd = ( getattr(last_table, "source_command", None) if last_table else None ) if (isinstance(src_cmd, str) and src_cmd.lower().replace("_", "-") == "search-file"): src_args = ( getattr(last_table, "source_args", None) if last_table else None ) base_args = list(src_args ) if isinstance(src_args, list) else [] cleaned_args = [ str(a) for a in base_args if str(a).strip().lower() not in {"--refresh", "-refresh"} ] if hasattr(ctx, "set_current_command_text"): try: title_text = ( getattr(last_table, "title", None) if last_table else None ) if isinstance(title_text, str) and title_text.strip(): ctx.set_current_command_text( title_text.strip() ) else: ctx.set_current_command_text( " ".join( ["search-file", *cleaned_args] ).strip() ) except Exception: pass try: self._cmdlet_executor.execute( "search-file", cleaned_args + ["--refresh"] ) finally: if hasattr(ctx, "clear_current_command_text"): try: ctx.clear_current_command_text() except Exception: pass continue except Exception as exc: print( f"Error refreshing search-file table: {exc}", file=sys.stderr ) if last_table: stdout_console().print() ctx.set_current_stage_table(last_table) stdout_console().print(last_table) else: items = ctx.get_last_result_items() if items: ctx.set_current_stage_table(None) print( f"Restored {len(items)} items (no table format available)" ) else: print("No previous result table in history") else: print("Result table history is empty") except Exception as exc: print(f"Error restoring previous result table: {exc}") continue try: if "|" in tokens or (tokens and tokens[0].startswith("@")): self._pipeline_executor.execute_tokens(tokens) else: cmd_name = tokens[0].replace("_", "-").lower() is_help = any( arg in {"-help", "--help", "-h"} for arg in tokens[1:] ) if is_help: CmdletHelp.show_cmdlet_help(cmd_name) else: self._cmdlet_executor.execute(cmd_name, tokens[1:]) finally: if pipeline_ctx_ref: pipeline_ctx_ref.clear_current_command_text() _PTK_Lexer = object # type: ignore # Expose a stable name used by the rest of the module Lexer = _PTK_Lexer # The REPL lexer implementation is provided by `SYS.cli_parsing.MedeiaLexer`. # We import and expose it from there to avoid duplication and keep parsing # helpers centralized for testing and reuse. # # Note: The class previously defined here has been moved to `SYS.cli_parsing`. if __name__ == "__main__": CLI().run()