2025-12-11 12:47:30 -08:00
from __future__ import annotations
from typing import Any , Dict , List , Sequence , Optional
from pathlib import Path
import sys
2025-12-27 06:05:07 -08:00
import re
2025-12-11 12:47:30 -08:00
2025-12-11 19:04:02 -08:00
from SYS . logger import log
2025-12-11 12:47:30 -08:00
2025-12-29 23:28:15 -08:00
from SYS import models
from SYS import pipeline as ctx
2025-12-16 23:23:43 -08:00
from . import _shared as sh
# Bind the helpers from the shared cmdlet module (`sh`) to module-level names
# so the rest of this file can call them without the `sh.` prefix.
normalize_result_input = sh . normalize_result_input
filter_results_by_temp = sh . filter_results_by_temp
Cmdlet = sh . Cmdlet
CmdletArg = sh . CmdletArg
SharedArgs = sh . SharedArgs
normalize_hash = sh . normalize_hash
parse_tag_arguments = sh . parse_tag_arguments
expand_tag_groups = sh . expand_tag_groups
parse_cmdlet_args = sh . parse_cmdlet_args
collapse_namespace_tag = sh . collapse_namespace_tag
should_show_help = sh . should_show_help
get_field = sh . get_field
2025-12-11 23:21:45 -08:00
from Store import Store
from SYS . utils import sha256_file
2025-12-27 06:05:07 -08:00
_FIELD_NAME_RE = re . compile ( r " ^[A-Za-z0-9_]+$ " )
def _normalize_title_for_extract ( text : str ) - > str :
""" Normalize common separators in titles for matching.
Helps when sources use unicode dashes or odd whitespace .
"""
s = str ( text or " " ) . strip ( )
if not s :
return s
# Common unicode dash variants -> '-'
s = s . replace ( " \u2013 " , " - " ) # en dash
s = s . replace ( " \u2014 " , " - " ) # em dash
s = s . replace ( " \u2212 " , " - " ) # minus sign
s = s . replace ( " \u2010 " , " - " ) # hyphen
s = s . replace ( " \u2011 " , " - " ) # non-breaking hyphen
s = s . replace ( " \u2012 " , " - " ) # figure dash
s = s . replace ( " \u2015 " , " - " ) # horizontal bar
2025-12-27 14:50:59 -08:00
# Collapse any whitespace runs (including newlines/tabs) to a single space.
# Some sources wrap the artist name or title across lines.
try :
s = re . sub ( r " \ s+ " , " " , s ) . strip ( )
except Exception :
s = " " . join ( s . split ( ) )
2025-12-27 06:05:07 -08:00
return s
def _strip_title_prefix ( text : str ) - > str :
s = str ( text or " " ) . strip ( )
if s . lower ( ) . startswith ( " title: " ) :
s = s . split ( " : " , 1 ) [ 1 ] . strip ( )
return s
def _literal_to_title_pattern_regex ( literal : str ) - > str :
""" Convert a literal chunk of a template into a regex fragment.
Keeps punctuation literal , but treats any whitespace run as \\s * .
"""
out : List [ str ] = [ ]
i = 0
while i < len ( literal ) :
ch = literal [ i ]
if ch . isspace ( ) :
while i < len ( literal ) and literal [ i ] . isspace ( ) :
i + = 1
2025-12-27 14:50:59 -08:00
out . append ( r " \ s* " )
2025-12-27 06:05:07 -08:00
continue
out . append ( re . escape ( ch ) )
i + = 1
return " " . join ( out )
def _compile_extract_template(template: str) -> tuple[re.Pattern[str], List[str]]:
    """Compile a simple ``(field)`` template into an anchored regex.

    Example template:
        (artist) - (album) - (disk)-(track) (title)

    This is *not* user-facing regex: only named fields in parentheses are
    supported. Text between fields is matched literally (whitespace runs are
    relaxed to ``\\s*``) and matching is case-insensitive.

    Returns:
        ``(pattern, field_names)`` where ``field_names`` lists the fields in
        template order and each one is a named group of ``pattern``.

    Raises:
        ValueError: if the template is empty, contains no ``(field)``, or a
            field name is invalid, starts with a digit, or is repeated.
    """
    tpl = str(template or "").strip()
    if not tpl:
        raise ValueError("empty extract template")

    matches = list(re.finditer(r"\(([^)]+)\)", tpl))
    if not matches:
        raise ValueError("extract template must contain at least one (field)")

    # Heuristic: common numeric fields should capture full digit runs.
    # This avoids ambiguous splits like track='2', title='3 ...'.
    numeric_fields = {
        "disk",
        "disc",
        "cd",
        "track",
        "trk",
        "episode",
        "ep",
        "season",
        "year",
    }

    field_names: List[str] = []
    seen_names: set[str] = set()
    parts: List[str] = [r"^\s*"]
    last_end = 0
    last_index = len(matches) - 1

    for idx, m in enumerate(matches):
        literal = tpl[last_end:m.start()]
        if literal:
            parts.append(_literal_to_title_pattern_regex(literal))

        raw_name = (m.group(1) or "").strip()
        if not raw_name or not _FIELD_NAME_RE.fullmatch(raw_name):
            raise ValueError(
                f"invalid field name '{raw_name}' (use A-Z, 0-9, underscore)"
            )
        # Field names become regex group names, which must look like
        # identifiers and be unique; fail with a clear message instead of
        # letting re.compile raise an opaque re.error.
        if raw_name[0].isdigit():
            raise ValueError(
                f"invalid field name '{raw_name}' (must not start with a digit)"
            )
        if raw_name in seen_names:
            raise ValueError(f"duplicate field name '{raw_name}' in extract template")
        seen_names.add(raw_name)
        field_names.append(raw_name)

        if idx == last_index:
            # The final field greedily takes the rest of the title.
            parts.append(rf"(?P<{raw_name}>.+)")
        elif raw_name.lower() in numeric_fields:
            parts.append(rf"(?P<{raw_name}>\d+)")
        else:
            parts.append(rf"(?P<{raw_name}>.+?)")

        last_end = m.end()

    tail = tpl[last_end:]
    if tail:
        parts.append(_literal_to_title_pattern_regex(tail))
    parts.append(r"\s*$")

    return re.compile("".join(parts), flags=re.IGNORECASE), field_names
def _extract_tags_from_title(title_text: str, template: str) -> List[str]:
    """Match *title_text* against *template* and return ``['field:value', ...]``.

    The title is stripped of a ``title:`` prefix and normalized first. An
    empty title or a non-matching template yields an empty list; fields whose
    captured value is blank are left out.
    """
    cleaned = _normalize_title_for_extract(_strip_title_prefix(title_text))
    if not cleaned:
        return []

    regex, names = _compile_extract_template(template)
    found = regex.match(cleaned)
    if not found:
        return []

    captured = ((name, (found.group(name) or "").strip()) for name in names)
    return [f"{name}:{value}" for name, value in captured if value]
2025-12-29 17:05:03 -08:00
def _get_title_candidates_for_extraction(
    res: Any,
    existing_tags: Optional[List[str]] = None,
) -> List[str]:
    """Collect the distinct title strings worth trying, in priority order.

    Sources, in order: the item's ``title`` field, the first ``title:`` tag
    (from *existing_tags* when a list is given, otherwise from the item's own
    tags), and finally the stem of the item's ``path``. Every candidate is
    prefix-stripped and normalized; blanks and repeats are dropped.
    """
    ordered: List[str] = []

    def remember(raw: Any) -> None:
        if raw is None:
            return
        normalized = _normalize_title_for_extract(_strip_title_prefix(str(raw)))
        if normalized and normalized not in ordered:
            ordered.append(normalized)

    # 1) Item's title field (may be a display title, not the title: tag)
    try:
        remember(get_field(res, "title"))
    except Exception:
        pass
    if isinstance(res, dict):
        remember(res.get("title"))

    # 2) title: tag from either store tags or piped tags
    if isinstance(existing_tags, list):
        tag_source = existing_tags
    else:
        tag_source = _extract_item_tags(res)
    remember(_extract_title_tag(tag_source) or "")

    # 3) Filename stem
    try:
        location = get_field(res, "path")
        if location:
            remember((Path(str(location)).stem or "").strip())
    except Exception:
        pass

    return ordered
2025-12-29 18:42:02 -08:00
def _extract_tags_from_title_candidates(
    candidates: List[str],
    template: str,
) -> tuple[List[str], Optional[str]]:
    """Return ``(tags, candidate)`` for the first candidate the template matches.

    Candidates are tried in the given order and evaluation stops at the first
    one that yields tags. When none match the result is ``([], None)``.
    """
    attempts = (
        (_extract_tags_from_title(title, template), title) for title in candidates
    )
    return next(((tags, title) for tags, title in attempts if tags), ([], None))
