Files
Medios-Macina/Store/Folder.py

2278 lines
98 KiB
Python
Raw Normal View History

2025-12-11 19:04:02 -08:00
from __future__ import annotations
import json
import re
import shutil
import sys
2026-01-16 01:47:00 -08:00
from fnmatch import fnmatch, translate
2025-12-11 19:04:02 -08:00
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from SYS.logger import debug, log
2026-01-11 00:52:54 -08:00
from SYS.utils import sha256_file, expand_path
2025-12-11 19:04:02 -08:00
2025-12-11 23:21:45 -08:00
from Store._base import Store
2025-12-11 19:04:02 -08:00
def _normalize_hash(value: Any) -> Optional[str]:
2025-12-29 17:05:03 -08:00
candidate = str(value or "").strip().lower()
2025-12-11 19:04:02 -08:00
if len(candidate) != 64:
return None
2025-12-29 17:05:03 -08:00
if any(ch not in "0123456789abcdef" for ch in candidate):
2025-12-11 19:04:02 -08:00
return None
return candidate
def _resolve_file_hash(db_hash: Optional[str], file_path: Path) -> Optional[str]:
normalized = _normalize_hash(db_hash) if db_hash else None
if normalized:
return normalized
return _normalize_hash(file_path.stem)
2026-01-16 01:47:00 -08:00
def _normalize_url_for_search(url: str) -> str:
value = str(url or "").strip()
value = re.sub(r"^[a-z][a-z0-9+.-]*://", "", value, flags=re.IGNORECASE)
value = re.sub(r"^www\.", "", value, flags=re.IGNORECASE)
return value.lower()
def _match_url_pattern(url: str, pattern: str) -> bool:
normalized_url = _normalize_url_for_search(url)
normalized_pattern = _normalize_url_for_search(pattern)
if not normalized_pattern:
return False
has_wildcards = any(ch in normalized_pattern for ch in ("*", "?"))
if has_wildcards:
return fnmatch(normalized_url, normalized_pattern)
normalized_url_no_slash = normalized_url.rstrip("/")
normalized_pattern_no_slash = normalized_pattern.rstrip("/")
if normalized_pattern_no_slash and normalized_pattern_no_slash == normalized_url_no_slash:
return True
return normalized_pattern in normalized_url
2025-12-11 23:21:45 -08:00
class Folder(Store):
2025-12-11 19:04:02 -08:00
""""""
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Track which locations have already been migrated to avoid repeated migrations
_migrated_locations = set()
2025-12-13 12:09:50 -08:00
# Cache scan results to avoid repeated full scans across repeated instantiations
_scan_cache: Dict[str,
Tuple[bool,
str,
Dict[str,
int]]] = {}
2025-12-13 00:18:30 -08:00
2026-01-11 03:24:49 -08:00
@classmethod
def config(cls) -> List[Dict[str, Any]]:
return [
{
"key": "NAME",
"label": "Store Name",
"default": "",
"required": True
},
{
"key": "PATH",
"label": "Folder Path",
"default": "",
"required": True
}
]
2025-12-29 17:05:03 -08:00
2025-12-13 00:18:30 -08:00
def __init__(
self,
location: Optional[str] = None,
name: Optional[str] = None,
*,
NAME: Optional[str] = None,
PATH: Optional[str] = None,
) -> None:
if name is None and NAME is not None:
name = str(NAME)
if location is None and PATH is not None:
location = str(PATH)
2026-01-02 02:28:59 -08:00
self._location = str(location) if location is not None else ""
2025-12-11 19:04:02 -08:00
self._name = name
2025-12-13 12:09:50 -08:00
# Scan status (set during init)
self.scan_ok: bool = True
self.scan_detail: str = ""
self.scan_stats: Dict[str,
int] = {}
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
if self._location:
try:
from API.folder import API_folder_store
2025-12-13 12:09:50 -08:00
from API.folder import LocalLibraryInitializer
2025-12-29 17:05:03 -08:00
2026-01-11 00:52:54 -08:00
location_path = expand_path(self._location)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Use context manager to ensure connection is properly closed
with API_folder_store(location_path) as db:
if db.connection:
db.connection.commit()
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Call migration and discovery at startup
Folder.migrate_location(self._location)
2025-12-13 12:09:50 -08:00
# Local library scan/index (one-time per location per process)
location_key = str(location_path)
cached = Folder._scan_cache.get(location_key)
if cached is None:
try:
2026-01-15 16:26:22 -08:00
debug(f"[folder] Initializing library scan for {location_path}...")
2025-12-13 12:09:50 -08:00
initializer = LocalLibraryInitializer(location_path)
stats = initializer.scan_and_index() or {}
2026-01-15 16:26:22 -08:00
debug(f"[folder] Scan complete. Stats: {stats}")
2025-12-29 17:05:03 -08:00
files_new = int(stats.get("files_new", 0) or 0)
sidecars = int(stats.get("sidecars_imported", 0) or 0)
total_db = int(stats.get("files_total_db", 0) or 0)
2025-12-13 12:09:50 -08:00
if files_new > 0 or sidecars > 0:
2025-12-29 17:05:03 -08:00
detail = f"New: {files_new}, Sidecars: {sidecars}" + (
f" (Total: {total_db})" if total_db else ""
)
2025-12-13 12:09:50 -08:00
else:
detail = "Up to date" + (
f" (Total: {total_db})" if total_db else ""
)
2025-12-13 12:09:50 -08:00
Folder._scan_cache[location_key] = (True, detail, dict(stats))
except Exception as exc:
Folder._scan_cache[location_key] = (
False,
f"Scan failed: {exc}",
{}
)
2025-12-13 12:09:50 -08:00
ok, detail, stats = Folder._scan_cache.get(location_key, (True, "", {}))
self.scan_ok = bool(ok)
self.scan_detail = str(detail or "")
self.scan_stats = dict(stats or {})
2025-12-11 19:04:02 -08:00
except Exception as exc:
debug(f"Failed to initialize database for '{name}': {exc}")
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
@classmethod
def migrate_location(cls, location: Optional[str]) -> None:
"""Migrate a location to hash-based storage (one-time operation, call explicitly at startup)."""
if not location:
return
2025-12-29 17:05:03 -08:00
2026-01-11 00:52:54 -08:00
location_path = expand_path(location)
2025-12-11 19:04:02 -08:00
location_str = str(location_path)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Only migrate once per location
if location_str in cls._migrated_locations:
return
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
cls._migrated_locations.add(location_str)
2025-12-13 12:09:50 -08:00
cls._migrate_to_hash_storage(location_path)
@classmethod
def _migrate_to_hash_storage(cls, location_path: Path) -> None:
2025-12-11 19:04:02 -08:00
"""Migrate existing files from filename-based to hash-based storage.
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Checks for sidecars (.metadata, .tag) and imports them before renaming.
Also ensures all files have a title: tag.
"""
from API.folder import API_folder_store, read_sidecar, write_sidecar, find_sidecar
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
with API_folder_store(location_path) as db:
cursor = db.connection.cursor()
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# First pass: migrate filename-based files and add title tags
# Scan all files in the storage directory
for file_path in sorted(location_path.iterdir()):
if not file_path.is_file():
continue
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Skip database files and sidecars
2025-12-29 17:05:03 -08:00
if file_path.suffix in (".db", ".metadata", ".tag", "-shm", "-wal"):
2025-12-11 19:04:02 -08:00
continue
# Also skip if the file ends with -shm or -wal (SQLite journal files)
2025-12-29 17:05:03 -08:00
if file_path.name.endswith(("-shm", "-wal")):
2025-12-11 19:04:02 -08:00
continue
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Check if filename is already a hash (without extension)
2025-12-29 17:05:03 -08:00
if len(file_path.stem) == 64 and all(
c in "0123456789abcdef" for c in file_path.stem.lower()):
2025-12-11 19:04:02 -08:00
continue # Already migrated, will process in second pass
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
# Compute file hash
file_hash = sha256_file(file_path)
# Preserve extension in the hash-based filename
file_ext = file_path.suffix # e.g., '.mp4'
hash_filename = file_hash + file_ext if file_ext else file_hash
hash_path = location_path / hash_filename
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Check for sidecars and import them
sidecar_path = find_sidecar(file_path)
tags_to_add = []
url_to_add = []
has_title_tag = False
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
if sidecar_path and sidecar_path.exists():
try:
_, tags, url = read_sidecar(sidecar_path)
if tags:
tags_to_add = list(tags)
# Check if title tag exists
2025-12-29 17:05:03 -08:00
has_title_tag = any(
t.lower().startswith("title:")
for t in tags_to_add
2025-12-29 17:05:03 -08:00
)
2025-12-11 19:04:02 -08:00
if url:
url_to_add = list(url)
2025-12-29 17:05:03 -08:00
debug(
f"Found sidecar for {file_path.name}: {len(tags_to_add)} tags, {len(url_to_add)} url",
file=sys.stderr,
)
2025-12-11 19:04:02 -08:00
# Delete the sidecar after importing
sidecar_path.unlink()
except Exception as exc:
2025-12-29 17:05:03 -08:00
debug(
f"Failed to read sidecar for {file_path.name}: {exc}",
file=sys.stderr,
)
2025-12-11 19:04:02 -08:00
# Ensure there's a title tag (use original filename if not present)
if not has_title_tag:
tags_to_add.append(f"title:{file_path.name}")
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Rename file to hash if needed
if hash_path != file_path and not hash_path.exists():
2025-12-29 17:05:03 -08:00
debug(
f"Migrating: {file_path.name} -> {hash_filename}",
file=sys.stderr
2025-12-29 17:05:03 -08:00
)
2025-12-11 19:04:02 -08:00
file_path.rename(hash_path)
2025-12-13 12:09:50 -08:00
# Ensure DB points to the renamed path (update by hash).
try:
cursor.execute(
2026-01-02 02:28:59 -08:00
"UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
(db._to_db_file_path(hash_path),
file_hash),
2025-12-13 12:09:50 -08:00
)
except Exception:
pass
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Create or update database entry
db.get_or_create_file_entry(hash_path)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Save extension metadata
2025-12-29 17:05:03 -08:00
ext_clean = file_ext.lstrip(".") if file_ext else ""
db.save_metadata(
hash_path,
{
"hash": file_hash,
"ext": ext_clean,
"size": hash_path.stat().st_size,
},
)
2025-12-11 19:04:02 -08:00
# Add all tags (including title tag)
if tags_to_add:
db.save_tags(hash_path, tags_to_add)
2025-12-29 17:05:03 -08:00
debug(
f"Added {len(tags_to_add)} tags to {file_hash}",
file=sys.stderr
2025-12-29 17:05:03 -08:00
)
2025-12-11 19:04:02 -08:00
# Note: url would need a separate table if you want to store them
# For now, we're just noting them in debug
if url_to_add:
2025-12-29 17:05:03 -08:00
debug(
f"Imported {len(url_to_add)} url for {file_hash}: {url_to_add}",
file=sys.stderr,
)
2025-12-11 19:04:02 -08:00
except Exception as exc:
debug(
f"Failed to migrate file {file_path.name}: {exc}",
file=sys.stderr
)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Second pass: ensure all files in database have a title: tag
db.connection.commit()
2025-12-29 17:05:03 -08:00
cursor.execute(
"""
2025-12-11 19:04:02 -08:00
SELECT f.hash, f.file_path
2026-01-02 02:28:59 -08:00
FROM file f
2025-12-11 19:04:02 -08:00
WHERE NOT EXISTS (
2026-01-02 02:28:59 -08:00
SELECT 1 FROM tag t WHERE t.hash = f.hash AND LOWER(t.tag) LIKE 'title:%'
2025-12-11 19:04:02 -08:00
)
2025-12-29 17:05:03 -08:00
"""
)
2025-12-11 19:04:02 -08:00
files_without_title = cursor.fetchall()
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
for file_hash, file_path_str in files_without_title:
try:
2025-12-24 22:15:54 -08:00
file_path = location_path / str(file_path_str)
2025-12-11 19:04:02 -08:00
if file_path.exists():
# Use the filename as the title
title_tag = f"title:{file_path.name}"
db.save_tags(file_path, [title_tag])
debug(
f"Added title tag to {file_path.name}",
file=sys.stderr
)
2025-12-11 19:04:02 -08:00
except Exception as exc:
2025-12-29 17:05:03 -08:00
debug(
f"Failed to add title tag to file {file_path_str}: {exc}",
file=sys.stderr,
)
2025-12-11 19:04:02 -08:00
db.connection.commit()
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Third pass: discover files on disk that aren't in the database yet
# These are hash-named files that were added after initial indexing
2026-01-02 02:28:59 -08:00
cursor.execute("SELECT LOWER(hash) FROM file")
db_hashes = {row[0]
for row in cursor.fetchall()}
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
discovered = 0
for file_path in sorted(location_path.rglob("*")):
if file_path.is_file():
# Check if file name (without extension) is a 64-char hex hash
name_without_ext = file_path.stem
2025-12-29 17:05:03 -08:00
if len(name_without_ext) == 64 and all(
c in "0123456789abcdef"
for c in name_without_ext.lower()):
2025-12-11 19:04:02 -08:00
file_hash = name_without_ext.lower()
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Skip if already in DB
if file_hash in db_hashes:
continue
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
# Add file to DB (creates entry and auto-adds title: tag)
db.get_or_create_file_entry(file_path)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Save extension metadata
file_ext = file_path.suffix
2025-12-29 17:05:03 -08:00
ext_clean = file_ext.lstrip(".") if file_ext else ""
db.save_metadata(
file_path,
{
"hash": file_hash,
"ext": ext_clean,
"size": file_path.stat().st_size,
},
)
2025-12-11 19:04:02 -08:00
discovered += 1
except Exception as e:
2025-12-29 17:05:03 -08:00
debug(
f"Failed to discover file {file_path.name}: {e}",
file=sys.stderr,
)
2025-12-11 19:04:02 -08:00
if discovered > 0:
2025-12-29 17:05:03 -08:00
debug(
f"Discovered and indexed {discovered} undiscovered files in {location_path.name}",
file=sys.stderr,
)
2025-12-11 19:04:02 -08:00
db.connection.commit()
except Exception as exc:
debug(f"Migration to hash storage failed: {exc}", file=sys.stderr)
def location(self) -> str:
return self._location
def name(self) -> str:
return self._name
def add_file(self, file_path: Path, **kwargs: Any) -> str:
"""Add file to local folder storage with full metadata support.
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Args:
file_path: Path to the file to add
move: If True, move file instead of copy (default: False)
2025-12-11 23:21:45 -08:00
tag: Optional list of tag values to add
2025-12-11 19:04:02 -08:00
url: Optional list of url to associate with the file
title: Optional title (will be added as 'title:value' tag)
2026-01-14 22:21:19 -08:00
file_hash: Optional pre-calculated SHA256 hash (skips re-hashing)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Returns:
File hash (SHA256 hex string) as identifier
"""
move_file = bool(kwargs.get("move"))
2025-12-11 23:21:45 -08:00
tag_list = kwargs.get("tag", [])
2025-12-11 19:04:02 -08:00
url = kwargs.get("url", [])
title = kwargs.get("title")
2026-01-14 22:21:19 -08:00
file_hash = kwargs.get("file_hash")
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Extract title from tags if not explicitly provided
if not title:
2025-12-11 23:21:45 -08:00
for candidate in tag_list:
if isinstance(candidate,
str) and candidate.lower().startswith("title:"):
2025-12-11 23:21:45 -08:00
title = candidate.split(":", 1)[1].strip()
2025-12-11 19:04:02 -08:00
break
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Fallback to filename if no title
if not title:
title = file_path.name
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Ensure title is in tags
title_tag = f"title:{title}"
if not any(str(candidate).lower().startswith("title:")
for candidate in tag_list):
2025-12-11 23:21:45 -08:00
tag_list = [title_tag] + list(tag_list)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
2026-01-14 22:21:19 -08:00
if not file_hash or len(str(file_hash)) != 64:
debug(f"[folder] Re-hashing file: {file_path}", file=sys.stderr)
file_hash = sha256_file(file_path)
2025-12-11 19:04:02 -08:00
debug(f"File hash: {file_hash}", file=sys.stderr)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Preserve extension in the stored filename
file_ext = file_path.suffix # e.g., '.mp4'
save_filename = file_hash + file_ext if file_ext else file_hash
save_file = Path(self._location) / save_filename
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Check if file already exists
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
with API_folder_store(Path(self._location)) as db:
existing_path = db.search_hash(file_hash)
if existing_path and existing_path.exists():
log(
f"✓ File already in local storage: {existing_path}",
file=sys.stderr,
)
# Still add tags and url if provided
2025-12-11 23:21:45 -08:00
if tag_list:
self.add_tag(file_hash, tag_list)
2025-12-11 19:04:02 -08:00
if url:
self.add_url(file_hash, url)
return file_hash
2025-12-19 15:20:08 -08:00
# Move or copy file (with progress bar on actual byte transfer).
# Note: a same-volume move may be a fast rename and won't show progress.
def _copy_with_progress(src: Path, dst: Path, *, label: str) -> None:
from SYS.models import ProgressFileReader
2025-12-19 15:20:08 -08:00
total_bytes = None
try:
total_bytes = int(src.stat().st_size)
except Exception:
total_bytes = None
with src.open("rb") as r, dst.open("wb") as w:
reader = ProgressFileReader(r, total_bytes=total_bytes, label=label)
while True:
chunk = reader.read(1024 * 1024)
if not chunk:
break
w.write(chunk)
# Preserve file metadata similar to shutil.copy2
try:
shutil.copystat(str(src), str(dst))
except Exception:
pass
2025-12-11 19:04:02 -08:00
if move_file:
2025-12-19 15:20:08 -08:00
# Prefer native move; fall back to copy+delete with progress on failure.
try:
shutil.move(str(file_path), str(save_file))
debug(f"Local move: {save_file}", file=sys.stderr)
2025-12-23 16:36:39 -08:00
# After a move, the original path no longer exists; use destination for subsequent ops.
file_path = save_file
2025-12-19 15:20:08 -08:00
except Exception:
_copy_with_progress(
file_path,
save_file,
label=f"folder:{self._name} move"
)
2025-12-19 15:20:08 -08:00
try:
file_path.unlink(missing_ok=True) # type: ignore[arg-type]
except Exception:
try:
if file_path.exists():
file_path.unlink()
except Exception:
pass
debug(f"Local move (copy+delete): {save_file}", file=sys.stderr)
2025-12-23 16:36:39 -08:00
file_path = save_file
2025-12-11 19:04:02 -08:00
else:
_copy_with_progress(
file_path,
save_file,
label=f"folder:{self._name} copy"
)
2025-12-11 19:04:02 -08:00
debug(f"Local copy: {save_file}", file=sys.stderr)
2025-12-14 00:53:52 -08:00
# Best-effort: capture duration for media
duration_value: float | None = None
try:
from SYS.utils import ffprobe
2025-12-29 17:05:03 -08:00
2025-12-14 00:53:52 -08:00
probe = ffprobe(str(save_file))
duration = probe.get("duration")
if isinstance(duration, (int, float)) and duration > 0:
duration_value = float(duration)
except Exception:
duration_value = None
2025-12-29 17:05:03 -08:00
2026-01-02 02:28:59 -08:00
# Save to database (metadata + tag/url updates share one connection)
2025-12-11 19:04:02 -08:00
with API_folder_store(Path(self._location)) as db:
2026-01-02 02:28:59 -08:00
conn = getattr(db, "connection", None)
if conn is None:
raise RuntimeError("Folder store DB connection unavailable")
cursor = conn.cursor()
debug(
f"[Folder.add_file] saving metadata for hash {file_hash}",
file=sys.stderr,
)
2025-12-29 17:05:03 -08:00
ext_clean = file_ext.lstrip(".") if file_ext else ""
db.save_metadata(
save_file,
{
"hash": file_hash,
"ext": ext_clean,
"size": save_file.stat().st_size,
"duration": duration_value,
},
)
2026-01-02 02:28:59 -08:00
debug(
f"[Folder.add_file] metadata stored for hash {file_hash}",
file=sys.stderr,
)
2025-12-29 17:05:03 -08:00
2026-01-02 02:28:59 -08:00
if tag_list:
try:
debug(
f"[Folder.add_file] merging {len(tag_list)} tags for {file_hash}",
file=sys.stderr,
)
from SYS.metadata import compute_namespaced_tag_overwrite
existing_tags = [
t for t in (db.get_tags(file_hash) or [])
if isinstance(t, str) and t.strip()
]
_to_remove, _to_add, merged = compute_namespaced_tag_overwrite(
existing_tags, tag_list or []
)
if _to_remove or _to_add:
cursor.execute("DELETE FROM tag WHERE hash = ?",
(file_hash,))
for t in merged:
tag_val = str(t).strip().lower()
if tag_val:
cursor.execute(
"INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?)",
(file_hash, tag_val),
)
conn.commit()
debug(
f"[Folder.add_file] tags rewritten for {file_hash}",
file=sys.stderr,
)
try:
db._update_metadata_modified_time(file_hash)
except Exception:
pass
except Exception as exc:
debug(f"Local DB tag merge failed: {exc}", file=sys.stderr)
if url:
try:
from SYS.metadata import normalize_urls
2025-12-29 17:05:03 -08:00
2026-01-02 02:28:59 -08:00
existing_meta = db.get_metadata(file_hash) or {}
existing_urls = normalize_urls(existing_meta.get("url"))
incoming_urls = normalize_urls(url)
2026-01-12 04:05:52 -08:00
debug(
f"[Folder.add_file] merging {len(incoming_urls)} URLs for {file_hash}: {incoming_urls}",
file=sys.stderr,
)
2026-01-02 02:28:59 -08:00
changed = False
for entry in list(incoming_urls or []):
if not entry:
continue
if entry not in existing_urls:
existing_urls.append(entry)
changed = True
if changed:
db.update_metadata_by_hash(
file_hash,
{"url": existing_urls},
)
debug(
2026-01-12 04:05:52 -08:00
f"[Folder.add_file] URLs merged for {file_hash}: {existing_urls}",
2026-01-02 02:28:59 -08:00
file=sys.stderr,
)
except Exception as exc:
debug(f"Local DB URL merge failed: {exc}", file=sys.stderr)
2025-12-29 17:05:03 -08:00
2025-12-17 03:16:41 -08:00
##log(f"✓ Added to local storage: {save_file.name}", file=sys.stderr)
2025-12-11 19:04:02 -08:00
return file_hash
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
except Exception as exc:
log(f"❌ Local storage failed: {exc}", file=sys.stderr)
raise
2025-12-11 23:21:45 -08:00
def search(self, query: str, **kwargs: Any) -> list[Dict[str, Any]]:
2025-12-11 19:04:02 -08:00
"""Search local database for files by title tag or filename."""
from fnmatch import fnmatch
from API.folder import DatabaseAPI
2025-12-23 16:36:39 -08:00
import unicodedata
2025-12-11 19:04:02 -08:00
limit = kwargs.get("limit")
try:
limit = int(limit) if limit is not None else None
except (TypeError, ValueError):
limit = None
if isinstance(limit, int) and limit <= 0:
limit = None
query = query.lower()
query_lower = query # Ensure query_lower is defined for all code paths
2025-12-20 23:57:44 -08:00
2025-12-23 16:36:39 -08:00
def _normalize_namespace_text(text: str, *, allow_wildcards: bool) -> str:
"""Normalize tag namespace values for consistent matching.
Removes control/format chars (e.g. zero-width spaces) that frequently appear in scraped tags,
collapses whitespace, and lowercases.
"""
s = str(text or "")
# Normalize newlines/tabs/etc to spaces early.
s = s.replace("\r", " ").replace("\n", " ").replace("\t", " ")
# Drop control / format chars (Cc/Cf) while preserving wildcard tokens when requested.
cleaned_chars: list[str] = []
for ch in s:
if allow_wildcards and ch in {"*",
"?"}:
2025-12-23 16:36:39 -08:00
cleaned_chars.append(ch)
continue
cat = unicodedata.category(ch)
if cat in {"Cc",
"Cf"}:
2025-12-23 16:36:39 -08:00
continue
cleaned_chars.append(ch)
s = "".join(cleaned_chars)
# Collapse any remaining unicode whitespace runs.
s = " ".join(s.split())
return s.strip().lower()
2025-12-20 23:57:44 -08:00
def _normalize_ext_filter(value: str) -> str:
2025-12-29 17:05:03 -08:00
v = str(value or "").strip().lower().lstrip(".")
2025-12-20 23:57:44 -08:00
v = "".join(ch for ch in v if ch.isalnum())
return v
def _extract_system_filetype_ext(text: str) -> Optional[str]:
# Match: system:filetype = png (allow optional '=' and flexible spaces)
m = re.search(r"\bsystem:filetype\s*(?:=\s*)?([^\s,]+)", text)
if not m:
m = re.search(r"\bsystem:filetype\s*=\s*([^\s,]+)", text)
if not m:
return None
return _normalize_ext_filter(m.group(1)) or None
# Support `ext:<value>` and Hydrus-style `system:filetype = <value>` anywhere
# in the query (space or comma separated).
ext_filter: Optional[str] = None
try:
sys_ext = _extract_system_filetype_ext(query_lower)
if sys_ext:
ext_filter = sys_ext
query_lower = re.sub(
r"\s*\bsystem:filetype\s*(?:=\s*)?[^\s,]+",
" ",
query_lower
)
2025-12-29 17:05:03 -08:00
query_lower = re.sub(r"\s{2,}", " ", query_lower).strip().strip(",")
2025-12-20 23:57:44 -08:00
query = query_lower
m = re.search(r"\bext:([^\s,]+)", query_lower)
if not m:
m = re.search(r"\bextension:([^\s,]+)", query_lower)
if m:
ext_filter = _normalize_ext_filter(m.group(1)) or None
query_lower = re.sub(
r"\s*\b(?:ext|extension):[^\s,]+",
" ",
query_lower
)
2025-12-29 17:05:03 -08:00
query_lower = re.sub(r"\s{2,}", " ", query_lower).strip().strip(",")
2025-12-20 23:57:44 -08:00
query = query_lower
except Exception:
ext_filter = None
match_all = query == "*" or (not query and bool(ext_filter))
2025-12-11 19:04:02 -08:00
results = []
2026-01-11 00:52:54 -08:00
search_dir = expand_path(self._location)
2026-01-16 01:47:00 -08:00
backend_label = str(
getattr(self, "_name", "") or getattr(self, "NAME", "") or "folder"
)
debug(
f"[folder:{backend_label}] search start: query={query} limit={limit} root={search_dir}"
)
2025-12-11 19:04:02 -08:00
2025-12-14 00:53:52 -08:00
def _url_like_pattern(value: str) -> str:
# Interpret user patterns as substring matches (with optional glob wildcards).
v = (value or "").strip().lower()
if not v or v == "*":
return "%"
v = v.replace("%", "\\%").replace("_", "\\_")
v = v.replace("*", "%").replace("?", "_")
if "%" not in v and "_" not in v:
return f"%{v}%"
if not v.startswith("%"):
v = "%" + v
if not v.endswith("%"):
v = v + "%"
return v
2025-12-24 17:58:57 -08:00
def _like_pattern(term: str) -> str:
# Convert glob-like tokens to SQL LIKE wildcards.
2025-12-29 17:05:03 -08:00
return str(term or "").replace("*", "%").replace("?", "_")
2025-12-24 17:58:57 -08:00
2025-12-29 17:05:03 -08:00
tokens = [t.strip() for t in query.split(",") if t.strip()]
2025-12-11 19:04:02 -08:00
if not match_all and len(tokens) == 1 and _normalize_hash(query):
debug("Hash queries require 'hash:' prefix for local search")
return results
if not match_all and _normalize_hash(query):
debug("Hash queries require 'hash:' prefix for local search")
return results
2025-12-29 17:05:03 -08:00
def _create_entry(
file_path: Path,
tags: list[str],
size_bytes: int | None,
db_hash: Optional[str]
) -> dict[str,
Any]:
2025-12-11 19:04:02 -08:00
path_str = str(file_path)
# Get title from tags if available, otherwise use hash as fallback
title = next(
(t.split(":",
1)[1] for t in tags if t.lower().startswith("title:")),
None
)
2025-12-11 19:04:02 -08:00
if not title:
# Fallback to hash if no title tag exists
hash_value = _resolve_file_hash(db_hash, file_path)
title = hash_value if hash_value else file_path.stem
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Extract extension from file path
2025-12-29 17:05:03 -08:00
ext = file_path.suffix.lstrip(".")
2025-12-11 19:04:02 -08:00
if not ext:
# Fallback: try to extract from title (original filename might be in title)
title_path = Path(title)
2025-12-29 17:05:03 -08:00
ext = title_path.suffix.lstrip(".")
2025-12-11 19:04:02 -08:00
# Build clean entry with only necessary fields
hash_value = _resolve_file_hash(db_hash, file_path)
entry = {
"title": title,
"ext": ext,
"path": path_str,
"target": path_str,
"store": self._name,
"size": size_bytes,
"hash": hash_value,
"tag": tags,
}
return entry
try:
if not search_dir.exists():
debug(f"Search directory does not exist: {search_dir}")
return results
try:
with DatabaseAPI(search_dir) as api:
2025-12-20 23:57:44 -08:00
ext_hashes: set[str] | None = None
if ext_filter:
# Fetch a bounded set of hashes to intersect with other filters.
ext_fetch_limit = (limit or 45) * 50
ext_hashes = api.get_file_hashes_by_ext(
ext_filter,
limit=ext_fetch_limit
)
2025-12-20 23:57:44 -08:00
# ext-only search: query is empty (or coerced to match_all above).
if ext_filter and (not query_lower or query_lower == "*"):
rows = api.get_files_by_ext(ext_filter, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-20 23:57:44 -08:00
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
2025-12-20 23:57:44 -08:00
try:
2025-12-29 17:05:03 -08:00
db_ext = str(ext or "").strip().lstrip(".")
2025-12-20 23:57:44 -08:00
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
if limit is not None and len(results) >= limit:
return results
2025-12-29 17:05:03 -08:00
backend_label = str(
getattr(self,
"_name",
"") or getattr(self,
"NAME",
"") or "folder"
2025-12-29 17:05:03 -08:00
)
2025-12-20 23:57:44 -08:00
debug(f"[folder:{backend_label}] {len(results)} result(s)")
return results
2025-12-11 19:04:02 -08:00
if tokens and len(tokens) > 1:
2025-12-14 00:53:52 -08:00
url_fetch_limit = (limit or 45) * 50
2025-12-11 19:04:02 -08:00
def _ids_for_token(token: str) -> set[int]:
token = token.strip()
if not token:
return set()
2025-12-29 17:05:03 -08:00
if ":" in token and not token.startswith(":"):
namespace, pattern = token.split(":", 1)
2025-12-11 19:04:02 -08:00
namespace = namespace.strip().lower()
pattern = pattern.strip().lower()
2025-12-29 17:05:03 -08:00
if namespace == "hash":
2025-12-11 19:04:02 -08:00
normalized_hash = _normalize_hash(pattern)
if not normalized_hash:
return set()
h = api.get_file_hash_by_hash(normalized_hash)
return {h} if h else set()
2025-12-29 17:05:03 -08:00
if namespace == "url":
if not pattern or pattern == "*":
return api.get_file_hashes_with_any_url(
limit=url_fetch_limit
)
return api.get_file_hashes_by_url_like(
_url_like_pattern(pattern),
limit=url_fetch_limit
2025-12-29 17:05:03 -08:00
)
2025-12-14 00:53:52 -08:00
2025-12-29 17:05:03 -08:00
if namespace == "system":
2025-12-20 23:57:44 -08:00
# Hydrus-compatible query: system:filetype = png
m_ft = re.match(
r"^filetype\s*(?:=\s*)?(.+)$",
pattern
)
2025-12-20 23:57:44 -08:00
if m_ft:
normalized_ext = _normalize_ext_filter(
m_ft.group(1)
)
2025-12-20 23:57:44 -08:00
if not normalized_ext:
return set()
2025-12-29 17:05:03 -08:00
return api.get_file_hashes_by_ext(
normalized_ext,
limit=url_fetch_limit
2025-12-29 17:05:03 -08:00
)
2025-12-20 23:57:44 -08:00
return set()
if namespace in {"ext",
"extension"}:
2025-12-20 23:57:44 -08:00
normalized_ext = _normalize_ext_filter(pattern)
if not normalized_ext:
return set()
2025-12-29 17:05:03 -08:00
return api.get_file_hashes_by_ext(
normalized_ext,
limit=url_fetch_limit
2025-12-29 17:05:03 -08:00
)
2025-12-20 23:57:44 -08:00
2025-12-29 17:05:03 -08:00
if namespace == "store":
if pattern not in {"local",
"file",
"filesystem"}:
2025-12-11 19:04:02 -08:00
return set()
return api.get_all_file_hashes()
query_pattern = f"{namespace}:%"
tag_rows = api.get_file_hashes_by_tag_pattern(
query_pattern
)
2025-12-11 19:04:02 -08:00
matched: set[str] = set()
for file_hash, tag_val in tag_rows:
if not tag_val:
continue
tag_lower = str(tag_val).lower()
if not tag_lower.startswith(f"{namespace}:"):
continue
2025-12-29 17:05:03 -08:00
value = _normalize_namespace_text(
tag_lower[len(namespace) + 1:],
allow_wildcards=False
)
pat = _normalize_namespace_text(
pattern,
allow_wildcards=True
2025-12-29 17:05:03 -08:00
)
2025-12-23 16:36:39 -08:00
if fnmatch(value, pat):
2025-12-11 19:04:02 -08:00
matched.add(file_hash)
return matched
term = token.lower()
like_pattern = f"%{_like_pattern(term)}%"
2025-12-24 17:58:57 -08:00
# Unqualified token: match file path, title: tags, and non-namespaced tags.
# Do NOT match other namespaces by default (e.g., artist:men at work).
hashes = set(
api.get_file_hashes_by_path_pattern(like_pattern)
or set()
)
2025-12-24 17:58:57 -08:00
try:
2025-12-29 17:05:03 -08:00
title_rows = api.get_files_by_namespace_pattern(
f"title:{like_pattern}",
url_fetch_limit
2025-12-29 17:05:03 -08:00
)
hashes.update(
{
row[0]
for row in (title_rows or []) if row and row[0]
}
2025-12-29 17:05:03 -08:00
)
2025-12-24 17:58:57 -08:00
except Exception:
pass
try:
2025-12-29 17:05:03 -08:00
simple_rows = api.get_files_by_simple_tag_pattern(
like_pattern,
url_fetch_limit
2025-12-29 17:05:03 -08:00
)
hashes.update(
{
row[0]
for row in (simple_rows or []) if row and row[0]
}
2025-12-29 17:05:03 -08:00
)
2025-12-24 17:58:57 -08:00
except Exception:
pass
2025-12-11 19:04:02 -08:00
return hashes
try:
matching_hashes: set[str] | None = None
for token in tokens:
hashes = _ids_for_token(token)
2025-12-29 17:05:03 -08:00
matching_hashes = (
hashes if matching_hashes is None else
matching_hashes & hashes
2025-12-29 17:05:03 -08:00
)
2025-12-11 19:04:02 -08:00
if not matching_hashes:
return results
2025-12-20 23:57:44 -08:00
if ext_hashes is not None:
matching_hashes = (
matching_hashes or set()
) & ext_hashes
2025-12-20 23:57:44 -08:00
if not matching_hashes:
return results
2025-12-11 19:04:02 -08:00
if not matching_hashes:
return results
rows = api.get_file_metadata(matching_hashes, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-11 19:04:02 -08:00
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
2025-12-20 23:57:44 -08:00
try:
2025-12-29 17:05:03 -08:00
db_ext = str(ext or "").strip().lstrip(".")
2025-12-20 23:57:44 -08:00
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
2025-12-11 19:04:02 -08:00
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
except Exception as exc:
log(f"⚠️ AND search failed: {exc}", file=sys.stderr)
debug(f"AND search exception details: {exc}")
return []
if ":" in query and not query.startswith(":"):
namespace, pattern = query.split(":", 1)
namespace = namespace.strip().lower()
pattern = pattern.strip().lower()
2026-01-16 01:47:00 -08:00
debug(f"[folder:{backend_label}] namespace search: {namespace}:{pattern}")
2025-12-11 19:04:02 -08:00
if namespace == "hash":
normalized_hash = _normalize_hash(pattern)
if not normalized_hash:
return results
h = api.get_file_hash_by_hash(normalized_hash)
hashes = {h} if h else set()
rows = api.get_file_metadata(hashes, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-11 19:04:02 -08:00
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
2025-12-20 23:57:44 -08:00
try:
2025-12-29 17:05:03 -08:00
db_ext = str(ext or "").strip().lstrip(".")
2025-12-20 23:57:44 -08:00
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
2025-12-11 19:04:02 -08:00
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
2025-12-14 00:53:52 -08:00
if namespace == "url":
2026-01-16 01:47:00 -08:00
pattern_hint = kwargs.get("pattern_hint")
def _parse_url_value(raw: Any) -> list[str]:
if raw is None:
return []
if isinstance(raw, list):
return [str(u).strip() for u in raw if str(u).strip()]
if isinstance(raw, str):
text = raw.strip()
if not text:
return []
try:
parsed = json.loads(text)
if isinstance(parsed, list):
return [
str(u).strip()
for u in parsed
if str(u).strip()
]
except Exception:
pass
return [text]
return []
def _matches_pattern(url_list: list[str]) -> bool:
if not pattern_hint:
return True
for candidate_url in url_list:
if _match_url_pattern(candidate_url, pattern_hint):
return True
return False
2025-12-14 00:53:52 -08:00
if not pattern or pattern == "*":
2026-01-16 01:47:00 -08:00
debug(f"[folder:{backend_label}] url search: any-url (limit={limit})")
2025-12-14 00:53:52 -08:00
rows = api.get_files_with_any_url(limit)
else:
2026-01-16 01:47:00 -08:00
debug(
f"[folder:{backend_label}] url search: like={pattern} (limit={limit})"
)
rows = api.get_files_by_url_like(
_url_like_pattern(pattern),
limit
)
2026-01-16 01:47:00 -08:00
for file_hash, file_path_str, size_bytes, ext, url_raw in rows:
2025-12-14 00:53:52 -08:00
if not file_path_str:
continue
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-14 00:53:52 -08:00
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
2026-01-16 01:47:00 -08:00
urls = _parse_url_value(url_raw)
if not urls or not _matches_pattern(urls):
continue
2025-12-14 00:53:52 -08:00
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
2026-01-16 01:47:00 -08:00
entry["urls"] = urls
2025-12-14 00:53:52 -08:00
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
2025-12-20 23:57:44 -08:00
if namespace == "system":
# Hydrus-compatible query: system:filetype = png
m_ft = re.match(r"^filetype\s*(?:=\s*)?(.+)$", pattern)
if m_ft:
normalized_ext = _normalize_ext_filter(m_ft.group(1))
if not normalized_ext:
return results
rows = api.get_files_by_ext(normalized_ext, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-20 23:57:44 -08:00
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
2025-12-20 23:57:44 -08:00
try:
2025-12-29 17:05:03 -08:00
db_ext = str(ext or "").strip().lstrip(".")
2025-12-20 23:57:44 -08:00
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
if namespace in {"ext",
"extension"}:
2025-12-20 23:57:44 -08:00
normalized_ext = _normalize_ext_filter(pattern)
if not normalized_ext:
return results
rows = api.get_files_by_ext(normalized_ext, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-20 23:57:44 -08:00
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
2025-12-20 23:57:44 -08:00
try:
2025-12-29 17:05:03 -08:00
db_ext = str(ext or "").strip().lstrip(".")
2025-12-20 23:57:44 -08:00
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
query_pattern = f"{namespace}:%"
rows = api.get_files_by_namespace_pattern(query_pattern, limit)
debug(f"Found {len(rows)} potential matches in DB")
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
2025-12-29 17:05:03 -08:00
tags = api.get_tags_by_namespace_and_file(
file_hash,
query_pattern
)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
for tag in tags:
tag_lower = tag.lower()
if tag_lower.startswith(f"{namespace}:"):
2025-12-29 17:05:03 -08:00
value = _normalize_namespace_text(
tag_lower[len(namespace) + 1:],
allow_wildcards=False
)
pat = _normalize_namespace_text(
pattern,
allow_wildcards=True
2025-12-29 17:05:03 -08:00
)
2025-12-23 16:36:39 -08:00
if fnmatch(value, pat):
2025-12-20 23:57:44 -08:00
if ext_hashes is not None and file_hash not in ext_hashes:
break
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-11 19:04:02 -08:00
if file_path.exists():
if size_bytes is None:
size_bytes = file_path.stat().st_size
all_tags = api.get_tags_for_file(file_hash)
2025-12-29 17:05:03 -08:00
entry = _create_entry(
file_path,
all_tags,
size_bytes,
file_hash
2025-12-29 17:05:03 -08:00
)
2025-12-20 23:57:44 -08:00
try:
db_ext = str(ext
or "").strip().lstrip(".")
2025-12-20 23:57:44 -08:00
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
2025-12-11 19:04:02 -08:00
results.append(entry)
else:
debug(f"File missing on disk: {file_path}")
break
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
if limit is not None and len(results) >= limit:
return results
elif not match_all:
2025-12-24 17:58:57 -08:00
# Default (unqualified) search: AND semantics across terms.
# Each term must match at least one of:
# - file path (filename)
# - title: namespace tag
# - non-namespaced tag
# Other namespaces (artist:, series:, etc.) are excluded unless explicitly queried.
2025-12-29 17:05:03 -08:00
terms = [
t.strip() for t in query_lower.replace(",", " ").split()
if t.strip()
2025-12-29 17:05:03 -08:00
]
2025-12-11 19:04:02 -08:00
if not terms:
terms = [query_lower]
2025-12-14 00:53:52 -08:00
2025-12-11 19:04:02 -08:00
fetch_limit = (limit or 45) * 50
2025-12-24 17:58:57 -08:00
matching_hashes: Optional[set[str]] = None
2025-12-14 00:53:52 -08:00
for term in terms:
2025-12-24 17:58:57 -08:00
if not term:
2025-12-14 00:53:52 -08:00
continue
2025-12-24 17:58:57 -08:00
like_term = _like_pattern(term)
like_pattern = f"%{like_term}%"
term_hashes: set[str] = set()
try:
2025-12-29 17:05:03 -08:00
term_hashes.update(
api.get_file_hashes_by_path_pattern(like_pattern)
)
2025-12-24 17:58:57 -08:00
except Exception:
pass
try:
2025-12-29 17:05:03 -08:00
title_rows = api.get_files_by_namespace_pattern(
f"title:{like_pattern}",
fetch_limit
2025-12-29 17:05:03 -08:00
)
term_hashes.update(
{
row[0]
for row in (title_rows or []) if row and row[0]
}
2025-12-29 17:05:03 -08:00
)
2025-12-24 17:58:57 -08:00
except Exception:
pass
try:
2025-12-29 17:05:03 -08:00
simple_rows = api.get_files_by_simple_tag_pattern(
like_pattern,
fetch_limit
2025-12-29 17:05:03 -08:00
)
term_hashes.update(
{
row[0]
for row in (simple_rows or []) if row and row[0]
}
2025-12-29 17:05:03 -08:00
)
2025-12-24 17:58:57 -08:00
except Exception:
pass
if ext_hashes is not None:
term_hashes &= ext_hashes
2025-12-29 17:05:03 -08:00
matching_hashes = (
term_hashes if matching_hashes is None else
(matching_hashes & term_hashes)
2025-12-29 17:05:03 -08:00
)
2025-12-24 17:58:57 -08:00
if not matching_hashes:
return results
if not matching_hashes:
return results
rows = api.get_file_metadata(set(matching_hashes), limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
2025-12-11 19:04:02 -08:00
continue
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-14 00:53:52 -08:00
if not file_path.exists():
2025-12-11 19:04:02 -08:00
continue
2025-12-14 00:53:52 -08:00
if size_bytes is None:
try:
2025-12-11 19:04:02 -08:00
size_bytes = file_path.stat().st_size
2025-12-14 00:53:52 -08:00
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry_obj = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
2025-12-24 17:58:57 -08:00
try:
2025-12-29 17:05:03 -08:00
db_ext = str(ext or "").strip().lstrip(".")
2025-12-24 17:58:57 -08:00
if db_ext:
entry_obj["ext"] = db_ext
except Exception:
pass
2025-12-14 00:53:52 -08:00
results.append(entry_obj)
if limit is not None and len(results) >= limit:
break
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
else:
rows = api.get_all_files(limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if file_path_str:
2025-12-20 23:57:44 -08:00
if ext_hashes is not None and file_hash not in ext_hashes:
continue
2025-12-24 22:15:54 -08:00
file_path = search_dir / str(file_path_str)
2025-12-11 19:04:02 -08:00
if file_path.exists():
if size_bytes is None:
size_bytes = file_path.stat().st_size
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
2025-12-20 23:57:44 -08:00
try:
2025-12-29 17:05:03 -08:00
db_ext = str(ext or "").strip().lstrip(".")
2025-12-20 23:57:44 -08:00
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
2025-12-11 19:04:02 -08:00
results.append(entry)
2025-12-29 17:05:03 -08:00
backend_label = str(
getattr(self,
"_name",
"") or getattr(self,
"NAME",
"") or "folder"
2025-12-29 17:05:03 -08:00
)
2025-12-14 00:53:52 -08:00
debug(f"[folder:{backend_label}] {len(results)} result(s)")
2025-12-11 19:04:02 -08:00
return results
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
except Exception as e:
log(f"⚠️ Database search failed: {e}", file=sys.stderr)
debug(f"DB search exception details: {e}")
return []
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
except Exception as exc:
log(f"❌ Local search failed: {exc}", file=sys.stderr)
raise
def _resolve_library_root(self,
file_path: Path,
config: Dict[str,
Any]) -> Optional[Path]:
2025-12-11 19:04:02 -08:00
"""Return the library root containing medios-macina.db.
Prefer the store's configured location, then config override, then walk parents
of the file path to find a directory with medios-macina.db."""
candidates: list[Path] = []
if self._location:
2026-01-11 00:52:54 -08:00
candidates.append(expand_path(self._location))
2025-12-11 19:04:02 -08:00
cfg_root = get_local_storage_path(config) if config else None
if cfg_root:
2026-01-11 00:52:54 -08:00
candidates.append(expand_path(cfg_root))
2025-12-11 19:04:02 -08:00
for root in candidates:
db_path = root / "medios-macina.db"
if db_path.exists():
return root
try:
for parent in [file_path] + list(file_path.parents):
db_path = parent / "medios-macina.db"
if db_path.exists():
return parent
except Exception:
pass
return None
def get_file(self, file_hash: str, **kwargs: Any) -> Optional[Path]:
"""Retrieve file by hash, returning path to the file.
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Args:
file_hash: SHA256 hash of the file (64-char hex string)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Returns:
Path to the file or None if not found
"""
try:
# Normalize the hash
normalized_hash = _normalize_hash(file_hash)
if not normalized_hash:
return None
2025-12-29 17:05:03 -08:00
2026-01-11 00:52:54 -08:00
search_dir = expand_path(self._location)
2025-12-11 19:04:02 -08:00
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
with API_folder_store(search_dir) as db:
# Search for file by hash
file_path = db.search_hash(normalized_hash)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
if file_path and file_path.exists():
return file_path
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
return None
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
except Exception as exc:
debug(f"Failed to get file for hash {file_hash}: {exc}")
return None
def get_metadata(self, file_hash: str) -> Optional[Dict[str, Any]]:
"""Get metadata for a file from the database by hash.
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Args:
file_hash: SHA256 hash of the file (64-char hex string)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Returns:
Dict with metadata fields (ext, size, hash, duration, etc.) or None if not found
"""
try:
# Normalize the hash
normalized_hash = _normalize_hash(file_hash)
if not normalized_hash:
return None
2025-12-29 17:05:03 -08:00
2026-01-11 00:52:54 -08:00
search_dir = expand_path(self._location)
2025-12-11 19:04:02 -08:00
from API.folder import DatabaseAPI
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
with DatabaseAPI(search_dir) as api:
# Get file hash
file_hash_result = api.get_file_hash_by_hash(normalized_hash)
if not file_hash_result:
return None
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Query metadata directly from database
cursor = api.get_cursor()
2025-12-29 17:05:03 -08:00
cursor.execute(
"""
2025-12-11 19:04:02 -08:00
SELECT * FROM metadata WHERE hash = ?
2025-12-29 17:05:03 -08:00
""",
(file_hash_result,
),
2025-12-29 17:05:03 -08:00
)
2025-12-11 19:04:02 -08:00
row = cursor.fetchone()
if not row:
return None
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
metadata = dict(row)
# Canonicalize metadata keys (no legacy aliases)
if "file_path" in metadata and "path" not in metadata:
metadata["path"] = metadata.get("file_path")
metadata.pop("file_path", None)
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
# Parse JSON fields
2025-12-29 17:05:03 -08:00
for field in ["url", "relationships"]:
2025-12-11 19:04:02 -08:00
if metadata.get(field):
try:
metadata[field] = json.loads(metadata[field])
except (json.JSONDecodeError, TypeError):
2025-12-29 17:05:03 -08:00
metadata[field] = [] if field == "url" else []
2025-12-11 19:04:02 -08:00
return metadata
except Exception as exc:
debug(f"Failed to get metadata for hash {file_hash}: {exc}")
return None
2026-01-02 02:28:59 -08:00
def set_relationship(self, alt_hash: str, king_hash: str, kind: str = "alt") -> bool:
"""Persist a relationship in the folder store DB.
This is a thin wrapper around the folder DB API so cmdlets can avoid
backend-specific branching.
"""
try:
if not self._location:
return False
alt_norm = _normalize_hash(alt_hash)
king_norm = _normalize_hash(king_hash)
if not alt_norm or not king_norm or alt_norm == king_norm:
return False
from API.folder import API_folder_store
2026-01-11 00:52:54 -08:00
with API_folder_store(expand_path(self._location)) as db:
2026-01-02 02:28:59 -08:00
db.set_relationship_by_hash(
alt_norm,
king_norm,
str(kind or "alt"),
bidirectional=False,
)
return True
except Exception:
return False
2025-12-11 19:04:02 -08:00
def get_tag(self, file_identifier: str, **kwargs: Any) -> Tuple[List[str], str]:
"""Get tags for a local file by hash.
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Returns:
Tuple of (tags_list, store_name) where store_name is the actual store name
"""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
file_hash = file_identifier
if self._location:
try:
with API_folder_store(Path(self._location)) as db:
db_tags = db.get_tags(file_hash)
if db_tags:
# Return actual store name instead of generic "local_db"
store_name = self._name if self._name else "local"
2025-12-29 17:05:03 -08:00
return [
str(t).strip().lower()
for t in db_tags
if isinstance(t, str) and t.strip()
], store_name
2025-12-11 19:04:02 -08:00
except Exception as exc:
debug(f"Local DB lookup failed: {exc}")
return [], "unknown"
except Exception as exc:
debug(f"get_tags failed for local file: {exc}")
return [], "unknown"
def add_tag(self, hash: str, tag: List[str], **kwargs: Any) -> bool:
"""Add tags to a local file by hash (via API_folder_store).
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Handles namespace collapsing: when adding namespace:value, removes existing namespace:* tags.
Returns True if tags were successfully added.
"""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
if not self._location:
return False
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
with API_folder_store(Path(self._location)) as db:
2025-12-29 17:05:03 -08:00
existing_tags = [
t for t in (db.get_tags(hash) or [])
if isinstance(t, str) and t.strip()
2025-12-29 17:05:03 -08:00
]
2025-12-20 23:57:44 -08:00
from SYS.metadata import compute_namespaced_tag_overwrite
2025-12-20 23:57:44 -08:00
2025-12-29 17:05:03 -08:00
_to_remove, _to_add, merged = compute_namespaced_tag_overwrite(
existing_tags, tag or []
)
2025-12-20 23:57:44 -08:00
if not _to_remove and not _to_add:
return True
# Folder DB tag table is case-sensitive and add_tags_to_hash() is additive.
# To enforce lowercase-only tags and namespace overwrites, rewrite the full tag set.
cursor = db.connection.cursor()
2026-01-02 02:28:59 -08:00
cursor.execute("DELETE FROM tag WHERE hash = ?",
(hash,
))
2025-12-20 23:57:44 -08:00
for t in merged:
t = str(t).strip().lower()
if t:
cursor.execute(
2026-01-02 02:28:59 -08:00
"INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?)",
(hash,
t),
2025-12-20 23:57:44 -08:00
)
db.connection.commit()
try:
db._update_metadata_modified_time(hash)
except Exception:
pass
2025-12-11 19:04:02 -08:00
return True
except Exception as exc:
debug(f"Local DB add_tags failed: {exc}")
return False
except Exception as exc:
debug(f"add_tag failed for local file: {exc}")
return False
def delete_tag(self, file_identifier: str, tags: List[str], **kwargs: Any) -> bool:
"""Remove tags from a local file by hash."""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
file_hash = file_identifier
if self._location:
try:
with API_folder_store(Path(self._location)) as db:
2025-12-29 17:05:03 -08:00
tag_list = [
str(t).strip().lower() for t in (tags or [])
2025-12-29 17:05:03 -08:00
if isinstance(t, str) and str(t).strip()
]
2025-12-20 23:57:44 -08:00
if not tag_list:
return True
db.remove_tags_from_hash(file_hash, tag_list)
2025-12-11 19:04:02 -08:00
return True
except Exception as exc:
debug(f"Local DB remove_tags failed: {exc}")
return False
except Exception as exc:
debug(f"delete_tag failed for local file: {exc}")
return False
def get_url(self, file_identifier: str, **kwargs: Any) -> List[str]:
"""Get known url for a local file by hash."""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
file_hash = file_identifier
if self._location:
try:
from SYS.metadata import normalize_urls
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
with API_folder_store(Path(self._location)) as db:
meta = db.get_metadata(file_hash) or {}
2025-12-14 00:53:52 -08:00
urls = normalize_urls(meta.get("url"))
return urls
2025-12-11 19:04:02 -08:00
except Exception as exc:
debug(f"Local DB get_metadata failed: {exc}")
return []
except Exception as exc:
debug(f"get_url failed for local file: {exc}")
return []
def add_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
"""Add known url to a local file by hash."""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
file_hash = file_identifier
if self._location:
try:
from SYS.metadata import normalize_urls
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
with API_folder_store(Path(self._location)) as db:
meta = db.get_metadata(file_hash) or {}
2025-12-14 00:53:52 -08:00
existing_urls = normalize_urls(meta.get("url"))
incoming_urls = normalize_urls(url)
2025-12-11 19:04:02 -08:00
changed = False
2025-12-14 00:53:52 -08:00
for u in list(incoming_urls or []):
2025-12-11 19:04:02 -08:00
if not u:
continue
if u not in existing_urls:
existing_urls.append(u)
changed = True
if changed:
db.update_metadata_by_hash(
file_hash,
{
"url": existing_urls
}
)
2025-12-11 19:04:02 -08:00
return True
except Exception as exc:
debug(f"Local DB add_url failed: {exc}")
return False
except Exception as exc:
debug(f"add_url failed for local file: {exc}")
return False
2025-12-20 23:57:44 -08:00
def add_url_bulk(self, items: List[tuple[str, List[str]]], **kwargs: Any) -> bool:
"""Add known urls to many local files in one DB session.
This is a performance optimization used by cmdlets that receive many PipeObjects.
"""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-20 23:57:44 -08:00
try:
if not self._location:
return False
# Normalize + coalesce duplicates per hash.
try:
from SYS.metadata import normalize_urls
2025-12-20 23:57:44 -08:00
except Exception:
normalize_urls = None # type: ignore
merged_by_hash: Dict[str,
List[str]] = {}
2025-12-29 17:05:03 -08:00
for file_identifier, url_list in items or []:
2025-12-20 23:57:44 -08:00
file_hash = str(file_identifier or "").strip().lower()
if not file_hash:
continue
incoming: List[str]
if normalize_urls is not None:
try:
incoming = normalize_urls(url_list)
except Exception:
incoming = [
str(u).strip() for u in (url_list or []) if str(u).strip()
]
2025-12-20 23:57:44 -08:00
else:
incoming = [
str(u).strip() for u in (url_list or []) if str(u).strip()
]
2025-12-20 23:57:44 -08:00
if not incoming:
continue
existing = merged_by_hash.get(file_hash) or []
for u in incoming:
if u and u not in existing:
existing.append(u)
merged_by_hash[file_hash] = existing
if not merged_by_hash:
return True
import json
with API_folder_store(Path(self._location)) as db:
conn = getattr(db, "connection", None)
if conn is None:
return False
cursor = conn.cursor()
# Ensure metadata rows exist (may be needed for older entries).
for file_hash in merged_by_hash.keys():
try:
2025-12-29 17:05:03 -08:00
cursor.execute(
"INSERT OR IGNORE INTO metadata (hash) VALUES (?)",
(file_hash,
)
2025-12-29 17:05:03 -08:00
)
2025-12-20 23:57:44 -08:00
except Exception:
continue
# Load existing urls for all hashes in chunks.
existing_urls_by_hash: Dict[str,
List[str]] = {
h: []
for h in merged_by_hash.keys()
}
2025-12-20 23:57:44 -08:00
hashes = list(merged_by_hash.keys())
chunk_size = 400
for i in range(0, len(hashes), chunk_size):
chunk = hashes[i:i + chunk_size]
2025-12-20 23:57:44 -08:00
if not chunk:
continue
placeholders = ",".join(["?"] * len(chunk))
try:
2025-12-29 17:05:03 -08:00
cursor.execute(
f"SELECT hash, url FROM metadata WHERE hash IN ({placeholders})",
chunk
2025-12-29 17:05:03 -08:00
)
2025-12-20 23:57:44 -08:00
rows = cursor.fetchall() or []
except Exception:
rows = []
for row in rows:
try:
row_hash = str(row[0]).strip().lower()
except Exception:
continue
raw_urls = None
try:
raw_urls = row[1]
except Exception:
raw_urls = None
parsed_urls: List[str] = []
if raw_urls:
try:
parsed = json.loads(raw_urls)
if normalize_urls is not None:
parsed_urls = normalize_urls(parsed)
else:
if isinstance(parsed, list):
2025-12-29 17:05:03 -08:00
parsed_urls = [
str(u).strip() for u in parsed
if str(u).strip()
2025-12-29 17:05:03 -08:00
]
2025-12-20 23:57:44 -08:00
except Exception:
parsed_urls = []
existing_urls_by_hash[row_hash] = parsed_urls
# Compute updates and write in one commit.
updates: List[tuple[str, str]] = []
for file_hash, incoming_urls in merged_by_hash.items():
existing_urls = existing_urls_by_hash.get(file_hash) or []
final = list(existing_urls)
for u in incoming_urls:
if u and u not in final:
final.append(u)
if final != existing_urls:
try:
updates.append((json.dumps(final), file_hash))
except Exception:
continue
if updates:
cursor.executemany(
"UPDATE metadata SET url = ?, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
updates,
)
conn.commit()
return True
except Exception as exc:
debug(f"add_url_bulk failed for local file: {exc}")
return False
2025-12-11 19:04:02 -08:00
def delete_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
"""Delete known url from a local file by hash."""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
file_hash = file_identifier
if self._location:
try:
from SYS.metadata import normalize_urls
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
with API_folder_store(Path(self._location)) as db:
meta = db.get_metadata(file_hash) or {}
2025-12-14 00:53:52 -08:00
existing_urls = normalize_urls(meta.get("url"))
remove_set = {u
for u in normalize_urls(url) if u}
2025-12-11 19:04:02 -08:00
if not remove_set:
return False
new_urls = [u for u in existing_urls if u not in remove_set]
if new_urls != existing_urls:
db.update_metadata_by_hash(file_hash,
{
"url": new_urls
})
2025-12-11 19:04:02 -08:00
return True
except Exception as exc:
debug(f"Local DB delete_url failed: {exc}")
return False
except Exception as exc:
debug(f"delete_url failed for local file: {exc}")
return False
def delete_url_bulk(
self,
items: List[tuple[str,
List[str]]],
**kwargs: Any
) -> bool:
2025-12-20 23:57:44 -08:00
"""Delete known urls from many local files in one DB session."""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-20 23:57:44 -08:00
try:
if not self._location:
return False
try:
from SYS.metadata import normalize_urls
2025-12-20 23:57:44 -08:00
except Exception:
normalize_urls = None # type: ignore
remove_by_hash: Dict[str,
set[str]] = {}
2025-12-29 17:05:03 -08:00
for file_identifier, url_list in items or []:
2025-12-20 23:57:44 -08:00
file_hash = str(file_identifier or "").strip().lower()
if not file_hash:
continue
incoming: List[str]
if normalize_urls is not None:
try:
incoming = normalize_urls(url_list)
except Exception:
incoming = [
str(u).strip() for u in (url_list or []) if str(u).strip()
]
2025-12-20 23:57:44 -08:00
else:
incoming = [
str(u).strip() for u in (url_list or []) if str(u).strip()
]
2025-12-20 23:57:44 -08:00
remove = {u
for u in incoming if u}
2025-12-20 23:57:44 -08:00
if not remove:
continue
remove_by_hash.setdefault(file_hash, set()).update(remove)
if not remove_by_hash:
return True
import json
with API_folder_store(Path(self._location)) as db:
conn = getattr(db, "connection", None)
if conn is None:
return False
cursor = conn.cursor()
# Ensure metadata rows exist.
for file_hash in remove_by_hash.keys():
try:
2025-12-29 17:05:03 -08:00
cursor.execute(
"INSERT OR IGNORE INTO metadata (hash) VALUES (?)",
(file_hash,
)
2025-12-29 17:05:03 -08:00
)
2025-12-20 23:57:44 -08:00
except Exception:
continue
# Load existing urls for hashes in chunks.
existing_urls_by_hash: Dict[str,
List[str]] = {
h: []
for h in remove_by_hash.keys()
}
2025-12-20 23:57:44 -08:00
hashes = list(remove_by_hash.keys())
chunk_size = 400
for i in range(0, len(hashes), chunk_size):
chunk = hashes[i:i + chunk_size]
2025-12-20 23:57:44 -08:00
if not chunk:
continue
placeholders = ",".join(["?"] * len(chunk))
try:
2025-12-29 17:05:03 -08:00
cursor.execute(
f"SELECT hash, url FROM metadata WHERE hash IN ({placeholders})",
chunk
2025-12-29 17:05:03 -08:00
)
2025-12-20 23:57:44 -08:00
rows = cursor.fetchall() or []
except Exception:
rows = []
for row in rows:
try:
row_hash = str(row[0]).strip().lower()
except Exception:
continue
raw_urls = None
try:
raw_urls = row[1]
except Exception:
raw_urls = None
parsed_urls: List[str] = []
if raw_urls:
try:
parsed = json.loads(raw_urls)
if normalize_urls is not None:
parsed_urls = normalize_urls(parsed)
else:
if isinstance(parsed, list):
2025-12-29 17:05:03 -08:00
parsed_urls = [
str(u).strip() for u in parsed
if str(u).strip()
2025-12-29 17:05:03 -08:00
]
2025-12-20 23:57:44 -08:00
except Exception:
parsed_urls = []
existing_urls_by_hash[row_hash] = parsed_urls
# Apply removals + write updates.
updates: List[tuple[str, str]] = []
for file_hash, remove_set in remove_by_hash.items():
existing_urls = existing_urls_by_hash.get(file_hash) or []
new_urls = [u for u in existing_urls if u not in remove_set]
if new_urls != existing_urls:
try:
updates.append((json.dumps(new_urls), file_hash))
except Exception:
continue
if updates:
cursor.executemany(
"UPDATE metadata SET url = ?, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
updates,
)
conn.commit()
return True
except Exception as exc:
debug(f"delete_url_bulk failed for local file: {exc}")
return False
2025-12-12 21:55:38 -08:00
def get_note(self, file_identifier: str, **kwargs: Any) -> Dict[str, str]:
"""Get notes for a local file by hash."""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-12 21:55:38 -08:00
try:
if not self._location:
return {}
file_hash = str(file_identifier or "").strip().lower()
if not _normalize_hash(file_hash):
return {}
with API_folder_store(Path(self._location)) as db:
getter = getattr(db, "get_notes", None)
if callable(getter):
notes = getter(file_hash)
return notes if isinstance(notes,
dict) else {}
2025-12-12 21:55:38 -08:00
# Fallback: default-only
note = db.get_note(file_hash)
return {
"default": str(note or "")
} if note else {}
2025-12-12 21:55:38 -08:00
except Exception as exc:
debug(f"get_note failed for local file: {exc}")
return {}
def set_note(
self,
file_identifier: str,
name: str,
text: str,
**kwargs: Any
) -> bool:
2025-12-12 21:55:38 -08:00
"""Set a named note for a local file by hash."""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-12 21:55:38 -08:00
try:
if not self._location:
return False
file_hash = str(file_identifier or "").strip().lower()
2026-01-15 03:20:52 -08:00
note_name = str(name or "").strip()
2025-12-12 21:55:38 -08:00
if not _normalize_hash(file_hash):
return False
2026-01-15 03:20:52 -08:00
if not note_name:
2025-12-12 21:55:38 -08:00
return False
with API_folder_store(Path(self._location)) as db:
2026-01-15 03:20:52 -08:00
setter_hash = getattr(db, "set_note_by_hash", None)
if callable(setter_hash):
setter_hash(file_hash, note_name, str(text))
return True
file_path = self.get_file(file_hash, **kwargs)
if not file_path or not isinstance(file_path,
Path) or not file_path.exists():
return False
2025-12-12 21:55:38 -08:00
setter = getattr(db, "set_note", None)
if callable(setter):
2026-01-15 03:20:52 -08:00
setter(file_path, note_name, str(text))
2025-12-12 21:55:38 -08:00
return True
db.save_note(file_path, str(text))
return True
except Exception as exc:
debug(f"set_note failed for local file: {exc}")
return False
2025-12-20 23:57:44 -08:00
def set_note_bulk(self, items: List[tuple[str, str, str]], **kwargs: Any) -> bool:
"""Set notes for many local files in one DB session.
Preserves existing semantics by only setting notes for hashes that still
map to a file path that exists on disk.
"""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-20 23:57:44 -08:00
try:
if not self._location:
return False
# Normalize input.
normalized: List[tuple[str, str, str]] = []
2025-12-29 17:05:03 -08:00
for file_identifier, name, text in items or []:
2025-12-20 23:57:44 -08:00
file_hash = str(file_identifier or "").strip().lower()
note_name = str(name or "").strip()
note_text = str(text or "")
if not file_hash or not _normalize_hash(file_hash) or not note_name:
continue
normalized.append((file_hash, note_name, note_text))
if not normalized:
return True
with API_folder_store(Path(self._location)) as db:
conn = getattr(db, "connection", None)
if conn is None:
return False
cursor = conn.cursor()
# Look up file paths for hashes in chunks (to verify existence).
wanted_hashes = sorted({h
for (h, _n, _t) in normalized})
hash_to_path: Dict[str,
str] = {}
2025-12-20 23:57:44 -08:00
chunk_size = 400
for i in range(0, len(wanted_hashes), chunk_size):
chunk = wanted_hashes[i:i + chunk_size]
2025-12-20 23:57:44 -08:00
if not chunk:
continue
placeholders = ",".join(["?"] * len(chunk))
try:
2025-12-29 17:05:03 -08:00
cursor.execute(
2026-01-02 02:28:59 -08:00
f"SELECT hash, file_path FROM file WHERE hash IN ({placeholders})",
2025-12-29 17:05:03 -08:00
chunk,
)
2025-12-20 23:57:44 -08:00
rows = cursor.fetchall() or []
except Exception:
rows = []
for row in rows:
try:
h = str(row[0]).strip().lower()
p = str(row[1]).strip()
except Exception:
continue
if h and p:
hash_to_path[h] = p
# Ensure notes rows exist and only write for existing files.
inserts: List[tuple[str, str, str]] = []
for h, note_name, note_text in normalized:
p = hash_to_path.get(h)
if not p:
continue
try:
2025-12-24 22:15:54 -08:00
if not (Path(self._location) / p).exists():
2025-12-20 23:57:44 -08:00
continue
except Exception:
continue
inserts.append((h, note_name, note_text))
if not inserts:
return False
# Prefer upsert when supported, else fall back to INSERT OR REPLACE.
try:
cursor.executemany(
2026-01-02 02:28:59 -08:00
"INSERT INTO note (hash, name, note) VALUES (?, ?, ?) "
2025-12-20 23:57:44 -08:00
"ON CONFLICT(hash, name) DO UPDATE SET note = excluded.note, updated_at = CURRENT_TIMESTAMP",
inserts,
)
except Exception:
cursor.executemany(
2026-01-02 02:28:59 -08:00
"INSERT OR REPLACE INTO note (hash, name, note) VALUES (?, ?, ?)",
2025-12-20 23:57:44 -08:00
inserts,
)
conn.commit()
return True
except Exception as exc:
debug(f"set_note_bulk failed for local file: {exc}")
return False
2025-12-12 21:55:38 -08:00
def delete_note(self, file_identifier: str, name: str, **kwargs: Any) -> bool:
"""Delete a named note for a local file by hash."""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-12 21:55:38 -08:00
try:
if not self._location:
return False
file_hash = str(file_identifier or "").strip().lower()
if not _normalize_hash(file_hash):
return False
with API_folder_store(Path(self._location)) as db:
deleter = getattr(db, "delete_note", None)
if callable(deleter):
deleter(file_hash, str(name))
return True
# Default-only fallback
if str(name).strip().lower() == "default":
deleter2 = getattr(db, "save_note", None)
if callable(deleter2):
file_path = self.get_file(file_hash, **kwargs)
if file_path and isinstance(file_path,
Path) and file_path.exists():
2025-12-12 21:55:38 -08:00
deleter2(file_path, "")
return True
return False
except Exception as exc:
debug(f"delete_note failed for local file: {exc}")
return False
2025-12-11 19:04:02 -08:00
def delete_file(self, file_identifier: str, **kwargs: Any) -> bool:
"""Delete a file from the folder store.
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Args:
file_identifier: The file path (as string) or hash of the file to delete
**kwargs: Optional parameters
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
Returns:
True if deletion succeeded, False otherwise
"""
from API.folder import API_folder_store
2025-12-29 17:05:03 -08:00
2025-12-11 19:04:02 -08:00
try:
2025-12-23 16:36:39 -08:00
if not self._location:
return False
raw = str(file_identifier or "").strip()
if not raw:
return False
2026-01-11 00:52:54 -08:00
store_root = expand_path(self._location)
2025-12-23 16:36:39 -08:00
# Support deletion by hash (common for store items where `path` is the hash).
file_hash = _normalize_hash(raw)
resolved_path: Optional[Path] = None
with API_folder_store(store_root) as db:
if file_hash:
resolved_path = db.search_hash(file_hash)
else:
2026-01-11 00:52:54 -08:00
p = expand_path(raw)
2025-12-23 16:36:39 -08:00
resolved_path = p if p.is_absolute() else (store_root / p)
if resolved_path is None:
debug(f"delete_file: could not resolve identifier: {raw}")
return False
# Delete from database (also cleans up relationship backlinks).
db.delete_file(resolved_path)
# Delete the actual file from disk (best-effort).
try:
if resolved_path.exists():
resolved_path.unlink()
debug(f"Deleted file: {resolved_path}")
else:
debug(f"File not found on disk: {resolved_path}")
except Exception:
pass
return True
2025-12-11 19:04:02 -08:00
except Exception as exc:
debug(f"delete_file failed: {exc}")
return False