This commit is contained in:
2026-01-30 10:47:47 -08:00
parent a44b80fd1d
commit ab94c57244
5 changed files with 872 additions and 99 deletions

View File

@@ -5,21 +5,37 @@ from __future__ import annotations
import json
import sqlite3
import time
import os
import traceback
import datetime
import sys
import getpass
import hashlib
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from SYS.logger import log
from SYS.utils import expand_path
from SYS.database import db, get_config_all, save_config_value
from SYS.database import db, get_config_all, save_config_value, rows_to_config
SCRIPT_DIR = Path(__file__).resolve().parent
# Save lock settings (cross-process)
_SAVE_LOCK_DIRNAME = ".medios_save_lock"
_SAVE_LOCK_TIMEOUT = 30.0 # seconds to wait for save lock
_SAVE_LOCK_STALE_SECONDS = 3600 # consider lock stale after 1 hour
_CONFIG_CACHE: Dict[str, Any] = {}
_LAST_SAVED_CONFIG: Dict[str, Any] = {}
_CONFIG_SAVE_MAX_RETRIES = 5
_CONFIG_SAVE_RETRY_DELAY = 0.15
class ConfigSaveConflict(Exception):
    """Signals that persisting now would clobber configuration that another
    process has changed on disk since our last load."""
def global_config() -> List[Dict[str, Any]]:
"""Return configuration schema for global settings."""
return [
@@ -455,6 +471,70 @@ def load_config() -> Dict[str, Any]:
_sync_alldebrid_api_key(db_config)
_CONFIG_CACHE = db_config
_LAST_SAVED_CONFIG = deepcopy(db_config)
try:
# Log a compact summary to help detect startup overwrites/mismatches
provs = list(db_config.get("provider", {}).keys()) if isinstance(db_config.get("provider"), dict) else []
stores = list(db_config.get("store", {}).keys()) if isinstance(db_config.get("store"), dict) else []
mtime = None
try:
mtime = datetime.datetime.utcfromtimestamp(db.db_path.stat().st_mtime).isoformat() + "Z"
except Exception:
mtime = None
summary = (
f"Loaded config from {db.db_path.name}: providers={len(provs)} ({', '.join(provs[:10])}{'...' if len(provs)>10 else ''}), "
f"stores={len(stores)} ({', '.join(stores[:10])}{'...' if len(stores)>10 else ''}), mtime={mtime}"
)
log(summary)
# Try to detect if the most recent audit indicates we previously saved items
# that are no longer present in the loaded config (possible overwrite/restore)
try:
audit_path = Path(db.db_path).with_name("config_audit.log")
if audit_path.exists():
last_line = None
with audit_path.open("r", encoding="utf-8") as fh:
for line in fh:
if line and line.strip():
last_line = line
if last_line:
try:
last_entry = json.loads(last_line)
last_provs = set(last_entry.get("providers") or [])
current_provs = set(provs)
missing = sorted(list(last_provs - current_provs))
if missing:
log(
f"WARNING: Config mismatch on load - last saved providers {sorted(list(last_provs))} "
f"are missing from loaded config: {missing} (last saved {last_entry.get('dt')})"
)
try:
# Write a forensic mismatch record to help diagnose potential overwrites
mismatch_path = Path(db.db_path).with_name("config_mismatch.log")
record = {
"detected": datetime.datetime.utcnow().isoformat() + "Z",
"db": str(db.db_path),
"db_mtime": mtime,
"last_saved_dt": last_entry.get("dt"),
"last_saved_providers": sorted(list(last_provs)),
"missing": missing,
}
try:
backup_dir = Path(db.db_path).with_name("config_backups")
if backup_dir.exists():
files = sorted(backup_dir.glob("medios-backup-*.db"), key=lambda p: p.stat().st_mtime, reverse=True)
record["latest_backup"] = str(files[0]) if files else None
except Exception:
pass
with mismatch_path.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(record) + "\n")
except Exception:
pass
except Exception:
pass
except Exception:
pass
except Exception:
pass
return db_config
_LAST_SAVED_CONFIG = {}
@@ -466,64 +546,320 @@ def reload_config() -> Dict[str, Any]:
return load_config()
def _acquire_save_lock(timeout: float = _SAVE_LOCK_TIMEOUT):
    """Acquire a cross-process save lock implemented as a directory.

    ``mkdir`` is atomic on both POSIX and Windows, so directory creation
    doubles as a portable mutex.  An ``owner.json`` file with pid/timestamp/
    cmdline is written inside the lock for diagnostics, and locks older than
    ``_SAVE_LOCK_STALE_SECONDS`` are treated as abandoned and removed.

    Args:
        timeout: Seconds to keep retrying before giving up.

    Returns:
        Path: the created lock directory; the caller must release it with
        ``_release_save_lock``.

    Raises:
        ConfigSaveConflict: if the lock cannot be acquired within *timeout*.
    """
    lock_dir = Path(db.db_path).with_name(_SAVE_LOCK_DIRNAME)
    start = time.time()
    while True:
        try:
            lock_dir.mkdir(exist_ok=False)  # atomic: fails if someone else holds it
        except FileExistsError:
            # Lock is held. Try to break a stale lock; otherwise wait or time out.
            try:
                owner = lock_dir / "owner.json"
                if owner.exists():
                    data = json.loads(owner.read_text())
                    ts = data.get("ts") or 0
                    if time.time() - ts > _SAVE_LOCK_STALE_SECONDS:
                        try:
                            shutil.rmtree(lock_dir)
                            continue
                        except Exception:
                            pass
                else:
                    # No owner metadata; fall back to the directory's mtime.
                    try:
                        if time.time() - lock_dir.stat().st_mtime > _SAVE_LOCK_STALE_SECONDS:
                            shutil.rmtree(lock_dir)
                            continue
                    except Exception:
                        pass
            except Exception:
                # Stale-lock detection is best-effort; never let it break locking.
                pass
            if time.time() - start > timeout:
                raise ConfigSaveConflict("Save lock busy; could not acquire in time")
            time.sleep(0.1)
        else:
            # We own the lock now; record owner metadata (best-effort, diagnostic only).
            try:
                (lock_dir / "owner.json").write_text(json.dumps({
                    "pid": os.getpid(),
                    "ts": time.time(),
                    "cmdline": " ".join(sys.argv),
                }))
            except Exception:
                pass
            return lock_dir
