Files
Medios-Macina/cmdlet/get_url.py
2026-01-19 06:24:09 -08:00

708 lines
27 KiB
Python

from __future__ import annotations
from queue import SimpleQueue
from threading import Thread
from dataclasses import dataclass
from typing import Any, Dict, List, Sequence, Optional, Set, Tuple
import sys
import re
from fnmatch import fnmatch
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
from ._shared import (
Cmdlet,
SharedArgs,
parse_cmdlet_args,
get_field,
normalize_hash,
)
from SYS.logger import log
from SYS.result_table import Table
from Store import Store
from SYS import pipeline as ctx
@dataclass
class UrlItem:
    """A single URL result: a URL paired with the file (hash) and store it came from."""
    url: str  # the URL associated with the file
    hash: str  # SHA-256 hex digest of the file this URL is attached to
    store: str  # name of the storage backend holding the file
    title: str = ""  # best-effort display title, may be empty
    size: int | None = None  # file size in bytes when resolvable, else None
    ext: str = ""  # file extension without the leading dot, may be empty
class Get_Url(Cmdlet):
    """Get url associated with files via hash+store, or search urls by pattern."""

    # Hard cap (seconds) on a single store backend's search thread before
    # that store is skipped; see _execute_search_with_timeout.
    STORE_SEARCH_TIMEOUT_SECONDS = 6.0

    def __init__(self) -> None:
        # Declare the cmdlet's CLI surface (name, usage, accepted args, help
        # text) and wire `run` as its entry point, then register it with the
        # dispatcher so it is discoverable.
        super().__init__(
            name="get-url",
            summary="List url associated with a file, or search urls by pattern",
            usage='@1 | get-url OR get-url -url "https://www.youtube.com/watch?v=xx"',
            arg=[SharedArgs.QUERY,
            SharedArgs.STORE,
            SharedArgs.URL],
            detail=[
            "- Get url for file: @1 | get-url (requires hash+store from result)",
            '- Search url across stores: get-url -url "www.google.com" (strips protocol & www prefix)',
            '- Wildcard matching: get-url -url "youtube.com*" (matches all youtube.com urls)',
            "- Pattern matching: domain matching ignores protocol (https://, http://, ftp://)",
            ],
            exec=self.run,
        )
        self.register()
@staticmethod
def _normalize_url_for_search(url: str) -> str:
"""Strip protocol and www prefix from URL for searching.
Examples:
https://www.youtube.com/watch?v=xx -> youtube.com/watch?v=xx
http://www.google.com -> google.com
ftp://files.example.com -> files.example.com
"""
url = str(url or "").strip()
# Strip fragment (e.g., #t=10) before matching
url = url.split("#", 1)[0]
# Strip common time/tracking query params for matching
try:
parsed = urlparse(url)
except Exception:
parsed = None
if parsed is not None and parsed.query:
time_keys = {"t", "start", "time_continue", "timestamp", "time", "begin"}
tracking_prefixes = ("utm_",)
try:
pairs = parse_qsl(parsed.query, keep_blank_values=True)
filtered = []
for key, val in pairs:
key_norm = str(key or "").lower()
if key_norm in time_keys:
continue
if key_norm.startswith(tracking_prefixes):
continue
filtered.append((key, val))
if filtered:
url = urlunparse(parsed._replace(query=urlencode(filtered, doseq=True)))
else:
url = urlunparse(parsed._replace(query=""))
except Exception:
pass
# Remove protocol (http://, https://, ftp://, etc.)
url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE)
# Remove www. prefix (case-insensitive)
url = re.sub(r"^www\.", "", url, flags=re.IGNORECASE)
return url.lower()
@staticmethod
def _looks_like_url_pattern(value: str) -> bool:
v = str(value or "").strip().lower()
if not v:
return False
if "://" in v:
return True
if v.startswith(("magnet:", "torrent:", "ytdl:", "tidal:", "ftp:", "sftp:", "file:")):
return True
return "." in v and "/" in v
@staticmethod
def _match_url_pattern(url: str, pattern: str) -> bool:
"""Match URL against pattern with wildcard support.
Strips protocol/www from both URL and pattern before matching.
Supports * and ? wildcards.
"""
raw_pattern = str(pattern or "").strip()
normalized_url = Get_Url._normalize_url_for_search(url)
normalized_pattern = Get_Url._normalize_url_for_search(raw_pattern)
looks_like_url = Get_Url._looks_like_url_pattern(raw_pattern)
has_wildcards = "*" in normalized_pattern or (
not looks_like_url and "?" in normalized_pattern
)
if has_wildcards:
return fnmatch(normalized_url, normalized_pattern)
normalized_url_no_slash = normalized_url.rstrip("/")
normalized_pattern_no_slash = normalized_pattern.rstrip("/")
if normalized_pattern_no_slash and normalized_pattern_no_slash == normalized_url_no_slash:
return True
return normalized_pattern in normalized_url
def _execute_search_with_timeout(
self,
backend: Any,
query: str,
limit: int,
store_name: str,
**kwargs: Any,
) -> Optional[List[Any]]:
queue: SimpleQueue[tuple[str, Any]] = SimpleQueue()
def _worker() -> None:
try:
queue.put(("ok", backend.search(query, limit=limit, **kwargs)))
except Exception as exc:
queue.put(("err", exc))
worker = Thread(target=_worker, daemon=True)
worker.start()
worker.join(timeout=self.STORE_SEARCH_TIMEOUT_SECONDS)
if worker.is_alive():
debug(
f"Store '{store_name}' search timed out after {self.STORE_SEARCH_TIMEOUT_SECONDS}s",
file=sys.stderr,
)
return None
if queue.empty():
return []
status, payload = queue.get()
if status == "err":
debug(
f"Store '{store_name}' search failed: {payload}",
file=sys.stderr,
)
return []
return payload or []
@staticmethod
def _extract_first_url(value: Any) -> Optional[str]:
if isinstance(value, str):
v = value.strip()
return v or None
if isinstance(value, (list, tuple)):
for item in value:
if isinstance(item, str) and item.strip():
return item.strip()
return None
@staticmethod
def _extract_urls_from_hit(hit: Any) -> List[str]:
"""Extract candidate URLs directly from a search hit, if present."""
raw = None
try:
raw = get_field(hit, "known_urls")
if not raw:
raw = get_field(hit, "urls")
if not raw:
raw = get_field(hit, "url")
if not raw:
raw = get_field(hit, "source_url") or get_field(hit, "source_urls")
except Exception:
raw = None
if isinstance(raw, str):
val = raw.strip()
return [val] if val else []
if isinstance(raw, (list, tuple)):
out: list[str] = []
for item in raw:
if not isinstance(item, str):
continue
v = item.strip()
if v:
out.append(v)
return out
return []
@staticmethod
def _extract_title_from_result(result: Any) -> Optional[str]:
# Prefer explicit title field.
# Fall back to ResultTable-style columns list.
cols = None
if isinstance(result, dict):
cols = result.get("columns")
else:
cols = getattr(result, "columns", None)
if isinstance(cols, list):
for pair in cols:
try:
if isinstance(pair, (list, tuple)) and len(pair) == 2:
k, v = pair
if str(k or "").strip().lower() in {"title", "name"}:
if isinstance(v, str) and v.strip():
return v.strip()
except Exception:
continue
return None
@staticmethod
def _resolve_title_for_hash(backend: Any, file_hash: str, hit: Any = None) -> str:
"""Best-effort title resolution for a found hash.
Strategy:
- Use the hit's existing title/columns when present.
- Prefer backend.get_metadata(hash) when available (direct lookup).
- Fallback to backend.search('hash:<sha>', limit=1) and read title.
"""
try:
if hit is not None:
from_hit = Get_Url._extract_title_from_result(hit)
if from_hit:
return from_hit
except Exception:
pass
try:
if hasattr(backend, "get_metadata"):
meta = backend.get_metadata(file_hash)
if isinstance(meta, dict):
t = meta.get("title")
if isinstance(t, str) and t.strip():
return t.strip()
except Exception:
pass
try:
if hasattr(backend, "search"):
hits = backend.search(f"hash:{file_hash}", limit=1)
if isinstance(hits, list) and hits:
t2 = Get_Url._extract_title_from_result(hits[0])
if t2:
return t2
except Exception:
pass
return ""
    @staticmethod
    def _resolve_size_ext_for_hash(backend: Any, file_hash: str, hit: Any = None) -> tuple[int | None, str]:
        """Best-effort (size, ext) resolution for a found hash.

        Resolution order:
          1. Fields already present on the hit (size/file_size/filesize/
             size_bytes and ext/extension). If EITHER value was found, return
             early without consulting the backend.
          2. ``backend.get_metadata(hash)`` when the backend supports it.

        Returns:
            Tuple of (size_in_bytes_or_None, extension_without_leading_dot).
        """
        # First: see if the hit already includes these fields.
        try:
            size_val = get_field(hit, "size")
            if size_val is None:
                size_val = get_field(hit, "file_size")
            if size_val is None:
                size_val = get_field(hit, "filesize")
            if size_val is None:
                size_val = get_field(hit, "size_bytes")
            # Only numeric values count; string sizes are ignored, not parsed.
            size_int = int(size_val) if isinstance(size_val, (int, float)) else None
        except Exception:
            size_int = None
        try:
            ext_val = get_field(hit, "ext")
            if ext_val is None:
                ext_val = get_field(hit, "extension")
            ext = str(ext_val).strip().lstrip(".") if isinstance(ext_val, str) else ""
        except Exception:
            ext = ""
        # NOTE: either field alone short-circuits the metadata lookup below, so
        # a hit carrying only `ext` returns (None, ext) without a backend call.
        if size_int is not None or ext:
            return size_int, ext
        # Next: backend.get_metadata(hash) when available.
        try:
            if hasattr(backend, "get_metadata"):
                meta = backend.get_metadata(file_hash)
                if isinstance(meta, dict):
                    size_val2 = meta.get("size")
                    if size_val2 is None:
                        size_val2 = meta.get("file_size")
                    if size_val2 is None:
                        size_val2 = meta.get("filesize")
                    if size_val2 is None:
                        size_val2 = meta.get("size_bytes")
                    if isinstance(size_val2, (int, float)):
                        size_int = int(size_val2)
                    ext_val2 = meta.get("ext")
                    if ext_val2 is None:
                        ext_val2 = meta.get("extension")
                    if isinstance(ext_val2, str) and ext_val2.strip():
                        ext = ext_val2.strip().lstrip(".")
        except Exception:
            pass
        return size_int, ext
def _search_urls_across_stores(self,
pattern: str,
config: Dict[str,
Any]) -> Tuple[List[UrlItem],
List[str]]:
"""Search for URLs matching pattern across all stores.
Returns:
Tuple of (matching_items, found_stores)
"""
items: List[UrlItem] = []
found_stores: Set[str] = set()
MAX_RESULTS = 256
try:
storage = Store(config)
store_names = storage.list_backends() if hasattr(storage,
"list_backends") else []
if not store_names:
log("Error: No stores configured", file=sys.stderr)
return items, list(found_stores)
for store_name in store_names:
if len(items) >= MAX_RESULTS:
break
try:
backend = storage[store_name]
title_cache: Dict[str, str] = {}
meta_cache: Dict[str, tuple[int | None, str]] = {}
# Search only URL-bearing records using the backend's URL search capability.
# This avoids the expensive/incorrect "search('*')" scan.
try:
raw_pattern = str(pattern or "").strip()
looks_like_url = self._looks_like_url_pattern(raw_pattern)
has_wildcards = "*" in raw_pattern or (
not looks_like_url and "?" in raw_pattern
)
# If this is a Hydrus backend and the pattern is a single URL,
# normalize it through the official API. Skip for bare domains.
normalized_url = None
normalized_search_pattern = None
if not has_wildcards and looks_like_url:
normalized_search_pattern = self._normalize_url_for_search(
raw_pattern
)
if (
normalized_search_pattern
and normalized_search_pattern != raw_pattern
):
debug(
"get-url normalized raw pattern: %s -> %s",
raw_pattern,
normalized_search_pattern,
)
if hasattr(backend, "get_url_info"):
try:
info = backend.get_url_info(raw_pattern) # type: ignore[attr-defined]
if isinstance(info, dict):
norm = (
info.get("normalized_url")
or info.get("normalized_url")
)
if isinstance(norm, str) and norm.strip():
normalized_url = self._normalize_url_for_search(
norm.strip()
)
except Exception:
pass
if (
normalized_url
and normalized_url != normalized_search_pattern
and normalized_url != raw_pattern
):
debug(
"get-url normalized backend result: %s -> %s",
raw_pattern,
normalized_url,
)
target_pattern = (
normalized_url
or normalized_search_pattern
or raw_pattern
)
if has_wildcards or not target_pattern:
search_query = "url:*"
else:
wrapped_pattern = f"*{target_pattern}*"
search_query = f"url:{wrapped_pattern}"
search_limit = max(1, min(MAX_RESULTS, 1000))
search_results = self._execute_search_with_timeout(
backend,
search_query,
search_limit,
store_name,
pattern_hint=target_pattern,
)
if search_results is None:
continue
search_results = search_results or []
if not search_results and target_pattern and not has_wildcards:
fallback_results = self._execute_search_with_timeout(
backend,
"url:*",
search_limit,
store_name,
pattern_hint=target_pattern,
)
if fallback_results is None:
continue
search_results = fallback_results or []
for hit in (search_results or []):
if len(items) >= MAX_RESULTS:
break
file_hash = None
if isinstance(hit, dict):
file_hash = hit.get("hash") or hit.get("file_hash")
if not file_hash:
continue
file_hash = str(file_hash)
title = title_cache.get(file_hash, "")
if not title:
try:
title = (
get_field(hit, "title")
or get_field(hit, "name")
or get_field(hit, "file_title")
or ""
)
except Exception:
title = ""
if not title:
title = self._resolve_title_for_hash(backend, file_hash, hit)
title_cache[file_hash] = title
size, ext = meta_cache.get(file_hash, (None, ""))
if size is None and not ext:
try:
size = get_field(hit, "size")
if size is None:
size = get_field(hit, "size_bytes")
if size is None:
size = get_field(hit, "file_size")
if size is None:
size = get_field(hit, "filesize")
size = int(size) if isinstance(size, (int, float)) else None
except Exception:
size = None
try:
ext = get_field(hit, "ext") or get_field(hit, "extension")
ext = str(ext).strip().lstrip(".") if isinstance(ext, str) else ""
except Exception:
ext = ""
if size is None and not ext:
size, ext = self._resolve_size_ext_for_hash(backend, file_hash, hit)
meta_cache[file_hash] = (size, ext)
urls = self._extract_urls_from_hit(hit)
if not urls:
try:
urls = backend.get_url(file_hash)
except Exception:
urls = []
for url in (urls or []):
if len(items) >= MAX_RESULTS:
break
if not self._match_url_pattern(str(url), raw_pattern):
continue
from SYS.metadata import normalize_urls
valid = normalize_urls([str(url)])
if not valid:
continue
items.append(
UrlItem(
url=str(url),
hash=str(file_hash),
store=str(store_name),
title=str(title or ""),
size=size,
ext=str(ext or ""),
)
)
found_stores.add(str(store_name))
if len(items) >= MAX_RESULTS:
break
except Exception as exc:
debug(
f"Error searching store '{store_name}': {exc}",
file=sys.stderr
)
continue
except KeyError:
continue
except Exception as exc:
debug(
f"Error searching store '{store_name}': {exc}",
file=sys.stderr
)
continue
return items, list(found_stores)
except Exception as exc:
log(f"Error searching stores: {exc}", file=sys.stderr)
return items, []
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Get url for file via hash+store, or search urls by pattern."""
parsed = parse_cmdlet_args(args, self)
# Check if user provided a URL pattern to search for
search_pattern = parsed.get("url")
if search_pattern:
# URL search mode: find all files with matching URLs across stores
items, stores_searched = self._search_urls_across_stores(search_pattern, config)
if not items:
log(f"No urls matching pattern: {search_pattern}", file=sys.stderr)
return 1
# NOTE: The CLI can auto-render tables from emitted items. When emitting
# dataclass objects, the generic-object renderer will include `hash` as a
# visible column. To keep HASH available for chaining but hidden from the
# table, emit dicts (dict rendering hides `hash`) and provide an explicit
# `columns` list to force display order and size formatting.
display_items: List[Dict[str, Any]] = []
table = (
Table(
"url",
max_columns=5
)._perseverance(True).set_table("url").set_value_case("preserve")
)
table.set_source_command("get-url", ["-url", search_pattern])
for item in items:
payload: Dict[str, Any] = {
# Keep fields for downstream cmdlets.
"hash": item.hash,
"store": item.store,
"url": item.url,
"title": item.title,
"size": item.size,
"ext": item.ext,
# Force the visible table columns + ordering.
"columns": [
("Title", item.title),
("Url", item.url),
("Size", item.size),
("Ext", item.ext),
("Store", item.store),
],
}
display_items.append(payload)
table.add_result(payload)
ctx.set_last_result_table(table if display_items else None, display_items, subject=result)
# Emit after table state is finalized to prevent side effects in TUI rendering
for d in display_items:
ctx.emit(d)
log(
f"Found {len(items)} matching url(s) in {len(stores_searched)} store(s)"
)
return 0
# Original mode: Get URLs for a specific file by hash+store
query_hash = sh.parse_single_hash_query(parsed.get("query"))
if parsed.get("query") and not query_hash:
log("Error: -query must be of the form hash:<sha256>")
return 1
# Extract hash and store from result or args
file_hash = query_hash or get_field(result, "hash")
store_name = parsed.get("store") or get_field(result, "store")
if not file_hash:
log(
'Error: No file hash provided (pipe an item or use -query "hash:<sha256>")'
)
return 1
if not store_name:
log("Error: No store name provided")
return 1
# Get backend and retrieve url
try:
storage = Store(config)
backend = storage[store_name]
urls = backend.get_url(file_hash)
# Filter URLs to avoid data leakage from dirty DBs
from SYS.metadata import normalize_urls
urls = normalize_urls(urls)
from SYS.result_table import ItemDetailView, extract_item_metadata
# Prepare metadata for the detail view
metadata = extract_item_metadata(result)
if file_hash:
metadata["Hash"] = file_hash
if store_name:
metadata["Store"] = store_name
table = (
ItemDetailView(
"Urls",
item_metadata=metadata,
max_columns=1
)._perseverance(True).set_table("url").set_value_case("preserve")
)
table.set_source_command("get-url", [])
items: List[UrlItem] = []
for u in list(urls or []):
u = str(u or "").strip()
if not u:
continue
row = table.add_row()
row.add_column("Url", u)
item = UrlItem(url=u, hash=file_hash, store=str(store_name))
items.append(item)
# Use overlay mode to avoid "merging" with the previous status/table state.
# This is idiomatic for detail views and prevents the search table from being
# contaminated by partial re-renders.
ctx.set_last_result_table_overlay(table, items, subject=result)
# Emit items at the end for pipeline continuity
for item in items:
ctx.emit(item)
if not items:
# Still log it but the panel will show the item context
log("No url found", file=sys.stderr)
return 0
except KeyError:
log(f"Error: Storage backend '{store_name}' not configured")
return 1
except Exception as exc:
log(f"Error retrieving url: {exc}", file=sys.stderr)
return 1
# `debug` is looked up at call time by the methods above, so importing it
# here (after the class body) still works; fall back to a no-op when the
# logger module does not provide it.
try:
    from SYS.logger import debug
except ImportError:
    def debug(*args: Any, **kwargs: Any) -> None:
        """No-op stand-in used when SYS.logger.debug is unavailable."""

# Module-level singleton; constructing it also registers the cmdlet.
CMDLET = Get_Url()