from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, Optional
import re


# Matches ``field:value`` where the value is either a double-quoted string
# (group 2, quotes stripped) or a run of non-space characters (group 3).
_FIELD_PATTERN = re.compile(r'(\w+):(?:"([^"]*)"|(\S+))')

# Characters that open/close a quoted region in pipeline text.
_QUOTE_CHARS = ("'", '"')


@dataclass(frozen=True)
class SyntaxErrorDetail:
    """Describe a user-facing syntax problem found in raw CLI input.

    Attributes:
        message: Human-readable description of the problem.
        expected: The token that would have made the input valid (for
            example the missing closing quote), or None when not applicable.
    """

    message: str
    expected: Optional[str] = None


def validate_pipeline_text(text: str) -> Optional[SyntaxErrorDetail]:
    """Validate raw CLI input before tokenization/execution.

    This is intentionally lightweight and focuses on user-facing syntax
    issues:

    - Unbalanced single/double quotes
    - Dangling or empty pipeline stages (|)

    Quoting rules used by the scan: a quote character opens a region that
    only the same quote character closes; inside a quoted region a backslash
    escapes the next character; pipes inside quotes are ignored. A backslash
    outside quotes is an ordinary character.

    Problems are reported in this order of precedence: adjacent pipes
    ("||"), unbalanced quotes, a leading or trailing pipe, and finally a
    stage containing only whitespace.

    Args:
        text: The raw command line as typed by the user. None is tolerated
            and treated like an empty command.

    Returns:
        None if valid, otherwise a SyntaxErrorDetail describing the issue.
    """
    if text is None:
        return SyntaxErrorDetail("Empty command")

    raw = text.strip()
    if not raw:
        return SyntaxErrorDetail("Empty command")

    open_quote: Optional[str] = None  # the quote char we are inside, if any
    escaped = False
    previous_pipe: Optional[int] = None
    # Starts True so the first stage is not reported as blank here; a leading
    # pipe gets its own, more specific message below.
    stage_has_content = True
    blank_stage_found = False

    for idx, ch in enumerate(raw):
        if open_quote is not None:
            # Quoted text is real stage content, so a stage made only of a
            # quoted token (e.g. `a | "b c" | d`) must not count as empty.
            stage_has_content = True
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == open_quote:
                open_quote = None
            continue

        if ch in _QUOTE_CHARS:
            open_quote = ch
            stage_has_content = True
            continue

        if ch == "|":
            if previous_pipe is not None and previous_pipe == idx - 1:
                return SyntaxErrorDetail(
                    "Syntax error: empty pipeline stage (found '||')."
                )
            if not stage_has_content:
                # Deferred: quote and leading/trailing pipe errors take
                # precedence over this one.
                blank_stage_found = True
            previous_pipe = idx
            stage_has_content = False
            continue

        if not ch.isspace():
            stage_has_content = True

    if open_quote == '"':
        return SyntaxErrorDetail('Syntax error: missing closing ".', expected='"')
    if open_quote == "'":
        return SyntaxErrorDetail("Syntax error: missing closing '.", expected="'")

    # With balanced quotes, the first and last characters of the stripped
    # text are necessarily outside any quoted region.
    if raw.startswith("|"):
        return SyntaxErrorDetail("Syntax error: pipeline cannot start with '|'.")
    if raw.endswith("|"):
        return SyntaxErrorDetail("Syntax error: pipeline cannot end with '|'.")

    # Empty stage like "cmd1 | | cmd2" (only whitespace between pipes).
    if blank_stage_found:
        return SyntaxErrorDetail(
            "Syntax error: empty pipeline stage (use a command between '|')."
        )

    return None


def parse_query(query: str) -> Dict[str, Any]:
    """Parse a query string into field:value pairs and free text.

    Supports syntax like:
        - isbn:0557677203
        - author:"Albert Pike"
        - title:"Morals and Dogma" year:2010
        - Mixed with free text: Morals isbn:0557677203

    Field names are lower-cased. When a field appears more than once the
    last occurrence wins. Text that is not part of any field:value pair is
    collected, in order, and joined with single spaces.

    Args:
        query: The raw query string. Empty, whitespace-only or None input
            yields a result with no fields and empty text.

    Returns:
        Dict with keys:
            - fields: Dict[str, str]
            - text: str
            - raw: str (the query exactly as passed in)
    """
    result: Dict[str, Any] = {
        "fields": {},
        "text": "",
        "raw": query,
    }

    if not query or not query.strip():
        return result

    raw = query.strip()
    fields: Dict[str, str] = result["fields"]
    free_text_parts: list[str] = []
    pos = 0

    for match in _FIELD_PATTERN.finditer(raw):
        # Anything between the previous match and this one is free text.
        gap = raw[pos : match.start()].strip()
        if gap:
            free_text_parts.append(gap)

        name, quoted_value, bare_value = match.groups()
        # An empty quoted value ("") is a legitimate value, so test against
        # None rather than truthiness.
        fields[name.lower()] = quoted_value if quoted_value is not None else bare_value
        pos = match.end()

    tail = raw[pos:].strip()
    if tail:
        free_text_parts.append(tail)

    result["text"] = " ".join(free_text_parts)
    return result


def get_field(
    parsed_query: Dict[str, Any],
    field_name: str,
    default: Optional[str] = None,
) -> Optional[str]:
    """Get a field value from a parsed query.

    Args:
        parsed_query: A dict shaped like the return value of parse_query.
        field_name: Field to look up; matched case-insensitively by
            lower-casing it. None is treated as the empty name.
        default: Value returned when the field is absent.

    Returns:
        The stored field value, or ``default`` if the field (or the whole
        "fields" mapping) is missing.
    """
    # `or {}` also covers a "fields" key that is present but None.
    fields = parsed_query.get("fields") or {}
    return fields.get((field_name or "").lower(), default)


def get_free_text(parsed_query: Dict[str, Any]) -> str:
    """Get the free-text portion of a parsed query.

    Returns an empty string when the "text" key is missing or falsy.
    """
    return str(parsed_query.get("text", "") or "")