Files
Medios-Macina/cmdlet/add_tag.py
2026-01-01 20:37:27 -08:00

993 lines
37 KiB
Python

from __future__ import annotations
from typing import Any, Dict, List, Sequence, Optional
from pathlib import Path
import sys
import re
from SYS.logger import log
from SYS import models
from SYS import pipeline as ctx
from . import _shared as sh
normalize_result_input = sh.normalize_result_input
filter_results_by_temp = sh.filter_results_by_temp
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_tag_arguments = sh.parse_tag_arguments
expand_tag_groups = sh.expand_tag_groups
parse_cmdlet_args = sh.parse_cmdlet_args
collapse_namespace_tag = sh.collapse_namespace_tag
should_show_help = sh.should_show_help
get_field = sh.get_field
from Store import Store
from SYS.utils import sha256_file
_FIELD_NAME_RE = re.compile(r"^[A-Za-z0-9_]+$")
def _normalize_title_for_extract(text: str) -> str:
"""Normalize common separators in titles for matching.
Helps when sources use unicode dashes or odd whitespace.
"""
s = str(text or "").strip()
if not s:
return s
# Common unicode dash variants -> '-'
s = s.replace("\u2013", "-") # en dash
s = s.replace("\u2014", "-") # em dash
s = s.replace("\u2212", "-") # minus sign
s = s.replace("\u2010", "-") # hyphen
s = s.replace("\u2011", "-") # non-breaking hyphen
s = s.replace("\u2012", "-") # figure dash
s = s.replace("\u2015", "-") # horizontal bar
# Collapse any whitespace runs (including newlines/tabs) to a single space.
# Some sources wrap the artist name or title across lines.
try:
s = re.sub(r"\s+", " ", s).strip()
except Exception:
s = " ".join(s.split())
return s
def _strip_title_prefix(text: str) -> str:
s = str(text or "").strip()
if s.lower().startswith("title:"):
s = s.split(":", 1)[1].strip()
return s
def _literal_to_title_pattern_regex(literal: str) -> str:
"""Convert a literal chunk of a template into a regex fragment.
Keeps punctuation literal, but treats any whitespace run as \\s*.
"""
out: List[str] = []
i = 0
while i < len(literal):
ch = literal[i]
if ch.isspace():
while i < len(literal) and literal[i].isspace():
i += 1
out.append(r"\s*")
continue
out.append(re.escape(ch))
i += 1
return "".join(out)
def _compile_extract_template(template: str) -> tuple[re.Pattern[str], List[str]]:
"""Compile a simple (field) template into a regex.
Example template:
(artist) - (album) - (disk)-(track) (title)
This is *not* user-facing regex: we only support named fields in parentheses.
"""
tpl = str(template or "").strip()
if not tpl:
raise ValueError("empty extract template")
matches = list(re.finditer(r"\(([^)]+)\)", tpl))
if not matches:
raise ValueError("extract template must contain at least one (field)")
field_names: List[str] = []
parts: List[str] = [r"^\s*"]
last_end = 0
for idx, m in enumerate(matches):
literal = tpl[last_end:m.start()]
if literal:
parts.append(_literal_to_title_pattern_regex(literal))
raw_name = (m.group(1) or "").strip()
if not raw_name or not _FIELD_NAME_RE.fullmatch(raw_name):
raise ValueError(
f"invalid field name '{raw_name}' (use A-Z, 0-9, underscore)"
)
field_names.append(raw_name)
name_lower = raw_name.lower()
is_last = idx == (len(matches) - 1)
if is_last:
parts.append(rf"(?P<{raw_name}>.+)")
else:
# Heuristic: common numeric fields should capture full digit runs.
# This avoids ambiguous splits like track='2', title='3 ...'.
if name_lower in {
"disk",
"disc",
"cd",
"track",
"trk",
"episode",
"ep",
"season",
"year",
}:
parts.append(rf"(?P<{raw_name}>\d+)")
else:
parts.append(rf"(?P<{raw_name}>.+?)")
last_end = m.end()
tail = tpl[last_end:]
if tail:
parts.append(_literal_to_title_pattern_regex(tail))
parts.append(r"\s*$")
rx = "".join(parts)
return re.compile(rx, flags=re.IGNORECASE), field_names
def _extract_tags_from_title(title_text: str, template: str) -> List[str]:
"""Extract (field)->value from title_text and return ['field:value', ...]."""
title_clean = _normalize_title_for_extract(_strip_title_prefix(title_text))
if not title_clean:
return []
pattern, field_names = _compile_extract_template(template)
m = pattern.match(title_clean)
if not m:
return []
out: List[str] = []
for name in field_names:
value = (m.group(name) or "").strip()
if not value:
continue
out.append(f"{name}:{value}")
return out
def _get_title_candidates_for_extraction(
res: Any,
existing_tags: Optional[List[str]] = None
) -> List[str]:
"""Return a list of possible title strings in priority order."""
candidates: List[str] = []
def add_candidate(val: Any) -> None:
if val is None:
return
s = _normalize_title_for_extract(_strip_title_prefix(str(val)))
if not s:
return
if s not in candidates:
candidates.append(s)
# 1) Item's title field (may be a display title, not the title: tag)
try:
add_candidate(get_field(res, "title"))
except Exception:
pass
if isinstance(res, dict):
add_candidate(res.get("title"))
# 2) title: tag from either store tags or piped tags
tags = existing_tags if isinstance(existing_tags, list) else _extract_item_tags(res)
add_candidate(_extract_title_tag(tags) or "")
# 3) Filename stem
try:
path_val = get_field(res, "path")
if path_val:
p = Path(str(path_val))
add_candidate((p.stem or "").strip())
except Exception:
pass
return candidates
def _extract_tags_from_title_candidates(candidates: List[str],
template: str) -> tuple[List[str],
Optional[str]]:
"""Try candidates in order; return (tags, matched_candidate)."""
for c in candidates:
extracted = _extract_tags_from_title(c, template)
if extracted:
return extracted, c
return [], None
def _try_compile_extract_template(
template: Optional[str],
) -> tuple[Optional[re.Pattern[str]],
Optional[str]]:
"""Compile template for debug; return (pattern, error_message)."""
if template is None:
return None, None
try:
pattern, _fields = _compile_extract_template(str(template))
return pattern, None
except Exception as exc:
return None, str(exc)
def _extract_title_tag(tags: List[str]) -> Optional[str]:
"""Return the value of the first title: tag if present."""
for t in tags:
if t.lower().startswith("title:"):
value = t.split(":", 1)[1].strip()
return value or None
return None
def _extract_item_tags(res: Any) -> List[str]:
if isinstance(res, models.PipeObject):
raw = getattr(res, "tag", None)
elif isinstance(res, dict):
raw = res.get("tag")
else:
raw = None
if isinstance(raw, list):
return [str(t) for t in raw if t is not None]
if isinstance(raw, str) and raw.strip():
return [raw]
return []
def _set_item_tags(res: Any, tags: List[str]) -> None:
if isinstance(res, models.PipeObject):
res.tag = tags
elif isinstance(res, dict):
res["tag"] = tags
def _apply_title_to_result(res: Any, title_value: Optional[str]) -> None:
"""Update result object/dict title fields and columns in-place."""
if not title_value:
return
if isinstance(res, models.PipeObject):
res.title = title_value
# Update columns if present (Title column assumed index 0)
columns = getattr(res, "columns", None)
if isinstance(columns, list) and columns:
label, *_ = columns[0]
if str(label).lower() == "title":
columns[0] = (label, title_value)
elif isinstance(res, dict):
res["title"] = title_value
cols = res.get("columns")
if isinstance(cols, list):
updated = []
changed = False
for col in cols:
if isinstance(col, tuple) and len(col) == 2:
label, _val = col
if str(label).lower() == "title":
updated.append((label, title_value))
changed = True
else:
updated.append(col)
else:
updated.append(col)
if changed:
res["columns"] = updated
def _matches_target(
item: Any,
target_hash: Optional[str],
target_path: Optional[str],
target_store: Optional[str] = None,
) -> bool:
"""Determine whether a result item refers to the given target.
Important: hashes can collide across backends in this app's UX (same media in
multiple stores). When target_store is provided, it must match too.
"""
def norm(val: Any) -> Optional[str]:
return str(val).lower() if val is not None else None
target_hash_l = target_hash.lower() if target_hash else None
target_path_l = target_path.lower() if target_path else None
target_store_l = target_store.lower() if target_store else None
if isinstance(item, dict):
hashes = [norm(item.get("hash"))]
paths = [norm(item.get("path"))]
stores = [norm(item.get("store"))]
else:
hashes = [norm(get_field(item, "hash"))]
paths = [norm(get_field(item, "path"))]
stores = [norm(get_field(item, "store"))]
if target_store_l:
if target_store_l not in stores:
return False
if target_hash_l and target_hash_l in hashes:
return True
if target_path_l and target_path_l in paths:
return True
return False
def _update_item_title_fields(item: Any, new_title: str) -> None:
"""Mutate an item to reflect a new title in plain fields and columns."""
if isinstance(item, models.PipeObject):
item.title = new_title
columns = getattr(item, "columns", None)
if isinstance(columns, list) and columns:
label, *_ = columns[0]
if str(label).lower() == "title":
columns[0] = (label, new_title)
elif isinstance(item, dict):
item["title"] = new_title
cols = item.get("columns")
if isinstance(cols, list):
updated_cols = []
changed = False
for col in cols:
if isinstance(col, tuple) and len(col) == 2:
label, _val = col
if str(label).lower() == "title":
updated_cols.append((label, new_title))
changed = True
else:
updated_cols.append(col)
else:
updated_cols.append(col)
if changed:
item["columns"] = updated_cols
def _refresh_result_table_title(
new_title: str,
target_hash: Optional[str],
target_store: Optional[str],
target_path: Optional[str],
) -> None:
"""Refresh the cached result table with an updated title and redisplay it."""
try:
last_table = ctx.get_last_result_table()
items = ctx.get_last_result_items()
if not last_table or not items:
return
updated_items = []
match_found = False
for item in items:
try:
if _matches_target(item, target_hash, target_path, target_store):
_update_item_title_fields(item, new_title)
match_found = True
except Exception:
pass
updated_items.append(item)
if not match_found:
return
new_table = last_table.copy_with_title(getattr(last_table, "title", ""))
for item in updated_items:
new_table.add_result(item)
# Keep the underlying history intact; update only the overlay so @.. can
# clear the overlay then continue back to prior tables (e.g., the search list).
ctx.set_last_result_table_overlay(new_table, updated_items)
except Exception:
pass
def _refresh_tag_view(
res: Any,
target_hash: Optional[str],
store_name: Optional[str],
target_path: Optional[str],
config: Dict[str,
Any],
) -> None:
"""Refresh tag display via get-tag. Prefer current subject; fall back to direct hash refresh."""
try:
from cmdlet import get as get_cmdlet # type: ignore
except Exception:
return
if not target_hash or not store_name:
return
refresh_args: List[str] = ["-query", f"hash:{target_hash}", "-store", store_name]
get_tag = None
try:
get_tag = get_cmdlet("get-tag")
except Exception:
get_tag = None
if not callable(get_tag):
return
try:
subject = ctx.get_last_result_subject()
if subject and _matches_target(subject, target_hash, target_path, store_name):
get_tag(subject, refresh_args, config)
return
except Exception:
pass
try:
get_tag(res, refresh_args, config)
except Exception:
pass
class Add_Tag(Cmdlet):
"""Class-based add-tag cmdlet with Cmdlet metadata inheritance."""
def __init__(self) -> None:
super().__init__(
name="add-tag",
summary="Add tag to a file in a store.",
usage=
'add-tag -store <store> [-query "hash:<sha256>"] [-duplicate <format>] [-list <list>[,<list>...]] [--all] <tag>[,<tag>...]',
arg=[
CmdletArg(
"tag",
type="string",
required=False,
description=
"One or more tag to add. Comma- or space-separated. Can also use {list_name} syntax. If omitted, uses tag from pipeline payload.",
variadic=True,
),
SharedArgs.QUERY,
SharedArgs.STORE,
CmdletArg(
"-extract",
type="string",
description=
'Extract tags from the item\'s title using a simple template with (field) placeholders. Example: -extract "(artist) - (album) - (disk)-(track) (title)" will add artist:, album:, disk:, track:, title: tags.',
),
CmdletArg(
"--extract-debug",
type="flag",
description=
"Print debug info for -extract matching (matched title source and extracted tags).",
),
CmdletArg(
"-duplicate",
type="string",
description=
"Copy existing tag values to new namespaces. Formats: title:album,artist (explicit) or title,album,artist (inferred)",
),
CmdletArg(
"-list",
type="string",
description=
"Load predefined tag lists from adjective.json. Comma-separated list names (e.g., -list philosophy,occult).",
),
CmdletArg(
"--all",
type="flag",
description=
"Include temporary files in tagging (by default, only tag non-temporary files).",
),
],
detail=[
"- By default, only tag non-temporary files (from pipelines). Use --all to tag everything.",
"- Requires a store backend: use -store or pipe items that include store.",
"- If -query is not provided, uses the piped item's hash (or derives from its path when possible).",
"- Multiple tag can be comma-separated or space-separated.",
"- Use -list to include predefined tag lists from adjective.json: -list philosophy,occult",
'- tag can also reference lists with curly braces: add-tag {philosophy} "other:tag"',
"- Use -duplicate to copy EXISTING tag values to new namespaces:",
" Explicit format: -duplicate title:album,artist (copies title: to album: and artist:)",
" Inferred format: -duplicate title,album,artist (first is source, rest are targets)",
"- The source namespace must already exist in the file being tagged.",
"- Target namespaces that already have a value are skipped (not overwritten).",
"- Use -extract to derive namespaced tags from the current title (title field or title: tag) using a simple template.",
],
exec=self.run,
)
self.register()
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Add tag to a file with smart filtering for pipeline results."""
if should_show_help(args):
log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
return 0
# Parse arguments
parsed = parse_cmdlet_args(args, self)
extract_template = parsed.get("extract")
if extract_template is not None:
extract_template = str(extract_template)
extract_debug = bool(parsed.get("extract-debug", False))
extract_debug_rx, extract_debug_err = _try_compile_extract_template(extract_template)
query_hash = sh.parse_single_hash_query(parsed.get("query"))
if parsed.get("query") and not query_hash:
log(
"[add_tag] Error: -query must be of the form hash:<sha256>",
file=sys.stderr
)
return 1
hash_override = normalize_hash(query_hash) if query_hash else None
# If add-tag is in the middle of a pipeline (has downstream stages), default to
# including temp files. This enables common flows like:
# @N | download-file | add-tag ... | add-file ...
store_override = parsed.get("store")
stage_ctx = ctx.get_stage_context()
has_downstream = bool(
stage_ctx is not None and not getattr(stage_ctx,
"is_last_stage",
False)
)
include_temp = bool(parsed.get("all", False))
if has_downstream and not include_temp and not store_override:
include_temp = True
# Normalize input to list
results = normalize_result_input(result)
# Filter by temp status (unless --all is set)
if not include_temp:
results = filter_results_by_temp(results, include_temp=False)
# When no pipeline payload is present but -query/-store pinpoints a hash, tag it directly.
if not results and hash_override and store_override:
results = [{"hash": hash_override, "store": store_override}]
if not results:
log(
"No valid files to tag (all results were temporary; use --all to include temporary files)",
file=sys.stderr,
)
return 1
# Get tag from arguments (or fallback to pipeline payload)
raw_tag = parsed.get("tag", [])
if isinstance(raw_tag, str):
raw_tag = [raw_tag]
# Fallback: if no tag provided explicitly, try to pull from first result payload.
# IMPORTANT: when -extract is used, users typically want *only* extracted tags,
# not "re-add whatever tags are already in the payload".
if not raw_tag and results and not extract_template:
first = results[0]
payload_tag = None
# Try multiple tag lookup strategies in order
tag_lookups = [
lambda x: getattr(x, "tag", None),
lambda x: x.get("tag") if isinstance(x, dict) else None,
]
for lookup in tag_lookups:
try:
payload_tag = lookup(first)
if payload_tag:
break
except (AttributeError, TypeError, KeyError):
continue
if payload_tag:
if isinstance(payload_tag, str):
raw_tag = [payload_tag]
elif isinstance(payload_tag, list):
raw_tag = payload_tag
# Handle -list argument (convert to {list} syntax)
list_arg = parsed.get("list")
if list_arg:
for l in list_arg.split(","):
l = l.strip()
if l:
raw_tag.append(f"{{{l}}}")
# Parse and expand tag
tag_to_add = parse_tag_arguments(raw_tag)
tag_to_add = expand_tag_groups(tag_to_add)
if not tag_to_add and not extract_template:
log(
"No tag provided to add (and no -extract template provided)",
file=sys.stderr
)
return 1
if extract_template and extract_debug and extract_debug_err:
log(
f"[add_tag] extract template error: {extract_debug_err}",
file=sys.stderr
)
return 1
# Get other flags
duplicate_arg = parsed.get("duplicate")
# tag ARE provided - apply them to each store-backed result
total_added = 0
total_modified = 0
store_registry = Store(config)
extract_matched_items = 0
extract_no_match_items = 0
for res in results:
store_name: Optional[str]
raw_hash: Optional[str]
raw_path: Optional[str]
if isinstance(res, models.PipeObject):
store_name = store_override or res.store
raw_hash = res.hash
raw_path = res.path
elif isinstance(res, dict):
store_name = store_override or res.get("store")
raw_hash = res.get("hash")
raw_path = res.get("path")
else:
ctx.emit(res)
continue
if not store_name:
store_name = None
# If the item isn't in a configured store backend yet (e.g., store=PATH) but has a local file,
# treat add-tag as a pipeline mutation (carry tags forward for add-file) instead of a store write.
if not store_override:
store_name_str = str(store_name) if store_name is not None else ""
local_mode_requested = (
(not store_name_str) or (store_name_str.upper() == "PATH")
or (store_name_str.lower() == "local")
)
is_known_backend = bool(store_name_str) and store_registry.is_available(
store_name_str
)
if local_mode_requested and raw_path:
try:
if Path(str(raw_path)).expanduser().exists():
existing_tag_list = _extract_item_tags(res)
existing_lower = {
t.lower()
for t in existing_tag_list if isinstance(t, str)
}
item_tag_to_add = list(tag_to_add)
if extract_template:
candidates = _get_title_candidates_for_extraction(
res,
existing_tag_list
)
extracted, matched = _extract_tags_from_title_candidates(
candidates, extract_template
)
if extracted:
extract_matched_items += 1
if extract_debug:
log(
f"[add_tag] extract matched: {matched!r} -> {extracted}",
file=sys.stderr,
)
for new_tag in extracted:
if new_tag.lower() not in existing_lower:
item_tag_to_add.append(new_tag)
else:
extract_no_match_items += 1
if extract_debug:
rx_preview = (
extract_debug_rx.pattern
if extract_debug_rx else "<uncompiled>"
)
cand_preview = "; ".join(
[repr(c) for c in candidates[:3]]
)
log(
f"[add_tag] extract no match for template {extract_template!r}. regex: {rx_preview!r}. candidates: {cand_preview}",
file=sys.stderr,
)
item_tag_to_add = collapse_namespace_tag(
item_tag_to_add,
"title",
prefer="last"
)
if duplicate_arg:
parts = str(duplicate_arg).split(":")
source_ns = ""
targets: list[str] = []
if len(parts) > 1:
source_ns = parts[0]
targets = [
t.strip() for t in parts[1].split(",")
if t.strip()
]
else:
parts2 = str(duplicate_arg).split(",")
if len(parts2) > 1:
source_ns = parts2[0]
targets = [
t.strip() for t in parts2[1:] if t.strip()
]
if source_ns and targets:
source_prefix = source_ns.lower() + ":"
for t in existing_tag_list:
if not t.lower().startswith(source_prefix):
continue
value = t.split(":", 1)[1]
for target_ns in targets:
new_tag = f"{target_ns}:{value}"
if new_tag.lower() not in existing_lower:
item_tag_to_add.append(new_tag)
removed_namespace_tag: list[str] = []
for new_tag in item_tag_to_add:
if not isinstance(new_tag, str) or ":" not in new_tag:
continue
ns = new_tag.split(":", 1)[0].strip()
if not ns:
continue
ns_prefix = ns.lower() + ":"
for t in existing_tag_list:
if (t.lower().startswith(ns_prefix)
and t.lower() != new_tag.lower()):
removed_namespace_tag.append(t)
removed_namespace_tag = sorted(
{t
for t in removed_namespace_tag}
)
actual_tag_to_add = [
t for t in item_tag_to_add if isinstance(t, str)
and t.lower() not in existing_lower
]
updated_tag_list = [
t for t in existing_tag_list
if t not in removed_namespace_tag
]
updated_tag_list.extend(actual_tag_to_add)
_set_item_tags(res, updated_tag_list)
final_title = _extract_title_tag(updated_tag_list)
_apply_title_to_result(res, final_title)
total_added += len(actual_tag_to_add)
total_modified += (
1 if (removed_namespace_tag or actual_tag_to_add) else 0
)
ctx.emit(res)
continue
except Exception:
pass
if local_mode_requested:
log(
"[add_tag] Error: Missing usable local path for tagging (or provide -store)",
file=sys.stderr,
)
return 1
if store_name_str and not is_known_backend:
log(
f"[add_tag] Error: Unknown store '{store_name_str}'. Available: {store_registry.list_backends()}",
file=sys.stderr,
)
return 1
resolved_hash = (
normalize_hash(hash_override)
if hash_override else normalize_hash(raw_hash)
)
if not resolved_hash and raw_path:
try:
p = Path(str(raw_path))
stem = p.stem
if len(stem) == 64 and all(c in "0123456789abcdef"
for c in stem.lower()):
resolved_hash = stem.lower()
elif p.exists() and p.is_file():
resolved_hash = sha256_file(p)
except Exception:
resolved_hash = None
if not resolved_hash:
log(
"[add_tag] Warning: Item missing usable hash (and could not derive from path); skipping",
file=sys.stderr,
)
ctx.emit(res)
continue
try:
backend = store_registry[str(store_name)]
except Exception as exc:
log(
f"[add_tag] Error: Unknown store '{store_name}': {exc}",
file=sys.stderr
)
return 1
try:
existing_tag, _src = backend.get_tag(resolved_hash, config=config)
except Exception:
existing_tag = []
existing_tag_list = [t for t in (existing_tag or []) if isinstance(t, str)]
existing_lower = {t.lower()
for t in existing_tag_list}
original_title = _extract_title_tag(existing_tag_list)
# Per-item tag list (do not mutate shared list)
item_tag_to_add = list(tag_to_add)
if extract_template:
candidates2 = _get_title_candidates_for_extraction(
res,
existing_tag_list
)
extracted2, matched2 = _extract_tags_from_title_candidates(
candidates2, extract_template
)
if extracted2:
extract_matched_items += 1
if extract_debug:
log(
f"[add_tag] extract matched: {matched2!r} -> {extracted2}",
file=sys.stderr,
)
for new_tag in extracted2:
if new_tag.lower() not in existing_lower:
item_tag_to_add.append(new_tag)
else:
extract_no_match_items += 1
if extract_debug:
rx_preview2 = (
extract_debug_rx.pattern
if extract_debug_rx else "<uncompiled>"
)
cand_preview2 = "; ".join([repr(c) for c in candidates2[:3]])
log(
f"[add_tag] extract no match for template {extract_template!r}. regex: {rx_preview2!r}. candidates: {cand_preview2}",
file=sys.stderr,
)
item_tag_to_add = collapse_namespace_tag(
item_tag_to_add,
"title",
prefer="last"
)
# Handle -duplicate logic (copy existing tag to new namespaces)
if duplicate_arg:
parts = str(duplicate_arg).split(":")
source_ns = ""
targets: list[str] = []
if len(parts) > 1:
source_ns = parts[0]
targets = [t.strip() for t in parts[1].split(",") if t.strip()]
else:
parts2 = str(duplicate_arg).split(",")
if len(parts2) > 1:
source_ns = parts2[0]
targets = [t.strip() for t in parts2[1:] if t.strip()]
if source_ns and targets:
source_prefix = source_ns.lower() + ":"
for t in existing_tag_list:
if not t.lower().startswith(source_prefix):
continue
value = t.split(":", 1)[1]
for target_ns in targets:
new_tag = f"{target_ns}:{value}"
if new_tag.lower() not in existing_lower:
item_tag_to_add.append(new_tag)
changed = False
try:
ok_add = backend.add_tag(resolved_hash, item_tag_to_add, config=config)
if not ok_add:
log("[add_tag] Warning: Store rejected tag update", file=sys.stderr)
except Exception as exc:
log(f"[add_tag] Warning: Failed adding tag: {exc}", file=sys.stderr)
try:
refreshed_tag, _src2 = backend.get_tag(resolved_hash, config=config)
refreshed_list = [
t for t in (refreshed_tag or []) if isinstance(t, str)
]
except Exception:
refreshed_list = existing_tag_list
# Decide whether anything actually changed (case-sensitive so title casing updates count).
if set(refreshed_list) != set(existing_tag_list):
changed = True
before_lower = {t.lower()
for t in existing_tag_list}
after_lower = {t.lower()
for t in refreshed_list}
total_added += len(after_lower - before_lower)
total_modified += 1
# Update the result's tag using canonical field
if isinstance(res, models.PipeObject):
res.tag = refreshed_list
elif isinstance(res, dict):
res["tag"] = refreshed_list
final_title = _extract_title_tag(refreshed_list)
_apply_title_to_result(res, final_title)
if final_title and (not original_title or final_title != original_title):
_refresh_result_table_title(
final_title,
resolved_hash,
str(store_name),
raw_path
)
if changed:
_refresh_tag_view(res, resolved_hash, str(store_name), raw_path, config)
ctx.emit(res)
log(
f"[add_tag] Added {total_added} new tag(s) across {len(results)} item(s); modified {total_modified} item(s)",
file=sys.stderr,
)
if extract_template and extract_matched_items == 0:
log(
f"[add_tag] extract: no matches for template '{extract_template}' across {len(results)} item(s)",
file=sys.stderr,
)
elif extract_template and extract_no_match_items > 0 and extract_debug:
log(
f"[add_tag] extract: matched {extract_matched_items}, no-match {extract_no_match_items}",
file=sys.stderr,
)
return 0
CMDLET = Add_Tag()