This commit is contained in:
2026-03-04 16:50:19 -08:00
parent 818d0c0338
commit 4110c5ec00
6 changed files with 379 additions and 77 deletions

View File

@@ -1,6 +1,8 @@
from __future__ import annotations
import asyncio
import importlib.util
import inspect
import re
import shutil
import sys
@@ -11,6 +13,7 @@ from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlparse
from ProviderCore.base import Provider, SearchResult
from SYS.logger import debug
_TELEGRAM_DEFAULT_TIMESTAMP_STEM_RE = re.compile(
r"^(?P<prefix>photo|video|document|audio|voice|animation)_(?P<date>\d{4}-\d{2}-\d{2})_(?P<time>\d{2}-\d{2}-\d{2})(?: \(\d+\))?$",
@@ -170,6 +173,21 @@ class Telegram(Provider):
"label": "Bot Token (optional)",
"default": "",
"secret": True
},
{
"key": "part_size_kb",
"label": "Transfer chunk size KB (4-512)",
"default": "512",
},
{
"key": "connection_mode",
"label": "Connection mode (abridged|full)",
"default": "abridged",
},
{
"key": "receive_updates",
"label": "Receive updates during transfers",
"default": False,
}
]
@@ -185,13 +203,108 @@ class Telegram(Provider):
self._api_hash = telegram_conf.get("api_hash")
self._bot_token = telegram_conf.get("bot_token")
self._last_login_error: Optional[str] = None
# Telethon downloads are chunked; larger parts mean fewer round-trips.
# Telethon typically expects 4..1024 KB and divisible by 4.
# Telethon transfers are chunked; larger parts mean fewer round-trips.
# Telethon typically expects 4..512 KB and divisible by 4.
self._part_size_kb = telegram_conf.get("part_size_kb")
if self._part_size_kb is None:
self._part_size_kb = telegram_conf.get("chunk_kb")
if self._part_size_kb is None:
self._part_size_kb = telegram_conf.get("download_part_kb")
self._connection_mode = str(
telegram_conf.get("connection_mode") or telegram_conf.get("connection")
or "abridged"
).strip().lower()
self._receive_updates = self._coerce_bool(
telegram_conf.get("receive_updates"), default=False
)
self._cryptg_available = self._detect_cryptg_available()
self._emitted_cryptg_hint = False
self._download_media_accepts_part_size = self._detect_download_media_accepts_part_size()
@staticmethod
def _coerce_bool(value: Any, *, default: bool = False) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
try:
s = str(value).strip().lower()
except Exception:
return default
if s in {"1", "true", "yes", "on"}:
return True
if s in {"0", "false", "no", "off"}:
return False
return default
@staticmethod
def _detect_cryptg_available() -> bool:
try:
return importlib.util.find_spec("cryptg") is not None
except Exception:
return False
@staticmethod
def _detect_download_media_accepts_part_size() -> bool:
try:
from telethon import TelegramClient
sig = inspect.signature(TelegramClient.download_media)
params = sig.parameters
if "part_size_kb" in params:
return True
return any(
p.kind == inspect.Parameter.VAR_KEYWORD
for p in params.values()
)
except Exception:
return False
def _emit_cryptg_speed_hint_once(self) -> None:
if self._cryptg_available or self._emitted_cryptg_hint:
return
self._emitted_cryptg_hint = True
try:
sys.stderr.write(
"[telegram] Tip: install 'cryptg' for faster Telegram media transfer performance.\n"
)
sys.stderr.flush()
except Exception:
pass
def _new_client(
self,
*,
session_base: Path,
app_id: int,
api_hash: str,
receive_updates: Optional[bool] = None,
):
from telethon import TelegramClient
kwargs: Dict[str, Any] = {
"receive_updates": bool(
self._receive_updates
if receive_updates is None else receive_updates
)
}
mode = str(self._connection_mode or "").strip().lower()
try:
if mode in {"abridged", "tcpabridged", "fast"}:
from telethon.network.connection.tcpabridged import ConnectionTcpAbridged
kwargs["connection"] = ConnectionTcpAbridged
elif mode in {"full", "tcpfull"}:
from telethon.network.connection.tcpfull import ConnectionTcpFull
kwargs["connection"] = ConnectionTcpFull
except Exception:
pass
try:
return TelegramClient(str(session_base), app_id, api_hash, **kwargs)
except TypeError:
return TelegramClient(str(session_base), app_id, api_hash)
def _has_running_event_loop(self) -> bool:
try:
@@ -342,7 +455,11 @@ class Telegram(Provider):
session_base = self._session_base_path()
async def _check_async() -> bool:
client = TelegramClient(str(session_base), app_id, api_hash)
client = self._new_client(
session_base=session_base,
app_id=app_id,
api_hash=api_hash,
)
try:
await client.connect()
return bool(await client.is_user_authorized())
@@ -418,7 +535,12 @@ class Telegram(Provider):
session_base = self._session_base_path()
async def _auth_async() -> None:
client = TelegramClient(str(session_base), app_id, api_hash)
client = self._new_client(
session_base=session_base,
app_id=app_id,
api_hash=api_hash,
receive_updates=True,
)
try:
if use_bot:
await client.start(bot_token=bot_token)
@@ -515,7 +637,12 @@ class Telegram(Provider):
session_base = self._session_base_path()
async def _auth_async() -> None:
client = TelegramClient(str(session_base), app_id, api_hash)
client = self._new_client(
session_base=session_base,
app_id=app_id,
api_hash=api_hash,
receive_updates=True,
)
try:
await client.start(bot_token=bot_token)
finally:
@@ -545,15 +672,15 @@ class Telegram(Provider):
if val not in (None, ""):
ps = int(str(val).strip())
else:
ps = 1024
ps = 512
except Exception:
ps = 1024
ps = 512
# Clamp to Telethon-safe range.
if ps < 4:
ps = 4
if ps > 1024:
ps = 1024
if ps > 512:
ps = 512
# Must be divisible by 4.
ps = int(ps / 4) * 4
if ps <= 0:
@@ -640,7 +767,11 @@ class Telegram(Provider):
session_base = self._session_base_path()
async def _list_async() -> list[Dict[str, Any]]:
client = TelegramClient(str(session_base), app_id, api_hash)
client = self._new_client(
session_base=session_base,
app_id=app_id,
api_hash=api_hash,
)
rows: list[Dict[str, Any]] = []
try:
await client.connect()
@@ -840,13 +971,18 @@ class Telegram(Provider):
raise Exception("No chat selected")
async def _send_async() -> None:
client = TelegramClient(str(session_base), app_id, api_hash)
client = self._new_client(
session_base=session_base,
app_id=app_id,
api_hash=api_hash,
)
try:
await client.connect()
if not bool(await client.is_user_authorized()):
raise Exception(
"Telegram session is not authorized. Run: .telegram -login"
)
self._emit_cryptg_speed_hint_once()
# Resolve entities: prefer IDs. Only fall back to usernames when IDs are absent.
entities: list[Any] = []
@@ -999,7 +1135,11 @@ class Telegram(Provider):
chat, message_id = _parse_telegram_message_url(url)
async def _download_async() -> Tuple[Path, Dict[str, Any]]:
client = TelegramClient(str(session_base), app_id, api_hash)
client = self._new_client(
session_base=session_base,
app_id=app_id,
api_hash=api_hash,
)
try:
await client.connect()
if not bool(await client.is_user_authorized()):
@@ -1104,18 +1244,20 @@ class Telegram(Provider):
)
part_kb = self._resolve_part_size_kb(file_size)
self._emit_cryptg_speed_hint_once()
download_kwargs: Dict[str, Any] = {
"file": str(output_dir),
"progress_callback": _progress,
}
if self._download_media_accepts_part_size:
download_kwargs["part_size_kb"] = part_kb
try:
downloaded = await client.download_media(
message,
file=str(output_dir),
progress_callback=_progress,
part_size_kb=part_kb,
)
downloaded = await client.download_media(message, **download_kwargs)
except TypeError:
downloaded = await client.download_media(
message,
file=str(output_dir),
progress_callback=_progress
progress_callback=_progress,
)
progress_bar.finish()
if not downloaded:

View File

@@ -25,6 +25,18 @@ from API.httpx_shared import get_shared_httpx_client
from SYS.logger import log
_DEFAULT_TIDAL_TRACK_API_BASES = (
"https://triton.squid.wtf",
"https://wolf.qqdl.site",
"https://maus.qqdl.site",
"https://vogel.qqdl.site",
"https://katze.qqdl.site",
"https://hund.qqdl.site",
"https://tidal.kinoplus.online",
"https://tidal-api.binimum.org",
)
def resolve_tidal_manifest_path(item: Any) -> Optional[str]:
"""Persist the Tidal manifest (MPD) and return a local path or URL.
@@ -114,6 +126,41 @@ def resolve_tidal_manifest_path(item: Any) -> Optional[str]:
return _persist_mpd_bytes(item, metadata, manifest_bytes)
def _normalize_api_base(candidate: Any) -> Optional[str]:
text = str(candidate or "").strip()
if not text:
return None
if not re.match(r"^https?://", text, flags=re.IGNORECASE):
return None
return text.rstrip("/")
def _iter_track_api_bases(metadata: Dict[str, Any]) -> list[str]:
bases: list[str] = []
seen: set[str] = set()
dynamic_candidates = [
metadata.get("_tidal_api_base"),
metadata.get("_api_base"),
metadata.get("api_base"),
metadata.get("base_url"),
]
for candidate in dynamic_candidates:
normalized = _normalize_api_base(candidate)
if normalized and normalized not in seen:
seen.add(normalized)
bases.append(normalized)
for candidate in _DEFAULT_TIDAL_TRACK_API_BASES:
normalized = _normalize_api_base(candidate)
if normalized and normalized not in seen:
seen.add(normalized)
bases.append(normalized)
return bases
def _maybe_fetch_track_manifest(item: Any, metadata: Dict[str, Any]) -> None:
"""If we only have a track id, fetch details from the proxy to populate `manifest`."""
@@ -155,29 +202,40 @@ def _maybe_fetch_track_manifest(item: Any, metadata: Dict[str, Any]) -> None:
try:
client = get_shared_httpx_client()
except Exception:
return
resp = client.get(
"https://tidal-api.binimum.org/track/",
params={"id": str(track_int)},
timeout=10.0,
)
resp.raise_for_status()
payload = resp.json()
data = payload.get("data") if isinstance(payload, dict) else None
if isinstance(data, dict) and data:
attempted = False
for base in _iter_track_api_bases(metadata):
attempted = True
track_data: Optional[Dict[str, Any]] = None
for params in ({"id": str(track_int)}, {"id": str(track_int), "quality": "LOSSLESS"}):
try:
metadata.update(data)
resp = client.get(
f"{base}/track/",
params=params,
timeout=10.0,
)
resp.raise_for_status()
payload = resp.json()
data = payload.get("data") if isinstance(payload, dict) else None
if isinstance(data, dict) and data:
track_data = data
break
except Exception:
pass
continue
if isinstance(track_data, dict) and track_data:
try:
metadata["_tidal_track_details_fetched"] = True
metadata.update(track_data)
except Exception:
pass
if not metadata.get("url"):
if not metadata.get("manifest") or not metadata.get("url"):
try:
resp_info = client.get(
"https://tidal-api.binimum.org/info/",
f"{base}/info/",
params={"id": str(track_int)},
timeout=10.0,
)
@@ -186,21 +244,22 @@ def _maybe_fetch_track_manifest(item: Any, metadata: Dict[str, Any]) -> None:
info_data = info_payload.get("data") if isinstance(info_payload, dict) else None
if isinstance(info_data, dict) and info_data:
try:
for k, v in info_data.items():
if k not in metadata:
metadata[k] = v
except Exception:
pass
try:
if info_data.get("url"):
metadata["url"] = info_data.get("url")
for key, value in info_data.items():
if key not in metadata or not metadata.get(key):
metadata[key] = value
except Exception:
pass
except Exception:
pass
except Exception:
return
if metadata.get("manifest"):
break
if attempted:
try:
metadata["_tidal_track_details_fetched"] = True
except Exception:
pass
def _resolve_json_manifest_urls(metadata: Dict[str, Any], manifest_bytes: bytes) -> Optional[str]: