Files
2025-11-27 10:59:01 -08:00

514 lines
22 KiB
Python

"""Export modal screen for exporting files with metadata."""
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Static, Button, Input, TextArea, Tree, Select
from textual.binding import Binding
import logging
from typing import Optional, Any
from pathlib import Path
import json
import sys
import subprocess
from datetime import datetime
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from helper.utils import format_metadata_value
from config import load_config
logger = logging.getLogger(__name__)
class ExportModal(ModalScreen):
"""Modal screen for exporting files with metadata and tags."""
BINDINGS = [
Binding("escape", "cancel", "Cancel"),
]
CSS_PATH = "export.tcss"
def __init__(self, result_data: Optional[dict] = None, hydrus_available: bool = False, debrid_available: bool = False):
"""Initialize the export modal with result data.
Args:
result_data: Dictionary containing:
- title: str - Item title
- tags: str - Comma-separated tags
- metadata: dict - File metadata (source-specific from item.metadata or local DB)
- source: str - Source identifier ('local', 'hydrus', 'debrid', etc)
- current_result: object - The full search result object
hydrus_available: bool - Whether Hydrus API is available
debrid_available: bool - Whether Debrid API is available
"""
super().__init__()
self.result_data = result_data or {}
self.hydrus_available = hydrus_available
self.debrid_available = debrid_available
self.metadata_display: Optional[Static] = None
self.tags_textarea: Optional[TextArea] = None
self.export_to_select: Optional[Select] = None
self.custom_path_input: Optional[Input] = None
self.libraries_select: Optional[Select] = None
self.size_input: Optional[Input] = None
self.format_select: Optional[Select] = None
self.file_ext: Optional[str] = None # Store the file extension for format filtering
self.file_type: Optional[str] = None # Store the file type (audio, video, image, document)
self.default_format: Optional[str] = None # Store the default format to set after mount
def _determine_file_type(self, ext: str) -> tuple[str, list]:
"""Determine file type from extension and return type and format options.
Args:
ext: File extension (e.g., '.mp3', '.mp4', '.jpg')
Returns:
Tuple of (file_type, format_options) where format_options is a list of (label, value) tuples
"""
ext_lower = ext.lower() if ext else ''
from helper.utils_constant import mime_maps
found_type = "unknown"
# Find type based on extension
for category, formats in mime_maps.items():
for fmt_key, fmt_info in formats.items():
if fmt_info.get("ext") == ext_lower:
found_type = category
break
if found_type != "unknown":
break
# Build format options for the found type
format_options = []
# If unknown, fallback to audio (matching legacy behavior)
target_type = found_type if found_type in mime_maps else "audio"
if target_type in mime_maps:
# Sort formats alphabetically
sorted_formats = sorted(mime_maps[target_type].items())
for fmt_key, fmt_info in sorted_formats:
label = fmt_key.upper()
value = fmt_key
format_options.append((label, value))
return (target_type, format_options)
def _get_library_options(self) -> list:
"""Get available library options from config.json."""
options = [("Local", "local")]
try:
# Try to load config
config_path = Path(__file__).parent.parent / "config.json"
if not config_path.exists():
return options
with open(config_path, 'r') as f:
config = json.load(f)
# Check if Hydrus is configured AND available (supports both new and old format)
from config import get_hydrus_instance
hydrus_instance = get_hydrus_instance(config, "home")
if self.hydrus_available and hydrus_instance and hydrus_instance.get("key") and hydrus_instance.get("url"):
options.append(("Hydrus Network", "hydrus"))
# Check if Debrid is configured AND available (supports both new and old format)
from config import get_debrid_api_key
debrid_api_key = get_debrid_api_key(config)
if self.debrid_available and debrid_api_key:
options.append(("Debrid", "debrid"))
except Exception as e:
logger.error(f"Error loading config for libraries: {e}")
return options
def _get_metadata_text(self) -> str:
"""Format metadata from result data in a consistent display format."""
metadata = self.result_data.get('metadata', {})
source = self.result_data.get('source', 'unknown')
logger.info(f"_get_metadata_text called - source: {source}, metadata type: {type(metadata)}, keys: {list(metadata.keys()) if metadata else 'empty'}")
if not metadata:
logger.info(f"_get_metadata_text - No metadata found, returning 'No metadata available'")
return "No metadata available"
lines = []
# Only display these specific fields in this order
display_fields = [
'duration', 'size', 'ext', 'media_type', 'time_imported', 'time_modified', 'hash'
]
# Display fields in a consistent order
for field in display_fields:
if field in metadata:
value = metadata[field]
# Skip complex types and None values
if isinstance(value, (dict, list)) or value is None:
continue
# Use central formatting rule
formatted_value = format_metadata_value(field, value)
# Format: "Field Name: value"
field_label = field.replace('_', ' ').title()
lines.append(f"{field_label}: {formatted_value}")
# If we found any fields, display them
if lines:
logger.info(f"_get_metadata_text - Returning {len(lines)} formatted metadata lines")
return "\n".join(lines)
else:
logger.info(f"_get_metadata_text - No matching fields found in metadata")
return "No metadata available"
def compose(self) -> ComposeResult:
"""Compose the export modal screen."""
with Container(id="export-container"):
yield Static("Export File with Metadata", id="export-title")
# Row 1: Three columns (Tags, Metadata, Export-To Options)
self.tags_textarea = TextArea(
text=self._format_tags(),
id="tags-area",
read_only=False,
)
yield self.tags_textarea
self.tags_textarea.border_title = "Tags"
# Metadata display instead of files tree
self.metadata_display = Static(
self._get_metadata_text(),
id="metadata-display",
)
yield self.metadata_display
self.metadata_display.border = ("solid", "dodgerblue")
# Right column: Export options
with Vertical(id="export-options"):
# Export To selector
self.export_to_select = Select(
[("0x0", "0x0"), ("Libraries", "libraries"), ("Custom Path", "path")],
id="export-to-select"
)
yield self.export_to_select
# Libraries selector (initially hidden)
library_options = self._get_library_options()
self.libraries_select = Select(
library_options,
id="libraries-select"
)
yield self.libraries_select
# Custom path input (initially hidden)
self.custom_path_input = Input(
placeholder="Enter custom export path",
id="custom-path-input"
)
yield self.custom_path_input
# Get metadata for size and format options
metadata = self.result_data.get('metadata', {})
original_size = metadata.get('size', '')
ext = metadata.get('ext', '')
# Store the extension and determine file type
self.file_ext = ext
self.file_type, format_options = self._determine_file_type(ext)
# Format size in MB for display
if original_size:
size_mb = int(original_size / (1024 * 1024)) if isinstance(original_size, (int, float)) else original_size
size_display = f"{size_mb}Mb"
else:
size_display = ""
# Size input
self.size_input = Input(
value=size_display,
placeholder="Size (can reduce)",
id="size-input",
disabled=(self.file_type == 'document') # Disable for documents - no resizing needed
)
yield self.size_input
# Determine the default format value (match current extension to format options)
default_format = None
if ext and format_options:
# Map extension to format value (e.g., .flac -> "flac", .mp3 -> "mp3", .m4a -> "m4a")
ext_lower = ext.lower().lstrip('.') # Remove leading dot if present
# Try to find matching format option
for _, value in format_options:
if value and (ext_lower == value or f".{ext_lower}" == ext or ext.endswith(f".{value}")):
default_format = value
logger.debug(f"Matched extension {ext} to format {value}")
break
# If no exact match, use first option
if not default_format and format_options:
default_format = format_options[0][1]
logger.debug(f"No format match for {ext}, using first option: {default_format}")
# Store the default format to apply after mount
self.default_format = default_format
# Format selector based on file type
self.format_select = Select(
format_options if format_options else [("No conversion", "")],
id="format-select",
disabled=not format_options # Disable if no format options (e.g., documents)
)
yield self.format_select
# Row 2: Buttons
with Horizontal(id="export-buttons"):
yield Button("Cancel", id="cancel-btn", variant="default")
yield Button("Export", id="export-btn", variant="primary")
def _format_tags(self) -> str:
"""Format tags from result data."""
tags = self.result_data.get('tags', '')
if isinstance(tags, str):
# Split by comma and rejoin with newlines
tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
return '\n'.join(tags_list)
elif isinstance(tags, list):
return '\n'.join(tags)
return ''
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press events."""
button_id = event.button.id
if button_id == "export-btn":
self._handle_export()
elif button_id == "cancel-btn":
self.action_cancel()
def on_select_changed(self, event: Select.Changed) -> None:
"""Handle select widget changes."""
if event.control.id == "export-to-select":
# Show/hide custom path and libraries based on selection
if self.custom_path_input:
self.custom_path_input.display = (event.value == "path")
if self.libraries_select:
self.libraries_select.display = (event.value == "libraries")
elif event.control.id == "libraries-select":
# Handle library selection (no special action needed currently)
logger.debug(f"Library selected: {event.value}")
def on_mount(self) -> None:
"""Handle mount event."""
# Initially hide custom path and libraries inputs (default is "0x0")
if self.custom_path_input:
self.custom_path_input.display = False
if self.libraries_select:
self.libraries_select.display = False
# Set the default format value to show it selected instead of "Select"
if self.default_format and self.format_select:
self.format_select.value = self.default_format
logger.debug(f"Set format selector to default value: {self.default_format}")
# Refresh metadata display after mount to ensure data is loaded
if self.metadata_display:
metadata_text = self._get_metadata_text()
self.metadata_display.update(metadata_text)
logger.debug(f"Updated metadata display on mount: {bool(self.result_data.get('metadata'))}")
def _handle_export(self) -> None:
"""Handle the export action."""
try:
tags_text = self.tags_textarea.text.strip()
export_to = self.export_to_select.value if self.export_to_select else "0x0"
custom_path = self.custom_path_input.value.strip() if self.custom_path_input else ""
# Get library value - handle Select.BLANK case
library = "local" # default
if self.libraries_select and str(self.libraries_select.value) != "Select.BLANK":
library = str(self.libraries_select.value)
elif self.libraries_select and self.libraries_select:
# If value is Select.BLANK, try to get from the options
try:
# Get first available library option as fallback
options = self._get_library_options()
if options:
library = options[0][1] # Get the value part of first option tuple
except Exception:
library = "local"
size = self.size_input.value.strip() if self.size_input else ""
file_format = self.format_select.value if self.format_select else "mp4"
# Parse tags from textarea (one per line)
export_tags = set()
for line in tags_text.split('\n'):
tag = line.strip()
if tag:
export_tags.add(tag)
# For Hydrus export, filter out metadata-only tags (hash:, known_url:, relationship:)
if export_to == "libraries" and library == "hydrus":
metadata_prefixes = {'hash:', 'known_url:', 'relationship:'}
export_tags = {tag for tag in export_tags if not any(tag.lower().startswith(prefix) for prefix in metadata_prefixes)}
logger.info(f"Filtered tags for Hydrus - removed metadata tags, {len(export_tags)} tags remaining")
# Extract title and add as searchable tags if not already present
title = self.result_data.get('title', '').strip()
if title:
# Add the full title as a tag if not already present
title_tag = f"title:{title}"
if title_tag not in export_tags and not any(t.startswith('title:') for t in export_tags):
export_tags.add(title_tag)
# Extract individual words from title as searchable tags (if reasonable length)
# Skip very short words and common stop words
if len(title) < 100: # Only for reasonably short titles
stop_words = {'the', 'a', 'an', 'and', 'or', 'of', 'in', 'to', 'for', 'is', 'it', 'at', 'by', 'from', 'with', 'as', 'be', 'on', 'that', 'this', 'this'}
words = title.lower().split()
for word in words:
# Clean up word (remove punctuation)
clean_word = ''.join(c for c in word if c.isalnum())
# Only add if not a stop word and has some length
if clean_word and len(clean_word) > 2 and clean_word not in stop_words:
if clean_word not in export_tags:
export_tags.add(clean_word)
logger.info(f"Extracted {len(words)} words from title, added searchable title tags")
# Validate required fields - allow export to continue for Hydrus even with 0 actual tags
# (metadata tags will still be in the sidecar, and tags can be added later)
if not export_tags and export_to != "libraries":
logger.warning("No tags provided for export")
return
if export_to == "libraries" and not export_tags:
logger.warning("No actual tags for Hydrus export (only metadata was present)")
# Don't return - allow export to continue, file will be added to Hydrus even without tags
# Determine export path
export_path = None
if export_to == "path":
if not custom_path:
logger.warning("Custom path required but not provided")
return
export_path = custom_path
elif export_to == "libraries":
export_path = library # "local", "hydrus", "debrid"
else:
export_path = export_to # "0x0"
# Get metadata from result_data
metadata = self.result_data.get('metadata', {})
# Extract file source info from result_data (passed by hub-ui)
file_hash = self.result_data.get('file_hash')
file_url = self.result_data.get('file_url')
file_path = self.result_data.get('file_path') # For local files
source = self.result_data.get('source', 'unknown')
# Prepare export data
export_data = {
'export_to': export_to,
'export_path': export_path,
'library': library if export_to == "libraries" else None,
'tags': export_tags,
'size': size if size else None,
'format': file_format,
'metadata': metadata,
'original_data': self.result_data,
'file_hash': file_hash,
'file_url': file_url,
'file_path': file_path, # Pass file path for local files
'source': source,
}
logger.info(f"Export initiated: destination={export_path}, format={file_format}, size={size}, tags={export_tags}, source={source}, hash={file_hash}, path={file_path}")
# Dismiss the modal and return the export data
self.dismiss(export_data)
except Exception as e:
logger.error(f"Error during export: {e}", exc_info=True)
def action_cancel(self) -> None:
"""Handle cancel action."""
self.dismiss(None)
def create_notes_sidecar(file_path: Path, notes: str) -> None:
"""Create a .notes sidecar file with notes text.
Only creates file if notes are not empty.
Args:
file_path: Path to the exported file
notes: Notes text
"""
if not notes or not notes.strip():
return
notes_path = file_path.with_suffix(file_path.suffix + '.notes')
try:
with open(notes_path, 'w', encoding='utf-8') as f:
f.write(notes.strip())
logger.info(f"Created notes sidecar: {notes_path}")
except Exception as e:
logger.error(f"Failed to create notes sidecar: {e}", exc_info=True)
def determine_needs_conversion(current_ext: str, target_format: str) -> bool:
"""Determine if conversion is needed between two formats.
Args:
current_ext: Current file extension (e.g., '.flac')
target_format: Target format name (e.g., 'mp3') or NoSelection object
Returns:
True if conversion is needed, False if it's already the target format
"""
# Handle NoSelection or None
if not target_format or target_format == "" or str(target_format.__class__.__name__) == 'NoSelection':
return False # No conversion requested
# Normalize the current extension
current_ext_lower = current_ext.lower().lstrip('.')
target_format_lower = str(target_format).lower()
# Check if they match
return current_ext_lower != target_format_lower
def calculate_size_tolerance(metadata: dict, user_size_mb: Optional[str]) -> tuple[Optional[int], Optional[int]]:
"""Calculate target size with 1MB grace period.
Args:
metadata: File metadata containing 'size' in bytes
user_size_mb: User-entered size like "756Mb" or empty string
Returns:
Tuple of (target_bytes, grace_bytes) where grace_bytes is 1MB (1048576),
or (None, None) if no size specified
"""
grace_bytes = 1 * 1024 * 1024 # 1MB grace period
if not user_size_mb or not user_size_mb.strip():
return None, grace_bytes
try:
# Parse the size string (format like "756Mb")
size_str = user_size_mb.strip().lower()
if size_str.endswith('mb'):
size_str = size_str[:-2]
elif size_str.endswith('m'):
size_str = size_str[:-1]
size_mb = float(size_str)
target_bytes = int(size_mb * 1024 * 1024)
return target_bytes, grace_bytes
except (ValueError, AttributeError):
return None, grace_bytes