"""Delete file relationships.""" from __future__ import annotations from typing import Any, Dict, Optional, Sequence import json from pathlib import Path import sys from SYS.logger import log import pipeline as ctx from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs parse_cmdlet_args = sh.parse_cmdlet_args normalize_hash = sh.normalize_hash normalize_result_input = sh.normalize_result_input get_field = sh.get_field should_show_help = sh.should_show_help from API.folder import API_folder_store from Store import Store from config import get_local_storage_path def _extract_hash(item: Any) -> Optional[str]: h = get_field(item, "hash_hex") or get_field(item, "hash") or get_field(item, "file_hash") return normalize_hash(str(h)) if h else None def _upsert_relationships(db: API_folder_store, file_hash: str, relationships: Dict[str, Any]) -> None: conn = db.connection if conn is None: raise RuntimeError("Store DB connection is not initialized") cursor = conn.cursor() cursor.execute( """ INSERT INTO metadata (hash, relationships) VALUES (?, ?) ON CONFLICT(hash) DO UPDATE SET relationships = excluded.relationships, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP """, (file_hash, json.dumps(relationships) if relationships else "{}"), ) def _remove_reverse_link(db: API_folder_store, *, src_hash: str, dst_hash: str, rel_type: str) -> None: meta = db.get_metadata(dst_hash) or {} rels = meta.get("relationships") if isinstance(meta, dict) else None if not isinstance(rels, dict) or not rels: return key_to_edit: Optional[str] = None for k in list(rels.keys()): if str(k).lower() == str(rel_type).lower(): key_to_edit = str(k) break if not key_to_edit: return bucket = rels.get(key_to_edit) if not isinstance(bucket, list) or not bucket: return new_bucket = [h for h in bucket if str(h).lower() != str(src_hash).lower()] if new_bucket: rels[key_to_edit] = new_bucket else: try: del rels[key_to_edit] except Exception: rels[key_to_edit] = [] _upsert_relationships(db, dst_hash, rels) def _refresh_relationship_view_if_current(target_hash: Optional[str], target_path: Optional[str], other: Optional[str], config: Dict[str, Any]) -> None: """If the current subject matches the target, refresh relationships via get-relationship.""" try: from cmdlet import get as get_cmdlet # type: ignore except Exception: return try: subject = ctx.get_last_result_subject() if subject is None: return def norm(val: Any) -> str: return str(val).lower() target_hashes = [norm(v) for v in [target_hash, other] if v] target_paths = [norm(v) for v in [target_path, other] if v] subj_hashes: list[str] = [] subj_paths: list[str] = [] for field in ("hydrus_hash", "hash", "hash_hex", "file_hash"): val = get_field(subject, field) if val: subj_hashes.append(norm(val)) for field in ("file_path", "path", "target"): val = get_field(subject, field) if val: subj_paths.append(norm(val)) is_match = False if target_hashes and any(h in subj_hashes for h in target_hashes): is_match = True if target_paths and any(p in subj_paths for p in target_paths): is_match = True if not is_match: return refresh_args: list[str] = [] if target_hash: refresh_args.extend(["-hash", target_hash]) cmd = get_cmdlet("get-relationship") if not cmd: return cmd(subject, refresh_args, config) except Exception: pass def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Delete relationships from files. Args: result: Input result(s) from previous cmdlet args: Command arguments config: CLI configuration Returns: Exit code (0 = success) """ try: if should_show_help(args): log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}") return 0 # Parse arguments parsed_args = parse_cmdlet_args(args, CMDLET) delete_all_flag = parsed_args.get("all", False) rel_type_filter = parsed_args.get("type") override_store = parsed_args.get("store") override_hash = parsed_args.get("hash") raw_path = parsed_args.get("path") # Normalize input results = normalize_result_input(result) # Allow store/hash-first usage when no pipeline items were provided if (not results) and override_hash: raw = str(override_hash) parts = [p.strip() for p in raw.replace(";", ",").split(",") if p.strip()] hashes = [h for h in (normalize_hash(p) for p in parts) if h] if not hashes: log("Invalid -hash value (expected 64-hex sha256)", file=sys.stderr) return 1 if not override_store: log("-store is required when using -hash without piped items", file=sys.stderr) return 1 results = [{"hash": h, "store": str(override_store)} for h in hashes] if not results: # Legacy -path mode below may still apply if raw_path: results = [{"file_path": raw_path}] else: log("No results to process", file=sys.stderr) return 1 # Decide store (for same-store enforcement + folder-store DB routing) store_name: Optional[str] = str(override_store).strip() if override_store else None if not store_name: stores = {str(get_field(r, "store")) for r in results if get_field(r, "store")} if len(stores) == 1: store_name = next(iter(stores)) elif len(stores) > 1: log("Multiple stores detected in pipeline; use -store to choose one", file=sys.stderr) return 1 deleted_count = 0 # STORE/HASH FIRST: folder-store DB deletion (preferred) if store_name: backend = None store_root: Optional[Path] = None try: store = Store(config) backend = store[str(store_name)] loc = getattr(backend, "location", None) if callable(loc): store_root = Path(str(loc())) except Exception: backend = None store_root = None if store_root is not None: try: with API_folder_store(store_root) as db: conn = db.connection if conn is None: raise RuntimeError("Store DB connection is not initialized") for single_result in results: # Enforce same-store when items carry store info item_store = get_field(single_result, "store") if item_store and str(item_store) != str(store_name): log(f"Cross-store delete blocked: item store '{item_store}' != '{store_name}'", file=sys.stderr) return 1 file_hash = _extract_hash(single_result) if not file_hash: # Try path -> hash lookup within this store fp = ( get_field(single_result, "file_path") or get_field(single_result, "path") or get_field(single_result, "target") ) if fp: try: file_hash = db.get_file_hash(Path(str(fp))) except Exception: file_hash = None if not file_hash: log("Could not extract file hash for deletion (use -hash or ensure pipeline includes hash)", file=sys.stderr) return 1 meta = db.get_metadata(file_hash) or {} rels = meta.get("relationships") if isinstance(meta, dict) else None if not isinstance(rels, dict) or not rels: continue if delete_all_flag: # remove reverse edges for all types for rt, hashes in list(rels.items()): if not isinstance(hashes, list): continue for other_hash in hashes: other_norm = normalize_hash(str(other_hash)) if other_norm: _remove_reverse_link(db, src_hash=file_hash, dst_hash=other_norm, rel_type=str(rt)) rels = {} elif rel_type_filter: # delete one type (case-insensitive key match) key_to_delete: Optional[str] = None for k in list(rels.keys()): if str(k).lower() == str(rel_type_filter).lower(): key_to_delete = str(k) break if not key_to_delete: continue hashes = rels.get(key_to_delete) if isinstance(hashes, list): for other_hash in hashes: other_norm = normalize_hash(str(other_hash)) if other_norm: _remove_reverse_link(db, src_hash=file_hash, dst_hash=other_norm, rel_type=str(key_to_delete)) try: del rels[key_to_delete] except Exception: rels[key_to_delete] = [] else: log("Specify --all to delete all relationships or -type to delete specific type", file=sys.stderr) return 1 _upsert_relationships(db, file_hash, rels) conn.commit() _refresh_relationship_view_if_current(file_hash, None, None, config) deleted_count += 1 log(f"Successfully deleted relationships from {deleted_count} file(s)", file=sys.stderr) return 0 except Exception as exc: log(f"Error deleting store relationships: {exc}", file=sys.stderr) return 1 # LEGACY PATH MODE (single local DB) # Get storage path local_storage_path = get_local_storage_path(config) if not local_storage_path: log("Local storage path not configured", file=sys.stderr) return 1 try: with API_folder_store(Path(local_storage_path)) as db: conn = db.connection if conn is None: raise RuntimeError("Store DB connection is not initialized") for single_result in results: # Get file path from result file_path_from_result = ( get_field(single_result, "file_path") or get_field(single_result, "path") or get_field(single_result, "target") or (str(single_result) if not isinstance(single_result, dict) else None) ) if not file_path_from_result: log("Could not extract file path from result", file=sys.stderr) return 1 file_path_obj = Path(str(file_path_from_result)) if not file_path_obj.exists(): log(f"File not found: {file_path_obj}", file=sys.stderr) return 1 try: file_hash = db.get_file_hash(file_path_obj) except Exception: file_hash = None file_hash = normalize_hash(str(file_hash)) if file_hash else None if not file_hash: log(f"File not in database: {file_path_obj.name}", file=sys.stderr) continue meta = db.get_metadata(file_hash) or {} rels = meta.get("relationships") if isinstance(meta, dict) else None if not isinstance(rels, dict) or not rels: continue if delete_all_flag: for rt, hashes in list(rels.items()): if not isinstance(hashes, list): continue for other_hash in hashes: other_norm = normalize_hash(str(other_hash)) if other_norm: _remove_reverse_link(db, src_hash=file_hash, dst_hash=other_norm, rel_type=str(rt)) rels = {} elif rel_type_filter: key_to_delete: Optional[str] = None for k in list(rels.keys()): if str(k).lower() == str(rel_type_filter).lower(): key_to_delete = str(k) break if not key_to_delete: continue hashes = rels.get(key_to_delete) if isinstance(hashes, list): for other_hash in hashes: other_norm = normalize_hash(str(other_hash)) if other_norm: _remove_reverse_link(db, src_hash=file_hash, dst_hash=other_norm, rel_type=str(key_to_delete)) try: del rels[key_to_delete] except Exception: rels[key_to_delete] = [] else: log("Specify --all to delete all relationships or -type to delete specific type", file=sys.stderr) return 1 _upsert_relationships(db, file_hash, rels) conn.commit() _refresh_relationship_view_if_current(file_hash, str(file_path_obj), None, config) deleted_count += 1 except Exception as exc: log(f"Error deleting relationship: {exc}", file=sys.stderr) return 1 log(f"Successfully deleted relationships from {deleted_count} file(s)", file=sys.stderr) return 0 except Exception as exc: log(f"Error in delete-relationship: {exc}", file=sys.stderr) return 1 CMDLET = Cmdlet( name="delete-relationship", summary="Remove relationships from files.", usage="@1 | delete-relationship --all OR delete-relationship -path --all OR @1-3 | delete-relationship -type alt", arg=[ CmdletArg("path", type="string", description="Specify the local file path (legacy mode, if not piping a result)."), SharedArgs.STORE, SharedArgs.HASH, CmdletArg("all", type="flag", description="Delete all relationships for the file(s)."), CmdletArg("type", type="string", description="Delete specific relationship type ('alt', 'king', 'related'). Default: delete all types."), ], detail=[ "- Delete all relationships: pipe files | delete-relationship --all", "- Delete specific type: pipe files | delete-relationship -type alt", "- Delete all from file: delete-relationship -path --all", ], ) CMDLET.exec = _run CMDLET.register()