# Medios-Macina/cmdlet/delete_tag.py
from __future__ import annotations

import json
import sys
from pathlib import Path
from typing import Any, Dict, Sequence

from SYS import models
from SYS import pipeline as ctx
from SYS.logger import debug, log
from Store import Store

from . import _shared as sh

Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_tag_arguments = sh.parse_tag_arguments
should_show_help = sh.should_show_help
get_field = sh.get_field
def _refresh_tag_view_if_current(
    file_hash: str | None,
    store_name: str | None,
    path: str | None,
    config: Dict[str, Any],
) -> None:
    """If the current subject matches the target, refresh tags via get-tag.

    Best-effort: every failure path (missing cmdlet registry, no current
    subject, backend errors) returns silently so a successful deletion is
    never reported as a failure because the view refresh broke.
    """
    try:
        from cmdlet import get as get_cmdlet  # type: ignore
    except Exception:
        return

    try:
        refresh_fn = get_cmdlet("get-tag")
    except Exception:
        refresh_fn = None
    if not callable(refresh_fn):
        return

    try:
        subject = ctx.get_last_result_subject()
        if subject is None:
            return

        def lowered(value: Any) -> str:
            return str(value).lower()

        wanted_hash = lowered(file_hash) if file_hash else None
        wanted_path = lowered(path) if path else None

        # Collect the subject's identifying values, lower-cased for comparison.
        if isinstance(subject, dict):
            hash_candidates = [lowered(v) for v in (subject.get("hash"),) if v]
            path_candidates = [
                lowered(v)
                for v in (subject.get("path"), subject.get("target"))
                if v
            ]
        else:
            hash_candidates = [
                lowered(get_field(subject, f))
                for f in ("hash",)
                if get_field(subject, f)
            ]
            path_candidates = [
                lowered(get_field(subject, f))
                for f in ("path", "target")
                if get_field(subject, f)
            ]

        hash_hit = bool(wanted_hash) and wanted_hash in hash_candidates
        path_hit = bool(wanted_path) and wanted_path in path_candidates
        if not (hash_hit or path_hit):
            return

        # Re-run get-tag against the same target so the displayed tag list
        # reflects the deletion that just happened.
        refresh_args: list[str] = []
        if file_hash:
            refresh_args += ["-query", f"hash:{file_hash}"]
        if store_name:
            refresh_args += ["-store", store_name]
        refresh_fn(subject, refresh_args, config)
    except Exception:
        pass
