from __future__ import annotations

from typing import Any, Dict, List, Sequence, Optional
from pathlib import Path
import sys
import re

from SYS.logger import log

from SYS import models
from SYS import pipeline as ctx
from . import _shared as sh

normalize_result_input = sh.normalize_result_input
filter_results_by_temp = sh.filter_results_by_temp
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
normalize_hash = sh.normalize_hash
parse_tag_arguments = sh.parse_tag_arguments
expand_tag_groups = sh.expand_tag_groups
parse_cmdlet_args = sh.parse_cmdlet_args
collapse_namespace_tag = sh.collapse_namespace_tag
should_show_help = sh.should_show_help
get_field = sh.get_field
from Store import Store
from SYS.utils import sha256_file

# Field names allowed inside "(field)" placeholders of an -extract template.
_FIELD_NAME_RE = re.compile(r"^[A-Za-z0-9_]+$")

# Locates "(field)" placeholders inside an -extract template.
_TEMPLATE_FIELD_RE = re.compile(r"\(([^)]+)\)")

# Any run of whitespace (spaces, tabs, newlines, ...).
_WHITESPACE_RUN_RE = re.compile(r"\s+")

# Unicode dash variants folded to ASCII '-': hyphen, non-breaking hyphen,
# figure dash, en dash, em dash, horizontal bar, minus sign.
_UNICODE_DASHES = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212"
_DASH_TABLE = str.maketrans(_UNICODE_DASHES, "-" * len(_UNICODE_DASHES))

# Non-final template fields with these names capture a full digit run.
_NUMERIC_FIELD_NAMES = frozenset(
    {
        "disk",
        "disc",
        "cd",
        "track",
        "trk",
        "episode",
        "ep",
        "season",
        "year",
    }
)


def _normalize_title_for_extract(text: str) -> str:
    """Normalize common separators in titles for matching.

    Helps when sources use unicode dashes or odd whitespace: every dash
    variant becomes '-', and whitespace runs (including newlines/tabs, since
    some sources wrap the artist name or title across lines) collapse to a
    single space.
    """
    s = str(text or "").strip()
    if not s:
        return s
    s = s.translate(_DASH_TABLE)
    return _WHITESPACE_RUN_RE.sub(" ", s).strip()


def _strip_title_prefix(text: str) -> str:
    """Return *text* without a leading (case-insensitive) ``title:`` prefix."""
    s = str(text or "").strip()
    if s.lower().startswith("title:"):
        s = s.split(":", 1)[1].strip()
    return s


def _literal_to_title_pattern_regex(literal: str) -> str:
    """Convert a literal chunk of a template into a regex fragment.

    Keeps punctuation literal, but treats any whitespace run as \\s*.
    """
    out: List[str] = []
    i = 0
    length = len(literal)
    while i < length:
        ch = literal[i]
        if ch.isspace():
            # Swallow the whole whitespace run and emit one optional matcher.
            while i < length and literal[i].isspace():
                i += 1
            out.append(r"\s*")
            continue
        out.append(re.escape(ch))
        i += 1
    return "".join(out)


def _compile_extract_template(template: str) -> tuple[re.Pattern[str], List[str]]:
    """Compile a simple (field) template into a regex.

    Example template:
        (artist) - (album) - (disk)-(track) (title)

    This is *not* user-facing regex: we only support named fields in
    parentheses.

    Returns:
        ``(pattern, field_names)`` where *pattern* has one named group per
        field and *field_names* lists the fields in template order.

    Raises:
        ValueError: if the template is empty, has no ``(field)``, or contains
            an invalid, digit-leading, or duplicate field name.
    """
    tpl = str(template or "").strip()
    if not tpl:
        raise ValueError("empty extract template")

    matches = list(_TEMPLATE_FIELD_RE.finditer(tpl))
    if not matches:
        raise ValueError("extract template must contain at least one (field)")

    field_names: List[str] = []
    seen: set[str] = set()
    parts: List[str] = [r"^\s*"]
    last_end = 0
    last_index = len(matches) - 1

    for idx, m in enumerate(matches):
        literal = tpl[last_end:m.start()]
        if literal:
            parts.append(_literal_to_title_pattern_regex(literal))

        raw_name = (m.group(1) or "").strip()
        if not raw_name or not _FIELD_NAME_RE.fullmatch(raw_name):
            raise ValueError(
                f"invalid field name '{raw_name}' (use A-Z, 0-9, underscore)"
            )
        # Field names become regex group names, which must be identifiers and
        # unique; reject these here instead of leaking an re.error.
        if raw_name[0].isdigit():
            raise ValueError(
                f"invalid field name '{raw_name}' (must not start with a digit)"
            )
        if raw_name in seen:
            raise ValueError(f"duplicate field name '{raw_name}' in extract template")
        seen.add(raw_name)
        field_names.append(raw_name)

        if idx == last_index:
            # The final field greedily takes the rest of the title.
            parts.append(rf"(?P<{raw_name}>.+)")
        elif raw_name.lower() in _NUMERIC_FIELD_NAMES:
            # Heuristic: common numeric fields should capture full digit runs.
            # This avoids ambiguous splits like track='2', title='3 ...'.
            parts.append(rf"(?P<{raw_name}>\d+)")
        else:
            parts.append(rf"(?P<{raw_name}>.+?)")

        last_end = m.end()

    tail = tpl[last_end:]
    if tail:
        parts.append(_literal_to_title_pattern_regex(tail))
    parts.append(r"\s*$")

    rx = "".join(parts)
    try:
        return re.compile(rx, flags=re.IGNORECASE), field_names
    except re.error as exc:
        raise ValueError(f"invalid extract template: {exc}") from exc


def _extract_tags_from_title(title_text: str, template: str) -> List[str]:
    """Extract (field)->value from title_text and return ['field:value', ...].

    Returns an empty list when the (normalized) title is empty or does not
    match the template. Fields whose captured value is blank are omitted.

    Raises:
        ValueError: if *template* is not a valid extract template.
    """
    title_clean = _normalize_title_for_extract(_strip_title_prefix(title_text))
    if not title_clean:
        return []

    pattern, field_names = _compile_extract_template(template)
    m = pattern.match(title_clean)
    if not m:
        return []

    out: List[str] = []
    for name in field_names:
        value = (m.group(name) or "").strip()
        if value:
            out.append(f"{name}:{value}")
    return out


def _get_title_candidates_for_extraction(
    res: Any, existing_tags: Optional[List[str]] = None
) -> List[str]:
    """Return a list of possible title strings in priority order.

    Candidates are normalized, de-duplicated, and gathered from (1) the item's
    title field, (2) the first ``title:`` tag in *existing_tags* (or in the
    item's own tags when *existing_tags* is not a list), and (3) the stem of
    the item's path.
    """
    candidates: List[str] = []

    def add_candidate(val: Any) -> None:
        if val is None:
            return
        s = _normalize_title_for_extract(_strip_title_prefix(str(val)))
        if s and s not in candidates:
            candidates.append(s)

    # 1) Item's title field (may be a display title, not the title: tag)
    try:
        add_candidate(get_field(res, "title"))
    except Exception:
        pass
    if isinstance(res, dict):
        add_candidate(res.get("title"))

    # 2) title: tag from either store tags or piped tags
    tags = existing_tags if isinstance(existing_tags, list) else _extract_item_tags(res)
    add_candidate(_extract_title_tag(tags) or "")

    # 3) Filename stem
    try:
        path_val = get_field(res, "path")
        if path_val:
            add_candidate((Path(str(path_val)).stem or "").strip())
    except Exception:
        pass

    return candidates


def _extract_tags_from_title_candidates(
    candidates: List[str], template: str
) -> tuple[List[str], Optional[str]]:
    """Try candidates in order; return (tags, matched_candidate).

    Returns ``([], None)`` when no candidate matches the template.
    """
    for c in candidates:
        extracted = _extract_tags_from_title(c, template)
        if extracted:
            return extracted, c
    return [], None


def _try_compile_extract_template(
    template: Optional[str],
) -> tuple[Optional[re.Pattern[str]], Optional[str]]:
    """Compile template for validation/debug; return (pattern, error_message).

    Exactly one of the two is non-None, except for ``template is None`` where
    both are None.
    """
    if template is None:
        return None, None
    try:
        pattern, _fields = _compile_extract_template(str(template))
        return pattern, None
    except Exception as exc:
        return None, str(exc)


def _extract_title_tag(tags: List[str]) -> Optional[str]:
    """Return the value of the first title: tag if present.

    Non-string entries are ignored. If the first ``title:`` tag has an empty
    value, None is returned (later title tags are not consulted).
    """
    for t in tags:
        if isinstance(t, str) and t.lower().startswith("title:"):
            value = t.split(":", 1)[1].strip()
            return value or None
    return None


def _extract_item_tags(res: Any) -> List[str]:
    """Return the item's ``tag`` payload as a list of strings.

    Supports PipeObject (``.tag``) and dict (``["tag"]``) items; a single
    non-blank string becomes a one-element list, anything else yields ``[]``.
    """
    if isinstance(res, models.PipeObject):
        raw = getattr(res, "tag", None)
    elif isinstance(res, dict):
        raw = res.get("tag")
    else:
        raw = None

    if isinstance(raw, list):
        return [str(t) for t in raw if t is not None]
    if isinstance(raw, str) and raw.strip():
        return [raw]
    return []


def _set_item_tags(res: Any, tags: List[str]) -> None:
    """Store *tags* on a PipeObject or dict item; other types are ignored."""
    if isinstance(res, models.PipeObject):
        res.tag = tags
    elif isinstance(res, dict):
        res["tag"] = tags


def _update_item_title_fields(item: Any, new_title: str) -> None:
    """Mutate an item to reflect a new title in plain fields and columns."""
    if isinstance(item, models.PipeObject):
        item.title = new_title
        # Update columns if present (Title column assumed index 0)
        columns = getattr(item, "columns", None)
        if isinstance(columns, list) and columns:
            label, *_ = columns[0]
            if str(label).lower() == "title":
                columns[0] = (label, new_title)
    elif isinstance(item, dict):
        item["title"] = new_title
        cols = item.get("columns")
        if isinstance(cols, list):
            updated_cols = []
            changed = False
            for col in cols:
                if (
                    isinstance(col, tuple)
                    and len(col) == 2
                    and str(col[0]).lower() == "title"
                ):
                    updated_cols.append((col[0], new_title))
                    changed = True
                else:
                    updated_cols.append(col)
            # Only replace the list when a Title column was actually rewritten.
            if changed:
                item["columns"] = updated_cols


def _apply_title_to_result(res: Any, title_value: Optional[str]) -> None:
    """Update result object/dict title fields and columns in-place.

    Does nothing when *title_value* is empty or None.
    """
    if not title_value:
        return
    _update_item_title_fields(res, title_value)


def _matches_target(
    item: Any,
    target_hash: Optional[str],
    target_path: Optional[str],
    target_store: Optional[str] = None,
) -> bool:
    """Determine whether a result item refers to the given target.

    Important: hashes can collide across backends in this app's UX (same media
    in multiple stores). When target_store is provided, it must match too.
    All comparisons are case-insensitive; a hash match or a path match is
    sufficient once the store check passes.
    """

    def norm(val: Any) -> Optional[str]:
        return str(val).lower() if val is not None else None

    if isinstance(item, dict):
        item_hash = norm(item.get("hash"))
        item_path = norm(item.get("path"))
        item_store = norm(item.get("store"))
    else:
        item_hash = norm(get_field(item, "hash"))
        item_path = norm(get_field(item, "path"))
        item_store = norm(get_field(item, "store"))

    target_hash_l = target_hash.lower() if target_hash else None
    target_path_l = target_path.lower() if target_path else None
    target_store_l = target_store.lower() if target_store else None

    if target_store_l and target_store_l != item_store:
        return False
    if target_hash_l and target_hash_l == item_hash:
        return True
    if target_path_l and target_path_l == item_path:
        return True
    return False


