# Files
# Medios-Macina/TUI/modalscreen/download.py
# 1881 lines, 89 KiB, Python
# (Page header from the source listing, retained as a comment so the file parses.)
# Snapshot: 2025-11-25 20:09:33 -08:00
"""Download request modal screen for initiating new downloads.
This modal allows users to specify:
- URL or search query (paragraph)
- Tags to apply
- Source (Hydrus, local, AllDebrid, etc.)
- Actions (download, screenshot)
"""
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, Horizontal, Vertical, ScrollableContainer
from textual.widgets import Static, Button, Label, Select, Checkbox, TextArea, ProgressBar, Tree, Input
from textual.binding import Binding
from textual import work
import logging
from typing import Optional, Callable, Any
from pathlib import Path
import sys
from helper.logger import log
import json
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
# Import cmdlets system to call get-tag
try:
from cmdlets import get as get_cmdlet
except ImportError:
get_cmdlet = None
# Import tag processing helpers
try:
from metadata import expand_tag_lists, process_tags_from_string
except ImportError:
expand_tag_lists = None
process_tags_from_string = None
logger = logging.getLogger(__name__)
class DownloadModal(ModalScreen):
    """Modal screen for initiating new download requests."""

    # Keyboard shortcuts active while this modal is on top.
    BINDINGS = [
        Binding("escape", "cancel", "Cancel"),
        Binding("ctrl+enter", "submit", "Submit"),
    ]

    # Stylesheet path, resolved relative to this module.
    CSS_PATH = "download.tcss"
def __init__(
self,
on_submit: Optional[Callable[[dict], None]] = None,
available_sources: Optional[list] = None,
config: Optional[dict] = None
):
"""Initialize the download modal.
Args:
on_submit: Callback function that receives download request dict
available_sources: List of available source names (e.g., ['hydrus', 'local', 'alldebrid'])
config: Configuration dict with download settings
"""
super().__init__()
self.on_submit = on_submit
self.available_sources = available_sources or ['hydrus', 'local', 'alldebrid']
self.config = config or {}
# UI Component references
self.paragraph_textarea: TextArea = None # type: ignore
self.tags_textarea: TextArea = None # type: ignore
self.source_select: Select = None # type: ignore
self.files_select: Select = None # type: ignore
self.download_checkbox: Checkbox = None # type: ignore
self.screenshot_checkbox: Checkbox = None # type: ignore
self.progress_bar: ProgressBar = None # type: ignore
self.selected_files: set = set() # Track selected files
# Playlist support
self.playlist_tree: Tree = None # type: ignore
self.playlist_input: Input = None # type: ignore
self.playlist_merge_checkbox: Checkbox = None # type: ignore
self.is_playlist: bool = False # Track if current URL is a playlist
self.playlist_items: list = [] # Store playlist items
def compose(self) -> ComposeResult:
"""Compose the download request modal."""
yield Vertical(
# Title
Static("📥 New Download Request", id="download_title"),
# Main layout: Horizontal split into left and right columns
Horizontal(
# Left column: URL (top) and Tags (bottom)
Vertical(
Container(
TextArea(
id="paragraph_textarea",
language="",
show_line_numbers=True,
),
id="url_container",
classes="grid_container",
),
Container(
TextArea(
id="tags_textarea",
language="",
show_line_numbers=True,
),
id="tags_container",
classes="grid_container"
),
id="left_column"
),
# Right column: Files/Playlist
Vertical(
# Formats Select (for single files)
Container(
Select(
id="files_select",
options=[], # Populated dynamically
),
id="files_container",
classes="grid_container"
),
# Playlist Tree + Input + Merge (for playlists)
Container(
Vertical(
Tree(
"Playlist",
id="playlist_tree",
),
Horizontal(
Input(
placeholder="Track selection (e.g., 1-3, all, merge, 1 5 8)",
id="playlist_input",
),
Checkbox(
label="Merge",
id="playlist_merge_checkbox",
value=False,
),
id="playlist_input_row"
),
),
id="playlist_container",
classes="grid_container"
),
id="right_column"
),
id="main_layout"
),
# Footer: All on one row - Checkboxes left, Source middle, Buttons right
Horizontal(
# Left: Checkboxes
Container(
Checkbox(label="Download", id="download_checkbox"),
Checkbox(label="Screenshot", id="screenshot_checkbox"),
id="checkbox_row"
),
# Middle: Source selector
Select(
id="source_select",
options=self._build_source_options()
),
# Progress bar (shown during download)
ProgressBar(id="progress_bar"),
# Right: Buttons
Horizontal(
Button("Cancel", id="cancel_btn", variant="default"),
Button("Submit", id="submit_btn", variant="primary"),
id="button_row"
),
id="footer_layout",
classes="modal_footer"
),
id="download_modal",
classes="modal_vertical"
)
def _build_source_options(self) -> list[tuple[str, str]]:
"""Build source select options.
Returns:
List of (label, value) tuples for Select widget
"""
source_icons = {
'hydrus': '🗃️ Hydrus',
'local': '📁 Local',
'alldebrid': '☁️ AllDebrid',
'debrid': '☁️ Debrid',
'soulseek': '🎵 Soulseek',
'libgen': '📚 LibGen',
}
options = []
for source in self.available_sources:
label = source_icons.get(source.lower(), source)
options.append((label, source))
return options
def on_mount(self) -> None:
"""Called when the modal is mounted."""
# Get references to widgets
self.paragraph_textarea = self.query_one("#paragraph_textarea", TextArea)
self.tags_textarea = self.query_one("#tags_textarea", TextArea)
self.source_select = self.query_one("#source_select", Select)
self.files_select = self.query_one("#files_select", Select)
self.download_checkbox = self.query_one("#download_checkbox", Checkbox)
self.screenshot_checkbox = self.query_one("#screenshot_checkbox", Checkbox)
self.progress_bar = self.query_one("#progress_bar", ProgressBar)
self.playlist_tree = self.query_one("#playlist_tree", Tree)
self.playlist_input = self.query_one("#playlist_input", Input)
self.playlist_merge_checkbox = self.query_one("#playlist_merge_checkbox", Checkbox)
# Set default actions
self.download_checkbox.value = True
self.screenshot_checkbox.value = False
self.playlist_merge_checkbox.value = False
# Initialize PDF playlist URLs (set by _handle_pdf_playlist)
self.pdf_urls = []
self.is_pdf_playlist = False
# Hide playlist by default (show format select)
self._show_format_select()
# Focus on tags textarea
self.tags_textarea.focus()
logger.debug("Download modal mounted")
def action_submit(self) -> None:
"""Submit the download request by executing cmdlet pipeline in background."""
# Validate and get values first (on main thread)
url = self.paragraph_textarea.text.strip()
tags_str = self.tags_textarea.text.strip()
source = self.source_select.value or 'local'
download_enabled = self.download_checkbox.value
merge_enabled = self.playlist_merge_checkbox.value if self.is_playlist else False
if not url:
logger.warning("Download request missing URL")
self.app.notify(
"URL is required",
title="Missing Input",
severity="warning"
)
return
# Parse tags (one per line)
tags = []
if tags_str:
tags = [tag.strip() for tag in tags_str.split('\n') if tag.strip()]
# Get playlist selection if this is a playlist
playlist_selection = ""
if self.is_playlist and not self.is_pdf_playlist:
# Regular playlist (non-PDF)
playlist_selection = self.playlist_input.value.strip()
if not playlist_selection:
# No selection provided - default to downloading all tracks
playlist_selection = f"1-{len(self.playlist_items)}"
logger.info(f"No selection provided, defaulting to all tracks: {playlist_selection}")
elif self.is_playlist and self.is_pdf_playlist:
# PDF playlist - handle selection
playlist_selection = self.playlist_input.value.strip()
if not playlist_selection:
# No selection provided - default to all PDFs
playlist_selection = f"1-{len(self.playlist_items)}"
logger.info(f"PDF playlist: no selection provided, defaulting to all PDFs: {playlist_selection}")
merge_enabled = True # Always merge PDFs if multiple selected
# Launch the background worker with PDF playlist info
self._submit_worker(url, tags, source, download_enabled, playlist_selection, merge_enabled,
is_pdf_playlist=self.is_pdf_playlist, pdf_urls=self.pdf_urls if self.is_pdf_playlist else [])
    @work(thread=True)
    def _submit_worker(self, url: str, tags: list, source: str, download_enabled: bool, playlist_selection: str = "", merge_enabled: bool = False, is_pdf_playlist: bool = False, pdf_urls: Optional[list] = None) -> None:
        """Background worker to execute the cmdlet pipeline.

        Runs up to three stages on a worker thread: (1) download-data,
        (2) merge-file when this is a playlist with merge enabled, and
        (3) add-tag. All UI interaction (progress bar, notifications,
        dismissal) is marshalled to the main thread via
        ``self.app.call_from_thread``.

        Args:
            url: URL to download
            tags: List of tags to apply
            source: Source for metadata
            download_enabled: Whether to download the file
            playlist_selection: Playlist track selection (e.g., "1-3", "all", "merge")
            merge_enabled: Whether to merge playlist files after download
            is_pdf_playlist: Whether this is a PDF pseudo-playlist
            pdf_urls: List of PDF URLs if is_pdf_playlist is True
        """
        if pdf_urls is None:
            pdf_urls = []
        # Initialize worker to None so outer exception handler can check it
        worker = None
        try:
            # Show progress bar on main thread
            self.app.call_from_thread(self._show_progress)
            logger.info(f"Building cmdlet pipeline: URL={url}, tags={len(tags)}, source={source}, download={download_enabled}, playlist_selection={playlist_selection}")
            # Create a worker instance using the app's helper method
            worker = None
            try:
                if hasattr(self.app, 'create_worker'):
                    worker = self.app.create_worker(
                        'download',
                        title=f"Download: {url[:50]}",
                        description=f"Tags: {', '.join(tags) if tags else 'None'}"
                    )
                else:
                    # Fallback if helper not available
                    import uuid
                    from helper.worker_manager import Worker
                    worker_id = f"dl_{uuid.uuid4().hex[:8]}"
                    worker = Worker(worker_id, "download", f"Download: {url[:50]}",
                                    f"Tags: {', '.join(tags) if tags else 'None'}", None)
            except Exception as e:
                logger.error(f"Error creating worker: {e}")
                worker = None
            # NOTE(review): worker_id is only bound in the fallback branch above,
            # yet is referenced in the finish_worker error paths below. When
            # create_worker was used, those references raise NameError (the
            # surrounding try/except Exception: pass swallows it, silently
            # skipping finish_worker). TODO: track the worker id on both paths.
            # Log initial step
            if worker:
                worker.log_step("Download initiated")
            # Handle PDF playlist specially
            if is_pdf_playlist and pdf_urls:
                logger.info(f"Processing PDF playlist with {len(pdf_urls)} PDFs")
                self._handle_pdf_playlist_download(pdf_urls, tags, playlist_selection, merge_enabled)
                self.app.call_from_thread(self._hide_progress)
                self.app.call_from_thread(self.dismiss)
                return
            # Build the cmdlet pipeline
            # Start with URL as initial object
            result_obj = self._create_url_result(url)
            # Import cmdlet system
            if not get_cmdlet:
                logger.error("cmdlets module not available")
                self.app.call_from_thread(
                    self.app.notify,
                    "Cmdlets system unavailable",
                    title="Error",
                    severity="error"
                )
                self.app.call_from_thread(self._hide_progress)
                return
            # Stage 1: Download data if enabled
            download_succeeded = False
            download_stderr_text = ""  # Store for merge stage
            if download_enabled:
                download_cmdlet = get_cmdlet("download-data")
                if download_cmdlet:
                    logger.info("📥 Executing download-data stage")
                    logger.info(f"download_cmdlet object: {download_cmdlet}")
                    logger.info(f"result_obj: {result_obj}")
                    # Log step to worker
                    if worker:
                        worker.log_step("Starting download-data stage...")
                    # Build arguments for download-data
                    cmdlet_args = []
                    if self.is_playlist:
                        # Always use yt-dlp's native --playlist-items for playlists
                        if playlist_selection:
                            # User provided specific selection
                            ytdlp_selection = self._convert_selection_to_ytdlp(playlist_selection)
                            # NOTE(review): a separator character (likely "→") appears to
                            # have been lost between the two placeholders below.
                            logger.info(f"Playlist with user selection: {playlist_selection}{ytdlp_selection}")
                        else:
                            # No selection provided, download all
                            ytdlp_selection = f"1-{len(self.playlist_items)}"
                            logger.info(f"Playlist mode: downloading all {len(self.playlist_items)} items")
                        cmdlet_args = ["--playlist-items", ytdlp_selection]
                    logger.info(f"Built cmdlet_args: {cmdlet_args}")
                    logger.info(f"About to call download_cmdlet({result_obj}, {cmdlet_args}, {type(self.config).__name__})")
                    if worker:
                        worker.append_stdout(f"📥 Downloading from: {url}\n")
                        if cmdlet_args:
                            worker.append_stdout(f" Args: {cmdlet_args}\n")
                    try:
                        # Capture output from the cmdlet using temp files (more reliable than redirect)
                        import tempfile
                        import subprocess
                        # Try normal redirect first
                        import io
                        from contextlib import redirect_stdout, redirect_stderr
                        stdout_buf = io.StringIO()
                        stderr_buf = io.StringIO()
                        # Always capture output
                        try:
                            with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
                                logger.info(f"Calling download_cmdlet...")
                                returncode = download_cmdlet(result_obj, cmdlet_args, self.config)
                                logger.info(f"download_cmdlet returned: {returncode}")
                        except Exception as cmdlet_error:
                            # If cmdlet throws an exception, log it
                            logger.error(f"❌ download-cmdlet exception: {cmdlet_error}", exc_info=True)
                            if worker:
                                import traceback
                                worker.append_stdout(f"❌ download-cmdlet exception: {cmdlet_error}\n{traceback.format_exc()}\n")
                            returncode = 1
                        stdout_text = stdout_buf.getvalue()
                        stderr_text = stderr_buf.getvalue()
                        download_stderr_text = stderr_text  # Save for merge stage
                        # Log raw output
                        logger.info(f"download-cmdlet returncode: {returncode}")
                        logger.info(f"stdout ({len(stdout_text)} chars): {stdout_text[:200] if stdout_text else '(empty)'}")
                        logger.info(f"stderr ({len(stderr_text)} chars): {stderr_text[:200] if stderr_text else '(empty)'}")
                        # Always append output to worker for debugging
                        if worker:
                            if stdout_text:
                                worker.append_stdout(f"[download-data stdout]\n{stdout_text}\n")
                            if stderr_text:
                                worker.append_stdout(f"[download-data stderr]\n{stderr_text}\n")
                        # Log the output so it gets captured by WorkerLoggingHandler
                        if stdout_text:
                            logger.info(f"[download-data output]\n{stdout_text}")
                        if stderr_text:
                            logger.info(f"[download-data stderr]\n{stderr_text}")
                        if returncode != 0:
                            download_failed_msg = f"❌ download-data stage failed with code {returncode}\nstdout: {stdout_text}\nstderr: {stderr_text}"
                            logger.error(download_failed_msg)
                            if worker:
                                worker.append_stdout(f"\n{download_failed_msg}\n")
                                worker.finish("error", "Download stage failed - see logs above for details")
                            # Log to stderr as well so it shows in terminal
                            log(f"Return code: {returncode}", file=sys.stderr)
                            log(f"stdout:\n{stdout_text}", file=sys.stderr)
                            log(f"stderr:\n{stderr_text}", file=sys.stderr)
                            # Extract error reason from stderr/stdout for user notification
                            # Try to extract meaningful error from yt-dlp output
                            error_reason = "Unknown error"
                            # Search for yt-dlp error patterns (case-insensitive)
                            error_text = (stderr_text + "\n" + stdout_text).lower()
                            # Look for specific error keywords in priority order
                            if "http error 403" in error_text or "error 403" in error_text:
                                error_reason = "HTTP 403: Access forbidden (YouTube blocked download, may be georestricted or SABR issue)"
                            elif "http error 401" in error_text or "error 401" in error_text:
                                error_reason = "HTTP 401: Authentication required (may need login credentials)"
                            elif "http error 404" in error_text or "error 404" in error_text:
                                error_reason = "HTTP 404: URL not found (video/content may have been deleted)"
                            elif "http error" in error_text:
                                # Extract the actual HTTP error code
                                import re
                                http_match = re.search(r'HTTP Error (\d{3})', stderr_text + stdout_text, re.IGNORECASE)
                                if http_match:
                                    error_reason = f"HTTP Error {http_match.group(1)}: Server returned an error"
                                else:
                                    error_reason = "HTTP error from server"
                            elif "no such file or directory" in error_text or "file not found" in error_text:
                                error_reason = "File not found (yt-dlp may not be installed or not in PATH)"
                            elif "unable to download" in error_text:
                                error_reason = "Unable to download video (network issue or content unavailable)"
                            elif "connection" in error_text or "timeout" in error_text or "timed out" in error_text:
                                error_reason = "Network connection failed or timed out"
                            elif "permission" in error_text or "access denied" in error_text:
                                error_reason = "Permission denied (may need elevated privileges or login)"
                            elif "private video" in error_text or "private" in error_text:
                                error_reason = "Video is private (not accessible)"
                            elif "age restricted" in error_text or "age gate" in error_text:
                                error_reason = "Video is age-restricted and requires login"
                            elif "region restricted" in error_text or "georestrict" in error_text:
                                error_reason = "Video is region-restricted (not available in your country)"
                            elif "member-only" in error_text or "members only" in error_text:
                                error_reason = "Video is available to members only"
                            # If still unknown, try to extract last line of stderr as it often contains the actual error
                            if error_reason == "Unknown error":
                                stderr_lines = [line.strip() for line in stderr_text.split('\n') if line.strip()]
                                if stderr_lines:
                                    # Look for error-like lines (usually contain "error", "failed", "ERROR", etc)
                                    for line in reversed(stderr_lines):
                                        if any(keyword in line.lower() for keyword in ["error", "failed", "exception", "traceback", "warning"]):
                                            error_reason = line[:150]  # Limit to 150 chars
                                            break
                                    # If no error keyword found, use the last line
                                    if error_reason == "Unknown error":
                                        error_reason = stderr_lines[-1][:150]
                            # Log the extracted error reason for debugging
                            logger.error(f"Extracted error reason: {error_reason}")
                            self.app.call_from_thread(
                                self.app.notify,
                                f"Download failed: {error_reason}",
                                title="Download Error",
                                severity="error"
                            )
                            # Finish worker with error status
                            try:
                                # NOTE(review): worker_id may be unbound here (see note above).
                                self.app.call_from_thread(
                                    self.app.finish_worker,
                                    worker_id,
                                    "error",
                                    f"Download failed: {error_reason}"
                                )
                            except Exception:
                                pass
                            # Also append detailed error info to worker stdout for visibility
                            if worker:
                                worker.append_stdout(f"\n❌ DOWNLOAD FAILED\n")
                                worker.append_stdout(f"Reason: {error_reason}\n")
                                if stderr_text and stderr_text.strip():
                                    worker.append_stdout(f"\nFull error output:\n{stderr_text}\n")
                                if stdout_text and stdout_text.strip():
                                    worker.append_stdout(f"\nStandard output:\n{stdout_text}\n")
                            # Don't try to tag if download failed
                            self.app.call_from_thread(self._hide_progress)
                            self.app.call_from_thread(self.dismiss)
                            return
                        else:
                            download_succeeded = True
                            # Always log output at INFO level so we can see what happened
                            logger.info(f"download-data stage completed successfully")
                            if stdout_text:
                                logger.info(f"download-data stdout:\n{stdout_text}")
                            if stderr_text:
                                logger.info(f"download-data stderr:\n{stderr_text}")
                            # Log step to worker
                            if worker:
                                worker.log_step(f"Download completed: {len(stdout_text.split('Saved to')) - 1} items downloaded")
                            # For playlists with merge enabled, scan the output directory for ALL downloaded files
                            # instead of trying to parse individual "Saved to" lines
                            downloaded_files = []
                            if self.is_playlist and merge_enabled:
                                # Get output directory
                                from pathlib import Path
                                from config import resolve_output_dir
                                output_dir = resolve_output_dir(self.config)
                                logger.info(f"Merge enabled: scanning {output_dir} for downloaded files")
                                # First, try to extract filenames from download output.
                                # NOTE(review): the marker literal below is empty — the original
                                # arrow character (likely "→") appears to have been lost in this
                                # copy; "'' in line" is always True and line.split('') raises
                                # ValueError, so this loop cannot work as written. TODO: restore
                                # the original marker character.
                                extracted_files = []
                                for line in stdout_text.split('\n'):
                                    if '' in line:
                                        # Extract filename from arrow marker
                                        parts = line.split('')
                                        if len(parts) > 1:
                                            filename = parts[1].strip()
                                            if filename:
                                                full_path = output_dir / filename
                                                if full_path.exists():
                                                    extracted_files.append(str(full_path))
                                                    logger.debug(f"Found downloaded file from output: (unknown)")
                                if extracted_files:
                                    downloaded_files = extracted_files
                                    logger.info(f"Found {len(downloaded_files)} downloaded files from output markers")
                                else:
                                    # Fallback: List all recent mp3/m4a files in output directory
                                    if output_dir.exists():
                                        import time
                                        current_time = time.time()
                                        recent_files = []
                                        for f in list(output_dir.glob("*.mp3")) + list(output_dir.glob("*.m4a")) + list(output_dir.glob("*.mp4")):
                                            # Files modified in last 30 minutes (extended window)
                                            if current_time - f.stat().st_mtime < 1800:
                                                recent_files.append((f, f.stat().st_mtime))
                                        # Sort by modification time to preserve order
                                        recent_files.sort(key=lambda x: x[1])
                                        downloaded_files = [str(f[0]) for f in recent_files]
                                        logger.info(f"Found {len(downloaded_files)} recently modified files in directory (fallback)")
                                if downloaded_files:
                                    logger.info(f"Found {len(downloaded_files)} files to merge")
                                if downloaded_files:
                                    logger.info(f"Files to merge: {downloaded_files[:3]}... (showing first 3)")
                            else:
                                # For non-merge or non-playlist, just look for "Saved to" pattern
                                combined_output = stdout_text + "\n" + stderr_text
                                for line in combined_output.split('\n'):
                                    if 'Saved to' in line:
                                        # Extract path after "Saved to "
                                        saved_idx = line.find('Saved to')
                                        if saved_idx != -1:
                                            path = line[saved_idx + 8:].strip()
                                            if path:
                                                downloaded_files.append(path)
                                                logger.debug(f"Found downloaded file: {path}")
                            # For merge scenarios, DON'T set to first file yet - merge first, then tag
                            # For non-merge, set to first file for tagging
                            if downloaded_files:
                                if not (self.is_playlist and merge_enabled):
                                    # Non-merge case: set to first file for tagging
                                    first_file = downloaded_files[0]
                                    result_obj.target = first_file
                                    result_obj.path = first_file
                                    logger.info(f"Set result target/path to first file: {first_file}")
                                else:
                                    # Merge case: save all files, will set to merged file after merge
                                    logger.info(f"Merge enabled - will merge {len(downloaded_files)} files before tagging")
                                    download_stderr_text = f"DOWNLOADED_FILES:{','.join(downloaded_files)}\n" + download_stderr_text
                            logger.info("download-data stage completed successfully")
                    except Exception as e:
                        logger.error(f"download-data execution error: {e}", exc_info=True)
                        self.app.call_from_thread(
                            self.app.notify,
                            f"Download error: {e}",
                            title="Download Error",
                            severity="error"
                        )
                        # Finish worker with error status
                        try:
                            # NOTE(review): worker_id may be unbound here (see note above).
                            self.app.call_from_thread(
                                self.app.finish_worker,
                                worker_id,
                                "error",
                                f"Download error: {str(e)}"
                            )
                        except Exception:
                            pass
                        self.app.call_from_thread(self._hide_progress)
                        self.app.call_from_thread(self.dismiss)
                        return
            # Stage 2: Merge files if enabled and this is a playlist (BEFORE tagging)
            merged_file_path = None
            if merge_enabled and download_succeeded and self.is_playlist:
                merge_cmdlet = get_cmdlet("merge-file")
                if merge_cmdlet:
                    from pathlib import Path
                    logger.info("Executing merge-file stage")
                    # Log step to worker
                    if worker:
                        worker.log_step("Starting merge-file stage...")
                    merge_args = ["-delete", "-format", "mka"]  # Delete source files, use MKA for speed (stream copy) and chapters
                    try:
                        # For merge, we pass a list of result objects
                        # The merge-file cmdlet expects objects with 'target' attribute
                        files_to_merge = []
                        # Check if we have the special marker with downloaded files
                        if download_stderr_text.startswith("DOWNLOADED_FILES:"):
                            # Extract file list from marker
                            files_line = download_stderr_text.split('\n')[0]
                            if files_line.startswith("DOWNLOADED_FILES:"):
                                files_str = files_line[len("DOWNLOADED_FILES:"):]
                                file_list = [f.strip() for f in files_str.split(',') if f.strip()]
                                logger.info(f"Found {len(file_list)} downloaded files from marker")
                                # Create result objects with proper attributes
                                for filepath in file_list:
                                    filepath_obj = Path(filepath)
                                    file_result = type('FileResult', (), {
                                        'target': str(filepath),
                                        'path': str(filepath),
                                        'media_kind': 'audio',
                                        'hash_hex': None,
                                        'hash': None,
                                        'known_urls': [],
                                        'title': filepath_obj.stem
                                    })()
                                    files_to_merge.append(file_result)
                        if files_to_merge:
                            logger.info(f"Merging {len(files_to_merge)} files: {[f.target for f in files_to_merge]}")
                            # Call merge-file with list of results
                            import io
                            from contextlib import redirect_stdout, redirect_stderr
                            stdout_buf = io.StringIO()
                            stderr_buf = io.StringIO()
                            with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
                                # Pass the list of file results to merge-file
                                merge_returncode = merge_cmdlet(files_to_merge, merge_args, self.config)
                            merge_stdout = stdout_buf.getvalue()
                            merge_stderr = stderr_buf.getvalue()
                            # Log the merge output so it gets captured by WorkerLoggingHandler
                            if merge_stdout:
                                logger.info(f"[merge-file output]\n{merge_stdout}")
                            if merge_stderr:
                                logger.info(f"[merge-file stderr]\n{merge_stderr}")
                            if merge_returncode != 0:
                                logger.error(f"merge-file stage failed with code {merge_returncode}")
                                logger.error(f" stderr: {merge_stderr}")
                                self.app.call_from_thread(
                                    self.app.notify,
                                    f"Merge failed: {merge_stderr[:100] if merge_stderr else 'unknown error'}",
                                    title="Merge Error",
                                    severity="warning"
                                )
                                # Don't fail entirely - files were downloaded
                            else:
                                logger.info("merge-file stage completed successfully")
                                if merge_stdout:
                                    logger.info(f"merge-file stdout: {merge_stdout}")
                                if merge_stderr:
                                    logger.info(f"merge-file stderr: {merge_stderr}")
                                # Log step to worker
                                if worker:
                                    worker.log_step("Merge completed successfully")
                                # Extract merged file path from stderr
                                # The merge-file cmdlet outputs: "[merge-file] Merged N files into: /path/to/merged.mp3"
                                for line in merge_stderr.split('\n'):
                                    if 'Merged' in line and 'into:' in line:
                                        # Extract path after "into: "
                                        into_idx = line.find('into:')
                                        if into_idx != -1:
                                            merged_file_path = line[into_idx + 5:].strip()
                                            if merged_file_path:
                                                logger.info(f"Detected merged file path: {merged_file_path}")
                                                break
                                # If not found in stderr, try stdout
                                if not merged_file_path:
                                    for line in merge_stdout.split('\n'):
                                        if 'merged' in line.lower() or line.endswith('.mp3') or line.endswith('.m4a'):
                                            merged_file_path = line.strip()
                                            if merged_file_path and not merged_file_path.startswith('['):
                                                logger.info(f"Detected merged file path: {merged_file_path}")
                                                break
                                # If we found the merged file, update result_obj to point to it
                                if merged_file_path:
                                    result_obj.target = merged_file_path
                                    result_obj.path = merged_file_path
                                    logger.info(f"Updated result object to point to merged file: {merged_file_path}")
                        else:
                            logger.warning(f"No files found to merge. download_stderr_text length: {len(download_stderr_text)}, content preview: {download_stderr_text[:100]}")
                    except Exception as e:
                        logger.error(f"merge-file execution error: {e}", exc_info=True)
                        self.app.call_from_thread(
                            self.app.notify,
                            f"Merge error: {e}",
                            title="Merge Error",
                            severity="warning"
                        )
                        # Don't fail entirely - files were downloaded
                else:
                    logger.info("merge-file cmdlet not found")
            # Stage 3: Add tags (now after merge, if merge happened)
            # If merge succeeded, result_obj now points to merged file
            if tags and (download_succeeded or not download_enabled):
                add_tags_cmdlet = get_cmdlet("add-tag")
                if add_tags_cmdlet:
                    logger.info(f"Executing add-tag stage with {len(tags)} tags")
                    logger.info(f" Tags: {tags}")
                    logger.info(f" Source: {source}")
                    logger.info(f" Result path: {result_obj.path}")
                    logger.info(f" Result hash: {result_obj.hash_hex}")
                    # Log step to worker
                    if worker:
                        worker.log_step(f"Starting add-tag stage with {len(tags)} tags...")
                    # Build add-tag arguments: tag1 tag2 tag3 --source <source>
                    tag_args = [str(t) for t in tags] + ["--source", str(source)]
                    logger.info(f" Tag args: {tag_args}")
                    logger.info(f" Result object attributes: target={getattr(result_obj, 'target', 'MISSING')}, path={getattr(result_obj, 'path', 'MISSING')}, hash_hex={getattr(result_obj, 'hash_hex', 'MISSING')}")
                    try:
                        # Capture output from the cmdlet
                        import io
                        from contextlib import redirect_stdout, redirect_stderr
                        stdout_buf = io.StringIO()
                        stderr_buf = io.StringIO()
                        with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
                            returncode = add_tags_cmdlet(result_obj, tag_args, self.config)
                        stdout_text = stdout_buf.getvalue()
                        stderr_text = stderr_buf.getvalue()
                        # Log the tag output so it gets captured by WorkerLoggingHandler
                        if stdout_text:
                            logger.info(f"[add-tag output]\n{stdout_text}")
                        if stderr_text:
                            logger.info(f"[add-tag stderr]\n{stderr_text}")
                        if returncode != 0:
                            logger.error(f"add-tag stage failed with code {returncode}")
                            logger.error(f" stdout: {stdout_text}")
                            logger.error(f" stderr: {stderr_text}")
                            self.app.call_from_thread(
                                self.app.notify,
                                f"Failed to add tags: {stderr_text[:100] if stderr_text else stdout_text[:100] if stdout_text else 'unknown error'}",
                                title="Error",
                                severity="error"
                            )
                            # Don't dismiss on tag failure - let user retry or cancel, but hide progress
                            self.app.call_from_thread(self._hide_progress)
                            return
                        else:
                            if stdout_text:
                                logger.debug(f"add-tag stdout: {stdout_text}")
                            if stderr_text:
                                logger.debug(f"add-tag stderr: {stderr_text}")
                            logger.info("add-tag stage completed successfully")
                            # Log step to worker
                            if worker:
                                worker.log_step(f"Successfully added {len(tags)} tags")
                    except Exception as e:
                        logger.error(f"add-tag execution error: {e}", exc_info=True)
                        self.app.call_from_thread(
                            self.app.notify,
                            f"Error adding tags: {e}",
                            title="Error",
                            severity="error"
                        )
                        self.app.call_from_thread(self._hide_progress)
                        return
                else:
                    logger.error("add-tag cmdlet not found")
            else:
                if tags and download_enabled and not download_succeeded:
                    skip_msg = "⚠️ Skipping add-tag stage because download failed"
                    logger.info(skip_msg)
                    if worker:
                        worker.append_stdout(f"\n{skip_msg}\n")
                        worker.finish("error", "Download stage failed - see logs above for details")
                elif tags:
                    # NOTE(review): this log message contradicts the `elif tags:` guard
                    # (tags is non-empty here) and the branch appears unreachable given
                    # the conditions above — TODO confirm the intended condition.
                    logger.info("No tags to add (tags list is empty)")
            # Success notification
            self.app.call_from_thread(
                self.app.notify,
                f"Download request processed: {url}",
                title="Success",
                severity="information",
                timeout=2
            )
            # Finish worker with success status
            if worker:
                worker.finish("completed", "Download completed successfully")
            logger.info("Download request processing complete")
            # Hide progress and dismiss the modal
            self.app.call_from_thread(self._hide_progress)
            self.app.call_from_thread(self.dismiss)
        except Exception as e:
            logger.error(f"Error in download submit: {e}", exc_info=True)
            # Ensure worker is marked as finished even on exception
            if worker:
                try:
                    worker.finish("error", f"Download failed: {str(e)}")
                except Exception:
                    pass
            self.app.call_from_thread(self._hide_progress)
            self.app.call_from_thread(
                self.app.notify,
                f"Error: {e}",
                title="Error",
                severity="error"
            )
def _create_url_result(self, url: str):
"""Create a result object from a URL for cmdlet processing."""
class URLDownloadResult:
def __init__(self, u):
self.target = u
self.url = u
self.path: str | None = None
self.hash_hex: str | None = None
self.media_kind = "url"
return URLDownloadResult(url)
    def action_cancel(self) -> None:
        """Close the modal without submitting the download request."""
        self.dismiss()
def on_key(self, event) -> None:
"""Handle key presses to implement context-sensitive Ctrl+T."""
if event.key == "ctrl+t":
# Check which widget has focus
focused_widget = self.app.focused
if focused_widget and focused_widget.id == "paragraph_textarea":
# URL textarea: scrape fresh metadata, wipe tags and source
self._action_scrape_url_metadata()
event.prevent_default()
elif focused_widget and focused_widget.id == "tags_textarea":
# Tags textarea: scrape special fields and adjectives
self._action_scrape_tags()
event.prevent_default()
def _action_scrape_url_metadata(self) -> None:
"""Scrape metadata from URL(s) in URL textarea - wipes tags and source.
This is triggered by Ctrl+T when URL textarea is focused.
Supports single URL or multiple URLs (newline/comma-separated).
For multiple PDF URLs, creates pseudo-playlist for merge workflow.
"""
try:
text = self.paragraph_textarea.text.strip()
if not text:
logger.warning("No URL to scrape metadata from")
return
# Parse multiple URLs (newline or comma-separated)
urls = []
for line in text.split('\n'):
line = line.strip()
if line:
# Handle comma-separated URLs within a line
for url in line.split(','):
url = url.strip()
if url:
urls.append(url)
# Check if multiple URLs provided
if len(urls) > 1:
logger.info(f"Detected {len(urls)} URLs - checking for PDF pseudo-playlist")
# Check if all URLs appear to be PDFs
all_pdfs = all(url.endswith('.pdf') or 'pdf' in url.lower() for url in urls)
if all_pdfs:
logger.info(f"All URLs are PDFs - creating pseudo-playlist")
self._handle_pdf_playlist(urls)
return
# Single URL - proceed with normal metadata scraping
url = urls[0] if urls else text.strip()
logger.info(f"Scraping fresh metadata from: {url}")
# Check if tags are already provided in textarea
existing_tags = self.tags_textarea.text.strip()
wipe_tags = not existing_tags # Only wipe if no tags exist
# Run in background to prevent UI freezing
self._scrape_metadata_worker(url, wipe_tags_and_source=wipe_tags, skip_tag_scraping=not wipe_tags)
except Exception as e:
logger.error(f"Error in _action_scrape_url_metadata: {e}", exc_info=True)
def _action_scrape_tags(self) -> None:
    """Expand tag-list references (e.g. ``{psychology}``) in the tags textarea.

    Triggered by Ctrl+T while the tags textarea has focus. List references
    from adjective.json are replaced by the full set of tags they name, and
    the textarea is rewritten with one sorted tag per line.
    """
    try:
        raw_text = self.tags_textarea.text.strip()
        if not raw_text:
            logger.warning("No tags to process")
            return
        if not (expand_tag_lists and process_tags_from_string):
            # Optional helpers failed to import at module load time.
            logger.warning("tag_helpers not available")
            self.app.notify(
                "Tag processing unavailable",
                title="Error",
                severity="error",
                timeout=2
            )
            return
        logger.info(f"Processing tags: {raw_text[:50]}...")
        parsed = process_tags_from_string(raw_text, expand_lists=False)
        if not parsed:
            logger.warning("No tags parsed from text")
            return
        expanded = expand_tag_lists(parsed)
        growth = len(expanded) - len(parsed)
        if growth > 0:
            logger.info(f"Expanded tags: added {growth} new tags")
            self.app.notify(
                f"Expanded: {growth} new tags added from tag lists",
                title="Tags Expanded",
                severity="information",
                timeout=2
            )
        else:
            logger.info("No tag list expansions found")
            self.app.notify(
                "No {list} references found to expand",
                title="Info",
                severity="information",
                timeout=2
            )
        # Normalize display: one tag per line, alphabetical order.
        self.tags_textarea.text = '\n'.join(sorted(expanded))
        logger.info(f"Updated tags textarea with {len(expanded)} tags")
    except Exception as e:
        logger.error(f"Error in _action_scrape_tags: {e}", exc_info=True)
        self.app.notify(
            f"Error processing tags: {e}",
            title="Error",
            severity="error"
        )
