from __future__ import annotations

from typing import Any, Dict, List, Optional, Sequence, Tuple
import sys

import pipeline as ctx
from . import _shared as sh

Cmdlet, CmdletArg, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
    sh.Cmdlet,
    sh.CmdletArg,
    sh.SharedArgs,
    sh.parse_cmdlet_args,
    sh.get_field,
    sh.normalize_hash,
)

from SYS.logger import log
from Store import Store


class Delete_Url(Cmdlet):
    """Delete URL associations from files via hash+store."""

    def __init__(self) -> None:
        super().__init__(
            name="delete-url",
            summary="Remove a URL association from a file",
            usage="@1 | delete-url <url>",
            arg=[
                SharedArgs.QUERY,
                SharedArgs.STORE,
                CmdletArg("url", required=False, description="URL to remove (optional when piping url rows)"),
            ],
            detail=[
                "- Removes URL associations from the file identified by hash+store",
                "- Multiple URLs can be comma-separated",
            ],
            exec=self.run,
        )
        self.register()

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Delete URL from file via hash+store backend."""
        parsed = parse_cmdlet_args(args, self)

        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            log("Error: -query must be of the form hash:<hash>")
            return 1

        # Bulk input is common in pipelines; treat a list of PipeObjects as a batch.
        results: List[Any] = result if isinstance(result, list) else ([result] if result is not None else [])
        if query_hash and len(results) > 1:
            log("Error: -query hash:<hash> cannot be used with multiple piped items")
            return 1

        # Extract hash and store from result or args
        file_hash = query_hash or (get_field(result, "hash") if result is not None else None)
        store_name = parsed.get("store") or (get_field(result, "store") if result is not None else None)
        url_arg = parsed.get("url")

        # If we have multiple piped items, we will resolve hash/store per item below.
        if not results:
            if not file_hash:
                log('Error: No file hash provided (pipe an item or use -query "hash:<hash>")')
                return 1
            if not store_name:
                log("Error: No store name provided")
                return 1

        # Normalize hash (single-item mode)
        if not results and file_hash:
            file_hash = normalize_hash(file_hash)
            if not file_hash:
                log("Error: Invalid hash format")
                return 1

        from metadata import normalize_urls

        def _urls_from_arg(raw: Any) -> List[str]:
            if raw is None:
                return []
            # Support comma-separated input for backwards compatibility
            if isinstance(raw, str) and "," in raw:
                return [u.strip() for u in raw.split(",") if u.strip()]
            return [u.strip() for u in normalize_urls(raw) if str(u).strip()]

        urls_from_cli = _urls_from_arg(url_arg)

        # Get backend and delete URL(s)
        try:
            storage = Store(config)

            def _remove_urls(existing: Any, remove: List[str]) -> Any:
                # Preserve the prior shape: keep a str when one URL remains, a list otherwise.
                current: List[str] = []
                try:
                    if isinstance(existing, str):
                        current = [p.strip() for p in existing.split(",") if p.strip()]
                    elif isinstance(existing, (list, tuple)):
                        current = [str(u).strip() for u in existing if str(u).strip()]
                except Exception:
                    current = []
                remove_set = {u for u in (remove or []) if u}
                new_urls = [u for u in current if u not in remove_set]
                if len(new_urls) == 1:
                    return new_urls[0]
                return new_urls

            def _set_item_url(item: Any, merged: Any) -> None:
                try:
                    if isinstance(item, dict):
                        item["url"] = merged
                        return
                    if hasattr(item, "url"):
                        setattr(item, "url", merged)
                except Exception:
                    return

            store_override = parsed.get("store")
            batch: Dict[str, List[Tuple[str, List[str]]]] = {}
            pass_through: List[Any] = []

            if results:
                for item in results:
                    pass_through.append(item)
                    raw_hash = query_hash or get_field(item, "hash")
                    raw_store = store_override or get_field(item, "store")
                    if not raw_hash or not raw_store:
                        ctx.print_if_visible("[delete-url] Warning: Item missing hash/store; skipping", file=sys.stderr)
                        continue
                    normalized = normalize_hash(raw_hash)
                    if not normalized:
                        ctx.print_if_visible("[delete-url] Warning: Item has invalid hash; skipping", file=sys.stderr)
                        continue
                    store_text = str(raw_store).strip()
                    if not store_text:
                        ctx.print_if_visible("[delete-url] Warning: Item has empty store; skipping", file=sys.stderr)
                        continue
                    if not storage.is_available(store_text):
                        ctx.print_if_visible(
                            f"[delete-url] Warning: Store '{store_text}' not configured; skipping",
                            file=sys.stderr,
                        )
                        continue

                    # Determine which URLs to delete.
                    # - If the user passed an explicit <url>, apply it to all items.
                    # - Otherwise, when piping url rows from get-url, delete the url(s) from each item.
                    item_urls = list(urls_from_cli)
                    if not item_urls:
                        item_urls = [
                            u.strip()
                            for u in normalize_urls(get_field(item, "url") or get_field(item, "source_url"))
                            if str(u).strip()
                        ]
                    if not item_urls:
                        ctx.print_if_visible("[delete-url] Warning: Item has no url field; skipping", file=sys.stderr)
                        continue

                    batch.setdefault(store_text, []).append((normalized, item_urls))

                for store_text, pairs in batch.items():
                    try:
                        backend = storage[store_text]
                    except Exception:
                        continue

                    # De-duplicate URLs per hash before issuing the delete.
                    merged: Dict[str, List[str]] = {}
                    for h, ulist in pairs:
                        merged.setdefault(h, [])
                        for u in (ulist or []):
                            if u and u not in merged[h]:
                                merged[h].append(u)
                    bulk_pairs = [(h, merged[h]) for h in merged.keys()]

                    # Prefer the backend's bulk API when it provides one.
                    bulk_fn = getattr(backend, "delete_url_bulk", None)
                    if callable(bulk_fn):
                        bulk_fn(bulk_pairs, config=config)
                    else:
                        for h, ulist in bulk_pairs:
                            backend.delete_url(h, ulist, config=config)

                    deleted_count = 0
                    for _h, ulist in bulk_pairs:
                        deleted_count += len(ulist or [])
                    ctx.print_if_visible(
                        f"✓ delete-url: {deleted_count} url(s) for {len(bulk_pairs)} item(s) in '{store_text}'",
                        file=sys.stderr,
                    )

                for item in pass_through:
                    existing = get_field(item, "url")
                    # In batch mode we removed the union of requested urls for the file.
                    # Using urls_from_cli (if present) matches the user's explicit intent; otherwise
                    # remove the piped url row(s).
                    remove_set = urls_from_cli
                    if not remove_set:
                        remove_set = [
                            u.strip()
                            for u in normalize_urls(get_field(item, "url") or get_field(item, "source_url"))
                            if str(u).strip()
                        ]
                    _set_item_url(item, _remove_urls(existing, list(remove_set)))
                    ctx.emit(item)
                return 0

            # Single-item mode
            if not urls_from_cli:
                urls_from_cli = [
                    u.strip()
                    for u in normalize_urls(get_field(result, "url") or get_field(result, "source_url"))
                    if str(u).strip()
                ]
            if not urls_from_cli:
                log("Error: No URL provided")
                return 1

            backend = storage[str(store_name)]
            backend.delete_url(str(file_hash), list(urls_from_cli), config=config)
            ctx.print_if_visible(f"✓ delete-url: {len(urls_from_cli)} url(s) removed", file=sys.stderr)

            if result is not None:
                existing = get_field(result, "url")
                _set_item_url(result, _remove_urls(existing, list(urls_from_cli)))
                ctx.emit(result)
            return 0

        except KeyError:
            log(f"Error: Storage backend '{store_name}' not configured")
            return 1
        except Exception as exc:
            log(f"Error deleting URL: {exc}", file=sys.stderr)
            return 1


CMDLET = Delete_Url()
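

# Illustrative invocations (a sketch only; the exact shell syntax, store names, and the
# shape of rows piped from get-url depend on the surrounding pipeline runner and
# configuration, and the example URLs/placeholders below are hypothetical):
#
#   @1 | delete-url https://example.com/page            # remove one URL from the piped item
#   @1 | get-url | delete-url                           # remove the piped url row(s)
#   delete-url -query "hash:<hash>" -store <store> https://example.com/page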