def _release_save_lock(lock_dir: Path) -> None:
try:
owner = lock_dir / "owner.json"
try:
if owner.exists():
owner.unlink()
except Exception:
pass
lock_dir.rmdir()
except Exception:
pass
def save_config(config: Dict[str, Any]) -> int:
global _CONFIG_CACHE, _LAST_SAVED_CONFIG
_sync_alldebrid_api_key(config)
# Acquire cross-process save lock to avoid concurrent saves from different
# processes which can lead to race conditions and DB-level overwrite.
lock_dir = None
try:
lock_dir = _acquire_save_lock()
except ConfigSaveConflict:
# Surface a clear exception to callers so they can retry or handle it.
raise
previous_config = deepcopy(_LAST_SAVED_CONFIG)
changed_count = _count_changed_entries(previous_config, config)
def _write_entries() -> int:
global _CONFIG_CACHE, _LAST_SAVED_CONFIG
count = 0
with db.transaction():
db.execute("DELETE FROM config")
# Use the transaction-provided connection directly to avoid re-acquiring
# the connection lock via db.* helpers which can lead to deadlock.
with db.transaction() as conn:
# Detect concurrent changes by reading the current DB state inside the
# same transaction before mutating it. Use the transaction connection
# directly to avoid acquiring the connection lock again (deadlock).
try:
cur = conn.cursor()
cur.execute("SELECT category, subtype, item_name, key, value FROM config")
rows = cur.fetchall()
current_disk = rows_to_config(rows)
cur.close()
except Exception:
current_disk = {}
if current_disk != _LAST_SAVED_CONFIG:
# If we have no local changes, refresh caches and skip the write.
if changed_count == 0:
log("Skip save: disk configuration changed since last load and no local changes; not writing to DB.")
# Refresh local caches to match the disk
_CONFIG_CACHE = current_disk
_LAST_SAVED_CONFIG = deepcopy(current_disk)
return 0
# Otherwise, abort to avoid overwriting external changes
raise ConfigSaveConflict(
"Configuration on disk changed since you started editing; save aborted to prevent overwrite. Reload and reapply your changes."
)
# Proceed with writing when no conflicting external changes detected
conn.execute("DELETE FROM config")
for key, value in config.items():
if key in ('store', 'provider', 'tool'):
if isinstance(value, dict):
for subtype, instances in value.items():
if not isinstance(instances, dict):
continue
if key == 'store':
for name, settings in instances.items():
if isinstance(settings, dict):
for k, v in settings.items():
save_config_value(key, subtype, name, k, v)
count += 1
else:
normalized_subtype = subtype
if key == 'provider':
normalized_subtype = _normalize_provider_name(subtype)
if not normalized_subtype:
continue
for k, v in instances.items():
save_config_value(key, normalized_subtype, "default", k, v)
count += 1
if key in ('store', 'provider', 'tool') and isinstance(value, dict):
for subtype, instances in value.items():
if not isinstance(instances, dict):
continue
if key == 'store':
for name, settings in instances.items():
if isinstance(settings, dict):
for k, v in settings.items():
val_str = json.dumps(v) if not isinstance(v, str) else v
conn.execute(
"INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
(key, subtype, name, k, val_str),
)
count += 1
else:
normalized_subtype = subtype
if key == 'provider':
normalized_subtype = _normalize_provider_name(subtype)
if not normalized_subtype:
continue
for k, v in instances.items():
val_str = json.dumps(v) if not isinstance(v, str) else v
conn.execute(
"INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
(key, normalized_subtype, "default", k, val_str),
)
count += 1
else:
if not key.startswith("_") and value is not None:
save_config_value("global", "none", "none", key, value)
val_str = json.dumps(value) if not isinstance(value, str) else value
conn.execute(
"INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
("global", "none", "none", key, val_str),
)
count += 1
return count
saved_entries = 0
attempts = 0
while True:
try:
saved_entries = _write_entries()
# Central log entry
log(
f"Synced {saved_entries} entries to {db.db_path} "
f"({changed_count} changed entries)"
)
# Try to checkpoint WAL to ensure main DB file reflects latest state.
# Use a separate short-lived connection to perform the checkpoint so
# we don't contend with our main connection lock or active transactions.
try:
try:
with sqlite3.connect(str(db.db_path), timeout=5.0) as _con:
_con.execute("PRAGMA wal_checkpoint(TRUNCATE)")
except Exception:
with sqlite3.connect(str(db.db_path), timeout=5.0) as _con:
_con.execute("PRAGMA wal_checkpoint")
except Exception as exc:
log(f"Warning: WAL checkpoint failed: {exc}")
# Audit to disk so we can correlate saves across restarts and processes.
# Audit to disk so we can correlate saves across restarts and processes.
try:
audit_path = Path(db.db_path).with_name("config_audit.log")
# Gather non-secret summary info (provider/store names)
provider_names = []
store_names = []
try:
pblock = config.get("provider")
if isinstance(pblock, dict):
provider_names = [str(k) for k in pblock.keys()]
except Exception:
provider_names = []
try:
sblock = config.get("store")
if isinstance(sblock, dict):
store_names = [str(k) for k in sblock.keys()]
except Exception:
store_names = []
stack = traceback.format_stack()
caller = stack[-1].strip() if stack else ""
# Try to include the database file modification time for correlation
db_mtime = None
try:
db_mtime = datetime.datetime.utcfromtimestamp(db.db_path.stat().st_mtime).isoformat() + "Z"
except Exception:
db_mtime = None
# Create a consistent timestamped backup of the DB so we can recover later
backup_path = None
try:
backup_dir = Path(db.db_path).with_name("config_backups")
backup_dir.mkdir(parents=False, exist_ok=True)
ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
candidate = backup_dir / f"medios-backup-{ts}.db"
try:
# Use sqlite backup API for a consistent copy
src_con = sqlite3.connect(str(db.db_path))
dest_con = sqlite3.connect(str(candidate))
src_con.backup(dest_con)
dest_con.close()
src_con.close()
backup_path = str(candidate)
except Exception as e:
log(f"Warning: Failed to create DB backup: {e}")
# Prune older backups (keep last 20)
try:
files = sorted(backup_dir.glob("medios-backup-*.db"), key=lambda p: p.stat().st_mtime, reverse=True)
for old in files[20:]:
try:
old.unlink()
except Exception:
pass
except Exception:
pass
except Exception:
backup_path = None
# Collect process/exec info and a short hash of the config for forensic tracing
try:
exe = sys.executable
argv = list(sys.argv)
cwd = os.getcwd()
user = getpass.getuser()
try:
cfg_hash = hashlib.md5(json.dumps(config, sort_keys=True).encode('utf-8')).hexdigest()
except Exception:
cfg_hash = None
except Exception:
exe = None
argv = None
cwd = None
user = None
cfg_hash = None
entry = {
"ts": time.time(),
"dt": datetime.datetime.utcnow().isoformat() + "Z",
"pid": os.getpid(),
"exe": exe,
"argv": argv,
"cwd": cwd,
"user": user,
"stack": "".join(stack[-20:]),
"caller": caller,
"config_hash": cfg_hash,
"saved_entries": saved_entries,
"changed_count": changed_count,
"db": str(db.db_path),
"db_mtime": db_mtime,
"backup": backup_path,
"providers": provider_names,
"stores": store_names,
}
try:
with audit_path.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(entry) + "\n")
except Exception:
# Best-effort; don't fail the save if audit write fails
log("Warning: Failed to write config audit file")
except Exception:
pass
finally:
# Release the save lock we acquired earlier
try:
if lock_dir is not None and lock_dir.exists():
_release_save_lock(lock_dir)
except Exception:
pass
break
except sqlite3.OperationalError as exc:
attempts += 1
locked_error = "locked" in str(exc).lower()
if not locked_error or attempts >= _CONFIG_SAVE_MAX_RETRIES:
log(f"CRITICAL: Database write failed: {exc}")
# Ensure we release potential save lock before bubbling error
try:
if lock_dir is not None and lock_dir.exists():
_release_save_lock(lock_dir)
except Exception:
pass
raise
delay = _CONFIG_SAVE_RETRY_DELAY * attempts
log(f"Database locked; retry {attempts}/{_CONFIG_SAVE_MAX_RETRIES} in {delay:.2f}s")
time.sleep(delay)
except Exception as exc:
log(f"CRITICAL: Configuration save failed: {exc}")
try:
if lock_dir is not None and lock_dir.exists():
_release_save_lock(lock_dir)
except Exception:
pass
raise
clear_config_cache()
@@ -540,3 +876,12 @@ def load() -> Dict[str, Any]:
def save(config: Dict[str, Any]) -> int:
    """Thin alias for :func:`save_config`: persist *config* and return the
    number of entries written."""
    return save_config(config)
