kllk
Some checks failed
smoke-mm / Install & smoke test mm --help (push) Has been cancelled

This commit is contained in:
nose
2025-12-24 17:58:57 -08:00
parent d7fc8c4576
commit df24a0cb44
47 changed files with 8039 additions and 82 deletions

View File

@@ -536,6 +536,10 @@ class Folder(Store):
v = v + "%"
return v
def _like_pattern(term: str) -> str:
# Convert glob-like tokens to SQL LIKE wildcards.
return str(term or "").replace('*', '%').replace('?', '_')
tokens = [t.strip() for t in query.split(',') if t.strip()]
if not match_all and len(tokens) == 1 and _normalize_hash(query):
@@ -621,9 +625,6 @@ class Folder(Store):
if tokens and len(tokens) > 1:
url_fetch_limit = (limit or 45) * 50
def _like_pattern(term: str) -> str:
return term.replace('*', '%').replace('?', '_')
def _ids_for_token(token: str) -> set[int]:
token = token.strip()
if not token:
@@ -684,8 +685,22 @@ class Folder(Store):
term = token.lower()
like_pattern = f"%{_like_pattern(term)}%"
hashes = api.get_file_hashes_by_path_pattern(like_pattern)
hashes.update(api.get_file_hashes_by_tag_substring(like_pattern))
# Unqualified token: match file path, title: tags, and non-namespaced tags.
# Do NOT match other namespaces by default (e.g., artist:men at work).
hashes = set(api.get_file_hashes_by_path_pattern(like_pattern) or set())
try:
title_rows = api.get_files_by_namespace_pattern(f"title:{like_pattern}", url_fetch_limit)
hashes.update({row[0] for row in (title_rows or []) if row and row[0]})
except Exception:
pass
try:
simple_rows = api.get_files_by_simple_tag_pattern(like_pattern, url_fetch_limit)
hashes.update({row[0] for row in (simple_rows or []) if row and row[0]})
except Exception:
pass
return hashes
try:
@@ -892,58 +907,73 @@ class Folder(Store):
if limit is not None and len(results) >= limit:
return results
elif not match_all:
# Strict tag-based search only (no filename/path searching).
# Default (unqualified) search: AND semantics across terms.
# Each term must match at least one of:
# - file path (filename)
# - title: namespace tag
# - non-namespaced tag
# Other namespaces (artist:, series:, etc.) are excluded unless explicitly queried.
terms = [t.strip() for t in query_lower.replace(',', ' ').split() if t.strip()]
if not terms:
terms = [query_lower]
fetch_limit = (limit or 45) * 50
# AND semantics across terms: each term must match at least one tag.
hits: dict[str, dict[str, Any]] = {}
matching_hashes: Optional[set[str]] = None
for term in terms:
tag_pattern = f"%{term}%"
term_rows = api.get_files_by_namespace_pattern(tag_pattern, fetch_limit)
for file_hash, file_path_str, size_bytes, ext in term_rows:
if not file_path_str:
continue
if ext_hashes is not None and file_hash not in ext_hashes:
continue
entry = hits.get(file_hash)
if entry:
entry["count"] += 1
if size_bytes is not None:
entry["size"] = size_bytes
else:
hits[file_hash] = {
"path": file_path_str,
"size": size_bytes,
"hash": file_hash,
"count": 1,
}
required = len(terms)
seen_files: set[str] = set()
for file_hash, info in hits.items():
if info.get("count") != required:
if not term:
continue
file_path_str = info.get("path")
if not file_path_str or file_path_str in seen_files:
like_term = _like_pattern(term)
like_pattern = f"%{like_term}%"
term_hashes: set[str] = set()
try:
term_hashes.update(api.get_file_hashes_by_path_pattern(like_pattern))
except Exception:
pass
try:
title_rows = api.get_files_by_namespace_pattern(f"title:{like_pattern}", fetch_limit)
term_hashes.update({row[0] for row in (title_rows or []) if row and row[0]})
except Exception:
pass
try:
simple_rows = api.get_files_by_simple_tag_pattern(like_pattern, fetch_limit)
term_hashes.update({row[0] for row in (simple_rows or []) if row and row[0]})
except Exception:
pass
if ext_hashes is not None:
term_hashes &= ext_hashes
matching_hashes = term_hashes if matching_hashes is None else (matching_hashes & term_hashes)
if not matching_hashes:
return results
if not matching_hashes:
return results
rows = api.get_file_metadata(set(matching_hashes), limit)
for file_hash, file_path_str, size_bytes, ext in rows:
if not file_path_str:
continue
file_path = Path(file_path_str)
if not file_path.exists():
continue
seen_files.add(file_path_str)
size_bytes = info.get("size")
if size_bytes is None:
try:
size_bytes = file_path.stat().st_size
except OSError:
size_bytes = None
tags = api.get_tags_for_file(file_hash)
entry_obj = _create_entry(file_path, tags, size_bytes, info.get("hash"))
entry_obj = _create_entry(file_path, tags, size_bytes, file_hash)
try:
db_ext = str(ext or "").strip().lstrip('.')
if db_ext:
entry_obj["ext"] = db_ext
except Exception:
pass
results.append(entry_obj)
if limit is not None and len(results) >= limit:
break