"""``add-note`` cmdlet: attach a named text note to file(s) held in a store."""

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import sys
import re

from SYS.logger import log

import pipeline as ctx
from . import _shared as sh

Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
QueryArg = sh.QueryArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_cmdlet_args = sh.parse_cmdlet_args
normalize_result_input = sh.normalize_result_input
should_show_help = sh.should_show_help

from Store import Store
from SYS.utils import sha256_file


class Add_Note(Cmdlet):
    """Cmdlet that writes a ``(title, text)`` note onto one or more stored files.

    The note payload comes from ``-query`` (``title:<title>,text:<text>``).
    The target is either an explicit ``store`` + ``hash`` pair or the items
    piped in from a previous pipeline stage.
    """

    def __init__(self) -> None:
        """Declare the cmdlet's metadata and arguments, then register it."""
        super().__init__(
            name="add-note",
            summary="Add file store note",
            usage=(
                'add-note (-query "title:<title>,text:<text>[,store:<store>][,hash:<sha256>]") '
                "[ -store <store> | <piped> ]"
            ),
            # NOTE(review): a single empty-string alias looks unintentional;
            # left unchanged because alias handling lives in the base class.
            alias=[""],
            arg=[
                SharedArgs.STORE,
                QueryArg(
                    "hash",
                    key="hash",
                    aliases=["sha256"],
                    type="string",
                    required=False,
                    handler=normalize_hash,
                    description="(Optional) Specific file hash target, provided via -query as hash:<sha256>. When omitted, uses piped item hash.",
                    query_only=True,
                ),
                SharedArgs.QUERY,
            ],
            # NOTE(review): placeholder detail text carried over unchanged.
            detail=[""" dde """],
            exec=self.run,
        )

        # Populate dynamic store choices for autocomplete. Best-effort: a
        # failure here must not prevent the cmdlet from registering.
        try:
            SharedArgs.STORE.choices = SharedArgs.get_store_choices(None)
        except Exception:
            pass

        self.register()

    @staticmethod
    def _commas_to_spaces_outside_quotes(text: str) -> str:
        """Return *text* with every comma outside a quoted run replaced by a space.

        Single and double quotes both open a quoted run; a backslash inside a
        quoted run escapes the following character. Quote and backslash
        characters themselves are kept in the output.
        """
        buf: List[str] = []
        quote: Optional[str] = None  # the quote char currently open, if any
        escaped = False  # True when the previous char was a backslash in quotes

        for ch in str(text or ""):
            if escaped:
                buf.append(ch)
                escaped = False
                continue

            if ch == "\\" and quote is not None:
                buf.append(ch)
                escaped = True
                continue

            if ch in ('"', "'"):
                if quote is None:
                    quote = ch
                elif quote == ch:
                    quote = None
                buf.append(ch)
                continue

            if ch == "," and quote is None:
                buf.append(" ")
                continue

            buf.append(ch)

        return "".join(buf)

    @staticmethod
    def _parse_note_query(query: str) -> Tuple[Optional[str], Optional[str]]:
        """Parse note payload from -query.

        Expected:
            title:<title>,text:<text>

        Commas are treated as separators when not inside quotes.

        Returns:
            ``(title, text)``; either element is ``None`` when the field is
            missing or blank.
        """
        raw = str(query or "").strip()
        if not raw:
            return None, None

        # The project's query parser is optional; fall back to regexes below
        # when it cannot be imported.
        try:
            from cli_syntax import parse_query, get_field
        except Exception:
            parse_query = None  # type: ignore
            get_field = None  # type: ignore

        normalized = Add_Note._commas_to_spaces_outside_quotes(raw)

        if callable(parse_query) and callable(get_field):
            parsed = parse_query(normalized)
            name_s = str(get_field(parsed, "title") or "").strip()
            text_s = str(get_field(parsed, "text") or "").strip()
            return (name_s or None, text_s or None)

        # Fallback: best-effort regex. The title is a single whitespace-free
        # token; the text runs to the end of the string.
        name_match = re.search(r"\btitle\s*:\s*([^,\s]+)", normalized, flags=re.IGNORECASE)
        text_match = re.search(r"\btext\s*:\s*(.+)$", normalized, flags=re.IGNORECASE)
        note_name = name_match.group(1).strip() if name_match else ""
        note_text = text_match.group(1).strip() if text_match else ""
        return (note_name or None, note_text or None)

    def _resolve_hash(
        self,
        raw_hash: Optional[str],
        raw_path: Optional[str],
        override_hash: Optional[str],
    ) -> Optional[str]:
        """Pick the hash to attach the note to.

        Preference order: *override_hash*, then *raw_hash* (both passed
        through ``normalize_hash``), then a 64-hex-digit file stem taken from
        *raw_path*, then the SHA-256 of the file at *raw_path* if it exists.

        Returns:
            The resolved hash, or ``None`` when nothing usable was found or
            inspecting the path raised.
        """
        resolved = normalize_hash(override_hash) if override_hash else normalize_hash(raw_hash)
        if resolved:
            return resolved

        if raw_path:
            try:
                p = Path(str(raw_path))
                stem = p.stem
                if len(stem) == 64 and all(c in "0123456789abcdef" for c in stem.lower()):
                    return stem.lower()
                if p.exists() and p.is_file():
                    return sha256_file(p)
            except Exception:
                return None

        return None

    def _write_store_notes(
        self,
        backend: Any,
        store_name: str,
        ops: List[Tuple[str, str, str]],
        config: Dict[str, Any],
    ) -> None:
        """Write the queued ``(hash, name, text)`` notes for one store.

        Uses the backend's ``set_note_bulk`` when it exists; if that is
        absent or raises, falls back to one ``set_note`` call per item.
        Failures are logged and never raised.
        """
        bulk_fn = getattr(backend, "set_note_bulk", None)
        if callable(bulk_fn):
            try:
                # A bulk call that returns without raising is treated as a
                # success regardless of its return value.
                bulk_fn(list(ops), config=config)
            except Exception as exc:
                log(f"[add_note] Warning: bulk set_note failed for '{store_name}': {exc}; falling back", file=sys.stderr)
            else:
                ctx.print_if_visible(f"✓ add-note: {len(ops)} item(s) in '{store_name}'", file=sys.stderr)
                return

        # Fallback: per-item writes. One failing item must not stop the rest.
        written = 0
        for file_hash, name, text in ops:
            try:
                if backend.set_note(file_hash, name, text, config=config):
                    written += 1
            except Exception as exc:
                log(f"[add_note] Warning: set_note failed for '{file_hash}' in '{store_name}': {exc}", file=sys.stderr)

        if written:
            ctx.print_if_visible(f"✓ add-note: {written} item(s) in '{store_name}'", file=sys.stderr)

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Execute the cmdlet.

        Args:
            result: Piped input from the previous pipeline stage, if any.
            args: Raw command-line arguments for this cmdlet.
            config: Configuration passed to ``Store`` and to the backends.

        Returns:
            ``0`` on success (including help output), ``1`` on a usage error,
            an unknown store, a failed explicit write, or when no item could
            be queued for a note.
        """
        if should_show_help(args):
            log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
            return 0

        parsed = parse_cmdlet_args(args, self)
        store_override = parsed.get("store")
        hash_override = normalize_hash(parsed.get("hash"))

        note_name, note_text = self._parse_note_query(str(parsed.get("query") or ""))
        if not note_name or not note_text:
            log("[add_note] Error: -query must include title:<title> and text:<text>", file=sys.stderr)
            return 1

        if hash_override and not store_override:
            log("[add_note] Error: hash:<sha256> requires store:<store> in -query or -store <store>", file=sys.stderr)
            return 1

        explicit_target = bool(hash_override and store_override)
        results = normalize_result_input(result)

        if results and explicit_target:
            # Direct targeting mode: apply note once to the explicit target and
            # pass through any piped items unchanged.
            try:
                store_registry = Store(config)
                backend = store_registry[str(store_override)]
                ok = bool(backend.set_note(str(hash_override), note_name, note_text, config=config))
            except Exception as exc:
                log(f"[add_note] Error: Failed to set note: {exc}", file=sys.stderr)
                return 1

            if ok:
                ctx.print_if_visible(f"✓ add-note: 1 item in '{store_override}'", file=sys.stderr)
            else:
                log(f"[add_note] Warning: store '{store_override}' did not confirm the note write", file=sys.stderr)

            for res in results:
                ctx.emit(res)
            return 0

        if not results:
            if explicit_target:
                # Allow standalone use (no piped input) and enable piping the target forward.
                results = [{"store": str(store_override), "hash": hash_override}]
            else:
                log("[add_note] Error: Requires piped item(s) from add-file, or explicit targeting via store/hash (e.g., -query \"store:<store> hash:<sha256> ...\")", file=sys.stderr)
                return 1

        store_registry = Store(config)
        updated = 0
        # Backends validated during queuing, reused for the write phase.
        backends: Dict[str, Any] = {}
        # Batch write plan: store -> [(hash, name, text), ...]
        note_ops: Dict[str, List[Tuple[str, str, str]]] = {}

        for res in results:
            if not isinstance(res, dict):
                # Non-dict items cannot carry store/hash fields; pass them on.
                ctx.emit(res)
                continue

            store_name = str(store_override or res.get("store") or "").strip()
            raw_hash = res.get("hash")
            raw_path = res.get("path")

            if not store_name:
                log("[add_note] Error: Missing -store and item has no store field", file=sys.stderr)
                return 1

            resolved_hash = self._resolve_hash(
                raw_hash=str(raw_hash) if raw_hash else None,
                raw_path=str(raw_path) if raw_path else None,
                override_hash=str(hash_override) if hash_override else None,
            )
            if not resolved_hash:
                log("[add_note] Warning: Item missing usable hash; skipping", file=sys.stderr)
                ctx.emit(res)
                continue

            if store_name not in backends:
                try:
                    backends[store_name] = store_registry[store_name]
                except Exception as exc:
                    log(f"[add_note] Error: Unknown store '{store_name}': {exc}", file=sys.stderr)
                    return 1

            # Queue for bulk write per store. We still emit items immediately;
            # the pipeline only advances after this cmdlet returns.
            note_ops.setdefault(store_name, []).append((resolved_hash, note_name, note_text))
            updated += 1
            ctx.emit(res)

        # Execute the queued writes, one store at a time.
        for store_name, ops in note_ops.items():
            self._write_store_notes(backends[store_name], store_name, ops, config)

        log(f"[add_note] Updated {updated} item(s)", file=sys.stderr)
        return 0 if updated > 0 else 1


CMDLET = Add_Note()