from __future__ import annotations from queue import SimpleQueue from threading import Thread from dataclasses import dataclass from typing import Any, Dict, List, Sequence, Optional, Set, Tuple import sys import re from fnmatch import fnmatch from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse from . import _shared as sh Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = ( sh.Cmdlet, sh.SharedArgs, sh.parse_cmdlet_args, sh.get_field, sh.normalize_hash, ) from SYS.logger import log from SYS.result_table import ResultTable from Store import Store from SYS import pipeline as ctx @dataclass class UrlItem: url: str hash: str store: str title: str = "" size: int | None = None ext: str = "" class Get_Url(Cmdlet): """Get url associated with files via hash+store, or search urls by pattern.""" STORE_SEARCH_TIMEOUT_SECONDS = 6.0 def __init__(self) -> None: super().__init__( name="get-url", summary="List url associated with a file, or search urls by pattern", usage='@1 | get-url OR get-url -url "https://www.youtube.com/watch?v=xx"', arg=[SharedArgs.QUERY, SharedArgs.STORE, SharedArgs.URL], detail=[ "- Get url for file: @1 | get-url (requires hash+store from result)", '- Search url across stores: get-url -url "www.google.com" (strips protocol & www prefix)', '- Wildcard matching: get-url -url "youtube.com*" (matches all youtube.com urls)', "- Pattern matching: domain matching ignores protocol (https://, http://, ftp://)", ], exec=self.run, ) self.register() @staticmethod def _normalize_url_for_search(url: str) -> str: """Strip protocol and www prefix from URL for searching. Examples: https://www.youtube.com/watch?v=xx -> youtube.com/watch?v=xx http://www.google.com -> google.com ftp://files.example.com -> files.example.com """ url = str(url or "").strip() # Strip fragment (e.g., #t=10) before matching url = url.split("#", 1)[0] # Strip common time/tracking query params for matching try: parsed = urlparse(url) except Exception: parsed = None if parsed is not None and parsed.query: time_keys = {"t", "start", "time_continue", "timestamp", "time", "begin"} tracking_prefixes = ("utm_",) try: pairs = parse_qsl(parsed.query, keep_blank_values=True) filtered = [] for key, val in pairs: key_norm = str(key or "").lower() if key_norm in time_keys: continue if key_norm.startswith(tracking_prefixes): continue filtered.append((key, val)) if filtered: url = urlunparse(parsed._replace(query=urlencode(filtered, doseq=True))) else: url = urlunparse(parsed._replace(query="")) except Exception: pass # Remove protocol (http://, https://, ftp://, etc.) url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE) # Remove www. prefix (case-insensitive) url = re.sub(r"^www\.", "", url, flags=re.IGNORECASE) return url.lower() @staticmethod def _looks_like_url_pattern(value: str) -> bool: v = str(value or "").strip().lower() if not v: return False if "://" in v: return True if v.startswith(("magnet:", "torrent:", "ytdl:", "tidal:", "ftp:", "sftp:", "file:")): return True return "." in v and "/" in v @staticmethod def _match_url_pattern(url: str, pattern: str) -> bool: """Match URL against pattern with wildcard support. Strips protocol/www from both URL and pattern before matching. Supports * and ? wildcards. """ raw_pattern = str(pattern or "").strip() normalized_url = Get_Url._normalize_url_for_search(url) normalized_pattern = Get_Url._normalize_url_for_search(raw_pattern) looks_like_url = Get_Url._looks_like_url_pattern(raw_pattern) has_wildcards = "*" in normalized_pattern or ( not looks_like_url and "?" in normalized_pattern ) if has_wildcards: return fnmatch(normalized_url, normalized_pattern) normalized_url_no_slash = normalized_url.rstrip("/") normalized_pattern_no_slash = normalized_pattern.rstrip("/") if normalized_pattern_no_slash and normalized_pattern_no_slash == normalized_url_no_slash: return True return normalized_pattern in normalized_url def _execute_search_with_timeout( self, backend: Any, query: str, limit: int, store_name: str, **kwargs: Any, ) -> Optional[List[Any]]: queue: SimpleQueue[tuple[str, Any]] = SimpleQueue() def _worker() -> None: try: queue.put(("ok", backend.search(query, limit=limit, **kwargs))) except Exception as exc: queue.put(("err", exc)) worker = Thread(target=_worker, daemon=True) worker.start() worker.join(timeout=self.STORE_SEARCH_TIMEOUT_SECONDS) if worker.is_alive(): debug( f"Store '{store_name}' search timed out after {self.STORE_SEARCH_TIMEOUT_SECONDS}s", file=sys.stderr, ) return None if queue.empty(): return [] status, payload = queue.get() if status == "err": debug( f"Store '{store_name}' search failed: {payload}", file=sys.stderr, ) return [] return payload or [] @staticmethod def _extract_first_url(value: Any) -> Optional[str]: if isinstance(value, str): v = value.strip() return v or None if isinstance(value, (list, tuple)): for item in value: if isinstance(item, str) and item.strip(): return item.strip() return None @staticmethod def _extract_urls_from_hit(hit: Any) -> List[str]: """Extract candidate URLs directly from a search hit, if present.""" raw = None try: raw = get_field(hit, "known_urls") if not raw: raw = get_field(hit, "urls") if not raw: raw = get_field(hit, "url") if not raw: raw = get_field(hit, "source_url") or get_field(hit, "source_urls") except Exception: raw = None if isinstance(raw, str): val = raw.strip() return [val] if val else [] if isinstance(raw, (list, tuple)): out: list[str] = [] for item in raw: if not isinstance(item, str): continue v = item.strip() if v: out.append(v) return out return [] @staticmethod def _extract_title_from_result(result: Any) -> Optional[str]: # Prefer explicit title field. # Fall back to ResultTable-style columns list. cols = None if isinstance(result, dict): cols = result.get("columns") else: cols = getattr(result, "columns", None) if isinstance(cols, list): for pair in cols: try: if isinstance(pair, (list, tuple)) and len(pair) == 2: k, v = pair if str(k or "").strip().lower() in {"title", "name"}: if isinstance(v, str) and v.strip(): return v.strip() except Exception: continue return None @staticmethod def _resolve_title_for_hash(backend: Any, file_hash: str, hit: Any = None) -> str: """Best-effort title resolution for a found hash. Strategy: - Use the hit's existing title/columns when present. - Prefer backend.get_metadata(hash) when available (direct lookup). - Fallback to backend.search('hash:', limit=1) and read title. """ try: if hit is not None: from_hit = Get_Url._extract_title_from_result(hit) if from_hit: return from_hit except Exception: pass try: if hasattr(backend, "get_metadata"): meta = backend.get_metadata(file_hash) if isinstance(meta, dict): t = meta.get("title") if isinstance(t, str) and t.strip(): return t.strip() except Exception: pass try: if hasattr(backend, "search"): hits = backend.search(f"hash:{file_hash}", limit=1) if isinstance(hits, list) and hits: t2 = Get_Url._extract_title_from_result(hits[0]) if t2: return t2 except Exception: pass return "" @staticmethod def _resolve_size_ext_for_hash(backend: Any, file_hash: str, hit: Any = None) -> tuple[int | None, str]: """Best-effort (size, ext) resolution for a found hash.""" # First: see if the hit already includes these fields. try: size_val = get_field(hit, "size") if size_val is None: size_val = get_field(hit, "file_size") if size_val is None: size_val = get_field(hit, "filesize") if size_val is None: size_val = get_field(hit, "size_bytes") size_int = int(size_val) if isinstance(size_val, (int, float)) else None except Exception: size_int = None try: ext_val = get_field(hit, "ext") if ext_val is None: ext_val = get_field(hit, "extension") ext = str(ext_val).strip().lstrip(".") if isinstance(ext_val, str) else "" except Exception: ext = "" if size_int is not None or ext: return size_int, ext # Next: backend.get_metadata(hash) when available. try: if hasattr(backend, "get_metadata"): meta = backend.get_metadata(file_hash) if isinstance(meta, dict): size_val2 = meta.get("size") if size_val2 is None: size_val2 = meta.get("file_size") if size_val2 is None: size_val2 = meta.get("filesize") if size_val2 is None: size_val2 = meta.get("size_bytes") if isinstance(size_val2, (int, float)): size_int = int(size_val2) ext_val2 = meta.get("ext") if ext_val2 is None: ext_val2 = meta.get("extension") if isinstance(ext_val2, str) and ext_val2.strip(): ext = ext_val2.strip().lstrip(".") except Exception: pass return size_int, ext def _search_urls_across_stores(self, pattern: str, config: Dict[str, Any]) -> Tuple[List[UrlItem], List[str]]: """Search for URLs matching pattern across all stores. Returns: Tuple of (matching_items, found_stores) """ items: List[UrlItem] = [] found_stores: Set[str] = set() MAX_RESULTS = 256 try: storage = Store(config) store_names = storage.list_backends() if hasattr(storage, "list_backends") else [] if not store_names: log("Error: No stores configured", file=sys.stderr) return items, list(found_stores) for store_name in store_names: if len(items) >= MAX_RESULTS: break try: backend = storage[store_name] title_cache: Dict[str, str] = {} meta_cache: Dict[str, tuple[int | None, str]] = {} # Search only URL-bearing records using the backend's URL search capability. # This avoids the expensive/incorrect "search('*')" scan. try: raw_pattern = str(pattern or "").strip() looks_like_url = self._looks_like_url_pattern(raw_pattern) has_wildcards = "*" in raw_pattern or ( not looks_like_url and "?" in raw_pattern ) # If this is a Hydrus backend and the pattern is a single URL, # normalize it through the official API. Skip for bare domains. normalized_url = None normalized_search_pattern = None if not has_wildcards and looks_like_url: normalized_search_pattern = self._normalize_url_for_search( raw_pattern ) if ( normalized_search_pattern and normalized_search_pattern != raw_pattern ): debug( "get-url normalized raw pattern: %s -> %s", raw_pattern, normalized_search_pattern, ) if hasattr(backend, "get_url_info"): try: info = backend.get_url_info(raw_pattern) # type: ignore[attr-defined] if isinstance(info, dict): norm = ( info.get("normalized_url") or info.get("normalized_url") ) if isinstance(norm, str) and norm.strip(): normalized_url = self._normalize_url_for_search( norm.strip() ) except Exception: pass if ( normalized_url and normalized_url != normalized_search_pattern and normalized_url != raw_pattern ): debug( "get-url normalized backend result: %s -> %s", raw_pattern, normalized_url, ) target_pattern = ( normalized_url or normalized_search_pattern or raw_pattern ) if has_wildcards or not target_pattern: search_query = "url:*" else: wrapped_pattern = f"*{target_pattern}*" search_query = f"url:{wrapped_pattern}" search_limit = max(1, min(MAX_RESULTS, 1000)) search_results = self._execute_search_with_timeout( backend, search_query, search_limit, store_name, pattern_hint=target_pattern, ) if search_results is None: continue search_results = search_results or [] if not search_results and target_pattern and not has_wildcards: fallback_results = self._execute_search_with_timeout( backend, "url:*", search_limit, store_name, pattern_hint=target_pattern, ) if fallback_results is None: continue search_results = fallback_results or [] for hit in (search_results or []): if len(items) >= MAX_RESULTS: break file_hash = None if isinstance(hit, dict): file_hash = hit.get("hash") or hit.get("file_hash") if not file_hash: continue file_hash = str(file_hash) title = title_cache.get(file_hash, "") if not title: try: title = ( get_field(hit, "title") or get_field(hit, "name") or get_field(hit, "file_title") or "" ) except Exception: title = "" if not title: title = self._resolve_title_for_hash(backend, file_hash, hit) title_cache[file_hash] = title size, ext = meta_cache.get(file_hash, (None, "")) if size is None and not ext: try: size = get_field(hit, "size") if size is None: size = get_field(hit, "size_bytes") if size is None: size = get_field(hit, "file_size") if size is None: size = get_field(hit, "filesize") size = int(size) if isinstance(size, (int, float)) else None except Exception: size = None try: ext = get_field(hit, "ext") or get_field(hit, "extension") ext = str(ext).strip().lstrip(".") if isinstance(ext, str) else "" except Exception: ext = "" if size is None and not ext: size, ext = self._resolve_size_ext_for_hash(backend, file_hash, hit) meta_cache[file_hash] = (size, ext) urls = self._extract_urls_from_hit(hit) if not urls: try: urls = backend.get_url(file_hash) except Exception: urls = [] for url in (urls or []): if len(items) >= MAX_RESULTS: break if not self._match_url_pattern(str(url), raw_pattern): continue from SYS.metadata import normalize_urls valid = normalize_urls([str(url)]) if not valid: continue items.append( UrlItem( url=str(url), hash=str(file_hash), store=str(store_name), title=str(title or ""), size=size, ext=str(ext or ""), ) ) found_stores.add(str(store_name)) if len(items) >= MAX_RESULTS: break except Exception as exc: debug( f"Error searching store '{store_name}': {exc}", file=sys.stderr ) continue except KeyError: continue except Exception as exc: debug( f"Error searching store '{store_name}': {exc}", file=sys.stderr ) continue return items, list(found_stores) except Exception as exc: log(f"Error searching stores: {exc}", file=sys.stderr) return items, [] def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Get url for file via hash+store, or search urls by pattern.""" parsed = parse_cmdlet_args(args, self) # Check if user provided a URL pattern to search for search_pattern = parsed.get("url") if search_pattern: # URL search mode: find all files with matching URLs across stores items, stores_searched = self._search_urls_across_stores(search_pattern, config) if not items: log(f"No urls matching pattern: {search_pattern}", file=sys.stderr) return 1 # NOTE: The CLI can auto-render tables from emitted items. When emitting # dataclass objects, the generic-object renderer will include `hash` as a # visible column. To keep HASH available for chaining but hidden from the # table, emit dicts (dict rendering hides `hash`) and provide an explicit # `columns` list to force display order and size formatting. display_items: List[Dict[str, Any]] = [] table = ( ResultTable( "url", max_columns=5 ).set_preserve_order(True).set_table("url").set_value_case("preserve") ) table.set_source_command("get-url", ["-url", search_pattern]) for item in items: payload: Dict[str, Any] = { # Keep fields for downstream cmdlets. "hash": item.hash, "store": item.store, "url": item.url, "title": item.title, "size": item.size, "ext": item.ext, # Force the visible table columns + ordering. "columns": [ ("Title", item.title), ("Url", item.url), ("Size", item.size), ("Ext", item.ext), ("Store", item.store), ], } display_items.append(payload) table.add_result(payload) ctx.set_last_result_table(table if display_items else None, display_items, subject=result) # Emit after table state is finalized to prevent side effects in TUI rendering for d in display_items: ctx.emit(d) log( f"Found {len(items)} matching url(s) in {len(stores_searched)} store(s)" ) return 0 # Original mode: Get URLs for a specific file by hash+store query_hash = sh.parse_single_hash_query(parsed.get("query")) if parsed.get("query") and not query_hash: log("Error: -query must be of the form hash:") return 1 # Extract hash and store from result or args file_hash = query_hash or get_field(result, "hash") store_name = parsed.get("store") or get_field(result, "store") if not file_hash: log( 'Error: No file hash provided (pipe an item or use -query "hash:")' ) return 1 if not store_name: log("Error: No store name provided") return 1 # Get backend and retrieve url try: storage = Store(config) backend = storage[store_name] urls = backend.get_url(file_hash) # Filter URLs to avoid data leakage from dirty DBs from SYS.metadata import normalize_urls urls = normalize_urls(urls) from SYS.result_table import ItemDetailView, extract_item_metadata # Prepare metadata for the detail view metadata = extract_item_metadata(result) if file_hash: metadata["Hash"] = file_hash if store_name: metadata["Store"] = store_name table = ( ItemDetailView( "Urls", item_metadata=metadata, max_columns=1 ).set_preserve_order(True).set_table("url").set_value_case("preserve") ) table.set_source_command("get-url", []) items: List[UrlItem] = [] for u in list(urls or []): u = str(u or "").strip() if not u: continue row = table.add_row() row.add_column("Url", u) item = UrlItem(url=u, hash=file_hash, store=str(store_name)) items.append(item) # Use overlay mode to avoid "merging" with the previous status/table state. # This is idiomatic for detail views and prevents the search table from being # contaminated by partial re-renders. ctx.set_last_result_table_overlay(table, items, subject=result) # Emit items at the end for pipeline continuity for item in items: ctx.emit(item) if not items: # Still log it but the panel will show the item context log("No url found", file=sys.stderr) return 0 except KeyError: log(f"Error: Storage backend '{store_name}' not configured") return 1 except Exception as exc: log(f"Error retrieving url: {exc}", file=sys.stderr) return 1 # Import debug function from logger if available try: from SYS.logger import debug except ImportError: def debug(*args, **kwargs): pass # Fallback no-op CMDLET = Get_Url()