from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, List, Sequence, Optional, Set, Tuple import sys import re from fnmatch import fnmatch from urllib.parse import urlparse from . import _shared as sh Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = ( sh.Cmdlet, sh.SharedArgs, sh.parse_cmdlet_args, sh.get_field, sh.normalize_hash, ) from SYS.logger import log from SYS.result_table import ResultTable from Store import Store from SYS import pipeline as ctx @dataclass class UrlItem: url: str hash: str store: str title: str = "" size: int | None = None ext: str = "" class Get_Url(Cmdlet): """Get url associated with files via hash+store, or search urls by pattern.""" def __init__(self) -> None: super().__init__( name="get-url", summary="List url associated with a file, or search urls by pattern", usage='@1 | get-url OR get-url -url "https://www.youtube.com/watch?v=xx"', arg=[SharedArgs.QUERY, SharedArgs.STORE, SharedArgs.URL], detail=[ "- Get url for file: @1 | get-url (requires hash+store from result)", '- Search url across stores: get-url -url "www.google.com" (strips protocol & www prefix)', '- Wildcard matching: get-url -url "youtube.com*" (matches all youtube.com urls)', "- Pattern matching: domain matching ignores protocol (https://, http://, ftp://)", ], exec=self.run, ) self.register() @staticmethod def _normalize_url_for_search(url: str) -> str: """Strip protocol and www prefix from URL for searching. Examples: https://www.youtube.com/watch?v=xx -> youtube.com/watch?v=xx http://www.google.com -> google.com ftp://files.example.com -> files.example.com """ url = str(url or "").strip() # Remove protocol (http://, https://, ftp://, etc.) url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE) # Remove www. prefix (case-insensitive) url = re.sub(r"^www\.", "", url, flags=re.IGNORECASE) return url.lower() @staticmethod def _match_url_pattern(url: str, pattern: str) -> bool: """Match URL against pattern with wildcard support. Strips protocol/www from both URL and pattern before matching. Supports * and ? wildcards. """ normalized_url = Get_Url._normalize_url_for_search(url) normalized_pattern = Get_Url._normalize_url_for_search(pattern) # Use fnmatch for wildcard matching (* and ?) return fnmatch(normalized_url, normalized_pattern) @staticmethod def _extract_first_url(value: Any) -> Optional[str]: if isinstance(value, str): v = value.strip() return v or None if isinstance(value, (list, tuple)): for item in value: if isinstance(item, str) and item.strip(): return item.strip() return None @staticmethod def _extract_url_from_result(result: Any) -> Optional[str]: # Prefer explicit url field. u = Get_Url._extract_first_url(get_field(result, "url")) if u: return u # Fall back to ResultTable-style columns list. cols = None if isinstance(result, dict): cols = result.get("columns") else: cols = getattr(result, "columns", None) if isinstance(cols, list): for pair in cols: try: if isinstance(pair, (list, tuple)) and len(pair) == 2: k, v = pair if str(k or "").strip().lower() in {"url", "urls"}: u2 = Get_Url._extract_first_url(v) if u2: return u2 except Exception: continue return None @staticmethod def _extract_title_from_result(result: Any) -> Optional[str]: # Prefer explicit title field. t = get_field(result, "title") if isinstance(t, str) and t.strip(): return t.strip() # Fall back to ResultTable-style columns list. cols = None if isinstance(result, dict): cols = result.get("columns") else: cols = getattr(result, "columns", None) if isinstance(cols, list): for pair in cols: try: if isinstance(pair, (list, tuple)) and len(pair) == 2: k, v = pair if str(k or "").strip().lower() in {"title", "name"}: if isinstance(v, str) and v.strip(): return v.strip() except Exception: continue return None @staticmethod def _resolve_title_for_hash(backend: Any, file_hash: str, hit: Any = None) -> str: """Best-effort title resolution for a found hash. Strategy: - Use the hit's existing title/columns when present. - Prefer backend.get_metadata(hash) when available (direct lookup). - Fallback to backend.search('hash:', limit=1) and read title. """ try: if hit is not None: from_hit = Get_Url._extract_title_from_result(hit) if from_hit: return from_hit except Exception: pass try: if hasattr(backend, "get_metadata"): meta = backend.get_metadata(file_hash) if isinstance(meta, dict): t = meta.get("title") if isinstance(t, str) and t.strip(): return t.strip() except Exception: pass try: if hasattr(backend, "search"): hits = backend.search(f"hash:{file_hash}", limit=1) if isinstance(hits, list) and hits: t2 = Get_Url._extract_title_from_result(hits[0]) if t2: return t2 except Exception: pass return "" @staticmethod def _resolve_size_ext_for_hash(backend: Any, file_hash: str, hit: Any = None) -> tuple[int | None, str]: """Best-effort (size, ext) resolution for a found hash.""" # First: see if the hit already includes these fields. try: size_val = get_field(hit, "size") if size_val is None: size_val = get_field(hit, "file_size") if size_val is None: size_val = get_field(hit, "filesize") if size_val is None: size_val = get_field(hit, "size_bytes") size_int = int(size_val) if isinstance(size_val, (int, float)) else None except Exception: size_int = None try: ext_val = get_field(hit, "ext") if ext_val is None: ext_val = get_field(hit, "extension") ext = str(ext_val).strip().lstrip(".") if isinstance(ext_val, str) else "" except Exception: ext = "" if size_int is not None or ext: return size_int, ext # Next: backend.get_metadata(hash) when available. try: if hasattr(backend, "get_metadata"): meta = backend.get_metadata(file_hash) if isinstance(meta, dict): size_val2 = meta.get("size") if size_val2 is None: size_val2 = meta.get("file_size") if size_val2 is None: size_val2 = meta.get("filesize") if size_val2 is None: size_val2 = meta.get("size_bytes") if isinstance(size_val2, (int, float)): size_int = int(size_val2) ext_val2 = meta.get("ext") if ext_val2 is None: ext_val2 = meta.get("extension") if isinstance(ext_val2, str) and ext_val2.strip(): ext = ext_val2.strip().lstrip(".") except Exception: pass return size_int, ext def _search_urls_across_stores(self, pattern: str, config: Dict[str, Any]) -> Tuple[List[UrlItem], List[str]]: """Search for URLs matching pattern across all stores. Returns: Tuple of (matching_items, found_stores) """ items: List[UrlItem] = [] found_stores: Set[str] = set() try: storage = Store(config) store_names = storage.list_backends() if hasattr(storage, "list_backends") else [] if not store_names: log("Error: No stores configured", file=sys.stderr) return items, list(found_stores) for store_name in store_names: try: backend = storage[store_name] title_cache: Dict[str, str] = {} meta_cache: Dict[str, tuple[int | None, str]] = {} # Search only URL-bearing records using the backend's URL search capability. # This avoids the expensive/incorrect "search('*')" scan. try: raw_pattern = str(pattern or "").strip() has_wildcards = any(ch in raw_pattern for ch in ("*", "?")) # If this is a Hydrus backend and the pattern is a single URL, # normalize it through the official API. normalized_url = None if not has_wildcards and hasattr(backend, "get_url_info"): try: info = backend.get_url_info(raw_pattern) # type: ignore[attr-defined] if isinstance(info, dict): norm = info.get("normalised_url") or info.get("normalized_url") if isinstance(norm, str) and norm.strip(): normalized_url = norm.strip() except Exception: normalized_url = None search_query = "url:*" if has_wildcards else f"url:{normalized_url or raw_pattern}" try: search_results = backend.search(search_query, limit=1000) except Exception: search_results = [] for hit in (search_results or []): file_hash = None if isinstance(hit, dict): file_hash = hit.get("hash") or hit.get("file_hash") if not file_hash: continue file_hash = str(file_hash) title = title_cache.get(file_hash, "") if not title: title = self._resolve_title_for_hash(backend, file_hash, hit) title_cache[file_hash] = title size, ext = meta_cache.get(file_hash, (None, "")) if size is None and not ext: size, ext = self._resolve_size_ext_for_hash(backend, file_hash, hit) meta_cache[file_hash] = (size, ext) try: urls = backend.get_url(file_hash) except Exception: urls = [] for url in (urls or []): if not self._match_url_pattern(str(url), raw_pattern): continue items.append( UrlItem( url=str(url), hash=str(file_hash), store=str(store_name), title=str(title or ""), size=size, ext=str(ext or ""), ) ) found_stores.add(str(store_name)) except Exception as exc: debug( f"Error searching store '{store_name}': {exc}", file=sys.stderr ) continue except KeyError: continue except Exception as exc: debug( f"Error searching store '{store_name}': {exc}", file=sys.stderr ) continue return items, list(found_stores) except Exception as exc: log(f"Error searching stores: {exc}", file=sys.stderr) return items, [] def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Get url for file via hash+store, or search urls by pattern.""" parsed = parse_cmdlet_args(args, self) # Check if user provided a URL pattern to search for search_pattern = parsed.get("url") # Allow piping a URL row (or any result with a url field/column) into get-url. if not search_pattern: search_pattern = self._extract_url_from_result(result) if search_pattern: # URL search mode: find all files with matching URLs across stores items, stores_searched = self._search_urls_across_stores(search_pattern, config) if not items: log(f"No urls matching pattern: {search_pattern}", file=sys.stderr) return 1 # NOTE: The CLI can auto-render tables from emitted items. When emitting # dataclass objects, the generic-object renderer will include `hash` as a # visible column. To keep HASH available for chaining but hidden from the # table, emit dicts (dict rendering hides `hash`) and provide an explicit # `columns` list to force display order and size formatting. display_items: List[Dict[str, Any]] = [] table = ( ResultTable( "url", max_columns=5 ).set_preserve_order(True).set_table("url").set_value_case("preserve") ) table.set_source_command("get-url", ["-url", search_pattern]) for item in items: payload: Dict[str, Any] = { # Keep fields for downstream cmdlets. "hash": item.hash, "store": item.store, "url": item.url, "title": item.title, "size": item.size, "ext": item.ext, # Force the visible table columns + ordering. "columns": [ ("Title", item.title), ("Url", item.url), ("Size", item.size), ("Ext", item.ext), ("Store", item.store), ], } display_items.append(payload) table.add_result(payload) ctx.emit(payload) ctx.set_last_result_table(table if display_items else None, display_items, subject=result) log( f"Found {len(items)} matching url(s) in {len(stores_searched)} store(s)" ) return 0 # Original mode: Get URLs for a specific file by hash+store query_hash = sh.parse_single_hash_query(parsed.get("query")) if parsed.get("query") and not query_hash: log("Error: -query must be of the form hash:") return 1 # Extract hash and store from result or args file_hash = query_hash or get_field(result, "hash") store_name = parsed.get("store") or get_field(result, "store") if not file_hash: log( 'Error: No file hash provided (pipe an item or use -query "hash:")' ) return 1 if not store_name: log("Error: No store name provided") return 1 # Normalize hash file_hash = normalize_hash(file_hash) if not file_hash: log("Error: Invalid hash format") return 1 # Get backend and retrieve url try: storage = Store(config) backend = storage[store_name] urls = backend.get_url(file_hash) title = str(get_field(result, "title") or "").strip() table_title = "Title" if title: table_title = f"Title: {title}" table = ( ResultTable( table_title, max_columns=1 ).set_preserve_order(True).set_table("url").set_value_case("preserve") ) table.set_source_command("get-url", []) items: List[UrlItem] = [] for u in list(urls or []): u = str(u or "").strip() if not u: continue row = table.add_row() row.add_column("Url", u) item = UrlItem(url=u, hash=file_hash, store=str(store_name)) items.append(item) ctx.emit(item) # Make this a real result table so @.. / @,, can navigate it ctx.set_last_result_table(table if items else None, items, subject=result) if not items: log("No url found", file=sys.stderr) return 0 except KeyError: log(f"Error: Storage backend '{store_name}' not configured") return 1 except Exception as exc: log(f"Error retrieving url: {exc}", file=sys.stderr) return 1 # Import debug function from logger if available try: from SYS.logger import debug except ImportError: def debug(*args, **kwargs): pass # Fallback no-op CMDLET = Get_Url()