Files
Medios-Macina/cmdlet/get_url.py
2025-12-30 23:19:02 -08:00

499 lines
19 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Sequence, Optional, Set, Tuple
import sys
import re
from fnmatch import fnmatch
from urllib.parse import urlparse
from . import _shared as sh
Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
sh.Cmdlet,
sh.SharedArgs,
sh.parse_cmdlet_args,
sh.get_field,
sh.normalize_hash,
)
from SYS.logger import log
from SYS.result_table import ResultTable
from Store import Store
from SYS import pipeline as ctx
@dataclass
class UrlItem:
    """One URL row tied to a stored file (hash + store identify the file)."""

    url: str                  # the URL itself
    hash: str                 # sha256 of the owning file
    store: str                # name of the store backend holding the file
    title: str = ""           # display title when known, else empty
    size: int | None = None   # file size in bytes when known
    ext: str = ""             # file extension without the leading dot
class Get_Url(Cmdlet):
    """Get url associated with files via hash+store, or search urls by pattern."""

    def __init__(self) -> None:
        # Declare cmdlet metadata up front, then hand it to the shared
        # Cmdlet machinery and register this instance.
        usage_text = '@1 | get-url OR get-url -url "https://www.youtube.com/watch?v=xx"'
        detail_lines = [
            "- Get url for file: @1 | get-url (requires hash+store from result)",
            '- Search url across stores: get-url -url "www.google.com" (strips protocol & www prefix)',
            '- Wildcard matching: get-url -url "youtube.com*" (matches all youtube.com urls)',
            "- Pattern matching: domain matching ignores protocol (https://, http://, ftp://)",
        ]
        super().__init__(
            name="get-url",
            summary="List url associated with a file, or search urls by pattern",
            usage=usage_text,
            arg=[SharedArgs.QUERY, SharedArgs.STORE, SharedArgs.URL],
            detail=detail_lines,
            exec=self.run,
        )
        self.register()
@staticmethod
def _normalize_url_for_search(url: str) -> str:
"""Strip protocol and www prefix from URL for searching.
Examples:
https://www.youtube.com/watch?v=xx -> youtube.com/watch?v=xx
http://www.google.com -> google.com
ftp://files.example.com -> files.example.com
"""
url = str(url or "").strip()
# Remove protocol (http://, https://, ftp://, etc.)
url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE)
# Remove www. prefix (case-insensitive)
url = re.sub(r"^www\.", "", url, flags=re.IGNORECASE)
return url.lower()
@staticmethod
def _match_url_pattern(url: str, pattern: str) -> bool:
    """Return True when *url* matches *pattern* (wildcards * and ? allowed).

    Both sides are normalized (protocol and www prefix stripped, lowercased)
    before comparison, so "https://www.x.com" matches the pattern "x.com".
    """
    candidate = Get_Url._normalize_url_for_search(url)
    query = Get_Url._normalize_url_for_search(pattern)
    return fnmatch(candidate, query)
@staticmethod
def _extract_first_url(value: Any) -> Optional[str]:
if isinstance(value, str):
v = value.strip()
return v or None
if isinstance(value, (list, tuple)):
for item in value:
if isinstance(item, str) and item.strip():
return item.strip()
return None
@staticmethod
def _extract_url_from_result(result: Any) -> Optional[str]:
    """Pull a URL out of a result row, if one is present.

    Checks the explicit "url" field first, then falls back to scanning a
    ResultTable-style ``columns`` list for a "url"/"urls" pair.
    """
    direct = Get_Url._extract_first_url(get_field(result, "url"))
    if direct:
        return direct
    cols = result.get("columns") if isinstance(result, dict) else getattr(result, "columns", None)
    if not isinstance(cols, list):
        return None
    for pair in cols:
        try:
            if not (isinstance(pair, (list, tuple)) and len(pair) == 2):
                continue
            label, value = pair
            if str(label or "").strip().lower() in {"url", "urls"}:
                candidate = Get_Url._extract_first_url(value)
                if candidate:
                    return candidate
        except Exception:
            # Malformed column entries are skipped, not fatal.
            continue
    return None