def _handle_pdf_playlist(self, pdf_urls: list) -> None:
    """Present multiple PDF URLs as a pseudo-playlist.

    Builds a playlist-like metadata structure (one entry per URL) so the
    normal playlist UI can drive the PDF merge workflow. Display titles
    are derived from each URL's final path segment when possible.

    Args:
        pdf_urls: List of PDF URLs to process
    """
    try:
        from urllib.parse import urlparse

        logger.info(f"Creating PDF pseudo-playlist with {len(pdf_urls)} items")
        entries = []
        for position, pdf_url in enumerate(pdf_urls, 1):
            try:
                # Last path segment of the URL, or a numbered fallback.
                leaf = urlparse(pdf_url).path.split('/')[-1] or f'pdf_{position}.pdf'
                # Strip the extension and de-mangle separators for display.
                display = leaf.replace('.pdf', '').replace('_', ' ').replace('-', ' ')
            except Exception as e:
                logger.debug(f"Could not extract filename: {e}")
                display = f'PDF {position}'
            entries.append({
                'id': str(position - 1),  # 0-based index
                'title': display,
                'duration': '',  # PDFs have no duration
                'url': pdf_url,  # kept for the later download step
            })
        metadata = {
            'title': f'{len(pdf_urls)} PDF Documents',
            'tags': [],
            'formats': [('pdf', 'pdf')],  # single default "format"
            'playlist_items': entries,
            'is_pdf_playlist': True,  # mark as PDF pseudo-playlist
        }
        # Remember the raw URLs for the merge/download stage.
        self.pdf_urls = pdf_urls
        self.is_pdf_playlist = True
        logger.info(f"Populating modal with {len(entries)} PDF items")
        self._populate_from_metadata(metadata, wipe_tags_and_source=True)
        self.app.notify(
            f"Loaded {len(pdf_urls)} PDFs as playlist",
            title="PDF Playlist",
            severity="information",
            timeout=3
        )
    except Exception as e:
        logger.error(f"Error handling PDF playlist: {e}", exc_info=True)
        self.app.notify(
            f"Error loading PDF playlist: {e}",
            title="Error",
            severity="error",
            timeout=3
        )
def _handle_pdf_playlist_download(self, pdf_urls: list, tags: list, selection: str, merge_enabled: bool) -> None:
    """Download the selected PDF pseudo-playlist items and optionally merge them.

    Downloads each selected URL into a temp directory, then either merges
    them into a single PDF (requires PyPDF2) or copies them individually
    into the configured output directory. Temp downloads are removed on
    every exit path (fixes a disk leak where ``~/.downlow_temp_pdfs``
    accumulated files forever), and same-named URLs no longer overwrite
    each other within a batch.

    Args:
        pdf_urls: List of PDF URLs to download
        tags: Tags to apply to the merged PDF
        selection: Selection string like "1-3" or "1,3,5"
        merge_enabled: Whether to merge the PDFs
    """
    # Probe for PyPDF2 up front so the merge branch can report a clear error.
    try:
        from PyPDF2 import PdfWriter, PdfReader
        HAS_PYPDF2 = True
    except ImportError:
        HAS_PYPDF2 = False
        PdfWriter = None
        PdfReader = None
    downloaded_files: list = []
    try:
        from pathlib import Path
        import requests
        from config import resolve_output_dir
        # _parse_playlist_selection reads self.playlist_items, so install a
        # temporary stand-in list (one pseudo-item per URL).
        self.playlist_items = [{'title': url} for url in pdf_urls]
        selected_indices = self._parse_playlist_selection(selection)
        if not selected_indices:
            # Invalid/empty selection: fall back to every URL.
            selected_indices = list(range(len(pdf_urls)))
        selected_urls = [pdf_urls[i] for i in selected_indices]
        logger.info(f"Downloading {len(selected_urls)} selected PDFs for merge")
        # Download PDFs to a temporary directory.
        temp_dir = Path.home() / ".downlow_temp_pdfs"
        temp_dir.mkdir(exist_ok=True)
        for idx, url in enumerate(selected_urls, 1):
            try:
                logger.info(f"Downloading PDF {idx}/{len(selected_urls)}: {url}")
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                # Generate a filename from the URL's last path segment.
                from urllib.parse import urlparse
                filename = urlparse(url).path.split('/')[-1]
                if not filename.endswith('.pdf'):
                    filename = f'pdf_{idx}.pdf'
                pdf_path = temp_dir / filename
                if pdf_path in downloaded_files:
                    # Two URLs share a filename; keep both downloads distinct
                    # so the merge batch is not silently corrupted.
                    pdf_path = temp_dir / f'{idx}_{filename}'
                pdf_path.write_bytes(response.content)
                downloaded_files.append(pdf_path)
                logger.info(f"Downloaded to: {pdf_path}")
            except Exception as e:
                logger.error(f"Failed to download PDF {idx}: {e}")
                self.app.call_from_thread(
                    self.app.notify,
                    f"Failed to download PDF {idx}: {e}",
                    title="Download Error",
                    severity="error"
                )
                return
        if merge_enabled and len(downloaded_files) > 1:
            if not HAS_PYPDF2:
                logger.error("PyPDF2 not available for PDF merge")
                self.app.call_from_thread(
                    self.app.notify,
                    "PyPDF2 required for PDF merge. Install with: pip install PyPDF2",
                    title="Missing Dependency",
                    severity="error"
                )
                return
            logger.info(f"Merging {len(downloaded_files)} PDFs")
            try:
                writer = PdfWriter()
                for pdf_file in downloaded_files:
                    reader = PdfReader(pdf_file)
                    for page in reader.pages:
                        writer.add_page(page)
                    logger.info(f"Added {len(reader.pages)} pages from {pdf_file.name}")
                # Save the merged PDF to the configured output directory,
                # suffixing with a counter so existing files are never clobbered.
                output_dir = Path(resolve_output_dir(self.config))
                output_dir.mkdir(parents=True, exist_ok=True)
                output_path = output_dir / "merged_pdfs.pdf"
                counter = 1
                while output_path.exists():
                    output_path = output_dir / f"merged_pdfs_{counter}.pdf"
                    counter += 1
                with open(output_path, 'wb') as f:
                    writer.write(f)
                logger.info(f"Merged PDF saved to: {output_path}")
                # Tag the merged file if tags were provided.
                if tags and get_cmdlet:
                    tag_cmdlet = get_cmdlet("add-tags")
                    if tag_cmdlet:
                        logger.info(f"Tagging merged PDF with {len(tags)} tags")

                        # Minimal result object shaped the way add-tags expects.
                        class PDFResult:
                            def __init__(self, p):
                                self.path = str(p)
                                self.target = str(p)
                                self.hash_hex = None

                        result_obj = PDFResult(output_path)
                        import io
                        from contextlib import redirect_stdout, redirect_stderr
                        stdout_buf = io.StringIO()
                        stderr_buf = io.StringIO()
                        # Capture cmdlet output so it doesn't corrupt the TUI.
                        with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
                            tag_returncode = tag_cmdlet(result_obj, tags, self.config)
                        if tag_returncode != 0:
                            logger.warning(f"Tag stage returned code {tag_returncode}")
                self.app.call_from_thread(
                    self.app.notify,
                    f"Successfully merged {len(downloaded_files)} PDFs",
                    title="Merge Complete",
                    severity="information",
                    timeout=3
                )
            except Exception as e:
                logger.error(f"PDF merge error: {e}", exc_info=True)
                self.app.call_from_thread(
                    self.app.notify,
                    f"PDF merge failed: {e}",
                    title="Merge Error",
                    severity="error"
                )
        else:
            # No merge: copy each downloaded PDF to the output directory,
            # uniquifying names with a counter when needed.
            import shutil
            output_dir = Path(resolve_output_dir(self.config))
            output_dir.mkdir(parents=True, exist_ok=True)
            for pdf_file in downloaded_files:
                output_path = output_dir / pdf_file.name
                counter = 1
                base_name = pdf_file.stem
                while output_path.exists():
                    output_path = output_dir / f"{base_name}_{counter}.pdf"
                    counter += 1
                shutil.copy2(pdf_file, output_path)
                logger.info(f"Saved PDF to: {output_path}")
            self.app.call_from_thread(
                self.app.notify,
                f"Downloaded {len(downloaded_files)} PDFs",
                title="Download Complete",
                severity="information",
                timeout=3
            )
    except Exception as e:
        logger.error(f"Error in PDF playlist download: {e}", exc_info=True)
        self.app.call_from_thread(
            self.app.notify,
            f"Error processing PDF playlist: {e}",
            title="Error",
            severity="error"
        )
    finally:
        # Always remove temp downloads (success, failure, or early return).
        for leftover in downloaded_files:
            try:
                leftover.unlink()
            except OSError:
                pass
