""" """ from __future__ import annotations import json import sys from collections.abc import Iterable as IterableABC from SYS.logger import log from pathlib import Path from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Set from dataclasses import dataclass, field import models @dataclass class CmdletArg: """Represents a single cmdlet argument with optional enum choices.""" name: str """Argument name, e.g., '-path' or 'location'""" type: str = "string" """Argument type: 'string', 'int', 'flag', 'enum', etc.""" required: bool = False """Whether this argument is required""" description: str = "" """Human-readable description of the argument""" choices: List[str] = field(default_factory=list) """Optional list of valid choices for enum/autocomplete, e.g., ['hydrus', 'local', '0x0.st']""" alias: str = "" """Optional alias for the argument name, e.g., 'loc' for 'location'""" handler: Optional[Any] = None """Optional handler function/callable for processing this argument's value""" variadic: bool = False """Whether this argument accepts multiple values (consumes remaining positional args)""" usage: str = "" """dsf""" def resolve(self, value: Any) -> Any: """Resolve/process the argument value using the handler if available. Args: value: The raw argument value to process Returns: Processed value from handler, or original value if no handler Example: # For STORAGE arg with a handler storage_path = SharedArgs.STORAGE.resolve('local') # Returns Path.home() / "Videos" """ if self.handler is not None and callable(self.handler): return self.handler(value) return value def to_flags(self) -> tuple[str, ...]: """Generate all flag variants (short and long form) for this argument. Returns a tuple of all valid flag forms for this argument, including: - Long form with double dash: --name - Single dash multi-char form: -name (for convenience) - Short form with single dash: -alias (if alias exists) For flags, also generates negation forms: - --no-name, -name (negation of multi-char form) - --no-name, -nalias (negation with alias) Returns: Tuple of flag strings, e.g., ('--archive', '-archive', '-arch') or for flags: ('--archive', '-archive', '-arch', '--no-archive', '-narch') Example: archive_flags = SharedArgs.ARCHIVE.to_flags() # Returns: ('--archive', '-archive', '-arch', '--no-archive', '-narch') storage_flags = SharedArgs.STORAGE.to_flags() # Returns: ('--storage', '-storage', '-s') """ flags = [f'--{self.name}', f'-{self.name}'] # Both double-dash and single-dash variants # Add short form if alias exists if self.alias: flags.append(f'-{self.alias}') # Add negation forms for flag type if self.type == 'flag': flags.append(f'--no-{self.name}') flags.append(f'-no{self.name}') # Single-dash negation variant if self.alias: flags.append(f'-n{self.alias}') return tuple(flags) # ============================================================================ # SHARED ARGUMENTS - Reusable argument definitions across cmdlet # ============================================================================ class SharedArgs: """Registry of shared CmdletArg definitions used across multiple cmdlet. This class provides a centralized location for common arguments so they're defined once and used consistently everywhere. Reduces duplication and ensures all cmdlet handle the same arguments identically. Example: CMDLET = Cmdlet( name="my-cmdlet", summary="Does something", usage="my-cmdlet", args=[ SharedArgs.HASH, # Use predefined shared arg SharedArgs.LOCATION, # Use another shared arg CmdletArg(...), # Mix with custom args ] ) """ # File/Hash arguments HASH = CmdletArg( name="hash", type="string", description="File hash (SHA256, 64-char hex string)", ) STORE = CmdletArg( name="store", type="enum", choices=[], # Dynamically populated via get_store_choices() description="Selects store", ) PATH = CmdletArg( name="path", type="string", choices=[], # Dynamically populated via get_store_choices() description="Selects store", ) URL = CmdletArg( name="url", type="string", description="http parser", ) @staticmethod def get_store_choices(config: Optional[Dict[str, Any]] = None) -> List[str]: """Get list of available store backend names. This method dynamically discovers all configured storage backends instead of using a static list. Should be called when building autocomplete choices or validating store names. Args: config: Optional config dict. If not provided, will try to load from config module. Returns: List of backend names (e.g., ['default', 'test', 'home', 'work']) Example: SharedArgs.STORE.choices = SharedArgs.get_store_choices(config) """ try: from Store import Store # If no config provided, try to load it if config is None: try: from config import load_config config = load_config() except Exception: return [] store = Store(config) return store.list_backends() except Exception: # Fallback to empty list if FileStorage isn't available return [] LOCATION = CmdletArg( "location", type="enum", choices=["hydrus", "0x0", "local"], required=True, description="Destination location" ) DELETE_FLAG = CmdletArg( "delete", type="flag", description="Delete the file and its .tag after successful operation." ) # Metadata arguments ARTIST = CmdletArg( "artist", type="string", description="Filter by artist name (case-insensitive, partial match)." ) ALBUM = CmdletArg( "album", type="string", description="Filter by album name (case-insensitive, partial match)." ) TRACK = CmdletArg( "track", type="string", description="Filter by track title (case-insensitive, partial match)." ) # Library/Search arguments LIBRARY = CmdletArg( "library", type="string", choices=["hydrus", "local", "soulseek", "libgen", "ftp"], description="Search library or source location." ) TIMEOUT = CmdletArg( "timeout", type="integer", description="Search or operation timeout in seconds." ) LIMIT = CmdletArg( "limit", type="integer", description="Maximum number of results to return." ) # Path/File arguments PATH = CmdletArg( "path", type="string", description="File or directory path." ) OUTPUT = CmdletArg( "output", type="string", description="Output file path." ) # Generic arguments QUERY = CmdletArg( "query", type="string", description="Search query string." ) REASON = CmdletArg( "reason", type="string", description="Reason or explanation for the operation." ) ARCHIVE = CmdletArg( "archive", type="flag", description="Archive the URL to Wayback Machine, Archive.today, and Archive.ph (requires URL argument in cmdlet).", alias="arch" ) @staticmethod def resolve_storage(storage_value: Optional[str], default: Optional[Path] = None) -> Path: """Resolve a storage location name to a filesystem Path. Maps storage identifiers (hydrus, local, ftp) to their actual filesystem paths. This is the single source of truth for storage location resolution. Note: 0x0.st is now accessed via file providers (-provider 0x0), not storage. Args: storage_value: One of 'hydrus', 'local', 'ftp', or None default: Path to return if storage_value is None (defaults to Videos) Returns: Resolved Path object for the storage location Raises: ValueError: If storage_value is not a recognized storage type Example: # In a cmdlet: storage_path = SharedArgs.resolve_storage(parsed.storage) # With defaults: path = SharedArgs.resolve_storage(None) # Returns home/Videos path = SharedArgs.resolve_storage('local') # Returns home/Videos path = SharedArgs.resolve_storage('hydrus') # Returns home/.hydrus/client_files """ storage_map = { 'local': Path.home() / "Videos", 'hydrus': Path.home() / ".hydrus" / "client_files", 'ftp': Path.home() / "FTP", 'matrix': Path.home() / "Matrix", # Placeholder, not used for upload path } if storage_value is None: return default or (Path.home() / "Videos") storage_lower = storage_value.lower() if storage_lower not in storage_map: raise ValueError( f"Unknown storage location '{storage_value}'. " f"Must be one of: {', '.join(storage_map.keys())}" ) return storage_map[storage_lower] @classmethod def get(cls, name: str) -> Optional[CmdletArg]: """Get a shared argument by name. Args: name: Uppercase name like 'HASH', 'LOCATION', etc. Returns: CmdletArg if found, None otherwise Example: arg = SharedArgs.get('HASH') # Returns SharedArgs.HASH """ try: return getattr(cls, name.upper()) except AttributeError: return None @dataclass class Cmdlet: """Represents a cmdlet with metadata and arguments. Example: cmd = Cmdlet( name="add-file", summary="Upload a media file", usage="add-file ", aliases=["add-file-alias"], args=[ CmdletArg("location", required=True, description="Destination location"), CmdletArg("-delete", type="flag", description="Delete after upload"), ], details=[ "- This is a detail line", "- Another detail", ] ) # Access properties log(cmd.name) # "add-file" log(cmd.summary) # "Upload a media file" log(cmd.args[0].name) # "location" """ name: str """""" summary: str """One-line summary of the cmdlet""" usage: str """Usage string, e.g., 'add-file [-delete]'""" alias: List[str] = field(default_factory=list) """List of aliases for this cmdlet, e.g., ['add', 'add-f']""" arg: List[CmdletArg] = field(default_factory=list) """List of arguments accepted by this cmdlet""" detail: List[str] = field(default_factory=list) """Detailed explanation lines (for help text)""" # Execution function: func(result, args, config) -> int exec: Optional[Callable[[Any, Sequence[str], Dict[str, Any]], int]] = field(default=None) def _collect_names(self) -> List[str]: """Collect primary name plus aliases, de-duplicated and normalized.""" names: List[str] = [] if self.name: names.append(self.name) for alias in (self.alias or []): if alias: names.append(alias) for alias in (getattr(self, "aliases", None) or []): if alias: names.append(alias) seen: Set[str] = set() deduped: List[str] = [] for name in names: key = name.replace("_", "-").lower() if key in seen: continue seen.add(key) deduped.append(name) return deduped def register(self) -> "Cmdlet": """Register this cmdlet's exec under its name and aliases.""" if not callable(self.exec): return self try: from . import register_callable as _register_callable # Local import to avoid circular import cost except Exception: return self names = self._collect_names() if not names: return self _register_callable(names, self.exec) return self def get_flags(self, arg_name: str) -> set[str]: """Generate -name and --name flag variants for an argument. Args: arg_name: The argument name (e.g., 'library', 'tag', 'size') Returns: Set containing both single-dash and double-dash variants (e.g., {'-library', '--library'}) Example: if low in cmdlet.get_flags('library'): # handle library flag """ return {f"-{arg_name}", f"--{arg_name}"} def build_flag_registry(self) -> Dict[str, set[str]]: """Build a registry of all flag variants for this cmdlet's arguments. Automatically generates all -name and --name variants for each argument. Useful for parsing command-line arguments without hardcoding flags. Returns: Dict mapping argument names to their flag sets (e.g., {'library': {'-library', '--library'}, 'tag': {'-tag', '--tag'}}) Example: flags = cmdlet.build_flag_registry() if low in flags.get('library', set()): # handle library elif low in flags.get('tag', set()): # handle tag """ return {arg.name: self.get_flags(arg.name) for arg in self.arg} # Tag groups cache (loaded from JSON config file) _TAG_GROUPS_CACHE: Optional[Dict[str, List[str]]] = None _TAG_GROUPS_MTIME: Optional[float] = None # Path to tag groups configuration (set by caller or lazily discovered) TAG_GROUPS_PATH: Optional[Path] = None def set_tag_groups_path(path: Path) -> None: """Set the path to the tag groups JSON file.""" global TAG_GROUPS_PATH TAG_GROUPS_PATH = path def parse_cmdlet_args(args: Sequence[str], cmdlet_spec: Dict[str, Any] | Cmdlet) -> Dict[str, Any]: """Parse command-line arguments based on cmdlet specification. Extracts argument values from command-line tokens using the argument names and types defined in the cmdlet metadata. Automatically supports single-dash and double-dash variants of flag names. Arguments without dashes in definition are treated as positional arguments. Args: args: Command-line arguments (e.g., ["-path", "/home/file.txt", "-foo", "bar"]) cmdlet_spec: Cmdlet metadata dict with "args" key containing list of arg specs, or a Cmdlet object. Each arg spec should have at least "name" key. Argument names can be defined with or without prefixes. Returns: Dict mapping canonical arg names to their parsed values. If an arg is not provided, it will not be in the dict. Lookup will normalize prefixes. Example: cmdlet = { "args": [ {"name": "path", "type": "string"}, # Positional - matches bare value or -path/--path {"name": "count", "type": "int"} # Positional - matches bare value or -count/--count ] } result = parse_cmdlet_args(["value1", "-count", "5"], cmdlet) # result = {"path": "value1", "count": "5"} """ result: Dict[str, Any] = {} # Only accept Cmdlet objects if not isinstance(cmdlet_spec, Cmdlet): raise TypeError(f"Expected Cmdlet, got {type(cmdlet_spec).__name__}") # Build arg specs from cmdlet arg_specs: List[CmdletArg] = cmdlet_spec.arg positional_args: List[CmdletArg] = [] # args without prefix in definition flagged_args: List[CmdletArg] = [] # args with prefix in definition arg_spec_map: Dict[str, str] = {} # prefix variant -> canonical name (without prefix) for spec in arg_specs: name = spec.name if not name: continue name_str = str(name) canonical_name = name_str.lstrip("-") # Determine if this is positional (no dashes in original definition) if "-" not in name_str: positional_args.append(spec) else: flagged_args.append(spec) # Register all prefix variants for flagged lookup arg_spec_map[canonical_name.lower()] = canonical_name # bare name arg_spec_map[f"-{canonical_name}".lower()] = canonical_name # single dash arg_spec_map[f"--{canonical_name}".lower()] = canonical_name # double dash # Parse arguments i = 0 positional_index = 0 # Track which positional arg we're on while i < len(args): token = str(args[i]) token_lower = token.lower() # Check if this token is a known flagged argument if token_lower in arg_spec_map: canonical_name = arg_spec_map[token_lower] spec = next((s for s in arg_specs if str(s.name).lstrip("-").lower() == canonical_name.lower()), None) # Check if it's a flag type (which doesn't consume next value, just marks presence) is_flag = spec and spec.type == "flag" if is_flag: # For flags, just mark presence without consuming next token result[canonical_name] = True i += 1 else: # For non-flags, consume next token as the value if i + 1 < len(args) and not str(args[i + 1]).startswith("-"): value = args[i + 1] # Check if variadic is_variadic = spec and spec.variadic if is_variadic: if canonical_name not in result: result[canonical_name] = [] elif not isinstance(result[canonical_name], list): result[canonical_name] = [result[canonical_name]] result[canonical_name].append(value) else: result[canonical_name] = value i += 2 else: i += 1 # Otherwise treat as positional if we have positional args remaining elif positional_index < len(positional_args): positional_spec = positional_args[positional_index] canonical_name = str(positional_spec.name).lstrip("-") is_variadic = positional_spec.variadic if is_variadic: # For variadic args, append to a list if canonical_name not in result: result[canonical_name] = [] elif not isinstance(result[canonical_name], list): # Should not happen if logic is correct, but safety check result[canonical_name] = [result[canonical_name]] result[canonical_name].append(token) # Do not increment positional_index so subsequent tokens also match this arg # Note: Variadic args should typically be the last positional argument i += 1 else: result[canonical_name] = token positional_index += 1 i += 1 else: # Unknown token, skip it i += 1 return result def normalize_hash(hash_hex: Optional[str]) -> Optional[str]: """Normalize a hash string to lowercase, or return None if invalid. Args: hash_hex: String that should be a hex hash Returns: Lowercase hash string, or None if input is not a string or is empty """ if not isinstance(hash_hex, str): return None text = hash_hex.strip().lower() if not text: return None if len(text) != 64: return None if not all(ch in "0123456789abcdef" for ch in text): return None return text def get_hash_for_operation(override_hash: Optional[str], result: Any, field_name: str = "hash") -> Optional[str]: """Get normalized hash from override or result object, consolidating common pattern. Eliminates repeated pattern: normalize_hash(override) if override else normalize_hash(get_field(result, ...)) Args: override_hash: Hash passed as command argument (takes precedence) result: Object containing hash field (fallback) field_name: Name of hash field in result object (default: "hash") Returns: Normalized hash string, or None if neither override nor result provides valid hash """ if override_hash: return normalize_hash(override_hash) hash_value = get_field(result, field_name) or getattr(result, field_name, None) or getattr(result, "hash", None) return normalize_hash(hash_value) def fetch_hydrus_metadata(config: Any, hash_hex: str, **kwargs) -> tuple[Optional[Dict[str, Any]], Optional[int]]: """Fetch metadata from Hydrus for a given hash, consolidating common fetch pattern. Eliminates repeated boilerplate: client initialization, error handling, metadata extraction. Args: config: Configuration object (passed to hydrus_wrapper.get_client) hash_hex: File hash to fetch metadata for **kwargs: Additional arguments to pass to client.fetch_file_metadata() Common: include_service_keys_to_tags, include_notes, include_file_url, include_duration, etc. Returns: Tuple of (metadata_dict, error_code) - metadata_dict: Dict from Hydrus (first item in metadata list) or None if unavailable - error_code: 0 on success, 1 on any error (suitable for returning from cmdlet execute()) """ from API import HydrusNetwork hydrus_wrapper = HydrusNetwork try: client = hydrus_wrapper.get_client(config) except Exception as exc: log(f"Hydrus client unavailable: {exc}") return None, 1 if client is None: log("Hydrus client unavailable") return None, 1 try: payload = client.fetch_file_metadata(hashes=[hash_hex], **kwargs) except Exception as exc: log(f"Hydrus metadata fetch failed: {exc}") return None, 1 items = payload.get("metadata") if isinstance(payload, dict) else None meta = items[0] if (isinstance(items, list) and items and isinstance(items[0], dict)) else None return meta, 0 def get_field(obj: Any, field: str, default: Optional[Any] = None) -> Any: """Extract a field from either a dict or object with fallback default. Handles both dict.get(field) and getattr(obj, field) access patterns. Also handles lists by accessing the first element. For PipeObjects, checks the extra field as well. Used throughout cmdlet to uniformly access fields from mixed types. Args: obj: Dict, object, or list to extract from field: Field name to retrieve default: Value to return if field not found (default: None) Returns: Field value if found, otherwise the default value Examples: get_field(result, "hash") # From dict or object get_field(result, "table", "unknown") # With default """ # Handle lists by accessing the first element if isinstance(obj, list): if not obj: return default obj = obj[0] if isinstance(obj, dict): return obj.get(field, default) else: # Try direct attribute access first value = getattr(obj, field, None) if value is not None: return value # For PipeObjects, also check the extra field extra_val = getattr(obj, 'extra', None) if isinstance(extra_val, dict): return extra_val.get(field, default) return default def should_show_help(args: Sequence[str]) -> bool: """Check if help flag was passed in arguments. Consolidates repeated pattern of checking for help flags across cmdlet. Args: args: Command arguments to check Returns: True if any help flag is present (-?, /?, --help, -h, help, --cmdlet) Examples: if should_show_help(args): log(json.dumps(CMDLET, ensure_ascii=False, indent=2)) return 0 """ try: return any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args) except Exception: return False def looks_like_hash(candidate: Optional[str]) -> bool: """Check if a string looks like a SHA256 hash (64 hex chars). Args: candidate: String to test Returns: True if the string is 64 lowercase hex characters """ if not isinstance(candidate, str): return False text = candidate.strip().lower() return len(text) == 64 and all(ch in "0123456789abcdef" for ch in text) def pipeline_item_local_path(item: Any) -> Optional[str]: """Extract local file path from a pipeline item. Supports both dataclass objects with .path attribute and dicts. Returns None for HTTP/HTTPS url. Args: item: Pipeline item (PipelineItem dataclass, dict, or other) Returns: Local file path string, or None if item is not a local file """ path_value: Optional[str] = None if hasattr(item, "path"): path_value = getattr(item, "path", None) elif isinstance(item, dict): raw = item.get("path") or item.get("url") path_value = str(raw) if raw is not None else None if not isinstance(path_value, str): return None text = path_value.strip() if not text: return None if text.lower().startswith(("http://", "https://")): return None return text def collect_relationship_labels(payload: Any, label_stack: List[str] | None = None, mapping: Dict[str, str] | None = None) -> Dict[str, str]: """Recursively extract hash-to-label mappings from nested relationship data. Walks through nested dicts/lists looking for sha256-like strings (64 hex chars) and builds a mapping from hash to its path in the structure. Example: data = { "duplicates": [ "abc123...", # Will be mapped to "duplicates" {"type": "related", "items": ["def456..."]} # Will be mapped to "duplicates / type / items" ] } result = collect_relationship_labels(data) # result = {"abc123...": "duplicates", "def456...": "duplicates / type / items"} Args: payload: Nested data structure (dict, list, string, etc.) label_stack: Internal use - tracks path during recursion mapping: Internal use - accumulates hash->label mappings Returns: Dict mapping hash strings to their path labels """ if label_stack is None: label_stack = [] if mapping is None: mapping = {} if isinstance(payload, dict): for key, value in payload.items(): next_stack = label_stack if isinstance(key, str) and key: formatted = key.replace('_', ' ').strip() next_stack = label_stack + [formatted] collect_relationship_labels(value, next_stack, mapping) elif isinstance(payload, (list, tuple, set)): for value in payload: collect_relationship_labels(value, label_stack, mapping) elif isinstance(payload, str) and looks_like_hash(payload): hash_value = payload.lower() if label_stack: label = " / ".join(item for item in label_stack if item) else: label = "related" mapping.setdefault(hash_value, label) return mapping def parse_tag_arguments(arguments: Sequence[str]) -> List[str]: """Parse tag arguments from command line tokens. - Supports comma-separated tags. - Supports pipe namespace shorthand: "artist:A|B|C" -> artist:A, artist:B, artist:C. Args: arguments: Sequence of argument strings Returns: List of normalized tag strings (empty strings filtered out) """ def _expand_pipe_namespace(text: str) -> List[str]: parts = text.split('|') expanded: List[str] = [] last_ns: Optional[str] = None for part in parts: segment = part.strip() if not segment: continue if ':' in segment: ns, val = segment.split(':', 1) ns = ns.strip() val = val.strip() last_ns = ns or last_ns if last_ns and val: expanded.append(f"{last_ns}:{val}") elif ns or val: expanded.append(f"{ns}:{val}".strip(':')) else: if last_ns: expanded.append(f"{last_ns}:{segment}") else: expanded.append(segment) return expanded tags: List[str] = [] for argument in arguments: for token in argument.split(','): text = token.strip() if not text: continue # Expand namespace shorthand with pipes pipe_expanded = _expand_pipe_namespace(text) for entry in pipe_expanded: candidate = entry.strip() if not candidate: continue if ':' in candidate: ns, val = candidate.split(':', 1) ns = ns.strip() val = val.strip() candidate = f"{ns}:{val}" if ns or val else "" if candidate: tags.append(candidate) return tags def fmt_bytes(n: Optional[int]) -> str: """Format bytes as human-readable with 1 decimal place (MB/GB). Args: n: Number of bytes, or None Returns: Formatted string like "1.5 MB" or "2.0 GB", or "unknown" """ if n is None or n < 0: return "unknown" gb = n / (1024.0 * 1024.0 * 1024.0) if gb >= 1.0: return f"{gb:.1f} GB" mb = n / (1024.0 * 1024.0) return f"{mb:.1f} MB" def _normalise_tag_group_entry(value: Any) -> Optional[str]: """Internal: Normalize a single tag group entry.""" if not isinstance(value, str): value = str(value) text = value.strip() return text or None def _load_tag_groups() -> Dict[str, List[str]]: """Load tag group definitions from JSON file with caching.""" global _TAG_GROUPS_CACHE, _TAG_GROUPS_MTIME, TAG_GROUPS_PATH # Auto-discover adjective.json if not set if TAG_GROUPS_PATH is None: # Try to find adjective.json in the script directory or helper subdirectory try: script_dir = Path(__file__).parent.parent # Check root directory candidate = script_dir / "adjective.json" if candidate.exists(): TAG_GROUPS_PATH = candidate else: # Check helper directory candidate = script_dir / "helper" / "adjective.json" if candidate.exists(): TAG_GROUPS_PATH = candidate except Exception: pass if TAG_GROUPS_PATH is None: return {} path = TAG_GROUPS_PATH try: stat_result = path.stat() except FileNotFoundError: _TAG_GROUPS_CACHE = {} _TAG_GROUPS_MTIME = None return {} except OSError as exc: log(f"Failed to read tag groups: {exc}", file=sys.stderr) _TAG_GROUPS_CACHE = {} _TAG_GROUPS_MTIME = None return {} mtime = stat_result.st_mtime if _TAG_GROUPS_CACHE is not None and _TAG_GROUPS_MTIME == mtime: return _TAG_GROUPS_CACHE try: payload = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as exc: log(f"Invalid tag group JSON ({path}): {exc}", file=sys.stderr) _TAG_GROUPS_CACHE = {} _TAG_GROUPS_MTIME = mtime return {} groups: Dict[str, List[str]] = {} if isinstance(payload, dict): for key, value in payload.items(): if not isinstance(key, str): continue name = key.strip().lower() if not name: continue members: List[str] = [] if isinstance(value, list): for entry in value: normalised = _normalise_tag_group_entry(entry) if normalised: members.append(normalised) elif isinstance(value, str): normalised = _normalise_tag_group_entry(value) if normalised: members.extend(token.strip() for token in normalised.split(",") if token.strip()) if members: groups[name] = members _TAG_GROUPS_CACHE = groups _TAG_GROUPS_MTIME = mtime return groups def expand_tag_groups(raw_tags: Iterable[str]) -> List[str]: """Expand tag group references (e.g., {my_group}) into member tags. Tag groups are defined in JSON and can be nested. Groups are referenced with curly braces: {group_name}. Args: raw_tags: Sequence of tag strings, some may reference groups like "{group_name}" Returns: List of expanded tags with group references replaced """ groups = _load_tag_groups() if not groups: return [tag for tag in raw_tags if isinstance(tag, str) and tag.strip()] def _expand(tokens: Iterable[str], seen: Set[str]) -> List[str]: result: List[str] = [] for token in tokens: if not isinstance(token, str): continue candidate = token.strip() if not candidate: continue if candidate.startswith("{") and candidate.endswith("}") and len(candidate) > 2: name = candidate[1:-1].strip().lower() if not name: continue if name in seen: log(f"Tag group recursion detected for {{{name}}}; skipping", file=sys.stderr) continue members = groups.get(name) if not members: log(f"Unknown tag group {{{name}}}", file=sys.stderr) result.append(candidate) continue result.extend(_expand(members, seen | {name})) else: result.append(candidate) return result return _expand(raw_tags, set()) def first_title_tag(source: Optional[Iterable[str]]) -> Optional[str]: """Find the first tag starting with "title:" in a collection. Args: source: Iterable of tag strings Returns: First title: tag found, or None """ if not source: return None for item in source: if not isinstance(item, str): continue candidate = item.strip() if candidate and candidate.lower().startswith("title:"): return candidate return None def apply_preferred_title(tags: List[str], preferred: Optional[str]) -> List[str]: """Replace any title: tags with a preferred title tag. Args: tags: List of tags (may contain multiple "title:" entries) preferred: Preferred title tag to use (full "title: ..." format) Returns: List with old title tags removed and preferred title added (at most once) """ if not preferred: return tags preferred_clean = preferred.strip() if not preferred_clean: return tags preferred_lower = preferred_clean.lower() filtered: List[str] = [] has_preferred = False for tag in tags: candidate = tag.strip() if not candidate: continue if candidate.lower().startswith("title:"): if candidate.lower() == preferred_lower: if not has_preferred: filtered.append(candidate) has_preferred = True continue filtered.append(candidate) if not has_preferred: filtered.append(preferred_clean) return filtered # ============================================================================ # PIPEOBJECT UTILITIES (for chainable cmdlet and multi-action pipelines) # ============================================================================ def create_pipe_object_result( source: str, identifier: str, file_path: str, cmdlet_name: str, title: Optional[str] = None, hash_value: Optional[str] = None, is_temp: bool = False, parent_hash: Optional[str] = None, tag: Optional[List[str]] = None, **extra: Any ) -> Dict[str, Any]: """Create a PipeObject-compatible result dict for pipeline chaining. This is a helper to emit results in the standard format that downstream cmdlet can process (filter, tag, cleanup, etc.). Args: source: Source system (e.g., 'local', 'hydrus', 'download') identifier: Unique ID from source file_path: Path to the file cmdlet_name: Name of the cmdlet that created this (e.g., 'download-data', 'screen-shot') title: Human-readable title hash_value: SHA-256 hash of file (for integrity) is_temp: If True, this is a temporary/intermediate artifact parent_hash: Hash of the parent file in the chain (for provenance) tag: List of tag values to apply **extra: Additional fields Returns: Dict with all PipeObject fields for emission """ result: Dict[str, Any] = { 'source': source, 'id': identifier, 'path': file_path, 'action': f'cmdlet:{cmdlet_name}', # Format: cmdlet:cmdlet_name } if title: result['title'] = title if hash_value: result['hash'] = hash_value if is_temp: result['is_temp'] = True if parent_hash: result['parent_hash'] = parent_hash if tag: result['tag'] = tag # Canonical store field: use source for compatibility try: if source: result['store'] = source except Exception: pass # Add any extra fields result.update(extra) return result def mark_as_temp(pipe_object: Dict[str, Any]) -> Dict[str, Any]: """Mark a PipeObject dict as temporary (intermediate artifact). Args: pipe_object: Result dict from cmdlet emission Returns: Modified dict with is_temp=True """ pipe_object['is_temp'] = True return pipe_object def set_parent_hash(pipe_object: Dict[str, Any], parent_hash: str) -> Dict[str, Any]: """Set the parent_hash for provenance tracking. Args: pipe_object: Result dict parent_hash: Parent file's hash Returns: Modified dict with parent_hash set to the hash """ pipe_object['parent_hash'] = parent_hash return pipe_object def get_pipe_object_path(pipe_object: Any) -> Optional[str]: """Extract file path from PipeObject, dict, or pipeline-friendly object.""" if pipe_object is None: return None for attr in ('path', 'target'): if hasattr(pipe_object, attr): value = getattr(pipe_object, attr) if value: return value if isinstance(pipe_object, dict): for key in ('path', 'target'): value = pipe_object.get(key) if value: return value return None def get_pipe_object_hash(pipe_object: Any) -> Optional[str]: """Extract file hash from PipeObject, dict, or pipeline-friendly object.""" if pipe_object is None: return None for attr in ('hash',): if hasattr(pipe_object, attr): value = getattr(pipe_object, attr) if value: return value if isinstance(pipe_object, dict): for key in ('hash',): value = pipe_object.get(key) if value: return value return None def normalize_result_input(result: Any) -> List[Dict[str, Any]]: """Normalize input result to a list of dicts. Handles: - None -> [] - Dict -> [dict] - List of dicts -> list as-is - PipeObject -> [dict] - List of PipeObjects -> list of dicts Args: result: Result from piped input Returns: List of result dicts (may be empty) """ if result is None: return [] # Single dict if isinstance(result, dict): return [result] # List - convert each item to dict if needed if isinstance(result, list): output = [] for item in result: if isinstance(item, dict): output.append(item) elif hasattr(item, 'to_dict'): output.append(item.to_dict()) else: # Try as-is output.append(item) return output # PipeObject or other object with to_dict if hasattr(result, 'to_dict'): return [result.to_dict()] # Fallback: wrap it if isinstance(result, dict): return [result] return [] def filter_results_by_temp(results: List[Any], include_temp: bool = False) -> List[Any]: """Filter results by temporary status. Args: results: List of result dicts or PipeObjects include_temp: If True, keep temp files; if False, exclude them Returns: Filtered list """ if include_temp: return results filtered = [] for result in results: is_temp = False # Check PipeObject if hasattr(result, 'is_temp'): is_temp = result.is_temp # Check dict elif isinstance(result, dict): is_temp = result.get('is_temp', False) if not is_temp: filtered.append(result) return filtered def merge_sequences(*sources: Optional[Iterable[Any]], case_sensitive: bool = True) -> list[str]: """Merge iterable sources while preserving order and removing duplicates.""" seen: set[str] = set() merged: list[str] = [] for source in sources: if not source: continue if isinstance(source, str) or not isinstance(source, IterableABC): iterable = [source] else: iterable = source for value in iterable: if value is None: continue text = str(value).strip() if not text: continue key = text if case_sensitive else text.lower() if key in seen: continue seen.add(key) merged.append(text) return merged def collapse_namespace_tags(tags: Optional[Iterable[Any]], namespace: str, prefer: str = "last") -> list[str]: """Reduce tags so only one entry for a given namespace remains. Keeps either the first or last occurrence (default last) while preserving overall order for non-matching tags. Useful for ensuring a single title: tag. """ if not tags: return [] ns = str(namespace or "").strip().lower() if not ns: return list(tags) if isinstance(tags, list) else list(tags) prefer_last = str(prefer or "last").lower() != "first" ns_prefix = ns + ":" items = list(tags) if prefer_last: kept: list[str] = [] seen_ns = False for tag in reversed(items): text = str(tag) if text.lower().startswith(ns_prefix): if seen_ns: continue seen_ns = True kept.append(text) kept.reverse() return kept else: kept_ns = False result: list[str] = [] for tag in items: text = str(tag) if text.lower().startswith(ns_prefix): if kept_ns: continue kept_ns = True result.append(text) return result def collapse_namespace_tag(tags: Optional[Iterable[Any]], namespace: str, prefer: str = "last") -> list[str]: """Singular alias for collapse_namespace_tags. Some cmdlet prefer the singular name; keep behavior centralized. """ return collapse_namespace_tags(tags, namespace, prefer=prefer) def extract_tag_from_result(result: Any) -> list[str]: tag: list[str] = [] if isinstance(result, models.PipeObject): tag.extend(result.tag or []) if isinstance(result.extra, dict): extra_tag = result.extra.get('tag') if isinstance(extra_tag, list): tag.extend(extra_tag) elif isinstance(extra_tag, str): tag.append(extra_tag) elif hasattr(result, 'tag'): # Handle objects with tag attribute (e.g. SearchResult) val = getattr(result, 'tag') if isinstance(val, (list, set, tuple)): tag.extend(val) elif isinstance(val, str): tag.append(val) if isinstance(result, dict): raw_tag = result.get('tag') if isinstance(raw_tag, list): tag.extend(raw_tag) elif isinstance(raw_tag, str): tag.append(raw_tag) extra = result.get('extra') if isinstance(extra, dict): extra_tag = extra.get('tag') if isinstance(extra_tag, list): tag.extend(extra_tag) elif isinstance(extra_tag, str): tag.append(extra_tag) return merge_sequences(tag, case_sensitive=True) def extract_title_from_result(result: Any) -> Optional[str]: """Extract the title from a result dict or PipeObject.""" if isinstance(result, models.PipeObject): return result.title elif hasattr(result, 'title'): return getattr(result, 'title') elif isinstance(result, dict): return result.get('title') return None def extract_url_from_result(result: Any) -> list[str]: url: list[str] = [] def _extend(candidate: Any) -> None: if not candidate: return if isinstance(candidate, list): url.extend(candidate) elif isinstance(candidate, str): url.append(candidate) if isinstance(result, models.PipeObject): _extend(result.extra.get('url')) _extend(result.extra.get('url')) # Also check singular url if isinstance(result.metadata, dict): _extend(result.metadata.get('url')) _extend(result.metadata.get('url')) _extend(result.metadata.get('url')) elif hasattr(result, 'url') or hasattr(result, 'url'): # Handle objects with url/url attribute _extend(getattr(result, 'url', None)) _extend(getattr(result, 'url', None)) if isinstance(result, dict): _extend(result.get('url')) _extend(result.get('url')) _extend(result.get('url')) extra = result.get('extra') if isinstance(extra, dict): _extend(extra.get('url')) _extend(extra.get('url')) _extend(extra.get('url')) return merge_sequences(url, case_sensitive=True) def extract_relationships(result: Any) -> Optional[Dict[str, Any]]: if isinstance(result, models.PipeObject): relationships = result.get_relationships() return relationships or None if isinstance(result, dict): relationships = result.get('relationships') if isinstance(relationships, dict) and relationships: return relationships return None def extract_duration(result: Any) -> Optional[float]: duration = None if isinstance(result, models.PipeObject): duration = result.duration elif isinstance(result, dict): duration = result.get('duration') if duration is None: metadata = result.get('metadata') if isinstance(metadata, dict): duration = metadata.get('duration') if duration is None: return None try: return float(duration) except (TypeError, ValueError): return None def coerce_to_pipe_object(value: Any, default_path: Optional[str] = None) -> models.PipeObject: """Normalize any incoming result to a PipeObject for single-source-of-truth state. Uses hash+store canonical pattern. """ # Debug: Print ResultItem details if coming from search_file.py try: from SYS.logger import is_debug_enabled, debug if is_debug_enabled() and hasattr(value, '__class__') and value.__class__.__name__ == 'ResultItem': debug("[ResultItem -> PipeObject conversion]") debug(f" title={getattr(value, 'title', None)}") debug(f" target={getattr(value, 'target', None)}") debug(f" hash={getattr(value, 'hash', None)}") debug(f" media_kind={getattr(value, 'media_kind', None)}") debug(f" tag={getattr(value, 'tag', None)}") debug(f" tag_summary={getattr(value, 'tag_summary', None)}") debug(f" size_bytes={getattr(value, 'size_bytes', None)}") debug(f" duration_seconds={getattr(value, 'duration_seconds', None)}") debug(f" relationships={getattr(value, 'relationships', None)}") debug(f" url={getattr(value, 'url', None)}") debug(f" full_metadata keys={list(getattr(value, 'full_metadata', {}).keys()) if hasattr(value, 'full_metadata') and value.full_metadata else []}") except Exception: pass if isinstance(value, models.PipeObject): return value known_keys = { "hash", "store", "tag", "title", "url", "source_url", "duration", "metadata", "warnings", "path", "relationships", "is_temp", "action", "parent_hash", } # Convert ResultItem to dict to preserve all attributes if hasattr(value, 'to_dict'): value = value.to_dict() if isinstance(value, dict): # Extract hash and store (canonical identifiers) hash_val = value.get("hash") store_val = value.get("store") or "PATH" if not store_val or store_val == "PATH": try: extra_store = value.get("extra", {}).get("store") except Exception: extra_store = None if extra_store: store_val = extra_store # If no hash, try to compute from path or use placeholder if not hash_val: path_val = value.get("path") if path_val: try: from SYS.utils import sha256_file from pathlib import Path hash_val = sha256_file(Path(path_val)) except Exception: hash_val = "unknown" else: hash_val = "unknown" # Extract title from filename if not provided title_val = value.get("title") if not title_val: path_val = value.get("path") if path_val: try: from pathlib import Path title_val = Path(path_val).stem except Exception: pass extra = {k: v for k, v in value.items() if k not in known_keys} # Extract URL: prefer direct url field, then url list from metadata import normalize_urls url_list = normalize_urls(value.get("url")) url_val = url_list[0] if url_list else None if len(url_list) > 1: extra["url"] = url_list # Extract relationships rels = value.get("relationships") or {} # Canonical tag: accept list or single string tag_val: list[str] = [] if "tag" in value: raw_tag = value["tag"] if isinstance(raw_tag, list): tag_val = [str(t) for t in raw_tag if t is not None] elif isinstance(raw_tag, str): tag_val = [raw_tag] # Consolidate path: prefer explicit path key, but NOT target if it's a URL path_val = value.get("path") # Only use target as path if it's not a URL (url should stay in url field) if not path_val and "target" in value: target = value["target"] if target and not (isinstance(target, str) and (target.startswith("http://") or target.startswith("https://"))): path_val = target # If the path value is actually a URL, move it to url_val and clear path_val try: if isinstance(path_val, str) and (path_val.startswith("http://") or path_val.startswith("https://")): # Prefer existing url_val if present, otherwise move path_val into url_val if not url_val: url_val = path_val path_val = None except Exception: pass # Extract media_kind if available if "media_kind" in value: extra["media_kind"] = value["media_kind"] pipe_obj = models.PipeObject( hash=hash_val, store=store_val, tag=tag_val, title=title_val, url=url_val, source_url=value.get("source_url"), duration=value.get("duration") or value.get("duration_seconds"), metadata=value.get("metadata") or value.get("full_metadata") or {}, warnings=list(value.get("warnings") or []), path=path_val, relationships=rels, is_temp=bool(value.get("is_temp", False)), action=value.get("action"), parent_hash=value.get("parent_hash"), extra=extra, ) # Debug: Print formatted table pipe_obj.debug_table() return pipe_obj # Fallback: build from path argument or bare value hash_val = "unknown" path_val = default_path or getattr(value, "path", None) title_val = None if path_val and path_val != "unknown": try: from SYS.utils import sha256_file from pathlib import Path path_obj = Path(path_val) hash_val = sha256_file(path_obj) # Extract title from filename (without extension) title_val = path_obj.stem except Exception: pass # When coming from path argument, store should be "PATH" (file path, not a backend) store_val = "PATH" pipe_obj = models.PipeObject( hash=hash_val, store=store_val, path=str(path_val) if path_val and path_val != "unknown" else None, title=title_val, tag=[], extra={}, ) # Debug: Print formatted table pipe_obj.debug_table() return pipe_obj def register_url_with_local_library(pipe_obj: models.PipeObject, config: Dict[str, Any]) -> bool: """Register url with a file in the local library database. This is called automatically by download cmdlet to ensure url are persisted without requiring a separate add-url step in the pipeline. Args: pipe_obj: PipeObject with path and url config: Config dict containing local library path Returns: True if url were registered, False otherwise """ try: from config import get_local_storage_path from API.folder import API_folder_store file_path = get_field(pipe_obj, "path") url_field = get_field(pipe_obj, "url", []) urls: List[str] = [] if isinstance(url_field, str): urls = [u.strip() for u in url_field.split(",") if u.strip()] elif isinstance(url_field, (list, tuple)): urls = [u for u in url_field if isinstance(u, str) and u.strip()] if not file_path or not urls: return False path_obj = Path(file_path) if not path_obj.exists(): return False storage_path = get_local_storage_path(config) if not storage_path: return False with API_folder_store(storage_path) as db: file_hash = db.get_file_hash(path_obj) if not file_hash: return False metadata = db.get_metadata(file_hash) or {} existing_url = metadata.get("url") or [] # Add any new url changed = False for u in urls: if u not in existing_url: existing_url.append(u) changed = True if changed: metadata["url"] = existing_url db.save_metadata(path_obj, metadata) return True return True # url already existed except Exception: return False