# Medios-Macina/API/HTTP.py
"""
Unified HTTP client for downlow using httpx.
Provides synchronous and asynchronous HTTP operations with:
- Automatic retries on transient failures
- Configurable timeouts and headers
- Built-in progress tracking for downloads
- Request/response logging support
"""
import httpx
import asyncio
import sys
import time
import traceback
import re
import os
from typing import Optional, Dict, Any, Callable, BinaryIO, List, Iterable, Set, Union
from pathlib import Path
from urllib.parse import unquote, urlparse, parse_qs
import logging
from SYS.logger import debug, is_debug_enabled, log
from SYS.models import DebugLogger, DownloadError, DownloadMediaResult, ProgressBar
from SYS.utils import ensure_directory, sha256_file
try: # Optional; used for metadata extraction when available
from SYS.metadata import extract_ytdlp_tags
except Exception: # pragma: no cover - optional dependency
extract_ytdlp_tags = None # type: ignore[assignment]

logger = logging.getLogger(__name__)


def _resolve_verify_value(verify_ssl: bool) -> Union[bool, str]:
"""Return the httpx verify argument, preferring system-aware bundles.
Order of precedence:
1. If verify_ssl is not True (False or path), return it.
2. Respect existing SSL_CERT_FILE env var if present.
3. Prefer `pip_system_certs` if present and it exposes a bundle path.
4. Prefer `certifi_win32`/similar helpers by invoking them and reading certifi.where().
5. Fall back to `certifi.where()` if available.
6. Otherwise, return True to let httpx use system defaults.
"""
if verify_ssl is not True:
return verify_ssl
env_cert = os.environ.get("SSL_CERT_FILE")
if env_cert:
return env_cert
def _try_module_bundle(mod_name: str) -> Optional[str]:
# Prefer checking sys.modules first (helps test injection / monkeypatching)
try:
mod = sys.modules.get(mod_name)
if mod is None:
mod = __import__(mod_name)
except Exception:
return None
# Common APIs that return a bundle path
for attr in ("where", "get_ca_bundle", "bundle_path", "get_bundle_path", "get_bundle"):
fn = getattr(mod, attr, None)
if callable(fn):
try:
res = fn()
if res:
return res
except Exception:
continue
elif isinstance(fn, str) and fn:
return fn
# Some helpers (e.g., certifi_win32) expose an action to merge system certs
for call_attr in ("add_windows_store_certs", "add_system_certs", "merge_system_certs"):
fn = getattr(mod, call_attr, None)
if callable(fn):
try:
fn()
try:
import certifi as _certifi
res = _certifi.where()
if res:
return res
except Exception:
pass
except Exception:
pass
return None
# Prefer pip_system_certs if available
for mod_name in ("pip_system_certs",):
path = _try_module_bundle(mod_name)
if path:
try:
os.environ["SSL_CERT_FILE"] = path
except Exception:
pass
logger.info(f"SSL_CERT_FILE not set; using bundle from {mod_name}: {path}")
return path
# Special-case helpers that merge system certs (eg. certifi_win32)
try:
import certifi_win32 as _cw # type: ignore
if hasattr(_cw, "add_windows_store_certs") and callable(_cw.add_windows_store_certs):
try:
_cw.add_windows_store_certs()
except Exception:
pass
try:
import certifi # type: ignore
path = certifi.where()
if path:
try:
os.environ["SSL_CERT_FILE"] = path
except Exception:
pass
logger.info(
f"SSL_CERT_FILE not set; using certifi bundle after certifi_win32: {path}"
)
return path
except Exception:
pass
except Exception:
pass
# Fallback to certifi
try:
import certifi # type: ignore
path = certifi.where()
if path:
try:
os.environ["SSL_CERT_FILE"] = path
except Exception:
pass
logger.info(f"SSL_CERT_FILE not set; using certifi bundle: {path}")
return path
except Exception:
pass
return True
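

# How the resolution order plays out (illustrative sketch; the bundle paths are
# hypothetical, and note the implementation also passes through a CA-bundle path
# even though the parameter is annotated as bool):
#
#     _resolve_verify_value(False)          # -> False (verification disabled)
#     _resolve_verify_value("/etc/ca.pem")  # -> "/etc/ca.pem" (explicit bundle)
#     _resolve_verify_value(True)           # -> e.g. ".../certifi/cacert.pem" if a
#                                           #    bundle is found, else True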
def get_requests_verify_value(verify_ssl: bool = True) -> Union[bool, str]:
"""Expose the verified value for reuse outside of HTTPClient (requests sessions)."""
return _resolve_verify_value(verify_ssl)
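
# Illustrative sketch of reusing the resolved value with a requests session
# (assumes the optional `requests` package is installed; it is not imported by
# this module):
#
#     import requests
#     session = requests.Session()
#     session.verify = get_requests_verify_value(True)
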
# Default configuration
DEFAULT_TIMEOUT = 30.0
DEFAULT_RETRIES = 3
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
class HTTPClient:
"""Unified HTTP client with sync support."""
def __init__(
self,
timeout: float = DEFAULT_TIMEOUT,
retries: int = DEFAULT_RETRIES,
user_agent: str = DEFAULT_USER_AGENT,
verify_ssl: bool = True,
        headers: Optional[Dict[str, str]] = None,
):
"""
Initialize HTTP client.
Args:
timeout: Request timeout in seconds
retries: Number of retries on transient failures
user_agent: User-Agent header value
verify_ssl: Whether to verify SSL certificates
headers: Additional headers to include in all requests
"""
self.timeout = timeout
self.retries = retries
self.user_agent = user_agent
self.verify_ssl = verify_ssl
self.base_headers = headers or {}
self._client: Optional[httpx.Client] = None
self._httpx_verify = _resolve_verify_value(verify_ssl)
# Debug helpers
def _debug_panel(self, title: str, rows: List[tuple[str, Any]]) -> None:
if not is_debug_enabled():
return
try:
from rich.table import Table
from rich.panel import Panel
grid = Table.grid(padding=(0, 1))
grid.add_column("Key", style="cyan", no_wrap=True)
grid.add_column("Value")
for key, val in rows:
try:
grid.add_row(str(key), str(val))
except Exception:
grid.add_row(str(key), "<unprintable>")
debug(Panel(grid, title=title, expand=False))
except Exception:
# Fallback to simple debug output
debug(title, rows)
def __enter__(self):
"""Context manager entry."""
self._client = httpx.Client(
timeout=self.timeout,
verify=self._httpx_verify,
headers=self._get_headers(),
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
if self._client:
self._client.close()
self._client = None
def _get_headers(self) -> Dict[str, str]:
"""Get request headers with user-agent."""
        headers = {"User-Agent": self.user_agent}
        headers.update(self.base_headers)
return headers
def get(
self,
url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
allow_redirects: bool = True,
) -> httpx.Response:
"""
Make a GET request.
Args:
url: Request URL
params: Query parameters
headers: Additional headers
allow_redirects: Follow redirects
Returns:
httpx.Response object
"""
return self._request(
"GET",
url,
params=params,
headers=headers,
follow_redirects=allow_redirects,
)
def post(
self,
url: str,
data: Optional[Any] = None,
json: Optional[Dict] = None,
files: Optional[Dict] = None,
        headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""
Make a POST request.
Args:
url: Request URL
data: Form data
json: JSON data
files: Files to upload
headers: Additional headers
Returns:
httpx.Response object
"""
return self._request(
"POST",
url,
data=data,
json=json,
files=files,
headers=headers,
)
def put(
self,
url: str,
data: Optional[Any] = None,
json: Optional[Dict] = None,
content: Optional[Any] = None,
files: Optional[Dict] = None,
        headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""
Make a PUT request.
Args:
url: Request URL
data: Form data
json: JSON data
content: Raw content
files: Files to upload
headers: Additional headers
Returns:
httpx.Response object
"""
return self._request(
"PUT",
url,
data=data,
json=json,
content=content,
files=files,
headers=headers,
)
def delete(
self,
url: str,
        headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""
Make a DELETE request.
Args:
url: Request URL
headers: Additional headers
Returns:
httpx.Response object
"""
return self._request(
"DELETE",
url,
headers=headers,
)
def request(self, method: str, url: str, **kwargs) -> httpx.Response:
"""
Make a generic HTTP request.
Args:
method: HTTP method
url: Request URL
**kwargs: Additional arguments
Returns:
httpx.Response object
"""
return self._request(method, url, **kwargs)
def download(
self,
url: str,
file_path: str,
chunk_size: int = 8192,
        progress_callback: Optional[Callable[[int, int], None]] = None,
        headers: Optional[Dict[str, str]] = None,
) -> Path:
"""
Download a file from URL with optional progress tracking.
Args:
url: File URL
file_path: Local file path to save to
chunk_size: Download chunk size
progress_callback: Callback(bytes_downloaded, total_bytes)
headers: Additional headers
Returns:
Path object of downloaded file
"""
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
with self._request_stream("GET",
url,
headers=headers,
follow_redirects=True) as response:
2025-11-25 20:09:33 -08:00
response.raise_for_status()
total_bytes = int(response.headers.get("content-length", 0))
bytes_downloaded = 0
# Render progress immediately (even if the transfer is very fast)
if progress_callback:
try:
progress_callback(0, total_bytes)
except Exception:
pass
with open(path, "wb") as f:
for chunk in response.iter_bytes(chunk_size):
if chunk:
f.write(chunk)
bytes_downloaded += len(chunk)
if progress_callback:
progress_callback(bytes_downloaded, total_bytes)
# Ensure a final callback is emitted.
if progress_callback:
try:
progress_callback(bytes_downloaded, total_bytes)
except Exception:
pass
return path
def _request(
self,
method: str,
url: str,
raise_for_status: bool = True,
log_http_errors: bool = True,
**kwargs,
2025-11-25 20:09:33 -08:00
) -> httpx.Response:
"""
Make an HTTP request with automatic retries.
        Args:
            method: HTTP method
            url: Request URL
            raise_for_status: Whether to raise on 4xx/5xx responses
            log_http_errors: Whether to log non-retryable HTTP errors
            **kwargs: Additional arguments for httpx.Client.request()
Returns:
httpx.Response object
"""
if not self._client:
raise RuntimeError(
"HTTPClient must be used with context manager (with statement)"
)
# Merge headers
if "headers" in kwargs and kwargs["headers"]:
headers = self._get_headers()
headers.update(kwargs["headers"])
kwargs["headers"] = headers
else:
kwargs["headers"] = self._get_headers()
last_exception = None
for attempt in range(self.retries):
self._debug_panel(
"HTTP request",
[
("method", method),
("url", url),
("attempt", f"{attempt + 1}/{self.retries}"),
("params", kwargs.get("params")),
("headers", kwargs.get("headers")),
("verify", self._httpx_verify),
("follow_redirects", kwargs.get("follow_redirects", False)),
],
)
try:
response = self._client.request(method, url, **kwargs)
self._debug_panel(
"HTTP response",
[
("method", method),
("url", url),
("status", getattr(response, "status_code", "")),
("elapsed", getattr(response, "elapsed", "")),
(
"content_length",
response.headers.get("content-length") if hasattr(response, "headers") else "",
),
],
)
if raise_for_status:
response.raise_for_status()
2025-11-25 20:09:33 -08:00
return response
except httpx.TimeoutException as e:
last_exception = e
logger.warning(
f"Timeout on attempt {attempt + 1}/{self.retries}: {url}"
)
if attempt < self.retries - 1:
continue
except httpx.HTTPStatusError as e:
# Don't retry on 4xx errors
if 400 <= e.response.status_code < 500:
                    try:
                        response_text = e.response.text[:500]
                    except Exception:
                        response_text = "<unable to read response>"
if log_http_errors:
logger.error(
f"HTTP {e.response.status_code} from {url}: {response_text}"
)
raise
last_exception = e
                try:
                    response_text = e.response.text[:200]
                except Exception:
                    response_text = "<unable to read response>"
logger.warning(
f"HTTP {e.response.status_code} on attempt {attempt + 1}/{self.retries}: {url} - {response_text}"
)
if attempt < self.retries - 1:
continue
except (httpx.RequestError, httpx.ConnectError) as e:
last_exception = e
logger.warning(
f"Connection error on attempt {attempt + 1}/{self.retries}: {url} - {e}"
)
# Detect certificate verification failures in the underlying error
msg = str(e or "").lower()
if ("certificate verify failed" in msg or "unable to get local issuer certificate" in msg):
logger.info("Certificate verification failed; attempting to retry with a system-aware CA bundle")
try:
import httpx as _httpx
# Use the client's precomputed verify argument (set at init)
verify_override = self._httpx_verify
with _httpx.Client(timeout=self.timeout, verify=verify_override, headers=self._get_headers()) as temp_client:
try:
response = temp_client.request(method, url, **kwargs)
if raise_for_status:
response.raise_for_status()
return response
except Exception as e2:
last_exception = e2
except Exception:
# certifi/pip-system-certs/httpx not available; fall back to existing retry behavior
pass
if attempt < self.retries - 1:
continue
except Exception as e:
# Catch-all to handle non-httpx exceptions that may represent
# certificate verification failures from underlying transports.
last_exception = e
logger.warning(f"Request exception on attempt {attempt + 1}/{self.retries}: {url} - {e}")
msg = str(e or "").lower()
if ("certificate verify failed" in msg or "unable to get local issuer certificate" in msg):
logger.info("Certificate verification failed; attempting to retry with a system-aware CA bundle")
try:
import httpx as _httpx
# Use the client's precomputed verify argument (set at init)
verify_override = self._httpx_verify
with _httpx.Client(timeout=self.timeout, verify=verify_override, headers=self._get_headers()) as temp_client:
try:
response = temp_client.request(method, url, **kwargs)
if raise_for_status:
response.raise_for_status()
return response
except Exception as e2:
last_exception = e2
except Exception:
# certifi/pip-system-certs/httpx not available; fall back to existing retry behavior
pass
if attempt < self.retries - 1:
continue
if last_exception:
logger.error(
f"Request failed after {self.retries} attempts: {url} - {last_exception}"
)
raise last_exception
raise RuntimeError("Request failed after retries")
def _request_stream(self, method: str, url: str, **kwargs):
"""Make a streaming request."""
if not self._client:
raise RuntimeError(
"HTTPClient must be used with context manager (with statement)"
)
# Merge headers
if "headers" in kwargs and kwargs["headers"]:
headers = self._get_headers()
headers.update(kwargs["headers"])
kwargs["headers"] = headers
else:
kwargs["headers"] = self._get_headers()
self._debug_panel(
"HTTP stream",
[
("method", method),
("url", url),
("headers", kwargs.get("headers")),
("follow_redirects", kwargs.get("follow_redirects", False)),
],
)
return self._client.stream(method, url, **kwargs)
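

# Example usage of HTTPClient (illustrative sketch; the URL and output path are
# placeholders, not part of this module):
#
#     with HTTPClient(timeout=60.0, retries=5) as client:
#         resp = client.get("https://example.com/api", params={"page": 1})
#         data = resp.json()
#         client.download(
#             "https://example.com/file.bin",
#             "downloads/file.bin",
#             progress_callback=lambda done, total: print(f"{done}/{total}"),
#         )
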
def download_direct_file(
url: str,
output_dir: Path,
debug_logger: Optional[DebugLogger] = None,
quiet: bool = False,
suggested_filename: Optional[str] = None,
pipeline_progress: Optional[Any] = None,
) -> DownloadMediaResult:
"""Download a direct file (PDF, image, document, etc.) with guardrails and metadata hooks."""
ensure_directory(output_dir)
def _sanitize_filename(name: str) -> str:
# Windows-safe filename sanitization.
text = str(name or "").strip()
if not text:
return ""
text = text.replace("/", "\\")
text = text.split("\\")[-1]
invalid = set('<>:"/\\|?*')
cleaned_chars: List[str] = []
for ch in text:
o = ord(ch)
if o < 32 or ch in invalid:
cleaned_chars.append(" ")
continue
cleaned_chars.append(ch)
cleaned = " ".join("".join(cleaned_chars).split()).strip()
cleaned = cleaned.rstrip(" .")
return cleaned
def _unique_path(path: Path) -> Path:
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
for i in range(1, 10_000):
candidate = parent / f"{stem} ({i}){suffix}"
if not candidate.exists():
return candidate
return parent / f"{stem} ({int(time.time())}){suffix}"
parsed_url = urlparse(url)
url_path = parsed_url.path
filename: Optional[str] = None
if parsed_url.query:
query_params = parse_qs(parsed_url.query)
for param_name in ("filename", "download", "file", "name"):
if param_name in query_params and query_params[param_name]:
filename = query_params[param_name][0]
filename = unquote(filename)
break
if not filename or not filename.strip():
filename = url_path.split("/")[-1] if url_path else ""
filename = unquote(filename)
if "?" in filename:
filename = filename.split("?")[0]
content_type = ""
try:
with HTTPClient(timeout=10.0) as client:
response = client._request("HEAD", url, follow_redirects=True)
content_disposition = response.headers.get("content-disposition", "")
try:
content_type = str(response.headers.get("content-type", "") or "").strip().lower()
except Exception:
content_type = ""
if content_disposition:
match = re.search(r'filename\*?=(?:"([^"]*)"|([^;\s]*))', content_disposition)
if match:
extracted_name = match.group(1) or match.group(2)
if extracted_name:
filename = unquote(extracted_name)
if not quiet:
debug(f"Filename from Content-Disposition: {filename}")
except Exception as exc:
if not quiet:
log(f"Could not get filename from headers: {exc}", file=sys.stderr)
try:
page_like_exts = {".php", ".asp", ".aspx", ".jsp", ".cgi"}
ext = ""
try:
ext = Path(str(filename or "")).suffix.lower()
except Exception:
ext = ""
ct0 = (content_type or "").split(";", 1)[0].strip().lower()
must_probe = bool(ct0.startswith("text/html") or ext in page_like_exts)
if must_probe:
with HTTPClient(timeout=10.0) as client:
with client._request_stream("GET", url, follow_redirects=True) as resp:
resp.raise_for_status()
ct = (
str(resp.headers.get("content-type", "") or "")
.split(";", 1)[0]
.strip()
.lower()
)
if ct.startswith("text/html"):
raise DownloadError("URL appears to be an HTML page, not a direct file")
except DownloadError:
raise
except Exception:
pass
suggested = _sanitize_filename(suggested_filename) if suggested_filename else ""
if suggested:
suggested_path = Path(suggested)
if suggested_path.suffix:
filename = suggested
else:
detected_ext = ""
try:
detected_ext = Path(str(filename)).suffix
except Exception:
detected_ext = ""
filename = suggested + detected_ext if detected_ext else suggested
try:
has_ext = bool(filename and Path(str(filename)).suffix)
except Exception:
has_ext = False
if filename and (not has_ext):
ct = (content_type or "").split(";", 1)[0].strip().lower()
ext_by_ct = {
"application/pdf": ".pdf",
"application/epub+zip": ".epub",
"application/x-mobipocket-ebook": ".mobi",
"image/jpeg": ".jpg",
"image/png": ".png",
"image/webp": ".webp",
"image/gif": ".gif",
"text/plain": ".txt",
"application/zip": ".zip",
}
if ct in ext_by_ct:
filename = f"{filename}{ext_by_ct[ct]}"
elif ct.startswith("text/html"):
raise DownloadError("URL appears to be an HTML page, not a direct file")
if not filename or not str(filename).strip():
raise DownloadError(
"Could not determine filename for URL (no Content-Disposition and no path filename)"
)
file_path = _unique_path(output_dir / str(filename))
use_pipeline_transfer = False
try:
if pipeline_progress is not None and hasattr(pipeline_progress, "update_transfer"):
ui = None
if hasattr(pipeline_progress, "ui_and_pipe_index"):
ui, _ = pipeline_progress.ui_and_pipe_index() # type: ignore[attr-defined]
use_pipeline_transfer = ui is not None
except Exception:
use_pipeline_transfer = False
progress_bar: Optional[ProgressBar] = None
if (not quiet) and (not use_pipeline_transfer):
progress_bar = ProgressBar()
transfer_started = [False]
if not quiet:
debug(f"Direct download: {filename}")
try:
start_time = time.time()
downloaded_bytes = [0]
transfer_started[0] = False
def _maybe_begin_transfer(content_length: int) -> None:
if pipeline_progress is None or transfer_started[0]:
return
try:
total_val: Optional[int] = (
int(content_length)
if isinstance(content_length, int) and content_length > 0
else None
)
except Exception:
total_val = None
try:
if hasattr(pipeline_progress, "begin_transfer"):
pipeline_progress.begin_transfer(
label=str(filename or "download"),
total=total_val,
)
transfer_started[0] = True
except Exception:
return
def progress_callback(bytes_downloaded: int, content_length: int) -> None:
downloaded_bytes[0] = int(bytes_downloaded or 0)
try:
if pipeline_progress is not None and hasattr(pipeline_progress, "update_transfer"):
_maybe_begin_transfer(content_length)
total_val: Optional[int] = (
int(content_length)
if isinstance(content_length, int) and content_length > 0
else None
)
pipeline_progress.update_transfer(
label=str(filename or "download"),
completed=int(bytes_downloaded or 0),
total=total_val,
)
except Exception:
pass
if progress_bar is not None:
progress_bar.update(
downloaded=int(bytes_downloaded or 0),
total=int(content_length) if content_length and content_length > 0 else None,
label=str(filename or "download"),
file=sys.stderr,
)
with HTTPClient(timeout=30.0) as client:
client.download(url, str(file_path), progress_callback=progress_callback)
elapsed = time.time() - start_time
try:
if progress_bar is not None:
progress_bar.finish()
except Exception:
pass
try:
if pipeline_progress is not None and transfer_started[0] and hasattr(
pipeline_progress, "finish_transfer"
):
pipeline_progress.finish_transfer(label=str(filename or "download"))
except Exception:
pass
if not quiet:
debug(f"✓ Downloaded in {elapsed:.1f}s")
ext_out = ""
try:
ext_out = Path(str(filename)).suffix.lstrip(".")
except Exception:
ext_out = ""
info: Dict[str, Any] = {
"id": str(filename).rsplit(".", 1)[0] if "." in str(filename) else str(filename),
"ext": ext_out,
"webpage_url": url,
}
hash_value = None
try:
hash_value = sha256_file(file_path)
except Exception:
pass
tags: List[str] = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(info)
except Exception as exc:
log(f"Error extracting tags: {exc}", file=sys.stderr)
if not any(str(t).startswith("title:") for t in tags):
info["title"] = str(filename)
tags = []
if extract_ytdlp_tags:
try:
tags = extract_ytdlp_tags(info)
except Exception as exc:
log(f"Error extracting tags with filename: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"direct-file-downloaded",
{"url": url, "path": str(file_path), "hash": hash_value},
)
return DownloadMediaResult(
path=file_path,
info=info,
tag=tags,
source_url=url,
hash_value=hash_value,
)
except (httpx.HTTPError, httpx.RequestError) as exc:
try:
if progress_bar is not None:
progress_bar.finish()
except Exception:
pass
try:
if pipeline_progress is not None and transfer_started[0] and hasattr(
pipeline_progress, "finish_transfer"
):
pipeline_progress.finish_transfer(label=str(filename or "download"))
except Exception:
pass
log(f"Download error: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{"phase": "direct-file", "url": url, "error": str(exc)},
)
raise DownloadError(f"Failed to download {url}: {exc}") from exc
except Exception as exc:
try:
if progress_bar is not None:
progress_bar.finish()
except Exception:
pass
try:
if pipeline_progress is not None and transfer_started[0] and hasattr(
pipeline_progress, "finish_transfer"
):
pipeline_progress.finish_transfer(label=str(filename or "download"))
except Exception:
pass
log(f"Error downloading file: {exc}", file=sys.stderr)
if debug_logger is not None:
debug_logger.write_record(
"exception",
{
"phase": "direct-file",
"url": url,
"error": str(exc),
"traceback": traceback.format_exc(),
},
)
raise DownloadError(f"Error downloading file: {exc}") from exc


# Back-compat alias
_download_direct_file = download_direct_file
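
# Example usage (illustrative sketch; the URL and output directory are
# placeholders):
#
#     result = download_direct_file(
#         "https://example.com/paper.pdf",
#         Path("downloads"),
#         quiet=True,
#     )
#     print(result.path, result.hash_value)
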
class AsyncHTTPClient:
"""Unified async HTTP client with asyncio support."""
def __init__(
self,
timeout: float = DEFAULT_TIMEOUT,
retries: int = DEFAULT_RETRIES,
user_agent: str = DEFAULT_USER_AGENT,
verify_ssl: bool = True,
        headers: Optional[Dict[str, str]] = None,
):
"""
Initialize async HTTP client.
Args:
timeout: Request timeout in seconds
retries: Number of retries on transient failures
user_agent: User-Agent header value
verify_ssl: Whether to verify SSL certificates
headers: Additional headers to include in all requests
"""
self.timeout = timeout
self.retries = retries
self.user_agent = user_agent
self.verify_ssl = verify_ssl
self.base_headers = headers or {}
self._client: Optional[httpx.AsyncClient] = None
self._httpx_verify = _resolve_verify_value(verify_ssl)
async def __aenter__(self):
"""Async context manager entry."""
self._client = httpx.AsyncClient(
timeout=self.timeout,
verify=self._httpx_verify,
headers=self._get_headers(),
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self._client:
await self._client.aclose()
self._client = None
def _get_headers(self) -> Dict[str, str]:
"""Get request headers with user-agent."""
        headers = {"User-Agent": self.user_agent}
        headers.update(self.base_headers)
return headers
async def get(
self,
url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
allow_redirects: bool = True,
) -> httpx.Response:
"""
Make an async GET request.
Args:
url: Request URL
params: Query parameters
headers: Additional headers
allow_redirects: Follow redirects
Returns:
httpx.Response object
"""
return await self._request(
"GET",
url,
params=params,
headers=headers,
follow_redirects=allow_redirects,
)
async def post(
self,
url: str,
data: Optional[Any] = None,
json: Optional[Dict] = None,
        headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""
Make an async POST request.
Args:
url: Request URL
data: Form data
json: JSON data
headers: Additional headers
Returns:
httpx.Response object
"""
return await self._request(
"POST",
url,
data=data,
json=json,
headers=headers,
)
async def download(
self,
url: str,
file_path: str,
chunk_size: int = 8192,
        progress_callback: Optional[Callable[[int, int], None]] = None,
        headers: Optional[Dict[str, str]] = None,
) -> Path:
"""
Download a file from URL asynchronously with optional progress tracking.
Args:
url: File URL
file_path: Local file path to save to
chunk_size: Download chunk size
progress_callback: Callback(bytes_downloaded, total_bytes)
headers: Additional headers
Returns:
Path object of downloaded file
"""
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
async with self._request_stream("GET", url, headers=headers) as response:
response.raise_for_status()
total_bytes = int(response.headers.get("content-length", 0))
bytes_downloaded = 0
with open(path, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size):
if chunk:
f.write(chunk)
bytes_downloaded += len(chunk)
if progress_callback:
progress_callback(bytes_downloaded, total_bytes)
return path
async def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
"""
Make an async HTTP request with automatic retries.
Args:
method: HTTP method
url: Request URL
**kwargs: Additional arguments for httpx.AsyncClient.request()
Returns:
httpx.Response object
"""
if not self._client:
raise RuntimeError(
"AsyncHTTPClient must be used with async context manager"
)
# Merge headers
if "headers" in kwargs and kwargs["headers"]:
headers = self._get_headers()
headers.update(kwargs["headers"])
kwargs["headers"] = headers
else:
kwargs["headers"] = self._get_headers()
last_exception = None
for attempt in range(self.retries):
try:
response = await self._client.request(method, url, **kwargs)
response.raise_for_status()
return response
except httpx.TimeoutException as e:
last_exception = e
logger.warning(
f"Timeout on attempt {attempt + 1}/{self.retries}: {url}"
)
if attempt < self.retries - 1:
await asyncio.sleep(0.5) # Brief delay before retry
continue
except httpx.HTTPStatusError as e:
# Don't retry on 4xx errors
if 400 <= e.response.status_code < 500:
                    try:
                        response_text = e.response.text[:500]
                    except Exception:
                        response_text = "<unable to read response>"
logger.error(
f"HTTP {e.response.status_code} from {url}: {response_text}"
)
raise
last_exception = e
                try:
                    response_text = e.response.text[:200]
                except Exception:
                    response_text = "<unable to read response>"
logger.warning(
f"HTTP {e.response.status_code} on attempt {attempt + 1}/{self.retries}: {url} - {response_text}"
)
if attempt < self.retries - 1:
await asyncio.sleep(0.5)
continue
except (httpx.RequestError, httpx.ConnectError) as e:
last_exception = e
logger.warning(
f"Connection error on attempt {attempt + 1}/{self.retries}: {url} - {e}"
)
if attempt < self.retries - 1:
await asyncio.sleep(0.5)
continue
if last_exception:
logger.error(
f"Request failed after {self.retries} attempts: {url} - {last_exception}"
)
2025-11-25 20:09:33 -08:00
raise last_exception
raise RuntimeError("Request failed after retries")
def _request_stream(self, method: str, url: str, **kwargs):
"""Make a streaming request."""
if not self._client:
raise RuntimeError(
"AsyncHTTPClient must be used with async context manager"
)
# Merge headers
if "headers" in kwargs and kwargs["headers"]:
headers = self._get_headers()
headers.update(kwargs["headers"])
kwargs["headers"] = headers
else:
kwargs["headers"] = self._get_headers()
return self._client.stream(method, url, **kwargs)
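

# Example usage of AsyncHTTPClient (illustrative sketch; the URL is a
# placeholder):
#
#     async def fetch() -> dict:
#         async with AsyncHTTPClient() as client:
#             resp = await client.get("https://example.com/api")
#             return resp.json()
#
#     asyncio.run(fetch())
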
# Convenience functions for quick sync requests
def get(url: str, **kwargs) -> httpx.Response:
"""Quick GET request without context manager."""
with HTTPClient() as client:
        return client.get(url, **kwargs)


def post(url: str, **kwargs) -> httpx.Response:
"""Quick POST request without context manager."""
with HTTPClient() as client:
        return client.post(url, **kwargs)


def download(
url: str,
file_path: str,
    progress_callback: Optional[Callable[[int, int], None]] = None,
    **kwargs,
) -> Path:
"""Quick file download without context manager."""
with HTTPClient() as client:
return client.download(
url,
file_path,
progress_callback=progress_callback,
**kwargs
)
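

# Minimal smoke test for the module-level helpers (illustrative sketch; the URL
# is a placeholder and running this requires network access):
if __name__ == "__main__":
    response = get("https://example.com")
    print(response.status_code, len(response.content))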