@work(thread=True)
def _scrape_metadata_worker(self, url: str, wipe_tags_and_source: bool = False, skip_tag_scraping: bool = False) -> None:
    """Background worker to scrape metadata using get-tag cmdlet.

    Runs on a worker thread (``@work(thread=True)``); every UI interaction
    is marshalled back to the main thread via ``self.app.call_from_thread``.
    The cmdlet's stdout is captured and scanned for a single JSON line,
    which is parsed and handed to ``_populate_from_metadata``.

    Args:
        url: URL to scrape metadata from
        wipe_tags_and_source: If True, clear tags and source before populating
        skip_tag_scraping: If True, don't scrape tags (only title/formats)
    """
    try:
        logger.info(f"Metadata worker started for: {url}")
        # Call get-tag cmdlet to scrape URL.
        # get_cmdlet is None when the cmdlets package failed to import at
        # module load time — bail out with a user-visible error.
        if not get_cmdlet:
            logger.error("cmdlets module not available")
            self.app.call_from_thread(
                self.app.notify,
                "cmdlets module not available",
                title="Error",
                severity="error"
            )
            return
        # Get the get-tag cmdlet
        get_tag_cmdlet = get_cmdlet("get-tag")
        if not get_tag_cmdlet:
            logger.error("get-tag cmdlet not found")
            self.app.call_from_thread(
                self.app.notify,
                "get-tag cmdlet not found",
                title="Error",
                severity="error"
            )
            return

        # Minimal result object with the attributes the cmdlet reads.
        class URLResult:
            def __init__(self, u):
                self.target = u      # the URL being scraped
                self.hash_hex = None  # no local hash for a remote URL
                self.path = None      # no local file yet

        result_obj = URLResult(url)
        # Capture the cmdlet's stdout/stderr so it can't corrupt the TUI
        # and so the JSON payload can be parsed from the buffer afterwards.
        import io
        from contextlib import redirect_stdout, redirect_stderr
        output_buffer = io.StringIO()
        error_buffer = io.StringIO()
        # Only pass -scrape when tag scraping is wanted; otherwise the
        # cmdlet is invoked with no args (title/formats only).
        args = [] if skip_tag_scraping else ["-scrape", url]
        with redirect_stdout(output_buffer), redirect_stderr(error_buffer):
            returncode = get_tag_cmdlet(result_obj, args, {})
        if returncode != 0:
            error_msg = error_buffer.getvalue()
            logger.error(f"get-tag cmdlet failed: {error_msg}")
            try:
                self.app.call_from_thread(
                    self.app.notify,
                    f"Failed to scrape metadata: {error_msg}",
                    title="Error",
                    severity="error"
                )
            except Exception as e:
                # notify can fail if the screen is being torn down; log only.
                logger.debug(f"Could not notify user: {e}")
            return
        # Parse the JSON output
        output = output_buffer.getvalue().strip()
        if not output:
            logger.warning("get-tag returned no output")
            try:
                self.app.call_from_thread(
                    self.app.notify,
                    "No metadata returned from get-tag",
                    title="Error",
                    severity="error"
                )
            except Exception as e:
                logger.debug(f"Could not notify user: {e}")
            return
        # Extract the JSON line (skip debug messages that start with [get-tag]).
        # The first line opening with '{' is taken as the payload.
        json_line = None
        for line in output.split('\n'):
            if line.strip().startswith('{'):
                json_line = line.strip()
                break
        if not json_line:
            logger.error(f"No JSON found in get-tag output")
            logger.debug(f"Raw output: {output}")
            try:
                self.app.call_from_thread(
                    self.app.notify,
                    "No metadata found in response",
                    title="Error",
                    severity="error"
                )
            except Exception as e:
                logger.debug(f"Could not notify user: {e}")
            return
        try:
            metadata_result = json.loads(json_line)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON: {e}")
            logger.debug(f"JSON line: {json_line}")
            try:
                self.app.call_from_thread(
                    self.app.notify,
                    f"Failed to parse metadata: {e}",
                    title="Error",
                    severity="error"
                )
            except Exception as ne:
                logger.debug(f"Could not notify user: {ne}")
            return
        # Build metadata dict in the format expected by _populate_from_metadata.
        # If skipping tag scraping, preserve the user's existing tags.
        # NOTE(review): reads self.tags_textarea.text from this worker
        # thread — confirm Textual permits off-main-thread widget reads.
        existing_tags = self.tags_textarea.text.strip().split('\n') if skip_tag_scraping else []
        existing_tags = [tag.strip() for tag in existing_tags if tag.strip()]
        # Extract playlist items if present
        playlist_items = metadata_result.get('playlist_items', [])
        metadata = {
            'title': metadata_result.get('title', 'Unknown'),
            'url': url,
            'tags': metadata_result.get('tags', []) or existing_tags,  # Use existing if new are empty
            'formats': metadata_result.get('formats', []),
            'playlist_items': playlist_items,
        }
        logger.info(f"Retrieved metadata: title={metadata['title']}, tags={len(metadata['tags'])}, formats={len(metadata['formats'])}, playlist_items={len(playlist_items)}")
        # Update UI on main thread
        self.app.call_from_thread(
            self._populate_from_metadata,
            metadata,
            wipe_tags_and_source
        )
    except Exception as e:
        logger.error(f"Metadata worker error: {e}", exc_info=True)
        try:
            self.app.call_from_thread(
                self.app.notify,
                f"Failed to scrape metadata: {e}",
                title="Error",
                severity="error"
            )
        except Exception as ne:
            logger.debug(f"Could not notify user of error: {ne}")
def _convert_selection_to_ytdlp(self, selection_str: str) -> str:
"""Convert playlist selection string to yt-dlp --playlist-items format.
Args:
selection_str: Selection string like "1-3", "all", "merge", "1,3,5-8"
Can also include multiple keywords separated by spaces
Returns:
yt-dlp format string like "1-3,5,8" or "1-10" for all
"""
if not selection_str:
return ""
selection_str = selection_str.strip().upper()
max_idx = len(self.playlist_items)
# Handle keywords (all, merge, a, m) - can be space or comma separated
# "ALL MERGE", "A M", "ALL,MERGE" etc all mean download all items
if any(kw in selection_str.replace(',', ' ').split() for kw in {'A', 'ALL', 'M', 'MERGE'}):
# User said to get all items (merge is same as all in this context)
return f"1-{max_idx}"
# Parse ranges like "1,3,5-8" and convert to yt-dlp format
# The selection is already in 1-based format from user, keep it that way
# yt-dlp expects 1-based indices
try:
parts = []
for part in selection_str.split(','):
part = part.strip()
if part: # Skip empty parts
parts.append(part)
return ','.join(parts)
except (ValueError, AttributeError):
logger.error(f"Failed to convert playlist selection: {selection_str}")
return ""
def _parse_playlist_selection(self, selection_str: str) -> list:
"""Parse playlist selection string into list of track indices (0-based).
Args:
selection_str: Selection string like "1-3", "all", "merge", "1,3,5-8"
Returns:
List of 0-based indices, or empty list if invalid
"""
if not selection_str:
return []
selection_str = selection_str.strip().upper()
max_idx = len(self.playlist_items)
# Handle keywords (all, merge, a, m) - can be space or comma separated
# "ALL MERGE", "A M", "ALL,MERGE" etc all mean download all items
if any(kw in selection_str.replace(',', ' ').split() for kw in {'A', 'ALL', 'M', 'MERGE'}):
# User said to get all items
return list(range(max_idx))
# Parse ranges like "1,3,5-8"
indices = set()
try:
for part in selection_str.split(','):
part = part.strip()
if '-' in part:
# Range like "5-8"
start_str, end_str = part.split('-', 1)
start = int(start_str.strip()) - 1 # Convert to 0-based
end = int(end_str.strip()) # end is inclusive in user terms
for i in range(start, end):
if 0 <= i < max_idx:
indices.add(i)
else:
# Single number
idx = int(part.strip()) - 1 # Convert to 0-based
if 0 <= idx < max_idx:
indices.add(idx)
return sorted(list(indices))
except (ValueError, AttributeError):
logger.error(f"Failed to parse playlist selection: {selection_str}")
return []
def _execute_download_pipeline(self, result_obj: Any, tags: list, source: str, download_enabled: bool, worker=None) -> None:
    """Execute the download pipeline for a single item.

    Two sequential stages: (1) optional ``download-data`` cmdlet, which
    must succeed for the pipeline to continue; (2) optional ``add-tags``
    cmdlet when tags were provided and the result carries a file path.
    Progress text is mirrored to both the logger and, when given, the
    worker's stdout log.

    Args:
        result_obj: URL result object.
            NOTE(review): stage 2 calls ``result_obj.get('path')``, so this
            must be dict-like here — confirm against callers.
        tags: List of tags to apply
        source: Source for metadata (not read in this body)
        download_enabled: Whether to download the file
        worker: Optional Worker instance for logging
    """
    # Import cmdlet system
    # get_cmdlet is None when the cmdlets package failed to import at
    # module load time.
    if not get_cmdlet:
        error_msg = "cmdlets module not available"
        logger.error(error_msg)
        if worker:
            worker.append_stdout(f"❌ ERROR: {error_msg}\n")
        self.app.call_from_thread(
            self.app.notify,
            "Cmdlets system unavailable",
            title="Error",
            severity="error"
        )
        return
    # Stage 1: Download data if enabled
    if download_enabled:
        download_cmdlet = get_cmdlet("download-data")
        # NOTE(review): if "download-data" is not registered this silently
        # skips the download stage — confirm that is intended.
        if download_cmdlet:
            stage_msg = "📥 Executing download-data stage"
            logger.info(stage_msg)
            if worker:
                worker.append_stdout(f"{stage_msg}\n")
            try:
                # Capture the cmdlet's stdout/stderr so it can't corrupt
                # the TUI; relay captured text to the worker log instead.
                import io
                from contextlib import redirect_stdout, redirect_stderr
                stdout_buf = io.StringIO()
                stderr_buf = io.StringIO()
                with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
                    returncode = download_cmdlet(result_obj, [], self.config)
                stdout_text = stdout_buf.getvalue()
                stderr_text = stderr_buf.getvalue()
                if stdout_text:
                    logger.debug(f"download-data stdout: {stdout_text}")
                    if worker:
                        worker.append_stdout(stdout_text)
                if stderr_text:
                    logger.debug(f"download-data stderr: {stderr_text}")
                    if worker:
                        worker.append_stdout(f"⚠️ stderr: {stderr_text}\n")
                if returncode != 0:
                    # Download failed — abort the whole pipeline.
                    error_msg = f"❌ download-data stage failed with code {returncode}\nstderr: {stderr_text}"
                    logger.error(error_msg)
                    if worker:
                        worker.append_stdout(f"{error_msg}\n")
                    self.app.call_from_thread(
                        self.app.notify,
                        f"Download failed: {stderr_text[:100]}",
                        title="Download Error",
                        severity="error"
                    )
                    return
                else:
                    success_msg = "✅ download-data completed successfully"
                    logger.info(success_msg)
                    if worker:
                        worker.append_stdout(f"{success_msg}\n")
            except Exception as e:
                error_msg = f"❌ download-data error: {e}"
                logger.error(error_msg, exc_info=True)
                if worker:
                    worker.append_stdout(f"{error_msg}\nTraceback:\n{__import__('traceback').format_exc()}\n")
                self.app.call_from_thread(
                    self.app.notify,
                    str(e)[:100],
                    title="Download Error",
                    severity="error"
                )
                return
    # Stage 2: Tag the file if tags provided
    if tags:
        tag_cmdlet = get_cmdlet("add-tags")
        # Tagging requires a local file path on the result.
        if tag_cmdlet and result_obj.get('path'):
            stage_msg = f"🏷️ Tagging with {len(tags)} tags"
            logger.info(stage_msg)
            if worker:
                worker.append_stdout(f"{stage_msg}\n")
            try:
                tag_args = tags
                import io
                from contextlib import redirect_stdout, redirect_stderr
                stdout_buf = io.StringIO()
                stderr_buf = io.StringIO()
                # NOTE(review): passes an empty config {} here while the
                # download stage uses self.config — confirm intentional.
                with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
                    tag_returncode = tag_cmdlet(result_obj, tag_args, {})
                stdout_text = stdout_buf.getvalue()
                stderr_text = stderr_buf.getvalue()
                if stdout_text:
                    logger.debug(f"tag stdout: {stdout_text}")
                    if worker:
                        worker.append_stdout(stdout_text)
                if tag_returncode != 0:
                    # Tag failure is non-fatal: warn and continue.
                    warning_msg = f"⚠️ Tag stage returned code {tag_returncode}: {stderr_text}"
                    logger.warning(warning_msg)
                    if worker:
                        worker.append_stdout(f"{warning_msg}\n")
                else:
                    if worker:
                        worker.append_stdout("✅ Tags applied successfully\n")
            except Exception as e:
                error_msg = f"❌ Tagging error: {e}"
                logger.error(error_msg, exc_info=True)
                if worker:
                    worker.append_stdout(f"{error_msg}\n")
        else:
            if not result_obj.get('path'):
                warning_msg = "⚠️ No file path in result - skipping tagging"
                logger.warning(warning_msg)
                if worker:
                    worker.append_stdout(f"{warning_msg}\n")
    else:
        if worker:
            worker.append_stdout("✅ Download complete (no tags to apply)\n")
