from __future__ import annotations

import asyncio
import re
from collections.abc import Mapping
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse

from ProviderCore.base import Provider, SearchResult

# Matches a segment made only of digits (message, topic and channel ids).
_DIGITS_RE = re.compile(r"\d+")


def _is_digits(value: Any) -> bool:
    """Return True when *value*, stripped of whitespace, is all digits."""
    return bool(_DIGITS_RE.fullmatch(str(value).strip()))


def _mapping_section(container: Any, key: str) -> Mapping[str, Any]:
    """Return ``container[key]`` when both are mappings, else an empty dict.

    Config sections may be missing, ``None`` or of an unexpected type; this
    keeps lookups from raising ``AttributeError`` in those cases.
    """
    if isinstance(container, Mapping):
        section = container.get(key)
        if isinstance(section, Mapping):
            return section
    return {}


def _looks_like_telegram_message_url(url: str) -> bool:
    """Return True when *url* is hosted on a Telegram web domain.

    Only the hostname is inspected (``t.me``, ``telegram.me`` or any
    ``*.t.me`` subdomain); the path is not validated here.
    """
    try:
        parsed = urlparse(str(url))
    except Exception:
        return False

    host = (parsed.hostname or "").lower().strip()
    return host in {"t.me", "telegram.me"} or host.endswith(".t.me")


def _parse_telegram_message_url(url: str) -> Tuple[str, int]:
    """Parse a Telegram message URL into (entity, message_id).

    Supported:
    - https://t.me/<username>/<message_id>
    - https://t.me/s/<username>/<message_id>
    - https://t.me/c/<channel_id>/<message_id>
    - the forum-topic variants of the above, where a numeric ``<topic_id>``
      segment precedes ``<message_id>``

    For ``/c/`` links the entity is returned as ``"c:<channel_id>"``.

    Raises:
        ValueError: if the path is empty, too short, or an id is not numeric.
    """
    parsed = urlparse(str(url))
    parts = [p for p in (parsed.path or "").split("/") if p]
    if not parts:
        raise ValueError(f"Invalid Telegram URL: {url}")

    # Strip the web-preview prefix (t.me/s/...).
    if parts[0].lower() == "s":
        parts = parts[1:]

    if len(parts) < 2:
        raise ValueError(
            f"Invalid Telegram URL (expected /<chat>/<message_id>): {url}"
        )

    chat = parts[0]
    if chat.lower() == "c":
        # t.me/c/<channel_id>/<message_id>
        if len(parts) < 3:
            raise ValueError(f"Invalid Telegram /c/ URL: {url}")
        channel_raw = parts[1].strip()
        # Reject a bad channel id here rather than later, after the client
        # has already connected (and possibly prompted for a login).
        if not _is_digits(channel_raw):
            raise ValueError(f"Invalid Telegram /c/ URL: {url}")
        chat = f"c:{channel_raw}"
        tail = parts[2:]
    else:
        tail = parts[1:]

    msg_raw = tail[0]
    # Forum-topic links put the topic id first; the message id follows it.
    # Only switch when both segments are numeric, so trailing non-numeric
    # segments keep being ignored.
    if len(tail) >= 2 and _is_digits(tail[0]) and _is_digits(tail[1]):
        msg_raw = tail[1]

    if not _is_digits(msg_raw):
        raise ValueError(f"Invalid Telegram message id in URL: {url}")

    return str(chat), int(str(msg_raw).strip())


