Files
Medios-Macina/Store/registry.py
2026-01-11 00:52:54 -08:00

436 lines
16 KiB
Python

"""Store registry.
Concrete store implementations live in the `Store/` package.
This module is the single source of truth for store discovery.
This registry is config-driven:
- Each store subtype (e.g. `hydrusnetwork`) maps to a concrete store class.
- Each store class advertises its required config keys via `StoreClass.__new__.keys`.
- Instances are created from config using those keys (case-insensitive lookup).
"""
from __future__ import annotations
import importlib
import inspect
import pkgutil
import re
from pathlib import Path
from typing import Any, Dict, Iterable, Optional, Type
from SYS.logger import debug
from SYS.utils import expand_path
from Store._base import Store as BaseStore
_SHA256_HEX_RE = re.compile(r"^[0-9a-fA-F]{64}$")
# Backends that failed to initialize earlier in the current process.
# Keyed by (store_type, instance_key) where instance_key is the name used under config.store.<type>.<instance_key>.
_FAILED_BACKEND_CACHE: Dict[tuple[str,
str],
str] = {}
def _normalize_store_type(value: str) -> str:
return "".join(ch.lower() for ch in str(value or "") if ch.isalnum())
def _normalize_config_key(value: str) -> str:
return str(value or "").strip().upper()
def _get_case_insensitive(mapping: Dict[str, Any], key: str) -> Any:
if key in mapping:
return mapping[key]
desired = _normalize_config_key(key)
for k, v in mapping.items():
if _normalize_config_key(k) == desired:
return v
return None
def _discover_store_classes() -> Dict[str, Type[BaseStore]]:
"""Discover store classes from the Store package.
Convention:
- The store type key is the normalized class name (e.g. HydrusNetwork -> hydrusnetwork).
"""
import Store as store_pkg
discovered: Dict[str,
Type[BaseStore]] = {}
for module_info in pkgutil.iter_modules(store_pkg.__path__):
module_name = module_info.name
if module_name in {"__init__",
"_base",
"registry"}:
continue
module = importlib.import_module(f"Store.{module_name}")
for _, obj in vars(module).items():
if not inspect.isclass(obj):
continue
if obj is BaseStore:
continue
if not issubclass(obj, BaseStore):
continue
discovered[_normalize_store_type(obj.__name__)] = obj
return discovered
def _required_keys_for(store_cls: Type[BaseStore]) -> list[str]:
keys = getattr(store_cls.__new__, "keys", None)
if keys is None:
return []
if isinstance(keys, dict):
return [str(k) for k in keys.keys()]
if isinstance(keys, (list, tuple, set, frozenset)):
return [str(k) for k in keys]
if isinstance(keys, str):
return [keys]
raise TypeError(
f"Unsupported __new__.keys type for {store_cls.__name__}: {type(keys)}"
)
def _build_kwargs(store_cls: Type[BaseStore],
instance_name: str,
instance_config: Any) -> Dict[str,
Any]:
if isinstance(instance_config, dict):
cfg_dict = dict(instance_config)
else:
cfg_dict = {}
required = _required_keys_for(store_cls)
# If NAME is required but not present, allow the instance key to provide it.
if (any(_normalize_config_key(k) == "NAME" for k in required)
and _get_case_insensitive(cfg_dict,
"NAME") is None):
cfg_dict["NAME"] = str(instance_name)
kwargs: Dict[str,
Any] = {}
missing: list[str] = []
for key in required:
value = _get_case_insensitive(cfg_dict, key)
if value is None or value == "":
missing.append(str(key))
continue
kwargs[str(key)] = value
if missing:
raise ValueError(
f"Missing required keys for {store_cls.__name__}: {', '.join(missing)}"
)
return kwargs
class Store:
def __init__(
self,
config: Optional[Dict[str,
Any]] = None,
suppress_debug: bool = False
) -> None:
self._config = config or {}
self._suppress_debug = suppress_debug
self._backends: Dict[str,
BaseStore] = {}
self._backend_errors: Dict[str,
str] = {}
self._backend_types: Dict[str,
str] = {}
self._load_backends()
def _maybe_register_temp_alias(
self,
store_type: str,
backend_name: str,
kwargs: Dict[str,
Any],
backend: BaseStore
) -> None:
"""If a folder backend points at config['temp'], also expose it as the 'temp' backend.
This keeps config compatibility (e.g. existing 'default') while presenting the temp
directory under a clearer name.
"""
try:
if _normalize_store_type(store_type) != "folder":
return
temp_value = self._config.get("temp")
if not temp_value:
return
path_value = kwargs.get("PATH") or kwargs.get("path")
if not path_value:
return
temp_path = expand_path(temp_value).resolve()
backend_path = expand_path(path_value).resolve()
if backend_path != temp_path:
return
# If the user already has a dedicated temp backend, do nothing.
if "temp" in self._backends:
return
# Keep original name working, but add an alias.
if backend_name != "temp":
self._backends["temp"] = backend
self._backend_types["temp"] = store_type
except Exception:
return
def _load_backends(self) -> None:
store_cfg = self._config.get("store")
if not isinstance(store_cfg, dict):
store_cfg = {}
self._backend_types = {}
classes_by_type = _discover_store_classes()
for raw_store_type, instances in store_cfg.items():
if not isinstance(instances, dict):
continue
store_type = _normalize_store_type(str(raw_store_type))
store_cls = classes_by_type.get(store_type)
if store_cls is None:
if not self._suppress_debug:
debug(f"[Store] Unknown store type '{raw_store_type}'")
continue
for instance_name, instance_config in instances.items():
backend_name = str(instance_name)
# If this backend already failed earlier in this process, skip re-instantiation.
cache_key = (store_type, str(instance_name))
cached_error = _FAILED_BACKEND_CACHE.get(cache_key)
if cached_error:
self._backend_errors[str(instance_name)] = str(cached_error)
if isinstance(instance_config, dict):
override_name = _get_case_insensitive(
dict(instance_config),
"NAME"
)
if override_name:
self._backend_errors[str(override_name)] = str(cached_error)
continue
try:
kwargs = _build_kwargs(
store_cls,
str(instance_name),
instance_config
)
# Convenience normalization for filesystem-like paths.
for key in list(kwargs.keys()):
if _normalize_config_key(key) in {"PATH",
"LOCATION"}:
kwargs[key] = str(expand_path(kwargs[key]))
backend = store_cls(**kwargs)
backend_name = str(kwargs.get("NAME") or instance_name)
self._backends[backend_name] = backend
self._backend_types[backend_name] = store_type
# If this is the configured temp directory, also alias it as 'temp'.
self._maybe_register_temp_alias(
store_type,
backend_name,
kwargs,
backend
)
except Exception as exc:
err_text = str(exc)
self._backend_errors[str(instance_name)] = err_text
_FAILED_BACKEND_CACHE[cache_key] = err_text
if not self._suppress_debug:
debug(
f"[Store] Failed to register {store_cls.__name__} instance '{instance_name}': {exc}"
)
def _resolve_backend_name(self,
backend_name: str) -> tuple[Optional[str], Optional[str]]:
requested = str(backend_name or "")
if requested in self._backends:
return requested, None
requested_norm = _normalize_store_type(requested)
ci_matches = [
name for name in self._backends
if _normalize_store_type(name) == requested_norm
]
if len(ci_matches) == 1:
return ci_matches[0], None
if len(ci_matches) > 1:
return None, f"Ambiguous store alias '{backend_name}' matches {ci_matches}"
type_matches = [
name for name, store_type in self._backend_types.items()
if store_type == requested_norm
]
if len(type_matches) == 1:
return type_matches[0], None
if len(type_matches) > 1:
return None, (
f"Ambiguous store alias '{backend_name}' matches type '{requested_norm}': {type_matches}"
)
prefix_matches = [
name for name, store_type in self._backend_types.items()
if store_type.startswith(requested_norm)
]
if len(prefix_matches) == 1:
return prefix_matches[0], None
if len(prefix_matches) > 1:
return None, (
f"Ambiguous store alias '{backend_name}' matches type prefix '{requested_norm}': {prefix_matches}"
)
return None, None
def get_backend_error(self, backend_name: str) -> Optional[str]:
return self._backend_errors.get(str(backend_name))
def list_backends(self) -> list[str]:
return sorted(self._backends.keys())
def list_searchable_backends(self) -> list[str]:
# De-duplicate backends by instance (aliases can point at the same object).
def _rank(name: str) -> int:
n = str(name or "").strip().lower()
if n == "temp":
return 0
if n == "default":
return 2
return 1
chosen: Dict[int,
str] = {}
for name, backend in self._backends.items():
if type(backend).search is BaseStore.search:
continue
key = id(backend)
prev = chosen.get(key)
if prev is None or _rank(name) < _rank(prev):
chosen[key] = name
return sorted(chosen.values())
def __getitem__(self, backend_name: str) -> BaseStore:
resolved, err = self._resolve_backend_name(backend_name)
if resolved:
return self._backends[resolved]
if err:
raise KeyError(
f"Unknown store backend: {backend_name}. {err}"
)
raise KeyError(
f"Unknown store backend: {backend_name}. Available: {list(self._backends.keys())}"
)
def is_available(self, backend_name: str) -> bool:
resolved, _err = self._resolve_backend_name(backend_name)
return resolved is not None
def try_add_url_for_pipe_object(self, pipe_obj: Any, url: str) -> bool:
"""Best-effort helper: if `pipe_obj` contains `store` + `hash`, add `url` to that store backend.
Intended for providers to attach newly generated/hosted URLs back to an existing stored file.
"""
try:
url_text = str(url or "").strip()
if not url_text:
return False
store_name = None
file_hash = None
if isinstance(pipe_obj, dict):
store_name = pipe_obj.get("store")
file_hash = pipe_obj.get("hash")
else:
store_name = getattr(pipe_obj, "store", None)
file_hash = getattr(pipe_obj, "hash", None)
store_name = str(store_name).strip() if store_name is not None else ""
file_hash = str(file_hash).strip() if file_hash is not None else ""
if not store_name or not file_hash:
return False
if not _SHA256_HEX_RE.fullmatch(file_hash):
return False
backend = self[store_name]
add_url = getattr(backend, "add_url", None)
if not callable(add_url):
return False
ok = add_url(file_hash.lower(), [url_text])
return bool(ok) if ok is not None else True
except Exception:
return False
def list_configured_backend_names(config: Optional[Dict[str, Any]]) -> list[str]:
"""Return backend instance names present in the provided config WITHOUT instantiating backends.
This is a lightweight helper for CLI usage where we only need to know if a
configured backend exists (e.g., to distinguish a store name from a filesystem path)
without triggering backend initialization (which may perform network calls).
Behaviour:
- For each configured store type, returns the per-instance NAME override (case-insensitive)
when present, otherwise the instance key.
- Includes a 'temp' alias when a folder backend points to the configured 'temp' path.
"""
try:
store_cfg = (config or {}).get("store") or {}
if not isinstance(store_cfg, dict):
return []
names: list[str] = []
for raw_store_type, instances in store_cfg.items():
if not isinstance(instances, dict):
continue
for instance_name, instance_config in instances.items():
if isinstance(instance_config, dict):
override_name = _get_case_insensitive(dict(instance_config), "NAME")
if override_name:
names.append(str(override_name))
else:
names.append(str(instance_name))
else:
names.append(str(instance_name))
# Best-effort: alias 'temp' when a folder backend points at config['temp']
try:
temp_value = (config or {}).get("temp")
if temp_value:
temp_path = str(expand_path(temp_value).resolve())
for raw_store_type, instances in store_cfg.items():
if not isinstance(instances, dict):
continue
if _normalize_store_type(str(raw_store_type)) != "folder":
continue
for instance_name, instance_config in instances.items():
if not isinstance(instance_config, dict):
continue
path_value = instance_config.get("PATH") or instance_config.get("path")
if not path_value:
continue
if str(expand_path(path_value).resolve()) == temp_path:
if "temp" not in names:
names.append("temp")
except Exception:
pass
return sorted(set(names))
except Exception:
return []