# Cmdlet metadata consumed by the pipeline registry. The executable entry
# point (`exec`) is attached after _run is defined, then registered at the
# bottom of this file.
CMDLET = Cmdlet(
    name="delete-tag",
    summary="Remove tags from a file in a store.",
    usage='delete-tag -store <store> [-query "hash:<sha256>"] <tag>[,<tag>...]',
    arg=[
        SharedArgs.QUERY,  # -query "hash:<sha256>" target override
        SharedArgs.STORE,  # -store <store> backend selector
        CmdletArg(
            "<tag>[,<tag>...]",
            required=True,
            description="One or more tags to remove. Comma- or space-separated.",
        ),
    ],
    detail=[
        "- Requires a Hydrus file (hash present) or explicit -query override.",
        "- Multiple tags can be comma-separated or space-separated.",
    ],
)
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Cmdlet entry point: delete tags from piped items.

    Two modes, chosen by inspecting the piped `result`:
      * tag-row mode  - items are get-tag rows (TagItem or dict with
        tag_name); each row names the tag to delete and carries its own
        file context (hash/store/path).
      * file mode     - items are files; the tags to delete come from the
        positional arguments and are removed from EVERY piped item.

    Returns 0 when at least one deletion succeeded, 1 otherwise.
    """
    # Help
    if should_show_help(args):
        log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}")
        return 0

    def _looks_like_tag_row(obj: Any) -> bool:
        # True for a TagItem instance, or any object/dict exposing tag_name.
        if obj is None:
            return False
        # TagItem (direct) or PipeObject/dict emitted from get-tag table rows.
        try:
            if (hasattr(obj, "__class__") and obj.__class__.__name__ == "TagItem"
                    and hasattr(obj, "tag_name")):
                return True
        except Exception:
            pass
        try:
            return bool(get_field(obj, "tag_name"))
        except Exception:
            return False

    has_piped_tag = _looks_like_tag_row(result)
    # Lists are classified by their first element only.
    has_piped_tag_list = (
        isinstance(result, list) and bool(result) and _looks_like_tag_row(result[0])
    )

    # Parse -query/-store overrides and collect remaining args.
    override_query: str | None = None
    override_hash: str | None = None
    override_store: str | None = None
    rest: list[str] = []
    i = 0
    while i < len(args):
        a = args[i]
        low = str(a).lower()
        if low in {"-query", "--query", "query"} and i + 1 < len(args):
            override_query = str(args[i + 1]).strip()
            i += 2
            continue
        if low in {"-store", "--store", "store"} and i + 1 < len(args):
            override_store = str(args[i + 1]).strip()
            i += 2
            continue
        rest.append(a)
        i += 1
    # Only a single hash:<sha256> query is accepted as an override target.
    override_hash = sh.parse_single_hash_query(
        override_query
    ) if override_query else None
    if override_query and not override_hash:
        log("Invalid -query value (expected hash:<sha256>)", file=sys.stderr)
        return 1
    # Selection syntax (@...) is handled by the pipeline runner, not by this cmdlet.
    # If @ reaches here as a literal argument, it's almost certainly user error.
    if rest and str(rest[0]).startswith("@") and not (has_piped_tag or has_piped_tag_list):
        log("Selection syntax is only supported via piping. Use: @N | delete-tag")
        return 1
    # Special case: grouped tag selection created by the pipeline runner.
    # This represents "delete these selected tags" (not "delete tags from this file").
    grouped_table = ""
    try:
        grouped_table = str(get_field(result, "table") or "").strip().lower()
    except Exception:
        grouped_table = ""
    grouped_tags = get_field(result, "tag") if result is not None else None
    tags_arg = parse_tag_arguments(rest)
    if (grouped_table == "tag.selection" and isinstance(grouped_tags, list)
            and grouped_tags and not tags_arg):
        # CLI overrides win over the grouped row's own hash/store context.
        file_hash = (
            normalize_hash(override_hash)
            if override_hash else normalize_hash(get_field(result, "hash"))
        )
        store_name = override_store or get_field(result, "store")
        path = get_field(result, "path") or get_field(result, "target")
        tags = [str(t) for t in grouped_tags if t]
        return 0 if _process_deletion(tags, file_hash, path, store_name, config) else 1
    if not tags_arg and not has_piped_tag and not has_piped_tag_list:
        log("Requires at least one tag argument")
        return 1

    # Normalize result to a list for processing
    items_to_process = []
    if isinstance(result, list):
        items_to_process = result
    elif result:
        items_to_process = [result]
    # Process each item
    success_count = 0

    # If we have TagItems and no args, we are deleting the tags themselves
    # If we have Files (or other objects) and args, we are deleting tags FROM those files

    # Check if we are in "delete selected tags" mode (tag rows)
    is_tag_item_mode = bool(items_to_process) and _looks_like_tag_row(
        items_to_process[0]
    )

    if is_tag_item_mode:
        # Collect all tags to delete from the TagItems and batch per file.
        # This keeps delete-tag efficient (one backend call per file).
        # Key is (hash, store, path) so rows for the same file are merged.
        groups: Dict[tuple[str, str, str], list[str]] = {}
        for item in items_to_process:
            tag_name = get_field(item, "tag_name")
            if not tag_name:
                continue
            item_hash = (
                normalize_hash(override_hash)
                if override_hash else normalize_hash(get_field(item, "hash"))
            )
            item_store = override_store or get_field(item, "store")
            item_path = get_field(item, "path") or get_field(item, "target")
            key = (str(item_hash or ""), str(item_store or ""), str(item_path or ""))
            groups.setdefault(key, []).append(str(tag_name))
        for (h, s, p), tag_list in groups.items():
            if not tag_list:
                continue
            if _process_deletion(tag_list, h or None, p or None, s or None, config):
                success_count += 1
        return 0 if success_count > 0 else 1
    else:
        # "Delete tags from files" mode
        # We need args (tags to delete)
        if not tags_arg:
            log("Requires at least one tag argument when deleting from files")
            return 1
        # Process each item

        # If we have tags from @ syntax (e.g. delete-tag @{1,2}), we ignore the piped result for tag selection
        # but we might need the piped result for the file context if @ selection was from a Tag table
        # Actually, the @ selection logic above already extracted tags.

        # Process items from pipe (or single result)
        # If args are provided, they are the tags to delete from EACH item
        # If items are TagItems and no args, the tag to delete is the item itself
        for item in items_to_process:
            tags_to_delete: list[str] = []
            item_hash = (
                normalize_hash(override_hash)
                if override_hash else normalize_hash(get_field(item, "hash"))
            )
            item_path = get_field(item, "path") or get_field(item, "target")
            item_store = override_store or get_field(item, "store")
            if _looks_like_tag_row(item):
                if tags_arg:
                    tags_to_delete = tags_arg
                else:
                    tag_name = get_field(item, "tag_name")
                    if tag_name:
                        tags_to_delete = [str(tag_name)]
            else:
                if tags_arg:
                    tags_to_delete = tags_arg
                else:
                    # Non-tag item with no tag arguments: nothing to delete.
                    continue
            if tags_to_delete:
                if _process_deletion(tags_to_delete, item_hash, item_path,
                                     item_store, config):
                    success_count += 1
        if success_count > 0:
            return 0
        return 1
def _process_deletion(
    tags: list[str],
    file_hash: str | None,
    path: str | None,
    store_name: str | None,
    config: Dict[str, Any],
) -> bool:
    """Helper to execute the deletion logic for a single target.

    Resolves the target hash (deriving it from `path` on disk when no hash
    was supplied), refuses to delete the last remaining "title:" tag, then
    asks the store backend to remove the tags.

    Returns True when the backend reports success, False otherwise.
    """
    if not tags:
        return False

    if not store_name:
        log(
            "Store is required (use -store or pipe a result with store)",
            file=sys.stderr,
        )
        return False
    resolved_hash = normalize_hash(file_hash) if file_hash else None
    if not resolved_hash and path:
        # Fall back to hashing the file on disk when no hash was piped in.
        try:
            from SYS.utils import sha256_file

            resolved_hash = sha256_file(Path(path))
        except Exception:
            resolved_hash = None
    if not resolved_hash:
        log(
            "Item does not include a usable hash (and hash could not be derived from path)",
            file=sys.stderr,
        )
        return False

    def _fetch_existing_tags() -> list[str]:
        # Best-effort read of the file's current tags; failure means "unknown".
        try:
            backend = Store(config)[store_name]
            existing, _src = backend.get_tag(resolved_hash, config=config)
            return list(existing or [])
        except Exception:
            return []

    # Safety: only block if this deletion would remove the final title tag
    title_tags = [
        t for t in tags if isinstance(t, str) and t.lower().startswith("title:")
    ]
    if title_tags:
        existing_tags = _fetch_existing_tags()
        current_titles = [
            t for t in existing_tags
            if isinstance(t, str) and t.lower().startswith("title:")
        ]
        del_title_set = {t.lower() for t in title_tags}
        remaining_titles = [t for t in current_titles if t.lower() not in del_title_set]
        if current_titles and not remaining_titles:
            log(
                'Cannot delete the last title: tag. Add a replacement title first (add-tags "title:new title").',
                file=sys.stderr,
            )
            return False

    try:
        backend = Store(config)[store_name]
        ok = backend.delete_tag(resolved_hash, list(tags), config=config)
        if ok:
            # Fix: the truncation marker was an empty string, so the
            # conditional was a no-op; append an ellipsis when truncated.
            preview = resolved_hash[:12] + ("..." if len(resolved_hash) > 12 else "")
            debug(
                f"Removed {len(tags)} tag(s) from {preview} via store '{store_name}'."
            )
            # Keep any on-screen get-tag view in sync with this deletion.
            _refresh_tag_view_if_current(resolved_hash, store_name, path, config)
            return True
        return False
    except Exception as exc:
        # Fix: report under the cmdlet's actual name ("delete-tag", not
        # "del-tag") and send it to stderr like the other error paths.
        log(f"delete-tag failed: {exc}", file=sys.stderr)
        return False
# Register cmdlet (no legacy decorator): attach the entry point to the
# metadata object, then publish it to the shared cmdlet registry.
CMDLET.exec = _run
CMDLET.register()