from __future__ import annotations

import ast
import datetime
import json
import logging
import os
import re
import sqlite3
import threading
import time
from contextlib import contextmanager
from pathlib import Path
from queue import Queue
from typing import Any, Dict, List, Optional

from SYS.logger import debug, log

logger = logging.getLogger(__name__)
|
2026-01-22 01:53:13 -08:00
|
|
|
|
2026-01-30 10:47:47 -08:00
|
|
|
# DB execute retry settings (for transient 'database is locked' errors)
# Maximum retry attempts before the error is re-raised to the caller.
_DB_EXEC_RETRY_MAX = 5
# Base delay in seconds; actual sleep is base * attempt (linear backoff).
_DB_EXEC_RETRY_BASE_DELAY = 0.05
|
|
|
|
|
|
2026-01-23 16:46:48 -08:00
|
|
|
# The database is located in the project root (prefer explicit repo hints).
|
|
|
|
|
def _resolve_root_dir() -> Path:
|
|
|
|
|
env_root = (
|
|
|
|
|
os.environ.get("MM_REPO")
|
|
|
|
|
or os.environ.get("MM_ROOT")
|
|
|
|
|
or os.environ.get("REPO")
|
|
|
|
|
)
|
|
|
|
|
if env_root:
|
|
|
|
|
try:
|
|
|
|
|
candidate = Path(env_root).expanduser().resolve()
|
|
|
|
|
if candidate.exists():
|
|
|
|
|
return candidate
|
2026-01-31 19:57:09 -08:00
|
|
|
except Exception as exc:
|
|
|
|
|
logger.debug("_resolve_root_dir: failed to resolve env_root %r: %s", env_root, exc, exc_info=True)
|
2026-01-23 16:46:48 -08:00
|
|
|
|
|
|
|
|
cwd = Path.cwd().resolve()
|
|
|
|
|
for base in [cwd, *cwd.parents]:
|
|
|
|
|
if (base / "medios.db").exists():
|
|
|
|
|
return base
|
|
|
|
|
if (base / "CLI.py").exists():
|
|
|
|
|
return base
|
|
|
|
|
if (base / "config.conf").exists():
|
|
|
|
|
return base
|
|
|
|
|
if (base / "scripts").exists() and (base / "SYS").exists():
|
|
|
|
|
return base
|
|
|
|
|
|
|
|
|
|
return Path(__file__).resolve().parent.parent
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Resolved once at import time; all database files live under this root.
ROOT_DIR = _resolve_root_dir()
# Main application database (config, workers, worker output).
DB_PATH = (ROOT_DIR / "medios.db").resolve()
# Separate database for log entries, written by the async log writer thread.
LOG_DB_PATH = (ROOT_DIR / "logs.db").resolve()
|
2026-01-22 01:53:13 -08:00
|
|
|
|
|
|
|
|
class Database:
    """Singleton wrapper around the shared medios.db sqlite3 connection.

    A single connection (WAL journal, 30s timeouts) is shared across
    threads; access is serialized with a reentrant lock, statement helpers
    retry on transient 'database is locked' errors, and `transaction()`
    provides an explicit-transaction context manager.
    """

    # Lazily-created singleton instance (see __new__).
    _instance: Optional[Database] = None

    def __new__(cls):
        # NOTE(review): first-time construction is not lock-protected, so two
        # threads racing here could each run _init_db. In practice the
        # module-level `db = Database()` below runs at import time, which is
        # effectively single-threaded — confirm before relying on lazy init
        # from worker threads.
        if cls._instance is None:
            cls._instance = super(Database, cls).__new__(cls)
            cls._instance._init_db()
        return cls._instance

    def _init_db(self):
        """Open (or create) medios.db, configure the connection, build schema."""
        self.db_path = DB_PATH
        db_existed = self.db_path.exists()
        if db_existed:
            debug(f"Opening existing medios.db at {self.db_path}")
        else:
            debug(f"Creating medios.db at {self.db_path}")

        self.conn = sqlite3.connect(
            str(self.db_path),
            # Shared between threads; all access goes through _conn_lock below.
            check_same_thread=False,
            timeout=30.0 # Increase timeout to 30s to avoid locking issues
        )
        # Rows support both index and column-name access (row['col']).
        self.conn.row_factory = sqlite3.Row
        # Reentrant lock to allow nested DB calls within the same thread (e.g., transaction ->
        # get_config_all / save_config_value) without deadlocking.
        self._conn_lock = threading.RLock()

        # Use WAL mode for better concurrency (allows multiple readers + 1 writer)
        # Set a busy timeout so SQLite waits for short locks rather than immediately failing
        try:
            self.conn.execute("PRAGMA busy_timeout = 30000")
            self.conn.execute("PRAGMA journal_mode=WAL")
            self.conn.execute("PRAGMA synchronous=NORMAL")
        except sqlite3.Error:
            # Best-effort tuning; the connection still works without it.
            pass

        self._create_tables()

    def _create_tables(self):
        """Create the config / logs / workers / worker_stdout tables if missing."""
        cursor = self.conn.cursor()

        # Config table: stores all settings previously in config.conf
        # category: global, store, provider, tool, networking
        # subtype: e.g., hydrusnetwork, folder, alldebrid
        # item_name: e.g., hn-local, default
        # key: the setting key
        # value: the setting value (serialized to string)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS config (
                category TEXT,
                subtype TEXT,
                item_name TEXT,
                key TEXT,
                value TEXT,
                PRIMARY KEY (category, subtype, item_name, key)
            )
        """)

        # Logs table
        # NOTE(review): log writes actually go to the separate logs.db (see
        # _log_worker_loop); this table appears to be a legacy schema kept in
        # medios.db — confirm before removing.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                level TEXT,
                module TEXT,
                message TEXT
            )
        """)

        # Workers table (for background tasks)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS workers (
                id TEXT PRIMARY KEY,
                type TEXT,
                title TEXT,
                description TEXT,
                status TEXT DEFAULT 'running',
                progress REAL DEFAULT 0.0,
                details TEXT,
                result TEXT DEFAULT 'pending',
                error_message TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Worker stdout/logs
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS worker_stdout (
                worker_id TEXT,
                channel TEXT DEFAULT 'stdout',
                content TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(worker_id) REFERENCES workers(id)
            )
        """)

        self.conn.commit()

    def get_connection(self):
        """Return the raw shared sqlite3 connection.

        NOTE(review): callers using the connection directly bypass
        _conn_lock — verify call sites are single-threaded or lock manually.
        """
        return self.conn

    def execute(self, query: str, params: tuple = ()):
        """Execute one statement, retrying on transient lock errors.

        Auto-commits unless an explicit transaction is open on the
        connection. Returns the cursor (so callers can read rowcount /
        lastrowid / fetch results). Re-raises after retries are exhausted
        or on non-lock errors.
        """
        attempts = 0
        while True:
            # Serialize access to the underlying sqlite connection to avoid
            # concurrent use from multiple threads which can trigger locks.
            with self._conn_lock:
                cursor = self.conn.cursor()
                try:
                    cursor.execute(query, params)
                    if not self.conn.in_transaction:
                        self.conn.commit()
                    return cursor
                except sqlite3.OperationalError as exc:
                    msg = str(exc).lower()
                    # Retry a few times on transient lock errors
                    if 'locked' in msg and attempts < _DB_EXEC_RETRY_MAX:
                        attempts += 1
                        # Linear backoff: base delay * attempt number.
                        delay = _DB_EXEC_RETRY_BASE_DELAY * attempts
                        log(f"Database locked on execute; retry {attempts}/{_DB_EXEC_RETRY_MAX} in {delay:.2f}s")
                        try:
                            if not self.conn.in_transaction:
                                self.conn.rollback()
                        except Exception as exc:
                            # NOTE(review): rebinding `exc` here shadows the
                            # OperationalError above; harmless because only the
                            # rollback failure is logged before continuing.
                            logger.exception("Rollback failed while retrying locked execute: %s", exc)
                        # NOTE(review): this sleep happens while _conn_lock is
                        # still held, blocking other threads for the backoff
                        # duration — confirm this is intended.
                        time.sleep(delay)
                        continue
                    # Not recoverable or out of retries
                    if not self.conn.in_transaction:
                        try:
                            self.conn.rollback()
                        except Exception as exc:
                            logger.exception("Rollback failed in non-recoverable execute path: %s", exc)
                    raise
                except Exception as exc:
                    # Unexpected failure: best-effort rollback, log, re-raise.
                    if not self.conn.in_transaction:
                        try:
                            self.conn.rollback()
                        except Exception as rb_exc:
                            logger.exception("Rollback failed during unexpected execute exception: %s", rb_exc)
                    logger.exception("Unexpected exception during DB execute: %s", exc)
                    raise

    def executemany(self, query: str, param_list: List[tuple]):
        """Bulk variant of execute() with the same lock/retry/commit behavior."""
        attempts = 0
        while True:
            with self._conn_lock:
                cursor = self.conn.cursor()
                try:
                    cursor.executemany(query, param_list)
                    if not self.conn.in_transaction:
                        self.conn.commit()
                    return cursor
                except sqlite3.OperationalError as exc:
                    msg = str(exc).lower()
                    # Retry a few times on transient lock errors.
                    if 'locked' in msg and attempts < _DB_EXEC_RETRY_MAX:
                        attempts += 1
                        delay = _DB_EXEC_RETRY_BASE_DELAY * attempts
                        log(f"Database locked on executemany; retry {attempts}/{_DB_EXEC_RETRY_MAX} in {delay:.2f}s")
                        try:
                            if not self.conn.in_transaction:
                                self.conn.rollback()
                        except Exception as exc:
                            logger.exception("Rollback failed while retrying locked executemany: %s", exc)
                        time.sleep(delay)
                        continue
                    # Not recoverable or out of retries.
                    if not self.conn.in_transaction:
                        try:
                            self.conn.rollback()
                        except Exception as exc:
                            logger.exception("Rollback failed in non-recoverable executemany path: %s", exc)
                    raise
                except Exception as exc:
                    if not self.conn.in_transaction:
                        try:
                            self.conn.rollback()
                        except Exception as rb_exc:
                            logger.exception("Rollback failed during unexpected executemany exception: %s", rb_exc)
                    logger.exception("Unexpected exception during DB executemany: %s", exc)
                    raise

    @contextmanager
    def transaction(self):
        """Context manager for a database transaction.

        Transactions acquire the connection lock for the duration of the transaction
        to prevent other threads from performing concurrent operations on the
        same sqlite connection which can lead to locking issues.
        """
        if self.conn.in_transaction:
            # Already in a transaction, just yield
            # NOTE(review): this nested path does not re-acquire _conn_lock;
            # safe only when the outer transaction holds it (same thread,
            # RLock) — confirm all nested uses come through transaction().
            yield self.conn
        else:
            # Hold the connection lock for the lifetime of the transaction
            self._conn_lock.acquire()
            try:
                self.conn.execute("BEGIN")
                try:
                    yield self.conn
                    self.conn.commit()
                except Exception:
                    self.conn.rollback()
                    raise
            finally:
                try:
                    self._conn_lock.release()
                except Exception:
                    logger.exception("Failed to release DB connection lock")

    def fetchall(self, query: str, params: tuple = ()):
        """Run a query under the lock and return all rows (sqlite3.Row list)."""
        with self._conn_lock:
            cursor = self.conn.cursor()
            try:
                cursor.execute(query, params)
                return cursor.fetchall()
            finally:
                cursor.close()

    def fetchone(self, query: str, params: tuple = ()):
        """Run a query under the lock and return the first row or None."""
        with self._conn_lock:
            cursor = self.conn.cursor()
            try:
                cursor.execute(query, params)
                return cursor.fetchone()
            finally:
                cursor.close()
|
2026-01-22 01:53:13 -08:00
|
|
|
# Singleton instance
db = Database()


# Async DB logging: entries are queued here and drained by a daemon thread
# (see _log_worker_loop / _ensure_log_thread) so callers never block on I/O.
_LOG_QUEUE: Queue[tuple[str, str, str]] = Queue()
# True once the writer thread has been started; guarded by _LOG_THREAD_LOCK.
_LOG_THREAD_STARTED = False
_LOG_THREAD_LOCK = threading.Lock()
|
|
|
|
|
|
|
|
|
|
|
2026-02-01 19:01:47 -08:00
|
|
|
def _ensure_log_db_schema() -> None:
|
|
|
|
|
try:
|
|
|
|
|
conn = sqlite3.connect(
|
|
|
|
|
str(LOG_DB_PATH),
|
|
|
|
|
timeout=30.0,
|
|
|
|
|
check_same_thread=False,
|
|
|
|
|
)
|
|
|
|
|
try:
|
|
|
|
|
conn.execute("PRAGMA busy_timeout = 30000")
|
|
|
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
|
|
|
conn.execute("PRAGMA synchronous=NORMAL")
|
|
|
|
|
conn.execute(
|
|
|
|
|
"""
|
|
|
|
|
CREATE TABLE IF NOT EXISTS logs (
|
|
|
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
|
|
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
|
|
level TEXT,
|
|
|
|
|
module TEXT,
|
|
|
|
|
message TEXT
|
|
|
|
|
)
|
|
|
|
|
"""
|
|
|
|
|
)
|
|
|
|
|
conn.commit()
|
|
|
|
|
finally:
|
|
|
|
|
conn.close()
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# Run at import time so logs.db exists before any log writes are queued.
_ensure_log_db_schema()
|
|
|
|
|
|
2026-01-23 02:24:19 -08:00
|
|
|
def _log_worker_loop() -> None:
    """Background log writer using a temporary per-write connection with
    small retry/backoff and a file fallback when writes fail repeatedly.
    """
    # Daemon loop: blocks on the queue forever and exits only with the process.
    while True:
        level, module, message = _LOG_QUEUE.get()
        try:
            attempts = 0
            written = False
            # Up to 3 attempts; only 'database is locked' errors are retried.
            while attempts < 3 and not written:
                try:
                    # A fresh short-lived connection per write avoids sharing
                    # the main connection (and its lock) with this thread.
                    conn = sqlite3.connect(str(LOG_DB_PATH), timeout=30.0)
                    try:
                        conn.execute("PRAGMA busy_timeout = 30000")
                        conn.execute("PRAGMA journal_mode=WAL")
                        conn.execute("PRAGMA synchronous=NORMAL")
                    except sqlite3.Error:
                        # PRAGMAs are best-effort; proceed with defaults.
                        pass
                    cur = conn.cursor()
                    cur.execute("INSERT INTO logs (level, module, message) VALUES (?, ?, ?)", (level, module, message))
                    conn.commit()
                    cur.close()
                    conn.close()
                    written = True
                except sqlite3.OperationalError as exc:
                    attempts += 1
                    if 'locked' in str(exc).lower():
                        # Linear backoff before retrying a locked database.
                        time.sleep(0.05 * attempts)
                        continue
                    # Non-lock operational errors: abort attempts
                    log(f"Warning: Failed to write log entry (operational): {exc}")
                    break
                except Exception as exc:
                    log(f"Warning: Failed to write log entry: {exc}")
                    break
            if not written:
                # Fallback to a file-based log so we never lose the message silently
                try:
                    # Writes to <root>/logs/log_fallback.txt next to medios.db.
                    fallback_dir = Path(db.db_path).with_name("logs")
                    fallback_dir.mkdir(parents=True, exist_ok=True)
                    fallback_file = fallback_dir / "log_fallback.txt"
                    with fallback_file.open("a", encoding="utf-8") as fh:
                        # NOTE(review): datetime.utcnow() is deprecated since
                        # Python 3.12; datetime.now(datetime.UTC) is the
                        # modern equivalent — consider migrating.
                        fh.write(f"{datetime.datetime.utcnow().isoformat()}Z [{level}] {module}: {message}\n")
                except Exception as exc:
                    # Last resort: print to stderr
                    try:
                        log(f"ERROR: Could not persist log message: {level} {module} {message}")
                    except Exception:
                        pass
                    try:
                        import sys as _sys, traceback as _tb
                        _sys.stderr.write(f"CRITICAL: Could not persist log message to fallback file: {exc}\n")
                        _tb.print_exc(file=_sys.stderr)
                    except Exception:
                        pass
        finally:
            # Always balance the earlier get() so Queue.join() can complete.
            try:
                _LOG_QUEUE.task_done()
            except Exception as exc:
                try:
                    import sys as _sys, traceback as _tb
                    _sys.stderr.write(f"CRITICAL: Failed to mark log task done: {exc}\n")
                    _tb.print_exc(file=_sys.stderr)
                except Exception:
                    pass
|
2026-01-23 02:24:19 -08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ensure_log_thread() -> None:
|
|
|
|
|
global _LOG_THREAD_STARTED
|
|
|
|
|
if _LOG_THREAD_STARTED:
|
|
|
|
|
return
|
|
|
|
|
with _LOG_THREAD_LOCK:
|
|
|
|
|
if _LOG_THREAD_STARTED:
|
|
|
|
|
return
|
|
|
|
|
thread = threading.Thread(
|
|
|
|
|
target=_log_worker_loop,
|
|
|
|
|
name="mediosdb-log",
|
|
|
|
|
daemon=True
|
|
|
|
|
)
|
|
|
|
|
thread.start()
|
|
|
|
|
_LOG_THREAD_STARTED = True
|
|
|
|
|
|
2026-01-22 01:53:13 -08:00
|
|
|
def get_db() -> Database:
    """Return the process-wide :class:`Database` singleton."""
    return db
|
|
|
|
|
|
|
|
|
|
def log_to_db(level: str, module: str, message: str):
    """Queue a log record for asynchronous persistence to the database."""
    entry = (level, module, message)
    try:
        _ensure_log_thread()
        _LOG_QUEUE.put(entry)
    except Exception:
        # Never let logging failures propagate — that could trigger
        # recursive logging errors.
        pass
|
|
|
|
|
|
|
|
|
|
# Initialize DB logger in the unified logger
try:
    from SYS.logger import set_db_logger
    set_db_logger(log_to_db)
except ImportError:
    # Optional wiring: when SYS.logger lacks set_db_logger, DB-backed
    # logging is simply not registered with the unified logger.
    pass
|
|
|
|
|
|
|
|
|
|
def save_config_value(category: str, subtype: str, item_name: str, key: str, value: Any):
    """Save a configuration value to the database.

    Strings are stored verbatim; every other value is JSON-serialized so
    it can later be decoded by `rows_to_config`.
    """
    serialized = value if isinstance(value, str) else json.dumps(value)
    db.execute(
        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
        (category, subtype, item_name, key, serialized)
    )
|
|
|
|
|
|
2026-01-30 10:47:47 -08:00
|
|
|
# Matches JSON number literals (integer/float with optional exponent).
# Compiled once at module level; the previous code re-imported `re` via
# __import__('re') for every row.
_JSON_NUMBER_RE = re.compile(r'-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?')


def _parse_config_value(key: str, val: Any) -> Any:
    """Best-effort decode of a stored config value.

    Strings that look like JSON (object / array / quoted string / true /
    false / null / number) are JSON-decoded; if that fails,
    ast.literal_eval is attempted so legacy Python-repr values
    (single-quoted dicts/lists) still load. Non-string bytes values are
    decoded then JSON-parsed. On any failure the raw value is returned.
    """
    try:
        if isinstance(val, str):
            s = val.strip()
            if s == "":
                return ""
            lowered = s.lower()
            looks_like_json = (
                s[0] in ('{', '[', '"')
                or lowered in ('true', 'false', 'null')
                or _JSON_NUMBER_RE.fullmatch(s) is not None
            )
            if not looks_like_json:
                # Plain string: keep as-is.
                return val
            try:
                return json.loads(val)
            except Exception:
                # Try parsing Python literal formats (single-quoted dicts/lists)
                try:
                    parsed = ast.literal_eval(val)
                    debug(f"Parsed config value for key '{key}' using ast.literal_eval (non-JSON literal)")
                    return parsed
                except Exception:
                    return val
        # Non-string values: try JSON first, then a bytes decode fallback.
        try:
            return json.loads(val)
        except Exception:
            try:
                if isinstance(val, (bytes, bytearray)):
                    return json.loads(val.decode('utf-8', errors='replace'))
            except Exception:
                pass
            return val
    except Exception:
        logger.debug("rows_to_config: failed to parse value for key %s; using raw value", key, exc_info=True)
        return val


def rows_to_config(rows) -> Dict[str, Any]:
    """Convert DB rows (category, subtype, item_name, key, value) into a config dict.

    This central helper is used by `get_config_all` and callers that need to
    parse rows fetched with a transaction connection to avoid nested lock
    acquisitions.
    """
    config: Dict[str, Any] = {}
    for row in rows:
        cat = row['category']
        sub = row['subtype']
        name = row['item_name']
        key = row['key']

        # Drop legacy folder store entries (folder store is removed).
        if cat == 'store' and str(sub).strip().lower() == 'folder':
            continue

        parsed_val = _parse_config_value(key, row['value'])

        if cat == 'global':
            # Global settings live at the top level of the config dict.
            config[key] = parsed_val
        elif cat in ('provider', 'tool'):
            # Modular structure: config[cat][sub][key]
            config.setdefault(cat, {}).setdefault(sub, {})[key] = parsed_val
        elif cat == 'store':
            # Stores are further keyed by instance name: config[cat][sub][name][key]
            config.setdefault(cat, {}).setdefault(sub, {}).setdefault(name, {})[key] = parsed_val
        else:
            # Unknown categories get a flat per-category mapping.
            config.setdefault(cat, {})[key] = parsed_val

    return config
|
|
|
|
|
|
2026-01-30 10:47:47 -08:00
|
|
|
|
|
|
|
|
def get_config_all() -> Dict[str, Any]:
    """Retrieve all configuration from the database in the legacy dict format."""
    query = "SELECT category, subtype, item_name, key, value FROM config"
    return rows_to_config(db.fetchall(query))
|
|
|
|
|
|
2026-01-22 01:53:13 -08:00
|
|
|
# Worker Management Methods for medios.db
|
|
|
|
|
|
|
|
|
|
def insert_worker(worker_id: str, worker_type: str, title: str = "", description: str = "") -> bool:
    """Register a new worker row in state 'running'; returns success flag."""
    sql = "INSERT INTO workers (id, type, title, description, status) VALUES (?, ?, ?, ?, 'running')"
    try:
        db.execute(sql, (worker_id, worker_type, title, description))
    except Exception as exc:
        logger.exception("Failed to insert worker %s: %s", worker_id, exc)
        return False
    return True
|
|
|
|
|
|
|
|
|
|
def update_worker(worker_id: str, **kwargs) -> bool:
    """Update allowed columns of a worker row; returns success flag.

    Unknown keyword arguments are silently ignored; `updated_at` is
    refreshed whenever at least one valid column changes.
    """
    if not kwargs:
        return True

    # Only these columns may be modified through this helper.
    allowed = {'type', 'title', 'description', 'status', 'progress', 'details', 'result', 'error_message'}
    assignments = [f"{name} = ?" for name in kwargs if name in allowed]
    values = [v for name, v in kwargs.items() if name in allowed]

    if not assignments:
        return True

    assignments.append("updated_at = CURRENT_TIMESTAMP")
    query = f"UPDATE workers SET {', '.join(assignments)} WHERE id = ?"
    values.append(worker_id)

    try:
        db.execute(query, tuple(values))
        return True
    except Exception as exc:
        logger.exception("Failed to update worker %s: %s", worker_id, exc)
        return False
|
|
|
|
|
|
|
|
|
|
def append_worker_stdout(worker_id: str, content: str, channel: str = 'stdout'):
    """Append a chunk of captured output for a worker (best-effort)."""
    sql = "INSERT INTO worker_stdout (worker_id, channel, content) VALUES (?, ?, ?)"
    try:
        db.execute(sql, (worker_id, channel, content))
    except Exception:
        # Output capture must never crash the worker itself.
        logger.exception("Failed to append worker stdout for %s", worker_id)
|
2026-01-22 01:53:13 -08:00
|
|
|
|
|
|
|
|
def get_worker_stdout(worker_id: str, channel: Optional[str] = None) -> str:
    """Return a worker's captured output joined by newlines, oldest first.

    When `channel` is given (e.g. 'stdout' / 'stderr'), only that channel
    is included.
    """
    conditions = "worker_id = ?"
    params: list = [worker_id]
    if channel:
        conditions += " AND channel = ?"
        params.append(channel)
    sql = f"SELECT content FROM worker_stdout WHERE {conditions} ORDER BY timestamp ASC"
    rows = db.fetchall(sql, tuple(params))
    return "\n".join(row['content'] for row in rows)
|
|
|
|
|
|
|
|
|
|
def get_active_workers() -> List[Dict[str, Any]]:
    """Return all workers currently in status 'running', newest first."""
    query = "SELECT * FROM workers WHERE status = 'running' ORDER BY created_at DESC"
    return [dict(record) for record in db.fetchall(query)]
|
|
|
|
|
|
|
|
|
|
def get_worker(worker_id: str) -> Optional[Dict[str, Any]]:
    """Fetch a single worker row as a dict, or None when absent."""
    record = db.fetchone("SELECT * FROM workers WHERE id = ?", (worker_id,))
    if not record:
        return None
    return dict(record)
|
|
|
|
|
|
|
|
|
|
def expire_running_workers(older_than_seconds: int = 300, status: str = 'error', reason: str = 'timeout') -> int:
    """Mark stale 'running' workers as failed and return how many changed.

    Only workers whose updated_at is older than `older_than_seconds` are
    expired (previously the parameter was ignored and every running worker
    was expired). `reason` is stored in error_message. Returns the number
    of rows actually updated.
    """
    # updated_at is stored as CURRENT_TIMESTAMP (UTC), so compare against
    # datetime('now') shifted back by the cutoff. The modifier string is
    # passed as a bound parameter; int() guards against injection via a
    # non-integer argument.
    cutoff_modifier = f"-{int(older_than_seconds)} seconds"
    cursor = db.execute(
        "UPDATE workers SET status = ?, error_message = ? "
        "WHERE status = 'running' AND updated_at <= datetime('now', ?)",
        (status, reason, cutoff_modifier),
    )
    # db.execute returns the cursor, so the affected row count is available.
    return cursor.rowcount
|