@staticmethod
def _extract_title_from_result(result: Any) -> Optional[str]:
    """Pull a display title out of a result row, if one is present.

    Checks the explicit "title" field first, then falls back to scanning a
    ResultTable-style ``columns`` list for a "title"/"name" pair.
    """
    direct = get_field(result, "title")
    if isinstance(direct, str) and direct.strip():
        return direct.strip()
    cols = result.get("columns") if isinstance(result, dict) else getattr(result, "columns", None)
    if not isinstance(cols, list):
        return None
    for pair in cols:
        try:
            if not (isinstance(pair, (list, tuple)) and len(pair) == 2):
                continue
            label, value = pair
            if str(label or "").strip().lower() in {"title", "name"}:
                if isinstance(value, str) and value.strip():
                    return value.strip()
        except Exception:
            # Malformed column entries are skipped, not fatal.
            continue
    return None
@staticmethod
def _resolve_title_for_hash(backend: Any, file_hash: str, hit: Any = None) -> str:
    """Best-effort title lookup for *file_hash*; returns "" when unknown.

    Strategy, in order:
      1. The hit's own title/columns, when a hit was provided.
      2. ``backend.get_metadata(hash)`` when the backend supports it.
      3. A ``backend.search('hash:<sha>', limit=1)`` lookup.

    Every step is wrapped so backend failures degrade to the next strategy.
    """
    try:
        if hit is not None:
            title = Get_Url._extract_title_from_result(hit)
            if title:
                return title
    except Exception:
        pass
    try:
        if hasattr(backend, "get_metadata"):
            meta = backend.get_metadata(file_hash)
            if isinstance(meta, dict):
                candidate = meta.get("title")
                if isinstance(candidate, str) and candidate.strip():
                    return candidate.strip()
    except Exception:
        pass
    try:
        if hasattr(backend, "search"):
            matches = backend.search(f"hash:{file_hash}", limit=1)
            if isinstance(matches, list) and matches:
                title = Get_Url._extract_title_from_result(matches[0])
                if title:
                    return title
    except Exception:
        pass
    return ""
@staticmethod
def _resolve_size_ext_for_hash(backend: Any, file_hash: str, hit: Any = None) -> tuple[int | None, str]:
    """Best-effort (size, ext) lookup for *file_hash*.

    First reads the hit itself; if neither value is found there, falls back
    to ``backend.get_metadata(hash)``. Returns (None, "") when nothing is
    available. All lookups are exception-safe.
    """
    size_keys = ("size", "file_size", "filesize", "size_bytes")
    # Pass 1: values carried on the hit object/dict itself.
    try:
        raw_size = None
        for key in size_keys:
            raw_size = get_field(hit, key)
            if raw_size is not None:
                break
        size_int = int(raw_size) if isinstance(raw_size, (int, float)) else None
    except Exception:
        size_int = None
    try:
        raw_ext = get_field(hit, "ext")
        if raw_ext is None:
            raw_ext = get_field(hit, "extension")
        ext = str(raw_ext).strip().lstrip(".") if isinstance(raw_ext, str) else ""
    except Exception:
        ext = ""
    if size_int is not None or ext:
        return size_int, ext
    # Pass 2: backend metadata, when the backend exposes it.
    try:
        if hasattr(backend, "get_metadata"):
            meta = backend.get_metadata(file_hash)
            if isinstance(meta, dict):
                raw_size = None
                for key in size_keys:
                    raw_size = meta.get(key)
                    if raw_size is not None:
                        break
                if isinstance(raw_size, (int, float)):
                    size_int = int(raw_size)
                raw_ext = meta.get("ext")
                if raw_ext is None:
                    raw_ext = meta.get("extension")
                if isinstance(raw_ext, str) and raw_ext.strip():
                    ext = raw_ext.strip().lstrip(".")
    except Exception:
        pass
    return size_int, ext