class Telegram(Provider):
    """Telegram provider using Telethon.

    Config:
        [provider=telegram]
        app_id=<integer>
        api_hash=<string>
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read ``app_id`` and ``api_hash`` from ``config["provider"]["telegram"]``.

        Missing or malformed sections leave both credentials as ``None``.
        """
        super().__init__(config)
        telegram_conf = _mapping_section(
            _mapping_section(self.config, "provider"), "telegram"
        )
        self._app_id = telegram_conf.get("app_id")
        self._api_hash = telegram_conf.get("api_hash")

    def validate(self) -> bool:
        """Return True when Telethon imports and the credentials are usable.

        Credentials are checked through :meth:`_credentials`, so a value
        accepted here is also accepted when downloading.
        """
        try:
            __import__("telethon")
        except Exception:
            return False

        try:
            app_id, api_hash = self._credentials()
        except Exception:
            return False
        return bool(app_id and api_hash)

    def _session_base_path(self) -> Path:
        """Return the session file base path, creating its directory if possible."""
        root = Path(__file__).resolve().parents[1]
        session_dir = root / "Log" / "medeia_macina"
        try:
            session_dir.mkdir(parents=True, exist_ok=True)
        except OSError:
            # Best effort: the client reports the failure if the path is unusable.
            pass
        return session_dir / "telegram"

    def _credentials(self) -> Tuple[int, str]:
        """Return ``(app_id, api_hash)`` parsed from the stored config values.

        Raises:
            Exception: if either value is missing or ``app_id`` is not an integer.
        """
        raw_app_id = self._app_id
        if raw_app_id in (None, ""):
            raise Exception("Telegram app_id missing")
        try:
            app_id = int(str(raw_app_id).strip())
        except (TypeError, ValueError) as exc:
            raise Exception("Telegram app_id invalid") from exc

        api_hash = str(self._api_hash or "").strip()
        if not api_hash:
            raise Exception("Telegram api_hash missing")
        return app_id, api_hash

    def _ensure_event_loop(self) -> None:
        """Telethon sync wrapper requires an event loop to exist in this thread."""
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        # A closed loop cannot run anything, so replace it as well.
        if loop is None or loop.is_closed():
            asyncio.set_event_loop(asyncio.new_event_loop())

    @staticmethod
    def _chat_details(message: Any) -> Dict[str, Any]:
        """Collect title, username and id of the message's chat (best effort)."""
        details: Dict[str, Any] = {"title": "", "username": "", "id": None}
        try:
            chat_obj = getattr(message, "chat", None)
            if chat_obj is not None:
                maybe_title = getattr(chat_obj, "title", None)
                maybe_username = getattr(chat_obj, "username", None)
                maybe_id = getattr(chat_obj, "id", None)
                if isinstance(maybe_title, str):
                    details["title"] = maybe_title.strip()
                if isinstance(maybe_username, str):
                    details["username"] = maybe_username.strip()
                if maybe_id is not None:
                    details["id"] = int(maybe_id)
        except Exception:
            # Metadata is optional; never fail the download because of it.
            pass
        return details

    @staticmethod
    def _message_details(message: Any) -> Dict[str, Any]:
        """Collect id, ISO date and caption of the message (best effort)."""
        caption = ""
        try:
            maybe_caption = getattr(message, "message", None)
            if isinstance(maybe_caption, str):
                caption = maybe_caption.strip()
        except Exception:
            pass

        try:
            msg_id = int(getattr(message, "id", 0) or 0)
        except Exception:
            msg_id = None

        date_iso = None
        try:
            msg_date = getattr(message, "date", None)
            if msg_date is not None and hasattr(msg_date, "isoformat"):
                date_iso = msg_date.isoformat()
        except Exception:
            date_iso = None

        return {"id": msg_id, "date": date_iso, "caption": caption}

    @staticmethod
    def _file_details(message: Any) -> Dict[str, Any]:
        """Collect name, MIME type and size of the attached file (best effort)."""
        details: Dict[str, Any] = {"name": "", "mime_type": "", "size": None}
        try:
            file_obj = getattr(message, "file", None)
            maybe_name = getattr(file_obj, "name", None)
            maybe_mime = getattr(file_obj, "mime_type", None)
            maybe_size = getattr(file_obj, "size", None)
            if isinstance(maybe_name, str):
                details["name"] = maybe_name.strip()
            if isinstance(maybe_mime, str):
                details["mime_type"] = maybe_mime.strip()
            if maybe_size is not None:
                details["size"] = int(maybe_size)
        except Exception:
            pass
        return details

    def _download_message_media_sync(
        self, *, url: str, output_dir: Path
    ) -> Tuple[Path, Dict[str, Any]]:
        """Download the media of the message at *url* into *output_dir*.

        Returns:
            ``(downloaded_path, info)`` where ``info`` holds ``provider``,
            ``source_url`` and ``chat`` / ``message`` / ``file`` sub-dicts.

        Raises:
            ValueError: if *url* cannot be parsed as a Telegram message URL.
            Exception: if Telethon is unavailable, credentials are missing,
                an event loop is already running, the message is missing or
                has no media, the download yields no file, or Telegram
                reports an RPC error.
        """
        try:
            from telethon import errors
            from telethon.sync import TelegramClient
            from telethon.tl.types import PeerChannel
        except Exception as exc:
            raise Exception(f"Telethon not available: {exc}") from exc

        self._ensure_event_loop()
        loop = asyncio.get_event_loop()
        if loop.is_running():
            raise Exception(
                "Telegram provider cannot run while an event loop is already running"
            )

        def _resolve(value):
            # Client calls may return either a plain value or a coroutine,
            # depending on whether the sync wrapper patched them.
            if asyncio.iscoroutine(value):
                return loop.run_until_complete(value)
            return value

        app_id, api_hash = self._credentials()
        session_base = self._session_base_path()
        chat, message_id = _parse_telegram_message_url(url)

        # Make sure the target exists as a directory; otherwise the path
        # would be taken as the output file name instead of a folder.
        try:
            Path(output_dir).mkdir(parents=True, exist_ok=True)
        except OSError:
            # Best effort: let the download itself report an unusable path.
            pass

        client = TelegramClient(str(session_base), app_id, api_hash)
        try:
            # This prompts on first run for phone/code and persists the session.
            _resolve(client.start())

            if chat.startswith("c:"):
                entity = PeerChannel(int(chat.split(":", 1)[1]))
            else:
                entity = chat
                if entity and not entity.startswith("@"):
                    entity = "@" + entity

            # Use the list form to be robust across Telethon sync/async stubs.
            messages = _resolve(client.get_messages(entity, ids=[message_id]))
            try:
                message = messages[0] if messages else None
            except (IndexError, KeyError, TypeError):
                message = None

            if not message:
                raise Exception("Telegram message not found")
            if not getattr(message, "media", None):
                raise Exception("Telegram message has no media")

            chat_details = self._chat_details(message)
            message_details = self._message_details(message)
            file_details = self._file_details(message)

            downloaded = _resolve(client.download_media(message, file=str(output_dir)))
            if not downloaded:
                raise Exception("Telegram download returned no file")
            downloaded_path = Path(str(downloaded))

            info: Dict[str, Any] = {
                "provider": "telegram",
                "source_url": url,
                "chat": {
                    "key": chat,
                    "title": chat_details["title"],
                    "username": chat_details["username"],
                    "id": chat_details["id"],
                },
                "message": {
                    "id": message_details["id"],
                    "date": message_details["date"],
                    "caption": message_details["caption"],
                },
                "file": {
                    "name": file_details["name"],
                    "mime_type": file_details["mime_type"],
                    "size": file_details["size"],
                    "downloaded_path": str(downloaded_path),
                },
            }
            return downloaded_path, info
        except errors.RPCError as exc:
            raise Exception(f"Telegram RPC error: {exc}") from exc
        finally:
            try:
                _resolve(client.disconnect())
            except Exception:
                # Disconnect failures must not mask the real outcome.
                pass

    def download_url(self, url: str, output_dir: Path) -> Tuple[Path, Dict[str, Any]]:
        """Download a Telegram message URL and return (path, metadata).

        Raises:
            ValueError: if *url* is not hosted on a Telegram domain.
        """
        if not _looks_like_telegram_message_url(url):
            raise ValueError("Not a Telegram URL")
        return self._download_message_media_sync(url=url, output_dir=output_dir)

    def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]:
        """Download the media referenced by ``result.path``.

        Returns ``None`` when the result has no path or the path is not a
        Telegram URL; otherwise the path of the downloaded file.
        """
        url = str(getattr(result, "path", "") or "")
        if not url or not _looks_like_telegram_message_url(url):
            return None
        path, _info = self._download_message_media_sync(url=url, output_dir=output_dir)
        return path