"""CLI REPL for Medeia-Macina with autocomplete support."""
from __future__ import annotations
import sys
import json
import re
import io
import uuid
import atexit
from copy import deepcopy
from importlib import import_module
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Set, TextIO, TYPE_CHECKING, cast
try:
import typer
except ImportError:
typer = None
try:
from result_table import ResultTable, format_result
RESULT_TABLE_AVAILABLE = True
except ImportError:
RESULT_TABLE_AVAILABLE = False
ResultTable = None # type: ignore
format_result = None # type: ignore
try:
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.document import Document
from prompt_toolkit.lexers import Lexer
from prompt_toolkit.styles import Style
PROMPT_TOOLKIT_AVAILABLE = True
except ImportError: # pragma: no cover - optional dependency
PromptSession = None # type: ignore
Completer = None # type: ignore
Completion = None # type: ignore
Document = None # type: ignore
Lexer = None # type: ignore
Style = None # type: ignore
PROMPT_TOOLKIT_AVAILABLE = False
try:
from helper.worker_manager import WorkerManager
except ImportError: # pragma: no cover - optional dependency
WorkerManager = None # type: ignore
if TYPE_CHECKING: # pragma: no cover - typing helper
from helper.worker_manager import WorkerManager as WorkerManagerType
else:
WorkerManagerType = Any
from config import get_local_storage_path, load_config
class _WorkerOutputMirror(io.TextIOBase):
"""Mirror stdout/stderr to worker manager while preserving console output."""
def __init__(self, original: TextIO, manager: WorkerManagerType, worker_id: str, channel: str):
self._original = original
self._manager = manager
self._worker_id = worker_id
self._channel = channel
self._pending: str = ""
def write(self, data: str) -> int:
if not data:
return 0
self._original.write(data)
self._buffer_text(data)
return len(data)
def flush(self) -> None:
self._original.flush()
self._flush_pending(force=True)
def isatty(self) -> bool: # pragma: no cover - passthrough
return bool(getattr(self._original, "isatty", lambda: False)())
def _buffer_text(self, data: str) -> None:
combined = self._pending + data
lines = combined.splitlines(keepends=True)
if not lines:
self._pending = combined
return
if lines[-1].endswith(("\n", "\r")):
complete = lines
self._pending = ""
else:
complete = lines[:-1]
self._pending = lines[-1]
for chunk in complete:
self._emit(chunk)
def _flush_pending(self, force: bool = False) -> None:
if self._pending and force:
self._emit(self._pending)
self._pending = ""
def _emit(self, text: str) -> None:
if not text:
return
try:
self._manager.append_stdout(self._worker_id, text, channel=self._channel)
except Exception:
pass
@property
def encoding(self) -> str: # type: ignore[override]
return getattr(self._original, "encoding", "utf-8")
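# Illustrative sketch (not part of the class above): the mirror line-buffers data so the
# worker manager receives whole lines, while the console sees every write immediately.
#   mirror = _WorkerOutputMirror(sys.stdout, manager, "cli_demo_abc123", "stdout")
#   mirror.write("partial ")   # printed to console, held in _pending
#   mirror.write("line\n")     # printed, and "partial line\n" forwarded via append_stdout
#   mirror.flush()             # forces any remaining partial text to the worker log
# Here `manager` and the worker id are hypothetical placeholders.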
class _WorkerStageSession:
"""Lifecycle helper for wrapping a CLI cmdlet execution in a worker record."""
def __init__(
self,
manager: WorkerManagerType,
worker_id: str,
orig_stdout: TextIO,
orig_stderr: TextIO,
stdout_proxy: _WorkerOutputMirror,
stderr_proxy: _WorkerOutputMirror,
config: Optional[Dict[str, Any]],
logging_enabled: bool,
completion_label: str,
error_label: str,
) -> None:
self.manager = manager
self.worker_id = worker_id
self.orig_stdout = orig_stdout
self.orig_stderr = orig_stderr
self.stdout_proxy = stdout_proxy
self.stderr_proxy = stderr_proxy
self.config = config
self.logging_enabled = logging_enabled
self.closed = False
self._completion_label = completion_label
self._error_label = error_label
def close(self, status: str = "completed", error_msg: str = "") -> None:
if self.closed:
return
try:
self.stdout_proxy.flush()
self.stderr_proxy.flush()
except Exception:
pass
sys.stdout = self.orig_stdout
sys.stderr = self.orig_stderr
if self.logging_enabled:
try:
self.manager.disable_logging_for_worker(self.worker_id)
except Exception:
pass
try:
if status == "completed":
self.manager.log_step(self.worker_id, self._completion_label)
else:
self.manager.log_step(self.worker_id, f"{self._error_label}: {error_msg or status}")
except Exception:
pass
try:
self.manager.finish_worker(self.worker_id, result=status or "completed", error_msg=error_msg or "")
except Exception:
pass
if self.config and self.config.get('_current_worker_id') == self.worker_id:
self.config.pop('_current_worker_id', None)
self.closed = True
_CLI_WORKER_MANAGER: Optional[WorkerManagerType] = None
_CLI_ORPHAN_CLEANUP_DONE = False
CLI_ROOT = Path(__file__).resolve().parent
def _load_cli_config() -> Dict[str, Any]:
"""Load config.json relative to the CLI script location."""
try:
return deepcopy(load_config(config_dir=CLI_ROOT))
except Exception:
return {}
def _get_table_title_for_command(cmd_name: str, emitted_items: Optional[List[Any]] = None) -> str:
"""Generate a dynamic table title based on the command and emitted items.
Args:
cmd_name: The command name (e.g., 'search-file', 'get-tag', 'get-file')
emitted_items: The items being displayed
Returns:
A descriptive title for the result table
"""
# Mapping of commands to title templates
title_map = {
'search-file': 'Results',
'search_file': 'Results',
'download-data': 'Downloads',
'download_data': 'Downloads',
'get-tag': 'Tags',
'get_tag': 'Tags',
'get-file': 'Results',
'get_file': 'Results',
'add-tag': 'Results',
'add_tag': 'Results',
'delete-tag': 'Results',
'delete_tag': 'Results',
'add-url': 'Results',
'add_url': 'Results',
'get-url': 'URLs',
'get_url': 'URLs',
'delete-url': 'Results',
'delete_url': 'Results',
'get-note': 'Notes',
'get_note': 'Notes',
'add-note': 'Results',
'add_note': 'Results',
'delete-note': 'Results',
'delete_note': 'Results',
'get-relationship': 'Relationships',
'get_relationship': 'Relationships',
'add-relationship': 'Results',
'add_relationship': 'Results',
'add-file': 'Results',
'add_file': 'Results',
'delete-file': 'Results',
'delete_file': 'Results',
'check-file-status': 'Status',
'check_file_status': 'Status',
}
return title_map.get(cmd_name, 'Results')
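# For reference, the lookup above behaves like this (values taken from title_map):
#   _get_table_title_for_command('get-tag')        -> 'Tags'
#   _get_table_title_for_command('download-data')  -> 'Downloads'
#   _get_table_title_for_command('unknown-cmd')    -> 'Results'   (fallback default)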
def _close_cli_worker_manager() -> None:
global _CLI_WORKER_MANAGER
if _CLI_WORKER_MANAGER:
try:
_CLI_WORKER_MANAGER.close()
except Exception:
pass
_CLI_WORKER_MANAGER = None
atexit.register(_close_cli_worker_manager)
def _ensure_worker_manager(config: Dict[str, Any]) -> Optional[WorkerManagerType]:
"""Attach a WorkerManager to the CLI config for cmdlet execution."""
global _CLI_WORKER_MANAGER, _CLI_ORPHAN_CLEANUP_DONE
if WorkerManager is None:
return None
if not isinstance(config, dict):
return None
existing = config.get('_worker_manager')
if isinstance(existing, WorkerManager):
return existing
library_root = get_local_storage_path(config)
if not library_root:
return None
try:
resolved_root = Path(library_root).resolve()
except Exception:
resolved_root = Path(library_root)
try:
if not _CLI_WORKER_MANAGER or Path(getattr(_CLI_WORKER_MANAGER, 'library_root', '')) != resolved_root:
if _CLI_WORKER_MANAGER:
try:
_CLI_WORKER_MANAGER.close()
except Exception:
pass
_CLI_WORKER_MANAGER = WorkerManager(resolved_root, auto_refresh_interval=0)
manager = _CLI_WORKER_MANAGER
config['_worker_manager'] = manager
if manager and not _CLI_ORPHAN_CLEANUP_DONE:
try:
manager.expire_running_workers(
older_than_seconds=120,
worker_id_prefix="cli_%",
reason="CLI session ended unexpectedly; marking worker as failed",
)
except Exception:
pass
else:
_CLI_ORPHAN_CLEANUP_DONE = True
return manager
except Exception as exc:
print(f"[worker] Could not initialize worker manager: {exc}", file=sys.stderr)
return None
def _start_worker_session(
worker_manager: Optional[WorkerManagerType],
*,
worker_type: str,
title: str,
description: str,
pipe_text: str,
config: Optional[Dict[str, Any]],
completion_label: str,
error_label: str,
skip_logging_for: Optional[Set[str]] = None,
) -> Optional[_WorkerStageSession]:
"""Create a worker session wrapper and mirror stdout/stderr."""
if worker_manager is None:
return None
if skip_logging_for and worker_type in skip_logging_for:
return None
safe_type = worker_type or "cmd"
worker_id = f"cli_{safe_type[:8]}_{uuid.uuid4().hex[:6]}"
try:
tracked = worker_manager.track_worker(
worker_id,
worker_type=worker_type,
title=title,
description=description or "(no args)",
pipe=pipe_text,
)
if not tracked:
return None
except Exception as exc:
print(f"[worker] Failed to track {worker_type}: {exc}", file=sys.stderr)
return None
logging_enabled = False
try:
handler = worker_manager.enable_logging_for_worker(worker_id)
logging_enabled = handler is not None
except Exception:
logging_enabled = False
orig_stdout = sys.stdout
orig_stderr = sys.stderr
stdout_proxy = _WorkerOutputMirror(orig_stdout, worker_manager, worker_id, 'stdout')
stderr_proxy = _WorkerOutputMirror(orig_stderr, worker_manager, worker_id, 'stderr')
sys.stdout = stdout_proxy
sys.stderr = stderr_proxy
if isinstance(config, dict):
config['_current_worker_id'] = worker_id
try:
worker_manager.log_step(worker_id, f"Started {worker_type}")
except Exception:
pass
return _WorkerStageSession(
manager=worker_manager,
worker_id=worker_id,
orig_stdout=orig_stdout,
orig_stderr=orig_stderr,
stdout_proxy=stdout_proxy,
stderr_proxy=stderr_proxy,
config=config,
logging_enabled=logging_enabled,
completion_label=completion_label,
error_label=error_label,
)
def _begin_worker_stage(
worker_manager: Optional[WorkerManagerType],
cmd_name: str,
stage_tokens: Sequence[str],
config: Optional[Dict[str, Any]],
command_text: str,
) -> Optional[_WorkerStageSession]:
"""Start a worker entry for an individual CLI stage."""
description = " ".join(stage_tokens[1:]) if len(stage_tokens) > 1 else "(no args)"
return _start_worker_session(
worker_manager,
worker_type=cmd_name,
title=f"{cmd_name} stage",
description=description,
pipe_text=command_text,
config=config,
completion_label="Stage completed",
error_label="Stage error",
skip_logging_for={".worker", "worker", "workers"},
)
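# Example of the stage bookkeeping above (hypothetical values): a stage such as
# "get-tag -add foo" is tracked as worker_type="get-tag" with a generated id of the
# form "cli_get-tag_a1b2c3" (first 8 chars of the type plus a 6-char uuid suffix),
# description "-add foo", and the full command text as the pipe string.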
def _begin_pipeline_worker(
worker_manager: Optional[WorkerManagerType],
pipeline_text: str,
config: Optional[Dict[str, Any]],
) -> Optional[_WorkerStageSession]:
"""Start a worker that represents the entire pipeline execution."""
return _start_worker_session(
worker_manager,
worker_type="pipeline",
title="Pipeline run",
description=pipeline_text,
pipe_text=pipeline_text,
config=config,
completion_label="Pipeline completed",
error_label="Pipeline error",
)
def _get_cmdlet_names() -> List[str]:
"""Get list of all available cmdlet names."""
try:
from cmdlets import REGISTRY
return sorted(set(REGISTRY.keys()))
except Exception:
return []
def _get_cmdlet_args(cmd_name: str) -> List[str]:
"""Get list of argument flags for a cmdlet (with - and -- prefixes)."""
try:
# Try to load CMDLET object from the module
mod_name = cmd_name.replace("-", "_")
# Try importing as cmdlet first, then as root-level module
data = None
try:
mod = import_module(f"cmdlets.{mod_name}")
data = getattr(mod, "CMDLET", None)
except (ModuleNotFoundError, ImportError):
try:
# Try root-level modules like search_soulseek
mod = import_module(mod_name)
data = getattr(mod, "CMDLET", None)
except (ModuleNotFoundError, ImportError):
pass
if data:
# If CMDLET is an object (not dict), use build_flag_registry if available
if not isinstance(data, dict) and hasattr(data, 'build_flag_registry'):
registry = data.build_flag_registry()
# Flatten all flags into a single list
all_flags = []
for flag_set in registry.values():
all_flags.extend(flag_set)
return sorted(all_flags)
# Fallback for dict format or old style
args_list = data.get("args", []) if isinstance(data, dict) else getattr(data, "args", [])
arg_names = []
for arg in args_list:
if isinstance(arg, dict):
name = arg.get("name", "")
else:
name = getattr(arg, "name", "")
if name:
# Add both - and -- variants
arg_names.append(f"-{name}")
arg_names.append(f"--{name}")
return arg_names
return []
except Exception:
return []
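# A minimal sketch of what the lookup above yields, assuming a cmdlet whose CMDLET
# metadata declares args named "tag" and "hash" in the dict/old style:
#   _get_cmdlet_args("add-tag")  ->  ["-tag", "--tag", "-hash", "--hash"]
# Cmdlet objects that expose build_flag_registry() return their flattened flag set instead.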
def _get_arg_choices(cmd_name: str, arg_name: str) -> List[str]:
"""Get list of valid choices for a specific cmdlet argument."""
try:
mod_name = cmd_name.replace("-", "_")
try:
mod = import_module(f"cmdlets.{mod_name}")
data = getattr(mod, "CMDLET", None)
if data:
args_list = data.get("args", []) if isinstance(data, dict) else getattr(data, "args", [])
for arg in args_list:
if isinstance(arg, dict):
arg_obj_name = arg.get("name", "")
else:
arg_obj_name = getattr(arg, "name", "")
if arg_obj_name == arg_name:
# Found matching arg, get choices
if isinstance(arg, dict):
return arg.get("choices", [])
else:
return getattr(arg, "choices", [])
except ModuleNotFoundError:
pass
return []
except Exception:
return []
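# A minimal sketch of the choices lookup above, assuming a hypothetical cmdlet whose
# CMDLET metadata declares an arg named "mode" with choices ["audio", "video"]:
#   _get_arg_choices("download-data", "mode")  ->  ["audio", "video"]
# Unknown cmdlets or args simply yield an empty list.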
if (
PROMPT_TOOLKIT_AVAILABLE
and PromptSession is not None
and Completion is not None
and Completer is not None
and Document is not None
):
CompletionType = cast(Any, Completion)
class CmdletCompleter(Completer):
"""Custom completer for cmdlet REPL with autocomplete tied to cmdlet metadata."""
def __init__(self):
self.cmdlet_names = _get_cmdlet_names()
def get_completions(self, document: Document, complete_event): # type: ignore[override]
"""Generate completions for the current input."""
text = document.text_before_cursor
tokens = text.split()
if not tokens:
for cmd in self.cmdlet_names:
yield CompletionType(cmd, start_position=0)
elif len(tokens) == 1:
current = tokens[0].lower()
for cmd in self.cmdlet_names:
if cmd.startswith(current):
yield CompletionType(cmd, start_position=-len(current))
for keyword in ["help", "exit", "quit"]:
if keyword.startswith(current):
yield CompletionType(keyword, start_position=-len(current))
else:
cmd_name = tokens[0].replace("_", "-").lower()
current_token = tokens[-1].lower()
prev_token = tokens[-2].lower() if len(tokens) > 1 else ""
choices = _get_arg_choices(cmd_name, prev_token)
if choices:
for choice in choices:
if choice.lower().startswith(current_token):
yield CompletionType(choice, start_position=-len(current_token))
return
arg_names = _get_cmdlet_args(cmd_name)
for arg in arg_names:
if arg.lower().startswith(current_token):
yield CompletionType(arg, start_position=-len(current_token))
if "--help".startswith(current_token):
yield CompletionType("--help", start_position=-len(current_token))
async def get_completions_async(self, document: Document, complete_event): # type: ignore[override]
for completion in self.get_completions(document, complete_event):
yield completion
class MedeiaLexer(Lexer):
def lex_document(self, document):
def get_line(lineno):
line = document.lines[lineno]
tokens = []
                # Match: Whitespace, Pipe, Quoted string, or Word (re is imported at module level)
pattern = re.compile(r'''
(\s+) | # 1. Whitespace
(\|) | # 2. Pipe
("(?:[^"\\]|\\.)*"|'(?:[^'\\]|\\.)*') | # 3. Quoted string
([^\s\|]+) # 4. Word
''', re.VERBOSE)
is_cmdlet = True
for match in pattern.finditer(line):
ws, pipe, quote, word = match.groups()
if ws:
tokens.append(('', ws))
elif pipe:
tokens.append(('class:pipe', pipe))
is_cmdlet = True
elif quote:
tokens.append(('class:string', quote))
is_cmdlet = False
elif word:
if is_cmdlet:
tokens.append(('class:cmdlet', word))
is_cmdlet = False
elif word.startswith('-'):
tokens.append(('class:argument', word))
else:
tokens.append(('class:value', word))
return tokens
return get_line
else: # pragma: no cover - prompt toolkit unavailable
CmdletCompleter = None # type: ignore[assignment]
def _create_cmdlet_cli():
"""Create Typer CLI app for cmdlet-based commands."""
if typer is None:
return None
app = typer.Typer(help="Medeia-Macina CLI")
@app.command("repl")
def repl():
"""Start interactive REPL for cmdlets with autocomplete."""
banner = """
Medeia-Macina
=======================================
Commands: help | exit | <cmdlet> --help
Example: search-file --help
"""
print(banner)
# Pre-acquire Hydrus session key at startup (like hub-ui does)
try:
config = _load_cli_config()
if config:
# Initialize debug logging
from helper.logger import set_debug, debug
debug_enabled = config.get("debug", False)
set_debug(debug_enabled)
if debug_enabled:
debug("✓ Debug logging enabled")
try:
from helper.hydrus import get_client
get_client(config) # Pre-acquire and cache session key
debug("✓ Hydrus session key acquired")
except RuntimeError as e:
# Hydrus is not available - this is expected and normal
# Don't show a message, just continue without it
pass
except Exception as e:
debug(f"⚠ Could not pre-acquire Hydrus session key: {e}")
# Check MPV availability at startup
try:
from hydrus_health_check import check_mpv_availability
check_mpv_availability()
except Exception as e:
debug(f"⚠ Could not check MPV availability: {e}")
except Exception:
pass # Silently ignore if config loading fails
if PROMPT_TOOLKIT_AVAILABLE and PromptSession is not None and CmdletCompleter is not None:
completer = CmdletCompleter()
# Define style for syntax highlighting
style = Style.from_dict({
'cmdlet': '#ffffff', # white
'argument': '#3b8eea', # blue-ish
'value': '#ce9178', # red-ish
'string': '#ce55ff', # purple
'pipe': '#4caf50', # green
})
session = PromptSession(
completer=cast(Any, completer),
lexer=MedeiaLexer(),
style=style
)
def get_input(prompt: str = ">>>|") -> str:
return session.prompt(prompt)
else:
def get_input(prompt: str = ">>>|") -> str:
return input(prompt)
while True:
try:
user_input = get_input(">>>|").strip()
except (EOFError, KeyboardInterrupt):
print("\nGoodbye!")
break
if not user_input:
continue
low = user_input.lower()
if low in {"exit", "quit", "q"}:
print("Goodbye!")
break
if low in {"help", "?"}:
_show_cmdlet_list()
continue
pipeline_ctx_ref = None
try:
import pipeline as ctx # noqa: F401
ctx.set_current_command_text(user_input)
pipeline_ctx_ref = ctx
except Exception:
pipeline_ctx_ref = None
try:
import shlex
tokens = shlex.split(user_input)
except ValueError:
tokens = user_input.split()
if not tokens:
continue
# Handle special @.. selector to restore previous result table
if len(tokens) == 1 and tokens[0] == "@..":
try:
import pipeline as ctx
if ctx.restore_previous_result_table():
# Check for overlay table first
if hasattr(ctx, 'get_display_table'):
last_table = ctx.get_display_table()
else:
last_table = None
if last_table is None:
last_table = ctx.get_last_result_table()
if last_table:
print()
# Also update current stage table so @N expansion works correctly
ctx.set_current_stage_table(last_table)
print(last_table.format_plain())
else:
# Fallback to items if no table object
items = ctx.get_last_result_items()
if items:
# Clear current stage table if we only have items
ctx.set_current_stage_table(None)
print(f"Restored {len(items)} items (no table format available)")
else:
print("No previous result table in history")
else:
print("Result table history is empty")
except Exception as e:
print(f"Error restoring previous result table: {e}")
continue
# Check for pipe operators to support chaining: cmd1 arg1 | cmd2 arg2 | cmd3 arg3
# Also treat selection commands (@1, @*, etc) as pipelines so they can be expanded
try:
if '|' in tokens or (tokens and tokens[0].startswith('@')):
_execute_pipeline(tokens)
else:
cmd_name = tokens[0].replace("_", "-").lower()
is_help = any(arg in {"-help", "--help", "-h"} for arg in tokens[1:])
if is_help:
_show_cmdlet_help(cmd_name)
else:
# Execute the cmdlet
_execute_cmdlet(cmd_name, tokens[1:])
finally:
if pipeline_ctx_ref:
pipeline_ctx_ref.clear_current_command_text()
return app
def _execute_pipeline(tokens: list):
"""Execute a pipeline of cmdlets separated by pipes (|).
Example: cmd1 arg1 arg2 | cmd2 arg2 | cmd3 arg3
"""
try:
from cmdlets import REGISTRY
import json
import pipeline as ctx
# Split tokens by pipe operator
stages = []
current_stage = []
for token in tokens:
if token == '|':
if current_stage:
stages.append(current_stage)
current_stage = []
else:
current_stage.append(token)
if current_stage:
stages.append(current_stage)
if not stages:
print("Invalid pipeline syntax\n")
return
# Load config relative to CLI root
config = _load_cli_config()
# Check if the first stage has @ selection - if so, apply it before pipeline execution
first_stage_tokens = stages[0] if stages else []
first_stage_selection_indices = []
first_stage_had_extra_args = False
if first_stage_tokens:
# Look for @N, @N-M, @{N,M} in the first stage args
new_first_stage = []
first_stage_select_all = False
for token in first_stage_tokens:
if token.startswith('@'):
selection = _parse_selection_syntax(token)
if selection is not None:
# This is a selection syntax - apply it to get initial piped_result
first_stage_selection_indices = sorted([i - 1 for i in selection])
elif token == "@*":
# Special case: select all items
first_stage_select_all = True
else:
# Not a valid selection, keep as arg
new_first_stage.append(token)
else:
new_first_stage.append(token)
# Update first stage - if it's now empty (only had @N), keep the selection for later processing
if new_first_stage:
stages[0] = new_first_stage
# If we found selection indices but still have tokens, these are extra args
if first_stage_selection_indices or first_stage_select_all:
first_stage_had_extra_args = True
elif first_stage_selection_indices or first_stage_select_all:
# First stage was ONLY selection (@N or @*) - remove it and apply selection to next stage's input
stages.pop(0)
# Execute each stage, threading results to the next
piped_result = None
worker_manager = _ensure_worker_manager(config)
pipeline_text = " | ".join(" ".join(stage) for stage in stages)
pipeline_session = _begin_pipeline_worker(worker_manager, pipeline_text, config)
pipeline_status = "completed"
pipeline_error = ""
# Apply first-stage selection if present
if first_stage_selection_indices:
# Ensure we have a table context for expansion from previous command
if not ctx.get_current_stage_table_source_command():
last_table = ctx.get_last_result_table()
if last_table:
ctx.set_current_stage_table(last_table)
# Special check for YouTube search results BEFORE command expansion
# If we are selecting from a YouTube search, we want to force auto-piping to .pipe
# instead of trying to expand to a command (which search-file doesn't support well for re-execution)
source_cmd = ctx.get_current_stage_table_source_command()
source_args = ctx.get_current_stage_table_source_args()
if source_cmd == 'search-file' and source_args and 'youtube' in source_args:
# Force fallback to item-based selection so we can auto-pipe
command_expanded = False
# Skip the command expansion block below
else:
# Try command-based expansion first if we have source command info
command_expanded = False
selected_row_args = []
if source_cmd:
# Try to find row args for the selected indices
for idx in first_stage_selection_indices:
row_args = ctx.get_current_stage_table_row_selection_args(idx)
if row_args:
selected_row_args.extend(row_args)
break # For now, take first selected row's args
if selected_row_args:
# Success: Reconstruct the command with selection args
expanded_stage = [source_cmd] + source_args + selected_row_args
if first_stage_had_extra_args:
# Append extra args from the first stage (e.g. @3 arg1 arg2)
expanded_stage += stages[0]
stages[0] = expanded_stage
else:
# Insert expanded command as first stage (it was popped earlier if it was only @N)
stages.insert(0, expanded_stage)
log_msg = f"@N expansion: {source_cmd} + {' '.join(selected_row_args)}"
                    if pipeline_session and worker_manager:
                        worker_manager.log_step(pipeline_session.worker_id, log_msg)
first_stage_selection_indices = [] # Clear, we've expanded it
command_expanded = True
# If command-based expansion didn't work, fall back to item-based selection
if not command_expanded and first_stage_selection_indices:
# FALLBACK: Item-based selection (filter piped items directly)
last_piped_items = ctx.get_last_result_items()
if last_piped_items:
try:
filtered = [last_piped_items[i] for i in first_stage_selection_indices if 0 <= i < len(last_piped_items)]
if filtered:
piped_result = filtered if len(filtered) > 1 else filtered[0]
log_msg = f"Applied @N selection {' | '.join('@' + str(i+1) for i in first_stage_selection_indices)}"
                            if pipeline_session and worker_manager:
                                worker_manager.log_step(pipeline_session.worker_id, log_msg)
                            # Special case for YouTube search results in fallback mode: auto-pipe to .pipe.
                            # This covers the case where @N was the only stage (e.g. the user typed "@1");
                            # the selection stage itself was removed above, so all that remains is to check
                            # whether the source table came from a YouTube search.
source_cmd = ctx.get_last_result_table_source_command()
source_args = ctx.get_last_result_table_source_args()
if source_cmd == 'search-file' and source_args and 'youtube' in source_args:
print(f"Auto-piping YouTube selection to .pipe")
                                # The pure-selection first stage was popped earlier, so `stages`
                                # now holds only the remaining pipeline (empty when the user typed
                                # just "@N"). Appending '.pipe' here makes the selected YouTube
                                # result auto-play instead of being silently dropped.
stages.append(['.pipe'])
else:
print(f"No items matched selection in pipeline\n")
return
except (TypeError, IndexError) as e:
print(f"Error applying selection in pipeline: {e}\n")
return
else:
print(f"No previous results to select from\n")
return
try:
for stage_index, stage_tokens in enumerate(stages):
if not stage_tokens:
continue
cmd_name = stage_tokens[0].replace("_", "-").lower()
stage_args = stage_tokens[1:]
# Check if this is a selection syntax (@N, @N-M, @{N,M,K}, @*, @3,5,7, @3-6,8) instead of a command
if cmd_name.startswith('@'):
selection = _parse_selection_syntax(cmd_name)
is_select_all = (cmd_name == "@*")
if selection is not None or is_select_all:
# This is a selection stage
# Check if we should expand it to a full command instead of just filtering
should_expand_to_command = False
# Check if piped_result contains format objects and we have expansion info
source_cmd = ctx.get_current_stage_table_source_command()
source_args = ctx.get_current_stage_table_source_args()
if source_cmd == '.pipe' or source_cmd == '.adjective':
should_expand_to_command = True
elif source_cmd == 'search-file' and source_args and 'youtube' in source_args:
# Special case for youtube search results: @N expands to .pipe
if stage_index + 1 >= len(stages):
# Only auto-pipe if this is the last stage
print(f"Auto-piping YouTube selection to .pipe")
stages.append(['.pipe'])
# Force should_expand_to_command to False so we fall through to filtering
should_expand_to_command = False
elif isinstance(piped_result, (list, tuple)):
first_item = piped_result[0] if piped_result else None
if isinstance(first_item, dict) and first_item.get('format_id') is not None:
# Format objects detected - check for source command
if source_cmd:
should_expand_to_command = True
elif isinstance(piped_result, dict) and piped_result.get('format_id') is not None:
# Single format object
if source_cmd:
should_expand_to_command = True
# If expanding to command, replace this stage and re-execute
if should_expand_to_command and selection is not None:
source_cmd = ctx.get_current_stage_table_source_command()
source_args = ctx.get_current_stage_table_source_args()
selection_indices = sorted([i - 1 for i in selection])
# Get row args for first selected index
selected_row_args = []
for idx in selection_indices:
row_args = ctx.get_current_stage_table_row_selection_args(idx)
if row_args:
selected_row_args.extend(row_args)
break
if selected_row_args:
# Expand to full command
# Include any arguments passed to the selection command (e.g. @3 arg1 arg2)
extra_args = stage_tokens[1:]
expanded_stage = [source_cmd] + source_args + selected_row_args + extra_args
print(f"Expanding {cmd_name} to: {' '.join(expanded_stage)}")
# Replace current stage and re-execute it
stages[stage_index] = expanded_stage
stage_tokens = expanded_stage
cmd_name = expanded_stage[0].replace("_", "-").lower()
stage_args = expanded_stage[1:]
# Clear piped_result so the expanded command doesn't receive the format objects
piped_result = None
# Don't continue - fall through to execute the expanded command
# If not expanding, use as filter
if not should_expand_to_command:
# This is a selection stage - filter piped results
if piped_result is None:
print(f"No piped results to select from with {cmd_name}\n")
pipeline_status = "failed"
pipeline_error = f"Selection {cmd_name} without upstream results"
return
# Normalize piped_result to always be a list for indexing
if isinstance(piped_result, dict) or not isinstance(piped_result, (list, tuple)):
piped_result_list = [piped_result]
else:
piped_result_list = piped_result
# Get indices to select
if is_select_all:
# @* means select all items
selection_indices = list(range(len(piped_result_list)))
elif selection is not None:
# Convert to 0-based indices
selection_indices = sorted([i - 1 for i in selection])
else:
selection_indices = []
try:
filtered = [piped_result_list[i] for i in selection_indices if 0 <= i < len(piped_result_list)]
if filtered:
piped_result = filtered if len(filtered) > 1 else filtered[0]
print(f"Selected {len(filtered)} item(s) using {cmd_name}")
continue
else:
print(f"No items matched selection {cmd_name}\n")
pipeline_status = "failed"
pipeline_error = f"Selection {cmd_name} matched nothing"
return
except (TypeError, IndexError) as e:
print(f"Error applying selection {cmd_name}: {e}\n")
pipeline_status = "failed"
pipeline_error = f"Selection error: {e}"
return
# If parse failed, treat as regular command name (will fail below)
# Get the cmdlet function
cmd_fn = REGISTRY.get(cmd_name)
if not cmd_fn:
print(f"Unknown command in pipeline: {cmd_name}\n")
pipeline_status = "failed"
pipeline_error = f"Unknown command {cmd_name}"
return
# Create pipeline context for this stage
is_last_stage = (stage_index == len(stages) - 1)
pipeline_ctx = ctx.PipelineStageContext(stage_index=stage_index, total_stages=len(stages))
ctx.set_stage_context(pipeline_ctx)
ctx.set_active(True)
# Execute the cmdlet with piped input
stage_session: Optional[_WorkerStageSession] = None
stage_status = "completed"
stage_error = ""
stage_label = f"[Stage {stage_index + 1}/{len(stages)}] {cmd_name}"
if pipeline_session and worker_manager:
try:
worker_manager.log_step(pipeline_session.worker_id, f"{stage_label} started")
except Exception:
pass
else:
stage_session = _begin_worker_stage(
worker_manager=worker_manager,
cmd_name=cmd_name,
stage_tokens=stage_tokens,
config=config,
command_text=" ".join(stage_tokens),
)
try:
ret_code = cmd_fn(piped_result, stage_args, config)
# Store emitted results for next stage (or display if last stage)
if pipeline_ctx.emits:
if is_last_stage:
# Last stage - display results
if RESULT_TABLE_AVAILABLE and ResultTable is not None and pipeline_ctx.emits:
table_title = _get_table_title_for_command(cmd_name, pipeline_ctx.emits)
# Only set source_command for search/filter commands (not display-only or action commands)
# This preserves context so @N refers to the original search, not intermediate results
selectable_commands = {
'search-file', 'download-data', 'search_file', 'download_data',
'.config', '.worker'
}
# Display-only commands (just show data, don't modify or search)
display_only_commands = {
'get-url', 'get_url', 'get-note', 'get_note',
'get-relationship', 'get_relationship', 'get-file', 'get_file',
'check-file-status', 'check_file_status'
}
# Commands that manage their own table/history state (e.g. get-tag)
self_managing_commands = {
'get-tag', 'get_tag', 'tags'
}
if cmd_name in self_managing_commands:
# Command has already set the table and history
# Retrieve the table it set so we print the correct custom formatting
# Check for overlay table first (e.g. get-tag)
if hasattr(ctx, 'get_display_table'):
table = ctx.get_display_table()
else:
table = None
if table is None:
table = ctx.get_last_result_table()
if table is None:
# Fallback if something went wrong
table = ResultTable(table_title)
for emitted in pipeline_ctx.emits:
table.add_result(emitted)
else:
table = ResultTable(table_title)
for emitted in pipeline_ctx.emits:
table.add_result(emitted)
if cmd_name in selectable_commands:
table.set_source_command(cmd_name, stage_args)
ctx.set_last_result_table(table, pipeline_ctx.emits)
elif cmd_name in display_only_commands:
# Display-only: show table but preserve search context
ctx.set_last_result_items_only(pipeline_ctx.emits)
else:
# Action commands (add-*, delete-*): update items only, don't change table/history
ctx.set_last_result_items_only(pipeline_ctx.emits)
print()
print(table.format_plain())
else:
for emitted in pipeline_ctx.emits:
if isinstance(emitted, dict):
print(json.dumps(emitted, indent=2))
else:
print(emitted)
# For display-only results, also preserve context by not calling set_last_result_table
else:
# Intermediate stage - thread to next stage
piped_result = pipeline_ctx.emits
ctx.set_last_result_table(None, pipeline_ctx.emits)
if ret_code != 0:
stage_status = "failed"
stage_error = f"exit code {ret_code}"
print(f"[stage {stage_index} exit code: {ret_code}]\n")
if pipeline_session:
pipeline_status = "failed"
pipeline_error = f"{stage_label} failed ({stage_error})"
return
except Exception as e:
stage_status = "failed"
stage_error = f"{type(e).__name__}: {e}"
print(f"[error in stage {stage_index} ({cmd_name})]: {type(e).__name__}: {e}\n")
import traceback
traceback.print_exc()
if pipeline_session:
pipeline_status = "failed"
pipeline_error = f"{stage_label} error: {e}"
return
finally:
if stage_session:
stage_session.close(status=stage_status, error_msg=stage_error)
elif pipeline_session and worker_manager:
try:
worker_manager.log_step(
pipeline_session.worker_id,
f"{stage_label} {'completed' if stage_status == 'completed' else 'failed'}",
)
except Exception:
pass
# If we have a result but no stages left (e.g. pure selection @3 that didn't expand to a command), display it
if not stages and piped_result is not None:
if RESULT_TABLE_AVAILABLE and ResultTable is not None:
# Create a simple table for the result
table = ResultTable("Selection Result")
# Normalize to list
items = piped_result if isinstance(piped_result, list) else [piped_result]
for item in items:
table.add_result(item)
# Preserve context for further selection
ctx.set_last_result_items_only(items)
print()
print(table.format_plain())
else:
print(piped_result)
except Exception as e:
pipeline_status = "failed"
pipeline_error = str(e)
print(f"[error] Failed to execute pipeline: {e}\n")
import traceback
traceback.print_exc()
finally:
if pipeline_session:
pipeline_session.close(status=pipeline_status, error_msg=pipeline_error)
except Exception as e:
print(f"[error] Failed to execute pipeline: {e}\n")
import traceback
traceback.print_exc()
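# Illustration of the stage splitting performed at the top of _execute_pipeline:
#   tokens: ['search-file', 'foo', '|', 'get-tag']
#   stages: [['search-file', 'foo'], ['get-tag']]
# Each stage's emits become the piped_result for the next stage; the final stage's
# emits are rendered as a ResultTable (or raw JSON when result_table is unavailable).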
def _execute_cmdlet(cmd_name: str, args: list):
"""Execute a cmdlet with the given arguments.
Supports @ selection syntax for filtering results from previous commands:
- @2 - select row 2
- @2-5 - select rows 2-5
- @{1,3,5} - select rows 1, 3, 5
"""
try:
from cmdlets import REGISTRY
import json
import pipeline as ctx
# Get the cmdlet function
cmd_fn = REGISTRY.get(cmd_name)
if not cmd_fn:
print(f"Unknown command: {cmd_name}\n")
return
# Load config relative to CLI root
config = _load_cli_config()
# Check for @ selection syntax in arguments
# Extract @N, @N-M, @{N,M,P} syntax and remove from args
filtered_args = []
selected_indices = []
for arg in args:
if arg.startswith('@'):
# Parse selection: @2, @2-5, @{1,3,5}
selection_str = arg[1:] # Remove @
try:
if '{' in selection_str and '}' in selection_str:
# @{1,3,5} format
selection_str = selection_str.strip('{}')
selected_indices = [int(x.strip()) - 1 for x in selection_str.split(',')]
elif '-' in selection_str:
# @2-5 format
parts = selection_str.split('-')
start = int(parts[0]) - 1
end = int(parts[1])
selected_indices = list(range(start, end))
else:
# @2 format
selected_indices = [int(selection_str) - 1]
except (ValueError, IndexError):
# Invalid format, treat as regular arg
# Special case: @"string" should be treated as "string" (stripping @)
# This allows adding new items via @"New Item" syntax
if selection_str.startswith('"') or selection_str.startswith("'"):
filtered_args.append(selection_str.strip('"\''))
else:
filtered_args.append(arg)
else:
filtered_args.append(arg)
# Get piped items from previous command results
piped_items = ctx.get_last_result_items()
pipeline_ctx = ctx.PipelineStageContext(stage_index=0, total_stages=1)
ctx.set_stage_context(pipeline_ctx)
ctx.set_active(True)
# Create result object - pass full list (or filtered list if @ selection used) to cmdlet
result = None
if piped_items:
if selected_indices:
# Filter to selected indices only
result = [piped_items[idx] for idx in selected_indices if 0 <= idx < len(piped_items)]
else:
# No selection specified, pass all items (cmdlets handle lists via normalize_result_input)
result = piped_items
worker_manager = _ensure_worker_manager(config)
stage_session = _begin_worker_stage(
worker_manager=worker_manager,
cmd_name=cmd_name,
stage_tokens=[cmd_name, *filtered_args],
config=config,
command_text=" ".join([cmd_name, *filtered_args]).strip() or cmd_name,
)
stage_status = "completed"
stage_error = ""
# Execute the cmdlet
ctx.set_last_selection(selected_indices)
try:
ret_code = cmd_fn(result, filtered_args, config)
# Print emitted results using ResultTable for structured output
if pipeline_ctx.emits:
if RESULT_TABLE_AVAILABLE and ResultTable is not None and pipeline_ctx.emits:
# Check if these are format objects (from download-data format selection)
# Format objects have format_id and should not be displayed as a table
is_format_selection = False
if pipeline_ctx.emits and len(pipeline_ctx.emits) > 0:
first_emit = pipeline_ctx.emits[0]
if isinstance(first_emit, dict) and 'format_id' in first_emit:
is_format_selection = True
# Skip table display for format selection - user will use @N to select
if is_format_selection:
# Store items for @N selection but don't display table
ctx.set_last_result_items_only(pipeline_ctx.emits)
else:
# Try to format as a table if we have search results
table_title = _get_table_title_for_command(cmd_name, pipeline_ctx.emits)
# Only set source_command for search/filter commands (not display-only or action commands)
# This preserves context so @N refers to the original search, not intermediate results
selectable_commands = {
'search-file', 'download-data', 'search_file', 'download_data',
'.config', '.worker'
}
# Display-only commands (excluding get-tag which manages its own table)
display_only_commands = {
'get-url', 'get_url', 'get-note', 'get_note',
'get-relationship', 'get_relationship', 'get-file', 'get_file',
'check-file-status', 'check_file_status'
}
# Commands that manage their own table/history state (e.g. get-tag)
self_managing_commands = {
'get-tag', 'get_tag', 'tags'
}
if cmd_name in self_managing_commands:
# Command has already set the table and history
# Retrieve the table it set so we print the correct custom formatting
table = ctx.get_last_result_table()
if table is None:
# Fallback if something went wrong
table = ResultTable(table_title)
for emitted in pipeline_ctx.emits:
table.add_result(emitted)
else:
table = ResultTable(table_title)
for emitted in pipeline_ctx.emits:
table.add_result(emitted)
if cmd_name in selectable_commands:
table.set_source_command(cmd_name, filtered_args)
ctx.set_last_result_table(table, pipeline_ctx.emits)
# Clear any stale current_stage_table (e.g. from previous download-data formats)
# This ensures @N refers to these new results, not old format selections
ctx.set_current_stage_table(None)
elif cmd_name in display_only_commands:
# Display-only: show table but preserve search context
ctx.set_last_result_items_only(pipeline_ctx.emits)
else:
# Action commands: update items only without changing current table or history
ctx.set_last_result_items_only(pipeline_ctx.emits)
print()
print(table.format_plain())
# Special case: if this was a youtube search, print a hint about auto-piping
if cmd_name == 'search-file' and filtered_args and 'youtube' in filtered_args:
print("\n[Hint] Type @N to play a video in MPV (e.g. @1)")
else:
# Fallback to raw output if ResultTable not available
for emitted in pipeline_ctx.emits:
if isinstance(emitted, dict):
print(json.dumps(emitted, indent=2))
else:
print(emitted)
# Store emitted items for @ selection
selectable_commands = {
'search-file', 'download-data', 'search_file', 'download_data',
'.config', '.worker'
}
display_only_commands = {
'get-url', 'get_url', 'get-note', 'get_note',
'get-relationship', 'get_relationship', 'get-file', 'get_file',
'check-file-status', 'check_file_status'
}
self_managing_commands = {
'get-tag', 'get_tag', 'tags'
}
if cmd_name in self_managing_commands:
pass # Already handled by cmdlet
elif cmd_name in selectable_commands:
ctx.set_last_result_table(None, pipeline_ctx.emits)
elif cmd_name in display_only_commands:
ctx.set_last_result_items_only(pipeline_ctx.emits)
else:
# Action commands: items only, don't change table/history
ctx.set_last_result_items_only(pipeline_ctx.emits)
if ret_code != 0:
stage_status = "failed"
stage_error = f"exit code {ret_code}"
print(f"[exit code: {ret_code}]\n")
except Exception as e:
stage_status = "failed"
stage_error = f"{type(e).__name__}: {e}"
print(f"[error] {type(e).__name__}: {e}\n")
finally:
ctx.clear_last_selection()
if stage_session:
stage_session.close(status=stage_status, error_msg=stage_error)
except Exception as e:
print(f"[error] Failed to execute cmdlet: {e}\n")
def _show_cmdlet_list():
"""Display available cmdlets with full metadata: cmd:name alias:aliases args:args."""
try:
from cmdlets import REGISTRY
import os
# Collect unique commands by scanning cmdlet modules
cmdlet_info = {}
cmdlets_dir = os.path.join(os.path.dirname(__file__), "cmdlets")
# Iterate through cmdlet files
for filename in os.listdir(cmdlets_dir):
if filename.endswith(".py") and not filename.startswith("_"):
mod_name = filename[:-3]
try:
mod = import_module(f"cmdlets.{mod_name}")
if hasattr(mod, "CMDLET"):
cmdlet = getattr(mod, "CMDLET")
# Extract name, aliases, and args
if hasattr(cmdlet, "name"):
cmd_name = cmdlet.name
aliases = []
if hasattr(cmdlet, "aliases"):
aliases = cmdlet.aliases
# Extract argument names
arg_names = []
if hasattr(cmdlet, "args"):
for arg in cmdlet.args:
if hasattr(arg, "name"):
arg_names.append(arg.name)
elif isinstance(arg, dict):
arg_names.append(arg.get("name", ""))
# Store info (skip if already seen)
if cmd_name not in cmdlet_info:
cmdlet_info[cmd_name] = {
"aliases": aliases,
"args": arg_names,
}
except Exception:
# If we can't import the module, try to get info from REGISTRY
pass
# Also check root-level cmdlets (search_*, etc)
# Note: search_libgen, search_soulseek, and search_debrid are consolidated into search-file with providers
for mod_name in ["select_cmdlet", "unlock_link"]:
try:
mod = import_module(mod_name)
if hasattr(mod, "CMDLET"):
cmdlet = getattr(mod, "CMDLET")
if hasattr(cmdlet, "name"):
cmd_name = cmdlet.name
aliases = []
if hasattr(cmdlet, "aliases"):
aliases = cmdlet.aliases
# Extract argument names
arg_names = []
if hasattr(cmdlet, "args"):
for arg in cmdlet.args:
if hasattr(arg, "name"):
arg_names.append(arg.name)
elif isinstance(arg, dict):
arg_names.append(arg.get("name", ""))
if cmd_name not in cmdlet_info:
cmdlet_info[cmd_name] = {
"aliases": aliases,
"args": arg_names,
}
except Exception:
pass
# Fallback: Show registry entries that we don't have full metadata for
# This ensures all registered cmdlets are shown even if they have import errors
seen_names = set()
for cmd_name in cmdlet_info.keys():
seen_names.add(cmd_name)
# For aliases, add them too
for cmd_name in list(cmdlet_info.keys()):
for alias in cmdlet_info[cmd_name].get("aliases", []):
seen_names.add(alias)
# Now check registry for any missing cmdlets
for reg_name in REGISTRY.keys():
if reg_name not in seen_names:
# Add this as a basic cmdlet entry
# Try to find a matching primary name
found_match = False
for cmd_name in cmdlet_info.keys():
if reg_name in cmdlet_info[cmd_name].get("aliases", []):
found_match = True
break
if not found_match:
# This is a top-level cmdlet not in our collection
cmdlet_info[reg_name] = {
"aliases": [],
"args": [],
}
print("\nAvailable cmdlets:")
for cmd_name in sorted(cmdlet_info.keys()):
info = cmdlet_info[cmd_name]
aliases = info["aliases"]
args = info["args"]
# Build the display string
display = f" cmd:{cmd_name}"
if aliases:
alias_str = ", ".join(aliases)
display += f" alias:{alias_str}"
if args:
args_str = ", ".join(args)
display += f" args:{args_str}"
print(display)
print()
except Exception as e:
print(f"Error: {e}\n")
def _show_cmdlet_help(cmd_name: str):
"""Display help for a cmdlet."""
try:
mod_name = cmd_name.replace("-", "_")
try:
mod = import_module(f"cmdlets.{mod_name}")
data = getattr(mod, "CMDLET", None)
if data:
_print_metadata(cmd_name, data)
return
except ModuleNotFoundError:
pass
from cmdlets import REGISTRY
cmd_fn = REGISTRY.get(cmd_name)
if cmd_fn:
owner = import_module(getattr(cmd_fn, "__module__", ""))
data = getattr(owner, "CMDLET", None)
if data:
_print_metadata(cmd_name, data)
return
print(f"Unknown command: {cmd_name}\n")
except Exception as e:
print(f"Error: {e}\n")
def _print_metadata(cmd_name: str, data):
"""Print cmdlet metadata in PowerShell-style format."""
d = data.to_dict() if hasattr(data, "to_dict") else data
if not isinstance(d, dict):
print(f"Invalid metadata for {cmd_name}\n")
return
name = d.get('name', cmd_name)
summary = d.get("summary", "")
usage = d.get("usage", "")
description = d.get("description", "")
args = d.get("args", [])
details = d.get("details", [])
# NAME section
print(f"\nNAME")
print(f" {name}")
# SYNOPSIS section
print(f"\nSYNOPSIS")
if usage:
# Format usage similar to PowerShell syntax
print(f" {usage}")
else:
print(f" {name}")
# DESCRIPTION section
if summary or description:
print(f"\nDESCRIPTION")
if summary:
print(f" {summary}")
if description:
print(f" {description}")
# PARAMETERS section
if args and isinstance(args, list):
print(f"\nPARAMETERS")
for arg in args:
if isinstance(arg, dict):
name_str = arg.get("name", "?")
typ = arg.get("type", "string")
required = arg.get("required", False)
desc = arg.get("description", "")
else:
name_str = getattr(arg, "name", "?")
typ = getattr(arg, "type", "string")
required = getattr(arg, "required", False)
desc = getattr(arg, "description", "")
# Format: -Name <type> [required flag]
req_marker = "[required]" if required else "[optional]"
print(f" -{name_str} <{typ}>")
if desc:
print(f" {desc}")
print(f" {req_marker}")
print()
# REMARKS/DETAILS section
if details:
print(f"REMARKS")
for detail in details:
print(f" {detail}")
print()
# ============================================================================
# SELECTION UTILITIES - Consolidated from selection_syntax.py and select_utils.py
# ============================================================================
def _parse_selection_syntax(token: str) -> Optional[Set[int]]:
"""Parse @ selection syntax into a set of 1-based indices.
Args:
token: Token starting with @ (e.g., "@2", "@2-5", "@{1,3,5}", "@*", "@3,5,7", "@3-6,8")
Returns:
Set of 1-based indices (for concrete selections like @1, @2-5, @3,5,7)
None for special cases: @* (all), @.. (restore previous)
None for invalid format
Special handling:
- @* returns None and should be handled as "select all current items"
- @.. returns None and is handled as "restore previous table" (separate code path)
- Invalid selections like @-1 or @a return None and are treated as invalid args
Examples:
"@2" {2}
"@2-5" {2, 3, 4, 5}
"@{2,5,6}" {2, 5, 6}
"@2,5,6" {2, 5, 6}
"@2-5,8,10-12" {2, 3, 4, 5, 8, 10, 11, 12}
"@*" None (caller checks token=="@*" to handle as "all")
"@.." None (separate code path)
"""
if not token.startswith("@"):
return None
selector = token[1:].strip()
# Special case: @.. means restore previous result table (handled separately)
# Special case: @* means all items (should be converted to actual list by caller)
    if selector in ("..", "*"):
return None
indices = set()
# Handle set notation: @{2,5,6,7} (convert to standard format)
if selector.startswith("{") and selector.endswith("}"):
selector = selector[1:-1]
# Handle mixed comma and range notation: @2,5,7-9,10 or @2-5,8,10-12
parts = selector.split(",")
for part in parts:
part = part.strip()
if not part:
continue
try:
if "-" in part:
# Range notation: 2-5 or 7-9
range_parts = part.split("-", 1) # Split on first - only (in case of negative numbers)
if len(range_parts) == 2:
start_str = range_parts[0].strip()
end_str = range_parts[1].strip()
# Make sure both are valid positive integers
if start_str and end_str:
start = int(start_str)
end = int(end_str)
if start > 0 and end > 0 and start <= end:
indices.update(range(start, end + 1))
else:
return None # Invalid range
else:
return None
else:
return None
else:
# Single number
num = int(part)
if num > 0:
indices.add(num)
else:
return None
except (ValueError, AttributeError):
return None
return indices if indices else None
def _filter_items_by_selection(items: List, selection: Optional[Set[int]]) -> List:
"""Filter items by 1-based selection indices.
Args:
items: List of items to filter
selection: Set of 1-based indices, or None for all items
Returns:
Filtered list of items in original order
Examples:
        _filter_items_by_selection([a, b, c, d], {2, 4})  ->  [b, d]
        _filter_items_by_selection([a, b, c, d], None)    ->  [a, b, c, d]
"""
if selection is None or len(selection) == 0:
return items
filtered = []
for i, item in enumerate(items, start=1):
if i in selection:
filtered.append(item)
return filtered
def _parse_line_selection(args: Sequence[str]) -> Set[int]:
"""Parse selection arguments to indices.
Args:
args: Line numbers and ranges (1-indexed)
Examples: ["3"], ["1", "3", "5"], ["1-3"]
Returns:
Set of 0-indexed line numbers to select
Raises:
ValueError: If selection is invalid
"""
selected_indices: Set[int] = set()
for arg in args:
arg = str(arg).strip()
# Check if it's a range (e.g., "1-3")
if '-' in arg and not arg.startswith('-'):
try:
parts = arg.split('-')
if len(parts) == 2:
start = int(parts[0]) - 1 # Convert to 0-indexed
end = int(parts[1]) # End is exclusive in range
for i in range(start, end):
selected_indices.add(i)
else:
raise ValueError(f"Invalid range format: {arg}")
except ValueError as e:
raise ValueError(f"Invalid range: {arg}") from e
else:
# Single line number (1-indexed)
try:
line_num = int(arg)
idx = line_num - 1 # Convert to 0-indexed
selected_indices.add(idx)
except ValueError:
raise ValueError(f"Invalid line number: {arg}")
return selected_indices
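# Example of _parse_line_selection (1-indexed input, 0-indexed output):
#   _parse_line_selection(["1-3", "5"])  ->  {0, 1, 2, 4}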
def _validate_indices(selected_indices: Set[int], total_lines: int) -> List[str]:
"""Validate indices are within bounds.
Args:
selected_indices: Set of 0-indexed line numbers
total_lines: Total number of available lines
Returns:
List of error messages (empty if all valid)
"""
errors = []
for idx in selected_indices:
if idx < 0 or idx >= total_lines:
errors.append(f"Line {idx + 1} out of range (1-{total_lines})")
return errors
def _select_lines(lines: List[str], selected_indices: Set[int]) -> List[str]:
"""Select specific lines from input.
Args:
lines: List of input lines
selected_indices: Set of 0-indexed line numbers to select
Returns:
List of selected lines in order
"""
selected_indices_sorted = sorted(selected_indices)
return [lines[idx] for idx in selected_indices_sorted]
# Keep helper references so static analyzers treat them as used in this module.
_SELECTION_HELPERS = (
_filter_items_by_selection,
_parse_line_selection,
_validate_indices,
_select_lines,
)
def main():
"""Entry point for the CLI."""
app = _create_cmdlet_cli()
if app:
app()
else:
print("Typer not available")
if __name__ == "__main__":
main()