from __future__ import annotations

import json
import re
import shutil
import sys
from fnmatch import translate
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from SYS.logger import debug, log
from SYS.utils import sha256_file

from Store._base import Store
|
def _normalize_hash(value: Any) -> Optional[str]:
    candidate = str(value or '').strip().lower()
    if len(candidate) != 64:
        return None
    if any(ch not in '0123456789abcdef' for ch in candidate):
        return None
    return candidate


def _resolve_file_hash(db_hash: Optional[str], file_path: Path) -> Optional[str]:
    normalized = _normalize_hash(db_hash) if db_hash else None
    if normalized:
        return normalized
    return _normalize_hash(file_path.stem)
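

# Illustrative examples (not executed): _normalize_hash only accepts 64-char hex
# digests (case-insensitive, returned lowercased), and _resolve_file_hash falls
# back to the filename stem when the database value is missing or malformed.
#
#   _normalize_hash("A" * 64)                            -> "a" * 64
#   _normalize_hash("not-a-hash")                        -> None
#   _resolve_file_hash(None, Path(("b" * 64) + ".mp4"))  -> "b" * 64
#   _resolve_file_hash(None, Path("video.mp4"))          -> None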
|
|
|
|
|
|
|
|
|
|
class Folder(Store):
    """Folder-backed store: files are kept on disk under their SHA256 hash
    (plus original extension), with tags, urls, notes, and metadata tracked in a
    local SQLite database."""

    # Track which locations have already been migrated to avoid repeated migrations
    _migrated_locations = set()

    # Cache scan results to avoid repeated full scans across repeated instantiations
    _scan_cache: Dict[str, Tuple[bool, str, Dict[str, int]]] = {}

    def __new__(cls, *args: Any, **kwargs: Any) -> "Folder":
        return super().__new__(cls)

    # Advertise the constructor keys this store accepts on __new__ itself.
    setattr(__new__, "keys", ("NAME", "PATH"))
|
|
|
|
def __init__(
|
|
self,
|
|
location: Optional[str] = None,
|
|
name: Optional[str] = None,
|
|
*,
|
|
NAME: Optional[str] = None,
|
|
PATH: Optional[str] = None,
|
|
) -> None:
|
|
if name is None and NAME is not None:
|
|
name = str(NAME)
|
|
if location is None and PATH is not None:
|
|
location = str(PATH)
|
|
|
|
self._location = location
|
|
self._name = name
|
|
|
|
# Scan status (set during init)
|
|
self.scan_ok: bool = True
|
|
self.scan_detail: str = ""
|
|
self.scan_stats: Dict[str, int] = {}
|
|
|
|
if self._location:
|
|
try:
|
|
from API.folder import API_folder_store, LocalLibraryInitializer

location_path = Path(self._location).expanduser()
|
|
|
|
# Use context manager to ensure connection is properly closed
|
|
with API_folder_store(location_path) as db:
|
|
if db.connection:
|
|
db.connection.commit()
|
|
|
|
# Call migration and discovery at startup
|
|
Folder.migrate_location(self._location)
|
|
|
|
# Local library scan/index (one-time per location per process)
|
|
location_key = str(location_path)
|
|
cached = Folder._scan_cache.get(location_key)
|
|
if cached is None:
|
|
try:
|
|
initializer = LocalLibraryInitializer(location_path)
|
|
stats = initializer.scan_and_index() or {}
|
|
files_new = int(stats.get('files_new', 0) or 0)
|
|
sidecars = int(stats.get('sidecars_imported', 0) or 0)
|
|
total_db = int(stats.get('files_total_db', 0) or 0)
|
|
if files_new > 0 or sidecars > 0:
|
|
detail = f"New: {files_new}, Sidecars: {sidecars}" + (f" (Total: {total_db})" if total_db else "")
|
|
else:
|
|
detail = ("Up to date" + (f" (Total: {total_db})" if total_db else ""))
|
|
Folder._scan_cache[location_key] = (True, detail, dict(stats))
|
|
except Exception as exc:
|
|
Folder._scan_cache[location_key] = (False, f"Scan failed: {exc}", {})
|
|
|
|
ok, detail, stats = Folder._scan_cache.get(location_key, (True, "", {}))
|
|
self.scan_ok = bool(ok)
|
|
self.scan_detail = str(detail or "")
|
|
self.scan_stats = dict(stats or {})
|
|
except Exception as exc:
|
|
debug(f"Failed to initialize database for '{name}': {exc}")
|
|
|
|
@classmethod
|
|
def migrate_location(cls, location: Optional[str]) -> None:
|
|
"""Migrate a location to hash-based storage (one-time operation, call explicitly at startup)."""
|
|
if not location:
|
|
return
|
|
|
|
from pathlib import Path
|
|
location_path = Path(location).expanduser()
|
|
location_str = str(location_path)
|
|
|
|
# Only migrate once per location
|
|
if location_str in cls._migrated_locations:
|
|
return
|
|
|
|
cls._migrated_locations.add(location_str)
|
|
|
|
cls._migrate_to_hash_storage(location_path)
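
# Usage sketch (illustrative; the path is hypothetical): migration is idempotent
# per process because completed locations are remembered in _migrated_locations.
#
#   Folder.migrate_location("~/media-library")   # renames files to <sha256><ext>,
#                                                # imports .metadata/.tag sidecars
#   Folder.migrate_location("~/media-library")   # second call is a no-op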
|
|
|
|
@classmethod
|
|
def _migrate_to_hash_storage(cls, location_path: Path) -> None:
|
|
"""Migrate existing files from filename-based to hash-based storage.
|
|
|
|
Checks for sidecars (.metadata, .tag) and imports them before renaming.
|
|
Also ensures all files have a title: tag.
|
|
"""
|
|
from API.folder import API_folder_store, read_sidecar, write_sidecar, find_sidecar
|
|
|
|
try:
|
|
with API_folder_store(location_path) as db:
|
|
cursor = db.connection.cursor()
|
|
|
|
# First pass: migrate filename-based files and add title tags
|
|
# Scan all files in the storage directory
|
|
for file_path in sorted(location_path.iterdir()):
|
|
if not file_path.is_file():
|
|
continue
|
|
|
|
# Skip database files and sidecars
|
|
if file_path.suffix in ('.db', '.metadata', '.tag'):
continue
# Skip SQLite journal files (names ending in -shm or -wal)
if file_path.name.endswith(('-shm', '-wal')):
continue
|
|
|
|
# Check if filename is already a hash (without extension)
|
|
if len(file_path.stem) == 64 and all(c in '0123456789abcdef' for c in file_path.stem.lower()):
|
|
continue # Already migrated, will process in second pass
|
|
|
|
try:
|
|
# Compute file hash
|
|
file_hash = sha256_file(file_path)
|
|
# Preserve extension in the hash-based filename
|
|
file_ext = file_path.suffix # e.g., '.mp4'
|
|
hash_filename = file_hash + file_ext if file_ext else file_hash
|
|
hash_path = location_path / hash_filename
|
|
|
|
# Check for sidecars and import them
|
|
sidecar_path = find_sidecar(file_path)
|
|
tags_to_add = []
|
|
url_to_add = []
|
|
has_title_tag = False
|
|
|
|
if sidecar_path and sidecar_path.exists():
|
|
try:
|
|
_, tags, url = read_sidecar(sidecar_path)
|
|
if tags:
|
|
tags_to_add = list(tags)
|
|
# Check if title tag exists
|
|
has_title_tag = any(t.lower().startswith('title:') for t in tags_to_add)
|
|
if url:
|
|
url_to_add = list(url)
|
|
debug(f"Found sidecar for {file_path.name}: {len(tags_to_add)} tags, {len(url_to_add)} url", file=sys.stderr)
|
|
# Delete the sidecar after importing
|
|
sidecar_path.unlink()
|
|
except Exception as exc:
|
|
debug(f"Failed to read sidecar for {file_path.name}: {exc}", file=sys.stderr)
|
|
|
|
# Ensure there's a title tag (use original filename if not present)
|
|
if not has_title_tag:
|
|
tags_to_add.append(f"title:{file_path.name}")
|
|
|
|
# Rename file to hash if needed
|
|
if hash_path != file_path and not hash_path.exists():
|
|
debug(f"Migrating: {file_path.name} -> {hash_filename}", file=sys.stderr)
|
|
file_path.rename(hash_path)
|
|
|
|
# Ensure DB points to the renamed path (update by hash).
|
|
try:
|
|
cursor.execute(
|
|
"UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
|
|
(str(hash_path.resolve()), file_hash),
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
# Create or update database entry
|
|
db.get_or_create_file_entry(hash_path)
|
|
|
|
# Save extension metadata
|
|
ext_clean = file_ext.lstrip('.') if file_ext else ''
|
|
db.save_metadata(hash_path, {
|
|
'hash': file_hash,
|
|
'ext': ext_clean,
|
|
'size': hash_path.stat().st_size
|
|
})
|
|
|
|
# Add all tags (including title tag)
|
|
if tags_to_add:
|
|
db.save_tags(hash_path, tags_to_add)
|
|
debug(f"Added {len(tags_to_add)} tags to {file_hash}", file=sys.stderr)
|
|
|
|
# Note: url would need a separate table if you want to store them
|
|
# For now, we're just noting them in debug
|
|
if url_to_add:
|
|
debug(f"Imported {len(url_to_add)} url for {file_hash}: {url_to_add}", file=sys.stderr)
|
|
|
|
except Exception as exc:
|
|
debug(f"Failed to migrate file {file_path.name}: {exc}", file=sys.stderr)
|
|
|
|
# Second pass: ensure all files in database have a title: tag
|
|
db.connection.commit()
|
|
cursor.execute('''
|
|
SELECT f.hash, f.file_path
|
|
FROM files f
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM tags t WHERE t.hash = f.hash AND LOWER(t.tag) LIKE 'title:%'
|
|
)
|
|
''')
|
|
files_without_title = cursor.fetchall()
|
|
|
|
for file_hash, file_path_str in files_without_title:
|
|
try:
|
|
file_path = Path(file_path_str)
|
|
if file_path.exists():
|
|
# Use the filename as the title
|
|
title_tag = f"title:{file_path.name}"
|
|
db.save_tags(file_path, [title_tag])
|
|
debug(f"Added title tag to {file_path.name}", file=sys.stderr)
|
|
except Exception as exc:
|
|
debug(f"Failed to add title tag to file {file_path_str}: {exc}", file=sys.stderr)
|
|
|
|
db.connection.commit()
|
|
|
|
# Third pass: discover files on disk that aren't in the database yet
|
|
# These are hash-named files that were added after initial indexing
|
|
cursor.execute('SELECT LOWER(hash) FROM files')
|
|
db_hashes = {row[0] for row in cursor.fetchall()}
|
|
|
|
discovered = 0
|
|
for file_path in sorted(location_path.rglob("*")):
|
|
if file_path.is_file():
|
|
# Check if file name (without extension) is a 64-char hex hash
|
|
name_without_ext = file_path.stem
|
|
if len(name_without_ext) == 64 and all(c in '0123456789abcdef' for c in name_without_ext.lower()):
|
|
file_hash = name_without_ext.lower()
|
|
|
|
# Skip if already in DB
|
|
if file_hash in db_hashes:
|
|
continue
|
|
|
|
try:
|
|
# Add file to DB (creates entry and auto-adds title: tag)
|
|
db.get_or_create_file_entry(file_path)
|
|
|
|
# Save extension metadata
|
|
file_ext = file_path.suffix
|
|
ext_clean = file_ext.lstrip('.') if file_ext else ''
|
|
db.save_metadata(file_path, {
|
|
'hash': file_hash,
|
|
'ext': ext_clean,
|
|
'size': file_path.stat().st_size
|
|
})
|
|
|
|
discovered += 1
|
|
except Exception as e:
|
|
debug(f"Failed to discover file {file_path.name}: {e}", file=sys.stderr)
|
|
|
|
if discovered > 0:
|
|
debug(f"Discovered and indexed {discovered} undiscovered files in {location_path.name}", file=sys.stderr)
|
|
db.connection.commit()
|
|
except Exception as exc:
|
|
debug(f"Migration to hash storage failed: {exc}", file=sys.stderr)
|
|
|
|
|
|
def location(self) -> str:
|
|
return self._location
|
|
|
|
def name(self) -> str:
|
|
return self._name
|
|
|
|
def add_file(self, file_path: Path, **kwargs: Any) -> str:
|
|
"""Add file to local folder storage with full metadata support.
|
|
|
|
Args:
|
|
file_path: Path to the file to add
|
|
move: If True, move file instead of copy (default: False)
|
|
tag: Optional list of tag values to add
|
|
url: Optional list of url to associate with the file
|
|
title: Optional title (will be added as 'title:value' tag)
|
|
|
|
Returns:
|
|
File hash (SHA256 hex string) as identifier
|
|
"""
|
|
move_file = bool(kwargs.get("move"))
|
|
tag_list = kwargs.get("tag", [])
|
|
url = kwargs.get("url", [])
|
|
title = kwargs.get("title")
|
|
|
|
# Extract title from tags if not explicitly provided
|
|
if not title:
|
|
for candidate in tag_list:
|
|
if isinstance(candidate, str) and candidate.lower().startswith("title:"):
|
|
title = candidate.split(":", 1)[1].strip()
|
|
break
|
|
|
|
# Fallback to filename if no title
|
|
if not title:
|
|
title = file_path.name
|
|
|
|
# Ensure title is in tags
|
|
title_tag = f"title:{title}"
|
|
if not any(str(candidate).lower().startswith("title:") for candidate in tag_list):
|
|
tag_list = [title_tag] + list(tag_list)
|
|
|
|
try:
|
|
file_hash = sha256_file(file_path)
|
|
debug(f"File hash: {file_hash}", file=sys.stderr)
|
|
|
|
# Preserve extension in the stored filename
|
|
file_ext = file_path.suffix # e.g., '.mp4'
|
|
save_filename = file_hash + file_ext if file_ext else file_hash
|
|
save_file = Path(self._location) / save_filename
|
|
|
|
# Check if file already exists
|
|
from API.folder import API_folder_store
|
|
with API_folder_store(Path(self._location)) as db:
|
|
existing_path = db.search_hash(file_hash)
|
|
if existing_path and existing_path.exists():
|
|
log(
|
|
f"✓ File already in local storage: {existing_path}",
|
|
file=sys.stderr,
|
|
)
|
|
# Still add tags and url if provided
|
|
if tag_list:
|
|
self.add_tag(file_hash, tag_list)
|
|
if url:
|
|
self.add_url(file_hash, url)
|
|
return file_hash
|
|
|
|
# Move or copy file (with progress bar on actual byte transfer).
|
|
# Note: a same-volume move may be a fast rename and won't show progress.
|
|
def _copy_with_progress(src: Path, dst: Path, *, label: str) -> None:
|
|
from models import ProgressFileReader
|
|
|
|
total_bytes = None
|
|
try:
|
|
total_bytes = int(src.stat().st_size)
|
|
except Exception:
|
|
total_bytes = None
|
|
|
|
with src.open("rb") as r, dst.open("wb") as w:
|
|
reader = ProgressFileReader(r, total_bytes=total_bytes, label=label)
|
|
while True:
|
|
chunk = reader.read(1024 * 1024)
|
|
if not chunk:
|
|
break
|
|
w.write(chunk)
|
|
|
|
# Preserve file metadata similar to shutil.copy2
|
|
try:
|
|
shutil.copystat(str(src), str(dst))
|
|
except Exception:
|
|
pass
|
|
|
|
if move_file:
|
|
# Prefer native move; fall back to copy+delete with progress on failure.
|
|
try:
|
|
shutil.move(str(file_path), str(save_file))
|
|
debug(f"Local move: {save_file}", file=sys.stderr)
|
|
# After a move, the original path no longer exists; use destination for subsequent ops.
|
|
file_path = save_file
|
|
except Exception:
|
|
_copy_with_progress(file_path, save_file, label=f"folder:{self._name} move")
|
|
try:
|
|
file_path.unlink(missing_ok=True) # type: ignore[arg-type]
|
|
except Exception:
|
|
try:
|
|
if file_path.exists():
|
|
file_path.unlink()
|
|
except Exception:
|
|
pass
|
|
debug(f"Local move (copy+delete): {save_file}", file=sys.stderr)
|
|
file_path = save_file
|
|
else:
|
|
_copy_with_progress(file_path, save_file, label=f"folder:{self._name} copy")
|
|
debug(f"Local copy: {save_file}", file=sys.stderr)
|
|
|
|
# Best-effort: capture duration for media
|
|
duration_value: float | None = None
|
|
try:
|
|
from SYS.utils import ffprobe
|
|
probe = ffprobe(str(save_file))
|
|
duration = probe.get("duration")
|
|
if isinstance(duration, (int, float)) and duration > 0:
|
|
duration_value = float(duration)
|
|
except Exception:
|
|
duration_value = None
|
|
|
|
# Save to database
|
|
with API_folder_store(Path(self._location)) as db:
|
|
db.get_or_create_file_entry(save_file)
|
|
# Save metadata including extension
|
|
ext_clean = file_ext.lstrip('.') if file_ext else ''
|
|
db.save_metadata(save_file, {
|
|
'hash': file_hash,
|
|
'ext': ext_clean,
|
|
'size': save_file.stat().st_size,
|
|
'duration': duration_value,
|
|
})
|
|
|
|
# Add tags if provided
|
|
if tag_list:
|
|
self.add_tag(file_hash, tag_list)
|
|
|
|
# Add url if provided
|
|
if url:
|
|
self.add_url(file_hash, url)
|
|
|
|
##log(f"✓ Added to local storage: {save_file.name}", file=sys.stderr)
|
|
return file_hash
|
|
|
|
except Exception as exc:
|
|
log(f"❌ Local storage failed: {exc}", file=sys.stderr)
|
|
raise
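
# Usage sketch (illustrative; the store location and file path are hypothetical):
#
#   store = Folder("~/media-library", "local")
#   file_hash = store.add_file(
#       Path("/tmp/clip.mp4"),
#       move=False,                        # copy by default; move=True relocates
#       tag=["artist:someone", "title:My Clip"],
#       url=["https://example.com/source"],
#   )
#   # Returns the SHA256 hex digest; the file is stored as <hash>.mp4 and a
#   # title: tag is guaranteed (falling back to the original filename).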
|
|
|
|
def search(self, query: str, **kwargs: Any) -> list[Dict[str, Any]]:
|
|
"""Search local database for files by title tag or filename."""
|
|
from fnmatch import fnmatch
|
|
from API.folder import DatabaseAPI
|
|
import unicodedata
|
|
|
|
limit = kwargs.get("limit")
|
|
try:
|
|
limit = int(limit) if limit is not None else None
|
|
except (TypeError, ValueError):
|
|
limit = None
|
|
if isinstance(limit, int) and limit <= 0:
|
|
limit = None
|
|
|
|
query = query.lower()
|
|
query_lower = query # Ensure query_lower is defined for all code paths
|
|
|
|
def _normalize_namespace_text(text: str, *, allow_wildcards: bool) -> str:
|
|
"""Normalize tag namespace values for consistent matching.
|
|
|
|
Removes control/format chars (e.g. zero-width spaces) that frequently appear in scraped tags,
|
|
collapses whitespace, and lowercases.
|
|
"""
|
|
s = str(text or "")
|
|
# Normalize newlines/tabs/etc to spaces early.
|
|
s = s.replace("\r", " ").replace("\n", " ").replace("\t", " ")
|
|
# Drop control / format chars (Cc/Cf) while preserving wildcard tokens when requested.
|
|
cleaned_chars: list[str] = []
|
|
for ch in s:
|
|
if allow_wildcards and ch in {"*", "?"}:
|
|
cleaned_chars.append(ch)
|
|
continue
|
|
cat = unicodedata.category(ch)
|
|
if cat in {"Cc", "Cf"}:
|
|
continue
|
|
cleaned_chars.append(ch)
|
|
s = "".join(cleaned_chars)
|
|
# Collapse any remaining unicode whitespace runs.
|
|
s = " ".join(s.split())
|
|
return s.strip().lower()
|
|
|
|
def _normalize_ext_filter(value: str) -> str:
|
|
v = str(value or "").strip().lower().lstrip('.')
|
|
v = "".join(ch for ch in v if ch.isalnum())
|
|
return v
|
|
|
|
def _extract_system_filetype_ext(text: str) -> Optional[str]:
|
|
# Match: system:filetype = png (allow optional '=' and flexible spaces)
|
|
m = re.search(r"\bsystem:filetype\s*(?:=\s*)?([^\s,]+)", text)
|
|
if not m:
|
|
m = re.search(r"\bsystem:filetype\s*=\s*([^\s,]+)", text)
|
|
if not m:
|
|
return None
|
|
return _normalize_ext_filter(m.group(1)) or None
|
|
|
|
# Support `ext:<value>` and Hydrus-style `system:filetype = <value>` anywhere
|
|
# in the query (space or comma separated).
|
|
ext_filter: Optional[str] = None
|
|
try:
|
|
sys_ext = _extract_system_filetype_ext(query_lower)
|
|
if sys_ext:
|
|
ext_filter = sys_ext
|
|
query_lower = re.sub(r"\s*\bsystem:filetype\s*(?:=\s*)?[^\s,]+", " ", query_lower)
|
|
query_lower = re.sub(r"\s{2,}", " ", query_lower).strip().strip(',')
|
|
query = query_lower
|
|
|
|
m = re.search(r"\bext:([^\s,]+)", query_lower)
|
|
if not m:
|
|
m = re.search(r"\bextension:([^\s,]+)", query_lower)
|
|
if m:
|
|
ext_filter = _normalize_ext_filter(m.group(1)) or None
|
|
query_lower = re.sub(r"\s*\b(?:ext|extension):[^\s,]+", " ", query_lower)
|
|
query_lower = re.sub(r"\s{2,}", " ", query_lower).strip().strip(',')
|
|
query = query_lower
|
|
except Exception:
|
|
ext_filter = None
|
|
|
|
match_all = query == "*" or (not query and bool(ext_filter))
|
|
results = []
|
|
search_dir = Path(self._location).expanduser()
|
|
|
|
def _url_like_pattern(value: str) -> str:
|
|
# Interpret user patterns as substring matches (with optional glob wildcards).
|
|
v = (value or "").strip().lower()
|
|
if not v or v == "*":
|
|
return "%"
|
|
v = v.replace("%", "\\%").replace("_", "\\_")
|
|
v = v.replace("*", "%").replace("?", "_")
|
|
if "%" not in v and "_" not in v:
|
|
return f"%{v}%"
|
|
if not v.startswith("%"):
|
|
v = "%" + v
|
|
if not v.endswith("%"):
|
|
v = v + "%"
|
|
return v
|
|
|
|
tokens = [t.strip() for t in query.split(',') if t.strip()]
|
|
|
|
# Bare 64-hex queries are rejected: hash lookups must use the 'hash:' prefix.
if not match_all and _normalize_hash(query):
debug("Hash queries require 'hash:' prefix for local search")
return results
|
|
|
|
def _create_entry(file_path: Path, tags: list[str], size_bytes: int | None, db_hash: Optional[str]) -> dict[str, Any]:
|
|
path_str = str(file_path)
|
|
# Get title from tags if available, otherwise use hash as fallback
|
|
title = next((t.split(':', 1)[1] for t in tags if t.lower().startswith('title:')), None)
|
|
if not title:
|
|
# Fallback to hash if no title tag exists
|
|
hash_value = _resolve_file_hash(db_hash, file_path)
|
|
title = hash_value if hash_value else file_path.stem
|
|
|
|
# Extract extension from file path
|
|
ext = file_path.suffix.lstrip('.')
|
|
if not ext:
|
|
# Fallback: try to extract from title (original filename might be in title)
|
|
title_path = Path(title)
|
|
ext = title_path.suffix.lstrip('.')
|
|
|
|
# Build clean entry with only necessary fields
|
|
hash_value = _resolve_file_hash(db_hash, file_path)
|
|
entry = {
|
|
"title": title,
|
|
"ext": ext,
|
|
"path": path_str,
|
|
"target": path_str,
|
|
"store": self._name,
|
|
"size": size_bytes,
|
|
"hash": hash_value,
|
|
"tag": tags,
|
|
}
|
|
return entry
|
|
|
|
try:
|
|
if not search_dir.exists():
|
|
debug(f"Search directory does not exist: {search_dir}")
|
|
return results
|
|
|
|
try:
|
|
with DatabaseAPI(search_dir) as api:
|
|
ext_hashes: set[str] | None = None
|
|
if ext_filter:
|
|
# Fetch a bounded set of hashes to intersect with other filters.
|
|
ext_fetch_limit = (limit or 45) * 50
|
|
ext_hashes = api.get_file_hashes_by_ext(ext_filter, limit=ext_fetch_limit)
|
|
|
|
# ext-only search: query is empty (or coerced to match_all above).
|
|
if ext_filter and (not query_lower or query_lower == "*"):
|
|
rows = api.get_files_by_ext(ext_filter, limit)
|
|
for file_hash, file_path_str, size_bytes, ext in rows:
|
|
if not file_path_str:
|
|
continue
|
|
file_path = Path(file_path_str)
|
|
if not file_path.exists():
|
|
continue
|
|
if size_bytes is None:
|
|
try:
|
|
size_bytes = file_path.stat().st_size
|
|
except OSError:
|
|
size_bytes = None
|
|
tags = api.get_tags_for_file(file_hash)
|
|
entry = _create_entry(file_path, tags, size_bytes, file_hash)
|
|
try:
|
|
db_ext = str(ext or "").strip().lstrip('.')
|
|
if db_ext:
|
|
entry["ext"] = db_ext
|
|
except Exception:
|
|
pass
|
|
results.append(entry)
|
|
if limit is not None and len(results) >= limit:
|
|
return results
|
|
backend_label = str(getattr(self, "_name", "") or getattr(self, "NAME", "") or "folder")
|
|
debug(f"[folder:{backend_label}] {len(results)} result(s)")
|
|
return results
|
|
|
|
if tokens and len(tokens) > 1:
|
|
url_fetch_limit = (limit or 45) * 50
|
|
|
|
def _like_pattern(term: str) -> str:
|
|
return term.replace('*', '%').replace('?', '_')
|
|
|
|
def _ids_for_token(token: str) -> set[int]:
|
|
token = token.strip()
|
|
if not token:
|
|
return set()
|
|
|
|
if ':' in token and not token.startswith(':'):
|
|
namespace, pattern = token.split(':', 1)
|
|
namespace = namespace.strip().lower()
|
|
pattern = pattern.strip().lower()
|
|
|
|
if namespace == 'hash':
|
|
normalized_hash = _normalize_hash(pattern)
|
|
if not normalized_hash:
|
|
return set()
|
|
h = api.get_file_hash_by_hash(normalized_hash)
|
|
return {h} if h else set()
|
|
|
|
if namespace == 'url':
|
|
if not pattern or pattern == '*':
|
|
return api.get_file_hashes_with_any_url(limit=url_fetch_limit)
|
|
return api.get_file_hashes_by_url_like(_url_like_pattern(pattern), limit=url_fetch_limit)
|
|
|
|
if namespace == 'system':
|
|
# Hydrus-compatible query: system:filetype = png
|
|
m_ft = re.match(r"^filetype\s*(?:=\s*)?(.+)$", pattern)
|
|
if m_ft:
|
|
normalized_ext = _normalize_ext_filter(m_ft.group(1))
|
|
if not normalized_ext:
|
|
return set()
|
|
return api.get_file_hashes_by_ext(normalized_ext, limit=url_fetch_limit)
|
|
return set()
|
|
|
|
if namespace in {'ext', 'extension'}:
|
|
normalized_ext = _normalize_ext_filter(pattern)
|
|
if not normalized_ext:
|
|
return set()
|
|
return api.get_file_hashes_by_ext(normalized_ext, limit=url_fetch_limit)
|
|
|
|
if namespace == 'store':
|
|
if pattern not in {'local', 'file', 'filesystem'}:
|
|
return set()
|
|
return api.get_all_file_hashes()
|
|
|
|
query_pattern = f"{namespace}:%"
|
|
tag_rows = api.get_file_hashes_by_tag_pattern(query_pattern)
|
|
matched: set[str] = set()
|
|
for file_hash, tag_val in tag_rows:
|
|
if not tag_val:
|
|
continue
|
|
tag_lower = str(tag_val).lower()
|
|
if not tag_lower.startswith(f"{namespace}:"):
|
|
continue
|
|
value = _normalize_namespace_text(tag_lower[len(namespace) + 1 :], allow_wildcards=False)
|
|
pat = _normalize_namespace_text(pattern, allow_wildcards=True)
|
|
if fnmatch(value, pat):
|
|
matched.add(file_hash)
|
|
return matched
|
|
|
|
term = token.lower()
|
|
like_pattern = f"%{_like_pattern(term)}%"
|
|
hashes = api.get_file_hashes_by_path_pattern(like_pattern)
|
|
hashes.update(api.get_file_hashes_by_tag_substring(like_pattern))
|
|
return hashes
|
|
|
|
try:
|
|
matching_hashes: set[str] | None = None
|
|
for token in tokens:
|
|
hashes = _ids_for_token(token)
|
|
matching_hashes = hashes if matching_hashes is None else matching_hashes & hashes
|
|
if not matching_hashes:
|
|
return results
|
|
|
|
if ext_hashes is not None:
|
|
matching_hashes = (matching_hashes or set()) & ext_hashes
|
|
if not matching_hashes:
|
|
return results
|
|
|
|
if not matching_hashes:
|
|
return results
|
|
|
|
rows = api.get_file_metadata(matching_hashes, limit)
|
|
for file_hash, file_path_str, size_bytes, ext in rows:
|
|
if not file_path_str:
|
|
continue
|
|
file_path = Path(file_path_str)
|
|
if not file_path.exists():
|
|
continue
|
|
if size_bytes is None:
|
|
try:
|
|
size_bytes = file_path.stat().st_size
|
|
except OSError:
|
|
size_bytes = None
|
|
tags = api.get_tags_for_file(file_hash)
|
|
entry = _create_entry(file_path, tags, size_bytes, file_hash)
|
|
try:
|
|
db_ext = str(ext or "").strip().lstrip('.')
|
|
if db_ext:
|
|
entry["ext"] = db_ext
|
|
except Exception:
|
|
pass
|
|
results.append(entry)
|
|
if limit is not None and len(results) >= limit:
|
|
return results
|
|
return results
|
|
except Exception as exc:
|
|
log(f"⚠️ AND search failed: {exc}", file=sys.stderr)
|
|
debug(f"AND search exception details: {exc}")
|
|
return []
|
|
|
|
if ":" in query and not query.startswith(":"):
|
|
namespace, pattern = query.split(":", 1)
|
|
namespace = namespace.strip().lower()
|
|
pattern = pattern.strip().lower()
|
|
debug(f"Performing namespace search: {namespace}:{pattern}")
|
|
|
|
if namespace == "hash":
|
|
normalized_hash = _normalize_hash(pattern)
|
|
if not normalized_hash:
|
|
return results
|
|
h = api.get_file_hash_by_hash(normalized_hash)
|
|
hashes = {h} if h else set()
|
|
rows = api.get_file_metadata(hashes, limit)
|
|
for file_hash, file_path_str, size_bytes, ext in rows:
|
|
if not file_path_str:
|
|
continue
|
|
file_path = Path(file_path_str)
|
|
if not file_path.exists():
|
|
continue
|
|
if size_bytes is None:
|
|
try:
|
|
size_bytes = file_path.stat().st_size
|
|
except OSError:
|
|
size_bytes = None
|
|
tags = api.get_tags_for_file(file_hash)
|
|
entry = _create_entry(file_path, tags, size_bytes, file_hash)
|
|
try:
|
|
db_ext = str(ext or "").strip().lstrip('.')
|
|
if db_ext:
|
|
entry["ext"] = db_ext
|
|
except Exception:
|
|
pass
|
|
results.append(entry)
|
|
if limit is not None and len(results) >= limit:
|
|
return results
|
|
return results
|
|
|
|
if namespace == "url":
|
|
if not pattern or pattern == "*":
|
|
rows = api.get_files_with_any_url(limit)
|
|
else:
|
|
rows = api.get_files_by_url_like(_url_like_pattern(pattern), limit)
|
|
for file_hash, file_path_str, size_bytes, ext in rows:
|
|
if not file_path_str:
|
|
continue
|
|
file_path = Path(file_path_str)
|
|
if not file_path.exists():
|
|
continue
|
|
if size_bytes is None:
|
|
try:
|
|
size_bytes = file_path.stat().st_size
|
|
except OSError:
|
|
size_bytes = None
|
|
tags = api.get_tags_for_file(file_hash)
|
|
entry = _create_entry(file_path, tags, size_bytes, file_hash)
|
|
results.append(entry)
|
|
if limit is not None and len(results) >= limit:
|
|
return results
|
|
return results
|
|
|
|
if namespace == "system":
|
|
# Hydrus-compatible query: system:filetype = png
|
|
m_ft = re.match(r"^filetype\s*(?:=\s*)?(.+)$", pattern)
|
|
if m_ft:
|
|
normalized_ext = _normalize_ext_filter(m_ft.group(1))
|
|
if not normalized_ext:
|
|
return results
|
|
rows = api.get_files_by_ext(normalized_ext, limit)
|
|
for file_hash, file_path_str, size_bytes, ext in rows:
|
|
if not file_path_str:
|
|
continue
|
|
file_path = Path(file_path_str)
|
|
if not file_path.exists():
|
|
continue
|
|
if size_bytes is None:
|
|
try:
|
|
size_bytes = file_path.stat().st_size
|
|
except OSError:
|
|
size_bytes = None
|
|
tags = api.get_tags_for_file(file_hash)
|
|
entry = _create_entry(file_path, tags, size_bytes, file_hash)
|
|
try:
|
|
db_ext = str(ext or "").strip().lstrip('.')
|
|
if db_ext:
|
|
entry["ext"] = db_ext
|
|
except Exception:
|
|
pass
|
|
results.append(entry)
|
|
if limit is not None and len(results) >= limit:
|
|
return results
|
|
return results
|
|
|
|
if namespace in {"ext", "extension"}:
|
|
normalized_ext = _normalize_ext_filter(pattern)
|
|
if not normalized_ext:
|
|
return results
|
|
rows = api.get_files_by_ext(normalized_ext, limit)
|
|
for file_hash, file_path_str, size_bytes, ext in rows:
|
|
if not file_path_str:
|
|
continue
|
|
file_path = Path(file_path_str)
|
|
if not file_path.exists():
|
|
continue
|
|
if size_bytes is None:
|
|
try:
|
|
size_bytes = file_path.stat().st_size
|
|
except OSError:
|
|
size_bytes = None
|
|
tags = api.get_tags_for_file(file_hash)
|
|
entry = _create_entry(file_path, tags, size_bytes, file_hash)
|
|
try:
|
|
db_ext = str(ext or "").strip().lstrip('.')
|
|
if db_ext:
|
|
entry["ext"] = db_ext
|
|
except Exception:
|
|
pass
|
|
results.append(entry)
|
|
if limit is not None and len(results) >= limit:
|
|
return results
|
|
return results
|
|
|
|
query_pattern = f"{namespace}:%"
|
|
rows = api.get_files_by_namespace_pattern(query_pattern, limit)
|
|
debug(f"Found {len(rows)} potential matches in DB")
|
|
|
|
for file_hash, file_path_str, size_bytes, ext in rows:
|
|
if not file_path_str:
|
|
continue
|
|
|
|
tags = api.get_tags_by_namespace_and_file(file_hash, query_pattern)
|
|
|
|
for tag in tags:
|
|
tag_lower = tag.lower()
|
|
if tag_lower.startswith(f"{namespace}:"):
|
|
value = _normalize_namespace_text(tag_lower[len(namespace) + 1 :], allow_wildcards=False)
|
|
pat = _normalize_namespace_text(pattern, allow_wildcards=True)
|
|
if fnmatch(value, pat):
|
|
if ext_hashes is not None and file_hash not in ext_hashes:
|
|
break
|
|
file_path = Path(file_path_str)
|
|
if file_path.exists():
|
|
if size_bytes is None:
|
|
size_bytes = file_path.stat().st_size
|
|
all_tags = api.get_tags_for_file(file_hash)
|
|
entry = _create_entry(file_path, all_tags, size_bytes, file_hash)
|
|
try:
|
|
db_ext = str(ext or "").strip().lstrip('.')
|
|
if db_ext:
|
|
entry["ext"] = db_ext
|
|
except Exception:
|
|
pass
|
|
results.append(entry)
|
|
else:
|
|
debug(f"File missing on disk: {file_path}")
|
|
break
|
|
|
|
if limit is not None and len(results) >= limit:
|
|
return results
|
|
elif not match_all:
|
|
# Strict tag-based search only (no filename/path searching).
|
|
terms = [t.strip() for t in query_lower.replace(',', ' ').split() if t.strip()]
|
|
if not terms:
|
|
terms = [query_lower]
|
|
|
|
fetch_limit = (limit or 45) * 50
|
|
|
|
# AND semantics across terms: each term must match at least one tag.
|
|
hits: dict[str, dict[str, Any]] = {}
|
|
for term in terms:
|
|
tag_pattern = f"%{term}%"
|
|
term_rows = api.get_files_by_namespace_pattern(tag_pattern, fetch_limit)
|
|
for file_hash, file_path_str, size_bytes, ext in term_rows:
|
|
if not file_path_str:
|
|
continue
|
|
if ext_hashes is not None and file_hash not in ext_hashes:
|
|
continue
|
|
entry = hits.get(file_hash)
|
|
if entry:
|
|
entry["count"] += 1
|
|
if size_bytes is not None:
|
|
entry["size"] = size_bytes
|
|
else:
|
|
hits[file_hash] = {
|
|
"path": file_path_str,
|
|
"size": size_bytes,
|
|
"hash": file_hash,
|
|
"count": 1,
|
|
}
|
|
|
|
required = len(terms)
|
|
seen_files: set[str] = set()
|
|
for file_hash, info in hits.items():
|
|
if info.get("count") != required:
|
|
continue
|
|
file_path_str = info.get("path")
|
|
if not file_path_str or file_path_str in seen_files:
|
|
continue
|
|
file_path = Path(file_path_str)
|
|
if not file_path.exists():
|
|
continue
|
|
seen_files.add(file_path_str)
|
|
|
|
size_bytes = info.get("size")
|
|
if size_bytes is None:
|
|
try:
|
|
size_bytes = file_path.stat().st_size
|
|
except OSError:
|
|
size_bytes = None
|
|
|
|
tags = api.get_tags_for_file(file_hash)
|
|
entry_obj = _create_entry(file_path, tags, size_bytes, info.get("hash"))
|
|
results.append(entry_obj)
|
|
if limit is not None and len(results) >= limit:
|
|
break
|
|
|
|
else:
|
|
rows = api.get_all_files(limit)
|
|
for file_hash, file_path_str, size_bytes, ext in rows:
|
|
if file_path_str:
|
|
if ext_hashes is not None and file_hash not in ext_hashes:
|
|
continue
|
|
file_path = Path(file_path_str)
|
|
if file_path.exists():
|
|
if size_bytes is None:
|
|
size_bytes = file_path.stat().st_size
|
|
|
|
tags = api.get_tags_for_file(file_hash)
|
|
entry = _create_entry(file_path, tags, size_bytes, file_hash)
|
|
try:
|
|
db_ext = str(ext or "").strip().lstrip('.')
|
|
if db_ext:
|
|
entry["ext"] = db_ext
|
|
except Exception:
|
|
pass
|
|
results.append(entry)
|
|
|
|
backend_label = str(getattr(self, "_name", "") or getattr(self, "NAME", "") or "folder")
|
|
debug(f"[folder:{backend_label}] {len(results)} result(s)")
|
|
return results
|
|
|
|
except Exception as e:
|
|
log(f"⚠️ Database search failed: {e}", file=sys.stderr)
|
|
debug(f"DB search exception details: {e}")
|
|
return []
|
|
|
|
except Exception as exc:
|
|
log(f"❌ Local search failed: {exc}", file=sys.stderr)
|
|
raise
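
# Query syntax handled above (illustrative examples):
#
#   store.search("*")                          # list everything (up to limit)
#   store.search("hash:" + "a" * 64)           # exact hash lookup
#   store.search("url:example.com")            # substring/glob match on known urls
#   store.search("ext:mp4")                    # filter by extension
#   store.search("system:filetype = png")      # Hydrus-style extension filter
#   store.search("artist:some*")               # namespace:value with wildcards
#   store.search("artist:someone, ext:mp4")    # comma-separated tokens are AND-ed
#   store.search("beach sunset", limit=10)     # plain terms match tags (AND)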
|
|
|
|
|
|
def _resolve_library_root(self, file_path: Path, config: Dict[str, Any]) -> Optional[Path]:
|
|
"""Return the library root containing medios-macina.db.
|
|
|
|
Prefer the store's configured location, then config override, then walk parents
|
|
of the file path to find a directory with medios-macina.db."""
|
|
candidates: list[Path] = []
|
|
if self._location:
|
|
candidates.append(Path(self._location).expanduser())
|
|
cfg_root = get_local_storage_path(config) if config else None
|
|
if cfg_root:
|
|
candidates.append(Path(cfg_root).expanduser())
|
|
|
|
for root in candidates:
|
|
db_path = root / "medios-macina.db"
|
|
if db_path.exists():
|
|
return root
|
|
|
|
try:
|
|
for parent in [file_path] + list(file_path.parents):
|
|
db_path = parent / "medios-macina.db"
|
|
if db_path.exists():
|
|
return parent
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def get_file(self, file_hash: str, **kwargs: Any) -> Optional[Path]:
|
|
"""Retrieve file by hash, returning path to the file.
|
|
|
|
Args:
|
|
file_hash: SHA256 hash of the file (64-char hex string)
|
|
|
|
Returns:
|
|
Path to the file or None if not found
|
|
"""
|
|
try:
|
|
# Normalize the hash
|
|
normalized_hash = _normalize_hash(file_hash)
|
|
if not normalized_hash:
|
|
return None
|
|
|
|
search_dir = Path(self._location).expanduser()
|
|
from API.folder import API_folder_store
|
|
|
|
with API_folder_store(search_dir) as db:
|
|
# Search for file by hash
|
|
file_path = db.search_hash(normalized_hash)
|
|
|
|
if file_path and file_path.exists():
|
|
return file_path
|
|
|
|
return None
|
|
|
|
except Exception as exc:
|
|
debug(f"Failed to get file for hash {file_hash}: {exc}")
|
|
return None
|
|
|
|
def get_metadata(self, file_hash: str) -> Optional[Dict[str, Any]]:
|
|
"""Get metadata for a file from the database by hash.
|
|
|
|
Args:
|
|
file_hash: SHA256 hash of the file (64-char hex string)
|
|
|
|
Returns:
|
|
Dict with metadata fields (ext, size, hash, duration, etc.) or None if not found
|
|
"""
|
|
try:
|
|
# Normalize the hash
|
|
normalized_hash = _normalize_hash(file_hash)
|
|
if not normalized_hash:
|
|
return None
|
|
|
|
search_dir = Path(self._location).expanduser()
|
|
from API.folder import DatabaseAPI
|
|
|
|
with DatabaseAPI(search_dir) as api:
|
|
# Get file hash
|
|
file_hash_result = api.get_file_hash_by_hash(normalized_hash)
|
|
if not file_hash_result:
|
|
return None
|
|
|
|
# Query metadata directly from database
|
|
cursor = api.get_cursor()
|
|
cursor.execute("""
|
|
SELECT * FROM metadata WHERE hash = ?
|
|
""", (file_hash_result,))
|
|
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return None
|
|
|
|
metadata = dict(row)
|
|
|
|
# Canonicalize metadata keys (no legacy aliases)
|
|
if "file_path" in metadata and "path" not in metadata:
|
|
metadata["path"] = metadata.get("file_path")
|
|
metadata.pop("file_path", None)
|
|
|
|
# Parse JSON fields
|
|
for field in ['url', 'relationships']:
if metadata.get(field):
try:
metadata[field] = json.loads(metadata[field])
except (json.JSONDecodeError, TypeError):
metadata[field] = []
|
|
|
|
return metadata
|
|
except Exception as exc:
|
|
debug(f"Failed to get metadata for hash {file_hash}: {exc}")
|
|
return None
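
# Usage sketch (illustrative): the returned dict mirrors the metadata table,
# with 'file_path' exposed as 'path' and JSON columns decoded to lists.
#
#   meta = store.get_metadata("a" * 64)
#   if meta:
#       meta.get("ext"), meta.get("size"), meta.get("duration")
#       meta.get("path")        # canonical key; 'file_path' is renamed
#       meta.get("url")         # JSON column decoded to a list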
|
|
|
|
def get_tag(self, file_identifier: str, **kwargs: Any) -> Tuple[List[str], str]:
|
|
"""Get tags for a local file by hash.
|
|
|
|
Returns:
|
|
Tuple of (tags_list, store_name) where store_name is the actual store name
|
|
"""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
file_hash = file_identifier
|
|
if self._location:
|
|
try:
|
|
with API_folder_store(Path(self._location)) as db:
|
|
db_tags = db.get_tags(file_hash)
|
|
if db_tags:
|
|
# Return actual store name instead of generic "local_db"
|
|
store_name = self._name if self._name else "local"
|
|
return [str(t).strip().lower() for t in db_tags if isinstance(t, str) and t.strip()], store_name
|
|
except Exception as exc:
|
|
debug(f"Local DB lookup failed: {exc}")
|
|
return [], "unknown"
|
|
except Exception as exc:
|
|
debug(f"get_tags failed for local file: {exc}")
|
|
return [], "unknown"
|
|
|
|
def add_tag(self, hash: str, tag: List[str], **kwargs: Any) -> bool:
|
|
"""Add tags to a local file by hash (via API_folder_store).
|
|
|
|
Handles namespace collapsing: when adding namespace:value, removes existing namespace:* tags.
|
|
Returns True if tags were successfully added.
|
|
"""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
if not self._location:
|
|
return False
|
|
|
|
try:
|
|
with API_folder_store(Path(self._location)) as db:
|
|
existing_tags = [t for t in (db.get_tags(hash) or []) if isinstance(t, str) and t.strip()]
|
|
|
|
from metadata import compute_namespaced_tag_overwrite
|
|
|
|
_to_remove, _to_add, merged = compute_namespaced_tag_overwrite(existing_tags, tag or [])
|
|
if not _to_remove and not _to_add:
|
|
return True
|
|
|
|
# Folder DB tag table is case-sensitive and add_tags_to_hash() is additive.
|
|
# To enforce lowercase-only tags and namespace overwrites, rewrite the full tag set.
|
|
cursor = db.connection.cursor()
|
|
cursor.execute("DELETE FROM tags WHERE hash = ?", (hash,))
|
|
for t in merged:
|
|
t = str(t).strip().lower()
|
|
if t:
|
|
cursor.execute(
|
|
"INSERT OR IGNORE INTO tags (hash, tag) VALUES (?, ?)",
|
|
(hash, t),
|
|
)
|
|
db.connection.commit()
|
|
try:
|
|
db._update_metadata_modified_time(hash)
|
|
except Exception:
|
|
pass
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"Local DB add_tags failed: {exc}")
|
|
return False
|
|
except Exception as exc:
|
|
debug(f"add_tag failed for local file: {exc}")
|
|
return False
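
# Usage sketch (illustrative): namespaced tags overwrite any existing tag in the
# same namespace, and the full tag set is rewritten lowercase.
#
#   store.add_tag(file_hash, ["title:Old Name"])
#   store.add_tag(file_hash, ["title:New Name", "Favourite"])
#   # -> the existing 'title:*' tag is replaced by 'title:new name' and
#   #    'favourite' is stored alongside the remaining tags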
|
|
|
|
def delete_tag(self, file_identifier: str, tags: List[str], **kwargs: Any) -> bool:
|
|
"""Remove tags from a local file by hash."""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
file_hash = file_identifier
|
|
if self._location:
|
|
try:
|
|
with API_folder_store(Path(self._location)) as db:
|
|
tag_list = [str(t).strip().lower() for t in (tags or []) if isinstance(t, str) and str(t).strip()]
|
|
if not tag_list:
|
|
return True
|
|
db.remove_tags_from_hash(file_hash, tag_list)
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"Local DB remove_tags failed: {exc}")
|
|
return False
|
|
except Exception as exc:
|
|
debug(f"delete_tag failed for local file: {exc}")
|
|
return False
|
|
|
|
def get_url(self, file_identifier: str, **kwargs: Any) -> List[str]:
|
|
"""Get known url for a local file by hash."""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
file_hash = file_identifier
|
|
if self._location:
|
|
try:
|
|
from metadata import normalize_urls
|
|
with API_folder_store(Path(self._location)) as db:
|
|
meta = db.get_metadata(file_hash) or {}
|
|
urls = normalize_urls(meta.get("url"))
|
|
return urls
|
|
except Exception as exc:
|
|
debug(f"Local DB get_metadata failed: {exc}")
|
|
return []
|
|
except Exception as exc:
|
|
debug(f"get_url failed for local file: {exc}")
|
|
return []
|
|
|
|
def add_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
|
|
"""Add known url to a local file by hash."""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
file_hash = file_identifier
|
|
if self._location:
|
|
try:
|
|
from metadata import normalize_urls
|
|
with API_folder_store(Path(self._location)) as db:
|
|
meta = db.get_metadata(file_hash) or {}
|
|
existing_urls = normalize_urls(meta.get("url"))
|
|
incoming_urls = normalize_urls(url)
|
|
changed = False
|
|
for u in list(incoming_urls or []):
|
|
if not u:
|
|
continue
|
|
if u not in existing_urls:
|
|
existing_urls.append(u)
|
|
changed = True
|
|
if changed:
|
|
db.update_metadata_by_hash(file_hash, {"url": existing_urls})
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"Local DB add_url failed: {exc}")
|
|
return False
|
|
except Exception as exc:
|
|
debug(f"add_url failed for local file: {exc}")
|
|
return False
|
|
|
|
def add_url_bulk(self, items: List[tuple[str, List[str]]], **kwargs: Any) -> bool:
|
|
"""Add known urls to many local files in one DB session.
|
|
|
|
This is a performance optimization used by cmdlets that receive many PipeObjects.
|
|
"""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
if not self._location:
|
|
return False
|
|
|
|
# Normalize + coalesce duplicates per hash.
|
|
try:
|
|
from metadata import normalize_urls
|
|
except Exception:
|
|
normalize_urls = None # type: ignore
|
|
|
|
merged_by_hash: Dict[str, List[str]] = {}
|
|
for file_identifier, url_list in (items or []):
|
|
file_hash = str(file_identifier or "").strip().lower()
|
|
if not file_hash:
|
|
continue
|
|
|
|
incoming: List[str]
|
|
if normalize_urls is not None:
|
|
try:
|
|
incoming = normalize_urls(url_list)
|
|
except Exception:
|
|
incoming = [str(u).strip() for u in (url_list or []) if str(u).strip()]
|
|
else:
|
|
incoming = [str(u).strip() for u in (url_list or []) if str(u).strip()]
|
|
|
|
if not incoming:
|
|
continue
|
|
|
|
existing = merged_by_hash.get(file_hash) or []
|
|
for u in incoming:
|
|
if u and u not in existing:
|
|
existing.append(u)
|
|
merged_by_hash[file_hash] = existing
|
|
|
|
if not merged_by_hash:
|
|
return True
|
|
|
|
import json
|
|
|
|
with API_folder_store(Path(self._location)) as db:
|
|
conn = getattr(db, "connection", None)
|
|
if conn is None:
|
|
return False
|
|
cursor = conn.cursor()
|
|
|
|
# Ensure metadata rows exist (may be needed for older entries).
|
|
for file_hash in merged_by_hash.keys():
|
|
try:
|
|
cursor.execute("INSERT OR IGNORE INTO metadata (hash) VALUES (?)", (file_hash,))
|
|
except Exception:
|
|
continue
|
|
|
|
# Load existing urls for all hashes in chunks.
|
|
existing_urls_by_hash: Dict[str, List[str]] = {h: [] for h in merged_by_hash.keys()}
|
|
hashes = list(merged_by_hash.keys())
|
|
chunk_size = 400
|
|
for i in range(0, len(hashes), chunk_size):
|
|
chunk = hashes[i : i + chunk_size]
|
|
if not chunk:
|
|
continue
|
|
placeholders = ",".join(["?"] * len(chunk))
|
|
try:
|
|
cursor.execute(f"SELECT hash, url FROM metadata WHERE hash IN ({placeholders})", chunk)
|
|
rows = cursor.fetchall() or []
|
|
except Exception:
|
|
rows = []
|
|
|
|
for row in rows:
|
|
try:
|
|
row_hash = str(row[0]).strip().lower()
|
|
except Exception:
|
|
continue
|
|
raw_urls = None
|
|
try:
|
|
raw_urls = row[1]
|
|
except Exception:
|
|
raw_urls = None
|
|
|
|
parsed_urls: List[str] = []
|
|
if raw_urls:
|
|
try:
|
|
parsed = json.loads(raw_urls)
|
|
if normalize_urls is not None:
|
|
parsed_urls = normalize_urls(parsed)
|
|
else:
|
|
if isinstance(parsed, list):
|
|
parsed_urls = [str(u).strip() for u in parsed if str(u).strip()]
|
|
except Exception:
|
|
parsed_urls = []
|
|
|
|
existing_urls_by_hash[row_hash] = parsed_urls
|
|
|
|
# Compute updates and write in one commit.
|
|
updates: List[tuple[str, str]] = []
|
|
for file_hash, incoming_urls in merged_by_hash.items():
|
|
existing_urls = existing_urls_by_hash.get(file_hash) or []
|
|
final = list(existing_urls)
|
|
for u in incoming_urls:
|
|
if u and u not in final:
|
|
final.append(u)
|
|
if final != existing_urls:
|
|
try:
|
|
updates.append((json.dumps(final), file_hash))
|
|
except Exception:
|
|
continue
|
|
|
|
if updates:
|
|
cursor.executemany(
|
|
"UPDATE metadata SET url = ?, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
|
|
updates,
|
|
)
|
|
|
|
conn.commit()
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"add_url_bulk failed for local file: {exc}")
|
|
return False
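
# Usage sketch (illustrative): urls are coalesced per hash and written in a
# single commit, which is much cheaper than calling add_url() once per item.
#
#   store.add_url_bulk([
#       (hash_a, ["https://example.com/1"]),
#       (hash_a, ["https://example.com/2"]),   # merged with the entry above
#       (hash_b, ["https://example.com/3"]),
#   ])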
|
|
|
|
def delete_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
|
|
"""Delete known url from a local file by hash."""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
file_hash = file_identifier
|
|
if self._location:
|
|
try:
|
|
from metadata import normalize_urls
|
|
with API_folder_store(Path(self._location)) as db:
|
|
meta = db.get_metadata(file_hash) or {}
|
|
existing_urls = normalize_urls(meta.get("url"))
|
|
remove_set = {u for u in normalize_urls(url) if u}
|
|
if not remove_set:
|
|
return False
|
|
new_urls = [u for u in existing_urls if u not in remove_set]
|
|
if new_urls != existing_urls:
|
|
db.update_metadata_by_hash(file_hash, {"url": new_urls})
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"Local DB delete_url failed: {exc}")
|
|
return False
|
|
except Exception as exc:
|
|
debug(f"delete_url failed for local file: {exc}")
|
|
return False
|
|
|
|
def delete_url_bulk(self, items: List[tuple[str, List[str]]], **kwargs: Any) -> bool:
|
|
"""Delete known urls from many local files in one DB session."""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
if not self._location:
|
|
return False
|
|
|
|
try:
|
|
from metadata import normalize_urls
|
|
except Exception:
|
|
normalize_urls = None # type: ignore
|
|
|
|
remove_by_hash: Dict[str, set[str]] = {}
|
|
for file_identifier, url_list in (items or []):
|
|
file_hash = str(file_identifier or "").strip().lower()
|
|
if not file_hash:
|
|
continue
|
|
|
|
incoming: List[str]
|
|
if normalize_urls is not None:
|
|
try:
|
|
incoming = normalize_urls(url_list)
|
|
except Exception:
|
|
incoming = [str(u).strip() for u in (url_list or []) if str(u).strip()]
|
|
else:
|
|
incoming = [str(u).strip() for u in (url_list or []) if str(u).strip()]
|
|
|
|
remove = {u for u in incoming if u}
|
|
if not remove:
|
|
continue
|
|
remove_by_hash.setdefault(file_hash, set()).update(remove)
|
|
|
|
if not remove_by_hash:
|
|
return True
|
|
|
|
import json
|
|
|
|
with API_folder_store(Path(self._location)) as db:
|
|
conn = getattr(db, "connection", None)
|
|
if conn is None:
|
|
return False
|
|
cursor = conn.cursor()
|
|
|
|
# Ensure metadata rows exist.
|
|
for file_hash in remove_by_hash.keys():
|
|
try:
|
|
cursor.execute("INSERT OR IGNORE INTO metadata (hash) VALUES (?)", (file_hash,))
|
|
except Exception:
|
|
continue
|
|
|
|
# Load existing urls for hashes in chunks.
|
|
existing_urls_by_hash: Dict[str, List[str]] = {h: [] for h in remove_by_hash.keys()}
|
|
hashes = list(remove_by_hash.keys())
|
|
chunk_size = 400
|
|
for i in range(0, len(hashes), chunk_size):
|
|
chunk = hashes[i : i + chunk_size]
|
|
if not chunk:
|
|
continue
|
|
placeholders = ",".join(["?"] * len(chunk))
|
|
try:
|
|
cursor.execute(f"SELECT hash, url FROM metadata WHERE hash IN ({placeholders})", chunk)
|
|
rows = cursor.fetchall() or []
|
|
except Exception:
|
|
rows = []
|
|
|
|
for row in rows:
|
|
try:
|
|
row_hash = str(row[0]).strip().lower()
|
|
except Exception:
|
|
continue
|
|
raw_urls = None
|
|
try:
|
|
raw_urls = row[1]
|
|
except Exception:
|
|
raw_urls = None
|
|
|
|
parsed_urls: List[str] = []
|
|
if raw_urls:
|
|
try:
|
|
parsed = json.loads(raw_urls)
|
|
if normalize_urls is not None:
|
|
parsed_urls = normalize_urls(parsed)
|
|
else:
|
|
if isinstance(parsed, list):
|
|
parsed_urls = [str(u).strip() for u in parsed if str(u).strip()]
|
|
except Exception:
|
|
parsed_urls = []
|
|
|
|
existing_urls_by_hash[row_hash] = parsed_urls
|
|
|
|
# Apply removals + write updates.
|
|
updates: List[tuple[str, str]] = []
|
|
for file_hash, remove_set in remove_by_hash.items():
|
|
existing_urls = existing_urls_by_hash.get(file_hash) or []
|
|
new_urls = [u for u in existing_urls if u not in remove_set]
|
|
if new_urls != existing_urls:
|
|
try:
|
|
updates.append((json.dumps(new_urls), file_hash))
|
|
except Exception:
|
|
continue
|
|
|
|
if updates:
|
|
cursor.executemany(
|
|
"UPDATE metadata SET url = ?, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
|
|
updates,
|
|
)
|
|
|
|
conn.commit()
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"delete_url_bulk failed for local file: {exc}")
|
|
return False
|
|
|
|
def get_note(self, file_identifier: str, **kwargs: Any) -> Dict[str, str]:
|
|
"""Get notes for a local file by hash."""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
if not self._location:
|
|
return {}
|
|
file_hash = str(file_identifier or "").strip().lower()
|
|
if not _normalize_hash(file_hash):
|
|
return {}
|
|
with API_folder_store(Path(self._location)) as db:
|
|
getter = getattr(db, "get_notes", None)
|
|
if callable(getter):
|
|
notes = getter(file_hash)
|
|
return notes if isinstance(notes, dict) else {}
|
|
# Fallback: default-only
|
|
note = db.get_note(file_hash)
|
|
return {"default": str(note or "")} if note else {}
|
|
except Exception as exc:
|
|
debug(f"get_note failed for local file: {exc}")
|
|
return {}
|
|
|
|
def set_note(self, file_identifier: str, name: str, text: str, **kwargs: Any) -> bool:
|
|
"""Set a named note for a local file by hash."""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
if not self._location:
|
|
return False
|
|
file_hash = str(file_identifier or "").strip().lower()
|
|
if not _normalize_hash(file_hash):
|
|
return False
|
|
|
|
file_path = self.get_file(file_hash, **kwargs)
|
|
if not file_path or not isinstance(file_path, Path) or not file_path.exists():
|
|
return False
|
|
|
|
with API_folder_store(Path(self._location)) as db:
|
|
setter = getattr(db, "set_note", None)
|
|
if callable(setter):
|
|
setter(file_path, str(name), str(text))
|
|
return True
|
|
db.save_note(file_path, str(text))
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"set_note failed for local file: {exc}")
|
|
return False
|
|
|
|
def set_note_bulk(self, items: List[tuple[str, str, str]], **kwargs: Any) -> bool:
|
|
"""Set notes for many local files in one DB session.
|
|
|
|
Preserves existing semantics by only setting notes for hashes that still
|
|
map to a file path that exists on disk.
|
|
"""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
if not self._location:
|
|
return False
|
|
|
|
# Normalize input.
|
|
normalized: List[tuple[str, str, str]] = []
|
|
for file_identifier, name, text in (items or []):
|
|
file_hash = str(file_identifier or "").strip().lower()
|
|
note_name = str(name or "").strip()
|
|
note_text = str(text or "")
|
|
if not file_hash or not _normalize_hash(file_hash) or not note_name:
|
|
continue
|
|
normalized.append((file_hash, note_name, note_text))
|
|
|
|
if not normalized:
|
|
return True
|
|
|
|
with API_folder_store(Path(self._location)) as db:
|
|
conn = getattr(db, "connection", None)
|
|
if conn is None:
|
|
return False
|
|
cursor = conn.cursor()
|
|
|
|
# Look up file paths for hashes in chunks (to verify existence).
|
|
wanted_hashes = sorted({h for (h, _n, _t) in normalized})
|
|
hash_to_path: Dict[str, str] = {}
|
|
chunk_size = 400
|
|
for i in range(0, len(wanted_hashes), chunk_size):
|
|
chunk = wanted_hashes[i : i + chunk_size]
|
|
if not chunk:
|
|
continue
|
|
placeholders = ",".join(["?"] * len(chunk))
|
|
try:
|
|
cursor.execute(f"SELECT hash, file_path FROM files WHERE hash IN ({placeholders})", chunk)
|
|
rows = cursor.fetchall() or []
|
|
except Exception:
|
|
rows = []
|
|
for row in rows:
|
|
try:
|
|
h = str(row[0]).strip().lower()
|
|
p = str(row[1]).strip()
|
|
except Exception:
|
|
continue
|
|
if h and p:
|
|
hash_to_path[h] = p
|
|
|
|
# Ensure notes rows exist and only write for existing files.
|
|
inserts: List[tuple[str, str, str]] = []
|
|
for h, note_name, note_text in normalized:
|
|
p = hash_to_path.get(h)
|
|
if not p:
|
|
continue
|
|
try:
|
|
if not Path(p).exists():
|
|
continue
|
|
except Exception:
|
|
continue
|
|
inserts.append((h, note_name, note_text))
|
|
|
|
if not inserts:
|
|
return False
|
|
|
|
# Prefer upsert when supported, else fall back to INSERT OR REPLACE.
|
|
try:
|
|
cursor.executemany(
|
|
"INSERT INTO notes (hash, name, note) VALUES (?, ?, ?) "
|
|
"ON CONFLICT(hash, name) DO UPDATE SET note = excluded.note, updated_at = CURRENT_TIMESTAMP",
|
|
inserts,
|
|
)
|
|
except Exception:
|
|
cursor.executemany(
|
|
"INSERT OR REPLACE INTO notes (hash, name, note) VALUES (?, ?, ?)",
|
|
inserts,
|
|
)
|
|
|
|
conn.commit()
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"set_note_bulk failed for local file: {exc}")
|
|
return False
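
# Usage sketch (illustrative): notes are only written for hashes whose file
# still exists on disk; entries for missing files are silently skipped.
#
#   store.set_note_bulk([
#       (hash_a, "transcript", "full transcript text"),
#       (hash_b, "comment", "needs re-encode"),
#   ])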
|
|
|
|
def delete_note(self, file_identifier: str, name: str, **kwargs: Any) -> bool:
|
|
"""Delete a named note for a local file by hash."""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
if not self._location:
|
|
return False
|
|
file_hash = str(file_identifier or "").strip().lower()
|
|
if not _normalize_hash(file_hash):
|
|
return False
|
|
with API_folder_store(Path(self._location)) as db:
|
|
deleter = getattr(db, "delete_note", None)
|
|
if callable(deleter):
|
|
deleter(file_hash, str(name))
|
|
return True
|
|
# Default-only fallback
|
|
if str(name).strip().lower() == "default":
|
|
deleter2 = getattr(db, "save_note", None)
|
|
if callable(deleter2):
|
|
file_path = self.get_file(file_hash, **kwargs)
|
|
if file_path and isinstance(file_path, Path) and file_path.exists():
|
|
deleter2(file_path, "")
|
|
return True
|
|
return False
|
|
except Exception as exc:
|
|
debug(f"delete_note failed for local file: {exc}")
|
|
return False
|
|
|
|
def delete_file(self, file_identifier: str, **kwargs: Any) -> bool:
|
|
"""Delete a file from the folder store.
|
|
|
|
Args:
|
|
file_identifier: The file path (as string) or hash of the file to delete
|
|
**kwargs: Optional parameters
|
|
|
|
Returns:
|
|
True if deletion succeeded, False otherwise
|
|
"""
|
|
from API.folder import API_folder_store
|
|
try:
|
|
if not self._location:
|
|
return False
|
|
|
|
raw = str(file_identifier or "").strip()
|
|
if not raw:
|
|
return False
|
|
|
|
store_root = Path(self._location).expanduser()
|
|
|
|
# Support deletion by hash (common for store items where `path` is the hash).
|
|
file_hash = _normalize_hash(raw)
|
|
resolved_path: Optional[Path] = None
|
|
with API_folder_store(store_root) as db:
|
|
if file_hash:
|
|
resolved_path = db.search_hash(file_hash)
|
|
else:
|
|
p = Path(raw)
|
|
resolved_path = p if p.is_absolute() else (store_root / p)
|
|
|
|
if resolved_path is None:
|
|
debug(f"delete_file: could not resolve identifier: {raw}")
|
|
return False
|
|
|
|
# Delete from database (also cleans up relationship backlinks).
|
|
db.delete_file(resolved_path)
|
|
|
|
# Delete the actual file from disk (best-effort).
|
|
try:
|
|
if resolved_path.exists():
|
|
resolved_path.unlink()
|
|
debug(f"Deleted file: {resolved_path}")
|
|
else:
|
|
debug(f"File not found on disk: {resolved_path}")
|
|
except Exception:
|
|
pass
|
|
|
|
return True
|
|
except Exception as exc:
|
|
debug(f"delete_file failed: {exc}")
|
|
return False
|
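

# Minimal end-to-end sketch (illustrative; the library path, sample file, and
# demo url are hypothetical and must exist for this to run). Kept behind a
# __main__ guard so importing this module stays side-effect free.
if __name__ == "__main__":
    _demo_root = Path("~/test-folder-store").expanduser()
    _demo_root.mkdir(parents=True, exist_ok=True)
    _store = Folder(str(_demo_root), "demo")
    _sample = Path("sample.mp4")  # hypothetical input file
    if _sample.exists():
        _h = _store.add_file(_sample, tag=["title:Demo clip"], url=["https://example.com/demo"])
        print(_store.search("title:demo*", limit=5))
        print(_store.get_tag(_h))
        _store.delete_file(_h)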