"""ZeroTier helpers and discovery utilities.

This module provides a small, dependency-light API for interacting with a local
zerotier-one node (preferred via Python module when available, else via
`zerotier-cli`), discovering peers on a given ZeroTier network, and probing for
services running on those peers (e.g., our remote storage server or a Hydrus
instance).

Notes:
- This is intentionally conservative and all operations are best-effort and
  fail gracefully when the local system does not have ZeroTier installed.
- The implementation prefers a Python ZeroTier binding when available, else
  falls back to calling the `zerotier-cli` binary (if present) and parsing JSON
  output where possible.

Example usage:
    from API import zerotier
    if zerotier.is_available():
        nets = zerotier.list_networks()
        zerotier.join_network("8056c2e21c000001")
        services = zerotier.discover_services_on_network("8056c2e21c000001", ports=[999], paths=["/health","/api_version"])  # noqa: E501
"""

from __future__ import annotations

import json
import os
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from SYS.logger import debug, log

# Optional Python ZeroTier bindings - prefer them when available
_HAVE_PY_ZEROTIER = False
try:
    # Try common package names; not all installations will have this available
    # This import is optional and callers should still work via the CLI fallback.
    import zerotier as _zt  # type: ignore
    _HAVE_PY_ZEROTIER = True
except Exception:
    _HAVE_PY_ZEROTIER = False


@dataclass
class ZeroTierNetwork:
    """A ZeroTier network as reported by the local node."""

    id: str
    name: str
    status: str
    assigned_addresses: List[str]


@dataclass
class ZeroTierServiceProbe:
    """Result of one successful HTTP probe against a peer address."""

    address: str
    port: int
    path: str
    url: str
    ok: bool
    status_code: Optional[int]
    payload: Optional[Any]
    service_hint: Optional[str] = None


def _get_cli_path() -> Optional[str]:
    """Find the zerotier-cli binary or script across common locations.

    Returns the path as a string, or None when nothing was found.
    """
    # 1. Check PATH
    on_path = shutil.which("zerotier-cli")
    if on_path:
        return on_path

    # 2. Check common installation paths
    candidates: List[str] = []
    if sys.platform == "win32":
        # Check various Program Files locations and both .bat and .exe
        roots = [
            os.environ.get("ProgramFiles(x86)", r"C:\Program Files (x86)"),
            os.environ.get("ProgramFiles", r"C:\Program Files"),
            os.environ.get("ProgramData", r"C:\ProgramData"),
        ]
        for root in roots:
            base = os.path.join(root, "ZeroTier", "One", "zerotier-cli")
            candidates.append(base + ".bat")
            candidates.append(base + ".exe")
    else:
        # Linux / macOS
        candidates = [
            "/usr/sbin/zerotier-cli",
            "/usr/local/bin/zerotier-cli",
            "/sbin/zerotier-cli",
            "/var/lib/zerotier-one/zerotier-cli",
        ]

    for candidate in candidates:
        try:
            if os.path.isfile(candidate):
                return str(candidate)
        except Exception:
            # Best-effort lookup: an unusable candidate is simply skipped.
            pass
    return None


def _get_home_path() -> Optional[str]:
    """Return the ZeroTier home directory (containing authtoken.secret).

    Returns None when none of the known locations exists.
    """
    if sys.platform == "win32":
        path = os.path.join(os.environ.get("ProgramData", r"C:\ProgramData"), "ZeroTier", "One")
        if os.path.isdir(path):
            return path
    else:
        # Linux
        if os.path.isdir("/var/lib/zerotier-one"):
            return "/var/lib/zerotier-one"
        # macOS
        if os.path.isdir("/Library/Application Support/ZeroTier/One"):
            return "/Library/Application Support/ZeroTier/One"
    return None


def _get_authtoken() -> Optional[str]:
    """Try to read the local ZeroTier authtoken.secret from the ZeroTier home dir.

    Returns the stripped file content, or None when the home directory or the
    token file is missing or unreadable.
    """
    home = _get_home_path()
    if home:
        token_file = os.path.join(home, "authtoken.secret")
        if os.path.isfile(token_file):
            try:
                with open(token_file, "r", encoding="utf-8") as f:
                    return f.read().strip()
            except (OSError, ValueError) as exc:
                # Typically a permission error: the file is usually root-owned.
                debug(f"reading authtoken.secret failed: {exc}")
    return None


