This commit is contained in:
2026-01-14 04:27:54 -08:00
parent cd60c86868
commit 187a230e98
8 changed files with 318 additions and 154 deletions

View File

@@ -3569,7 +3569,7 @@ class LocalLibrarySearchOptimizer:
except Exception as e:
logger.warning(f"Failed to update search result for {file_path}: {e}")
def search_by_tag(self, tag: str, limit: int = 100) -> List[Path]:
def search_by_tag(self, tag: str, limit: int = 100) -> List[Dict[str, Any]]:
"""Fast tag-based search using database."""
if not self.db:
return []
@@ -3578,9 +3578,10 @@ class LocalLibrarySearchOptimizer:
cursor = self.db.connection.cursor()
cursor.execute(
"""
SELECT f.file_path
SELECT f.hash, f.file_path, m.duration, m.size, m.type as media_kind, m.url
FROM file f
JOIN tag t ON f.hash = t.hash
LEFT JOIN metadata m ON f.hash = m.hash
WHERE t.tag LIKE ?
LIMIT ?
""",
@@ -3588,11 +3589,47 @@ class LocalLibrarySearchOptimizer:
limit),
)
return [self.db._from_db_file_path(row[0]) for row in cursor.fetchall()]
results = []
for row in cursor.fetchall():
res = dict(row)
# Resolve path to absolute string for remote consumption
res["file_path"] = str(self.db._from_db_file_path(res["file_path"]))
results.append(res)
return results
except Exception as e:
logger.error(f"Tag search failed: {e}")
return []
def search_by_name(self, query: str, limit: int = 100) -> List[Dict[str, Any]]:
"""Fast name-based search using database."""
if not self.db:
return []
try:
cursor = self.db.connection.cursor()
cursor.execute(
"""
SELECT f.hash, f.file_path, m.duration, m.size, m.type as media_kind, m.url
FROM file f
LEFT JOIN metadata m ON f.hash = m.hash
WHERE f.file_path LIKE ?
LIMIT ?
""",
(f"%{query}%",
limit),
)
results = []
for row in cursor.fetchall():
res = dict(row)
# Resolve path to absolute string for remote consumption
res["file_path"] = str(self.db._from_db_file_path(res["file_path"]))
results.append(res)
return results
except Exception as e:
logger.error(f"Name search failed: {e}")
return []
def save_playlist(self, name: str, items: List[Dict[str, Any]]) -> bool:
"""Save a playlist to the database."""
if not self.db:
@@ -3705,9 +3742,6 @@ class LocalLibrarySearchOptimizer:
except Exception as e:
logger.error(f"Failed to delete playlist ID {playlist_id}: {e}")
return False
if not self.db:
return []
return self.db.search_by_tag(tag, limit)
def search_by_hash(self, file_hash: str) -> Optional[Path]:
"""Fast hash-based search using database."""

View File

@@ -536,14 +536,16 @@ def discover_services_on_network(
path = paths[0]
url = f"{scheme}://{host}:{port}{path}"
ok, code, payload = _probe_url(url, timeout=timeout, accept_json=accept_json)
if ok:
if ok or code == 401:
hint = None
try:
# remote_storage_server returns {"status": "ok", ...}
if isinstance(payload, dict) and payload.get("status"):
if code == 401:
hint = "remote_storage" # Most likely
elif isinstance(payload, dict) and payload.get("status"):
hint = "remote_storage"
# hydrus returns {"api_version": ...}
if isinstance(payload, dict) and payload.get("api_version"):
elif isinstance(payload, dict) and payload.get("api_version"):
hint = "hydrus"
except Exception:
pass
@@ -553,7 +555,7 @@ def discover_services_on_network(
port=int(port),
path=path,
url=url,
ok=True,
ok=(code == 200),
status_code=code,
payload=payload,
service_hint=hint,

5
CLI.py
View File

@@ -4571,6 +4571,8 @@ class MedeiaCLI:
return app
def run(self) -> None:
ensure_zerotier_server_running()
# Ensure Rich tracebacks are active even when invoking subcommands.
try:
config = self._config_loader.load()
@@ -4587,7 +4589,6 @@ class MedeiaCLI:
self.build_app()()
def run_repl(self) -> None:
ensure_zerotier_server_running()
# console = Console(width=100)
# Valid Rich rainbow colors
@@ -5443,6 +5444,8 @@ Come to love it when others take what you share, as there is no greater joy
if pipeline_ctx_ref:
pipeline_ctx_ref.clear_current_command_text()
stop_zerotier_server()
if __name__ == "__main__":
MedeiaCLI().run()

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import os
import sys
import subprocess
import atexit
from pathlib import Path
from typing import Optional
@@ -12,6 +13,11 @@ from SYS.logger import debug, log
_zt_server_proc: Optional[subprocess.Popen] = None
_zt_server_last_config: Optional[str] = None
# We no longer use atexit here because explicit lifecycle management
# is preferred in TUI/REPL, and background servers use a monitor thread
# to shut down when the parent dies.
# atexit.register(lambda: stop_zerotier_server())
def ensure_zerotier_server_running() -> None:
"""Check config and ensure the ZeroTier storage server is running if needed."""
global _zt_server_proc, _zt_server_last_config
@@ -92,7 +98,8 @@ def ensure_zerotier_server_running() -> None:
cmd = [python_exe, str(server_script),
"--storage-path", str(storage_path),
"--port", str(port)]
"--port", str(port),
"--monitor"]
if api_key:
cmd += ["--api-key", str(api_key)]

View File

@@ -191,6 +191,8 @@ class ZeroTier(Store):
try:
import httpx
resp = httpx.request(method, url, params=params, json=json_body, headers=headers, timeout=timeout or self._timeout)
if resp.status_code == 401:
log(f"[Store={self._name}] Remote service at {url} requires an API Key. Please configure 'API_KEY' for this store.", severity="warning")
resp.raise_for_status()
try:
return resp.json()

View File

@@ -131,7 +131,7 @@ class ConfigModal(ModalScreen):
yield Label("Categories", classes="config-label")
with ListView(id="category-list"):
yield ListItem(Label("Global Settings"), id="cat-globals")
yield ListItem(Label("Networking"), id="cat-networking")
yield ListItem(Label("Connectors"), id="cat-networking")
yield ListItem(Label("Stores"), id="cat-stores")
yield ListItem(Label("Providers"), id="cat-providers")
@@ -277,10 +277,10 @@ class ConfigModal(ModalScreen):
container.mount(Static(f"Error listing ZeroTier networks: {exc}"))
container.mount(Rule())
container.mount(Label("Networking Services", classes="config-label"))
container.mount(Label("Connectors", classes="config-label"))
net = self.config_data.get("networking", {})
if not net:
container.mount(Static("No networking services configured."))
container.mount(Static("No connectors configured."))
else:
idx = 0
for ntype, conf in net.items():
@@ -290,8 +290,21 @@ class ConfigModal(ModalScreen):
self._button_id_map[del_id] = ("del", "networking", ntype)
idx += 1
label = ntype
if ntype == "zerotier":
serve = conf.get("serve", "Unknown")
net_id = conf.get("network_id", "Unknown")
net_name = net_id
try:
for ln in local_nets:
if ln.id == net_id:
net_name = ln.name
break
except Exception: pass
label = f"{serve} ---> {net_name}"
row = Horizontal(
Static(ntype, classes="item-label"),
Static(label, classes="item-label"),
Button("Edit", id=edit_id),
Button("Delete", variant="error", id=del_id),
classes="item-row"
@@ -395,9 +408,23 @@ class ConfigModal(ModalScreen):
# Fetch Networking schema
if item_type == "networking":
if item_name == "zerotier":
from API import zerotier as zt
local_net_choices = []
try:
for n in zt.list_networks():
local_net_choices.append((f"{n.name} ({n.id})", n.id))
except Exception: pass
local_store_choices = []
for s_type, s_data in self.config_data.get("store", {}).items():
for s_name in s_data.keys():
local_store_choices.append(s_name)
schema = [
{"key": "api_key", "label": "ZeroTier Central API Token", "default": "", "secret": True},
{"key": "network_id", "label": "Network ID to Join", "default": ""},
{"key": "network_id", "label": "Network to Share on", "choices": local_net_choices},
{"key": "serve", "label": "Local Store to Share", "choices": local_store_choices},
{"key": "port", "label": "Port", "default": "999"},
{"key": "api_key", "label": "Access Key (API Key)", "default": "", "secret": True},
]
for f in schema:
provider_schema_map[f["key"].upper()] = f
@@ -440,10 +467,19 @@ class ConfigModal(ModalScreen):
if choices:
# Select takes a list of (label, value) tuples
select_options = [(str(c), str(c)) for c in choices]
select_options = []
choice_values = []
for c in choices:
if isinstance(c, tuple) and len(c) == 2:
select_options.append((str(c[0]), str(c[1])))
choice_values.append(str(c[1]))
else:
select_options.append((str(c), str(c)))
choice_values.append(str(c))
# If current value not in choices, add it or stay blank
current_val = str(v)
if current_val not in [str(c) for c in choices]:
if current_val not in choice_values:
select_options.insert(0, (current_val, current_val))
sel = Select(select_options, value=current_val, id=inp_id)
@@ -475,7 +511,19 @@ class ConfigModal(ModalScreen):
self._input_id_map[inp_id] = key
if choices:
select_options = [(str(c), str(c)) for c in choices]
select_options = []
choice_values = []
for c in choices:
if isinstance(c, tuple) and len(c) == 2:
select_options.append((str(c[0]), str(c[1])))
choice_values.append(str(c[1]))
else:
select_options.append((str(c), str(c)))
choice_values.append(str(c))
if default_val not in choice_values:
select_options.insert(0, (default_val, default_val))
sel = Select(select_options, value=default_val, id=inp_id)
container.mount(sel)
else:
@@ -778,7 +826,12 @@ class ConfigModal(ModalScreen):
peer_name = "Unnamed Peer"
if isinstance(p.payload, dict):
peer_name = p.payload.get("name") or p.payload.get("NAME") or peer_name
peer_options.append(f"{p.address} ({peer_name})")
status_label = ""
if p.status_code == 401:
status_label = " [Locked/401]"
peer_options.append(f"{p.address} ({peer_name}){status_label}")
def on_peer_selected(peer_choice: str):
if not peer_choice: return
@@ -796,9 +849,12 @@ class ConfigModal(ModalScreen):
"PORT": "999",
"SERVICE": "remote"
}
if match and match.service_hint == "hydrus":
if match:
if match.service_hint == "hydrus":
new_config["SERVICE"] = "hydrus"
new_config["PORT"] = "45869"
if match.status_code == 401:
self.notify("This peer requires an API Key. Please enter it in the settings panel.", severity="warning")
store_cfg[new_name] = new_config

View File

@@ -1,151 +1,153 @@
from __future__ import annotations
import os
from typing import Any, Dict, List, Optional, Sequence, Tuple
import sys
import requests
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
# Add project root to sys.path
root = Path(__file__).resolve().parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from cmdlet._shared import Cmdlet, CmdletArg
from SYS.result_table import ResultTable
from SYS.config import load_config
from SYS.result_table import ResultTable
from API import zerotier as zt
def exec(pipe: Sequence[Any], args: Sequence[str], options: Dict[str, Any]) -> ResultTable:
table = ResultTable(title="ZeroTier Status", max_columns=10)
def exec_zerotier(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
# Use provided config or fall back to CWD load
cfg = config if config else load_config(Path.cwd())
cfg = load_config()
# 1. Local Node Status
node_id = "unknown"
try:
# Best effort to get local node ID
st = zt._run_cli_json("status")
if isinstance(st, dict):
node_id = st.get("address") or node_id
except: pass
# 2. Hosting Status
zt_net = cfg.get("networking", {}).get("zerotier", {})
serve_target = zt_net.get("serve")
if serve_target:
port = zt_net.get("port") or 999
net_id = zt_net.get("network_id") or "all"
status = "OFFLINE"
detail = ""
# Try to find the local ZT address for this network
zt_addrs = []
if net_id and net_id != "all":
zt_addrs = zt.get_assigned_addresses(net_id)
# We probe localhost for hosting status, but show ZT IP in the table
display_addr = zt_addrs[0] if zt_addrs else "localhost"
# Try probes
# Using 127.0.0.1 is often more reliable than 'localhost' on Windows
probe_targets = [f"http://127.0.0.1:{port}/health"]
if zt_addrs:
probe_targets.insert(0, f"http://{zt_addrs[0]}:{port}/health")
from API.HTTP import HTTPClient
with HTTPClient(timeout=1.0, retries=0) as client:
for url in probe_targets:
try:
resp = client.get(url)
if resp.status_code == 200:
status = "ONLINE"
payload = resp.json()
detail = f"Serving {payload.get('name') or serve_target}"
break
else:
status = f"HTTP {resp.status_code}"
except Exception as exc:
if not detail: # Keep the first failure reason if all fail
detail = f"Probe failed: {exc}"
if status == "OFFLINE" and not zt_addrs:
detail = "No ZeroTier IP assigned yet. Check 'zerotier-cli listnetworks'."
table = ResultTable("ZeroTier Status")
# 1. Local Hub Status
row = table.add_row()
row.add_column("TYPE", "HOST")
row.add_column("NAME", serve_target)
row.add_column("ID", net_id)
row.add_column("ADDRESS", f"{display_addr}:{port}")
row.add_column("STATUS", status)
row.add_column("DETAIL", detail)
row.add_column("NAME", "localhost")
# 3. Connections (Remote Stores)
zt_stores = cfg.get("store", {}).get("zerotier", {})
if zt_stores:
for name, sconf in zt_stores.items():
net_id = sconf.get("NETWORK_ID") or sconf.get("network_id") or ""
host = sconf.get("HOST") or sconf.get("host") or ""
port = sconf.get("PORT") or sconf.get("port") or 999
svc = sconf.get("SERVICE") or sconf.get("service") or "remote"
status = "probing..."
detail = ""
if not host:
status = "MISCONFIGURED"
detail = "No host IP"
else:
# Try to get node ID via CLI info
node_id = "???"
try:
from API.HTTP import HTTPClient
with HTTPClient(timeout=2.0) as client:
# Paths depend on service type
path = "/api_version" if svc == "hydrus" else "/health"
resp = client.get(f"http://{host}:{port}{path}")
if hasattr(zt, "_run_cli_json"):
info = zt._run_cli_json("info", "-j")
node_id = info.get("address", "???")
except:
pass
row.add_column("ID", node_id)
# Check if local server is responsive
try:
# endpoint is /health for remote_storage_server
# We try 127.0.0.1 first with a more generous timeout
# Using a list of potential local hits to be robust against Windows networking quirks
import socket
def get_local_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except:
return None
hosts = ["127.0.0.1", "localhost"]
local_ip = get_local_ip()
if local_ip:
hosts.append(local_ip)
success = False
last_err = ""
# Try multiple times if server just started
import time
for attempt in range(3):
for host in hosts:
try:
resp = requests.get(f"http://{host}:999/health", timeout=3, proxies={"http": None, "https": None})
if resp.status_code == 200:
status = "ONLINE"
if svc == "remote":
p = resp.json()
detail = f"Remote store: {p.get('name', 'unknown')}"
else:
detail = "Hydrus API"
else:
status = f"HTTP {resp.status_code}"
except Exception as exc:
status = "OFFLINE"
detail = str(exc)
row.add_column("STATUS", "ONLINE")
row.add_column("ADDRESS", f"{host}:999")
row.add_column("DETAIL", f"Serving {cfg.get('active_store', 'default')}")
success = True
break
elif resp.status_code == 401:
row.add_column("STATUS", "Serving (Locked)")
row.add_column("ADDRESS", f"{host}:999")
row.add_column("DETAIL", "401 Unauthorized - API Key required")
success = True
break
except Exception as e:
last_err = str(e)
continue
if success:
break
time.sleep(1) # Wait between attempts
row = table.add_row()
row.add_column("TYPE", "REMOTE")
row.add_column("NAME", name)
row.add_column("ID", net_id)
row.add_column("ADDRESS", f"{host}:{port}")
row.add_column("STATUS", status)
row.add_column("DETAIL", detail)
if not success:
row.add_column("STATUS", "OFFLINE")
row.add_column("ADDRESS", "127.0.0.1:999")
row.add_column("DETAIL", f"Server not responding on port 999. Last attempt ({hosts[-1]}): {last_err}")
# 4. Networking Networks (Raw ZT status)
except Exception as e:
row.add_column("STATUS", "OFFLINE")
row.add_column("ADDRESS", "127.0.0.1:999")
row.add_column("DETAIL", f"Status check failed: {e}")
# 2. Add Networks
if zt.is_available():
try:
nets = zt.list_networks()
if not nets:
row = table.add_row()
row.add_column("TYPE", "INFO")
row.add_column("NAME", "ZeroTier CLI")
row.add_column("STATUS", "No networks found or CLI error")
row.add_column("DETAIL", f"CLI Path: {zt._get_cli_path() or 'Not found'}")
for n in nets:
networks = zt.list_networks()
for net in networks:
row = table.add_row()
row.add_column("TYPE", "NETWORK")
row.add_column("NAME", n.name)
row.add_column("ID", n.id)
row.add_column("ADDRESS", ", ".join(n.assigned_addresses))
row.add_column("STATUS", n.status)
row.add_column("DETAIL", "")
except Exception as exc:
row.add_column("NAME", getattr(net, "name", "Unnamed"))
row.add_column("ID", getattr(net, "id", ""))
status = getattr(net, "status", "OK")
assigned = getattr(net, "assigned_addresses", [])
ip_str = assigned[0] if assigned else ""
row.add_column("STATUS", status)
row.add_column("ADDRESS", ip_str)
except Exception as e:
row = table.add_row()
row.add_column("TYPE", "ERROR")
row.add_column("NAME", "ZeroTier CLI")
row.add_column("STATUS", "EXCEPTION")
row.add_column("DETAIL", str(exc))
row.add_column("DETAIL", f"Failed to list networks: {e}")
else:
row = table.add_row()
row.add_column("TYPE", "SYSTEM")
row.add_column("NAME", "ZeroTier")
row.add_column("STATUS", "NOT FOUND")
row.add_column("DETAIL", "zerotier-cli not in path")
return table
# Output
try:
from cmdnat.out_table import TableOutput
TableOutput().render(table)
except Exception:
# Fallback for raw CLI
print(f"\n--- {table.title} ---")
for r in table.rows:
# Use the get_column method from ResultRow
t = r.get_column("TYPE") or ""
n = r.get_column("NAME") or ""
s = r.get_column("STATUS") or ""
a = r.get_column("ADDRESS") or ""
id = r.get_column("ID") or ""
d = r.get_column("DETAIL") or ""
print(f"[{t:7}] {n:15} | {s:15} | {a:20} | {id} | {d}")
print("-" * 100)
return 0
CMDLET = Cmdlet(
name=".zerotier",
summary="Check ZeroTier hosting and connection status",
summary="Check ZeroTier node and hosting status",
usage=".zerotier",
exec=exec
exec=exec_zerotier,
)
if __name__ == "__main__":
exec_zerotier(None, sys.argv[1:], {})

View File

@@ -44,6 +44,8 @@ import sys
import json
import argparse
import logging
import threading
import time
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime
@@ -81,6 +83,46 @@ except ImportError:
# ============================================================================
def monitor_parent(parent_pid: int):
"""Monitor the parent process and shut down if it dies."""
if parent_pid <= 1:
return
logger.info(f"Monitoring parent process {parent_pid}")
# On Windows, we might need a different approach if os.kill(pid, 0) is unreliable
is_windows = sys.platform == "win32"
while True:
try:
if is_windows:
# OpenProcess with PROCESS_QUERY_LIMITED_INFORMATION (0x1000)
# This is safer than os.kill on Windows for existence checks
import ctypes
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
handle = ctypes.windll.kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, parent_pid)
if handle:
exit_code = ctypes.c_ulong()
ctypes.windll.kernel32.GetExitCodeProcess(handle, ctypes.byref(exit_code))
ctypes.windll.kernel32.CloseHandle(handle)
# STILL_ACTIVE is 259
if exit_code.value != 259:
logger.info(f"Parent process {parent_pid} finished with code {exit_code.value}. Shutting down...")
os._exit(0)
else:
# On Windows, sometimes we lose access to the handle if the parent is transitioning
# or if it was started from a shell that already closed.
# We'll ignore handle failures for now unless we want to be very strict.
pass
else:
os.kill(parent_pid, 0)
except Exception as e:
# Parent is dead or inaccessible
logger.info(f"Parent process {parent_pid} no longer accessible: {e}. Shutting down server...")
os._exit(0)
time.sleep(5) # Increase check interval to be less aggressive
def get_local_ip() -> Optional[str]:
"""Get the local IP address that would be used for external connections."""
import socket
@@ -594,9 +636,25 @@ def main():
help="API key for authentication (optional)"
)
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument(
"--monitor",
action="store_true",
help="Shut down if parent process dies"
)
args = parser.parse_args()
# Start monitor thread if requested
if args.monitor:
ppid = os.getppid()
if ppid > 1:
monitor_thread = threading.Thread(
target=monitor_parent,
args=(ppid, ),
daemon=True
)
monitor_thread.start()
global STORAGE_PATH, API_KEY
STORAGE_PATH = Path(args.storage_path).resolve()
API_KEY = args.api_key