from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Sequence, Set, Tuple
import sys
import re
from fnmatch import fnmatch

from . import _shared as sh

Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
    sh.Cmdlet,
    sh.SharedArgs,
    sh.parse_cmdlet_args,
    sh.get_field,
    sh.normalize_hash,
)

from SYS.logger import log

# Import debug function from logger if available; fall back to a no-op otherwise
try:
    from SYS.logger import debug
except ImportError:
    def debug(*args, **kwargs):
        pass  # Fallback no-op

from Store import Store
from SYS import pipeline as ctx


@dataclass
class UrlItem:
    url: str
    hash: str
    store: str


class Get_Url(Cmdlet):
    """Get url associated with files via hash+store, or search urls by pattern."""

    def __init__(self) -> None:
        super().__init__(
            name="get-url",
            summary="List url associated with a file, or search urls by pattern",
            usage='@1 | get-url OR get-url -url "https://www.youtube.com/watch?v=xx"',
            arg=[SharedArgs.QUERY, SharedArgs.STORE, SharedArgs.URL],
            detail=[
                "- Get url for file: @1 | get-url (requires hash+store from result)",
                '- Search url across stores: get-url -url "www.google.com" (strips protocol & www prefix)',
                '- Wildcard matching: get-url -url "youtube.com*" (matches all youtube.com urls)',
                "- Pattern matching: domain matching ignores protocol (https://, http://, ftp://)",
            ],
            exec=self.run,
        )
        self.register()

    @staticmethod
    def _normalize_url_for_search(url: str) -> str:
        """Strip protocol and www prefix from URL for searching.

        Examples:
            https://www.youtube.com/watch?v=xx -> youtube.com/watch?v=xx
            http://www.google.com -> google.com
            ftp://files.example.com -> files.example.com
        """
        url = str(url or "").strip()
        # Remove protocol (http://, https://, ftp://, etc.)
        url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE)
        # Remove www. prefix (case-insensitive)
        url = re.sub(r"^www\.", "", url, flags=re.IGNORECASE)
        return url.lower()

    @staticmethod
    def _match_url_pattern(url: str, pattern: str) -> bool:
        """Match URL against pattern with wildcard support.

        Strips protocol/www from both URL and pattern before matching.
        Supports * and ? wildcards.
        """
        normalized_url = Get_Url._normalize_url_for_search(url)
        normalized_pattern = Get_Url._normalize_url_for_search(pattern)
        # Use fnmatch for wildcard matching (* and ?)
        return fnmatch(normalized_url, normalized_pattern)
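
    # Matching behaviour sketch for the two helpers above (the vimeo URL is
    # illustrative only; the other URLs come from the docstring examples):
    #   _match_url_pattern("https://www.youtube.com/watch?v=xx", "youtube.com*") -> True
    #   _match_url_pattern("http://www.google.com", "google.com")                -> True
    #   _match_url_pattern("https://vimeo.com/123", "youtube.com*")              -> False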

    def _search_urls_across_stores(
        self, pattern: str, config: Dict[str, Any]
    ) -> Tuple[List[UrlItem], List[str]]:
        """Search for URLs matching pattern across all stores.

        Returns:
            Tuple of (matching_items, found_stores)
        """
        items: List[UrlItem] = []
        found_stores: Set[str] = set()
        try:
            storage = Store(config)
            store_names = storage.list_backends() if hasattr(storage, "list_backends") else []
            if not store_names:
                log("Error: No stores configured", file=sys.stderr)
                return items, list(found_stores)

            for store_name in store_names:
                try:
                    backend = storage[store_name]
                    # Try to search files in this backend.
                    # For now, we iterate through known files (this is a limitation);
                    # each backend should ideally support get_all_files() or similar.
                    # Until then, use search with a broad query to find candidates.
                    try:
                        # Try to get files via search (backend-specific)
                        search_results = backend.search("*", limit=1000)
                        if search_results:
                            for result in search_results:
                                file_hash = result.get("hash") or result.get("file_hash")
                                if not file_hash:
                                    continue
                                try:
                                    urls = backend.get_url(file_hash)
                                    if urls:
                                        for url in urls:
                                            if self._match_url_pattern(str(url), pattern):
                                                items.append(
                                                    UrlItem(
                                                        url=str(url),
                                                        hash=file_hash,
                                                        store=store_name,
                                                    )
                                                )
                                                found_stores.add(store_name)
                                except Exception:
                                    pass
                    except Exception:
                        # Backend might not support search; skip it
                        pass
                except KeyError:
                    continue
                except Exception as exc:
                    debug(f"Error searching store '{store_name}': {exc}", file=sys.stderr)
                    continue

            return items, list(found_stores)
        except Exception as exc:
            log(f"Error searching stores: {exc}", file=sys.stderr)
            return items, []
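
    # Expected result shape from _search_urls_across_stores, for reference (the
    # store name, hash, and URL values below are illustrative, not real data):
    #   items        -> [UrlItem(url="https://www.youtube.com/watch?v=xx",
    #                            hash="ab12cd34...", store="local")]
    #   found_stores -> ["local"]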

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Get url for file via hash+store, or search urls by pattern."""
        parsed = parse_cmdlet_args(args, self)

        # Check if user provided a URL pattern to search for
        search_pattern = parsed.get("url")

        if search_pattern:
            # URL search mode: find all files with matching URLs across stores
            items, stores_searched = self._search_urls_across_stores(search_pattern, config)

            if not items:
                log(f"No urls matching pattern: {search_pattern}", file=sys.stderr)
                return 1

            # Create result table
            from SYS.result_table import ResultTable

            table = (
                ResultTable("URL Search Results", max_columns=3)
                .set_preserve_order(True)
                .set_table("urls")
                .set_value_case("preserve")
            )
            table.set_source_command("get-url", ["-url", search_pattern])

            # Group by store for display
            by_store: Dict[str, List[UrlItem]] = {}
            for item in items:
                if item.store not in by_store:
                    by_store[item.store] = []
                by_store[item.store].append(item)

            # Add rows grouped by store
            for store_name in sorted(by_store.keys()):
                store_items = by_store[store_name]
                for idx, item in enumerate(store_items):
                    row = table.add_row()
                    if idx == 0:
                        row.add_column("Store", store_name)
                    else:
                        row.add_column("Store", "")
                    row.add_column("Url", item.url)
                    row.add_column("Hash", item.hash[:16])  # Show first 16 chars
                    ctx.emit(item)

            ctx.set_last_result_table(table if items else None, items, subject=result)
            log(f"Found {len(items)} matching url(s) in {len(stores_searched)} store(s)")
            return 0

        # Original mode: Get URLs for a specific file by hash+store
        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            log("Error: -query must be of the form hash:")
            return 1

        # Extract hash and store from result or args
        file_hash = query_hash or get_field(result, "hash")
        store_name = parsed.get("store") or get_field(result, "store")

        if not file_hash:
            log('Error: No file hash provided (pipe an item or use -query "hash:")')
            return 1
        if not store_name:
            log("Error: No store name provided")
            return 1

        # Normalize hash
        file_hash = normalize_hash(file_hash)
        if not file_hash:
            log("Error: Invalid hash format")
            return 1

        # Get backend and retrieve url
        try:
            storage = Store(config)
            backend = storage[store_name]
            urls = backend.get_url(file_hash)

            from SYS.result_table import ResultTable

            title = str(get_field(result, "title") or "").strip()
            table_title = "Title"
            if title:
                table_title = f"Title: {title}"

            table = (
                ResultTable(table_title, max_columns=1)
                .set_preserve_order(True)
                .set_table("url")
                .set_value_case("preserve")
            )
            table.set_source_command("get-url", [])

            items: List[UrlItem] = []
            for u in list(urls or []):
                u = str(u or "").strip()
                if not u:
                    continue
                row = table.add_row()
                row.add_column("Url", u)
                item = UrlItem(url=u, hash=file_hash, store=str(store_name))
                items.append(item)
                ctx.emit(item)

            # Make this a real result table so @.. / @,, can navigate it
            ctx.set_last_result_table(table if items else None, items, subject=result)

            if not items:
                log("No url found", file=sys.stderr)
            return 0
        except KeyError:
            log(f"Error: Storage backend '{store_name}' not configured")
            return 1
        except Exception as exc:
            log(f"Error retrieving url: {exc}", file=sys.stderr)
            return 1


CMDLET = Get_Url()
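
# Usage sketch, based on this cmdlet's own usage/detail strings. The "hash:<sha256>"
# and <store-name> placeholders are assumed forms, inferred from the -query/-store
# handling in run(), not quoted verbatim from the source:
#   @1 | get-url                           -> list URLs recorded for the piped file
#   get-url -url "youtube.com*"            -> wildcard URL search across all stores
#   get-url -query "hash:<sha256>" -store <store-name>
#                                          -> look up URLs by explicit hash + store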