from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Sequence, Optional, Set, Tuple
import sys
import re
from fnmatch import fnmatch
from urllib.parse import urlparse

import pipeline as ctx
from . import _shared as sh

Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
    sh.Cmdlet,
    sh.SharedArgs,
    sh.parse_cmdlet_args,
    sh.get_field,
    sh.normalize_hash,
)

from SYS.logger import log
from Store import Store


@dataclass
class UrlItem:
    url: str
    hash: str
    store: str


class Get_Url(Cmdlet):
    """Get url associated with files via hash+store, or search urls by pattern."""

    def __init__(self) -> None:
        super().__init__(
            name="get-url",
            summary="List url associated with a file, or search urls by pattern",
            usage='@1 | get-url OR get-url -url "https://www.youtube.com/watch?v=xx"',
            arg=[SharedArgs.QUERY, SharedArgs.STORE, SharedArgs.URL],
            detail=[
                "- Get url for file: @1 | get-url (requires hash+store from result)",
                '- Search url across stores: get-url -url "www.google.com" (strips protocol & www prefix)',
                '- Wildcard matching: get-url -url "youtube.com*" (matches all youtube.com urls)',
                "- Pattern matching: domain matching ignores protocol (https://, http://, ftp://)",
            ],
            exec=self.run,
        )
        self.register()

    @staticmethod
    def _normalize_url_for_search(url: str) -> str:
        """Strip protocol and www prefix from URL for searching.

        Examples:
            https://www.youtube.com/watch?v=xx -> youtube.com/watch?v=xx
            http://www.google.com -> google.com
            ftp://files.example.com -> files.example.com
        """
        url = str(url or "").strip()
        # Remove protocol (http://, https://, ftp://, etc.)
        url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE)
        # Remove www. prefix (case-insensitive)
        url = re.sub(r"^www\.", "", url, flags=re.IGNORECASE)
        return url.lower()

    @staticmethod
    def _match_url_pattern(url: str, pattern: str) -> bool:
        """Match URL against pattern with wildcard support.

        Strips protocol/www from both URL and pattern before matching.
        Supports * and ? wildcards.
        """
        normalized_url = Get_Url._normalize_url_for_search(url)
        normalized_pattern = Get_Url._normalize_url_for_search(pattern)
        # Use fnmatch for wildcard matching (* and ?)
        return fnmatch(normalized_url, normalized_pattern)
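    # Illustrative sketch of how the two helpers above compose (comments only,
    # not executed at runtime; the inputs are the made-up examples from the
    # docstrings):
    #   _normalize_url_for_search("https://www.youtube.com/watch?v=xx")
    #       -> "youtube.com/watch?v=xx"
    #   _match_url_pattern("https://www.youtube.com/watch?v=xx", "youtube.com*")
    #       -> True  (both sides are normalized first, then compared with fnmatch)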
    def _search_urls_across_stores(
        self, pattern: str, config: Dict[str, Any]
    ) -> Tuple[List[UrlItem], List[str]]:
        """Search for URLs matching pattern across all stores.

        Returns:
            Tuple of (matching_items, found_stores)
        """
        items: List[UrlItem] = []
        found_stores: Set[str] = set()

        try:
            storage = Store(config)
            store_names = storage.list_backends() if hasattr(storage, "list_backends") else []
            if not store_names:
                log("Error: No stores configured", file=sys.stderr)
                return items, list(found_stores)

            for store_name in store_names:
                try:
                    backend = storage[store_name]
                    # Search files in this backend. Ideally each backend would
                    # expose get_all_files() or similar; for now we issue a broad
                    # search query to find candidates (a known limitation).
                    try:
                        # Get candidate files via search (backend-specific)
                        search_results = backend.search("*", limit=1000)
                        if search_results:
                            for result in search_results:
                                file_hash = result.get("hash") or result.get("file_hash")
                                if not file_hash:
                                    continue
                                try:
                                    urls = backend.get_url(file_hash)
                                    if urls:
                                        for url in urls:
                                            if self._match_url_pattern(str(url), pattern):
                                                items.append(
                                                    UrlItem(
                                                        url=str(url),
                                                        hash=file_hash,
                                                        store=store_name,
                                                    )
                                                )
                                                found_stores.add(store_name)
                                except Exception:
                                    pass
                    except Exception:
                        # Backend might not support search; skip it
                        pass
                except KeyError:
                    continue
                except Exception as exc:
                    debug(f"Error searching store '{store_name}': {exc}", file=sys.stderr)
                    continue

            return items, list(found_stores)
        except Exception as exc:
            log(f"Error searching stores: {exc}", file=sys.stderr)
            return items, []
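    # Assumed backend surface used by the search above (a sketch, not a contract):
    # Store(config) supports subscripting by store name, backend.search(query, limit=...)
    # returns dicts carrying a "hash" or "file_hash" key, and backend.get_url(hash)
    # returns an iterable of url strings. Backends lacking search() are skipped.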
    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Get url for file via hash+store, or search urls by pattern."""
        parsed = parse_cmdlet_args(args, self)

        # Check if user provided a URL pattern to search for
        search_pattern = parsed.get("url")
        if search_pattern:
            # URL search mode: find all files with matching URLs across stores
            items, stores_searched = self._search_urls_across_stores(search_pattern, config)

            if not items:
                log(f"No urls matching pattern: {search_pattern}", file=sys.stderr)
                return 1

            # Create result table
            from result_table import ResultTable

            table = (
                ResultTable("URL Search Results", max_columns=3)
                .set_preserve_order(True)
                .set_table("urls")
                .set_value_case("preserve")
            )
            table.set_source_command("get-url", ["-url", search_pattern])

            # Group by store for display
            by_store: Dict[str, List[UrlItem]] = {}
            for item in items:
                if item.store not in by_store:
                    by_store[item.store] = []
                by_store[item.store].append(item)

            # Add rows grouped by store; the store name is only printed on the
            # first row of each group
            for store_name in sorted(by_store.keys()):
                store_items = by_store[store_name]
                for idx, item in enumerate(store_items):
                    row = table.add_row()
                    if idx == 0:
                        row.add_column("Store", store_name)
                    else:
                        row.add_column("Store", "")
                    row.add_column("Url", item.url)
                    row.add_column("Hash", item.hash[:16])  # Show first 16 chars
                    ctx.emit(item)

            ctx.set_last_result_table(table if items else None, items, subject=result)
            log(f"Found {len(items)} matching url(s) in {len(stores_searched)} store(s)")
            return 0

        # Original mode: get URLs for a specific file by hash+store
        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            log("Error: -query must be of the form hash:")
            return 1

        # Extract hash and store from result or args
        file_hash = query_hash or get_field(result, "hash")
        store_name = parsed.get("store") or get_field(result, "store")

        if not file_hash:
            log('Error: No file hash provided (pipe an item or use -query "hash:")')
            return 1
        if not store_name:
            log("Error: No store name provided")
            return 1

        # Normalize hash
        file_hash = normalize_hash(file_hash)
        if not file_hash:
            log("Error: Invalid hash format")
            return 1

        # Get backend and retrieve url
        try:
            storage = Store(config)
            backend = storage[store_name]
            urls = backend.get_url(file_hash)

            from result_table import ResultTable

            title = str(get_field(result, "title") or "").strip()
            table_title = f"Title: {title}" if title else "Title"
            table = (
                ResultTable(table_title, max_columns=1)
                .set_preserve_order(True)
                .set_table("url")
                .set_value_case("preserve")
            )
            table.set_source_command("get-url", [])

            items: List[UrlItem] = []
            for u in list(urls or []):
                u = str(u or "").strip()
                if not u:
                    continue
                row = table.add_row()
                row.add_column("Url", u)
                item = UrlItem(url=u, hash=file_hash, store=str(store_name))
                items.append(item)
                ctx.emit(item)

            # Make this a real result table so @.. / @,, can navigate it
            ctx.set_last_result_table(table if items else None, items, subject=result)

            if not items:
                log("No url found", file=sys.stderr)
            return 0
        except KeyError:
            log(f"Error: Storage backend '{store_name}' not configured")
            return 1
        except Exception as exc:
            log(f"Error retrieving url: {exc}", file=sys.stderr)
            return 1


# Import debug function from logger if available; fall back to a no-op
try:
    from SYS.logger import debug
except ImportError:
    def debug(*args, **kwargs):
        pass  # Fallback no-op


CMDLET = Get_Url()
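# Example invocations, taken from the cmdlet's own usage/detail strings (the exact
# shell syntax depends on the host pipeline):
#   @1 | get-url                      # list urls for the piped result (hash+store)
#   get-url -url "www.google.com"     # search all stores; protocol and www. are ignored
#   get-url -url "youtube.com*"       # wildcard search across stores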