from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import sys
import re
from SYS.logger import log
import pipeline as ctx
from . import _shared as sh
# Module-local aliases for the helpers exposed by ``_shared`` (imported as
# ``sh``), so the rest of this file can refer to them by their short names.
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_cmdlet_args = sh.parse_cmdlet_args
normalize_result_input = sh.normalize_result_input
should_show_help = sh.should_show_help
from Store import Store
from SYS.utils import sha256_file
class Add_Note(Cmdlet):
    """Cmdlet that attaches a named text note to a file held in a store.

    The note payload is supplied through ``-query "title:<title>,text:<text>"``.
    The target is either an explicit ``-store``/``-hash`` pair or the item(s)
    piped in from a previous cmdlet; piped items are always re-emitted so the
    pipeline can continue.
    """

    def __init__(self) -> None:
        """Declare the cmdlet's metadata and arguments, then register it."""
        super().__init__(
            name="add-note",
            summary="Add file store note",
            usage="add-note (-query \"title:<title>,text:<text>\") [ -store <store> -hash <sha256> | <piped> ]",
            alias=[""],
            arg=[
                SharedArgs.STORE,
                CmdletArg(
                    "hash",
                    type="string",
                    required=False,
                    description="Target file hash (sha256). When omitted, uses piped item hash.",
                ),
                SharedArgs.QUERY,
            ],
            detail=["\ndde\n"],
            exec=self.run,
        )
        # Populate dynamic store choices for autocomplete. This is purely a
        # convenience, so a failure here must not prevent registration.
        try:
            SharedArgs.STORE.choices = SharedArgs.get_store_choices(None)
        except Exception:
            pass
        self.register()

    @staticmethod
    def _commas_to_spaces_outside_quotes(text: str) -> str:
        """Return *text* with every comma outside a quoted span replaced by a space.

        Single and double quotes both open a quoted span that is closed by the
        same quote character. Inside a quoted span a backslash escapes the
        next character, so an escaped quote does not close the span. Quotes
        and backslashes themselves are kept in the output.
        """
        buf: List[str] = []
        quote: Optional[str] = None  # the quote char currently open, if any
        escaped = False  # True when the previous char was an escaping backslash
        for ch in str(text or ""):
            if escaped:
                buf.append(ch)
                escaped = False
            elif ch == "\\" and quote is not None:
                buf.append(ch)
                escaped = True
            elif ch in ('"', "'"):
                if quote is None:
                    quote = ch
                elif quote == ch:
                    quote = None
                buf.append(ch)
            elif ch == "," and quote is None:
                buf.append(" ")
            else:
                buf.append(ch)
        return "".join(buf)

    @staticmethod
    def _parse_note_query(query: str) -> Tuple[Optional[str], Optional[str]]:
        """Parse the note payload from ``-query``.

        Expected form::

            title:<title>,text:<text>

        Commas are treated as separators when not inside quotes.

        Returns:
            ``(title, text)``; either element is ``None`` when the field is
            missing or blank.
        """
        raw = str(query or "").strip()
        if not raw:
            return None, None

        # The query parser is optional; when it cannot be imported we fall
        # back to a best-effort regex below.
        try:
            from cli_syntax import parse_query, get_field
        except Exception:
            parse_query = None  # type: ignore
            get_field = None  # type: ignore

        normalized = Add_Note._commas_to_spaces_outside_quotes(raw)

        if callable(parse_query) and callable(get_field):
            parsed = parse_query(normalized)
            name_s = str(get_field(parsed, "title") or "").strip()
            text_s = str(get_field(parsed, "text") or "").strip()
            return (name_s or None, text_s or None)

        # Fallback: best-effort regex. The title is a single token; the text
        # runs to the end of the query.
        name_match = re.search(r"\btitle\s*:\s*([^,\s]+)", normalized, flags=re.IGNORECASE)
        text_match = re.search(r"\btext\s*:\s*(.+)$", normalized, flags=re.IGNORECASE)
        note_name = name_match.group(1).strip() if name_match else ""
        note_text = text_match.group(1).strip() if text_match else ""
        return (note_name or None, note_text or None)

    def _resolve_hash(self, raw_hash: Optional[str], raw_path: Optional[str], override_hash: Optional[str]) -> Optional[str]:
        """Work out the file hash to attach the note to.

        Resolution order:
            1. ``override_hash`` if given, otherwise ``raw_hash`` (both passed
               through ``normalize_hash``).
            2. The stem of ``raw_path`` when it is 64 hex characters.
            3. The SHA-256 of the file at ``raw_path`` when it exists.

        Returns:
            The resolved hash, or ``None`` when nothing usable was found or
            the path could not be inspected.
        """
        resolved = normalize_hash(override_hash) if override_hash else normalize_hash(raw_hash)
        if resolved:
            return resolved
        if not raw_path:
            return None
        try:
            p = Path(str(raw_path))
            stem = p.stem.lower()
            if len(stem) == 64 and all(c in "0123456789abcdef" for c in stem):
                return stem
            if p.exists() and p.is_file():
                return sha256_file(p)
        except Exception:
            return None
        return None

    @staticmethod
    def _write_note_ops(store_registry: Any, note_ops: Dict[str, List[Tuple[str, str, str]]], config: Dict[str, Any]) -> None:
        """Write the queued ``(hash, name, text)`` notes, one store at a time.

        Uses the backend's ``set_note_bulk`` when it has one and falls back to
        per-item ``set_note`` calls when the bulk call is missing or raises.
        Failures are logged and skipped so one bad store or item does not
        abort the remaining writes.
        """
        for store_name, ops in note_ops.items():
            if not ops:
                continue
            try:
                backend = store_registry[store_name]
            except Exception as exc:
                log(f"[add_note] Warning: store '{store_name}' unavailable; skipping {len(ops)} note(s): {exc}", file=sys.stderr)
                continue

            bulk_fn = getattr(backend, "set_note_bulk", None)
            if callable(bulk_fn):
                try:
                    # A bulk call that returns without raising counts as
                    # success regardless of its return value.
                    bulk_fn(list(ops), config=config)
                except Exception as exc:
                    log(f"[add_note] Warning: bulk set_note failed for '{store_name}': {exc}; falling back", file=sys.stderr)
                else:
                    ctx.print_if_visible(f"✓ add-note: {len(ops)} item(s) in '{store_name}'", file=sys.stderr)
                    continue

            # Fallback: per-item writes
            for file_hash, name, text in ops:
                try:
                    backend.set_note(file_hash, name, text, config=config)
                except Exception as exc:
                    log(f"[add_note] Warning: set_note failed for {file_hash} in '{store_name}': {exc}", file=sys.stderr)

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Execute ``add-note``.

        Args:
            result: Piped input from the previous pipeline stage (may be empty).
            args: Raw command-line arguments for this cmdlet.
            config: Application configuration, forwarded to the store layer.

        Returns:
            ``0`` when help was shown, the explicit target was handled, or at
            least one note was queued for writing; ``1`` on invalid arguments,
            a missing/unknown store, or when no item yielded a usable hash.
        """
        if should_show_help(args):
            log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
            return 0

        parsed = parse_cmdlet_args(args, self)
        store_override = parsed.get("store")
        hash_override = normalize_hash(parsed.get("hash"))
        note_name, note_text = self._parse_note_query(str(parsed.get("query") or ""))

        if not note_name or not note_text:
            log("[add_note] Error: -query must include title: and text:", file=sys.stderr)
            return 1
        if hash_override and not store_override:
            log("[add_note] Error: -hash requires -store ", file=sys.stderr)
            return 1

        explicit_target = bool(hash_override and store_override)
        results = normalize_result_input(result)

        if results and explicit_target:
            # Direct targeting mode: apply note once to the explicit target and
            # pass through any piped items unchanged.
            try:
                store_registry = Store(config)
                backend = store_registry[str(store_override)]
                ok = bool(backend.set_note(str(hash_override), note_name, note_text, config=config))
                if ok:
                    ctx.print_if_visible(f"✓ add-note: 1 item in '{store_override}'", file=sys.stderr)
            except Exception as exc:
                log(f"[add_note] Error: Failed to set note: {exc}", file=sys.stderr)
                return 1
            for res in results:
                ctx.emit(res)
            return 0

        if not results:
            if not explicit_target:
                log("[add_note] Error: Requires piped item(s) from add-file, or explicit -store and -hash ", file=sys.stderr)
                return 1
            # Allow standalone use (no piped input) and enable piping the target forward.
            results = [{"store": str(store_override), "hash": hash_override}]

        store_registry = Store(config)
        updated = 0
        # Batch write plan: store -> [(hash, name, text), ...]
        note_ops: Dict[str, List[Tuple[str, str, str]]] = {}

        for res in results:
            if not isinstance(res, dict):
                # Non-dict items cannot carry store/hash fields; pass them on untouched.
                ctx.emit(res)
                continue

            store_name = str(store_override or res.get("store") or "").strip()
            if not store_name:
                log("[add_note] Error: Missing -store and item has no store field", file=sys.stderr)
                return 1

            raw_hash = res.get("hash")
            raw_path = res.get("path")
            resolved_hash = self._resolve_hash(
                raw_hash=str(raw_hash) if raw_hash else None,
                raw_path=str(raw_path) if raw_path else None,
                override_hash=str(hash_override) if hash_override else None,
            )
            if not resolved_hash:
                log("[add_note] Warning: Item missing usable hash; skipping", file=sys.stderr)
                ctx.emit(res)
                continue

            # Look the store up now purely to fail fast on an unknown name.
            try:
                store_registry[store_name]
            except Exception as exc:
                log(f"[add_note] Error: Unknown store '{store_name}': {exc}", file=sys.stderr)
                return 1

            # Queue for bulk write per store. We still emit items immediately;
            # the pipeline only advances after this cmdlet returns.
            note_ops.setdefault(store_name, []).append((resolved_hash, note_name, note_text))
            updated += 1
            ctx.emit(res)

        # Execute bulk writes per store.
        self._write_note_ops(store_registry, note_ops, config)

        log(f"[add_note] Updated {updated} item(s)", file=sys.stderr)
        # The exit status reflects whether anything was queued; individual
        # write failures are reported as warnings above.
        return 0 if updated > 0 else 1
# Instantiate the cmdlet at import time; Add_Note.__init__ ends by calling
# self.register().
CMDLET = Add_Note()