def _show_format_select(self) -> None:
    """Show the format select and hide the playlist pane (single-file mode)."""
    try:
        format_pane = self.query_one("#files_container", Container)
        playlist_pane = self.query_one("#playlist_container", Container)
        # Format select takes the space; playlist is collapsed to nothing.
        format_pane.styles.height = "1fr"
        playlist_pane.styles.height = "0"
        self.is_playlist = False
    except Exception as e:
        logger.error(f"Error showing format select: {e}")
def _show_playlist_controls(self) -> None:
    """Reveal the playlist tree/input below the (still visible) format select."""
    try:
        playlist_pane = self.query_one("#playlist_container", Container)
        # Expanding the container's height is all it takes to show it.
        playlist_pane.styles.height = "auto"
        self.is_playlist = True
    except Exception as e:
        logger.error(f"Error showing playlist controls: {e}")
def _populate_playlist_tree(self, items: list) -> None:
    """Fill the playlist tree with one leaf per track.

    Args:
        items: List of track info dicts with 'id', 'title', 'duration', etc.
    """
    try:
        self.playlist_tree.clear()
        self.playlist_items = items
        for position, entry in enumerate(items, 1):
            name = entry.get('title', f'Track {position}')
            length = entry.get('duration', '')
            # Leaf label: "1. Song Title (3:45)" — duration only if known.
            leaf = f"{position}. {name}" + (f" ({length})" if length else "")
            self.playlist_tree.root.add_leaf(leaf)
        logger.info(f"Populated playlist tree with {len(items)} items")
    except Exception as e:
        logger.error(f"Error populating playlist tree: {e}")
def _populate_from_metadata(self, metadata: dict, wipe_tags_and_source: bool = False) -> None:
    """Populate modal fields from extracted metadata.

    Fills the tags textarea (filtering out internal namespaces), the
    format select, and — when playlist items are present — the playlist
    tree, setting ``self.is_playlist`` accordingly.

    Args:
        metadata: Dictionary with title, tags, formats (and optionally
            playlist_items)
        wipe_tags_and_source: If True, clear tags and source before populating
    """
    try:
        # Wipe tags and source if requested (fresh scrape from URL).
        # Ordering matters: the wipe happens before existing_tags is read
        # below, so a wipe guarantees existing_tags is empty.
        if wipe_tags_and_source:
            self.tags_textarea.text = ""
            # Reset source to first available option
            try:
                # Get all options and select the first one
                source_options = self._build_source_options()
                if source_options:
                    self.source_select.value = source_options[0][1]
            except Exception as e:
                logger.warning(f"Could not reset source select: {e}")
        # Populate tags - using extracted tags (one per line format)
        tags = metadata.get('tags', [])
        existing_tags = self.tags_textarea.text.strip()
        title = metadata.get('title', 'Unknown')
        # Extract meaningful tags:
        # 1. Freeform tags (tag:value)
        # 2. Creator/artist metadata (creator:, artist:, channel:)
        # 3. Other meaningful namespaces (genre:, album:, track:, etc.)
        meaningful_tags = []
        # Add title tag first (so user can edit it)
        if title and title != 'Unknown':
            meaningful_tags.append(f"title:{title}")
        # Namespaces to exclude (metadata-only, not user-facing)
        excluded_namespaces = {
            'hash',  # Hash values (internal)
            'known_url',  # URLs (internal)
            'relationship',  # Internal relationships
            'url',  # URLs (internal)
        }
        # Add all other tags
        for tag in tags:
            if ':' in tag:
                namespace, value = tag.split(':', 1)
                # Skip internal/metadata namespaces
                if namespace.lower() not in excluded_namespaces:
                    meaningful_tags.append(tag)
            else:
                # Tags without namespace are freeform - always include
                meaningful_tags.append(tag)
        # Build tags string (one per line); append after any existing tags.
        tags_str = '\n'.join(meaningful_tags)
        if existing_tags:
            self.tags_textarea.text = existing_tags + '\n' + tags_str
        else:
            self.tags_textarea.text = tags_str
        # Check if this is a playlist
        playlist_items = metadata.get('playlist_items', [])
        formats = metadata.get('formats', [])
        # Always show format select (single file or default for playlist)
        self._show_format_select()
        if formats:
            # formats may be lists (from JSON) or tuples, convert to tuples
            format_tuples = []
            for fmt in formats:
                if isinstance(fmt, (list, tuple)) and len(fmt) == 2:
                    format_tuples.append(tuple(fmt))
            if format_tuples:
                self.files_select.set_options(format_tuples)
                # Select the first format by default
                self.files_select.value = format_tuples[0][1]
                self.selected_files = {format_tuples[0][0]}
        # If playlist, also show the tree for track selection
        if playlist_items and len(playlist_items) > 0:
            logger.info(f"Detected playlist with {len(playlist_items)} items")
            self._populate_playlist_tree(playlist_items)
            # Show playlist tree alongside format select (height: auto to show)
            playlist_container = self.query_one("#playlist_container", Container)
            playlist_container.styles.height = "auto"
            # SET FLAG SO action_submit() KNOWS THIS IS A PLAYLIST
            self.is_playlist = True
        logger.info(f"Populated modal from metadata: {len(meaningful_tags)} tags, {len(playlist_items)} playlist items, {len(formats)} formats")
        # Notify user
        self.app.notify(
            f"Scraped metadata: {title}",
            title="Metadata Loaded",
            severity="information",
            timeout=3
        )
    except Exception as e:
        logger.error(f"Error populating metadata: {e}", exc_info=True)
        self.app.notify(
            f"Failed to populate metadata: {e}",
            title="Error",
            severity="error"
        )
def on_select_changed(self, event: Select.Changed) -> None:
    """Track the chosen format whenever the files select changes."""
    if event.select.id != "files_select":
        return
    # Record the chosen value; ignore clears (falsy values).
    if event.value:
        self.selected_files = {str(event.value)}
        logger.debug(f"Selected format: {event.value}")
def on_button_pressed(self, event) -> None:
    """Dispatch submit/cancel button clicks to their actions."""
    pressed = event.button.id
    if pressed == "submit_btn":
        self.action_submit()
    elif pressed == "cancel_btn":
        self.action_cancel()
def _show_progress(self) -> None:
    """Reveal the progress bar and hide the button row while downloading."""
    try:
        # Giving the bar a height of 1 makes it visible.
        self.progress_bar.styles.height = 1
        self.progress_bar.update(total=100)
        # Buttons are hidden until the download finishes.
        self.query_one("#button_row", Horizontal).display = False
    except Exception as e:
        logger.error(f"Error showing progress bar: {e}")
def _hide_progress(self) -> None:
    """Collapse the progress bar and restore the button row."""
    try:
        # Height 0 collapses the bar out of view.
        self.progress_bar.styles.height = 0
        self.query_one("#button_row", Horizontal).display = True
    except Exception as e:
        logger.error(f"Error hiding progress bar: {e}")