Files
Medios-Macina/Provider/matrix.py

598 lines
20 KiB
Python
Raw Normal View History

2025-12-11 19:04:02 -08:00
from __future__ import annotations
import mimetypes
2025-12-16 01:45:01 -08:00
import time
import uuid
2025-12-11 19:04:02 -08:00
from pathlib import Path
2025-12-16 01:45:01 -08:00
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote
2025-12-11 19:04:02 -08:00
import requests
2025-12-19 02:29:42 -08:00
from ProviderCore.base import Provider
2025-12-11 19:04:02 -08:00
_MATRIX_INIT_CHECK_CACHE: Dict[str,
Tuple[bool,
Optional[str]]] = {}
2025-12-13 12:09:50 -08:00
2025-12-27 14:50:59 -08:00
def _sniff_mime_from_header(path: Path) -> Optional[str]:
2025-12-29 17:05:03 -08:00
"""Best-effort MIME sniffing from file headers.
Used when the file has no/unknown extension (common for exported/temp files).
Keeps dependencies to stdlib only.
"""
try:
if not path.exists() or not path.is_file():
return None
with open(path, "rb") as handle:
header = handle.read(512)
if not header:
return None
# Images
if header.startswith(b"\xff\xd8\xff"):
return "image/jpeg"
if header.startswith(b"\x89PNG\r\n\x1a\n"):
return "image/png"
if header.startswith(b"GIF87a") or header.startswith(b"GIF89a"):
return "image/gif"
if header.startswith(b"BM"):
return "image/bmp"
if header.startswith(b"RIFF") and len(header) >= 12 and header[8:12] == b"WEBP":
return "image/webp"
# Audio
if header.startswith(b"fLaC"):
return "audio/flac"
if header.startswith(b"OggS"):
# Could be audio or video; treat as audio unless extension suggests video.
return "audio/ogg"
if header.startswith(b"ID3"):
return "audio/mpeg"
if len(header) >= 2 and header[0] == 0xFF and (header[1] & 0xE0) == 0xE0:
return "audio/mpeg"
if header.startswith(b"RIFF") and len(header) >= 12 and header[8:12] == b"WAVE":
return "audio/wav"
# Video
if header.startswith(b"RIFF") and len(header) >= 12 and header[8:12] == b"AVI ":
return "video/x-msvideo"
if header.startswith(b"\x1a\x45\xdf\xa3"):
# EBML container: Matroska/WebM.
return "video/x-matroska"
if len(header) >= 12 and header[4:8] == b"ftyp":
# ISO BMFF: mp4/mov/m4a. Default to mp4; extension can refine.
return "video/mp4"
# MPEG-TS / M2TS (sync byte every 188 bytes)
try:
if path.stat().st_size >= 188 * 2 and header[0] == 0x47:
with open(path, "rb") as handle:
handle.seek(188)
b = handle.read(1)
if b == b"\x47":
return "video/mp2t"
except Exception:
pass
return None
except Exception:
return None
def _classify_matrix_upload(path: Path,
*,
explicit_mime_type: Optional[str] = None) -> Tuple[str,
str]:
2025-12-29 17:05:03 -08:00
"""Return (mime_type, msgtype) for Matrix uploads."""
mime_type = str(explicit_mime_type or "").strip() or None
if not mime_type:
# `mimetypes.guess_type` expects a string/URL; Path can return None on some platforms.
mime_type, _ = mimetypes.guess_type(str(path))
if not mime_type:
mime_type = _sniff_mime_from_header(path)
# Refinements based on extension for ambiguous containers.
ext = path.suffix.lower()
if ext in {".m4a",
".aac"}:
2025-12-29 17:05:03 -08:00
mime_type = mime_type or "audio/mp4"
if ext in {".mkv",
".webm"}:
2025-12-29 17:05:03 -08:00
mime_type = mime_type or "video/x-matroska"
if ext in {".ogv"}:
mime_type = mime_type or "video/ogg"
msgtype = "m.file"
if mime_type:
mt = mime_type.casefold()
if mt.startswith("image/"):
msgtype = "m.image"
elif mt.startswith("audio/"):
msgtype = "m.audio"
elif mt.startswith("video/"):
msgtype = "m.video"
# Final fallback for unknown MIME types.
if msgtype == "m.file":
audio_exts = {
".mp3",
".flac",
".wav",
".m4a",
".aac",
".ogg",
".opus",
".wma",
".mka",
".alac",
}
video_exts = {
".mp4",
".mkv",
".webm",
".mov",
".avi",
".flv",
".mpg",
".mpeg",
".ts",
".m4v",
".wmv",
".m2ts",
".mts",
".3gp",
".ogv",
}
image_exts = {".jpg",
".jpeg",
".png",
".gif",
".webp",
".bmp",
".tiff"}
2025-12-29 17:05:03 -08:00
if ext in audio_exts:
msgtype = "m.audio"
elif ext in video_exts:
msgtype = "m.video"
elif ext in image_exts:
msgtype = "m.image"
return (mime_type or "application/octet-stream"), msgtype
2025-12-27 14:50:59 -08:00
2025-12-13 12:09:50 -08:00
def _normalize_homeserver(value: str) -> str:
2025-12-29 17:05:03 -08:00
text = str(value or "").strip()
if not text:
return ""
if not text.startswith("http"):
text = f"https://{text}"
return text.rstrip("/")
def _matrix_health_check(
    *,
    homeserver: str,
    access_token: Optional[str],
) -> Tuple[bool, Optional[str]]:
    """Probe a Matrix homeserver and, optionally, an access token.

    1. ``GET /_matrix/client/versions`` (no auth) must answer 200.
    2. If ``access_token`` is truthy, ``GET /_matrix/client/v3/account/whoami``
       with a Bearer header must also answer 200.

    Returns ``(True, None)`` on success, otherwise ``(False, reason)``. Any
    exception raised along the way is reported as ``(False, str(exc))``.
    """
    try:
        base_url = _normalize_homeserver(homeserver)
        if not base_url:
            return False, "Matrix homeserver missing"

        versions_resp = requests.get(f"{base_url}/_matrix/client/versions", timeout=5)
        if versions_resp.status_code != 200:
            return False, f"Homeserver returned {versions_resp.status_code}"

        if not access_token:
            return True, None

        whoami_resp = requests.get(
            f"{base_url}/_matrix/client/v3/account/whoami",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=5,
        )
        if whoami_resp.status_code == 200:
            return True, None
        return False, f"Authentication failed: {whoami_resp.status_code}"
    except Exception as exc:
        return False, str(exc)


class Matrix(Provider):
    """File provider for Matrix (Element) chat rooms.

    Settings are read from ``config["provider"]["matrix"]``:

    * ``homeserver``   - homeserver URL or bare host name (required).
    * ``access_token`` - Bearer token sent with every API request.
    * ``password``     - counts as "configured" for ``__init__``/``validate``,
      but nothing in this class logs in with it; API calls still require
      ``access_token``.
    * ``room_id``      - default room for :meth:`upload` (optional, because the
      CLI can let the user pick a room via :meth:`selector`).
    """

    # Timeout (seconds) for the small JSON `m.room.message` send requests.
    _SEND_TIMEOUT_SECONDS = 30

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read settings and run (or reuse) a cached homeserver health check.

        An unconfigured provider is still constructed; :meth:`validate` then
        reports it as unusable.

        Raises:
            Exception: if the health check for the configured homeserver
                (and token, when present) fails.
        """
        super().__init__(config)
        self._init_ok: Optional[bool] = None
        self._init_reason: Optional[str] = None

        matrix_conf = self._matrix_conf()
        homeserver = matrix_conf.get("homeserver")
        access_token = matrix_conf.get("access_token")
        password = matrix_conf.get("password")

        # Not configured: keep instance but mark invalid via validate().
        # Note: `room_id` is intentionally NOT required, since the CLI can prompt
        # the user to select a room dynamically.
        if not (homeserver and (access_token or password)):
            return

        token = str(access_token) if access_token else None
        cache_key = self._health_cache_key(str(homeserver), token)
        cached = _MATRIX_INIT_CHECK_CACHE.get(cache_key)
        if cached is None:
            ok, reason = _matrix_health_check(homeserver=str(homeserver), access_token=token)
            _MATRIX_INIT_CHECK_CACHE[cache_key] = (ok, reason)
        else:
            ok, reason = cached
        self._init_ok = ok
        self._init_reason = reason
        if not ok:
            raise Exception(reason or "Matrix unavailable")

    def _matrix_conf(self) -> Dict[str, Any]:
        """Return ``config["provider"]["matrix"]``, or ``{}`` if any level is missing or not a dict."""
        config = self.config if isinstance(self.config, dict) else {}
        provider_conf = config.get("provider")
        if not isinstance(provider_conf, dict):
            return {}
        matrix_conf = provider_conf.get("matrix")
        return matrix_conf if isinstance(matrix_conf, dict) else {}

    @staticmethod
    def _health_cache_key(homeserver: str, access_token: Optional[str]) -> str:
        """Build the `_MATRIX_INIT_CHECK_CACHE` key for a homeserver/token pair.

        The key includes a short SHA-256 fingerprint of the token (not the token
        itself), so a result cached for one token is never reused for a
        different token on the same homeserver.
        """
        import hashlib

        if access_token:
            token_part = hashlib.sha256(access_token.encode("utf-8")).hexdigest()[:16]
        else:
            token_part = "none"
        return f"{_normalize_homeserver(homeserver)}|token:{token_part}"

    @staticmethod
    def _matrix_auth_headers(token: str) -> Dict[str, str]:
        """Return a fresh ``Authorization: Bearer`` header dict."""
        return {"Authorization": f"Bearer {token}"}

    @staticmethod
    def _matrix_permalink(room_id: str, event_id: Optional[str] = None) -> str:
        """Return a matrix.to link to ``event_id`` in ``room_id`` (or to the room only)."""
        if event_id:
            return f"https://matrix.to/#/{room_id}/{event_id}"
        return f"https://matrix.to/#/{room_id}"

    @staticmethod
    def _mxc_to_download_url(base: str, content_uri: Any) -> str:
        """Turn ``mxc://<server>/<media_id>`` into a media download URL on ``base``.

        Returns ``""`` when ``content_uri`` is not of that form.
        """
        curi = str(content_uri or "").strip()
        if not curi.startswith("mxc://"):
            return ""
        server_name, sep, media_id = curi[len("mxc://"):].partition("/")
        server_name = server_name.strip()
        media_id = media_id.strip()
        if not (sep and server_name and media_id):
            return ""
        return (
            f"{base}/_matrix/media/v3/download/"
            f"{quote(server_name, safe='')}/{quote(media_id, safe='')}"
        )

    def validate(self) -> bool:
        """Return True when a homeserver plus an access token or password are
        configured and the health check run by ``__init__`` did not fail."""
        if not self.config:
            return False
        if self._init_ok is False:
            return False
        matrix_conf = self._matrix_conf()
        return bool(
            matrix_conf.get("homeserver")
            and (matrix_conf.get("access_token") or matrix_conf.get("password"))
        )

    def _get_homeserver_and_token(self) -> Tuple[str, str]:
        """Return ``(normalized_base_url, access_token)`` from the config.

        Raises:
            Exception: if the homeserver or ``access_token`` is missing.
        """
        matrix_conf = self._matrix_conf()
        homeserver = matrix_conf.get("homeserver")
        access_token = matrix_conf.get("access_token")
        if not homeserver:
            raise Exception("Matrix homeserver missing")
        if not access_token:
            raise Exception("Matrix access_token missing")
        base = _normalize_homeserver(str(homeserver))
        if not base:
            raise Exception("Matrix homeserver missing")
        return base, str(access_token)

    def _send_room_message(
        self,
        base: str,
        token: str,
        room_id: str,
        payload: Dict[str, Any],
        error_label: str,
    ) -> str:
        """PUT an ``m.room.message`` event into ``room_id`` and return its matrix.to link.

        Raises:
            Exception: ``"<error_label>: <response text>"`` on a non-200 response.
        """
        # The client-server send endpoint requires a per-request transaction ID.
        txn_id = f"mm_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        encoded_room = quote(str(room_id), safe="")
        send_url = f"{base}/_matrix/client/v3/rooms/{encoded_room}/send/m.room.message/{txn_id}"
        send_resp = requests.put(
            send_url,
            headers=self._matrix_auth_headers(token),
            json=payload,
            timeout=self._SEND_TIMEOUT_SECONDS,
        )
        if send_resp.status_code != 200:
            raise Exception(f"{error_label}: {send_resp.text}")
        event_id = (send_resp.json() or {}).get("event_id")
        return self._matrix_permalink(room_id, event_id)

    def list_joined_room_ids(self) -> List[str]:
        """Return joined room IDs for the current user.

        Uses `GET /_matrix/client/v3/joined_rooms`. Non-string and blank entries
        are dropped; the rest are whitespace-stripped.

        Raises:
            Exception: on a non-200 response.
        """
        base, token = self._get_homeserver_and_token()
        resp = requests.get(
            f"{base}/_matrix/client/v3/joined_rooms",
            headers=self._matrix_auth_headers(token),
            timeout=10,
        )
        if resp.status_code != 200:
            raise Exception(f"Matrix joined_rooms failed: {resp.text}")
        data = resp.json() or {}
        rooms = data.get("joined_rooms") if isinstance(data, dict) else None
        return [rid.strip() for rid in (rooms or []) if isinstance(rid, str) and rid.strip()]

    def list_rooms(self, *, room_ids: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Return joined rooms as ``{"room_id": ..., "name": ...}`` dicts.

        Performance note: room names require one extra HTTP request per room.
        If `room_ids` is provided, joined rooms are first filtered to those IDs
        (full ``!id:server`` or short ``!id`` form, compared case-insensitively),
        so only they get name lookups. ``name`` is ``""`` when the lookup fails.
        """
        base, token = self._get_homeserver_and_token()
        headers = self._matrix_auth_headers(token)

        joined = self.list_joined_room_ids()
        allowed = {str(v).strip().casefold() for v in (room_ids or []) if str(v).strip()}
        if allowed:
            # Accept either full IDs (!id:hs) or short IDs (!id).
            def _is_allowed(rid: str) -> bool:
                r = str(rid or "").strip()
                if not r:
                    return False
                if r.casefold() in allowed:
                    return True
                short = r.split(":", 1)[0].strip().casefold()
                return bool(short) and short in allowed

            joined = [rid for rid in joined if _is_allowed(rid)]

        out: List[Dict[str, Any]] = []
        for room_id in joined:
            name = ""
            # Best-effort room name lookup (safe to fail).
            try:
                encoded = quote(room_id, safe="")
                name_resp = requests.get(
                    f"{base}/_matrix/client/v3/rooms/{encoded}/state/m.room.name",
                    headers=headers,
                    timeout=5,
                )
                if name_resp.status_code == 200:
                    payload = name_resp.json() or {}
                    maybe = payload.get("name")
                    if isinstance(maybe, str):
                        name = maybe
            except Exception:
                pass
            out.append({"room_id": room_id, "name": name})
        return out

    def upload_to_room(self, file_path: str, room_id: str, **kwargs: Any) -> str:
        """Upload a file to the media repository and post it to ``room_id``.

        Keyword args:
            mime_type: explicit MIME type (otherwise guessed/sniffed).
            pipe_obj: optional PipeObject; on success the uploaded URL is
                attached to it via ``Store.try_add_url_for_pipe_object``
                (best effort).

        Returns:
            A matrix.to link to the posted event (or to the room if the server
            returned no event ID).

        Raises:
            FileNotFoundError: if ``file_path`` does not exist.
            Exception: if ``room_id``/credentials are missing or an HTTP call fails.
        """
        from SYS.models import ProgressFileReader

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        if not room_id:
            raise Exception("Matrix room_id missing")
        base, token = self._get_homeserver_and_token()

        mime_type, msgtype = _classify_matrix_upload(
            path, explicit_mime_type=kwargs.get("mime_type")
        )
        filename = path.name
        size = int(path.stat().st_size)

        # Upload media. No timeout is passed here (unchanged behaviour): the body
        # is streamed, so picking one needs care to avoid aborting slow uploads.
        upload_headers = self._matrix_auth_headers(token)
        upload_headers["Content-Type"] = mime_type
        with open(path, "rb") as handle:
            wrapped = ProgressFileReader(handle, total_bytes=size, label="upload")
            resp = requests.post(
                f"{base}/_matrix/media/v3/upload",
                headers=upload_headers,
                data=wrapped,
                params={"filename": filename},
            )
        if resp.status_code != 200:
            raise Exception(f"Matrix upload failed: {resp.text}")
        content_uri = (resp.json() or {}).get("content_uri")
        if not content_uri:
            raise Exception("No content_uri returned")

        # Build a fragment-free URL suitable for storage backends.
        # `matrix.to` links use fragments (`#/...`) which some backends normalize away.
        download_url_for_store = self._mxc_to_download_url(base, content_uri)

        payload = {
            "msgtype": msgtype,
            "body": filename,
            "url": content_uri,
            "info": {"mimetype": mime_type, "size": size},
        }
        link = self._send_room_message(
            base, token, room_id, payload, "Matrix send message failed"
        )

        # Optional: if a PipeObject is provided and it already has store+hash,
        # attach the uploaded URL back to the stored file.
        pipe_obj = kwargs.get("pipe_obj")
        if pipe_obj is not None:
            try:
                from Store import Store

                # Prefer the direct media download URL for storage backends.
                Store(self.config, suppress_debug=True).try_add_url_for_pipe_object(
                    pipe_obj,
                    download_url_for_store or link,
                )
            except Exception:
                # Best effort: the message is already posted, so a store
                # failure must not turn the upload into an error.
                pass
        return link

    def send_text_to_room(self, text: str, room_id: str) -> str:
        """Send a plain-text (``m.text``) message to ``room_id``.

        Returns ``""`` without contacting the server when ``text`` is blank,
        otherwise the matrix.to link of the posted event.
        """
        message = str(text or "").strip()
        if not message:
            return ""
        if not room_id:
            raise Exception("Matrix room_id missing")
        base, token = self._get_homeserver_and_token()
        return self._send_room_message(
            base,
            token,
            room_id,
            {"msgtype": "m.text", "body": message},
            "Matrix send text failed",
        )

    def upload(self, file_path: str, **kwargs: Any) -> str:
        """Upload ``file_path`` to the configured default ``room_id``.

        ``pipe_obj`` and ``mime_type`` keyword arguments are forwarded to
        :meth:`upload_to_room`; any other keyword arguments are ignored.
        """
        room_id = self._matrix_conf().get("room_id")
        if not room_id:
            raise Exception("Matrix room_id missing")
        forwarded = {key: kwargs[key] for key in ("pipe_obj", "mime_type") if key in kwargs}
        return self.upload_to_room(file_path, str(room_id), **forwarded)

    @staticmethod
    def _selected_room_ids(selected_items: Any) -> List[str]:
        """Extract non-blank ``room_id``/``id`` values from dicts or objects."""
        room_ids: List[str] = []
        for item in selected_items or []:
            if isinstance(item, dict):
                rid = item.get("room_id") or item.get("id")
            else:
                rid = getattr(item, "room_id", None) or getattr(item, "id", None)
            if rid and str(rid).strip():
                room_ids.append(str(rid).strip())
        return room_ids

    @staticmethod
    def _unpack_pending_upload(payload: Any) -> Tuple[str, bool, Any]:
        """Return ``(path, delete_after, pipe_obj)`` for one pending-upload entry.

        Dict entries may carry all three keys; other objects only supply ``path``.
        """
        if isinstance(payload, dict):
            return (
                str(payload.get("path") or ""),
                bool(payload.get("delete_after", False)),
                payload.get("pipe_obj"),
            )
        return str(getattr(payload, "path", "") or ""), False, None

    @staticmethod
    def _delete_upload_source(media_path: Path) -> None:
        """Remove an uploaded source file; a missing file is fine, other errors are reported."""
        try:
            media_path.unlink()
        except FileNotFoundError:
            pass
        except OSError as exc:
            print(f"Matrix: could not delete {media_path}: {exc}")

    def selector(
        self,
        selected_items: List[Any],
        *,
        ctx: Any,
        stage_is_last: bool = True,
        **_kwargs: Any,
    ) -> bool:
        """Handle Matrix room selection via `@N`.

        If the CLI has a pending upload stash (``ctx`` value
        ``matrix_pending_uploads``), every pending file is uploaded to every
        selected room and the stash is cleared. Files flagged ``delete_after``
        are removed only after all selected rooms were processed, and only if
        none of their uploads failed.

        Returns:
            False when this is not the last stage or nothing is pending
            (selection not handled here), True otherwise.
        """
        if not stage_is_last:
            return False
        try:
            pending = ctx.load_value("matrix_pending_uploads", default=None)
        except Exception:
            pending = None
        pending_list = list(pending) if isinstance(pending, list) else []
        if not pending_list:
            return False

        room_ids = self._selected_room_ids(selected_items)
        if not room_ids:
            print("No Matrix room selected\n")
            return True

        any_failed = False
        # Deletion is deferred until every selected room has been processed:
        # removing a file right after the first room would make the upload to
        # each later room fail with "file missing".
        delete_after_upload: Dict[int, Path] = {}
        failed_indices: set = set()
        for room_id in room_ids:
            for index, payload in enumerate(pending_list):
                try:
                    file_path, delete_after, pipe_obj = self._unpack_pending_upload(payload)
                    if not file_path:
                        any_failed = True
                        failed_indices.add(index)
                        continue
                    media_path = Path(file_path)
                    if not media_path.exists():
                        any_failed = True
                        failed_indices.add(index)
                        print(f"Matrix upload file missing: {file_path}")
                        continue
                    link = self.upload_to_room(
                        str(media_path),
                        str(room_id),
                        pipe_obj=pipe_obj,
                    )
                    if link:
                        print(link)
                    if delete_after:
                        delete_after_upload[index] = media_path
                except Exception as exc:
                    any_failed = True
                    failed_indices.add(index)
                    print(f"Matrix upload failed: {exc}")

        for index, media_path in delete_after_upload.items():
            # Keep files whose upload failed for any room so they can be retried.
            if index not in failed_indices:
                self._delete_upload_source(media_path)

        try:
            ctx.store_value("matrix_pending_uploads", [])
        except Exception:
            pass
        if any_failed:
            print("\nOne or more Matrix uploads failed\n")
        return True