refactor still on going

This commit is contained in:
2026-05-09 11:53:27 -07:00
parent 24f983473f
commit 9f0eb29289
11 changed files with 1772 additions and 1747 deletions
+5 -5
View File
@@ -90,12 +90,12 @@ def _load_helper_modules() -> None:
# Provider-specific module pre-loading removed; providers are loaded lazily
# through ProviderCore.registry when first referenced.
#
# Keep explicit imports for cmdlets that were moved under cmdlet/file so they
# remain registered under their legacy command names (add-note/add-url/add-relationship).
# Keep explicit imports for cmdlets moved into subpackages so they remain
# registered under their legacy command names.
for mod in (
".file.add_note",
".file.add_url",
".file.add_relationship",
".metadata.note_add",
".metadata.url_add",
".metadata.relationship_add",
".metadata.get_note",
".metadata.get_relationship",
):
+2 -367
View File
@@ -1,370 +1,5 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import sys
import re
from SYS.logger import log
from SYS import pipeline as ctx
from .. import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
QueryArg = sh.QueryArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_cmdlet_args = sh.parse_cmdlet_args
normalize_result_input = sh.normalize_result_input
should_show_help = sh.should_show_help
class Add_Note(Cmdlet):
DEFAULT_QUERY_HINTS = (
"title:",
"text:",
"hash:",
"caption:",
"sub:",
"subtitle:",
)
def __init__(self) -> None:
super().__init__(
name="add-note",
summary="Add file store note",
usage=
'add-note (-query "title:<title>,text:<text>[,instance:<instance>][,hash:<sha256>]") [ -instance <store> | <piped> ]',
alias=[""],
arg=[
SharedArgs.INSTANCE,
QueryArg(
"hash",
key="hash",
aliases=["sha256"],
type="string",
required=False,
handler=normalize_hash,
description=
"(Optional) Specific file hash target, provided via -query as hash:<sha256>. When omitted, uses piped item hash.",
query_only=True,
),
SharedArgs.QUERY,
],
detail=["""
dde
"""],
exec=self.run,
)
# Populate dynamic store choices for autocomplete
try:
SharedArgs.INSTANCE.choices = SharedArgs.get_store_choices(None)
except Exception:
pass
self.register()
@staticmethod
def _commas_to_spaces_outside_quotes(text: str) -> str:
buf: List[str] = []
quote: Optional[str] = None
escaped = False
for ch in str(text or ""):
if escaped:
buf.append(ch)
escaped = False
continue
if ch == "\\" and quote is not None:
buf.append(ch)
escaped = True
continue
if ch in ('"', "'"):
if quote is None:
quote = ch
elif quote == ch:
quote = None
buf.append(ch)
continue
if ch == "," and quote is None:
buf.append(" ")
continue
buf.append(ch)
return "".join(buf)
@staticmethod
def _parse_note_query(query: str) -> Tuple[Optional[str], Optional[str]]:
"""Parse note payload from -query.
Expected:
title:<title>,text:<text>
Commas are treated as separators when not inside quotes.
"""
raw = str(query or "").strip()
if not raw:
return None, None
try:
from SYS.cli_syntax import parse_query, get_field
except Exception:
parse_query = None # type: ignore
get_field = None # type: ignore
normalized = Add_Note._commas_to_spaces_outside_quotes(raw)
if callable(parse_query) and callable(get_field):
parsed = parse_query(normalized)
name = get_field(parsed, "title")
text = get_field(parsed, "text")
name_s = str(name or "").strip() if name is not None else ""
text_s = str(text or "").strip() if text is not None else ""
return (name_s or None, text_s or None)
# Fallback: best-effort regex.
name_match = re.search(
r"\btitle\s*:\s*([^,\s]+)",
normalized,
flags=re.IGNORECASE
)
text_match = re.search(r"\btext\s*:\s*(.+)$", normalized, flags=re.IGNORECASE)
note_name = name_match.group(1).strip() if name_match else ""
note_text = text_match.group(1).strip() if text_match else ""
return (note_name or None, note_text or None)
@classmethod
def _looks_like_note_query_token(cls, token: Any) -> bool:
text = str(token or "").strip().lower()
if not text:
return False
return any(hint in text for hint in cls.DEFAULT_QUERY_HINTS)
@classmethod
def _default_query_args(cls, args: Sequence[str]) -> List[str]:
tokens: List[str] = list(args or [])
lower_tokens = {str(tok).lower() for tok in tokens if tok is not None}
if "-query" in lower_tokens or "--query" in lower_tokens:
return tokens
for idx, tok in enumerate(tokens):
token_text = str(tok or "")
if not token_text or token_text.startswith("-"):
continue
if not cls._looks_like_note_query_token(token_text):
continue
combined_parts = [token_text]
end = idx + 1
while end < len(tokens):
next_text = str(tokens[end] or "")
if not next_text or next_text.startswith("-"):
break
if not cls._looks_like_note_query_token(next_text):
break
combined_parts.append(next_text)
end += 1
combined_query = " ".join(combined_parts)
tokens[idx:end] = [combined_query]
tokens.insert(idx, "-query")
return tokens
return tokens
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
if should_show_help(args):
log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
return 0
parsed_args = self._default_query_args(args)
parsed = parse_cmdlet_args(parsed_args, self)
store_override = parsed.get("instance")
hash_override = normalize_hash(parsed.get("hash"))
note_name, note_text = self._parse_note_query(str(parsed.get("query") or ""))
note_name = str(note_name or "").strip()
note_text = str(note_text or "").strip()
if not note_name or not note_text:
pass # We now support implicit pipeline notes if -query is missing
# But if explicit targeting (store+hash) is used, we still demand args below.
if hash_override and not store_override:
log(
"[add_note] Error: hash:<sha256> requires instance:<instance> in -query or -instance <store>",
file=sys.stderr,
)
return 1
explicit_target = bool(hash_override and store_override)
results = normalize_result_input(result)
if explicit_target and (not note_name or not note_text):
log(
"[add_note] Error: Explicit target (store+hash) requires -query with title/text",
file=sys.stderr,
)
return 1
if results and explicit_target:
# Direct targeting mode: apply note once to the explicit target and
# pass through any piped items unchanged.
try:
backend, _store_registry, exc = sh.get_store_backend(
config,
str(store_override),
)
if backend is None:
raise exc or KeyError(store_override)
if not bool(getattr(backend, "supports_note_association", False)):
log(
f"[add_note] Error: Store '{store_override}' does not support notes",
file=sys.stderr,
)
return 1
ok = bool(
backend.set_note(
str(hash_override),
note_name,
note_text,
config=config
)
)
if ok:
ctx.print_if_visible(
f"✓ add-note: 1 item in '{store_override}'",
file=sys.stderr
)
log(
"[add_note] Updated 1/1 item(s)",
file=sys.stderr
)
for res in results:
ctx.emit(res)
return 0
log(
"[add_note] Warning: Note write reported failure",
file=sys.stderr
)
return 1
except Exception as exc:
log(f"[add_note] Error: Failed to set note: {exc}", file=sys.stderr)
return 1
if not results:
if explicit_target:
# Allow standalone use (no piped input) and enable piping the target forward.
results = [{
"store": str(store_override),
"hash": hash_override
}]
else:
log(
'[add_note] Error: Requires piped item(s) from add-file, or explicit targeting via store/hash (e.g., -query "instance:<instance> hash:<sha256> ...")',
file=sys.stderr,
)
return 1
store_registry = None
planned_ops = 0
# Batch write plan: store -> [(hash, name, text), ...]
note_ops: Dict[str,
List[Tuple[str,
str,
str]]] = {}
for res in results:
if not isinstance(res, dict):
ctx.emit(res)
continue
# Determine notes to write for this item
notes_to_write: List[Tuple[str, str]] = []
# 1. Explicit arguments always take precedence
if note_name and note_text:
notes_to_write.append((note_name, note_text))
# 2. Pipeline notes auto-ingestion
# Look for 'notes' dictionary in the item (propagated by pipeline/download-file)
# Structure: {'notes': {'lyric': '...', 'sub': '...'}}
# Check both root and nested 'extra'
# Check root 'notes' (dict or extra.notes)
pipeline_notes = res.get("notes")
if not isinstance(pipeline_notes, dict):
extra = res.get("extra")
if isinstance(extra, dict):
pipeline_notes = extra.get("notes")
if isinstance(pipeline_notes, dict):
for k, v in pipeline_notes.items():
# If arg-provided note conflicts effectively with pipeline note?
# We just append both.
if v and str(v).strip():
notes_to_write.append((str(k), str(v)))
if not notes_to_write:
# Pass through items that have nothing to add
ctx.emit(res)
continue
store_name, resolved_hash = sh.resolve_item_store_hash(
res,
override_store=str(store_override) if store_override else None,
override_hash=str(hash_override) if hash_override else None,
path_fields=("path",),
)
if not store_name:
log(
"[add_note] Error: Missing -instance and item has no store field",
file=sys.stderr
)
continue
if not resolved_hash:
log(
"[add_note] Warning: Item missing usable hash; skipping",
file=sys.stderr
)
ctx.emit(res)
continue
# Queue operations
if store_name not in note_ops:
note_ops[store_name] = []
for (n_name, n_text) in notes_to_write:
note_ops[store_name].append((resolved_hash, n_name, n_text))
planned_ops += 1
ctx.emit(res)
# Execute batch operations
def _on_store_error(store_name: str, exc: Exception) -> None:
log(f"[add_note] Store access failed '{store_name}': {exc}", file=sys.stderr)
def _on_unsupported_store(store_name: str) -> None:
log(f"[add_note] Store '{store_name}' does not support notes", file=sys.stderr)
def _on_item_error(store_name: str, hash_value: str, note_name_value: str, exc: Exception) -> None:
log(f"[add_note] Write failed {store_name}:{hash_value} ({note_name_value}): {exc}", file=sys.stderr)
store_registry, success_count = sh.run_store_note_batches(
config,
note_ops,
store_registry=store_registry,
on_store_error=_on_store_error,
on_unsupported_store=_on_unsupported_store,
on_item_error=_on_item_error,
)
if planned_ops > 0:
msg = f"✓ add-note: Updated {success_count}/{planned_ops} notes across {len(note_ops)} stores"
ctx.print_if_visible(msg, file=sys.stderr)
return 0
CMDLET = Add_Note()
"""Compatibility wrapper for moved metadata note add cmdlet."""
from cmdlet.metadata.note_add import * # noqa: F401,F403
File diff suppressed because it is too large Load Diff
+2 -201
View File
@@ -1,204 +1,5 @@
from __future__ import annotations
from typing import Any, Dict, List, Sequence, Tuple
import sys
"""Compatibility wrapper for moved metadata URL add cmdlet."""
from SYS import pipeline as ctx
from .. import _shared as sh
from SYS.logger import log
from Store import Store
class Add_Url(sh.Cmdlet):
"""Add URL associations to files via hash+store."""
def __init__(self) -> None:
super().__init__(
name="add-url",
summary="Associate a URL with a file",
usage="@1 | add-url <url>",
arg=[
sh.SharedArgs.QUERY,
sh.SharedArgs.INSTANCE,
sh.CmdletArg("url",
required=True,
description="URL to associate"),
],
detail=[
"- Associates URL with file identified by hash+store",
"- Multiple url can be comma-separated",
],
exec=self.run,
)
self.register()
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Add URL to file via hash+store backend."""
parsed = sh.parse_cmdlet_args(args, self)
# Compatibility/piping fix:
# `SharedArgs.QUERY` is positional in the shared parser, so `add-url <url>`
# (and `@N | add-url <url>`) can mistakenly parse the URL into `query`.
# If `url` is missing and `query` looks like an http(s) URL, treat it as `url`.
try:
if (not parsed.get("url")) and isinstance(parsed.get("query"), str):
q = str(parsed.get("query") or "").strip()
if q.startswith(("http://", "https://")):
parsed["url"] = q
parsed.pop("query", None)
except Exception:
pass
query_hash, query_valid = sh.require_single_hash_query(
parsed.get("query"),
"Error: -query must be of the form hash:<sha256>",
)
if not query_valid:
return 1
# Bulk input is common in pipelines; treat a list of PipeObjects as a batch.
results: List[Any] = (
result if isinstance(result,
list) else ([result] if result is not None else [])
)
if query_hash and len(results) > 1:
log("Error: -query hash:<sha256> cannot be used with multiple piped items")
return 1
# Extract hash and store from result or args
file_hash = query_hash or (
sh.get_field(result,
"hash") if result is not None else None
)
store_name = parsed.get("instance") or (
sh.get_field(result,
"store") if result is not None else None
)
url_arg = parsed.get("url")
if not url_arg:
try:
inferred = sh.extract_url_from_result(result)
if inferred:
candidate = inferred[0]
if isinstance(candidate, str) and candidate.strip():
url_arg = candidate.strip()
parsed["url"] = url_arg
except Exception:
pass
# If we have multiple piped items, we will resolve hash/store per item below.
if not results:
if not file_hash:
log(
'Error: No file hash provided (pipe an item or use -query "hash:<sha256>")'
)
return 1
if not store_name:
log("Error: No store name provided")
return 1
if not url_arg:
log("Error: No URL provided")
return 1
# Normalize hash (single-item mode)
if not results and file_hash:
file_hash = sh.normalize_hash(file_hash)
if not file_hash:
log("Error: Invalid hash format")
return 1
# Parse url (comma-separated)
urls = [u.strip() for u in str(url_arg).split(",") if u.strip()]
if not urls:
log("Error: No valid url provided")
return 1
# Get backend and add url
try:
storage = Store(config)
# Build batches per store.
store_override = parsed.get("instance")
if results:
def _warn(message: str) -> None:
ctx.print_if_visible(f"[add-url] Warning: {message}", file=sys.stderr)
batch, pass_through = sh.collect_store_hash_value_batch(
results,
store_registry=storage,
value_resolver=lambda _item: list(urls),
override_hash=query_hash,
override_store=store_override,
on_warning=_warn,
)
supported_batch: Dict[str, List[Tuple[str, Sequence[str]]]] = {}
for store_text, store_pairs in batch.items():
backend, storage, _exc = sh.get_store_backend(
config,
store_text,
store_registry=storage,
)
if backend is None:
_warn(f"Store '{store_text}' not configured; skipping")
continue
if not bool(getattr(backend, "supports_url_association", False)):
_warn(f"Store '{store_text}' does not support URLs; skipping")
continue
supported_batch[store_text] = store_pairs
# Execute per-instance batches.
storage, batch_stats = sh.run_store_hash_value_batches(
config,
supported_batch,
bulk_method_name="add_url_bulk",
single_method_name="add_url",
store_registry=storage,
)
for store_text, item_count, _value_count in batch_stats:
ctx.print_if_visible(
f"✓ add-url: {len(urls)} url(s) for {item_count} item(s) in '{store_text}'",
file=sys.stderr,
)
# Pass items through unchanged (but update url field for convenience).
for item in pass_through:
existing = sh.get_field(item, "url")
merged = sh.merge_urls(existing, list(urls))
sh.set_item_urls(item, merged)
ctx.emit(item)
return 0
# Single-item mode
backend, storage, exc = sh.get_store_backend(
config,
str(store_name),
store_registry=storage,
)
if backend is None:
log(f"Error: Storage backend '{store_name}' not configured")
return 1
if not bool(getattr(backend, "supports_url_association", False)):
log(f"Error: Store '{store_name}' does not support URL associations")
return 1
backend.add_url(str(file_hash), urls, config=config)
ctx.print_if_visible(
f"✓ add-url: {len(urls)} url(s) added",
file=sys.stderr
)
if result is not None:
existing = sh.get_field(result, "url")
merged = sh.merge_urls(existing, list(urls))
sh.set_item_urls(result, merged)
ctx.emit(result)
return 0
except Exception as exc:
log(f"Error adding URL: {exc}", file=sys.stderr)
return 1
CMDLET = Add_Url()
from cmdlet.metadata.url_add import * # noqa: F401,F403
+11 -5
View File
@@ -41,6 +41,11 @@ class Get_File(sh.Cmdlet):
"name",
description="Output filename (default: from metadata title)"
),
sh.CmdletArg(
"browser",
flag=True,
description="Open file in browser instead of saving to disk"
),
],
detail=[
"- Exports file from storage backend to local path",
@@ -78,6 +83,7 @@ class Get_File(sh.Cmdlet):
store_name = parsed.get("instance") or sh.get_field(result, "store")
output_path = parsed.get("path")
output_name = parsed.get("name")
browser_flag = bool(parsed.get("browser"))
if not file_hash:
log(
@@ -150,9 +156,9 @@ class Get_File(sh.Cmdlet):
return ""
# Get file from backend (may return Path or URL string depending on backend).
# We pass url=True if no explicit path was provided, which hints the backend
# (specifically Hydrus) to return a browser-friendly URL instead of a local path.
want_url = (output_path is None)
# If -browser is given, request a URL (for Hydrus viewer). If -path is given,
# always retrieve a local file. Otherwise default to local export.
want_url = browser_flag
source_path = backend.get_file(file_hash, url=want_url)
download_url = None
@@ -175,8 +181,8 @@ class Get_File(sh.Cmdlet):
except Exception:
pass
if download_url and output_path is None:
# Hydrus backend returns a URL; open it only when no output path
if download_url and (browser_flag or output_path is None):
# Open in browser: explicit -browser flag, or Hydrus returned a URL with no output path
try:
webbrowser.open(download_url)
except Exception as exc:
+1 -1
View File
@@ -8,7 +8,7 @@ def run_note_action(action: str, result: Any, args: Sequence[str], config: Dict[
act = str(action or "").strip().lower()
if act == "add":
from cmdlet.file.add_note import CMDLET as ADD_NOTE_CMDLET
from cmdlet.metadata.note_add import CMDLET as ADD_NOTE_CMDLET
return int(ADD_NOTE_CMDLET.run(result, args, config))
+370
View File
@@ -0,0 +1,370 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import sys
import re
from SYS.logger import log
from SYS import pipeline as ctx
from .. import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
QueryArg = sh.QueryArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_cmdlet_args = sh.parse_cmdlet_args
normalize_result_input = sh.normalize_result_input
should_show_help = sh.should_show_help
class Add_Note(Cmdlet):
DEFAULT_QUERY_HINTS = (
"title:",
"text:",
"hash:",
"caption:",
"sub:",
"subtitle:",
)
def __init__(self) -> None:
super().__init__(
name="add-note",
summary="Add file store note",
usage=
'add-note (-query "title:<title>,text:<text>[,instance:<instance>][,hash:<sha256>]") [ -instance <store> | <piped> ]',
alias=[""],
arg=[
SharedArgs.INSTANCE,
QueryArg(
"hash",
key="hash",
aliases=["sha256"],
type="string",
required=False,
handler=normalize_hash,
description=
"(Optional) Specific file hash target, provided via -query as hash:<sha256>. When omitted, uses piped item hash.",
query_only=True,
),
SharedArgs.QUERY,
],
detail=["""
dde
"""],
exec=self.run,
)
# Populate dynamic store choices for autocomplete
try:
SharedArgs.INSTANCE.choices = SharedArgs.get_store_choices(None)
except Exception:
pass
self.register()
@staticmethod
def _commas_to_spaces_outside_quotes(text: str) -> str:
buf: List[str] = []
quote: Optional[str] = None
escaped = False
for ch in str(text or ""):
if escaped:
buf.append(ch)
escaped = False
continue
if ch == "\\" and quote is not None:
buf.append(ch)
escaped = True
continue
if ch in ('"', "'"):
if quote is None:
quote = ch
elif quote == ch:
quote = None
buf.append(ch)
continue
if ch == "," and quote is None:
buf.append(" ")
continue
buf.append(ch)
return "".join(buf)
@staticmethod
def _parse_note_query(query: str) -> Tuple[Optional[str], Optional[str]]:
"""Parse note payload from -query.
Expected:
title:<title>,text:<text>
Commas are treated as separators when not inside quotes.
"""
raw = str(query or "").strip()
if not raw:
return None, None
try:
from SYS.cli_syntax import parse_query, get_field
except Exception:
parse_query = None # type: ignore
get_field = None # type: ignore
normalized = Add_Note._commas_to_spaces_outside_quotes(raw)
if callable(parse_query) and callable(get_field):
parsed = parse_query(normalized)
name = get_field(parsed, "title")
text = get_field(parsed, "text")
name_s = str(name or "").strip() if name is not None else ""
text_s = str(text or "").strip() if text is not None else ""
return (name_s or None, text_s or None)
# Fallback: best-effort regex.
name_match = re.search(
r"\btitle\s*:\s*([^,\s]+)",
normalized,
flags=re.IGNORECASE
)
text_match = re.search(r"\btext\s*:\s*(.+)$", normalized, flags=re.IGNORECASE)
note_name = name_match.group(1).strip() if name_match else ""
note_text = text_match.group(1).strip() if text_match else ""
return (note_name or None, note_text or None)
@classmethod
def _looks_like_note_query_token(cls, token: Any) -> bool:
text = str(token or "").strip().lower()
if not text:
return False
return any(hint in text for hint in cls.DEFAULT_QUERY_HINTS)
@classmethod
def _default_query_args(cls, args: Sequence[str]) -> List[str]:
tokens: List[str] = list(args or [])
lower_tokens = {str(tok).lower() for tok in tokens if tok is not None}
if "-query" in lower_tokens or "--query" in lower_tokens:
return tokens
for idx, tok in enumerate(tokens):
token_text = str(tok or "")
if not token_text or token_text.startswith("-"):
continue
if not cls._looks_like_note_query_token(token_text):
continue
combined_parts = [token_text]
end = idx + 1
while end < len(tokens):
next_text = str(tokens[end] or "")
if not next_text or next_text.startswith("-"):
break
if not cls._looks_like_note_query_token(next_text):
break
combined_parts.append(next_text)
end += 1
combined_query = " ".join(combined_parts)
tokens[idx:end] = [combined_query]
tokens.insert(idx, "-query")
return tokens
return tokens
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
if should_show_help(args):
log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
return 0
parsed_args = self._default_query_args(args)
parsed = parse_cmdlet_args(parsed_args, self)
store_override = parsed.get("instance")
hash_override = normalize_hash(parsed.get("hash"))
note_name, note_text = self._parse_note_query(str(parsed.get("query") or ""))
note_name = str(note_name or "").strip()
note_text = str(note_text or "").strip()
if not note_name or not note_text:
pass # We now support implicit pipeline notes if -query is missing
# But if explicit targeting (store+hash) is used, we still demand args below.
if hash_override and not store_override:
log(
"[add_note] Error: hash:<sha256> requires instance:<instance> in -query or -instance <store>",
file=sys.stderr,
)
return 1
explicit_target = bool(hash_override and store_override)
results = normalize_result_input(result)
if explicit_target and (not note_name or not note_text):
log(
"[add_note] Error: Explicit target (store+hash) requires -query with title/text",
file=sys.stderr,
)
return 1
if results and explicit_target:
# Direct targeting mode: apply note once to the explicit target and
# pass through any piped items unchanged.
try:
backend, _store_registry, exc = sh.get_store_backend(
config,
str(store_override),
)
if backend is None:
raise exc or KeyError(store_override)
if not bool(getattr(backend, "supports_note_association", False)):
log(
f"[add_note] Error: Store '{store_override}' does not support notes",
file=sys.stderr,
)
return 1
ok = bool(
backend.set_note(
str(hash_override),
note_name,
note_text,
config=config
)
)
if ok:
ctx.print_if_visible(
f"✓ add-note: 1 item in '{store_override}'",
file=sys.stderr
)
log(
"[add_note] Updated 1/1 item(s)",
file=sys.stderr
)
for res in results:
ctx.emit(res)
return 0
log(
"[add_note] Warning: Note write reported failure",
file=sys.stderr
)
return 1
except Exception as exc:
log(f"[add_note] Error: Failed to set note: {exc}", file=sys.stderr)
return 1
if not results:
if explicit_target:
# Allow standalone use (no piped input) and enable piping the target forward.
results = [{
"store": str(store_override),
"hash": hash_override
}]
else:
log(
'[add_note] Error: Requires piped item(s) from add-file, or explicit targeting via store/hash (e.g., -query "instance:<instance> hash:<sha256> ...")',
file=sys.stderr,
)
return 1
store_registry = None
planned_ops = 0
# Batch write plan: store -> [(hash, name, text), ...]
note_ops: Dict[str,
List[Tuple[str,
str,
str]]] = {}
for res in results:
if not isinstance(res, dict):
ctx.emit(res)
continue
# Determine notes to write for this item
notes_to_write: List[Tuple[str, str]] = []
# 1. Explicit arguments always take precedence
if note_name and note_text:
notes_to_write.append((note_name, note_text))
# 2. Pipeline notes auto-ingestion
# Look for 'notes' dictionary in the item (propagated by pipeline/download-file)
# Structure: {'notes': {'lyric': '...', 'sub': '...'}}
# Check both root and nested 'extra'
# Check root 'notes' (dict or extra.notes)
pipeline_notes = res.get("notes")
if not isinstance(pipeline_notes, dict):
extra = res.get("extra")
if isinstance(extra, dict):
pipeline_notes = extra.get("notes")
if isinstance(pipeline_notes, dict):
for k, v in pipeline_notes.items():
# If arg-provided note conflicts effectively with pipeline note?
# We just append both.
if v and str(v).strip():
notes_to_write.append((str(k), str(v)))
if not notes_to_write:
# Pass through items that have nothing to add
ctx.emit(res)
continue
store_name, resolved_hash = sh.resolve_item_store_hash(
res,
override_store=str(store_override) if store_override else None,
override_hash=str(hash_override) if hash_override else None,
path_fields=("path",),
)
if not store_name:
log(
"[add_note] Error: Missing -instance and item has no store field",
file=sys.stderr
)
continue
if not resolved_hash:
log(
"[add_note] Warning: Item missing usable hash; skipping",
file=sys.stderr
)
ctx.emit(res)
continue
# Queue operations
if store_name not in note_ops:
note_ops[store_name] = []
for (n_name, n_text) in notes_to_write:
note_ops[store_name].append((resolved_hash, n_name, n_text))
planned_ops += 1
ctx.emit(res)
# Execute batch operations
def _on_store_error(store_name: str, exc: Exception) -> None:
log(f"[add_note] Store access failed '{store_name}': {exc}", file=sys.stderr)
def _on_unsupported_store(store_name: str) -> None:
log(f"[add_note] Store '{store_name}' does not support notes", file=sys.stderr)
def _on_item_error(store_name: str, hash_value: str, note_name_value: str, exc: Exception) -> None:
log(f"[add_note] Write failed {store_name}:{hash_value} ({note_name_value}): {exc}", file=sys.stderr)
store_registry, success_count = sh.run_store_note_batches(
config,
note_ops,
store_registry=store_registry,
on_store_error=_on_store_error,
on_unsupported_store=_on_unsupported_store,
on_item_error=_on_item_error,
)
if planned_ops > 0:
msg = f"✓ add-note: Updated {success_count}/{planned_ops} notes across {len(note_ops)} stores"
ctx.print_if_visible(msg, file=sys.stderr)
return 0
CMDLET = Add_Note()
+1 -1
View File
@@ -8,7 +8,7 @@ def run_relationship_action(action: str, result: Any, args: Sequence[str], confi
act = str(action or "").strip().lower()
if act == "add":
from cmdlet.file.add_relationship import _run as run_add_relationship
from cmdlet.metadata.relationship_add import _run as run_add_relationship
return int(run_add_relationship(result, args, config))
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -8,7 +8,7 @@ def run_url_action(action: str, result: Any, args: Sequence[str], config: Dict[s
act = str(action or "").strip().lower()
if act == "add":
from cmdlet.file.add_url import CMDLET as ADD_URL_CMDLET
from cmdlet.metadata.url_add import CMDLET as ADD_URL_CMDLET
return int(ADD_URL_CMDLET.run(result, args, config))
+204
View File
@@ -0,0 +1,204 @@
from __future__ import annotations
from typing import Any, Dict, List, Sequence, Tuple
import sys
from SYS import pipeline as ctx
from .. import _shared as sh
from SYS.logger import log
from Store import Store
class Add_Url(sh.Cmdlet):
"""Add URL associations to files via hash+store."""
def __init__(self) -> None:
super().__init__(
name="add-url",
summary="Associate a URL with a file",
usage="@1 | add-url <url>",
arg=[
sh.SharedArgs.QUERY,
sh.SharedArgs.INSTANCE,
sh.CmdletArg("url",
required=True,
description="URL to associate"),
],
detail=[
"- Associates URL with file identified by hash+store",
"- Multiple url can be comma-separated",
],
exec=self.run,
)
self.register()
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Add URL to file via hash+store backend."""
parsed = sh.parse_cmdlet_args(args, self)
# Compatibility/piping fix:
# `SharedArgs.QUERY` is positional in the shared parser, so `add-url <url>`
# (and `@N | add-url <url>`) can mistakenly parse the URL into `query`.
# If `url` is missing and `query` looks like an http(s) URL, treat it as `url`.
try:
if (not parsed.get("url")) and isinstance(parsed.get("query"), str):
q = str(parsed.get("query") or "").strip()
if q.startswith(("http://", "https://")):
parsed["url"] = q
parsed.pop("query", None)
except Exception:
pass
query_hash, query_valid = sh.require_single_hash_query(
parsed.get("query"),
"Error: -query must be of the form hash:<sha256>",
)
if not query_valid:
return 1
# Bulk input is common in pipelines; treat a list of PipeObjects as a batch.
results: List[Any] = (
result if isinstance(result,
list) else ([result] if result is not None else [])
)
if query_hash and len(results) > 1:
log("Error: -query hash:<sha256> cannot be used with multiple piped items")
return 1
# Extract hash and store from result or args
file_hash = query_hash or (
sh.get_field(result,
"hash") if result is not None else None
)
store_name = parsed.get("instance") or (
sh.get_field(result,
"store") if result is not None else None
)
url_arg = parsed.get("url")
if not url_arg:
try:
inferred = sh.extract_url_from_result(result)
if inferred:
candidate = inferred[0]
if isinstance(candidate, str) and candidate.strip():
url_arg = candidate.strip()
parsed["url"] = url_arg
except Exception:
pass
# If we have multiple piped items, we will resolve hash/store per item below.
if not results:
if not file_hash:
log(
'Error: No file hash provided (pipe an item or use -query "hash:<sha256>")'
)
return 1
if not store_name:
log("Error: No store name provided")
return 1
if not url_arg:
log("Error: No URL provided")
return 1
# Normalize hash (single-item mode)
if not results and file_hash:
file_hash = sh.normalize_hash(file_hash)
if not file_hash:
log("Error: Invalid hash format")
return 1
# Parse url (comma-separated)
urls = [u.strip() for u in str(url_arg).split(",") if u.strip()]
if not urls:
log("Error: No valid url provided")
return 1
# Get backend and add url
try:
storage = Store(config)
# Build batches per store.
store_override = parsed.get("instance")
if results:
def _warn(message: str) -> None:
ctx.print_if_visible(f"[add-url] Warning: {message}", file=sys.stderr)
batch, pass_through = sh.collect_store_hash_value_batch(
results,
store_registry=storage,
value_resolver=lambda _item: list(urls),
override_hash=query_hash,
override_store=store_override,
on_warning=_warn,
)
supported_batch: Dict[str, List[Tuple[str, Sequence[str]]]] = {}
for store_text, store_pairs in batch.items():
backend, storage, _exc = sh.get_store_backend(
config,
store_text,
store_registry=storage,
)
if backend is None:
_warn(f"Store '{store_text}' not configured; skipping")
continue
if not bool(getattr(backend, "supports_url_association", False)):
_warn(f"Store '{store_text}' does not support URLs; skipping")
continue
supported_batch[store_text] = store_pairs
# Execute per-instance batches.
storage, batch_stats = sh.run_store_hash_value_batches(
config,
supported_batch,
bulk_method_name="add_url_bulk",
single_method_name="add_url",
store_registry=storage,
)
for store_text, item_count, _value_count in batch_stats:
ctx.print_if_visible(
f"✓ add-url: {len(urls)} url(s) for {item_count} item(s) in '{store_text}'",
file=sys.stderr,
)
# Pass items through unchanged (but update url field for convenience).
for item in pass_through:
existing = sh.get_field(item, "url")
merged = sh.merge_urls(existing, list(urls))
sh.set_item_urls(item, merged)
ctx.emit(item)
return 0
# Single-item mode
backend, storage, exc = sh.get_store_backend(
config,
str(store_name),
store_registry=storage,
)
if backend is None:
log(f"Error: Storage backend '{store_name}' not configured")
return 1
if not bool(getattr(backend, "supports_url_association", False)):
log(f"Error: Store '{store_name}' does not support URL associations")
return 1
backend.add_url(str(file_hash), urls, config=config)
ctx.print_if_visible(
f"✓ add-url: {len(urls)} url(s) added",
file=sys.stderr
)
if result is not None:
existing = sh.get_field(result, "url")
merged = sh.merge_urls(existing, list(urls))
sh.set_item_urls(result, merged)
ctx.emit(result)
return 0
except Exception as exc:
log(f"Error adding URL: {exc}", file=sys.stderr)
return 1
CMDLET = Add_Url()