def count_changed_entries(config: Dict[str, Any]) -> int:
    """Count configuration entries in *config* that differ from the last-saved snapshot.

    Useful for user-facing messages that should report how many entries were
    actually modified rather than the total number of rows persisted.
    """
    return _count_changed_entries(_LAST_SAVED_CONFIG, config)

View File

@@ -8,8 +8,14 @@ from queue import Queue
from pathlib import Path
from typing import Any, Dict, List, Optional
from contextlib import contextmanager
import time
import datetime
from SYS.logger import log
# DB execute retry settings (for transient 'database is locked' errors)
_DB_EXEC_RETRY_MAX = 5
_DB_EXEC_RETRY_BASE_DELAY = 0.05
# The database is located in the project root (prefer explicit repo hints).
def _resolve_root_dir() -> Path:
env_root = (
@@ -65,9 +71,14 @@ class Database:
timeout=30.0 # Increase timeout to 30s to avoid locking issues
)
self.conn.row_factory = sqlite3.Row
# Reentrant lock to allow nested DB calls within the same thread (e.g., transaction ->
# get_config_all / save_config_value) without deadlocking.
self._conn_lock = threading.RLock()
# Use WAL mode for better concurrency (allows multiple readers + 1 writer)
# Set a busy timeout so SQLite waits for short locks rather than immediately failing
try:
self.conn.execute("PRAGMA busy_timeout = 30000")
self.conn.execute("PRAGMA journal_mode=WAL")
self.conn.execute("PRAGMA synchronous=NORMAL")
except sqlite3.Error:
@@ -139,61 +150,129 @@ class Database:
def get_connection(self):
return self.conn
def execute(self, query: str, params: tuple = ()):
    """Execute *query* with *params* on the shared connection and return the cursor.

    Access is serialized through ``self._conn_lock`` so multiple threads never
    interleave on the one sqlite connection.  Transient "database is locked"
    errors are retried up to ``_DB_EXEC_RETRY_MAX`` times with a linearly
    growing delay; outside an explicit transaction the statement is committed
    on success and rolled back (best-effort) on failure.
    """
    tries = 0
    while True:
        with self._conn_lock:
            cur = self.conn.cursor()

            def _safe_rollback() -> None:
                # Only roll back autocommit-mode work; never raise from cleanup.
                if not self.conn.in_transaction:
                    try:
                        self.conn.rollback()
                    except Exception:
                        pass

            try:
                cur.execute(query, params)
                if not self.conn.in_transaction:
                    self.conn.commit()
                return cur
            except sqlite3.OperationalError as exc:
                if 'locked' in str(exc).lower() and tries < _DB_EXEC_RETRY_MAX:
                    tries += 1
                    delay = _DB_EXEC_RETRY_BASE_DELAY * tries
                    log(f"Database locked on execute; retry {tries}/{_DB_EXEC_RETRY_MAX} in {delay:.2f}s")
                    _safe_rollback()
                    time.sleep(delay)
                    continue
                # Out of retries, or a non-lock operational error.
                _safe_rollback()
                raise
            except Exception:
                _safe_rollback()
                raise
