from __future__ import annotations

import sys
from typing import Any, Dict, List, Sequence, Tuple

from SYS import pipeline as ctx
from SYS.logger import log
from Store import Store

from . import _shared as sh


class Add_Url(sh.Cmdlet):
    """Add URL associations to files via hash+store."""

    def __init__(self) -> None:
        super().__init__(
            name="add-url",
            summary="Associate a URL with a file",
            usage="@1 | add-url <url>",
            arg=[
                sh.SharedArgs.QUERY,
                sh.SharedArgs.STORE,
                sh.CmdletArg("url", required=True, description="URL to associate"),
            ],
            detail=[
                "- Associates URL with file identified by hash+store",
                "- Multiple url can be comma-separated",
            ],
            exec=self.run,
        )
        self.register()

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Add URL to file via hash+store backend."""
        parsed = sh.parse_cmdlet_args(args, self)

        # Compatibility/piping fix:
        # `SharedArgs.QUERY` is positional in the shared parser, so `add-url <url>`
        # (and `@N | add-url <url>`) can mistakenly parse the URL into `query`.
        # If `url` is missing and `query` looks like an http(s) URL, treat it as `url`.
        try:
            if (not parsed.get("url")) and isinstance(parsed.get("query"), str):
                q = str(parsed.get("query") or "").strip()
                if q.startswith(("http://", "https://")):
                    parsed["url"] = q
                    parsed.pop("query", None)
        except Exception:
            pass

        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            log("Error: -query must be of the form hash:<hash>")
            return 1

        # Bulk input is common in pipelines; treat a list of PipeObjects as a batch.
        results: List[Any] = (
            result
            if isinstance(result, list)
            else ([result] if result is not None else [])
        )
        if query_hash and len(results) > 1:
            log("Error: -query hash:<hash> cannot be used with multiple piped items")
            return 1

        # Extract hash and store from result or args
        file_hash = query_hash or (
            sh.get_field(result, "hash") if result is not None else None
        )
        store_name = parsed.get("store") or (
            sh.get_field(result, "store") if result is not None else None
        )
        url_arg = parsed.get("url")

        # If we have multiple piped items, we will resolve hash/store per item below;
        # only require hash/store up front when nothing was piped.
        if not results:
            if not file_hash:
                log(
                    'Error: No file hash provided (pipe an item or use -query "hash:<hash>")'
                )
                return 1
            if not store_name:
                log("Error: No store name provided")
                return 1
        # The URL is required in both modes.
        if not url_arg:
            log("Error: No URL provided")
            return 1

        # Normalize hash (single-item mode)
        if not results and file_hash:
            file_hash = sh.normalize_hash(file_hash)
            if not file_hash:
                log("Error: Invalid hash format")
                return 1

        # Parse url (comma-separated)
        urls = [u.strip() for u in str(url_arg).split(",") if u.strip()]
        if not urls:
            log("Error: No valid url provided")
            return 1

        # Get backend and add url
        try:
            storage = Store(config)

            def _merge_urls(existing: Any, incoming: List[str]) -> List[str]:
                """Merge an existing url field (string or sequence) with incoming
                URLs, preserving order and dropping duplicates/blanks."""
                out: List[str] = []
                try:
                    if isinstance(existing, str):
                        out.extend(
                            [p.strip() for p in existing.split(",") if p.strip()]
                        )
                    elif isinstance(existing, (list, tuple)):
                        out.extend(
                            [str(u).strip() for u in existing if str(u).strip()]
                        )
                except Exception:
                    out = []
                for u in incoming:
                    if u and u not in out:
                        out.append(u)
                return out

            def _set_item_url(item: Any, merged: List[str]) -> None:
                """Write the merged value back onto a dict or PipeObject-like item,
                collapsing a single URL to a scalar for convenience."""
                try:
                    if isinstance(item, dict):
                        item["url"] = merged[0] if len(merged) == 1 else list(merged)
                        return
                    # PipeObject-like
                    if hasattr(item, "url"):
                        setattr(
                            item,
                            "url",
                            merged[0] if len(merged) == 1 else list(merged),
                        )
                except Exception:
                    return

            # Build batches per store.
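            # Shape: `batch` maps store name -> list of (normalized_hash, urls)
            # pairs. Grouping by store lets each backend be resolved exactly once;
            # duplicate hashes within a store are coalesced further down so a
            # backend sees each hash at most once, via `add_url_bulk` when the
            # backend exposes it (probed with getattr) or per-hash `add_url`
            # calls otherwise.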
            store_override = parsed.get("store")
            batch: Dict[str, List[Tuple[str, List[str]]]] = {}
            pass_through: List[Any] = []
            if results:
                for item in results:
                    pass_through.append(item)
                    raw_hash = query_hash or sh.get_field(item, "hash")
                    raw_store = store_override or sh.get_field(item, "store")
                    if not raw_hash or not raw_store:
                        ctx.print_if_visible(
                            "[add-url] Warning: Item missing hash/store; skipping",
                            file=sys.stderr,
                        )
                        continue
                    normalized = sh.normalize_hash(raw_hash)
                    if not normalized:
                        ctx.print_if_visible(
                            "[add-url] Warning: Item has invalid hash; skipping",
                            file=sys.stderr,
                        )
                        continue
                    store_text = str(raw_store).strip()
                    if not store_text:
                        ctx.print_if_visible(
                            "[add-url] Warning: Item has empty store; skipping",
                            file=sys.stderr,
                        )
                        continue
                    # Validate backend exists (skip PATH/unknown).
                    if not storage.is_available(store_text):
                        ctx.print_if_visible(
                            f"[add-url] Warning: Store '{store_text}' not configured; skipping",
                            file=sys.stderr,
                        )
                        continue
                    batch.setdefault(store_text, []).append((normalized, list(urls)))

                # Execute per-store batches.
                for store_text, pairs in batch.items():
                    try:
                        backend = storage[store_text]
                    except Exception:
                        continue
                    # Coalesce duplicates per hash before passing to backend.
                    merged_by_hash: Dict[str, List[str]] = {}
                    for h, ulist in pairs:
                        merged_by_hash.setdefault(h, [])
                        for u in ulist or []:
                            if u and u not in merged_by_hash[h]:
                                merged_by_hash[h].append(u)
                    bulk_pairs = list(merged_by_hash.items())
                    # Prefer a bulk entry point when the backend provides one.
                    bulk_fn = getattr(backend, "add_url_bulk", None)
                    if callable(bulk_fn):
                        bulk_fn(bulk_pairs, config=config)
                    else:
                        for h, ulist in bulk_pairs:
                            backend.add_url(h, ulist, config=config)
                    ctx.print_if_visible(
                        f"✓ add-url: {len(urls)} url(s) for {len(bulk_pairs)} item(s) in '{store_text}'",
                        file=sys.stderr,
                    )

                # Pass items through unchanged (but update url field for convenience).
                for item in pass_through:
                    existing = sh.get_field(item, "url")
                    merged = _merge_urls(existing, list(urls))
                    _set_item_url(item, merged)
                    ctx.emit(item)
                return 0

            # Single-item mode
            backend = storage[str(store_name)]
            backend.add_url(str(file_hash), urls, config=config)
            ctx.print_if_visible(
                f"✓ add-url: {len(urls)} url(s) added", file=sys.stderr
            )
            if result is not None:
                existing = sh.get_field(result, "url")
                merged = _merge_urls(existing, list(urls))
                _set_item_url(result, merged)
                ctx.emit(result)
            return 0
        except KeyError:
            log(f"Error: Storage backend '{store_name}' not configured")
            return 1
        except Exception as exc:
            log(f"Error adding URL: {exc}", file=sys.stderr)
            return 1


CMDLET = Add_Url()
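
# Illustrative invocations (a sketch: the `@N` selection and `-query` flag appear
# in this cmdlet's own messages; the `-store` spelling is assumed from
# SharedArgs.STORE and may differ in the host CLI):
#
#   @1 | add-url https://example.com/page
#   @1 | add-url https://a.example/x,https://b.example/y   # comma-separated URLs
#   add-url -query "hash:<hash>" -store <store> https://example.com/page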