from __future__ import annotations from typing import Any, Dict, Sequence from pathlib import Path import json import sys import models import pipeline as ctx from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs normalize_hash = sh.normalize_hash parse_tag_arguments = sh.parse_tag_arguments should_show_help = sh.should_show_help get_field = sh.get_field from SYS.logger import debug, log from Store import Store def _refresh_tag_view_if_current(file_hash: str | None, store_name: str | None, path: str | None, config: Dict[str, Any]) -> None: """If the current subject matches the target, refresh tags via get-tag.""" try: from cmdlet import get as get_cmdlet # type: ignore except Exception: return get_tag = None try: get_tag = get_cmdlet("get-tag") except Exception: get_tag = None if not callable(get_tag): return try: subject = ctx.get_last_result_subject() if subject is None: return def norm(val: Any) -> str: return str(val).lower() target_hash = norm(file_hash) if file_hash else None target_path = norm(path) if path else None subj_hashes: list[str] = [] subj_paths: list[str] = [] if isinstance(subject, dict): subj_hashes = [norm(v) for v in [subject.get("hash")] if v] subj_paths = [norm(v) for v in [subject.get("path"), subject.get("target")] if v] else: subj_hashes = [norm(get_field(subject, f)) for f in ("hash",) if get_field(subject, f)] subj_paths = [norm(get_field(subject, f)) for f in ("path", "target") if get_field(subject, f)] is_match = False if target_hash and target_hash in subj_hashes: is_match = True if target_path and target_path in subj_paths: is_match = True if not is_match: return refresh_args: list[str] = [] if file_hash: refresh_args.extend(["-query", f"hash:{file_hash}"]) if store_name: refresh_args.extend(["-store", store_name]) get_tag(subject, refresh_args, config) except Exception: pass CMDLET = Cmdlet( name="delete-tag", summary="Remove tags from a file in a store.", usage="delete-tag -store [-query \"hash:\"] [,...]", arg=[ SharedArgs.QUERY, SharedArgs.STORE, CmdletArg("[,...]", required=True, description="One or more tags to remove. Comma- or space-separated."), ], detail=[ "- Requires a Hydrus file (hash present) or explicit -query override.", "- Multiple tags can be comma-separated or space-separated.", ], ) def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # Help if should_show_help(args): log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}") return 0 def _looks_like_tag_row(obj: Any) -> bool: if obj is None: return False # TagItem (direct) or PipeObject/dict emitted from get-tag table rows. try: if hasattr(obj, '__class__') and obj.__class__.__name__ == 'TagItem' and hasattr(obj, 'tag_name'): return True except Exception: pass try: return bool(get_field(obj, 'tag_name')) except Exception: return False has_piped_tag = _looks_like_tag_row(result) has_piped_tag_list = isinstance(result, list) and bool(result) and _looks_like_tag_row(result[0]) # Parse -query/-store overrides and collect remaining args. override_query: str | None = None override_hash: str | None = None override_store: str | None = None rest: list[str] = [] i = 0 while i < len(args): a = args[i] low = str(a).lower() if low in {"-query", "--query", "query"} and i + 1 < len(args): override_query = str(args[i + 1]).strip() i += 2 continue if low in {"-store", "--store", "store"} and i + 1 < len(args): override_store = str(args[i + 1]).strip() i += 2 continue rest.append(a) i += 1 override_hash = sh.parse_single_hash_query(override_query) if override_query else None if override_query and not override_hash: log("Invalid -query value (expected hash:)", file=sys.stderr) return 1 # Selection syntax (@...) is handled by the pipeline runner, not by this cmdlet. # If @ reaches here as a literal argument, it's almost certainly user error. if rest and str(rest[0]).startswith("@") and not (has_piped_tag or has_piped_tag_list): log("Selection syntax is only supported via piping. Use: @N | delete-tag") return 1 # Special case: grouped tag selection created by the pipeline runner. # This represents "delete these selected tags" (not "delete tags from this file"). grouped_table = "" try: grouped_table = str(get_field(result, "table") or "").strip().lower() except Exception: grouped_table = "" grouped_tags = get_field(result, "tag") if result is not None else None tags_arg = parse_tag_arguments(rest) if grouped_table == "tag.selection" and isinstance(grouped_tags, list) and grouped_tags and not tags_arg: file_hash = normalize_hash(override_hash) if override_hash else normalize_hash(get_field(result, "hash")) store_name = override_store or get_field(result, "store") path = get_field(result, "path") or get_field(result, "target") tags = [str(t) for t in grouped_tags if t] return 0 if _process_deletion(tags, file_hash, path, store_name, config) else 1 if not tags_arg and not has_piped_tag and not has_piped_tag_list: log("Requires at least one tag argument") return 1 # Normalize result to a list for processing items_to_process = [] if isinstance(result, list): items_to_process = result elif result: items_to_process = [result] # Process each item success_count = 0 # If we have TagItems and no args, we are deleting the tags themselves # If we have Files (or other objects) and args, we are deleting tags FROM those files # Check if we are in "delete selected tags" mode (tag rows) is_tag_item_mode = bool(items_to_process) and _looks_like_tag_row(items_to_process[0]) if is_tag_item_mode: # Collect all tags to delete from the TagItems and batch per file. # This keeps delete-tag efficient (one backend call per file). groups: Dict[tuple[str, str, str], list[str]] = {} for item in items_to_process: tag_name = get_field(item, "tag_name") if not tag_name: continue item_hash = normalize_hash(override_hash) if override_hash else normalize_hash(get_field(item, "hash")) item_store = override_store or get_field(item, "store") item_path = get_field(item, "path") or get_field(item, "target") key = (str(item_hash or ""), str(item_store or ""), str(item_path or "")) groups.setdefault(key, []).append(str(tag_name)) for (h, s, p), tag_list in groups.items(): if not tag_list: continue if _process_deletion(tag_list, h or None, p or None, s or None, config): success_count += 1 return 0 if success_count > 0 else 1 else: # "Delete tags from files" mode # We need args (tags to delete) if not tags_arg: log("Requires at least one tag argument when deleting from files") return 1 # Process each item # If we have tags from @ syntax (e.g. delete-tag @{1,2}), we ignore the piped result for tag selection # but we might need the piped result for the file context if @ selection was from a Tag table # Actually, the @ selection logic above already extracted tags. # Process items from pipe (or single result) # If args are provided, they are the tags to delete from EACH item # If items are TagItems and no args, the tag to delete is the item itself for item in items_to_process: tags_to_delete: list[str] = [] item_hash = normalize_hash(override_hash) if override_hash else normalize_hash(get_field(item, "hash")) item_path = ( get_field(item, "path") or get_field(item, "target") ) item_store = override_store or get_field(item, "store") if _looks_like_tag_row(item): if tags_arg: tags_to_delete = tags_arg else: tag_name = get_field(item, 'tag_name') if tag_name: tags_to_delete = [str(tag_name)] else: if tags_arg: tags_to_delete = tags_arg else: continue if tags_to_delete: if _process_deletion(tags_to_delete, item_hash, item_path, item_store, config): success_count += 1 if success_count > 0: return 0 return 1 def _process_deletion(tags: list[str], file_hash: str | None, path: str | None, store_name: str | None, config: Dict[str, Any]) -> bool: """Helper to execute the deletion logic for a single target.""" if not tags: return False if not store_name: log("Store is required (use -store or pipe a result with store)", file=sys.stderr) return False resolved_hash = normalize_hash(file_hash) if file_hash else None if not resolved_hash and path: try: from SYS.utils import sha256_file resolved_hash = sha256_file(Path(path)) except Exception: resolved_hash = None if not resolved_hash: log("Item does not include a usable hash (and hash could not be derived from path)", file=sys.stderr) return False def _fetch_existing_tags() -> list[str]: try: backend = Store(config)[store_name] existing, _src = backend.get_tag(resolved_hash, config=config) return list(existing or []) except Exception: return [] # Safety: only block if this deletion would remove the final title tag title_tags = [t for t in tags if isinstance(t, str) and t.lower().startswith("title:")] if title_tags: existing_tags = _fetch_existing_tags() current_titles = [t for t in existing_tags if isinstance(t, str) and t.lower().startswith("title:")] del_title_set = {t.lower() for t in title_tags} remaining_titles = [t for t in current_titles if t.lower() not in del_title_set] if current_titles and not remaining_titles: log("Cannot delete the last title: tag. Add a replacement title first (add-tags \"title:new title\").", file=sys.stderr) return False try: backend = Store(config)[store_name] ok = backend.delete_tag(resolved_hash, list(tags), config=config) if ok: preview = resolved_hash[:12] + ('…' if len(resolved_hash) > 12 else '') debug(f"Removed {len(tags)} tag(s) from {preview} via store '{store_name}'.") _refresh_tag_view_if_current(resolved_hash, store_name, path, config) return True return False except Exception as exc: log(f"del-tag failed: {exc}") return False # Register cmdlet (no legacy decorator) CMDLET.exec = _run CMDLET.register()