This commit is contained in:
2026-01-20 03:33:11 -08:00
parent 1f65f9de2a
commit 1e2054189b
4 changed files with 143 additions and 115 deletions

3
.gitignore vendored
View File

@@ -241,4 +241,5 @@ tmp_*
# Ignore local ZeroTier auth tokens (project-local copy)
authtoken.secret
mypy.ini
mypy.
.idea

123
CLI.py
View File

@@ -39,7 +39,7 @@ import uuid
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, cast
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, cast, Callable
import typer
from prompt_toolkit import PromptSession
@@ -97,7 +97,7 @@ from ProviderCore.registry import provider_inline_query_choices
# Selection parsing and REPL lexer moved to SYS.cli_parsing
from SYS.cli_parsing import SelectionSyntax, SelectionFilterSyntax, MedeiaLexer
from SYS.cli_parsing import Lexer, DRIVE_RE, KEY_PREFIX_RE, TOKEN_PATTERN, SELECTION_RANGE_RE, SelectionSyntax, SelectionFilterSyntax
# SelectionFilterSyntax moved to SYS.cli_parsing (imported above)
@@ -2502,5 +2502,124 @@ Come to love it when others take what you share, as there is no greater joy
stop_zerotier_server()
_PTK_Lexer = object # type: ignore
# Expose a stable name used by the rest of the module
Lexer = _PTK_Lexer
class MedeiaLexer(Lexer):
    """Syntax highlighter for Medeia REPL input (prompt-toolkit ``Lexer``).

    Splits each line with the module-level ``TOKEN_PATTERN`` and assigns
    style classes for cmdlet names, pipes, quoted strings, ``-flag``
    arguments, ``key:value`` specs and ``@`` selection tokens.
    """

    def lex_document(self, document: "Document") -> Callable[[int], List[Tuple[str, str]]]:  # type: ignore[override]
        def get_line(lineno: int) -> List[Tuple[str, str]]:
            """Return token list for a single input line (used by prompt-toolkit)."""
            line = document.lines[lineno]
            tokens: List[Tuple[str, str]] = []
            # Using TOKEN_PATTERN precompiled at module scope.
            # The first word on a line (and after each "|") is styled as a cmdlet.
            is_cmdlet = True

            def _emit_keyed_value(word: str) -> bool:
                """Emit `key:` prefixes (comma-separated) as argument tokens.

                Designed for values like:
                    clip:3m4s-3m14s,1h22m-1h33m,item:2-3
                Avoids special-casing URLs (://) and Windows drive paths (C:\\...).
                Returns True if it handled the token.
                """
                if not word or ":" not in word:
                    return False
                # Avoid URLs and common scheme patterns.
                if "://" in word:
                    return False
                # Avoid Windows drive paths (e.g., C:\\foo or D:/bar)
                if DRIVE_RE.match(word):
                    return False
                parts = word.split(",")
                handled_any = False
                for i, part in enumerate(parts):
                    if i > 0:
                        # Re-emit the comma separator between comma-joined specs.
                        tokens.append(("class:value", ","))
                    if part == "":
                        continue
                    m = KEY_PREFIX_RE.match(part)
                    if m:
                        # "key:" is styled as an argument, the remainder as a value.
                        tokens.append(("class:argument", m.group(1)))
                        if m.group(2):
                            tokens.append(("class:value", m.group(2)))
                        handled_any = True
                    else:
                        tokens.append(("class:value", part))
                        handled_any = True
                return handled_any

            for match in TOKEN_PATTERN.finditer(line):
                ws, pipe, quote, word = match.groups()
                if ws:
                    tokens.append(("", ws))
                    continue
                if pipe:
                    tokens.append(("class:pipe", pipe))
                    # Pipe starts a new command segment.
                    is_cmdlet = True
                    continue
                if quote:
                    # If the quoted token contains a keyed spec (clip:/item:/hash:),
                    # highlight the `key:` portion in argument-blue even inside quotes.
                    if len(quote) >= 2 and quote[0] == quote[-1] and quote[0] in ('"', "'"):
                        q = quote[0]
                        inner = quote[1:-1]
                        start_index = len(tokens)
                        if _emit_keyed_value(inner):
                            # _emit_keyed_value already appended tokens for inner; insert opening quote
                            # before that chunk, then add the closing quote.
                            tokens.insert(start_index, ("class:string", q))
                            tokens.append(("class:string", q))
                            is_cmdlet = False
                            continue
                    # Plain quoted string (or keyed-value emission declined).
                    tokens.append(("class:string", quote))
                    is_cmdlet = False
                    continue
                if not word:
                    continue
                if word.startswith("@"):  # selection tokens
                    rest = word[1:]
                    if rest and SELECTION_RANGE_RE.fullmatch(rest):
                        # Numeric range selection, e.g. @2-5 or @1,3.
                        tokens.append(("class:selection_at", "@"))
                        tokens.append(("class:selection_range", rest))
                        is_cmdlet = False
                        continue
                    if rest and ":" in rest:
                        # Keyed selection filter (contains a colon).
                        tokens.append(("class:selection_at", "@"))
                        tokens.append(("class:selection_filter", rest))
                        is_cmdlet = False
                        continue
                    if rest == "":
                        # Bare "@" token.
                        tokens.append(("class:selection_at", "@"))
                        is_cmdlet = False
                        continue
                if is_cmdlet:
                    tokens.append(("class:cmdlet", word))
                    is_cmdlet = False
                elif word.startswith("-"):
                    tokens.append(("class:argument", word))
                else:
                    if not _emit_keyed_value(word):
                        tokens.append(("class:value", word))
            return tokens

        return get_line