2025-12-29 17:05:03 -08:00
def _try_compile_extract_template (
template : Optional [ str ] ,
2025-12-29 18:42:02 -08:00
) - > tuple [ Optional [ re . Pattern [ str ] ] ,
Optional [ str ] ] :
2025-12-27 06:05:07 -08:00
""" Compile template for debug; return (pattern, error_message). """
if template is None :
return None , None
try :
pattern , _fields = _compile_extract_template ( str ( template ) )
return pattern , None
except Exception as exc :
return None , str ( exc )
2025-12-11 23:21:45 -08:00
def _extract_title_tag ( tags : List [ str ] ) - > Optional [ str ] :
""" Return the value of the first title: tag if present. """
for t in tags :
if t . lower ( ) . startswith ( " title: " ) :
value = t . split ( " : " , 1 ) [ 1 ] . strip ( )
return value or None
return None
2025-12-13 00:18:30 -08:00
def _extract_item_tags(res: Any) -> List[str]:
    """Return the item's ``tag`` payload as a list of strings.

    Reads the ``tag`` attribute of a ``models.PipeObject`` or the ``"tag"``
    key of a dict. A list is stringified element by element (``None`` entries
    dropped), a non-blank string becomes a one-element list, and anything
    else yields an empty list.
    """
    payload = None
    if isinstance(res, models.PipeObject):
        payload = getattr(res, "tag", None)
    elif isinstance(res, dict):
        payload = res.get("tag")

    if isinstance(payload, list):
        return [str(entry) for entry in payload if entry is not None]
    if isinstance(payload, str):
        return [payload] if payload.strip() else []
    return []
def _set_item_tags(res: Any, tags: List[str]) -> None:
    """Store *tags* on the item in place.

    A ``models.PipeObject`` gets its ``tag`` attribute replaced, a dict its
    ``"tag"`` key; any other kind of item is left untouched.
    """
    if isinstance(res, models.PipeObject):
        res.tag = tags
        return
    if isinstance(res, dict):
        res["tag"] = tags
2025-12-11 23:21:45 -08:00
def _apply_title_to_result(res: Any, title_value: Optional[str]) -> None:
    """Update a result's title field and title column in place.

    Does nothing when *title_value* is empty or ``None``. Otherwise the work
    is delegated to ``_update_item_title_fields``, which applies the same
    PipeObject/dict title and column update this function used to duplicate
    line for line.
    """
    if not title_value:
        return
    _update_item_title_fields(res, title_value)
2025-12-12 21:55:38 -08:00
def _matches_target (
item : Any ,
target_hash : Optional [ str ] ,
target_path : Optional [ str ] ,
target_store : Optional [ str ] = None ,
) - > bool :
""" Determine whether a result item refers to the given target.
Important : hashes can collide across backends in this app ' s UX (same media in
multiple stores ) . When target_store is provided , it must match too .
"""
2025-12-11 23:21:45 -08:00
def norm ( val : Any ) - > Optional [ str ] :
return str ( val ) . lower ( ) if val is not None else None
target_hash_l = target_hash . lower ( ) if target_hash else None
target_path_l = target_path . lower ( ) if target_path else None
2025-12-12 21:55:38 -08:00
target_store_l = target_store . lower ( ) if target_store else None
2025-12-11 23:21:45 -08:00
if isinstance ( item , dict ) :
hashes = [ norm ( item . get ( " hash " ) ) ]
paths = [ norm ( item . get ( " path " ) ) ]
2025-12-12 21:55:38 -08:00
stores = [ norm ( item . get ( " store " ) ) ]
2025-12-11 23:21:45 -08:00
else :
hashes = [ norm ( get_field ( item , " hash " ) ) ]
paths = [ norm ( get_field ( item , " path " ) ) ]
2025-12-12 21:55:38 -08:00
stores = [ norm ( get_field ( item , " store " ) ) ]
if target_store_l :
if target_store_l not in stores :
return False
2025-12-11 23:21:45 -08:00
if target_hash_l and target_hash_l in hashes :
return True
if target_path_l and target_path_l in paths :
return True
return False
def _update_item_title_fields(item: Any, new_title: str) -> None:
    """Write *new_title* into an item's title field and title column, in place.

    For a ``models.PipeObject`` the ``title`` attribute is set and, if the
    first entry of ``columns`` is labelled "title", that entry is replaced.
    For a dict the ``"title"`` key is set and every 2-tuple column labelled
    "title" is replaced; ``"columns"`` is only reassigned when at least one
    column actually changed. Other item types are ignored.
    """

    def is_title_label(label: Any) -> bool:
        return str(label).lower() == "title"

    if isinstance(item, models.PipeObject):
        item.title = new_title
        columns = getattr(item, "columns", None)
        if isinstance(columns, list) and columns:
            first_label, *_rest = columns[0]
            if is_title_label(first_label):
                columns[0] = (first_label, new_title)
        return

    if not isinstance(item, dict):
        return

    item["title"] = new_title
    cols = item.get("columns")
    if not isinstance(cols, list):
        return

    rebuilt = []
    touched = False
    for col in cols:
        is_pair = isinstance(col, tuple) and len(col) == 2
        if is_pair and is_title_label(col[0]):
            rebuilt.append((col[0], new_title))
            touched = True
        else:
            rebuilt.append(col)
    if touched:
        item["columns"] = rebuilt
