cmdlet refactor

This commit is contained in:
2026-05-04 18:41:01 -07:00
parent 3ce339b3c1
commit 24f983473f
44 changed files with 1320 additions and 309 deletions
+21
View File
@@ -0,0 +1,21 @@
from __future__ import annotations
"""Metadata domain command handlers.
This package centralizes routing for metadata sub-domains (tag, url,
relationship, note, inspect) behind the top-level `metadata` cmdlet.
"""
from .tag import run_tag_action
from .url import run_url_action
from .relationship import run_relationship_action
from .note import run_note_action
from .inspect import run_inspect_action
__all__ = [
"run_tag_action",
"run_url_action",
"run_relationship_action",
"run_note_action",
"run_inspect_action",
]
+181
View File
@@ -0,0 +1,181 @@
from __future__ import annotations
from typing import Any, Dict, List, Sequence
import sys
from SYS.logger import log
from SYS.detail_view_helpers import create_detail_view, prepare_detail_metadata
from SYS.payload_builders import build_table_result_payload
from SYS.result_publication import publish_result_table
from SYS.result_table_helpers import add_row_columns
from SYS import pipeline as ctx
from .. import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_cmdlet_args = sh.parse_cmdlet_args
normalize_result_input = sh.normalize_result_input
should_show_help = sh.should_show_help
class Get_Note(Cmdlet):
def __init__(self) -> None:
super().__init__(
name="get-note",
summary="List notes on a file in a store.",
usage='get-note -instance <store> [-query "hash:<sha256>"]',
alias=["get-notes",
"get_note"],
arg=[
SharedArgs.INSTANCE,
SharedArgs.QUERY,
],
detail=[
"- Notes are retrieved via the selected store backend.",
"- Lyrics are stored in a note named 'lyric'.",
],
exec=self.run,
)
try:
SharedArgs.INSTANCE.choices = SharedArgs.get_store_choices(None)
except Exception:
pass
self.register()
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
if should_show_help(args):
log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
return 0
parsed = parse_cmdlet_args(args, self)
store_override = parsed.get("instance")
query_hash, query_valid = sh.require_single_hash_query(
parsed.get("query"),
"[get_note] Error: -query must be of the form hash:<sha256>",
log_file=sys.stderr,
)
if not query_valid:
return 1
results = normalize_result_input(result)
if not results:
if store_override and query_hash:
results = [{
"store": str(store_override),
"hash": query_hash
}]
else:
log(
'[get_note] Error: Requires piped item(s) or -instance and -query "hash:<sha256>"',
file=sys.stderr,
)
return 1
store_registry = None
any_notes = False
display_items: List[Dict[str, Any]] = []
# We assume single subject for get-note detail view
main_res = results[0]
metadata = prepare_detail_metadata(main_res)
note_table = create_detail_view(
"Notes",
metadata,
table_name="note",
source_command=("get-note", []),
)
for res in results:
if not isinstance(res, dict):
continue
store_name, resolved_hash = sh.resolve_item_store_hash(
res,
override_store=str(store_override) if store_override else None,
override_hash=str(query_hash) if query_hash else None,
path_fields=("path",),
)
if not store_name:
log(
"[get_note] Error: Missing -instance and item has no store field",
file=sys.stderr
)
return 1
if not resolved_hash:
continue
# Update metadata if we resolved a hash that wasn't in source
if resolved_hash and not metadata.get("Hash"):
metadata["Hash"] = resolved_hash
if store_name and not metadata.get("Store"):
metadata["Store"] = store_name
backend, store_registry, exc = sh.get_store_backend(
config,
store_name,
store_registry=store_registry,
)
if backend is None:
log(
f"[get_note] Error: Unknown store '{store_name}': {exc}",
file=sys.stderr
)
return 1
notes = {}
try:
notes = backend.get_note(
resolved_hash,
config=config
) or {}
except Exception:
notes = {}
if not notes:
continue
any_notes = True
# Emit each note as its own row so CLI renders a proper note table
for k in sorted(notes.keys(), key=lambda x: str(x).lower()):
v = notes.get(k)
raw_text = str(v or "")
# Keep payload small for IPC/pipes.
raw_text = raw_text[:999]
preview = " ".join(raw_text.replace("\r", "").split("\n"))
payload = build_table_result_payload(
columns=[
("Name", str(k)),
("Text", preview.strip()),
],
store=store_name,
hash=resolved_hash,
note_name=str(k),
note_text=raw_text,
)
display_items.append(payload)
if note_table is not None:
add_row_columns(
note_table,
[("Name", str(k)), ("Text", preview.strip())],
)
ctx.emit(payload)
# Always set the table overlay even if empty to show item details
publish_result_table(ctx, note_table, display_items, subject=result, overlay=True)
if not any_notes:
log("No notes found.")
return 0
CMDLET = Get_Note()
+392
View File
@@ -0,0 +1,392 @@
from __future__ import annotations
from typing import Any, Dict, Sequence, Optional
import sys
from SYS.detail_view_helpers import create_detail_view, prepare_detail_metadata
from SYS.logger import log
from ProviderCore.registry import get_plugin
from SYS.result_table_helpers import add_row_columns
from SYS.selection_builder import build_hash_store_selection
from SYS.result_publication import publish_result_table
from SYS import pipeline as ctx
from .. import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
fmt_bytes = sh.fmt_bytes
get_hash_for_operation = sh.get_hash_for_operation
fetch_hydrus_metadata = sh.fetch_hydrus_metadata
should_show_help = sh.should_show_help
get_field = sh.get_field
CMDLET = Cmdlet(
name="get-relationship",
summary="Print relationships for the selected file (Hydrus).",
usage='get-relationship [-query "hash:<sha256>"]',
alias=[],
arg=[
SharedArgs.QUERY,
SharedArgs.INSTANCE,
],
detail=[
"- Lists relationship data as returned by Hydrus.",
],
)
def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
# Help
if should_show_help(_args):
log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}")
return 0
# Parse -query and -instance override
override_query: str | None = None
override_store: str | None = None
args_list = list(_args)
i = 0
while i < len(args_list):
a = args_list[i]
low = str(a).lower()
if low in {"-query", "--query", "query"} and i + 1 < len(args_list):
override_query = str(args_list[i + 1]).strip()
i += 2
continue
if low in {"-instance", "--instance"} and i + 1 < len(args_list):
override_store = str(args_list[i + 1]).strip()
i += 2
continue
i += 1
override_hash, query_valid = sh.require_single_hash_query(
override_query,
'get-relationship requires -query "hash:<sha256>"',
log_file=sys.stderr,
)
if not query_valid:
return 1
# Handle @N selection which creates a list
if isinstance(result, list):
if len(result) == 0:
result = None
elif len(result) > 1 and not override_hash:
log(
'get-relationship expects a single item; select one row (e.g. @1) or pass -query "hash:<sha256>"',
file=sys.stderr,
)
return 1
else:
result = result[0]
# Initialize results collection
found_relationships = [] # List of dicts: {hash, type, title, path, store}
source_title = "Unknown"
# Store/hash-first subject resolution
store_name: Optional[str] = override_store
if not store_name:
store_name = get_field(result, "store")
hash_hex = (
normalize_hash(override_hash)
if override_hash else normalize_hash(get_hash_for_operation(None, result))
)
if not source_title or source_title == "Unknown":
source_title = (
get_field(result, "title") or get_field(result, "name")
or (hash_hex[:16] + "..." if hash_hex else "Unknown")
)
if not hash_hex:
log('get-relationship requires -query "hash:<sha256>"', file=sys.stderr)
return 1
# Fetch Hydrus relationships if we have a hash.
hydrus_provider = get_plugin("hydrusnetwork", config)
hash_hex = (
normalize_hash(override_hash)
if override_hash else normalize_hash(get_hash_for_operation(None,
result))
)
if hash_hex:
try:
store_label = "hydrus"
if store_name:
store_label = str(store_name)
if hydrus_provider is None:
log(
f"Hydrus client unavailable for store '{store_name}'",
file=sys.stderr
)
return 1
relationships = hydrus_provider.get_relationships(hash_hex, store_name=store_name)
else:
relationships = hydrus_provider.get_relationships(hash_hex) if hydrus_provider is not None else None
def _resolve_related_title(rel_hash: str) -> str:
"""Best-effort resolve a Hydrus hash to a human title.
Preference order:
- title: tag from the backend (fast path)
- Hydrus metadata tags via fetch_hydrus_metadata
- fallback to short hash
"""
h = normalize_hash(rel_hash)
if not h:
return str(rel_hash)
# Prefer provider-backed title resolution when available.
if hydrus_provider is not None:
try:
resolved_title = hydrus_provider.get_title(
h,
store_name=store_label if store_name else None,
)
if isinstance(resolved_title, str) and resolved_title.strip():
return resolved_title.strip()
except Exception:
pass
# Fallback: fetch minimal metadata and scan for a title tag.
try:
meta, _ = fetch_hydrus_metadata(
config,
h,
store_name=store_label if store_name else None,
include_service_keys_to_tags=True,
include_file_url=False,
include_duration=False,
include_size=False,
include_mime=False,
)
if isinstance(meta, dict):
tags_payload = meta.get("tags")
tag_candidates: list[str] = []
if isinstance(tags_payload, dict):
for svc_data in tags_payload.values():
if not isinstance(svc_data, dict):
continue
storage = svc_data.get("storage_tags")
if isinstance(storage, dict):
for group in storage.values():
if isinstance(group, list):
tag_candidates.extend(
[
str(x) for x in group
if isinstance(x, str)
]
)
display = svc_data.get("display_tags")
if isinstance(display, list):
tag_candidates.extend(
[str(x) for x in display if isinstance(x, str)]
)
flat = meta.get("tags_flat")
if isinstance(flat, list):
tag_candidates.extend(
[str(x) for x in flat if isinstance(x, str)]
)
for t in tag_candidates:
if isinstance(t, str) and t.lower().startswith("title:"):
val = t.split(":", 1)[1].strip()
if val:
return val
except Exception:
pass
return h[:16] + "..."
if relationships:
file_rels = relationships.get("file_relationships",
{})
this_file_rels = file_rels.get(hash_hex)
if this_file_rels:
# Map Hydrus relationship IDs to names.
# For /manage_file_relationships/get_file_relationships, the Hydrus docs define:
# 0=potential duplicates, 1=false positives, 3=alternates, 8=duplicates
# Additionally, this endpoint includes metadata keys like 'king'/'is_king'.
rel_map = {
"0": "potential",
"1": "false positive",
"3": "alternate",
"8": "duplicate",
}
for rel_type_id, rel_value in this_file_rels.items():
key = str(rel_type_id)
# Handle metadata keys explicitly.
if key in {"is_king",
"king_is_on_file_domain",
"king_is_local"}:
continue
# Some Hydrus responses provide a direct king hash under the 'king' key.
if key == "king":
king_hash = (
normalize_hash(rel_value)
if isinstance(rel_value,
str) else None
)
if king_hash and king_hash != hash_hex:
if not any(str(r.get("hash",
"")).lower() == king_hash
for r in found_relationships):
found_relationships.append(
{
"hash": king_hash,
"type": "king",
"title":
_resolve_related_title(king_hash),
"path": None,
"store": store_label,
}
)
continue
rel_name = rel_map.get(key, f"type-{key}")
# The relationship value is typically a list of hashes.
if isinstance(rel_value, list):
for rel_hash in rel_value:
rel_hash_norm = (
normalize_hash(rel_hash)
if isinstance(rel_hash,
str) else None
)
if not rel_hash_norm or rel_hash_norm == hash_hex:
continue
if not any(str(r.get("hash",
"")).lower() == rel_hash_norm
for r in found_relationships):
found_relationships.append(
{
"hash":
rel_hash_norm,
"type":
rel_name,
"title":
_resolve_related_title(rel_hash_norm),
"path":
None,
"store":
store_label,
}
)
# Defensive: sometimes the API may return a single hash string.
elif isinstance(rel_value, str):
rel_hash_norm = normalize_hash(rel_value)
if rel_hash_norm and rel_hash_norm != hash_hex:
if not any(str(r.get("hash",
"")).lower() == rel_hash_norm
for r in found_relationships):
found_relationships.append(
{
"hash":
rel_hash_norm,
"type":
rel_name,
"title":
_resolve_related_title(rel_hash_norm),
"path":
None,
"store":
store_label,
}
)
except Exception as exc:
# Only log error if we didn't find local relationships either
if not found_relationships:
log(f"Hydrus relationships fetch failed: {exc}", file=sys.stderr)
# Display results
metadata = prepare_detail_metadata(
result,
title=(source_title if source_title and source_title != "Unknown" else None),
hash_value=hash_hex,
)
table = create_detail_view(
"Relationships",
metadata,
init_command=("get-relationship", []),
value_case=None,
perseverance=False,
)
# Sort by type then title
# Custom sort order: King first, then Derivative, then others
def type_sort_key(item):
t = item["type"].lower()
if t == "king":
return 0
elif t == "derivative":
return 1
elif t in {"alternative",
"alternate",
"alt"}:
return 2
elif t == "duplicate":
return 3
else:
return 4
found_relationships.sort(key=lambda x: (type_sort_key(x), x["title"]))
pipeline_results = []
for i, item in enumerate(found_relationships):
add_row_columns(
table,
[
("Type", item["type"].title()),
("Title", item["title"]),
("Store", item["store"]),
],
)
# Create result object for pipeline
res_obj = {
"title": item["title"],
"hash": item["hash"],
"file_hash": item["hash"],
"relationship_type": item["type"],
"store": item["store"],
}
# Target is always hash in store/hash-first mode
res_obj["target"] = item["hash"]
pipeline_results.append(res_obj)
# Set selection args
selection_args, _selection_action = build_hash_store_selection(
item["hash"],
item["store"],
)
if selection_args:
table.set_row_selection_args(i, selection_args)
# Ensure empty state is still navigable/visible
publish_result_table(ctx, table, pipeline_results, overlay=True)
from SYS.rich_display import stdout_console
stdout_console().print(table)
if not found_relationships:
log("No relationships found.")
return 0
CMDLET.exec = _run
CMDLET.register()
+13
View File
@@ -0,0 +1,13 @@
from __future__ import annotations
from typing import Any, Dict, Sequence
def run_inspect_action(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Route metadata inspect to get-metadata implementation."""
from cmdlet.get_metadata import CMDLET as GET_METADATA_CMDLET
exec_fn = getattr(GET_METADATA_CMDLET, "exec", None)
if callable(exec_fn):
return int(exec_fn(result, args, config))
return int(GET_METADATA_CMDLET.run(result, args, config))
+25
View File
@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Any, Dict, Sequence
def run_note_action(action: str, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Route metadata note actions to note cmdlets."""
act = str(action or "").strip().lower()
if act == "add":
from cmdlet.file.add_note import CMDLET as ADD_NOTE_CMDLET
return int(ADD_NOTE_CMDLET.run(result, args, config))
if act == "delete":
from cmdlet.delete_note import CMDLET as DELETE_NOTE_CMDLET
return int(DELETE_NOTE_CMDLET.run(result, args, config))
if act == "get":
from cmdlet.metadata.get_note import CMDLET as GET_NOTE_CMDLET
return int(GET_NOTE_CMDLET.run(result, args, config))
return 1
+25
View File
@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Any, Dict, Sequence
def run_relationship_action(action: str, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Route metadata relationship actions to relationship cmdlets."""
act = str(action or "").strip().lower()
if act == "add":
from cmdlet.file.add_relationship import _run as run_add_relationship
return int(run_add_relationship(result, args, config))
if act == "delete":
from cmdlet.delete_relationship import _run as run_delete_relationship
return int(run_delete_relationship(list(args), config))
if act == "get":
from cmdlet.metadata.get_relationship import _run as run_get_relationship
return int(run_get_relationship(result, args, config))
return 1
+25
View File
@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Any, Dict, Sequence
def run_tag_action(action: str, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Route metadata tag actions to the existing tag implementations."""
act = str(action or "").strip().lower()
if act == "add":
from cmdlet.metadata.tag_add import Add_Tag
return Add_Tag(register_cmdlet=False).run(result, args, config)
if act == "delete":
from cmdlet.metadata.tag_delete import _run as run_delete
return run_delete(result, args, config)
if act == "get":
from cmdlet.metadata.tag_get import _run as run_get
return run_get(result, args, config)
return 1
File diff suppressed because it is too large Load Diff
+805
View File
@@ -0,0 +1,805 @@
from __future__ import annotations
from typing import Any, Dict, Sequence
import sys
from SYS import pipeline as ctx
from SYS.item_accessors import set_field
from SYS.payload_builders import extract_title_tag_value
from SYS.result_publication import publish_result_table
from .. import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_tag_arguments = sh.parse_tag_arguments
render_tag_value_templates = sh.render_tag_value_templates
merge_sequences = sh.merge_sequences
extract_tag_from_result = sh.extract_tag_from_result
should_show_help = sh.should_show_help
get_field = sh.get_field
from SYS.logger import debug, log
def _matches_target(
item: Any,
target_hash: str | None,
target_path: str | None,
target_store: str | None = None,
) -> bool:
def norm(val: Any) -> str | None:
return str(val).lower() if val is not None else None
target_hash_l = target_hash.lower() if target_hash else None
target_path_l = target_path.lower() if target_path else None
target_store_l = target_store.lower() if target_store else None
if isinstance(item, dict):
hashes = [norm(item.get("hash"))]
paths = [norm(item.get("path")), norm(item.get("target"))]
stores = [norm(item.get("store"))]
else:
hashes = [norm(get_field(item, "hash"))]
paths = [norm(get_field(item, "path")), norm(get_field(item, "target"))]
stores = [norm(get_field(item, "store"))]
if target_store_l and target_store_l not in stores:
return False
if target_hash_l and target_hash_l in hashes:
return True
if target_path_l and target_path_l in paths:
return True
return False
def _set_result_tags(result: Any, tags: list[str]) -> None:
normalized = list(tags or [])
set_field(result, "tag", normalized)
if isinstance(result, dict):
if "tags" in result:
result["tags"] = list(normalized)
for container_name in ("extra", "metadata", "full_metadata"):
container = result.get(container_name)
if not isinstance(container, dict):
continue
if "tag" in container:
container["tag"] = list(normalized)
if "tags" in container:
container["tags"] = list(normalized)
return
try:
setattr(result, "tags", list(normalized))
except Exception:
pass
for container_name in ("extra", "metadata", "full_metadata"):
container = getattr(result, container_name, None)
if not isinstance(container, dict):
continue
if "tag" in container:
container["tag"] = list(normalized)
if "tags" in container:
container["tags"] = list(normalized)
def _apply_title_to_result(result: Any, title_value: str | None) -> None:
if not title_value:
return
if isinstance(result, dict):
result["title"] = title_value
cols = result.get("columns")
if isinstance(cols, list):
updated_cols = []
changed = False
for col in cols:
if isinstance(col, tuple) and len(col) == 2:
label, existing_value = col
if str(label).lower() == "title":
updated_cols.append((label, title_value))
changed = True
else:
updated_cols.append((label, existing_value))
else:
updated_cols.append(col)
if changed:
result["columns"] = updated_cols
return
try:
setattr(result, "title", title_value)
except Exception:
pass
columns = getattr(result, "columns", None)
if isinstance(columns, list) and columns:
try:
label, *_ = columns[0]
if str(label).lower() == "title":
columns[0] = (label, title_value)
except Exception:
pass
def _refresh_result_table_tags(
new_tags: list[str],
target_hash: str | None,
target_store: str | None,
target_path: str | None,
) -> None:
try:
last_table = ctx.get_last_result_table()
items = ctx.get_last_result_items()
if not last_table or not items:
return
updated_items = []
match_found = False
title_value = extract_title_tag_value(new_tags)
for item in items:
try:
if _matches_target(item, target_hash, target_path, target_store):
_set_result_tags(item, new_tags)
if title_value:
_apply_title_to_result(item, title_value)
match_found = True
except Exception:
pass
updated_items.append(item)
if not match_found:
return
new_table = last_table.copy_with_title(getattr(last_table, "title", ""))
for item in updated_items:
new_table.add_result(item)
publish_result_table(ctx, new_table, updated_items, overlay=True)
except Exception:
pass
def _expand_namespace_delete_tags(tags: Sequence[str], existing_tags: Sequence[str]) -> list[str]:
expanded: list[str] = []
existing_list = [str(tag or "").strip() for tag in existing_tags or [] if str(tag or "").strip()]
for raw_tag in tags or []:
text = str(raw_tag or "").strip()
if not text:
continue
namespace, sep, value = text.partition(":")
if sep and namespace.strip() and not value.strip():
wanted = namespace.strip().casefold()
matches = []
for existing in existing_list:
existing_ns, existing_sep, existing_value = existing.partition(":")
if not existing_sep:
continue
if existing_ns.strip().casefold() != wanted:
continue
if not existing_value.strip():
continue
matches.append(existing)
expanded.extend(matches)
continue
expanded.append(text)
return merge_sequences(expanded, case_sensitive=True)
def _refresh_tag_view_if_current(
file_hash: str | None,
store_name: str | None,
path: str | None,
config: Dict[str,
Any]
) -> None:
"""If the current subject matches the target, refresh tags via get-tag."""
try:
from cmdlet import get as get_cmdlet # type: ignore
except Exception:
return
get_tag = None
try:
get_tag = get_cmdlet("metadata")
except Exception:
get_tag = None
if not callable(get_tag):
return
try:
subject = ctx.get_last_result_subject()
if subject is None:
return
def norm(val: Any) -> str:
return str(val).lower()
target_hash = norm(file_hash) if file_hash else None
target_path = norm(path) if path else None
subj_hashes: list[str] = []
subj_paths: list[str] = []
if isinstance(subject, dict):
subj_hashes = [norm(v) for v in [subject.get("hash")] if v]
subj_paths = [
norm(v) for v in [subject.get("path"), subject.get("target")] if v
]
else:
subj_hashes = [
norm(get_field(subject,
f)) for f in ("hash", ) if get_field(subject, f)
]
subj_paths = [
norm(get_field(subject,
f)) for f in ("path", "target") if get_field(subject, f)
]
is_match = False
if target_hash and target_hash in subj_hashes:
is_match = True
if target_path and target_path in subj_paths:
is_match = True
if not is_match:
return
refresh_args: list[str] = ["-get"]
if file_hash:
refresh_args.extend(["-query", f"hash:{file_hash}"])
# Build a lean subject so get-tag fetches fresh tags instead of reusing cached payloads.
def _build_refresh_subject() -> Dict[str, Any]:
payload: Dict[str, Any] = {}
payload["hash"] = file_hash
store_value = store_name or get_field(subject, "store")
if sh.value_has_content(store_value):
payload["store"] = store_value
path_value = path or get_field(subject, "path")
if not sh.value_has_content(path_value):
path_value = get_field(subject, "target")
if sh.value_has_content(path_value):
payload["path"] = path_value
for key in ("title", "name", "url", "relations", "service_name"):
val = get_field(subject, key)
if sh.value_has_content(val):
payload[key] = val
extra_value = get_field(subject, "extra")
if isinstance(extra_value, dict):
cleaned = {
k: v for k, v in extra_value.items()
if str(k).lower() not in {"tag", "tags"}
}
if cleaned:
payload["extra"] = cleaned
elif sh.value_has_content(extra_value):
payload["extra"] = extra_value
return payload
refresh_subject = _build_refresh_subject()
# Do not pass -instance here as it triggers emit_mode/quiet in get-tag
with ctx.suspend_live_progress():
get_tag(refresh_subject, refresh_args, config)
except Exception:
pass
def _parse_delete_tag_arguments(arguments: Sequence[str]) -> list[str]:
def _split_top_level_commas(text: str) -> list[str]:
segments: list[str] = []
current: list[str] = []
paren_depth = 0
angle_depth = 0
quote: str | None = None
escape = False
for ch in text:
if escape:
current.append(ch)
escape = False
continue
if ch == "\\":
current.append(ch)
escape = True
continue
if quote:
current.append(ch)
if ch == quote:
quote = None
continue
if ch in {"'", '"'}:
current.append(ch)
quote = ch
continue
if ch == "(":
paren_depth += 1
current.append(ch)
continue
if ch == ")":
paren_depth = max(0, paren_depth - 1)
current.append(ch)
continue
if ch == "<":
angle_depth += 1
current.append(ch)
continue
if ch == ">":
angle_depth = max(0, angle_depth - 1)
current.append(ch)
continue
if ch == "," and paren_depth == 0 and angle_depth == 0:
segments.append("".join(current).strip())
current = []
continue
current.append(ch)
tail = "".join(current).strip()
if tail or segments:
segments.append(tail)
return segments
def _expand_pipe_namespace(text: str) -> list[str]:
parts = text.split("|")
expanded: list[str] = []
last_ns: str | None = None
for part in parts:
segment = part.strip()
if not segment:
continue
if ":" in segment:
ns, val = segment.split(":", 1)
ns = ns.strip()
val = val.strip()
last_ns = ns or last_ns
if last_ns is not None:
expanded.append(f"{last_ns}:{val}")
elif ns or val:
expanded.append(f"{ns}:{val}")
else:
if last_ns:
expanded.append(f"{last_ns}:{segment}")
else:
expanded.append(segment)
return expanded
tags: list[str] = []
for argument in arguments:
for token in _split_top_level_commas(str(argument)):
text = token.strip()
if not text:
continue
for entry in _expand_pipe_namespace(text):
candidate = entry.strip()
if not candidate:
continue
if ":" in candidate:
ns, val = candidate.split(":", 1)
candidate = f"{ns.strip()}:{val.strip()}"
if candidate:
tags.append(candidate)
return tags
_DELETE_TAG_CMDLET = Cmdlet(
name="tag",
summary="Remove tags from a file in a store.",
usage='metadata -delete -instance <store> [-query "hash:<sha256>"] <tag>[,<tag>...]',
arg=[
SharedArgs.QUERY,
SharedArgs.INSTANCE,
CmdletArg(
"<tag>[,<tag>...]",
required=True,
description="One or more tags to remove. Comma- or space-separated.",
),
],
detail=[
"- Requires a Hydrus file (hash present) or explicit -query override.",
"- Multiple tags can be comma-separated or space-separated.",
"- Use #(namespace) inside a tag value to remove a derived tag, e.g. metadata -delete \"title:#(track) - #(series)\".",
"- Angle-bracket transforms match metadata -add syntax, e.g. metadata -delete \"code:e<padding(00,#(episode))>\".",
"- Current documented transforms include padding, default, replace, and increment.",
"- Template examples assume lowercase tag text; case transforms are intentionally not part of the documented syntax.",
"- See docs/tag_template_syntax.md for recipe-style examples and the current shared template syntax.",
],
)
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
# Help
if should_show_help(args):
log(f"Cmdlet: {_DELETE_TAG_CMDLET.name}\nSummary: {_DELETE_TAG_CMDLET.summary}\nUsage: {_DELETE_TAG_CMDLET.usage}")
return 0
def _looks_like_tag_row(obj: Any) -> bool:
if obj is None:
return False
# TagItem (direct) or PipeObject/dict emitted from get-tag table rows.
try:
if (hasattr(obj,
"__class__") and obj.__class__.__name__ == "TagItem"
and hasattr(obj,
"tag_name")):
return True
except Exception:
pass
try:
return bool(get_field(obj, "tag_name"))
except Exception:
return False
has_piped_tag = _looks_like_tag_row(result)
has_piped_tag_list = (
isinstance(result,
list) and bool(result) and _looks_like_tag_row(result[0])
)
# Parse -query/-instance overrides and collect remaining args.
override_query: str | None = None
override_hash: str | None = None
override_store: str | None = None
rest: list[str] = []
i = 0
while i < len(args):
a = args[i]
low = str(a).lower()
if low in {"-query",
"--query",
"query"} and i + 1 < len(args):
override_query = str(args[i + 1]).strip()
i += 2
continue
if low in {"-instance",
"--instance"} and i + 1 < len(args):
override_store = str(args[i + 1]).strip()
i += 2
continue
rest.append(a)
i += 1
override_hash, query_valid = sh.require_single_hash_query(
override_query,
"Invalid -query value (expected hash:<sha256>)",
log_file=sys.stderr,
)
if not query_valid:
return 1
# Selection syntax (@...) is handled by the pipeline runner, not by this cmdlet.
# If @ reaches here as a literal argument, it's almost certainly user error.
if rest and str(rest[0]
).startswith("@") and not (has_piped_tag or has_piped_tag_list):
log("Selection syntax is only supported via piping. Use: @N | metadata -delete")
return 1
# Special case: grouped tag selection created by the pipeline runner.
# This represents "delete these selected tags" (not "delete tags from this file").
grouped_table = ""
try:
grouped_table = str(get_field(result, "table") or "").strip().lower()
except Exception:
grouped_table = ""
grouped_tags = get_field(result, "tag") if result is not None else None
tags_arg = _parse_delete_tag_arguments(rest)
if (grouped_table == "tag.selection" and isinstance(grouped_tags,
list) and grouped_tags
and not tags_arg):
file_hash = (
normalize_hash(override_hash)
if override_hash else normalize_hash(get_field(result,
"hash"))
)
store_name = override_store or get_field(result, "store")
path = get_field(result, "path") or get_field(result, "target")
tags = [str(t) for t in grouped_tags if t]
return 0 if _process_deletion(tags, file_hash, path, store_name, config, result=result) else 1
if not tags_arg and not has_piped_tag and not has_piped_tag_list:
log("Requires at least one tag argument")
return 1
# Normalize result to a list for processing
items_to_process = sh.normalize_result_items(result)
# Process each item
success_count = 0
# If we have TagItems and no args, we are deleting the tags themselves
# If we have Files (or other objects) and args, we are deleting tags FROM those files
# Check if we are in "delete selected tags" mode (tag rows)
is_tag_item_mode = bool(items_to_process) and _looks_like_tag_row(
items_to_process[0]
)
if is_tag_item_mode:
# Collect all tags to delete from the TagItems and batch per file.
# This keeps delete-tag efficient (one backend call per file).
groups: Dict[tuple[str,
str,
str],
list[str]] = {}
for item in items_to_process:
tag_name = get_field(item, "tag_name")
if not tag_name:
continue
item_hash = (
normalize_hash(override_hash)
if override_hash else normalize_hash(get_field(item,
"hash"))
)
item_store = override_store or get_field(item, "store")
item_path = get_field(item, "path") or get_field(item, "target")
key = (str(item_hash or ""), str(item_store or ""), str(item_path or ""))
groups.setdefault(key, []).append(str(tag_name))
for (h, s, p), tag_list in groups.items():
if not tag_list:
continue
if _process_deletion(tag_list, h or None, p or None, s or None, config):
success_count += 1
return 0 if success_count > 0 else 1
else:
# "Delete tags from files" mode
# We need args (tags to delete)
if not tags_arg:
log("Requires at least one tag argument when deleting from files")
return 1
# Collect (store_name, tags_key) -> {backend, hashes, items} groups for bulk dispatch.
# Items that need per-item existing-tag resolution (e.g. namespace-wildcard expand)
# are handled individually; static literal tag sets are batched.
_backend_cache: Dict[str, Any] = {}
def _get_backend(store_name_str: str) -> Any | None:
if store_name_str in _backend_cache:
return _backend_cache[store_name_str]
try:
backend, _reg, _exc = sh.get_preferred_store_backend(
config, store_name_str, suppress_debug=True
)
except TypeError:
backend, _reg, _exc = sh.get_store_backend(
config, store_name_str, suppress_debug=True
)
if backend is not None:
_backend_cache[store_name_str] = backend
return backend
# Bucket: key = (store_name, sorted_tag_tuple) → list of (hash, item, path)
bulk_groups: Dict[tuple[str, tuple[str, ...]], list[tuple[str, Any, str | None]]] = {}
items_needing_individual: list[tuple[Any, str, str | None, str]] = []
tags_has_namespace_wildcard = any(
(isinstance(t, str) and ":" in t and not t.split(":", 1)[1].strip())
for t in tags_arg
)
tags_has_template = any(
(isinstance(t, str) and "#(" in t)
for t in tags_arg
)
needs_individual = tags_has_namespace_wildcard or tags_has_template
for item in items_to_process:
item_hash = (
normalize_hash(override_hash)
if override_hash else normalize_hash(get_field(item, "hash"))
)
item_path = get_field(item, "path") or get_field(item, "target")
item_store = override_store or get_field(item, "store")
if _looks_like_tag_row(item):
if tags_arg:
tags_to_delete = tags_arg
else:
tag_name = get_field(item, "tag_name")
tags_to_delete = [str(tag_name)] if tag_name else []
else:
tags_to_delete = tags_arg or []
if not tags_to_delete or not item_hash or not item_store:
continue
store_str = str(item_store)
# Namespace wildcards (e.g. "album:") and template tags (e.g. "title:#(track)")
# need existing tags to expand — handle individually.
if needs_individual:
items_needing_individual.append((item, item_hash, item_path, store_str))
continue
tag_key = tuple(sorted(str(t).strip().lower() for t in tags_to_delete if str(t).strip()))
bulk_groups.setdefault((store_str, tag_key), []).append((item_hash, item, item_path))
# --- Bulk dispatch ---
for (store_str, tag_key), entries in bulk_groups.items():
backend = _get_backend(store_str)
if backend is None:
log(f"Store '{store_str}' not found", file=sys.stderr)
continue
hashes = [h for h, _item, _path in entries]
tag_list = list(tag_key)
bulk_fn = getattr(backend, "delete_tags_bulk", None)
bulk_ok = False
if callable(bulk_fn):
try:
bulk_ok = bool(bulk_fn([(h, tag_list) for h in hashes]))
except Exception:
bulk_ok = False
if not bulk_ok:
# fallback: individual delete_tag per hash
for h in hashes:
try:
backend.delete_tag(h, tag_list, config=config)
except Exception:
pass
success_count += 1
delete_set = {t.lower() for t in tag_key}
for h, item, path in entries:
# Update in-memory tag list on each result
old_tags = [str(t) for t in (get_field(item, "tag") or []) if t]
new_tags = [t for t in old_tags if t.strip().casefold() not in delete_set]
_set_result_tags(item, new_tags)
title_value = extract_title_tag_value(new_tags)
if title_value:
_apply_title_to_result(item, title_value)
_refresh_result_table_tags(new_tags, h, store_str, path)
try:
ctx.emit(item)
except Exception:
pass
# --- Individual dispatch (namespace wildcards) ---
for item, item_hash, item_path, store_str in items_needing_individual:
if _process_deletion(tags_arg, item_hash, item_path, store_str, config, result=item):
success_count += 1
try:
ctx.emit(item)
except Exception:
pass
if success_count > 0:
return 0
return 1
def _process_deletion(
tags: list[str],
file_hash: str | None,
path: str | None,
store_name: str | None,
config: Dict[str,
Any],
result: Any = None,
) -> bool:
"""Helper to execute the deletion logic for a single target."""
if not tags:
return False
if not store_name:
log(
"Store is required (use -instance or pipe a result with store)",
file=sys.stderr
)
return False
resolved_hash = sh.resolve_hash_for_cmdlet(file_hash, path, None)
if not resolved_hash:
log(
"Item does not include a usable hash (and hash could not be derived from path)",
file=sys.stderr,
)
return False
def _resolve_backend() -> tuple[Any | None, Any, Exception | None]:
try:
return sh.get_preferred_store_backend(
config,
store_name,
suppress_debug=True,
)
except TypeError as exc:
# Some tests monkeypatch get_store_backend with a reduced signature.
# Fall back so runtime still prefers plugin instance resolution while
# preserving compatibility with those injected callables.
if "store_registry" in str(exc):
return sh.get_store_backend(
config,
store_name,
suppress_debug=True,
)
raise
def _fetch_existing_tags() -> list[str]:
try:
backend, _store_registry, _exc = _resolve_backend()
if backend is None:
return []
existing, _src = backend.get_tag(resolved_hash, config=config)
return list(existing or [])
except Exception:
return []
existing_tag_list = merge_sequences(
extract_tag_from_result(result),
_fetch_existing_tags(),
case_sensitive=True,
)
resolved_tags, unresolved_templates = render_tag_value_templates(
tags,
existing_tags=existing_tag_list,
result=result,
)
if unresolved_templates:
log(
f"[delete_tag] skipped {len(unresolved_templates)} tag template(s) with unresolved #(namespace) placeholders",
file=sys.stderr,
)
tags = _expand_namespace_delete_tags(list(resolved_tags), existing_tag_list)
if not tags:
return False
# Safety: only block if this deletion would remove the final title tag
title_tags = [
t for t in tags if isinstance(t, str) and t.lower().startswith("title:")
]
if title_tags:
existing_tags = existing_tag_list
current_titles = [
t for t in existing_tags
if isinstance(t, str) and t.lower().startswith("title:")
]
del_title_set = {t.lower()
for t in title_tags}
remaining_titles = [t for t in current_titles if t.lower() not in del_title_set]
if current_titles and not remaining_titles:
log(
'Cannot delete the last title: tag. Add a replacement title first (metadata -add "title:new title").',
file=sys.stderr,
)
return False
try:
backend, _store_registry, exc = _resolve_backend()
if backend is None:
raise exc or KeyError(store_name)
ok = backend.delete_tag(resolved_hash, list(tags), config=config)
if ok:
refreshed_tags: list[str] = []
try:
refreshed, _src = backend.get_tag(resolved_hash, config=config)
refreshed_tags = list(refreshed or [])
except Exception:
delete_set = {str(tag).strip().casefold() for tag in tags}
refreshed_tags = [
existing_tag for existing_tag in existing_tag_list
if str(existing_tag).strip().casefold() not in delete_set
]
if result is not None:
_set_result_tags(result, refreshed_tags)
title_value = extract_title_tag_value(refreshed_tags)
if title_value:
_apply_title_to_result(result, title_value)
_refresh_result_table_tags(refreshed_tags, resolved_hash, store_name, path)
_refresh_tag_view_if_current(resolved_hash, store_name, path, config)
return True
return False
except Exception as exc:
log(f"del-tag failed: {exc}")
return False
File diff suppressed because it is too large Load Diff
+25
View File
@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Any, Dict, Sequence
def run_url_action(action: str, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Route metadata URL actions to URL cmdlets."""
act = str(action or "").strip().lower()
if act == "add":
from cmdlet.file.add_url import CMDLET as ADD_URL_CMDLET
return int(ADD_URL_CMDLET.run(result, args, config))
if act == "delete":
from cmdlet.delete_url import CMDLET as DELETE_URL_CMDLET
return int(DELETE_URL_CMDLET.run(result, args, config))
if act == "get":
from cmdlet.get_url import CMDLET as GET_URL_CMDLET
return int(GET_URL_CMDLET.run(result, args, config))
return 1