def _search_urls_across_stores(self,
                               pattern: str,
                               config: Dict[str, Any]) -> Tuple[List[UrlItem], List[str]]:
    """Search for URLs matching *pattern* across all configured stores.

    Args:
        pattern: URL or wildcard pattern (protocol/www are ignored on match).
        config: App configuration passed through to Store().

    Returns:
        Tuple of (matching_items, found_stores), where found_stores lists
        the store names that produced at least one match.
    """
    items: List[UrlItem] = []
    found_stores: Set[str] = set()
    try:
        storage = Store(config)
        store_names = storage.list_backends() if hasattr(storage, "list_backends") else []
        if not store_names:
            log("Error: No stores configured", file=sys.stderr)
            return items, list(found_stores)
        for store_name in store_names:
            try:
                backend = storage[store_name]
                # Per-store caches so each hash is resolved at most once,
                # even when it appears in many hits.
                title_cache: Dict[str, str] = {}
                meta_cache: Dict[str, tuple[int | None, str]] = {}
                # Search only URL-bearing records using the backend's URL search capability.
                # This avoids the expensive/incorrect "search('*')" scan.
                try:
                    raw_pattern = str(pattern or "").strip()
                    has_wildcards = any(ch in raw_pattern for ch in ("*", "?"))
                    # If this is a Hydrus backend and the pattern is a single URL,
                    # normalize it through the official API.
                    normalized_url = None
                    if not has_wildcards and hasattr(backend, "get_url_info"):
                        try:
                            info = backend.get_url_info(raw_pattern)  # type: ignore[attr-defined]
                            if isinstance(info, dict):
                                norm = info.get("normalised_url") or info.get("normalized_url")
                                if isinstance(norm, str) and norm.strip():
                                    normalized_url = norm.strip()
                        except Exception:
                            normalized_url = None
                    search_query = "url:*" if has_wildcards else f"url:{normalized_url or raw_pattern}"
                    try:
                        search_results = backend.search(search_query, limit=1000)
                    except Exception:
                        search_results = []
                    for hit in (search_results or []):
                        # FIX: accept object hits as well as dict hits; the
                        # title/size resolvers in this cmdlet already use
                        # get_field for exactly that reason, but hashes from
                        # non-dict hits were previously dropped.
                        if isinstance(hit, dict):
                            file_hash = hit.get("hash") or hit.get("file_hash")
                        else:
                            file_hash = get_field(hit, "hash") or get_field(hit, "file_hash")
                        if not file_hash:
                            continue
                        file_hash = str(file_hash)
                        # FIX: cache on membership rather than truthiness so a
                        # legitimately empty title/meta is not re-resolved (and
                        # the backend re-queried) for every subsequent hit.
                        if file_hash in title_cache:
                            title = title_cache[file_hash]
                        else:
                            title = self._resolve_title_for_hash(backend, file_hash, hit)
                            title_cache[file_hash] = title
                        if file_hash in meta_cache:
                            size, ext = meta_cache[file_hash]
                        else:
                            size, ext = self._resolve_size_ext_for_hash(backend, file_hash, hit)
                            meta_cache[file_hash] = (size, ext)
                        try:
                            urls = backend.get_url(file_hash)
                        except Exception:
                            urls = []
                        for url in (urls or []):
                            if not self._match_url_pattern(str(url), raw_pattern):
                                continue
                            items.append(
                                UrlItem(
                                    url=str(url),
                                    hash=file_hash,
                                    store=str(store_name),
                                    title=str(title or ""),
                                    size=size,
                                    ext=str(ext or ""),
                                )
                            )
                            found_stores.add(str(store_name))
                except Exception as exc:
                    debug(
                        f"Error searching store '{store_name}': {exc}",
                        file=sys.stderr
                    )
                    continue
            except KeyError:
                # Store listed but not configured as a backend: skip quietly.
                continue
            except Exception as exc:
                debug(
                    f"Error searching store '{store_name}': {exc}",
                    file=sys.stderr
                )
                continue
        return items, list(found_stores)
    except Exception as exc:
        log(f"Error searching stores: {exc}", file=sys.stderr)
        return items, []
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Get url for file via hash+store, or search urls by pattern.

    Two modes:
      * URL-search mode: a -url pattern (or a piped row carrying a url
        field/column) triggers a cross-store search.
      * Hash mode: a hash+store pair (piped, or supplied via -query/-store)
        lists the URLs attached to that single file.

    Returns:
        0 on success (including "no url found" in hash mode), 1 on error.
    """
    parsed = parse_cmdlet_args(args, self)
    # Check if user provided a URL pattern to search for.
    search_pattern = parsed.get("url")
    # Allow piping a URL row (or any result with a url field/column) into get-url.
    if not search_pattern:
        search_pattern = self._extract_url_from_result(result)
    if search_pattern:
        # URL search mode: find all files with matching URLs across stores.
        items, stores_searched = self._search_urls_across_stores(search_pattern, config)
        if not items:
            log(f"No urls matching pattern: {search_pattern}", file=sys.stderr)
            return 1
        # NOTE: The CLI can auto-render tables from emitted items. When emitting
        # dataclass objects, the generic-object renderer will include `hash` as a
        # visible column. To keep HASH available for chaining but hidden from the
        # table, emit dicts (dict rendering hides `hash`) and provide an explicit
        # `columns` list to force display order and size formatting.
        display_items: List[Dict[str, Any]] = []
        table = (
            ResultTable("url", max_columns=5)
            .set_preserve_order(True)
            .set_table("url")
            .set_value_case("preserve")
        )
        table.set_source_command("get-url", ["-url", search_pattern])
        for item in items:
            payload: Dict[str, Any] = {
                # Keep fields for downstream cmdlets.
                "hash": item.hash,
                "store": item.store,
                "url": item.url,
                "title": item.title,
                "size": item.size,
                "ext": item.ext,
                # Force the visible table columns + ordering.
                "columns": [
                    ("Title", item.title),
                    ("Url", item.url),
                    ("Size", item.size),
                    ("Ext", item.ext),
                    ("Store", item.store),
                ],
            }
            display_items.append(payload)
            table.add_result(payload)
            ctx.emit(payload)
        ctx.set_last_result_table(table if display_items else None, display_items, subject=result)
        log(f"Found {len(items)} matching url(s) in {len(stores_searched)} store(s)")
        return 0
    # Hash mode: get URLs for a specific file by hash+store.
    query_hash = sh.parse_single_hash_query(parsed.get("query"))
    if parsed.get("query") and not query_hash:
        # FIX: error messages now consistently go to stderr, matching the
        # other error paths in this cmdlet.
        log("Error: -query must be of the form hash:<sha256>", file=sys.stderr)
        return 1
    # Extract hash and store from result or args.
    file_hash = query_hash or get_field(result, "hash")
    store_name = parsed.get("store") or get_field(result, "store")
    if not file_hash:
        log(
            'Error: No file hash provided (pipe an item or use -query "hash:<sha256>")',
            file=sys.stderr,
        )
        return 1
    if not store_name:
        log("Error: No store name provided", file=sys.stderr)
        return 1
    # Normalize hash to the canonical form used by the stores.
    file_hash = normalize_hash(file_hash)
    if not file_hash:
        log("Error: Invalid hash format", file=sys.stderr)
        return 1
    # Get backend and retrieve url.
    try:
        storage = Store(config)
        backend = storage[store_name]
        urls = backend.get_url(file_hash)
        title = str(get_field(result, "title") or "").strip()
        table_title = f"Title: {title}" if title else "Title"
        table = (
            ResultTable(table_title, max_columns=1)
            .set_preserve_order(True)
            .set_table("url")
            .set_value_case("preserve")
        )
        table.set_source_command("get-url", [])
        items: List[UrlItem] = []
        for u in list(urls or []):
            u = str(u or "").strip()
            if not u:
                continue
            row = table.add_row()
            row.add_column("Url", u)
            item = UrlItem(url=u, hash=file_hash, store=str(store_name))
            items.append(item)
            ctx.emit(item)
        # Make this a real result table so @.. / @,, can navigate it.
        ctx.set_last_result_table(table if items else None, items, subject=result)
        if not items:
            log("No url found", file=sys.stderr)
        return 0
    except KeyError:
        log(f"Error: Storage backend '{store_name}' not configured", file=sys.stderr)
        return 1
    except Exception as exc:
        log(f"Error retrieving url: {exc}", file=sys.stderr)
        return 1
# Import the debug function from the logger if available. Placing this at the
# bottom of the module is safe: Get_Url's methods look up `debug` in module
# globals at call time, not at class-definition time.
try:
    from SYS.logger import debug
except ImportError:
    def debug(*args, **kwargs):
        # Fallback no-op when SYS.logger does not expose debug().
        pass


CMDLET = Get_Url()