From a97657a75735a12f5886ad4084529497f20cc8ee Mon Sep 17 00:00:00 2001
From: Nose
Date: Tue, 30 Dec 2025 05:48:01 -0800
Subject: [PATCH] Add Hydrus get_url_info and rework get-url URL search

---
 API/HydrusNetwork.py   |  19 ++++
 Store/HydrusNetwork.py |  30 ++++++
 cmdlet/get_url.py      | 233 +++++++++++++++++++++++++++++++----------
 3 files changed, 224 insertions(+), 58 deletions(-)

diff --git a/API/HydrusNetwork.py b/API/HydrusNetwork.py
index 28d8e0b..5fe3321 100644
--- a/API/HydrusNetwork.py
+++ b/API/HydrusNetwork.py
@@ -567,6 +567,25 @@ class HydrusNetwork:
             "batched": results
         }
 
+    def get_url_info(self, url: str) -> dict[str, Any]:
+        """Get information about a URL.
+
+        Hydrus Client API: GET /add_urls/get_url_info
+        Docs: https://hydrusnetwork.github.io/hydrus/developer_api.html#add_urls_get_url_info
+        """
+        url = str(url or "").strip()
+        if not url:
+            raise ValueError("url must not be empty")
+
+        spec = HydrusRequestSpec(
+            method="GET",
+            endpoint="/add_urls/get_url_info",
+            query={
+                "url": url
+            },
+        )
+        return cast(dict[str, Any], self._perform_request(spec))
+
     def delete_url(self,
                    file_hashes: Union[str, Iterable[str]],
diff --git a/Store/HydrusNetwork.py b/Store/HydrusNetwork.py
index 1de2f6e..0f122de 100644
--- a/Store/HydrusNetwork.py
+++ b/Store/HydrusNetwork.py
@@ -1476,6 +1476,36 @@ class HydrusNetwork(Store):
             debug(f"{self._log_prefix()} get_url failed: {exc}")
             return []
 
+    def get_url_info(self, url: str, **kwargs: Any) -> dict[str, Any] | None:
+        """Return Hydrus URL info for a single URL (Hydrus-only helper).
+
+        Uses: GET /add_urls/get_url_info
+        """
+        try:
+            client = self._client
+            if client is None:
+                return None
+            u = str(url or "").strip()
+            if not u:
+                return None
+            try:
+                return client.get_url_info(u)  # type: ignore[attr-defined]
+            except Exception:
+                from API.HydrusNetwork import HydrusRequestSpec
+
+                spec = HydrusRequestSpec(
+                    method="GET",
+                    endpoint="/add_urls/get_url_info",
+                    query={
+                        "url": u
+                    },
+                )
+                response = client._perform_request(spec)  # type: ignore[attr-defined]
+                return response if isinstance(response, dict) else None
+        except Exception as exc:
+            debug(f"{self._log_prefix()} get_url_info failed: {exc}")
+            return None
+
     def add_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
         """Associate one or more url with a Hydrus file."""
         try:
diff --git a/cmdlet/get_url.py b/cmdlet/get_url.py
index 8c2bff5..9e1adc1 100644
--- a/cmdlet/get_url.py
+++ b/cmdlet/get_url.py
@@ -16,6 +16,7 @@ Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
     sh.normalize_hash,
 )
 from SYS.logger import log
+from SYS.result_table import ResultTable
 from Store import Store
 from SYS import pipeline as ctx
 
@@ -25,6 +26,7 @@ class UrlItem:
     url: str
     hash: str
     store: str
+    title: str = ""
 
 
 class Get_Url(Cmdlet):
@@ -80,6 +82,107 @@ class Get_Url(Cmdlet):
         # Use fnmatch for wildcard matching (* and ?)
         return fnmatch(normalized_url, normalized_pattern)
 
+    @staticmethod
+    def _extract_first_url(value: Any) -> Optional[str]:
+        if isinstance(value, str):
+            v = value.strip()
+            return v or None
+        if isinstance(value, (list, tuple)):
+            for item in value:
+                if isinstance(item, str) and item.strip():
+                    return item.strip()
+        return None
+
+    @staticmethod
+    def _extract_url_from_result(result: Any) -> Optional[str]:
+        # Prefer an explicit url field.
+        u = Get_Url._extract_first_url(get_field(result, "url"))
+        if u:
+            return u
+
+        # Fall back to ResultTable-style columns list.
+        cols = None
+        if isinstance(result, dict):
+            cols = result.get("columns")
+        else:
+            cols = getattr(result, "columns", None)
+        if isinstance(cols, list):
+            for pair in cols:
+                try:
+                    if isinstance(pair, (list, tuple)) and len(pair) == 2:
+                        k, v = pair
+                        if str(k or "").strip().lower() in {"url", "urls"}:
+                            u2 = Get_Url._extract_first_url(v)
+                            if u2:
+                                return u2
+                except Exception:
+                    continue
+        return None
+
+    @staticmethod
+    def _extract_title_from_result(result: Any) -> Optional[str]:
+        # Prefer an explicit title field.
+        t = get_field(result, "title")
+        if isinstance(t, str) and t.strip():
+            return t.strip()
+
+        # Fall back to ResultTable-style columns list.
+        cols = None
+        if isinstance(result, dict):
+            cols = result.get("columns")
+        else:
+            cols = getattr(result, "columns", None)
+        if isinstance(cols, list):
+            for pair in cols:
+                try:
+                    if isinstance(pair, (list, tuple)) and len(pair) == 2:
+                        k, v = pair
+                        if str(k or "").strip().lower() in {"title", "name"}:
+                            if isinstance(v, str) and v.strip():
+                                return v.strip()
+                except Exception:
+                    continue
+        return None
+
+    @staticmethod
+    def _resolve_title_for_hash(backend: Any, file_hash: str, hit: Any = None) -> str:
+        """Best-effort title resolution for a found hash.
+
+        Strategy:
+        - Use the hit's existing title/columns when present.
+        - Prefer backend.get_metadata(hash) when available (direct lookup).
+        - Fall back to backend.search("hash:<hash>", limit=1) and read the title.
+        """
+        try:
+            if hit is not None:
+                from_hit = Get_Url._extract_title_from_result(hit)
+                if from_hit:
+                    return from_hit
+        except Exception:
+            pass
+
+        try:
+            if hasattr(backend, "get_metadata"):
+                meta = backend.get_metadata(file_hash)
+                if isinstance(meta, dict):
+                    t = meta.get("title")
+                    if isinstance(t, str) and t.strip():
+                        return t.strip()
+        except Exception:
+            pass
+
+        try:
+            if hasattr(backend, "search"):
+                hits = backend.search(f"hash:{file_hash}", limit=1)
+                if isinstance(hits, list) and hits:
+                    t2 = Get_Url._extract_title_from_result(hits[0])
+                    if t2:
+                        return t2
+        except Exception:
+            pass
+
+        return ""
+
     def _search_urls_across_stores(self,
                                    pattern: str,
                                    config: Dict[str,
@@ -106,39 +209,70 @@ class Get_Url(Cmdlet):
         try:
             backend = storage[store_name]
 
-            # Try to search files in this backend
-            # For now, we'll iterate through known files (this is a limitation)
-            # Each backend should ideally support get_all_files() or similar
-            # For now, we use search with a broad query to find candidates
-            try:
-                # Try to get files via search (backend-specific)
-                search_results = backend.search("*", limit=1000)
-                if search_results:
-                    for result in search_results:
-                        file_hash = result.get("hash"
-                                               ) or result.get("file_hash")
-                        if not file_hash:
-                            continue
-
-                        try:
-                            urls = backend.get_url(file_hash)
-                            if urls:
-                                for url in urls:
-                                    if self._match_url_pattern(str(url),
-                                                               pattern):
-                                        items.append(
-                                            UrlItem(
-                                                url=str(url),
-                                                hash=file_hash,
-                                                store=store_name,
-                                            )
-                                        )
-                                        found_stores.add(store_name)
-                        except Exception:
-                            pass
-            except Exception:
-                # Backend might not support search; skip
-                pass
+            title_cache: Dict[str, str] = {}
+
+            # Search only URL-bearing records using the backend's URL search capability.
+            # This avoids the expensive/incorrect "search('*')" scan.
+            try:
+                raw_pattern = str(pattern or "").strip()
+                has_wildcards = any(ch in raw_pattern for ch in ("*", "?"))
+
+                # If this is a Hydrus backend and the pattern is a single URL,
+                # normalize it through the official API.
+                normalized_url = None
+                if not has_wildcards and hasattr(backend, "get_url_info"):
+                    try:
+                        info = backend.get_url_info(raw_pattern)  # type: ignore[attr-defined]
+                        if isinstance(info, dict):
+                            norm = info.get("normalised_url") or info.get("normalized_url")
+                            if isinstance(norm, str) and norm.strip():
+                                normalized_url = norm.strip()
+                    except Exception:
+                        normalized_url = None
+
+                search_query = "url:*" if has_wildcards else f"url:{normalized_url or raw_pattern}"
+                try:
+                    search_results = backend.search(search_query, limit=1000)
+                except Exception:
+                    search_results = []
+
+                for hit in (search_results or []):
+                    file_hash = None
+                    if isinstance(hit, dict):
+                        file_hash = hit.get("hash") or hit.get("file_hash")
+                    if not file_hash:
+                        continue
+
+                    file_hash = str(file_hash)
+
+                    title = title_cache.get(file_hash, "")
+                    if not title:
+                        title = self._resolve_title_for_hash(backend, file_hash, hit)
+                        title_cache[file_hash] = title
+
+                    try:
+                        urls = backend.get_url(file_hash)
+                    except Exception:
+                        urls = []
+
+                    for url in (urls or []):
+                        if not self._match_url_pattern(str(url), raw_pattern):
+                            continue
+                        items.append(
+                            UrlItem(
+                                url=str(url),
+                                hash=str(file_hash),
+                                store=str(store_name),
+                                title=str(title or ""),
+                            )
+                        )
+                        found_stores.add(str(store_name))
+            except Exception as exc:
+                log(
+                    f"Error searching store '{store_name}': {exc}",
+                    file=sys.stderr
+                )
+                continue
 
         except KeyError:
             continue
@@ -162,6 +296,10 @@ class Get_Url(Cmdlet):
         # Check if user provided a URL pattern to search for
         search_pattern = parsed.get("url")
 
+        # Allow piping a URL row (or any result with a url field/column) into get-url.
+        if not search_pattern:
+            search_pattern = self._extract_url_from_result(result)
+
         if search_pattern:
             # URL search mode: find all files with matching URLs across stores
             items, stores_searched = self._search_urls_across_stores(search_pattern, config)
@@ -170,9 +308,6 @@ class Get_Url(Cmdlet):
             if not items:
                 log(f"No urls matching pattern: {search_pattern}", file=sys.stderr)
                 return 1
 
-            # Create result table
-            from SYS.result_table import ResultTable
-
             table = (
                 ResultTable(
@@ -181,28 +316,12 @@ class Get_Url(Cmdlet):
                     "URL Search Results",
                 )
             )
             table.set_source_command("get-url", ["-url", search_pattern])
 
-            # Group by store for display
-            by_store: Dict[str,
-                           List[UrlItem]] = {}
             for item in items:
-                if item.store not in by_store:
-                    by_store[item.store] = []
-                by_store[item.store].append(item)
-
-            # Add rows grouped by store
-            for store_name in sorted(by_store.keys()):
-                store_items = by_store[store_name]
-                for idx, item in enumerate(store_items):
-                    row = table.add_row()
-                    if idx == 0:
-                        row.add_column("Store", store_name)
-                    else:
-                        row.add_column("Store", "")
-                    row.add_column("Url", item.url)
-                    # Normalize for display
-                    normalized = self._normalize_url_for_search(item.url)
-                    row.add_column("Hash", item.hash[:16])  # Show first 16 chars
-                    ctx.emit(item)
+                row = table.add_row()
+                row.add_column("Title", item.title)
+                row.add_column("Url", item.url)
+                row.add_column("Store", item.store)
+                ctx.emit(item)
 
             ctx.set_last_result_table(table if items else None, items, subject=result)
             log(
@@ -243,8 +362,6 @@ class Get_Url(Cmdlet):
 
         urls = backend.get_url(file_hash)
 
-        from SYS.result_table import ResultTable
-
         title = str(get_field(result, "title") or "").strip()
         table_title = "Title"
         if title:
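
Below the patch: a minimal standalone sketch of the request the new
get_url_info wrapper performs, written against the Hydrus Client API with
plain requests rather than HydrusRequestSpec. The port and access key are
placeholders; the response fields named in the comments follow the
documented /add_urls/get_url_info reply.

import requests

HYDRUS_API = "http://127.0.0.1:45869"  # default Client API port (assumed)
ACCESS_KEY = "replace-with-your-access-key"  # placeholder

def get_url_info(url: str) -> dict:
    """Ask Hydrus what it knows about a URL, as the new wrapper does."""
    if not url.strip():
        raise ValueError("url must not be empty")
    resp = requests.get(
        f"{HYDRUS_API}/add_urls/get_url_info",
        params={"url": url},
        headers={"Hydrus-Client-API-Access-Key": ACCESS_KEY},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()

# The documented reply includes normalised_url (which the cmdlet uses to
# build its "url:<normalised>" search query), plus url_type,
# url_type_string, match_name, and can_parse.
info = get_url_info("https://example.com/post/123")
print(info.get("normalised_url"))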
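
A second sketch, for the wildcard matching that _match_url_pattern delegates
to fnmatch. The normalization shown here (lowercase, strip scheme and
trailing slash) is an assumption for illustration; the real
_normalize_url_for_search lives elsewhere in cmdlet/get_url.py and is not
part of this diff.

from fnmatch import fnmatch

def normalize(url: str) -> str:
    # Assumed normalization: lowercase, drop scheme, drop trailing slash.
    u = url.strip().lower()
    for scheme in ("https://", "http://"):
        if u.startswith(scheme):
            u = u[len(scheme):]
            break
    return u.rstrip("/")

def match_url(url: str, pattern: str) -> bool:
    # fnmatch gives * (any run of characters) and ? (single char) semantics.
    return fnmatch(normalize(url), normalize(pattern))

assert match_url("https://Example.com/post/123", "example.com/post/*")
assert not match_url("https://example.com/post/123", "example.org/*")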
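
Finally, a toy illustration of the ResultTable row shape that
_extract_url_from_result and _extract_title_from_result walk when no
explicit field is present: a "columns" list of (name, value) pairs. The row
contents here are invented for the example.

row = {
    "columns": [
        ["Title", "Some file"],
        ["Url", "https://example.com/post/123"],
        ["Store", "hydrus"],
    ]
}

def first_column(row: dict, names: set) -> str | None:
    # Mirror the cmdlet's fallback: scan (name, value) pairs case-insensitively.
    for pair in row.get("columns") or []:
        if isinstance(pair, (list, tuple)) and len(pair) == 2:
            key, value = pair
            if str(key or "").strip().lower() in names:
                if isinstance(value, str) and value.strip():
                    return value.strip()
    return None

assert first_column(row, {"url", "urls"}) == "https://example.com/post/123"
assert first_column(row, {"title", "name"}) == "Some file"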