diff --git a/API/data/alldebrid.json b/API/data/alldebrid.json index a70390d..be2b5e5 100644 --- a/API/data/alldebrid.json +++ b/API/data/alldebrid.json @@ -234,7 +234,7 @@ "ddl\\.to/([0-9a-zA-Z]{12})" ], "regexp": "((ddownload\\.com/[0-9a-zA-Z]{12}))|(ddl\\.to/([0-9a-zA-Z]{12}))", - "status": true + "status": false }, "dropapk": { "name": "dropapk", diff --git a/API/folder.py b/API/folder.py index a9756e9..87790c8 100644 --- a/API/folder.py +++ b/API/folder.py @@ -2508,34 +2508,25 @@ class API_folder_store: ) return False + payload = text if text.endswith("\n") else f"{text}\n" cursor = self.connection.cursor() cursor.execute( - "SELECT stdout FROM worker WHERE worker_id = ?", - (worker_id, - ) + """ + UPDATE worker + SET stdout = CASE + WHEN stdout IS NULL OR stdout = '' THEN ? + WHEN substr(stdout, -1, 1) = '\n' THEN stdout || ? + ELSE stdout || '\n' || ? + END, + last_updated = CURRENT_TIMESTAMP, + last_stdout_at = CURRENT_TIMESTAMP + WHERE worker_id = ? + """, + (payload, payload, payload, worker_id), ) - row = cursor.fetchone() - - if not row: + if cursor.rowcount <= 0: logger.warning(f"Worker {worker_id} not found for stdout append") return False - - current_stdout = row[0] or "" - separator = ( - "" if not current_stdout else - ("" if current_stdout.endswith("\n") else "\n") - ) - new_stdout = f"{current_stdout}{separator}{text}\n" - - cursor.execute( - """ - UPDATE worker SET stdout = ?, last_updated = CURRENT_TIMESTAMP, - last_stdout_at = CURRENT_TIMESTAMP - WHERE worker_id = ? - """, - (new_stdout, - worker_id), - ) self._insert_worker_log_entry( cursor, worker_id, diff --git a/CLI.py b/CLI.py index 40e5e83..b8d7348 100644 --- a/CLI.py +++ b/CLI.py @@ -2856,6 +2856,56 @@ class PipelineExecutor: except Exception: auto_stage = None + source_cmd_for_selection = None + source_args_for_selection: List[str] = [] + try: + source_cmd_for_selection = ( + ctx.get_current_stage_table_source_command() + or ctx.get_last_result_table_source_command() + ) + source_args_for_selection = ( + ctx.get_current_stage_table_source_args() + or ctx.get_last_result_table_source_args() + or [] + ) + except Exception: + source_cmd_for_selection = None + source_args_for_selection = [] + + if not stages and selection_indices and source_cmd_for_selection: + src_norm = _norm_cmd(source_cmd_for_selection) + if src_norm in {".worker", "worker", "workers"}: + if len(selection_indices) == 1: + idx = selection_indices[0] + row_args = None + try: + row_args = ctx.get_current_stage_table_row_selection_args(idx) + except Exception: + row_args = None + if not row_args: + try: + row_args = ctx.get_last_result_table_row_selection_args(idx) + except Exception: + row_args = None + if not row_args: + try: + items = ctx.get_last_result_items() or [] + if 0 <= idx < len(items): + maybe = items[idx] + if isinstance(maybe, dict): + candidate = maybe.get("_selection_args") + if isinstance(candidate, (list, tuple)): + row_args = [str(x) for x in candidate if x is not None] + except Exception: + row_args = row_args or None + + if row_args: + stages.append( + [str(source_cmd_for_selection)] + + [str(x) for x in row_args if x is not None] + + [str(x) for x in source_args_for_selection if x is not None] + ) + def _apply_row_action_to_stage(stage_idx: int) -> bool: if not selection_indices or len(selection_indices) != 1: return False diff --git a/SYS/worker_manager.py b/SYS/worker_manager.py index d95ca16..d14c2d5 100644 --- a/SYS/worker_manager.py +++ b/SYS/worker_manager.py @@ -6,7 +6,7 @@ persistence to database and optional auto-refresh callbacks. import logging from pathlib import Path -from typing import Optional, Dict, Any, List, Callable +from typing import Optional, Dict, Any, List, Callable, Tuple from datetime import datetime from threading import Thread, Lock import time @@ -270,6 +270,13 @@ class WorkerManager: WorkerLoggingHandler] = {} # Track active handlers self._worker_last_step: Dict[str, str] = {} + # Buffered stdout/log batching to reduce DB lock contention. + self._stdout_buffers: Dict[Tuple[str, str], List[str]] = {} + self._stdout_buffer_sizes: Dict[Tuple[str, str], int] = {} + self._stdout_buffer_steps: Dict[Tuple[str, str], Optional[str]] = {} + self._stdout_last_flush: Dict[Tuple[str, str], float] = {} + self._stdout_flush_bytes = 4096 + self._stdout_flush_interval = 0.75 def close(self) -> None: """Close the database connection.""" @@ -392,9 +399,15 @@ class WorkerManager: root_logger = logging.getLogger() root_logger.removeHandler(handler) - logger.debug( - f"[WorkerManager] Disabled logging for worker: {worker_id}" - ) + # Flush any buffered stdout/log data for this worker + try: + self.flush_worker_stdout(worker_id) + except Exception: + pass + + logger.debug( + f"[WorkerManager] Disabled logging for worker: {worker_id}" + ) except Exception as e: logger.error( f"[WorkerManager] Error disabling logging for worker {worker_id}: {e}", @@ -508,6 +521,10 @@ class WorkerManager: True if update was successful """ try: + try: + self.flush_worker_stdout(worker_id) + except Exception: + pass kwargs = { "status": result, "completed_at": datetime.now().isoformat() @@ -742,17 +759,119 @@ class WorkerManager: Returns: True if append was successful """ + if not text: + return True + + now = time.monotonic() + step_label = self._get_last_step(worker_id) + key = (worker_id, channel) + pending_flush: List[Tuple[str, str, Optional[str], str]] = [] + + try: + with self._lock: + # Initialize last flush time for this buffer + if key not in self._stdout_last_flush: + self._stdout_last_flush[key] = now + + current_step = self._stdout_buffer_steps.get(key) + if current_step is None: + self._stdout_buffer_steps[key] = step_label + current_step = step_label + + # If step changes, flush existing buffer to keep step tags coherent + if current_step != step_label: + buffered = "".join(self._stdout_buffers.get(key, [])) + if buffered: + pending_flush.append((worker_id, channel, current_step, buffered)) + self._stdout_buffers[key] = [] + self._stdout_buffer_sizes[key] = 0 + self._stdout_last_flush[key] = now + self._stdout_buffer_steps[key] = step_label + + buf = self._stdout_buffers.setdefault(key, []) + buf.append(text) + size = self._stdout_buffer_sizes.get(key, 0) + len(text) + self._stdout_buffer_sizes[key] = size + + last_flush = self._stdout_last_flush.get(key, now) + should_flush = ( + size >= self._stdout_flush_bytes + or (now - last_flush) >= self._stdout_flush_interval + ) + if should_flush: + buffered = "".join(self._stdout_buffers.get(key, [])) + if buffered: + pending_flush.append( + (worker_id, channel, self._stdout_buffer_steps.get(key), buffered) + ) + self._stdout_buffers[key] = [] + self._stdout_buffer_sizes[key] = 0 + self._stdout_last_flush[key] = now + self._stdout_buffer_steps[key] = None + except Exception as e: + logger.error(f"[WorkerManager] Error buffering stdout: {e}", exc_info=True) + return False + + ok = True + for wid, ch, step, payload in pending_flush: + try: + with self._db_lock: + result = self.db.append_worker_stdout( + wid, + payload, + step=step, + channel=ch + ) + ok = ok and result + except Exception as e: + logger.error( + f"[WorkerManager] Error flushing stdout for {wid}: {e}", + exc_info=True, + ) + ok = False + return ok + + def flush_worker_stdout(self, worker_id: str) -> bool: + """Flush any buffered stdout/log data for a worker.""" + keys_to_flush: List[Tuple[str, str]] = [] + with self._lock: + for key in list(self._stdout_buffers.keys()): + if key[0] == worker_id: + keys_to_flush.append(key) + + ok = True + for wid, channel in keys_to_flush: + ok = self._flush_stdout_buffer(wid, channel) and ok + return ok + + def _flush_stdout_buffer(self, worker_id: str, channel: str) -> bool: + key = (worker_id, channel) + with self._lock: + chunks = self._stdout_buffers.get(key) + if not chunks: + return True + text = "".join(chunks) + step = self._stdout_buffer_steps.get(key) + self._stdout_buffers[key] = [] + self._stdout_buffer_sizes[key] = 0 + self._stdout_last_flush[key] = time.monotonic() + self._stdout_buffer_steps[key] = None + + if not text: + return True try: - step_label = self._get_last_step(worker_id) with self._db_lock: return self.db.append_worker_stdout( worker_id, text, - step=step_label, - channel=channel + step=step, + channel=channel, ) except Exception as e: - logger.error(f"[WorkerManager] Error appending stdout: {e}", exc_info=True) + logger.error( + f"[WorkerManager] Error flushing stdout for {worker_id}: {e}", + exc_info=True, + ) return False def get_stdout(self, worker_id: str) -> str: @@ -799,6 +918,17 @@ class WorkerManager: def close(self) -> None: """Close the worker manager and database connection.""" self.stop_auto_refresh() + try: + self._flush_all_stdout_buffers() + except Exception: + pass with self._db_lock: self.db.close() logger.info("[WorkerManager] Closed") + + def _flush_all_stdout_buffers(self) -> None: + keys_to_flush: List[Tuple[str, str]] = [] + with self._lock: + keys_to_flush = list(self._stdout_buffers.keys()) + for wid, channel in keys_to_flush: + self._flush_stdout_buffer(wid, channel) diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index 55f8bcf..b987643 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -9,6 +9,7 @@ import re import shutil import sys import tempfile +import time from collections.abc import Iterable as IterableABC from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse @@ -2640,6 +2641,9 @@ def propagate_metadata( is_same_length = len(new_items) == len(prev_normalized) for i, item in enumerate(new_items): + if isinstance(item, dict) and item.get("_skip_metadata_propagation"): + normalized.append(item) + continue try: obj = coerce_to_pipe_object(item) except Exception: @@ -3058,6 +3062,9 @@ def check_url_exists_in_storage( stage_ctx = None in_pipeline = bool(stage_ctx is not None or ("|" in str(current_cmd_text or ""))) + start_time = time.monotonic() + time_budget = 45.0 + debug(f"[preflight] check_url_exists_in_storage: checking {len(urls)} url(s)") if in_pipeline: try: already_checked = bool( @@ -3101,6 +3108,18 @@ def check_url_exists_in_storage( preflight_cache["url_duplicates"] = url_dup_cache _store_preflight_cache(preflight_cache) + def _timed_out(reason: str) -> bool: + try: + if (time.monotonic() - start_time) >= time_budget: + debug( + f"Bulk URL preflight timed out after {time_budget:.0f}s ({reason}); continuing" + ) + _mark_preflight_checked() + return True + except Exception: + return False + return False + if in_pipeline: try: cached_cmd = pipeline_context.load_value("preflight.url_duplicates.command", default="") @@ -3358,7 +3377,10 @@ def check_url_exists_in_storage( _mark_preflight_checked() return True - bulk_mode = len(unique_urls) >= 8 + if _timed_out("before backend scan"): + return True + + bulk_mode = len(unique_urls) > 1 def _build_bulk_patterns(needles_map: Dict[str, List[str]], max_per_url: int = 3, max_total: int = 240) -> List[str]: patterns: List[str] = [] @@ -3562,12 +3584,16 @@ def check_url_exists_in_storage( HydrusNetwork = None # type: ignore for backend_name in backend_names: + if _timed_out("backend scan"): + return True if len(match_rows) >= max_rows: break try: backend = storage[backend_name] except Exception: continue + + debug(f"[preflight] Scanning backend: {backend_name}") if HydrusNetwork is not None and isinstance(backend, HydrusNetwork): client = getattr(backend, "_client", None) @@ -3576,6 +3602,9 @@ def check_url_exists_in_storage( if not hydrus_available: debug("Bulk URL preflight: hydrus availability check failed; attempting best-effort lookup") + if _timed_out("hydrus scan"): + return True + if bulk_mode and bulk_patterns: bulk_hits: Optional[List[Any]] = None bulk_limit = min(2000, max(200, len(unique_urls) * 8)) @@ -3591,40 +3620,49 @@ def check_url_exists_in_storage( except Exception: bulk_hits = None - if bulk_hits is not None: - for hit in bulk_hits: - if len(match_rows) >= max_rows: - break - url_values = _extract_urls_from_hit(hit, backend, allow_backend_lookup=False) - if not url_values: - continue - - for original_url, needles in url_needles.items(): - if len(match_rows) >= max_rows: - break - if (original_url, str(backend_name)) in seen_pairs: - continue - - matched = False - for url_value in url_values: - for needle in (needles or []): - if _match_normalized_url(str(needle or ""), str(url_value or "")): - matched = True - break - if matched: - break - - if not matched: - continue - - seen_pairs.add((original_url, str(backend_name))) - matched_urls.add(original_url) - match_rows.append( - _build_display_row_for_hit(hit, str(backend_name), original_url) - ) + if bulk_hits is None: + debug("Bulk URL preflight: Hydrus bulk scan failed; skipping per-URL checks") continue + for hit in bulk_hits: + if _timed_out("hydrus bulk scan"): + return True + if len(match_rows) >= max_rows: + break + url_values = _extract_urls_from_hit(hit, backend, allow_backend_lookup=False) + if not url_values: + continue + + for original_url, needles in url_needles.items(): + if _timed_out("hydrus bulk scan"): + return True + if len(match_rows) >= max_rows: + break + if (original_url, str(backend_name)) in seen_pairs: + continue + + matched = False + for url_value in url_values: + for needle in (needles or []): + if _match_normalized_url(str(needle or ""), str(url_value or "")): + matched = True + break + if matched: + break + + if not matched: + continue + + seen_pairs.add((original_url, str(backend_name))) + matched_urls.add(original_url) + match_rows.append( + _build_display_row_for_hit(hit, str(backend_name), original_url) + ) + continue + for original_url, needles in url_needles.items(): + if _timed_out("hydrus per-url scan"): + return True if len(match_rows) >= max_rows: break if (original_url, str(backend_name)) in seen_pairs: @@ -3705,6 +3743,8 @@ def check_url_exists_in_storage( if bulk_hits is not None: for hit in bulk_hits: + if _timed_out("backend bulk scan"): + return True if len(match_rows) >= max_rows: break url_values = _extract_urls_from_hit(hit, backend, allow_backend_lookup=False) @@ -3712,6 +3752,8 @@ def check_url_exists_in_storage( continue for original_url, needles in url_needles.items(): + if _timed_out("backend bulk scan"): + return True if len(match_rows) >= max_rows: break if (original_url, str(backend_name)) in seen_pairs: @@ -3737,6 +3779,8 @@ def check_url_exists_in_storage( continue for original_url, needles in url_needles.items(): + if _timed_out("backend per-url scan"): + return True if len(match_rows) >= max_rows: break if (original_url, str(backend_name)) in seen_pairs: diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index 6937d46..4f1cde5 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -387,18 +387,23 @@ class Download_File(Cmdlet): total_items = len(expanded_items) processed_items = 0 + debug(f"[download-file] Processing {total_items} piped item(s)...") + try: if total_items: progress.set_percent(0) except Exception: pass - for item in expanded_items: + for idx, item in enumerate(expanded_items, 1): try: label = "item" table = get_field(item, "table") title = get_field(item, "title") target = get_field(item, "path") or get_field(item, "url") + + debug(f"[download-file] Item {idx}/{total_items}: {title or target or 'unnamed'}") + media_kind = get_field(item, "media_kind") tags_val = get_field(item, "tag") tags_list: Optional[List[str]] @@ -931,15 +936,26 @@ class Download_File(Cmdlet): @staticmethod def _init_storage(config: Dict[str, Any]) -> tuple[Optional[Any], bool]: + # Cache storage object in config to avoid excessive DB initialization in loops + if isinstance(config, dict) and "_storage_cache" in config: + cached = config["_storage_cache"] + if isinstance(cached, tuple) and len(cached) == 2: + return cached # type: ignore + storage = None hydrus_available = True try: from Store import Store from API.HydrusNetwork import is_hydrus_available + debug(f"[download-file] Initializing storage interface...") storage = Store(config=config or {}, suppress_debug=True) hydrus_available = bool(is_hydrus_available(config or {})) - except Exception: + + if isinstance(config, dict): + config["_storage_cache"] = (storage, hydrus_available) + except Exception as e: + debug(f"[download-file] Storage initialization error: {e}") storage = None return storage, hydrus_available @@ -1052,6 +1068,7 @@ class Download_File(Cmdlet): @staticmethod def _canonicalize_url_for_storage(*, requested_url: str, ytdlp_tool: YtDlpTool, playlist_items: Optional[str]) -> str: if playlist_items: + debug(f"[download-file] Skipping canonicalization for playlist item(s): {playlist_items}") return str(requested_url) try: cf = None @@ -1061,14 +1078,19 @@ class Download_File(Cmdlet): cf = str(cookie_path) except Exception: cf = None + + debug(f"[download-file] Canonicalizing URL: {requested_url}") pr = probe_url(requested_url, no_playlist=False, timeout_seconds=15, cookiefile=cf) if isinstance(pr, dict): for key in ("webpage_url", "original_url", "url", "requested_url"): value = pr.get(key) if isinstance(value, str) and value.strip(): - return value.strip() - except Exception: - pass + canon = value.strip() + if canon != requested_url: + debug(f"[download-file] Resolved canonical URL: {requested_url} -> {canon}") + return canon + except Exception as e: + debug(f"[download-file] Canonicalization error for {requested_url}: {e}") return str(requested_url) @@ -1113,6 +1135,10 @@ class Download_File(Cmdlet): def _maybe_show_playlist_table(self, *, url: str, ytdlp_tool: YtDlpTool) -> bool: + ctx = pipeline_context.get_stage_context() + if ctx is not None and getattr(ctx, "total_stages", 0) > 1: + return False + try: cf = self._cookiefile_str(ytdlp_tool) pr = probe_url(url, no_playlist=False, timeout_seconds=15, cookiefile=cf) @@ -1240,6 +1266,13 @@ class Download_File(Cmdlet): args: Sequence[str], skip_preflight: bool = False, ) -> Optional[int]: + try: + ctx = pipeline_context.get_stage_context() + if ctx is not None and getattr(ctx, "total_stages", 0) > 1: + # In pipelines, skip interactive format tables; require explicit -query format. + return None + except Exception: + pass if ( mode != "audio" and not clip_spec @@ -1415,7 +1448,7 @@ class Download_File(Cmdlet): for url in supported_url: try: - debug(f"Processing: {url}") + debug(f"[download-file] Processing URL in loop (1/3 stage 1): {url}") canonical_url = self._canonicalize_url_for_storage( requested_url=url, @@ -1424,6 +1457,7 @@ class Download_File(Cmdlet): ) if not skip_per_url_preflight: + debug(f"[download-file] Running duplicate preflight for: {canonical_url}") if not self._preflight_url_duplicate( storage=storage, hydrus_available=hydrus_available, @@ -1431,7 +1465,7 @@ class Download_File(Cmdlet): candidate_url=canonical_url, extra_urls=[url], ): - log(f"Skipping download: {url}", file=sys.stderr) + log(f"Skipping download (duplicate found): {url}", file=sys.stderr) continue PipelineProgress(pipeline_context).begin_steps(2) @@ -1510,9 +1544,9 @@ class Download_File(Cmdlet): ) PipelineProgress(pipeline_context).step("downloading") - debug(f"Starting download with 5-minute timeout...") + debug(f"Starting download for {url} (format: {actual_format or 'default'}) with {download_timeout_seconds}s activity timeout...") result_obj = _download_with_timeout(opts, timeout_seconds=download_timeout_seconds) - debug(f"Download completed, building pipe object...") + debug(f"Download completed for {url}, building pipe object...") break except DownloadError as e: cause = getattr(e, "__cause__", None) @@ -1816,14 +1850,21 @@ class Download_File(Cmdlet): debug(f"Output directory: {final_output_dir}") try: - PipelineProgress(pipeline_context).ensure_local_ui( - label="download-file", - total_items=len(supported_url), - items_preview=supported_url, - ) - except Exception: - pass + # If we are already in a pipeline stage, the parent UI is already handling progress. + # Calling ensure_local_ui can cause re-initialization hangs on some platforms. + if pipeline_context.get_stage_context() is None: + debug("[download-file] Initializing local UI...") + PipelineProgress(pipeline_context).ensure_local_ui( + label="download-file", + total_items=len(supported_url), + items_preview=supported_url, + ) + else: + debug("[download-file] Skipping local UI: running inside pipeline stage") + except Exception as e: + debug(f"[download-file] PipelineProgress update error: {e}") + debug("[download-file] Parsing clip and query specs...") clip_spec = parsed.get("clip") query_spec = parsed.get("query") @@ -1914,6 +1955,7 @@ class Download_File(Cmdlet): if query_format and not query_wants_audio: try: + debug(f"[download-file] Resolving numeric format for {candidate_url}...") idx_fmt = self._format_id_for_query_index(query_format, candidate_url, formats_cache, ytdlp_tool) except ValueError as e: log(f"Error parsing format selection: {e}", file=sys.stderr) @@ -1923,6 +1965,7 @@ class Download_File(Cmdlet): ytdl_format = idx_fmt if not ytdl_format: + debug(f"[download-file] Checking for playlist at {candidate_url}...") if self._maybe_show_playlist_table(url=candidate_url, ytdlp_tool=ytdlp_tool): playlist_selection_handled = True try: @@ -1996,6 +2039,7 @@ class Download_File(Cmdlet): forced_single_format_id = None forced_single_format_for_batch = False + debug(f"[download-file] Checking if format table should be shown...") early_ret = self._maybe_show_format_table_for_single_url( mode=mode, clip_spec=clip_spec, @@ -2023,6 +2067,7 @@ class Download_File(Cmdlet): except Exception: timeout_seconds = 300 + debug(f"[download-file] Proceeding to final download call for {len(supported_url)} URL(s)...") return self._download_supported_urls( supported_url=supported_url, ytdlp_tool=ytdlp_tool, @@ -2693,14 +2738,17 @@ class Download_File(Cmdlet): config["_skip_direct_on_streaming_failure"] = True if isinstance(config, dict) and config.get("_pipeobject_timeout_seconds") is None: - config["_pipeobject_timeout_seconds"] = 60 + # Use a generous default for individual items + config["_pipeobject_timeout_seconds"] = 600 successes = 0 failures = 0 last_code = 0 - for run_args in selection_runs: - debug(f"[ytdlp] Detected selection args from table selection: {run_args}") - debug(f"[ytdlp] Re-invoking download-file with: {run_args}") + total_selection = len(selection_runs) + debug(f"[download-file] Processing {total_selection} selected item(s) from table...") + for idx, run_args in enumerate(selection_runs, 1): + debug(f"[download-file] Item {idx}/{total_selection}: {run_args}") + debug(f"[download-file] Re-invoking download-file for selected item...") exit_code = self._run_impl(None, run_args, config) if exit_code == 0: successes += 1 diff --git a/cmdnat/worker.py b/cmdnat/worker.py index 3c8259d..4f3bbc4 100644 --- a/cmdnat/worker.py +++ b/cmdnat/worker.py @@ -200,7 +200,8 @@ def _render_worker_list(db, status_filter: str | None, limit: int) -> int: date_str = _extract_date(started) start_time = _format_event_timestamp(started) end_time = _format_event_timestamp(ended) - worker_id = str(worker.get("worker_id") or worker.get("id") or "unknown") + worker_id_value = worker.get("worker_id") or worker.get("id") + worker_id = str(worker_id_value) if worker_id_value is not None else "" status = str(worker.get("status") or "unknown") result_state = str(worker.get("result") or "") status_label = status @@ -223,11 +224,17 @@ def _render_worker_list(db, status_filter: str | None, limit: int) -> int: if description and description != error_message: columns.append(("Details", description[:200])) + selection_args = None + if worker_id: + selection_args = ["-id", worker_id] item = { "columns": columns, "__worker_metadata": worker, - "_selection_args": ["-id", worker.get("worker_id")], + "worker_id": worker_id, } + if selection_args: + item["_selection_args"] = list(selection_args) + item["selection_args"] = list(selection_args) ctx.emit(item) log( f"Worker {worker_id[:8]} status={status_label} pipe={pipe_display} " @@ -282,47 +289,81 @@ def _resolve_worker_record(db, payload: Any) -> Dict[str, Any] | None: def _emit_worker_detail(worker: Dict[str, Any], events: List[Dict[str, Any]]) -> None: - stdout_content = worker.get("stdout", "") or "" + rows_emitted = False - lines = stdout_content.splitlines() - - for line in lines: - line = line.strip() - if not line: - continue - - timestamp = "" - level = "INFO" - message = line - - try: - parts = line.split(" - ", 3) - if len(parts) >= 4: - ts_str, _, lvl, msg = parts - timestamp = _format_event_timestamp(ts_str) - level = lvl - message = msg - elif len(parts) == 3: - ts_str, lvl, msg = parts - timestamp = _format_event_timestamp(ts_str) - level = lvl - message = msg - except Exception: - pass - - item = { - "columns": [ - ("Time", - timestamp), - ("Level", - level), - ("Message", - message), - ] + def _emit_columns(columns: List[tuple[str, str]]) -> None: + nonlocal rows_emitted + payload = { + "columns": columns, + "_skip_metadata_propagation": True, } - ctx.emit(item) + ctx.emit(payload) + rows_emitted = True - # Events are already always derived from stdout for now. + if events: + for event in events: + message = _normalize_text(event.get("message")) + if not message: + continue + + level = _normalize_text(event.get("event_type") or event.get("channel") or "INFO") + step = _normalize_text(event.get("step")) + if step: + message = f"[{step}] {message}" + + timestamp = _format_event_timestamp(event.get("created_at") or "") + + _emit_columns([ + ("Time", timestamp), + ("Level", level or "INFO"), + ("Message", message), + ]) + + if not rows_emitted: + stdout_content = worker.get("stdout", "") or "" + lines = stdout_content.splitlines() + + for line in lines: + line = line.strip() + if not line: + continue + + timestamp = "" + level = "INFO" + message = line + + try: + parts = line.split(" - ", 3) + if len(parts) >= 4: + ts_str, _, lvl, msg = parts + timestamp = _format_event_timestamp(ts_str) + level = lvl + message = msg + elif len(parts) == 3: + ts_str, lvl, msg = parts + timestamp = _format_event_timestamp(ts_str) + level = lvl + message = msg + except Exception: + pass + + _emit_columns([ + ("Time", timestamp), + ("Level", level), + ("Message", message), + ]) + + if not rows_emitted: + fallback = ( + _normalize_text(worker.get("error_message")) + or _normalize_text(worker.get("description")) + or "No log output captured for this worker." + ) + _emit_columns([ + ("Time", ""), + ("Level", "INFO"), + ("Message", fallback), + ]) def _summarize_pipe(pipe_value: Any, limit: int = 200) -> str: diff --git a/tool/ytdlp.py b/tool/ytdlp.py index 4e28e65..77025ce 100644 --- a/tool/ytdlp.py +++ b/tool/ytdlp.py @@ -145,6 +145,7 @@ def list_formats( no_playlist: bool = False, playlist_items: Optional[str] = None, cookiefile: Optional[str] = None, + timeout_seconds: int = 20, ) -> Optional[List[Dict[str, Any]]]: """Get available formats for a URL. @@ -154,47 +155,67 @@ def list_formats( if not is_url_supported_by_ytdlp(url): return None - ensure_yt_dlp_ready() - assert yt_dlp is not None + result_container: List[Optional[Any]] = [None, None] # [result, error] - ydl_opts: Dict[str, Any] = { - "quiet": True, - "no_warnings": True, - "skip_download": True, - "noprogress": True, - } + def _do_list() -> None: + try: + ensure_yt_dlp_ready() + assert yt_dlp is not None - if cookiefile: - ydl_opts["cookiefile"] = str(cookiefile) - else: - # Best effort attempt to use browser cookies if no file is explicitly passed - ydl_opts["cookiesfrombrowser"] = "chrome" + ydl_opts: Dict[str, Any] = { + "quiet": True, + "no_warnings": True, + "skip_download": True, + "noprogress": True, + "socket_timeout": min(10, max(1, int(timeout_seconds))), + "retries": 2, + } - if no_playlist: - ydl_opts["noplaylist"] = True - if playlist_items: - ydl_opts["playlist_items"] = str(playlist_items) + if cookiefile: + ydl_opts["cookiefile"] = str(cookiefile) + else: + # Best effort attempt to use browser cookies if no file is explicitly passed + ydl_opts["cookiesfrombrowser"] = "chrome" - try: - with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type] - info = ydl.extract_info(url, download=False) - except Exception as exc: - debug(f"yt-dlp format probe failed for {url}: {exc}") + if no_playlist: + ydl_opts["noplaylist"] = True + if playlist_items: + ydl_opts["playlist_items"] = str(playlist_items) + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type] + info = ydl.extract_info(url, download=False) + + if not isinstance(info, dict): + result_container[0] = None + return + + formats = info.get("formats") + if not isinstance(formats, list): + result_container[0] = None + return + + out: List[Dict[str, Any]] = [] + for fmt in formats: + if isinstance(fmt, dict): + out.append(fmt) + result_container[0] = out + except Exception as exc: + debug(f"yt-dlp format probe failed for {url}: {exc}") + result_container[1] = exc + + # Use daemon=True so a hung thread doesn't block process exit + thread = threading.Thread(target=_do_list, daemon=True) + thread.start() + thread.join(timeout=max(1, int(timeout_seconds))) + + if thread.is_alive(): + debug(f"yt-dlp format probe timed out for {url} (>={timeout_seconds}s)") return None - if not isinstance(info, dict): + if result_container[1] is not None: return None - formats = info.get("formats") - if not isinstance(formats, list): - return None - - out: List[Dict[str, Any]] = [] - for fmt in formats: - if isinstance(fmt, dict): - out.append(fmt) - - return out + return cast(Optional[List[Dict[str, Any]]], result_container[0]) def probe_url( @@ -216,6 +237,7 @@ def probe_url( def _do_probe() -> None: try: + debug(f"[probe] Starting probe for {url}") ensure_yt_dlp_ready() assert yt_dlp is not None @@ -235,7 +257,9 @@ def probe_url( ydl_opts["noplaylist"] = True with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type] + debug(f"[probe] ytdlp extract_info (download=False) start: {url}") info = ydl.extract_info(url, download=False) + debug(f"[probe] ytdlp extract_info (download=False) done: {url}") if not isinstance(info, dict): result_container[0] = None @@ -258,7 +282,8 @@ def probe_url( debug(f"Probe error for {url}: {exc}") result_container[1] = exc - thread = threading.Thread(target=_do_probe, daemon=False) + # Use daemon=True so a hung probe doesn't block the process + thread = threading.Thread(target=_do_probe, daemon=True) thread.start() thread.join(timeout=timeout_seconds) @@ -1194,6 +1219,7 @@ except ImportError: def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] = None) -> Any: """Download streaming media exclusively via yt-dlp.""" + debug(f"[download_media] start: {opts.url}") try: netloc = urlparse(opts.url).netloc.lower() except Exception: @@ -1536,20 +1562,37 @@ def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) -> except Exception as exc: result_container[1] = exc - thread = threading.Thread(target=_do_download, daemon=False) + # Use daemon=True so a hung download doesn't block process exit if the wall timeout hits. + thread = threading.Thread(target=_do_download, daemon=True) thread.start() start_time = time.monotonic() + + # We use two timeouts: + # 1. Activity timeout (no progress updates for X seconds) + # 2. Hard wall-clock timeout (total time for this URL) + # The wall-clock timeout is slightly larger than the activity timeout + # to allow for slow-but-steady progress, up to a hard cap (e.g. 10 minutes). + wall_timeout = max(timeout_seconds * 2, 600) + _record_progress_activity(start_time) try: while thread.is_alive(): thread.join(1) if not thread.is_alive(): break + + now = time.monotonic() + + # Check activity timeout last_activity = _get_last_progress_activity() if last_activity <= 0: last_activity = start_time - if time.monotonic() - last_activity > timeout_seconds: - raise DownloadError(f"Download timeout after {timeout_seconds} seconds for {opts.url}") + if now - last_activity > timeout_seconds: + raise DownloadError(f"Download activity timeout after {timeout_seconds} seconds for {opts.url}") + + # Check hard wall-clock timeout + if now - start_time > wall_timeout: + raise DownloadError(f"Download hard timeout after {wall_timeout} seconds for {opts.url}") finally: _clear_progress_activity()