from __future__ import annotations import sqlite3 import json from pathlib import Path from typing import Any, Dict, List, Optional # The database is located in the project root ROOT_DIR = Path(__file__).resolve().parent.parent DB_PATH = ROOT_DIR / "medios.db" class Database: _instance: Optional[Database] = None def __new__(cls): if cls._instance is None: cls._instance = super(Database, cls).__new__(cls) cls._instance._init_db() return cls._instance def _init_db(self): self.conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) self.conn.row_factory = sqlite3.Row self._create_tables() def _create_tables(self): cursor = self.conn.cursor() # Config table: stores all settings previously in config.conf # category: global, store, provider, tool, networking # subtype: e.g., hydrusnetwork, folder, alldebrid # item_name: e.g., hn-local, default # key: the setting key # value: the setting value (serialized to string) cursor.execute(""" CREATE TABLE IF NOT EXISTS config ( category TEXT, subtype TEXT, item_name TEXT, key TEXT, value TEXT, PRIMARY KEY (category, subtype, item_name, key) ) """) # Logs table cursor.execute(""" CREATE TABLE IF NOT EXISTS logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, level TEXT, module TEXT, message TEXT ) """) # Workers table (for background tasks) cursor.execute(""" CREATE TABLE IF NOT EXISTS workers ( id TEXT PRIMARY KEY, type TEXT, title TEXT, description TEXT, status TEXT DEFAULT 'running', progress REAL DEFAULT 0.0, details TEXT, result TEXT DEFAULT 'pending', error_message TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Worker stdout/logs cursor.execute(""" CREATE TABLE IF NOT EXISTS worker_stdout ( worker_id TEXT, channel TEXT DEFAULT 'stdout', content TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(worker_id) REFERENCES workers(id) ) """) self.conn.commit() def get_connection(self): return self.conn def execute(self, query: str, params: tuple = ()): cursor = self.conn.cursor() cursor.execute(query, params) self.conn.commit() return cursor def fetchall(self, query: str, params: tuple = ()): cursor = self.conn.cursor() cursor.execute(query, params) return cursor.fetchall() def fetchone(self, query: str, params: tuple = ()): cursor = self.conn.cursor() cursor.execute(query, params) return cursor.fetchone() # Singleton instance db = Database() def get_db() -> Database: return db def log_to_db(level: str, module: str, message: str): """Log a message to the database.""" try: db.execute( "INSERT INTO logs (level, module, message) VALUES (?, ?, ?)", (level, module, message) ) except Exception: # Avoid recursive logging errors if DB is locked pass # Initialize DB logger in the unified logger try: from SYS.logger import set_db_logger set_db_logger(log_to_db) except ImportError: pass def save_config_value(category: str, subtype: str, item_name: str, key: str, value: Any): """Save a configuration value to the database.""" val_str = json.dumps(value) if not isinstance(value, str) else value db.execute( "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)", (category, subtype, item_name, key, val_str) ) def get_config_all() -> Dict[str, Any]: """Retrieve all configuration from the database in the legacy dict format.""" try: db.execute("DELETE FROM config WHERE category='store' AND LOWER(subtype) in ('folder', 'zerotier')") db.execute("DELETE FROM config WHERE category='networking'") except Exception: pass rows = db.fetchall("SELECT category, subtype, item_name, key, value FROM config") config: Dict[str, Any] = {} for row in rows: cat = row['category'] sub = row['subtype'] name = row['item_name'] key = row['key'] val = row['value'] # Drop legacy folder store entries (folder store is removed). if cat == 'store' and str(sub).strip().lower() == 'folder': continue # Try to parse JSON value, fallback to string try: parsed_val = json.loads(val) except Exception: parsed_val = val if cat == 'global': config[key] = parsed_val else: # Modular structure: config[cat][sub][name][key] if cat in ('provider', 'tool'): cat_dict = config.setdefault(cat, {}) sub_dict = cat_dict.setdefault(sub, {}) sub_dict[key] = parsed_val elif cat == 'store': cat_dict = config.setdefault(cat, {}) sub_dict = cat_dict.setdefault(sub, {}) name_dict = sub_dict.setdefault(name, {}) name_dict[key] = parsed_val else: config.setdefault(cat, {})[key] = parsed_val return config # Worker Management Methods for medios.db def insert_worker(worker_id: str, worker_type: str, title: str = "", description: str = "") -> bool: try: db.execute( "INSERT INTO workers (id, type, title, description, status) VALUES (?, ?, ?, ?, 'running')", (worker_id, worker_type, title, description) ) return True except Exception: return False def update_worker(worker_id: str, **kwargs) -> bool: if not kwargs: return True # Filter valid columns valid_cols = {'type', 'title', 'description', 'status', 'progress', 'details', 'result', 'error_message'} cols = [] vals = [] for k, v in kwargs.items(): if k in valid_cols: cols.append(f"{k} = ?") vals.append(v) if not cols: return True cols.append("updated_at = CURRENT_TIMESTAMP") query = f"UPDATE workers SET {', '.join(cols)} WHERE id = ?" vals.append(worker_id) try: db.execute(query, tuple(vals)) return True except Exception: return False def append_worker_stdout(worker_id: str, content: str, channel: str = 'stdout'): try: db.execute( "INSERT INTO worker_stdout (worker_id, channel, content) VALUES (?, ?, ?)", (worker_id, channel, content) ) except Exception: pass def get_worker_stdout(worker_id: str, channel: Optional[str] = None) -> str: query = "SELECT content FROM worker_stdout WHERE worker_id = ?" params = [worker_id] if channel: query += " AND channel = ?" params.append(channel) query += " ORDER BY timestamp ASC" rows = db.fetchall(query, tuple(params)) return "\n".join(row['content'] for row in rows) def get_active_workers() -> List[Dict[str, Any]]: rows = db.fetchall("SELECT * FROM workers WHERE status = 'running' ORDER BY created_at DESC") return [dict(row) for row in rows] def get_worker(worker_id: str) -> Optional[Dict[str, Any]]: row = db.fetchone("SELECT * FROM workers WHERE id = ?", (worker_id,)) return dict(row) if row else None def expire_running_workers(older_than_seconds: int = 300, status: str = 'error', reason: str = 'timeout') -> int: # SQLITE doesn't have a simple way to do DATETIME - INTERVAL, so we'll use strftime/unixepoch if available # or just do regular update for all running ones for now as a simple fallback query = f"UPDATE workers SET status = ?, error_message = ? WHERE status = 'running'" db.execute(query, (status, reason)) return 0 # We don't easily get the rowcount from db.execute right now