Files
Medios-Macina/hydrus_health_check.py

543 lines
19 KiB
Python
Raw Permalink Normal View History

2025-11-25 20:09:33 -08:00
"""Hydrus API health check and initialization.
Provides startup health checks for Hydrus, Debrid, MPV and Matrix, caches the
results in module-level state, and lets the application disable a feature when
its backing service or executable is unavailable.
"""
import logging
import sys
2025-11-27 10:59:01 -08:00
from helper.logger import log, debug
2025-11-25 20:09:33 -08:00
from typing import Tuple, Optional, Dict, Any
from pathlib import Path
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Global state for Hydrus availability
# _HYDRUS_AVAILABLE starts as None and is set to a bool by
# initialize_hydrus_health_check() or the enable/disable helpers.
_HYDRUS_AVAILABLE: Optional[bool] = None
_HYDRUS_UNAVAILABLE_REASON: Optional[str] = None
_HYDRUS_CHECK_COMPLETE = False
# Global state for Debrid availability
# Same layout as the Hydrus state: flag, reason, "check has run" marker.
_DEBRID_AVAILABLE: Optional[bool] = None
_DEBRID_UNAVAILABLE_REASON: Optional[str] = None
_DEBRID_CHECK_COMPLETE = False
# Global state for MPV availability
# Also written by check_mpv_availability(), which caches its result here.
_MPV_AVAILABLE: Optional[bool] = None
_MPV_UNAVAILABLE_REASON: Optional[str] = None
_MPV_CHECK_COMPLETE = False
2025-11-27 10:59:01 -08:00
# Global state for Matrix availability
# _MATRIX_AVAILABLE starts as None and is set to a bool by
# initialize_matrix_health_check() or the enable/disable helpers.
_MATRIX_AVAILABLE: Optional[bool] = None
_MATRIX_UNAVAILABLE_REASON: Optional[str] = None
_MATRIX_CHECK_COMPLETE = False
2025-11-25 20:09:33 -08:00
def check_hydrus_availability(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Ask the Hydrus helper whether the Hydrus API is available.

    Delegates to ``helper.hydrus.is_available`` with ``use_cache=False`` and
    logs the outcome. Any exception (including a failed import of the helper)
    is converted into an "unavailable" result instead of propagating.

    Args:
        config: Application configuration dictionary, passed through to the
            helper unchanged.

    Returns:
        ``(True, None)`` when the helper reports the API as available,
        otherwise ``(False, reason)`` where ``reason`` is the helper's reason
        or the text of the exception that was raised.
    """
    try:
        from helper.hydrus import is_available as _is_hydrus_available

        logger.info("[Hydrus Health Check] Pinging Hydrus API...")
        reachable, detail = _is_hydrus_available(config, use_cache=False)

        if not reachable:
            suffix = f": {detail}" if detail else ""
            logger.warning(f"[Hydrus Health Check] ❌ Hydrus API is UNAVAILABLE{suffix}")
            return False, detail

        logger.info("[Hydrus Health Check] ✅ Hydrus API is AVAILABLE")
        return True, None
    except Exception as exc:
        failure = str(exc)
        logger.error(f"[Hydrus Health Check] ❌ Error checking Hydrus availability: {failure}")
        return False, failure
def initialize_hydrus_health_check(config: Dict[str, Any]) -> None:
    """Run the Hydrus health check and record the result in module state.

    Intended to be called once at application startup. The outcome is stored
    in ``_HYDRUS_AVAILABLE`` / ``_HYDRUS_UNAVAILABLE_REASON`` and
    ``_HYDRUS_CHECK_COMPLETE`` is set to True on every path. A one-line
    status message is emitted through ``debug``.

    Args:
        config: Application configuration dictionary
    """
    global _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON, _HYDRUS_CHECK_COMPLETE

    logger.info("[Startup] Starting Hydrus health check...")
    try:
        available, why = check_hydrus_availability(config)
        _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON = available, why
        _HYDRUS_CHECK_COMPLETE = True

        if not available:
            debug(f"⚠️ Hydrus: DISABLED - {why or 'Connection failed'}", file=sys.stderr)
        else:
            debug("✅ Hydrus: ENABLED - All Hydrus features available", file=sys.stderr)
    except Exception as exc:
        # Any failure while checking or reporting counts as "unavailable".
        logger.error(f"[Startup] Failed to initialize Hydrus health check: {exc}", exc_info=True)
        _HYDRUS_AVAILABLE = False
        _HYDRUS_UNAVAILABLE_REASON = str(exc)
        _HYDRUS_CHECK_COMPLETE = True
        debug(f"⚠️ Hydrus: DISABLED - Error during health check: {exc}", file=sys.stderr)
2025-11-25 20:09:33 -08:00
def check_debrid_availability(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Ping the AllDebrid public endpoint and report whether it answered.

    Sends a GET to ``https://api.alldebrid.com/v4/ping`` with a 10 second
    timeout and expects a JSON body whose ``status`` is ``'success'`` and
    whose ``data.ping`` is ``'pong'``. Every failure is turned into an
    "unavailable" result rather than raised.

    Args:
        config: Application configuration dictionary. It is accepted for
            signature parity with the other checks; this function does not
            read it.

    Returns:
        ``(True, None)`` when the expected pong is received, otherwise
        ``(False, reason)`` describing what went wrong.
    """
    try:
        from helper.http_client import HTTPClient

        logger.info("[Debrid Health Check] Pinging Debrid API at https://api.alldebrid.com/v4/ping...")
        try:
            # Public ping endpoint; the request carries no credentials.
            with HTTPClient(timeout=10.0, verify_ssl=True) as http:
                reply = http.get('https://api.alldebrid.com/v4/ping')
                logger.debug(f"[Debrid Health Check] Response status: {reply.status_code}")

                # Read the raw text first so it can be logged if JSON parsing fails.
                try:
                    body_text = reply.text
                    logger.debug(f"[Debrid Health Check] Response text: {body_text}")
                except Exception as read_err:
                    logger.error(f"[Debrid Health Check] ❌ Failed to read response text: {read_err}")
                    return False, f"Failed to read response: {read_err}"

                try:
                    payload = reply.json()
                    logger.debug(f"[Debrid Health Check] Response JSON: {payload}")
                except Exception as parse_err:
                    logger.error(f"[Debrid Health Check] ❌ Failed to parse JSON: {parse_err}")
                    logger.error(f"[Debrid Health Check] Response was: {body_text}")
                    return False, f"Failed to parse response: {parse_err}"

                # Anything other than the exact success/pong shape is rejected.
                if not (payload.get('status') == 'success' and payload.get('data', {}).get('ping') == 'pong'):
                    logger.warning(f"[Debrid Health Check] ❌ Debrid API returned unexpected response: {payload}")
                    return False, "Invalid API response"

                logger.info("[Debrid Health Check] ✅ Debrid API is AVAILABLE")
                return True, None
        except Exception as request_err:
            failure = str(request_err)
            logger.warning(f"[Debrid Health Check] ❌ Debrid API error: {failure}")
            import traceback
            logger.debug(f"[Debrid Health Check] Traceback: {traceback.format_exc()}")
            return False, failure
    except Exception as exc:
        failure = str(exc)
        logger.error(f"[Debrid Health Check] ❌ Error checking Debrid availability: {failure}")
        return False, failure
def initialize_debrid_health_check(config: Dict[str, Any]) -> None:
    """Run the Debrid health check and record the result in module state.

    Intended to be called once at application startup. The outcome is stored
    in ``_DEBRID_AVAILABLE`` / ``_DEBRID_UNAVAILABLE_REASON`` and
    ``_DEBRID_CHECK_COMPLETE`` is set to True on every path. Status is
    reported through both ``debug`` and the module logger.

    Args:
        config: Application configuration dictionary
    """
    global _DEBRID_AVAILABLE, _DEBRID_UNAVAILABLE_REASON, _DEBRID_CHECK_COMPLETE

    logger.info("[Startup] Starting Debrid health check...")
    try:
        available, why = check_debrid_availability(config)
        _DEBRID_AVAILABLE, _DEBRID_UNAVAILABLE_REASON = available, why
        _DEBRID_CHECK_COMPLETE = True

        if not available:
            debug(f"⚠️ Debrid: DISABLED - {why or 'Connection failed'}", file=sys.stderr)
            logger.warning(f"[Startup] Debrid health check FAILED: {why}")
        else:
            debug("✅ Debrid: ENABLED - All Debrid features available", file=sys.stderr)
            logger.info("[Startup] Debrid health check PASSED")
    except Exception as exc:
        # Any failure while checking or reporting counts as "unavailable".
        logger.error(f"[Startup] Failed to initialize Debrid health check: {exc}", exc_info=True)
        _DEBRID_AVAILABLE = False
        _DEBRID_UNAVAILABLE_REASON = str(exc)
        _DEBRID_CHECK_COMPLETE = True
        debug(f"⚠️ Debrid: DISABLED - Error during health check: {exc}", file=sys.stderr)
2025-11-25 20:09:33 -08:00
def check_mpv_availability() -> Tuple[bool, Optional[str]]:
    """Check whether an ``mpv`` executable is on PATH and runs.

    The result is cached in the module-level MPV state: once a check has
    completed, later calls return the stored values without probing again.
    Otherwise the executable is located with ``shutil.which`` and started
    with ``--version`` (2 second timeout) to confirm it works.

    Returns:
        ``(True, None)`` when ``mpv --version`` exits with code 0, otherwise
        ``(False, reason)``.
    """
    global _MPV_AVAILABLE, _MPV_UNAVAILABLE_REASON, _MPV_CHECK_COMPLETE

    # Serve the cached answer when a previous check has already run.
    if _MPV_CHECK_COMPLETE and _MPV_AVAILABLE is not None:
        return _MPV_AVAILABLE, _MPV_UNAVAILABLE_REASON

    import shutil
    import subprocess

    logger.info("[MPV Health Check] Checking for MPV executable...")

    version_line = None
    executable = shutil.which("mpv")
    if not executable:
        failure = "Executable 'mpv' not found in PATH"
    else:
        # Run the binary to confirm it actually starts.
        try:
            proc = subprocess.run(
                [executable, "--version"],
                capture_output=True,
                text=True,
                timeout=2,
            )
            if proc.returncode == 0:
                version_line = proc.stdout.split('\n')[0]
                failure = None
            else:
                failure = f"MPV returned non-zero exit code: {proc.returncode}"
        except Exception as exc:
            failure = f"Error running MPV: {exc}"

    # Record the outcome before logging, as every branch needs the same writes.
    _MPV_AVAILABLE = failure is None
    _MPV_UNAVAILABLE_REASON = failure
    _MPV_CHECK_COMPLETE = True

    if failure is None:
        logger.info(f"[MPV Health Check] ✅ MPV is AVAILABLE ({version_line})")
        return True, None

    logger.warning(f"[MPV Health Check] ❌ MPV is UNAVAILABLE: {_MPV_UNAVAILABLE_REASON}")
    return False, _MPV_UNAVAILABLE_REASON
def initialize_mpv_health_check() -> None:
    """Run the MPV health check and record the result in module state.

    Intended to be called once at application startup. The outcome is stored
    in ``_MPV_AVAILABLE`` / ``_MPV_UNAVAILABLE_REASON`` and
    ``_MPV_CHECK_COMPLETE`` is set to True on every path. Status is reported
    through both ``debug`` and the module logger.
    """
    global _MPV_AVAILABLE, _MPV_UNAVAILABLE_REASON, _MPV_CHECK_COMPLETE

    logger.info("[Startup] Starting MPV health check...")
    try:
        available, why = check_mpv_availability()
        _MPV_AVAILABLE, _MPV_UNAVAILABLE_REASON = available, why
        _MPV_CHECK_COMPLETE = True

        if not available:
            debug(f"⚠️ MPV: DISABLED - {why or 'Connection failed'}", file=sys.stderr)
            debug("→ Hydrus features still available", file=sys.stderr)
            logger.warning(f"[Startup] MPV health check FAILED: {why}")
        else:
            debug("✅ MPV: ENABLED - All MPV features available", file=sys.stderr)
            logger.info("[Startup] MPV health check PASSED")
    except Exception as exc:
        # Any failure while checking or reporting counts as "unavailable".
        logger.error(f"[Startup] Failed to initialize MPV health check: {exc}", exc_info=True)
        _MPV_AVAILABLE = False
        _MPV_UNAVAILABLE_REASON = str(exc)
        _MPV_CHECK_COMPLETE = True
        debug(f"⚠️ MPV: DISABLED - Error during health check: {exc}", file=sys.stderr)
def check_matrix_availability(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Check if the Matrix homeserver is reachable and credentials are valid.

    Reads ``config['storage']['matrix']`` for ``homeserver`` and
    ``access_token``. The homeserver is probed via the unauthenticated
    ``/_matrix/client/versions`` endpoint; when an access token is present it
    is additionally verified with ``/_matrix/client/v3/account/whoami``.
    Every failure is turned into an "unavailable" result rather than raised.

    Args:
        config: Application configuration dictionary

    Returns:
        Tuple of (is_available: bool, reason: Optional[str])
        - (True, None) if the homeserver (and token, if any) checks pass
        - (False, "Not configured") if no homeserver is configured
        - (False, reason) for any other failure
    """
    try:
        # Tolerate missing sections and sections explicitly set to None.
        matrix_conf = (config.get('storage') or {}).get('matrix') or {}
        homeserver = matrix_conf.get('homeserver')
        access_token = matrix_conf.get('access_token')
        if not homeserver:
            return False, "Not configured"

        # Imported only once a request is actually needed, so an unconfigured
        # Matrix never reports a missing 'requests' package as its reason.
        import requests

        # Match full scheme prefixes: a bare 'http' test would wrongly accept
        # host names such as 'httpd.example.org' as already having a scheme.
        if not homeserver.lower().startswith(('http://', 'https://')):
            homeserver = f"https://{homeserver}"
        # Avoid a doubled slash when the configured URL ends with '/'.
        homeserver = homeserver.rstrip('/')

        # Check versions endpoint (no auth required)
        try:
            resp = requests.get(f"{homeserver}/_matrix/client/versions", timeout=5)
            if resp.status_code != 200:
                return False, f"Homeserver returned {resp.status_code}"
        except Exception as e:
            return False, f"Homeserver unreachable: {e}"

        # Check auth if token provided (whoami)
        if access_token:
            try:
                headers = {"Authorization": f"Bearer {access_token}"}
                resp = requests.get(
                    f"{homeserver}/_matrix/client/v3/account/whoami",
                    headers=headers,
                    timeout=5,
                )
                if resp.status_code != 200:
                    return False, f"Authentication failed: {resp.status_code}"
            except Exception as e:
                return False, f"Auth check failed: {e}"

        return True, None
    except Exception as e:
        return False, str(e)
def initialize_matrix_health_check(config: Dict[str, Any]) -> None:
    """Run the Matrix health check and record the result in module state.

    The outcome is stored in ``_MATRIX_AVAILABLE`` /
    ``_MATRIX_UNAVAILABLE_REASON`` and ``_MATRIX_CHECK_COMPLETE`` is set to
    True on every path. No status line is printed when the reason is
    "Not configured".

    Args:
        config: Application configuration dictionary
    """
    global _MATRIX_AVAILABLE, _MATRIX_UNAVAILABLE_REASON, _MATRIX_CHECK_COMPLETE

    logger.info("[Startup] Starting Matrix health check...")
    try:
        available, why = check_matrix_availability(config)
        _MATRIX_AVAILABLE, _MATRIX_UNAVAILABLE_REASON = available, why
        _MATRIX_CHECK_COMPLETE = True

        if available:
            debug("Matrix: ENABLED - Homeserver reachable", file=sys.stderr)
        elif why != "Not configured":
            debug(f"Matrix: DISABLED - {why}", file=sys.stderr)
    except Exception as exc:
        # Any failure while checking or reporting counts as "unavailable".
        logger.error(f"[Startup] Failed to initialize Matrix health check: {exc}", exc_info=True)
        _MATRIX_AVAILABLE = False
        _MATRIX_UNAVAILABLE_REASON = str(exc)
        _MATRIX_CHECK_COMPLETE = True
2025-11-25 20:09:33 -08:00
def is_hydrus_available() -> bool:
    """Report the stored Hydrus availability flag.

    Returns:
        True only when ``_HYDRUS_AVAILABLE`` is exactly True; both the
        initial None and an explicit False yield False.
    """
    flag = _HYDRUS_AVAILABLE
    return flag is True
def get_hydrus_unavailable_reason() -> Optional[str]:
    """Return the stored reason Hydrus is unavailable.

    Returns:
        None when Hydrus is currently reported as available, otherwise the
        value of ``_HYDRUS_UNAVAILABLE_REASON`` (which may itself be None).
    """
    if is_hydrus_available():
        return None
    return _HYDRUS_UNAVAILABLE_REASON
def is_hydrus_check_complete() -> bool:
    """Report whether the Hydrus health check has been recorded.

    Returns:
        The current value of ``_HYDRUS_CHECK_COMPLETE``: True once
        ``initialize_hydrus_health_check`` has run, False before that.
    """
    completed = _HYDRUS_CHECK_COMPLETE
    return completed
def disable_hydrus_features() -> None:
    """Force the Hydrus availability flag off (for testing/fallback).

    Sets ``_HYDRUS_AVAILABLE`` to False with a fixed reason string and logs
    a warning. ``_HYDRUS_CHECK_COMPLETE`` is left untouched.
    """
    global _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON

    _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON = (
        False,
        "Manually disabled or lost connection",
    )
    logger.warning("[Hydrus] Features manually disabled")
def enable_hydrus_features() -> None:
    """Force the Hydrus availability flag on (for testing/fallback).

    Sets ``_HYDRUS_AVAILABLE`` to True, clears the stored reason and logs an
    info message. No connectivity check is performed here.
    """
    global _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON

    _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON = True, None
    logger.info("[Hydrus] Features manually enabled")
def is_debrid_available() -> bool:
    """Report the stored Debrid availability flag.

    Returns:
        True only when ``_DEBRID_AVAILABLE`` is exactly True; both the
        initial None and an explicit False yield False.
    """
    flag = _DEBRID_AVAILABLE
    return flag is True
def get_debrid_unavailable_reason() -> Optional[str]:
    """Return the stored reason Debrid is unavailable.

    Returns:
        None when Debrid is currently reported as available, otherwise the
        value of ``_DEBRID_UNAVAILABLE_REASON`` (which may itself be None).
    """
    if is_debrid_available():
        return None
    return _DEBRID_UNAVAILABLE_REASON
def is_debrid_check_complete() -> bool:
    """Report whether the Debrid health check has been recorded.

    Returns:
        The current value of ``_DEBRID_CHECK_COMPLETE``: True once
        ``initialize_debrid_health_check`` has run, False before that.
    """
    completed = _DEBRID_CHECK_COMPLETE
    return completed
def disable_debrid_features() -> None:
    """Force the Debrid availability flag off (for testing/fallback).

    Sets ``_DEBRID_AVAILABLE`` to False with a fixed reason string and logs
    a warning. ``_DEBRID_CHECK_COMPLETE`` is left untouched.
    """
    global _DEBRID_AVAILABLE, _DEBRID_UNAVAILABLE_REASON

    _DEBRID_AVAILABLE, _DEBRID_UNAVAILABLE_REASON = (
        False,
        "Manually disabled or lost connection",
    )
    logger.warning("[Debrid] Features manually disabled")
def enable_debrid_features() -> None:
    """Force the Debrid availability flag on (for testing/fallback).

    Sets ``_DEBRID_AVAILABLE`` to True, clears the stored reason and logs an
    info message. No connectivity check is performed here.
    """
    global _DEBRID_AVAILABLE, _DEBRID_UNAVAILABLE_REASON

    _DEBRID_AVAILABLE, _DEBRID_UNAVAILABLE_REASON = True, None
    logger.info("[Debrid] Features manually enabled")
def is_mpv_available() -> bool:
    """Report the stored MPV availability flag.

    Returns:
        True only when ``_MPV_AVAILABLE`` is exactly True; both the initial
        None and an explicit False yield False.
    """
    flag = _MPV_AVAILABLE
    return flag is True
def get_mpv_unavailable_reason() -> Optional[str]:
    """Return the stored reason MPV is unavailable.

    Returns:
        None when MPV is currently reported as available, otherwise the
        value of ``_MPV_UNAVAILABLE_REASON`` (which may itself be None).
    """
    if is_mpv_available():
        return None
    return _MPV_UNAVAILABLE_REASON
def is_mpv_check_complete() -> bool:
    """Report whether the MPV health check has been recorded.

    Returns:
        The current value of ``_MPV_CHECK_COMPLETE``: True once an MPV check
        has stored its result, False before that.
    """
    completed = _MPV_CHECK_COMPLETE
    return completed
def disable_mpv_features() -> None:
    """Force the MPV availability flag off (for testing/fallback).

    Sets ``_MPV_AVAILABLE`` to False with a fixed reason string and logs a
    warning. ``_MPV_CHECK_COMPLETE`` is left untouched.
    """
    global _MPV_AVAILABLE, _MPV_UNAVAILABLE_REASON

    _MPV_AVAILABLE, _MPV_UNAVAILABLE_REASON = (
        False,
        "Manually disabled or lost connection",
    )
    logger.warning("[MPV] Features manually disabled")
def enable_mpv_features() -> None:
    """Force the MPV availability flag on (for testing/fallback).

    Sets ``_MPV_AVAILABLE`` to True, clears the stored reason and logs an
    info message. The executable is not re-checked here.
    """
    global _MPV_AVAILABLE, _MPV_UNAVAILABLE_REASON

    _MPV_AVAILABLE, _MPV_UNAVAILABLE_REASON = True, None
    logger.info("[MPV] Features manually enabled")
2025-11-27 10:59:01 -08:00
def is_matrix_available() -> bool:
    """Report the stored Matrix availability flag.

    Returns:
        True only when ``_MATRIX_AVAILABLE`` is exactly True; both the
        initial None and an explicit False yield False.
    """
    flag = _MATRIX_AVAILABLE
    return flag is True
def get_matrix_unavailable_reason() -> Optional[str]:
    """Return the stored reason Matrix is unavailable.

    Returns:
        None when Matrix is currently reported as available, otherwise the
        value of ``_MATRIX_UNAVAILABLE_REASON`` (which may itself be None).
    """
    if is_matrix_available():
        return None
    return _MATRIX_UNAVAILABLE_REASON
def is_matrix_check_complete() -> bool:
    """Report whether the Matrix health check has been recorded.

    Returns:
        The current value of ``_MATRIX_CHECK_COMPLETE``: True once
        ``initialize_matrix_health_check`` has run, False before that.
    """
    completed = _MATRIX_CHECK_COMPLETE
    return completed
def disable_matrix_features() -> None:
    """Force the Matrix availability flag off (for testing/fallback).

    Sets ``_MATRIX_AVAILABLE`` to False with a fixed reason string and logs
    a warning. ``_MATRIX_CHECK_COMPLETE`` is left untouched.
    """
    global _MATRIX_AVAILABLE, _MATRIX_UNAVAILABLE_REASON

    _MATRIX_AVAILABLE, _MATRIX_UNAVAILABLE_REASON = (
        False,
        "Manually disabled or lost connection",
    )
    logger.warning("[Matrix] Features manually disabled")
def enable_matrix_features() -> None:
    """Force the Matrix availability flag on (for testing/fallback).

    Sets ``_MATRIX_AVAILABLE`` to True, clears the stored reason and logs an
    info message. No connectivity check is performed here.
    """
    global _MATRIX_AVAILABLE, _MATRIX_UNAVAILABLE_REASON

    _MATRIX_AVAILABLE, _MATRIX_UNAVAILABLE_REASON = True, None
    logger.info("[Matrix] Features manually enabled")