2025-12-12 21:55:38 -08:00
def _refresh_result_table_title(
    new_title: str,
    target_hash: Optional[str],
    target_store: Optional[str],
    target_path: Optional[str],
) -> None:
    """Push a changed title into the cached result table and set it as overlay.

    Every cached item that matches the target (hash or path, plus store) gets
    its title fields updated. If at least one item matched, a copy of the
    last table is rebuilt from the items and installed as the overlay.
    Best-effort: any failure is swallowed and leaves the cache as it was.
    """
    try:
        table = ctx.get_last_result_table()
        cached_items = ctx.get_last_result_items()
        if not (table and cached_items):
            return

        refreshed = []
        any_match = False
        for entry in cached_items:
            try:
                if _matches_target(entry, target_hash, target_path, target_store):
                    _update_item_title_fields(entry, new_title)
                    any_match = True
            except Exception:
                pass
            refreshed.append(entry)

        if not any_match:
            return

        overlay = table.copy_with_title(getattr(table, "title", ""))
        for entry in refreshed:
            overlay.add_result(entry)

        # Keep the underlying history intact; update only the overlay so @.. can
        # clear the overlay then continue back to prior tables (e.g., the search list).
        ctx.set_last_result_table_overlay(overlay, refreshed)
    except Exception:
        pass
2025-12-29 17:05:03 -08:00
def _refresh_tag_view(
    res: Any,
    target_hash: Optional[str],
    store_name: Optional[str],
    target_path: Optional[str],
    config: Dict[str, Any],
) -> None:
    """Re-run ``get-tag`` so the tag display reflects the latest tags.

    The current result subject is preferred when it refers to the same
    target; otherwise (or if that call fails) ``get-tag`` is invoked with
    *res*. Best-effort: returns silently when the cmdlet registry or
    ``get-tag`` is unavailable, when hash or store is missing, or on error.
    """
    try:
        from cmdlet import get as get_cmdlet  # type: ignore
    except Exception:
        return

    if not (target_hash and store_name):
        return

    refresh_args: List[str] = ["-query", f"hash:{target_hash}", "-store", store_name]

    try:
        get_tag = get_cmdlet("get-tag")
    except Exception:
        get_tag = None
    if not callable(get_tag):
        return

    try:
        subject = ctx.get_last_result_subject()
        same_target = subject and _matches_target(
            subject, target_hash, target_path, store_name
        )
        if same_target:
            get_tag(subject, refresh_args, config)
            return
    except Exception:
        pass

    # Fall back to refreshing directly from the item that was tagged.
    try:
        get_tag(res, refresh_args, config)
    except Exception:
        pass
