Medios-Macina/SYS/database.py
from __future__ import annotations
import sqlite3
import json
import threading
from queue import Queue
from pathlib import Path
from typing import Any, Dict, List, Optional
from contextlib import contextmanager
# The database is located in the project root
ROOT_DIR = Path(__file__).resolve().parent.parent
DB_PATH = ROOT_DIR / "medios.db"
class Database:
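    """Singleton wrapper around the shared SQLite connection for medios.db."""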
    _instance: Optional[Database] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(Database, cls).__new__(cls)
            cls._instance._init_db()
        return cls._instance

    def _init_db(self):
        self.conn = sqlite3.connect(
            str(DB_PATH),
            check_same_thread=False,
            timeout=30.0  # Increase timeout to 30s to avoid locking issues
        )
        self.conn.row_factory = sqlite3.Row
        # Use WAL mode for better concurrency (allows multiple readers + 1 writer)
        try:
            self.conn.execute("PRAGMA journal_mode=WAL")
            self.conn.execute("PRAGMA synchronous=NORMAL")
        except sqlite3.Error:
            pass
        self._create_tables()

    def _create_tables(self):
        cursor = self.conn.cursor()
        # Config table: stores all settings previously in config.conf
        # category: global, store, provider, tool, networking
        # subtype: e.g., hydrusnetwork, folder, alldebrid
        # item_name: e.g., hn-local, default
        # key: the setting key
        # value: the setting value (serialized to string)
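        # Example (illustrative): a row ('store', 'hydrusnetwork', 'hn-local', 'api_key', 'abc')
        # is returned by get_config_all() as config['store']['hydrusnetwork']['hn-local']['api_key'] == 'abc'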
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS config (
                category TEXT,
                subtype TEXT,
                item_name TEXT,
                key TEXT,
                value TEXT,
                PRIMARY KEY (category, subtype, item_name, key)
            )
        """)
        # Logs table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                level TEXT,
                module TEXT,
                message TEXT
            )
        """)
        # Workers table (for background tasks)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS workers (
                id TEXT PRIMARY KEY,
                type TEXT,
                title TEXT,
                description TEXT,
                status TEXT DEFAULT 'running',
                progress REAL DEFAULT 0.0,
                details TEXT,
                result TEXT DEFAULT 'pending',
                error_message TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Worker stdout/logs
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS worker_stdout (
                worker_id TEXT,
                channel TEXT DEFAULT 'stdout',
                content TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(worker_id) REFERENCES workers(id)
            )
        """)
        self.conn.commit()

    def get_connection(self):
        return self.conn

    def execute(self, query: str, params: tuple = ()):
        cursor = self.conn.cursor()
        try:
            cursor.execute(query, params)
            if not self.conn.in_transaction:
                self.conn.commit()
            return cursor
        except Exception:
            if not self.conn.in_transaction:
                self.conn.rollback()
            raise

    def executemany(self, query: str, param_list: List[tuple]):
        cursor = self.conn.cursor()
        try:
            cursor.executemany(query, param_list)
            if not self.conn.in_transaction:
                self.conn.commit()
            return cursor
        except Exception:
            if not self.conn.in_transaction:
                self.conn.rollback()
            raise

    @contextmanager
    def transaction(self):
        """Context manager for a database transaction.

        Nested calls reuse the transaction that is already open; otherwise
        BEGIN, commit, and rollback are handled here.
        """
        if self.conn.in_transaction:
            # Already in a transaction, just yield
            yield self.conn
        else:
            try:
                self.conn.execute("BEGIN")
                yield self.conn
                self.conn.commit()
            except Exception:
                self.conn.rollback()
                raise

    def fetchall(self, query: str, params: tuple = ()):
        cursor = self.conn.cursor()
        try:
            cursor.execute(query, params)
            return cursor.fetchall()
        finally:
            cursor.close()

    def fetchone(self, query: str, params: tuple = ()):
        cursor = self.conn.cursor()
        try:
            cursor.execute(query, params)
            return cursor.fetchone()
        finally:
            cursor.close()

# Singleton instance
db = Database()
_LOG_QUEUE: Queue[tuple[str, str, str]] = Queue()
_LOG_THREAD_STARTED = False
_LOG_THREAD_LOCK = threading.Lock()
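# Log writes are queued and drained by a single daemon thread so that callers
# never block on SQLite and all inserts go through the shared connection.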
def _log_worker_loop() -> None:
    while True:
        level, module, message = _LOG_QUEUE.get()
        try:
            db.execute(
                "INSERT INTO logs (level, module, message) VALUES (?, ?, ?)",
                (level, module, message)
            )
        except Exception:
            pass
        finally:
            try:
                _LOG_QUEUE.task_done()
            except Exception:
                pass

def _ensure_log_thread() -> None:
    global _LOG_THREAD_STARTED
    if _LOG_THREAD_STARTED:
        return
    with _LOG_THREAD_LOCK:
        if _LOG_THREAD_STARTED:
            return
        thread = threading.Thread(
            target=_log_worker_loop,
            name="mediosdb-log",
            daemon=True
        )
        thread.start()
        _LOG_THREAD_STARTED = True

def get_db() -> Database:
    return db

def log_to_db(level: str, module: str, message: str):
    """Log a message to the database asynchronously."""
    try:
        _ensure_log_thread()
        _LOG_QUEUE.put((level, module, message))
    except Exception:
        # Avoid recursive logging errors if the queue fails
        pass

# Initialize DB logger in the unified logger
try:
    from SYS.logger import set_db_logger
    set_db_logger(log_to_db)
except ImportError:
    pass

def save_config_value(category: str, subtype: str, item_name: str, key: str, value: Any):
    """Save a configuration value to the database."""
    val_str = json.dumps(value) if not isinstance(value, str) else value
    db.execute(
        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
        (category, subtype, item_name, key, val_str)
    )

def get_config_all() -> Dict[str, Any]:
    """Retrieve all configuration from the database in the legacy dict format."""
    try:
        db.execute("DELETE FROM config WHERE category='store' AND LOWER(subtype) in ('folder', 'zerotier')")
        db.execute("DELETE FROM config WHERE category='networking'")
    except Exception:
        pass
    rows = db.fetchall("SELECT category, subtype, item_name, key, value FROM config")
    config: Dict[str, Any] = {}
    for row in rows:
        cat = row['category']
        sub = row['subtype']
        name = row['item_name']
        key = row['key']
        val = row['value']
        # Drop legacy folder store entries (folder store is removed).
        if cat == 'store' and str(sub).strip().lower() == 'folder':
            continue
        # Try to parse the value as JSON, falling back to the raw string
        try:
            parsed_val = json.loads(val)
        except Exception:
            parsed_val = val
        if cat == 'global':
            config[key] = parsed_val
        else:
            # Modular structure: provider/tool nest as config[cat][sub][key];
            # store nests as config[cat][sub][name][key]
            if cat in ('provider', 'tool'):
                cat_dict = config.setdefault(cat, {})
                sub_dict = cat_dict.setdefault(sub, {})
                sub_dict[key] = parsed_val
            elif cat == 'store':
                cat_dict = config.setdefault(cat, {})
                sub_dict = cat_dict.setdefault(sub, {})
                name_dict = sub_dict.setdefault(name, {})
                name_dict[key] = parsed_val
            else:
                config.setdefault(cat, {})[key] = parsed_val
    return config

# Worker Management Methods for medios.db
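# Typical lifecycle (illustrative): insert_worker() when a task starts, update_worker()
# as status/progress changes, append_worker_stdout() for captured output, and
# get_worker() / get_active_workers() when the UI polls for state.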
def insert_worker(worker_id: str, worker_type: str, title: str = "", description: str = "") -> bool:
    """Register a new background worker row with status 'running'."""
    try:
        db.execute(
            "INSERT INTO workers (id, type, title, description, status) VALUES (?, ?, ?, ?, 'running')",
            (worker_id, worker_type, title, description)
        )
        return True
    except Exception:
        return False

def update_worker(worker_id: str, **kwargs) -> bool:
    """Update allowed columns of a worker row; unknown keyword arguments are ignored."""
    if not kwargs:
        return True
    # Filter valid columns
    valid_cols = {'type', 'title', 'description', 'status', 'progress', 'details', 'result', 'error_message'}
    cols = []
    vals = []
    for k, v in kwargs.items():
        if k in valid_cols:
            cols.append(f"{k} = ?")
            vals.append(v)
    if not cols:
        return True
    cols.append("updated_at = CURRENT_TIMESTAMP")
    query = f"UPDATE workers SET {', '.join(cols)} WHERE id = ?"
    vals.append(worker_id)
    try:
        db.execute(query, tuple(vals))
        return True
    except Exception:
        return False

def append_worker_stdout(worker_id: str, content: str, channel: str = 'stdout'):
    try:
        db.execute(
            "INSERT INTO worker_stdout (worker_id, channel, content) VALUES (?, ?, ?)",
            (worker_id, channel, content)
        )
    except Exception:
        pass

def get_worker_stdout(worker_id: str, channel: Optional[str] = None) -> str:
    query = "SELECT content FROM worker_stdout WHERE worker_id = ?"
    params = [worker_id]
    if channel:
        query += " AND channel = ?"
        params.append(channel)
    query += " ORDER BY timestamp ASC"
    rows = db.fetchall(query, tuple(params))
    return "\n".join(row['content'] for row in rows)

def get_active_workers() -> List[Dict[str, Any]]:
    rows = db.fetchall("SELECT * FROM workers WHERE status = 'running' ORDER BY created_at DESC")
    return [dict(row) for row in rows]

def get_worker(worker_id: str) -> Optional[Dict[str, Any]]:
    row = db.fetchone("SELECT * FROM workers WHERE id = ?", (worker_id,))
    return dict(row) if row else None

def expire_running_workers(older_than_seconds: int = 300, status: str = 'error', reason: str = 'timeout') -> int:
    """Mark stale 'running' workers as expired and return the number of rows updated."""
    # SQLite's datetime() modifiers give us the cutoff: anything still 'running'
    # whose updated_at is older than the cutoff is flipped to the given status.
    cursor = db.execute(
        "UPDATE workers SET status = ?, error_message = ?, updated_at = CURRENT_TIMESTAMP "
        "WHERE status = 'running' AND updated_at <= datetime('now', ?)",
        (status, reason, f"-{int(older_than_seconds)} seconds")
    )
    return cursor.rowcount
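
# Usage sketch (illustrative only; nothing below runs on import):
#     from SYS.database import save_config_value, get_config_all, log_to_db
#     save_config_value('global', '', 'default', 'debug', True)  # non-strings stored as JSON ('true')
#     cfg = get_config_all()                                      # cfg['debug'] == True
#     log_to_db('INFO', 'example', 'config saved')                # queued and written asynchronously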