Files
Medios-Macina/Provider/matrix.py

806 lines
28 KiB
Python
Raw Permalink Normal View History

2025-12-11 19:04:02 -08:00
from __future__ import annotations
import mimetypes
2026-01-09 01:22:06 -08:00
import sys
2025-12-16 01:45:01 -08:00
import time
import uuid
2025-12-11 19:04:02 -08:00
from pathlib import Path
2026-01-09 01:22:06 -08:00
from typing import Any, Dict, Iterable, List, Optional, Tuple
2025-12-16 01:45:01 -08:00
from urllib.parse import quote
2025-12-11 19:04:02 -08:00
import requests
2026-01-09 01:22:06 -08:00
from ProviderCore.base import Provider, SearchResult
from SYS.provider_helpers import TableProviderMixin
from SYS.logger import log
2025-12-11 19:04:02 -08:00
_MATRIX_INIT_CHECK_CACHE: Dict[str,
Tuple[bool,
Optional[str]]] = {}
2025-12-13 12:09:50 -08:00
2025-12-27 14:50:59 -08:00
def _sniff_mime_from_header(path: Path) -> Optional[str]:
2025-12-29 17:05:03 -08:00
"""Best-effort MIME sniffing from file headers.
Used when the file has no/unknown extension (common for exported/temp files).
Keeps dependencies to stdlib only.
"""
try:
if not path.exists() or not path.is_file():
return None
with open(path, "rb") as handle:
header = handle.read(512)
if not header:
return None
# Images
if header.startswith(b"\xff\xd8\xff"):
return "image/jpeg"
if header.startswith(b"\x89PNG\r\n\x1a\n"):
return "image/png"
if header.startswith(b"GIF87a") or header.startswith(b"GIF89a"):
return "image/gif"
if header.startswith(b"BM"):
return "image/bmp"
if header.startswith(b"RIFF") and len(header) >= 12 and header[8:12] == b"WEBP":
return "image/webp"
# Audio
if header.startswith(b"fLaC"):
return "audio/flac"
if header.startswith(b"OggS"):
# Could be audio or video; treat as audio unless extension suggests video.
return "audio/ogg"
if header.startswith(b"ID3"):
return "audio/mpeg"
if len(header) >= 2 and header[0] == 0xFF and (header[1] & 0xE0) == 0xE0:
return "audio/mpeg"
if header.startswith(b"RIFF") and len(header) >= 12 and header[8:12] == b"WAVE":
return "audio/wav"
# Video
if header.startswith(b"RIFF") and len(header) >= 12 and header[8:12] == b"AVI ":
return "video/x-msvideo"
if header.startswith(b"\x1a\x45\xdf\xa3"):
# EBML container: Matroska/WebM.
return "video/x-matroska"
if len(header) >= 12 and header[4:8] == b"ftyp":
# ISO BMFF: mp4/mov/m4a. Default to mp4; extension can refine.
return "video/mp4"
# MPEG-TS / M2TS (sync byte every 188 bytes)
try:
if path.stat().st_size >= 188 * 2 and header[0] == 0x47:
with open(path, "rb") as handle:
handle.seek(188)
b = handle.read(1)
if b == b"\x47":
return "video/mp2t"
except Exception:
pass
return None
except Exception:
return None
def _classify_matrix_upload(path: Path,
*,
explicit_mime_type: Optional[str] = None) -> Tuple[str,
str]:
2025-12-29 17:05:03 -08:00
"""Return (mime_type, msgtype) for Matrix uploads."""
mime_type = str(explicit_mime_type or "").strip() or None
if not mime_type:
# `mimetypes.guess_type` expects a string/URL; Path can return None on some platforms.
mime_type, _ = mimetypes.guess_type(str(path))
if not mime_type:
mime_type = _sniff_mime_from_header(path)
# Refinements based on extension for ambiguous containers.
ext = path.suffix.lower()
if ext in {".m4a",
".aac"}:
2025-12-29 17:05:03 -08:00
mime_type = mime_type or "audio/mp4"
if ext in {".mkv",
".webm"}:
2025-12-29 17:05:03 -08:00
mime_type = mime_type or "video/x-matroska"
if ext in {".ogv"}:
mime_type = mime_type or "video/ogg"
msgtype = "m.file"
if mime_type:
mt = mime_type.casefold()
if mt.startswith("image/"):
msgtype = "m.image"
elif mt.startswith("audio/"):
msgtype = "m.audio"
elif mt.startswith("video/"):
msgtype = "m.video"
# Final fallback for unknown MIME types.
if msgtype == "m.file":
audio_exts = {
".mp3",
".flac",
".wav",
".m4a",
".aac",
".ogg",
".opus",
".wma",
".mka",
".alac",
}
video_exts = {
".mp4",
".mkv",
".webm",
".mov",
".avi",
".flv",
".mpg",
".mpeg",
".ts",
".m4v",
".wmv",
".m2ts",
".mts",
".3gp",
".ogv",
}
image_exts = {".jpg",
".jpeg",
".png",
".gif",
".webp",
".bmp",
".tiff"}
2025-12-29 17:05:03 -08:00
if ext in audio_exts:
msgtype = "m.audio"
elif ext in video_exts:
msgtype = "m.video"
elif ext in image_exts:
msgtype = "m.image"
return (mime_type or "application/octet-stream"), msgtype
2025-12-27 14:50:59 -08:00
2025-12-13 12:09:50 -08:00
def _normalize_homeserver(value: str) -> str:
2025-12-29 17:05:03 -08:00
text = str(value or "").strip()
if not text:
return ""
if not text.startswith("http"):
text = f"https://{text}"
return text.rstrip("/")
def _matrix_health_check(*,
                         homeserver: str,
                         access_token: Optional[str]) -> Tuple[bool, Optional[str]]:
    """Lightweight Matrix reachability/auth validation.

    - Always checks `/versions` (no auth).
    - If `access_token` is present, also checks `/whoami`.

    Returns:
        ``(True, None)`` when every performed check answered HTTP 200, otherwise
        ``(False, reason)``. Exceptions are reported as the reason, not raised.
    """
    try:
        root = _normalize_homeserver(homeserver)
        if not root:
            return False, "Matrix homeserver missing"

        versions = requests.get(f"{root}/_matrix/client/versions", timeout=5)
        if versions.status_code != 200:
            return False, f"Homeserver returned {versions.status_code}"

        # Without a token there is nothing more to verify.
        if not access_token:
            return True, None

        whoami = requests.get(
            f"{root}/_matrix/client/v3/account/whoami",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=5,
        )
        if whoami.status_code != 200:
            return False, f"Authentication failed: {whoami.status_code}"
        return True, None
    except Exception as exc:
        return False, str(exc)
