2026-01-18 03:18:48 -08:00
parent 3f874af54a
commit aa675a625a
8 changed files with 508 additions and 161 deletions


@@ -234,7 +234,7 @@
"ddl\\.to/([0-9a-zA-Z]{12})"
],
"regexp": "((ddownload\\.com/[0-9a-zA-Z]{12}))|(ddl\\.to/([0-9a-zA-Z]{12}))",
"status": true
"status": false
},
"dropapk": {
"name": "dropapk",


@@ -2508,34 +2508,25 @@ class API_folder_store:
)
return False
+        payload = text if text.endswith("\n") else f"{text}\n"
        cursor = self.connection.cursor()
        cursor.execute(
-            "SELECT stdout FROM worker WHERE worker_id = ?",
-            (worker_id,
-            )
+            """
+            UPDATE worker
+            SET stdout = CASE
+                WHEN stdout IS NULL OR stdout = '' THEN ?
+                WHEN substr(stdout, -1, 1) = '\n' THEN stdout || ?
+                ELSE stdout || '\n' || ?
+            END,
+            last_updated = CURRENT_TIMESTAMP,
+            last_stdout_at = CURRENT_TIMESTAMP
+            WHERE worker_id = ?
+            """,
+            (payload, payload, payload, worker_id),
        )
-        row = cursor.fetchone()
-        if not row:
+        if cursor.rowcount <= 0:
            logger.warning(f"Worker {worker_id} not found for stdout append")
            return False
-        current_stdout = row[0] or ""
-        separator = (
-            "" if not current_stdout else
-            ("" if current_stdout.endswith("\n") else "\n")
-        )
-        new_stdout = f"{current_stdout}{separator}{text}\n"
-        cursor.execute(
-            """
-            UPDATE worker SET stdout = ?, last_updated = CURRENT_TIMESTAMP,
-                last_stdout_at = CURRENT_TIMESTAMP
-            WHERE worker_id = ?
-            """,
-            (new_stdout,
-             worker_id),
-        )
self._insert_worker_log_entry(
cursor,
worker_id,
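The rewrite collapses the old read-modify-write sequence (SELECT, newline-join in Python, second UPDATE) into a single UPDATE whose CASE expression handles the three newline situations, and uses rowcount instead of a fetched row to detect a missing worker. A runnable sketch of the same pattern against a simplified in-memory table (schema reduced for illustration):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE worker (worker_id TEXT PRIMARY KEY, stdout TEXT)")
    conn.execute("INSERT INTO worker VALUES ('w1', NULL)")

    def append_stdout(conn: sqlite3.Connection, worker_id: str, text: str) -> bool:
        payload = text if text.endswith("\n") else f"{text}\n"
        cur = conn.execute(
            """
            UPDATE worker
            SET stdout = CASE
                WHEN stdout IS NULL OR stdout = '' THEN ?
                WHEN substr(stdout, -1, 1) = '\n' THEN stdout || ?
                ELSE stdout || '\n' || ?
            END
            WHERE worker_id = ?
            """,
            (payload, payload, payload, worker_id),
        )
        return cur.rowcount > 0  # 0 rows touched means the worker does not exist

    assert append_stdout(conn, "w1", "hello")
    assert append_stdout(conn, "w1", "world")
    assert conn.execute("SELECT stdout FROM worker").fetchone()[0] == "hello\nworld\n"
    assert not append_stdout(conn, "missing", "x")

Doing the append inside one statement also shortens the window in which the row is held, which matters with the lock-contention fixes elsewhere in this commit.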

CLI.py

@@ -2856,6 +2856,56 @@ class PipelineExecutor:
except Exception:
auto_stage = None
source_cmd_for_selection = None
source_args_for_selection: List[str] = []
try:
source_cmd_for_selection = (
ctx.get_current_stage_table_source_command()
or ctx.get_last_result_table_source_command()
)
source_args_for_selection = (
ctx.get_current_stage_table_source_args()
or ctx.get_last_result_table_source_args()
or []
)
except Exception:
source_cmd_for_selection = None
source_args_for_selection = []
if not stages and selection_indices and source_cmd_for_selection:
src_norm = _norm_cmd(source_cmd_for_selection)
if src_norm in {".worker", "worker", "workers"}:
if len(selection_indices) == 1:
idx = selection_indices[0]
row_args = None
try:
row_args = ctx.get_current_stage_table_row_selection_args(idx)
except Exception:
row_args = None
if not row_args:
try:
row_args = ctx.get_last_result_table_row_selection_args(idx)
except Exception:
row_args = None
if not row_args:
try:
items = ctx.get_last_result_items() or []
if 0 <= idx < len(items):
maybe = items[idx]
if isinstance(maybe, dict):
candidate = maybe.get("_selection_args")
if isinstance(candidate, (list, tuple)):
row_args = [str(x) for x in candidate if x is not None]
except Exception:
row_args = row_args or None
if row_args:
stages.append(
[str(source_cmd_for_selection)]
+ [str(x) for x in row_args if x is not None]
+ [str(x) for x in source_args_for_selection if x is not None]
)
def _apply_row_action_to_stage(stage_idx: int) -> bool:
if not selection_indices or len(selection_indices) != 1:
return False
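The new block resolves a single selected worker row back into an explicit pipeline stage. Condensed, the fallback chain looks like this; `ctx` stands in for the stage context, the method names mirror the diff, but this helper itself is illustrative:

    from typing import List, Optional

    def resolve_row_args(ctx, idx: int) -> Optional[List[str]]:
        # Freshest source first: the current stage's table, then the last result table.
        for getter in (
            ctx.get_current_stage_table_row_selection_args,
            ctx.get_last_result_table_row_selection_args,
        ):
            try:
                args = getter(idx)
            except Exception:
                args = None
            if args:
                return [str(a) for a in args if a is not None]
        # Last resort: the emitted row may carry its own `_selection_args`.
        try:
            items = ctx.get_last_result_items() or []
            if 0 <= idx < len(items) and isinstance(items[idx], dict):
                candidate = items[idx].get("_selection_args")
                if isinstance(candidate, (list, tuple)):
                    return [str(a) for a in candidate if a is not None]
        except Exception:
            pass
        return None

If any source yields arguments, they are appended to the worker command to synthesize a stage such as [".worker", "-id", "<worker_id>"].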


@@ -6,7 +6,7 @@ persistence to database and optional auto-refresh callbacks.
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List, Callable
from typing import Optional, Dict, Any, List, Callable, Tuple
from datetime import datetime
from threading import Thread, Lock
import time
@@ -270,6 +270,13 @@ class WorkerManager:
WorkerLoggingHandler] = {} # Track active handlers
self._worker_last_step: Dict[str,
str] = {}
# Buffered stdout/log batching to reduce DB lock contention.
self._stdout_buffers: Dict[Tuple[str, str], List[str]] = {}
self._stdout_buffer_sizes: Dict[Tuple[str, str], int] = {}
self._stdout_buffer_steps: Dict[Tuple[str, str], Optional[str]] = {}
self._stdout_last_flush: Dict[Tuple[str, str], float] = {}
self._stdout_flush_bytes = 4096
self._stdout_flush_interval = 0.75
def close(self) -> None:
"""Close the database connection."""
@@ -392,9 +399,15 @@ class WorkerManager:
root_logger = logging.getLogger()
root_logger.removeHandler(handler)
-            logger.debug(
-                f"[WorkerManager] Disabled logging for worker: {worker_id}"
-            )
+            # Flush any buffered stdout/log data for this worker
+            try:
+                self.flush_worker_stdout(worker_id)
+            except Exception:
+                pass
+            logger.debug(
+                f"[WorkerManager] Disabled logging for worker: {worker_id}"
+            )
except Exception as e:
logger.error(
f"[WorkerManager] Error disabling logging for worker {worker_id}: {e}",
@@ -508,6 +521,10 @@ class WorkerManager:
True if update was successful
"""
try:
try:
self.flush_worker_stdout(worker_id)
except Exception:
pass
kwargs = {
"status": result,
"completed_at": datetime.now().isoformat()
@@ -742,17 +759,119 @@ class WorkerManager:
Returns:
True if append was successful
"""
if not text:
return True
now = time.monotonic()
step_label = self._get_last_step(worker_id)
key = (worker_id, channel)
pending_flush: List[Tuple[str, str, Optional[str], str]] = []
try:
with self._lock:
# Initialize last flush time for this buffer
if key not in self._stdout_last_flush:
self._stdout_last_flush[key] = now
current_step = self._stdout_buffer_steps.get(key)
if current_step is None:
self._stdout_buffer_steps[key] = step_label
current_step = step_label
# If step changes, flush existing buffer to keep step tags coherent
if current_step != step_label:
buffered = "".join(self._stdout_buffers.get(key, []))
if buffered:
pending_flush.append((worker_id, channel, current_step, buffered))
self._stdout_buffers[key] = []
self._stdout_buffer_sizes[key] = 0
self._stdout_last_flush[key] = now
self._stdout_buffer_steps[key] = step_label
buf = self._stdout_buffers.setdefault(key, [])
buf.append(text)
size = self._stdout_buffer_sizes.get(key, 0) + len(text)
self._stdout_buffer_sizes[key] = size
last_flush = self._stdout_last_flush.get(key, now)
should_flush = (
size >= self._stdout_flush_bytes
or (now - last_flush) >= self._stdout_flush_interval
)
if should_flush:
buffered = "".join(self._stdout_buffers.get(key, []))
if buffered:
pending_flush.append(
(worker_id, channel, self._stdout_buffer_steps.get(key), buffered)
)
self._stdout_buffers[key] = []
self._stdout_buffer_sizes[key] = 0
self._stdout_last_flush[key] = now
self._stdout_buffer_steps[key] = None
except Exception as e:
logger.error(f"[WorkerManager] Error buffering stdout: {e}", exc_info=True)
return False
ok = True
for wid, ch, step, payload in pending_flush:
try:
with self._db_lock:
result = self.db.append_worker_stdout(
wid,
payload,
step=step,
channel=ch
)
ok = ok and result
except Exception as e:
logger.error(
f"[WorkerManager] Error flushing stdout for {wid}: {e}",
exc_info=True,
)
ok = False
return ok
def flush_worker_stdout(self, worker_id: str) -> bool:
"""Flush any buffered stdout/log data for a worker."""
keys_to_flush: List[Tuple[str, str]] = []
with self._lock:
for key in list(self._stdout_buffers.keys()):
if key[0] == worker_id:
keys_to_flush.append(key)
ok = True
for wid, channel in keys_to_flush:
ok = self._flush_stdout_buffer(wid, channel) and ok
return ok
def _flush_stdout_buffer(self, worker_id: str, channel: str) -> bool:
key = (worker_id, channel)
with self._lock:
chunks = self._stdout_buffers.get(key)
if not chunks:
return True
text = "".join(chunks)
step = self._stdout_buffer_steps.get(key)
self._stdout_buffers[key] = []
self._stdout_buffer_sizes[key] = 0
self._stdout_last_flush[key] = time.monotonic()
self._stdout_buffer_steps[key] = None
if not text:
return True
try:
-            step_label = self._get_last_step(worker_id)
            with self._db_lock:
                return self.db.append_worker_stdout(
                    worker_id,
                    text,
-                    step=step_label,
-                    channel=channel
+                    step=step,
+                    channel=channel,
                )
        except Exception as e:
-            logger.error(f"[WorkerManager] Error appending stdout: {e}", exc_info=True)
+            logger.error(
+                f"[WorkerManager] Error flushing stdout for {worker_id}: {e}",
+                exc_info=True,
+            )
return False
def get_stdout(self, worker_id: str) -> str:
@@ -799,6 +918,17 @@ class WorkerManager:
def close(self) -> None:
"""Close the worker manager and database connection."""
self.stop_auto_refresh()
try:
self._flush_all_stdout_buffers()
except Exception:
pass
with self._db_lock:
self.db.close()
logger.info("[WorkerManager] Closed")
def _flush_all_stdout_buffers(self) -> None:
keys_to_flush: List[Tuple[str, str]] = []
with self._lock:
keys_to_flush = list(self._stdout_buffers.keys())
for wid, channel in keys_to_flush:
self._flush_stdout_buffer(wid, channel)
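The net effect is a write-behind buffer: appends accumulate per (worker_id, channel) key and only hit the database once a size or age threshold trips, with explicit flushes on step changes, worker completion, and close. A self-contained sketch of the same flush policy, simplified to a single string key with no step tracking (`sink` stands in for db.append_worker_stdout):

    import threading
    import time
    from typing import Callable, Dict, List

    class BufferedAppender:
        def __init__(self, sink: Callable[[str, str], bool],
                     flush_bytes: int = 4096, flush_interval: float = 0.75) -> None:
            self._sink = sink
            self._lock = threading.Lock()
            self._buffers: Dict[str, List[str]] = {}
            self._sizes: Dict[str, int] = {}
            self._last_flush: Dict[str, float] = {}
            self._flush_bytes = flush_bytes
            self._flush_interval = flush_interval

        def append(self, key: str, text: str) -> bool:
            if not text:
                return True
            now = time.monotonic()
            payload = None
            with self._lock:
                self._buffers.setdefault(key, []).append(text)
                size = self._sizes.get(key, 0) + len(text)
                self._sizes[key] = size
                last = self._last_flush.setdefault(key, now)
                if size >= self._flush_bytes or (now - last) >= self._flush_interval:
                    payload = "".join(self._buffers[key])
                    self._buffers[key] = []
                    self._sizes[key] = 0
                    self._last_flush[key] = now
            # Write outside the lock so a slow DB write doesn't block other producers.
            return self._sink(key, payload) if payload else True

        def flush(self, key: str) -> bool:
            with self._lock:
                payload = "".join(self._buffers.get(key, []))
                self._buffers[key] = []
                self._sizes[key] = 0
                self._last_flush[key] = time.monotonic()
            return self._sink(key, payload) if payload else True

    appender = BufferedAppender(lambda key, text: print(f"{key}: {text!r}") or True)
    appender.append("worker-1", "line 1\n")
    appender.flush("worker-1")

Batching to roughly 4 KiB or 0.75 s per write trades a small delay in visible output for far fewer writer transactions, which is what relieves the SQLite lock contention the comment mentions.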


@@ -9,6 +9,7 @@ import re
import shutil
import sys
import tempfile
import time
from collections.abc import Iterable as IterableABC
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
@@ -2640,6 +2641,9 @@ def propagate_metadata(
is_same_length = len(new_items) == len(prev_normalized)
for i, item in enumerate(new_items):
if isinstance(item, dict) and item.get("_skip_metadata_propagation"):
normalized.append(item)
continue
try:
obj = coerce_to_pipe_object(item)
except Exception:
@@ -3058,6 +3062,9 @@ def check_url_exists_in_storage(
stage_ctx = None
in_pipeline = bool(stage_ctx is not None or ("|" in str(current_cmd_text or "")))
start_time = time.monotonic()
time_budget = 45.0
debug(f"[preflight] check_url_exists_in_storage: checking {len(urls)} url(s)")
if in_pipeline:
try:
already_checked = bool(
@@ -3101,6 +3108,18 @@ def check_url_exists_in_storage(
preflight_cache["url_duplicates"] = url_dup_cache
_store_preflight_cache(preflight_cache)
def _timed_out(reason: str) -> bool:
try:
if (time.monotonic() - start_time) >= time_budget:
debug(
f"Bulk URL preflight timed out after {time_budget:.0f}s ({reason}); continuing"
)
_mark_preflight_checked()
return True
except Exception:
return False
return False
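_timed_out gives the whole preflight a shared 45-second budget measured from a single monotonic start time; every expensive checkpoint below consults it before continuing. The pattern in isolation (print stands in for the diff's debug/_mark_preflight_checked calls):

    import time

    def make_deadline_guard(budget_seconds: float):
        start = time.monotonic()

        def timed_out(reason: str) -> bool:
            # One shared budget for the whole scan, not a per-call-site timer.
            if (time.monotonic() - start) >= budget_seconds:
                print(f"preflight timed out after {budget_seconds:.0f}s ({reason}); continuing")
                return True
            return False

        return timed_out

    timed_out = make_deadline_guard(45.0)
    for backend_name in ("hydrus", "folder", "remote"):
        if timed_out(f"scanning {backend_name}"):
            break  # degrade to "continue without the duplicate check"

Because the guard also marks the preflight as checked, a timeout degrades to "assume no duplicates" rather than hanging the pipeline.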
if in_pipeline:
try:
cached_cmd = pipeline_context.load_value("preflight.url_duplicates.command", default="")
@@ -3358,7 +3377,10 @@ def check_url_exists_in_storage(
_mark_preflight_checked()
return True
-    bulk_mode = len(unique_urls) >= 8
+    if _timed_out("before backend scan"):
+        return True
+    bulk_mode = len(unique_urls) > 1
def _build_bulk_patterns(needles_map: Dict[str, List[str]], max_per_url: int = 3, max_total: int = 240) -> List[str]:
patterns: List[str] = []
@@ -3562,12 +3584,16 @@ def check_url_exists_in_storage(
HydrusNetwork = None # type: ignore
for backend_name in backend_names:
if _timed_out("backend scan"):
return True
if len(match_rows) >= max_rows:
break
try:
backend = storage[backend_name]
except Exception:
continue
debug(f"[preflight] Scanning backend: {backend_name}")
if HydrusNetwork is not None and isinstance(backend, HydrusNetwork):
client = getattr(backend, "_client", None)
@@ -3576,6 +3602,9 @@ def check_url_exists_in_storage(
if not hydrus_available:
debug("Bulk URL preflight: hydrus availability check failed; attempting best-effort lookup")
if _timed_out("hydrus scan"):
return True
if bulk_mode and bulk_patterns:
bulk_hits: Optional[List[Any]] = None
bulk_limit = min(2000, max(200, len(unique_urls) * 8))
@@ -3591,40 +3620,49 @@ def check_url_exists_in_storage(
except Exception:
bulk_hits = None
-            if bulk_hits is not None:
-                for hit in bulk_hits:
-                    if len(match_rows) >= max_rows:
-                        break
-                    url_values = _extract_urls_from_hit(hit, backend, allow_backend_lookup=False)
-                    if not url_values:
-                        continue
-                    for original_url, needles in url_needles.items():
-                        if len(match_rows) >= max_rows:
-                            break
-                        if (original_url, str(backend_name)) in seen_pairs:
-                            continue
-                        matched = False
-                        for url_value in url_values:
-                            for needle in (needles or []):
-                                if _match_normalized_url(str(needle or ""), str(url_value or "")):
-                                    matched = True
-                                    break
-                            if matched:
-                                break
-                        if not matched:
-                            continue
-                        seen_pairs.add((original_url, str(backend_name)))
-                        matched_urls.add(original_url)
-                        match_rows.append(
-                            _build_display_row_for_hit(hit, str(backend_name), original_url)
-                        )
+            if bulk_hits is None:
+                debug("Bulk URL preflight: Hydrus bulk scan failed; skipping per-URL checks")
+                continue
+            for hit in bulk_hits:
+                if _timed_out("hydrus bulk scan"):
+                    return True
+                if len(match_rows) >= max_rows:
+                    break
+                url_values = _extract_urls_from_hit(hit, backend, allow_backend_lookup=False)
+                if not url_values:
+                    continue
+                for original_url, needles in url_needles.items():
+                    if _timed_out("hydrus bulk scan"):
+                        return True
+                    if len(match_rows) >= max_rows:
+                        break
+                    if (original_url, str(backend_name)) in seen_pairs:
+                        continue
+                    matched = False
+                    for url_value in url_values:
+                        for needle in (needles or []):
+                            if _match_normalized_url(str(needle or ""), str(url_value or "")):
+                                matched = True
+                                break
+                        if matched:
+                            break
+                    if not matched:
+                        continue
+                    seen_pairs.add((original_url, str(backend_name)))
+                    matched_urls.add(original_url)
+                    match_rows.append(
+                        _build_display_row_for_hit(hit, str(backend_name), original_url)
+                    )
+            continue
for original_url, needles in url_needles.items():
if _timed_out("hydrus per-url scan"):
return True
if len(match_rows) >= max_rows:
break
if (original_url, str(backend_name)) in seen_pairs:
@@ -3705,6 +3743,8 @@ def check_url_exists_in_storage(
if bulk_hits is not None:
for hit in bulk_hits:
if _timed_out("backend bulk scan"):
return True
if len(match_rows) >= max_rows:
break
url_values = _extract_urls_from_hit(hit, backend, allow_backend_lookup=False)
@@ -3712,6 +3752,8 @@ def check_url_exists_in_storage(
continue
for original_url, needles in url_needles.items():
if _timed_out("backend bulk scan"):
return True
if len(match_rows) >= max_rows:
break
if (original_url, str(backend_name)) in seen_pairs:
@@ -3737,6 +3779,8 @@ def check_url_exists_in_storage(
continue
for original_url, needles in url_needles.items():
if _timed_out("backend per-url scan"):
return True
if len(match_rows) >= max_rows:
break
if (original_url, str(backend_name)) in seen_pairs:
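All of the scan paths above share the same bounded shape: dedupe on (url, backend) pairs, cap the result set at max_rows, and consult the deadline guard at every loop level. Schematically (hit and needle shapes simplified; plain substring matching stands in for _match_normalized_url):

    def collect_matches(hits, url_needles, backend_name, timed_out, max_rows=50):
        seen_pairs, rows = set(), []
        for hit in hits:
            if timed_out("bulk scan") or len(rows) >= max_rows:
                break
            hit_urls = hit.get("urls") or []
            for original_url, needles in url_needles.items():
                key = (original_url, backend_name)
                if key in seen_pairs or len(rows) >= max_rows:
                    continue
                if any(needle in hit_url for hit_url in hit_urls for needle in needles):
                    seen_pairs.add(key)
                    rows.append((original_url, backend_name, hit))
        return rows

The seen_pairs set guarantees each URL is reported at most once per backend even when several stored files carry the same source URL.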


@@ -387,18 +387,23 @@ class Download_File(Cmdlet):
total_items = len(expanded_items)
processed_items = 0
debug(f"[download-file] Processing {total_items} piped item(s)...")
+        try:
+            if total_items:
+                progress.set_percent(0)
+        except Exception:
+            pass
-        for item in expanded_items:
+        for idx, item in enumerate(expanded_items, 1):
try:
label = "item"
table = get_field(item, "table")
title = get_field(item, "title")
target = get_field(item, "path") or get_field(item, "url")
debug(f"[download-file] Item {idx}/{total_items}: {title or target or 'unnamed'}")
media_kind = get_field(item, "media_kind")
tags_val = get_field(item, "tag")
tags_list: Optional[List[str]]
@@ -931,15 +936,26 @@ class Download_File(Cmdlet):
@staticmethod
def _init_storage(config: Dict[str, Any]) -> tuple[Optional[Any], bool]:
# Cache storage object in config to avoid excessive DB initialization in loops
if isinstance(config, dict) and "_storage_cache" in config:
cached = config["_storage_cache"]
if isinstance(cached, tuple) and len(cached) == 2:
return cached # type: ignore
storage = None
hydrus_available = True
try:
from Store import Store
from API.HydrusNetwork import is_hydrus_available
debug(f"[download-file] Initializing storage interface...")
storage = Store(config=config or {}, suppress_debug=True)
hydrus_available = bool(is_hydrus_available(config or {}))
-        except Exception:
+            if isinstance(config, dict):
+                config["_storage_cache"] = (storage, hydrus_available)
+        except Exception as e:
+            debug(f"[download-file] Storage initialization error: {e}")
storage = None
return storage, hydrus_available
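Caching the (storage, hydrus_available) tuple on the config dict means a loop over many piped items builds the Store once instead of reopening the database per item. A minimal sketch of the pattern (object() and True stand in for Store(config=config) and is_hydrus_available(config)):

    from typing import Any, Dict, Optional, Tuple

    def get_storage(config: Dict[str, Any]) -> Tuple[Optional[Any], bool]:
        cached = config.get("_storage_cache")
        if isinstance(cached, tuple) and len(cached) == 2:
            return cached  # reuse the instance built on a previous item
        storage: Optional[Any] = object()  # stands in for Store(config=config)
        available = True                   # stands in for is_hydrus_available(config)
        config["_storage_cache"] = (storage, available)
        return storage, available

    cfg: Dict[str, Any] = {}
    first, _ = get_storage(cfg)
    again, _ = get_storage(cfg)
    assert first is again  # second call hits the cache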
@@ -1052,6 +1068,7 @@ class Download_File(Cmdlet):
@staticmethod
def _canonicalize_url_for_storage(*, requested_url: str, ytdlp_tool: YtDlpTool, playlist_items: Optional[str]) -> str:
if playlist_items:
debug(f"[download-file] Skipping canonicalization for playlist item(s): {playlist_items}")
return str(requested_url)
try:
cf = None
@@ -1061,14 +1078,19 @@ class Download_File(Cmdlet):
cf = str(cookie_path)
except Exception:
cf = None
debug(f"[download-file] Canonicalizing URL: {requested_url}")
pr = probe_url(requested_url, no_playlist=False, timeout_seconds=15, cookiefile=cf)
if isinstance(pr, dict):
for key in ("webpage_url", "original_url", "url", "requested_url"):
value = pr.get(key)
if isinstance(value, str) and value.strip():
-                        return value.strip()
-        except Exception:
-            pass
+                        canon = value.strip()
+                        if canon != requested_url:
+                            debug(f"[download-file] Resolved canonical URL: {requested_url} -> {canon}")
+                        return canon
+        except Exception as e:
+            debug(f"[download-file] Canonicalization error for {requested_url}: {e}")
return str(requested_url)
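The selection order for the canonical URL is unchanged; the commit only adds logging around it. Isolated, the pick logic is (pr being the probe result dict from probe_url, or None):

    from typing import Any, Dict, Optional

    def pick_canonical(requested_url: str, pr: Optional[Dict[str, Any]]) -> str:
        if isinstance(pr, dict):
            # Prefer yt-dlp's resolved page URL, then fall back through weaker keys.
            for key in ("webpage_url", "original_url", "url", "requested_url"):
                value = pr.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip()
        return requested_url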
@@ -1113,6 +1135,10 @@ class Download_File(Cmdlet):
def _maybe_show_playlist_table(self, *, url: str, ytdlp_tool: YtDlpTool) -> bool:
ctx = pipeline_context.get_stage_context()
if ctx is not None and getattr(ctx, "total_stages", 0) > 1:
return False
try:
cf = self._cookiefile_str(ytdlp_tool)
pr = probe_url(url, no_playlist=False, timeout_seconds=15, cookiefile=cf)
@@ -1240,6 +1266,13 @@ class Download_File(Cmdlet):
args: Sequence[str],
skip_preflight: bool = False,
) -> Optional[int]:
try:
ctx = pipeline_context.get_stage_context()
if ctx is not None and getattr(ctx, "total_stages", 0) > 1:
# In pipelines, skip interactive format tables; require explicit -query format.
return None
except Exception:
pass
if (
mode != "audio"
and not clip_spec
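Both of these early returns use the same guard: interactive selection tables are only offered when the command is not running inside a multi-stage pipeline. As a standalone helper (names mirror the diff; the helper itself is illustrative):

    def allow_interactive_tables(get_stage_context) -> bool:
        # get_stage_context stands in for pipeline_context.get_stage_context.
        try:
            ctx = get_stage_context()
        except Exception:
            return True  # fail open: behave like a standalone invocation
        return ctx is None or getattr(ctx, "total_stages", 0) <= 1

In a pipeline, a format must instead be selected explicitly with -query, as the inline comment in the diff notes.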
@@ -1415,7 +1448,7 @@ class Download_File(Cmdlet):
for url in supported_url:
try:
-                debug(f"Processing: {url}")
+                debug(f"[download-file] Processing URL in loop (1/3 stage 1): {url}")
canonical_url = self._canonicalize_url_for_storage(
requested_url=url,
@@ -1424,6 +1457,7 @@ class Download_File(Cmdlet):
)
if not skip_per_url_preflight:
debug(f"[download-file] Running duplicate preflight for: {canonical_url}")
if not self._preflight_url_duplicate(
storage=storage,
hydrus_available=hydrus_available,
@@ -1431,7 +1465,7 @@ class Download_File(Cmdlet):
candidate_url=canonical_url,
extra_urls=[url],
):
-                        log(f"Skipping download: {url}", file=sys.stderr)
+                        log(f"Skipping download (duplicate found): {url}", file=sys.stderr)
continue
PipelineProgress(pipeline_context).begin_steps(2)
@@ -1510,9 +1544,9 @@ class Download_File(Cmdlet):
)
PipelineProgress(pipeline_context).step("downloading")
-                debug(f"Starting download with 5-minute timeout...")
+                debug(f"Starting download for {url} (format: {actual_format or 'default'}) with {download_timeout_seconds}s activity timeout...")
                result_obj = _download_with_timeout(opts, timeout_seconds=download_timeout_seconds)
-                debug(f"Download completed, building pipe object...")
+                debug(f"Download completed for {url}, building pipe object...")
break
except DownloadError as e:
cause = getattr(e, "__cause__", None)
@@ -1816,14 +1850,21 @@ class Download_File(Cmdlet):
debug(f"Output directory: {final_output_dir}")
try:
-            PipelineProgress(pipeline_context).ensure_local_ui(
-                label="download-file",
-                total_items=len(supported_url),
-                items_preview=supported_url,
-            )
-        except Exception:
-            pass
+            # If we are already in a pipeline stage, the parent UI is already handling progress.
+            # Calling ensure_local_ui can cause re-initialization hangs on some platforms.
+            if pipeline_context.get_stage_context() is None:
+                debug("[download-file] Initializing local UI...")
+                PipelineProgress(pipeline_context).ensure_local_ui(
+                    label="download-file",
+                    total_items=len(supported_url),
+                    items_preview=supported_url,
+                )
+            else:
+                debug("[download-file] Skipping local UI: running inside pipeline stage")
+        except Exception as e:
+            debug(f"[download-file] PipelineProgress update error: {e}")
debug("[download-file] Parsing clip and query specs...")
clip_spec = parsed.get("clip")
query_spec = parsed.get("query")
@@ -1914,6 +1955,7 @@ class Download_File(Cmdlet):
if query_format and not query_wants_audio:
try:
debug(f"[download-file] Resolving numeric format for {candidate_url}...")
idx_fmt = self._format_id_for_query_index(query_format, candidate_url, formats_cache, ytdlp_tool)
except ValueError as e:
log(f"Error parsing format selection: {e}", file=sys.stderr)
@@ -1923,6 +1965,7 @@ class Download_File(Cmdlet):
ytdl_format = idx_fmt
if not ytdl_format:
debug(f"[download-file] Checking for playlist at {candidate_url}...")
if self._maybe_show_playlist_table(url=candidate_url, ytdlp_tool=ytdlp_tool):
playlist_selection_handled = True
try:
@@ -1996,6 +2039,7 @@ class Download_File(Cmdlet):
forced_single_format_id = None
forced_single_format_for_batch = False
debug(f"[download-file] Checking if format table should be shown...")
early_ret = self._maybe_show_format_table_for_single_url(
mode=mode,
clip_spec=clip_spec,
@@ -2023,6 +2067,7 @@ class Download_File(Cmdlet):
except Exception:
timeout_seconds = 300
debug(f"[download-file] Proceeding to final download call for {len(supported_url)} URL(s)...")
return self._download_supported_urls(
supported_url=supported_url,
ytdlp_tool=ytdlp_tool,
@@ -2693,14 +2738,17 @@ class Download_File(Cmdlet):
config["_skip_direct_on_streaming_failure"] = True
if isinstance(config, dict) and config.get("_pipeobject_timeout_seconds") is None:
-            config["_pipeobject_timeout_seconds"] = 60
+            # Use a generous default for individual items
+            config["_pipeobject_timeout_seconds"] = 600
successes = 0
failures = 0
last_code = 0
-        for run_args in selection_runs:
-            debug(f"[ytdlp] Detected selection args from table selection: {run_args}")
-            debug(f"[ytdlp] Re-invoking download-file with: {run_args}")
+        total_selection = len(selection_runs)
+        debug(f"[download-file] Processing {total_selection} selected item(s) from table...")
+        for idx, run_args in enumerate(selection_runs, 1):
+            debug(f"[download-file] Item {idx}/{total_selection}: {run_args}")
+            debug(f"[download-file] Re-invoking download-file for selected item...")
exit_code = self._run_impl(None, run_args, config)
if exit_code == 0:
successes += 1


@@ -200,7 +200,8 @@ def _render_worker_list(db, status_filter: str | None, limit: int) -> int:
date_str = _extract_date(started)
start_time = _format_event_timestamp(started)
end_time = _format_event_timestamp(ended)
-        worker_id = str(worker.get("worker_id") or worker.get("id") or "unknown")
+        worker_id_value = worker.get("worker_id") or worker.get("id")
+        worker_id = str(worker_id_value) if worker_id_value is not None else ""
status = str(worker.get("status") or "unknown")
result_state = str(worker.get("result") or "")
status_label = status
@@ -223,11 +224,17 @@ def _render_worker_list(db, status_filter: str | None, limit: int) -> int:
if description and description != error_message:
columns.append(("Details", description[:200]))
+        selection_args = None
+        if worker_id:
+            selection_args = ["-id", worker_id]
        item = {
            "columns": columns,
            "__worker_metadata": worker,
-            "_selection_args": ["-id", worker.get("worker_id")],
+            "worker_id": worker_id,
        }
+        if selection_args:
+            item["_selection_args"] = list(selection_args)
+            item["selection_args"] = list(selection_args)
ctx.emit(item)
log(
f"Worker {worker_id[:8]} status={status_label} pipe={pipe_display} "
@@ -282,47 +289,81 @@ def _resolve_worker_record(db, payload: Any) -> Dict[str, Any] | None:
def _emit_worker_detail(worker: Dict[str, Any], events: List[Dict[str, Any]]) -> None:
-    stdout_content = worker.get("stdout", "") or ""
+    rows_emitted = False
-    lines = stdout_content.splitlines()
-    for line in lines:
-        line = line.strip()
-        if not line:
-            continue
-        timestamp = ""
-        level = "INFO"
-        message = line
-        try:
-            parts = line.split(" - ", 3)
-            if len(parts) >= 4:
-                ts_str, _, lvl, msg = parts
-                timestamp = _format_event_timestamp(ts_str)
-                level = lvl
-                message = msg
-            elif len(parts) == 3:
-                ts_str, lvl, msg = parts
-                timestamp = _format_event_timestamp(ts_str)
-                level = lvl
-                message = msg
-        except Exception:
-            pass
-        item = {
-            "columns": [
-                ("Time",
-                 timestamp),
-                ("Level",
-                 level),
-                ("Message",
-                 message),
-            ]
+    def _emit_columns(columns: List[tuple[str, str]]) -> None:
+        nonlocal rows_emitted
+        payload = {
+            "columns": columns,
+            "_skip_metadata_propagation": True,
        }
-        ctx.emit(item)
+        ctx.emit(payload)
+        rows_emitted = True
    # Events are always derived from stdout for now.
if events:
for event in events:
message = _normalize_text(event.get("message"))
if not message:
continue
level = _normalize_text(event.get("event_type") or event.get("channel") or "INFO")
step = _normalize_text(event.get("step"))
if step:
message = f"[{step}] {message}"
timestamp = _format_event_timestamp(event.get("created_at") or "")
_emit_columns([
("Time", timestamp),
("Level", level or "INFO"),
("Message", message),
])
if not rows_emitted:
stdout_content = worker.get("stdout", "") or ""
lines = stdout_content.splitlines()
for line in lines:
line = line.strip()
if not line:
continue
timestamp = ""
level = "INFO"
message = line
try:
parts = line.split(" - ", 3)
if len(parts) >= 4:
ts_str, _, lvl, msg = parts
timestamp = _format_event_timestamp(ts_str)
level = lvl
message = msg
elif len(parts) == 3:
ts_str, lvl, msg = parts
timestamp = _format_event_timestamp(ts_str)
level = lvl
message = msg
except Exception:
pass
_emit_columns([
("Time", timestamp),
("Level", level),
("Message", message),
])
if not rows_emitted:
fallback = (
_normalize_text(worker.get("error_message"))
or _normalize_text(worker.get("description"))
or "No log output captured for this worker."
)
_emit_columns([
("Time", ""),
("Level", "INFO"),
("Message", fallback),
])
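The rewritten detail view is a three-tier fallback: structured events first, then stdout parsed line by line, then one synthetic row so the table is never empty. Boiled down (plain tuples stand in for the emitted column payloads):

    def detail_rows(events, stdout_text, fallback_msg):
        rows = [
            (e.get("created_at", ""), e.get("event_type") or "INFO", e["message"])
            for e in events
            if e.get("message")
        ]
        if not rows:
            rows = [("", "INFO", line.strip())
                    for line in stdout_text.splitlines() if line.strip()]
        if not rows:
            rows = [("", "INFO", fallback_msg)]
        return rows

    assert detail_rows([], "", "No log output captured for this worker.") == [
        ("", "INFO", "No log output captured for this worker.")
    ]

Marking each row with _skip_metadata_propagation ties into the propagate_metadata change earlier in this commit: these synthetic log rows pass through the pipeline untouched instead of being coerced into pipe objects.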
def _summarize_pipe(pipe_value: Any, limit: int = 200) -> str:


@@ -145,6 +145,7 @@ def list_formats(
no_playlist: bool = False,
playlist_items: Optional[str] = None,
cookiefile: Optional[str] = None,
timeout_seconds: int = 20,
) -> Optional[List[Dict[str, Any]]]:
"""Get available formats for a URL.
@@ -154,47 +155,67 @@ def list_formats(
if not is_url_supported_by_ytdlp(url):
return None
-    ensure_yt_dlp_ready()
-    assert yt_dlp is not None
+    result_container: List[Optional[Any]] = [None, None]  # [result, error]
-    ydl_opts: Dict[str, Any] = {
-        "quiet": True,
-        "no_warnings": True,
-        "skip_download": True,
-        "noprogress": True,
-    }
+    def _do_list() -> None:
+        try:
+            ensure_yt_dlp_ready()
+            assert yt_dlp is not None
-    if cookiefile:
-        ydl_opts["cookiefile"] = str(cookiefile)
-    else:
-        # Best effort attempt to use browser cookies if no file is explicitly passed
-        ydl_opts["cookiesfrombrowser"] = "chrome"
+            ydl_opts: Dict[str, Any] = {
+                "quiet": True,
+                "no_warnings": True,
+                "skip_download": True,
+                "noprogress": True,
+                "socket_timeout": min(10, max(1, int(timeout_seconds))),
+                "retries": 2,
+            }
-    if no_playlist:
-        ydl_opts["noplaylist"] = True
-    if playlist_items:
-        ydl_opts["playlist_items"] = str(playlist_items)
+            if cookiefile:
+                ydl_opts["cookiefile"] = str(cookiefile)
+            else:
+                # Best effort attempt to use browser cookies if no file is explicitly passed
+                ydl_opts["cookiesfrombrowser"] = "chrome"
-    try:
-        with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[arg-type]
-            info = ydl.extract_info(url, download=False)
-    except Exception as exc:
-        debug(f"yt-dlp format probe failed for {url}: {exc}")
+            if no_playlist:
+                ydl_opts["noplaylist"] = True
+            if playlist_items:
+                ydl_opts["playlist_items"] = str(playlist_items)
+            with yt_dlp.YoutubeDL(ydl_opts) as ydl:  # type: ignore[arg-type]
+                info = ydl.extract_info(url, download=False)
+            if not isinstance(info, dict):
+                result_container[0] = None
+                return
+            formats = info.get("formats")
+            if not isinstance(formats, list):
+                result_container[0] = None
+                return
+            out: List[Dict[str, Any]] = []
+            for fmt in formats:
+                if isinstance(fmt, dict):
+                    out.append(fmt)
+            result_container[0] = out
+        except Exception as exc:
+            debug(f"yt-dlp format probe failed for {url}: {exc}")
+            result_container[1] = exc
+    # Use daemon=True so a hung thread doesn't block process exit
+    thread = threading.Thread(target=_do_list, daemon=True)
+    thread.start()
+    thread.join(timeout=max(1, int(timeout_seconds)))
+    if thread.is_alive():
+        debug(f"yt-dlp format probe timed out for {url} (>={timeout_seconds}s)")
+        return None
-    if not isinstance(info, dict):
+    if result_container[1] is not None:
        return None
-    formats = info.get("formats")
-    if not isinstance(formats, list):
-        return None
-    out: List[Dict[str, Any]] = []
-    for fmt in formats:
-        if isinstance(fmt, dict):
-            out.append(fmt)
-    return out
+    return cast(Optional[List[Dict[str, Any]]], result_container[0])
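list_formats now follows the same shape probe_url already used: run the blocking yt-dlp work on a daemon thread, join with a timeout, and treat a still-running thread or a captured exception as "no result". The pattern generalized (an illustrative helper, not project code):

    import threading
    from typing import Any, Callable, List, Optional

    def call_with_timeout(fn: Callable[[], Any], timeout_seconds: float) -> Optional[Any]:
        box: List[Optional[Any]] = [None, None]  # [result, error]

        def runner() -> None:
            try:
                box[0] = fn()
            except Exception as exc:
                box[1] = exc

        # daemon=True: a wedged call can no longer block interpreter exit.
        thread = threading.Thread(target=runner, daemon=True)
        thread.start()
        thread.join(timeout=timeout_seconds)
        if thread.is_alive() or box[1] is not None:
            return None  # timed out or raised: callers treat both as "no formats"
        return box[0]

The abandoned thread may keep running in the background, so this trades a possible lingering network call for a UI that can never hang on a dead extractor.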
def probe_url(
@@ -216,6 +237,7 @@ def probe_url(
def _do_probe() -> None:
try:
debug(f"[probe] Starting probe for {url}")
ensure_yt_dlp_ready()
assert yt_dlp is not None
@@ -235,7 +257,9 @@ def probe_url(
ydl_opts["noplaylist"] = True
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[arg-type]
debug(f"[probe] ytdlp extract_info (download=False) start: {url}")
info = ydl.extract_info(url, download=False)
debug(f"[probe] ytdlp extract_info (download=False) done: {url}")
if not isinstance(info, dict):
result_container[0] = None
@@ -258,7 +282,8 @@ def probe_url(
debug(f"Probe error for {url}: {exc}")
result_container[1] = exc
-    thread = threading.Thread(target=_do_probe, daemon=False)
+    # Use daemon=True so a hung probe doesn't block the process
+    thread = threading.Thread(target=_do_probe, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
@@ -1194,6 +1219,7 @@ except ImportError:
def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] = None) -> Any:
"""Download streaming media exclusively via yt-dlp."""
debug(f"[download_media] start: {opts.url}")
try:
netloc = urlparse(opts.url).netloc.lower()
except Exception:
@@ -1536,20 +1562,37 @@ def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) ->
except Exception as exc:
result_container[1] = exc
-    thread = threading.Thread(target=_do_download, daemon=False)
+    # Use daemon=True so a hung download doesn't block process exit if the wall timeout hits.
+    thread = threading.Thread(target=_do_download, daemon=True)
thread.start()
start_time = time.monotonic()
# We use two timeouts:
# 1. Activity timeout (no progress updates for X seconds)
# 2. Hard wall-clock timeout (total time for this URL)
# The wall-clock timeout is slightly larger than the activity timeout
# to allow for slow-but-steady progress, up to a hard cap (e.g. 10 minutes).
wall_timeout = max(timeout_seconds * 2, 600)
_record_progress_activity(start_time)
try:
while thread.is_alive():
thread.join(1)
if not thread.is_alive():
break
+                now = time.monotonic()
+                # Check activity timeout
                last_activity = _get_last_progress_activity()
                if last_activity <= 0:
                    last_activity = start_time
-                if time.monotonic() - last_activity > timeout_seconds:
-                    raise DownloadError(f"Download timeout after {timeout_seconds} seconds for {opts.url}")
+                if now - last_activity > timeout_seconds:
+                    raise DownloadError(f"Download activity timeout after {timeout_seconds} seconds for {opts.url}")
+                # Check hard wall-clock timeout
+                if now - start_time > wall_timeout:
+                    raise DownloadError(f"Download hard timeout after {wall_timeout} seconds for {opts.url}")
finally:
_clear_progress_activity()
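The watchdog now enforces two independent limits: an activity timeout that resets whenever a progress callback reports movement, and a hard wall-clock cap (at least ten minutes, or twice the activity timeout) that no amount of slow-but-steady progress can extend. Condensed into a standalone sketch (last_activity is any callable returning the last progress timestamp, or 0 when none has been recorded yet):

    import threading
    import time
    from typing import Callable

    def watch(thread: threading.Thread, timeout_seconds: float,
              last_activity: Callable[[], float]) -> None:
        start = time.monotonic()
        wall_timeout = max(timeout_seconds * 2, 600)  # hard cap, as in the diff
        while thread.is_alive():
            thread.join(1)
            if not thread.is_alive():
                break
            now = time.monotonic()
            # Activity timeout: no progress reported for timeout_seconds.
            if now - (last_activity() or start) > timeout_seconds:
                raise TimeoutError(f"no download activity within {timeout_seconds}s")
            # Wall-clock timeout: total elapsed time exceeds the hard cap.
            if now - start > wall_timeout:
                raise TimeoutError(f"download exceeded hard cap of {wall_timeout}s")

Splitting the two limits lets a slow but live download keep going past the old fixed timeout, while a stalled or trickling one is still cut off deterministically.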