This commit is contained in:
2026-01-15 03:20:52 -08:00
parent 3a02a52863
commit dabc8f9d51
3 changed files with 313 additions and 148 deletions

View File

@@ -16,6 +16,7 @@ import logging
import subprocess
import shutil
import time
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path, PurePosixPath
from threading import RLock
@@ -218,6 +219,26 @@ class API_folder_store:
self._db_lock = self._shared_db_lock
self._init_db()
@contextmanager
def _with_db_lock(self, *, timeout: float = 8.0):
"""Acquire the shared DB lock with a bounded wait to avoid indefinite stalls."""
locked = False
try:
locked = self._db_lock.acquire(timeout=timeout)
if not locked:
mm_debug(f"[folder-db] lock acquisition timed out after {timeout:.1f}s; proceeding unlocked")
except Exception as exc:
locked = False
mm_debug(f"[folder-db] lock acquisition failed ({exc}); proceeding unlocked")
try:
yield
finally:
if locked:
try:
self._db_lock.release()
except RuntimeError:
pass
def _normalize_input_path(self, file_path: Path) -> Path:
p = expand_path(file_path).resolve()
# If the path is relative to the current working directory, we check if it's meant to be in the library_root.
@@ -261,7 +282,7 @@ class API_folder_store:
def _init_db(self) -> None:
"""Initialize database connection and create tables if needed."""
with self._db_lock:
with self._with_db_lock():
try:
# Ensure the library root exists; sqlite cannot create parent dirs.
try:
@@ -723,7 +744,7 @@ class API_folder_store:
@_db_retry()
def _update_metadata_modified_time(self, file_hash: str) -> None:
"""Update the time_modified timestamp for a file's metadata."""
with self._db_lock:
with self._with_db_lock():
try:
cursor = self.connection.cursor()
cursor.execute(
@@ -770,7 +791,7 @@ class API_folder_store:
attempt = 0
while True:
try:
with self._db_lock:
with self._with_db_lock():
cursor = self.connection.cursor()
mm_debug("[folder-db] SELECT files by file_path")
@@ -877,46 +898,61 @@ class API_folder_store:
def get_metadata(self, file_hash: str) -> Optional[Dict[str, Any]]:
"""Get metadata for a file by hash."""
try:
with self._db_lock:
cursor = self.connection.cursor()
max_attempts = 5
attempt = 0
while True:
try:
with self._with_db_lock():
cursor = self.connection.cursor()
cursor.execute(
"""
SELECT m.* FROM metadata m
WHERE m.hash = ?
""",
(file_hash,
),
cursor.execute(
"""
SELECT m.* FROM metadata m
WHERE m.hash = ?
""",
(file_hash,
),
)
row = cursor.fetchone()
if not row:
return None
metadata = dict(row)
# Parse JSON fields
for field in ["url", "relationships"]:
if metadata.get(field):
try:
metadata[field] = json.loads(metadata[field])
except (json.JSONDecodeError, TypeError):
metadata[field] = [] if field == "url" else {}
# Ensure relationships is always a dict
if metadata.get("relationships") is None:
metadata["relationships"] = {}
if not isinstance(metadata.get("relationships"), dict):
metadata["relationships"] = {}
return metadata
except sqlite3.OperationalError as e:
msg = str(e or "").lower()
if "database is locked" in msg and attempt < max_attempts:
attempt += 1
sleep_time = min(0.1 * (2 ** (attempt - 1)), 1.0)
time.sleep(sleep_time)
continue
logger.error(
f"Error getting metadata for hash {file_hash}: {e}",
exc_info=True
)
row = cursor.fetchone()
if not row:
return None
metadata = dict(row)
# Parse JSON fields
for field in ["url", "relationships"]:
if metadata.get(field):
try:
metadata[field] = json.loads(metadata[field])
except (json.JSONDecodeError, TypeError):
metadata[field] = [] if field == "url" else {}
# Ensure relationships is always a dict
if metadata.get("relationships") is None:
metadata["relationships"] = {}
if not isinstance(metadata.get("relationships"), dict):
metadata["relationships"] = {}
return metadata
except Exception as e:
logger.error(
f"Error getting metadata for hash {file_hash}: {e}",
exc_info=True
)
return None
return None
except Exception as e:
logger.error(
f"Error getting metadata for hash {file_hash}: {e}",
exc_info=True
)
return None
def set_relationship_by_hash(
self,
@@ -1113,7 +1149,7 @@ class API_folder_store:
file_type = get_type_from_ext(str(ext))
with self._db_lock:
with self._with_db_lock():
cursor = self.connection.cursor()
cursor.execute(
"""
@@ -1163,7 +1199,7 @@ class API_folder_store:
tags: List[str]
) -> None:
"""Save metadata and tags for a file in a single transaction."""
with self._db_lock:
with self._with_db_lock():
try:
abs_path = self._normalize_input_path(file_path)
db_path = self._to_db_file_path(abs_path)
@@ -1247,24 +1283,36 @@ class API_folder_store:
def get_tags(self, file_hash: str) -> List[str]:
"""Get all tags for a file by hash."""
try:
with self._db_lock:
cursor = self.connection.cursor()
max_attempts = 5
attempt = 0
while True:
try:
with self._with_db_lock():
cursor = self.connection.cursor()
cursor.execute(
"""
SELECT t.tag FROM tag t
WHERE t.hash = ?
ORDER BY t.tag
""",
(file_hash,
),
)
cursor.execute(
"""
SELECT t.tag FROM tag t
WHERE t.hash = ?
ORDER BY t.tag
""",
(file_hash,
),
)
return [row[0] for row in cursor.fetchall()]
except Exception as e:
logger.error(f"Error getting tags for hash {file_hash}: {e}", exc_info=True)
return []
return [row[0] for row in cursor.fetchall()]
except sqlite3.OperationalError as e:
msg = str(e or "").lower()
if "database is locked" in msg and attempt < max_attempts:
attempt += 1
sleep_time = min(0.1 * (2 ** (attempt - 1)), 1.0)
time.sleep(sleep_time)
continue
logger.error(f"Error getting tags for hash {file_hash}: {e}", exc_info=True)
return []
except Exception as e:
logger.error(f"Error getting tags for hash {file_hash}: {e}", exc_info=True)
return []
@_db_retry()
def save_tags(self, file_path: Path, tags: List[str]) -> None:
@@ -1357,7 +1405,7 @@ class API_folder_store:
@_db_retry()
def add_tags(self, file_path: Path, tags: List[str]) -> None:
"""Add tags to a file."""
with self._db_lock:
with self._with_db_lock():
try:
file_hash = self.get_or_create_file_entry(file_path)
cursor = self.connection.cursor()
@@ -1425,7 +1473,7 @@ class API_folder_store:
@_db_retry()
def remove_tags(self, file_path: Path, tags: List[str]) -> None:
"""Remove specific tags from a file."""
with self._db_lock:
with self._with_db_lock():
try:
file_hash = self.get_or_create_file_entry(file_path)
cursor = self.connection.cursor()
@@ -1452,7 +1500,7 @@ class API_folder_store:
@_db_retry()
def add_tags_to_hash(self, file_hash: str, tags: List[str]) -> None:
"""Add tags to a file by hash."""
with self._db_lock:
with self._with_db_lock():
try:
cursor = self.connection.cursor()
@@ -1495,7 +1543,7 @@ class API_folder_store:
@_db_retry()
def remove_tags_from_hash(self, file_hash: str, tags: List[str]) -> None:
"""Remove specific tags from a file by hash."""
with self._db_lock:
with self._with_db_lock():
try:
cursor = self.connection.cursor()
@@ -1529,7 +1577,7 @@ class API_folder_store:
Any]
) -> None:
"""Update metadata for a file by hash."""
with self._db_lock:
with self._with_db_lock():
try:
cursor = self.connection.cursor()
@@ -1582,7 +1630,7 @@ class API_folder_store:
related_file_path: Path to the related file
rel_type: Type of relationship ('king', 'alt', 'related')
"""
with self._db_lock:
with self._with_db_lock():
try:
str_path = str(file_path.resolve())
str_related_path = str(related_file_path.resolve())
@@ -1734,75 +1782,141 @@ class API_folder_store:
)
return []
def get_note(self, file_hash: str) -> Optional[str]:
"""Get the default note for a file by hash."""
try:
notes = self.get_notes(file_hash)
if not notes:
return None
return notes.get("default")
except Exception as e:
logger.error(f"Error getting note for hash {file_hash}: {e}", exc_info=True)
def get_note(self, file_hash: str, name: str = "default") -> Optional[str]:
"""Get a named note (default note by default) for a file hash."""
normalized_hash = str(file_hash or "").strip().lower()
if len(normalized_hash) != 64:
return None
def get_notes(self, file_hash: str) -> Dict[str, str]:
"""Get all notes for a file by hash."""
try:
cursor = self.connection.cursor()
cursor.execute(
"SELECT name, note FROM note WHERE hash = ? ORDER BY name ASC",
(file_hash,
),
)
out: Dict[str,
str] = {}
for name, note in cursor.fetchall() or []:
if not name:
continue
out[str(name)] = str(note or "")
return out
except Exception as e:
logger.error(
f"Error getting notes for hash {file_hash}: {e}",
exc_info=True
)
return {}
note_name = str(name or "default").strip() or "default"
max_attempts = 5
import time
def save_note(self, file_path: Path, note: str) -> None:
"""Save the default note for a file."""
self.set_note(file_path, "default", note)
for attempt in range(max_attempts):
try:
with self._with_db_lock():
cursor = self.connection.cursor()
cursor.execute(
"SELECT note FROM note WHERE hash = ? AND name = ?",
(normalized_hash,
note_name),
)
row = cursor.fetchone()
if row:
return row[0]
if note_name != "default":
return None
cursor.execute(
"SELECT note FROM note WHERE hash = ? ORDER BY updated_at DESC LIMIT 1",
(normalized_hash,
),
)
row = cursor.fetchone()
return row[0] if row else None
except sqlite3.OperationalError as e:
msg = str(e or "").lower()
if "database is locked" in msg and attempt < (max_attempts - 1):
sleep_time = min(0.1 * (2 ** attempt), 1.0)
time.sleep(sleep_time)
continue
logger.error(
f"Error getting note for hash {file_hash}: {e}",
exc_info=True
)
return None
except Exception as e:
logger.error(
f"Error getting note for hash {file_hash}: {e}",
exc_info=True
)
return None
return None
def set_note_by_hash(self, file_hash: str, name: str, note: str) -> None:
"""Set a named note using a known file hash (no re-hash)."""
note_name = str(name or "").strip()
normalized_hash = str(file_hash or "").strip().lower()
if not note_name:
raise ValueError("Note name is required")
if len(normalized_hash) != 64:
raise ValueError("File hash must be a 64-character hex string")
max_attempts = 5
import time
for attempt in range(max_attempts):
try:
with self._with_db_lock():
cursor = self.connection.cursor()
cursor.execute(
"SELECT 1 FROM file WHERE hash = ?",
(normalized_hash,
),
)
exists = cursor.fetchone() is not None
if not exists:
raise ValueError(
f"Hash {normalized_hash} not found in file table"
)
cursor.execute(
"""
INSERT INTO note (hash, name, note)
VALUES (?, ?, ?)
ON CONFLICT(hash, name) DO UPDATE SET
note = excluded.note,
updated_at = CURRENT_TIMESTAMP
""",
(normalized_hash,
note_name,
note),
)
self.connection.commit()
logger.debug(
f"Saved note '{note_name}' for hash {normalized_hash}"
)
return
except sqlite3.OperationalError as e:
msg = str(e or "").lower()
if "database is locked" in msg and attempt < (max_attempts - 1):
sleep_time = min(0.1 * (2 ** attempt), 1.0)
time.sleep(sleep_time)
continue
logger.error(
f"Error saving note for hash {normalized_hash}: {e}",
exc_info=True
)
raise
except Exception as e:
logger.error(
f"Error saving note for hash {normalized_hash}: {e}",
exc_info=True
)
raise
def set_note(self, file_path: Path, name: str, note: str) -> None:
"""Set a named note for a file."""
with self._db_lock:
try:
note_name = str(name or "").strip()
if not note_name:
raise ValueError("Note name is required")
"""Set a named note for a file path, computing hash if needed."""
note_name = str(name or "").strip()
if not note_name:
raise ValueError("Note name is required")
file_hash = self.get_or_create_file_entry(file_path)
cursor = self.connection.cursor()
cursor.execute(
"""
INSERT INTO note (hash, name, note)
VALUES (?, ?, ?)
ON CONFLICT(hash, name) DO UPDATE SET
note = excluded.note,
updated_at = CURRENT_TIMESTAMP
""",
(file_hash,
note_name,
note),
)
self.connection.commit()
logger.debug(f"Saved note '{note_name}' for {file_path}")
except Exception as e:
logger.error(f"Error saving note for {file_path}: {e}", exc_info=True)
raise
try:
file_hash = self.get_or_create_file_entry(file_path)
self.set_note_by_hash(file_hash, note_name, note)
except Exception as e:
logger.error(f"Error saving note for {file_path}: {e}", exc_info=True)
raise
def save_note(self, file_path: Path, note: str, name: str = "default") -> None:
"""Backward-compatible helper to store a note for a file path."""
self.set_note(file_path, name, note)
def delete_note(self, file_hash: str, name: str) -> None:
"""Delete a named note for a file by hash."""
with self._db_lock:
with self._with_db_lock():
try:
note_name = str(name or "").strip()
if not note_name:
@@ -1854,7 +1968,7 @@ class API_folder_store:
def search_hash(self, file_hash: str) -> Optional[Path]:
"""Search for a file by hash."""
try:
with self._db_lock:
with self._with_db_lock():
cursor = self.connection.cursor()
cursor.execute(
@@ -1939,7 +2053,7 @@ class API_folder_store:
backlinks in other files so no file retains dangling references to the
deleted hash.
"""
with self._db_lock:
with self._with_db_lock():
try:
abs_path = self._normalize_input_path(file_path)
str_path = self._to_db_file_path(abs_path)
@@ -2048,7 +2162,7 @@ class API_folder_store:
pipe: Optional[str] = None,
) -> int:
"""Insert a new worker entry into the database."""
with self._db_lock:
with self._with_db_lock():
try:
cursor = self.connection.cursor()
cursor.execute(
@@ -2085,7 +2199,7 @@ class API_folder_store:
def update_worker(self, worker_id: str, **kwargs) -> bool:
"""Update worker entry with given fields."""
with self._db_lock:
with self._with_db_lock():
try:
allowed_fields = {
"status",
@@ -2129,7 +2243,7 @@ class API_folder_store:
def update_worker_status(self, worker_id: str, status: str) -> int:
"""Update worker status and return its database ID."""
with self._db_lock:
with self._with_db_lock():
try:
cursor = self.connection.cursor()
@@ -2208,7 +2322,7 @@ class API_folder_store:
def delete_worker(self, worker_id: str) -> bool:
"""Delete a worker entry."""
with self._db_lock:
with self._with_db_lock():
try:
cursor = self.connection.cursor()
cursor.execute("DELETE FROM worker WHERE worker_id = ?",
@@ -2316,7 +2430,7 @@ class API_folder_store:
"""Append text to a worker's stdout log and timeline."""
if not text:
return True
with self._db_lock:
with self._with_db_lock():
try:
# Check if connection is valid
if not self.connection: