# Files: Medios-Macina/TUI/modalscreen/search.py
# 506 lines, 22 KiB, Python (Raw / Normal View / History)
# 2025-11-25 20:09:33 -08:00
"""Search modal screen for OpenLibrary and Soulseek."""
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Static, Button, Input, Select, DataTable, TextArea
from textual.binding import Binding
from textual.message import Message
import logging
from typing import Optional, Any, List
from pathlib import Path
import sys
import asyncio
# Add the parent of the directory containing this file to the front of
# sys.path so the absolute `config` import below can be resolved
# (presumably `config` lives there - confirm against the project layout).
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import load_config
# Module-level logger; the messages in this file are prefixed "[search-modal]".
logger = logging.getLogger(__name__)
class SearchModal(ModalScreen):
"""Modal screen for searching OpenLibrary and Soulseek.

The screen offers a source selector, a query input, a results table, a
tags editor and buttons to select, download, submit or cancel.
"""
# Key bindings for this screen: each entry maps a key to the name of an
# ``action_*`` method defined on this class (e.g. "cancel" -> action_cancel).
BINDINGS = [
Binding("escape", "cancel", "Cancel"),
Binding("enter", "search_focused", "Search"),
Binding("ctrl+t", "scrape_tags", "Scrape Tags"),
]
# Stylesheet file used for this screen.
CSS_PATH = "search.tcss"
class SearchSelected(Message):
    """Message carrying the search result the user picked.

    Attributes:
        result: The result dictionary chosen in the modal.
    """

    def __init__(self, result: dict) -> None:
        # Attach the payload, then run the base Message initializer.
        self.result = result
        super().__init__()
def __init__(self, app_instance=None):
    """Create the modal with no widgets built and no search running.

    Args:
        app_instance: Reference to the main App instance for worker creation.
    """
    super().__init__()

    # Collaborators and search state.
    self.app_instance = app_instance
    self.current_worker = None  # Worker handle for search operations
    self.is_searching = False
    self.current_results: List[dict] = []

    # Widget handles; they stay None until compose() assigns them.
    self.source_select: Optional[Select] = None
    self.search_input: Optional[Input] = None
    self.results_table: Optional[DataTable] = None
    self.tags_textarea: Optional[TextArea] = None
    self.library_source_select: Optional[Select] = None
def compose(self) -> ComposeResult:
    """Build the widget tree of the search modal.

    From top to bottom: the title, the search controls (source selector,
    query input, Search button), the results table, the bottom controls
    (tags editor next to the library-source selector and Submit button)
    and the Select / Download / Cancel button row.
    """
    # NOTE(review): container nesting below is reconstructed from the widget
    # ids and section comments - confirm against search.tcss.

    # Create the widgets the rest of the class keeps handles to.
    self.source_select = Select(
        [("OpenLibrary", "openlibrary"), ("Soulseek", "soulseek")],
        value="openlibrary",
        id="source-select",
    )
    self.search_input = Input(
        placeholder="Enter search query...",
        id="search-input",
    )
    self.results_table = DataTable(id="results-table")
    tags_editor = TextArea(
        text="",
        id="result-tags-textarea",
        read_only=False,
    )
    tags_editor.border_title = "Tags [Ctrl+T: Scrape]"
    self.tags_textarea = tags_editor
    # Library source only matters for OpenLibrary results.
    self.library_source_select = Select(
        [("Local", "local"), ("Download", "download")],
        value="local",
        id="library-source-select",
    )

    # Lay the widgets out.
    with Vertical(id="search-container"):
        yield Static("Search Books & Music", id="search-title")

        with Horizontal(id="search-controls"):
            yield self.source_select
            yield self.search_input
            yield Button("Search", id="search-button", variant="primary")

        yield self.results_table

        # Two columns: tags on the left, source/submit on the right.
        with Horizontal(id="bottom-controls"):
            with Vertical(id="tags-column"):
                yield self.tags_textarea
            with Vertical(id="source-submit-column"):
                yield self.library_source_select
                yield Button("Submit", id="submit-button", variant="primary")

        # Action buttons along the bottom.
        with Horizontal(id="search-buttons"):
            yield Button("Select", id="select-button", variant="primary")
            yield Button("Download", id="download-button", variant="primary")
            yield Button("Cancel", id="cancel-button", variant="default")
def on_mount(self) -> None:
    """Add the default result columns and move focus to the query input."""
    default_headers = ("Title", "Author/Artist", "Year/Album", "Details")
    self.results_table.add_columns(*default_headers)

    # Start with the cursor in the search box.
    self.search_input.focus()
async def _search_openlibrary(self, query: str) -> List[dict]:
    """Search OpenLibrary for books.

    Args:
        query: The search text passed to the provider.

    Returns:
        One dictionary per provider result with the display fields
        ("title", "author", "year", ...), the provider's "columns" and the
        raw metadata. An empty list is returned when the provider is not
        available or when anything raises during the search.
    """
    try:
        from helper.search_provider import get_provider

        logger.info(f"[search-modal] Searching OpenLibrary for: {query}")
        # Get the OpenLibrary provider (now has smart search built-in)
        provider = get_provider("openlibrary")
        if not provider:
            logger.error("[search-modal] OpenLibrary provider not available")
            return []

        formatted_results = []
        # Search using the provider (smart search is now default)
        for result in provider.search(query, limit=20) or []:
            # Extract metadata from SearchResult.full_metadata
            metadata = result.full_metadata or {}

            authors = metadata.get("authors")
            if not authors:
                author_text = "Unknown"
            elif isinstance(authors, str):
                # A bare string must not be joined character by character.
                author_text = authors
            else:
                # str() keeps one odd entry from discarding the whole search.
                author_text = ", ".join(str(name) for name in authors)

            formatted_results.append({
                "title": result.title,
                "author": author_text,
                "year": metadata.get("year", ""),
                "publisher": metadata.get("publisher", ""),
                "isbn": metadata.get("isbn", ""),
                "oclc": metadata.get("oclc", ""),
                "lccn": metadata.get("lccn", ""),
                "openlibrary_id": metadata.get("olid", ""),
                "pages": metadata.get("pages", ""),
                "language": metadata.get("language", ""),
                "source": "openlibrary",
                "columns": result.columns,
                "raw_data": metadata,
            })

        logger.info(f"[search-modal] Found {len(formatted_results)} OpenLibrary results")
        return formatted_results
    except Exception as e:
        # exc_info already records the traceback in the log; no separate
        # print to stderr is needed.
        logger.error(f"[search-modal] OpenLibrary search error: {e}", exc_info=True)
        return []
async def _search_soulseek(self, query: str) -> List[dict]:
    """Search Soulseek for music with automatic worker tracking.

    When ``self.app_instance`` offers ``create_worker``, a worker is created
    and stored in ``self.current_worker`` so the caller can finish it. If
    the search fails, the worker is finished with an "error" status here and
    ``self.current_worker`` is cleared, so a failure is not later reported
    as a completed search.

    Args:
        query: The search text passed to the provider.

    Returns:
        One dictionary per provider result with display fields, a
        "columns" list of (header, value) pairs and the raw result data.
        An empty list is returned when the provider is unavailable or when
        anything raises during the search.
    """
    worker = None
    # Drop any handle left from an earlier search so nobody finishes it twice.
    self.current_worker = None
    try:
        from helper.search_provider import get_provider

        # Create worker for tracking
        if self.app_instance and hasattr(self.app_instance, 'create_worker'):
            worker = self.app_instance.create_worker(
                'soulseek',
                title=f"Soulseek Search: {query[:40]}",
                description="Searching P2P network for music",
            )
        self.current_worker = worker
        if worker:
            worker.log_step("Connecting to Soulseek peer network...")

        logger.info(f"[search-modal] Searching Soulseek for: {query}")
        provider = get_provider("soulseek")
        if not provider:
            logger.error("[search-modal] Soulseek provider not available")
            if worker:
                worker.finish("error", "Soulseek provider not available")
                self.current_worker = None
            return []

        search_results = provider.search(query, limit=20) or []
        if worker:
            worker.log_step(f"Search returned {len(search_results)} results")
        logger.info(f"[search-modal] Found {len(search_results)} Soulseek results")

        # Format results for display
        formatted_results = []
        for idx, result in enumerate(search_results):
            metadata = result.full_metadata or {}
            artist = metadata.get('artist', '')
            album = metadata.get('album', '')
            title = result.title
            track_num = metadata.get('track_num', '')
            size_bytes = result.size_bytes or 0

            # Human-readable size; ">=" so exact unit boundaries use the
            # larger unit (1 MiB shows as "1.0 MB", not "1024.0 KB").
            if size_bytes >= 1024 * 1024:
                size_str = f"{size_bytes / (1024 * 1024):.1f} MB"
            elif size_bytes >= 1024:
                size_str = f"{size_bytes / 1024:.1f} KB"
            else:
                size_str = f"{size_bytes} B"

            # Build columns for display
            columns = [
                ("#", str(idx + 1)),
                ("Title", title[:50] if title else "Unknown"),
                ("Artist", artist[:30] if artist else "(no artist)"),
                ("Album", album[:30] if album else ""),
            ]
            formatted_results.append({
                "title": title if title else "Unknown",
                "artist": artist if artist else "(no artist)",
                "album": album,
                "track": track_num,
                "filesize": size_str,
                "bitrate": "",  # Not available in Soulseek results
                "source": "soulseek",
                "columns": columns,
                "raw_data": result.to_dict(),
            })
        return formatted_results
    except Exception as e:
        logger.error(f"[search-modal] Soulseek search error: {e}", exc_info=True)
        if worker:
            worker.finish("error", f"Search failed: {str(e)}")
            self.current_worker = None
        return []
async def _perform_search(self) -> None:
    """Run the query against the selected source and fill the results table.

    Reads the query from ``self.search_input`` and the source from
    ``self.source_select``, dispatches to ``_search_openlibrary`` or
    ``_search_soulseek``, stores the list in ``self.current_results`` and
    rebuilds the table.

    When the first result carries a non-empty "columns" list of
    (header, value) pairs, the table columns are rebuilt from those headers.
    Otherwise the default four-column layout is used. Every result produces
    exactly one row, so a table row index is always a valid index into
    ``self.current_results``.

    ``self.is_searching`` is True while the search runs and is always reset
    afterwards. A worker stored in ``self.current_worker`` by the search is
    finished here and then cleared.
    """
    table = self.results_table
    if self.search_input is None or self.source_select is None or table is None:
        logger.error("[search-modal] Widgets not initialized")
        return
    if self.is_searching:
        # Several triggers (button, Enter, binding) can request a search;
        # only one runs at a time.
        logger.warning("[search-modal] Search already in progress")
        return

    query = self.search_input.value.strip()
    if not query:
        logger.warning("[search-modal] Empty search query")
        return
    source = self.source_select.value

    default_headers = ("Title", "Author/Artist", "Year/Album", "Details")

    def default_row(result: dict) -> list:
        """Format one result for the default four-column layout."""
        if source == "openlibrary":
            year = str(result.get("year") or "")[:4]
            isbn = result.get("isbn")
            details = f"ISBN: {isbn}" if isbn else ""
            if result.get("openlibrary_id"):
                details += f" | OL: {result.get('openlibrary_id')}"
            return [
                str(result.get("title") or "")[:60],
                str(result.get("author") or "")[:35],
                year,
                details[:40],
            ]
        # soulseek
        return [
            str(result.get("title") or "")[:50],
            str(result.get("artist") or "")[:30],
            str(result.get("album") or "")[:30],
            str(result.get("filesize") or ""),
        ]

    # Clear existing results
    table.clear()
    self.current_results = []
    # A handle left over from a previous search must never be finished again.
    self.current_worker = None
    self.is_searching = True
    try:
        if source == "openlibrary":
            results = await self._search_openlibrary(query)
        elif source == "soulseek":
            results = await self._search_soulseek(query)
        else:
            logger.warning(f"[search-modal] Unknown source: {source}")
            return
        self.current_results = results

        first_columns = results[0].get("columns") if results else None
        if first_columns:
            headers = [str(col[0]) for col in first_columns]
            logger.info(f"[search-modal] Using dynamic columns: {headers}")
        else:
            headers = list(default_headers)
            if results:
                logger.info("[search-modal] No dynamic columns found, using default formatting")

        # Rebuild the columns so the headers match the rows that follow,
        # whatever layout the previous search left behind.
        table.clear(columns=True)
        table.add_columns(*headers)
        width = len(headers)

        if not results:
            # Add a "no results" message
            table.add_row("No results found", "", "", "")
        for result in results:
            columns = result.get("columns") if first_columns else None
            if columns:
                cells = [col[1] for col in columns]
            else:
                if first_columns:
                    logger.warning(f"[search-modal] Result missing columns field: {result.get('title', 'Unknown')}")
                # Still emit a row: skipping it would shift every later row
                # out of step with self.current_results.
                cells = default_row(result)
            # Pad or trim so each row has exactly one cell per column.
            cells = (list(cells) + [""] * width)[:width]
            table.add_row(*cells)

        # Finish worker if tracking
        if self.current_worker:
            self.current_worker.finish("completed", f"Found {len(results)} results")
            self.current_worker = None
    except Exception as e:
        logger.error(f"[search-modal] Search error: {e}", exc_info=True)
        if self.current_worker:
            self.current_worker.finish("error", f"Search failed: {str(e)}")
            self.current_worker = None
    finally:
        self.is_searching = False
def on_button_pressed(self, event: Button.Pressed) -> None:
    """Handle presses of the modal's buttons, dispatching on the button id.

    - "search-button": start a search unless one is already running.
    - "select-button": copy the highlighted result's metadata into the tags
      editor.
    - "download-button": start a download for the highlighted result
      (OpenLibrary results only).
    - "submit-button": attach the tags text and library source to the
      highlighted result, post ``SearchSelected`` and dismiss with it.
    - "cancel-button": dismiss with ``None``.

    Args:
        event: The button press event; only ``event.button.id`` is read.
    """

    def spawn(coro) -> None:
        """Schedule *coro* and keep a strong reference until it finishes."""
        # The event loop only holds weak references to tasks, so a task
        # nobody references may be garbage-collected before it completes.
        pending = getattr(self, "_search_modal_tasks", None)
        if pending is None:
            pending = self._search_modal_tasks = set()
        task = asyncio.create_task(coro)
        pending.add(task)
        task.add_done_callback(pending.discard)

    def selected_result() -> Optional[dict]:
        """Return the result under the table cursor, or None."""
        table = self.results_table
        if table is None or not self.current_results or table.row_count <= 0:
            return None
        row = table.cursor_row
        if 0 <= row < len(self.current_results):
            return self.current_results[row]
        return None

    button_id = event.button.id
    if button_id == "search-button":
        if self.is_searching:
            logger.warning("[search-modal] Search already in progress")
        else:
            # Run search asynchronously
            spawn(self._perform_search())
    elif button_id == "select-button":
        result = selected_result()
        if result is None:
            logger.warning("[search-modal] No results to select")
        else:
            # Populate tags textarea with result metadata
            self._populate_tags_from_result(result)
    elif button_id == "download-button":
        result = selected_result()
        if result is None:
            logger.warning("[search-modal] No result selected for download")
        elif result.get("source") == "openlibrary":
            spawn(self._download_book(result))
        else:
            logger.warning("[search-modal] Download only supported for OpenLibrary results")
    elif button_id == "submit-button":
        result = selected_result()
        if result is None:
            logger.warning("[search-modal] No result selected for submission")
        else:
            # Add tags and library source (if OpenLibrary) to the result
            result["tags_text"] = self.tags_textarea.text if self.tags_textarea else ""
            result["library_source"] = (
                self.library_source_select.value if self.library_source_select else "local"
            )
            # Post message and dismiss
            self.post_message(self.SearchSelected(result))
            self.dismiss(result)
    elif button_id == "cancel-button":
        self.dismiss(None)
def _populate_tags_from_result(self, result: dict) -> None:
    """Populate the tags textarea from a selected result.

    OpenLibrary results contribute title, author and year. Any other source
    is treated as Soulseek and contributes artist, album, "Track N" and
    title. Empty values are skipped and the rest are written one per line.
    Values are converted with ``str`` so a numeric year or track number
    cannot make the join fail.

    Args:
        result: A result dictionary as stored in ``self.current_results``.
    """
    if self.tags_textarea is None:
        return

    # Format tags based on result source
    if result.get("source") == "openlibrary":
        candidates = [
            result.get("title", ""),
            result.get("author", ""),
            result.get("year", ""),
        ]
    else:  # soulseek
        track = result.get("track")
        candidates = [
            result.get("artist"),
            result.get("album"),
            f"Track {track}" if track else None,
            result.get("title"),
        ]

    self.tags_textarea.text = "\n".join(str(value) for value in candidates if value)
    logger.info("[search-modal] Populated tags textarea from result")
