from __future__ import annotations

import json
import sys
from typing import Any, Dict, Sequence

from . import register
import models
import pipeline as ctx
from helper import hydrus as hydrus_wrapper
from ._shared import Cmdlet, CmdletArg, normalize_hash, parse_tag_arguments
from helper.logger import debug, log

CMDLET = Cmdlet(
    name="delete-tags",
    summary="Remove tags from a Hydrus file.",
    usage="del-tags [-hash ] [,...]",
    aliases=["del-tag", "del-tags", "delete-tag"],
    args=[
        CmdletArg(
            "-hash",
            description="Override the Hydrus file hash (SHA256) to target instead of the selected result.",
        ),
        CmdletArg(
            "[,...]",
            required=True,
            description="One or more tags to remove. Comma- or space-separated.",
        ),
    ],
    details=[
        "- Requires a Hydrus file (hash present) or explicit -hash override.",
        "- Multiple tags can be comma-separated or space-separated.",
    ],
)

# Arguments that request the cmdlet description instead of a deletion.
_HELP_FLAGS = frozenset({"-?", "/?", "--help", "-h", "help", "--cmdlet"})

# Spellings accepted for the hash-override option (compared lower-cased).
_HASH_FLAGS = frozenset({"-hash", "--hash", "hash"})


def _is_tag_item(obj: Any) -> bool:
    """Return True when *obj*'s class is named ``TagItem``.

    The check is by class name, so this module needs no import of the class.
    """
    return obj.__class__.__name__ == "TagItem"


def _json_fallback(obj: Any) -> Any:
    """``json.dumps`` ``default`` hook: expose instance attributes, else ``str``.

    Only consulted for objects the encoder cannot serialize natively, so the
    output is unchanged whenever plain serialization already works.
    """
    if hasattr(obj, "__dict__"):
        return vars(obj)
    return str(obj)


def _split_hash_override(args: Sequence[str]) -> tuple[str | None, list[str]]:
    """Separate a ``-hash <value>`` pair from the remaining arguments.

    Returns ``(override_hash, rest)``. When the flag appears more than once the
    last value wins. A trailing flag with no value is kept in ``rest``.
    """
    override_hash: str | None = None
    rest: list[str] = []
    pos = 0
    while pos < len(args):
        current = args[pos]
        if str(current).lower() in _HASH_FLAGS and pos + 1 < len(args):
            override_hash = str(args[pos + 1]).strip()
            pos += 2
            continue
        rest.append(current)
        pos += 1
    return override_hash, rest


def _select_tags_from_last_table(selector: str) -> tuple[list[str], Any, Any] | None:
    """Resolve an ``@N`` / ``@{N,M}`` selector against the last result table.

    Indices are 1-based. Only entries of ``ctx._LAST_RESULT_ITEMS`` that are
    TagItems with a non-empty ``tag_name`` contribute. The hash and file path
    are taken from the first contributing item that provides them.

    Returns ``(tags, hash_hex, file_path)``, or ``None`` after logging the
    reason when the selector is invalid or selects no tags.
    """
    body = selector[1:].strip()
    if body.startswith("{") and body.endswith("}"):
        body = body[1:-1]
    try:
        indices = [int(tok.strip()) for tok in body.split(",") if tok.strip()]
    except ValueError:
        log("Invalid selection syntax. Use @2 or @{2,5,8}")
        return None

    try:
        last_table = ctx._LAST_RESULT_TABLE
        if not last_table:
            log("No ResultTable in pipeline (use @ after running get-tag)")
            return None

        tags: list[str] = []
        hash_hex = None
        file_path = None
        for idx in indices:
            if not 1 <= idx <= len(last_table.rows):
                continue
            if idx - 1 >= len(ctx._LAST_RESULT_ITEMS):
                continue
            item = ctx._LAST_RESULT_ITEMS[idx - 1]
            if not _is_tag_item(item):
                continue
            tag_name = getattr(item, "tag_name", None)
            if not tag_name:
                continue
            log(f"[delete_tag] Extracted tag from @{idx}: {tag_name}")
            tags.append(tag_name)
            # Remember the first hash/path seen so one deletion covers all tags.
            if not hash_hex:
                hash_hex = getattr(item, "hash_hex", None)
            if not file_path:
                file_path = getattr(item, "file_path", None)

        if not tags:
            log(f"No tags found at indices: {indices}")
            return None
        return tags, hash_hex, file_path
    except Exception as exc:
        log(f"Error processing @ selection: {exc}", file=sys.stderr)
        return None


@register(["del-tag", "del-tags", "delete-tag", "delete-tags"])  # Still needed for backward compatibility
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Entry point of the ``delete-tags`` cmdlet.

    Supported invocations:

    * a help flag: log the cmdlet description and stop;
    * ``@N`` / ``@{N,M}`` as the first argument: delete the tags selected from
      the last result table, in a single deletion;
    * piped item(s) plus tag arguments: delete those tags from every item;
    * piped TagItem(s) without tag arguments: delete each item's own tag;
    * ``-hash <value>`` plus tag arguments and no piped result: delete the tags
      from that hash directly.

    Args:
        result: Piped pipeline value: a single item, a list of items, or a
            falsy value when nothing was piped.
        args: Raw command-line arguments.
        config: Configuration mapping forwarded to the deletion helper.

    Returns:
        0 when at least one deletion succeeded (or help was shown), 1 otherwise.
    """
    args = list(args) if args else []

    # A help request must never fall through to deletion: otherwise the flag
    # itself (e.g. "-h") would be parsed as a tag to remove.
    if any(str(a).lower() in _HELP_FLAGS for a in args):
        try:
            log(json.dumps(CMDLET, ensure_ascii=False, indent=2, default=_json_fallback))
        except Exception as exc:
            log(f"Failed to render help for delete-tags: {exc}", file=sys.stderr)
            return 1
        return 0

    # With no arguments the only valid input is piped TagItem(s).
    has_piped_tag = bool(result) and _is_tag_item(result) and hasattr(result, "tag_name")
    has_piped_tag_list = isinstance(result, list) and bool(result) and _is_tag_item(result[0])
    if not args and not has_piped_tag and not has_piped_tag_list:
        log("Requires at least one tag argument")
        return 1

    override_hash, rest = _split_hash_override(args)

    # "@..." selection: tags, hash and path all come from the last result
    # table; the piped result is not consulted.
    if rest and str(rest[0]).startswith("@"):
        selection = _select_tags_from_last_table(str(rest[0]))
        if selection is None:
            return 1
        tags, selected_hash, selected_path = selection
        hash_hex = normalize_hash(override_hash) if override_hash else normalize_hash(selected_hash)
        return 0 if _process_deletion(tags, hash_hex, selected_path, config) else 1

    # Normalize the piped value to a list of items.
    if isinstance(result, list):
        items_to_process = result
    elif result:
        items_to_process = [result]
    else:
        items_to_process = []

    tags_arg = parse_tag_arguments(rest)

    if not items_to_process:
        # Nothing piped: an explicit -hash override is the only possible target.
        if not override_hash:
            log("Requires a piped result or an explicit -hash override")
            return 1
        if not tags_arg:
            log("Requires at least one tag argument when deleting from files")
            return 1
        if _process_deletion(tags_arg, normalize_hash(override_hash), None, config):
            return 0
        return 1

    # Non-TagItem targets need explicit tags; report it instead of failing silently.
    if not tags_arg and not any(_is_tag_item(item) for item in items_to_process):
        log("Requires at least one tag argument when deleting from files")
        return 1

    success_count = 0
    for item in items_to_process:
        if tags_arg:
            # Explicit tags win, even when the item is itself a TagItem.
            tags_to_delete = tags_arg
        elif _is_tag_item(item):
            tag_name = getattr(item, "tag_name", None)
            tags_to_delete = [tag_name] if tag_name else []
        else:
            # Non-tag item and no tags given: nothing to delete for it.
            continue

        if override_hash:
            item_hash = normalize_hash(override_hash)
        else:
            item_hash = normalize_hash(getattr(item, "hash_hex", None))

        item_path = (
            getattr(item, "path", None)
            or getattr(item, "file_path", None)
            or getattr(item, "target", None)
        )
        # Dict results (e.g. from search-file) carry the path under keys.
        if not item_path and isinstance(item, dict):
            item_path = item.get("path") or item.get("file_path") or item.get("target")
        item_source = getattr(item, "source", None)

        if tags_to_delete and (item_hash or item_path):
            if _process_deletion(tags_to_delete, item_hash, item_path, config, source=item_source):
                success_count += 1

    return 0 if success_count > 0 else 1


def _process_deletion(
    tags: list[str],
    hash_hex: str | None,
    file_path: str | None,
    config: Dict[str, Any],
    source: str | None = None,
) -> bool:
    """Remove *tags* from a single target and report whether it succeeded.

    The local-library route is used when a file path is present and either
    ``source == "local"`` or there is no hash and the source is not
    ``"hydrus"``. Otherwise the tags are removed through the Hydrus client,
    which requires ``hash_hex``.

    Args:
        tags: Tags to remove; an empty list is a no-op that returns False.
        hash_hex: File hash used for the Hydrus route.
        file_path: File path used for the local-library route.
        config: Configuration mapping passed to the storage/Hydrus helpers.
        source: Optional origin marker of the item ("local", "hydrus", ...).

    Returns:
        True when the removal call completed without raising, False otherwise
        (failures are logged, never raised).
    """
    if not tags:
        return False

    if not hash_hex and not file_path:
        log("Item does not include a hash or file path")
        return False

    # Handle local file tag deletion
    if file_path and (source == "local" or (not hash_hex and source != "hydrus")):
        try:
            from helper.local_library import LocalLibraryDB
            from pathlib import Path

            path_obj = Path(file_path)
            if not path_obj.exists():
                log(f"File not found: {file_path}")
                return False

            # Try to get local storage path from config
            from config import get_local_storage_path

            local_root = get_local_storage_path(config)
            if not local_root:
                # Fallback: assume file is in a library root or use its parent
                local_root = path_obj.parent

            db = LocalLibraryDB(local_root)
            db.remove_tags(path_obj, tags)
            debug(f"Removed {len(tags)} tag(s) from {path_obj.name} (local)")
            return True
        except Exception as exc:
            log(f"Failed to remove local tags: {exc}")
            return False

    # Hydrus deletion logic
    if not hash_hex:
        return False

    try:
        service_name = hydrus_wrapper.get_tag_service_name(config)
        client = hydrus_wrapper.get_client(config)
        if client is None:
            log("Hydrus client unavailable")
            return False

        debug(f"Sending deletion request: hash={hash_hex}, tags={tags}, service={service_name}")
        client.delete_tags(hash_hex, tags, service_name)

        # Log only a short hash prefix to keep the debug line readable.
        preview = hash_hex[:12] + ('…' if len(hash_hex) > 12 else '')
        debug(f"Removed {len(tags)} tag(s) from {preview} via '{service_name}'.")
        return True
    except Exception as exc:
        log(f"Hydrus del-tag failed: {exc}")
        return False