from __future__ import annotations
|
|
|
|
from typing import Any, Dict, Sequence, Optional
|
|
import json
|
|
import sys
|
|
|
|
from SYS.logger import log
|
|
|
|
from . import _shared as sh
|
|
|
|
Cmdlet = sh.Cmdlet
|
|
CmdletArg = sh.CmdletArg
|
|
SharedArgs = sh.SharedArgs
|
|
parse_cmdlet_args = sh.parse_cmdlet_args
|
|
get_field = sh.get_field
|
|
from SYS import pipeline as ctx
|
|
from SYS.result_table import Table
|
|
|
|
|
|
class Get_Metadata(Cmdlet):
    """Class-based get-metadata cmdlet with self-registration.

    Looks up a file's metadata in a storage backend by hash, merges any tags
    embedded in the backend response, renders a one-row result table, and
    emits the row so it can be piped (@N) to downstream cmdlets.
    """

    def __init__(self) -> None:
        """Declare the cmdlet's CLI surface and register it with the shared registry."""
        super().__init__(
            name="get-metadata",
            summary="Print metadata for files by hash and storage backend.",
            usage='get-metadata [-query "hash:<sha256>"] [-store <backend>]',
            alias=["meta"],
            arg=[
                SharedArgs.QUERY,
                SharedArgs.STORE,
            ],
            detail=[
                "- Retrieves metadata from storage backend using file hash as identifier.",
                "- Shows hash, MIME type, size, duration/pages, known url, and import timestamp.",
                "- Hash and store are taken from piped result or can be overridden with -query/-store flags.",
                "- All metadata is retrieved from the storage backend's database (single source of truth).",
            ],
            exec=self.run,
        )
        self.register()

    @staticmethod
    def _extract_imported_ts(meta: Dict[str, Any]) -> Optional[int]:
        """Extract an imported timestamp from backend metadata, if available.

        Accepts either a numeric Unix timestamp or an ISO-8601 string under
        the ``time_imported`` key.

        Args:
            meta: Metadata dictionary from a backend (e.g. ``get_metadata()``).

        Returns:
            Unix timestamp as an integer if found, otherwise None.
        """
        if not isinstance(meta, dict):
            return None

        # Prefer an explicit numeric time_imported when present.
        explicit = meta.get("time_imported")
        if isinstance(explicit, (int, float)):
            return int(explicit)

        # Fall back to parsing an ISO-format string (e.g. "2024-01-15T10:30:00").
        # NOTE(review): a naive ISO string is interpreted in the local timezone
        # by .timestamp() — original behavior, preserved here.
        if isinstance(explicit, str):
            try:
                import datetime as _dt

                return int(_dt.datetime.fromisoformat(explicit).timestamp())
            except Exception:
                pass

        return None

    @staticmethod
    def _format_imported(ts: Optional[int]) -> str:
        """Format a Unix timestamp as a human-readable UTC date string.

        Args:
            ts: Unix timestamp (seconds) or None.

        Returns:
            "YYYY-MM-DD HH:MM:SS" in UTC, or "" for falsy/invalid input.
        """
        if not ts:
            return ""
        try:
            import datetime as _dt

            # fromtimestamp(..., tz=utc) replaces the deprecated
            # datetime.utcfromtimestamp(); the rendered string is identical.
            dt = _dt.datetime.fromtimestamp(ts, tz=_dt.timezone.utc)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            return ""

    @staticmethod
    def _build_table_row(
        title: str,
        store: str,
        path: str,
        mime: str,
        size_bytes: Optional[int],
        dur_seconds: Optional[int],
        imported_ts: Optional[int],
        url: list[str],
        hash_value: Optional[str],
        pages: Optional[int] = None,
        tag: Optional[list[str]] = None,
        ext: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Build a normalized metadata row dict for display and piping.

        Args:
            title: File or resource title.
            store: Backend store name (e.g. "hydrus", "local").
            path: File path or resource identifier.
            mime: MIME type (e.g. "image/jpeg", "video/mp4").
            size_bytes: File size in bytes.
            dur_seconds: Duration in seconds (video/audio).
            imported_ts: Unix timestamp when the item was imported.
            url: Known URLs associated with the file.
            hash_value: File hash (SHA256 or other).
            pages: Page count (PDFs) — shown in place of duration for PDFs.
            tag: Tags applied to the file.
            ext: File extension (e.g. "jpg", "mp4").

        Returns:
            Dict with normalized metadata fields plus a ``columns`` list of
            (label, value) pairs for table rendering.
        """
        # Coerce size to int and derive a whole-megabyte display value.
        size_mb = None
        size_int: Optional[int] = None
        if size_bytes is not None:
            try:
                size_int = int(size_bytes)
            except Exception:
                size_int = None
        if isinstance(size_int, int):
            try:
                size_mb = int(size_int / (1024 * 1024))
            except Exception:
                size_mb = None

        dur_int = int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None
        pages_int = int(pages) if isinstance(pages, (int, float)) else None
        imported_label = Get_Metadata._format_imported(imported_ts)

        # PDFs report a page count in the duration column slot.
        duration_label = "Duration(s)"
        duration_value = str(dur_int) if dur_int is not None else ""
        if mime and mime.lower().startswith("application/pdf"):
            duration_label = "Pages"
            duration_value = str(pages_int) if pages_int is not None else ""

        columns = [
            ("Title", title or ""),
            ("Hash", hash_value or ""),
            ("MIME", mime or ""),
            ("Size(MB)", str(size_mb) if size_mb is not None else ""),
            (duration_label, duration_value),
            ("Imported", imported_label),
            ("Store", store or ""),
        ]

        return {
            "title": title or path,
            "path": path,
            "store": store,
            "mime": mime,
            "ext": ext or "",
            "size_bytes": size_int,
            "duration_seconds": dur_int,
            "pages": pages_int,
            "imported_ts": imported_ts,
            "imported": imported_label,
            "hash": hash_value,
            "url": url,
            "tag": tag or [],
            "columns": columns,
        }

    @staticmethod
    def _add_table_body_row(table: Table, row: Dict[str, Any]) -> None:
        """Add one metadata row (from _build_table_row) to a result table.

        Extracts the row's ``columns`` (label, value) pairs and fills the
        standard column order: Hash, MIME, Size, then Duration or Pages.

        Args:
            table: Result table to add the row to.
            row: Metadata row dict produced by ``_build_table_row``.
        """
        columns = row.get("columns") if isinstance(row, dict) else None
        lookup: Dict[str, Any] = {}
        if isinstance(columns, list):
            for col in columns:
                if isinstance(col, tuple) and len(col) == 2:
                    label, value = col
                    lookup[str(label)] = value

        row_obj = table.add_row()
        row_obj.add_column("Hash", lookup.get("Hash", ""))
        row_obj.add_column("MIME", lookup.get("MIME", ""))
        row_obj.add_column("Size(MB)", lookup.get("Size(MB)", ""))
        if "Duration(s)" in lookup:
            row_obj.add_column("Duration(s)", lookup.get("Duration(s)", ""))
        elif "Pages" in lookup:
            row_obj.add_column("Pages", lookup.get("Pages", ""))
        else:
            row_obj.add_column("Duration(s)", "")

    @staticmethod
    def _resolve_backend(config: Dict[str, Any], storage_source: str) -> Any:
        """Resolve a storage backend instance for *storage_source*, or None.

        Tries the lightweight registry first so only the requested backend is
        initialized (avoids constructing every configured backend); falls back
        to indexing the full Store multiplexer.
        """
        backend = None
        try:
            from Store.registry import get_backend_instance

            backend = get_backend_instance(config, storage_source, suppress_debug=True)
        except Exception:
            backend = None

        if backend is None:
            try:
                from Store import Store

                backend = Store(config)[storage_source]
            except Exception:
                return None
        return backend

    @staticmethod
    def _merge_metadata_tags(
        metadata: Dict[str, Any],
        item_tags: list,
        title: str,
        have_input_title: bool,
    ) -> str:
        """Merge tags embedded in the metadata response into *item_tags*.

        Reading tags from the metadata payload avoids a separate get_tag()
        API call (Hydrus metadata already includes tags). Only the first
        status level of each service's ``display_tags`` is consumed. When the
        piped result carried no title, the first ``title:`` tag found
        overrides *title*.

        Args:
            metadata: Backend metadata payload.
            item_tags: Accumulated tag list (mutated in place).
            title: Current title candidate.
            have_input_title: True when the piped result supplied a title.

        Returns:
            The (possibly updated) title.
        """
        metadata_tags = metadata.get("tags")
        if not isinstance(metadata_tags, dict):
            return title

        # metadata["tags"] is {service_key: {service_data}}
        for service_data in metadata_tags.values():
            if isinstance(service_data, dict):
                display_tags = service_data.get("display_tags", {})
                if isinstance(display_tags, dict):
                    # display_tags is typically {status: tag_list}
                    for tag_list in display_tags.values():
                        if isinstance(tag_list, list):
                            for t in tag_list:
                                ts = str(t) if t else ""
                                if ts and ts not in item_tags:
                                    item_tags.append(ts)
                                # Pick up a "title:..." tag when no title was piped in.
                                if not have_input_title and ts.lower().startswith("title:"):
                                    parts = ts.split(":", 1)
                                    if len(parts) > 1:
                                        title = parts[1].strip()
                        break  # Only use the first status level
            if any(t for t in item_tags if str(t).lower().startswith("title:")):
                break  # Found a title tag; stop scanning services
        return title

    @staticmethod
    def _normalize_duration(metadata: Dict[str, Any]) -> Any:
        """Extract a duration in seconds from *metadata*.

        Checks, in order: ``duration``, ``duration_seconds``, ``length``, and
        a numeric ``duration_ms`` (converted to seconds). String values are
        parsed as plain seconds or MM:SS / HH:MM:SS clock notation; any
        string that cannot be parsed yields None instead of leaking the raw
        value downstream.
        """
        duration = metadata.get("duration")
        if duration is None:
            duration = metadata.get("duration_seconds")
        if duration is None:
            duration = metadata.get("length")
        if duration is None and isinstance(metadata.get("duration_ms"), (int, float)):
            try:
                duration = float(metadata["duration_ms"]) / 1000.0
            except Exception:
                duration = None

        if isinstance(duration, str):
            s = duration.strip()
            if not s:
                return None
            try:
                return float(s)
            except ValueError:
                pass
            if ":" in s:
                parts = [p.strip() for p in s.split(":") if p.strip()]
                if len(parts) in {2, 3} and all(p.isdigit() for p in parts):
                    nums = [int(p) for p in parts]
                    if len(nums) == 2:
                        return float(nums[0] * 60 + nums[1])
                    return float(nums[0] * 3600 + nums[1] * 60 + nums[2])
            return None
        return duration

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Execute get-metadata — retrieve and display file metadata.

        Queries a storage backend (Hydrus, local, etc.) for metadata keyed by
        file hash, merges tags embedded in the response (single HTTP request
        per file), displays the result table, and emits the row for pipeline
        chaining (@N selection).

        Args:
            result: Piped input (dict with optional hash/store/title/tag fields).
            args: Command line arguments ([-query "hash:..."] [-store backend]).
            config: Application configuration dict.

        Returns:
            0 on success, 1 on error (missing hash/store, backend not found,
            no metadata, or backend failure).
        """
        parsed = parse_cmdlet_args(args, self)

        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            log('No hash available - use -query "hash:<sha256>"', file=sys.stderr)
            return 1

        # Hash and store come from the flags or, failing that, the piped result.
        file_hash = query_hash or get_field(result, "hash")
        storage_source = parsed.get("store") or get_field(result, "store")

        if not file_hash:
            log('No hash available - use -query "hash:<sha256>"', file=sys.stderr)
            return 1

        if not storage_source:
            log("No storage backend specified - use -store to specify", file=sys.stderr)
            return 1

        try:
            backend = self._resolve_backend(config, storage_source)
            if backend is None:
                log(f"Storage backend '{storage_source}' not found", file=sys.stderr)
                return 1

            # Single source of truth: all metadata comes from the backend DB.
            metadata = backend.get_metadata(file_hash)
            if not metadata:
                log(
                    f"No metadata found for hash {file_hash[:8]}... in {storage_source}",
                    file=sys.stderr,
                )
                return 1

            # Title defaults to the piped title, else a hash prefix.
            title = get_field(result, "title") or file_hash[:16]

            # Normalize tags from the piped result into a list of strings.
            item_tags = get_field(result, "tag") or get_field(result, "tags") or []
            if not isinstance(item_tags, list):
                item_tags = [str(item_tags)]
            else:
                item_tags = [str(t) for t in item_tags]

            # Merge tags already present in the metadata response (avoids a
            # duplicate get_tag() call) and pick up a title: tag if needed.
            title = self._merge_metadata_tags(
                metadata,
                item_tags,
                title,
                have_input_title=bool(get_field(result, "title")),
            )

            mime_type = metadata.get("mime") or metadata.get("ext", "")
            file_ext = metadata.get("ext", "")  # extension kept separate from MIME
            file_size = metadata.get("size")
            duration_seconds = self._normalize_duration(metadata)
            pages = metadata.get("pages")
            imported_ts = self._extract_imported_ts(metadata)

            # Normalize url to a list (backends may store it JSON-encoded).
            url = metadata.get("url") or []
            if isinstance(url, str):
                try:
                    url = json.loads(url)
                except (json.JSONDecodeError, TypeError):
                    url = []
            if not isinstance(url, list):
                url = []

            row = self._build_table_row(
                title=title,
                store=storage_source,
                path=metadata.get("path", ""),
                mime=mime_type,
                size_bytes=file_size,
                dur_seconds=duration_seconds,
                imported_ts=imported_ts,
                url=url,
                hash_value=file_hash,
                pages=pages,
                tag=item_tags,
                ext=file_ext,
            )

            table_title = f"get-metadata: {title}" if title else "get-metadata"
            table = Table(table_title).init_command(table_title, "get-metadata", list(args))
            self._add_table_body_row(table, row)

            # Display the item, make it @-selectable, and emit for piping.
            from ._shared import display_and_persist_items

            display_and_persist_items([row], title=table_title, subject=row)
            ctx.emit(row)
            return 0

        except KeyError:
            log(f"Storage backend '{storage_source}' not found", file=sys.stderr)
            return 1
        except Exception as exc:
            log(f"Failed to get metadata: {exc}", file=sys.stderr)
            return 1
# Module-level singleton: constructing it runs __init__, which calls
# self.register() and thereby makes the cmdlet discoverable.
CMDLET = Get_Metadata()