Files
Medios-Macina/cmdlets/_shared.py

1230 lines
35 KiB
Python
Raw Permalink Normal View History

2025-11-25 20:09:33 -08:00
"""Shared utilities for cmdlets and funacts.
This module provides common utility functions for working with hashes, tags,
relationship data, and other frequently-needed operations.
"""
from __future__ import annotations
import json
import sys
import inspect
from collections.abc import Iterable as IterableABC
from helper.logger import log
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set
from dataclasses import dataclass, field
import models
@dataclass
class CmdletArg:
    """Represents a single cmdlet argument with optional enum choices.

    Instances are declared once (often in SharedArgs) and reused across
    cmdlet definitions; to_dict() provides the legacy dict representation
    and to_flags() enumerates every command-line spelling of the argument.
    """
    name: str
    """Argument name, e.g., '-path' or 'location'"""
    type: str = "string"
    """Argument type: 'string', 'int', 'flag', 'enum', etc."""
    required: bool = False
    """Whether this argument is required"""
    description: str = ""
    """Human-readable description of the argument"""
    choices: List[str] = field(default_factory=list)
    """Optional list of valid choices for enum/autocomplete, e.g., ['hydrus', 'local', '0x0.st']"""
    alias: str = ""
    """Optional alias for the argument name, e.g., 'loc' for 'location'"""
    handler: Optional[Any] = None
    """Optional handler function/callable for processing this argument's value"""
    variadic: bool = False
    """Whether this argument accepts multiple values (consumes remaining positional args)"""
    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dict for backward compatibility.

        Returns:
            Dict with name/type/required/description/variadic always present;
            'choices' and 'alias' are included only when non-empty.
        """
        d = {
            "name": self.name,
            "type": self.type,
            "required": self.required,
            "description": self.description,
            "variadic": self.variadic,
        }
        # Optional keys are omitted entirely when unset (legacy format).
        if self.choices:
            d["choices"] = self.choices
        if self.alias:
            d["alias"] = self.alias
        return d
    def resolve(self, value: Any) -> Any:
        """Resolve/process the argument value using the handler if available.

        Args:
            value: The raw argument value to process

        Returns:
            Processed value from handler, or the original value if no
            (callable) handler is attached.

        Example:
            # For STORAGE arg with a handler
            storage_path = SharedArgs.STORAGE.resolve('local')  # Returns Path.home() / "Videos"
        """
        if self.handler is not None and callable(self.handler):
            return self.handler(value)
        return value
    def to_flags(self) -> tuple[str, ...]:
        """Generate all flag variants (short and long form) for this argument.

        Returns every spelling the parser should accept for this argument:
        - Long form with double dash: --name
        - Single-dash multi-char form: -name (for convenience)
        - Short form from the alias: -alias (only if an alias is set)

        For 'flag'-type arguments, negation spellings are added as well:
        - --no-name and -noname
        - -nalias (only if an alias is set)

        Returns:
            Tuple of flag strings, e.g., ('--storage', '-storage', '-s')
            or for a flag with alias 'arch':
            ('--archive', '-archive', '-arch', '--no-archive', '-noarchive', '-narch')
        """
        flags = [f'--{self.name}', f'-{self.name}']  # Both double-dash and single-dash variants
        # Add short form if alias exists
        if self.alias:
            flags.append(f'-{self.alias}')
        # Add negation forms for flag type
        if self.type == 'flag':
            flags.append(f'--no-{self.name}')
            flags.append(f'-no{self.name}')  # Single-dash negation variant, no hyphen before the name
            if self.alias:
                flags.append(f'-n{self.alias}')
        return tuple(flags)
# ============================================================================
# SHARED ARGUMENTS - Reusable argument definitions across cmdlets
# ============================================================================
class SharedArgs:
    """Registry of shared CmdletArg definitions used across multiple cmdlets.

    This class provides a centralized location for common arguments so they're
    defined once and used consistently everywhere. Reduces duplication and ensures
    all cmdlets handle the same arguments identically.

    Note: the class attributes are shared CmdletArg instances — cmdlets must
    not mutate them in place.

    Example:
        CMDLET = Cmdlet(
            name="my-cmdlet",
            summary="Does something",
            usage="my-cmdlet",
            args=[
                SharedArgs.HASH,      # Use predefined shared arg
                SharedArgs.LOCATION,  # Use another shared arg
                CmdletArg(...),       # Mix with custom args
            ]
        )
    """
    # File/Hash arguments
    HASH = CmdletArg(
        "hash",
        type="string",
        description="Override the Hydrus file hash (SHA256) to target instead of the selected result."
    )
    LOCATION = CmdletArg(
        "location",
        type="enum",
        choices=["hydrus", "0x0", "local"],
        required=True,
        description="Destination location"
    )
    DELETE_FLAG = CmdletArg(
        "delete",
        type="flag",
        description="Delete the file and its .tags after successful operation."
    )
    # Metadata arguments
    ARTIST = CmdletArg(
        "artist",
        type="string",
        description="Filter by artist name (case-insensitive, partial match)."
    )
    ALBUM = CmdletArg(
        "album",
        type="string",
        description="Filter by album name (case-insensitive, partial match)."
    )
    TRACK = CmdletArg(
        "track",
        type="string",
        description="Filter by track title (case-insensitive, partial match)."
    )
    # Library/Search arguments
    LIBRARY = CmdletArg(
        "library",
        type="string",
        choices=["hydrus", "local", "soulseek", "libgen", "debrid", "ftp"],
        description="Search library or source location."
    )
    TIMEOUT = CmdletArg(
        "timeout",
        type="integer",
        description="Search or operation timeout in seconds."
    )
    LIMIT = CmdletArg(
        "limit",
        type="integer",
        description="Maximum number of results to return."
    )
    # Path/File arguments
    PATH = CmdletArg(
        "path",
        type="string",
        description="File or directory path."
    )
    OUTPUT = CmdletArg(
        "output",
        type="string",
        description="Output file path."
    )
    STORAGE = CmdletArg(
        "storage",
        type="enum",
        choices=["hydrus", "local", "debrid", "ftp"],
        required=False,
        description="Storage location or destination for saving/uploading files.",
        alias="s",
        # Lambda defers to resolve_storage below; SharedArgs is not yet
        # defined at class-body execution time, so the reference must be lazy.
        handler=lambda val: SharedArgs.resolve_storage(val) if val else None
    )
    # Generic arguments
    QUERY = CmdletArg(
        "query",
        type="string",
        description="Search query string."
    )
    REASON = CmdletArg(
        "reason",
        type="string",
        description="Reason or explanation for the operation."
    )
    ARCHIVE = CmdletArg(
        "archive",
        type="flag",
        description="Archive the URL to Wayback Machine, Archive.today, and Archive.ph (requires URL argument in cmdlet).",
        alias="arch"
    )
    @staticmethod
    def resolve_storage(storage_value: Optional[str], default: Optional[Path] = None) -> Path:
        """Resolve a storage location name to a filesystem Path.

        Maps storage identifiers (hydrus, local, debrid, ftp) to their actual
        filesystem paths. This is the single source of truth for storage
        location resolution. Matching is case-insensitive.

        Note: 0x0.st is now accessed via file providers (-provider 0x0), not storage.

        Args:
            storage_value: One of 'hydrus', 'local', 'debrid', 'ftp', or None
            default: Path to return if storage_value is None (defaults to Videos)

        Returns:
            Resolved Path object for the storage location

        Raises:
            ValueError: If storage_value is not a recognized storage type

        Example:
            storage_path = SharedArgs.resolve_storage(parsed.storage)
            path = SharedArgs.resolve_storage(None)      # Returns home/Videos
            path = SharedArgs.resolve_storage('local')   # Returns home/Videos
            path = SharedArgs.resolve_storage('hydrus')  # Returns home/.hydrus/client_files
        """
        storage_map = {
            'local': Path.home() / "Videos",
            'hydrus': Path.home() / ".hydrus" / "client_files",
            'debrid': Path.home() / "Debrid",
            'ftp': Path.home() / "FTP",
        }
        if storage_value is None:
            return default or (Path.home() / "Videos")
        storage_lower = storage_value.lower()
        if storage_lower not in storage_map:
            raise ValueError(
                f"Unknown storage location '{storage_value}'. "
                f"Must be one of: {', '.join(storage_map.keys())}"
            )
        return storage_map[storage_lower]
    @classmethod
    def get(cls, name: str) -> Optional[CmdletArg]:
        """Get a shared argument by name (case-insensitive attribute lookup).

        Args:
            name: Name like 'HASH', 'hash', 'LOCATION', etc.

        Returns:
            CmdletArg if found, None otherwise

        Example:
            arg = SharedArgs.get('HASH')  # Returns SharedArgs.HASH
        """
        try:
            return getattr(cls, name.upper())
        except AttributeError:
            return None
@dataclass
class Cmdlet:
    """Represents a cmdlet with metadata and arguments.

    Behaves both as a structured object and (via __getitem__/get/to_dict)
    as a dict, for backward compatibility with code that expects the old
    CMDLET dict format.

    Example:
        cmd = Cmdlet(
            name="add-file",
            summary="Upload a media file",
            usage="add-file <location>",
            aliases=["add-file-alias"],
            args=[
                CmdletArg("location", required=True, description="Destination location"),
                CmdletArg("-delete", type="flag", description="Delete after upload"),
            ],
            details=[
                "- This is a detail line",
                "- Another detail",
            ]
        )
        # Access properties
        log(cmd.name)          # "add-file"
        log(cmd.summary)       # "Upload a media file"
        log(cmd.args[0].name)  # "location"
        # Convert to dict for JSON serialization
        log(json.dumps(cmd.to_dict()))
    """
    name: str
    """Cmdlet name, e.g., 'add-file'"""
    summary: str
    """One-line summary of the cmdlet"""
    usage: str
    """Usage string, e.g., 'add-file <location> [-delete]'"""
    aliases: List[str] = field(default_factory=list)
    """List of aliases for this cmdlet, e.g., ['add', 'add-f']"""
    args: List[CmdletArg] = field(default_factory=list)
    """List of arguments accepted by this cmdlet"""
    details: List[str] = field(default_factory=list)
    """Detailed explanation lines (for help text)"""
    exec: Optional[Any] = field(default=None)
    """The execution function: func(result, args, config) -> int"""
    def __post_init__(self) -> None:
        """Auto-discover _run function if exec not explicitly provided.

        If exec is None, walks up the call stack looking for a module that
        defines a global named `_run` (normally the module instantiating this
        Cmdlet) and binds it as the execution function.
        """
        if self.exec is None:
            # Walk up the call stack to find _run in the calling module
            frame = inspect.currentframe()
            try:
                # Walk up frames until we find one with _run in globals
                while frame:
                    if '_run' in frame.f_globals:
                        self.exec = frame.f_globals['_run']
                        break
                    frame = frame.f_back
            finally:
                # Frame objects participate in reference cycles; drop the
                # local binding promptly (per the inspect module docs).
                del frame  # Avoid reference cycles
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dict for backward compatibility with existing code.

        Returns a dict matching the old CMDLET format so existing code
        that expects a dict will still work. Includes a display-friendly
        "cmd" key combining the name and aliases on one line.
        """
        # Format command for display: "cmd: name alias: alias1, alias2"
        cmd_display = f"cmd: {self.name}"
        if self.aliases:
            aliases_str = ", ".join(self.aliases)
            cmd_display += f" alias: {aliases_str}"
        return {
            "name": self.name,
            "summary": self.summary,
            "usage": self.usage,
            "cmd": cmd_display,  # Display-friendly command name with aliases on one line
            "aliases": self.aliases,
            "args": [arg.to_dict() for arg in self.args],
            "details": self.details,
        }
    def __getitem__(self, key: str) -> Any:
        """Dict-like access for backward compatibility.

        Allows code like: cmdlet["name"] or cmdlet["args"].
        Returns None for unknown keys (no KeyError), mirroring dict.get.
        """
        d = self.to_dict()
        return d.get(key)
    def get(self, key: str, default: Any = None) -> Any:
        """Dict-like get() method for backward compatibility."""
        d = self.to_dict()
        return d.get(key, default)
    def get_flags(self, arg_name: str) -> set[str]:
        """Generate -name and --name flag variants for an argument.

        Args:
            arg_name: The argument name (e.g., 'library', 'tag', 'size')

        Returns:
            Set containing both single-dash and double-dash variants
            (e.g., {'-library', '--library'})

        Example:
            if low in cmdlet.get_flags('library'):
                # handle library flag
        """
        return {f"-{arg_name}", f"--{arg_name}"}
    def build_flag_registry(self) -> Dict[str, set[str]]:
        """Build a registry of all flag variants for this cmdlet's arguments.

        Automatically generates all -name and --name variants for each argument.
        Useful for parsing command-line arguments without hardcoding flags.

        Returns:
            Dict mapping argument names to their flag sets
            (e.g., {'library': {'-library', '--library'}, 'tag': {'-tag', '--tag'}})

        Example:
            flags = cmdlet.build_flag_registry()
            if low in flags.get('library', set()):
                # handle library
            elif low in flags.get('tag', set()):
                # handle tag
        """
        return {arg.name: self.get_flags(arg.name) for arg in self.args}
# Tag groups cache (loaded from JSON config file); rebuilt by
# _load_tag_groups() whenever the file's mtime changes.
_TAG_GROUPS_CACHE: Optional[Dict[str, List[str]]] = None
# mtime of the config file at the moment the cache was built; None = no cache.
_TAG_GROUPS_MTIME: Optional[float] = None
# Path to tag groups configuration (set by caller or lazily discovered)
TAG_GROUPS_PATH: Optional[Path] = None
def set_tag_groups_path(path: Path) -> None:
    """Record *path* as the JSON file that defines tag groups.

    Subsequent tag-group loads read from this location instead of
    auto-discovering a config file.
    """
    # Rebind the module-level setting directly.
    globals()["TAG_GROUPS_PATH"] = path
def parse_cmdlet_args(args: Sequence[str], cmdlet_spec: Dict[str, Any] | Cmdlet) -> Dict[str, Any]:
    """Parse command-line arguments based on cmdlet specification.

    Extracts argument values from command-line tokens using the argument names
    and types defined in the cmdlet metadata. Automatically supports bare,
    single-dash and double-dash spellings of each argument name. Arguments
    defined without dashes are treated as positional (but still match their
    -name/--name spellings).

    Args:
        args: Command-line arguments (e.g., ["-path", "/home/file.txt", "-foo", "bar"])
        cmdlet_spec: Cmdlet metadata dict with an "args" key containing arg specs,
            or any object exposing to_dict() (e.g. a Cmdlet instance). Each arg
            spec should have at least a "name" key.

    Returns:
        Dict mapping canonical arg names to parsed values. Arguments that were
        not provided are absent. Flag-type args map to True; variadic args
        accumulate into a list. Values that start with "-" are never consumed
        as a flag's value.

    Example:
        cmdlet = {
            "args": [
                {"name": "path", "type": "string"},   # Positional - matches bare value or -path/--path
                {"name": "count", "type": "int"}      # Positional - matches bare value or -count/--count
            ]
        }
        result = parse_cmdlet_args(["value1", "-count", "5"], cmdlet)
        # result = {"path": "value1", "count": "5"}
    """
    result: Dict[str, Any] = {}
    # Accept Cmdlet instances (or anything spec-like with to_dict()); plain dicts pass through.
    if not isinstance(cmdlet_spec, dict) and hasattr(cmdlet_spec, "to_dict"):
        cmdlet_spec = cmdlet_spec.to_dict()
    arg_specs: List[Dict[str, Any]] = cmdlet_spec.get("args", [])
    positional_args: List[Dict[str, Any]] = []  # specs defined without a dash prefix
    arg_spec_map: Dict[str, str] = {}  # lowercased spelling -> canonical name
    spec_by_canonical: Dict[str, Dict[str, Any]] = {}  # lowercased canonical -> first matching spec
    for spec in arg_specs:
        name = spec.get("name")
        if not name:
            continue
        name_str = str(name)
        canonical_name = name_str.lstrip("-")
        # Positional args are those defined without dashes in the spec.
        if "-" not in name_str:
            positional_args.append(spec)
        # Register bare / single-dash / double-dash spellings for lookup.
        lowered = canonical_name.lower()
        arg_spec_map[lowered] = canonical_name
        arg_spec_map[f"-{lowered}"] = canonical_name
        arg_spec_map[f"--{lowered}"] = canonical_name
        # setdefault keeps the FIRST spec for a canonical name (matches the
        # previous linear-scan behavior) while avoiding an O(n) scan per token.
        spec_by_canonical.setdefault(lowered, spec)
    # Parse arguments
    i = 0
    positional_index = 0  # Track which positional arg we're on
    while i < len(args):
        token = str(args[i])
        token_lower = token.lower()
        if token_lower in arg_spec_map:
            # Token names a known argument.
            canonical_name = arg_spec_map[token_lower]
            spec = spec_by_canonical.get(canonical_name.lower())
            if spec is not None and spec.get("type") == "flag":
                # Flags record presence only; they never consume a value token.
                result[canonical_name] = True
                i += 1
            elif i + 1 < len(args) and not str(args[i + 1]).startswith("-"):
                value = args[i + 1]
                if spec is not None and spec.get("variadic", False):
                    # Variadic: accumulate repeated occurrences into a list.
                    if canonical_name not in result:
                        result[canonical_name] = []
                    elif not isinstance(result[canonical_name], list):
                        result[canonical_name] = [result[canonical_name]]
                    result[canonical_name].append(value)
                else:
                    result[canonical_name] = value
                i += 2
            else:
                # No usable value follows (missing, or looks like another flag); skip.
                i += 1
        elif positional_index < len(positional_args):
            positional_spec = positional_args[positional_index]
            canonical_name = str(positional_spec.get("name", "")).lstrip("-")
            if positional_spec.get("variadic", False):
                # Variadic positionals soak up all remaining unmatched tokens,
                # so positional_index intentionally does not advance. They
                # should typically be the last positional argument.
                if canonical_name not in result:
                    result[canonical_name] = []
                elif not isinstance(result[canonical_name], list):
                    result[canonical_name] = [result[canonical_name]]
                result[canonical_name].append(token)
            else:
                result[canonical_name] = token
                positional_index += 1
            i += 1
        else:
            # Unknown token with no positional slot left; ignore it.
            i += 1
    return result
def normalize_hash(hash_hex: Optional[str]) -> Optional[str]:
    """Return *hash_hex* stripped and lowercased, or None when unusable.

    Non-string input and empty/whitespace-only strings both yield None.
    """
    if isinstance(hash_hex, str):
        cleaned = hash_hex.strip().lower()
        if cleaned:
            return cleaned
    return None
def looks_like_hash(candidate: Optional[str]) -> bool:
    """Check whether *candidate* looks like a SHA256 hash (64 hex chars).

    Case-insensitive; surrounding whitespace is ignored. Anything that is
    not a string fails immediately.
    """
    if not isinstance(candidate, str):
        return False
    normalized = candidate.strip().lower()
    if len(normalized) != 64:
        return False
    return set(normalized) <= set("0123456789abcdef")
def pipeline_item_local_path(item: Any) -> Optional[str]:
    """Extract a local filesystem path from a pipeline item.

    Accepts objects exposing a ``target`` attribute as well as dicts carrying
    a ``target``/``path``/``url`` key. HTTP(S) URLs, blank values, and
    unrecognized item types all map to None.
    """
    if hasattr(item, "target"):
        candidate = getattr(item, "target", None)
    elif isinstance(item, dict):
        raw = item.get("target") or item.get("path") or item.get("url")
        candidate = None if raw is None else str(raw)
    else:
        candidate = None
    if not isinstance(candidate, str):
        return None
    cleaned = candidate.strip()
    if not cleaned or cleaned.lower().startswith(("http://", "https://")):
        return None
    return cleaned
def collect_relationship_labels(payload: Any, label_stack: List[str] | None = None, mapping: Dict[str, str] | None = None) -> Dict[str, str]:
    """Recursively map sha256-like strings to their path inside *payload*.

    Walks nested dicts/lists/tuples/sets; whenever a 64-hex-char string is
    found, records hash -> "key / subkey / ..." built from the dict keys
    traversed on the way down (underscores become spaces). Hashes found at
    the top level, outside any dict key, are labelled "related". For a hash
    seen more than once, the first label wins.

    Args:
        payload: Nested data structure (dict, list, string, etc.)
        label_stack: Internal use - tracks the key path during recursion
        mapping: Internal use - accumulates hash->label entries

    Returns:
        Dict mapping hash strings (lowercased) to their path labels
    """
    stack = [] if label_stack is None else label_stack
    found = {} if mapping is None else mapping
    if isinstance(payload, dict):
        for key, value in payload.items():
            child_stack = stack
            if isinstance(key, str) and key:
                # Dict keys become path segments; underscores read as spaces.
                child_stack = stack + [key.replace('_', ' ').strip()]
            collect_relationship_labels(value, child_stack, found)
    elif isinstance(payload, (list, tuple, set)):
        for element in payload:
            collect_relationship_labels(element, stack, found)
    elif isinstance(payload, str) and looks_like_hash(payload):
        label = " / ".join(part for part in stack if part) if stack else "related"
        found.setdefault(payload.lower(), label)
    return found
def parse_tag_arguments(arguments: Sequence[str]) -> List[str]:
    """Split raw command-line tokens into individual tags.

    Each token may carry several comma-separated tags; whitespace is trimmed
    and empty fragments are discarded.
    Example: parse_tag_arguments(["tag1,tag2", "tag3"]) -> ["tag1", "tag2", "tag3"]

    Args:
        arguments: Sequence of argument strings

    Returns:
        List of normalized tag strings (empty strings filtered out)
    """
    return [
        stripped
        for argument in arguments
        for piece in argument.split(',')
        if (stripped := piece.strip())
    ]
def fmt_bytes(n: Optional[int]) -> str:
    """Render a byte count as a human-readable MB/GB string (one decimal).

    Args:
        n: Number of bytes, or None

    Returns:
        "X.Y GB" for counts of at least 1 GiB, otherwise "X.Y MB";
        "unknown" for None or negative counts.
    """
    if n is None or n < 0:
        return "unknown"
    gib = n / (1024.0 * 1024.0 * 1024.0)
    if gib >= 1.0:
        return f"{gib:.1f} GB"
    return f"{n / (1024.0 * 1024.0):.1f} MB"
def _normalise_tag_group_entry(value: Any) -> Optional[str]:
"""Internal: Normalize a single tag group entry."""
if not isinstance(value, str):
value = str(value)
text = value.strip()
return text or None
def _load_tag_groups() -> Dict[str, List[str]]:
    """Load tag group definitions from JSON file with caching.

    Resolution order for the config path: the module-level TAG_GROUPS_PATH
    if set, otherwise 'adjective.json' next to the package root, otherwise
    'helper/adjective.json'. The parsed result is cached and only re-read
    when the file's mtime changes. On any read/parse error an empty dict
    is cached and returned.

    Returns:
        Mapping of lowercased group name -> list of member tag strings.
    """
    global _TAG_GROUPS_CACHE, _TAG_GROUPS_MTIME, TAG_GROUPS_PATH
    # Auto-discover adjective.json if not set
    if TAG_GROUPS_PATH is None:
        # Try to find adjective.json in the script directory or helper subdirectory
        try:
            script_dir = Path(__file__).parent.parent
            # Check root directory
            candidate = script_dir / "adjective.json"
            if candidate.exists():
                TAG_GROUPS_PATH = candidate
            else:
                # Check helper directory
                candidate = script_dir / "helper" / "adjective.json"
                if candidate.exists():
                    TAG_GROUPS_PATH = candidate
        except Exception:
            # Discovery is best-effort; fall through to "no groups".
            pass
    if TAG_GROUPS_PATH is None:
        return {}
    path = TAG_GROUPS_PATH
    try:
        stat_result = path.stat()
    except FileNotFoundError:
        # File vanished: drop any stale cache.
        _TAG_GROUPS_CACHE = {}
        _TAG_GROUPS_MTIME = None
        return {}
    except OSError as exc:
        log(f"Failed to read tag groups: {exc}", file=sys.stderr)
        _TAG_GROUPS_CACHE = {}
        _TAG_GROUPS_MTIME = None
        return {}
    mtime = stat_result.st_mtime
    # Cache hit: file unchanged since last successful load.
    if _TAG_GROUPS_CACHE is not None and _TAG_GROUPS_MTIME == mtime:
        return _TAG_GROUPS_CACHE
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        log(f"Invalid tag group JSON ({path}): {exc}", file=sys.stderr)
        # Cache the failure against this mtime so we don't re-parse a bad
        # file on every call.
        _TAG_GROUPS_CACHE = {}
        _TAG_GROUPS_MTIME = mtime
        return {}
    groups: Dict[str, List[str]] = {}
    if isinstance(payload, dict):
        for key, value in payload.items():
            if not isinstance(key, str):
                continue
            name = key.strip().lower()
            if not name:
                continue
            members: List[str] = []
            if isinstance(value, list):
                for entry in value:
                    normalised = _normalise_tag_group_entry(entry)
                    if normalised:
                        members.append(normalised)
            elif isinstance(value, str):
                # A single string value may itself be a comma-separated list.
                normalised = _normalise_tag_group_entry(value)
                if normalised:
                    members.extend(token.strip() for token in normalised.split(",") if token.strip())
            if members:
                groups[name] = members
    _TAG_GROUPS_CACHE = groups
    _TAG_GROUPS_MTIME = mtime
    return groups
def expand_tag_groups(raw_tags: Iterable[str]) -> List[str]:
    """Expand tag group references (e.g., {my_group}) into member tags.

    Tag groups are defined in JSON (see _load_tag_groups) and may be nested:
    a group member may itself be a {group} reference. Recursion is detected
    per expansion branch and skipped with a warning. Unknown group references
    are kept verbatim (and warned about) so the caller can see them.

    Args:
        raw_tags: Sequence of tag strings, some may reference groups like "{group_name}"

    Returns:
        List of expanded tags with group references replaced. When no groups
        are configured, non-empty string tags are returned unchanged.
    """
    groups = _load_tag_groups()
    if not groups:
        # No config: just filter out non-strings and blank tags.
        return [tag for tag in raw_tags if isinstance(tag, str) and tag.strip()]
    def _expand(tokens: Iterable[str], seen: Set[str]) -> List[str]:
        # `seen` holds the group names on the current expansion branch only,
        # so the same group may legitimately appear on sibling branches.
        result: List[str] = []
        for token in tokens:
            if not isinstance(token, str):
                continue
            candidate = token.strip()
            if not candidate:
                continue
            if candidate.startswith("{") and candidate.endswith("}") and len(candidate) > 2:
                name = candidate[1:-1].strip().lower()
                if not name:
                    continue
                if name in seen:
                    log(f"Tag group recursion detected for {{{name}}}; skipping", file=sys.stderr)
                    continue
                members = groups.get(name)
                if not members:
                    # Unknown group: keep the literal reference in the output.
                    log(f"Unknown tag group {{{name}}}", file=sys.stderr)
                    result.append(candidate)
                    continue
                result.extend(_expand(members, seen | {name}))
            else:
                result.append(candidate)
        return result
    return _expand(raw_tags, set())
def first_title_tag(source: Optional[Iterable[str]]) -> Optional[str]:
    """Return the first "title:"-prefixed tag in *source*, stripped.

    The prefix check is case-insensitive; non-string entries are skipped.

    Args:
        source: Iterable of tag strings (may be None)

    Returns:
        First matching tag (whitespace-trimmed), or None
    """
    for entry in source or ():
        if not isinstance(entry, str):
            continue
        trimmed = entry.strip()
        if trimmed and trimmed.lower().startswith("title:"):
            return trimmed
    return None
def apply_preferred_title(tags: List[str], preferred: Optional[str]) -> List[str]:
    """Replace any title: tags with a preferred title tag.

    All tags are whitespace-trimmed and blanks dropped. Title tags that do
    not match *preferred* (case-insensitively) are removed; the first match
    keeps its original casing. If no match was present, *preferred* is
    appended at the end. At most one title tag survives.

    Args:
        tags: List of tags (may contain multiple "title:" entries)
        preferred: Preferred title tag in full "title: ..." form

    Returns:
        New list with old title tags removed and the preferred title present once.
    """
    if not preferred:
        return tags
    wanted = preferred.strip()
    if not wanted:
        return tags
    wanted_lower = wanted.lower()
    kept: List[str] = []
    seen_preferred = False
    for raw in tags:
        tag = raw.strip()
        if not tag:
            continue
        lowered = tag.lower()
        if lowered.startswith("title:"):
            # Keep only the first occurrence matching the preferred title.
            if lowered == wanted_lower and not seen_preferred:
                kept.append(tag)
                seen_preferred = True
            continue
        kept.append(tag)
    if not seen_preferred:
        kept.append(wanted)
    return kept
# ============================================================================
# PIPEOBJECT UTILITIES (for chainable cmdlets and multi-action pipelines)
# ============================================================================
def create_pipe_object_result(
    source: str,
    identifier: str,
    file_path: str,
    cmdlet_name: str,
    title: Optional[str] = None,
    file_hash: Optional[str] = None,
    is_temp: bool = False,
    parent_hash: Optional[str] = None,
    tags: Optional[List[str]] = None,
    **extra: Any
) -> Dict[str, Any]:
    """Create a PipeObject-compatible result dict for pipeline chaining.

    Emits results in the standard shape that downstream cmdlets can process
    (filter, tag, cleanup, etc.). Optional fields are only present when
    truthy; any keyword extras are merged in last.

    Args:
        source: Source system (e.g., 'local', 'hydrus', 'download')
        identifier: Unique ID from source
        file_path: Path to the file
        cmdlet_name: Name of the producing cmdlet (e.g., 'download-data')
        title: Human-readable title
        file_hash: SHA-256 hash of the file (for integrity)
        is_temp: If True, marks a temporary/intermediate artifact
        parent_hash: Hash of the parent file in the chain (for provenance)
        tags: List of tags to apply
        **extra: Additional fields merged into the result

    Returns:
        Dict with all PipeObject fields for emission
    """
    payload: Dict[str, Any] = {
        'source': source,
        'id': identifier,
        'file_path': file_path,
        'action': f'cmdlet:{cmdlet_name}',  # Format: cmdlet:cmdlet_name
    }
    # Optional fields, inserted in the canonical order, only when set.
    for key, value in (
        ('title', title),
        ('file_hash', file_hash),
        ('is_temp', True if is_temp else None),
        ('parent_id', parent_hash),  # parent_id is the parent's file_hash
        ('tags', tags),
    ):
        if value:
            payload[key] = value
    payload.update(extra)
    return payload
def mark_as_temp(pipe_object: Dict[str, Any]) -> Dict[str, Any]:
    """Flag *pipe_object* as an intermediate artifact (sets is_temp=True).

    Mutates the dict in place and returns it for convenient chaining.
    """
    pipe_object.update(is_temp=True)
    return pipe_object
def set_parent_id(pipe_object: Dict[str, Any], parent_hash: str) -> Dict[str, Any]:
    """Record provenance by storing the parent file's hash under 'parent_id'.

    Mutates the dict in place and returns it for convenient chaining.
    """
    pipe_object.update(parent_id=parent_hash)
    return pipe_object
def get_pipe_object_path(pipe_object: Any) -> Optional[str]:
    """Extract a file path from a PipeObject, dict, or pipeline-friendly object.

    Checks object attributes first, then dict keys, trying file_path, path,
    and target in that order; returns the first truthy value, else None.
    """
    if pipe_object is None:
        return None
    keys = ('file_path', 'path', 'target')
    for name in keys:
        candidate = getattr(pipe_object, name, None)
        if candidate:
            return candidate
    if isinstance(pipe_object, dict):
        for name in keys:
            candidate = pipe_object.get(name)
            if candidate:
                return candidate
    return None
def get_pipe_object_hash(pipe_object: Any) -> Optional[str]:
    """Extract a file hash from a PipeObject, dict, or pipeline-friendly object.

    Checks object attributes first, then dict keys, trying file_hash,
    hash_hex, and hash in that order; returns the first truthy value, else None.
    """
    if pipe_object is None:
        return None
    keys = ('file_hash', 'hash_hex', 'hash')
    for name in keys:
        candidate = getattr(pipe_object, name, None)
        if candidate:
            return candidate
    if isinstance(pipe_object, dict):
        for name in keys:
            candidate = pipe_object.get(name)
            if candidate:
                return candidate
    return None
def normalize_result_input(result: Any) -> List[Dict[str, Any]]:
    """Normalize piped input into a list of result dicts.

    Handles:
    - None -> []
    - dict -> [dict]
    - list -> list with to_dict() applied to items that support it
    - object with to_dict() (e.g. PipeObject) -> [obj.to_dict()]
    - anything else -> []

    Args:
        result: Result from piped input

    Returns:
        List of result dicts (may be empty)
    """
    if result is None:
        return []
    # Single dict
    if isinstance(result, dict):
        return [result]
    # List - convert each item to dict if possible, pass others through
    if isinstance(result, list):
        output = []
        for item in result:
            if isinstance(item, dict):
                output.append(item)
            elif hasattr(item, 'to_dict'):
                output.append(item.to_dict())
            else:
                # Keep unknown items unchanged so callers can decide.
                output.append(item)
        return output
    # PipeObject or other object with to_dict
    if hasattr(result, 'to_dict'):
        return [result.to_dict()]
    # Unrecognized scalar input: nothing usable to normalize.
    # (A duplicate dict check that previously lived here was unreachable.)
    return []
def filter_results_by_temp(results: List[Any], include_temp: bool = False) -> List[Any]:
    """Filter results by temporary status.

    Args:
        results: List of result dicts or PipeObjects
        include_temp: If True, return the list unchanged; if False, drop
            entries whose is_temp attribute/key is truthy.

    Returns:
        Filtered list (the original list object when include_temp is True)
    """
    if include_temp:
        return results
    kept = []
    for entry in results:
        # PipeObject-style attribute takes precedence over a dict key.
        if hasattr(entry, 'is_temp'):
            temp = entry.is_temp
        elif isinstance(entry, dict):
            temp = entry.get('is_temp', False)
        else:
            temp = False
        if not temp:
            kept.append(entry)
    return kept
def merge_sequences(*sources: Optional[Iterable[Any]], case_sensitive: bool = True) -> list[str]:
    """Merge iterable sources into one ordered, de-duplicated string list.

    Strings count as single values (never iterated char-by-char), as do
    non-iterable scalars. Falsy sources, None entries, and blank strings are
    skipped. With case_sensitive=False, duplicates are detected ignoring
    case while the first-seen casing is kept.
    """
    encountered: set[str] = set()
    ordered: list[str] = []
    for source in sources:
        if not source:
            continue
        # Wrap strings and non-iterables so they are treated as one value.
        if isinstance(source, str) or not isinstance(source, IterableABC):
            values = [source]
        else:
            values = source
        for raw in values:
            if raw is None:
                continue
            text = str(raw).strip()
            if not text:
                continue
            marker = text if case_sensitive else text.lower()
            if marker not in encountered:
                encountered.add(marker)
                ordered.append(text)
    return ordered
def extract_tags_from_result(result: Any) -> list[str]:
    """Collect tags from a PipeObject, tag-bearing object, or result dict.

    Gathers the main tags field plus any extra['tags'] entries, then returns
    an ordered, case-sensitively de-duplicated list.
    """
    collected: list[str] = []
    if isinstance(result, models.PipeObject):
        collected.extend(result.tags or [])
        collected.extend(result.extra.get('tags', []))
    elif hasattr(result, 'tags'):
        # Objects such as SearchResult expose a tags attribute directly.
        attr_tags = getattr(result, 'tags')
        if isinstance(attr_tags, (list, set, tuple)):
            collected.extend(attr_tags)
        elif isinstance(attr_tags, str):
            collected.append(attr_tags)
    if isinstance(result, dict):
        # Check the dict itself and its optional 'extra' sub-dict.
        for container in (result, result.get('extra')):
            if not isinstance(container, dict):
                continue
            entry = container.get('tags')
            if isinstance(entry, list):
                collected.extend(entry)
            elif isinstance(entry, str):
                collected.append(entry)
    return merge_sequences(collected, case_sensitive=True)
def extract_title_from_result(result: Any) -> Optional[str]:
    """Pull a title from a PipeObject, attribute-bearing object, or dict.

    Precedence: PipeObject.title, then a 'title' attribute, then a 'title'
    dict key; None when nothing matches.
    """
    if isinstance(result, models.PipeObject):
        return result.title
    if hasattr(result, 'title'):
        return getattr(result, 'title')
    if isinstance(result, dict):
        return result.get('title')
    return None
def extract_known_urls_from_result(result: Any) -> list[str]:
    """Gather known URLs from a PipeObject, URL-bearing object, or dict.

    Looks in known_urls/urls fields — including PipeObject.extra and
    metadata, and a dict's 'extra' sub-dict — and returns an ordered,
    de-duplicated list.
    """
    found: list[str] = []
    def _collect(value: Any) -> None:
        # Accept either a list of URLs or a single URL string.
        if isinstance(value, list):
            found.extend(value)
        elif isinstance(value, str) and value:
            found.append(value)
    if isinstance(result, models.PipeObject):
        _collect(result.extra.get('known_urls'))
        metadata = result.metadata
        if isinstance(metadata, dict):
            _collect(metadata.get('known_urls'))
            _collect(metadata.get('urls'))
    elif hasattr(result, 'known_urls') or hasattr(result, 'urls'):
        # Objects exposing known_urls/urls attributes directly.
        _collect(getattr(result, 'known_urls', None))
        _collect(getattr(result, 'urls', None))
    if isinstance(result, dict):
        _collect(result.get('known_urls'))
        _collect(result.get('urls'))
        extra = result.get('extra')
        if isinstance(extra, dict):
            _collect(extra.get('known_urls'))
            _collect(extra.get('urls'))
    return merge_sequences(found, case_sensitive=True)
def extract_relationships(result: Any) -> Optional[Dict[str, Any]]:
    """Return the relationships mapping from a result, or None when absent.

    PipeObjects are asked via get_relationships(); dicts are checked for a
    non-empty 'relationships' dict. Empty mappings normalize to None.
    """
    if isinstance(result, models.PipeObject):
        return result.get_relationships() or None
    if isinstance(result, dict):
        found = result.get('relationships')
        if isinstance(found, dict) and found:
            return found
    return None
def extract_duration(result: Any) -> Optional[float]:
    """Extract a duration (as float) from a result, or None.

    Checks PipeObject.duration, then dict['duration'], then
    dict['metadata']['duration']; values that cannot convert to float
    yield None.
    """
    raw: Any = None
    if isinstance(result, models.PipeObject):
        raw = result.duration
    elif isinstance(result, dict):
        raw = result.get('duration')
        if raw is None:
            # Fall back to the nested metadata block when present.
            meta = result.get('metadata')
            if isinstance(meta, dict):
                raw = meta.get('duration')
    if raw is None:
        return None
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None