"""Search modal screen for OpenLibrary and Soulseek.""" from textual.app import ComposeResult from textual.screen import ModalScreen from textual.containers import Container, Horizontal, Vertical from textual.widgets import Static, Button, Input, Select, DataTable, TextArea from textual.binding import Binding from textual.message import Message import logging from typing import Optional, Any, List from pathlib import Path import sys import asyncio # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from config import load_config from result_table import ResultTable from helper.search_provider import get_provider logger = logging.getLogger(__name__) class SearchModal(ModalScreen): """Modal screen for searching OpenLibrary and Soulseek.""" BINDINGS = [ Binding("escape", "cancel", "Cancel"), Binding("enter", "search_focused", "Search"), Binding("ctrl+t", "scrape_tags", "Scrape Tags"), ] CSS_PATH = "search.tcss" class SearchSelected(Message): """Posted when user selects a search result.""" def __init__(self, result: dict) -> None: self.result = result super().__init__() def __init__(self, app_instance=None): """Initialize the search modal. Args: app_instance: Reference to the main App instance for worker creation """ super().__init__() self.app_instance = app_instance self.source_select: Optional[Select] = None self.search_input: Optional[Input] = None self.results_table: Optional[DataTable] = None self.tags_textarea: Optional[TextArea] = None self.library_source_select: Optional[Select] = None self.current_results: List[Any] = [] # List of SearchResult objects self.current_result_table: Optional[ResultTable] = None self.is_searching = False self.current_worker = None # Track worker for search operations def compose(self) -> ComposeResult: """Create child widgets for the search modal.""" with Vertical(id="search-container"): yield Static("Search Books & Music", id="search-title") with Horizontal(id="search-controls"): # Source selector self.source_select = Select( [("OpenLibrary", "openlibrary"), ("Soulseek", "soulseek")], value="openlibrary", id="source-select" ) yield self.source_select # Search input self.search_input = Input( placeholder="Enter search query...", id="search-input" ) yield self.search_input # Search button yield Button("Search", id="search-button", variant="primary") # Results table self.results_table = DataTable(id="results-table") yield self.results_table # Two-column layout: tags on left, source/submit on right with Horizontal(id="bottom-controls"): # Left column: Tags textarea with Vertical(id="tags-column"): self.tags_textarea = TextArea( text="", id="result-tags-textarea", read_only=False ) self.tags_textarea.border_title = "Tags [Ctrl+T: Scrape]" yield self.tags_textarea # Right column: Library source and submit button with Vertical(id="source-submit-column"): # Library source selector (for OpenLibrary results) self.library_source_select = Select( [("Local", "local"), ("Download", "download")], value="local", id="library-source-select" ) yield self.library_source_select # Submit button yield Button("Submit", id="submit-button", variant="primary") # Buttons at bottom with Horizontal(id="search-buttons"): yield Button("Select", id="select-button", variant="primary") yield Button("Download", id="download-button", variant="primary") yield Button("Cancel", id="cancel-button", variant="default") def on_mount(self) -> None: """Set up the table columns and focus.""" # Set up results table columns self.results_table.add_columns( "Title", "Author/Artist", "Year/Album", "Details" ) # Focus on search input self.search_input.focus() async def _perform_search(self) -> None: """Perform the actual search based on selected source.""" if not self.search_input or not self.source_select or not self.results_table: logger.error("[search-modal] Widgets not initialized") return query = self.search_input.value.strip() if not query: logger.warning("[search-modal] Empty search query") return source = self.source_select.value # Clear existing results self.results_table.clear(columns=True) self.current_results = [] self.current_result_table = None self.is_searching = True # Create worker for tracking if self.app_instance and hasattr(self.app_instance, 'create_worker'): self.current_worker = self.app_instance.create_worker( source, title=f"{source.capitalize()} Search: {query[:40]}", description=f"Searching {source} for: {query}" ) self.current_worker.log_step(f"Connecting to {source}...") try: provider = get_provider(source) if not provider: logger.error(f"[search-modal] Provider not available: {source}") if self.current_worker: self.current_worker.finish("error", f"Provider not available: {source}") return logger.info(f"[search-modal] Searching {source} for: {query}") results = provider.search(query, limit=20) self.current_results = results if self.current_worker: self.current_worker.log_step(f"Found {len(results)} results") # Create ResultTable table = ResultTable(f"Search Results: {query}") for res in results: row = table.add_row() # Add columns from result.columns if res.columns: for name, value in res.columns: row.add_column(name, value) else: # Fallback if no columns defined row.add_column("Title", res.title) row.add_column("Target", res.target) self.current_result_table = table # Populate UI if table.rows: # Add headers headers = [col.name for col in table.rows[0].columns] self.results_table.add_columns(*headers) # Add rows for row_vals in table.to_datatable_rows(): self.results_table.add_row(*row_vals) else: self.results_table.add_columns("Message") self.results_table.add_row("No results found") # Finish worker if self.current_worker: self.current_worker.finish("completed", f"Found {len(results)} results") except Exception as e: logger.error(f"[search-modal] Search error: {e}", exc_info=True) if self.current_worker: self.current_worker.finish("error", f"Search failed: {str(e)}") finally: self.is_searching = False def on_button_pressed(self, event: Button.Pressed) -> None: """Handle button presses.""" button_id = event.button.id if button_id == "search-button": # Run search asynchronously asyncio.create_task(self._perform_search()) elif button_id == "select-button": # Get selected row and populate tags textarea if self.results_table and self.results_table.row_count > 0: selected_row = self.results_table.cursor_row if 0 <= selected_row < len(self.current_results): result = self.current_results[selected_row] # Populate tags textarea with result metadata self._populate_tags_from_result(result) else: logger.warning("[search-modal] No results to select") elif button_id == "download-button": # Download the selected result if self.current_results and self.results_table.row_count > 0: selected_row = self.results_table.cursor_row if 0 <= selected_row < len(self.current_results): result = self.current_results[selected_row] if result.get("source") == "openlibrary": asyncio.create_task(self._download_book(result)) else: logger.warning("[search-modal] Download only supported for OpenLibrary results") else: logger.warning("[search-modal] No result selected for download") elif button_id == "submit-button": # Submit the current result with tags and source if self.current_results and self.results_table.row_count > 0: selected_row = self.results_table.cursor_row if 0 <= selected_row < len(self.current_results): result = self.current_results[selected_row] # Convert to dict if needed for submission if hasattr(result, 'to_dict'): result_dict = result.to_dict() else: result_dict = result # Get tags from textarea tags_text = self.tags_textarea.text if self.tags_textarea else "" # Get library source (if OpenLibrary) library_source = self.library_source_select.value if self.library_source_select else "local" # Add tags and source to result result_dict["tags_text"] = tags_text result_dict["library_source"] = library_source # Post message and dismiss self.post_message(self.SearchSelected(result_dict)) self.dismiss(result_dict) else: logger.warning("[search-modal] No result selected for submission") elif button_id == "cancel-button": self.dismiss(None) def _populate_tags_from_result(self, result: Any) -> None: """Populate the tags textarea from a selected result.""" if not self.tags_textarea: return # Handle both SearchResult objects and dicts if hasattr(result, 'full_metadata'): metadata = result.full_metadata or {} source = result.origin title = result.title else: # Handle dict (legacy or from to_dict) if 'full_metadata' in result: metadata = result['full_metadata'] or {} elif 'raw_data' in result: metadata = result['raw_data'] or {} else: metadata = result source = result.get('origin', result.get('source', '')) title = result.get('title', '') # Format tags based on result source if source == "openlibrary": # For OpenLibrary: title, author, year author = ", ".join(metadata.get("authors", [])) if isinstance(metadata.get("authors"), list) else metadata.get("authors", "") year = str(metadata.get("year", "")) tags = [] if title: tags.append(title) if author: tags.append(author) if year: tags.append(year) tags_text = "\n".join(tags) elif source == "soulseek": # For Soulseek: artist, album, title, track tags = [] if metadata.get("artist"): tags.append(metadata["artist"]) if metadata.get("album"): tags.append(metadata["album"]) if metadata.get("track_num"): tags.append(f"Track {metadata['track_num']}") if title: tags.append(title) tags_text = "\n".join(tags) else: # Generic fallback tags = [title] tags_text = "\n".join(tags) self.tags_textarea.text = tags_text logger.info(f"[search-modal] Populated tags textarea from result") async def _download_book(self, result: Any) -> None: """Download a book from OpenLibrary using unified downloader.""" try: from helper.unified_book_downloader import UnifiedBookDownloader from config import load_config # Convert SearchResult to dict if needed if hasattr(result, 'to_dict'): result_dict = result.to_dict() # Ensure raw_data is populated for downloader if 'raw_data' not in result_dict and result.full_metadata: result_dict['raw_data'] = result.full_metadata else: result_dict = result logger.info(f"[search-modal] Starting download for: {result_dict.get('title')}") config = load_config() downloader = UnifiedBookDownloader(config=config) # Get download options for this book options = downloader.get_download_options(result_dict) if not options['methods']: logger.warning(f"[search-modal] No download methods available for: {result_dict.get('title')}") # Could show a modal dialog here return # For now, use the first available method (we could show a dialog to choose) method = options['methods'][0] logger.info(f"[search-modal] Using download method: {method.get('label')}") # Perform the download success, message = await downloader.download_book(method) if success: logger.info(f"[search-modal] Download successful: {message}") # Could show success dialog else: logger.warning(f"[search-modal] Download failed: {message}") # Could show error dialog downloader.close() except Exception as e: logger.error(f"[search-modal] Download error: {e}", exc_info=True) def action_search_focused(self) -> None: """Action for Enter key - only search if search input is focused.""" if self.search_input and self.search_input.has_focus and not self.is_searching: asyncio.create_task(self._perform_search()) def action_scrape_tags(self) -> None: """Action for Ctrl+T - populate tags from selected result.""" if self.current_results and self.results_table and self.results_table.row_count > 0: try: selected_row = self.results_table.cursor_row if 0 <= selected_row < len(self.current_results): result = self.current_results[selected_row] self._populate_tags_from_result(result) logger.info(f"[search-modal] Ctrl+T: Populated tags from result at row {selected_row}") else: logger.warning(f"[search-modal] Ctrl+T: Invalid row index {selected_row}") except Exception as e: logger.error(f"[search-modal] Ctrl+T error: {e}") else: logger.warning("[search-modal] Ctrl+T: No results selected") def action_cancel(self) -> None: """Action for Escape key - close modal.""" self.dismiss(None) def on_input_submitted(self, event: Input.Submitted) -> None: """Handle Enter key in search input - only trigger search here.""" if event.input.id == "search-input": if not self.is_searching: asyncio.create_task(self._perform_search())