2026-01-12 20:26:45 -08:00
parent 749ffb7e34
commit 8b7f518725


@@ -25,7 +25,8 @@ from SYS.utils import sha256_file, expand_path
from SYS.logger import debug as mm_debug
logger = logging.getLogger(__name__)
WORKER_LOG_MAX_ENTRIES = 50 # Reduced from 99 to keep log size down
MAX_FINISHED_WORKERS = 100 # Only keep 100 finished workers globally
# Helper: decorate DB write methods to retry transient SQLITE 'database is locked' errors
def _db_retry(max_attempts: int = 6, base_sleep: float = 0.1):
@@ -304,10 +305,22 @@ class API_folder_store:
)
self.connection.row_factory = sqlite3.Row
# Performance & Size Optimizations
# 1. WAL mode for better concurrency and fewer locks
self.connection.execute("PRAGMA journal_mode=WAL")
# 2. auto_vacuum=FULL to automatically reclaim space from deleted rows/logs
self.connection.execute("PRAGMA auto_vacuum = FULL")
# 3. Increase page size for modern file systems
self.connection.execute("PRAGMA page_size = 4096")
# 4. Memory and Sync optimizations
self.connection.execute("PRAGMA synchronous = NORMAL")
self.connection.execute("PRAGMA temp_store = MEMORY")
self.connection.execute("PRAGMA cache_size = -2000")
# Use memory mapping for the entire DB (up to 30MB) for near-instant reads
self.connection.execute("PRAGMA mmap_size = 30000000")
# 5. Standard features
self.connection.execute("PRAGMA foreign_keys = ON")
# Bound how long sqlite will wait on locks before raising.
try:
self.connection.execute("PRAGMA busy_timeout = 5000")
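
Worth noting about the pragma block: journal_mode, auto_vacuum and page_size are persisted in the database file, but auto_vacuum and page_size changes only take hold on an already-populated file after a VACUUM rewrites it (the maintenance pass below may do that); synchronous, temp_store, cache_size and mmap_size are per-connection and must be set again on every new connection. A small sketch, not part of this commit and with a hypothetical db_path argument, for reading the values back:

import sqlite3

def inspect_pragmas(db_path: str) -> dict:
    """Open a fresh connection and read back pragma values.

    journal_mode, auto_vacuum and page_size are stored in the database file
    and survive across connections; the remaining pragmas are per-connection
    and will show this new connection's defaults, not the values set above.
    """
    conn = sqlite3.connect(db_path)
    try:
        names = ("journal_mode", "auto_vacuum", "page_size",
                 "synchronous", "temp_store", "cache_size", "mmap_size")
        # Each of these pragmas returns its current value as a single row.
        return {name: conn.execute(f"PRAGMA {name}").fetchone()[0] for name in names}
    finally:
        conn.close()
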
@@ -315,6 +328,10 @@ class API_folder_store:
pass
self._create_tables()
# Run maintenance if the DB has grown suspiciously large
self._run_maintenance_if_needed()
logger.info(f"Database initialized at {self.db_path}")
except Exception as e:
logger.error(f"Failed to initialize database: {e}", exc_info=True)
@@ -326,6 +343,84 @@ class API_folder_store:
self.connection = None
raise
def _run_maintenance_if_needed(self) -> None:
"""Perform a one-time VACUUM if the database file is large."""
try:
if not self.db_path.exists():
return
# Global cleanup of old workers and logs regardless of size
self._global_cleanup()
# If the database is larger than 30MB, run a vacuum to ensure space is reclaimed.
# We only do this on startup to minimize performance impact.
file_stats = self.db_path.stat()
size_mb = file_stats.st_size / (1024 * 1024)
if size_mb > 30:
logger.debug(f"Database size ({size_mb:.1f}MB) exceeds maintenance threshold. Vacuuming...")
# VACUUM must run outside a transaction; _global_cleanup() committed its changes above
self.connection.execute("VACUUM")
# Also optimize the query planner indices
self.connection.execute("ANALYZE")
new_size_mb = self.db_path.stat().st_size / (1024 * 1024)
reduction = size_mb - new_size_mb
if reduction > 1.0:
logger.info(f"Maintenance reclaimed {reduction:.1f}MB. Current size: {new_size_mb:.1f}MB")
except Exception as e:
# Maintenance should never block application startup
logger.warning(f"Database maintenance skipped: {e}")
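
One ordering detail worth calling out: SQLite refuses to run VACUUM while a transaction is open, and with the sqlite3 module's default transaction handling any uncommitted write leaves one open, which is why _global_cleanup() commits before the VACUUM above. A throwaway sketch (in-memory database, not part of the commit) showing the failure mode:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1)")      # implicitly opens a transaction
try:
    conn.execute("VACUUM")                    # refused while the transaction is open
except sqlite3.OperationalError as exc:
    print(exc)                                # "cannot VACUUM from within a transaction"
conn.commit()                                 # close the pending transaction
conn.execute("VACUUM")                        # now succeeds
conn.close()
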
def _global_cleanup(self) -> None:
"""Aggressively prune old workers and logs to prevent database bloat."""
try:
cursor = self.connection.cursor()
# 1. Prune finished/failed workers older than MAX_FINISHED_WORKERS
# We keep the newest ones based on completed_at or started_at
cursor.execute(
"""
DELETE FROM worker
WHERE status != 'running'
AND id NOT IN (
SELECT id FROM worker
WHERE status != 'running'
ORDER BY COALESCE(completed_at, started_at) DESC
LIMIT ?
)
""",
(MAX_FINISHED_WORKERS,)
)
worker_deletes = cursor.rowcount
# 2. Orphans check: Remove logs that no longer have a parent worker
cursor.execute(
"DELETE FROM worker_log WHERE worker_id NOT IN (SELECT worker_id FROM worker)"
)
log_orphans = cursor.rowcount
# 3. Global cap on log rows: keep at most 5,000 entries in total,
# even when every worker is within its own per-worker limit
cursor.execute(
"""
DELETE FROM worker_log
WHERE id NOT IN (
SELECT id FROM worker_log
ORDER BY created_at DESC
LIMIT 5000
)
"""
)
log_limit_deletes = cursor.rowcount
if worker_deletes > 0 or log_orphans > 0 or log_limit_deletes > 0:
logger.info(f"Global cleanup: Removed {worker_deletes} workers and {log_orphans + log_limit_deletes} log entries.")
self.connection.commit()
except Exception as e:
logger.warning(f"Global cleanup failed: {e}")
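
The worker prune above is the usual "keep only the newest N rows" pattern: select the ids of the rows to keep, delete everything else. A self-contained sketch against a throwaway in-memory database (the table shape is simplified, not the real schema):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE worker (id INTEGER PRIMARY KEY, status TEXT,"
    " started_at TEXT, completed_at TEXT)"
)
conn.executemany(
    "INSERT INTO worker (status, started_at, completed_at) VALUES (?, ?, ?)",
    [("completed", f"2026-01-01T00:00:{i:02d}", f"2026-01-01T00:01:{i:02d}")
     for i in range(10)],
)
KEEP = 3
conn.execute(
    """
    DELETE FROM worker
    WHERE status != 'running'
      AND id NOT IN (
          SELECT id FROM worker
          WHERE status != 'running'
          ORDER BY COALESCE(completed_at, started_at) DESC
          LIMIT ?
      )
    """,
    (KEEP,),
)
print(conn.execute("SELECT COUNT(*) FROM worker").fetchone()[0])  # -> 3
conn.close()
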
def _create_tables(self) -> None:
"""Create database tables if they don't exist."""
cursor = self.connection.cursor()
@@ -417,6 +512,15 @@ class API_folder_store:
# Notes indices (after migration so columns exist)
cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_hash ON note(hash)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_name ON note(name)")
# Additional optimizations for search speed
# Covering index for tags helps query 'tags for hash' without hitting the table
cursor.execute("CREATE INDEX IF NOT EXISTS idx_tag_covering ON tag(hash, tag)")
# Indexes on metadata size and import time for common sort orders
cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_size ON metadata(size)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_imported ON metadata(time_imported)")
self.connection.commit()
logger.debug("Database tables created/verified")
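
Because idx_tag_covering contains both columns, a "tags for this hash" lookup can be answered from the index alone, without touching the tag table. A minimal sketch (the two-column tag table is an assumption, not the real schema) that checks this with EXPLAIN QUERY PLAN:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tag (hash TEXT, tag TEXT)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tag_covering ON tag(hash, tag)")
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT tag FROM tag WHERE hash = ?", ("abc",)
).fetchall()
for row in plan:
    print(row)  # detail column should mention "USING COVERING INDEX idx_tag_covering"
conn.close()
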
@@ -1938,8 +2042,17 @@ class API_folder_store:
total_steps
),
)
worker_rowid = cursor.lastrowid or 0
# Lightweight prune on every insert: drop non-running workers that fall
# outside a recent id window (roughly 2x MAX_FINISHED_WORKERS rows)
cursor.execute(
"DELETE FROM worker WHERE status != 'running' AND id < (SELECT MAX(id) - ? FROM worker)",
(MAX_FINISHED_WORKERS * 2,)
)
self.connection.commit()
return worker_rowid
except sqlite3.IntegrityError:
return self.update_worker_status(worker_id, "running")
except Exception as e:
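
Unlike the timestamp-ordered cleanup at startup, this inline prune uses a plain rowid window: it is cheap enough to run on every insert but assumes ids grow monotonically and keeps roughly twice MAX_FINISHED_WORKERS non-running rows. A throwaway sketch (simplified table, arbitrary row count) showing what the window retains:

import sqlite3

MAX_FINISHED_WORKERS = 100  # mirrors the module-level constant

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE worker (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO worker (status) VALUES (?)", [("completed",)] * 500)
conn.execute(
    "DELETE FROM worker WHERE status != 'running'"
    " AND id < (SELECT MAX(id) - ? FROM worker)",
    (MAX_FINISHED_WORKERS * 2,),
)
# ids 1..500, window of 200 below MAX(id) -> rows 300..500 survive
print(conn.execute("SELECT COUNT(*) FROM worker").fetchone()[0])  # -> 201
conn.close()
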