from __future__ import annotations import asyncio import re import shutil import sys import time from pathlib import Path from typing import Any, Dict, Optional, Tuple from urllib.parse import urlparse from ProviderCore.base import Provider, SearchResult def _looks_like_telegram_message_url(url: str) -> bool: try: parsed = urlparse(str(url)) except Exception: return False host = (parsed.hostname or "").lower().strip() if host in {"t.me", "telegram.me"}: return True if host.endswith(".t.me"): return True return False def _parse_telegram_message_url(url: str) -> Tuple[str, int]: """Parse a Telegram message URL into (entity, message_id). Supported: - https://t.me// - https://t.me/s// - https://t.me/c// """ parsed = urlparse(str(url)) path = (parsed.path or "").strip("/") if not path: raise ValueError(f"Invalid Telegram URL: {url}") parts = [p for p in path.split("/") if p] if not parts: raise ValueError(f"Invalid Telegram URL: {url}") # Strip preview prefix if parts and parts[0].lower() == "s": parts = parts[1:] if len(parts) < 2: raise ValueError(f"Invalid Telegram URL (expected //): {url}") chat = parts[0] msg_raw = parts[1] # t.me/c// if chat.lower() == "c": if len(parts) < 3: raise ValueError(f"Invalid Telegram /c/ URL: {url}") chat = f"c:{parts[1]}" msg_raw = parts[2] m = re.fullmatch(r"\d+", str(msg_raw).strip()) if not m: raise ValueError(f"Invalid Telegram message id in URL: {url}") return str(chat), int(msg_raw) class Telegram(Provider): """Telegram provider using Telethon. Config: [provider=telegram] app_id= api_hash= """ def __init__(self, config: Optional[Dict[str, Any]] = None): super().__init__(config) telegram_conf = self.config.get("provider", {}).get("telegram", {}) if isinstance(self.config, dict) else {} self._app_id = telegram_conf.get("app_id") self._api_hash = telegram_conf.get("api_hash") self._bot_token = telegram_conf.get("bot_token") # Telethon downloads are chunked; larger parts mean fewer round-trips. # Telethon typically expects 4..1024 KB and divisible by 4. self._part_size_kb = telegram_conf.get("part_size_kb") if self._part_size_kb is None: self._part_size_kb = telegram_conf.get("chunk_kb") if self._part_size_kb is None: self._part_size_kb = telegram_conf.get("download_part_kb") # Avoid repeatedly prompting during startup where validate() may be called multiple times. _startup_auth_attempted: bool = False def _legacy_session_base_path(self) -> Path: # Older versions stored sessions under Log/medeia_macina. root = Path(__file__).resolve().parents[1] return root / "Log" / "medeia_macina" / "telegram" def _migrate_legacy_session_if_needed(self) -> None: """If a legacy Telethon session exists, copy it to the new root location.""" try: new_base = self._session_base_path() new_session = Path(str(new_base) + ".session") if new_session.is_file(): return legacy_base = self._legacy_session_base_path() legacy_session = Path(str(legacy_base) + ".session") if not legacy_session.is_file(): return for suffix in (".session", ".session-journal", ".session-wal", ".session-shm"): src = Path(str(legacy_base) + suffix) dst = Path(str(new_base) + suffix) try: if src.is_file() and not dst.exists(): shutil.copy2(str(src), str(dst)) except Exception: continue except Exception: return def _session_file_path(self) -> Path: self._migrate_legacy_session_if_needed() base = self._session_base_path() return Path(str(base) + ".session") def _has_session(self) -> bool: self._migrate_legacy_session_if_needed() try: return self._session_file_path().is_file() except Exception: return False def _ensure_session_interactive(self) -> bool: """Best-effort interactive auth to create a Telethon session file. Returns True if a session exists after the attempt. """ if self._has_session(): return True # Never prompt in non-interactive contexts. try: if not bool(getattr(sys.stdin, "isatty", lambda: False)()): return False except Exception: return False try: from telethon.sync import TelegramClient except Exception: return False try: app_id, api_hash = self._credentials() except Exception: return False self._ensure_event_loop() loop = asyncio.get_event_loop() if getattr(loop, "is_running", lambda: False)(): # Can't safely prompt/auth while another loop is running. return False def _resolve(value): if asyncio.iscoroutine(value): return loop.run_until_complete(value) return value try: sys.stderr.write("[telegram] No session found; login required.\n") sys.stderr.write("[telegram] Choose login method: 1) phone 2) bot token\n") sys.stderr.write("[telegram] Enter 1 or 2: ") sys.stderr.flush() choice = "" try: choice = str(input()).strip().lower() except EOFError: choice = "" use_bot = choice in {"2", "b", "bot", "token"} bot_token = "" if use_bot: sys.stderr.write("[telegram] Bot token: ") sys.stderr.flush() try: bot_token = str(input()).strip() except EOFError: bot_token = "" if not bot_token: return False self._bot_token = bot_token else: sys.stderr.write("[telegram] Phone login selected (Telethon will prompt for phone + code).\n") sys.stderr.flush() session_base = self._session_base_path() client = TelegramClient(str(session_base), app_id, api_hash) try: if use_bot: _resolve(client.start(bot_token=bot_token)) else: _resolve(client.start()) finally: try: _resolve(client.disconnect()) except Exception: pass finally: try: sys.stderr.write("\n") sys.stderr.flush() except Exception: pass return self._has_session() def _ensure_session_with_bot_token(self, bot_token: str) -> bool: """Create a Telethon session using a bot token without prompting. Returns True if a session exists after the attempt. """ if self._has_session(): return True bot_token = str(bot_token or "").strip() if not bot_token: return False try: from telethon.sync import TelegramClient except Exception: return False try: app_id, api_hash = self._credentials() except Exception: return False self._ensure_event_loop() loop = asyncio.get_event_loop() if getattr(loop, "is_running", lambda: False)(): return False def _resolve(value): if asyncio.iscoroutine(value): return loop.run_until_complete(value) return value session_base = self._session_base_path() client = TelegramClient(str(session_base), app_id, api_hash) try: _resolve(client.start(bot_token=bot_token)) finally: try: _resolve(client.disconnect()) except Exception: pass return self._has_session() def _resolve_part_size_kb(self, file_size: Optional[int]) -> int: # Default: bias to max throughput. val = self._part_size_kb try: if val not in (None, ""): ps = int(str(val).strip()) else: ps = 1024 except Exception: ps = 1024 # Clamp to Telethon-safe range. if ps < 4: ps = 4 if ps > 1024: ps = 1024 # Must be divisible by 4. ps = int(ps / 4) * 4 if ps <= 0: ps = 64 # For very small files, reduce overhead a bit (still divisible by 4). try: if file_size is not None and int(file_size) > 0: if int(file_size) < 2 * 1024 * 1024: ps = min(ps, 256) elif int(file_size) < 10 * 1024 * 1024: ps = min(ps, 512) except Exception: pass return ps def validate(self) -> bool: try: __import__("telethon") except Exception: return False try: app_id = int(self._app_id) if self._app_id not in (None, "") else None except Exception: app_id = None api_hash = str(self._api_hash).strip() if self._api_hash not in (None, "") else "" if not bool(app_id and api_hash): return False # Consider Telegram "configured" only if a persisted session exists. if self._has_session(): return True # If a bot token is already configured, attempt a non-interactive login. bot_token = str(self._bot_token or "").strip() if bot_token: return bool(self._ensure_session_with_bot_token(bot_token)) # Interactive startup prompt (only once per process). if Telegram._startup_auth_attempted: return False Telegram._startup_auth_attempted = True return bool(self._ensure_session_interactive()) def _session_base_path(self) -> Path: # Store session alongside cookies.txt at repo root. # Telethon uses this as base name and writes ".session". root = Path(__file__).resolve().parents[1] return root / "telegram" def _credentials(self) -> Tuple[int, str]: raw_app_id = self._app_id if raw_app_id in (None, ""): raise Exception("Telegram app_id missing") try: app_id = int(str(raw_app_id).strip()) except Exception: raise Exception("Telegram app_id invalid") api_hash = str(self._api_hash or "").strip() if not api_hash: raise Exception("Telegram api_hash missing") return app_id, api_hash def _ensure_event_loop(self) -> None: """Telethon sync wrapper requires an event loop to exist in this thread.""" try: asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) def _download_message_media_sync(self, *, url: str, output_dir: Path) -> Tuple[Path, Dict[str, Any]]: try: from telethon import errors from telethon.sync import TelegramClient from telethon.tl.types import PeerChannel except Exception as exc: raise Exception(f"Telethon not available: {exc}") self._ensure_event_loop() loop = asyncio.get_event_loop() if getattr(loop, "is_running", lambda: False)(): raise Exception("Telegram provider cannot run while an event loop is already running") def _resolve(value): if asyncio.iscoroutine(value): return loop.run_until_complete(value) return value app_id, api_hash = self._credentials() session_base = self._session_base_path() chat, message_id = _parse_telegram_message_url(url) def _format_bytes(num: Optional[int]) -> str: try: if num is None: return "?B" n = float(num) suffixes = ["B", "KB", "MB", "GB", "TB"] for s in suffixes: if n < 1024 or s == suffixes[-1]: if s == "B": return f"{int(n)}{s}" return f"{n:.1f}{s}" n /= 1024 except Exception: return "?B" client = TelegramClient(str(session_base), app_id, api_hash) try: # This prompts on first run for phone/code and persists the session. _resolve(client.start()) if chat.startswith("c:"): channel_id = int(chat.split(":", 1)[1]) entity = PeerChannel(channel_id) else: entity = chat if isinstance(entity, str) and entity and not entity.startswith("@"): entity = "@" + entity # Use the list form to be robust across Telethon sync/async stubs. messages = _resolve(client.get_messages(entity, ids=[message_id])) message = None if isinstance(messages, (list, tuple)): message = messages[0] if messages else None else: try: # TotalList is list-like message = messages[0] # type: ignore[index] except Exception: message = None if not message: raise Exception("Telegram message not found") if not getattr(message, "media", None): raise Exception("Telegram message has no media") chat_title = "" chat_username = "" chat_id = None try: chat_obj = getattr(message, "chat", None) if chat_obj is not None: maybe_title = getattr(chat_obj, "title", None) maybe_username = getattr(chat_obj, "username", None) maybe_id = getattr(chat_obj, "id", None) if isinstance(maybe_title, str): chat_title = maybe_title.strip() if isinstance(maybe_username, str): chat_username = maybe_username.strip() if maybe_id is not None: chat_id = int(maybe_id) except Exception: pass caption = "" try: maybe_caption = getattr(message, "message", None) if isinstance(maybe_caption, str): caption = maybe_caption.strip() except Exception: pass msg_id = None msg_date = None try: msg_id = int(getattr(message, "id", 0) or 0) except Exception: msg_id = None try: msg_date = getattr(message, "date", None) except Exception: msg_date = None file_name = "" file_mime = "" file_size = None try: file_obj = getattr(message, "file", None) maybe_name = getattr(file_obj, "name", None) maybe_mime = getattr(file_obj, "mime_type", None) maybe_size = getattr(file_obj, "size", None) if isinstance(maybe_name, str): file_name = maybe_name.strip() if isinstance(maybe_mime, str): file_mime = maybe_mime.strip() if maybe_size is not None: file_size = int(maybe_size) except Exception: pass # Progress callback: prints to stderr so it doesn't interfere with pipeline stdout. from models import ProgressBar progress_bar = ProgressBar() last_print = {"t": 0.0} def _progress(current: int, total: int) -> None: now = time.monotonic() # Throttle to avoid spamming. if now - float(last_print.get("t", 0.0)) < 0.25 and current < total: return last_print["t"] = now progress_bar.update(downloaded=int(current), total=int(total), label="telegram", file=sys.stderr) part_kb = self._resolve_part_size_kb(file_size) try: downloaded = _resolve( client.download_media( message, file=str(output_dir), progress_callback=_progress, part_size_kb=part_kb, ) ) except TypeError: # Older/newer Telethon versions may not accept part_size_kb on download_media. downloaded = _resolve(client.download_media(message, file=str(output_dir), progress_callback=_progress)) progress_bar.finish() if not downloaded: raise Exception("Telegram download returned no file") downloaded_path = Path(str(downloaded)) date_iso = None try: if msg_date is not None and hasattr(msg_date, "isoformat"): date_iso = msg_date.isoformat() # type: ignore[union-attr] except Exception: date_iso = None info: Dict[str, Any] = { "provider": "telegram", "source_url": url, "chat": { "key": chat, "title": chat_title, "username": chat_username, "id": chat_id, }, "message": { "id": msg_id, "date": date_iso, "caption": caption, }, "file": { "name": file_name, "mime_type": file_mime, "size": file_size, "downloaded_path": str(downloaded_path), }, } return downloaded_path, info except errors.RPCError as exc: raise Exception(f"Telegram RPC error: {exc}") finally: try: _resolve(client.disconnect()) except Exception: pass def download_url(self, url: str, output_dir: Path) -> Tuple[Path, Dict[str, Any]]: """Download a Telegram message URL and return (path, metadata).""" if not _looks_like_telegram_message_url(url): raise ValueError("Not a Telegram URL") return self._download_message_media_sync(url=url, output_dir=output_dir) def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]: url = str(getattr(result, "path", "") or "") if not url: return None if not _looks_like_telegram_message_url(url): return None path, _info = self._download_message_media_sync(url=url, output_dir=output_dir) return path