if __name__ == "__main__":
    # Script entry point: run the interactive CLI.
    CLI().run()

View File

@@ -22,6 +22,20 @@ except Exception: # pragma: no cover - optional dependency
# Expose a stable name used by the rest of the module
Lexer = _PTK_Lexer
# Pre-compiled regexes for the lexer (avoid recompiling on every call)
TOKEN_PATTERN = re.compile(
r"""
(\s+) | # 1. Whitespace
(\|) | # 2. Pipe
("(?:[^"\\]|\\.)*"|'(?:[^'\\]|\\.)*') | # 3. Quoted string
([^\s\|]+) # 4. Word
""",
re.VERBOSE,
)
KEY_PREFIX_RE = re.compile(r"^([A-Za-z_][A-Za-z0-9_-]*:)(.*)$")
SELECTION_RANGE_RE = re.compile(r"^[0-9\-\*,]+$")
DRIVE_RE = re.compile(r"^[A-Za-z]:[\\/]")
class SelectionSyntax:
"""Parses @ selection syntax into 1-based indices."""
@@ -349,114 +363,3 @@ class SelectionFilterSyntax:
return True
class MedeiaLexer(Lexer):
    """Syntax highlighter for Medeia REPL input (prompt-toolkit ``Lexer``).

    Tokenizes each line and assigns style classes for cmdlet names, pipes,
    quoted strings, ``-flag`` arguments, ``key:value`` specs and ``@``
    selection tokens.
    """

    # Compiled ONCE at class-definition time. The previous implementation
    # recompiled the token pattern inside get_line (once per rendered line)
    # and the key-prefix pattern inside _emit_keyed_value (once per token),
    # which is loop-invariant work in the editor's hottest path.
    _TOKEN_PATTERN = re.compile(
        r"""
        (\s+)                                 | # 1. Whitespace
        (\|)                                  | # 2. Pipe
        ("(?:[^"\\]|\\.)*"|'(?:[^'\\]|\\.)*') | # 3. Quoted string
        ([^\s\|]+)                              # 4. Word
        """,
        re.VERBOSE,
    )
    # `key:` prefix at the start of a token (group 1 = "key:", group 2 = rest).
    _KEY_PREFIX_RE = re.compile(r"^([A-Za-z_][A-Za-z0-9_-]*:)(.*)$")
    # Windows drive-path prefix (e.g. "C:\\" or "D:/").
    _DRIVE_RE = re.compile(r"^[A-Za-z]:[\\/]")
    # Numeric selection range after "@" (digits, "-", "*", ",").
    _SELECTION_RANGE_RE = re.compile(r"^[0-9\-\*,]+$")

    def lex_document(self, document: "Document"):  # type: ignore[override]
        def get_line(lineno: int):
            """Return the (style_class, text) token list for one input line."""
            line = document.lines[lineno]
            tokens: List[tuple[str, str]] = []
            # The first word on a line (and after each "|") is the cmdlet.
            is_cmdlet = True

            def _emit_keyed_value(word: str) -> bool:
                """Emit `key:` prefixes (comma-separated) as argument tokens.

                Designed for values like:
                    clip:3m4s-3m14s,1h22m-1h33m,item:2-3
                Avoids special-casing URLs (://) and Windows drive paths (C:\\...).
                Returns True if it handled the token.
                """
                if not word or ":" not in word:
                    return False
                # Avoid URLs and common scheme patterns.
                if "://" in word:
                    return False
                # Avoid Windows drive paths (e.g., C:\\foo or D:/bar)
                if self._DRIVE_RE.match(word):
                    return False
                handled_any = False
                for i, part in enumerate(word.split(",")):
                    if i > 0:
                        # Re-emit the comma separator between comma-joined specs.
                        tokens.append(("class:value", ","))
                    if part == "":
                        continue
                    m = self._KEY_PREFIX_RE.match(part)
                    if m:
                        # "key:" styled as an argument, remainder as a value.
                        tokens.append(("class:argument", m.group(1)))
                        if m.group(2):
                            tokens.append(("class:value", m.group(2)))
                        handled_any = True
                    else:
                        tokens.append(("class:value", part))
                        handled_any = True
                return handled_any

            for match in self._TOKEN_PATTERN.finditer(line):
                ws, pipe, quote, word = match.groups()
                if ws:
                    tokens.append(("", ws))
                    continue
                if pipe:
                    tokens.append(("class:pipe", pipe))
                    # Pipe starts a new command segment.
                    is_cmdlet = True
                    continue
                if quote:
                    # If the quoted token contains a keyed spec (clip:/item:/hash:),
                    # highlight the `key:` portion in argument-blue even inside quotes.
                    if len(quote) >= 2 and quote[0] == quote[-1] and quote[0] in ('"', "'"):
                        q = quote[0]
                        inner = quote[1:-1]
                        start_index = len(tokens)
                        if _emit_keyed_value(inner):
                            # _emit_keyed_value already appended tokens for inner; insert
                            # the opening quote before that chunk, then the closing quote.
                            tokens.insert(start_index, ("class:string", q))
                            tokens.append(("class:string", q))
                            is_cmdlet = False
                            continue
                    # Plain quoted string (or keyed-value emission declined).
                    tokens.append(("class:string", quote))
                    is_cmdlet = False
                    continue
                if not word:
                    continue
                if word.startswith("@"):  # selection tokens
                    rest = word[1:]
                    if rest and self._SELECTION_RANGE_RE.fullmatch(rest):
                        # Numeric range selection, e.g. @2-5 or @1,3.
                        tokens.append(("class:selection_at", "@"))
                        tokens.append(("class:selection_range", rest))
                        is_cmdlet = False
                        continue
                    if rest == "":
                        # Bare "@" token.
                        tokens.append(("class:selection_at", "@"))
                        is_cmdlet = False
                        continue
                if is_cmdlet:
                    tokens.append(("class:cmdlet", word))
                    is_cmdlet = False
                elif word.startswith("-"):
                    tokens.append(("class:argument", word))
                else:
                    if not _emit_keyed_value(word):
                        tokens.append(("class:value", word))
            return tokens

        return get_line

View File

@@ -997,7 +997,12 @@ class PipelineLiveProgress:
# Safe to call whether Live is running or paused.
if self._live is not None:
try:
self._live.stop()
try:
self._live.stop(clear=True)
except TypeError:
self._live.stop()
except Exception:
self._live.stop()
except Exception:
pass