f
This commit is contained in:
@@ -283,8 +283,7 @@ class SharedArgs:
|
||||
"location",
|
||||
type="enum",
|
||||
choices=["hydrus",
|
||||
"0x0",
|
||||
"local"],
|
||||
"0x0"],
|
||||
required=True,
|
||||
description="Destination location",
|
||||
)
|
||||
@@ -292,7 +291,7 @@ class SharedArgs:
|
||||
DELETE = CmdletArg(
|
||||
"delete",
|
||||
type="flag",
|
||||
description="Delete the file and its .tag after successful operation.",
|
||||
description="Delete the file after successful operation.",
|
||||
)
|
||||
|
||||
# Metadata arguments
|
||||
|
||||
@@ -34,7 +34,6 @@ coerce_to_path = sh.coerce_to_path
|
||||
build_pipeline_preview = sh.build_pipeline_preview
|
||||
get_field = sh.get_field
|
||||
|
||||
from API.folder import read_sidecar, find_sidecar, write_sidecar, API_folder_store
|
||||
from SYS.utils import sha256_file, unique_path
|
||||
from SYS.metadata import write_metadata
|
||||
|
||||
@@ -2401,31 +2400,6 @@ class Add_File(Cmdlet):
|
||||
List[str],
|
||||
List[str]]:
|
||||
"""Load sidecar metadata."""
|
||||
if store and store.lower() == "local":
|
||||
try:
|
||||
from SYS.config import get_local_storage_path
|
||||
|
||||
db_root = get_local_storage_path(config)
|
||||
if db_root:
|
||||
with API_folder_store(Path(db_root)) as db:
|
||||
file_hash = db.get_file_hash(media_path)
|
||||
if file_hash:
|
||||
tags = db.get_tags(file_hash) or []
|
||||
metadata = db.get_metadata(file_hash) or {}
|
||||
url = metadata.get("url") or []
|
||||
f_hash = metadata.get("hash") or file_hash
|
||||
if tags or url or f_hash:
|
||||
return None, f_hash, tags, url
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
sidecar_path = find_sidecar(media_path)
|
||||
if sidecar_path and sidecar_path.exists():
|
||||
h, t, u = read_sidecar(sidecar_path)
|
||||
return sidecar_path, h, t or [], u or []
|
||||
except Exception:
|
||||
pass
|
||||
return None, None, [], []
|
||||
|
||||
@staticmethod
|
||||
@@ -2465,26 +2439,7 @@ class Add_File(Cmdlet):
|
||||
duration: Any,
|
||||
media_kind: str,
|
||||
):
|
||||
payload = {
|
||||
"hash": f_hash,
|
||||
"url": url,
|
||||
"relationships": relationships or [],
|
||||
"duration": duration,
|
||||
"size": None,
|
||||
"ext": dest_path.suffix.lower(),
|
||||
"media_type": media_kind,
|
||||
"media_kind": media_kind,
|
||||
}
|
||||
try:
|
||||
payload["size"] = dest_path.stat().st_size
|
||||
except OSError:
|
||||
payload["size"] = None
|
||||
|
||||
with API_folder_store(library_root) as db:
|
||||
try:
|
||||
db.save_file_info(dest_path, payload, tags)
|
||||
except Exception as exc:
|
||||
log(f"⚠️ Failed to persist metadata: {exc}", file=sys.stderr)
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _copy_sidecars(source_path: Path, target_path: Path):
|
||||
|
||||
@@ -20,7 +20,6 @@ parse_cmdlet_args = sh.parse_cmdlet_args
|
||||
normalize_result_input = sh.normalize_result_input
|
||||
should_show_help = sh.should_show_help
|
||||
get_field = sh.get_field
|
||||
from API.folder import read_sidecar, find_sidecar, API_folder_store
|
||||
from Store import Store
|
||||
|
||||
CMDLET = Cmdlet(
|
||||
@@ -862,8 +861,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
|
||||
None)
|
||||
|
||||
# Legacy LOCAL STORAGE MODE: Handle relationships for local files
|
||||
# (kept for -path sidecar workflows; store/hash mode above is preferred)
|
||||
from API.folder import LocalLibrarySearchOptimizer
|
||||
# (kept as stub - folder store removed)
|
||||
from SYS.config import get_local_storage_path
|
||||
|
||||
local_storage_path = get_local_storage_path(config) if config else None
|
||||
|
||||
@@ -1,28 +1,13 @@
|
||||
"""Delete file relationships."""
|
||||
"""Delete file relationships (Currently Disabled)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, Optional, Sequence
|
||||
import json
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
from SYS.logger import log
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from SYS import pipeline as ctx
|
||||
from . import _shared as sh
|
||||
|
||||
Cmdlet = sh.Cmdlet
|
||||
CmdletArg = sh.CmdletArg
|
||||
SharedArgs = sh.SharedArgs
|
||||
parse_cmdlet_args = sh.parse_cmdlet_args
|
||||
normalize_hash = sh.normalize_hash
|
||||
normalize_result_input = sh.normalize_result_input
|
||||
get_field = sh.get_field
|
||||
should_show_help = sh.should_show_help
|
||||
from API.folder import API_folder_store
|
||||
from Store import Store
|
||||
from SYS.config import get_local_storage_path
|
||||
|
||||
|
||||
def _extract_hash(item: Any) -> Optional[str]:
|
||||
@@ -33,471 +18,42 @@ def _extract_hash(item: Any) -> Optional[str]:
|
||||
return normalize_hash(str(h)) if h else None
|
||||
|
||||
|
||||
def _upsert_relationships(
|
||||
db: API_folder_store,
|
||||
file_hash: str,
|
||||
relationships: Dict[str,
|
||||
Any]
|
||||
) -> None:
|
||||
conn = db.connection
|
||||
if conn is None:
|
||||
raise RuntimeError("Store DB connection is not initialized")
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"""
|
||||
INSERT INTO metadata (hash, relationships)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT(hash) DO UPDATE SET
|
||||
relationships = excluded.relationships,
|
||||
time_modified = CURRENT_TIMESTAMP,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
""",
|
||||
(file_hash,
|
||||
json.dumps(relationships) if relationships else "{}"),
|
||||
)
|
||||
|
||||
|
||||
def _remove_reverse_link(
|
||||
db: API_folder_store,
|
||||
*,
|
||||
src_hash: str,
|
||||
dst_hash: str,
|
||||
rel_type: str
|
||||
) -> None:
|
||||
meta = db.get_metadata(dst_hash) or {}
|
||||
rels = meta.get("relationships") if isinstance(meta, dict) else None
|
||||
if not isinstance(rels, dict) or not rels:
|
||||
return
|
||||
|
||||
key_to_edit: Optional[str] = None
|
||||
for k in list(rels.keys()):
|
||||
if str(k).lower() == str(rel_type).lower():
|
||||
key_to_edit = str(k)
|
||||
break
|
||||
if not key_to_edit:
|
||||
return
|
||||
|
||||
bucket = rels.get(key_to_edit)
|
||||
if not isinstance(bucket, list) or not bucket:
|
||||
return
|
||||
|
||||
new_bucket = [h for h in bucket if str(h).lower() != str(src_hash).lower()]
|
||||
if new_bucket:
|
||||
rels[key_to_edit] = new_bucket
|
||||
else:
|
||||
try:
|
||||
del rels[key_to_edit]
|
||||
except Exception:
|
||||
rels[key_to_edit] = []
|
||||
|
||||
_upsert_relationships(db, dst_hash, rels)
|
||||
|
||||
|
||||
def _refresh_relationship_view_if_current(
|
||||
target_hash: Optional[str],
|
||||
target_path: Optional[str],
|
||||
other: Optional[str],
|
||||
config: Dict[str,
|
||||
Any],
|
||||
) -> None:
|
||||
"""If the current subject matches the target, refresh relationships via get-relationship."""
|
||||
def _run(args: List[str], config: Dict[str, Any]) -> int:
|
||||
try:
|
||||
from cmdlet import get as get_cmdlet # type: ignore
|
||||
except Exception:
|
||||
return
|
||||
|
||||
try:
|
||||
subject = ctx.get_last_result_subject()
|
||||
if subject is None:
|
||||
return
|
||||
|
||||
def norm(val: Any) -> str:
|
||||
return str(val).lower()
|
||||
|
||||
target_hashes = [norm(v) for v in [target_hash, other] if v]
|
||||
target_paths = [norm(v) for v in [target_path, other] if v]
|
||||
|
||||
subj_hashes: list[str] = []
|
||||
subj_paths: list[str] = []
|
||||
for field in ("hydrus_hash", "hash", "hash_hex", "file_hash"):
|
||||
val = get_field(subject, field)
|
||||
if val:
|
||||
subj_hashes.append(norm(val))
|
||||
for field in ("file_path", "path", "target"):
|
||||
val = get_field(subject, field)
|
||||
if val:
|
||||
subj_paths.append(norm(val))
|
||||
|
||||
is_match = False
|
||||
if target_hashes and any(h in subj_hashes for h in target_hashes):
|
||||
is_match = True
|
||||
if target_paths and any(p in subj_paths for p in target_paths):
|
||||
is_match = True
|
||||
if not is_match:
|
||||
return
|
||||
|
||||
refresh_args: list[str] = []
|
||||
if target_hash:
|
||||
refresh_args.extend(["-query", f"hash:{target_hash}"])
|
||||
|
||||
cmd = get_cmdlet("get-relationship")
|
||||
if not cmd:
|
||||
return
|
||||
cmd(subject, refresh_args, config)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
|
||||
"""Delete relationships from files.
|
||||
|
||||
Args:
|
||||
result: Input result(s) from previous cmdlet
|
||||
args: Command arguments
|
||||
config: CLI configuration
|
||||
|
||||
Returns:
|
||||
Exit code (0 = success)
|
||||
"""
|
||||
try:
|
||||
if should_show_help(args):
|
||||
log(
|
||||
f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}"
|
||||
)
|
||||
parsed, _, _ = sh.parse_cmdlet_args(args)
|
||||
if sh.should_show_help(parsed):
|
||||
return 0
|
||||
|
||||
# Parse arguments
|
||||
parsed_args = parse_cmdlet_args(args, CMDLET)
|
||||
delete_all_flag = parsed_args.get("all", False)
|
||||
rel_type_filter = parsed_args.get("type")
|
||||
override_store = parsed_args.get("store")
|
||||
override_hashes = sh.parse_hash_query(parsed_args.get("query"))
|
||||
if parsed_args.get("query") and not override_hashes:
|
||||
log("Invalid -query value (expected hash:<sha256>)", file=sys.stderr)
|
||||
return 1
|
||||
raw_path = parsed_args.get("path")
|
||||
|
||||
# Normalize input
|
||||
results = normalize_result_input(result)
|
||||
|
||||
# Allow store/hash-first usage when no pipeline items were provided
|
||||
if (not results) and override_hashes:
|
||||
if not override_store:
|
||||
log(
|
||||
"-store is required when using -query without piped items",
|
||||
file=sys.stderr
|
||||
)
|
||||
return 1
|
||||
results = [
|
||||
{
|
||||
"hash": h,
|
||||
"store": str(override_store)
|
||||
} for h in override_hashes
|
||||
]
|
||||
|
||||
if not results:
|
||||
# Legacy -path mode below may still apply
|
||||
if raw_path:
|
||||
results = [{
|
||||
"file_path": raw_path
|
||||
}]
|
||||
else:
|
||||
log("No results to process", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
# Decide store (for same-store enforcement + folder-store DB routing)
|
||||
store_name: Optional[str] = str(override_store
|
||||
).strip() if override_store else None
|
||||
if not store_name:
|
||||
stores = {
|
||||
str(get_field(r,
|
||||
"store"))
|
||||
for r in results if get_field(r, "store")
|
||||
}
|
||||
if len(stores) == 1:
|
||||
store_name = next(iter(stores))
|
||||
elif len(stores) > 1:
|
||||
log(
|
||||
"Multiple stores detected in pipeline; use -store to choose one",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 1
|
||||
|
||||
deleted_count = 0
|
||||
|
||||
# STORE/HASH FIRST: folder-store DB deletion (preferred)
|
||||
if store_name:
|
||||
backend = None
|
||||
store_root: Optional[Path] = None
|
||||
try:
|
||||
store = Store(config)
|
||||
backend = store[str(store_name)]
|
||||
loc = getattr(backend, "location", None)
|
||||
if callable(loc):
|
||||
store_root = Path(str(loc()))
|
||||
except Exception:
|
||||
backend = None
|
||||
store_root = None
|
||||
|
||||
if store_root is not None:
|
||||
try:
|
||||
with API_folder_store(store_root) as db:
|
||||
conn = db.connection
|
||||
if conn is None:
|
||||
raise RuntimeError("Store DB connection is not initialized")
|
||||
for single_result in results:
|
||||
# Enforce same-store when items carry store info
|
||||
item_store = get_field(single_result, "store")
|
||||
if item_store and str(item_store) != str(store_name):
|
||||
log(
|
||||
f"Cross-store delete blocked: item store '{item_store}' != '{store_name}'",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 1
|
||||
|
||||
file_hash = _extract_hash(single_result)
|
||||
if not file_hash:
|
||||
# Try path -> hash lookup within this store
|
||||
fp = (
|
||||
get_field(single_result,
|
||||
"file_path")
|
||||
or get_field(single_result,
|
||||
"path")
|
||||
or get_field(single_result,
|
||||
"target")
|
||||
)
|
||||
if fp:
|
||||
try:
|
||||
file_hash = db.get_file_hash(Path(str(fp)))
|
||||
except Exception:
|
||||
file_hash = None
|
||||
if not file_hash:
|
||||
log(
|
||||
'Could not extract file hash for deletion (use -query "hash:<sha256>" or ensure pipeline includes hash)',
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 1
|
||||
|
||||
meta = db.get_metadata(file_hash) or {}
|
||||
rels = meta.get("relationships"
|
||||
) if isinstance(meta,
|
||||
dict) else None
|
||||
if not isinstance(rels, dict) or not rels:
|
||||
continue
|
||||
|
||||
if delete_all_flag:
|
||||
# remove reverse edges for all types
|
||||
for rt, hashes in list(rels.items()):
|
||||
if not isinstance(hashes, list):
|
||||
continue
|
||||
for other_hash in hashes:
|
||||
other_norm = normalize_hash(str(other_hash))
|
||||
if other_norm:
|
||||
_remove_reverse_link(
|
||||
db,
|
||||
src_hash=file_hash,
|
||||
dst_hash=other_norm,
|
||||
rel_type=str(rt),
|
||||
)
|
||||
rels = {}
|
||||
elif rel_type_filter:
|
||||
# delete one type (case-insensitive key match)
|
||||
key_to_delete: Optional[str] = None
|
||||
for k in list(rels.keys()):
|
||||
if str(k).lower() == str(rel_type_filter).lower():
|
||||
key_to_delete = str(k)
|
||||
break
|
||||
if not key_to_delete:
|
||||
continue
|
||||
hashes = rels.get(key_to_delete)
|
||||
if isinstance(hashes, list):
|
||||
for other_hash in hashes:
|
||||
other_norm = normalize_hash(str(other_hash))
|
||||
if other_norm:
|
||||
_remove_reverse_link(
|
||||
db,
|
||||
src_hash=file_hash,
|
||||
dst_hash=other_norm,
|
||||
rel_type=str(key_to_delete),
|
||||
)
|
||||
try:
|
||||
del rels[key_to_delete]
|
||||
except Exception:
|
||||
rels[key_to_delete] = []
|
||||
else:
|
||||
log(
|
||||
"Specify --all to delete all relationships or -type <type> to delete specific type",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 1
|
||||
|
||||
_upsert_relationships(db, file_hash, rels)
|
||||
conn.commit()
|
||||
_refresh_relationship_view_if_current(
|
||||
file_hash,
|
||||
None,
|
||||
None,
|
||||
config
|
||||
)
|
||||
deleted_count += 1
|
||||
|
||||
log(
|
||||
f"Successfully deleted relationships from {deleted_count} file(s)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 0
|
||||
except Exception as exc:
|
||||
log(f"Error deleting store relationships: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
# LEGACY PATH MODE (single local DB)
|
||||
# Get storage path
|
||||
local_storage_path = get_local_storage_path(config)
|
||||
if not local_storage_path:
|
||||
log("Local storage path not configured", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
try:
|
||||
with API_folder_store(Path(local_storage_path)) as db:
|
||||
conn = db.connection
|
||||
if conn is None:
|
||||
raise RuntimeError("Store DB connection is not initialized")
|
||||
|
||||
for single_result in results:
|
||||
# Get file path from result
|
||||
file_path_from_result = (
|
||||
get_field(single_result,
|
||||
"file_path") or get_field(single_result,
|
||||
"path")
|
||||
or get_field(single_result,
|
||||
"target")
|
||||
or (
|
||||
str(single_result) if not isinstance(single_result,
|
||||
dict) else None
|
||||
)
|
||||
)
|
||||
|
||||
if not file_path_from_result:
|
||||
log("Could not extract file path from result", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
file_path_obj = Path(str(file_path_from_result))
|
||||
|
||||
if not file_path_obj.exists():
|
||||
log(f"File not found: {file_path_obj}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
try:
|
||||
file_hash = db.get_file_hash(file_path_obj)
|
||||
except Exception:
|
||||
file_hash = None
|
||||
file_hash = normalize_hash(str(file_hash)) if file_hash else None
|
||||
if not file_hash:
|
||||
log(
|
||||
f"File not in database: {file_path_obj.name}",
|
||||
file=sys.stderr
|
||||
)
|
||||
continue
|
||||
|
||||
meta = db.get_metadata(file_hash) or {}
|
||||
rels = meta.get("relationships") if isinstance(meta, dict) else None
|
||||
if not isinstance(rels, dict) or not rels:
|
||||
continue
|
||||
|
||||
if delete_all_flag:
|
||||
for rt, hashes in list(rels.items()):
|
||||
if not isinstance(hashes, list):
|
||||
continue
|
||||
for other_hash in hashes:
|
||||
other_norm = normalize_hash(str(other_hash))
|
||||
if other_norm:
|
||||
_remove_reverse_link(
|
||||
db,
|
||||
src_hash=file_hash,
|
||||
dst_hash=other_norm,
|
||||
rel_type=str(rt),
|
||||
)
|
||||
rels = {}
|
||||
elif rel_type_filter:
|
||||
key_to_delete: Optional[str] = None
|
||||
for k in list(rels.keys()):
|
||||
if str(k).lower() == str(rel_type_filter).lower():
|
||||
key_to_delete = str(k)
|
||||
break
|
||||
if not key_to_delete:
|
||||
continue
|
||||
hashes = rels.get(key_to_delete)
|
||||
if isinstance(hashes, list):
|
||||
for other_hash in hashes:
|
||||
other_norm = normalize_hash(str(other_hash))
|
||||
if other_norm:
|
||||
_remove_reverse_link(
|
||||
db,
|
||||
src_hash=file_hash,
|
||||
dst_hash=other_norm,
|
||||
rel_type=str(key_to_delete),
|
||||
)
|
||||
try:
|
||||
del rels[key_to_delete]
|
||||
except Exception:
|
||||
rels[key_to_delete] = []
|
||||
else:
|
||||
log(
|
||||
"Specify --all to delete all relationships or -type <type> to delete specific type",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 1
|
||||
|
||||
_upsert_relationships(db, file_hash, rels)
|
||||
conn.commit()
|
||||
_refresh_relationship_view_if_current(
|
||||
file_hash,
|
||||
str(file_path_obj),
|
||||
None,
|
||||
config
|
||||
)
|
||||
deleted_count += 1
|
||||
except Exception as exc:
|
||||
log(f"Error deleting relationship: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
log(
|
||||
f"Successfully deleted relationships from {deleted_count} file(s)",
|
||||
file=sys.stderr
|
||||
)
|
||||
return 0
|
||||
# Relationship deletion is currently not implemented for Hydrus.
|
||||
# Legacy Folder storage has been removed.
|
||||
sh.log("Relationship deletion is currently only supported via Folder storage, which has been removed.", file=sys.stderr)
|
||||
sh.log("Hydrus relationship management via Medios-Macina is planned for a future update.", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
except Exception as exc:
|
||||
log(f"Error in delete-relationship: {exc}", file=sys.stderr)
|
||||
sh.log(f"Error in delete-relationship: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
|
||||
CMDLET = Cmdlet(
|
||||
CMDLET = sh.Cmdlet(
|
||||
name="delete-relationship",
|
||||
summary="Remove relationships from files.",
|
||||
usage=
|
||||
"@1 | delete-relationship --all OR delete-relationship -path <file> --all OR @1-3 | delete-relationship -type alt",
|
||||
summary="Remove relationships from files (Currently Disabled).",
|
||||
usage="@1 | delete-relationship --all",
|
||||
arg=[
|
||||
SharedArgs.PATH,
|
||||
SharedArgs.STORE,
|
||||
SharedArgs.QUERY,
|
||||
CmdletArg(
|
||||
sh.SharedArgs.PATH,
|
||||
sh.SharedArgs.STORE,
|
||||
sh.SharedArgs.QUERY,
|
||||
sh.CmdletArg(
|
||||
"all",
|
||||
type="flag",
|
||||
description="Delete all relationships for the file(s)."
|
||||
),
|
||||
CmdletArg(
|
||||
sh.CmdletArg(
|
||||
"type",
|
||||
type="string",
|
||||
description=
|
||||
"Delete specific relationship type ('alt', 'king', 'related'). Default: delete all types.",
|
||||
description="Delete specific relationship type.",
|
||||
),
|
||||
],
|
||||
detail=[
|
||||
"- Delete all relationships: pipe files | delete-relationship --all",
|
||||
"- Delete specific type: pipe files | delete-relationship -type alt",
|
||||
"- Delete all from file: delete-relationship -path <file> --all",
|
||||
],
|
||||
)
|
||||
|
||||
CMDLET.exec = _run
|
||||
|
||||
@@ -25,7 +25,6 @@ from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Sequence, Tuple
|
||||
|
||||
from SYS import pipeline as ctx
|
||||
from API.folder import read_sidecar, write_sidecar
|
||||
from . import _shared as sh
|
||||
|
||||
normalize_hash = sh.normalize_hash
|
||||
@@ -525,157 +524,6 @@ def _apply_result_updates_from_tags(result: Any, tag_list: List[str]) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def _handle_title_rename(old_path: Path, tags_list: List[str]) -> Optional[Path]:
|
||||
"""If a title: tag is present, rename the file and its .tag sidecar to match.
|
||||
|
||||
Returns the new path if renamed, otherwise returns None.
|
||||
"""
|
||||
# Extract title from tags
|
||||
new_title = None
|
||||
for tag in tags_list:
|
||||
if isinstance(tag, str) and tag.lower().startswith("title:"):
|
||||
new_title = tag.split(":", 1)[1].strip()
|
||||
break
|
||||
|
||||
if not new_title or not old_path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
# Build new filename with same extension
|
||||
old_name = old_path.name
|
||||
old_suffix = old_path.suffix
|
||||
|
||||
# Create new filename: title + extension
|
||||
new_name = f"{new_title}{old_suffix}"
|
||||
new_path = old_path.parent / new_name
|
||||
|
||||
# Don't rename if already the same name
|
||||
if new_path == old_path:
|
||||
return None
|
||||
|
||||
# Rename the main file
|
||||
if new_path.exists():
|
||||
log(f"Warning: Target filename already exists: {new_name}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
old_path.rename(new_path)
|
||||
log(f"Renamed file: {old_name} → {new_name}", file=sys.stderr)
|
||||
|
||||
# Rename the .tag sidecar if it exists
|
||||
old_tags_path = old_path.parent / (old_name + ".tag")
|
||||
if old_tags_path.exists():
|
||||
new_tags_path = old_path.parent / (new_name + ".tag")
|
||||
if new_tags_path.exists():
|
||||
log(
|
||||
f"Warning: Target sidecar already exists: {new_tags_path.name}",
|
||||
file=sys.stderr
|
||||
)
|
||||
else:
|
||||
old_tags_path.rename(new_tags_path)
|
||||
log(
|
||||
f"Renamed sidecar: {old_tags_path.name} → {new_tags_path.name}",
|
||||
file=sys.stderr
|
||||
)
|
||||
|
||||
return new_path
|
||||
except Exception as exc:
|
||||
log(f"Warning: Failed to rename file: {exc}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
|
||||
def _read_sidecar_fallback(p: Path) -> tuple[Optional[str], List[str], List[str]]:
|
||||
"""Fallback sidecar reader if metadata module unavailable.
|
||||
|
||||
Format:
|
||||
- Lines with "hash:" prefix: file hash
|
||||
- Lines with "url:" or "url:" prefix: url
|
||||
- Lines with "relationship:" prefix: ignored (internal relationships)
|
||||
- Lines with "key:", "namespace:value" format: treated as namespace tags
|
||||
- Plain lines without colons: freeform tags
|
||||
|
||||
Excluded namespaces (treated as metadata, not tags): hash, url, url, relationship
|
||||
"""
|
||||
try:
|
||||
raw = p.read_text(encoding="utf-8", errors="ignore")
|
||||
except OSError:
|
||||
return None, [], []
|
||||
t: List[str] = []
|
||||
u: List[str] = []
|
||||
h: Optional[str] = None
|
||||
|
||||
# Namespaces to exclude from tags
|
||||
excluded_namespaces = {"hash",
|
||||
"url",
|
||||
"url",
|
||||
"relationship"}
|
||||
|
||||
for line in raw.splitlines():
|
||||
s = line.strip()
|
||||
if not s:
|
||||
continue
|
||||
low = s.lower()
|
||||
|
||||
# Check if this is a hash line
|
||||
if low.startswith("hash:"):
|
||||
h = s.split(":", 1)[1].strip() if ":" in s else h
|
||||
# Check if this is a URL line
|
||||
elif low.startswith("url:") or low.startswith("url:"):
|
||||
val = s.split(":", 1)[1].strip() if ":" in s else ""
|
||||
if val:
|
||||
u.append(val)
|
||||
# Check if this is an excluded namespace
|
||||
elif ":" in s:
|
||||
namespace = s.split(":", 1)[0].strip().lower()
|
||||
if namespace not in excluded_namespaces:
|
||||
# Include as namespace tag (e.g., "title: The Freemasons")
|
||||
t.append(s)
|
||||
else:
|
||||
# Plain text without colon = freeform tag
|
||||
t.append(s)
|
||||
|
||||
return h, t, u
|
||||
|
||||
|
||||
def _write_sidecar(
|
||||
p: Path,
|
||||
media: Path,
|
||||
tag_list: List[str],
|
||||
url: List[str],
|
||||
hash_in_sidecar: Optional[str]
|
||||
) -> Path:
|
||||
"""Write tags to sidecar file and handle title-based renaming.
|
||||
|
||||
Returns the new media path if renamed, otherwise returns the original media path.
|
||||
"""
|
||||
success = write_sidecar(media, tag_list, url, hash_in_sidecar)
|
||||
if success:
|
||||
_apply_result_updates_from_tags(None, tag_list)
|
||||
# Check if we should rename the file based on title tag
|
||||
new_media = _handle_title_rename(media, tag_list)
|
||||
if new_media:
|
||||
return new_media
|
||||
return media
|
||||
|
||||
# Fallback writer
|
||||
ordered = [s for s in tag_list if s and s.strip()]
|
||||
lines = []
|
||||
if hash_in_sidecar:
|
||||
lines.append(f"hash:{hash_in_sidecar}")
|
||||
lines.extend(ordered)
|
||||
for u in url:
|
||||
lines.append(f"url:{u}")
|
||||
try:
|
||||
p.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
||||
# Check if we should rename the file based on title tag
|
||||
new_media = _handle_title_rename(media, tag_list)
|
||||
if new_media:
|
||||
return new_media
|
||||
return media
|
||||
except OSError as exc:
|
||||
log(f"Failed to write sidecar: {exc}", file=sys.stderr)
|
||||
return media
|
||||
|
||||
|
||||
def _emit_tag_payload(
|
||||
source: str,
|
||||
tags_list: List[str],
|
||||
@@ -1497,17 +1345,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
|
||||
str) and file_path and not file_path.lower().startswith(
|
||||
("http://",
|
||||
"https://"))):
|
||||
try:
|
||||
media_path = Path(str(file_path))
|
||||
if media_path.exists():
|
||||
tags_from_sidecar = read_sidecar(media_path)
|
||||
if isinstance(tags_from_sidecar, list):
|
||||
identifier_tags = [
|
||||
str(t) for t in tags_from_sidecar
|
||||
if isinstance(t, (str, bytes))
|
||||
]
|
||||
except Exception:
|
||||
pass
|
||||
pass
|
||||
|
||||
title_from_tags = _extract_tag_value(identifier_tags, "title")
|
||||
artist_from_tags = _extract_tag_value(identifier_tags, "artist")
|
||||
|
||||
@@ -181,27 +181,6 @@ def _persist_alt_relationship(
|
||||
if len(alt_norm) != 64 or len(king_norm) != 64 or alt_norm == king_norm:
|
||||
return
|
||||
|
||||
# Folder-backed local DB
|
||||
try:
|
||||
if (type(backend).__name__ == "Folder" and hasattr(backend,
|
||||
"location")
|
||||
and callable(getattr(backend,
|
||||
"location"))):
|
||||
from API.folder import API_folder_store
|
||||
from pathlib import Path
|
||||
|
||||
root = Path(str(backend.location())).expanduser()
|
||||
with API_folder_store(root) as db:
|
||||
db.set_relationship_by_hash(
|
||||
alt_norm,
|
||||
king_norm,
|
||||
"alt",
|
||||
bidirectional=False
|
||||
)
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Hydrus-like backend
|
||||
try:
|
||||
client = getattr(backend, "_client", None)
|
||||
@@ -461,28 +440,14 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
|
||||
store = Store(config)
|
||||
if store.is_available(store_name):
|
||||
backend = store[str(store_name)]
|
||||
move_flag = type(backend).__name__ == "Folder"
|
||||
stored_hash = backend.add_file(
|
||||
Path(str(output_path)),
|
||||
title=new_title,
|
||||
tag=new_tags,
|
||||
url=urls,
|
||||
move=move_flag,
|
||||
move=False,
|
||||
)
|
||||
stored_store = store_name
|
||||
|
||||
# Best-effort resolve stored path for folder backends.
|
||||
try:
|
||||
if type(backend).__name__ == "Folder" and hasattr(
|
||||
backend,
|
||||
"get_file"):
|
||||
p = backend.get_file(str(stored_hash))
|
||||
if isinstance(p, Path):
|
||||
stored_path = str(p)
|
||||
elif isinstance(p, str) and p:
|
||||
stored_path = p
|
||||
except Exception:
|
||||
stored_path = None
|
||||
except Exception as exc:
|
||||
log(
|
||||
f"Failed to add clip to store '{store_name}': {exc}",
|
||||
|
||||
Reference in New Issue
Block a user