def _read_token_file(path: str) -> Optional[str]:
    """Read a token from an arbitrary file path (safely).

    Returns the stripped token string or None on error (or when the file is
    empty).
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            token = f.read().strip()
        return token if token else None
    except Exception as exc:
        debug(f"read_token_file failed: {exc}")
        return None


def _find_file_upwards(filename: str, start: Optional[str] = None) -> Optional[str]:
    """Search for `filename` by walking up parent directories starting at `start` (or CWD).

    Returns the first matching path or None.
    """
    start_dir = Path(start or os.getcwd()).resolve()
    for directory in [start_dir] + list(start_dir.parents):
        candidate = directory / filename
        if candidate.is_file():
            return str(candidate)
    return None


def _find_repo_root(start: Optional[str] = None) -> Optional[str]:
    """Find a probable repository root by looking for .git/pyproject.toml/setup.py upwards from start.

    `start` defaults to the directory containing this module.
    Returns the directory path or None.
    """
    start_dir = Path(start or Path(__file__).resolve().parent).resolve()
    for directory in [start_dir] + list(start_dir.parents):
        if (
            (directory / ".git").exists()
            or (directory / "pyproject.toml").exists()
            or (directory / "setup.py").exists()
        ):
            return str(directory)
    return None


def _get_token_path() -> Optional[str]:
    """Return the source of an auth token: 'env' or a filesystem path to authtoken.secret.

    This checks in order: env token string, env token file, CWD (and parents),
    repo root, user home, and finally the system ZeroTier home.
    Returns None when no token source exists.
    """
    # 1: token provided directly in env
    if os.environ.get("ZEROTIER_AUTH_TOKEN") or os.environ.get("ZEROTIER_AUTHTOKEN"):
        return "env"

    # 2: token file path provided
    env_file = os.environ.get("ZEROTIER_AUTH_TOKEN_FILE") or os.environ.get("ZEROTIER_AUTHTOKEN_FILE")
    if env_file and os.path.isfile(env_file):
        return env_file

    # 3: token file in current working dir or any parent
    upwards = _find_file_upwards("authtoken.secret", start=os.getcwd())
    if upwards:
        return upwards

    # 4: token file at repository root (helpful if TUI runs with a different CWD)
    repo = _find_repo_root()
    if repo:
        repo_candidate = os.path.join(repo, "authtoken.secret")
        if os.path.isfile(repo_candidate):
            return repo_candidate

    # 5: token file in user's home
    home_candidate = os.path.join(str(Path.home()), "authtoken.secret")
    if os.path.isfile(home_candidate):
        return home_candidate

    # 6: fallback to the ZeroTier home location
    zt_home = _get_home_path()
    if zt_home:
        zt_candidate = os.path.join(zt_home, "authtoken.secret")
        if os.path.isfile(zt_candidate):
            return zt_candidate
    return None


def _get_token_override() -> Optional[str]:
    """Read the token value using the path determined by `_get_token_path()` or env.

    Returns the token string, or None if no token is available.
    """
    path_or_env = _get_token_path()
    if path_or_env == "env":
        token = os.environ.get("ZEROTIER_AUTH_TOKEN") or os.environ.get("ZEROTIER_AUTHTOKEN")
        return token.strip() if token else None
    if path_or_env:
        return _read_token_file(path_or_env)
    return None


def _cli_available() -> bool:
    """Return True when a zerotier-cli executable can be located."""
    return _get_cli_path() is not None


def is_available() -> bool:
    """Return True if we can interact with ZeroTier locally (module or CLI)."""
    return _HAVE_PY_ZEROTIER or _cli_available()


def _run_cli_capture(*args: str, timeout: float = 5.0) -> Tuple[int, str, str]:
    """Run zerotier-cli and return (returncode, stdout, stderr).

    This centralizes how we call the CLI so we can always capture stderr and
    returncodes and make debugging failures much easier.

    A `-T<token>` argument is prepended when a token is available and the
    caller did not pass one; likewise `-D<home>` for the ZeroTier home.

    Raises:
        RuntimeError: zerotier-cli could not be found.
        subprocess.TimeoutExpired: the command exceeded `timeout` seconds.
    """
    bin_path = _get_cli_path()
    if not bin_path:
        raise RuntimeError("zerotier-cli not found")

    full_args = list(args)
    token = _get_token_override()
    if token and not any(a.startswith("-T") for a in full_args):
        # Do not log the token itself; we log only its presence/length for debugging
        debug(f"Using external authtoken (len={len(token)}) for CLI auth")
        full_args.insert(0, f"-T{token}")

    home = _get_home_path()
    if home and not any(a.startswith("-D") for a in full_args):
        full_args.insert(0, f"-D{home}")

    cmd = [bin_path, *full_args]
    # The command line embeds the auth token, so log a redacted copy only.
    loggable = [bin_path, *("-T<redacted>" if a.startswith("-T") else a for a in full_args)]
    debug(f"Running zerotier-cli: {loggable}")

    # .bat wrappers on Windows must be run through the shell.
    use_shell = sys.platform == "win32" and str(bin_path).lower().endswith(".bat")
    proc = subprocess.run(cmd, timeout=timeout, capture_output=True, text=True, shell=use_shell)
    return proc.returncode, proc.stdout, proc.stderr


def _run_cli_json(*args: str, timeout: float = 5.0) -> Any:
    """Run zerotier-cli with arguments and parse JSON output if possible.

    Returns parsed JSON on success (or the raw stdout string when it is not
    valid JSON).

    Raises:
        RuntimeError: the CLI is missing or exited non-zero; the message
            carries the CLI's stderr (or stdout).
    """
    rc, out, err = _run_cli_capture(*args, timeout=timeout)
    if rc != 0:
        # Surface stderr or stdout in the exception so callers (and logs) can show
        # the actionable message instead of a blind CalledProcessError.
        raise RuntimeError(f"zerotier-cli failed (rc={rc}): {err.strip() or out.strip()}")
    try:
        return json.loads(out)
    except Exception:
        # Some CLI invocations might print non-json; return as raw string
        return out


def _network_from_mapping(entry: Any) -> ZeroTierNetwork:
    """Build a ZeroTierNetwork from one dict-like network record."""
    return ZeroTierNetwork(
        id=str(entry.get("id") or entry.get("networkId") or ""),
        name=str(entry.get("name") or ""),
        status=str(entry.get("status") or ""),
        assigned_addresses=list(entry.get("assignedAddresses") or []),
    )


def list_networks() -> List[ZeroTierNetwork]:
    """Return a list of configured ZeroTier networks on this node.

    Best-effort: prefers Python binding, then `zerotier-cli listnetworks -j`.
    Never raises; returns an empty list when nothing could be determined.
    """
    if _HAVE_PY_ZEROTIER:
        try:
            # Attempt to use common API shape (best-effort)
            raw = _zt.list_networks()  # type: ignore[attr-defined]
            # If the Python binding returned results, use them. If it returned
            # an empty list/None, fall back to the CLI so we don't return a
            # false-empty result to the UI.
            if raw:
                # Built as one expression so a failure part-way through cannot
                # leave partial entries that the CLI fallback would duplicate.
                return [_network_from_mapping(n) for n in raw]
            debug("py-zerotier returned no networks; falling back to CLI")
        except Exception as exc:  # pragma: no cover - optional dependency
            debug(f"py-zerotier listing failed: {exc}")

    # CLI fallback
    try:
        data = _run_cli_json("listnetworks", "-j")
        if isinstance(data, list):
            return [_network_from_mapping(entry) for entry in data if isinstance(entry, dict)]
    except Exception as exc:
        debug(f"list_networks failed: {exc}")
    return []


def _change_membership(action: str, network_id: str) -> bool:
    """Join or leave a network; `action` is "join" or "leave".

    Tries the Python binding first and falls back to the CLI.
    """
    network_id = str(network_id or "").strip()
    if not network_id:
        raise ValueError("network_id is required")

    if _HAVE_PY_ZEROTIER:
        try:
            getattr(_zt, f"{action}_network")(network_id)  # type: ignore[attr-defined]
            return True
        except Exception as exc:  # pragma: no cover - optional dependency
            debug(f"py-zerotier {action} failed: {exc}")

    if _cli_available():
        rc, out, err = _run_cli_capture(action, network_id, timeout=10)
        if rc == 0:
            return True
        # Surface the CLI's stderr/stdout to callers as an exception so the TUI
        # can show a helpful error (instead of a generic 'failed to join').
        raise RuntimeError(f"zerotier-cli {action} failed (rc={rc}): {err.strip() or out.strip()}")
    return False


def join_network(network_id: str) -> bool:
    """Join the given ZeroTier network.

    Returns True on success, False when neither the Python binding nor the
    CLI is usable.

    Raises:
        ValueError: `network_id` is empty.
        RuntimeError: zerotier-cli ran but reported a failure.
        subprocess.TimeoutExpired: zerotier-cli did not finish in time.
    """
    return _change_membership("join", network_id)


def leave_network(network_id: str) -> bool:
    """Leave the given ZeroTier network.

    Returns True on success, False when neither the Python binding nor the
    CLI is usable.

    Raises:
        ValueError: `network_id` is empty.
        RuntimeError: zerotier-cli ran but reported a failure.
        subprocess.TimeoutExpired: zerotier-cli did not finish in time.
    """
    return _change_membership("leave", network_id)


def _strip_addr(addr: str) -> str:
    """Return `addr` without a CIDR suffix ('/24') or zone ID ('%eth0')."""
    if not addr:
        return addr
    bare = addr.split("/")[0]
    if "%" in bare:
        bare = bare.split("%", 1)[0]
    return bare


def get_assigned_addresses(network_id: str) -> List[str]:
    """Return assigned ZeroTier addresses for the local node on the given network.

    Addresses are returned without CIDR suffix; the list is empty when the
    network is unknown or `network_id` is blank.
    """
    network_id = str(network_id or "").strip()
    if not network_id:
        return []
    for net in list_networks():
        if net.id == network_id:
            return [str(_strip_addr(a)) for a in net.assigned_addresses if a]
    return []


def _http_get(url: str, *, timeout: float, headers: Optional[Dict[str, str]] = None) -> Any:
    """Issue a GET with httpx when installed, otherwise with requests.

    The fallback is chosen on import availability only, so a request that
    fails (timeout, connection refused, ...) is not retried a second time
    through the other library. Request errors propagate to the caller.
    """
    try:
        import httpx
    except ImportError:
        import requests  # type: ignore
        return requests.get(url, headers=headers, timeout=timeout)
    return httpx.get(url, headers=headers, timeout=timeout)


def fetch_central_members(network_id: str, api_token: str) -> List[Dict[str, Any]]:
    """Fetch member details from ZeroTier Central API.

    Requires a valid ZeroTier Central API token. Returns a list of member
    objects containing 'config' with 'ipAssignments', etc. Returns an empty
    list on any failure (network error, HTTP error status, unexpected body).
    """
    url = f"https://my.zerotier.com/api/v1/network/{network_id}/member"
    headers = {"Authorization": f"token {api_token}"}
    try:
        resp = _http_get(url, headers=headers, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        debug(f"ZeroTier Central API fetch failed: {exc}")
        return []
    # Callers iterate the result as a list of member dicts.
    return data if isinstance(data, list) else []


def list_peers() -> List[Dict[str, Any]]:
    """Return peers known to the local ZeroTier node (best-effort parsing).

    If CLI supports JSON output for peers it will be parsed, otherwise we
    return an empty list.
    """
    if _HAVE_PY_ZEROTIER:
        try:
            peers = _zt.list_peers()  # type: ignore[attr-defined]
            return list(peers or [])
        except Exception as exc:  # pragma: no cover - optional dependency
            debug(f"py-zerotier list_peers failed: {exc}")

    if _cli_available():
        try:
            data = _run_cli_json("peers", "-j")
            if isinstance(data, list):
                return data
        except Exception as exc:
            debug(f"zerotier-cli peers failed: {exc}")
    return []


def _probe_url(url: str, *, timeout: float = 2.0, accept_json: bool = True) -> Tuple[bool, Optional[int], Optional[Any]]:
    """Try fetching the URL and return (ok, status_code, payload).

    Uses httpx if available, otherwise falls back to requests.

    `ok` is True only for HTTP 200. `payload` is the decoded JSON body when
    `accept_json` is set and the response declares a JSON content type,
    otherwise the response text. On any request failure the result is
    (False, None, None).
    """
    try:
        resp = _http_get(url, timeout=timeout)
        code = int(resp.status_code if hasattr(resp, "status_code") else resp.status)
        content_type = str(resp.headers.get("content-type") or "").lower()
        if code == 200 and accept_json and "json" in content_type:
            try:
                return True, code, resp.json()
            except Exception:
                # Declared JSON but not parseable: hand back the raw text.
                return True, code, resp.text
        return (code == 200), code, resp.text
    except Exception as exc:
        debug(f"Probe failed: {url} -> {exc}")
        return False, None, None


def _url_host(host: str) -> str:
    """Return `host` in URL authority form (IPv6 literals get brackets)."""
    if ":" in host and not host.startswith("["):
        return f"[{host}]"
    return host


def _service_hint(payload: Any) -> Optional[str]:
    """Guess the service behind a probe payload, or None when unknown."""
    if not isinstance(payload, dict):
        return None
    hint = None
    # remote_storage_server returns {"status": "ok", ...}
    if payload.get("status"):
        hint = "remote_storage"
    # hydrus returns {"api_version": ...}
    if payload.get("api_version"):
        hint = "hydrus"
    return hint


def _probe_host_port(
    host: str,
    port: int,
    paths: List[str],
    *,
    timeout: float,
    accept_json: bool,
) -> Optional[ZeroTierServiceProbe]:
    """Return the first successful probe for host:port, or None."""
    authority = f"{_url_host(host)}:{port}"
    # Try HTTP first as it's the common case for local storage
    for scheme in ("http", "https"):
        for path in paths:
            url = f"{scheme}://{authority}{path}"
            ok, code, payload = _probe_url(url, timeout=timeout, accept_json=accept_json)
            if ok:
                return ZeroTierServiceProbe(
                    address=host,
                    port=int(port),
                    path=path,
                    url=url,
                    ok=True,
                    status_code=code,
                    payload=payload,
                    service_hint=_service_hint(payload),
                )
            if code is None:
                # No HTTP response at all for this scheme: the remaining
                # paths would only cost further timeouts, so skip them.
                break
    return None


def discover_services_on_network(
    network_id: str,
    *,
    ports: Optional[List[int]] = None,
    paths: Optional[List[str]] = None,
    timeout: float = 2.0,
    accept_json: bool = True,
    api_token: Optional[str] = None,
) -> List[ZeroTierServiceProbe]:
    """Probe peers on the given network for HTTP services.

    If api_token is provided, it fetches all member IPs from ZeroTier Central.
    Otherwise, it only probes the local node's assigned addresses (for now).

    For every address/port, `http` then `https` is tried. The first path is
    probed first; further paths are tried only when the endpoint answered
    with a non-200 status, so unreachable hosts cost one timeout per scheme.
    At most one probe (the first success) is reported per address/port.

    Args:
        network_id: ZeroTier network ID.
        ports: Ports to probe; defaults to [999].
        paths: URL paths to probe in order; defaults to
            ["/health", "/api_version"].
        timeout: Per-request timeout in seconds.
        accept_json: Decode JSON response bodies when declared as JSON.
        api_token: Optional ZeroTier Central API token.

    Raises:
        ValueError: `network_id` is empty.
    """
    net = str(network_id or "").strip()
    if not net:
        raise ValueError("network_id required")

    ports = list(ports or [999])
    paths = list(paths or ["/health", "/api_version"])

    addresses = get_assigned_addresses(net)

    if api_token:
        for member in fetch_central_members(net, api_token):
            # Look for online members with IP assignments
            if not isinstance(member, dict) or not member.get("online"):
                continue
            config = member.get("config")
            if not isinstance(config, dict):
                continue
            for ip in config.get("ipAssignments") or []:
                addr = _strip_addr(str(ip))
                if addr and addr not in addresses:
                    addresses.append(addr)

    probes: List[ZeroTierServiceProbe] = []
    for addr in addresses:
        host = str(addr or "").strip()
        if not host:
            continue
        for port in ports:
            found = _probe_host_port(host, port, paths, timeout=timeout, accept_json=accept_json)
            if found is not None:
                probes.append(found)
    return probes


def find_peer_service(
    network_id: str,
    *,
    service_hint: Optional[str] = None,
    port: Optional[int] = None,
    path_candidates: Optional[List[str]] = None,
    timeout: float = 2.0,
    api_token: Optional[str] = None,
) -> Optional[ZeroTierServiceProbe]:
    """Return the first probe that matches service_hint or is successful.

    Useful for selecting a peer to configure a store against. When no probe
    matches `service_hint`, the first successful probe is returned; None is
    returned only when nothing answered at all.
    """
    paths = path_candidates or ["/health", "/api_version", "/session_key"]
    ports = [port] if port is not None else [999, 5000, 45869, 80, 443]

    probes = discover_services_on_network(
        network_id, ports=ports, paths=paths, timeout=timeout, api_token=api_token
    )
    if not probes:
        return None

    if service_hint:
        wanted = service_hint.lower()
        for probe in probes:
            if probe.service_hint and wanted in str(probe.service_hint).lower():
                return probe
            # Hydrus detection: check payload for 'api_version'
            if wanted == "hydrus" and isinstance(probe.payload, dict) and probe.payload.get("api_version"):
                return probe

    # Fallback: return the first OK probe
    return probes[0]