redlight/redlight_client_module.py

107 lines
3.5 KiB
Python
Raw Normal View History

import logging
import hashlib
import json
from typing import Union
from synapse.module_api import ModuleApi, NOT_SPAM
from synapse.api.errors import AuthError
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from twisted.internet import reactor
from twisted.internet import defer
from twisted.web.iweb import IBodyProducer
from zope.interface import implementer
# Define a handler and set its level and format
file_handler = logging.FileHandler('/var/log/matrix-synapse/redlight.log')
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
# Get your logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
# Ensure that this logger's messages don't propagate to the root logger
logger.propagate = False
@implementer(IBodyProducer)
class _JsonProducer:
def __init__(self, data):
self._data = json.dumps(data).encode("utf-8")
self.length = len(self._data)
def startProducing(self, consumer):
consumer.write(self._data)
return defer.succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass
class RedlightClientModule:
def __init__(self, config: dict, api: ModuleApi):
self._api = api
self._redlight_url = config.get("redlight_url", "https://duckdomain.xyz/_matrix/loj/v1/abuse_lookup")
self._agent = Agent(reactor)
logger.info("RedLightClientModule initialized.")
api.register_spam_checker_callbacks(
user_may_join_room=self.user_may_join_room
)
@staticmethod
def double_hash_sha256(data: str) -> str:
first_hash = hashlib.sha256(data.encode()).digest()
double_hashed = hashlib.sha256(first_hash).hexdigest()
return double_hashed
async def user_may_join_room(
self, user: str, room: str, is_invited: bool
) -> Union["synapse.module_api.NOT_SPAM", "synapse.module_api.errors.Codes"]:
logger.info(f"User {user} is attempting to join room {room}. Invitation status: {is_invited}.")
hashed_room_id = self.double_hash_sha256(room)
hashed_user_id = self.double_hash_sha256(user)
body = _JsonProducer({
"room_id_hash": hashed_room_id,
"user_id_hash": hashed_user_id
})
response = await self._agent.request(
b"PUT",
self._redlight_url.encode(),
Headers({'Content-Type': [b'application/json']}),
body
)
response_body_bytes = await readBody(response)
response_body = response_body_bytes.decode("utf-8")
try:
response_json = json.loads(response_body)
except json.JSONDecodeError:
logger.error(f"Failed to decode response body: {response_body}")
#return NOT_SPAM # default to allowing if there's an error
if response.code == 200:
raise AuthError(403, "User not allowed to join this room")
elif response.code == 204:
return NOT_SPAM
else:
logger.error(f"Unexpected response code {response.code} with body: {response_body}")
return NOT_SPAM # default to allowing if there's an unexpected response
def parse_config(config: dict) -> dict:
return config
def create_module(api: ModuleApi, config: dict) -> RedlightClientModule:
return RedlightClientModule(config, api)