diff --git a/cmdlet/delete_tag.py b/cmdlet/delete_tag.py index 67e11bf..ecd915b 100644 --- a/cmdlet/delete_tag.py +++ b/cmdlet/delete_tag.py @@ -4,6 +4,9 @@ from typing import Any, Dict, Sequence import sys from SYS import pipeline as ctx +from SYS.item_accessors import set_field +from SYS.payload_builders import extract_title_tag_value +from SYS.result_publication import publish_result_table from . import _shared as sh Cmdlet = sh.Cmdlet @@ -19,6 +22,172 @@ get_field = sh.get_field from SYS.logger import debug, log +def _matches_target( + item: Any, + target_hash: str | None, + target_path: str | None, + target_store: str | None = None, +) -> bool: + def norm(val: Any) -> str | None: + return str(val).lower() if val is not None else None + + target_hash_l = target_hash.lower() if target_hash else None + target_path_l = target_path.lower() if target_path else None + target_store_l = target_store.lower() if target_store else None + + if isinstance(item, dict): + hashes = [norm(item.get("hash"))] + paths = [norm(item.get("path")), norm(item.get("target"))] + stores = [norm(item.get("store"))] + else: + hashes = [norm(get_field(item, "hash"))] + paths = [norm(get_field(item, "path")), norm(get_field(item, "target"))] + stores = [norm(get_field(item, "store"))] + + if target_store_l and target_store_l not in stores: + return False + if target_hash_l and target_hash_l in hashes: + return True + if target_path_l and target_path_l in paths: + return True + return False + + +def _set_result_tags(result: Any, tags: list[str]) -> None: + normalized = list(tags or []) + set_field(result, "tag", normalized) + + if isinstance(result, dict): + if "tags" in result: + result["tags"] = list(normalized) + for container_name in ("extra", "metadata", "full_metadata"): + container = result.get(container_name) + if not isinstance(container, dict): + continue + if "tag" in container: + container["tag"] = list(normalized) + if "tags" in container: + container["tags"] = list(normalized) + return + + try: + setattr(result, "tags", list(normalized)) + except Exception: + pass + for container_name in ("extra", "metadata", "full_metadata"): + container = getattr(result, container_name, None) + if not isinstance(container, dict): + continue + if "tag" in container: + container["tag"] = list(normalized) + if "tags" in container: + container["tags"] = list(normalized) + + +def _apply_title_to_result(result: Any, title_value: str | None) -> None: + if not title_value: + return + + if isinstance(result, dict): + result["title"] = title_value + cols = result.get("columns") + if isinstance(cols, list): + updated_cols = [] + changed = False + for col in cols: + if isinstance(col, tuple) and len(col) == 2: + label, existing_value = col + if str(label).lower() == "title": + updated_cols.append((label, title_value)) + changed = True + else: + updated_cols.append((label, existing_value)) + else: + updated_cols.append(col) + if changed: + result["columns"] = updated_cols + return + + try: + setattr(result, "title", title_value) + except Exception: + pass + columns = getattr(result, "columns", None) + if isinstance(columns, list) and columns: + try: + label, *_ = columns[0] + if str(label).lower() == "title": + columns[0] = (label, title_value) + except Exception: + pass + + +def _refresh_result_table_tags( + new_tags: list[str], + target_hash: str | None, + target_store: str | None, + target_path: str | None, +) -> None: + try: + last_table = ctx.get_last_result_table() + items = ctx.get_last_result_items() + if not last_table or not items: + return + + updated_items = [] + match_found = False + title_value = extract_title_tag_value(new_tags) + for item in items: + try: + if _matches_target(item, target_hash, target_path, target_store): + _set_result_tags(item, new_tags) + if title_value: + _apply_title_to_result(item, title_value) + match_found = True + except Exception: + pass + updated_items.append(item) + + if not match_found: + return + + new_table = last_table.copy_with_title(getattr(last_table, "title", "")) + for item in updated_items: + new_table.add_result(item) + + publish_result_table(ctx, new_table, updated_items, overlay=True) + except Exception: + pass + + +def _expand_namespace_delete_tags(tags: Sequence[str], existing_tags: Sequence[str]) -> list[str]: + expanded: list[str] = [] + existing_list = [str(tag or "").strip() for tag in existing_tags or [] if str(tag or "").strip()] + + for raw_tag in tags or []: + text = str(raw_tag or "").strip() + if not text: + continue + namespace, sep, value = text.partition(":") + if sep and namespace.strip() and not value.strip(): + wanted = namespace.strip().casefold() + matches = [] + for existing in existing_list: + existing_ns, existing_sep, existing_value = existing.partition(":") + if not existing_sep: + continue + if existing_ns.strip().casefold() != wanted: + continue + if not existing_value.strip(): + continue + matches.append(existing) + expanded.extend(matches) + continue + expanded.append(text) + + return merge_sequences(expanded, case_sensitive=True) + + def _refresh_tag_view_if_current( file_hash: str | None, store_name: str | None, @@ -120,6 +289,102 @@ def _refresh_tag_view_if_current( pass +def _parse_delete_tag_arguments(arguments: Sequence[str]) -> list[str]: + def _split_top_level_commas(text: str) -> list[str]: + segments: list[str] = [] + current: list[str] = [] + paren_depth = 0 + angle_depth = 0 + quote: str | None = None + escape = False + + for ch in text: + if escape: + current.append(ch) + escape = False + continue + if ch == "\\": + current.append(ch) + escape = True + continue + if quote: + current.append(ch) + if ch == quote: + quote = None + continue + if ch in {"'", '"'}: + current.append(ch) + quote = ch + continue + if ch == "(": + paren_depth += 1 + current.append(ch) + continue + if ch == ")": + paren_depth = max(0, paren_depth - 1) + current.append(ch) + continue + if ch == "<": + angle_depth += 1 + current.append(ch) + continue + if ch == ">": + angle_depth = max(0, angle_depth - 1) + current.append(ch) + continue + if ch == "," and paren_depth == 0 and angle_depth == 0: + segments.append("".join(current).strip()) + current = [] + continue + current.append(ch) + + tail = "".join(current).strip() + if tail or segments: + segments.append(tail) + return segments + + def _expand_pipe_namespace(text: str) -> list[str]: + parts = text.split("|") + expanded: list[str] = [] + last_ns: str | None = None + for part in parts: + segment = part.strip() + if not segment: + continue + if ":" in segment: + ns, val = segment.split(":", 1) + ns = ns.strip() + val = val.strip() + last_ns = ns or last_ns + if last_ns is not None: + expanded.append(f"{last_ns}:{val}") + elif ns or val: + expanded.append(f"{ns}:{val}") + else: + if last_ns: + expanded.append(f"{last_ns}:{segment}") + else: + expanded.append(segment) + return expanded + + tags: list[str] = [] + for argument in arguments: + for token in _split_top_level_commas(str(argument)): + text = token.strip() + if not text: + continue + for entry in _expand_pipe_namespace(text): + candidate = entry.strip() + if not candidate: + continue + if ":" in candidate: + ns, val = candidate.split(":", 1) + candidate = f"{ns.strip()}:{val.strip()}" + if candidate: + tags.append(candidate) + return tags + + CMDLET = Cmdlet( name="delete-tag", summary="Remove tags from a file in a store.", @@ -221,7 +486,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: except Exception: grouped_table = "" grouped_tags = get_field(result, "tag") if result is not None else None - tags_arg = parse_tag_arguments(rest) + tags_arg = _parse_delete_tag_arguments(rest) if (grouped_table == "tag.selection" and isinstance(grouped_tags, list) and grouped_tags and not tags_arg): @@ -327,6 +592,10 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: config, result=item): success_count += 1 + try: + ctx.emit(item) + except Exception: + pass if success_count > 0: return 0 @@ -394,7 +663,7 @@ def _process_deletion( file=sys.stderr, ) - tags = list(resolved_tags) + tags = _expand_namespace_delete_tags(list(resolved_tags), existing_tag_list) if not tags: return False @@ -428,10 +697,24 @@ def _process_deletion( raise exc or KeyError(store_name) ok = backend.delete_tag(resolved_hash, list(tags), config=config) if ok: - preview = resolved_hash[:12] + ("…" if len(resolved_hash) > 12 else "") - debug( - f"Removed {len(tags)} tag(s) from {preview} via store '{store_name}'." - ) + refreshed_tags: list[str] = [] + try: + refreshed, _src = backend.get_tag(resolved_hash, config=config) + refreshed_tags = list(refreshed or []) + except Exception: + delete_set = {str(tag).strip().casefold() for tag in tags} + refreshed_tags = [ + existing_tag for existing_tag in existing_tag_list + if str(existing_tag).strip().casefold() not in delete_set + ] + + if result is not None: + _set_result_tags(result, refreshed_tags) + title_value = extract_title_tag_value(refreshed_tags) + if title_value: + _apply_title_to_result(result, title_value) + + _refresh_result_table_tags(refreshed_tags, resolved_hash, store_name, path) _refresh_tag_view_if_current(resolved_hash, store_name, path, config) return True return False diff --git a/cmdnat/table.py b/cmdnat/table.py index 0633b1d..9053e72 100644 --- a/cmdnat/table.py +++ b/cmdnat/table.py @@ -165,6 +165,11 @@ def _render_table(table: Any) -> int: log("No active result table", file=sys.stderr) return 1 + try: + setattr(table, "_rendered_by_cmdlet", True) + except Exception: + pass + try: if hasattr(table, "to_rich"): stdout_console().print(table.to_rich())