Files
Medios-Macina/Store/ZeroTier.py
2026-01-14 01:59:30 -08:00

528 lines
20 KiB
Python

"""ZeroTier-backed Store implementation.
This store locates a service running on peers in a ZeroTier network and
proxies store operations to that remote service. The remote service can be
our `remote_storage_server` (default) or a Hydrus API server (`service=hydrus`).
Configuration keys:
- NAME: store instance name (required)
- NETWORK_ID: ZeroTier network ID to use for discovery (required)
- SERVICE: 'remote' or 'hydrus' (default: 'remote')
- PORT: service port (default: 999 for remote, 45869 for hydrus)
- API_KEY: optional API key to include in requests
- HOST: peer address (IP) to connect to; when empty, the peer is discovered on NETWORK_ID instead
- TIMEOUT: request timeout in seconds (default: 5)
Notes:
- Search, get_file, get_metadata, tag/url operations and uploads (add_file)
are proxied to the peer. Note operations (get_note/set_note/delete_note) are
only supported with the 'hydrus' service.
"""
from __future__ import annotations
import json
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from SYS.logger import debug, log
from Store._base import Store
class ZeroTier(Store):
    """Store backed by a service running on a peer of a ZeroTier network.

    Operations are proxied to one of two back ends, chosen by ``SERVICE``:

    * ``"remote"`` (default): HTTP requests against ``http://<host>:<port>``
      (for example ``GET /files/search`` or ``POST /tags/<hash>``), sending an
      ``X-API-Key`` header when an API key is configured.
    * ``"hydrus"``: an ``API.HydrusNetwork.HydrusNetwork`` client pointed at the
      same ``http://<host>:<port>`` URL.

    The peer is the configured ``HOST`` when one is set; otherwise it is looked
    up with ``API.zerotier.find_peer_service`` on ``NETWORK_ID``. Most operations
    log failures through ``debug`` and report them as ``None``, an empty result,
    or ``False`` instead of raising.
    """

    @classmethod
    def config(cls) -> List[Dict[str, Any]]:
        """Return the configuration fields accepted by this store.

        Each entry has a ``key``, a display ``label``, a string ``default`` and a
        ``required`` flag; the API key entry is additionally marked ``secret``.
        """
        # NOTE(review): HOST is flagged required here, yet _discover_peer falls
        # back to network discovery when no host is configured.
        return [
            {"key": "NAME", "label": "Store Name", "default": "", "required": True},
            {"key": "NETWORK_ID", "label": "ZeroTier Network ID", "default": "", "required": True},
            {"key": "HOST", "label": "Peer address (IP)", "default": "", "required": True},
            {"key": "PORT", "label": "Service Port", "default": "999", "required": False},
            {"key": "SERVICE", "label": "Service Type (remote|hydrus)", "default": "remote", "required": False},
            {"key": "API_KEY", "label": "API Key (optional)", "default": "", "required": False, "secret": True},
            {"key": "TIMEOUT", "label": "Request timeout (s)", "default": "5", "required": False},
        ]

    def __new__(cls, *args: Any, **kwargs: Any) -> "ZeroTier":
        """Create the instance, exposing a ``NAME`` keyword as ``inst.NAME`` early.

        The attribute is set before ``__init__`` runs; presumably the ``Store``
        base or a registry reads it at that point — TODO confirm.
        """
        inst = super().__new__(cls)
        name = kwargs.get("NAME")
        if name is not None:
            setattr(inst, "NAME", str(name))
        return inst

    def __init__(
        self,
        instance_name: Optional[str] = None,
        network_id: Optional[str] = None,
        service: Optional[str] = None,
        port: Optional[int] = None,
        api_key: Optional[str] = None,
        host: Optional[str] = None,
        timeout: Optional[int] = None,
        *,
        NAME: Optional[str] = None,
        NETWORK_ID: Optional[str] = None,
        SERVICE: Optional[str] = None,
        PORT: Optional[int] = None,
        API_KEY: Optional[str] = None,
        HOST: Optional[str] = None,
        TIMEOUT: Optional[int] = None,
    ) -> None:
        """Configure the store without touching the network.

        Each setting may be given as a lower-case argument or as its upper-case
        config key (``NAME``, ``NETWORK_ID``, ...); the lower-case form wins when
        both are given. Upper-case ``PORT``/``TIMEOUT`` values that do not parse
        as integers are ignored.

        Defaults: service ``"remote"``; port 45869 for ``"hydrus"`` and 999
        otherwise; timeout 5 seconds. The peer is resolved lazily on first use.
        """
        if instance_name is None and NAME is not None:
            instance_name = str(NAME)
        if network_id is None and NETWORK_ID is not None:
            network_id = str(NETWORK_ID)
        if service is None and SERVICE is not None:
            service = str(SERVICE)
        if port is None and PORT is not None:
            port = self._int_or_none(PORT)
        if api_key is None and API_KEY is not None:
            api_key = str(API_KEY)
        if host is None and HOST is not None:
            host = str(HOST)
        if timeout is None and TIMEOUT is not None:
            timeout = self._int_or_none(TIMEOUT)

        self._name = str(instance_name or "")
        self._network_id = str(network_id or "").strip()
        # Strip before comparing so config values such as " hydrus " still
        # select the Hydrus back end (and its default port).
        self._service = (str(service or "remote").strip() or "remote").lower()
        self._port = int(port if port is not None else (45869 if self._service == "hydrus" else 999))
        self._api_key = str(api_key or "").strip() or None
        self._preferred_host = str(host or "").strip() or None
        self._timeout = int(timeout or 5)
        # Cached discovery result and client (Hydrus client object or base URL string).
        self._cached_peer: Optional[Tuple[str, int]] = None
        self._cached_client: Optional[Any] = None

    def name(self) -> str:
        """Return the store instance name (``"zerotier"`` if ``__init__`` never ran)."""
        return str(getattr(self, "_name", "zerotier"))

    # -------------------- internal helpers --------------------

    @staticmethod
    def _int_or_none(value: Any) -> Optional[int]:
        """Return ``int(value)``, or ``None`` when it cannot be converted."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _as_list(value: Any, *, split_commas: bool = False) -> List[Any]:
        """Normalize ``value`` into a list.

        Falsy values give ``[]``. A string becomes a single item, or, with
        ``split_commas``, its stripped non-empty comma-separated parts. Other
        iterables go through ``list()``, and non-iterables give ``[]``.
        """
        if not value:
            return []
        if isinstance(value, str):
            if split_commas:
                return [part.strip() for part in value.split(",") if part.strip()]
            return [value]
        try:
            return list(value)
        except TypeError:
            return []

    @staticmethod
    def _first_metadata(payload: Any) -> Optional[Dict[str, Any]]:
        """Return the first dict of a Hydrus payload's ``metadata`` list, if any."""
        if not isinstance(payload, dict):
            return None
        metas = payload.get("metadata")
        if isinstance(metas, list) and metas and isinstance(metas[0], dict):
            return metas[0]
        return None

    @staticmethod
    def _collect_tags(value: Any) -> List[str]:
        """Flatten a tag container into a de-duplicated list of tag strings.

        Strings found inside lists/tuples are collected in order. Dict values are
        searched recursively, while dict keys and bare string values (such as a
        service name) are ignored.
        """
        # NOTE(review): the exact shape of "service_keys_to_tags" depends on the
        # HydrusNetwork wrapper; if it nests tags by status, deleted/pending tags
        # are included too — confirm against the wrapper.
        found: List[str] = []
        seen: set = set()

        def walk(node: Any) -> None:
            if isinstance(node, dict):
                for child in node.values():
                    walk(child)
            elif isinstance(node, (list, tuple)):
                for child in node:
                    if isinstance(child, str):
                        if child not in seen:
                            seen.add(child)
                            found.append(child)
                    else:
                        walk(child)

        walk(value)
        return found

    @staticmethod
    def _hydrus_search_rows(payload: Any) -> List[Dict[str, Any]]:
        """Normalize a Hydrus ``search_files`` payload into a list of row dicts.

        Uses the ``files`` or ``metadata`` rows when present, else ``hashes``.
        Hydrus' search endpoint returns ``file_ids``/``hashes``, not rows. Dict
        rows are kept, strings become ``{"hash": value}``, and anything else is
        skipped.
        """
        if not isinstance(payload, dict):
            return []
        rows = payload.get("files") or payload.get("metadata") or payload.get("hashes") or []
        if not isinstance(rows, (list, tuple)):
            return []
        normalized: List[Dict[str, Any]] = []
        for row in rows:
            if isinstance(row, dict):
                normalized.append(row)
            elif isinstance(row, str):
                normalized.append({"hash": row})
        return normalized

    @staticmethod
    def _base_url(host: Any, port: Any) -> str:
        """Build ``http://host:port``, bracketing IPv6 literals as URLs require."""
        text = str(host)
        # ZeroTier peers are often addressed by IPv6; "fd00::1:999" would be an
        # ambiguous authority, so wrap IPv6 literals (2+ colons) in brackets.
        if text.count(":") >= 2 and not text.startswith("["):
            text = f"[{text}]"
        return f"http://{text}:{port}"

    @staticmethod
    def _central_api_token() -> Any:
        """Return ``networking.zerotier.api_key`` from the app config, or ``None``.

        A missing section or an unreadable config is not fatal; discovery then
        runs without a token.
        """
        try:
            from SYS.config import load_config

            conf = load_config()
            return conf.get("networking", {}).get("zerotier", {}).get("api_key")
        except Exception as exc:
            debug(f"ZeroTier: could not read discovery API token from config: {exc}")
            return None

    def _discover_peer(self, *, refresh: bool = False) -> Optional[Tuple[str, int]]:
        """Resolve the ``(host, port)`` of the peer serving this store.

        Without ``refresh``, the configured ``HOST`` wins, then a cached discovery
        result. Otherwise ``API.zerotier.find_peer_service`` is queried on
        ``NETWORK_ID``, hinting ``"hydrus"`` for the Hydrus service, and a hit is
        cached. Returns ``None`` when no peer is found or discovery fails.
        """
        if self._preferred_host and not refresh:
            return (self._preferred_host, self._port)
        if self._cached_peer and not refresh:
            return self._cached_peer
        if not self._network_id:
            debug(f"ZeroTier store '{self.name()}' has no NETWORK_ID; skipping peer discovery")
            return None
        try:
            from API import zerotier as zt
        except Exception as exc:
            debug(f"ZeroTier discovery helper not available: {exc}")
            return None
        try:
            probe = zt.find_peer_service(
                self._network_id,
                service_hint=("hydrus" if self._service == "hydrus" else None),
                port=self._port,
                api_token=self._central_api_token(),
            )
        except Exception as exc:
            # Callers expect None on failure, so any discovery error means "no peer".
            debug(f"ZeroTier store '{self.name()}' peer discovery failed: {exc}")
            return None
        if not probe:
            debug(f"ZeroTier store '{self.name()}' found no peers on network {self._network_id}")
            return None
        host = probe.address
        port = probe.port or self._port
        self._cached_peer = (host, int(port))
        debug(f"ZeroTier store '{self.name()}' discovered peer {host}:{port}")
        return self._cached_peer

    def _ensure_client(self, *, refresh: bool = False) -> Optional[Any]:
        """Return the cached or newly built client for the resolved peer.

        For the ``"hydrus"`` service this is an ``API.HydrusNetwork.HydrusNetwork``
        instance; for any other service it is the base URL string. Returns
        ``None`` when no peer is known or the Hydrus client cannot be created.
        ``refresh`` ignores the cached client and forces peer re-discovery.
        """
        if self._cached_client and not refresh:
            return self._cached_client
        peer = self._discover_peer(refresh=refresh)
        if not peer:
            return None
        host, port = peer
        base_url = self._base_url(host, port)
        if self._service != "hydrus":
            # Default: remote_storage 'http' style API addressed by base URL.
            self._cached_client = base_url
            return base_url
        try:
            from API.HydrusNetwork import HydrusNetwork as HydrusClient

            client = HydrusClient(url=base_url, access_key=(self._api_key or ""), timeout=self._timeout)
        except Exception as exc:
            debug(f"Failed to instantiate Hydrus client for ZeroTier peer {host}:{port}: {exc}")
            return None
        self._cached_client = client
        return client

    def _request_remote(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        json_body: Optional[Any] = None,
        timeout: Optional[int] = None,
    ) -> Optional[Any]:
        """Send an HTTP request to the remote_storage peer.

        ``path`` is appended to the base URL, and ``X-API-Key`` is sent when an
        API key is configured. Returns the decoded JSON body, or the raw text
        when the body is not JSON. Returns ``None`` only when there is no base
        URL (e.g. the Hydrus service) or the request fails, including error
        statuses.
        """
        base = self._ensure_client()
        if base is None or not isinstance(base, str):
            debug("No remote base URL available for ZeroTier store")
            return None
        url = base.rstrip("/") + path
        headers: Dict[str, str] = {}
        if self._api_key:
            headers["X-API-Key"] = self._api_key
        try:
            import httpx

            resp = httpx.request(method, url, params=params, json=json_body, headers=headers, timeout=timeout or self._timeout)
            resp.raise_for_status()
            try:
                return resp.json()
            except Exception:
                return resp.text
        except Exception as exc:
            debug(f"ZeroTier HTTP request failed: {method} {url} -> {exc}")
            return None

    # -------------------- Store API --------------------

    def search(self, query: str, **kwargs: Any) -> List[Dict[str, Any]]:
        """Search for files on the remote service; returns ``[]`` on failure.

        Hydrus: ``query`` is passed as a single tag to ``search_files``, and the
        payload is normalized into row dicts. Remote: ``GET /files/search`` with
        ``q`` and ``limit``; the ``limit`` kwarg defaults to 100, and unparseable
        values fall back to 100. Returns the response's ``files`` list.
        """
        client = self._ensure_client()
        if client is None:
            debug("ZeroTier search: no client available")
            return []
        if self._service == "hydrus":
            try:
                payload = client.search_files([query], return_hashes=True, return_file_ids=False, return_file_count=False)
            except Exception as exc:
                debug(f"Hydrus search failed: {exc}")
                return []
            return self._hydrus_search_rows(payload)
        limit = self._int_or_none(kwargs.get("limit", 100))
        params = {"q": query, "limit": 100 if limit is None else limit}
        res = self._request_remote("GET", "/files/search", params=params)
        if isinstance(res, dict):
            return self._as_list(res.get("files"))
        return []

    def get_file(self, file_hash: str, **kwargs: Any) -> Optional[Path | str]:
        """Return a location for the file identified by ``file_hash``.

        Hydrus: the client's ``file_url(file_hash)``. Remote: the ``path`` (or
        ``file``) string from ``GET /files/<hash>``, exactly as the server reports
        it. That is either an http URL or a server-side path, presumably not
        readable locally. Returns ``None`` when unavailable.
        """
        client = self._ensure_client()
        if client is None:
            return None
        if self._service == "hydrus":
            try:
                return client.file_url(file_hash)
            except Exception as exc:
                debug(f"Hydrus get_file failed: {exc}")
                return None
        res = self._request_remote("GET", f"/files/{file_hash}")
        if not isinstance(res, dict):
            return None
        location = res.get("path") or res.get("file")
        # Only a non-empty string is a usable location.
        return location if isinstance(location, str) and location else None

    def add_file(self, file_path: Path, **kwargs: Any) -> Optional[str]:
        """Upload a local file to the ZeroTier peer.

        Hydrus: delegates to the client's ``add_file(path, **kwargs)``. Remote:
        ``POST /files/upload`` as multipart form data. The file goes in the
        ``file`` part, and repeated ``tag``/``url`` fields come from the ``tag``
        and ``url`` kwargs (a list, or a comma-separated string).

        Returns the hash reported by the peer (``hash`` or ``file_hash`` in the
        remote JSON response), or ``None`` on failure.
        """
        p = Path(file_path)
        # is_file() rather than exists(): a directory cannot be uploaded.
        if not p.is_file():
            debug(f"ZeroTier add_file: local file not found: {p}")
            return None
        if self._service == "hydrus":
            try:
                client = self._ensure_client()
                if client is None:
                    debug("ZeroTier add_file: Hydrus client unavailable")
                    return None
                return client.add_file(p, **kwargs)
            except Exception as exc:
                debug(f"ZeroTier hydrus add_file failed: {exc}")
                return None
        base = self._ensure_client()
        if base is None or not isinstance(base, str):
            debug("ZeroTier add_file: no remote base URL available")
            return None
        url = base.rstrip("/") + "/files/upload"
        headers: Dict[str, str] = {}
        if self._api_key:
            headers["X-API-Key"] = self._api_key
        try:
            import httpx

            # httpx needs a Mapping for multipart form fields; a list of tuples
            # is sent as raw body content and the file part is dropped. List
            # values are emitted as repeated fields.
            form_data: Dict[str, List[Any]] = {}
            tags = self._as_list(kwargs.get("tag"), split_commas=True)
            if tags:
                form_data["tag"] = tags
            urls = self._as_list(kwargs.get("url"), split_commas=True)
            if urls:
                form_data["url"] = urls
            with open(p, "rb") as fh:
                files = {"file": (p.name, fh, "application/octet-stream")}
                resp = httpx.post(url, headers=headers, files=files, data=form_data, timeout=self._timeout)
            resp.raise_for_status()
            if resp.status_code not in (200, 201):
                debug(f"ZeroTier add_file failed: status {resp.status_code}")
                return None
            try:
                payload = resp.json()
            except ValueError:
                return None
            if not isinstance(payload, dict):
                return None
            return payload.get("hash") or payload.get("file_hash")
        except Exception as exc:
            debug(f"ZeroTier add_file exception: {exc}")
            return None

    def get_metadata(self, file_hash: str, **kwargs: Any) -> Optional[Dict[str, Any]]:
        """Return file metadata from the peer, or ``None`` on failure.

        Hydrus: the raw ``fetch_file_metadata`` payload, requesting file URL,
        size and mime. Remote: the JSON object from ``GET /files/<hash>``.
        """
        client = self._ensure_client()
        if client is None:
            return None
        if self._service == "hydrus":
            try:
                return client.fetch_file_metadata(hashes=[file_hash], include_file_url=True, include_size=True, include_mime=True)
            except Exception as exc:
                debug(f"Hydrus fetch_file_metadata failed: {exc}")
                return None
        res = self._request_remote("GET", f"/files/{file_hash}")
        return res if isinstance(res, dict) else None

    def get_tag(self, file_identifier: str, **kwargs: Any) -> Tuple[List[str], str]:
        """Return ``(tags, source)`` for a file.

        ``source`` is ``"hydrus"`` or ``"remote"``, or ``""`` when no client is
        available. Hydrus tags come from the first metadata entry's
        ``service_keys_to_tags``, flattened to strings. Remote tags come from the
        ``tag`` field of ``GET /tags/<id>``.
        """
        client = self._ensure_client()
        if client is None:
            return ([], "")
        if self._service == "hydrus":
            try:
                payload = client.fetch_file_metadata(hashes=[file_identifier], include_service_keys_to_tags=True)
            except Exception as exc:
                debug(f"Hydrus get_tag failed: {exc}")
                return ([], "hydrus")
            md = self._first_metadata(payload)
            raw_tags = md.get("service_keys_to_tags") if md else None
            return (self._collect_tags(raw_tags), "hydrus")
        res = self._request_remote("GET", f"/tags/{file_identifier}")
        if isinstance(res, dict):
            return (self._as_list(res.get("tag")), "remote")
        return ([], "remote")

    def add_tag(self, file_identifier: str, tags: List[str], **kwargs: Any) -> bool:
        """Add ``tags`` to a file and return whether the peer accepted them.

        Hydrus: ``client.add_tag`` on ``service_name`` (kwarg, default
        ``"my tags"``). Remote: ``POST /tags/<id>`` with ``{"tag": [...]}``.
        """
        client = self._ensure_client()
        if client is None:
            return False
        tag_list = self._as_list(tags)
        if self._service == "hydrus":
            try:
                service_name = kwargs.get("service_name") or "my tags"
                client.add_tag(file_identifier, tag_list, service_name)
                return True
            except Exception as exc:
                debug(f"Hydrus add_tag failed: {exc}")
                return False
        res = self._request_remote("POST", f"/tags/{file_identifier}", json_body={"tag": tag_list})
        # None means the request failed; an empty 2xx body still means success.
        return res is not None

    def delete_tag(self, file_identifier: str, tags: List[str], **kwargs: Any) -> bool:
        """Remove ``tags`` from a file and return whether the peer accepted it.

        Hydrus: ``client.delete_tag`` on ``service_name`` (kwarg, default
        ``"my tags"``). Remote: ``DELETE /tags/<id>?tag=tag1,tag2``.
        """
        client = self._ensure_client()
        if client is None:
            return False
        tag_list = self._as_list(tags)
        if self._service == "hydrus":
            try:
                service_name = kwargs.get("service_name") or "my tags"
                client.delete_tag(file_identifier, tag_list, service_name)
                return True
            except Exception as exc:
                debug(f"Hydrus delete_tag failed: {exc}")
                return False
        query = {"tag": ",".join(tag_list)}
        res = self._request_remote("DELETE", f"/tags/{file_identifier}", params=query)
        # None means the request failed; an empty 2xx body still means success.
        return res is not None

    def get_url(self, file_identifier: str, **kwargs: Any) -> List[str]:
        """Return the URLs known for a file, or ``[]`` on failure.

        Hydrus: ``file_urls`` (falling back to Hydrus' ``known_urls``) from the
        first metadata entry. Remote: the ``url`` field of ``GET /files/<id>``.
        """
        client = self._ensure_client()
        if client is None:
            return []
        if self._service == "hydrus":
            try:
                payload = client.fetch_file_metadata(hashes=[file_identifier], include_file_url=True)
            except Exception as exc:
                debug(f"Hydrus get_url failed: {exc}")
                return []
            md = self._first_metadata(payload)
            if not md:
                return []
            return self._as_list(md.get("file_urls") or md.get("known_urls"))
        meta = self._request_remote("GET", f"/files/{file_identifier}")
        if isinstance(meta, dict):
            return self._as_list(meta.get("url"))
        return []

    def add_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
        """Associate every URL in ``url`` with a file and return success.

        Hydrus: one ``client.associate_url`` call per URL. Remote:
        ``POST /files/<id>/url`` with ``{"url": [...]}``.
        """
        client = self._ensure_client()
        if client is None:
            return False
        urls = self._as_list(url)
        if self._service == "hydrus":
            if not urls:
                debug("Hydrus add_url failed: no URL given")
                return False
            try:
                for single_url in urls:
                    client.associate_url(hashes=[file_identifier], url=single_url)
                return True
            except Exception as exc:
                debug(f"Hydrus add_url failed: {exc}")
                return False
        res = self._request_remote("POST", f"/files/{file_identifier}/url", json_body={"url": urls})
        # None means the request failed; an empty 2xx body still means success.
        return res is not None

    def delete_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
        """Remove the URLs in ``url`` from a file and return success.

        Hydrus: ``client.delete_urls``. Remote: ``DELETE /files/<id>/url`` with
        ``{"url": [...]}``.
        """
        client = self._ensure_client()
        if client is None:
            return False
        urls = self._as_list(url)
        if self._service == "hydrus":
            try:
                client.delete_urls(hashes=[file_identifier], urls=urls)
                return True
            except Exception as exc:
                debug(f"Hydrus delete_url failed: {exc}")
                return False
        res = self._request_remote("DELETE", f"/files/{file_identifier}/url", json_body={"url": urls})
        # None means the request failed; an empty 2xx body still means success.
        return res is not None

    def get_note(self, file_identifier: str, **kwargs: Any) -> Dict[str, str]:
        """Get named notes for a file as a mapping of name -> text.

        Only supported for the Hydrus service (``notes`` of the first metadata
        entry); returns ``{}`` otherwise or on failure.
        """
        if self._service != "hydrus":
            # Remote storage has no notes API yet; skip peer discovery entirely.
            return {}
        client = self._ensure_client()
        if client is None:
            return {}
        try:
            payload = client.fetch_file_metadata(hashes=[file_identifier], include_notes=True)
            md = self._first_metadata(payload)
            notes = md.get("notes") if md else None
            return dict(notes) if notes else {}
        except Exception as exc:
            debug(f"Hydrus get_note failed: {exc}")
            return {}

    def set_note(self, file_identifier: str, name: str, text: str, **kwargs: Any) -> bool:
        """Set note ``name`` to ``text`` on a file (Hydrus only); return success."""
        if self._service != "hydrus":
            # Remote storage: not supported.
            return False
        client = self._ensure_client()
        if client is None:
            return False
        try:
            client.set_note(file_identifier, name, text)
            return True
        except Exception as exc:
            debug(f"Hydrus set_note failed: {exc}")
            return False

    def delete_note(self, file_identifier: str, name: str, **kwargs: Any) -> bool:
        """Delete note ``name`` from a file (Hydrus only); return success."""
        if self._service != "hydrus":
            # Remote storage: not supported.
            return False
        client = self._ensure_client()
        if client is None:
            return False
        try:
            client.delete_note(file_identifier, name)
            return True
        except Exception as exc:
            debug(f"Hydrus delete_note failed: {exc}")
            return False