This commit is contained in:
2026-01-15 00:45:42 -08:00
parent ac10e607bb
commit 3a02a52863
5 changed files with 837 additions and 784 deletions

View File

@@ -261,6 +261,7 @@ class API_folder_store:
def _init_db(self) -> None: def _init_db(self) -> None:
"""Initialize database connection and create tables if needed.""" """Initialize database connection and create tables if needed."""
with self._db_lock:
try: try:
# Ensure the library root exists; sqlite cannot create parent dirs. # Ensure the library root exists; sqlite cannot create parent dirs.
try: try:
@@ -352,14 +353,21 @@ class API_folder_store:
# Global cleanup of old workers and logs regardless of size # Global cleanup of old workers and logs regardless of size
self._global_cleanup() self._global_cleanup()
# If the database is larger than 30MB, run a vacuum to ensure space is reclaimed. # If the database is larger than 64MB, check if a vacuum is worth the time.
# We only do this on startup to minimize performance impact. # We only do this check on startup to minimize performance impact.
file_stats = self.db_path.stat() file_stats = self.db_path.stat()
size_mb = file_stats.st_size / (1024 * 1024) size_mb = file_stats.st_size / (1024 * 1024)
if size_mb > 30: if size_mb > 64:
logger.debug(f"Database size ({size_mb:.1f}MB) exceeds maintenance threshold. Vacuuming...") # Check fragmentation (freelist count)
# We use a cursor to avoid blocking the main connection state if possible try:
freelist = self.connection.execute("PRAGMA freelist_count").fetchone()[0]
page_size = self.connection.execute("PRAGMA page_size").fetchone()[0]
free_mb = (freelist * page_size) / (1024 * 1024)
# If more than 25% or 10MB of the file is free space, it's worth a VACUUM.
if free_mb > 10 or (free_mb / size_mb) > 0.25:
logger.debug(f"Database size ({size_mb:.1f}MB) has {free_mb:.1f}MB free. Vacuuming...")
self.connection.execute("VACUUM") self.connection.execute("VACUUM")
# Also optimize the query planner indices # Also optimize the query planner indices
self.connection.execute("ANALYZE") self.connection.execute("ANALYZE")
@@ -368,6 +376,11 @@ class API_folder_store:
reduction = size_mb - new_size_mb reduction = size_mb - new_size_mb
if reduction > 1.0: if reduction > 1.0:
logger.info(f"Maintenance reclaimed {reduction:.1f}MB. Current size: {new_size_mb:.1f}MB") logger.info(f"Maintenance reclaimed {reduction:.1f}MB. Current size: {new_size_mb:.1f}MB")
except Exception as inner_e:
logger.debug(f"Refined maintenance check failed: {inner_e}")
# Fallback to simple size threshold if PRAGMA fails
if size_mb > 128:
self.connection.execute("VACUUM")
except Exception as e: except Exception as e:
# Maintenance should never block application startup # Maintenance should never block application startup
logger.warning(f"Database maintenance skipped: {e}") logger.warning(f"Database maintenance skipped: {e}")
@@ -710,6 +723,7 @@ class API_folder_store:
@_db_retry() @_db_retry()
def _update_metadata_modified_time(self, file_hash: str) -> None: def _update_metadata_modified_time(self, file_hash: str) -> None:
"""Update the time_modified timestamp for a file's metadata.""" """Update the time_modified timestamp for a file's metadata."""
with self._db_lock:
try: try:
cursor = self.connection.cursor() cursor = self.connection.cursor()
cursor.execute( cursor.execute(
@@ -1149,6 +1163,7 @@ class API_folder_store:
tags: List[str] tags: List[str]
) -> None: ) -> None:
"""Save metadata and tags for a file in a single transaction.""" """Save metadata and tags for a file in a single transaction."""
with self._db_lock:
try: try:
abs_path = self._normalize_input_path(file_path) abs_path = self._normalize_input_path(file_path)
db_path = self._to_db_file_path(abs_path) db_path = self._to_db_file_path(abs_path)
@@ -1206,9 +1221,7 @@ class API_folder_store:
# 2. Save Tags # 2. Save Tags
# We assume tags list is complete and includes title if needed # We assume tags list is complete and includes title if needed
cursor.execute("DELETE FROM tag WHERE hash = ?", cursor.execute("DELETE FROM tag WHERE hash = ?", (file_hash, ))
(file_hash,
))
for tag in tags: for tag in tags:
tag = tag.strip() tag = tag.strip()
@@ -1218,8 +1231,7 @@ class API_folder_store:
INSERT OR IGNORE INTO tag (hash, tag) INSERT OR IGNORE INTO tag (hash, tag)
VALUES (?, ?) VALUES (?, ?)
""", """,
(file_hash, (file_hash, tag),
tag),
) )
self.connection.commit() self.connection.commit()
@@ -1230,8 +1242,7 @@ class API_folder_store:
except Exception as e: except Exception as e:
logger.error( logger.error(
f"[save_file_info] ❌ Error saving file info for {file_path}: {e}", f"[save_file_info] ❌ Error saving file info for {file_path}: {e}",
exc_info=True exc_info=True)
)
raise raise
def get_tags(self, file_hash: str) -> List[str]: def get_tags(self, file_hash: str) -> List[str]:
@@ -1346,6 +1357,7 @@ class API_folder_store:
@_db_retry() @_db_retry()
def add_tags(self, file_path: Path, tags: List[str]) -> None: def add_tags(self, file_path: Path, tags: List[str]) -> None:
"""Add tags to a file.""" """Add tags to a file."""
with self._db_lock:
try: try:
file_hash = self.get_or_create_file_entry(file_path) file_hash = self.get_or_create_file_entry(file_path)
cursor = self.connection.cursor() cursor = self.connection.cursor()
@@ -1413,6 +1425,7 @@ class API_folder_store:
@_db_retry() @_db_retry()
def remove_tags(self, file_path: Path, tags: List[str]) -> None: def remove_tags(self, file_path: Path, tags: List[str]) -> None:
"""Remove specific tags from a file.""" """Remove specific tags from a file."""
with self._db_lock:
try: try:
file_hash = self.get_or_create_file_entry(file_path) file_hash = self.get_or_create_file_entry(file_path)
cursor = self.connection.cursor() cursor = self.connection.cursor()
@@ -1439,6 +1452,7 @@ class API_folder_store:
@_db_retry() @_db_retry()
def add_tags_to_hash(self, file_hash: str, tags: List[str]) -> None: def add_tags_to_hash(self, file_hash: str, tags: List[str]) -> None:
"""Add tags to a file by hash.""" """Add tags to a file by hash."""
with self._db_lock:
try: try:
cursor = self.connection.cursor() cursor = self.connection.cursor()
@@ -1481,6 +1495,7 @@ class API_folder_store:
@_db_retry() @_db_retry()
def remove_tags_from_hash(self, file_hash: str, tags: List[str]) -> None: def remove_tags_from_hash(self, file_hash: str, tags: List[str]) -> None:
"""Remove specific tags from a file by hash.""" """Remove specific tags from a file by hash."""
with self._db_lock:
try: try:
cursor = self.connection.cursor() cursor = self.connection.cursor()
@@ -1514,6 +1529,7 @@ class API_folder_store:
Any] Any]
) -> None: ) -> None:
"""Update metadata for a file by hash.""" """Update metadata for a file by hash."""
with self._db_lock:
try: try:
cursor = self.connection.cursor() cursor = self.connection.cursor()
@@ -1566,6 +1582,7 @@ class API_folder_store:
related_file_path: Path to the related file related_file_path: Path to the related file
rel_type: Type of relationship ('king', 'alt', 'related') rel_type: Type of relationship ('king', 'alt', 'related')
""" """
with self._db_lock:
try: try:
str_path = str(file_path.resolve()) str_path = str(file_path.resolve())
str_related_path = str(related_file_path.resolve()) str_related_path = str(related_file_path.resolve())
@@ -1757,6 +1774,7 @@ class API_folder_store:
def set_note(self, file_path: Path, name: str, note: str) -> None: def set_note(self, file_path: Path, name: str, note: str) -> None:
"""Set a named note for a file.""" """Set a named note for a file."""
with self._db_lock:
try: try:
note_name = str(name or "").strip() note_name = str(name or "").strip()
if not note_name: if not note_name:
@@ -1784,6 +1802,7 @@ class API_folder_store:
def delete_note(self, file_hash: str, name: str) -> None: def delete_note(self, file_hash: str, name: str) -> None:
"""Delete a named note for a file by hash.""" """Delete a named note for a file by hash."""
with self._db_lock:
try: try:
note_name = str(name or "").strip() note_name = str(name or "").strip()
if not note_name: if not note_name:
@@ -1920,6 +1939,7 @@ class API_folder_store:
backlinks in other files so no file retains dangling references to the backlinks in other files so no file retains dangling references to the
deleted hash. deleted hash.
""" """
with self._db_lock:
try: try:
abs_path = self._normalize_input_path(file_path) abs_path = self._normalize_input_path(file_path)
str_path = self._to_db_file_path(abs_path) str_path = self._to_db_file_path(abs_path)
@@ -2065,6 +2085,7 @@ class API_folder_store:
def update_worker(self, worker_id: str, **kwargs) -> bool: def update_worker(self, worker_id: str, **kwargs) -> bool:
"""Update worker entry with given fields.""" """Update worker entry with given fields."""
with self._db_lock:
try: try:
allowed_fields = { allowed_fields = {
"status", "status",
@@ -2187,6 +2208,7 @@ class API_folder_store:
def delete_worker(self, worker_id: str) -> bool: def delete_worker(self, worker_id: str) -> bool:
"""Delete a worker entry.""" """Delete a worker entry."""
with self._db_lock:
try: try:
cursor = self.connection.cursor() cursor = self.connection.cursor()
cursor.execute("DELETE FROM worker WHERE worker_id = ?", cursor.execute("DELETE FROM worker WHERE worker_id = ?",
@@ -2294,6 +2316,7 @@ class API_folder_store:
"""Append text to a worker's stdout log and timeline.""" """Append text to a worker's stdout log and timeline."""
if not text: if not text:
return True return True
with self._db_lock:
try: try:
# Check if connection is valid # Check if connection is valid
if not self.connection: if not self.connection:

View File

@@ -7,6 +7,7 @@ import os
import shutil import shutil
import sys import sys
import time import time
from threading import RLock
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Protocol, TextIO from typing import Any, Callable, Dict, List, Optional, Protocol, TextIO
@@ -755,6 +756,7 @@ class PipelineLiveProgress:
def __init__(self, pipe_labels: List[str], *, enabled: bool = True) -> None: def __init__(self, pipe_labels: List[str], *, enabled: bool = True) -> None:
self._enabled = bool(enabled) self._enabled = bool(enabled)
self._pipe_labels = [str(x) for x in (pipe_labels or [])] self._pipe_labels = [str(x) for x in (pipe_labels or [])]
self._lock = RLock()
self._console: Optional[Console] = None self._console: Optional[Console] = None
self._live: Optional[Live] = None self._live: Optional[Live] = None
@@ -826,6 +828,7 @@ class PipelineLiveProgress:
the spinner without needing manual Live.update() calls. the spinner without needing manual Live.update() calls.
""" """
with self._lock:
pipe_progress = self._pipe_progress pipe_progress = self._pipe_progress
status = self._status status = self._status
transfers = self._transfers transfers = self._transfers
@@ -1029,6 +1032,8 @@ class PipelineLiveProgress:
return return
if not self._ensure_pipe(int(pipe_index)): if not self._ensure_pipe(int(pipe_index)):
return return
with self._lock:
prog = self._status prog = self._status
if prog is None: if prog is None:
return return
@@ -1061,6 +1066,10 @@ class PipelineLiveProgress:
pass pass
def clear_pipe_status_text(self, pipe_index: int) -> None: def clear_pipe_status_text(self, pipe_index: int) -> None:
if not self._enabled:
return
with self._lock:
prog = self._status prog = self._status
if prog is None: if prog is None:
return return
@@ -1095,6 +1104,31 @@ class PipelineLiveProgress:
pct = max(0, min(100, int(percent))) pct = max(0, min(100, int(percent)))
pipe_task = self._pipe_tasks[pidx] pipe_task = self._pipe_tasks[pidx]
pipe_progress.update(pipe_task, completed=pct, total=100, refresh=True) pipe_progress.update(pipe_task, completed=pct, total=100, refresh=True)
self._update_overall()
except Exception:
pass
def _update_overall(self) -> None:
"""Update the overall pipeline progress task."""
if self._overall is None or self._overall_task is None:
return
completed = 0
try:
# Count a pipe as completed if its 'done' count matches or exceeds the advertised total.
completed = sum(
1 for i in range(len(self._pipe_labels))
if self._pipe_done[i] >= max(1, self._pipe_totals[i])
)
except Exception:
completed = 0
try:
self._overall.update(
self._overall_task,
completed=min(completed, max(1, len(self._pipe_labels))),
description=f"Pipeline: {completed}/{len(self._pipe_labels)} pipes completed",
)
except Exception: except Exception:
pass pass
@@ -1108,6 +1142,7 @@ class PipelineLiveProgress:
if not self._ensure_pipe(int(pipe_index)): if not self._ensure_pipe(int(pipe_index)):
return return
with self._lock:
try: try:
pidx = int(pipe_index) pidx = int(pipe_index)
tot = max(1, int(total_steps)) tot = max(1, int(total_steps))
@@ -1287,6 +1322,8 @@ class PipelineLiveProgress:
except Exception: except Exception:
pass pass
self._update_overall()
labels: List[str] = [] labels: List[str] = []
if isinstance(items_preview, list) and items_preview: if isinstance(items_preview, list) and items_preview:
labels = [_pipeline_progress_item_label(x) for x in items_preview] labels = [_pipeline_progress_item_label(x) for x in items_preview]
@@ -1372,6 +1409,8 @@ class PipelineLiveProgress:
else: else:
pipe_progress.update(pipe_task, completed=done) pipe_progress.update(pipe_task, completed=done)
self._update_overall()
# Clear any status line now that it emitted. # Clear any status line now that it emitted.
try: try:
self.clear_pipe_status_text(pipe_index) self.clear_pipe_status_text(pipe_index)
@@ -1452,23 +1491,7 @@ class PipelineLiveProgress:
except Exception: except Exception:
pass pass
if self._overall_task is not None: self._update_overall()
completed = 0
try:
completed = sum(
1 for i in range(len(self._pipe_labels))
if self._pipe_done[i] >= max(1, self._pipe_totals[i])
)
except Exception:
completed = 0
overall.update(
self._overall_task,
completed=min(completed,
max(1,
len(self._pipe_labels))),
description=
f"Pipeline: {completed}/{len(self._pipe_labels)} pipes completed",
)
class PipelineStageContext: class PipelineStageContext:

View File

@@ -325,8 +325,11 @@ class HydrusNetwork(Store):
] ]
try: try:
# Compute file hash # Compute file hash (or use hint from kwargs to avoid redundant IO)
file_hash = kwargs.get("hash") or kwargs.get("file_hash")
if not file_hash:
file_hash = sha256_file(file_path) file_hash = sha256_file(file_path)
debug(f"{self._log_prefix()} file hash: {file_hash}") debug(f"{self._log_prefix()} file hash: {file_hash}")
# Use persistent client with session key # Use persistent client with session key

View File

@@ -369,11 +369,12 @@ class Add_File(Cmdlet):
# Many add-file flows don't emit intermediate items, so without steps the pipe can look "stuck". # Many add-file flows don't emit intermediate items, so without steps the pipe can look "stuck".
use_steps = False use_steps = False
steps_started = False steps_started = False
step2_done = False
step3_done = False
try: try:
ui, _ = progress.ui_and_pipe_index() ui, _ = progress.ui_and_pipe_index()
use_steps = (ui is not None) and (len(items_to_process) == 1) use_steps = (ui is not None) and (len(items_to_process) == 1)
if use_steps:
progress.begin_steps(5)
steps_started = True
except Exception: except Exception:
use_steps = False use_steps = False
@@ -545,10 +546,8 @@ class Add_File(Cmdlet):
temp_dir_to_cleanup: Optional[Path] = None temp_dir_to_cleanup: Optional[Path] = None
delete_after_item = delete_after delete_after_item = delete_after
try: try:
if use_steps and (not steps_started): if use_steps and steps_started:
progress.begin_steps(5)
progress.step("resolving source") progress.step("resolving source")
steps_started = True
media_path, file_hash, temp_dir_to_cleanup = self._resolve_source( media_path, file_hash, temp_dir_to_cleanup = self._resolve_source(
item, path_arg, pipe_obj, config, store_instance=storage_registry item, path_arg, pipe_obj, config, store_instance=storage_registry
@@ -560,32 +559,20 @@ class Add_File(Cmdlet):
failures += 1 failures += 1
continue continue
# Update pipe_obj with resolved path
pipe_obj.path = str(media_path)
# When using -path (filesystem export), allow all file types.
# When using -store (backend), restrict to SUPPORTED_MEDIA_EXTENSIONS.
allow_all_files = not (location and is_storage_backend_location)
if not self._validate_source(media_path, allow_all_extensions=allow_all_files):
failures += 1
continue
if use_steps and steps_started: if use_steps and steps_started:
if not file_hash:
progress.step("hashing file") progress.step("hashing file")
# Update pipe_obj with resolved path
pipe_obj.path = str(media_path)
# When using -path (filesystem export), allow all file types.
# When using -store (backend), restrict to SUPPORTED_MEDIA_EXTENSIONS.
allow_all_files = not (location and is_storage_backend_location)
if not self._validate_source(media_path, allow_all_extensions=allow_all_files):
failures += 1
continue
if use_steps and steps_started and (not step2_done):
progress.step("ingesting file") progress.step("ingesting file")
step2_done = True
# Update pipe_obj with resolved path
pipe_obj.path = str(media_path)
# When using -path (filesystem export), allow all file types.
# When using -store (backend), restrict to SUPPORTED_MEDIA_EXTENSIONS.
allow_all_files = not (location and is_storage_backend_location)
if not self._validate_source(media_path, allow_all_extensions=allow_all_files):
failures += 1
continue
if provider_name: if provider_name:
if str(provider_name).strip().lower() == "matrix": if str(provider_name).strip().lower() == "matrix":
@@ -690,9 +677,8 @@ class Add_File(Cmdlet):
seen: set[str] = set() seen: set[str] = set()
hashes = [h for h in hashes if not (h in seen or seen.add(h))] hashes = [h for h in hashes if not (h in seen or seen.add(h))]
if use_steps and steps_started and (not step3_done): if use_steps and steps_started:
progress.step("refreshing display") progress.step("refreshing display")
step3_done = True
refreshed_items = Add_File._try_emit_search_file_by_hashes( refreshed_items = Add_File._try_emit_search_file_by_hashes(
store=str(location), store=str(location),
@@ -700,6 +686,7 @@ class Add_File(Cmdlet):
config=config, config=config,
store_instance=storage_registry, store_instance=storage_registry,
) )
debug(f"[add-file] Internal refresh returned refreshed_items count={len(refreshed_items) if refreshed_items else 0}")
if not refreshed_items: if not refreshed_items:
# Fallback: at least show the add-file payloads as a display overlay # Fallback: at least show the add-file payloads as a display overlay
from SYS.result_table import ResultTable from SYS.result_table import ResultTable
@@ -756,7 +743,7 @@ class Add_File(Cmdlet):
from cmdlet.search_file import CMDLET as search_file_cmdlet from cmdlet.search_file import CMDLET as search_file_cmdlet
query = "hash:" + ",".join(hashes) query = "hash:" + ",".join(hashes)
args = ["-store", str(store), query] args = ["-store", str(store), "-internal-refresh", query]
debug(f'[add-file] Refresh: search-file -store {store} "{query}"') debug(f'[add-file] Refresh: search-file -store {store} "{query}"')
# Run search-file under a temporary stage context so its ctx.emit() calls # Run search-file under a temporary stage context so its ctx.emit() calls

View File

@@ -258,9 +258,6 @@ class search_file(Cmdlet):
try: try:
results_list: List[Dict[str, Any]] = [] results_list: List[Dict[str, Any]] = []
from SYS import result_table
importlib.reload(result_table)
from SYS.result_table import ResultTable from SYS.result_table import ResultTable
provider_text = str(provider_name or "").strip() provider_text = str(provider_name or "").strip()
@@ -453,8 +450,8 @@ class search_file(Cmdlet):
args_list = [str(arg) for arg in (args or [])] args_list = [str(arg) for arg in (args or [])]
refresh_mode = any( refresh_mode = any(
str(a).strip().lower() in {"--refresh", str(a).strip().lower() in {"--refresh", "-refresh", "-internal-refresh"}
"-refresh"} for a in args_list for a in args_list
) )
def _format_command_title(command: str, raw_args: List[str]) -> str: def _format_command_title(command: str, raw_args: List[str]) -> str:
@@ -470,7 +467,7 @@ class search_file(Cmdlet):
cleaned = [ cleaned = [
str(a) for a in (raw_args or []) str(a) for a in (raw_args or [])
if str(a).strip().lower() not in {"--refresh", "-refresh"} if str(a).strip().lower() not in {"--refresh", "-refresh", "-internal-refresh"}
] ]
if not cleaned: if not cleaned:
return command return command
@@ -626,6 +623,10 @@ class search_file(Cmdlet):
continue continue
if not library_root: if not library_root:
# Internal refreshes should not trigger config panels or stop progress.
if "-internal-refresh" in args_list:
return 1
from SYS import pipeline as ctx_mod from SYS import pipeline as ctx_mod
progress = None progress = None
if hasattr(ctx_mod, "get_pipeline_state"): if hasattr(ctx_mod, "get_pipeline_state"):
@@ -641,6 +642,7 @@ class search_file(Cmdlet):
# Use context manager to ensure database is always closed # Use context manager to ensure database is always closed
with API_folder_store(library_root) as db: with API_folder_store(library_root) as db:
try: try:
if "-internal-refresh" not in args_list:
db.insert_worker( db.insert_worker(
worker_id, worker_id,
"search-file", "search-file",
@@ -650,10 +652,6 @@ class search_file(Cmdlet):
) )
results_list = [] results_list = []
from SYS import result_table
import importlib
importlib.reload(result_table)
from SYS.result_table import ResultTable from SYS.result_table import ResultTable
table = ResultTable(command_title) table = ResultTable(command_title)
@@ -802,6 +800,16 @@ class search_file(Cmdlet):
if found_any: if found_any:
table.title = command_title table.title = command_title
# Add-file refresh quality-of-life: if exactly 1 item is being refreshed,
# show the detailed item panel instead of a single-row table.
if refresh_mode and len(results_list) == 1:
try:
from SYS.rich_display import render_item_details_panel
render_item_details_panel(results_list[0])
table._rendered_by_cmdlet = True
except Exception:
pass
if refresh_mode: if refresh_mode:
ctx.set_last_result_table_preserve_history( ctx.set_last_result_table_preserve_history(
table, table,
@@ -918,6 +926,15 @@ class search_file(Cmdlet):
table.title = command_title table.title = command_title
# If exactly 1 item is being refreshed, show the detailed item panel.
if refresh_mode and len(results_list) == 1:
try:
from SYS.rich_display import render_item_details_panel
render_item_details_panel(results_list[0])
table._rendered_by_cmdlet = True
except Exception:
pass
if refresh_mode: if refresh_mode:
ctx.set_last_result_table_preserve_history(table, results_list) ctx.set_last_result_table_preserve_history(table, results_list)
else: else: