Files
Medios-Macina/cmdlets/check_file_status.py

154 lines
6.0 KiB
Python
Raw Permalink Normal View History

2025-11-25 20:09:33 -08:00
from __future__ import annotations
from typing import Any, Dict, Sequence
import json
import sys
from helper.logger import log
from . import register
from helper import hydrus as hydrus_wrapper
from ._shared import Cmdlet, CmdletArg, normalize_hash
# Declarative metadata for this cmdlet: its name, a one-line summary, the
# usage string, the accepted arguments, and the long-form help bullets.
# `_run` dumps this object as JSON when a help flag is passed.
CMDLET = Cmdlet(
    name="check-file-status",
    summary="Check if a file is active, deleted, or corrupted in Hydrus.",
    usage="check-file-status [-hash <sha256>]",
    args=[
        # Optional: without it the hash is taken from the selected result.
        CmdletArg(
            "-hash",
            description="File hash (SHA256) to check. If not provided, uses selected result.",
        ),
    ],
    details=[
        "- Shows whether file is active in Hydrus or marked as deleted",
        "- Detects corrupted data (e.g., comma-separated URLs)",
        "- Displays file metadata and service locations",
        "- Note: Hydrus keeps deleted files for recovery. Use cleanup-corrupted for full removal.",
    ],
)
# Tokens that request the cmdlet's help text instead of running it.
_HELP_TOKENS = frozenset({"-?", "/?", "--help", "-h", "help", "--cmdlet"})
# Spellings accepted for the hash override flag.
_HASH_FLAGS = frozenset({"-hash", "--hash", "hash"})
# Number of tags listed per tag service before the output is elided.
_MAX_TAGS_SHOWN = 5


def _json_fallback(obj: Any) -> Any:
    """Give ``json.dumps`` a serializable stand-in for an arbitrary object."""
    try:
        return vars(obj)
    except TypeError:
        # No ``__dict__`` (e.g. a slotted object): fall back to its text form.
        return str(obj)


def _parse_hash_override(args: Sequence[str]) -> str | None:
    """Return the value following the last hash flag in *args*, if any.

    A hash flag that is the final token (no value after it) is ignored, so
    the caller falls back to the selected result.
    """
    override: str | None = None
    index = 0
    while index < len(args):
        has_value = index + 1 < len(args)
        if has_value and str(args[index]).lower() in _HASH_FLAGS:
            override = str(args[index + 1]).strip()
            index += 2
        else:
            index += 1
    return override


def _status_label(file_info: Dict[str, Any]) -> str:
    """Classify a metadata record as TRASHED, DELETED, NOT LOCAL or ACTIVE."""
    # Trash is tested first: if a trashed file also reports ``is_deleted``
    # (as the Hydrus docs appear to say - TODO confirm), testing
    # ``is_deleted`` first would make TRASHED unreachable.
    if file_info.get("is_trashed", False):
        return "TRASHED"
    if file_info.get("is_deleted", False):
        return "DELETED"
    # A missing ``is_local`` key is treated as local so records that omit
    # the field keep reporting ACTIVE as before.
    if not file_info.get("is_local", True):
        return "NOT LOCAL"
    return "ACTIVE"


def _log_file_info(file_info: Dict[str, Any], requested_hash: str) -> None:
    """Log hash, size, MIME type and dimensions, tolerating missing fields."""
    shown_hash = str(file_info.get("hash") or requested_hash)
    size = file_info.get("size")
    # Only apply the thousands separator to a real number; a sparse record
    # may carry no size at all.
    size_text = f"{size:,} bytes" if isinstance(size, (int, float)) else "unknown"
    mime = file_info.get("mime") or "unknown"
    width = file_info.get("width")
    height = file_info.get("height")
    width_text = "?" if width is None else width
    height_text = "?" if height is None else height

    log("\n📄 File Information:", file=sys.stderr)
    log(f" Hash: {shown_hash[:16]}...", file=sys.stderr)
    log(f" Size: {size_text}", file=sys.stderr)
    log(f" MIME: {mime}", file=sys.stderr)
    log(f" Dimensions: {width_text}x{height_text}", file=sys.stderr)


def _log_services(file_info: Dict[str, Any]) -> None:
    """Log the file services the file is currently in and was deleted from."""
    file_services = file_info.get("file_services") or {}
    current_services = file_services.get("current") or {}
    deleted_services = file_services.get("deleted") or {}

    if current_services:
        log(f"\n✓ In services ({len(current_services)}):", file=sys.stderr)
        for service_info in current_services.values():
            sname = service_info.get("name", "unknown")
            stype = service_info.get("type_pretty", "unknown")
            log(f" - {sname} ({stype})", file=sys.stderr)

    if deleted_services:
        log(f"\n✗ Deleted from services ({len(deleted_services)}):", file=sys.stderr)
        for service_info in deleted_services.values():
            sname = service_info.get("name", "unknown")
            stype = service_info.get("type_pretty", "unknown")
            time_deleted = service_info.get("time_deleted", "?")
            log(f" - {sname} ({stype}) - deleted at {time_deleted}", file=sys.stderr)


def _log_urls(file_info: Dict[str, Any]) -> None:
    """Log known URLs, flagging entries that contain a comma as corrupted."""
    urls = file_info.get("known_urls") or []
    log(f"\n🔗 URLs ({len(urls)}):", file=sys.stderr)
    corrupted_count = 0
    for position, raw_url in enumerate(urls, 1):
        url = str(raw_url)  # guard against non-string entries
        if "," in url:
            # A comma means several URLs were stored as one value.
            corrupted_count += 1
            log(f" [{position}] ⚠️ CORRUPTED (comma-separated): {url[:50]}...", file=sys.stderr)
        else:
            log(f" [{position}] {url[:70]}{'...' if len(url) > 70 else ''}", file=sys.stderr)
    if corrupted_count > 0:
        log(f"\n⚠️ WARNING: Found {corrupted_count} corrupted URL(s)", file=sys.stderr)


def _log_tags(file_info: Dict[str, Any]) -> None:
    """Log up to ``_MAX_TAGS_SHOWN`` display tags for each tag service."""
    tags_dict = file_info.get("tags") or {}
    # Collect (service name, tags) once instead of walking the dict twice.
    # The "0" key of ``display_tags`` is the list this cmdlet has always shown.
    sections = []
    for service_data in tags_dict.values():
        display_tags = (service_data.get("display_tags") or {}).get("0") or []
        if display_tags:
            sections.append((service_data.get("name", "unknown"), display_tags))

    total_tags = sum(len(display_tags) for _, display_tags in sections)
    if total_tags == 0:
        return

    log(f"\n🏷️ Tags ({total_tags}):", file=sys.stderr)
    for service_name, display_tags in sections:
        log(f" {service_name}:", file=sys.stderr)
        for tag in display_tags[:_MAX_TAGS_SHOWN]:
            log(f" - {tag}", file=sys.stderr)
        if len(display_tags) > _MAX_TAGS_SHOWN:
            log(f" ... and {len(display_tags) - _MAX_TAGS_SHOWN} more", file=sys.stderr)


@register(["check-file-status", "check-status", "file-status", "status"])
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Report the Hydrus status of one file to stderr.

    The hash comes from a ``-hash <sha256>`` argument when given, otherwise
    from the ``hash_hex`` attribute of *result*. The report covers the status
    label, basic file information, file services, known URLs (flagging
    comma-joined entries as corrupted) and a preview of display tags.

    Args:
        result: Currently selected result; only ``hash_hex`` is read.
        args: Command-line tokens for this cmdlet.
        config: Configuration passed through to ``hydrus_wrapper.get_client``.

    Returns:
        0 when the report (or the help text) was produced, 1 on any failure:
        no usable hash, client unavailable, file not found, or an error
        while querying or rendering.
    """
    # Help
    if any(str(a).lower() in _HELP_TOKENS for a in args):
        try:
            # ``default`` keeps the dump working when CMDLET (or a nested
            # argument object) is not natively JSON-serializable.
            log(json.dumps(CMDLET, ensure_ascii=False, indent=2, default=_json_fallback))
            return 0
        except Exception as exc:
            # Best effort, as before: report the problem and carry on with
            # the command rather than silently ignoring the failure.
            log(f"Unable to render help: {exc}", file=sys.stderr)

    # Parse arguments
    override_hash = _parse_hash_override(args)
    hash_hex = normalize_hash(override_hash or getattr(result, "hash_hex", None))
    if not hash_hex:
        if override_hash:
            # An explicit value was given but rejected; say so instead of
            # claiming that nothing was provided.
            log(f"Invalid hash: {override_hash}", file=sys.stderr)
        else:
            log("No hash provided and no result selected", file=sys.stderr)
        return 1

    try:
        client = hydrus_wrapper.get_client(config)
    except Exception as exc:
        log(f"Hydrus client unavailable: {exc}", file=sys.stderr)
        return 1
    if client is None:
        log("Hydrus client unavailable", file=sys.stderr)
        return 1

    try:
        result_data = client.fetch_file_metadata(hashes=[hash_hex])
        metadata = result_data.get("metadata") if result_data else None
        if not metadata:
            log(f"File not found: {hash_hex[:16]}...", file=sys.stderr)
            return 1
        file_info = metadata[0]

        log(f"File status: {_status_label(file_info)}", file=sys.stderr)
        _log_file_info(file_info, hash_hex)
        _log_services(file_info)
        _log_urls(file_info)
        _log_tags(file_info)
        log("\n", file=sys.stderr)
        return 0
    except Exception as exc:
        # Top-level boundary for the cmdlet: report, dump the traceback for
        # debugging, and signal failure through the return code.
        import traceback

        log(f"Error checking file status: {exc}", file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        return 1