from __future__ import annotations from queue import SimpleQueue from threading import Thread from dataclasses import dataclass from typing import Any, Dict, List, Sequence, Optional, Set, Tuple import sys import re from fnmatch import fnmatch from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse from ._shared import ( Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash, ) from . import _shared as sh from SYS.logger import log from SYS.result_table import Table from Store import Store from SYS import pipeline as ctx @dataclass class UrlItem: url: str hash: str store: str title: str = "" size: int | None = None ext: str = "" class Get_Url(Cmdlet): """Get url associated with files via hash+store, or search urls by pattern.""" STORE_SEARCH_TIMEOUT_SECONDS = 6.0 def __init__(self) -> None: super().__init__( name="get-url", summary="List url associated with a file, or search urls by pattern", usage='@1 | get-url OR get-url -url "https://www.youtube.com/watch?v=xx"', arg=[SharedArgs.QUERY, SharedArgs.STORE, SharedArgs.URL], detail=[ "- Get url for file: @1 | get-url (requires hash+store from result)", '- Search url across stores: get-url -url "www.google.com" (strips protocol & www prefix)', '- Wildcard matching: get-url -url "youtube.com*" (matches all youtube.com urls)', "- Pattern matching: domain matching ignores protocol (https://, http://, ftp://)", ], exec=self.run, ) self.register() @staticmethod def _normalize_url_for_search(url: str) -> str: """Strip protocol and www prefix from URL for searching. Examples: https://www.youtube.com/watch?v=xx -> youtube.com/watch?v=xx http://www.google.com -> google.com ftp://files.example.com -> files.example.com """ url = str(url or "").strip() # Strip fragment (e.g., #t=10) before matching url = url.split("#", 1)[0] # Strip common time/tracking query params for matching try: parsed = urlparse(url) except Exception: parsed = None if parsed is not None and parsed.query: time_keys = {"t", "start", "time_continue", "timestamp", "time", "begin"} tracking_prefixes = ("utm_",) try: pairs = parse_qsl(parsed.query, keep_blank_values=True) filtered = [] for key, val in pairs: key_norm = str(key or "").lower() if key_norm in time_keys: continue if key_norm.startswith(tracking_prefixes): continue filtered.append((key, val)) if filtered: url = urlunparse(parsed._replace(query=urlencode(filtered, doseq=True))) else: url = urlunparse(parsed._replace(query="")) except Exception: pass # Remove protocol (http://, https://, ftp://, etc.) url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE) # Remove www. prefix (case-insensitive) url = re.sub(r"^www\.", "", url, flags=re.IGNORECASE) return url.lower() @staticmethod def _looks_like_url_pattern(value: str) -> bool: v = str(value or "").strip().lower() if not v: return False if "://" in v: return True if v.startswith(("magnet:", "torrent:", "ytdl:", "tidal:", "ftp:", "sftp:", "file:")): return True return "." in v and "/" in v @staticmethod def _match_url_pattern(url: str, pattern: str) -> bool: """Match URL against pattern with wildcard support. Strips protocol/www from both URL and pattern before matching. Supports * and ? wildcards. """ raw_pattern = str(pattern or "").strip() normalized_url = Get_Url._normalize_url_for_search(url) normalized_pattern = Get_Url._normalize_url_for_search(raw_pattern) looks_like_url = Get_Url._looks_like_url_pattern(raw_pattern) has_wildcards = "*" in normalized_pattern or ( not looks_like_url and "?" in normalized_pattern ) if has_wildcards: return fnmatch(normalized_url, normalized_pattern) normalized_url_no_slash = normalized_url.rstrip("/") normalized_pattern_no_slash = normalized_pattern.rstrip("/") if normalized_pattern_no_slash and normalized_pattern_no_slash == normalized_url_no_slash: return True return normalized_pattern in normalized_url def _execute_search_with_timeout( self, backend: Any, query: str, limit: int, store_name: str, **kwargs: Any, ) -> Optional[List[Any]]: queue: SimpleQueue[tuple[str, Any]] = SimpleQueue() def _worker() -> None: try: queue.put(("ok", backend.search(query, limit=limit, **kwargs))) except Exception as exc: queue.put(("err", exc)) worker = Thread(target=_worker, daemon=True) worker.start() worker.join(timeout=self.STORE_SEARCH_TIMEOUT_SECONDS) if worker.is_alive(): debug( f"Store '{store_name}' search timed out after {self.STORE_SEARCH_TIMEOUT_SECONDS}s", file=sys.stderr, ) return None if queue.empty(): return [] status, payload = queue.get() if status == "err": debug( f"Store '{store_name}' search failed: {payload}", file=sys.stderr, ) return [] return payload or [] @staticmethod def _extract_first_url(value: Any) -> Optional[str]: if isinstance(value, str): v = value.strip() return v or None if isinstance(value, (list, tuple)): for item in value: if isinstance(item, str) and item.strip(): return item.strip() return None @staticmethod def _extract_urls_from_hit(hit: Any) -> List[str]: """Extract candidate URLs directly from a search hit, if present.""" raw = None try: raw = get_field(hit, "known_urls") if not raw: raw = get_field(hit, "urls") if not raw: raw = get_field(hit, "url") if not raw: raw = get_field(hit, "source_url") or get_field(hit, "source_urls") except Exception: raw = None if isinstance(raw, str): val = raw.strip() return [val] if val else [] if isinstance(raw, (list, tuple)): out: list[str] = [] for item in raw: if not isinstance(item, str): continue v = item.strip() if v: out.append(v) return out return [] @staticmethod def _extract_title_from_result(result: Any) -> Optional[str]: # Prefer explicit title field. # Fall back to ResultTable-style columns list. cols = None if isinstance(result, dict): cols = result.get("columns") else: cols = getattr(result, "columns", None) if isinstance(cols, list): for pair in cols: try: if isinstance(pair, (list, tuple)) and len(pair) == 2: k, v = pair if str(k or "").strip().lower() in {"title", "name"}: if isinstance(v, str) and v.strip(): return v.strip() except Exception: continue return None @staticmethod def _extract_size_from_hit(hit: Any) -> int | None: for key in ("size", "file_size", "filesize", "size_bytes"): try: val = get_field(hit, key) except Exception: val = None if val is None: continue if isinstance(val, (int, float)): return int(val) try: return int(val) except Exception: continue return None @staticmethod def _extract_ext_from_hit(hit: Any) -> str: for key in ("ext", "extension"): try: ext_val = get_field(hit, key) except Exception: ext_val = None if isinstance(ext_val, str) and ext_val.strip(): return ext_val.strip().lstrip(".") return "" def _search_urls_across_stores(self, pattern: str, config: Dict[str, Any]) -> Tuple[List[UrlItem], List[str]]: """Search for URLs matching pattern across all stores. Returns: Tuple of (matching_items, found_stores) """ items: List[UrlItem] = [] found_stores: Set[str] = set() MAX_RESULTS = 256 try: storage = Store(config) store_names = storage.list_backends() if hasattr(storage, "list_backends") else [] if not store_names: log("Error: No stores configured", file=sys.stderr) return items, list(found_stores) for store_name in store_names: if len(items) >= MAX_RESULTS: break try: backend = storage[store_name] # Search only URL-bearing records using the backend's URL search capability. # This avoids the expensive/incorrect "search('*')" scan. try: raw_pattern = str(pattern or "").strip() looks_like_url = self._looks_like_url_pattern(raw_pattern) has_wildcards = "*" in raw_pattern or ( not looks_like_url and "?" in raw_pattern ) # If this is a Hydrus backend and the pattern is a single URL, # normalize it through the official API. Skip for bare domains. normalized_url = None normalized_search_pattern = None if not has_wildcards and looks_like_url: normalized_search_pattern = self._normalize_url_for_search( raw_pattern ) if ( normalized_search_pattern and normalized_search_pattern != raw_pattern ): debug( "get-url normalized raw pattern: %s -> %s", raw_pattern, normalized_search_pattern, ) if hasattr(backend, "get_url_info"): try: info = backend.get_url_info(raw_pattern) # type: ignore[attr-defined] if isinstance(info, dict): norm = ( info.get("normalized_url") or info.get("normalized_url") ) if isinstance(norm, str) and norm.strip(): normalized_url = self._normalize_url_for_search( norm.strip() ) except Exception: pass if ( normalized_url and normalized_url != normalized_search_pattern and normalized_url != raw_pattern ): debug( "get-url normalized backend result: %s -> %s", raw_pattern, normalized_url, ) target_pattern = ( normalized_url or normalized_search_pattern or raw_pattern ) if has_wildcards or not target_pattern: search_query = "url:*" else: wrapped_pattern = f"*{target_pattern}*" search_query = f"url:{wrapped_pattern}" search_limit = max(1, min(MAX_RESULTS, 1000)) search_results = self._execute_search_with_timeout( backend, search_query, search_limit, store_name, pattern_hint=target_pattern, minimal=True, ) if search_results is None: continue search_results = search_results or [] for hit in (search_results or []): if len(items) >= MAX_RESULTS: break file_hash = None if isinstance(hit, dict): file_hash = hit.get("hash") or hit.get("file_hash") if not file_hash: continue file_hash = str(file_hash) title = self._extract_title_from_result(hit) or "" size = self._extract_size_from_hit(hit) ext = self._extract_ext_from_hit(hit) urls = self._extract_urls_from_hit(hit) if not urls: try: urls = backend.get_url(file_hash) except Exception: urls = [] hit_added = False for url in (urls or []): if len(items) >= MAX_RESULTS: break if not self._match_url_pattern(str(url), raw_pattern): continue from SYS.metadata import normalize_urls valid = normalize_urls([str(url)]) if not valid: continue items.append( UrlItem( url=str(url), hash=str(file_hash), store=str(store_name), title=str(title or ""), size=size, ext=str(ext or ""), ) ) hit_added = True if hit_added: found_stores.add(str(store_name)) if len(items) >= MAX_RESULTS: break except Exception as exc: debug( f"Error searching store '{store_name}': {exc}", file=sys.stderr ) continue except KeyError: continue except Exception as exc: debug( f"Error searching store '{store_name}': {exc}", file=sys.stderr ) continue return items, list(found_stores) except Exception as exc: log(f"Error searching stores: {exc}", file=sys.stderr) return items, [] def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Get url for file via hash+store, or search urls by pattern.""" parsed = parse_cmdlet_args(args, self) # Check if user provided a URL pattern to search for search_pattern = parsed.get("url") # Support positional URL search or "url:" query prefix if not search_pattern: query = parsed.get("query") if query: if str(query).lower().startswith("url:"): search_pattern = query[4:].strip() elif self._looks_like_url_pattern(query) or ( "." in str(query) and len(str(query)) < 64 ): # If it looks like a domain or URL, and isn't a long hash, # treat a positional query as a search pattern. search_pattern = query if search_pattern: # URL search mode: find all files with matching URLs across stores items, stores_searched = self._search_urls_across_stores(search_pattern, config) if not items: log(f"No urls matching pattern: {search_pattern}", file=sys.stderr) return 1 # NOTE: The CLI can auto-render tables from emitted items. When emitting # dataclass objects, the generic-object renderer will include `hash` as a # visible column. To keep HASH available for chaining but hidden from the # table, emit dicts (dict rendering hides `hash`) and provide an explicit # `columns` list to force display order and size formatting. display_items: List[Dict[str, Any]] = [] table = ( Table( "url", max_columns=5 )._perseverance(True).set_table("url").set_value_case("preserve") ) table.set_source_command("get-url", ["-url", search_pattern]) for item in items: payload: Dict[str, Any] = { # Keep fields for downstream cmdlets. "hash": item.hash, "store": item.store, "url": item.url, "title": item.title, "size": item.size, "ext": item.ext, # Force the visible table columns + ordering. "columns": [ ("Title", item.title), ("Url", item.url), ("Size", item.size), ("Ext", item.ext), ("Store", item.store), ], } display_items.append(payload) table.add_result(payload) ctx.set_last_result_table(table if display_items else None, display_items, subject=result) # Emit after table state is finalized to prevent side effects in TUI rendering for d in display_items: ctx.emit(d) log( f"Found {len(items)} matching url(s) in {len(stores_searched)} store(s)" ) return 0 # Original mode: Get URLs for a specific file by hash+store query_hash = sh.parse_single_hash_query(parsed.get("query")) if parsed.get("query") and not query_hash: log("Error: -query must be of the form hash:") return 1 # Extract hash and store from result or args file_hash = query_hash or get_field(result, "hash") store_name = parsed.get("store") or get_field(result, "store") if not file_hash: log( 'Error: No file hash provided (pipe an item or use -query "hash:")' ) return 1 if not store_name: log("Error: No store name provided") return 1 # Get backend and retrieve url try: storage = Store(config) backend = storage[store_name] urls = backend.get_url(file_hash) # Filter URLs to avoid data leakage from dirty DBs from SYS.metadata import normalize_urls urls = normalize_urls(urls) from SYS.result_table import ItemDetailView, extract_item_metadata # Prepare metadata for the detail view metadata = extract_item_metadata(result) # Enrich the metadata with tags if missing if not metadata.get("Tags"): try: item_tags = get_field(result, "tag") or get_field(result, "tags") or [] row_tags = [] if isinstance(item_tags, list): row_tags.extend([str(t) for t in item_tags]) elif isinstance(item_tags, str): row_tags.append(item_tags) # Also collect from backend if file_hash and store_name: try: # Re-use existing backend variable if backend and hasattr(backend, "get_tag"): b_tags, _ = backend.get_tag(file_hash) if b_tags: row_tags.extend([str(t) for t in b_tags]) except Exception: pass if row_tags: row_tags = sorted(list(set(row_tags))) metadata["Tags"] = ", ".join(row_tags) except Exception: pass if file_hash: metadata["Hash"] = file_hash if store_name: metadata["Store"] = store_name table = ( ItemDetailView( "Urls", item_metadata=metadata, max_columns=1 )._perseverance(True).set_table("url").set_value_case("preserve") ) table.set_source_command("get-url", []) items: List[UrlItem] = [] for u in list(urls or []): u = str(u or "").strip() if not u: continue row = table.add_row() row.add_column("Url", u) item = UrlItem(url=u, hash=file_hash, store=str(store_name)) items.append(item) # Use overlay mode to avoid "merging" with the previous status/table state. # This is idiomatic for detail views and prevents the search table from being # contaminated by partial re-renders. ctx.set_last_result_table_overlay(table, items, subject=result) # Emit items at the end for pipeline continuity for item in items: ctx.emit(item) if not items: # Still log it but the panel will show the item context log("No url found", file=sys.stderr) return 0 except KeyError: log(f"Error: Storage backend '{store_name}' not configured") return 1 except Exception as exc: log(f"Error retrieving url: {exc}", file=sys.stderr) return 1 # Import debug function from logger if available try: from SYS.logger import debug except ImportError: def debug(*args, **kwargs): pass # Fallback no-op CMDLET = Get_Url()