def executemany(self, query: str, param_list: List[tuple]):
    """Execute *query* once per parameter tuple in *param_list*.

    Mirrors :meth:`execute`: the connection lock serializes threads, transient
    "database is locked" errors are retried up to ``_DB_EXEC_RETRY_MAX`` times
    with growing delay, and outside an explicit transaction the batch is
    committed on success and rolled back (best-effort) on failure.
    """
    tries = 0
    while True:
        with self._conn_lock:
            cur = self.conn.cursor()

            def _safe_rollback() -> None:
                # Only roll back autocommit-mode work; never raise from cleanup.
                if not self.conn.in_transaction:
                    try:
                        self.conn.rollback()
                    except Exception:
                        pass

            try:
                cur.executemany(query, param_list)
                if not self.conn.in_transaction:
                    self.conn.commit()
                return cur
            except sqlite3.OperationalError as exc:
                if 'locked' in str(exc).lower() and tries < _DB_EXEC_RETRY_MAX:
                    tries += 1
                    delay = _DB_EXEC_RETRY_BASE_DELAY * tries
                    log(f"Database locked on executemany; retry {tries}/{_DB_EXEC_RETRY_MAX} in {delay:.2f}s")
                    _safe_rollback()
                    time.sleep(delay)
                    continue
                _safe_rollback()
                raise
            except Exception:
                _safe_rollback()
                raise
@contextmanager
def transaction(self):
"""Context manager for a database transaction."""
"""Context manager for a database transaction.
Transactions acquire the connection lock for the duration of the transaction
to prevent other threads from performing concurrent operations on the
same sqlite connection which can lead to locking issues.
"""
if self.conn.in_transaction:
# Already in a transaction, just yield
yield self.conn
else:
# Hold the connection lock for the lifetime of the transaction
self._conn_lock.acquire()
try:
self.conn.execute("BEGIN")
yield self.conn
self.conn.commit()
except Exception:
self.conn.rollback()
raise
try:
yield self.conn
self.conn.commit()
except Exception:
self.conn.rollback()
raise
finally:
try:
self._conn_lock.release()
except Exception:
pass
def fetchall(self, query: str, params: tuple = ()):
    """Run *query* under the connection lock and return every result row."""
    with self._conn_lock:
        cur = self.conn.cursor()
        try:
            cur.execute(query, params)
            rows = cur.fetchall()
        finally:
            cur.close()
    return rows

def fetchone(self, query: str, params: tuple = ()):
    """Run *query* under the connection lock and return the first row (or None)."""
    with self._conn_lock:
        cur = self.conn.cursor()
        try:
            cur.execute(query, params)
            row = cur.fetchone()
        finally:
            cur.close()
    return row
