dfdfdf
This commit is contained in:
166
cli_syntax.py
Normal file
166
cli_syntax.py
Normal file
@@ -0,0 +1,166 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import re
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class SyntaxErrorDetail:
    """Immutable description of a syntax problem found in raw CLI input."""

    # Human-readable explanation of what is wrong with the input.
    message: str

    # The token that was expected but not found (for example a closing
    # quote character); None when no specific token applies.
    expected: Optional[str] = None
|
||||
|
||||
|
||||
def validate_pipeline_text(text: str) -> Optional["SyntaxErrorDetail"]:
    """Validate raw CLI input before tokenization/execution.

    This is intentionally lightweight and focuses on user-facing syntax
    issues:

    - Unbalanced single/double quotes
    - Dangling or empty pipeline stages (|)

    Quoting rules used by the scan: a quote opened with one quote character
    is closed only by the same character, a backslash inside quotes escapes
    the following character, and a ``|`` inside quotes is plain text.

    A quoted token counts as stage content, so ``a | "b c" | d`` is valid
    (this includes an empty quoted token such as ``""``).

    Problems are reported with this precedence:

    1. Empty input.
    2. Two directly adjacent pipes outside quotes (``||``).
    3. A missing closing quote.
    4. A pipeline that starts or ends with ``|``.
    5. A stage containing only whitespace between two pipes.

    Args:
        text: The raw command line as typed by the user. ``None`` is
            treated like an empty command.

    Returns:
        None if valid, otherwise a SyntaxErrorDetail describing the issue.
    """
    if text is None:
        return SyntaxErrorDetail("Empty command")

    raw = text.strip()
    if not raw:
        return SyntaxErrorDetail("Empty command")

    open_quote = ""  # quote character currently open; "" when outside quotes
    escaped = False
    prev_pipe_idx: Optional[int] = None
    # Starts True so the first stage is not judged here; a leading '|' gets
    # its own, more specific message further down.
    stage_has_content = True
    # Whitespace-only stages are remembered rather than returned immediately
    # so that quote and start/end errors keep their higher precedence.
    empty_stage_seen = False

    for idx, ch in enumerate(raw):
        if escaped:
            # Character following a backslash inside quotes: always literal.
            escaped = False
            continue

        if open_quote:
            if ch == "\\":
                escaped = True
            elif ch == open_quote:
                open_quote = ""
            continue

        if ch == '"' or ch == "'":
            open_quote = ch
            # A quoted token is real content for the current stage.
            stage_has_content = True
            continue

        if ch == "|":
            if prev_pipe_idx is not None and prev_pipe_idx == idx - 1:
                return SyntaxErrorDetail("Syntax error: empty pipeline stage (found '||').")
            if not stage_has_content:
                empty_stage_seen = True
            prev_pipe_idx = idx
            stage_has_content = False
            continue

        if not ch.isspace():
            stage_has_content = True

    if open_quote == '"':
        return SyntaxErrorDetail('Syntax error: missing closing ".', expected='"')
    if open_quote == "'":
        return SyntaxErrorDetail("Syntax error: missing closing '.", expected="'")

    # Dangling pipe at end / pipe as first non-space character.
    if raw.startswith("|"):
        return SyntaxErrorDetail("Syntax error: pipeline cannot start with '|'.")
    if raw.endswith("|"):
        return SyntaxErrorDetail("Syntax error: pipeline cannot end with '|'.")

    # Empty stage like "cmd1 | | cmd2" (only whitespace between pipes).
    if empty_stage_seen:
        return SyntaxErrorDetail("Syntax error: empty pipeline stage (use a command between '|').")

    return None
|
||||
|
||||
|
||||
def parse_query(query: str) -> Dict[str, Any]:
    """Split a query string into ``field:value`` pairs and leftover free text.

    Recognised forms include:
        - isbn:0557677203
        - author:"Albert Pike"
        - title:"Morals and Dogma" year:2010
        - free text mixed in: Morals isbn:0557677203

    A value is either a double-quoted string (quotes removed) or a run of
    non-whitespace characters. Field names are lower-cased, and when a field
    appears more than once the last occurrence wins.

    Args:
        query: The query string; an empty, blank or ``None`` value yields a
            result with no fields and empty text.

    Returns:
        Dict with keys:
            - fields: Dict[str, str] mapping lower-cased names to values
            - text: str, the non-field fragments joined by single spaces
            - raw: the ``query`` argument exactly as passed in
    """
    fields: Dict[str, str] = {}
    parsed: Dict[str, Any] = {
        "fields": fields,
        "text": "",
        "raw": query,
    }

    stripped = query.strip() if query else ""
    if not stripped:
        return parsed

    # field:value where value is either a quoted string or a non-space token.
    field_re = re.compile(r'(\w+):(?:"([^"]*)"|(\S+))')

    free_chunks: list[str] = []
    cursor = 0
    for hit in field_re.finditer(stripped):
        # Whatever sits between the previous match and this one is free text.
        gap = stripped[cursor:hit.start()].strip()
        if gap:
            free_chunks.append(gap)

        name, quoted, bare = hit.groups()
        # `quoted` is "" (not None) for an empty quoted value such as f:"".
        fields[name.lower()] = bare if quoted is None else quoted
        cursor = hit.end()

    tail = stripped[cursor:].strip()
    if tail:
        free_chunks.append(tail)

    parsed["text"] = " ".join(free_chunks)
    return parsed
|
||||
|
||||
|
||||
def get_field(parsed_query: Dict[str, Any], field_name: str, default: Optional[str] = None) -> Optional[str]:
    """Look up one field in a parsed query, ignoring the case of its name.

    Args:
        parsed_query: A mapping with an optional "fields" sub-mapping.
        field_name: Name of the field; it is lower-cased before the lookup,
            and an empty or ``None`` name is looked up as "".
        default: Value returned when the field is absent.

    Returns:
        The stored value for the field, or ``default`` if it is not present.
    """
    fields = parsed_query.get("fields", {})
    key = field_name.lower() if field_name else ""
    return fields.get(key, default)
|
||||
|
||||
|
||||
def get_free_text(parsed_query: Dict[str, Any]) -> str:
    """Return the free-text part of a parsed query as a string.

    A missing or falsy "text" entry yields the empty string; any other value
    is converted with ``str``.
    """
    text = parsed_query.get("text", "")
    if not text:
        return ""
    return str(text)
|
||||
Reference in New Issue
Block a user