647 lines
22 KiB
Python
647 lines
22 KiB
Python
"""ZeroTier helpers and discovery utilities.

This module provides a small, dependency-light API for interacting with a
local zerotier-one node (preferred via Python module when available, else via
`zerotier-cli`), discovering peers on a given ZeroTier network, and probing
for services running on those peers (e.g., our remote storage server or a
Hydrus instance).

Notes:
- This is intentionally conservative: all operations are best-effort and
  fail gracefully when the local system does not have ZeroTier installed.
- The implementation prefers a Python ZeroTier binding when available, else
  falls back to calling the `zerotier-cli` binary (if present) and parsing
  JSON output where possible.

Example usage:
    from API import zerotier

    if zerotier.is_available():
        nets = zerotier.list_networks()
        zerotier.join_network("8056c2e21c000001")
        services = zerotier.discover_services_on_network("8056c2e21c000001", ports=[999], paths=["/health","/api_version"])  # noqa: E501
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from SYS.logger import debug
|
|
|
|
# Optional Python ZeroTier binding - preferred over the CLI when it imports.
# Not every installation ships it, so any import failure just leaves the flag
# False and callers keep working through the `zerotier-cli` fallback.
try:
    import zerotier as _zt  # type: ignore
except Exception:
    _HAVE_PY_ZEROTIER = False
else:
    _HAVE_PY_ZEROTIER = True
|
|
|
|
|
|
@dataclass
class ZeroTierNetwork:
    """A ZeroTier network as reported by the local node."""

    # Network identifier (read from the "id" field of the node's listing).
    id: str
    # Network name; empty string when the listing carries none.
    name: str
    # Status string exactly as reported by the node.
    status: str
    # Addresses assigned to this node; entries may carry a "/prefix" suffix.
    assigned_addresses: List[str]
|
|
|
|
|
|
@dataclass
class ZeroTierServiceProbe:
    """Outcome of one HTTP probe against a host/port/path on a ZeroTier network."""

    # Host address that was probed.
    address: str
    # TCP port that was probed.
    port: int
    # URL path that was requested.
    path: str
    # Full URL that was requested.
    url: str
    # Whether the probe is considered successful.
    ok: bool
    # HTTP status code, or None when no response was obtained.
    status_code: Optional[int]
    # Response body (parsed JSON or raw text), or None when unavailable.
    payload: Optional[Any]
    # Best guess at the service behind the endpoint, if any.
    service_hint: Optional[str] = None
|
|
|
|
|
|
def _get_cli_path() -> Optional[str]:
    """Locate the `zerotier-cli` executable.

    PATH is consulted first; if that fails, a fixed list of well-known
    install locations for the current platform is checked.

    Returns the path of the first match, or None when nothing is found.
    """
    on_path = shutil.which("zerotier-cli")
    if on_path:
        return on_path

    if sys.platform == "win32":
        # Each root may hold either the .bat wrapper or the .exe.
        install_roots = (
            os.environ.get("ProgramFiles(x86)", r"C:\Program Files (x86)"),
            os.environ.get("ProgramFiles", r"C:\Program Files"),
            os.environ.get("ProgramData", r"C:\ProgramData"),
        )
        candidates = [
            os.path.join(root, "ZeroTier", "One", "zerotier-cli") + suffix
            for root in install_roots
            for suffix in (".bat", ".exe")
        ]
    else:
        # Linux / macOS
        candidates = [
            "/usr/sbin/zerotier-cli",
            "/usr/local/bin/zerotier-cli",
            "/sbin/zerotier-cli",
            "/var/lib/zerotier-one/zerotier-cli",
        ]

    for candidate in candidates:
        try:
            exists = os.path.isfile(candidate)
        except Exception:
            continue
        if exists:
            return str(candidate)

    return None
|
|
|
|
|
|
def _get_home_path() -> Optional[str]:
    """Return the ZeroTier home directory (the one holding authtoken.secret).

    Only the standard per-platform locations are checked; the first one that
    exists as a directory wins. Returns None when none of them exists.
    """
    if sys.platform == "win32":
        program_data = os.environ.get("ProgramData", r"C:\ProgramData")
        locations = [os.path.join(program_data, "ZeroTier", "One")]
    else:
        locations = [
            "/var/lib/zerotier-one",  # Linux
            "/Library/Application Support/ZeroTier/One",  # macOS
        ]

    for location in locations:
        if os.path.isdir(location):
            return location
    return None
|
|
|
|
|
|
def _get_authtoken() -> Optional[str]:
    """Read the local node's authtoken.secret from the ZeroTier home directory.

    Returns:
        The stripped token, or None when the home directory or token file is
        missing, the file cannot be read, or it is empty.
    """
    home = _get_home_path()
    if not home:
        return None

    token_file = os.path.join(home, "authtoken.secret")
    if not os.path.isfile(token_file):
        return None

    try:
        with open(token_file, "r") as f:
            token = f.read().strip()
    except Exception as exc:
        # Best-effort: never raise, but leave a trace so an unreadable token
        # file (e.g. insufficient permissions) can be diagnosed.
        debug(f"read authtoken failed: {exc}")
        return None

    # An empty file is not a usable token; report it as "no token".
    return token or None
|
|
|
|
|
|
def _read_token_file(path: str) -> Optional[str]:
    """Load a token from `path` without ever raising.

    Returns the file content with surrounding whitespace removed, or None
    when the file is empty or cannot be read (the error is debug-logged).
    """
    try:
        with open(path, "r") as handle:
            token = handle.read().strip()
    except Exception as exc:
        debug(f"read_token_file failed: {exc}")
        return None
    return token or None
|
|
|
|
|
|
def _find_file_upwards(filename: str, start: Optional[str] = None) -> Optional[str]:
    """Look for `filename` in `start` (default: CWD) and each of its ancestors.

    The search goes from the starting directory outwards to the filesystem
    root. Returns the path of the nearest match, or None if there is none.
    """
    origin = Path(start or os.getcwd()).resolve()
    for directory in (origin, *origin.parents):
        hit = directory / filename
        if hit.is_file():
            return str(hit)
    return None
|
|
|
|
|
|
def _find_repo_root(start: Optional[str] = None) -> Optional[str]:
    """Guess the repository root by walking upwards from `start`.

    A directory qualifies when it contains `.git`, `pyproject.toml` or
    `setup.py`. When `start` is not given, the walk begins at the directory
    holding this module.

    Returns the nearest qualifying directory, or None.
    """
    markers = (".git", "pyproject.toml", "setup.py")
    origin = Path(start or Path(__file__).resolve().parent).resolve()
    for directory in (origin, *origin.parents):
        if any((directory / marker).exists() for marker in markers):
            return str(directory)
    return None
|
|
|
|
|
|
def _get_token_path() -> Optional[str]:
    """Work out where the auth token should come from.

    Returns the literal string 'env' when a token is supplied directly through
    the environment, otherwise the path of the first authtoken.secret found,
    otherwise None.

    Lookup order: env token string, env token file, CWD (and parents), repo
    root, user home, and finally the system ZeroTier home.
    """
    env = os.environ

    # 1: token provided directly in env
    if env.get("ZEROTIER_AUTH_TOKEN") or env.get("ZEROTIER_AUTHTOKEN"):
        return "env"

    # 2: token file path provided
    env_file = env.get("ZEROTIER_AUTH_TOKEN_FILE") or env.get("ZEROTIER_AUTHTOKEN_FILE")
    if env_file and os.path.isfile(env_file):
        return env_file

    # 3: token file in current working dir or any parent
    from_cwd = _find_file_upwards("authtoken.secret", start=os.getcwd())
    if from_cwd:
        return from_cwd

    # 4-6: repository root (helpful if the TUI runs with a different CWD),
    # the user's home, then the ZeroTier home. Each source is resolved lazily
    # so later ones are only consulted when the earlier ones miss.
    directory_sources = (
        _find_repo_root,
        lambda: str(Path.home()),
        _get_home_path,
    )
    for locate in directory_sources:
        directory = locate()
        if not directory:
            continue
        secret = os.path.join(directory, "authtoken.secret")
        if os.path.isfile(secret):
            return secret

    return None
|
|
|
|
|
|
def _get_token_override() -> Optional[str]:
    """Return the auth token located by `_get_token_path()`.

    When the source is 'env', the token is taken from ZEROTIER_AUTH_TOKEN
    (or ZEROTIER_AUTHTOKEN); otherwise it is read from the returned file path.

    Returns:
        The stripped token string, or None if no usable token is available
        (including an env value that is blank after stripping).
    """
    source = _get_token_path()
    if not source:
        return None

    if source == "env":
        raw = os.environ.get("ZEROTIER_AUTH_TOKEN") or os.environ.get("ZEROTIER_AUTHTOKEN")
        # A whitespace-only value is not a token: normalise it to None so
        # the result always honours the "token or None" contract.
        return (raw or "").strip() or None

    return _read_token_file(source)
|
|
|
|
|
|
def _cli_available() -> bool:
    """Return True when a `zerotier-cli` executable can be located."""
    cli_path = _get_cli_path()
    return cli_path is not None
|
|
|
|
|
|
def is_available() -> bool:
    """Tell whether ZeroTier can be driven locally, via the module or the CLI."""
    if _HAVE_PY_ZEROTIER:
        return True
    return _cli_available()
|
|
|
|
|
|
def _run_cli_capture(*args: str, timeout: float = 5.0) -> Tuple[int, str, str]:
    """Run zerotier-cli and return (returncode, stdout, stderr).

    This centralizes how we call the CLI so we can always capture stderr and
    returncodes and make debugging failures much easier.

    Unless the caller already passed them, a `-T<token>` argument (from
    `_get_token_override()`) and a `-D<home>` argument (from
    `_get_home_path()`) are prepended.

    Args:
        *args: Arguments passed to zerotier-cli after the injected options.
        timeout: Seconds to wait for the process before subprocess gives up.

    Raises:
        RuntimeError: If the zerotier-cli binary cannot be located.
        subprocess.TimeoutExpired: If the process exceeds `timeout`.
    """
    bin_path = _get_cli_path()
    if not bin_path:
        raise RuntimeError("zerotier-cli not found")

    full_args = list(args)
    token = _get_token_override()
    if token and not any(a.startswith("-T") for a in full_args):
        # Do not log the token itself; we log only its presence/length for debugging
        debug(f"Using external authtoken (len={len(token)}) for CLI auth")
        full_args.insert(0, f"-T{token}")

    home = _get_home_path()
    if home and not any(a.startswith("-D") for a in full_args):
        full_args.insert(0, f"-D{home}")

    cmd = [bin_path, *full_args]
    # Log a redacted copy: the real command line carries the auth token in
    # its -T argument and must never reach the debug log.
    loggable = [bin_path] + [
        "-T<redacted>" if a.startswith("-T") else a for a in full_args
    ]
    debug(f"Running zerotier-cli: {loggable}")

    # The Windows .bat wrapper has to be launched through the shell.
    use_shell = sys.platform == "win32" and str(bin_path).lower().endswith(".bat")
    proc = subprocess.run(cmd, timeout=timeout, capture_output=True, text=True, shell=use_shell)
    return proc.returncode, proc.stdout, proc.stderr
|
|
|
|
|
|
def _run_cli_json(*args: str, timeout: float = 5.0) -> Any:
    """Invoke zerotier-cli and decode its stdout as JSON when possible.

    Returns the decoded JSON value, or the raw stdout string when the output
    is not valid JSON.

    Raises RuntimeError (carrying stderr, or stdout if stderr is empty) when
    the CLI exits with a non-zero status.
    """
    returncode, stdout, stderr = _run_cli_capture(*args, timeout=timeout)
    if returncode != 0:
        # Put the CLI's own message in the exception so callers and logs
        # show something actionable.
        detail = stderr.strip() or stdout.strip()
        raise RuntimeError(f"zerotier-cli failed (rc={returncode}): {detail}")

    try:
        parsed = json.loads(stdout)
    except Exception:
        # Some invocations print plain text; hand it back unchanged.
        return stdout
    return parsed
|
|
|
|
|
|
def _network_from_entry(entry: Any) -> ZeroTierNetwork:
    """Build a ZeroTierNetwork from one dict-like network entry."""
    return ZeroTierNetwork(
        id=str(entry.get("id") or entry.get("networkId") or ""),
        name=str(entry.get("name") or ""),
        status=str(entry.get("status") or ""),
        assigned_addresses=list(entry.get("assignedAddresses") or []),
    )


def list_networks() -> List[ZeroTierNetwork]:
    """Return a list of configured ZeroTier networks on this node.

    Best-effort: prefers Python binding, then `zerotier-cli listnetworks -j`.
    Failures are debug-logged and never raised; whatever could be parsed
    (possibly nothing) is returned.
    """
    if _HAVE_PY_ZEROTIER:
        try:
            # Attempt to use common API shape (best-effort)
            raw = _zt.list_networks()  # type: ignore[attr-defined]
            if raw:
                # Parse into a separate list: if an entry is malformed and
                # parsing aborts, nothing half-built leaks into the CLI
                # fallback result below.
                return [_network_from_entry(n) for n in raw]
            # Empty/None from the binding: fall back to the CLI so we don't
            # return a false-empty result to the UI.
            debug("py-zerotier returned no networks; falling back to CLI")
        except Exception as exc:  # pragma: no cover - optional dependency
            debug(f"py-zerotier listing failed: {exc}")

    # CLI fallback
    networks: List[ZeroTierNetwork] = []
    try:
        data = _run_cli_json("listnetworks", "-j")
        if isinstance(data, list):
            for entry in data:
                networks.append(_network_from_entry(entry))
    except Exception as exc:
        debug(f"list_networks failed: {exc}")

    return networks
|
|
|
|
|
|
def join_network(network_id: str) -> bool:
    """Join a ZeroTier network, trying the Python binding first, then the CLI.

    Returns True once either mechanism succeeds, and False when the binding
    did not succeed and no CLI is available.

    Raises ValueError for an empty network id. A CLI failure is raised too
    (RuntimeError carrying the CLI's stderr/stdout on a non-zero exit), so
    the UI can show the exact reason instead of a generic message.
    """
    network_id = str(network_id or "").strip()
    if not network_id:
        raise ValueError("network_id is required")

    if _HAVE_PY_ZEROTIER:
        try:
            _zt.join_network(network_id)  # type: ignore[attr-defined]
        except Exception as exc:  # pragma: no cover - optional dependency
            debug(f"py-zerotier join failed: {exc}")
        else:
            return True

    if not _cli_available():
        return False

    rc, out, err = _run_cli_capture("join", network_id, timeout=10)
    if rc != 0:
        raise RuntimeError(f"zerotier-cli join failed (rc={rc}): {err.strip() or out.strip()}")
    return True
|
|
|
|
|
|
def leave_network(network_id: str) -> bool:
    """Leave a ZeroTier network, trying the Python binding first, then the CLI.

    Returns True once either mechanism succeeds, and False when the binding
    did not succeed and no CLI is available.

    Raises ValueError for an empty network id. A CLI failure is raised too
    (RuntimeError carrying the CLI's stderr/stdout on a non-zero exit).
    """
    network_id = str(network_id or "").strip()
    if not network_id:
        raise ValueError("network_id is required")

    if _HAVE_PY_ZEROTIER:
        try:
            _zt.leave_network(network_id)  # type: ignore[attr-defined]
        except Exception as exc:  # pragma: no cover - optional dependency
            debug(f"py-zerotier leave failed: {exc}")
        else:
            return True

    if not _cli_available():
        return False

    rc, out, err = _run_cli_capture("leave", network_id, timeout=10)
    if rc != 0:
        raise RuntimeError(f"zerotier-cli leave failed (rc={rc}): {err.strip() or out.strip()}")
    return True
|
|
|
|
|
|
def _strip_addr(addr: str) -> str:
    """Return `addr` without its CIDR suffix ('/24') or zone ID ('%eth0').

    Falsy input (empty string, None) is handed back unchanged.
    """
    if not addr:
        return addr
    without_prefix = addr.partition("/")[0]
    return without_prefix.partition("%")[0]
|
|
|
|
|
|
def get_assigned_addresses(network_id: str) -> List[str]:
    """List the local node's addresses on `network_id`, without prefix or zone.

    Returns an empty list for a blank id or when the node is not a member of
    that network.
    """
    wanted = str(network_id or "").strip()
    if not wanted:
        return []

    match = next((n for n in list_networks() if n.id == wanted), None)
    if match is None:
        return []
    return [str(_strip_addr(a)) for a in match.assigned_addresses if a]
|
|
|
|
|
|
def get_assigned_subnets(network_id: str) -> List[str]:
    """Derive the CIDR subnets (e.g. '10.147.17.0/24') the node sits in.

    Each assigned address of the matching network that carries a '/prefix'
    is converted to its containing network. Addresses that cannot be parsed
    are skipped. Returns an empty list for a blank id.
    """
    wanted = str(network_id or "").strip()
    if not wanted:
        return []

    import ipaddress

    subnets = []
    for network in list_networks():
        if network.id != wanted:
            continue
        for addr in network.assigned_addresses:
            if not addr or "/" not in addr:
                continue
            try:
                # strict=False: the address has host bits set, we want its network.
                subnets.append(str(ipaddress.ip_network(addr, strict=False)))
            except Exception:
                pass
    return subnets
|
|
|
|
|
|
def fetch_central_members(network_id: str, api_token: str) -> List[Dict[str, Any]]:
    """Fetch member details from ZeroTier Central API.

    Requires a valid ZeroTier Central API token. Uses httpx when it can be
    imported, otherwise requests.

    Args:
        network_id: Network whose member list is requested.
        api_token: ZeroTier Central API token sent in the Authorization header.

    Returns:
        A list of member objects containing 'config' with 'ipAssignments', etc.
        Best-effort: an empty list is returned (and the reason debug-logged)
        when no HTTP client is installed, the request fails, or the response
        is not a JSON list.
    """
    url = f"https://my.zerotier.com/api/v1/network/{network_id}/member"
    headers = {"Authorization": f"token {api_token}"}
    try:
        # Choose the client by availability only. A failed request is NOT
        # retried through the other library: the retry would repeat the same
        # request and hide the original error.
        try:
            import httpx as http_client
        except ImportError:
            import requests as http_client  # type: ignore

        resp = http_client.get(url, headers=headers, timeout=10)
        resp.raise_for_status()
        payload = resp.json()
    except Exception as exc:
        debug(f"ZeroTier Central API fetch failed: {exc}")
        return []

    if not isinstance(payload, list):
        # Callers iterate the result and call .get() on each member, so any
        # other JSON shape (e.g. an error object) must not be passed through.
        debug(f"ZeroTier Central API fetch failed: unexpected payload type {type(payload).__name__}")
        return []
    return [member for member in payload if isinstance(member, dict)]
|
|
|
|
|
|
def list_peers() -> List[Dict[str, Any]]:
    """List the peers the local ZeroTier node knows about (best-effort).

    The Python binding is used when present; if it is absent or raises, the
    JSON output of `zerotier-cli peers -j` is used instead. Anything that
    cannot be obtained or is not a JSON list yields an empty list.
    """
    if _HAVE_PY_ZEROTIER:
        try:
            return list(_zt.list_peers() or [])  # type: ignore[attr-defined]
        except Exception as exc:  # pragma: no cover - optional dependency
            debug(f"py-zerotier list_peers failed: {exc}")

    if not _cli_available():
        return []

    try:
        data = _run_cli_json("peers", "-j")
    except Exception as exc:
        debug(f"zerotier-cli peers failed: {exc}")
        return []
    return data if isinstance(data, list) else []
|
|
|
|
|
|
def _probe_url(url: str, *, timeout: float = 2.0, accept_json: bool = True) -> Tuple[bool, Optional[int], Optional[Any]]:
    """Try fetching the URL and return (ok, status_code, payload).

    Uses httpx if available, otherwise falls back to requests.

    Args:
        url: URL to GET.
        timeout: Request timeout in seconds.
        accept_json: When True, a 200 response whose content-type mentions
            JSON is decoded; if decoding fails the raw text is returned.

    Returns:
        (True, 200, payload) on HTTP 200; (False, code, text) for any other
        status; (False, None, None) when no response was obtained (the error
        is debug-logged). Never raises.
    """
    try:
        # Pick the client by availability only. Falling back on *any* httpx
        # error would repeat every failed probe through requests and double
        # the time spent on each unreachable host.
        try:
            import httpx as http_client
        except ImportError:
            import requests as http_client  # type: ignore

        resp = http_client.get(url, timeout=timeout)
        code = int(resp.status_code)
        ok = code == 200
        content_type = str(resp.headers.get("content-type") or "").lower()
        if ok and accept_json and "json" in content_type:
            try:
                return True, code, resp.json()
            except Exception:
                # Declared as JSON but not decodable: keep the raw body.
                return True, code, resp.text
        return ok, code, resp.text
    except Exception as exc:
        debug(f"Probe failed: {url} -> {exc}")
        return False, None, None
|
|
|
|
|
|
def _service_hint_for(code: Optional[int], payload: Any) -> Optional[str]:
    """Guess which service answered a probe from its status code and JSON body."""
    if code == 401:
        return "remote_storage"  # Most likely
    if isinstance(payload, dict):
        # remote_storage_server returns {"status": "ok", ...}
        if payload.get("status"):
            return "remote_storage"
        # hydrus returns {"api_version": ...}
        if payload.get("api_version"):
            return "hydrus"
    return None


def _probe_host_port(
    host: str,
    port: int,
    paths: List[str],
    timeout: float,
    accept_json: bool,
) -> Optional[ZeroTierServiceProbe]:
    """Probe one host/port over http then https; return the first hit or None."""
    # IPv6 literals must be bracketed inside a URL authority.
    url_host = f"[{host}]" if ":" in host else host
    for scheme in ("http", "https"):
        for path in paths:
            url = f"{scheme}://{url_host}:{port}{path}"
            ok, code, payload = _probe_url(url, timeout=timeout, accept_json=accept_json)
            if code is None:
                # No HTTP response at all on this scheme: other paths would
                # fail the same way, so don't spend more timeouts on them.
                break
            if ok or code == 401:
                return ZeroTierServiceProbe(
                    address=host,
                    port=int(port),
                    path=path,
                    url=url,
                    ok=ok,
                    status_code=code,
                    payload=payload,
                    service_hint=_service_hint_for(code, payload),
                )
    return None


def discover_services_on_network(
    network_id: str,
    *,
    ports: Optional[List[int]] = None,
    paths: Optional[List[str]] = None,
    timeout: float = 2.0,
    accept_json: bool = True,
    api_token: Optional[str] = None,
) -> List[ZeroTierServiceProbe]:
    """Probe peers on the given network for HTTP services.

    The local node's assigned addresses are always probed. If api_token is
    provided, the IPs of online members are fetched from ZeroTier Central and
    added. Otherwise every host of each assigned subnet holding at most 256
    addresses is added.

    For each host and port, http is tried before https and `paths` are tried
    in order; the first response with status 200 or 401 is recorded and ends
    probing for that host/port.

    Args:
        network_id: ZeroTier network to scan.
        ports: Ports to probe (default: [999]).
        paths: URL paths to try, in order (default: ["/health", "/api_version"]).
        timeout: Per-request timeout in seconds.
        accept_json: Decode JSON responses when possible.
        api_token: Optional ZeroTier Central API token.

    Returns:
        The recorded probes, in completion order (not address order).

    Raises:
        ValueError: If network_id is empty.
    """
    import concurrent.futures
    import ipaddress

    net = str(network_id or "").strip()
    if not net:
        raise ValueError("network_id required")

    ports = list(ports or [999])
    paths = list(paths or ["/health", "/api_version"])

    addresses = get_assigned_addresses(net)
    seen = set(addresses)

    def add_address(addr: str) -> None:
        # Keep insertion order while de-duplicating in O(1).
        if addr and addr not in seen:
            seen.add(addr)
            addresses.append(addr)

    if api_token:
        for member in fetch_central_members(net, api_token):
            # Look for online members with IP assignments
            if not isinstance(member, dict) or not member.get("online"):
                continue
            config = member.get("config")
            assignments = config.get("ipAssignments") if isinstance(config, dict) else None
            for ip in assignments or []:
                add_address(str(ip).split("/")[0])
    else:
        # Fallback: if no Central token, and we are on a likely /24 subnet,
        # we can try to guess/probe peers on that same subnet.
        for subnet_str in get_assigned_subnets(net):
            try:
                subnet = ipaddress.ip_network(subnet_str, strict=False)
            except ValueError:
                continue
            # Only scan if subnet is reasonably small (e.g. <= /24 = 256 hosts)
            if subnet.num_addresses <= 256:
                for ip in subnet.hosts():
                    add_address(str(ip))

    def do_probe(host: str) -> List[ZeroTierServiceProbe]:
        hits = []
        for port in ports:
            hit = _probe_host_port(host, port, paths, timeout, accept_json)
            if hit is not None:
                hits.append(hit)
        return hits

    probes: List[ZeroTierServiceProbe] = []

    # Parallelize probes to make subnet scanning feasible
    max_workers = min(50, len(addresses) or 1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_addr = {executor.submit(do_probe, addr): addr for addr in addresses}
        for future in concurrent.futures.as_completed(future_to_addr):
            try:
                probes.extend(future.result())
            except Exception as exc:
                # Best-effort: one failing host must not abort the scan.
                debug(f"Probe worker failed for {future_to_addr[future]}: {exc}")

    return probes
|
|
|
|
|
|
def find_peer_service(
    network_id: str,
    *,
    service_hint: Optional[str] = None,
    port: Optional[int] = None,
    path_candidates: Optional[List[str]] = None,
    timeout: float = 2.0,
    api_token: Optional[str] = None,
) -> Optional[ZeroTierServiceProbe]:
    """Return the first probe that matches service_hint or is successful.

    Useful for selecting a peer to configure a store against.

    Args:
        network_id: ZeroTier network to scan.
        service_hint: Case-insensitive substring matched against each probe's
            service_hint. 'hydrus' also matches any probe whose JSON payload
            has a truthy 'api_version'.
        port: Single port to probe; defaults to [999, 5000, 45869, 80, 443].
        path_candidates: Paths to probe; defaults to
            ["/health", "/api_version", "/session_key"].
        timeout: Per-request timeout in seconds.
        api_token: Optional ZeroTier Central API token passed to discovery.

    Returns:
        The first probe matching service_hint; otherwise the first probe with
        ok=True; otherwise the first recorded probe (e.g. a 401 response);
        None when discovery found nothing.
    """
    paths = path_candidates or ["/health", "/api_version", "/session_key"]
    ports = [port] if port is not None else [999, 5000, 45869, 80, 443]

    probes = discover_services_on_network(
        network_id, ports=ports, paths=paths, timeout=timeout, api_token=api_token
    )
    if not probes:
        return None

    if service_hint:
        wanted = service_hint.lower()
        for p in probes:
            if p.service_hint and wanted in str(p.service_hint).lower():
                return p
            # Hydrus detection: check payload for 'api_version'
            if wanted == "hydrus" and isinstance(p.payload, dict) and p.payload.get("api_version"):
                return p

    # Fallback: prefer a probe that actually succeeded; only if none did,
    # hand back the first recorded probe.
    return next((p for p in probes if p.ok), probes[0])
|