h
This commit is contained in:
460
SYS/cli_parsing.py
Normal file
460
SYS/cli_parsing.py
Normal file
@@ -0,0 +1,460 @@
|
||||
"""CLI parsing helpers moved out of `CLI.py`.
|
||||
|
||||
Contains selection parsing and the REPL lexer so `CLI.py` can be smaller and
|
||||
these pure helpers are easier to test.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
|
||||
# Prompt-toolkit lexer types are optional at import time; fall back to lightweight
|
||||
# stubs if prompt_toolkit is not available so imports remain safe for testing.
|
||||
try:
|
||||
from prompt_toolkit.document import Document
|
||||
from prompt_toolkit.lexers import Lexer
|
||||
except Exception: # pragma: no cover - optional dependency
|
||||
Document = object # type: ignore
|
||||
|
||||
class Lexer: # simple fallback base
|
||||
pass
|
||||
|
||||
|
||||
class SelectionSyntax:
|
||||
"""Parses @ selection syntax into 1-based indices."""
|
||||
|
||||
_RANGE_RE = re.compile(r"^[0-9\-]+$")
|
||||
|
||||
@staticmethod
|
||||
def parse(token: str) -> Optional[Set[int]]:
|
||||
"""Return 1-based indices or None when not a concrete selection.
|
||||
|
||||
Concrete selections:
|
||||
- @2
|
||||
- @2-5
|
||||
- @{1,3,5}
|
||||
- @2,5,7-9
|
||||
|
||||
Special (non-concrete) selectors return None:
|
||||
- @* (select all)
|
||||
- @.. (history prev)
|
||||
- @,, (history next)
|
||||
"""
|
||||
|
||||
if not token or not token.startswith("@"):
|
||||
return None
|
||||
|
||||
selector = token[1:].strip()
|
||||
if selector in (".", ",", "*"):
|
||||
return None
|
||||
|
||||
if selector.startswith("{") and selector.endswith("}"):
|
||||
selector = selector[1:-1].strip()
|
||||
|
||||
indices: Set[int] = set()
|
||||
for part in selector.split(","):
|
||||
part = part.strip()
|
||||
if not part:
|
||||
continue
|
||||
|
||||
if "-" in part:
|
||||
pieces = part.split("-", 1)
|
||||
if len(pieces) != 2:
|
||||
return None
|
||||
start_str = pieces[0].strip()
|
||||
end_str = pieces[1].strip()
|
||||
if not start_str or not end_str:
|
||||
return None
|
||||
try:
|
||||
start = int(start_str)
|
||||
end = int(end_str)
|
||||
except ValueError:
|
||||
return None
|
||||
if start <= 0 or end <= 0 or start > end:
|
||||
return None
|
||||
indices.update(range(start, end + 1))
|
||||
continue
|
||||
|
||||
try:
|
||||
value = int(part)
|
||||
except ValueError:
|
||||
return None
|
||||
if value <= 0:
|
||||
return None
|
||||
indices.add(value)
|
||||
|
||||
return indices if indices else None
|
||||
|
||||
|
||||
class SelectionFilterSyntax:
|
||||
"""Parses and applies @"COL:filter" selection filters.
|
||||
|
||||
Notes:
|
||||
- CLI tokenization (shlex) strips quotes, so a user input of `@"TITLE:foo"`
|
||||
arrives as `@TITLE:foo`. We support both forms.
|
||||
- Filters apply to the *current selectable table items* (in-memory), not to
|
||||
provider searches.
|
||||
"""
|
||||
|
||||
_OP_RE = re.compile(r"^(>=|<=|!=|==|>|<|=)\s*(.+)$")
|
||||
_DUR_TOKEN_RE = re.compile(r"(?i)(\d+)\s*([hms])")
|
||||
|
||||
@staticmethod
|
||||
def parse(token: str) -> Optional[List[Tuple[str, str]]]:
|
||||
"""Return list of (column, raw_expression) or None when not a filter token."""
|
||||
|
||||
if not token or not str(token).startswith("@"):
|
||||
return None
|
||||
|
||||
if token.strip() == "@*":
|
||||
return None
|
||||
|
||||
# If this is a concrete numeric selection (@2, @1-3, @{1,3}), do not treat it as a filter.
|
||||
try:
|
||||
if SelectionSyntax.parse(str(token)) is not None:
|
||||
return None
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
raw = str(token)[1:].strip()
|
||||
if not raw:
|
||||
return None
|
||||
|
||||
# If quotes survived tokenization, strip a single symmetric wrapper.
|
||||
if len(raw) >= 2 and raw[0] == raw[-1] and raw[0] in ('"', "'"):
|
||||
raw = raw[1:-1].strip()
|
||||
|
||||
# Shorthand: @"foo" means Title contains "foo".
|
||||
if ":" not in raw:
|
||||
if raw:
|
||||
return [("Title", raw)]
|
||||
return None
|
||||
|
||||
parts = [p.strip() for p in raw.split(",") if p.strip()]
|
||||
conditions: List[Tuple[str, str]] = []
|
||||
for part in parts:
|
||||
if ":" not in part:
|
||||
return None
|
||||
col, expr = part.split(":", 1)
|
||||
col = str(col or "").strip()
|
||||
expr = str(expr or "").strip()
|
||||
if not col:
|
||||
return None
|
||||
conditions.append((col, expr))
|
||||
|
||||
return conditions if conditions else None
|
||||
|
||||
@staticmethod
|
||||
def _norm_key(text: str) -> str:
|
||||
return re.sub(r"\s+", " ", str(text or "").strip().lower())
|
||||
|
||||
@staticmethod
|
||||
def _item_column_map(item: Any) -> Dict[str, str]:
|
||||
out: Dict[str, str] = {}
|
||||
|
||||
def _set(k: Any, v: Any) -> None:
|
||||
key = SelectionFilterSyntax._norm_key(str(k or ""))
|
||||
if not key:
|
||||
return
|
||||
if v is None:
|
||||
return
|
||||
try:
|
||||
if isinstance(v, (list, tuple, set)):
|
||||
text = ", ".join(str(x) for x in v if x is not None)
|
||||
else:
|
||||
text = str(v)
|
||||
except Exception:
|
||||
return
|
||||
out[key] = text
|
||||
|
||||
if isinstance(item, dict):
|
||||
# Display columns (primary UX surface)
|
||||
cols = item.get("columns")
|
||||
if isinstance(cols, list):
|
||||
for pair in cols:
|
||||
try:
|
||||
if isinstance(pair, (list, tuple)) and len(pair) == 2:
|
||||
_set(pair[0], pair[1])
|
||||
except Exception:
|
||||
continue
|
||||
# Direct keys as fallback
|
||||
for k, v in item.items():
|
||||
if k == "columns":
|
||||
continue
|
||||
_set(k, v)
|
||||
else:
|
||||
cols = getattr(item, "columns", None)
|
||||
if isinstance(cols, list):
|
||||
for pair in cols:
|
||||
try:
|
||||
if isinstance(pair, (list, tuple)) and len(pair) == 2:
|
||||
_set(pair[0], pair[1])
|
||||
except Exception:
|
||||
continue
|
||||
for k in ("title", "path", "detail", "provider", "store", "table"):
|
||||
try:
|
||||
_set(k, getattr(item, k, None))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return out
|
||||
|
||||
@staticmethod
|
||||
def _parse_duration_seconds(text: str) -> Optional[int]:
|
||||
s = str(text or "").strip()
|
||||
if not s:
|
||||
return None
|
||||
|
||||
if s.isdigit():
|
||||
try:
|
||||
return max(0, int(s))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
# clock format: M:SS or H:MM:SS
|
||||
if ":" in s:
|
||||
parts = [p.strip() for p in s.split(":")]
|
||||
if len(parts) == 2 and all(p.isdigit() for p in parts):
|
||||
m, sec = parts
|
||||
return max(0, int(m) * 60 + int(sec))
|
||||
if len(parts) == 3 and all(p.isdigit() for p in parts):
|
||||
h, m, sec = parts
|
||||
return max(0, int(h) * 3600 + int(m) * 60 + int(sec))
|
||||
|
||||
# token format: 1h2m3s (tokens can appear in any combination)
|
||||
total = 0
|
||||
found = False
|
||||
for m in SelectionFilterSyntax._DUR_TOKEN_RE.finditer(s):
|
||||
found = True
|
||||
n = int(m.group(1))
|
||||
unit = m.group(2).lower()
|
||||
if unit == "h":
|
||||
total += n * 3600
|
||||
elif unit == "m":
|
||||
total += n * 60
|
||||
elif unit == "s":
|
||||
total += n
|
||||
if found:
|
||||
return max(0, int(total))
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse_float(text: str) -> Optional[float]:
|
||||
s = str(text or "").strip()
|
||||
if not s:
|
||||
return None
|
||||
s = s.replace(",", "")
|
||||
try:
|
||||
return float(s)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse_op(expr: str) -> Tuple[Optional[str], str]:
|
||||
text = str(expr or "").strip()
|
||||
if not text:
|
||||
return None, ""
|
||||
m = SelectionFilterSyntax._OP_RE.match(text)
|
||||
if not m:
|
||||
return None, text
|
||||
return m.group(1), str(m.group(2) or "").strip()
|
||||
|
||||
@staticmethod
|
||||
def matches(item: Any, conditions: List[Tuple[str, str]]) -> bool:
|
||||
colmap = SelectionFilterSyntax._item_column_map(item)
|
||||
|
||||
for col, expr in conditions:
|
||||
key = SelectionFilterSyntax._norm_key(col)
|
||||
actual = colmap.get(key)
|
||||
|
||||
# Convenience aliases for common UX names.
|
||||
if actual is None:
|
||||
if key == "duration":
|
||||
actual = colmap.get("duration")
|
||||
elif key == "title":
|
||||
actual = colmap.get("title")
|
||||
|
||||
if actual is None:
|
||||
return False
|
||||
|
||||
op, rhs = SelectionFilterSyntax._parse_op(expr)
|
||||
left_text = str(actual or "").strip()
|
||||
right_text = str(rhs or "").strip()
|
||||
|
||||
if op is None:
|
||||
if not right_text:
|
||||
return False
|
||||
if right_text.lower() not in left_text.lower():
|
||||
return False
|
||||
continue
|
||||
|
||||
# Comparator: try duration parsing first when it looks time-like.
|
||||
prefer_duration = (
|
||||
key == "duration"
|
||||
or any(ch in right_text for ch in (":", "h", "m", "s"))
|
||||
or any(ch in left_text for ch in (":", "h", "m", "s"))
|
||||
)
|
||||
|
||||
left_num: Optional[float] = None
|
||||
right_num: Optional[float] = None
|
||||
|
||||
if prefer_duration:
|
||||
ldur = SelectionFilterSyntax._parse_duration_seconds(left_text)
|
||||
rdur = SelectionFilterSyntax._parse_duration_seconds(right_text)
|
||||
if ldur is not None and rdur is not None:
|
||||
left_num = float(ldur)
|
||||
right_num = float(rdur)
|
||||
|
||||
if left_num is None or right_num is None:
|
||||
left_num = SelectionFilterSyntax._parse_float(left_text)
|
||||
right_num = SelectionFilterSyntax._parse_float(right_text)
|
||||
|
||||
if left_num is not None and right_num is not None:
|
||||
if op in ("=", "=="):
|
||||
if not (left_num == right_num):
|
||||
return False
|
||||
elif op == "!=":
|
||||
if not (left_num != right_num):
|
||||
return False
|
||||
elif op == ">":
|
||||
if not (left_num > right_num):
|
||||
return False
|
||||
elif op == ">=":
|
||||
if not (left_num >= right_num):
|
||||
return False
|
||||
elif op == "<":
|
||||
if not (left_num < right_num):
|
||||
return False
|
||||
elif op == "<=":
|
||||
if not (left_num <= right_num):
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
continue
|
||||
|
||||
# Fallback to string equality for =/!= when numeric parsing fails.
|
||||
if op in ("=", "=="):
|
||||
if left_text.lower() != right_text.lower():
|
||||
return False
|
||||
elif op == "!=":
|
||||
if left_text.lower() == right_text.lower():
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class MedeiaLexer(Lexer):
|
||||
def lex_document(self, document: "Document"): # type: ignore[override]
|
||||
|
||||
def get_line(lineno: int):
|
||||
line = document.lines[lineno]
|
||||
tokens: List[tuple[str, str]] = []
|
||||
|
||||
pattern = re.compile(
|
||||
r"""
|
||||
(\s+) | # 1. Whitespace
|
||||
(\|) | # 2. Pipe
|
||||
("(?:[^"\\]|\\.)*"|'(?:[^'\\]|\\.)*') | # 3. Quoted string
|
||||
([^\s\|]+) # 4. Word
|
||||
""",
|
||||
re.VERBOSE,
|
||||
)
|
||||
|
||||
is_cmdlet = True
|
||||
|
||||
def _emit_keyed_value(word: str) -> bool:
|
||||
"""Emit `key:` prefixes (comma-separated) as argument tokens.
|
||||
|
||||
Designed for values like:
|
||||
clip:3m4s-3m14s,1h22m-1h33m,item:2-3
|
||||
|
||||
Avoids special-casing URLs (://) and Windows drive paths (C:\\...).
|
||||
Returns True if it handled the token.
|
||||
"""
|
||||
if not word or ":" not in word:
|
||||
return False
|
||||
# Avoid URLs and common scheme patterns.
|
||||
if "://" in word:
|
||||
return False
|
||||
# Avoid Windows drive paths (e.g., C:\\foo or D:/bar)
|
||||
if re.match(r"^[A-Za-z]:[\\/]", word):
|
||||
return False
|
||||
|
||||
key_prefix = re.compile(r"^([A-Za-z_][A-Za-z0-9_-]*:)(.*)$")
|
||||
parts = word.split(",")
|
||||
handled_any = False
|
||||
for i, part in enumerate(parts):
|
||||
if i > 0:
|
||||
tokens.append(("class:value", ","))
|
||||
if part == "":
|
||||
continue
|
||||
m = key_prefix.match(part)
|
||||
if m:
|
||||
tokens.append(("class:argument", m.group(1)))
|
||||
if m.group(2):
|
||||
tokens.append(("class:value", m.group(2)))
|
||||
handled_any = True
|
||||
else:
|
||||
tokens.append(("class:value", part))
|
||||
handled_any = True
|
||||
|
||||
return handled_any
|
||||
|
||||
for match in pattern.finditer(line):
|
||||
ws, pipe, quote, word = match.groups()
|
||||
if ws:
|
||||
tokens.append(("", ws))
|
||||
continue
|
||||
if pipe:
|
||||
tokens.append(("class:pipe", pipe))
|
||||
is_cmdlet = True
|
||||
continue
|
||||
if quote:
|
||||
# If the quoted token contains a keyed spec (clip:/item:/hash:),
|
||||
# highlight the `key:` portion in argument-blue even inside quotes.
|
||||
if len(quote) >= 2 and quote[0] == quote[-1] and quote[0] in ('"', "'"):
|
||||
q = quote[0]
|
||||
inner = quote[1:-1]
|
||||
start_index = len(tokens)
|
||||
if _emit_keyed_value(inner):
|
||||
# _emit_keyed_value already appended tokens for inner; insert opening quote
|
||||
# before that chunk, then add the closing quote.
|
||||
tokens.insert(start_index, ("class:string", q))
|
||||
tokens.append(("class:string", q))
|
||||
is_cmdlet = False
|
||||
continue
|
||||
|
||||
tokens.append(("class:string", quote))
|
||||
is_cmdlet = False
|
||||
continue
|
||||
if not word:
|
||||
continue
|
||||
|
||||
if word.startswith("@"): # selection tokens
|
||||
rest = word[1:]
|
||||
if rest and re.fullmatch(r"[0-9\-\*,]+", rest):
|
||||
tokens.append(("class:selection_at", "@"))
|
||||
tokens.append(("class:selection_range", rest))
|
||||
is_cmdlet = False
|
||||
continue
|
||||
if rest == "":
|
||||
tokens.append(("class:selection_at", "@"))
|
||||
is_cmdlet = False
|
||||
continue
|
||||
|
||||
if is_cmdlet:
|
||||
tokens.append(("class:cmdlet", word))
|
||||
is_cmdlet = False
|
||||
elif word.startswith("-"):
|
||||
tokens.append(("class:argument", word))
|
||||
else:
|
||||
if not _emit_keyed_value(word):
|
||||
tokens.append(("class:value", word))
|
||||
|
||||
return tokens
|
||||
|
||||
return get_line
|
||||
@@ -341,7 +341,7 @@ class TUIResultCard:
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResultColumn:
|
||||
class Column:
|
||||
"""Represents a single column in a result table."""
|
||||
|
||||
name: str
|
||||
@@ -361,10 +361,10 @@ class ResultColumn:
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResultRow:
|
||||
class Row:
|
||||
"""Represents a single row in a result table."""
|
||||
|
||||
columns: List[ResultColumn] = field(default_factory=list)
|
||||
columns: List[Column] = field(default_factory=list)
|
||||
selection_args: Optional[List[str]] = None
|
||||
"""Arguments to use for this row when selected via @N syntax (e.g., ['-item', '3'])"""
|
||||
selection_action: Optional[List[str]] = None
|
||||
@@ -398,7 +398,7 @@ class ResultRow:
|
||||
if formatted:
|
||||
str_value = formatted
|
||||
|
||||
self.columns.append(ResultColumn(normalized_name, str_value))
|
||||
self.columns.append(Column(normalized_name, str_value))
|
||||
|
||||
def get_column(self, name: str) -> Optional[str]:
|
||||
"""Get column value by name."""
|
||||
@@ -420,7 +420,7 @@ class ResultRow:
|
||||
return " | ".join(str(col) for col in self.columns)
|
||||
|
||||
|
||||
class ResultTable:
|
||||
class Table:
|
||||
"""Unified table formatter for search results, metadata, and pipeline objects.
|
||||
|
||||
Provides a structured way to display results in the CLI with consistent formatting.
|
||||
@@ -491,7 +491,7 @@ class ResultTable:
|
||||
self.max_columns = (
|
||||
max_columns if max_columns is not None else 5
|
||||
) # Default 5 for cleaner display
|
||||
self.rows: List[ResultRow] = []
|
||||
self.rows: List[Row] = []
|
||||
self.column_widths: Dict[str,
|
||||
int] = {}
|
||||
self.input_options: Dict[str,
|
||||
@@ -503,9 +503,9 @@ class ResultTable:
|
||||
"""Base arguments for the source command"""
|
||||
self.header_lines: List[str] = []
|
||||
"""Optional metadata lines rendered under the title"""
|
||||
self.preserve_order: bool = preserve_order
|
||||
self.perseverance: bool = preserve_order
|
||||
"""If True, skip automatic sorting so display order matches input order."""
|
||||
self.no_choice: bool = False
|
||||
self.interactive: bool = False
|
||||
"""When True, suppress row numbers/selection to make the table non-interactive."""
|
||||
self.table: Optional[str] = None
|
||||
"""Table type (e.g., 'youtube', 'soulseek') for context-aware selection logic."""
|
||||
@@ -516,7 +516,7 @@ class ResultTable:
|
||||
self.value_case: str = "lower"
|
||||
"""Display-only value casing: 'lower' (default), 'upper', or 'preserve'."""
|
||||
|
||||
def set_value_case(self, value_case: str) -> "ResultTable":
|
||||
def set_value_case(self, value_case: str) -> "Table":
|
||||
"""Configure display-only casing for rendered cell values."""
|
||||
case = str(value_case or "").strip().lower()
|
||||
if case not in {"lower",
|
||||
@@ -535,12 +535,12 @@ class ResultTable:
|
||||
return text
|
||||
return text.lower()
|
||||
|
||||
def set_table(self, table: str) -> "ResultTable":
|
||||
def set_table(self, table: str) -> "Table":
|
||||
"""Set the table type for context-aware selection logic."""
|
||||
self.table = table
|
||||
return self
|
||||
|
||||
def set_table_metadata(self, metadata: Optional[Dict[str, Any]]) -> "ResultTable":
|
||||
def set_table_metadata(self, metadata: Optional[Dict[str, Any]]) -> "Table":
|
||||
"""Attach provider/table metadata for downstream selection logic."""
|
||||
self.table_metadata = dict(metadata or {})
|
||||
return self
|
||||
@@ -552,19 +552,19 @@ class ResultTable:
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
def set_no_choice(self, no_choice: bool = True) -> "ResultTable":
|
||||
def _interactive(self, interactive: bool = True) -> "Table":
|
||||
"""Mark the table as non-interactive (no row numbers, no selection parsing)."""
|
||||
self.no_choice = bool(no_choice)
|
||||
self.interactive = bool(interactive)
|
||||
return self
|
||||
|
||||
def set_preserve_order(self, preserve: bool = True) -> "ResultTable":
|
||||
def _perseverance(self, perseverance: bool = True) -> "Table":
|
||||
"""Configure whether this table should skip automatic sorting."""
|
||||
self.preserve_order = bool(preserve)
|
||||
self.perseverance = bool(perseverance)
|
||||
return self
|
||||
|
||||
def add_row(self) -> ResultRow:
|
||||
def add_row(self) -> Row:
|
||||
"""Add a new row to the table and return it for configuration."""
|
||||
row = ResultRow()
|
||||
row = Row()
|
||||
row.source_index = len(self.rows)
|
||||
self.rows.append(row)
|
||||
return row
|
||||
@@ -573,7 +573,7 @@ class ResultTable:
|
||||
self,
|
||||
command: str,
|
||||
args: Optional[List[str]] = None
|
||||
) -> "ResultTable":
|
||||
) -> "Table":
|
||||
"""Set the source command that generated this table.
|
||||
|
||||
This is used for @N expansion: when user runs @2 | next-cmd, it will expand to:
|
||||
@@ -596,7 +596,7 @@ class ResultTable:
|
||||
command: str,
|
||||
args: Optional[List[str]] = None,
|
||||
preserve_order: bool = False,
|
||||
) -> "ResultTable":
|
||||
) -> "Table":
|
||||
"""Initialize table with title, command, args, and preserve_order in one call.
|
||||
|
||||
Consolidates common initialization pattern: ResultTable(title) + set_source_command(cmd, args) + set_preserve_order(preserve_order)
|
||||
@@ -613,10 +613,10 @@ class ResultTable:
|
||||
self.title = title
|
||||
self.source_command = command
|
||||
self.source_args = args or []
|
||||
self.preserve_order = preserve_order
|
||||
self.perseverance = preserve_order
|
||||
return self
|
||||
|
||||
def copy_with_title(self, new_title: str) -> "ResultTable":
|
||||
def copy_with_title(self, new_title: str) -> "Table":
|
||||
"""Create a new table copying settings from this one but with a new title.
|
||||
|
||||
Consolidates pattern: new_table = ResultTable(title); new_table.set_source_command(...)
|
||||
@@ -628,16 +628,16 @@ class ResultTable:
|
||||
Returns:
|
||||
New ResultTable with copied settings and new title
|
||||
"""
|
||||
new_table = ResultTable(
|
||||
new_table = Table(
|
||||
title=new_title,
|
||||
title_width=self.title_width,
|
||||
max_columns=self.max_columns,
|
||||
preserve_order=self.preserve_order,
|
||||
preserve_order=self.perseverance,
|
||||
)
|
||||
new_table.source_command = self.source_command
|
||||
new_table.source_args = list(self.source_args) if self.source_args else []
|
||||
new_table.input_options = dict(self.input_options) if self.input_options else {}
|
||||
new_table.no_choice = self.no_choice
|
||||
new_table.interactive = self.interactive
|
||||
new_table.table = self.table
|
||||
new_table.table_metadata = (
|
||||
dict(self.table_metadata) if getattr(self, "table_metadata", None) else {}
|
||||
@@ -663,12 +663,12 @@ class ResultTable:
|
||||
if 0 <= row_index < len(self.rows):
|
||||
self.rows[row_index].selection_action = selection_action
|
||||
|
||||
def set_header_lines(self, lines: List[str]) -> "ResultTable":
|
||||
def set_header_lines(self, lines: List[str]) -> "Table":
|
||||
"""Attach metadata lines that render beneath the title."""
|
||||
self.header_lines = [line for line in lines if line]
|
||||
return self
|
||||
|
||||
def set_header_line(self, line: str) -> "ResultTable":
|
||||
def set_header_line(self, line: str) -> "Table":
|
||||
"""Attach a single metadata line beneath the title."""
|
||||
return self.set_header_lines([line] if line else [])
|
||||
|
||||
@@ -699,7 +699,7 @@ class ResultTable:
|
||||
self.set_header_line(summary)
|
||||
return summary
|
||||
|
||||
def sort_by_title(self) -> "ResultTable":
|
||||
def sort_by_title(self) -> "Table":
|
||||
"""Sort rows alphabetically by Title or Name column.
|
||||
|
||||
Looks for columns named 'Title', 'Name', or 'Tag' (in that order).
|
||||
@@ -737,7 +737,7 @@ class ResultTable:
|
||||
|
||||
return self
|
||||
|
||||
def add_result(self, result: Any) -> "ResultTable":
|
||||
def add_result(self, result: Any) -> "Table":
|
||||
"""Add a result object (SearchResult, PipeObject, ResultItem, TagItem, or dict) as a row.
|
||||
|
||||
Args:
|
||||
@@ -793,7 +793,7 @@ class ResultTable:
|
||||
return payloads
|
||||
|
||||
@classmethod
|
||||
def from_api_table(cls, api_table: Any) -> "ResultTable":
|
||||
def from_api_table(cls, api_table: Any) -> "Table":
|
||||
"""Convert a strict SYS.result_table_api.ResultTable into an interactive monolith ResultTable.
|
||||
|
||||
This allows providers using the new strict API to benefit from the monolith's
|
||||
@@ -831,7 +831,7 @@ class ResultTable:
|
||||
|
||||
return instance
|
||||
|
||||
def _add_result_model(self, row: ResultRow, result: ResultModel) -> None:
|
||||
def _add_result_model(self, row: Row, result: ResultModel) -> None:
|
||||
"""Extract and add ResultModel fields from the new API to row."""
|
||||
row.add_column("Title", result.title)
|
||||
|
||||
@@ -848,7 +848,7 @@ class ResultTable:
|
||||
# Add a placeholder for metadata-like display if needed in the main table
|
||||
# but usually metadata is handled by the detail panel now
|
||||
|
||||
def _add_search_result(self, row: ResultRow, result: Any) -> None:
|
||||
def _add_search_result(self, row: Row, result: Any) -> None:
|
||||
"""Extract and add SearchResult fields to row."""
|
||||
cols = getattr(result, "columns", None)
|
||||
used_explicit_columns = False
|
||||
@@ -925,7 +925,7 @@ class ResultTable:
|
||||
if selection_action:
|
||||
row.selection_action = [str(a) for a in selection_action if a is not None]
|
||||
|
||||
def _add_result_item(self, row: ResultRow, item: Any) -> None:
|
||||
def _add_result_item(self, row: Row, item: Any) -> None:
|
||||
"""Extract and add ResultItem fields to row (compact display for search results).
|
||||
|
||||
Shows only essential columns:
|
||||
@@ -970,7 +970,7 @@ class ResultTable:
|
||||
if hasattr(item, "size_bytes") and item.size_bytes:
|
||||
row.add_column("Size", _format_size(item.size_bytes, integer_only=False))
|
||||
|
||||
def _add_tag_item(self, row: ResultRow, item: Any) -> None:
|
||||
def _add_tag_item(self, row: Row, item: Any) -> None:
|
||||
"""Extract and add TagItem fields to row (compact tag display).
|
||||
|
||||
Shows the Tag column with the tag name and Source column to identify
|
||||
@@ -986,7 +986,7 @@ class ResultTable:
|
||||
if hasattr(item, "source") and item.source:
|
||||
row.add_column("Store", item.source)
|
||||
|
||||
def _add_pipe_object(self, row: ResultRow, obj: Any) -> None:
|
||||
def _add_pipe_object(self, row: Row, obj: Any) -> None:
|
||||
"""Extract and add PipeObject fields to row."""
|
||||
# Source and identifier
|
||||
if hasattr(obj, "source") and obj.source:
|
||||
@@ -1019,7 +1019,7 @@ class ResultTable:
|
||||
warnings_str += f" (+{len(obj.warnings) - 2} more)"
|
||||
row.add_column("Warnings", warnings_str)
|
||||
|
||||
def _add_dict(self, row: ResultRow, data: Dict[str, Any]) -> None:
|
||||
def _add_dict(self, row: Row, data: Dict[str, Any]) -> None:
|
||||
"""Extract and add dict fields to row using first-match priority groups.
|
||||
|
||||
Respects max_columns limit to keep table compact and readable.
|
||||
@@ -1251,7 +1251,7 @@ class ResultTable:
|
||||
# Don't display it
|
||||
added_fields.add("_selection_args")
|
||||
|
||||
def _add_generic_object(self, row: ResultRow, obj: Any) -> None:
|
||||
def _add_generic_object(self, row: Row, obj: Any) -> None:
|
||||
"""Extract and add fields from generic objects."""
|
||||
if hasattr(obj, "__dict__"):
|
||||
for key, value in obj.__dict__.items():
|
||||
@@ -1282,7 +1282,7 @@ class ResultTable:
|
||||
show_lines=False,
|
||||
)
|
||||
|
||||
if not self.no_choice:
|
||||
if not self.interactive:
|
||||
table.add_column("#", justify="right", no_wrap=True)
|
||||
|
||||
# Render headers in uppercase, but keep original column keys for lookup.
|
||||
@@ -1301,7 +1301,7 @@ class ResultTable:
|
||||
|
||||
for row_idx, row in enumerate(self.rows, 1):
|
||||
cells: List[str] = []
|
||||
if not self.no_choice:
|
||||
if not self.interactive:
|
||||
cells.append(str(row_idx))
|
||||
for name in col_names:
|
||||
val = row.get_column(name) or ""
|
||||
@@ -1381,7 +1381,7 @@ class ResultTable:
|
||||
"""Iterate over rows."""
|
||||
return iter(self.rows)
|
||||
|
||||
def __getitem__(self, index: int) -> ResultRow:
|
||||
def __getitem__(self, index: int) -> Row:
|
||||
"""Get row by index."""
|
||||
return self.rows[index]
|
||||
|
||||
@@ -1410,7 +1410,7 @@ class ResultTable:
|
||||
If accept_args=False: List of 0-based indices, or None if cancelled
|
||||
If accept_args=True: Dict with "indices" and "args" keys, or None if cancelled
|
||||
"""
|
||||
if self.no_choice:
|
||||
if self.interactive:
|
||||
from SYS.rich_display import stdout_console
|
||||
|
||||
stdout_console().print(self)
|
||||
@@ -1494,7 +1494,7 @@ class ResultTable:
|
||||
Returns:
|
||||
List of 0-based indices, or None if invalid
|
||||
"""
|
||||
if self.no_choice:
|
||||
if self.interactive:
|
||||
return None
|
||||
|
||||
indices = set()
|
||||
@@ -1604,7 +1604,7 @@ class ResultTable:
|
||||
"args": cmdlet_args
|
||||
}
|
||||
|
||||
def add_input_option(self, option: InputOption) -> "ResultTable":
|
||||
def add_input_option(self, option: InputOption) -> "Table":
|
||||
"""Add an interactive input option to the table.
|
||||
|
||||
Input options allow users to specify cmdlet arguments interactively,
|
||||
@@ -1708,7 +1708,7 @@ class ResultTable:
|
||||
result[name] = value
|
||||
return result
|
||||
|
||||
def select_by_index(self, index: int) -> Optional[ResultRow]:
|
||||
def select_by_index(self, index: int) -> Optional[Row]:
|
||||
"""Get a row by 1-based index (user-friendly).
|
||||
|
||||
Args:
|
||||
@@ -1740,7 +1740,7 @@ class ResultTable:
|
||||
return rows
|
||||
|
||||
def _format_datatable_row(self,
|
||||
row: ResultRow,
|
||||
row: Row,
|
||||
source: str = "unknown") -> List[str]:
|
||||
"""Format a ResultRow for DataTable display.
|
||||
|
||||
@@ -1769,7 +1769,7 @@ class ResultTable:
|
||||
cards.append(card)
|
||||
return cards
|
||||
|
||||
def _row_to_card(self, row: ResultRow) -> TUIResultCard:
|
||||
def _row_to_card(self, row: Row) -> TUIResultCard:
|
||||
"""Convert a ResultRow to a TUIResultCard.
|
||||
|
||||
Args:
|
||||
@@ -1886,7 +1886,7 @@ def format_result(result: Any, title: str = "") -> str:
|
||||
Returns:
|
||||
Formatted string
|
||||
"""
|
||||
table = ResultTable(title)
|
||||
table = Table(title)
|
||||
|
||||
if isinstance(result, list):
|
||||
for item in result:
|
||||
@@ -1997,7 +1997,7 @@ def extract_item_metadata(item: Any) -> Dict[str, Any]:
|
||||
return out
|
||||
|
||||
|
||||
class ItemDetailView(ResultTable):
|
||||
class ItemDetailView(Table):
|
||||
"""A specialized view that displays item details alongside a list of related items (tags, urls, etc).
|
||||
|
||||
This is used for 'get-tag', 'get-url' and similar cmdlets where we want to contextually show
|
||||
|
||||
@@ -80,7 +80,7 @@ def show_provider_config_panel(
|
||||
provider_names: str | List[str],
|
||||
) -> None:
|
||||
"""Show a Rich panel explaining how to configure providers."""
|
||||
from rich.table import Table
|
||||
from rich.table import Table as RichTable
|
||||
from rich.text import Text
|
||||
from rich.console import Group
|
||||
|
||||
@@ -89,7 +89,7 @@ def show_provider_config_panel(
|
||||
else:
|
||||
providers = provider_names
|
||||
|
||||
table = Table.grid(padding=(0, 1))
|
||||
table = RichTable.grid(padding=(0, 1))
|
||||
table.add_column(style="bold red")
|
||||
|
||||
for provider in providers:
|
||||
@@ -116,7 +116,7 @@ def show_store_config_panel(
|
||||
store_names: str | List[str],
|
||||
) -> None:
|
||||
"""Show a Rich panel explaining how to configure storage backends."""
|
||||
from rich.table import Table
|
||||
from rich.table import Table as RichTable
|
||||
from rich.text import Text
|
||||
from rich.console import Group
|
||||
|
||||
@@ -125,7 +125,7 @@ def show_store_config_panel(
|
||||
else:
|
||||
stores = store_names
|
||||
|
||||
table = Table.grid(padding=(0, 1))
|
||||
table = RichTable.grid(padding=(0, 1))
|
||||
table.add_column(style="bold yellow")
|
||||
|
||||
for store in stores:
|
||||
@@ -269,7 +269,7 @@ def render_item_details_panel(item: Dict[str, Any], *, title: Optional[str] = No
|
||||
|
||||
# Create a specialized view with no results rows (only the metadata panel)
|
||||
# We set no_choice=True to hide the "#" column (not that there are any rows).
|
||||
view = ItemDetailView(item_metadata=metadata, detail_title=title).set_no_choice(True)
|
||||
view = ItemDetailView(item_metadata=metadata, detail_title=title)._interactive(True)
|
||||
# Ensure no title leaks in (prevents an empty "No results" table from rendering).
|
||||
try:
|
||||
view.title = ""
|
||||
|
||||
Reference in New Issue
Block a user