This commit is contained in:
nose
2025-12-11 12:47:30 -08:00
parent 6b05dc5552
commit 65d12411a2
92 changed files with 17447 additions and 14308 deletions


@@ -11,7 +11,7 @@ import sys
import inspect
from collections.abc import Iterable as IterableABC
from helper.logger import log
from helper.logger import log, debug
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set
from dataclasses import dataclass, field
@@ -37,22 +37,9 @@ class CmdletArg:
"""Optional handler function/callable for processing this argument's value"""
variadic: bool = False
"""Whether this argument accepts multiple values (consumes remaining positional args)"""
def to_dict(self) -> Dict[str, Any]:
"""Convert to dict for backward compatibility."""
d = {
"name": self.name,
"type": self.type,
"required": self.required,
"description": self.description,
"variadic": self.variadic,
}
if self.choices:
d["choices"] = self.choices
if self.alias:
d["alias"] = self.alias
return d
usage: str = ""
"""dsf"""
def resolve(self, value: Any) -> Any:
"""Resolve/process the argument value using the handler if available.
@@ -135,11 +122,68 @@ class SharedArgs:
# File/Hash arguments
HASH = CmdletArg(
"hash",
name="hash",
type="string",
description="Override the Hydrus file hash (SHA256) to target instead of the selected result."
description="File hash (SHA256, 64-char hex string)",
)
STORE = CmdletArg(
name="store",
type="enum",
choices=[], # Dynamically populated via get_store_choices()
description="Selects store",
)
PATH = CmdletArg(
name="path",
type="string",
description="Local filesystem path to target.",
)
URL = CmdletArg(
name="url",
type="string",
description="http parser",
)
@staticmethod
def get_store_choices(config: Optional[Dict[str, Any]] = None) -> List[str]:
"""Get list of available storage backend names from FileStorage.
This method dynamically discovers all configured storage backends
instead of using a static list. Should be called when building
autocomplete choices or validating store names.
Args:
config: Optional config dict. If not provided, will try to load from config module.
Returns:
List of backend names (e.g., ['default', 'test', 'home', 'work'])
Example:
# In a cmdlet that needs dynamic choices
from helper.store import FileStorage
storage = FileStorage(config)
SharedArgs.STORE.choices = SharedArgs.get_store_choices(config)
"""
try:
from helper.store import FileStorage
# If no config provided, try to load it
if config is None:
try:
from config import load_config
config = load_config()
except Exception:
return []
file_storage = FileStorage(config)
return file_storage.list_backends()
except Exception:
# Fallback to empty list if FileStorage isn't available
return []
LOCATION = CmdletArg(
"location",
type="enum",
@@ -205,16 +249,7 @@ class SharedArgs:
type="string",
description="Output file path."
)
STORAGE = CmdletArg(
"storage",
type="enum",
choices=["hydrus", "local", "ftp", "matrix"],
required=False,
description="Storage location or destination for saving/uploading files.",
alias="s",
handler=lambda val: SharedArgs.resolve_storage(val) if val else None
)
# Generic arguments
QUERY = CmdletArg(
@@ -325,78 +360,61 @@ class Cmdlet:
log(cmd.name) # "add-file"
log(cmd.summary) # "Upload a media file"
log(cmd.args[0].name) # "location"
# Convert to dict for JSON serialization
log(json.dumps(cmd.to_dict()))
"""
name: str
"""Cmdlet name, e.g., 'add-file'"""
""""""
summary: str
"""One-line summary of the cmdlet"""
usage: str
"""Usage string, e.g., 'add-file <location> [-delete]'"""
aliases: List[str] = field(default_factory=list)
alias: List[str] = field(default_factory=list)
"""List of aliases for this cmdlet, e.g., ['add', 'add-f']"""
args: List[CmdletArg] = field(default_factory=list)
arg: List[CmdletArg] = field(default_factory=list)
"""List of arguments accepted by this cmdlet"""
details: List[str] = field(default_factory=list)
detail: List[str] = field(default_factory=list)
"""Detailed explanation lines (for help text)"""
exec: Optional[Any] = field(default=None)
"""The execution function: func(result, args, config) -> int"""
def __post_init__(self) -> None:
"""Auto-discover _run function if exec not explicitly provided.
If exec is None, looks for a _run function in the module where
this Cmdlet was instantiated and uses it automatically.
"""
if self.exec is None:
# Walk up the call stack to find _run in the calling module
frame = inspect.currentframe()
try:
# Walk up frames until we find one with _run in globals
while frame:
if '_run' in frame.f_globals:
self.exec = frame.f_globals['_run']
break
frame = frame.f_back
finally:
del frame # Avoid reference cycles
def to_dict(self) -> Dict[str, Any]:
"""Convert to dict for backward compatibility with existing code.
Returns a dict matching the old CMDLET format so existing code
that expects a dict will still work.
"""
# Format command for display: "cmd: name alias: alias1, alias2"
cmd_display = f"cmd: {self.name}"
if self.aliases:
aliases_str = ", ".join(self.aliases)
cmd_display += f" alias: {aliases_str}"
return {
"name": self.name,
"summary": self.summary,
"usage": self.usage,
"cmd": cmd_display, # Display-friendly command name with aliases on one line
"aliases": self.aliases,
"args": [arg.to_dict() for arg in self.args],
"details": self.details,
}
def __getitem__(self, key: str) -> Any:
"""Dict-like access for backward compatibility.
Allows code like: cmdlet["name"] or cmdlet["args"]
"""
d = self.to_dict()
return d.get(key)
def get(self, key: str, default: Any = None) -> Any:
"""Dict-like get() method for backward compatibility."""
d = self.to_dict()
return d.get(key, default)
def _collect_names(self) -> List[str]:
"""Collect primary name plus aliases, de-duplicated and normalized."""
names: List[str] = []
if self.name:
names.append(self.name)
for alias in (self.alias or []):
if alias:
names.append(alias)
for alias in (getattr(self, "aliases", None) or []):
if alias:
names.append(alias)
seen: Set[str] = set()
deduped: List[str] = []
for name in names:
key = name.replace("_", "-").lower()
if key in seen:
continue
seen.add(key)
deduped.append(name)
return deduped
def register(self) -> "Cmdlet":
"""Register this cmdlet's exec under its name and aliases."""
if not callable(self.exec):
return self
try:
from . import register as _register # Local import to avoid circular import cost
except Exception:
return self
names = self._collect_names()
if not names:
return self
_register(names)(self.exec)
return self
def get_flags(self, arg_name: str) -> set[str]:
"""Generate -name and --name flag variants for an argument.
@@ -432,7 +450,7 @@ class Cmdlet:
elif low in flags.get('tag', set()):
# handle tag
"""
return {arg.name: self.get_flags(arg.name) for arg in self.args}
return {arg.name: self.get_flags(arg.name) for arg in self.arg}
# Tag groups cache (loaded from JSON config file)
@@ -479,19 +497,19 @@ def parse_cmdlet_args(args: Sequence[str], cmdlet_spec: Dict[str, Any] | Cmdlet)
"""
result: Dict[str, Any] = {}
# Handle both dict and Cmdlet objects
if isinstance(cmdlet_spec, Cmdlet):
cmdlet_spec = cmdlet_spec.to_dict()
# Only accept Cmdlet objects
if not isinstance(cmdlet_spec, Cmdlet):
raise TypeError(f"Expected Cmdlet, got {type(cmdlet_spec).__name__}")
# Build arg specs tracking which are positional vs flagged
arg_specs: List[Dict[str, Any]] = cmdlet_spec.get("args", [])
positional_args: List[Dict[str, Any]] = [] # args without prefix in definition
flagged_args: List[Dict[str, Any]] = [] # args with prefix in definition
# Build arg specs from cmdlet
arg_specs: List[CmdletArg] = cmdlet_spec.arg
positional_args: List[CmdletArg] = [] # args without prefix in definition
flagged_args: List[CmdletArg] = [] # args with prefix in definition
arg_spec_map: Dict[str, str] = {} # prefix variant -> canonical name (without prefix)
for spec in arg_specs:
name = spec.get("name")
name = spec.name
if not name:
continue
@@ -520,10 +538,10 @@ def parse_cmdlet_args(args: Sequence[str], cmdlet_spec: Dict[str, Any] | Cmdlet)
# Check if this token is a known flagged argument
if token_lower in arg_spec_map:
canonical_name = arg_spec_map[token_lower]
spec = next((s for s in arg_specs if str(s.get("name", "")).lstrip("-").lower() == canonical_name.lower()), None)
spec = next((s for s in arg_specs if str(s.name).lstrip("-").lower() == canonical_name.lower()), None)
# Check if it's a flag type (which doesn't consume next value, just marks presence)
is_flag = spec and spec.get("type") == "flag"
is_flag = spec and spec.type == "flag"
if is_flag:
# For flags, just mark presence without consuming next token
@@ -535,7 +553,7 @@ def parse_cmdlet_args(args: Sequence[str], cmdlet_spec: Dict[str, Any] | Cmdlet)
value = args[i + 1]
# Check if variadic
is_variadic = spec and spec.get("variadic", False)
is_variadic = spec and spec.variadic
if is_variadic:
if canonical_name not in result:
result[canonical_name] = []
@@ -550,8 +568,8 @@ def parse_cmdlet_args(args: Sequence[str], cmdlet_spec: Dict[str, Any] | Cmdlet)
# Otherwise treat as positional if we have positional args remaining
elif positional_index < len(positional_args):
positional_spec = positional_args[positional_index]
canonical_name = str(positional_spec.get("name", "")).lstrip("-")
is_variadic = positional_spec.get("variadic", False)
canonical_name = str(positional_spec.name).lstrip("-")
is_variadic = positional_spec.variadic
if is_variadic:
# For variadic args, append to a list
@@ -591,6 +609,183 @@ def normalize_hash(hash_hex: Optional[str]) -> Optional[str]:
return text.lower() if text else None
def get_hash_for_operation(override_hash: Optional[str], result: Any, field_name: str = "hash_hex") -> Optional[str]:
"""Get normalized hash from override or result object, consolidating common pattern.
Eliminates repeated pattern: normalize_hash(override) if override else normalize_hash(get_field(result, ...))
Args:
override_hash: Hash passed as command argument (takes precedence)
result: Object containing hash field (fallback)
field_name: Name of hash field in result object (default: "hash_hex")
Returns:
Normalized hash string, or None if neither override nor result provides valid hash
"""
if override_hash:
return normalize_hash(override_hash)
# Try multiple field names for robustness
hash_value = get_field(result, field_name) or getattr(result, field_name, None) or getattr(result, "hash", None) or (result.get("file_hash") if isinstance(result, dict) else None)
return normalize_hash(hash_value)
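# A minimal usage sketch; the `row` dict is hypothetical, and normalize_hash
# is assumed to lowercase valid input as shown above.
row = {"hash_hex": "ABC123" + "0" * 58}
assert get_hash_for_operation(None, row) == "abc123" + "0" * 58
assert get_hash_for_operation("FFFF" + "0" * 60, row) == "ffff" + "0" * 60  # override wins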
def fetch_hydrus_metadata(config: Any, hash_hex: str, **kwargs) -> tuple[Optional[Dict[str, Any]], Optional[int]]:
"""Fetch metadata from Hydrus for a given hash, consolidating common fetch pattern.
Eliminates repeated boilerplate: client initialization, error handling, metadata extraction.
Args:
config: Configuration object (passed to hydrus_wrapper.get_client)
hash_hex: File hash to fetch metadata for
**kwargs: Additional arguments to pass to client.fetch_file_metadata()
Common: include_service_keys_to_tags, include_notes, include_file_url, include_duration, etc.
Returns:
Tuple of (metadata_dict, error_code)
- metadata_dict: Dict from Hydrus (first item in metadata list) or None if unavailable
- error_code: 0 on success, 1 on any error (suitable for returning from cmdlet execute())
"""
from helper import hydrus as hydrus_wrapper
try:
client = hydrus_wrapper.get_client(config)
except Exception as exc:
log(f"Hydrus client unavailable: {exc}")
return None, 1
if client is None:
log("Hydrus client unavailable")
return None, 1
try:
payload = client.fetch_file_metadata(hashes=[hash_hex], **kwargs)
except Exception as exc:
log(f"Hydrus metadata fetch failed: {exc}")
return None, 1
items = payload.get("metadata") if isinstance(payload, dict) else None
meta = items[0] if (isinstance(items, list) and items and isinstance(items[0], dict)) else None
return meta, 0
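# Sketch of the intended call pattern inside a cmdlet's _run; `config` is the
# cmdlet's config dict, and include_notes is one of the kwargs named above.
meta, code = fetch_hydrus_metadata(config, "a" * 64, include_notes=True)
if meta is None:
return code or 1
log(meta.get("hash"))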
def get_origin(obj: Any, default: Optional[str] = None) -> Optional[str]:
"""Extract origin field with fallback to store/source field, consolidating common pattern.
Supports both dict and object access patterns.
Args:
obj: Object (dict or dataclass) with 'store', 'origin', or 'source' field
default: Default value if none of the fields are found
Returns:
Store/origin/source string, or default if none exist
"""
if isinstance(obj, dict):
return obj.get("store") or obj.get("origin") or obj.get("source") or default
else:
return getattr(obj, "store", None) or getattr(obj, "origin", None) or getattr(obj, "source", None) or default
def get_field(obj: Any, field: str, default: Optional[Any] = None) -> Any:
"""Extract a field from either a dict or object with fallback default.
Handles both dict.get(field) and getattr(obj, field) access patterns.
Also handles lists by accessing the first element.
For PipeObjects, checks the extra field as well.
Used throughout cmdlets to uniformly access fields from mixed types.
Args:
obj: Dict, object, or list to extract from
field: Field name to retrieve
default: Value to return if field not found (default: None)
Returns:
Field value if found, otherwise the default value
Examples:
get_field(result, "hash") # From dict or object
get_field(result, "origin", "unknown") # With default
"""
# Handle lists by accessing the first element
if isinstance(obj, list) and obj:
obj = obj[0]
if isinstance(obj, dict):
# Direct lookup first
val = obj.get(field, default)
if val is not None:
return val
# Fallback aliases for common fields
if field == "path":
for alt in ("file_path", "target", "filepath", "file"):
v = obj.get(alt)
if v:
return v
if field == "hash":
for alt in ("file_hash", "hash_hex"):
v = obj.get(alt)
if v:
return v
if field == "store":
for alt in ("storage", "storage_source", "origin"):
v = obj.get(alt)
if v:
return v
return default
else:
# Try direct attribute access first
value = getattr(obj, field, None)
if value is not None:
return value
# Attribute fallback aliases for common fields
if field == "path":
for alt in ("file_path", "target", "filepath", "file", "url"):
v = getattr(obj, alt, None)
if v:
return v
if field == "hash":
for alt in ("file_hash", "hash_hex"):
v = getattr(obj, alt, None)
if v:
return v
if field == "store":
for alt in ("storage", "storage_source", "origin"):
v = getattr(obj, alt, None)
if v:
return v
# For PipeObjects, also check the extra field
if hasattr(obj, 'extra') and isinstance(obj.extra, dict):
return obj.extra.get(field, default)
return default
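# Alias fallback in practice; these inputs are illustrative.
assert get_field({"file_path": "/tmp/x.bin"}, "path") == "/tmp/x.bin"
assert get_field([{"file_hash": "a" * 64}], "hash") == "a" * 64
assert get_field({}, "store", "default") == "default"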
def should_show_help(args: Sequence[str]) -> bool:
"""Check if help flag was passed in arguments.
Consolidates repeated pattern of checking for help flags across cmdlets.
Args:
args: Command arguments to check
Returns:
True if any help flag is present (-?, /?, --help, -h, help, --cmdlet)
Examples:
if should_show_help(args):
log(json.dumps(CMDLET, ensure_ascii=False, indent=2))
return 0
"""
try:
return any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args)
except Exception:
return False
def looks_like_hash(candidate: Optional[str]) -> bool:
"""Check if a string looks like a SHA256 hash (64 hex chars).
@@ -609,8 +804,8 @@ def looks_like_hash(candidate: Optional[str]) -> bool:
def pipeline_item_local_path(item: Any) -> Optional[str]:
"""Extract local file path from a pipeline item.
Supports both dataclass objects with .target attribute and dicts.
Returns None for HTTP/HTTPS URLs.
Supports both dataclass objects with .path attribute and dicts.
Returns None for HTTP/HTTPS URLs.
Args:
item: Pipeline item (PipelineItem dataclass, dict, or other)
@@ -618,15 +813,15 @@ def pipeline_item_local_path(item: Any) -> Optional[str]:
Returns:
Local file path string, or None if item is not a local file
"""
target: Optional[str] = None
if hasattr(item, "target"):
target = getattr(item, "target", None)
path_value: Optional[str] = None
if hasattr(item, "path"):
path_value = getattr(item, "path", None)
elif isinstance(item, dict):
raw = item.get("target") or item.get("path") or item.get("url")
target = str(raw) if raw is not None else None
if not isinstance(target, str):
raw = item.get("path") or item.get("url")
path_value = str(raw) if raw is not None else None
if not isinstance(path_value, str):
return None
text = target.strip()
text = path_value.strip()
if not text:
return None
if text.lower().startswith(("http://", "https://")):
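# Per the docstring above, local paths pass through while URLs yield None
# (the items here are hypothetical).
assert pipeline_item_local_path({"path": "/tmp/a.mp4"}) == "/tmp/a.mp4"
assert pipeline_item_local_path({"url": "https://example.com/a.mp4"}) is None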
@@ -686,22 +881,60 @@ def collect_relationship_labels(payload: Any, label_stack: List[str] | None = No
def parse_tag_arguments(arguments: Sequence[str]) -> List[str]:
"""Parse tag arguments from command line tokens.
Handles both space-separated and comma-separated tags.
Example: parse_tag_arguments(["tag1,tag2", "tag3"]) -> ["tag1", "tag2", "tag3"]
- Supports comma-separated tags.
- Supports pipe namespace shorthand: "artist:A|B|C" -> artist:A, artist:B, artist:C.
Args:
arguments: Sequence of argument strings
Returns:
List of normalized tag strings (empty strings filtered out)
"""
def _expand_pipe_namespace(text: str) -> List[str]:
parts = text.split('|')
expanded: List[str] = []
last_ns: Optional[str] = None
for part in parts:
segment = part.strip()
if not segment:
continue
if ':' in segment:
ns, val = segment.split(':', 1)
ns = ns.strip()
val = val.strip()
last_ns = ns or last_ns
if last_ns and val:
expanded.append(f"{last_ns}:{val}")
elif ns or val:
expanded.append(f"{ns}:{val}".strip(':'))
else:
if last_ns:
expanded.append(f"{last_ns}:{segment}")
else:
expanded.append(segment)
return expanded
tags: List[str] = []
for argument in arguments:
for token in argument.split(','):
text = token.strip()
if text:
tags.append(text)
if not text:
continue
# Expand namespace shorthand with pipes
pipe_expanded = _expand_pipe_namespace(text)
for entry in pipe_expanded:
candidate = entry.strip()
if not candidate:
continue
if ':' in candidate:
ns, val = candidate.split(':', 1)
ns = ns.strip()
val = val.strip()
candidate = f"{ns}:{val}" if ns or val else ""
if candidate:
tags.append(candidate)
return tags
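# Comma splitting plus the pipe namespace shorthand described above.
assert parse_tag_arguments(["artist:A|B", "genre:jazz, mood:calm"]) == [
"artist:A", "artist:B", "genre:jazz", "mood:calm",
]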
@@ -944,7 +1177,7 @@ def create_pipe_object_result(
result = {
'source': source,
'id': identifier,
'file_path': file_path,
'path': file_path,
'action': f'cmdlet:{cmdlet_name}', # Format: cmdlet:cmdlet_name
}
@@ -952,6 +1185,7 @@ def create_pipe_object_result(
result['title'] = title
if file_hash:
result['file_hash'] = file_hash
result['hash'] = file_hash
if is_temp:
result['is_temp'] = True
if parent_hash:
@@ -959,6 +1193,13 @@ def create_pipe_object_result(
if tags:
result['tags'] = tags
# Canonical store field: use source for compatibility
try:
if source:
result['store'] = source
except Exception:
pass
# Add any extra fields
result.update(extra)
@@ -996,13 +1237,13 @@ def get_pipe_object_path(pipe_object: Any) -> Optional[str]:
"""Extract file path from PipeObject, dict, or pipeline-friendly object."""
if pipe_object is None:
return None
for attr in ('file_path', 'path', 'target'):
for attr in ('path', 'target'):
if hasattr(pipe_object, attr):
value = getattr(pipe_object, attr)
if value:
return value
if isinstance(pipe_object, dict):
for key in ('file_path', 'path', 'target'):
for key in ('path', 'target'):
value = pipe_object.get(key)
if value:
return value
@@ -1209,40 +1450,40 @@ def extract_title_from_result(result: Any) -> Optional[str]:
return None
def extract_known_urls_from_result(result: Any) -> list[str]:
urls: list[str] = []
def extract_url_from_result(result: Any) -> list[str]:
url: list[str] = []
def _extend(candidate: Any) -> None:
if not candidate:
return
if isinstance(candidate, list):
urls.extend(candidate)
url.extend(candidate)
elif isinstance(candidate, str):
urls.append(candidate)
url.append(candidate)
if isinstance(result, models.PipeObject):
_extend(result.extra.get('known_urls'))
_extend(result.extra.get('url')) # Also check singular url
if isinstance(result.metadata, dict):
_extend(result.metadata.get('known_urls'))
_extend(result.metadata.get('urls'))
_extend(result.metadata.get('url'))
elif hasattr(result, 'known_urls') or hasattr(result, 'urls'):
# Handle objects with known_urls/urls attribute
_extend(getattr(result, 'known_urls', None))
_extend(getattr(result, 'urls', None))
elif hasattr(result, 'url'):
# Handle objects with a url attribute
_extend(getattr(result, 'url', None))
if isinstance(result, dict):
_extend(result.get('known_urls'))
_extend(result.get('urls'))
_extend(result.get('url'))
extra = result.get('extra')
if isinstance(extra, dict):
_extend(extra.get('known_urls'))
_extend(extra.get('urls'))
_extend(extra.get('url'))
return merge_sequences(urls, case_sensitive=True)
return merge_sequences(url, case_sensitive=True)
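# Dict inputs: top-level and nested extra keys are both scanned; merge_sequences
# is assumed to de-duplicate while preserving order.
assert extract_url_from_result({"url": "https://a", "extra": {"url": ["https://b"]}}) == ["https://a", "https://b"]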
def extract_relationships(result: Any) -> Optional[Dict[str, Any]]:
@@ -1272,3 +1513,248 @@ def extract_duration(result: Any) -> Optional[float]:
return float(duration)
except (TypeError, ValueError):
return None
def coerce_to_pipe_object(value: Any, default_path: Optional[str] = None) -> models.PipeObject:
"""Normalize any incoming result to a PipeObject for single-source-of-truth state.
Uses hash+store canonical pattern.
"""
# Debug: Print ResultItem details if coming from search_file.py
try:
from helper.logger import is_debug_enabled, debug
if is_debug_enabled() and hasattr(value, '__class__') and value.__class__.__name__ == 'ResultItem':
debug("[ResultItem -> PipeObject conversion]")
debug(f" origin={getattr(value, 'origin', None)}")
debug(f" title={getattr(value, 'title', None)}")
debug(f" target={getattr(value, 'target', None)}")
debug(f" hash_hex={getattr(value, 'hash_hex', None)}")
debug(f" media_kind={getattr(value, 'media_kind', None)}")
debug(f" tags={getattr(value, 'tags', None)}")
debug(f" tag_summary={getattr(value, 'tag_summary', None)}")
debug(f" size_bytes={getattr(value, 'size_bytes', None)}")
debug(f" duration_seconds={getattr(value, 'duration_seconds', None)}")
debug(f" relationships={getattr(value, 'relationships', None)}")
debug(f" url={getattr(value, 'url', None)}")
debug(f" full_metadata keys={list(getattr(value, 'full_metadata', {}).keys()) if hasattr(value, 'full_metadata') and value.full_metadata else []}")
except Exception:
pass
if isinstance(value, models.PipeObject):
return value
known_keys = {
"hash", "store", "tags", "title", "url", "source_url", "duration", "metadata",
"warnings", "path", "relationships", "is_temp", "action", "parent_hash",
}
# Convert ResultItem to dict to preserve all attributes
if hasattr(value, 'to_dict'):
value = value.to_dict()
if isinstance(value, dict):
# Extract hash and store (canonical identifiers)
hash_val = value.get("hash") or value.get("file_hash")
# Recognize multiple possible store naming conventions (store, origin, storage, storage_source)
store_val = value.get("store") or value.get("origin") or value.get("storage") or value.get("storage_source") or "PATH"
# If the store value is embedded under extra, also detect it
if not store_val or store_val in ("local", "PATH"):
extra_store = None
try:
extra_store = value.get("extra", {}).get("store") or value.get("extra", {}).get("storage") or value.get("extra", {}).get("storage_source")
except Exception:
extra_store = None
if extra_store:
store_val = extra_store
# If no hash, try to compute from path or use placeholder
if not hash_val:
path_val = value.get("path")
if path_val:
try:
from helper.utils import sha256_file
from pathlib import Path
hash_val = sha256_file(Path(path_val))
except Exception:
hash_val = "unknown"
else:
hash_val = "unknown"
# Extract title from filename if not provided
title_val = value.get("title")
if not title_val:
path_val = value.get("path")
if path_val:
try:
from pathlib import Path
title_val = Path(path_val).stem
except Exception:
pass
extra = {k: v for k, v in value.items() if k not in known_keys}
# Extract URL: accept either a single string or a list under "url"
url_val = value.get("url")
if isinstance(url_val, list):
url_list = url_val
url_val = url_list[0] if url_list else None
# Preserve the full list in extra when multiple URLs exist
if len(url_list) > 1:
extra["url"] = url_list
# Extract relationships
rels = value.get("relationships") or {}
# Consolidate tags: prefer tags_set over tags, tag_summary
tags_val = []
if "tags_set" in value and value["tags_set"]:
tags_val = list(value["tags_set"])
elif "tags" in value and isinstance(value["tags"], (list, set)):
tags_val = list(value["tags"])
elif "tag" in value:
# Single tag string or list
if isinstance(value["tag"], list):
tags_val = value["tag"] # Already a list
else:
tags_val = [value["tag"]] # Wrap single string in list
# Consolidate path: prefer explicit path key, but NOT target if it's a URL
path_val = value.get("path")
# Only use target as path if it's not a URL (url should stay in url field)
if not path_val and "target" in value:
target = value["target"]
if target and not (isinstance(target, str) and (target.startswith("http://") or target.startswith("https://"))):
path_val = target
# If the path value is actually a URL, move it to url_val and clear path_val
try:
if isinstance(path_val, str) and (path_val.startswith("http://") or path_val.startswith("https://")):
# Prefer existing url_val if present, otherwise move path_val into url_val
if not url_val:
url_val = path_val
path_val = None
except Exception:
pass
# Extract media_kind if available
if "media_kind" in value:
extra["media_kind"] = value["media_kind"]
pipe_obj = models.PipeObject(
hash=hash_val,
store=store_val,
tags=tags_val,
title=title_val,
url=url_val,
source_url=value.get("source_url"),
duration=value.get("duration") or value.get("duration_seconds"),
metadata=value.get("metadata") or value.get("full_metadata") or {},
warnings=list(value.get("warnings") or []),
path=path_val,
relationships=rels,
is_temp=bool(value.get("is_temp", False)),
action=value.get("action"),
parent_hash=value.get("parent_hash") or value.get("parent_id"),
extra=extra,
)
# Debug: Print formatted table
pipe_obj.debug_table()
return pipe_obj
# Fallback: build from path argument or bare value
hash_val = "unknown"
path_val = default_path or getattr(value, "path", None)
title_val = None
if path_val and path_val != "unknown":
try:
from helper.utils import sha256_file
from pathlib import Path
path_obj = Path(path_val)
hash_val = sha256_file(path_obj)
# Extract title from filename (without extension)
title_val = path_obj.stem
except Exception:
pass
# When coming from path argument, store should be "PATH" (file path, not a backend)
store_val = "PATH"
pipe_obj = models.PipeObject(
hash=hash_val,
store=store_val,
path=str(path_val) if path_val and path_val != "unknown" else None,
title=title_val,
tags=[],
extra={},
)
# Debug: Print formatted table
pipe_obj.debug_table()
return pipe_obj
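# A hypothetical search-result dict collapsing to the canonical hash+store shape.
po = coerce_to_pipe_object({"file_hash": "a" * 64, "origin": "hydrus", "title": "clip"})
assert (po.hash, po.store, po.title) == ("a" * 64, "hydrus", "clip")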
def register_url_with_local_library(pipe_obj: models.PipeObject, config: Dict[str, Any]) -> bool:
"""Register url with a file in the local library database.
This is called automatically by download cmdlets to ensure url are persisted
without requiring a separate add-url step in the pipeline.
Args:
pipe_obj: PipeObject with path and url
config: Config dict containing local library path
Returns:
True if url were registered, False otherwise
"""
try:
from config import get_local_storage_path
from helper.folder_store import FolderDB
file_path = get_field(pipe_obj, "path")
url_field = get_field(pipe_obj, "url", [])
urls: List[str] = []
if isinstance(url_field, str):
urls = [u.strip() for u in url_field.split(",") if u.strip()]
elif isinstance(url_field, (list, tuple)):
urls = [u for u in url_field if isinstance(u, str) and u.strip()]
if not file_path or not urls:
return False
path_obj = Path(file_path)
if not path_obj.exists():
return False
storage_path = get_local_storage_path(config)
if not storage_path:
return False
with FolderDB(storage_path) as db:
file_hash = db.get_file_hash(path_obj)
if not file_hash:
return False
metadata = db.get_metadata(file_hash) or {}
existing_url = metadata.get("url") or []
# Add any new URLs
changed = False
for u in urls:
if u not in existing_url:
existing_url.append(u)
changed = True
if changed:
metadata["url"] = existing_url
db.save_metadata(path_obj, metadata)
return True
return True  # URLs already existed
except Exception:
return False
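# Sketch: a download cmdlet would call this after producing a PipeObject with
# a local path and at least one URL; failures are swallowed and reported as False.
if not register_url_with_local_library(pipe_obj, config):
debug("URL registration skipped or failed")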