def _refresh_result_table_title(
    new_title: str,
    target_hash: Optional[str],
    target_store: Optional[str],
    target_path: Optional[str],
) -> None:
    """Refresh the cached result table with an updated title and redisplay it.

    Best-effort: any failure is swallowed so a display refresh can never fail
    the tagging operation itself.
    """
    try:
        last_table = ctx.get_last_result_table()
        items = ctx.get_last_result_items()
        if not last_table or not items:
            return

        updated_items = []
        match_found = False
        for item in items:
            try:
                if _matches_target(item, target_hash, target_path, target_store):
                    _update_item_title_fields(item, new_title)
                    match_found = True
            except Exception:
                pass
            updated_items.append(item)
        if not match_found:
            return

        new_table = last_table.copy_with_title(getattr(last_table, "title", ""))
        for item in updated_items:
            new_table.add_result(item)

        # Keep the underlying history intact; update only the overlay so @.. can
        # clear the overlay then continue back to prior tables (e.g., the search list).
        ctx.set_last_result_table_overlay(new_table, updated_items)
    except Exception:
        pass


def _refresh_tag_view(
    res: Any,
    target_hash: Optional[str],
    store_name: Optional[str],
    target_path: Optional[str],
    config: Dict[str, Any],
) -> None:
    """Refresh tag display via get-tag.

    Prefer current subject; fall back to direct hash refresh. Best-effort:
    missing cmdlet registry, missing hash/store, or get-tag failures are
    silently ignored.
    """
    try:
        from cmdlet import get as get_cmdlet  # type: ignore
    except Exception:
        return

    if not target_hash or not store_name:
        return

    refresh_args: List[str] = ["-query", f"hash:{target_hash}", "-store", store_name]

    try:
        get_tag = get_cmdlet("get-tag")
    except Exception:
        get_tag = None
    if not callable(get_tag):
        return

    try:
        subject = ctx.get_last_result_subject()
        if subject and _matches_target(subject, target_hash, target_path, store_name):
            get_tag(subject, refresh_args, config)
            return
    except Exception:
        pass

    try:
        get_tag(res, refresh_args, config)
    except Exception:
        pass


def _payload_tags(item: Any) -> List[str]:
    """Return a *copy* of the tags carried by a pipeline item (attr or dict key)."""
    try:
        tags = getattr(item, "tag", None)
    except (AttributeError, TypeError, KeyError):
        tags = None
    if not tags and isinstance(item, dict):
        tags = item.get("tag")
    if not tags:
        return []
    if isinstance(tags, str):
        return [tags]
    if isinstance(tags, list):
        # Copy so later appends (e.g. -list) never mutate the item's own list.
        return list(tags)
    return []


def _parse_duplicate_arg(duplicate_arg: Any) -> tuple[str, List[str]]:
    """Split a -duplicate value into (source_namespace, target_namespaces).

    Accepts the explicit form ``source:target1,target2`` and the inferred form
    ``source,target1,target2``. Returns ``("", [])`` when no targets are given.
    """
    text = str(duplicate_arg or "")
    parts = text.split(":")
    if len(parts) > 1:
        source_ns = parts[0]
        raw_targets = parts[1].split(",")
    else:
        names = text.split(",")
        if len(names) < 2:
            return "", []
        source_ns, raw_targets = names[0], names[1:]
    return source_ns.strip(), [t.strip() for t in raw_targets if t.strip()]


def _duplicate_tags(duplicate_arg: Any, existing_tag_list: List[str]) -> List[str]:
    """Build tags that copy an existing namespace's values to new namespaces.

    Target namespaces that already have a value on the item are skipped (not
    overwritten), matching the cmdlet's documented -duplicate contract.
    """
    source_ns, targets = _parse_duplicate_arg(duplicate_arg)
    if not source_ns or not targets:
        return []

    existing_namespaces = {
        t.split(":", 1)[0].strip().lower() for t in existing_tag_list if ":" in t
    }
    open_targets = [t for t in targets if t.lower() not in existing_namespaces]
    if not open_targets:
        return []

    source_prefix = source_ns.lower() + ":"
    out: List[str] = []
    for t in existing_tag_list:
        if not t.lower().startswith(source_prefix):
            continue
        value = t.split(":", 1)[1]
        out.extend(f"{target_ns}:{value}" for target_ns in open_targets)
    return out


def _extract_new_tags_for_item(
    res: Any,
    existing_tag_list: List[str],
    existing_lower: set,
    template: str,
    *,
    debug: bool = False,
    debug_pattern: Optional[re.Pattern[str]] = None,
) -> tuple[List[str], bool]:
    """Run -extract against one item.

    Returns ``(new_tags, matched)``: *new_tags* are extracted tags not already
    present (case-insensitively) on the item, and *matched* tells whether any
    title candidate matched the template. With *debug*, the outcome is logged
    to stderr.
    """
    candidates = _get_title_candidates_for_extraction(res, existing_tag_list)
    extracted, matched = _extract_tags_from_title_candidates(candidates, template)

    if not extracted:
        if debug:
            rx_preview = debug_pattern.pattern if debug_pattern else ""
            cand_preview = "; ".join(repr(c) for c in candidates[:3])
            log(
                f"[add_tag] extract no match for template {template!r}. regex: {rx_preview!r}. candidates: {cand_preview}",
                file=sys.stderr,
            )
        return [], False

    if debug:
        log(
            f"[add_tag] extract matched: {matched!r} -> {extracted}",
            file=sys.stderr,
        )
    return [t for t in extracted if t.lower() not in existing_lower], True


def _merge_local_tags(
    existing_tag_list: List[str], item_tag_to_add: List[Any]
) -> tuple[List[str], List[str], List[str]]:
    """Merge new tags into an item's in-memory tag list.

    A new ``ns:value`` tag replaces every existing tag in the same namespace
    whose text differs (case-insensitively) from it.

    Returns:
        ``(updated_tags, added_tags, removed_tags)``.
    """
    existing_lower = {t.lower() for t in existing_tag_list}

    removed: set = set()
    for new_tag in item_tag_to_add:
        if not isinstance(new_tag, str) or ":" not in new_tag:
            continue
        ns = new_tag.split(":", 1)[0].strip()
        if not ns:
            continue
        ns_prefix = ns.lower() + ":"
        new_lower = new_tag.lower()
        for t in existing_tag_list:
            t_lower = t.lower()
            if t_lower.startswith(ns_prefix) and t_lower != new_lower:
                removed.add(t)

    added = [
        t for t in item_tag_to_add
        if isinstance(t, str) and t.lower() not in existing_lower
    ]
    updated = [t for t in existing_tag_list if t not in removed]
    updated.extend(added)
    return updated, added, sorted(removed)


