This commit is contained in:
2025-12-27 06:05:07 -08:00
parent 71b542ae91
commit 8d8a2637d5
9 changed files with 943 additions and 23 deletions

View File

@@ -117,6 +117,72 @@ class Add_File(Cmdlet):
stage_ctx = ctx.get_stage_context()
is_last_stage = (stage_ctx is None) or bool(getattr(stage_ctx, "is_last_stage", False))
# Directory-mode selector:
# - First pass: `add-file -store X -path <DIR>` should ONLY show a selectable table.
# - Second pass (triggered by @ selection expansion): re-run add-file with `-path file1,file2,...`
# and actually ingest/copy.
dir_scan_mode = False
dir_scan_results: Optional[List[Dict[str, Any]]] = None
explicit_path_list_results: Optional[List[Dict[str, Any]]] = None
if path_arg and location and not provider_name:
# Support comma-separated path lists: -path "file1,file2,file3"
# This is the mechanism used by @N expansion for directory tables.
try:
path_text = str(path_arg)
except Exception:
path_text = ""
if "," in path_text:
parts = [p.strip().strip('"') for p in path_text.split(",")]
parts = [p for p in parts if p]
batch: List[Dict[str, Any]] = []
for p in parts:
try:
file_path = Path(p)
except Exception:
continue
if not file_path.exists() or not file_path.is_file():
continue
ext = file_path.suffix.lower()
if ext not in SUPPORTED_MEDIA_EXTENSIONS:
continue
try:
hv = sha256_file(file_path)
except Exception:
continue
try:
size = file_path.stat().st_size
except Exception:
size = 0
batch.append({
"path": file_path,
"name": file_path.name,
"hash": hv,
"size": size,
"ext": ext,
})
if batch:
explicit_path_list_results = batch
# Clear path_arg so add-file doesn't treat it as a single path.
path_arg = None
else:
# Directory scan (selector table, no ingest yet)
try:
candidate_dir = Path(str(path_arg))
if candidate_dir.exists() and candidate_dir.is_dir():
dir_scan_mode = True
debug(f"[add-file] Scanning directory for batch add: {candidate_dir}")
dir_scan_results = Add_File._scan_directory_for_files(candidate_dir)
if dir_scan_results:
debug(f"[add-file] Found {len(dir_scan_results)} supported files in directory")
# Clear path_arg so it doesn't trigger single-item mode.
path_arg = None
except Exception as exc:
debug(f"[add-file] Directory scan failed: {exc}")
# Determine if -store targets a registered backend (vs a filesystem export path).
is_storage_backend_location = False
if location:
@@ -127,9 +193,16 @@ class Add_File(Cmdlet):
is_storage_backend_location = False
# Decide which items to process.
# - If directory scan was performed, use those results
# - If user provided -path (and it was not reinterpreted as destination), treat this invocation as single-item.
# - Otherwise, if piped input is a list, ingest each item.
if path_arg:
if explicit_path_list_results:
items_to_process = explicit_path_list_results
debug(f"[add-file] Using {len(items_to_process)} files from -path list")
elif dir_scan_results:
items_to_process = dir_scan_results
debug(f"[add-file] Using {len(items_to_process)} files from directory scan")
elif path_arg:
items_to_process: List[Any] = [result]
elif isinstance(result, list) and result:
items_to_process = list(result)
@@ -152,6 +225,65 @@ class Add_File(Cmdlet):
debug(f"[add-file] INPUT result is list with {len(result)} items")
debug(f"[add-file] PARSED args: location={location}, provider={provider_name}, delete={delete_after}")
# If this invocation was directory selector mode, show a selectable table and stop.
# The user then runs @N (optionally piped), which replays add-file with selected paths.
if dir_scan_mode:
try:
from result_table import ResultTable
from pathlib import Path as _Path
# Build base args to replay: keep everything except the directory -path.
base_args: List[str] = []
skip_next = False
for tok in list(args or []):
if skip_next:
skip_next = False
continue
t = str(tok)
if t in {"-path", "--path", "-p"}:
skip_next = True
continue
base_args.append(t)
table = ResultTable(title="Files in Directory", preserve_order=True)
table.set_table("add-file.directory")
table.set_source_command("add-file", base_args)
rows: List[Dict[str, Any]] = []
for file_info in (dir_scan_results or []):
p = file_info.get("path")
hp = str(file_info.get("hash") or "")
name = str(file_info.get("name") or "unknown")
try:
clean_title = _Path(name).stem
except Exception:
clean_title = name
ext = str(file_info.get("ext") or "").lstrip(".")
size = file_info.get("size", 0)
row_item = {
"path": str(p) if p is not None else "",
"hash": hp,
"title": clean_title,
"columns": [
("Title", clean_title),
("Hash", hp),
("Size", size),
("Ext", ext),
],
# Used by @N replay (CLI will combine selected rows into -path file1,file2,...)
"_selection_args": ["-path", str(p) if p is not None else ""],
}
rows.append(row_item)
table.add_result(row_item)
ctx.set_current_stage_table(table)
ctx.set_last_result_table(table, rows, subject={"table": "add-file.directory"})
log(f"✓ Found {len(rows)} files. Select with @N (e.g., @1 or @1-3).")
return 0
except Exception as exc:
debug(f"[add-file] Failed to display directory scan result table: {exc}")
collected_payloads: List[Dict[str, Any]] = []
pending_relationship_pairs: Dict[str, set[tuple[str, str]]] = {}
pending_url_associations: Dict[str, List[tuple[str, List[str]]]] = {}
@@ -976,7 +1108,23 @@ class Add_File(Cmdlet):
Returns (media_path_or_url, file_hash)
where media_path_or_url can be a Path object or a URL string.
"""
# PRIORITY 1: Try hash+store from result dict (most reliable for @N selections)
# PRIORITY 1a: Try hash+path from directory scan result (has 'path' and 'hash' keys)
if isinstance(result, dict):
result_path = result.get("path")
result_hash = result.get("hash")
# Check if this looks like a directory scan result (has path and hash but no 'store' key)
result_store = result.get("store")
if result_path and result_hash and not result_store:
try:
media_path = Path(result_path) if not isinstance(result_path, Path) else result_path
if media_path.exists() and media_path.is_file():
debug(f"[add-file] Using path+hash from directory scan: {media_path}")
pipe_obj.path = str(media_path)
return media_path, str(result_hash)
except Exception as exc:
debug(f"[add-file] Failed to use directory scan result: {exc}")
# PRIORITY 1b: Try hash+store from result dict (most reliable for @N selections)
if isinstance(result, dict):
result_hash = result.get("hash")
result_store = result.get("store")
@@ -1104,6 +1252,56 @@ class Add_File(Cmdlet):
log("File path could not be resolved")
return None, None
@staticmethod
def _scan_directory_for_files(directory: Path) -> List[Dict[str, Any]]:
    """Scan a directory (non-recursively) for supported media files.

    Only direct children of ``directory`` that are regular files and whose
    lower-cased suffix is listed in ``SUPPORTED_MEDIA_EXTENSIONS`` are
    reported. A file that cannot be hashed is skipped; a failing ``stat``
    only results in a reported size of 0.

    Entries are processed in a stable, case-insensitive name order so the
    resulting selector table (and therefore its @N indices) does not depend
    on the arbitrary order in which the filesystem lists the directory.

    Args:
        directory: Directory to scan.

    Returns:
        A list of dicts, one per accepted file, each containing:
        - path: Path object
        - name: filename
        - hash: digest returned by ``sha256_file``
        - size: file size in bytes (0 if it could not be read)
        - ext: lower-cased file extension, including the leading dot
        The list is empty when ``directory`` is missing or not a directory,
        and may be empty or partial when the scan itself raises.
    """
    # is_dir() is already False for a non-existent path.
    if not directory.is_dir():
        return []

    files_info: List[Dict[str, Any]] = []
    try:
        # iterdir() yields children in arbitrary order; sort for determinism.
        entries = sorted(
            directory.iterdir(),
            key=lambda entry: (entry.name.casefold(), entry.name),
        )
        for item in entries:
            if not item.is_file():
                continue

            ext = item.suffix.lower()
            if ext not in SUPPORTED_MEDIA_EXTENSIONS:
                continue

            # A file we cannot hash cannot be ingested later, so drop it.
            try:
                file_hash = sha256_file(item)
            except Exception as exc:
                debug(f"Failed to hash {item}: {exc}")
                continue

            # Size is informational only; fall back to 0 rather than skipping.
            try:
                size = item.stat().st_size
            except Exception:
                size = 0

            files_info.append({
                "path": item,
                "name": item.name,
                "hash": file_hash,
                "size": size,
                "ext": ext,
            })
    except Exception as exc:
        # Best effort: report the failure and return whatever was collected.
        debug(f"Error scanning directory {directory}: {exc}")

    return files_info
@staticmethod
def _fetch_hydrus_path(
file_hash: str,

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import Any, Dict, List, Sequence, Optional
from pathlib import Path
import sys
import re
from SYS.logger import log
@@ -26,6 +27,184 @@ from Store import Store
from SYS.utils import sha256_file
_FIELD_NAME_RE = re.compile(r"^[A-Za-z0-9_]+$")
def _normalize_title_for_extract(text: str) -> str:
    """Return ``text`` trimmed, with unicode dash variants mapped to '-'.

    Falsy input (``None``, empty string) yields an empty string. Surrounding
    whitespace is stripped; interior whitespace is left untouched. This makes
    template matching independent of which dash character a source used.
    """
    trimmed = str(text or "").strip()
    if not trimmed:
        return trimmed

    # en dash, em dash, minus sign, hyphen, non-breaking hyphen,
    # figure dash, horizontal bar -> ASCII hyphen-minus
    dash_table = str.maketrans(
        dict.fromkeys("\u2013\u2014\u2212\u2010\u2011\u2012\u2015", "-")
    )
    return trimmed.translate(dash_table)
def _strip_title_prefix(text: str) -> str:
    """Return ``text`` trimmed, without a leading ``title:`` namespace.

    The prefix check is case-insensitive. Everything after the first colon is
    kept (and trimmed); text without the prefix is returned trimmed only.
    Falsy input yields an empty string.
    """
    trimmed = str(text or "").strip()
    if not trimmed.lower().startswith("title:"):
        return trimmed

    # Drop the namespace label up to and including the first colon.
    _label, _colon, remainder = trimmed.partition(":")
    return remainder.strip()
def _literal_to_title_pattern_regex(literal: str) -> str:
    """Convert a literal chunk of a template into a regex fragment.

    Non-whitespace characters are matched literally (via ``re.escape``).
    Whitespace is matched loosely:

    - a whitespace run next to other literal text becomes ``\\s*`` so that
      ``" - "`` matches ``"-"``, ``" -"``, ``"  -  "`` and so on;
    - a chunk consisting *only* of whitespace becomes ``\\s+``. Such a chunk
      is the sole separator between two fields, and making it optional would
      let the lazy field before it stop after a single character
      (``"03 Title"`` would split as ``"0"`` / ``"3 Title"``).

    Args:
        literal: Template text found between (or around) ``(field)`` markers.

    Returns:
        A regex fragment; the empty string for an empty ``literal``.
    """
    if literal and literal.isspace():
        return r"\s+"

    out: List[str] = []
    i = 0
    end = len(literal)
    while i < end:
        ch = literal[i]
        if ch.isspace():
            # Collapse the whole whitespace run into one optional matcher.
            while i < end and literal[i].isspace():
                i += 1
            out.append(r"\s*")
            continue
        out.append(re.escape(ch))
        i += 1
    return "".join(out)


def _compile_extract_template(template: str) -> tuple[re.Pattern[str], List[str]]:
    """Compile a simple (field) template into a regex.

    Example template:
        (artist) - (album) - (disk)-(track) (title)

    This is *not* user-facing regex: we only support named fields in
    parentheses. Every field becomes a named group; all fields but the last
    match lazily so that the literal separators decide where they end, and
    the last field greedily takes the remainder. The pattern is anchored at
    both ends, tolerates surrounding whitespace and ignores case.

    Args:
        template: Template text containing one or more ``(field)`` markers.

    Returns:
        ``(pattern, field_names)`` where ``field_names`` lists the fields in
        template order.

    Raises:
        ValueError: If the template is empty, contains no ``(field)`` marker,
            or a field name has characters rejected by ``_FIELD_NAME_RE``.
        re.error: If ``re.compile`` rejects the assembled pattern (for
            example when the same field name is used twice).
    """
    tpl = str(template or "").strip()
    if not tpl:
        raise ValueError("empty extract template")

    matches = list(re.finditer(r"\(([^)]+)\)", tpl))
    if not matches:
        raise ValueError("extract template must contain at least one (field)")

    field_names: List[str] = []
    # NOTE: raw strings need a *single* backslash; r"\\s" would demand a
    # literal backslash in the title and the pattern could never match.
    parts: List[str] = [r"^\s*"]
    last_end = 0
    last_index = len(matches) - 1

    for idx, m in enumerate(matches):
        literal = tpl[last_end : m.start()]
        if literal:
            parts.append(_literal_to_title_pattern_regex(literal))

        raw_name = (m.group(1) or "").strip()
        if not raw_name or not _FIELD_NAME_RE.fullmatch(raw_name):
            raise ValueError(f"invalid field name '{raw_name}' (use A-Z, 0-9, underscore)")
        field_names.append(raw_name)

        # Last field is greedy (takes the rest); earlier ones are lazy.
        quantifier = ".+" if idx == last_index else ".+?"
        parts.append(f"(?P<{raw_name}>{quantifier})")
        last_end = m.end()

    tail = tpl[last_end:]
    if tail:
        parts.append(_literal_to_title_pattern_regex(tail))
    parts.append(r"\s*$")

    rx = "".join(parts)
    return re.compile(rx, flags=re.IGNORECASE), field_names
def _extract_tags_from_title(title_text: str, template: str) -> List[str]:
    """Match ``title_text`` against ``template`` and build namespaced tags.

    The title is first stripped of a ``title:`` prefix and normalized. The
    template is only compiled when the cleaned title is non-empty, so an
    empty title returns ``[]`` without validating the template.

    Returns:
        ``['field:value', ...]`` in template field order, omitting fields
        whose captured value is empty after trimming; ``[]`` when the title
        is empty or does not match.
    """
    cleaned = _normalize_title_for_extract(_strip_title_prefix(title_text))
    if not cleaned:
        return []

    regex, names = _compile_extract_template(template)
    found = regex.match(cleaned)
    if not found:
        return []

    captured = ((name, (found.group(name) or "").strip()) for name in names)
    return [f"{name}:{value}" for name, value in captured if value]
def _get_title_candidates_for_extraction(res: Any, existing_tags: Optional[List[str]] = None) -> List[str]:
    """Collect possible title strings for ``res`` in priority order.

    Sources, highest priority first:
      1. the item's ``title`` field (via ``get_field`` and, for dicts, the
         ``"title"`` key) - this may be a display title, not the title: tag;
      2. the title: tag, looked up in ``existing_tags`` when that is a list,
         otherwise in the tags returned by ``_extract_item_tags(res)``;
      3. the filename stem of the item's ``path`` field.

    Each candidate is stripped of a ``title:`` prefix and normalized; empty
    and duplicate values are dropped.
    """
    ordered: List[str] = []

    def _remember(raw: Any) -> None:
        if raw is None:
            return
        cleaned = _normalize_title_for_extract(_strip_title_prefix(str(raw)))
        if cleaned and cleaned not in ordered:
            ordered.append(cleaned)

    # 1) Title field. Field lookup is best effort: a failure just means this
    #    source contributes nothing.
    try:
        _remember(get_field(res, "title"))
    except Exception:
        pass
    if isinstance(res, dict):
        _remember(res.get("title"))

    # 2) title: tag from either the supplied store tags or the piped tags.
    if isinstance(existing_tags, list):
        tag_source = existing_tags
    else:
        tag_source = _extract_item_tags(res)
    _remember(_extract_title_tag(tag_source) or "")

    # 3) Filename stem (also best effort).
    try:
        raw_path = get_field(res, "path")
        if raw_path:
            _remember((Path(str(raw_path)).stem or "").strip())
    except Exception:
        pass

    return ordered
def _extract_tags_from_title_candidates(candidates: List[str], template: str) -> tuple[List[str], Optional[str]]:
    """Return the tags of the first candidate that matches ``template``.

    Candidates are tried in the given order and evaluation stops at the first
    one that yields any tags.

    Returns:
        ``(tags, matched_candidate)``, or ``([], None)`` when no candidate
        produces tags.
    """
    # Lazy pipeline: nothing past the first hit is ever evaluated.
    attempts = (
        (_extract_tags_from_title(candidate, template), candidate)
        for candidate in candidates
    )
    hits = ((tags, candidate) for tags, candidate in attempts if tags)
    return next(hits, ([], None))
def _try_compile_extract_template(template: Optional[str]) -> tuple[Optional[re.Pattern[str]], Optional[str]]:
    """Compile ``template`` for debug output without raising.

    Returns:
        ``(None, None)`` when no template was given, ``(pattern, None)`` on
        success, or ``(None, error_message)`` when compilation fails.
    """
    if template is None:
        return None, None

    try:
        compiled, _names = _compile_extract_template(str(template))
    except Exception as exc:
        # Any failure is reported as text so the caller can log it.
        return None, str(exc)
    else:
        return compiled, None
def _extract_title_tag(tags: List[str]) -> Optional[str]:
"""Return the value of the first title: tag if present."""
for t in tags:
@@ -242,6 +421,8 @@ class Add_Tag(Cmdlet):
CmdletArg("tag", type="string", required=False, description="One or more tag to add. Comma- or space-separated. Can also use {list_name} syntax. If omitted, uses tag from pipeline payload.", variadic=True),
SharedArgs.QUERY,
SharedArgs.STORE,
CmdletArg("-extract", type="string", description="Extract tags from the item's title using a simple template with (field) placeholders. Example: -extract \"(artist) - (album) - (disk)-(track) (title)\" will add artist:, album:, disk:, track:, title: tags."),
CmdletArg("--extract-debug", type="flag", description="Print debug info for -extract matching (matched title source and extracted tags)."),
CmdletArg("-duplicate", type="string", description="Copy existing tag values to new namespaces. Formats: title:album,artist (explicit) or title,album,artist (inferred)"),
CmdletArg("-list", type="string", description="Load predefined tag lists from adjective.json. Comma-separated list names (e.g., -list philosophy,occult)."),
CmdletArg("--all", type="flag", description="Include temporary files in tagging (by default, only tag non-temporary files)."),
@@ -258,6 +439,7 @@ class Add_Tag(Cmdlet):
" Inferred format: -duplicate title,album,artist (first is source, rest are targets)",
"- The source namespace must already exist in the file being tagged.",
"- Target namespaces that already have a value are skipped (not overwritten).",
"- Use -extract to derive namespaced tags from the current title (title field or title: tag) using a simple template.",
],
exec=self.run,
)
@@ -272,6 +454,13 @@ class Add_Tag(Cmdlet):
# Parse arguments
parsed = parse_cmdlet_args(args, self)
extract_template = parsed.get("extract")
if extract_template is not None:
extract_template = str(extract_template)
extract_debug = bool(parsed.get("extract-debug", False))
extract_debug_rx, extract_debug_err = _try_compile_extract_template(extract_template)
query_hash = sh.parse_single_hash_query(parsed.get("query"))
if parsed.get("query") and not query_hash:
log("[add_tag] Error: -query must be of the form hash:<sha256>", file=sys.stderr)
@@ -304,8 +493,10 @@ class Add_Tag(Cmdlet):
if isinstance(raw_tag, str):
raw_tag = [raw_tag]
# Fallback: if no tag provided explicitly, try to pull from first result payload
if not raw_tag and results:
# Fallback: if no tag provided explicitly, try to pull from first result payload.
# IMPORTANT: when -extract is used, users typically want *only* extracted tags,
# not "re-add whatever tags are already in the payload".
if not raw_tag and results and not extract_template:
first = results[0]
payload_tag = None
@@ -341,8 +532,12 @@ class Add_Tag(Cmdlet):
tag_to_add = parse_tag_arguments(raw_tag)
tag_to_add = expand_tag_groups(tag_to_add)
if not tag_to_add:
log("No tag provided to add", file=sys.stderr)
if not tag_to_add and not extract_template:
log("No tag provided to add (and no -extract template provided)", file=sys.stderr)
return 1
if extract_template and extract_debug and extract_debug_err:
log(f"[add_tag] extract template error: {extract_debug_err}", file=sys.stderr)
return 1
# Get other flags
@@ -355,6 +550,9 @@ class Add_Tag(Cmdlet):
store_registry = Store(config)
extract_matched_items = 0
extract_no_match_items = 0
for res in results:
store_name: Optional[str]
raw_hash: Optional[str]
@@ -389,6 +587,24 @@ class Add_Tag(Cmdlet):
existing_lower = {t.lower() for t in existing_tag_list if isinstance(t, str)}
item_tag_to_add = list(tag_to_add)
if extract_template:
candidates = _get_title_candidates_for_extraction(res, existing_tag_list)
extracted, matched = _extract_tags_from_title_candidates(candidates, extract_template)
if extracted:
extract_matched_items += 1
if extract_debug:
log(f"[add_tag] extract matched: {matched!r} -> {extracted}", file=sys.stderr)
for new_tag in extracted:
if new_tag.lower() not in existing_lower:
item_tag_to_add.append(new_tag)
else:
extract_no_match_items += 1
if extract_debug:
rx_preview = extract_debug_rx.pattern if extract_debug_rx else "<uncompiled>"
cand_preview = "; ".join([repr(c) for c in candidates[:3]])
log(f"[add_tag] extract no match for template {extract_template!r}. regex: {rx_preview!r}. candidates: {cand_preview}", file=sys.stderr)
item_tag_to_add = collapse_namespace_tag(item_tag_to_add, "title", prefer="last")
if duplicate_arg:
@@ -492,6 +708,24 @@ class Add_Tag(Cmdlet):
# Per-item tag list (do not mutate shared list)
item_tag_to_add = list(tag_to_add)
if extract_template:
candidates2 = _get_title_candidates_for_extraction(res, existing_tag_list)
extracted2, matched2 = _extract_tags_from_title_candidates(candidates2, extract_template)
if extracted2:
extract_matched_items += 1
if extract_debug:
log(f"[add_tag] extract matched: {matched2!r} -> {extracted2}", file=sys.stderr)
for new_tag in extracted2:
if new_tag.lower() not in existing_lower:
item_tag_to_add.append(new_tag)
else:
extract_no_match_items += 1
if extract_debug:
rx_preview2 = extract_debug_rx.pattern if extract_debug_rx else "<uncompiled>"
cand_preview2 = "; ".join([repr(c) for c in candidates2[:3]])
log(f"[add_tag] extract no match for template {extract_template!r}. regex: {rx_preview2!r}. candidates: {cand_preview2}", file=sys.stderr)
item_tag_to_add = collapse_namespace_tag(item_tag_to_add, "title", prefer="last")
# Handle -duplicate logic (copy existing tag to new namespaces)
@@ -563,6 +797,12 @@ class Add_Tag(Cmdlet):
f"[add_tag] Added {total_added} new tag(s) across {len(results)} item(s); modified {total_modified} item(s)",
file=sys.stderr,
)
if extract_template and extract_matched_items == 0:
log(f"[add_tag] extract: no matches for template '{extract_template}' across {len(results)} item(s)", file=sys.stderr)
elif extract_template and extract_no_match_items > 0 and extract_debug:
log(f"[add_tag] extract: matched {extract_matched_items}, no-match {extract_no_match_items}", file=sys.stderr)
return 0