# Singleton instance
db = Database()
@@ -203,15 +282,50 @@ _LOG_THREAD_LOCK = threading.Lock()
def _log_worker_loop() -> None:
"""Background log writer using a temporary per-write connection with
small retry/backoff and a file fallback when writes fail repeatedly.
"""
while True:
level, module, message = _LOG_QUEUE.get()
try:
db.execute(
"INSERT INTO logs (level, module, message) VALUES (?, ?, ?)",
(level, module, message)
)
except Exception:
pass
attempts = 0
written = False
while attempts < 3 and not written:
try:
# Create a short-lived connection for the logging write so the
# logging thread does not contend with the main connection lock.
conn = sqlite3.connect(str(db.db_path), timeout=5.0)
cur = conn.cursor()
cur.execute("INSERT INTO logs (level, module, message) VALUES (?, ?, ?)", (level, module, message))
conn.commit()
cur.close()
conn.close()
written = True
except sqlite3.OperationalError as exc:
attempts += 1
if 'locked' in str(exc).lower():
time.sleep(0.05 * attempts)
continue
# Non-lock operational errors: abort attempts
log(f"Warning: Failed to write log entry (operational): {exc}")
break
except Exception as exc:
log(f"Warning: Failed to write log entry: {exc}")
break
if not written:
# Fallback to a file-based log so we never lose the message silently
try:
fallback_dir = Path(db.db_path).with_name("logs")
fallback_dir.mkdir(parents=True, exist_ok=True)
fallback_file = fallback_dir / "log_fallback.txt"
with fallback_file.open("a", encoding="utf-8") as fh:
fh.write(f"{datetime.datetime.utcnow().isoformat()}Z [{level}] {module}: {message}\n")
except Exception:
# Last resort: print to stderr
try:
log(f"ERROR: Could not persist log message: {level} {module} {message}")
except Exception:
pass
finally:
try:
_LOG_QUEUE.task_done()
@@ -261,11 +375,14 @@ def save_config_value(category: str, subtype: str, item_name: str, key: str, val
(category, subtype, item_name, key, val_str)
)
def get_config_all() -> Dict[str, Any]:
"""Retrieve all configuration from the database in the legacy dict format."""
rows = db.fetchall("SELECT category, subtype, item_name, key, value FROM config")
def rows_to_config(rows) -> Dict[str, Any]:
"""Convert DB rows (category, subtype, item_name, key, value) into a config dict.
This central helper is used by `get_config_all` and callers that need to
parse rows fetched with a transaction connection to avoid nested lock
acquisitions.
"""
config: Dict[str, Any] = {}
for row in rows:
cat = row['category']
sub = row['subtype']
@@ -276,13 +393,33 @@ def get_config_all() -> Dict[str, Any]:
# Drop legacy folder store entries (folder store is removed).
if cat == 'store' and str(sub).strip().lower() == 'folder':
continue
# Try to parse JSON value, fallback to string
# Conservative JSON parsing: only attempt to decode when the value
# looks like JSON (object/array/quoted string/true/false/null/number).
parsed_val = val
try:
parsed_val = json.loads(val)
if isinstance(val, str):
s = val.strip()
if s == "":
parsed_val = ""
else:
first = s[0]
lowered = s.lower()
if first in ('{', '[', '"') or lowered in ('true', 'false', 'null') or __import__('re').fullmatch(r'-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?', s):
try:
parsed_val = json.loads(val)
except Exception:
parsed_val = val
else:
parsed_val = val
else:
try:
parsed_val = json.loads(val)
except Exception:
parsed_val = val
except Exception:
parsed_val = val
if cat == 'global':
config[key] = parsed_val
else:
@@ -298,9 +435,15 @@ def get_config_all() -> Dict[str, Any]:
name_dict[key] = parsed_val
else:
config.setdefault(cat, {})[key] = parsed_val
return config
def get_config_all() -> Dict[str, Any]:
    """Load every row of the config table and return it in the legacy dict format."""
    raw_rows = db.fetchall(
        "SELECT category, subtype, item_name, key, value FROM config"
    )
    return rows_to_config(raw_rows)
# Worker Management Methods for medios.db
def insert_worker(worker_id: str, worker_type: str, title: str = "", description: str = "") -> bool: