"""Store registry. Concrete store implementations live in the `Store/` package. This module is the single source of truth for store discovery. This registry is config-driven: - Each store subtype (e.g. `hydrusnetwork`) maps to a concrete store class. - Each store class advertises its required config keys via `StoreClass.__new__.keys`. - Instances are created from config using those keys (case-insensitive lookup). """ from __future__ import annotations import importlib import inspect import pkgutil import re from pathlib import Path from typing import Any, Dict, Iterable, Optional, Type from SYS.logger import debug from SYS.utils import expand_path from Store._base import Store as BaseStore _SHA256_HEX_RE = re.compile(r"^[0-9a-fA-F]{64}$") # Backends that failed to initialize earlier in the current process. # Keyed by (store_type, instance_key) where instance_key is the name used under config.store... _FAILED_BACKEND_CACHE: Dict[tuple[str, str], str] = {} def _normalize_store_type(value: str) -> str: return "".join(ch.lower() for ch in str(value or "") if ch.isalnum()) def _normalize_config_key(value: str) -> str: return str(value or "").strip().upper() def _get_case_insensitive(mapping: Dict[str, Any], key: str) -> Any: if key in mapping: return mapping[key] desired = _normalize_config_key(key) for k, v in mapping.items(): if _normalize_config_key(k) == desired: return v return None def _discover_store_classes() -> Dict[str, Type[BaseStore]]: """Discover store classes from the Store package. Convention: - The store type key is the normalized class name (e.g. HydrusNetwork -> hydrusnetwork). """ import Store as store_pkg discovered: Dict[str, Type[BaseStore]] = {} for module_info in pkgutil.iter_modules(store_pkg.__path__): module_name = module_info.name if module_name in {"__init__", "_base", "registry"}: continue module = importlib.import_module(f"Store.{module_name}") for _, obj in vars(module).items(): if not inspect.isclass(obj): continue if obj is BaseStore: continue if not issubclass(obj, BaseStore): continue discovered[_normalize_store_type(obj.__name__)] = obj return discovered def _required_keys_for(store_cls: Type[BaseStore]) -> list[str]: # Support new config() schema if hasattr(store_cls, "config") and callable(store_cls.config): try: schema = store_cls.config() keys = [] if isinstance(schema, list): for field in schema: if isinstance(field, dict) and field.get("required"): k = field.get("key") if k: keys.append(str(k)) if keys: return keys except Exception: pass # Legacy __new__.keys support keys = getattr(store_cls.__new__, "keys", None) if keys is None: return [] if isinstance(keys, dict): return [str(k) for k in keys.keys()] if isinstance(keys, (list, tuple, set, frozenset)): return [str(k) for k in keys] if isinstance(keys, str): return [keys] raise TypeError( f"Unsupported __new__.keys type for {store_cls.__name__}: {type(keys)}" ) def _build_kwargs(store_cls: Type[BaseStore], instance_name: str, instance_config: Any) -> Dict[str, Any]: if isinstance(instance_config, dict): cfg_dict = dict(instance_config) else: cfg_dict = {} required = _required_keys_for(store_cls) # If NAME is required but not present, allow the instance key to provide it. 
    if (any(_normalize_config_key(k) == "NAME" for k in required)
            and _get_case_insensitive(cfg_dict, "NAME") is None):
        cfg_dict["NAME"] = str(instance_name)

    kwargs: Dict[str, Any] = {}
    missing: list[str] = []
    for key in required:
        value = _get_case_insensitive(cfg_dict, key)
        if value is None or value == "":
            missing.append(str(key))
            continue
        kwargs[str(key)] = value
    if missing:
        raise ValueError(
            f"Missing required keys for {store_cls.__name__}: {', '.join(missing)}"
        )
    return kwargs


class Store:
    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        suppress_debug: bool = False,
    ) -> None:
        self._config = config or {}
        self._suppress_debug = suppress_debug
        self._backends: Dict[str, BaseStore] = {}
        self._backend_errors: Dict[str, str] = {}
        self._backend_types: Dict[str, str] = {}
        self._load_backends()

    def _maybe_register_temp_alias(
        self, store_type: str, backend_name: str, kwargs: Dict[str, Any], backend: BaseStore
    ) -> None:
        """If a folder backend points at config['temp'], also expose it as the 'temp' backend.

        This keeps config compatibility (e.g. existing 'default') while
        presenting the temp directory under a clearer name.
        """
        try:
            if _normalize_store_type(store_type) != "folder":
                return
            temp_value = self._config.get("temp")
            if not temp_value:
                return
            path_value = kwargs.get("PATH") or kwargs.get("path")
            if not path_value:
                return
            temp_path = expand_path(temp_value).resolve()
            backend_path = expand_path(path_value).resolve()
            if backend_path != temp_path:
                return
            # If the user already has a dedicated temp backend, do nothing.
            if "temp" in self._backends:
                return
            # Keep the original name working, but add an alias.
            if backend_name != "temp":
                self._backends["temp"] = backend
                self._backend_types["temp"] = store_type
        except Exception:
            return

    def _load_backends(self) -> None:
        store_cfg = self._config.get("store")
        if not isinstance(store_cfg, dict):
            store_cfg = {}
        self._backend_types = {}
        classes_by_type = _discover_store_classes()
        for raw_store_type, instances in store_cfg.items():
            if not isinstance(instances, dict):
                continue
            store_type = _normalize_store_type(str(raw_store_type))
            store_cls = classes_by_type.get(store_type)
            if store_cls is None:
                if not self._suppress_debug:
                    debug(f"[Store] Unknown store type '{raw_store_type}'")
                continue
            for instance_name, instance_config in instances.items():
                backend_name = str(instance_name)
                # If this backend already failed earlier in this process, skip re-instantiation.
                cache_key = (store_type, str(instance_name))
                cached_error = _FAILED_BACKEND_CACHE.get(cache_key)
                if cached_error:
                    self._backend_errors[str(instance_name)] = str(cached_error)
                    if isinstance(instance_config, dict):
                        override_name = _get_case_insensitive(
                            dict(instance_config), "NAME"
                        )
                        if override_name:
                            self._backend_errors[str(override_name)] = str(cached_error)
                    continue
                try:
                    kwargs = _build_kwargs(
                        store_cls, str(instance_name), instance_config
                    )
                    # Convenience normalization for filesystem-like paths.
                    for key in list(kwargs.keys()):
                        if _normalize_config_key(key) in {"PATH", "LOCATION"}:
                            kwargs[key] = str(expand_path(kwargs[key]))
                    backend = store_cls(**kwargs)
                    backend_name = str(kwargs.get("NAME") or instance_name)
                    self._backends[backend_name] = backend
                    self._backend_types[backend_name] = store_type
                    # If this is the configured temp directory, also alias it as 'temp'.
                    self._maybe_register_temp_alias(
                        store_type, backend_name, kwargs, backend
                    )
                except Exception as exc:
                    err_text = str(exc)
                    self._backend_errors[str(instance_name)] = err_text
                    _FAILED_BACKEND_CACHE[cache_key] = err_text
                    if not self._suppress_debug:
                        debug(
                            f"[Store] Failed to register {store_cls.__name__} instance '{instance_name}': {exc}"
                        )

    def _resolve_backend_name(self, backend_name: str) -> tuple[Optional[str], Optional[str]]:
        requested = str(backend_name or "")
        if requested in self._backends:
            return requested, None
        requested_norm = _normalize_store_type(requested)
        ci_matches = [
            name for name in self._backends
            if _normalize_store_type(name) == requested_norm
        ]
        if len(ci_matches) == 1:
            return ci_matches[0], None
        if len(ci_matches) > 1:
            return None, f"Ambiguous store alias '{backend_name}' matches {ci_matches}"
        type_matches = [
            name for name, store_type in self._backend_types.items()
            if store_type == requested_norm
        ]
        if len(type_matches) == 1:
            return type_matches[0], None
        if len(type_matches) > 1:
            return None, (
                f"Ambiguous store alias '{backend_name}' matches type '{requested_norm}': {type_matches}"
            )
        prefix_matches = [
            name for name, store_type in self._backend_types.items()
            if store_type.startswith(requested_norm)
        ]
        if len(prefix_matches) == 1:
            return prefix_matches[0], None
        if len(prefix_matches) > 1:
            return None, (
                f"Ambiguous store alias '{backend_name}' matches type prefix '{requested_norm}': {prefix_matches}"
            )
        return None, None

    def get_backend_error(self, backend_name: str) -> Optional[str]:
        return self._backend_errors.get(str(backend_name))

    def list_backends(self) -> list[str]:
        return sorted(self._backends.keys())

    def list_searchable_backends(self) -> list[str]:
        # De-duplicate backends by instance (aliases can point at the same object).
        def _rank(name: str) -> int:
            n = str(name or "").strip().lower()
            if n == "temp":
                return 0
            if n == "default":
                return 2
            return 1

        chosen: Dict[int, str] = {}
        for name, backend in self._backends.items():
            if type(backend).search is BaseStore.search:
                continue
            key = id(backend)
            prev = chosen.get(key)
            if prev is None or _rank(name) < _rank(prev):
                chosen[key] = name
        return sorted(chosen.values())

    def __getitem__(self, backend_name: str) -> BaseStore:
        resolved, err = self._resolve_backend_name(backend_name)
        if resolved:
            return self._backends[resolved]
        if err:
            raise KeyError(
                f"Unknown store backend: {backend_name}. {err}"
            )
        raise KeyError(
            f"Unknown store backend: {backend_name}. Available: {list(self._backends.keys())}"
        )

    def is_available(self, backend_name: str) -> bool:
        resolved, _err = self._resolve_backend_name(backend_name)
        return resolved is not None

    def try_add_url_for_pipe_object(self, pipe_obj: Any, url: str) -> bool:
        """Best-effort helper: if `pipe_obj` contains `store` + `hash`, add `url` to that store backend.

        Intended for providers to attach newly generated/hosted URLs back to an
        existing stored file.
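
        Minimal illustrative call (the store name, URL, and hash placeholder
        below are made-up values; the hash must be a 64-character sha256 hex
        digest or the call returns False):

            store.try_add_url_for_pipe_object(
                {"store": "default", "hash": "<sha256 hex of the stored file>"},
                "https://example.com/file.png",
            )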
""" try: url_text = str(url or "").strip() if not url_text: return False store_name = None file_hash = None if isinstance(pipe_obj, dict): store_name = pipe_obj.get("store") file_hash = pipe_obj.get("hash") else: store_name = getattr(pipe_obj, "store", None) file_hash = getattr(pipe_obj, "hash", None) store_name = str(store_name).strip() if store_name is not None else "" file_hash = str(file_hash).strip() if file_hash is not None else "" if not store_name or not file_hash: return False if not _SHA256_HEX_RE.fullmatch(file_hash): return False backend = self[store_name] add_url = getattr(backend, "add_url", None) if not callable(add_url): return False ok = add_url(file_hash.lower(), [url_text]) return bool(ok) if ok is not None else True except Exception: return False def list_configured_backend_names(config: Optional[Dict[str, Any]]) -> list[str]: """Return backend instance names present in the provided config WITHOUT instantiating backends. This is a lightweight helper for CLI usage where we only need to know if a configured backend exists (e.g., to distinguish a store name from a filesystem path) without triggering backend initialization (which may perform network calls). Behaviour: - For each configured store type, returns the per-instance NAME override (case-insensitive) when present, otherwise the instance key. - Includes a 'temp' alias when a folder backend points to the configured 'temp' path. """ try: store_cfg = (config or {}).get("store") or {} if not isinstance(store_cfg, dict): return [] names: list[str] = [] for raw_store_type, instances in store_cfg.items(): if not isinstance(instances, dict): continue for instance_name, instance_config in instances.items(): if isinstance(instance_config, dict): override_name = _get_case_insensitive(dict(instance_config), "NAME") if override_name: names.append(str(override_name)) else: names.append(str(instance_name)) else: names.append(str(instance_name)) # Best-effort: alias 'temp' when a folder backend points at config['temp'] try: temp_value = (config or {}).get("temp") if temp_value: temp_path = str(expand_path(temp_value).resolve()) for raw_store_type, instances in store_cfg.items(): if not isinstance(instances, dict): continue if _normalize_store_type(str(raw_store_type)) != "folder": continue for instance_name, instance_config in instances.items(): if not isinstance(instance_config, dict): continue path_value = instance_config.get("PATH") or instance_config.get("path") if not path_value: continue if str(expand_path(path_value).resolve()) == temp_path: if "temp" not in names: names.append("temp") except Exception: pass return sorted(set(names)) except Exception: return []