2025-12-11 12:47:30 -08:00
class Add_Tag(Cmdlet):
    """Class-based add-tag cmdlet with Cmdlet metadata inheritance.

    Items are tagged in one of two ways:

    * store mode - the item resolves to a store backend and a hash; tags are
      written through ``backend.add_tag`` and read back afterwards.
    * local mode - no ``-store`` was given and the item's store is empty,
      ``PATH`` or ``local`` while its path exists on disk; tags are merged
      into the piped item itself so a later stage can pick them up.
    """

    def __init__(self) -> None:
        super().__init__(
            name="add-tag",
            summary="Add tag to a file in a store.",
            usage='add-tag -store <store> [-query "hash:<sha256>"] [-duplicate <format>] [-list <list>[,<list>...]] [--all] <tag>[,<tag>...]',
            arg=[
                CmdletArg(
                    "tag",
                    type="string",
                    required=False,
                    description="One or more tag to add. Comma- or space-separated. Can also use {list_name} syntax. If omitted, uses tag from pipeline payload.",
                    variadic=True,
                ),
                SharedArgs.QUERY,
                SharedArgs.STORE,
                CmdletArg(
                    "-extract",
                    type="string",
                    description='Extract tags from the item\'s title using a simple template with (field) placeholders. Example: -extract "(artist) - (album) - (disk)-(track) (title)" will add artist:, album:, disk:, track:, title: tags.',
                ),
                CmdletArg(
                    "--extract-debug",
                    type="flag",
                    description="Print debug info for -extract matching (matched title source and extracted tags).",
                ),
                CmdletArg(
                    "-duplicate",
                    type="string",
                    description="Copy existing tag values to new namespaces. Formats: title:album,artist (explicit) or title,album,artist (inferred)",
                ),
                CmdletArg(
                    "-list",
                    type="string",
                    description="Load predefined tag lists from adjective.json. Comma-separated list names (e.g., -list philosophy,occult).",
                ),
                CmdletArg(
                    "--all",
                    type="flag",
                    description="Include temporary files in tagging (by default, only tag non-temporary files).",
                ),
            ],
            detail=[
                "- By default, only tag non-temporary files (from pipelines). Use --all to tag everything.",
                "- Requires a store backend: use -store or pipe items that include store.",
                "- If -query is not provided, uses the piped item's hash (or derives from its path when possible).",
                "- Multiple tag can be comma-separated or space-separated.",
                "- Use -list to include predefined tag lists from adjective.json: -list philosophy,occult",
                '- tag can also reference lists with curly braces: add-tag {philosophy} "other:tag"',
                "- Use -duplicate to copy EXISTING tag values to new namespaces:",
                "  Explicit format: -duplicate title:album,artist (copies title: to album: and artist:)",
                "  Inferred format: -duplicate title,album,artist (first is source, rest are targets)",
                "- The source namespace must already exist in the file being tagged.",
                "- Target namespaces that already have a value are skipped (not overwritten).",
                "- Use -extract to derive namespaced tags from the current title (title field or title: tag) using a simple template.",
            ],
            exec=self.run,
        )
        self.register()

    @staticmethod
    def _parse_duplicate_arg(duplicate_arg: Any) -> tuple[str, List[str]]:
        """Split a -duplicate value into ``(source_namespace, targets)``.

        Accepts the explicit ``source:t1,t2`` form and the inferred
        ``source,t1,t2`` form. Returns ``("", [])`` when neither applies.
        """
        text = str(duplicate_arg)
        explicit = text.split(":")
        if len(explicit) > 1:
            return explicit[0], [t.strip() for t in explicit[1].split(",") if t.strip()]
        inferred = text.split(",")
        if len(inferred) > 1:
            return inferred[0], [t.strip() for t in inferred[1:] if t.strip()]
        return "", []

    @classmethod
    def _duplicated_tags(
        cls,
        duplicate_arg: Any,
        existing_tag_list: List[str],
        existing_lower: set,
    ) -> List[str]:
        """Return the tags that copy the source namespace's values to the targets.

        Only values already present in *existing_tag_list* are copied, and a
        copy is skipped when that exact tag already exists (case-insensitive).
        """
        source_ns, targets = cls._parse_duplicate_arg(duplicate_arg)
        if not (source_ns and targets):
            return []
        source_prefix = source_ns.lower() + ":"
        copies: List[str] = []
        for tag in existing_tag_list:
            if not tag.lower().startswith(source_prefix):
                continue
            value = tag.split(":", 1)[1]
            for target_ns in targets:
                new_tag = f"{target_ns}:{value}"
                if new_tag.lower() not in existing_lower:
                    copies.append(new_tag)
        return copies

    @classmethod
    def _build_item_tags(
        cls,
        res: Any,
        tag_to_add: List[str],
        existing_tag_list: List[str],
        existing_lower: set,
        extract_template: Optional[str],
        extract_debug: bool,
        extract_debug_rx: Any,
        duplicate_arg: Any,
    ) -> tuple[List[str], Optional[bool]]:
        """Assemble the tags to add to one item.

        Starts from a copy of the shared *tag_to_add*, appends tags extracted
        from the item's title (when a template is given), collapses the
        ``title`` namespace to its last value, then appends -duplicate copies.

        Returns ``(tags, matched)`` where ``matched`` is ``None`` without a
        template, otherwise whether the template matched any title candidate.
        """
        # Per-item tag list (do not mutate shared list)
        item_tags = list(tag_to_add)
        matched: Optional[bool] = None

        if extract_template:
            candidates = _get_title_candidates_for_extraction(res, existing_tag_list)
            extracted, source_title = _extract_tags_from_title_candidates(
                candidates, extract_template
            )
            matched = bool(extracted)
            if extracted:
                if extract_debug:
                    log(
                        f"[add_tag] extract matched: {source_title!r} -> {extracted}",
                        file=sys.stderr,
                    )
                item_tags.extend(
                    t for t in extracted if t.lower() not in existing_lower
                )
            elif extract_debug:
                rx_preview = (
                    extract_debug_rx.pattern if extract_debug_rx else "<uncompiled>"
                )
                cand_preview = "; ".join([repr(c) for c in candidates[:3]])
                log(
                    f"[add_tag] extract no match for template {extract_template!r}. regex: {rx_preview!r}. candidates: {cand_preview}",
                    file=sys.stderr,
                )

        item_tags = collapse_namespace_tag(item_tags, "title", prefer="last")

        # Handle -duplicate logic (copy existing tag to new namespaces)
        if duplicate_arg:
            item_tags.extend(
                cls._duplicated_tags(duplicate_arg, existing_tag_list, existing_lower)
            )
        return item_tags, matched

    @staticmethod
    def _apply_tags_locally(
        res: Any,
        existing_tag_list: List[str],
        existing_lower: set,
        item_tag_to_add: List[str],
    ) -> tuple[int, bool]:
        """Merge tags into the piped item itself (no store write).

        A new namespaced tag replaces existing tags of the same namespace
        that carry a different value. Returns ``(added_count, modified)``.
        """
        replaced: set = set()
        for new_tag in item_tag_to_add:
            if not isinstance(new_tag, str) or ":" not in new_tag:
                continue
            ns = new_tag.split(":", 1)[0].strip()
            if not ns:
                continue
            ns_prefix = ns.lower() + ":"
            new_lower = new_tag.lower()
            for old_tag in existing_tag_list:
                old_lower = old_tag.lower()
                if old_lower.startswith(ns_prefix) and old_lower != new_lower:
                    replaced.add(old_tag)

        actual_tag_to_add = [
            t
            for t in item_tag_to_add
            if isinstance(t, str) and t.lower() not in existing_lower
        ]
        updated_tag_list = [t for t in existing_tag_list if t not in replaced]
        updated_tag_list.extend(actual_tag_to_add)

        _set_item_tags(res, updated_tag_list)
        _apply_title_to_result(res, _extract_title_tag(updated_tag_list))
        return len(actual_tag_to_add), bool(replaced or actual_tag_to_add)

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Add tag to a file with smart filtering for pipeline results.

        Args:
            result: Piped input; normalized to a list of items.
            args: Raw command-line arguments for this cmdlet.
            config: Configuration passed on to the store registry/backends.

        Returns:
            0 on success, 1 on a usage or store error. Every processed item
            is emitted to the pipeline via ``ctx.emit``.
        """
        if should_show_help(args):
            log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
            return 0

        # Parse arguments
        parsed = parse_cmdlet_args(args, self)

        extract_template = parsed.get("extract")
        if extract_template is not None:
            extract_template = str(extract_template)
        extract_debug = bool(parsed.get("extract-debug", False))
        extract_debug_rx, extract_debug_err = _try_compile_extract_template(
            extract_template
        )

        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            log(
                "[add_tag] Error: -query must be of the form hash:<sha256>",
                file=sys.stderr,
            )
            return 1

        # If add-tag is in the middle of a pipeline (has downstream stages), default to
        # including temp files. This enables common flows like:
        #   @N | download-media | add-tag ... | add-file ...
        store_override = parsed.get("store")
        stage_ctx = ctx.get_stage_context()
        has_downstream = bool(
            stage_ctx is not None and not getattr(stage_ctx, "is_last_stage", False)
        )
        include_temp = bool(parsed.get("all", False))
        if has_downstream and not include_temp and not store_override:
            include_temp = True

        # Normalize input to list
        results = normalize_result_input(result)

        # Filter by temp status (unless --all is set)
        if not include_temp:
            results = filter_results_by_temp(results, include_temp=False)

        if not results:
            log(
                "No valid files to tag (all results were temporary; use --all to include temporary files)",
                file=sys.stderr,
            )
            return 1

        # Get tag from arguments (or fallback to pipeline payload).
        # Always work on a private list: it is appended to below.
        raw_tag = parsed.get("tag", [])
        if isinstance(raw_tag, str):
            raw_tag = [raw_tag]
        else:
            raw_tag = list(raw_tag or [])

        # Fallback: if no tag provided explicitly, try to pull from first result payload.
        # IMPORTANT: when -extract is used, users typically want *only* extracted tags,
        # not "re-add whatever tags are already in the payload".
        if not raw_tag and results and not extract_template:
            first = results[0]
            payload_tag = None

            # Try multiple tag lookup strategies in order
            tag_lookups = [
                lambda x: getattr(x, "tag", None),
                lambda x: x.get("tag") if isinstance(x, dict) else None,
            ]
            for lookup in tag_lookups:
                try:
                    payload_tag = lookup(first)
                    if payload_tag:
                        break
                except (AttributeError, TypeError, KeyError):
                    continue

            if payload_tag:
                if isinstance(payload_tag, str):
                    raw_tag = [payload_tag]
                elif isinstance(payload_tag, list):
                    # Copy: appending -list entries must never mutate the
                    # piped item's own tag list.
                    raw_tag = list(payload_tag)

        # Handle -list argument (convert to {list} syntax)
        list_arg = parsed.get("list")
        if list_arg:
            for list_name in str(list_arg).split(","):
                list_name = list_name.strip()
                if list_name:
                    raw_tag.append(f"{{{list_name}}}")

        # Parse and expand tag
        tag_to_add = parse_tag_arguments(raw_tag)
        tag_to_add = expand_tag_groups(tag_to_add)

        if not tag_to_add and not extract_template:
            log(
                "No tag provided to add (and no -extract template provided)",
                file=sys.stderr,
            )
            return 1

        # An invalid template can never match anything; report it up front
        # (with or without --extract-debug) instead of letting the compile
        # error surface per item as a misleading or uncaught failure.
        if extract_template and extract_debug_err:
            log(
                f"[add_tag] extract template error: {extract_debug_err}",
                file=sys.stderr,
            )
            return 1

        # Get other flags
        hash_override = normalize_hash(query_hash) if query_hash else None
        duplicate_arg = parsed.get("duplicate")

        # tag ARE provided - apply them to each store-backed result
        total_added = 0
        total_modified = 0

        store_registry = Store(config)

        extract_matched_items = 0
        extract_no_match_items = 0

        for res in results:
            store_name: Optional[str]
            raw_hash: Optional[str]
            raw_path: Optional[str]

            if isinstance(res, models.PipeObject):
                store_name = store_override or res.store
                raw_hash = res.hash
                raw_path = res.path
            elif isinstance(res, dict):
                store_name = store_override or res.get("store")
                raw_hash = res.get("hash")
                raw_path = res.get("path")
            else:
                ctx.emit(res)
                continue

            if not store_name:
                store_name = None

            # If the item isn't in a configured store backend yet (e.g., store=PATH) but has a local file,
            # treat add-tag as a pipeline mutation (carry tags forward for add-file) instead of a store write.
            if not store_override:
                store_name_str = str(store_name) if store_name is not None else ""
                local_mode_requested = (
                    (not store_name_str)
                    or (store_name_str.upper() == "PATH")
                    or (store_name_str.lower() == "local")
                )
                is_known_backend = bool(store_name_str) and store_registry.is_available(
                    store_name_str
                )

                if local_mode_requested and raw_path:
                    try:
                        if Path(str(raw_path)).expanduser().exists():
                            existing_tag_list = _extract_item_tags(res)
                            existing_lower = {t.lower() for t in existing_tag_list}

                            item_tag_to_add, matched = self._build_item_tags(
                                res,
                                tag_to_add,
                                existing_tag_list,
                                existing_lower,
                                extract_template,
                                extract_debug,
                                extract_debug_rx,
                                duplicate_arg,
                            )
                            if matched is True:
                                extract_matched_items += 1
                            elif matched is False:
                                extract_no_match_items += 1

                            added, modified = self._apply_tags_locally(
                                res, existing_tag_list, existing_lower, item_tag_to_add
                            )
                            total_added += added
                            total_modified += 1 if modified else 0

                            ctx.emit(res)
                            continue
                    except Exception as exc:
                        # Best-effort: fall through to the error below, but
                        # say why local tagging did not work.
                        log(
                            f"[add_tag] Warning: Local tagging failed for '{raw_path}': {exc}",
                            file=sys.stderr,
                        )

                if local_mode_requested:
                    log(
                        "[add_tag] Error: Missing usable local path for tagging (or provide -store)",
                        file=sys.stderr,
                    )
                    return 1

                if store_name_str and not is_known_backend:
                    log(
                        f"[add_tag] Error: Unknown store '{store_name_str}'. Available: {store_registry.list_backends()}",
                        file=sys.stderr,
                    )
                    return 1

            resolved_hash = (
                normalize_hash(hash_override)
                if hash_override
                else normalize_hash(raw_hash)
            )
            if not resolved_hash and raw_path:
                try:
                    p = Path(str(raw_path))
                    stem = p.stem
                    # A 64-hex-digit file stem is taken to be the sha256 itself.
                    if len(stem) == 64 and all(
                        c in "0123456789abcdef" for c in stem.lower()
                    ):
                        resolved_hash = stem.lower()
                    elif p.exists() and p.is_file():
                        resolved_hash = sha256_file(p)
                except Exception:
                    resolved_hash = None

            if not resolved_hash:
                log(
                    "[add_tag] Warning: Item missing usable hash (and could not derive from path); skipping",
                    file=sys.stderr,
                )
                ctx.emit(res)
                continue

            try:
                backend = store_registry[str(store_name)]
            except Exception as exc:
                log(
                    f"[add_tag] Error: Unknown store '{store_name}': {exc}",
                    file=sys.stderr,
                )
                return 1

            try:
                existing_tag, _src = backend.get_tag(resolved_hash, config=config)
            except Exception:
                existing_tag = []

            existing_tag_list = [t for t in (existing_tag or []) if isinstance(t, str)]
            existing_lower = {t.lower() for t in existing_tag_list}
            original_title = _extract_title_tag(existing_tag_list)

            item_tag_to_add, matched = self._build_item_tags(
                res,
                tag_to_add,
                existing_tag_list,
                existing_lower,
                extract_template,
                extract_debug,
                extract_debug_rx,
                duplicate_arg,
            )
            if matched is True:
                extract_matched_items += 1
            elif matched is False:
                extract_no_match_items += 1

            changed = False
            try:
                ok_add = backend.add_tag(resolved_hash, item_tag_to_add, config=config)
                if not ok_add:
                    log("[add_tag] Warning: Store rejected tag update", file=sys.stderr)
            except Exception as exc:
                log(f"[add_tag] Warning: Failed adding tag: {exc}", file=sys.stderr)

            try:
                refreshed_tag, _src2 = backend.get_tag(resolved_hash, config=config)
                refreshed_list = [
                    t for t in (refreshed_tag or []) if isinstance(t, str)
                ]
            except Exception:
                refreshed_list = existing_tag_list

            # Decide whether anything actually changed (case-sensitive so title casing updates count).
            if set(refreshed_list) != set(existing_tag_list):
                changed = True
                before_lower = {t.lower() for t in existing_tag_list}
                after_lower = {t.lower() for t in refreshed_list}
                total_added += len(after_lower - before_lower)
                total_modified += 1

            # Update the result's tag using canonical field
            if isinstance(res, models.PipeObject):
                res.tag = refreshed_list
            elif isinstance(res, dict):
                res["tag"] = refreshed_list

            final_title = _extract_title_tag(refreshed_list)
            _apply_title_to_result(res, final_title)

            if final_title and (not original_title or final_title != original_title):
                _refresh_result_table_title(
                    final_title,
                    resolved_hash,
                    str(store_name),
                    raw_path,
                )

            if changed:
                _refresh_tag_view(res, resolved_hash, str(store_name), raw_path, config)

            ctx.emit(res)

        log(
            f"[add_tag] Added {total_added} new tag(s) across {len(results)} item(s); modified {total_modified} item(s)",
            file=sys.stderr,
        )

        if extract_template and extract_matched_items == 0:
            log(
                f"[add_tag] extract: no matches for template '{extract_template}' across {len(results)} item(s)",
                file=sys.stderr,
            )
        elif extract_template and extract_no_match_items > 0 and extract_debug:
            log(
                f"[add_tag] extract: matched {extract_matched_items}, no-match {extract_no_match_items}",
                file=sys.stderr,
            )

        return 0
2025-12-29 17:05:03 -08:00
# Module-level cmdlet instance; constructing it runs Add_Tag.__init__, which
# ends by calling self.register().
CMDLET = Add_Tag ( )