async def _download_book(self, result: dict) -> None:
    """Download a book from OpenLibrary using unified downloader.

    Asks the downloader for the download options of ``result`` and uses the
    first listed method. The outcome is only logged. The downloader is
    closed on every path, including when no method is available and when
    the download raises.

    Args:
        result: The selected result dictionary.
    """
    downloader = None
    try:
        from helper.unified_book_downloader import UnifiedBookDownloader
        from config import load_config

        logger.info(f"[search-modal] Starting download for: {result.get('title')}")
        config = load_config()
        downloader = UnifiedBookDownloader(config=config)

        # Get download options for this book
        options = downloader.get_download_options(result)
        if not options['methods']:
            logger.warning(f"[search-modal] No download methods available for: {result.get('title')}")
            # Could show a modal dialog here
            return

        # For now, use the first available method (we could show a dialog to choose)
        method = options['methods'][0]
        logger.info(f"[search-modal] Using download method: {method.get('label')}")

        # Perform the download
        success, message = await downloader.download_book(method)
        if success:
            logger.info(f"[search-modal] Download successful: {message}")
            # Could show success dialog
        else:
            logger.warning(f"[search-modal] Download failed: {message}")
            # Could show error dialog
    except Exception as e:
        logger.error(f"[search-modal] Download error: {e}", exc_info=True)
    finally:
        # Release the downloader even on early return or failure.
        if downloader is not None:
            try:
                downloader.close()
            except Exception as e:
                logger.warning(f"[search-modal] Downloader close failed: {e}", exc_info=True)
def action_search_focused(self) -> None:
    """Action for Enter key - only search if search input is focused.

    Does nothing when the input is missing or unfocused, or when a search
    is already running. Otherwise schedules ``_perform_search`` and keeps a
    reference to the task until it finishes.
    """
    if not (self.search_input and self.search_input.has_focus) or self.is_searching:
        return

    task = asyncio.create_task(self._perform_search())
    # The event loop only holds weak references to tasks; keep a strong one
    # so the search cannot be garbage-collected mid-flight.
    pending = getattr(self, "_search_modal_tasks", None)
    if pending is None:
        pending = self._search_modal_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)
def action_scrape_tags(self) -> None:
    """Action for Ctrl+T - fill the tags editor from the highlighted result.

    Logs a warning when there is nothing to scrape from or when the cursor
    row does not map to a stored result. Any exception raised while
    populating is logged rather than propagated.
    """
    table = self.results_table
    has_rows = bool(self.current_results) and bool(table) and table.row_count > 0
    if not has_rows:
        logger.warning("[search-modal] Ctrl+T: No results selected")
        return

    try:
        selected_row = table.cursor_row
        if not 0 <= selected_row < len(self.current_results):
            logger.warning(f"[search-modal] Ctrl+T: Invalid row index {selected_row}")
            return
        self._populate_tags_from_result(self.current_results[selected_row])
        logger.info(f"[search-modal] Ctrl+T: Populated tags from result at row {selected_row}")
    except Exception as e:
        logger.error(f"[search-modal] Ctrl+T error: {e}")
def action_cancel(self) -> None:
    """Action for Escape key - dismiss the modal without a result."""
    no_result = None
    self.dismiss(no_result)
def on_input_submitted(self, event: Input.Submitted) -> None:
    """Handle Enter key in search input - only trigger search here.

    Ignores submissions from any other input and does nothing while a
    search is already running. Otherwise schedules ``_perform_search`` and
    keeps a reference to the task until it finishes.

    Args:
        event: The submit event; only ``event.input.id`` is read.
    """
    if event.input.id != "search-input" or self.is_searching:
        return

    task = asyncio.create_task(self._perform_search())
    # The event loop only holds weak references to tasks; keep a strong one
    # so the search cannot be garbage-collected mid-flight.
    pending = getattr(self, "_search_modal_tasks", None)
    if pending is None:
        pending = self._search_modal_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)