class Add_Tag(Cmdlet):
    """Class-based add-tag cmdlet with Cmdlet metadata inheritance."""

    def __init__(self) -> None:
        super().__init__(
            name="add-tag",
            summary="Add tag to a file in a store.",
            usage=
            'add-tag -store <store> [-query "hash:<sha256>"] [-duplicate <format>] [-list <list>[,<list>...]] [--all] <tag>[,<tag>...]',
            arg=[
                CmdletArg(
                    "tag",
                    type="string",
                    required=False,
                    description=
                    "One or more tag to add. Comma- or space-separated. Can also use {list_name} syntax. If omitted, uses tag from pipeline payload.",
                    variadic=True,
                ),
                SharedArgs.QUERY,
                SharedArgs.STORE,
                CmdletArg(
                    "-extract",
                    type="string",
                    description=
                    'Extract tags from the item\'s title using a simple template with (field) placeholders. Example: -extract "(artist) - (album) - (disk)-(track) (title)" will add artist:, album:, disk:, track:, title: tags.',
                ),
                CmdletArg(
                    "--extract-debug",
                    type="flag",
                    description=
                    "Print debug info for -extract matching (matched title source and extracted tags).",
                ),
                CmdletArg(
                    "-duplicate",
                    type="string",
                    description=
                    "Copy existing tag values to new namespaces. Formats: title:album,artist (explicit) or title,album,artist (inferred)",
                ),
                CmdletArg(
                    "-list",
                    type="string",
                    description=
                    "Load predefined tag lists from adjective.json. Comma-separated list names (e.g., -list philosophy,occult).",
                ),
                CmdletArg(
                    "--all",
                    type="flag",
                    description=
                    "Include temporary files in tagging (by default, only tag non-temporary files).",
                ),
            ],
            detail=[
                "- By default, only tag non-temporary files (from pipelines). Use --all to tag everything.",
                "- Requires a store backend: use -store or pipe items that include store.",
                "- If -query is not provided, uses the piped item's hash (or derives from its path when possible).",
                "- Multiple tag can be comma-separated or space-separated.",
                "- Use -list to include predefined tag lists from adjective.json: -list philosophy,occult",
                '- tag can also reference lists with curly braces: add-tag {philosophy} "other:tag"',
                "- Use -duplicate to copy EXISTING tag values to new namespaces:",
                "  Explicit format: -duplicate title:album,artist (copies title: to album: and artist:)",
                "  Inferred format: -duplicate title,album,artist (first is source, rest are targets)",
                "- The source namespace must already exist in the file being tagged.",
                "- Target namespaces that already have a value are skipped (not overwritten).",
                "- Use -extract to derive namespaced tags from the current title (title field or title: tag) using a simple template.",
            ],
            exec=self.run,
        )
        self.register()

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Add tag to a file with smart filtering for pipeline results.

        Items without a configured store backend but with an existing local
        path are tagged in memory only (the tags travel down the pipeline);
        all other items are written through their store backend.

        Returns:
            0 on success (or after showing help), 1 on argument or store errors.
        """
        if should_show_help(args):
            log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
            return 0

        # Parse arguments
        parsed = parse_cmdlet_args(args, self)

        extract_template = parsed.get("extract")
        if extract_template is not None:
            extract_template = str(extract_template)
        extract_debug = bool(parsed.get("extract-debug", False))
        extract_debug_rx, extract_debug_err = _try_compile_extract_template(
            extract_template
        )

        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            log(
                "[add_tag] Error: -query must be of the form hash:<sha256>",
                file=sys.stderr,
            )
            return 1

        hash_override = normalize_hash(query_hash) if query_hash else None

        # If add-tag is in the middle of a pipeline (has downstream stages), default to
        # including temp files. This enables common flows like:
        #   @N | download-file | add-tag ... | add-file ...
        store_override = parsed.get("store")
        stage_ctx = ctx.get_stage_context()
        has_downstream = bool(
            stage_ctx is not None and not getattr(stage_ctx, "is_last_stage", False)
        )

        include_temp = bool(parsed.get("all", False))
        if has_downstream and not include_temp and not store_override:
            include_temp = True

        # Normalize input to list
        results = normalize_result_input(result)

        # Filter by temp status (unless --all is set)
        if not include_temp:
            results = filter_results_by_temp(results, include_temp=False)

        # When no pipeline payload is present but -query/-store pinpoints a hash, tag it directly.
        if not results and hash_override and store_override:
            results = [{"hash": hash_override, "store": store_override}]

        if not results:
            log(
                "No valid files to tag (all results were temporary; use --all to include temporary files)",
                file=sys.stderr,
            )
            return 1

        # Get tag from arguments (or fallback to pipeline payload). Always work
        # on a copy so appending -list entries never mutates parsed args or an
        # item's own tag list.
        raw_tag = parsed.get("tag") or []
        raw_tag = [raw_tag] if isinstance(raw_tag, str) else list(raw_tag)

        # Fallback: if no tag provided explicitly, try to pull from first result payload.
        # IMPORTANT: when -extract is used, users typically want *only* extracted tags,
        # not "re-add whatever tags are already in the payload".
        if not raw_tag and results and not extract_template:
            raw_tag = _payload_tags(results[0])

        # Handle -list argument (convert to {list} syntax)
        list_arg = parsed.get("list")
        if list_arg:
            for list_name in str(list_arg).split(","):
                list_name = list_name.strip()
                if list_name:
                    raw_tag.append(f"{{{list_name}}}")

        # Parse and expand tag
        tag_to_add = parse_tag_arguments(raw_tag)
        tag_to_add = expand_tag_groups(tag_to_add)

        if not tag_to_add and not extract_template:
            log(
                "No tag provided to add (and no -extract template provided)",
                file=sys.stderr,
            )
            return 1

        # An invalid template can never match; report it up front (regardless of
        # --extract-debug) instead of failing later while processing items.
        if extract_template and extract_debug_err:
            log(
                f"[add_tag] extract template error: {extract_debug_err}",
                file=sys.stderr,
            )
            return 1

        # Get other flags
        duplicate_arg = parsed.get("duplicate")

        # tag ARE provided - apply them to each store-backed result
        total_added = 0
        total_modified = 0

        store_registry = Store(config)

        extract_matched_items = 0
        extract_no_match_items = 0

        for res in results:
            store_name: Optional[str]
            raw_hash: Optional[str]
            raw_path: Optional[str]

            if isinstance(res, models.PipeObject):
                store_name = store_override or res.store
                raw_hash = res.hash
                raw_path = res.path
            elif isinstance(res, dict):
                store_name = store_override or res.get("store")
                raw_hash = res.get("hash")
                raw_path = res.get("path")
            else:
                # Unknown payload type: pass it through untouched.
                ctx.emit(res)
                continue

            if not store_name:
                store_name = None

            # If the item isn't in a configured store backend yet (e.g., store=PATH) but has a local file,
            # treat add-tag as a pipeline mutation (carry tags forward for add-file) instead of a store write.
            if not store_override:
                store_name_str = str(store_name) if store_name is not None else ""
                local_mode_requested = (
                    not store_name_str
                    or store_name_str.upper() == "PATH"
                    or store_name_str.lower() == "local"
                )
                is_known_backend = bool(store_name_str) and store_registry.is_available(
                    store_name_str
                )

                if local_mode_requested and raw_path:
                    try:
                        if Path(str(raw_path)).expanduser().exists():
                            existing_tag_list = _extract_item_tags(res)
                            existing_lower = {t.lower() for t in existing_tag_list}

                            # Per-item tag list (do not mutate shared list)
                            item_tag_to_add = list(tag_to_add)

                            if extract_template:
                                new_tags, matched = _extract_new_tags_for_item(
                                    res,
                                    existing_tag_list,
                                    existing_lower,
                                    extract_template,
                                    debug=extract_debug,
                                    debug_pattern=extract_debug_rx,
                                )
                                if matched:
                                    extract_matched_items += 1
                                else:
                                    extract_no_match_items += 1
                                item_tag_to_add.extend(new_tags)

                            item_tag_to_add = collapse_namespace_tag(
                                item_tag_to_add, "title", prefer="last"
                            )

                            if duplicate_arg:
                                item_tag_to_add.extend(
                                    _duplicate_tags(duplicate_arg, existing_tag_list)
                                )

                            updated_tag_list, actual_tag_to_add, removed_namespace_tag = (
                                _merge_local_tags(existing_tag_list, item_tag_to_add)
                            )

                            _set_item_tags(res, updated_tag_list)
                            final_title = _extract_title_tag(updated_tag_list)
                            _apply_title_to_result(res, final_title)

                            total_added += len(actual_tag_to_add)
                            total_modified += (
                                1 if (removed_namespace_tag or actual_tag_to_add) else 0
                            )

                            ctx.emit(res)
                            continue
                    except Exception as exc:
                        # Surface the real cause before reporting the generic
                        # local-path error below.
                        log(
                            f"[add_tag] Warning: Local tagging failed for {raw_path!r}: {exc}",
                            file=sys.stderr,
                        )

                if local_mode_requested:
                    log(
                        "[add_tag] Error: Missing usable local path for tagging (or provide -store)",
                        file=sys.stderr,
                    )
                    return 1

                if store_name_str and not is_known_backend:
                    log(
                        f"[add_tag] Error: Unknown store '{store_name_str}'. Available: {store_registry.list_backends()}",
                        file=sys.stderr,
                    )
                    return 1

            resolved_hash = (
                normalize_hash(hash_override)
                if hash_override
                else normalize_hash(raw_hash)
            )
            if not resolved_hash and raw_path:
                try:
                    p = Path(str(raw_path))
                    stem = p.stem
                    # A 64-hex-digit file stem is taken to be the sha256 itself.
                    if len(stem) == 64 and all(
                        c in "0123456789abcdef" for c in stem.lower()
                    ):
                        resolved_hash = stem.lower()
                    elif p.exists() and p.is_file():
                        resolved_hash = sha256_file(p)
                except Exception:
                    resolved_hash = None

            if not resolved_hash:
                log(
                    "[add_tag] Warning: Item missing usable hash (and could not derive from path); skipping",
                    file=sys.stderr,
                )
                ctx.emit(res)
                continue

            try:
                backend = store_registry[str(store_name)]
            except Exception as exc:
                log(
                    f"[add_tag] Error: Unknown store '{store_name}': {exc}",
                    file=sys.stderr,
                )
                return 1

            try:
                existing_tag, _src = backend.get_tag(resolved_hash, config=config)
            except Exception:
                existing_tag = []

            existing_tag_list = [t for t in (existing_tag or []) if isinstance(t, str)]
            existing_lower = {t.lower() for t in existing_tag_list}
            original_title = _extract_title_tag(existing_tag_list)

            # Per-item tag list (do not mutate shared list)
            item_tag_to_add = list(tag_to_add)

            if extract_template:
                new_tags, matched = _extract_new_tags_for_item(
                    res,
                    existing_tag_list,
                    existing_lower,
                    extract_template,
                    debug=extract_debug,
                    debug_pattern=extract_debug_rx,
                )
                if matched:
                    extract_matched_items += 1
                else:
                    extract_no_match_items += 1
                item_tag_to_add.extend(new_tags)

            item_tag_to_add = collapse_namespace_tag(
                item_tag_to_add, "title", prefer="last"
            )

            # Handle -duplicate logic (copy existing tag to new namespaces)
            if duplicate_arg:
                item_tag_to_add.extend(_duplicate_tags(duplicate_arg, existing_tag_list))

            changed = False
            try:
                ok_add = backend.add_tag(resolved_hash, item_tag_to_add, config=config)
                if not ok_add:
                    log("[add_tag] Warning: Store rejected tag update", file=sys.stderr)
            except Exception as exc:
                log(f"[add_tag] Warning: Failed adding tag: {exc}", file=sys.stderr)

            # Re-read from the store so the item reflects what was really saved.
            try:
                refreshed_tag, _src2 = backend.get_tag(resolved_hash, config=config)
                refreshed_list = [
                    t for t in (refreshed_tag or []) if isinstance(t, str)
                ]
            except Exception:
                refreshed_list = existing_tag_list

            # Decide whether anything actually changed (case-sensitive so title casing updates count).
            if set(refreshed_list) != set(existing_tag_list):
                changed = True
                before_lower = {t.lower() for t in existing_tag_list}
                after_lower = {t.lower() for t in refreshed_list}
                total_added += len(after_lower - before_lower)
                total_modified += 1

            # Update the result's tag using canonical field
            _set_item_tags(res, refreshed_list)

            final_title = _extract_title_tag(refreshed_list)
            _apply_title_to_result(res, final_title)

            if final_title and (not original_title or final_title != original_title):
                _refresh_result_table_title(
                    final_title, resolved_hash, str(store_name), raw_path
                )

            if changed:
                _refresh_tag_view(res, resolved_hash, str(store_name), raw_path, config)

            ctx.emit(res)

        log(
            f"[add_tag] Added {total_added} new tag(s) across {len(results)} item(s); modified {total_modified} item(s)",
            file=sys.stderr,
        )

        if extract_template and extract_matched_items == 0:
            log(
                f"[add_tag] extract: no matches for template '{extract_template}' across {len(results)} item(s)",
                file=sys.stderr,
            )
        elif extract_template and extract_no_match_items > 0 and extract_debug:
            log(
                f"[add_tag] extract: matched {extract_matched_items}, no-match {extract_no_match_items}",
                file=sys.stderr,
            )

        return 0


CMDLET = Add_Tag()