"""ZeroTier helpers and discovery utilities. This module provides a small, dependency-light API for interacting with a local zerotier-one node (preferred via Python module when available, else via `zerotier-cli`), discovering peers on a given ZeroTier network, and probing for services running on those peers (e.g., our remote storage server or a Hydrus instance). Notes: - This is intentionally conservative and all operations are best-effort and fail gracefully when the local system does not have ZeroTier installed. - The implementation prefers a Python ZeroTier binding when available, else falls back to calling the `zerotier-cli` binary (if present) and parsing JSON output where possible. Example usage: from API import zerotier if zerotier.is_available(): nets = zerotier.list_networks() zerotier.join_network("8056c2e21c000001") services = zerotier.discover_services_on_network("8056c2e21c000001", ports=[5000], paths=["/health","/api_version"]) # noqa: E501 """ from __future__ import annotations import json import shutil import subprocess import sys from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from SYS.logger import debug, log # Optional Python ZeroTier bindings - prefer them when available _HAVE_PY_ZEROTIER = False try: # Try common package names; not all installations will have this available # This import is optional and callers should still work via the CLI fallback. import zerotier as _zt # type: ignore _HAVE_PY_ZEROTIER = True except Exception: _HAVE_PY_ZEROTIER = False @dataclass class ZeroTierNetwork: id: str name: str status: str assigned_addresses: List[str] @dataclass class ZeroTierServiceProbe: address: str port: int path: str url: str ok: bool status_code: Optional[int] payload: Optional[Any] service_hint: Optional[str] = None def _cli_available() -> bool: return bool(shutil.which("zerotier-cli")) def is_available() -> bool: """Return True if we can interact with ZeroTier locally (module or CLI).""" return _HAVE_PY_ZEROTIER or _cli_available() def _run_cli_json(*args: str, timeout: float = 5.0) -> Any: """Run zerotier-cli with arguments and parse JSON output if possible. Returns parsed JSON on success, or raises an exception. """ bin_path = shutil.which("zerotier-cli") if not bin_path: raise RuntimeError("zerotier-cli not found") cmd = [bin_path, *args] debug(f"Running zerotier-cli: {cmd}") out = subprocess.check_output(cmd, timeout=timeout) try: return json.loads(out.decode("utf-8")) except Exception: # Some CLI invocations might print non-json; return as raw string return out.decode("utf-8", "replace") def list_networks() -> List[ZeroTierNetwork]: """Return a list of configured ZeroTier networks on this node. Best-effort: prefers Python binding, then `zerotier-cli listnetworks -j`. """ nets: List[ZeroTierNetwork] = [] if _HAVE_PY_ZEROTIER: try: # Attempt to use common API shape (best-effort) raw = _zt.list_networks() # type: ignore[attr-defined] for n in raw or []: nets.append(ZeroTierNetwork( id=str(n.get("id") or n.get("networkId") or ""), name=str(n.get("name") or ""), status=str(n.get("status") or ""), assigned_addresses=list(n.get("assignedAddresses") or []), )) return nets except Exception as exc: # pragma: no cover - optional dependency debug(f"py-zerotier listing failed: {exc}") # CLI fallback try: data = _run_cli_json("listnetworks", "-j") if isinstance(data, list): for entry in data: nets.append(ZeroTierNetwork( id=str(entry.get("id") or ""), name=str(entry.get("name") or ""), status=str(entry.get("status") or ""), assigned_addresses=list(entry.get("assignedAddresses") or []), )) except Exception as exc: debug(f"list_networks failed: {exc}") return nets def join_network(network_id: str) -> bool: """Join the given ZeroTier network (best-effort). Returns True on success, False otherwise. """ network_id = str(network_id or "").strip() if not network_id: raise ValueError("network_id is required") if _HAVE_PY_ZEROTIER: try: _zt.join_network(network_id) # type: ignore[attr-defined] return True except Exception as exc: # pragma: no cover - optional dependency debug(f"py-zerotier join failed: {exc}") if _cli_available(): try: subprocess.check_call([shutil.which("zerotier-cli"), "join", network_id], timeout=10) return True except Exception as exc: debug(f"zerotier-cli join failed: {exc}") return False def leave_network(network_id: str) -> bool: network_id = str(network_id or "").strip() if not network_id: raise ValueError("network_id is required") if _HAVE_PY_ZEROTIER: try: _zt.leave_network(network_id) # type: ignore[attr-defined] return True except Exception as exc: # pragma: no cover - optional dependency debug(f"py-zerotier leave failed: {exc}") if _cli_available(): try: subprocess.check_call([shutil.which("zerotier-cli"), "leave", network_id], timeout=10) return True except Exception as exc: debug(f"zerotier-cli leave failed: {exc}") return False def _strip_addr(addr: str) -> str: # Remove trailing CID parts like '/24' and zone IDs like '%eth0' if not addr: return addr a = addr.split("/")[0] if "%" in a: a = a.split("%", 1)[0] return a def get_assigned_addresses(network_id: str) -> List[str]: """Return assigned ZeroTier addresses for the local node on the given network.""" network_id = str(network_id or "").strip() if not network_id: return [] for n in list_networks(): if n.id == network_id: return [str(_strip_addr(a)) for a in n.assigned_addresses if a] return [] def list_peers() -> List[Dict[str, Any]]: """Return peers known to the local ZeroTier node (best-effort parsing). If CLI supports JSON output for peers it will be parsed, otherwise we return an empty list. """ if _HAVE_PY_ZEROTIER: try: peers = _zt.list_peers() # type: ignore[attr-defined] return list(peers or []) except Exception as exc: # pragma: no cover - optional dependency debug(f"py-zerotier list_peers failed: {exc}") if _cli_available(): try: data = _run_cli_json("peers", "-j") if isinstance(data, list): return data except Exception as exc: debug(f"zerotier-cli peers failed: {exc}") return [] def _probe_url(url: str, *, timeout: float = 2.0, accept_json: bool = True) -> Tuple[bool, Optional[int], Optional[Any]]: """Try fetching the URL and return (ok, status_code, payload). Uses httpx if available, otherwise falls back to requests. """ try: try: import httpx resp = httpx.get(url, timeout=timeout) code = int(resp.status_code if hasattr(resp, "status_code") else resp.status) content_type = str(resp.headers.get("content-type") or "").lower() if code == 200 and accept_json and "json" in content_type: try: return True, code, resp.json() except Exception: return True, code, resp.text return (code == 200), code, resp.text except Exception: import requests # type: ignore resp = requests.get(url, timeout=timeout) code = int(resp.status_code) content_type = str(resp.headers.get("content-type") or "").lower() if code == 200 and accept_json and "json" in content_type: try: return True, code, resp.json() except Exception: return True, code, resp.text return (code == 200), code, resp.text except Exception as exc: debug(f"Probe failed: {url} -> {exc}") return False, None, None def discover_services_on_network( network_id: str, *, ports: Optional[List[int]] = None, paths: Optional[List[str]] = None, timeout: float = 2.0, accept_json: bool = True, ) -> List[ZeroTierServiceProbe]: """Probe assigned addresses on the given network for HTTP services. Returns a list of ZeroTierServiceProbe entries for successful probes. By default probes `ports=[5000]` (our remote_storage_server default) and `paths=["/health","/api_version"]` which should detect either our remote_storage_server or Hydrus instances. """ net = str(network_id or "").strip() if not net: raise ValueError("network_id required") ports = list(ports or [5000]) paths = list(paths or ["/health", "/api_version", "/api_version/", "/session_key"]) addresses = get_assigned_addresses(net) probes: List[ZeroTierServiceProbe] = [] for addr in addresses: host = str(addr or "").strip() if not host: continue # Try both http and https schemes for port in ports: for path in paths: for scheme in ("http", "https"): url = f"{scheme}://{host}:{port}{path}" ok, code, payload = _probe_url(url, timeout=timeout, accept_json=accept_json) if ok: hint = None # Heuristics: hydrus exposes /api_version with a JSON payload try: if isinstance(payload, dict) and payload.get("api_version"): hint = "hydrus" except Exception: pass try: if isinstance(payload, dict) and payload.get("status"): hint = hint or "remote_storage" except Exception: pass probes.append(ZeroTierServiceProbe( address=host, port=int(port), path=path, url=url, ok=True, status_code=code, payload=payload, service_hint=hint, )) # stop probing other schemes for this host/port/path break return probes def find_peer_service( network_id: str, *, service_hint: Optional[str] = None, port: Optional[int] = None, path_candidates: Optional[List[str]] = None, timeout: float = 2.0, ) -> Optional[ZeroTierServiceProbe]: """Return the first probe that matches service_hint or is successful. Useful for selecting a peer to configure a store against. """ paths = path_candidates or ["/health", "/api_version", "/session_key"] ports = [port] if port is not None else [5000, 45869, 80, 443] probes = discover_services_on_network(network_id, ports=ports, paths=paths, timeout=timeout) if not probes: return None if service_hint: for p in probes: if p.service_hint and service_hint.lower() in str(p.service_hint).lower(): return p # Hydrus detection: check payload for 'api_version' try: if service_hint.lower() == "hydrus" and isinstance(p.payload, dict) and p.payload.get("api_version"): return p except Exception: pass # Fallback: return the first OK probe return probes[0] if probes else None