Files
Medios-Macina/cmdlet/search_store.py
2025-12-23 16:36:39 -08:00

468 lines
20 KiB
Python

"""Search-store cmdlet: Search for files in storage backends (Folder, Hydrus)."""
from __future__ import annotations
from typing import Any, Dict, Sequence, List, Optional
from pathlib import Path
from collections import OrderedDict
import re
import json
import sys
from SYS.logger import log, debug
from . import _shared as sh
# Bind the helpers this module uses from the shared cmdlet package (`sh`) to
# short module-level names, so the code below can call them unqualified.
Cmdlet, CmdletArg, SharedArgs, get_field, should_show_help, normalize_hash, first_title_tag, parse_hash_query = (
sh.Cmdlet,
sh.CmdletArg,
sh.SharedArgs,
sh.get_field,
sh.should_show_help,
sh.normalize_hash,
sh.first_title_tag,
sh.parse_hash_query,
)
import pipeline as ctx
# Lower-cased `store` values for which _ensure_storage_columns() fills in
# default `title` / `ext` fields; payloads from any other store pass through
# untouched.
STORAGE_ORIGINS = {"local", "hydrus", "folder"}
class Search_Store(Cmdlet):
"""Class-based search-store cmdlet for searching storage backends."""
def __init__(self) -> None:
    """Describe the ``search-store`` cmdlet to the base class and register it.

    The argument specs and help text are assembled first, then handed to
    ``Cmdlet.__init__`` together with ``self.run`` as the executor.
    """
    # Accepted arguments: free-text query, result cap, and the shared -store flag.
    argument_specs = [
        CmdletArg("query", description="Search query string"),
        CmdletArg("limit", type="integer", description="Limit results (default: 100)"),
        SharedArgs.STORE,
    ]

    # Long-form help: supported query syntaxes first, then worked examples.
    syntax_notes = [
        "Search across storage backends: Folder stores and Hydrus instances",
        "Use -store to search a specific backend by name",
        "URL search: url:* (any URL) or url:<value> (URL substring)",
        "Extension search: ext:<value> (e.g., ext:png)",
        "Hydrus-style extension: system:filetype = png",
        "Results include hash for downstream commands (get-file, add-tag, etc.)",
    ]
    usage_examples = [
        "Examples:",
        "search-store -query foo # Search all storage backends",
        "search-store -store home -query '*' # Search 'home' Hydrus instance",
        "search-store -store test -query 'video' # Search 'test' folder store",
        "search-store -query 'hash:deadbeef...' # Search by SHA256 hash",
        "search-store -query 'url:*' # Files that have any URL",
        "search-store -query 'url:youtube.com' # Files whose URL contains substring",
        "search-store -query 'ext:png' # Files whose metadata ext is png",
        "search-store -query 'system:filetype = png' # Hydrus: native; Folder: maps to metadata.ext",
    ]

    super().__init__(
        name="search-store",
        summary="Search storage backends (Folder, Hydrus) for files.",
        usage="search-store [-query <query>] [-store BACKEND] [-limit N]",
        arg=argument_specs,
        detail=syntax_notes + usage_examples,
        exec=self.run,
    )
    self.register()
# --- Helper methods -------------------------------------------------
@staticmethod
def _normalize_extension(ext_value: Any) -> str:
    """Reduce an extension-like value to at most five alphanumeric characters.

    Steps, in order:

    1. Stringify (``None`` / falsy values become ``""``), trim surrounding
       whitespace and leading dots.
    2. Drop everything from the earliest separator (space, ``|``, ``(``,
       ``[``, ``{``, ``,`` or ``;``) onward, so decorations such as
       ``"jpeg (copy)"`` or ``"mp4,720p x"`` collapse to the bare extension.
    3. For compound suffixes such as ``"tar.gz"`` keep only the last part.
    4. Strip any remaining non-alphanumeric characters and cap at 5 chars.

    Args:
        ext_value: Extension candidate of any type; it is passed through
            ``str()`` unless falsy.

    Returns:
        The sanitised extension without a leading dot; may be empty. Case is
        preserved.
    """
    text = str(ext_value or "").strip().lstrip(".")

    # Cut at whichever separator appears FIRST in the string. Checking the
    # separators one by one and stopping at the first kind found would cut
    # at the wrong place when several kinds are present ("mp4,720p x").
    text = re.split(r"[ |(\[{,;]", text, maxsplit=1)[0]

    # "tar.gz" -> "gz"; a string without dots is returned unchanged.
    text = text.rpartition(".")[2]

    return "".join(ch for ch in text if ch.isalnum())[:5]
def _ensure_storage_columns(self, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in the ``title`` and ``ext`` keys that storage rows need for display.

    Only payloads whose lower-cased ``store`` value is listed in
    ``STORAGE_ORIGINS`` are touched; anything else is returned as-is.
    The dict is updated in place and the same object is returned.

    Args:
        payload: Result row as a mutable dict.

    Returns:
        ``payload`` itself, with ``title`` and ``ext`` present when the row
        belongs to a storage origin.
    """
    origin = str(payload.get("store") or "").lower()
    if origin in STORAGE_ORIGINS:
        if "title" not in payload:
            # First non-empty of name / target / path, else a generic label.
            candidate = payload.get("name") or payload.get("target") or payload.get("path")
            payload["title"] = candidate or "Result"

        if "ext" not in payload:
            # Derive the extension from the title's suffix; empty when the
            # title has none.
            suffix = Path(str(payload.get("title", ""))).suffix
            payload["ext"] = self._normalize_extension(suffix.lstrip(".")) if suffix else ""

    # No display columns are built here: size_bytes and formatting are left
    # to result_table so it can apply its own column limits consistently.
    return payload
# --- Execution ------------------------------------------------------
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Search storage backends for files and publish the matching rows.

    Two modes are supported:

    * Hash lookup: when ``parse_hash_query`` yields hashes for the query,
      each hash is resolved against the selected backend (or every backend
      from ``storage.list_backends()``) via ``get_metadata`` and a row is
      built from that metadata plus the backend's tags.
    * Text search: otherwise the query is passed to ``search`` on the
      selected backend, or on each searchable backend in turn until
      ``limit`` results have been collected.

    Each row is added to a ``ResultTable``, emitted through the pipeline
    context, and the full list is written as JSON to a worker record in the
    folder-store database.

    Args:
        result: Piped input from the previous stage; not used here.
        args: Raw command tokens. Recognised: the query flag or bare words
            (joined with single spaces), the store flag, the limit flag
            (non-integer values fall back to 100) and ``-refresh`` /
            ``--refresh``. A ``store:<name>`` token inside the query selects
            the backend (unless the store flag was given) and additionally
            filters text-search rows by their ``store`` field,
            case-insensitively.
        config: Application configuration handed to the storage layer.

    Returns:
        ``0`` when help was shown or the search completed (including the
        "no results" case); ``1`` when no query or library root is
        available, the chosen backend cannot search, or an unexpected error
        occurred.
    """
    if should_show_help(args):
        log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
        return 0

    refresh_flags = {"--refresh", "-refresh"}
    args_list = [str(arg) for arg in (args or [])]
    refresh_mode = any(a.strip().lower() in refresh_flags for a in args_list)

    def _format_command_title(command: str, raw_args: List[str]) -> str:
        """Rebuild a readable command line, omitting the refresh flag."""

        def _quote(value: str) -> str:
            text = str(value)
            if not text:
                return '""'
            needs_quotes = any(ch.isspace() for ch in text) or '"' in text
            if not needs_quotes:
                return text
            return '"' + text.replace('"', '\\"') + '"'

        cleaned = [
            str(a)
            for a in (raw_args or [])
            if str(a).strip().lower() not in refresh_flags
        ]
        if not cleaned:
            return command
        return " ".join([command, *[_quote(a) for a in cleaned]])

    def _as_dict(obj: Any) -> Dict[str, Any]:
        """Coerce a backend result into a plain dict row."""
        if isinstance(obj, dict):
            return dict(obj)
        if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")):
            return obj.to_dict()  # type: ignore[arg-type]
        return {"title": str(obj)}

    # Prefer the stage text reported by the pipeline as the table title;
    # fall back to a title reconstructed from the arguments.
    raw_title = None
    try:
        raw_title = ctx.get_current_stage_text("") if hasattr(ctx, "get_current_stage_text") else None
    except Exception:
        raw_title = None
    command_title = (str(raw_title).strip() if raw_title else "") or _format_command_title("search-store", list(args_list))

    # Build dynamic flag variants from cmdlet arg definitions.
    # This avoids hardcoding flag spellings in parsing loops.
    flag_registry = self.build_flag_registry()
    query_flags = {f.lower() for f in (flag_registry.get("query") or {"-query", "--query"})}
    store_flags = {f.lower() for f in (flag_registry.get("store") or {"-store", "--store"})}
    limit_flags = {f.lower() for f in (flag_registry.get("limit") or {"-limit", "--limit"})}

    # Parse arguments
    query = ""
    storage_backend: Optional[str] = None
    limit = 100
    i = 0
    while i < len(args_list):
        arg = args_list[i]
        low = arg.lower()
        has_value = i + 1 < len(args_list)
        if low in query_flags and has_value:
            chunk = args_list[i + 1]
            query = f"{query} {chunk}".strip() if query else chunk
            i += 2
        elif low in store_flags and has_value:
            storage_backend = args_list[i + 1]
            i += 2
        elif low in limit_flags and has_value:
            try:
                limit = int(args_list[i + 1])
            except ValueError:
                limit = 100
            i += 2
        elif not arg.startswith("-"):
            # Bare words extend the query.
            query = f"{query} {arg}".strip() if query else arg
            i += 1
        else:
            # Unknown flag (including -refresh, handled above): skip it.
            i += 1

    # Pull an inline "store:<name>" token out of the query so backends only
    # see the real search terms.
    store_filter: Optional[str] = None
    if query:
        match = re.search(r"\bstore:([^\s,]+)", query, flags=re.IGNORECASE)
        if match:
            store_filter = match.group(1).strip() or None
            query = re.sub(r"\s*[,]?\s*store:[^\s,]+", " ", query, flags=re.IGNORECASE)
            query = re.sub(r"\s{2,}", " ", query)
            query = query.strip().strip(',')
    if store_filter and not storage_backend:
        storage_backend = store_filter
    # Row `store` values are lower-cased before comparison further down, so
    # the filter must be lower-cased too; comparing the raw token would make
    # "store:Home" reject every row. The original spelling is kept in
    # `storage_backend` for the backend lookup.
    store_filter_key = store_filter.lower() if store_filter else None

    hash_query = parse_hash_query(query)
    if not query:
        log("Provide a search query", file=sys.stderr)
        return 1

    from API.folder import API_folder_store
    from config import get_local_storage_path
    import uuid

    worker_id = str(uuid.uuid4())
    library_root = get_local_storage_path(config or {})
    if not library_root:
        log("No library root configured", file=sys.stderr)
        return 1

    # Use context manager to ensure database is always closed
    with API_folder_store(library_root) as db:
        try:
            db.insert_worker(
                worker_id,
                "search-store",
                title=f"Search: {query}",
                description=f"Query: {query}",
                pipe=ctx.get_current_command_text()
            )
            results_list = []

            # NOTE(review): result_table is reloaded on every run, presumably
            # to pick up live edits during development - confirm it is still
            # wanted.
            import result_table
            import importlib
            importlib.reload(result_table)
            from result_table import ResultTable

            table = ResultTable(command_title)
            # Table conveniences are best-effort; a failure here must not
            # abort the search.
            try:
                table.set_source_command("search-store", list(args_list))
            except Exception:
                pass
            if hash_query:
                try:
                    table.set_preserve_order(True)
                except Exception:
                    pass

            from Store import Store
            storage = Store(config=config or {})
            from Store._base import Store as BaseStore

            backend_to_search = storage_backend or None

            if hash_query:
                # Explicit hash list search: build rows from backend metadata.
                if backend_to_search:
                    backends_to_try: List[str] = [backend_to_search]
                else:
                    backends_to_try = list(storage.list_backends())

                found_any = False
                for h in hash_query:
                    resolved_backend_name: Optional[str] = None
                    resolved_backend = None
                    resolved_meta: Any = None
                    for backend_name in backends_to_try:
                        try:
                            backend = storage[backend_name]
                        except Exception:
                            continue
                        try:
                            # If get_metadata works, consider it a hit; get_file can be optional (e.g. remote URL).
                            meta = backend.get_metadata(h)
                        except Exception:
                            continue
                        if meta is None:
                            continue
                        resolved_backend_name = backend_name
                        resolved_backend = backend
                        resolved_meta = meta
                        break
                    if resolved_backend_name is None or resolved_backend is None:
                        continue
                    found_any = True

                    # Resolve a path/URL string if possible
                    path_str: Optional[str] = None
                    # IMPORTANT: avoid calling get_file() for remote backends.
                    # For Hydrus, get_file() returns a browser URL (and may include access keys),
                    # which should not be pulled during search/refresh.
                    try:
                        if type(resolved_backend).__name__ == "Folder":
                            maybe_path = resolved_backend.get_file(h)
                            if isinstance(maybe_path, Path):
                                path_str = str(maybe_path)
                            elif isinstance(maybe_path, str) and maybe_path:
                                path_str = maybe_path
                    except Exception:
                        path_str = None

                    # Reuse the metadata fetched while probing the backend
                    # instead of asking the backend for it a second time.
                    meta_obj: Dict[str, Any] = resolved_meta or {}

                    tags_list: List[str] = []
                    try:
                        tag_result = resolved_backend.get_tag(h)
                        # get_tag may return the tags directly or as the
                        # first element of a tuple.
                        if isinstance(tag_result, tuple) and tag_result:
                            maybe_tags = tag_result[0]
                        else:
                            maybe_tags = tag_result
                        if isinstance(maybe_tags, list):
                            tags_list = [str(t).strip() for t in maybe_tags if isinstance(t, str) and str(t).strip()]
                    except Exception:
                        tags_list = []

                    title_from_tag: Optional[str] = None
                    try:
                        title_tag = first_title_tag(tags_list)
                        if title_tag and ":" in title_tag:
                            title_from_tag = title_tag.split(":", 1)[1].strip()
                    except Exception:
                        title_from_tag = None

                    # Title preference: title tag, metadata, file stem, hash.
                    title = title_from_tag or meta_obj.get("title") or meta_obj.get("name")
                    if not title and path_str:
                        try:
                            title = Path(path_str).stem
                        except Exception:
                            title = path_str

                    # Extension preference: metadata, path suffix, title suffix.
                    ext_val = meta_obj.get("ext") or meta_obj.get("extension")
                    if not ext_val and path_str:
                        try:
                            ext_val = Path(path_str).suffix
                        except Exception:
                            ext_val = None
                    if not ext_val and title:
                        try:
                            ext_val = Path(str(title)).suffix
                        except Exception:
                            ext_val = None

                    size_bytes = meta_obj.get("size")
                    if size_bytes is None:
                        size_bytes = meta_obj.get("size_bytes")
                    try:
                        size_bytes_int: Optional[int] = int(size_bytes) if size_bytes is not None else None
                    except Exception:
                        size_bytes_int = None

                    payload: Dict[str, Any] = {
                        "title": str(title or h),
                        "hash": h,
                        "store": resolved_backend_name,
                        "path": path_str,
                        "ext": self._normalize_extension(ext_val),
                        "size_bytes": size_bytes_int,
                        "tag": tags_list,
                    }
                    table.add_result(payload)
                    results_list.append(payload)
                    ctx.emit(payload)

                if found_any:
                    table.title = command_title
                    if refresh_mode:
                        ctx.set_last_result_table_preserve_history(table, results_list)
                    else:
                        ctx.set_last_result_table(table, results_list)
                    db.append_worker_stdout(worker_id, json.dumps(results_list, indent=2))
                    db.update_worker_status(worker_id, 'completed')
                    return 0

                log("No results found", file=sys.stderr)
                if refresh_mode:
                    try:
                        table.title = command_title
                        ctx.set_last_result_table_preserve_history(table, [])
                    except Exception:
                        pass
                db.append_worker_stdout(worker_id, json.dumps([], indent=2))
                db.update_worker_status(worker_id, 'completed')
                return 0

            if backend_to_search:
                target_backend = storage[backend_to_search]
                # A backend that inherits the base-class search() unchanged
                # is treated as non-searchable.
                if type(target_backend).search is BaseStore.search:
                    log(f"Backend '{backend_to_search}' does not support searching", file=sys.stderr)
                    db.update_worker_status(worker_id, 'error')
                    return 1
                debug(f"[search-store] Searching '{backend_to_search}'")
                results = target_backend.search(query, limit=limit)
                debug(f"[search-store] '{backend_to_search}' -> {len(results or [])} result(s)")
            else:
                all_results = []
                for backend_name in storage.list_searchable_backends():
                    try:
                        backend = storage[backend_name]
                        debug(f"[search-store] Searching '{backend_name}'")
                        # Only ask for as many rows as are still needed.
                        backend_results = backend.search(query, limit=limit - len(all_results))
                        debug(f"[search-store] '{backend_name}' -> {len(backend_results or [])} result(s)")
                        if backend_results:
                            all_results.extend(backend_results)
                            if len(all_results) >= limit:
                                break
                    except Exception as exc:
                        # One failing backend must not stop the others.
                        log(f"Backend {backend_name} search failed: {exc}", file=sys.stderr)
                results = all_results[:limit]

            if results:
                for item in results:
                    item_dict = _as_dict(item)
                    if store_filter_key:
                        row_store = str(item_dict.get("store") or "").lower()
                        if row_store != store_filter_key:
                            continue
                    # hash/store already travel on the row itself, so
                    # downstream cmdlets can use them without a new search.
                    normalized = self._ensure_storage_columns(item_dict)
                    table.add_result(normalized)
                    results_list.append(normalized)
                    ctx.emit(normalized)

                table.title = command_title
                if refresh_mode:
                    ctx.set_last_result_table_preserve_history(table, results_list)
                else:
                    ctx.set_last_result_table(table, results_list)
                db.append_worker_stdout(worker_id, json.dumps(results_list, indent=2))
            else:
                log("No results found", file=sys.stderr)
                if refresh_mode:
                    try:
                        table.title = command_title
                        ctx.set_last_result_table_preserve_history(table, [])
                    except Exception:
                        pass
                db.append_worker_stdout(worker_id, json.dumps([], indent=2))

            db.update_worker_status(worker_id, 'completed')
            return 0
        except Exception as exc:
            log(f"Search failed: {exc}", file=sys.stderr)
            import traceback
            traceback.print_exc(file=sys.stderr)
            # Marking the worker as failed is best-effort; the database may
            # be the thing that broke.
            try:
                db.update_worker_status(worker_id, 'error')
            except Exception:
                pass
            return 1
# Module-level cmdlet instance. Constructing it runs Search_Store.__init__,
# which calls self.register(), so the cmdlet is registered at import time.
CMDLET = Search_Store()