from __future__ import annotations from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple import sys import re from SYS.logger import log from SYS import pipeline as ctx from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg QueryArg = sh.QueryArg SharedArgs = sh.SharedArgs normalize_hash = sh.normalize_hash parse_cmdlet_args = sh.parse_cmdlet_args normalize_result_input = sh.normalize_result_input should_show_help = sh.should_show_help from Store import Store from SYS.utils import sha256_file class Add_Note(Cmdlet): DEFAULT_QUERY_HINTS = ( "title:", "text:", "hash:", "caption:", "sub:", "subtitle:", ) def __init__(self) -> None: super().__init__( name="add-note", summary="Add file store note", usage= 'add-note (-query "title:,text:<text>[,store:<store>][,hash:<sha256>]") [ -store <store> | <piped> ]', alias=[""], arg=[ SharedArgs.STORE, QueryArg( "hash", key="hash", aliases=["sha256"], type="string", required=False, handler=normalize_hash, description= "(Optional) Specific file hash target, provided via -query as hash:<sha256>. When omitted, uses piped item hash.", query_only=True, ), SharedArgs.QUERY, ], detail=[""" dde """], exec=self.run, ) # Populate dynamic store choices for autocomplete try: SharedArgs.STORE.choices = SharedArgs.get_store_choices(None) except Exception: pass self.register() @staticmethod def _commas_to_spaces_outside_quotes(text: str) -> str: buf: List[str] = [] quote: Optional[str] = None escaped = False for ch in str(text or ""): if escaped: buf.append(ch) escaped = False continue if ch == "\\" and quote is not None: buf.append(ch) escaped = True continue if ch in ('"', "'"): if quote is None: quote = ch elif quote == ch: quote = None buf.append(ch) continue if ch == "," and quote is None: buf.append(" ") continue buf.append(ch) return "".join(buf) @staticmethod def _parse_note_query(query: str) -> Tuple[Optional[str], Optional[str]]: """Parse note payload from -query. Expected: title:<title>,text:<text> Commas are treated as separators when not inside quotes. """ raw = str(query or "").strip() if not raw: return None, None try: from SYS.cli_syntax import parse_query, get_field except Exception: parse_query = None # type: ignore get_field = None # type: ignore normalized = Add_Note._commas_to_spaces_outside_quotes(raw) if callable(parse_query) and callable(get_field): parsed = parse_query(normalized) name = get_field(parsed, "title") text = get_field(parsed, "text") name_s = str(name or "").strip() if name is not None else "" text_s = str(text or "").strip() if text is not None else "" return (name_s or None, text_s or None) # Fallback: best-effort regex. name_match = re.search( r"\btitle\s*:\s*([^,\s]+)", normalized, flags=re.IGNORECASE ) text_match = re.search(r"\btext\s*:\s*(.+)$", normalized, flags=re.IGNORECASE) note_name = name_match.group(1).strip() if name_match else "" note_text = text_match.group(1).strip() if text_match else "" return (note_name or None, note_text or None) @classmethod def _looks_like_note_query_token(cls, token: Any) -> bool: text = str(token or "").strip().lower() if not text: return False return any(hint in text for hint in cls.DEFAULT_QUERY_HINTS) @classmethod def _default_query_args(cls, args: Sequence[str]) -> List[str]: tokens: List[str] = list(args or []) lower_tokens = {str(tok).lower() for tok in tokens if tok is not None} if "-query" in lower_tokens or "--query" in lower_tokens: return tokens for idx, tok in enumerate(tokens): token_text = str(tok or "") if not token_text or token_text.startswith("-"): continue if not cls._looks_like_note_query_token(token_text): continue combined_parts = [token_text] end = idx + 1 while end < len(tokens): next_text = str(tokens[end] or "") if not next_text or next_text.startswith("-"): break if not cls._looks_like_note_query_token(next_text): break combined_parts.append(next_text) end += 1 combined_query = " ".join(combined_parts) tokens[idx:end] = [combined_query] tokens.insert(idx, "-query") return tokens return tokens def _resolve_hash( self, raw_hash: Optional[str], raw_path: Optional[str], override_hash: Optional[str] ) -> Optional[str]: resolved = normalize_hash(override_hash ) if override_hash else normalize_hash(raw_hash) if resolved: return resolved if raw_path: try: p = Path(str(raw_path)) stem = p.stem if len(stem) == 64 and all(c in "0123456789abcdef" for c in stem.lower()): return stem.lower() if p.exists() and p.is_file(): return sha256_file(p) except Exception: return None return None def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: if should_show_help(args): log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}") return 0 parsed_args = self._default_query_args(args) parsed = parse_cmdlet_args(parsed_args, self) store_override = parsed.get("store") hash_override = normalize_hash(parsed.get("hash")) note_name, note_text = self._parse_note_query(str(parsed.get("query") or "")) note_name = str(note_name or "").strip() note_text = str(note_text or "").strip() if not note_name or not note_text: log( "[add_note] Error: -query must include title:<title> and text:<text>", file=sys.stderr, ) return 1 if hash_override and not store_override: log( "[add_note] Error: hash:<sha256> requires store:<store> in -query or -store <store>", file=sys.stderr, ) return 1 explicit_target = bool(hash_override and store_override) results = normalize_result_input(result) if results and explicit_target: # Direct targeting mode: apply note once to the explicit target and # pass through any piped items unchanged. try: store_registry = Store(config) backend = store_registry[str(store_override)] ok = bool( backend.set_note( str(hash_override), note_name, note_text, config=config ) ) if ok: ctx.print_if_visible( f"✓ add-note: 1 item in '{store_override}'", file=sys.stderr ) log( "[add_note] Updated 1/1 item(s)", file=sys.stderr ) for res in results: ctx.emit(res) return 0 log( "[add_note] Warning: Note write reported failure", file=sys.stderr ) return 1 except Exception as exc: log(f"[add_note] Error: Failed to set note: {exc}", file=sys.stderr) return 1 if not results: if explicit_target: # Allow standalone use (no piped input) and enable piping the target forward. results = [{ "store": str(store_override), "hash": hash_override }] else: log( '[add_note] Error: Requires piped item(s) from add-file, or explicit targeting via store/hash (e.g., -query "store:<store> hash:<sha256> ...")', file=sys.stderr, ) return 1 store_registry = Store(config) planned_ops = 0 # Batch write plan: store -> [(hash, name, text), ...] note_ops: Dict[str, List[Tuple[str, str, str]]] = {} for res in results: if not isinstance(res, dict): ctx.emit(res) continue item_note_text = note_text store_name = str(store_override or res.get("store") or "").strip() raw_hash = res.get("hash") raw_path = res.get("path") if not store_name: log( "[add_note] Error: Missing -store and item has no store field", file=sys.stderr ) return 1 resolved_hash = self._resolve_hash( raw_hash=str(raw_hash) if raw_hash else None, raw_path=str(raw_path) if raw_path else None, override_hash=str(hash_override) if hash_override else None, ) if not resolved_hash: log( "[add_note] Warning: Item missing usable hash; skipping", file=sys.stderr ) ctx.emit(res) continue try: backend = store_registry[store_name] except Exception as exc: log( f"[add_note] Error: Unknown store '{store_name}': {exc}", file=sys.stderr ) return 1 # Queue for bulk write per store. We still emit items immediately; # the pipeline only advances after this cmdlet returns. note_ops.setdefault(store_name, []).append((resolved_hash, note_name, item_note_text)) planned_ops += 1 ctx.emit(res) # Execute bulk writes per store. successful_writes = 0 for store_name, ops in note_ops.items(): if not ops: continue try: backend = store_registry[store_name] except Exception: continue store_success = 0 bulk_fn = getattr(backend, "set_note_bulk", None) if callable(bulk_fn): try: ok = bool(bulk_fn(list(ops), config=config)) if ok: store_success += len(ops) ctx.print_if_visible( f"✓ add-note: {len(ops)} item(s) in '{store_name}'", file=sys.stderr ) successful_writes += store_success continue log( f"[add_note] Warning: bulk set_note returned False for '{store_name}'", file=sys.stderr, ) except Exception as exc: log( f"[add_note] Warning: bulk set_note failed for '{store_name}': {exc}; falling back", file=sys.stderr, ) # Fallback: per-item writes for file_hash, name, text in ops: try: ok = bool(backend.set_note(file_hash, name, text, config=config)) if ok: store_success += 1 except Exception: continue if store_success: successful_writes += store_success ctx.print_if_visible( f"✓ add-note: {store_success} item(s) in '{store_name}'", file=sys.stderr ) log( f"[add_note] Updated {successful_writes}/{planned_ops} item(s)", file=sys.stderr ) return 0 if successful_writes > 0 else 1 CMDLET = Add_Note()