Files
Medios-Macina/API/zerotier.py

595 lines
20 KiB
Python
Raw Normal View History

2026-01-13 20:04:24 -08:00
"""ZeroTier helpers and discovery utilities.
This module provides a small, dependency-light API for interacting with a
local zerotier-one node (preferred via Python module when available, else via
`zerotier-cli`), discovering peers on a given ZeroTier network, and probing
for services running on those peers (e.g., our remote storage server or a
Hydrus instance).
Notes:
- This is intentionally conservative and all operations are best-effort and
fail gracefully when the local system does not have ZeroTier installed.
- The implementation prefers a Python ZeroTier binding when available, else
falls back to calling the `zerotier-cli` binary (if present) and parsing
JSON output where possible.
Example usage:
from API import zerotier
if zerotier.is_available():
nets = zerotier.list_networks()
zerotier.join_network("8056c2e21c000001")
2026-01-14 01:33:25 -08:00
services = zerotier.discover_services_on_network("8056c2e21c000001", ports=[999], paths=["/health","/api_version"]) # noqa: E501
2026-01-13 20:04:24 -08:00
"""
from __future__ import annotations
import json
2026-01-14 01:33:25 -08:00
import os
2026-01-13 20:04:24 -08:00
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from SYS.logger import debug, log
# Optional Python ZeroTier bindings - prefer them when available
_HAVE_PY_ZEROTIER = False
try:
# Try common package names; not all installations will have this available
# This import is optional and callers should still work via the CLI fallback.
import zerotier as _zt # type: ignore
_HAVE_PY_ZEROTIER = True
except Exception:
_HAVE_PY_ZEROTIER = False
@dataclass
class ZeroTierNetwork:
id: str
name: str
status: str
assigned_addresses: List[str]
@dataclass
class ZeroTierServiceProbe:
address: str
port: int
path: str
url: str
ok: bool
status_code: Optional[int]
payload: Optional[Any]
service_hint: Optional[str] = None
2026-01-14 01:33:25 -08:00
def _get_cli_path() -> Optional[str]:
"""Find the zerotier-cli binary or script across common locations."""
# 1. Check PATH
p = shutil.which("zerotier-cli")
if p:
return p
# 2. Check common installation paths
candidates = []
if sys.platform == "win32":
# Check various Program Files locations and both .bat and .exe
roots = [
os.environ.get("ProgramFiles(x86)", r"C:\Program Files (x86)"),
os.environ.get("ProgramFiles", r"C:\Program Files"),
os.environ.get("ProgramData", r"C:\ProgramData"),
]
for root in roots:
base = os.path.join(root, "ZeroTier", "One", "zerotier-cli")
candidates.append(base + ".bat")
candidates.append(base + ".exe")
else:
# Linux / macOS
candidates = [
"/usr/sbin/zerotier-cli",
"/usr/local/bin/zerotier-cli",
"/sbin/zerotier-cli",
"/var/lib/zerotier-one/zerotier-cli",
]
for c in candidates:
try:
if os.path.isfile(c):
return str(c)
except Exception:
pass
return None
def _get_home_path() -> Optional[str]:
"""Return the ZeroTier home directory (containing authtoken.secret)."""
if sys.platform == "win32":
path = os.path.join(os.environ.get("ProgramData", r"C:\ProgramData"), "ZeroTier", "One")
if os.path.isdir(path):
return path
else:
# Linux
if os.path.isdir("/var/lib/zerotier-one"):
return "/var/lib/zerotier-one"
# macOS
if os.path.isdir("/Library/Application Support/ZeroTier/One"):
return "/Library/Application Support/ZeroTier/One"
return None
def _get_authtoken() -> Optional[str]:
"""Try to read the local ZeroTier authtoken.secret from the ZeroTier home dir."""
home = _get_home_path()
if home:
token_file = os.path.join(home, "authtoken.secret")
if os.path.isfile(token_file):
try:
with open(token_file, "r") as f:
return f.read().strip()
except Exception:
pass
return None
def _read_token_file(path: str) -> Optional[str]:
"""Read a token from an arbitrary file path (safely).
Returns the stripped token string or None on error.
"""
try:
with open(path, "r") as f:
t = f.read().strip()
return t if t else None
except Exception as exc:
debug(f"read_token_file failed: {exc}")
return None
def _find_file_upwards(filename: str, start: Optional[str] = None) -> Optional[str]:
"""Search for `filename` by walking up parent directories starting at `start` (or CWD).
Returns the first matching path or None.
"""
start_dir = Path(start or os.getcwd()).resolve()
for p in [start_dir] + list(start_dir.parents):
candidate = p / filename
if candidate.is_file():
return str(candidate)
return None
def _find_repo_root(start: Optional[str] = None) -> Optional[str]:
"""Find a probable repository root by looking for .git/pyproject.toml/setup.py upwards from start.
Returns the directory path or None.
"""
start_dir = Path(start or Path(__file__).resolve().parent).resolve()
for p in [start_dir] + list(start_dir.parents):
if (p / ".git").exists() or (p / "pyproject.toml").exists() or (p / "setup.py").exists():
return str(p)
return None
def _get_token_path() -> Optional[str]:
"""Return the source of an auth token: 'env' or a filesystem path to authtoken.secret.
This checks in order: env token string, env token file, CWD (and parents), repo root,
user home, and finally the system ZeroTier home.
"""
# 1: token provided directly in env
if os.environ.get("ZEROTIER_AUTH_TOKEN") or os.environ.get("ZEROTIER_AUTHTOKEN"):
return "env"
# 2: token file path provided
p = os.environ.get("ZEROTIER_AUTH_TOKEN_FILE") or os.environ.get("ZEROTIER_AUTHTOKEN_FILE")
if p and os.path.isfile(p):
return p
# 3: token file in current working dir or any parent
up = _find_file_upwards("authtoken.secret", start=os.getcwd())
if up:
return up
# 4: token file at repository root (helpful if TUI runs with a different CWD)
repo = _find_repo_root()
if repo:
rp = os.path.join(repo, "authtoken.secret")
if os.path.isfile(rp):
return rp
# 5: token file in user's home
home_candidate = os.path.join(str(Path.home()), "authtoken.secret")
if os.path.isfile(home_candidate):
return home_candidate
# 6: fallback to the ZeroTier home location
zhome = _get_home_path()
if zhome:
tz = os.path.join(zhome, "authtoken.secret")
if os.path.isfile(tz):
return tz
return None
def _get_token_override() -> Optional[str]:
"""Read the token value using the path determined by `_get_token_path()` or env.
Returns the token string, or None if no token is available.
"""
path_or_env = _get_token_path()
if path_or_env == "env":
t = os.environ.get("ZEROTIER_AUTH_TOKEN") or os.environ.get("ZEROTIER_AUTHTOKEN")
return t.strip() if t else None
if path_or_env:
return _read_token_file(path_or_env)
return None
2026-01-13 20:04:24 -08:00
def _cli_available() -> bool:
2026-01-14 01:33:25 -08:00
return _get_cli_path() is not None
2026-01-13 20:04:24 -08:00
def is_available() -> bool:
"""Return True if we can interact with ZeroTier locally (module or CLI)."""
return _HAVE_PY_ZEROTIER or _cli_available()
2026-01-14 01:33:25 -08:00
def _run_cli_capture(*args: str, timeout: float = 5.0) -> Tuple[int, str, str]:
"""Run zerotier-cli and return (returncode, stdout, stderr).
2026-01-13 20:04:24 -08:00
2026-01-14 01:33:25 -08:00
This centralizes how we call the CLI so we can always capture stderr and
returncodes and make debugging failures much easier.
2026-01-13 20:04:24 -08:00
"""
2026-01-14 01:33:25 -08:00
bin_path = _get_cli_path()
2026-01-13 20:04:24 -08:00
if not bin_path:
raise RuntimeError("zerotier-cli not found")
2026-01-14 01:33:25 -08:00
full_args = list(args)
token = _get_token_override()
if token and not any(a.startswith("-T") for a in full_args):
# Do not log the token itself; we log only its presence/length for debugging
debug(f"Using external authtoken (len={len(token)}) for CLI auth")
full_args.insert(0, f"-T{token}")
home = _get_home_path()
if home and not any(a.startswith("-D") for a in full_args):
full_args.insert(0, f"-D{home}")
cmd = [bin_path, *full_args]
2026-01-13 20:04:24 -08:00
debug(f"Running zerotier-cli: {cmd}")
2026-01-14 01:33:25 -08:00
use_shell = sys.platform == "win32" and str(bin_path).lower().endswith(".bat")
proc = subprocess.run(cmd, timeout=timeout, capture_output=True, text=True, shell=use_shell)
return proc.returncode, proc.stdout, proc.stderr
def _run_cli_json(*args: str, timeout: float = 5.0) -> Any:
"""Run zerotier-cli with arguments and parse JSON output if possible.
Returns parsed JSON on success, or raises an exception with stderr when non-zero exit.
"""
rc, out, err = _run_cli_capture(*args, timeout=timeout)
if rc != 0:
# Surface stderr or stdout in the exception so callers (and logs) can show
# the actionable message instead of a blind CalledProcessError.
raise RuntimeError(f"zerotier-cli failed (rc={rc}): {err.strip() or out.strip()}")
2026-01-13 20:04:24 -08:00
try:
2026-01-14 01:33:25 -08:00
return json.loads(out)
2026-01-13 20:04:24 -08:00
except Exception:
# Some CLI invocations might print non-json; return as raw string
2026-01-14 01:33:25 -08:00
return out
2026-01-13 20:04:24 -08:00
def list_networks() -> List[ZeroTierNetwork]:
"""Return a list of configured ZeroTier networks on this node.
Best-effort: prefers Python binding, then `zerotier-cli listnetworks -j`.
"""
nets: List[ZeroTierNetwork] = []
if _HAVE_PY_ZEROTIER:
try:
# Attempt to use common API shape (best-effort)
raw = _zt.list_networks() # type: ignore[attr-defined]
2026-01-14 01:33:25 -08:00
# If the Python binding returned results, use them. If it returned
# an empty list/None, fall back to the CLI so we don't return a
# false-empty result to the UI.
if raw:
for n in raw:
# raw entries are expected to be dict-like
nets.append(ZeroTierNetwork(
id=str(n.get("id") or n.get("networkId") or ""),
name=str(n.get("name") or ""),
status=str(n.get("status") or ""),
assigned_addresses=list(n.get("assignedAddresses") or []),
))
return nets
else:
debug("py-zerotier returned no networks; falling back to CLI")
2026-01-13 20:04:24 -08:00
except Exception as exc: # pragma: no cover - optional dependency
debug(f"py-zerotier listing failed: {exc}")
# CLI fallback
try:
data = _run_cli_json("listnetworks", "-j")
if isinstance(data, list):
for entry in data:
nets.append(ZeroTierNetwork(
id=str(entry.get("id") or ""),
name=str(entry.get("name") or ""),
status=str(entry.get("status") or ""),
assigned_addresses=list(entry.get("assignedAddresses") or []),
))
except Exception as exc:
debug(f"list_networks failed: {exc}")
return nets
def join_network(network_id: str) -> bool:
"""Join the given ZeroTier network (best-effort).
Returns True on success, False otherwise.
"""
network_id = str(network_id or "").strip()
if not network_id:
raise ValueError("network_id is required")
if _HAVE_PY_ZEROTIER:
try:
_zt.join_network(network_id) # type: ignore[attr-defined]
return True
except Exception as exc: # pragma: no cover - optional dependency
debug(f"py-zerotier join failed: {exc}")
if _cli_available():
try:
2026-01-14 01:33:25 -08:00
rc, out, err = _run_cli_capture("join", network_id, timeout=10)
if rc == 0:
return True
# Surface the CLI's stderr/stdout to callers as an exception so the TUI
# can show a helpful error (instead of a generic 'failed to join').
raise RuntimeError(f"zerotier-cli join failed (rc={rc}): {err.strip() or out.strip()}")
except Exception:
# Re-raise so callers (UI/tests) can react to the exact error
raise
2026-01-13 20:04:24 -08:00
return False
def leave_network(network_id: str) -> bool:
network_id = str(network_id or "").strip()
if not network_id:
raise ValueError("network_id is required")
if _HAVE_PY_ZEROTIER:
try:
_zt.leave_network(network_id) # type: ignore[attr-defined]
return True
except Exception as exc: # pragma: no cover - optional dependency
debug(f"py-zerotier leave failed: {exc}")
if _cli_available():
try:
2026-01-14 01:33:25 -08:00
rc, out, err = _run_cli_capture("leave", network_id, timeout=10)
if rc == 0:
return True
raise RuntimeError(f"zerotier-cli leave failed (rc={rc}): {err.strip() or out.strip()}")
except Exception:
raise
2026-01-13 20:04:24 -08:00
return False
def _strip_addr(addr: str) -> str:
# Remove trailing CID parts like '/24' and zone IDs like '%eth0'
if not addr:
return addr
a = addr.split("/")[0]
if "%" in a:
a = a.split("%", 1)[0]
return a
def get_assigned_addresses(network_id: str) -> List[str]:
"""Return assigned ZeroTier addresses for the local node on the given network."""
network_id = str(network_id or "").strip()
if not network_id:
return []
for n in list_networks():
if n.id == network_id:
return [str(_strip_addr(a)) for a in n.assigned_addresses if a]
return []
2026-01-14 01:33:25 -08:00
def fetch_central_members(network_id: str, api_token: str) -> List[Dict[str, Any]]:
"""Fetch member details from ZeroTier Central API.
Requires a valid ZeroTier Central API token.
Returns a list of member objects containing 'config' with 'ipAssignments', etc.
"""
url = f"https://my.zerotier.com/api/v1/network/{network_id}/member"
headers = {"Authorization": f"token {api_token}"}
try:
import httpx
resp = httpx.get(url, headers=headers, timeout=10)
resp.raise_for_status()
return resp.json()
except Exception:
try:
import requests
resp = requests.get(url, headers=headers, timeout=10)
resp.raise_for_status()
return resp.json()
except Exception as exc:
debug(f"ZeroTier Central API fetch failed: {exc}")
return []
2026-01-13 20:04:24 -08:00
def list_peers() -> List[Dict[str, Any]]:
"""Return peers known to the local ZeroTier node (best-effort parsing).
If CLI supports JSON output for peers it will be parsed, otherwise we return
an empty list.
"""
if _HAVE_PY_ZEROTIER:
try:
peers = _zt.list_peers() # type: ignore[attr-defined]
return list(peers or [])
except Exception as exc: # pragma: no cover - optional dependency
debug(f"py-zerotier list_peers failed: {exc}")
if _cli_available():
try:
data = _run_cli_json("peers", "-j")
if isinstance(data, list):
return data
except Exception as exc:
debug(f"zerotier-cli peers failed: {exc}")
return []
def _probe_url(url: str, *, timeout: float = 2.0, accept_json: bool = True) -> Tuple[bool, Optional[int], Optional[Any]]:
"""Try fetching the URL and return (ok, status_code, payload).
Uses httpx if available, otherwise falls back to requests.
"""
try:
try:
import httpx
resp = httpx.get(url, timeout=timeout)
code = int(resp.status_code if hasattr(resp, "status_code") else resp.status)
content_type = str(resp.headers.get("content-type") or "").lower()
if code == 200 and accept_json and "json" in content_type:
try:
return True, code, resp.json()
except Exception:
return True, code, resp.text
return (code == 200), code, resp.text
except Exception:
import requests # type: ignore
resp = requests.get(url, timeout=timeout)
code = int(resp.status_code)
content_type = str(resp.headers.get("content-type") or "").lower()
if code == 200 and accept_json and "json" in content_type:
try:
return True, code, resp.json()
except Exception:
return True, code, resp.text
return (code == 200), code, resp.text
except Exception as exc:
debug(f"Probe failed: {url} -> {exc}")
return False, None, None
def discover_services_on_network(
network_id: str,
*,
ports: Optional[List[int]] = None,
paths: Optional[List[str]] = None,
timeout: float = 2.0,
accept_json: bool = True,
2026-01-14 01:33:25 -08:00
api_token: Optional[str] = None,
2026-01-13 20:04:24 -08:00
) -> List[ZeroTierServiceProbe]:
2026-01-14 01:33:25 -08:00
"""Probe peers on the given network for HTTP services.
2026-01-13 20:04:24 -08:00
2026-01-14 01:33:25 -08:00
If api_token is provided, it fetches all member IPs from ZeroTier Central.
Otherwise, it only probes the local node's assigned addresses (for now).
2026-01-13 20:04:24 -08:00
"""
net = str(network_id or "").strip()
if not net:
raise ValueError("network_id required")
2026-01-14 01:33:25 -08:00
ports = list(ports or [999])
2026-01-13 20:04:24 -08:00
paths = list(paths or ["/health", "/api_version", "/api_version/", "/session_key"])
addresses = get_assigned_addresses(net)
2026-01-14 01:33:25 -08:00
if api_token:
members = fetch_central_members(net, api_token)
for m in members:
# Look for online members with IP assignments
if m.get("online") and m.get("config", {}).get("ipAssignments"):
for ip in m["config"]["ipAssignments"]:
if ip not in addresses:
addresses.append(ip)
2026-01-13 20:04:24 -08:00
probes: List[ZeroTierServiceProbe] = []
for addr in addresses:
host = str(addr or "").strip()
if not host:
continue
# Try both http and https schemes
for port in ports:
for path in paths:
for scheme in ("http", "https"):
url = f"{scheme}://{host}:{port}{path}"
ok, code, payload = _probe_url(url, timeout=timeout, accept_json=accept_json)
if ok:
hint = None
# Heuristics: hydrus exposes /api_version with a JSON payload
try:
if isinstance(payload, dict) and payload.get("api_version"):
hint = "hydrus"
except Exception:
pass
try:
if isinstance(payload, dict) and payload.get("status"):
hint = hint or "remote_storage"
except Exception:
pass
probes.append(ZeroTierServiceProbe(
address=host,
port=int(port),
path=path,
url=url,
ok=True,
status_code=code,
payload=payload,
service_hint=hint,
))
# stop probing other schemes for this host/port/path
break
return probes
def find_peer_service(
network_id: str,
*,
service_hint: Optional[str] = None,
port: Optional[int] = None,
path_candidates: Optional[List[str]] = None,
timeout: float = 2.0,
2026-01-14 01:33:25 -08:00
api_token: Optional[str] = None,
2026-01-13 20:04:24 -08:00
) -> Optional[ZeroTierServiceProbe]:
"""Return the first probe that matches service_hint or is successful.
Useful for selecting a peer to configure a store against.
"""
paths = path_candidates or ["/health", "/api_version", "/session_key"]
2026-01-14 01:33:25 -08:00
ports = [port] if port is not None else [999, 5000, 45869, 80, 443]
2026-01-13 20:04:24 -08:00
2026-01-14 01:33:25 -08:00
probes = discover_services_on_network(
network_id, ports=ports, paths=paths, timeout=timeout, api_token=api_token
)
2026-01-13 20:04:24 -08:00
if not probes:
return None
if service_hint:
for p in probes:
if p.service_hint and service_hint.lower() in str(p.service_hint).lower():
return p
# Hydrus detection: check payload for 'api_version'
try:
if service_hint.lower() == "hydrus" and isinstance(p.payload, dict) and p.payload.get("api_version"):
return p
except Exception:
pass
# Fallback: return the first OK probe
return probes[0] if probes else None