2025-12-13 12:09:50 -08:00
2026-01-09 01:22:06 -08:00
class Matrix(TableProviderMixin, Provider):
    """Matrix (Element) room provider with file uploads and selection.

    This provider uses the new table system (strict ResultTable adapter pattern) for
    consistent room listing and selection. It exposes Matrix joined rooms as selectable
    rows with metadata enrichment for:
    - room_id: Unique Matrix room identifier
    - room_name: Human-readable room display name
    - _selection_args: For @N expansion control and upload routing

    KEY FEATURES:
    - Table system: Using ResultTable adapter for strict column/metadata handling
    - Room discovery: search() or list_rooms() to enumerate joined rooms
    - Selection integration: @N selection triggers upload_to_room() via selector()
    - Deferred uploads: Files can be queued for upload to multiple rooms
    - MIME detection: Automatic content type classification for Matrix msgtype

    SELECTION FLOW:
    1. User runs: search-file -provider matrix "room" (or .matrix -list-rooms)
    2. Results show available joined rooms
    3. User selects rooms: @1 @2 (or @1,2)
    4. Selection triggers upload of pending files to selected rooms
    """

    # Network timeouts in seconds. Uploads get a generous read timeout because
    # the homeserver only answers once it has received the whole media body.
    _UPLOAD_TIMEOUT = (10, 300)
    _SEND_TIMEOUT = 30

    @classmethod
    def config(cls) -> List[Dict[str, Any]]:
        """Describe the configurable fields of this provider."""
        return [
            {
                "key": "homeserver",
                "label": "Homeserver URL",
                "default": "https://matrix.org",
                "required": True,
            },
            {
                "key": "access_token",
                "label": "Access Token",
                "default": "",
                "secret": True,
            },
            {
                "key": "password",
                "label": "Password (fallback)",
                "default": "",
                "secret": True,
            },
        ]

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create the provider and health-check the configured homeserver.

        Raises:
            Exception: when the homeserver is configured but the (cached) health
                check reports it as unreachable or the token as rejected.
        """
        super().__init__(config)
        self._init_ok: Optional[bool] = None
        self._init_reason: Optional[str] = None

        matrix_conf = self._matrix_conf()
        homeserver = matrix_conf.get("homeserver")
        access_token = matrix_conf.get("access_token")
        password = matrix_conf.get("password")

        # Not configured: keep instance but mark invalid via validate().
        # Note: `room_id` is intentionally NOT required, since the CLI can prompt
        # the user to select a room dynamically.
        if not (homeserver and (access_token or password)):
            return

        # The check costs up to two HTTP round trips, so its outcome is memoized
        # per homeserver / token-presence for the lifetime of the process.
        cache_key = f"{_normalize_homeserver(str(homeserver))}|has_token:{bool(access_token)}"
        cached = _MATRIX_INIT_CHECK_CACHE.get(cache_key)
        if cached is None:
            ok, reason = _matrix_health_check(
                homeserver=str(homeserver),
                access_token=str(access_token) if access_token else None,
            )
            _MATRIX_INIT_CHECK_CACHE[cache_key] = (ok, reason)
        else:
            ok, reason = cached

        self._init_ok = ok
        self._init_reason = reason
        if not ok:
            raise Exception(reason or "Matrix unavailable")

    def _matrix_conf(self) -> Dict[str, Any]:
        """Return the ``provider.matrix`` config section, or ``{}`` if absent/malformed."""
        root = self.config if isinstance(self.config, dict) else {}
        provider_section = root.get("provider")
        section = provider_section.get("matrix") if isinstance(provider_section, dict) else None
        return section if isinstance(section, dict) else {}

    def validate(self) -> bool:
        """Return True when a homeserver plus a token or password is configured
        and the init-time health check did not fail."""
        if not self.config:
            return False
        if self._init_ok is False:
            return False
        matrix_conf = self._matrix_conf()
        return bool(
            matrix_conf.get("homeserver")
            and (matrix_conf.get("access_token") or matrix_conf.get("password"))
        )

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[SearchResult]:
        """Search/list joined Matrix rooms.

        If query is empty or one of "*", "all", "list", returns all joined rooms.
        Otherwise, filters rooms by a case-insensitive substring match on name/ID.
        At most `limit` results are returned; a listing failure is logged to
        stderr and yields an empty list. `filters` and extra kwargs are unused.
        """
        try:
            rooms = self.list_rooms()
        except Exception as exc:
            log(f"[matrix] Failed to list rooms: {exc}", file=sys.stderr)
            return []

        q = (query or "").strip().lower()
        needle = "" if q in {"*", "all", "list"} else q

        results: List[SearchResult] = []
        for room in rooms:
            if len(results) >= limit:
                break
            room_id = room.get("room_id") or ""
            room_name = room.get("name") or ""
            if not room_id:
                continue
            # Filter by query if provided
            if needle and needle not in f"{room_name} {room_id}".lower():
                continue

            display_name = room_name or room_id
            results.append(
                SearchResult(
                    table="matrix",
                    title=display_name,
                    path=f"matrix:room:{room_id}",
                    detail=room_id if room_name else "",
                    annotations=["room"],
                    media_kind="folder",
                    columns=[
                        ("Room", display_name),
                        ("ID", room_id),
                    ],
                    full_metadata={
                        "room_id": room_id,
                        "room_name": room_name,
                        "provider": "matrix",
                        # Selection metadata for table system and @N expansion
                        "_selection_args": ["-room-id", room_id],
                    },
                )
            )
        return results

    def _get_homeserver_and_token(self) -> Tuple[str, str]:
        """Return ``(base_url, access_token)`` or raise if either is missing."""
        matrix_conf = self._matrix_conf()
        homeserver = matrix_conf.get("homeserver")
        access_token = matrix_conf.get("access_token")
        if not homeserver:
            raise Exception("Matrix homeserver missing")
        if not access_token:
            raise Exception("Matrix access_token missing")
        base = _normalize_homeserver(str(homeserver))
        if not base:
            raise Exception("Matrix homeserver missing")
        return base, str(access_token)

    def list_joined_room_ids(self) -> List[str]:
        """Return joined room IDs for the current user.

        Uses `GET /_matrix/client/v3/joined_rooms`.

        Raises:
            Exception: on missing credentials or a non-200 response.
        """
        base, token = self._get_homeserver_and_token()
        resp = requests.get(
            f"{base}/_matrix/client/v3/joined_rooms",
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        )
        if resp.status_code != 200:
            raise Exception(f"Matrix joined_rooms failed: {resp.text}")
        data = resp.json()
        rooms = (data.get("joined_rooms") if isinstance(data, dict) else None) or []
        return [rid.strip() for rid in rooms if isinstance(rid, str) and rid.strip()]

    def list_rooms(self,
                   *,
                   room_ids: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Return joined rooms, optionally limited to a subset.

        Performance note: room names require additional per-room HTTP requests.
        If `room_ids` is provided, only those rooms will have name lookups.

        Returns:
            One ``{"room_id": ..., "name": ...}`` dict per room; ``name`` is
            ``""`` when the name lookup fails or the room has no name.
        """
        base, token = self._get_homeserver_and_token()
        headers = {"Authorization": f"Bearer {token}"}
        joined = self.list_joined_room_ids()

        if room_ids:
            allowed = {str(v).strip().casefold() for v in room_ids if str(v).strip()}
            if allowed:
                # Accept either full IDs (!id:hs) or short IDs (!id).
                def _is_allowed(rid: str) -> bool:
                    candidate = str(rid or "").strip()
                    if not candidate:
                        return False
                    if candidate.casefold() in allowed:
                        return True
                    short = candidate.split(":", 1)[0].strip().casefold()
                    return bool(short) and short in allowed

                joined = [rid for rid in joined if _is_allowed(rid)]

        out: List[Dict[str, Any]] = []
        for room_id in joined:
            name = ""
            # Best-effort room name lookup (safe to fail).
            try:
                encoded = quote(room_id, safe="")
                name_resp = requests.get(
                    f"{base}/_matrix/client/v3/rooms/{encoded}/state/m.room.name",
                    headers=headers,
                    timeout=5,
                )
                if name_resp.status_code == 200:
                    maybe = (name_resp.json() or {}).get("name")
                    if isinstance(maybe, str):
                        name = maybe
            except Exception:
                pass
            out.append({"room_id": room_id, "name": name})
        return out

    @staticmethod
    def _mxc_to_download_url(base: str, content_uri: Any) -> str:
        """Turn an ``mxc://server/media`` URI into a fragment-free download URL.

        Returns ``""`` when `content_uri` is not a well-formed ``mxc://`` URI.
        """
        curi = str(content_uri or "").strip()
        if not curi.startswith("mxc://"):
            return ""
        server_name, sep, media_id = curi[len("mxc://"):].partition("/")
        server_name = server_name.strip()
        media_id = media_id.strip()
        if not (sep and server_name and media_id):
            return ""
        return (
            f"{base}/_matrix/media/v3/download/"
            f"{quote(server_name, safe='')}/{quote(media_id, safe='')}"
        )

    def _send_room_message(self,
                           room_id: str,
                           payload: Dict[str, Any],
                           *,
                           failure_label: str) -> str:
        """PUT an ``m.room.message`` event and return its ``matrix.to`` link.

        Raises:
            Exception: ``"<failure_label>: <response text>"`` on a non-200 reply.
        """
        base, token = self._get_homeserver_and_token()
        # Correct Matrix client API send endpoint requires a transaction ID.
        txn_id = f"mm_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        encoded_room = quote(str(room_id), safe="")
        send_url = f"{base}/_matrix/client/v3/rooms/{encoded_room}/send/m.room.message/{txn_id}"
        send_resp = requests.put(
            send_url,
            headers={"Authorization": f"Bearer {token}"},
            json=payload,
            timeout=self._SEND_TIMEOUT,
        )
        if send_resp.status_code != 200:
            raise Exception(f"{failure_label}: {send_resp.text}")
        event_id = (send_resp.json() or {}).get("event_id")
        if event_id:
            return f"https://matrix.to/#/{room_id}/{event_id}"
        return f"https://matrix.to/#/{room_id}"

    def upload_to_room(self, file_path: str, room_id: str, **kwargs: Any) -> str:
        """Upload a file and send it to a specific room.

        Args:
            file_path: Local file to upload.
            room_id: Target Matrix room.
            **kwargs: Optional ``mime_type`` (overrides detection) and
                ``pipe_obj`` (the uploaded URL is attached to it via Store).

        Returns:
            A ``matrix.to`` link to the sent event (or to the room when the
            server returned no event ID).

        Raises:
            FileNotFoundError: `file_path` does not exist.
            Exception: missing room/credentials, or a failed upload/send request.
        """
        from SYS.models import ProgressFileReader

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        if not room_id:
            raise Exception("Matrix room_id missing")

        base, token = self._get_homeserver_and_token()
        mime_type, msgtype = _classify_matrix_upload(
            path, explicit_mime_type=kwargs.get("mime_type")
        )
        filename = path.name
        size_bytes = int(path.stat().st_size)

        # Upload media
        with open(path, "rb") as handle:
            wrapped = ProgressFileReader(handle, total_bytes=size_bytes, label="upload")
            resp = requests.post(
                f"{base}/_matrix/media/v3/upload",
                headers={
                    "Authorization": f"Bearer {token}",
                    "Content-Type": mime_type,
                },
                data=wrapped,
                params={"filename": filename},
                timeout=self._UPLOAD_TIMEOUT,
            )
        if resp.status_code != 200:
            raise Exception(f"Matrix upload failed: {resp.text}")
        content_uri = (resp.json() or {}).get("content_uri")
        if not content_uri:
            raise Exception("No content_uri returned")

        # Build a fragment-free URL suitable for storage backends.
        # `matrix.to` links use fragments (`#/...`) which some backends normalize away.
        download_url_for_store = self._mxc_to_download_url(base, content_uri)

        payload = {
            "msgtype": msgtype,
            "body": filename,
            "url": content_uri,
            "info": {"mimetype": mime_type, "size": size_bytes},
        }
        link = self._send_room_message(
            room_id, payload, failure_label="Matrix send message failed"
        )

        # Optional: if a PipeObject is provided and it already has store+hash,
        # attach the uploaded URL back to the stored file.
        pipe_obj = kwargs.get("pipe_obj")
        if pipe_obj is not None:
            try:
                from Store import Store

                # Prefer the direct media download URL for storage backends.
                Store(self.config, suppress_debug=True).try_add_url_for_pipe_object(
                    pipe_obj,
                    download_url_for_store or link,
                )
            except Exception as exc:
                # Best-effort: the upload itself succeeded, so only report it.
                log(f"[matrix] Failed to attach URL to store: {exc}", file=sys.stderr)
        return link

    def send_text_to_room(self, text: str, room_id: str) -> str:
        """Send a plain text message to a specific room.

        Returns:
            The ``matrix.to`` link of the message, or ``""`` when `text` is blank
            (nothing is sent in that case).

        Raises:
            Exception: missing room/credentials, or a failed send request.
        """
        message = str(text or "").strip()
        if not message:
            return ""
        if not room_id:
            raise Exception("Matrix room_id missing")
        return self._send_room_message(
            room_id,
            {"msgtype": "m.text", "body": message},
            failure_label="Matrix send text failed",
        )

    def upload(self, file_path: str, **kwargs: Any) -> str:
        """Upload a file to the room configured as ``provider.matrix.room_id``.

        Extra keyword arguments (e.g. ``mime_type``, ``pipe_obj``) are forwarded
        to `upload_to_room`. The target room always comes from the configuration.

        Raises:
            Exception: when no ``room_id`` is configured.
        """
        room_id = self._matrix_conf().get("room_id")
        if not room_id:
            raise Exception("Matrix room_id missing")
        # Drop keys that would collide with the positional arguments below.
        forwarded = {k: v for k, v in kwargs.items() if k not in ("file_path", "room_id")}
        return self.upload_to_room(file_path, str(room_id), **forwarded)

    @staticmethod
    def _unpack_pending_upload(payload: Any) -> Tuple[str, bool, Any]:
        """Return ``(file_path, delete_after, pipe_obj)`` for one stashed upload."""
        if isinstance(payload, dict):
            return (
                str(payload.get("path") or ""),
                bool(payload.get("delete_after", False)),
                payload.get("pipe_obj"),
            )
        return str(getattr(payload, "path", "") or ""), False, None

    def selector(
        self,
        selected_items: List[Any],
        *,
        ctx: Any,
        stage_is_last: bool = True,
        **_kwargs: Any
    ) -> bool:
        """Handle Matrix room selection via `@N`.

        If the CLI has a pending upload stash, selecting a room triggers an upload
        of every stashed file to every selected room. Files flagged `delete_after`
        are removed only after all rooms were processed, and only if at least one
        upload of that file succeeded. The stash is cleared afterwards.

        Returns:
            False when this is not the last stage or nothing is pending (selection
            not handled); True otherwise, even if some uploads failed.
        """
        if not stage_is_last:
            return False

        try:
            pending = ctx.load_value("matrix_pending_uploads", default=None)
        except Exception:
            pending = None
        pending_list = list(pending) if isinstance(pending, list) else []
        if not pending_list:
            return False

        room_ids: List[str] = []
        for item in selected_items or []:
            if isinstance(item, dict):
                rid = item.get("room_id") or item.get("id")
            else:
                rid = getattr(item, "room_id", None) or getattr(item, "id", None)
            if rid and str(rid).strip():
                room_ids.append(str(rid).strip())
        if not room_ids:
            print("No Matrix room selected\n")
            return True

        any_failed = False
        # Deleting inside the loop would remove the file after the first room and
        # make every later room fail, so deletions are collected and done last.
        to_delete: Dict[str, Path] = {}
        for room_id in room_ids:
            for payload in pending_list:
                try:
                    file_path, delete_after, pipe_obj = self._unpack_pending_upload(payload)
                    if not file_path:
                        any_failed = True
                        continue
                    media_path = Path(file_path)
                    if not media_path.exists():
                        any_failed = True
                        print(f"Matrix upload file missing: {file_path}")
                        continue
                    link = self.upload_to_room(
                        str(media_path),
                        str(room_id),
                        pipe_obj=pipe_obj,
                    )
                    if link:
                        print(link)
                    if delete_after:
                        to_delete[str(media_path)] = media_path
                except Exception as exc:
                    any_failed = True
                    print(f"Matrix upload failed: {exc}")

        for media_path in to_delete.values():
            try:
                media_path.unlink()
            except FileNotFoundError:
                pass
            except Exception as exc:
                any_failed = True
                print(f"Matrix cleanup failed: {exc}")

        try:
            ctx.store_value("matrix_pending_uploads", [])
        except Exception:
            pass

        if any_failed:
            print("\nOne or more Matrix uploads failed\n")
        return True
2026-01-09 01:22:06 -08:00
# Minimal provider registration for the new table system
try:
    from SYS.result_table_adapters import register_provider
    from SYS.result_table_api import ResultModel, ColumnSpec, metadata_column, title_column

    def _convert_search_result_to_model(sr: Any) -> ResultModel:
        """Convert Matrix SearchResult to ResultModel for strict table display."""
        if hasattr(sr, "to_dict"):
            d = sr.to_dict()
        elif isinstance(sr, dict):
            d = sr
        else:
            d = {"title": getattr(sr, "title", str(sr))}

        title = d.get("title") or ""
        path = d.get("path") or None
        columns = d.get("columns") or getattr(sr, "columns", None) or []

        # Metadata comes from the recognised columns first...
        wanted_columns = ("room_id", "room_name", "id", "name")
        metadata: Dict[str, Any] = {}
        for name, value in columns:
            key = str(name or "").strip().lower()
            if key in wanted_columns:
                metadata[key] = value

        # ...then every full_metadata entry is layered on top (best-effort).
        try:
            fm = d.get("full_metadata") or {}
            if isinstance(fm, dict):
                for raw_key in fm:
                    metadata[str(raw_key).strip().lower()] = fm[raw_key]
        except Exception:
            pass

        model_path = str(path) if path else None
        return ResultModel(
            title=str(title),
            path=model_path,
            ext=None,
            size_bytes=None,
            metadata=metadata,
            source="matrix"
        )

    def _adapter(items: Iterable[Any]) -> Iterable[ResultModel]:
        """Adapter to convert SearchResults to ResultModels.

        Items that cannot be converted are skipped.
        """
        for candidate in items:
            try:
                yield _convert_search_result_to_model(candidate)
            except Exception:
                continue

    def _has_metadata(rows: List[ResultModel], key: str) -> bool:
        """Check if any row has a given metadata key with a non-empty value."""

        def _is_filled(val: Any) -> bool:
            # None and whitespace-only strings count as empty.
            if val is None:
                return False
            return not (isinstance(val, str) and not val.strip())

        return any(
            key in md and _is_filled(md[key])
            for md in ((row.metadata or {}) for row in rows)
        )

    def _columns_factory(rows: List[ResultModel]) -> List[ColumnSpec]:
        """Build column specifications from available metadata in rows."""
        cols = [title_column()]
        optional_columns = (("room_id", "Room ID"), ("room_name", "Name"))
        for meta_key, header in optional_columns:
            if _has_metadata(rows, meta_key):
                cols.append(metadata_column(meta_key, header))
        return cols

    def _selection_fn(row: ResultModel) -> List[str]:
        """Return selection args for @N expansion and room selection.

        Uses explicit -room-id flag to identify the selected room for file uploads.
        """
        metadata = row.metadata or {}

        # Explicit selection args win when present and non-empty.
        explicit = metadata.get("_selection_args") or metadata.get("selection_args")
        if isinstance(explicit, (list, tuple)) and explicit:
            return [str(arg) for arg in explicit if arg is not None]

        # Otherwise fall back to the room ID, then to the title.
        room_id = metadata.get("room_id")
        if not room_id:
            return ["-title", row.title or ""]
        return ["-room-id", str(room_id)]

    register_provider(
        "matrix",
        _adapter,
        columns=_columns_factory,
        selection_fn=_selection_fn,
        metadata={"description": "Matrix room provider with file uploads"},
    )
except Exception:
    # best-effort registration
    pass