from __future__ import annotations
import json
import re
import shutil
import sys
from fnmatch import translate
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from SYS.logger import debug, log
from SYS.utils import sha256_file
from Store._base import Store
def _normalize_hash(value: Any) -> Optional[str]:
candidate = str(value or "").strip().lower()
if len(candidate) != 64:
return None
if any(ch not in "0123456789abcdef" for ch in candidate):
return None
return candidate
def _resolve_file_hash(db_hash: Optional[str], file_path: Path) -> Optional[str]:
normalized = _normalize_hash(db_hash) if db_hash else None
if normalized:
return normalized
return _normalize_hash(file_path.stem)
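# Illustrative behaviour of the helpers above (values are hypothetical):
#   _normalize_hash("AB12" * 16)  -> "ab12ab12..." (64 hex chars, lowercased)
#   _normalize_hash("not-a-hash") -> None
#   _resolve_file_hash(None, Path("deadbeef...cafe.mp4")) falls back to the
#   file stem and returns it only when the stem is itself a 64-char hex digest.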
class Folder(Store):
"""Folder-backed file store: files are kept on disk named by their SHA-256 hash, with tags, URLs and metadata tracked in a local SQLite database."""

# Track which locations have already been migrated to avoid repeated migrations
_migrated_locations = set()

# Cache scan results to avoid repeated full scans across repeated instantiations
_scan_cache: Dict[str, Tuple[bool, str, Dict[str, int]]] = {}

def __new__(cls, *args: Any, **kwargs: Any) -> "Folder":
return super().__new__(cls)
setattr(__new__, "keys", ("NAME", "PATH"))
def __init__(
self,
location: Optional[str] = None,
name: Optional[str] = None,
*,
NAME: Optional[str] = None,
PATH: Optional[str] = None,
) -> None:
if name is None and NAME is not None:
name = str(NAME)
if location is None and PATH is not None:
location = str(PATH)
self._location = str(location) if location is not None else ""
self._name = name
# Scan status (set during init)
self.scan_ok: bool = True
self.scan_detail: str = ""
self.scan_stats: Dict[str, int] = {}
if self._location:
try:
from API.folder import API_folder_store
from API.folder import LocalLibraryInitializer
from pathlib import Path
location_path = Path(self._location).expanduser()
# Use context manager to ensure connection is properly closed
with API_folder_store(location_path) as db:
if db.connection:
db.connection.commit()
# Call migration and discovery at startup
Folder.migrate_location(self._location)
# Local library scan/index (one-time per location per process)
location_key = str(location_path)
cached = Folder._scan_cache.get(location_key)
if cached is None:
try:
initializer = LocalLibraryInitializer(location_path)
stats = initializer.scan_and_index() or {}
files_new = int(stats.get("files_new", 0) or 0)
sidecars = int(stats.get("sidecars_imported", 0) or 0)
total_db = int(stats.get("files_total_db", 0) or 0)
if files_new > 0 or sidecars > 0:
detail = f"New: {files_new}, Sidecars: {sidecars}" + (
f" (Total: {total_db})" if total_db else ""
)
else:
detail = "Up to date" + (
f" (Total: {total_db})" if total_db else ""
)
Folder._scan_cache[location_key] = (True, detail, dict(stats))
except Exception as exc:
Folder._scan_cache[location_key] = (
False,
f"Scan failed: {exc}",
{}
)
ok, detail, stats = Folder._scan_cache.get(location_key, (True, "", {}))
self.scan_ok = bool(ok)
self.scan_detail = str(detail or "")
self.scan_stats = dict(stats or {})
except Exception as exc:
debug(f"Failed to initialize database for '{name}': {exc}")
@classmethod
def migrate_location(cls, location: Optional[str]) -> None:
"""Migrate a location to hash-based storage (one-time operation, call explicitly at startup)."""
if not location:
return
from pathlib import Path
location_path = Path(location).expanduser()
location_str = str(location_path)
# Only migrate once per location
if location_str in cls._migrated_locations:
return
cls._migrated_locations.add(location_str)
cls._migrate_to_hash_storage(location_path)
@classmethod
def _migrate_to_hash_storage(cls, location_path: Path) -> None:
"""Migrate existing files from filename-based to hash-based storage.
Checks for sidecars (.metadata, .tag) and imports them before renaming.
Also ensures all files have a title: tag.
"""
from API.folder import API_folder_store, read_sidecar, write_sidecar, find_sidecar
try:
with API_folder_store(location_path) as db:
cursor = db.connection.cursor()
# First pass: migrate filename-based files and add title tags
# Scan all files in the storage directory
for file_path in sorted(location_path.iterdir()):
if not file_path.is_file():
continue
# Skip database files and sidecars
if file_path.suffix in (".db", ".metadata", ".tag", "-shm", "-wal"):
continue
# Also skip if the file ends with -shm or -wal (SQLite journal files)
if file_path.name.endswith(("-shm", "-wal")):
continue
# Check if filename is already a hash (without extension)
if len(file_path.stem) == 64 and all(
c in "0123456789abcdef" for c in file_path.stem.lower()):
continue # Already migrated, will process in second pass
try:
# Compute file hash
file_hash = sha256_file(file_path)
# Preserve extension in the hash-based filename
file_ext = file_path.suffix # e.g., '.mp4'
hash_filename = file_hash + file_ext if file_ext else file_hash
hash_path = location_path / hash_filename
# Check for sidecars and import them
sidecar_path = find_sidecar(file_path)
tags_to_add = []
url_to_add = []
has_title_tag = False
if sidecar_path and sidecar_path.exists():
try:
_, tags, url = read_sidecar(sidecar_path)
if tags:
tags_to_add = list(tags)
# Check if title tag exists
has_title_tag = any(
t.lower().startswith("title:")
for t in tags_to_add
)
if url:
url_to_add = list(url)
debug(
f"Found sidecar for {file_path.name}: {len(tags_to_add)} tags, {len(url_to_add)} url",
file=sys.stderr,
)
# Delete the sidecar after importing
sidecar_path.unlink()
except Exception as exc:
debug(
f"Failed to read sidecar for {file_path.name}: {exc}",
file=sys.stderr,
)
# Ensure there's a title tag (use original filename if not present)
if not has_title_tag:
tags_to_add.append(f"title:{file_path.name}")
# Rename file to hash if needed
if hash_path != file_path and not hash_path.exists():
debug(
f"Migrating: {file_path.name} -> {hash_filename}",
file=sys.stderr
)
file_path.rename(hash_path)
# Ensure DB points to the renamed path (update by hash).
try:
cursor.execute(
"UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
(db._to_db_file_path(hash_path), file_hash),
)
except Exception:
pass
# Create or update database entry
db.get_or_create_file_entry(hash_path)
# Save extension metadata
ext_clean = file_ext.lstrip(".") if file_ext else ""
db.save_metadata(
hash_path,
{
"hash": file_hash,
"ext": ext_clean,
"size": hash_path.stat().st_size,
},
)
# Add all tags (including title tag)
if tags_to_add:
db.save_tags(hash_path, tags_to_add)
debug(
f"Added {len(tags_to_add)} tags to {file_hash}",
file=sys.stderr
)
# Note: URLs would need a separate table to be stored here properly;
# for now they are only recorded in the debug log.
if url_to_add:
debug(
f"Imported {len(url_to_add)} url for {file_hash}: {url_to_add}",
file=sys.stderr,
)
except Exception as exc:
debug(
f"Failed to migrate file {file_path.name}: {exc}",
file=sys.stderr
)
# Second pass: ensure all files in database have a title: tag
db.connection.commit()
cursor.execute(
"""
SELECT f.hash, f.file_path
FROM file f
WHERE NOT EXISTS (
SELECT 1 FROM tag t WHERE t.hash = f.hash AND LOWER(t.tag) LIKE 'title:%'
)
"""
)
files_without_title = cursor.fetchall()
for file_hash, file_path_str in files_without_title:
try:
file_path = location_path / str(file_path_str)
if file_path.exists():
# Use the filename as the title
title_tag = f"title:{file_path.name}"
db.save_tags(file_path, [title_tag])
debug(
f"Added title tag to {file_path.name}",
file=sys.stderr
)
except Exception as exc:
debug(
f"Failed to add title tag to file {file_path_str}: {exc}",
file=sys.stderr,
)
db.connection.commit()
# Third pass: discover files on disk that aren't in the database yet
# These are hash-named files that were added after initial indexing
cursor.execute("SELECT LOWER(hash) FROM file")
db_hashes = {row[0] for row in cursor.fetchall()}
discovered = 0
for file_path in sorted(location_path.rglob("*")):
if file_path.is_file():
# Check if file name (without extension) is a 64-char hex hash
name_without_ext = file_path.stem
if len(name_without_ext) == 64 and all(
c in "0123456789abcdef"
for c in name_without_ext.lower()):
file_hash = name_without_ext.lower()
# Skip if already in DB
if file_hash in db_hashes:
continue
try:
# Add file to DB (creates entry and auto-adds title: tag)
db.get_or_create_file_entry(file_path)
# Save extension metadata
file_ext = file_path.suffix
ext_clean = file_ext.lstrip(".") if file_ext else ""
db.save_metadata(
file_path,
{
"hash": file_hash,
"ext": ext_clean,
"size": file_path.stat().st_size,
},
)
discovered += 1
except Exception as e:
debug(
f"Failed to discover file {file_path.name}: {e}",
file=sys.stderr,
)
if discovered > 0:
debug(
f"Discovered and indexed {discovered} previously unindexed files in {location_path.name}",
file=sys.stderr,
)
db.connection.commit()
except Exception as exc:
debug(f"Migration to hash storage failed: {exc}", file=sys.stderr)
def location(self) -> str:
return self._location
def name(self) -> str:
return self._name
def add_file(self, file_path: Path, **kwargs: Any) -> str:
"""Add file to local folder storage with full metadata support.
Args:
file_path: Path to the file to add
move: If True, move file instead of copy (default: False)
tag: Optional list of tag values to add
url: Optional list of URLs to associate with the file
title: Optional title (will be added as 'title:value' tag)
Returns:
File hash (SHA256 hex string) as identifier
"""
move_file = bool(kwargs.get("move"))
tag_list = kwargs.get("tag", [])
url = kwargs.get("url", [])
title = kwargs.get("title")
# Extract title from tags if not explicitly provided
if not title:
for candidate in tag_list:
if isinstance(candidate, str) and candidate.lower().startswith("title:"):
title = candidate.split(":", 1)[1].strip()
break
# Fallback to filename if no title
if not title:
title = file_path.name
# Ensure title is in tags
title_tag = f"title:{title}"
if not any(str(candidate).lower().startswith("title:")
for candidate in tag_list):
tag_list = [title_tag] + list(tag_list)
try:
file_hash = sha256_file(file_path)
debug(f"File hash: {file_hash}", file=sys.stderr)
# Preserve extension in the stored filename
file_ext = file_path.suffix # e.g., '.mp4'
save_filename = file_hash + file_ext if file_ext else file_hash
save_file = Path(self._location) / save_filename
# Check if file already exists
from API.folder import API_folder_store
with API_folder_store(Path(self._location)) as db:
existing_path = db.search_hash(file_hash)
if existing_path and existing_path.exists():
log(
f"✓ File already in local storage: {existing_path}",
file=sys.stderr,
)
# Still add tags and url if provided
if tag_list:
self.add_tag(file_hash, tag_list)
if url:
self.add_url(file_hash, url)
return file_hash
# Move or copy file (with progress bar on actual byte transfer).
# Note: a same-volume move may be a fast rename and won't show progress.
def _copy_with_progress(src: Path, dst: Path, *, label: str) -> None:
from SYS.models import ProgressFileReader
total_bytes = None
try:
total_bytes = int(src.stat().st_size)
except Exception:
total_bytes = None
with src.open("rb") as r, dst.open("wb") as w:
reader = ProgressFileReader(r, total_bytes=total_bytes, label=label)
while True:
chunk = reader.read(1024 * 1024)
if not chunk:
break
w.write(chunk)
# Preserve file metadata similar to shutil.copy2
try:
shutil.copystat(str(src), str(dst))
except Exception:
pass
if move_file:
# Prefer native move; fall back to copy+delete with progress on failure.
try:
shutil.move(str(file_path), str(save_file))
debug(f"Local move: {save_file}", file=sys.stderr)
# After a move, the original path no longer exists; use destination for subsequent ops.
file_path = save_file
except Exception:
_copy_with_progress(
file_path,
save_file,
label=f"folder:{self._name} move"
)
try:
file_path.unlink(missing_ok=True) # type: ignore[arg-type]
except Exception:
try:
if file_path.exists():
file_path.unlink()
except Exception:
pass
debug(f"Local move (copy+delete): {save_file}", file=sys.stderr)
file_path = save_file
else:
_copy_with_progress(
file_path,
save_file,
label=f"folder:{self._name} copy"
)
debug(f"Local copy: {save_file}", file=sys.stderr)
# Best-effort: capture duration for media
duration_value: float | None = None
try:
from SYS.utils import ffprobe
probe = ffprobe(str(save_file))
duration = probe.get("duration")
if isinstance(duration, (int, float)) and duration > 0:
duration_value = float(duration)
except Exception:
duration_value = None
# Save to database (metadata + tag/url updates share one connection)
with API_folder_store(Path(self._location)) as db:
conn = getattr(db, "connection", None)
if conn is None:
raise RuntimeError("Folder store DB connection unavailable")
cursor = conn.cursor()
debug(
f"[Folder.add_file] saving metadata for hash {file_hash}",
file=sys.stderr,
)
ext_clean = file_ext.lstrip(".") if file_ext else ""
db.save_metadata(
save_file,
{
"hash": file_hash,
"ext": ext_clean,
"size": save_file.stat().st_size,
"duration": duration_value,
},
)
debug(
f"[Folder.add_file] metadata stored for hash {file_hash}",
file=sys.stderr,
)
if tag_list:
try:
debug(
f"[Folder.add_file] merging {len(tag_list)} tags for {file_hash}",
file=sys.stderr,
)
from SYS.metadata import compute_namespaced_tag_overwrite
existing_tags = [
t for t in (db.get_tags(file_hash) or [])
if isinstance(t, str) and t.strip()
]
_to_remove, _to_add, merged = compute_namespaced_tag_overwrite(
existing_tags, tag_list or []
)
if _to_remove or _to_add:
cursor.execute("DELETE FROM tag WHERE hash = ?",
(file_hash,))
for t in merged:
tag_val = str(t).strip().lower()
if tag_val:
cursor.execute(
"INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?)",
(file_hash, tag_val),
)
conn.commit()
debug(
f"[Folder.add_file] tags rewritten for {file_hash}",
file=sys.stderr,
)
try:
db._update_metadata_modified_time(file_hash)
except Exception:
pass
except Exception as exc:
debug(f"Local DB tag merge failed: {exc}", file=sys.stderr)
if url:
try:
debug(
f"[Folder.add_file] merging {len(url)} URLs for {file_hash}",
file=sys.stderr,
)
from SYS.metadata import normalize_urls
existing_meta = db.get_metadata(file_hash) or {}
existing_urls = normalize_urls(existing_meta.get("url"))
incoming_urls = normalize_urls(url)
changed = False
for entry in list(incoming_urls or []):
if not entry:
continue
if entry not in existing_urls:
existing_urls.append(entry)
changed = True
if changed:
db.update_metadata_by_hash(
file_hash,
{"url": existing_urls},
)
debug(
f"[Folder.add_file] URLs merged for {file_hash}",
file=sys.stderr,
)
except Exception as exc:
debug(f"Local DB URL merge failed: {exc}", file=sys.stderr)
##log(f"✓ Added to local storage: {save_file.name}", file=sys.stderr)
return file_hash
except Exception as exc:
log(f"❌ Local storage failed: {exc}", file=sys.stderr)
raise
def search(self, query: str, **kwargs: Any) -> list[Dict[str, Any]]:
"""Search local database for files by title tag or filename."""
from fnmatch import fnmatch
from API.folder import DatabaseAPI
import unicodedata
limit = kwargs.get("limit")
try:
limit = int(limit) if limit is not None else None
except (TypeError, ValueError):
limit = None
if isinstance(limit, int) and limit <= 0:
limit = None
query = query.lower()
query_lower = query # Ensure query_lower is defined for all code paths
def _normalize_namespace_text(text: str, *, allow_wildcards: bool) -> str:
"""Normalize tag namespace values for consistent matching.
Removes control/format chars (e.g. zero-width spaces) that frequently appear in scraped tags,
collapses whitespace, and lowercases.
"""
s = str(text or "")
# Normalize newlines/tabs/etc to spaces early.
s = s.replace("\r", " ").replace("\n", " ").replace("\t", " ")
# Drop control / format chars (Cc/Cf) while preserving wildcard tokens when requested.
cleaned_chars: list[str] = []
for ch in s:
if allow_wildcards and ch in {"*", "?"}:
cleaned_chars.append(ch)
continue
cat = unicodedata.category(ch)
if cat in {"Cc", "Cf"}:
continue
cleaned_chars.append(ch)
s = "".join(cleaned_chars)
# Collapse any remaining unicode whitespace runs.
s = " ".join(s.split())
return s.strip().lower()
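# e.g. "Foo\u200bBar  Baz" -> "foobar baz": the zero-width space (category Cf)
# is dropped, whitespace runs collapse, and the result is lowercased; "*"/"?"
# survive only when allow_wildcards=True.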
def _normalize_ext_filter(value: str) -> str:
v = str(value or "").strip().lower().lstrip(".")
v = "".join(ch for ch in v if ch.isalnum())
return v
def _extract_system_filetype_ext(text: str) -> Optional[str]:
# Match: system:filetype = png (allow optional '=' and flexible spaces)
m = re.search(r"\bsystem:filetype\s*(?:=\s*)?([^\s,]+)", text)
if not m:
m = re.search(r"\bsystem:filetype\s*=\s*([^\s,]+)", text)
if not m:
return None
return _normalize_ext_filter(m.group(1)) or None
# Support `ext:<value>` and Hydrus-style `system:filetype = <value>` anywhere
# in the query (space or comma separated).
ext_filter: Optional[str] = None
try:
sys_ext = _extract_system_filetype_ext(query_lower)
if sys_ext:
ext_filter = sys_ext
query_lower = re.sub(
r"\s*\bsystem:filetype\s*(?:=\s*)?[^\s,]+",
" ",
query_lower
)
query_lower = re.sub(r"\s{2,}", " ", query_lower).strip().strip(",")
query = query_lower
m = re.search(r"\bext:([^\s,]+)", query_lower)
if not m:
m = re.search(r"\bextension:([^\s,]+)", query_lower)
if m:
ext_filter = _normalize_ext_filter(m.group(1)) or None
query_lower = re.sub(
r"\s*\b(?:ext|extension):[^\s,]+",
" ",
query_lower
)
query_lower = re.sub(r"\s{2,}", " ", query_lower).strip().strip(",")
query = query_lower
except Exception:
ext_filter = None
match_all = query == "*" or (not query and bool(ext_filter))
results = []
search_dir = Path(self._location).expanduser()
def _url_like_pattern(value: str) -> str:
# Interpret user patterns as substring matches (with optional glob wildcards).
v = (value or "").strip().lower()
if not v or v == "*":
return "%"
v = v.replace("%", "\\%").replace("_", "\\_")
v = v.replace("*", "%").replace("?", "_")
if "%" not in v and "_" not in v:
return f"%{v}%"
if not v.startswith("%"):
v = "%" + v
if not v.endswith("%"):
v = v + "%"
return v
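# e.g. "example.com" -> "%example.com%" (plain substring match),
#      "*/videos/*"  -> "%/videos/%"    (glob wildcards become LIKE wildcards),
#      "" or "*"     -> "%"             (match any URL).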
def _like_pattern(term: str) -> str:
# Convert glob-like tokens to SQL LIKE wildcards.
return str(term or "").replace("*", "%").replace("?", "_")
tokens = [t.strip() for t in query.split(",") if t.strip()]
if not match_all and len(tokens) == 1 and _normalize_hash(query):
debug("Hash queries require 'hash:' prefix for local search")
return results
if not match_all and _normalize_hash(query):
debug("Hash queries require 'hash:' prefix for local search")
return results
def _create_entry(
file_path: Path,
tags: list[str],
size_bytes: int | None,
db_hash: Optional[str]
) -> dict[str, Any]:
path_str = str(file_path)
# Get title from tags if available, otherwise use hash as fallback
title = next(
(t.split(":", 1)[1] for t in tags if t.lower().startswith("title:")),
None
)
if not title:
# Fallback to hash if no title tag exists
hash_value = _resolve_file_hash(db_hash, file_path)
title = hash_value if hash_value else file_path.stem
# Extract extension from file path
ext = file_path.suffix.lstrip(".")
if not ext:
# Fallback: try to extract from title (original filename might be in title)
title_path = Path(title)
ext = title_path.suffix.lstrip(".")
# Build clean entry with only necessary fields
hash_value = _resolve_file_hash(db_hash, file_path)
entry = {
"title": title,
"ext": ext,
"path": path_str,
"target": path_str,
"store": self._name,
"size": size_bytes,
"hash": hash_value,
"tag": tags,
}
return entry
try:
if not search_dir.exists():
debug(f"Search directory does not exist: {search_dir}")
return results
try:
with DatabaseAPI(search_dir) as api:
ext_hashes: set[str] | None = None
if ext_filter:
# Fetch a bounded set of hashes to intersect with other filters.
ext_fetch_limit = (limit or 45) * 50
ext_hashes = api.get_file_hashes_by_ext(
ext_filter,
limit=ext_fetch_limit
)
# ext-only search: query is empty (or coerced to match_all above).
if ext_filter and (not query_lower or query_lower == "*"):
rows = api.get_files_by_ext(ext_filter, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
file_path = search_dir / str(file_path_str)
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
try:
db_ext = str(ext or "").strip().lstrip(".")
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
if limit is not None and len(results) >= limit:
return results
backend_label = str(
getattr(self, "_name", "") or getattr(self, "NAME", "") or "folder"
)
debug(f"[folder:{backend_label}] {len(results)} result(s)")
return results
if tokens and len(tokens) > 1:
url_fetch_limit = (limit or 45) * 50
def _ids_for_token(token: str) -> set[str]:
token = token.strip()
if not token:
return set()
if ":" in token and not token.startswith(":"):
namespace, pattern = token.split(":", 1)
2025-12-11 19:04:02 -08:00
namespace = namespace.strip().lower()
pattern = pattern.strip().lower()
if namespace == "hash":
2025-12-11 19:04:02 -08:00
normalized_hash = _normalize_hash(pattern)
if not normalized_hash:
return set()
h = api.get_file_hash_by_hash(normalized_hash)
return {h} if h else set()
if namespace == "url":
if not pattern or pattern == "*":
return api.get_file_hashes_with_any_url(
limit=url_fetch_limit
)
return api.get_file_hashes_by_url_like(
_url_like_pattern(pattern),
limit=url_fetch_limit
)
if namespace == "system":
2025-12-20 23:57:44 -08:00
# Hydrus-compatible query: system:filetype = png
m_ft = re.match(
r"^filetype\s*(?:=\s*)?(.+)$",
pattern
)
if m_ft:
normalized_ext = _normalize_ext_filter(
m_ft.group(1)
)
if not normalized_ext:
return set()
return api.get_file_hashes_by_ext(
normalized_ext,
limit=url_fetch_limit
)
return set()
if namespace in {"ext", "extension"}:
normalized_ext = _normalize_ext_filter(pattern)
if not normalized_ext:
return set()
return api.get_file_hashes_by_ext(
normalized_ext,
limit=url_fetch_limit
)
if namespace == "store":
if pattern not in {"local", "file", "filesystem"}:
return set()
return api.get_all_file_hashes()
query_pattern = f"{namespace}:%"
tag_rows = api.get_file_hashes_by_tag_pattern(
query_pattern
)
matched: set[str] = set()
for file_hash, tag_val in tag_rows:
if not tag_val:
continue
tag_lower = str(tag_val).lower()
if not tag_lower.startswith(f"{namespace}:"):
continue
value = _normalize_namespace_text(
tag_lower[len(namespace) + 1:],
allow_wildcards=False
)
pat = _normalize_namespace_text(
pattern,
allow_wildcards=True
)
if fnmatch(value, pat):
matched.add(file_hash)
return matched
term = token.lower()
like_pattern = f"%{_like_pattern(term)}%"
# Unqualified token: match file path, title: tags, and non-namespaced tags.
# Do NOT match other namespaces by default (e.g., artist:men at work).
hashes = set(
api.get_file_hashes_by_path_pattern(like_pattern)
or set()
)
try:
title_rows = api.get_files_by_namespace_pattern(
f"title:{like_pattern}",
url_fetch_limit
)
hashes.update(
{
row[0]
for row in (title_rows or []) if row and row[0]
}
)
except Exception:
pass
try:
simple_rows = api.get_files_by_simple_tag_pattern(
like_pattern,
url_fetch_limit
)
hashes.update(
{
row[0]
for row in (simple_rows or []) if row and row[0]
}
)
except Exception:
pass
return hashes
try:
matching_hashes: set[str] | None = None
for token in tokens:
hashes = _ids_for_token(token)
matching_hashes = (
hashes if matching_hashes is None else
matching_hashes & hashes
)
if not matching_hashes:
return results
if ext_hashes is not None:
matching_hashes = (
matching_hashes or set()
) & ext_hashes
if not matching_hashes:
return results
if not matching_hashes:
return results
rows = api.get_file_metadata(matching_hashes, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
file_path = search_dir / str(file_path_str)
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
try:
db_ext = str(ext or "").strip().lstrip(".")
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
except Exception as exc:
log(f"⚠️ AND search failed: {exc}", file=sys.stderr)
debug(f"AND search exception details: {exc}")
return []
if ":" in query and not query.startswith(":"):
namespace, pattern = query.split(":", 1)
namespace = namespace.strip().lower()
pattern = pattern.strip().lower()
debug(f"Performing namespace search: {namespace}:{pattern}")
if namespace == "hash":
normalized_hash = _normalize_hash(pattern)
if not normalized_hash:
return results
h = api.get_file_hash_by_hash(normalized_hash)
hashes = {h} if h else set()
rows = api.get_file_metadata(hashes, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
file_path = search_dir / str(file_path_str)
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
try:
db_ext = str(ext or "").strip().lstrip(".")
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
if namespace == "url":
if not pattern or pattern == "*":
rows = api.get_files_with_any_url(limit)
else:
rows = api.get_files_by_url_like(
_url_like_pattern(pattern),
limit
)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
file_path = search_dir / str(file_path_str)
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
if namespace == "system":
# Hydrus-compatible query: system:filetype = png
m_ft = re.match(r"^filetype\s*(?:=\s*)?(.+)$", pattern)
if m_ft:
normalized_ext = _normalize_ext_filter(m_ft.group(1))
if not normalized_ext:
return results
rows = api.get_files_by_ext(normalized_ext, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
file_path = search_dir / str(file_path_str)
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
try:
db_ext = str(ext or "").strip().lstrip(".")
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
if namespace in {"ext", "extension"}:
normalized_ext = _normalize_ext_filter(pattern)
if not normalized_ext:
return results
rows = api.get_files_by_ext(normalized_ext, limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
file_path = search_dir / str(file_path_str)
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
try:
db_ext = str(ext or "").strip().lstrip(".")
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
if limit is not None and len(results) >= limit:
return results
return results
query_pattern = f"{namespace}:%"
rows = api.get_files_by_namespace_pattern(query_pattern, limit)
debug(f"Found {len(rows)} potential matches in DB")
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
tags = api.get_tags_by_namespace_and_file(
file_hash,
query_pattern
)
for tag in tags:
tag_lower = tag.lower()
if tag_lower.startswith(f"{namespace}:"):
value = _normalize_namespace_text(
tag_lower[len(namespace) + 1:],
allow_wildcards=False
)
pat = _normalize_namespace_text(
pattern,
allow_wildcards=True
)
if fnmatch(value, pat):
if ext_hashes is not None and file_hash not in ext_hashes:
break
file_path = search_dir / str(file_path_str)
if file_path.exists():
if size_bytes is None:
size_bytes = file_path.stat().st_size
all_tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
all_tags,
size_bytes,
file_hash
)
try:
db_ext = str(ext or "").strip().lstrip(".")
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
else:
debug(f"File missing on disk: {file_path}")
break
if limit is not None and len(results) >= limit:
return results
elif not match_all:
# Default (unqualified) search: AND semantics across terms.
# Each term must match at least one of:
# - file path (filename)
# - title: namespace tag
# - non-namespaced tag
# Other namespaces (artist:, series:, etc.) are excluded unless explicitly queried.
terms = [
t.strip() for t in query_lower.replace(",", " ").split()
if t.strip()
]
if not terms:
terms = [query_lower]
fetch_limit = (limit or 45) * 50
matching_hashes: Optional[set[str]] = None
for term in terms:
if not term:
continue
like_term = _like_pattern(term)
like_pattern = f"%{like_term}%"
term_hashes: set[str] = set()
try:
term_hashes.update(
api.get_file_hashes_by_path_pattern(like_pattern)
)
except Exception:
pass
try:
title_rows = api.get_files_by_namespace_pattern(
f"title:{like_pattern}",
fetch_limit
)
term_hashes.update(
{
row[0]
for row in (title_rows or []) if row and row[0]
}
)
except Exception:
pass
try:
simple_rows = api.get_files_by_simple_tag_pattern(
like_pattern,
fetch_limit
)
term_hashes.update(
{
row[0]
for row in (simple_rows or []) if row and row[0]
}
)
except Exception:
pass
if ext_hashes is not None:
term_hashes &= ext_hashes
matching_hashes = (
term_hashes if matching_hashes is None else
(matching_hashes & term_hashes)
)
if not matching_hashes:
return results
if not matching_hashes:
return results
rows = api.get_file_metadata(set(matching_hashes), limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
file_path = search_dir / str(file_path_str)
if not file_path.exists():
continue
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry_obj = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
try:
db_ext = str(ext or "").strip().lstrip(".")
if db_ext:
entry_obj["ext"] = db_ext
except Exception:
pass
results.append(entry_obj)
if limit is not None and len(results) >= limit:
break
else:
rows = api.get_all_files(limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if file_path_str:
if ext_hashes is not None and file_hash not in ext_hashes:
continue
file_path = search_dir / str(file_path_str)
if file_path.exists():
if size_bytes is None:
size_bytes = file_path.stat().st_size
tags = api.get_tags_for_file(file_hash)
entry = _create_entry(
file_path,
tags,
size_bytes,
file_hash
)
try:
db_ext = str(ext or "").strip().lstrip(".")
if db_ext:
entry["ext"] = db_ext
except Exception:
pass
results.append(entry)
backend_label = str(
getattr(self, "_name", "") or getattr(self, "NAME", "") or "folder"
)
debug(f"[folder:{backend_label}] {len(results)} result(s)")
return results
except Exception as e:
log(f"⚠️ Database search failed: {e}", file=sys.stderr)
debug(f"DB search exception details: {e}")
return []
except Exception as exc:
log(f"❌ Local search failed: {exc}", file=sys.stderr)
raise
def _resolve_library_root(self, file_path: Path, config: Dict[str, Any]) -> Optional[Path]:
"""Return the library root containing medios-macina.db.
Prefer the store's configured location, then config override, then walk parents
of the file path to find a directory with medios-macina.db."""
candidates: list[Path] = []
if self._location:
candidates.append(Path(self._location).expanduser())
cfg_root = get_local_storage_path(config) if config else None
if cfg_root:
candidates.append(Path(cfg_root).expanduser())
for root in candidates:
db_path = root / "medios-macina.db"
if db_path.exists():
return root
try:
for parent in [file_path] + list(file_path.parents):
db_path = parent / "medios-macina.db"
if db_path.exists():
return parent
except Exception:
pass
return None
def get_file(self, file_hash: str, **kwargs: Any) -> Optional[Path]:
"""Retrieve file by hash, returning path to the file.
Args:
file_hash: SHA256 hash of the file (64-char hex string)
Returns:
Path to the file or None if not found
"""
try:
# Normalize the hash
normalized_hash = _normalize_hash(file_hash)
if not normalized_hash:
return None
search_dir = Path(self._location).expanduser()
from API.folder import API_folder_store
with API_folder_store(search_dir) as db:
# Search for file by hash
file_path = db.search_hash(normalized_hash)
if file_path and file_path.exists():
return file_path
return None
except Exception as exc:
debug(f"Failed to get file for hash {file_hash}: {exc}")
return None
def get_metadata(self, file_hash: str) -> Optional[Dict[str, Any]]:
"""Get metadata for a file from the database by hash.
Args:
file_hash: SHA256 hash of the file (64-char hex string)
Returns:
Dict with metadata fields (ext, size, hash, duration, etc.) or None if not found
"""
try:
# Normalize the hash
normalized_hash = _normalize_hash(file_hash)
if not normalized_hash:
return None
search_dir = Path(self._location).expanduser()
from API.folder import DatabaseAPI
with DatabaseAPI(search_dir) as api:
# Get file hash
file_hash_result = api.get_file_hash_by_hash(normalized_hash)
if not file_hash_result:
return None
# Query metadata directly from database
cursor = api.get_cursor()
cursor.execute(
"""
SELECT * FROM metadata WHERE hash = ?
""",
(file_hash_result,),
)
row = cursor.fetchone()
if not row:
return None
metadata = dict(row)
# Canonicalize metadata keys (no legacy aliases)
if "file_path" in metadata and "path" not in metadata:
metadata["path"] = metadata.get("file_path")
metadata.pop("file_path", None)
# Parse JSON fields
for field in ["url", "relationships"]:
if metadata.get(field):
try:
metadata[field] = json.loads(metadata[field])
except (json.JSONDecodeError, TypeError):
metadata[field] = []
return metadata
except Exception as exc:
debug(f"Failed to get metadata for hash {file_hash}: {exc}")
return None
def set_relationship(self, alt_hash: str, king_hash: str, kind: str = "alt") -> bool:
"""Persist a relationship in the folder store DB.
This is a thin wrapper around the folder DB API so cmdlets can avoid
backend-specific branching.
"""
try:
if not self._location:
return False
alt_norm = _normalize_hash(alt_hash)
king_norm = _normalize_hash(king_hash)
if not alt_norm or not king_norm or alt_norm == king_norm:
return False
from API.folder import API_folder_store
with API_folder_store(Path(self._location).expanduser()) as db:
db.set_relationship_by_hash(
alt_norm,
king_norm,
str(kind or "alt"),
bidirectional=False,
)
return True
except Exception:
return False
def get_tag(self, file_identifier: str, **kwargs: Any) -> Tuple[List[str], str]:
"""Get tags for a local file by hash.
Returns:
Tuple of (tags_list, store_name) where store_name is the actual store name
"""
from API.folder import API_folder_store
try:
file_hash = file_identifier
if self._location:
try:
with API_folder_store(Path(self._location)) as db:
db_tags = db.get_tags(file_hash)
if db_tags:
# Return actual store name instead of generic "local_db"
store_name = self._name if self._name else "local"
return [
str(t).strip().lower()
for t in db_tags
if isinstance(t, str) and t.strip()
], store_name
except Exception as exc:
debug(f"Local DB lookup failed: {exc}")
return [], "unknown"
except Exception as exc:
debug(f"get_tags failed for local file: {exc}")
return [], "unknown"
def add_tag(self, hash: str, tag: List[str], **kwargs: Any) -> bool:
"""Add tags to a local file by hash (via API_folder_store).
Handles namespace collapsing: when adding namespace:value, removes existing namespace:* tags.
Returns True if tags were successfully added.
"""
from API.folder import API_folder_store
try:
if not self._location:
return False
try:
with API_folder_store(Path(self._location)) as db:
existing_tags = [
t for t in (db.get_tags(hash) or [])
if isinstance(t, str) and t.strip()
]
from SYS.metadata import compute_namespaced_tag_overwrite
_to_remove, _to_add, merged = compute_namespaced_tag_overwrite(
existing_tags, tag or []
)
if not _to_remove and not _to_add:
return True
# Folder DB tag table is case-sensitive and add_tags_to_hash() is additive.
# To enforce lowercase-only tags and namespace overwrites, rewrite the full tag set.
cursor = db.connection.cursor()
cursor.execute("DELETE FROM tag WHERE hash = ?", (hash,))
for t in merged:
t = str(t).strip().lower()
if t:
cursor.execute(
"INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?)",
(hash, t),
)
db.connection.commit()
try:
db._update_metadata_modified_time(hash)
except Exception:
pass
return True
except Exception as exc:
debug(f"Local DB add_tags failed: {exc}")
return False
except Exception as exc:
debug(f"add_tag failed for local file: {exc}")
return False
def delete_tag(self, file_identifier: str, tags: List[str], **kwargs: Any) -> bool:
"""Remove tags from a local file by hash."""
from API.folder import API_folder_store
try:
file_hash = file_identifier
if self._location:
try:
with API_folder_store(Path(self._location)) as db:
tag_list = [
str(t).strip().lower() for t in (tags or [])
if isinstance(t, str) and str(t).strip()
]
if not tag_list:
return True
db.remove_tags_from_hash(file_hash, tag_list)
return True
except Exception as exc:
debug(f"Local DB remove_tags failed: {exc}")
return False
except Exception as exc:
debug(f"delete_tag failed for local file: {exc}")
return False
def get_url(self, file_identifier: str, **kwargs: Any) -> List[str]:
"""Get known url for a local file by hash."""
from API.folder import API_folder_store
try:
file_hash = file_identifier
if self._location:
try:
from SYS.metadata import normalize_urls
with API_folder_store(Path(self._location)) as db:
meta = db.get_metadata(file_hash) or {}
urls = normalize_urls(meta.get("url"))
return urls
except Exception as exc:
debug(f"Local DB get_metadata failed: {exc}")
return []
except Exception as exc:
debug(f"get_url failed for local file: {exc}")
return []
def add_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
"""Add known url to a local file by hash."""
from API.folder import API_folder_store
try:
file_hash = file_identifier
if self._location:
try:
from SYS.metadata import normalize_urls
with API_folder_store(Path(self._location)) as db:
meta = db.get_metadata(file_hash) or {}
existing_urls = normalize_urls(meta.get("url"))
incoming_urls = normalize_urls(url)
changed = False
for u in list(incoming_urls or []):
if not u:
continue
if u not in existing_urls:
existing_urls.append(u)
changed = True
if changed:
db.update_metadata_by_hash(
file_hash,
{
"url": existing_urls
}
)
return True
except Exception as exc:
debug(f"Local DB add_url failed: {exc}")
return False
except Exception as exc:
debug(f"add_url failed for local file: {exc}")
return False
def add_url_bulk(self, items: List[tuple[str, List[str]]], **kwargs: Any) -> bool:
"""Add known urls to many local files in one DB session.
This is a performance optimization used by cmdlets that receive many PipeObjects.
"""
from API.folder import API_folder_store
try:
if not self._location:
return False
# Normalize + coalesce duplicates per hash.
try:
from SYS.metadata import normalize_urls
except Exception:
normalize_urls = None # type: ignore
merged_by_hash: Dict[str, List[str]] = {}
for file_identifier, url_list in items or []:
file_hash = str(file_identifier or "").strip().lower()
if not file_hash:
continue
incoming: List[str]
if normalize_urls is not None:
try:
incoming = normalize_urls(url_list)
except Exception:
incoming = [
str(u).strip() for u in (url_list or []) if str(u).strip()
]
else:
incoming = [
str(u).strip() for u in (url_list or []) if str(u).strip()
]
if not incoming:
continue
existing = merged_by_hash.get(file_hash) or []
for u in incoming:
if u and u not in existing:
existing.append(u)
merged_by_hash[file_hash] = existing
if not merged_by_hash:
return True
import json
with API_folder_store(Path(self._location)) as db:
conn = getattr(db, "connection", None)
if conn is None:
return False
cursor = conn.cursor()
# Ensure metadata rows exist (may be needed for older entries).
for file_hash in merged_by_hash.keys():
try:
cursor.execute(
"INSERT OR IGNORE INTO metadata (hash) VALUES (?)",
(file_hash,)
)
except Exception:
continue
# Load existing urls for all hashes in chunks.
existing_urls_by_hash: Dict[str, List[str]] = {
h: [] for h in merged_by_hash.keys()
}
hashes = list(merged_by_hash.keys())
chunk_size = 400
for i in range(0, len(hashes), chunk_size):
chunk = hashes[i:i + chunk_size]
if not chunk:
continue
placeholders = ",".join(["?"] * len(chunk))
try:
cursor.execute(
f"SELECT hash, url FROM metadata WHERE hash IN ({placeholders})",
chunk
)
rows = cursor.fetchall() or []
except Exception:
rows = []
for row in rows:
try:
row_hash = str(row[0]).strip().lower()
except Exception:
continue
raw_urls = None
try:
raw_urls = row[1]
except Exception:
raw_urls = None
parsed_urls: List[str] = []
if raw_urls:
try:
parsed = json.loads(raw_urls)
if normalize_urls is not None:
parsed_urls = normalize_urls(parsed)
else:
if isinstance(parsed, list):
parsed_urls = [
str(u).strip() for u in parsed
if str(u).strip()
]
except Exception:
parsed_urls = []
existing_urls_by_hash[row_hash] = parsed_urls
# Compute updates and write in one commit.
updates: List[tuple[str, str]] = []
for file_hash, incoming_urls in merged_by_hash.items():
existing_urls = existing_urls_by_hash.get(file_hash) or []
final = list(existing_urls)
for u in incoming_urls:
if u and u not in final:
final.append(u)
if final != existing_urls:
try:
updates.append((json.dumps(final), file_hash))
except Exception:
continue
if updates:
cursor.executemany(
"UPDATE metadata SET url = ?, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
updates,
)
conn.commit()
return True
except Exception as exc:
debug(f"add_url_bulk failed for local file: {exc}")
return False
def delete_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
"""Delete known url from a local file by hash."""
from API.folder import API_folder_store
try:
file_hash = file_identifier
if self._location:
try:
from SYS.metadata import normalize_urls
with API_folder_store(Path(self._location)) as db:
meta = db.get_metadata(file_hash) or {}
existing_urls = normalize_urls(meta.get("url"))
remove_set = {u for u in normalize_urls(url) if u}
if not remove_set:
return False
new_urls = [u for u in existing_urls if u not in remove_set]
if new_urls != existing_urls:
db.update_metadata_by_hash(file_hash, {"url": new_urls})
return True
except Exception as exc:
debug(f"Local DB delete_url failed: {exc}")
return False
except Exception as exc:
debug(f"delete_url failed for local file: {exc}")
return False
def delete_url_bulk(
self,
items: List[tuple[str,
List[str]]],
**kwargs: Any
) -> bool:
"""Delete known urls from many local files in one DB session."""
from API.folder import API_folder_store
try:
if not self._location:
return False
try:
from SYS.metadata import normalize_urls
except Exception:
normalize_urls = None # type: ignore
remove_by_hash: Dict[str, set[str]] = {}
for file_identifier, url_list in items or []:
file_hash = str(file_identifier or "").strip().lower()
if not file_hash:
continue
incoming: List[str]
if normalize_urls is not None:
try:
incoming = normalize_urls(url_list)
except Exception:
incoming = [
str(u).strip() for u in (url_list or []) if str(u).strip()
]
else:
incoming = [
str(u).strip() for u in (url_list or []) if str(u).strip()
]
                remove = {u for u in incoming if u}
if not remove:
continue
remove_by_hash.setdefault(file_hash, set()).update(remove)
if not remove_by_hash:
return True
import json
with API_folder_store(Path(self._location)) as db:
conn = getattr(db, "connection", None)
if conn is None:
return False
cursor = conn.cursor()
# Ensure metadata rows exist.
for file_hash in remove_by_hash.keys():
try:
                        cursor.execute(
                            "INSERT OR IGNORE INTO metadata (hash) VALUES (?)",
                            (file_hash,),
                        )
except Exception:
continue
# Load existing urls for hashes in chunks.
                existing_urls_by_hash: Dict[str, List[str]] = {
                    h: [] for h in remove_by_hash.keys()
                }

hashes = list(remove_by_hash.keys())
chunk_size = 400
for i in range(0, len(hashes), chunk_size):
chunk = hashes[i:i + chunk_size]
if not chunk:
continue
placeholders = ",".join(["?"] * len(chunk))
try:
cursor.execute(
f"SELECT hash, url FROM metadata WHERE hash IN ({placeholders})",
chunk
)
rows = cursor.fetchall() or []
except Exception:
rows = []
for row in rows:
try:
row_hash = str(row[0]).strip().lower()
except Exception:
continue
raw_urls = None
try:
raw_urls = row[1]
except Exception:
raw_urls = None
parsed_urls: List[str] = []
if raw_urls:
try:
parsed = json.loads(raw_urls)
if normalize_urls is not None:
parsed_urls = normalize_urls(parsed)
else:
if isinstance(parsed, list):
parsed_urls = [
str(u).strip() for u in parsed
if str(u).strip()
]
except Exception:
parsed_urls = []
existing_urls_by_hash[row_hash] = parsed_urls
# Apply removals + write updates.
updates: List[tuple[str, str]] = []
for file_hash, remove_set in remove_by_hash.items():
existing_urls = existing_urls_by_hash.get(file_hash) or []
new_urls = [u for u in existing_urls if u not in remove_set]
if new_urls != existing_urls:
try:
updates.append((json.dumps(new_urls), file_hash))
except Exception:
continue
if updates:
cursor.executemany(
"UPDATE metadata SET url = ?, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
updates,
)
conn.commit()
return True
except Exception as exc:
debug(f"delete_url_bulk failed for local file: {exc}")
return False
def get_note(self, file_identifier: str, **kwargs: Any) -> Dict[str, str]:
"""Get notes for a local file by hash."""
from API.folder import API_folder_store
try:
if not self._location:
return {}
file_hash = str(file_identifier or "").strip().lower()
if not _normalize_hash(file_hash):
return {}
with API_folder_store(Path(self._location)) as db:
getter = getattr(db, "get_notes", None)
if callable(getter):
notes = getter(file_hash)
                    return notes if isinstance(notes, dict) else {}
# Fallback: default-only
note = db.get_note(file_hash)
                return {"default": str(note or "")} if note else {}
except Exception as exc:
debug(f"get_note failed for local file: {exc}")
return {}
def set_note(
self,
file_identifier: str,
name: str,
text: str,
**kwargs: Any
) -> bool:
"""Set a named note for a local file by hash."""
from API.folder import API_folder_store
try:
if not self._location:
return False
file_hash = str(file_identifier or "").strip().lower()
if not _normalize_hash(file_hash):
return False
file_path = self.get_file(file_hash, **kwargs)
            if not file_path or not isinstance(file_path, Path) or not file_path.exists():
return False
with API_folder_store(Path(self._location)) as db:
setter = getattr(db, "set_note", None)
if callable(setter):
setter(file_path, str(name), str(text))
return True
db.save_note(file_path, str(text))
return True
except Exception as exc:
debug(f"set_note failed for local file: {exc}")
return False
def set_note_bulk(self, items: List[tuple[str, str, str]], **kwargs: Any) -> bool:
"""Set notes for many local files in one DB session.
Preserves existing semantics by only setting notes for hashes that still
map to a file path that exists on disk.
"""
from API.folder import API_folder_store
try:
if not self._location:
return False
# Normalize input.
normalized: List[tuple[str, str, str]] = []
for file_identifier, name, text in items or []:
file_hash = str(file_identifier or "").strip().lower()
note_name = str(name or "").strip()
note_text = str(text or "")
if not file_hash or not _normalize_hash(file_hash) or not note_name:
continue
normalized.append((file_hash, note_name, note_text))
if not normalized:
return True
with API_folder_store(Path(self._location)) as db:
conn = getattr(db, "connection", None)
if conn is None:
return False
cursor = conn.cursor()
# Look up file paths for hashes in chunks (to verify existence).
                wanted_hashes = sorted({h for (h, _n, _t) in normalized})
                hash_to_path: Dict[str, str] = {}
chunk_size = 400
for i in range(0, len(wanted_hashes), chunk_size):
chunk = wanted_hashes[i:i + chunk_size]
if not chunk:
continue
placeholders = ",".join(["?"] * len(chunk))
try:
                        cursor.execute(
                            f"SELECT hash, file_path FROM file WHERE hash IN ({placeholders})",
                            chunk,
                        )
rows = cursor.fetchall() or []
except Exception:
rows = []
for row in rows:
try:
h = str(row[0]).strip().lower()
p = str(row[1]).strip()
except Exception:
continue
if h and p:
hash_to_path[h] = p
# Ensure notes rows exist and only write for existing files.
inserts: List[tuple[str, str, str]] = []
for h, note_name, note_text in normalized:
p = hash_to_path.get(h)
if not p:
continue
try:
if not (Path(self._location) / p).exists():
continue
except Exception:
continue
inserts.append((h, note_name, note_text))
if not inserts:
return False
# Prefer upsert when supported, else fall back to INSERT OR REPLACE.
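                # Note: the ON CONFLICT upsert below assumes a UNIQUE constraint on
                # (hash, name) and SQLite >= 3.24; the except branch covers builds or
                # schemas where that is not available.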
try:
cursor.executemany(
"INSERT INTO note (hash, name, note) VALUES (?, ?, ?) "
"ON CONFLICT(hash, name) DO UPDATE SET note = excluded.note, updated_at = CURRENT_TIMESTAMP",
inserts,
)
except Exception:
cursor.executemany(
"INSERT OR REPLACE INTO note (hash, name, note) VALUES (?, ?, ?)",
inserts,
)
conn.commit()
return True
except Exception as exc:
debug(f"set_note_bulk failed for local file: {exc}")
return False
def delete_note(self, file_identifier: str, name: str, **kwargs: Any) -> bool:
"""Delete a named note for a local file by hash."""
from API.folder import API_folder_store
try:
if not self._location:
return False
file_hash = str(file_identifier or "").strip().lower()
if not _normalize_hash(file_hash):
return False
with API_folder_store(Path(self._location)) as db:
deleter = getattr(db, "delete_note", None)
if callable(deleter):
deleter(file_hash, str(name))
return True
# Default-only fallback
if str(name).strip().lower() == "default":
deleter2 = getattr(db, "save_note", None)
if callable(deleter2):
file_path = self.get_file(file_hash, **kwargs)
                        if file_path and isinstance(file_path, Path) and file_path.exists():
deleter2(file_path, "")
return True
return False
except Exception as exc:
debug(f"delete_note failed for local file: {exc}")
return False
def delete_file(self, file_identifier: str, **kwargs: Any) -> bool:
"""Delete a file from the folder store.
Args:
file_identifier: The file path (as string) or hash of the file to delete
**kwargs: Optional parameters
Returns:
True if deletion succeeded, False otherwise
"""
from API.folder import API_folder_store
try:
if not self._location:
return False
raw = str(file_identifier or "").strip()
if not raw:
return False
store_root = Path(self._location).expanduser()
# Support deletion by hash (common for store items where `path` is the hash).
file_hash = _normalize_hash(raw)
resolved_path: Optional[Path] = None
with API_folder_store(store_root) as db:
if file_hash:
resolved_path = db.search_hash(file_hash)
else:
p = Path(raw)
resolved_path = p if p.is_absolute() else (store_root / p)
if resolved_path is None:
debug(f"delete_file: could not resolve identifier: {raw}")
return False
# Delete from database (also cleans up relationship backlinks).
db.delete_file(resolved_path)
# Delete the actual file from disk (best-effort).
try:
if resolved_path.exists():
resolved_path.unlink()
debug(f"Deleted file: {resolved_path}")
else:
debug(f"File not found on disk: {resolved_path}")
except Exception:
pass
return True
except Exception as exc:
debug(f"delete_file failed: {exc}")
return False