"""Download request modal screen for initiating new downloads. This modal allows users to specify: - URL or search query (paragraph) - Tags to apply - Source (Hydrus, local, AllDebrid, etc.) - Actions (download, screenshot) """ from textual.app import ComposeResult from textual.screen import ModalScreen from textual.containers import Container, Horizontal, Vertical, ScrollableContainer from textual.widgets import ( Static, Button, Label, Select, Checkbox, TextArea, ProgressBar, Tree, Input, ) from textual.binding import Binding from textual import work import logging from typing import Optional, Callable, Any from pathlib import Path import sys from SYS.logger import log import json # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) # Import cmdlet system to call get-tag try: from cmdlet import get as get_cmdlet except ImportError: get_cmdlet = None # Import tag processing helpers try: from SYS.metadata import expand_tag_lists, process_tags_from_string except ImportError: expand_tag_lists = None process_tags_from_string = None logger = logging.getLogger(__name__) class DownloadModal(ModalScreen): """Modal screen for initiating new download requests.""" BINDINGS = [ Binding("escape", "cancel", "Cancel"), Binding("ctrl+enter", "submit", "Submit"), ] CSS_PATH = "download.tcss" def __init__( self, on_submit: Optional[Callable[[dict], None]] = None, available_sources: Optional[list] = None, config: Optional[dict] = None, ): """Initialize the download modal. Args: on_submit: Callback function that receives download request dict available_sources: List of available source names (e.g., ['hydrus', 'local', 'alldebrid']) config: Configuration dict with download settings """ super().__init__() self.on_submit = on_submit self.available_sources = available_sources or ["hydrus", "local", "alldebrid"] self.config = config or {} # UI Component references self.paragraph_textarea: TextArea = None # type: ignore self.tags_textarea: TextArea = None # type: ignore self.source_select: Select = None # type: ignore self.files_select: Select = None # type: ignore self.download_checkbox: Checkbox = None # type: ignore self.screenshot_checkbox: Checkbox = None # type: ignore self.progress_bar: ProgressBar = None # type: ignore self.selected_files: set = set() # Track selected files # Playlist support self.playlist_tree: Tree = None # type: ignore self.playlist_input: Input = None # type: ignore self.playlist_merge_checkbox: Checkbox = None # type: ignore self.is_playlist: bool = False # Track if current URL is a playlist self.playlist_items: list = [] # Store playlist items def compose(self) -> ComposeResult: """Compose the download request modal.""" yield Vertical( # Title Static("šŸ“„ New Download Request", id="download_title"), # Main layout: Horizontal split into left and right columns Horizontal( # Left column: URL (top) and Tags (bottom) Vertical( Container( TextArea( id="paragraph_textarea", language="", show_line_numbers=True, ), id="url_container", classes="grid_container", ), Container( TextArea( id="tags_textarea", language="", show_line_numbers=True, ), id="tags_container", classes="grid_container", ), id="left_column", ), # Right column: Files/Playlist Vertical( # Formats Select (for single files) Container( Select( id="files_select", options=[], # Populated dynamically ), id="files_container", classes="grid_container", ), # Playlist Tree + Input + Merge (for playlists) Container( Vertical( Tree( "Playlist", id="playlist_tree", ), Horizontal( Input( placeholder="Track selection (e.g., 1-3, all, merge, 1 5 8)", id="playlist_input", ), Checkbox( label="Merge", id="playlist_merge_checkbox", value=False, ), id="playlist_input_row", ), ), id="playlist_container", classes="grid_container", ), id="right_column", ), id="main_layout", ), # Footer: All on one row - Checkboxes left, Source middle, Buttons right Horizontal( # Left: Checkboxes Container( Checkbox(label="Download", id="download_checkbox"), Checkbox(label="Screenshot", id="screenshot_checkbox"), id="checkbox_row", ), # Middle: Source selector Select(id="source_select", options=self._build_source_options()), # Progress bar (shown during download) ProgressBar(id="progress_bar"), # Right: Buttons Horizontal( Button("Cancel", id="cancel_btn", variant="default"), Button("Submit", id="submit_btn", variant="primary"), id="button_row", ), id="footer_layout", classes="modal_footer", ), id="download_modal", classes="modal_vertical", ) def _build_source_options(self) -> list[tuple[str, str]]: """Build source select options. Returns: List of (label, value) tuples for Select widget """ source_icons = { "hydrus": "šŸ—ƒļø Hydrus", "local": "šŸ“ Local", "alldebrid": "ā˜ļø AllDebrid", "debrid": "ā˜ļø Debrid", "soulseek": "šŸŽµ Soulseek", "libgen": "šŸ“š LibGen", } options = [] for source in self.available_sources: label = source_icons.get(source.lower(), source) options.append((label, source)) return options def on_mount(self) -> None: """Called when the modal is mounted.""" # Get references to widgets self.paragraph_textarea = self.query_one("#paragraph_textarea", TextArea) self.tags_textarea = self.query_one("#tags_textarea", TextArea) self.source_select = self.query_one("#source_select", Select) self.files_select = self.query_one("#files_select", Select) self.download_checkbox = self.query_one("#download_checkbox", Checkbox) self.screenshot_checkbox = self.query_one("#screenshot_checkbox", Checkbox) self.progress_bar = self.query_one("#progress_bar", ProgressBar) self.playlist_tree = self.query_one("#playlist_tree", Tree) self.playlist_input = self.query_one("#playlist_input", Input) self.playlist_merge_checkbox = self.query_one( "#playlist_merge_checkbox", Checkbox ) # Set default actions self.download_checkbox.value = True self.screenshot_checkbox.value = False self.playlist_merge_checkbox.value = False # Initialize PDF playlist url (set by _handle_pdf_playlist) self.pdf_url = [] self.is_pdf_playlist = False # Hide playlist by default (show format select) self._show_format_select() # Focus on tags textarea self.tags_textarea.focus() logger.debug("Download modal mounted") def action_submit(self) -> None: """Submit the download request by executing cmdlet pipeline in background.""" # Validate and get values first (on main thread) url = self.paragraph_textarea.text.strip() tags_str = self.tags_textarea.text.strip() source = self.source_select.value or "local" download_enabled = self.download_checkbox.value merge_enabled = self.playlist_merge_checkbox.value if self.is_playlist else False if not url: logger.warning("Download request missing URL") self.app.notify( "URL is required", title="Missing Input", severity="warning" ) return # Parse tags (one per line) tags = [] if tags_str: tags = [tag.strip() for tag in tags_str.split("\n") if tag.strip()] # Get playlist selection if this is a playlist playlist_selection = "" if self.is_playlist and not self.is_pdf_playlist: # Regular playlist (non-PDF) playlist_selection = self.playlist_input.value.strip() if not playlist_selection: # No selection provided - default to downloading all tracks playlist_selection = f"1-{len(self.playlist_items)}" logger.info( f"No selection provided, defaulting to all tracks: {playlist_selection}" ) elif self.is_playlist and self.is_pdf_playlist: # PDF playlist - handle selection playlist_selection = self.playlist_input.value.strip() if not playlist_selection: # No selection provided - default to all PDFs playlist_selection = f"1-{len(self.playlist_items)}" logger.info( f"PDF playlist: no selection provided, defaulting to all PDFs: {playlist_selection}" ) merge_enabled = True # Always merge PDFs if multiple selected # Launch the background worker with PDF playlist info self._submit_worker( url, tags, source, download_enabled, playlist_selection, merge_enabled, is_pdf_playlist=self.is_pdf_playlist, pdf_url=self.pdf_url if self.is_pdf_playlist else [], ) @work(thread=True) def _submit_worker( self, url: str, tags: list, source: str, download_enabled: bool, playlist_selection: str = "", merge_enabled: bool = False, is_pdf_playlist: bool = False, pdf_url: Optional[list] = None, ) -> None: """Background worker to execute the cmdlet pipeline. Args: url: URL to download tags: List of tags to apply source: Source for metadata download_enabled: Whether to download the file playlist_selection: Playlist track selection (e.g., "1-3", "all", "merge") merge_enabled: Whether to merge playlist files after download is_pdf_playlist: Whether this is a PDF pseudo-playlist pdf_url: List of PDF url if is_pdf_playlist is True """ if pdf_url is None: pdf_url = [] # Initialize worker to None so outer exception handler can check it worker = None try: # Show progress bar on main thread self.app.call_from_thread(self._show_progress) logger.info( f"Building cmdlet pipeline: URL={url}, tags={len(tags)}, source={source}, download={download_enabled}, playlist_selection={playlist_selection}" ) # Create a worker instance using the app's helper method worker = None try: if hasattr(self.app, "create_worker"): worker = self.app.create_worker( "download", title=f"Download: {url[:50]}", description=f"Tags: {', '.join(tags) if tags else 'None'}", ) else: # Fallback if helper not available import uuid from SYS.worker_manager import Worker worker_id = f"dl_{uuid.uuid4().hex[:8]}" worker = Worker( worker_id, "download", f"Download: {url[:50]}", f"Tags: {', '.join(tags) if tags else 'None'}", None, ) except Exception as e: logger.error(f"Error creating worker: {e}") worker = None # Log initial step if worker: worker.log_step("Download initiated") # Handle PDF playlist specially if is_pdf_playlist and pdf_url: logger.info(f"Processing PDF playlist with {len(pdf_url)} PDFs") self._handle_pdf_playlist_download( pdf_url, tags, playlist_selection, merge_enabled ) self.app.call_from_thread(self._hide_progress) self.app.call_from_thread(self.dismiss) return # Build the cmdlet pipeline # Start with URL as initial object result_obj = self._create_url_result(url) # Import cmdlet system if not get_cmdlet: logger.error("cmdlet module not available") self.app.call_from_thread( self.app.notify, "cmdlet system unavailable", title="Error", severity="error" ) self.app.call_from_thread(self._hide_progress) return # Stage 1: Download if enabled download_succeeded = False download_stderr_text = "" # Store for merge stage if download_enabled: download_cmdlet_name = "download-media" if self.is_playlist else "download-file" download_cmdlet = get_cmdlet(download_cmdlet_name) if download_cmdlet: logger.info(f"šŸ“„ Executing {download_cmdlet_name} stage") logger.info(f"download_cmdlet object: {download_cmdlet}") logger.info(f"result_obj: {result_obj}") # Log step to worker if worker: worker.log_step(f"Starting {download_cmdlet_name} stage...") # Build arguments for download-media (yt-dlp) playlists; download-file takes no yt-dlp args. cmdlet_args = [] if download_cmdlet_name == "download-media" and self.is_playlist: # Always use yt-dlp's native --playlist-items for playlists if playlist_selection: # User provided specific selection ytdlp_selection = self._convert_selection_to_ytdlp( playlist_selection ) logger.info( f"Playlist with user selection: {playlist_selection} → {ytdlp_selection}" ) else: # No selection provided, download all ytdlp_selection = f"1-{len(self.playlist_items)}" logger.info( f"Playlist mode: downloading all {len(self.playlist_items)} items" ) cmdlet_args = ["--playlist-items", ytdlp_selection] logger.info(f"Built cmdlet_args: {cmdlet_args}") logger.info( f"About to call download_cmdlet({result_obj}, {cmdlet_args}, {type(self.config).__name__})" ) if worker: worker.append_stdout(f"šŸ“„ Downloading from: {url}\n") if cmdlet_args: worker.append_stdout(f" Args: {cmdlet_args}\n") try: # Capture output from the cmdlet using temp files (more reliable than redirect) import tempfile import subprocess # Try normal redirect first import io from contextlib import redirect_stdout, redirect_stderr stdout_buf = io.StringIO() stderr_buf = io.StringIO() # Always capture output try: with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): logger.info(f"Calling download_cmdlet...") cmd_config = ( dict(self.config) if isinstance(self.config, dict) else self.config ) if isinstance(cmd_config, dict): cmd_config["_quiet_background_output"] = True returncode = download_cmdlet( result_obj, cmdlet_args, cmd_config ) logger.info(f"download_cmdlet returned: {returncode}") except Exception as cmdlet_error: # If cmdlet throws an exception, log it logger.error( f"āŒ download-cmdlet exception: {cmdlet_error}", exc_info=True ) if worker: import traceback worker.append_stdout( f"āŒ download-cmdlet exception: {cmdlet_error}\n{traceback.format_exc()}\n" ) returncode = 1 stdout_text = stdout_buf.getvalue() stderr_text = stderr_buf.getvalue() download_stderr_text = stderr_text # Save for merge stage # Log raw output logger.info(f"download-cmdlet returncode: {returncode}") logger.info( f"stdout ({len(stdout_text)} chars): {stdout_text[:200] if stdout_text else '(empty)'}" ) logger.info( f"stderr ({len(stderr_text)} chars): {stderr_text[:200] if stderr_text else '(empty)'}" ) # Always append output to worker for debugging if worker: if stdout_text: worker.append_stdout( f"[{download_cmdlet_name} stdout]\n{stdout_text}\n" ) if stderr_text: worker.append_stdout( f"[{download_cmdlet_name} stderr]\n{stderr_text}\n" ) # Log the output so it gets captured by WorkerLoggingHandler if stdout_text: logger.info( f"[{download_cmdlet_name} output]\n{stdout_text}" ) if stderr_text: logger.info( f"[{download_cmdlet_name} stderr]\n{stderr_text}" ) if returncode != 0: download_failed_msg = f"āŒ {download_cmdlet_name} stage failed with code {returncode}\nstdout: {stdout_text}\nstderr: {stderr_text}" logger.error(download_failed_msg) if worker: worker.append_stdout(f"\n{download_failed_msg}\n") worker.finish( "error", "Download stage failed - see logs above for details" ) # Log to stderr as well so it shows in terminal log(f"Return code: {returncode}", file=sys.stderr) log(f"stdout:\n{stdout_text}", file=sys.stderr) log(f"stderr:\n{stderr_text}", file=sys.stderr) # Extract error reason from stderr/stdout for user notification # Try to extract meaningful error from yt-dlp output error_reason = "Unknown error" # Search for yt-dlp error patterns (case-insensitive) error_text = (stderr_text + "\n" + stdout_text).lower() # Look for specific error keywords in priority order if "http error 403" in error_text or "error 403" in error_text: error_reason = "HTTP 403: Access forbidden (YouTube blocked download, may be georestricted or SABR issue)" elif "http error 401" in error_text or "error 401" in error_text: error_reason = ( "HTTP 401: Authentication required (may need login credentials)" ) elif "http error 404" in error_text or "error 404" in error_text: error_reason = ( "HTTP 404: URL not found (video/content may have been deleted)" ) elif "http error" in error_text: # Extract the actual HTTP error code import re http_match = re.search( r"HTTP Error (\d{3})", stderr_text + stdout_text, re.IGNORECASE ) if http_match: error_reason = f"HTTP Error {http_match.group(1)}: Server returned an error" else: error_reason = "HTTP error from server" elif ("no such file or directory" in error_text or "file not found" in error_text): error_reason = ( "File not found (yt-dlp may not be installed or not in PATH)" ) elif "unable to download" in error_text: error_reason = "Unable to download video (network issue or content unavailable)" elif ("connection" in error_text or "timeout" in error_text or "timed out" in error_text): error_reason = "Network connection failed or timed out" elif "permission" in error_text or "access denied" in error_text: error_reason = ( "Permission denied (may need elevated privileges or login)" ) elif "private video" in error_text or "private" in error_text: error_reason = "Video is private (not accessible)" elif "age restricted" in error_text or "age gate" in error_text: error_reason = "Video is age-restricted and requires login" elif "region restricted" in error_text or "georestrict" in error_text: error_reason = ( "Video is region-restricted (not available in your country)" ) elif "member-only" in error_text or "members only" in error_text: error_reason = "Video is available to members only" # If still unknown, try to extract last line of stderr as it often contains the actual error if error_reason == "Unknown error": stderr_lines = [ line.strip() for line in stderr_text.split("\n") if line.strip() ] if stderr_lines: # Look for error-like lines (usually contain "error", "failed", "ERROR", etc) for line in reversed(stderr_lines): if any(keyword in line.lower() for keyword in [ "error", "failed", "exception", "traceback", "warning", ]): error_reason = line[:150 ] # Limit to 150 chars break # If no error keyword found, use the last line if error_reason == "Unknown error": error_reason = stderr_lines[-1][:150] # Log the extracted error reason for debugging logger.error(f"Extracted error reason: {error_reason}") self.app.call_from_thread( self.app.notify, f"Download failed: {error_reason}", title="Download Error", severity="error", ) # Finish worker with error status try: self.app.call_from_thread( self.app.finish_worker, worker_id, "error", f"Download failed: {error_reason}", ) except Exception: pass # Also append detailed error info to worker stdout for visibility if worker: worker.append_stdout(f"\nāŒ DOWNLOAD FAILED\n") worker.append_stdout(f"Reason: {error_reason}\n") if stderr_text and stderr_text.strip(): worker.append_stdout( f"\nFull error output:\n{stderr_text}\n" ) if stdout_text and stdout_text.strip(): worker.append_stdout( f"\nStandard output:\n{stdout_text}\n" ) # Don't try to tag if download failed self.app.call_from_thread(self._hide_progress) self.app.call_from_thread(self.dismiss) return else: download_succeeded = True # Always log output at INFO level so we can see what happened logger.info( f"{download_cmdlet_name} stage completed successfully" ) if stdout_text: logger.info( f"{download_cmdlet_name} stdout:\n{stdout_text}" ) if stderr_text: logger.info( f"{download_cmdlet_name} stderr:\n{stderr_text}" ) # Log step to worker if worker: worker.log_step( f"Download completed: {len(stdout_text.split('Saved to')) - 1} items downloaded" ) # For playlists with merge enabled, scan the output directory for ALL downloaded files # instead of trying to parse individual "Saved to" lines downloaded_files = [] if self.is_playlist and merge_enabled: # Get output directory from pathlib import Path from SYS.config import resolve_output_dir output_dir = resolve_output_dir(self.config) logger.info( f"Merge enabled: scanning {output_dir} for downloaded files" ) # First, try to extract filenames from download output # Look for patterns like "→ filename.mp3" from yt-dlp output extracted_files = [] for line in stdout_text.split("\n"): if "→" in line: # Extract filename from arrow marker parts = line.split("→") if len(parts) > 1: filename = parts[1].strip() if filename: full_path = output_dir / filename if full_path.exists(): extracted_files.append( str(full_path) ) logger.debug( f"Found downloaded file from output: {filename}" ) if extracted_files: downloaded_files = extracted_files logger.info( f"Found {len(downloaded_files)} downloaded files from output markers" ) else: # Fallback: List all recent mp3/m4a files in output directory if output_dir.exists(): import time current_time = time.time() recent_files = [] for f in (list(output_dir.glob("*.mp3")) + list(output_dir.glob("*.m4a")) + list(output_dir.glob("*.mp4"))): # Files modified in last 30 minutes (extended window) if current_time - f.stat().st_mtime < 1800: recent_files.append( (f, f.stat().st_mtime) ) # Sort by modification time to preserve order recent_files.sort(key=lambda x: x[1]) downloaded_files = [ str(f[0]) for f in recent_files ] logger.info( f"Found {len(downloaded_files)} recently modified files in directory (fallback)" ) if downloaded_files: logger.info( f"Found {len(downloaded_files)} files to merge" ) if downloaded_files: logger.info( f"Files to merge: {downloaded_files[:3]}... (showing first 3)" ) else: # For non-merge or non-playlist, just look for "Saved to" pattern combined_output = stdout_text + "\n" + stderr_text for line in combined_output.split("\n"): if "Saved to" in line: # Extract path after "Saved to " saved_idx = line.find("Saved to") if saved_idx != -1: path = line[saved_idx + 8:].strip() if path: downloaded_files.append(path) logger.debug( f"Found downloaded file: {path}" ) # For merge scenarios, DON'T set to first file yet - merge first, then tag # For non-merge, set to first file for tagging if downloaded_files: if not (self.is_playlist and merge_enabled): # Non-merge case: set to first file for tagging first_file = downloaded_files[0] result_obj.target = first_file result_obj.path = first_file logger.info( f"Set result target/path to first file: {first_file}" ) else: # Merge case: save all files, will set to merged file after merge logger.info( f"Merge enabled - will merge {len(downloaded_files)} files before tagging" ) download_stderr_text = ( f"DOWNLOADED_FILES:{','.join(downloaded_files)}\n" + download_stderr_text ) logger.info( f"{download_cmdlet_name} stage completed successfully" ) except Exception as e: logger.error( f"{download_cmdlet_name} execution error: {e}", exc_info=True ) self.app.call_from_thread( self.app.notify, f"Download error: {e}", title="Download Error", severity="error", ) # Finish worker with error status try: self.app.call_from_thread( self.app.finish_worker, worker_id, "error", f"Download error: {str(e)}", ) except Exception: pass self.app.call_from_thread(self._hide_progress) self.app.call_from_thread(self.dismiss) return # Stage 2: Merge files if enabled and this is a playlist (BEFORE tagging) merged_file_path = None if merge_enabled and download_succeeded and self.is_playlist: merge_cmdlet = get_cmdlet("merge-file") if merge_cmdlet: from pathlib import Path logger.info("Executing merge-file stage") # Log step to worker if worker: worker.log_step("Starting merge-file stage...") merge_args = [ "-delete", "-format", "mka", ] # Delete source files, use MKA for speed (stream copy) and chapters try: # For merge, we pass a list of result objects # The merge-file cmdlet expects objects with 'target' attribute files_to_merge = [] # Check if we have the special marker with downloaded files if download_stderr_text.startswith("DOWNLOADED_FILES:"): # Extract file list from marker files_line = download_stderr_text.split("\n")[0] if files_line.startswith("DOWNLOADED_FILES:"): files_str = files_line[len("DOWNLOADED_FILES:"):] file_list = [ f.strip() for f in files_str.split(",") if f.strip() ] logger.info( f"Found {len(file_list)} downloaded files from marker" ) # Create result objects with proper attributes for filepath in file_list: filepath_obj = Path(filepath) file_result = type( "FileResult", (), { "target": str(filepath), "path": str(filepath), "media_kind": "audio", "hash": None, "url": [], "title": filepath_obj.stem, }, )() files_to_merge.append(file_result) if files_to_merge: logger.info( f"Merging {len(files_to_merge)} files: {[f.target for f in files_to_merge]}" ) # Call merge-file with list of results import io from contextlib import redirect_stdout, redirect_stderr stdout_buf = io.StringIO() stderr_buf = io.StringIO() with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): # Pass the list of file results to merge-file merge_returncode = merge_cmdlet( files_to_merge, merge_args, self.config ) merge_stdout = stdout_buf.getvalue() merge_stderr = stderr_buf.getvalue() # Log the merge output so it gets captured by WorkerLoggingHandler if merge_stdout: logger.info(f"[merge-file output]\n{merge_stdout}") if merge_stderr: logger.info(f"[merge-file stderr]\n{merge_stderr}") if merge_returncode != 0: logger.error( f"merge-file stage failed with code {merge_returncode}" ) logger.error(f" stderr: {merge_stderr}") self.app.call_from_thread( self.app.notify, f"Merge failed: {merge_stderr[:100] if merge_stderr else 'unknown error'}", title="Merge Error", severity="warning", ) # Don't fail entirely - files were downloaded else: logger.info("merge-file stage completed successfully") if merge_stdout: logger.info(f"merge-file stdout: {merge_stdout}") if merge_stderr: logger.info(f"merge-file stderr: {merge_stderr}") # Log step to worker if worker: worker.log_step("Merge completed successfully") # Extract merged file path from stderr # The merge-file cmdlet outputs: "[merge-file] Merged N files into: /path/to/merged.mp3" for line in merge_stderr.split("\n"): if "Merged" in line and "into:" in line: # Extract path after "into: " into_idx = line.find("into:") if into_idx != -1: merged_file_path = line[into_idx + 5:].strip() if merged_file_path: logger.info( f"Detected merged file path: {merged_file_path}" ) break # If not found in stderr, try stdout if not merged_file_path: for line in merge_stdout.split("\n"): if ("merged" in line.lower() or line.endswith(".mp3") or line.endswith(".m4a")): merged_file_path = line.strip() if (merged_file_path and not merged_file_path.startswith("[") ): logger.info( f"Detected merged file path: {merged_file_path}" ) break # If we found the merged file, update result_obj to point to it if merged_file_path: result_obj.target = merged_file_path result_obj.path = merged_file_path logger.info( f"Updated result object to point to merged file: {merged_file_path}" ) else: logger.warning( f"No files found to merge. download_stderr_text length: {len(download_stderr_text)}, content preview: {download_stderr_text[:100]}" ) except Exception as e: logger.error(f"merge-file execution error: {e}", exc_info=True) self.app.call_from_thread( self.app.notify, f"Merge error: {e}", title="Merge Error", severity="warning", ) # Don't fail entirely - files were downloaded else: logger.info("merge-file cmdlet not found") # Stage 3: Add tags (now after merge, if merge happened) # If merge succeeded, result_obj now points to merged file if tags and (download_succeeded or not download_enabled): add_tags_cmdlet = get_cmdlet("add-tags") if add_tags_cmdlet: logger.info(f"Executing add-tags stage with {len(tags)} tags") logger.info(f" Tags: {tags}") logger.info(f" Source: {source}") logger.info(f" Result path: {result_obj.path}") logger.info(f" Result hash: {result_obj.hash_hex}") # Log step to worker if worker: worker.log_step( f"Starting add-tags stage with {len(tags)} tags..." ) # Build add-tags arguments. add-tags requires a store; for downloads, default to local sidecar tagging. tag_args = ( ["-store", "local"] + [str(t) for t in tags] + ["--source", str(source)] ) logger.info(f" Tag args: {tag_args}") logger.info( f" Result object attributes: target={getattr(result_obj, 'target', 'MISSING')}, path={getattr(result_obj, 'path', 'MISSING')}, hash_hex={getattr(result_obj, 'hash_hex', 'MISSING')}" ) try: # Capture output from the cmdlet import io from contextlib import redirect_stdout, redirect_stderr stdout_buf = io.StringIO() stderr_buf = io.StringIO() with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): returncode = add_tags_cmdlet( result_obj, tag_args, self.config ) stdout_text = stdout_buf.getvalue() stderr_text = stderr_buf.getvalue() # Log the tag output so it gets captured by WorkerLoggingHandler if stdout_text: logger.info(f"[add-tags output]\n{stdout_text}") if stderr_text: logger.info(f"[add-tags stderr]\n{stderr_text}") if returncode != 0: logger.error( f"add-tags stage failed with code {returncode}" ) logger.error(f" stdout: {stdout_text}") logger.error(f" stderr: {stderr_text}") self.app.call_from_thread( self.app.notify, f"Failed to add tags: {stderr_text[:100] if stderr_text else stdout_text[:100] if stdout_text else 'unknown error'}", title="Error", severity="error", ) # Don't dismiss on tag failure - let user retry or cancel, but hide progress self.app.call_from_thread(self._hide_progress) return else: if stdout_text: logger.debug(f"add-tags stdout: {stdout_text}") if stderr_text: logger.debug(f"add-tags stderr: {stderr_text}") logger.info("add-tags stage completed successfully") # Log step to worker if worker: worker.log_step(f"Successfully added {len(tags)} tags") except Exception as e: logger.error(f"add-tags execution error: {e}", exc_info=True) self.app.call_from_thread( self.app.notify, f"Error adding tags: {e}", title="Error", severity="error", ) self.app.call_from_thread(self._hide_progress) return else: logger.error("add-tags cmdlet not found") else: if tags and download_enabled and not download_succeeded: skip_msg = "āš ļø Skipping add-tags stage because download failed" logger.info(skip_msg) if worker: worker.append_stdout(f"\n{skip_msg}\n") worker.finish( "error", "Download stage failed - see logs above for details" ) elif tags: logger.info("No tags to add (tags list is empty)") # Success notification self.app.call_from_thread( self.app.notify, f"Download request processed: {url}", title="Success", severity="information", timeout=2, ) # Finish worker with success status if worker: worker.finish("completed", "Download completed successfully") logger.info("Download request processing complete") # Hide progress and dismiss the modal self.app.call_from_thread(self._hide_progress) self.app.call_from_thread(self.dismiss) except Exception as e: logger.error(f"Error in download submit: {e}", exc_info=True) # Ensure worker is marked as finished even on exception if worker: try: worker.finish("error", f"Download failed: {str(e)}") except Exception: pass self.app.call_from_thread(self._hide_progress) self.app.call_from_thread( self.app.notify, f"Error: {e}", title="Error", severity="error" ) def _create_url_result(self, url: str): """Create a result object from a URL for cmdlet processing.""" class URLDownloadResult: def __init__(self, u): self.target = u self.url = u self.path: str | None = None self.hash_hex: str | None = None self.media_kind = "url" return URLDownloadResult(url) def action_cancel(self) -> None: """Cancel the download request.""" self.dismiss() def on_key(self, event) -> None: """Handle key presses to implement context-sensitive Ctrl+T.""" if event.key == "ctrl+t": # Check which widget has focus focused_widget = self.app.focused if focused_widget and focused_widget.id == "paragraph_textarea": # URL textarea: scrape fresh metadata, wipe tags and source self._action_scrape_url_metadata() event.prevent_default() elif focused_widget and focused_widget.id == "tags_textarea": # Tags textarea: scrape special fields and adjectives self._action_scrape_tags() event.prevent_default() def _action_scrape_url_metadata(self) -> None: """Scrape metadata from URL(s) in URL textarea - wipes tags and source. This is triggered by Ctrl+T when URL textarea is focused. Supports single URL or multiple url (newline/comma-separated). For multiple PDF url, creates pseudo-playlist for merge workflow. """ try: text = self.paragraph_textarea.text.strip() if not text: logger.warning("No URL to scrape metadata from") return # Parse multiple url (newline or comma-separated) url = [] for line in text.split("\n"): line = line.strip() if line: # Handle comma-separated url within a line for url in line.split(","): url = url.strip() if url: url.append(url) # Check if multiple url provided if len(url) > 1: logger.info( f"Detected {len(url)} url - checking for PDF pseudo-playlist" ) # Check if all url appear to be PDFs all_pdfs = all( url.endswith(".pdf") or "pdf" in url.lower() for url in url ) if all_pdfs: logger.info(f"All url are PDFs - creating pseudo-playlist") self._handle_pdf_playlist(url) return # Single URL - proceed with normal metadata scraping url = url[0] if url else text.strip() logger.info(f"Scraping fresh metadata from: {url}") # Check if tags are already provided in textarea existing_tags = self.tags_textarea.text.strip() wipe_tags = not existing_tags # Only wipe if no tags exist # Run in background to prevent UI freezing self._scrape_metadata_worker( url, wipe_tags_and_source=wipe_tags, skip_tag_scraping=not wipe_tags ) except Exception as e: logger.error(f"Error in _action_scrape_url_metadata: {e}", exc_info=True) def _action_scrape_tags(self) -> None: """Process tags from tags textarea, expanding tag lists like {philosophy}. This is triggered by Ctrl+T when tags textarea is focused. Processes tag list references from adjective.json (e.g., {psychology}) and expands them to the full list of tags. """ try: current_tags = self.tags_textarea.text.strip() if not current_tags: logger.warning("No tags to process") return if not expand_tag_lists or not process_tags_from_string: logger.warning("tag_helpers not available") self.app.notify( "Tag processing unavailable", title="Error", severity="error", timeout=2 ) return logger.info(f"Processing tags: {current_tags[:50]}...") # Parse tags from current text tags_set = process_tags_from_string(current_tags, expand_lists=False) if not tags_set: logger.warning("No tags parsed from text") return # Expand tag list references like {psychology} expanded_tags = expand_tag_lists(tags_set) if len(expanded_tags) > len(tags_set): # Tags were expanded tags_count_added = len(expanded_tags) - len(tags_set) logger.info(f"Expanded tags: added {tags_count_added} new tags") self.app.notify( f"Expanded: {tags_count_added} new tags added from tag lists", title="Tags Expanded", severity="information", timeout=2, ) else: logger.info("No tag list expansions found") self.app.notify( "No {list} references found to expand", title="Info", severity="information", timeout=2, ) # Update textarea with expanded tags (one per line) self.tags_textarea.text = "\n".join(sorted(expanded_tags)) logger.info(f"Updated tags textarea with {len(expanded_tags)} tags") except Exception as e: logger.error(f"Error in _action_scrape_tags: {e}", exc_info=True) self.app.notify( f"Error processing tags: {e}", title="Error", severity="error" ) def _handle_pdf_playlist(self, pdf_url: list) -> None: """Handle multiple PDF url as a pseudo-playlist. Creates a playlist-like structure with PDF metadata for merge workflow. Extracts title from URL or uses default naming. Args: pdf_url: List of PDF url to process """ try: logger.info(f"Creating PDF pseudo-playlist with {len(pdf_url)} items") # Create playlist items from PDF url playlist_items = [] for idx, url in enumerate(pdf_url, 1): # Extract filename from URL for display try: # Get filename from URL path from urllib.parse import urlparse parsed = urlparse(url) filename = parsed.path.split("/")[-1] if not filename or filename.endswith(".pdf"): filename = filename or f"pdf_{idx}.pdf" # Remove .pdf extension for display title = filename.replace(".pdf", "").replace("_", " ").replace("-", " ") except Exception as e: logger.debug(f"Could not extract filename: {e}") title = f"PDF {idx}" item = { "id": str(idx - 1), # 0-based index "title": title, "duration": "", # PDFs don't have duration, leave empty "url": url, # Store the URL for later download } playlist_items.append(item) # Build minimal metadata structure for UI population metadata = { "title": f"{len(pdf_url)} PDF Documents", "tags": [], "formats": [("pdf", "pdf")], # Default format is PDF "playlist_items": playlist_items, "is_pdf_playlist": True, # Mark as PDF pseudo-playlist } # Store url for later use during merge self.pdf_url = pdf_url self.is_pdf_playlist = True # Populate the modal with metadata logger.info(f"Populating modal with {len(playlist_items)} PDF items") self._populate_from_metadata(metadata, wipe_tags_and_source=True) self.app.notify( f"Loaded {len(pdf_url)} PDFs as playlist", title="PDF Playlist", severity="information", timeout=3, ) except Exception as e: logger.error(f"Error handling PDF playlist: {e}", exc_info=True) self.app.notify( f"Error loading PDF playlist: {e}", title="Error", severity="error", timeout=3 ) def _handle_pdf_playlist_download( self, pdf_url: list, tags: list, selection: str, merge_enabled: bool ) -> None: """Download and merge PDF playlist. Args: pdf_url: List of PDF url to download tags: Tags to apply to the merged PDF selection: Selection string like "1-3" or "1,3,5" merge_enabled: Whether to merge the PDFs """ # Check if pypdf is available for merge (needed at function start) try: from pypdf import PdfWriter, PdfReader HAS_PYPDF = True except ImportError: HAS_PYPDF = False PdfWriter = None PdfReader = None try: from pathlib import Path import requests from SYS.config import resolve_output_dir # Create temporary list of playlist items for selection parsing # We need this because _parse_playlist_selection uses self.playlist_items temp_items = [] for url in pdf_url: temp_items.append({ "title": url }) self.playlist_items = temp_items # Parse selection to get which PDFs to download selected_indices = self._parse_playlist_selection(selection) if not selected_indices: # No valid selection, use all selected_indices = list(range(len(pdf_url))) selected_url = [pdf_url[i] for i in selected_indices] logger.info(f"Downloading {len(selected_url)} selected PDFs for merge") # Download PDFs to temporary directory temp_dir = Path.home() / ".downlow_temp_pdfs" temp_dir.mkdir(exist_ok=True) downloaded_files = [] for idx, url in enumerate(selected_url, 1): try: logger.info(f"Downloading PDF {idx}/{len(selected_url)}: {url}") response = requests.get(url, timeout=30) response.raise_for_status() # Generate filename from URL from urllib.parse import urlparse parsed = urlparse(url) filename = parsed.path.split("/")[-1] if not filename.endswith(".pdf"): filename = f"pdf_{idx}.pdf" pdf_path = temp_dir / filename with open(pdf_path, "wb") as f: f.write(response.content) downloaded_files.append(pdf_path) logger.info(f"Downloaded to: {pdf_path}") except Exception as e: logger.error(f"Failed to download PDF {idx}: {e}") self.app.call_from_thread( self.app.notify, f"Failed to download PDF {idx}: {e}", title="Download Error", severity="error", ) return # Merge PDFs if requested if merge_enabled and len(downloaded_files) > 1: if not HAS_PYPDF: logger.error("pypdf not available for PDF merge") self.app.call_from_thread( self.app.notify, "pypdf required for PDF merge. Install with: pip install pypdf", title="Missing Dependency", severity="error", ) return logger.info(f"Merging {len(downloaded_files)} PDFs") try: writer = PdfWriter() for pdf_file in downloaded_files: reader = PdfReader(pdf_file) for page in reader.pages: writer.add_page(page) logger.info( f"Added {len(reader.pages)} pages from {pdf_file.name}" ) # Save merged PDF to output directory output_dir = Path(resolve_output_dir(self.config)) output_dir.mkdir(parents=True, exist_ok=True) output_path = output_dir / "merged_pdfs.pdf" # Make filename unique if it exists counter = 1 while output_path.exists(): output_path = output_dir / f"merged_pdfs_{counter}.pdf" counter += 1 with open(output_path, "wb") as f: writer.write(f) logger.info(f"Merged PDF saved to: {output_path}") # Tag the file if tags provided if tags and get_cmdlet: tag_cmdlet = get_cmdlet("add-tags") if tag_cmdlet: logger.info(f"Tagging merged PDF with {len(tags)} tags") # Create a result object for the PDF class PDFResult: def __init__(self, p): self.path = str(p) self.target = str(p) self.hash_hex = None result_obj = PDFResult(output_path) import io from contextlib import redirect_stdout, redirect_stderr stdout_buf = io.StringIO() stderr_buf = io.StringIO() tag_args = ["-store", "local"] + [str(t) for t in tags] with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): tag_returncode = tag_cmdlet( result_obj, tag_args, self.config ) if tag_returncode != 0: logger.warning( f"Tag stage returned code {tag_returncode}" ) self.app.call_from_thread( self.app.notify, f"Successfully merged {len(downloaded_files)} PDFs", title="Merge Complete", severity="information", timeout=3, ) except Exception as e: logger.error(f"PDF merge error: {e}", exc_info=True) self.app.call_from_thread( self.app.notify, f"PDF merge failed: {e}", title="Merge Error", severity="error", ) else: # Save individual PDFs to output output_dir = Path(resolve_output_dir(self.config)) output_dir.mkdir(parents=True, exist_ok=True) for pdf_file in downloaded_files: output_path = output_dir / pdf_file.name # Make filename unique if it exists counter = 1 base_name = pdf_file.stem while output_path.exists(): output_path = output_dir / f"{base_name}_{counter}.pdf" counter += 1 import shutil shutil.copy2(pdf_file, output_path) logger.info(f"Saved PDF to: {output_path}") self.app.call_from_thread( self.app.notify, f"Downloaded {len(downloaded_files)} PDFs", title="Download Complete", severity="information", timeout=3, ) except Exception as e: logger.error(f"Error in PDF playlist download: {e}", exc_info=True) self.app.call_from_thread( self.app.notify, f"Error processing PDF playlist: {e}", title="Error", severity="error", ) @work(thread=True) def _scrape_metadata_worker( self, url: str, wipe_tags_and_source: bool = False, skip_tag_scraping: bool = False ) -> None: """Background worker to scrape metadata using get-tag cmdlet. Args: url: URL to scrape metadata from wipe_tags_and_source: If True, clear tags and source before populating skip_tag_scraping: If True, don't scrape tags (only title/formats) """ try: logger.info(f"Metadata worker started for: {url}") # Call get-tag cmdlet to scrape URL if not get_cmdlet: logger.error("cmdlet module not available") self.app.call_from_thread( self.app.notify, "cmdlet module not available", title="Error", severity="error" ) return # Get the get-tag cmdlet get_tag_cmdlet = get_cmdlet("get-tag") if not get_tag_cmdlet: logger.error("get-tag cmdlet not found") self.app.call_from_thread( self.app.notify, "get-tag cmdlet not found", title="Error", severity="error" ) return # Create a simple result object for the cmdlet class URLResult: def __init__(self, u): self.target = u self.hash_hex = None self.path = None result_obj = URLResult(url) # Call the cmdlet with -scrape flag (unless skipping tag scraping) import io from contextlib import redirect_stdout, redirect_stderr output_buffer = io.StringIO() error_buffer = io.StringIO() # Only scrape if not skipping tag scraping args = [] if skip_tag_scraping else ["-scrape", url] with redirect_stdout(output_buffer), redirect_stderr(error_buffer): returncode = get_tag_cmdlet(result_obj, args, {}) if returncode != 0: error_msg = error_buffer.getvalue() logger.error(f"get-tag cmdlet failed: {error_msg}") try: self.app.call_from_thread( self.app.notify, f"Failed to scrape metadata: {error_msg}", title="Error", severity="error", ) except Exception as e: logger.debug(f"Could not notify user: {e}") return # Parse the JSON output output = output_buffer.getvalue().strip() if not output: logger.warning("get-tag returned no output") try: self.app.call_from_thread( self.app.notify, "No metadata returned from get-tag", title="Error", severity="error", ) except Exception as e: logger.debug(f"Could not notify user: {e}") return # Extract the JSON line (skip debug messages that start with [get-tag]) json_line = None for line in output.split("\n"): if line.strip().startswith("{"): json_line = line.strip() break if not json_line: logger.error(f"No JSON found in get-tag output") logger.debug(f"Raw output: {output}") try: self.app.call_from_thread( self.app.notify, "No metadata found in response", title="Error", severity="error", ) except Exception as e: logger.debug(f"Could not notify user: {e}") return try: metadata_result = json.loads(json_line) except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON: {e}") logger.debug(f"JSON line: {json_line}") try: self.app.call_from_thread( self.app.notify, f"Failed to parse metadata: {e}", title="Error", severity="error", ) except Exception as ne: logger.debug(f"Could not notify user: {ne}") return # Build metadata dict in the format expected by _populate_from_metadata # If skipping tag scraping, preserve existing tags existing_tags = self.tags_textarea.text.strip( ).split("\n") if skip_tag_scraping else [] existing_tags = [tag.strip() for tag in existing_tags if tag.strip()] # Extract playlist items if present playlist_items = metadata_result.get("playlist_items", []) metadata = { "title": metadata_result.get("title", "Unknown"), "url": url, "tags": metadata_result.get("tags", []) or existing_tags, # Use existing if new are empty "formats": metadata_result.get("formats", []), "playlist_items": playlist_items, } logger.info( f"Retrieved metadata: title={metadata['title']}, tags={len(metadata['tags'])}, formats={len(metadata['formats'])}, playlist_items={len(playlist_items)}" ) # Update UI on main thread self.app.call_from_thread( self._populate_from_metadata, metadata, wipe_tags_and_source ) except Exception as e: logger.error(f"Metadata worker error: {e}", exc_info=True) try: self.app.call_from_thread( self.app.notify, f"Failed to scrape metadata: {e}", title="Error", severity="error", ) except Exception as ne: logger.debug(f"Could not notify user of error: {ne}") def _convert_selection_to_ytdlp(self, selection_str: str) -> str: """Convert playlist selection string to yt-dlp --playlist-items format. Args: selection_str: Selection string like "1-3", "all", "merge", "1,3,5-8" Can also include multiple keywords separated by spaces Returns: yt-dlp format string like "1-3,5,8" or "1-10" for all """ if not selection_str: return "" selection_str = selection_str.strip().upper() max_idx = len(self.playlist_items) # Handle keywords (all, merge, a, m) - can be space or comma separated # "ALL MERGE", "A M", "ALL,MERGE" etc all mean download all items if any(kw in selection_str.replace(",", " ").split() for kw in {"A", "ALL", "M", "MERGE"}): # User said to get all items (merge is same as all in this context) return f"1-{max_idx}" # Parse ranges like "1,3,5-8" and convert to yt-dlp format # The selection is already in 1-based format from user, keep it that way # yt-dlp expects 1-based indices try: parts = [] for part in selection_str.split(","): part = part.strip() if part: # Skip empty parts parts.append(part) return ",".join(parts) except (ValueError, AttributeError): logger.error(f"Failed to convert playlist selection: {selection_str}") return "" def _parse_playlist_selection(self, selection_str: str) -> list: """Parse playlist selection string into list of track indices (0-based). Args: selection_str: Selection string like "1-3", "all", "merge", "1,3,5-8" Returns: List of 0-based indices, or empty list if invalid """ if not selection_str: return [] selection_str = selection_str.strip().upper() max_idx = len(self.playlist_items) # Handle keywords (all, merge, a, m) - can be space or comma separated # "ALL MERGE", "A M", "ALL,MERGE" etc all mean download all items if any(kw in selection_str.replace(",", " ").split() for kw in {"A", "ALL", "M", "MERGE"}): # User said to get all items return list(range(max_idx)) # Parse ranges like "1,3,5-8" indices = set() try: for part in selection_str.split(","): part = part.strip() if "-" in part: # Range like "5-8" start_str, end_str = part.split("-", 1) start = int(start_str.strip()) - 1 # Convert to 0-based end = int(end_str.strip()) # end is inclusive in user terms for i in range(start, end): if 0 <= i < max_idx: indices.add(i) else: # Single number idx = int(part.strip()) - 1 # Convert to 0-based if 0 <= idx < max_idx: indices.add(idx) return sorted(list(indices)) except (ValueError, AttributeError): logger.error(f"Failed to parse playlist selection: {selection_str}") return [] def _execute_download_pipeline( self, result_obj: Any, tags: list, source: str, download_enabled: bool, worker=None ) -> None: """Execute the download pipeline for a single item. Args: result_obj: URL result object tags: List of tags to apply source: Source for metadata download_enabled: Whether to download the file worker: Optional Worker instance for logging """ # Import cmdlet system if not get_cmdlet: error_msg = "cmdlet module not available" logger.error(error_msg) if worker: worker.append_stdout(f"āŒ ERROR: {error_msg}\n") self.app.call_from_thread( self.app.notify, "cmdlet system unavailable", title="Error", severity="error" ) return # Stage 1: Download data if enabled if download_enabled: download_cmdlet_name = "download-file" download_cmdlet = get_cmdlet(download_cmdlet_name) if download_cmdlet: stage_msg = f"šŸ“„ Executing {download_cmdlet_name} stage" logger.info(stage_msg) if worker: worker.append_stdout(f"{stage_msg}\n") try: import io from contextlib import redirect_stdout, redirect_stderr stdout_buf = io.StringIO() stderr_buf = io.StringIO() with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): cmd_config = ( dict(self.config) if isinstance(self.config, dict) else self.config ) if isinstance(cmd_config, dict): cmd_config["_quiet_background_output"] = True returncode = download_cmdlet(result_obj, [], cmd_config) stdout_text = stdout_buf.getvalue() stderr_text = stderr_buf.getvalue() if stdout_text: logger.debug(f"{download_cmdlet_name} stdout: {stdout_text}") if worker: worker.append_stdout(stdout_text) if stderr_text: logger.debug(f"{download_cmdlet_name} stderr: {stderr_text}") if worker: worker.append_stdout(f"āš ļø stderr: {stderr_text}\n") if returncode != 0: error_msg = f"āŒ {download_cmdlet_name} stage failed with code {returncode}\nstderr: {stderr_text}" logger.error(error_msg) if worker: worker.append_stdout(f"{error_msg}\n") self.app.call_from_thread( self.app.notify, f"Download failed: {stderr_text[:100]}", title="Download Error", severity="error", ) return else: success_msg = f"{download_cmdlet_name} completed successfully" logger.info(success_msg) if worker: worker.append_stdout(f"{success_msg}\n") except Exception as e: error_msg = f"āŒ {download_cmdlet_name} error: {e}" logger.error(error_msg, exc_info=True) if worker: worker.append_stdout( f"{error_msg}\nTraceback:\n{__import__('traceback').format_exc()}\n" ) self.app.call_from_thread( self.app.notify, str(e)[:100], title="Download Error", severity="error" ) return # Stage 2: Tag the file if tags provided if tags: tag_cmdlet = get_cmdlet("add-tags") if tag_cmdlet and result_obj.get("path"): stage_msg = f"šŸ·ļø Tagging with {len(tags)} tags" logger.info(stage_msg) if worker: worker.append_stdout(f"{stage_msg}\n") try: tag_args = tags import io from contextlib import redirect_stdout, redirect_stderr stdout_buf = io.StringIO() stderr_buf = io.StringIO() with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): tag_returncode = tag_cmdlet(result_obj, tag_args, {}) stdout_text = stdout_buf.getvalue() stderr_text = stderr_buf.getvalue() if stdout_text: logger.debug(f"tag stdout: {stdout_text}") if worker: worker.append_stdout(stdout_text) if tag_returncode != 0: warning_msg = f"āš ļø Tag stage returned code {tag_returncode}: {stderr_text}" logger.warning(warning_msg) if worker: worker.append_stdout(f"{warning_msg}\n") else: if worker: worker.append_stdout("Tags applied successfully\n") except Exception as e: error_msg = f"āŒ Tagging error: {e}" logger.error(error_msg, exc_info=True) if worker: worker.append_stdout(f"{error_msg}\n") else: if not result_obj.get("path"): warning_msg = "āš ļø No file path in result - skipping tagging" logger.warning(warning_msg) if worker: worker.append_stdout(f"{warning_msg}\n") else: if worker: worker.append_stdout("Download complete (no tags to apply)\n") def _show_format_select(self) -> None: """Show format select (always visible for single files).""" try: files_container = self.query_one("#files_container", Container) playlist_container = self.query_one("#playlist_container", Container) # Format select always visible, playlist hidden by default files_container.styles.height = "1fr" playlist_container.styles.height = "0" self.is_playlist = False except Exception as e: logger.error(f"Error showing format select: {e}") def _show_playlist_controls(self) -> None: """Show playlist tree and input alongside format select (for playlists).""" try: playlist_container = self.query_one("#playlist_container", Container) # Just make playlist visible - format select remains visible above it playlist_container.styles.height = "auto" self.is_playlist = True except Exception as e: logger.error(f"Error showing playlist controls: {e}") def _populate_playlist_tree(self, items: list) -> None: """Populate the playlist tree with track items. Args: items: List of track info dicts with 'id', 'title', 'duration', etc. """ try: self.playlist_tree.clear() self.playlist_items = items for idx, item in enumerate(items, 1): title = item.get("title", f"Track {idx}") duration = item.get("duration", "") # Format: "1. Song Title (3:45)" label = f"{idx}. {title}" if duration: label += f" ({duration})" self.playlist_tree.root.add_leaf(label) logger.info(f"Populated playlist tree with {len(items)} items") except Exception as e: logger.error(f"Error populating playlist tree: {e}") def _populate_from_metadata( self, metadata: dict, wipe_tags_and_source: bool = False ) -> None: """Populate modal fields from extracted metadata. Args: metadata: Dictionary with title, tags, formats wipe_tags_and_source: If True, clear tags and source before populating """ try: # Wipe tags and source if requested (fresh scrape from URL) if wipe_tags_and_source: self.tags_textarea.text = "" # Reset source to first available option try: # Get all options and select the first one source_options = self._build_source_options() if source_options: self.source_select.value = source_options[0][1] except Exception as e: logger.warning(f"Could not reset source select: {e}") # Populate tags - using extracted tags (one per line format) tags = metadata.get("tags", []) existing_tags = self.tags_textarea.text.strip() title = metadata.get("title", "Unknown") # Extract meaningful tags: # 1. Freeform tags (tag:value) # 2. Creator/artist metadata (creator:, artist:, channel:) # 3. Other meaningful namespaces (genre:, album:, track:, etc.) meaningful_tags = [] # Add title tag first (so user can edit it) if title and title != "Unknown": meaningful_tags.append(f"title:{title}") # Namespaces to exclude (metadata-only, not user-facing) excluded_namespaces = { "hash", # Hash values (internal) "url", # url (internal) "relationship", # Internal relationships "url", # url (internal) } # Add all other tags for tag in tags: if ":" in tag: namespace, value = tag.split(":", 1) # Skip internal/metadata namespaces if namespace.lower() not in excluded_namespaces: meaningful_tags.append(tag) else: # Tags without namespace are freeform - always include meaningful_tags.append(tag) # Build tags string (one per line) tags_str = "\n".join(meaningful_tags) if existing_tags: self.tags_textarea.text = existing_tags + "\n" + tags_str else: self.tags_textarea.text = tags_str # Check if this is a playlist playlist_items = metadata.get("playlist_items", []) formats = metadata.get("formats", []) # Always show format select (single file or default for playlist) self._show_format_select() if formats: # formats may be lists (from JSON) or tuples, convert to tuples format_tuples = [] for fmt in formats: if isinstance(fmt, (list, tuple)) and len(fmt) == 2: format_tuples.append(tuple(fmt)) if format_tuples: self.files_select.set_options(format_tuples) # Select the first format by default self.files_select.value = format_tuples[0][1] self.selected_files = {format_tuples[0][0]} # If playlist, also show the tree for track selection if playlist_items and len(playlist_items) > 0: logger.info(f"Detected playlist with {len(playlist_items)} items") self._populate_playlist_tree(playlist_items) # Show playlist tree alongside format select (height: auto to show) playlist_container = self.query_one("#playlist_container", Container) playlist_container.styles.height = "auto" # SET FLAG SO action_submit() KNOWS THIS IS A PLAYLIST self.is_playlist = True logger.info( f"Populated modal from metadata: {len(meaningful_tags)} tags, {len(playlist_items)} playlist items, {len(formats)} formats" ) # Notify user self.app.notify( f"Scraped metadata: {title}", title="Metadata Loaded", severity="information", timeout=3, ) except Exception as e: logger.error(f"Error populating metadata: {e}", exc_info=True) self.app.notify( f"Failed to populate metadata: {e}", title="Error", severity="error" ) def on_select_changed(self, event: Select.Changed) -> None: """Handle Select widget changes (format selection).""" if event.select.id == "files_select": # Update selected_files to track the chosen format value if event.value: self.selected_files = {str(event.value)} logger.debug(f"Selected format: {event.value}") def on_button_pressed(self, event) -> None: """Handle button clicks.""" if event.button.id == "submit_btn": self.action_submit() elif event.button.id == "cancel_btn": self.action_cancel() def _show_progress(self) -> None: """Show the progress bar and hide buttons.""" try: # Show progress bar by setting height self.progress_bar.styles.height = 1 self.progress_bar.update(total=100) # Hide buttons during download button_row = self.query_one("#button_row", Horizontal) button_row.display = False except Exception as e: logger.error(f"Error showing progress bar: {e}") def _hide_progress(self) -> None: """Hide the progress bar and show buttons again.""" try: # Hide progress bar by setting height to 0 self.progress_bar.styles.height = 0 # Show buttons again button_row = self.query_one("#button_row", Horizontal) button_row.display = True except Exception as e: logger.error(f"Error hiding progress bar: {e}")