add incident report function for automatically alerting other homeserver administrators, move lookup_homeserver_admin_email() function to report_commands.py

This commit is contained in:
PC-Admin 2023-07-24 06:02:27 +08:00
parent 6454a2e9f1
commit b2bbacc46d
4 changed files with 168 additions and 59 deletions

View File

@ -11,10 +11,12 @@ rdlist_bot_username = "mod_team" # The username to perform automated room shutdo
rdlist_recommended_tags = ['hub_room_links', 'hub_room_trade', 'preban', 'degen_misc', 'beastiality', 'degen_porn', 'gore', 'snuff', 'degen_larp', 'hub_room_sussy', 'bot_spam', 'cfm', 'jailbait', 'bot_porn', 'toddlercon', 'loli', 'csam', 'tfm', 'degen_meet', 'stylized_3d_loli', '3d_loli'] rdlist_recommended_tags = ['hub_room_links', 'hub_room_trade', 'preban', 'degen_misc', 'beastiality', 'degen_porn', 'gore', 'snuff', 'degen_larp', 'hub_room_sussy', 'bot_spam', 'cfm', 'jailbait', 'bot_porn', 'toddlercon', 'loli', 'csam', 'tfm', 'degen_meet', 'stylized_3d_loli', '3d_loli']
# report generator # report generator
report_folder = "./reports" # Reports folder name report_folder = "./reports" # Reports folder name
testing_mode = True # For testing this report generator, set this to True
# email settings # email settings
smtp_user = "abuse@matrix.example.org" smtp_user = "abuse@matrix.example.org"
smtp_password = "strong-stmp-password" smtp_password = "strong-stmp-password"
smtp_server = "smtp.provider.org" smtp_server = "smtp.provider.org"
smtp_port = 587 smtp_port = 587
report_return_email = "youradminemail@example.org"
########################################################################### ###########################################################################

View File

@ -57,7 +57,7 @@ while pass_token == False:
print("40) Delete and block a specific media.\t\t\t\t70) Generate user report.") print("40) Delete and block a specific media.\t\t\t\t70) Generate user report.")
print("41) Purge remote media repository up to a certain date.\t\t71) Decrypt user report .zip file.") print("41) Purge remote media repository up to a certain date.\t\t71) Decrypt user report .zip file.")
print("42) Prepare database for copying events of multiple rooms.\t72) Send a test email.") print("42) Prepare database for copying events of multiple rooms.\t72) Send a test email.")
print("43) Lookup homeserver admin contact email.") print("43) Lookup homeserver admin contact email.\t\t\t73) Send a test incident report to yourself.")
print("\n#### rdlist ####") print("\n#### rdlist ####")
print("50) Block all rooms with specific rdlist tags.") print("50) Block all rooms with specific rdlist tags.")
print("51) Block all rooms with recommended rdlist tags.") print("51) Block all rooms with recommended rdlist tags.")
@ -141,7 +141,7 @@ while pass_token == False:
elif menu_input == "42": elif menu_input == "42":
server_commands.prepare_database_copy_of_multiple_rooms() server_commands.prepare_database_copy_of_multiple_rooms()
elif menu_input == "43": elif menu_input == "43":
server_commands.lookup_homeserver_admin_email('') report_commands.lookup_homeserver_admin_email('')
elif menu_input == "50": elif menu_input == "50":
rdlist_commands.block_all_rooms_with_rdlist_tags(False,'','','','','') rdlist_commands.block_all_rooms_with_rdlist_tags(False,'','','','','')
elif menu_input == "51": elif menu_input == "51":
@ -156,6 +156,8 @@ while pass_token == False:
report_commands.decrypt_zip_file() report_commands.decrypt_zip_file()
elif menu_input == "72": elif menu_input == "72":
report_commands.test_send_email() report_commands.test_send_email()
elif menu_input == "73":
report_commands.test_send_incident_report()
elif menu_input == "q" or menu_input == "Q" or menu_input == "e" or menu_input == "E": elif menu_input == "q" or menu_input == "Q" or menu_input == "e" or menu_input == "E":
print("\nExiting...\n") print("\nExiting...\n")
pass_token = True pass_token = True

View File

@ -1,12 +1,14 @@
import os import os
import json import json
import whois
import random import random
import string import string
import datetime import datetime
import zipfile import zipfile
import pyAesCrypt import pyAesCrypt
import smtplib import smtplib
import requests
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase from email.mime.base import MIMEBase
from email.mime.text import MIMEText from email.mime.text import MIMEText
@ -17,9 +19,6 @@ import room_commands
import ipinfo_commands import ipinfo_commands
import hardcoded_variables import hardcoded_variables
# For testing the Report Generator, set this to True
testing_mode = False
def get_report_folder(): def get_report_folder():
# Get report_folder from hardcoded_variables # Get report_folder from hardcoded_variables
report_folder = hardcoded_variables.report_folder report_folder = hardcoded_variables.report_folder
@ -130,7 +129,7 @@ def generate_user_report(preset_username):
count += 1 count += 1
room = room.split(" ")[0] room = room.split(" ")[0]
room_commands.export_room_state(room, room_states_folder) room_commands.export_room_state(room, room_states_folder)
if count > 4 and testing_mode == True: if count > 4 and hardcoded_variables.testing_mode == True:
break break
# For each room the user is in, get the room details and write to ./report/username/room_details/ # For each room the user is in, get the room details and write to ./report/username/room_details/
@ -146,7 +145,7 @@ def generate_user_report(preset_username):
room_details_file = open(room_details_folder + room + ".json", "w") room_details_file = open(room_details_folder + room + ".json", "w")
room_details_file.write(str(room_details)) room_details_file.write(str(room_details))
room_details_file.close() room_details_file.close()
if count > 4 and testing_mode == True: if count > 4 and hardcoded_variables.testing_mode == True:
break break
# Generate a random password, then encrypt the ./report/username/ folder to a timestamped .zip file # Generate a random password, then encrypt the ./report/username/ folder to a timestamped .zip file
@ -161,6 +160,8 @@ def generate_user_report(preset_username):
print("Encrypted .zip file location: " + encrypted_zip_file_name) print("Encrypted .zip file location: " + encrypted_zip_file_name)
print("Encrypted .zip file size: " + str(encrypted_zip_file_size) + " MB\n") print("Encrypted .zip file size: " + str(encrypted_zip_file_size) + " MB\n")
return encrypted_zip_file_name, strong_password
def decrypt_zip_file(): def decrypt_zip_file():
# Ask user for the location of the encrypted .zip file # Ask user for the location of the encrypted .zip file
encrypted_zip_file_name = input("\nPlease enter the location of the encrypted .zip file: ") encrypted_zip_file_name = input("\nPlease enter the location of the encrypted .zip file: ")
@ -171,6 +172,56 @@ def decrypt_zip_file():
# Print the location of the decrypted ZIP file # Print the location of the decrypted ZIP file
print("\nDecrypted .zip file location: " + encrypted_zip_file_name[:-4] + "\n") print("\nDecrypted .zip file location: " + encrypted_zip_file_name[:-4] + "\n")
def lookup_homeserver_admin_email(preset_baseurl):
if preset_baseurl == '':
baseurl = input("\nEnter the base URL to collect the admin contact details (Example: matrix.org): ")
elif preset_baseurl != '':
baseurl = preset_baseurl
# If baseurl is matrix.org, return 'abuse@matrix.org' as a hardcoded response
if baseurl == "matrix.org":
print("\nAdmin contact email(s) for " + baseurl + " are: abuse@matrix.org")
return {"matrix.org": ["abuse@matrix.org"]}, False
# Check target homserver for MSC1929 support email
url = f"https://{baseurl}/.well-known/matrix/support"
try:
response = requests.get(url)
except requests.exceptions.RequestException as e:
print(f"Error: Unable to connect to server {baseurl}. Trying WHOIS data...")
response = None
# If the request was successful, the status code will be 200
if response and response.status_code == 200:
# Parse the response as JSON
data = json.loads(response.text)
# Extract the emails from the admins field and remove duplicates
admin_emails = list({admin['email_address'] for admin in data['admins']})
print("\nAdmin contact emails for " + baseurl + " are: " + str(admin_emails))
# Create a dictionary with baseurl as key and emails as value
email_dict = {baseurl: admin_emails}
return email_dict, False
else:
print(f"Error: Unable to collect admin email from server {baseurl}")
print("Attempting to collect admin email from WHOIS data...")
# Get WHOIS data
try:
w = whois.whois(baseurl)
if w.emails:
print("\nAdmin contact email(s) for " + baseurl + " are: " + str(w.emails))
return {baseurl: list(w.emails)}, True
else:
print(f"Error: Unable to collect admin email from WHOIS data for {baseurl}")
return None, False
except:
print(f"Error: Unable to collect WHOIS data for {baseurl}")
return None, False
def send_email(email_address, email_subject, email_content, email_attachments): def send_email(email_address, email_subject, email_content, email_attachments):
assert isinstance(email_attachments, list) assert isinstance(email_attachments, list)
@ -216,3 +267,109 @@ def test_send_email():
print("\nEmail successfully sent.") print("\nEmail successfully sent.")
else: else:
print("\nFailed to send email.") print("\nFailed to send email.")
def send_incident_report(full_username, room_id, rdlist_tags):
# First extract the baseurl from the username, for example '@billybob:matrix.org' becomes 'matrix.org'
baseurl = full_username.split(":")[1]
# Use the lookup function to get the admin's email
if hardcoded_variables.testing_mode == True:
admin_email_dict = {hardcoded_variables.base_url: [hardcoded_variables.report_return_email]}
print("admin_email_dict: " + str(admin_email_dict))
from_whois = True
elif hardcoded_variables.testing_mode == False:
admin_email_dict, from_whois = lookup_homeserver_admin_email(baseurl)
# If no admin emails are found, return False
if not admin_email_dict or baseurl not in admin_email_dict:
print(f"Unable to find any admin emails for {baseurl}")
return False
# Prepare the incident report email content
email_subject = f"Incident Report for user: {full_username}"
email_content = f"""Dear Administrator,
We regret to inform you that your user {full_username} has been involved in an incident in the following room: {room_id}
In that room they were exposed to the following content: {', '.join(rdlist_tags)}
We request your immediate attention to this matter. It is recommended that you:
- Generate a report on this user's account and send it to law enforcement.
- Block and purge this room from your homeserver.
- Deactivate this users account.
All of these actions can be done automatically using this moderation tool:
https://github.com/PC-Admin/matrix-moderation-tool
**********************************************************************
THIS EMAIL IS UNMONITORED, PLEASE DO NOT REPLY TO IT
**********************************************************************
To contact us please email {hardcoded_variables.report_return_email}.
Thank you for helping us make Matrix safer.
Best regards,
Incident Report Team
https://{hardcoded_variables.base_url}
"""
# If email address is collected from WHOIS data, append an extra paragraph
if from_whois:
email_content += f"""\n\n**********************************************************************
ATTENTION DOMAIN REGISTRAR, YOUR ACTION IS REQUIRED HERE
**********************************************************************
\nThis email is intended for the owner of the domain {baseurl}, but they have not published their email address.
\nAs the recipient of this email, you have a legal obligation to ensure that this email reaches them.
\nTo avoid receiving these emails in the future, please ask them to configure MSC1929 for their Matrix server:
https://github.com/matrix-org/matrix-spec-proposals/pull/1929
"""
# Confidentiality warning
confidentiality_warning = """\n\n**********************************************************************
WARNING! CONFIDENTIALITY NOTICE!
\nThis email message and any attached files may contain information
that is confidential and subject of legal privilege intended only for
use by the individual or entity to whom they are addressed. If you
are not the intended recipient or the person responsible for
delivering the message to the intended recipient be advised that you
have received this message in error and that any use, copying,
circulation, forwarding, printing or publication of this message or
attached files is strictly forbidden, as is the disclosure of the
information contained therein. If you have received this message in
error, please notify the sender immediately and delete it from your
inbox.
\n**********************************************************************
"""
# Append the confidentiality warning
email_content += confidentiality_warning
# Prepare the email attachments. This can be modified based on what you want to attach.
email_attachments = []
# Loop over each admin email address and send them the email
success = True
for email_address in admin_email_dict[baseurl]:
if not send_email(email_address, email_subject, email_content, email_attachments):
print(f"Failed to send email to {email_address}")
success = False
return success
def test_send_incident_report():
# Preset the parameters
full_username = f"@billybob:{hardcoded_variables.base_url}"
room_id = "!dummyid:matrix.org"
rdlist_tags = ["csam", "lolicon", "beastiality"]
# Try to send the incident report
try:
if hardcoded_variables.testing_mode == True:
print("\nWARNING: TESTING MODE ENABLED, SENDING EMAIL TO: " + hardcoded_variables.report_return_email + "\n")
if send_incident_report(full_username, room_id, rdlist_tags):
print("\nIncident report successfully sent.")
else:
print("\nFailed to send the incident report.")
except Exception as e:
print(f"\nFailed to send incident report: {e}")

View File

@ -3,8 +3,6 @@ import os
import subprocess import subprocess
import csv import csv
import time import time
import json
import whois
import requests import requests
import datetime import datetime
import hardcoded_variables import hardcoded_variables
@ -138,53 +136,3 @@ def prepare_database_copy_of_multiple_rooms():
print(chown_command_process.stdout) print(chown_command_process.stdout)
print("\nThe sql query files have been generated, as postgres user in container run:\n# docker exec -it matrix-postgres /bin/bash\nbash-5.0$ export PGPASSWORD=your-db-password\nbash-5.0$ for f in /var/lib/postgresql/data/ramdisk/*/dump_room_data.sql; do psql --host=127.0.0.1 --port=5432 --username=synapse -w -f $f; done\n\nAfter copying the data to a cloud location law enforcement can access, clean up the ramdisk like so:\n# rm -r /matrix/postgres/data/ramdisk/*\n# umount /matrix/postgres/data/ramdisk") print("\nThe sql query files have been generated, as postgres user in container run:\n# docker exec -it matrix-postgres /bin/bash\nbash-5.0$ export PGPASSWORD=your-db-password\nbash-5.0$ for f in /var/lib/postgresql/data/ramdisk/*/dump_room_data.sql; do psql --host=127.0.0.1 --port=5432 --username=synapse -w -f $f; done\n\nAfter copying the data to a cloud location law enforcement can access, clean up the ramdisk like so:\n# rm -r /matrix/postgres/data/ramdisk/*\n# umount /matrix/postgres/data/ramdisk")
def lookup_homeserver_admin_email(preset_baseurl):
if preset_baseurl == '':
baseurl = input("\nEnter the base URL to collect the admin contact details (Example: matrix.org): ")
elif preset_baseurl != '':
baseurl = preset_baseurl
# If baseurl is matrix.org, return 'abuse@matrix.org' as a hardcoded response
if baseurl == "matrix.org":
print("\nAdmin contact email(s) for " + baseurl + " are: abuse@matrix.org")
return {"matrix.org": ["abuse@matrix.org"]}, False
# Check target homserver for MSC1929 support email
url = f"https://{baseurl}/.well-known/matrix/support"
try:
response = requests.get(url)
except requests.exceptions.RequestException as e:
print(f"Error: Unable to connect to server {baseurl}. Trying WHOIS data...")
response = None
# If the request was successful, the status code will be 200
if response and response.status_code == 200:
# Parse the response as JSON
data = json.loads(response.text)
# Extract the emails from the admins field and remove duplicates
admin_emails = list({admin['email_address'] for admin in data['admins']})
print("\nAdmin contact emails for " + baseurl + " are: " + str(admin_emails))
# Create a dictionary with baseurl as key and emails as value
email_dict = {baseurl: admin_emails}
return email_dict, False
else:
print(f"Error: Unable to collect admin email from server {baseurl}")
print("Attempting to collect admin email from WHOIS data...")
# Get WHOIS data
try:
w = whois.whois(baseurl)
if w.emails:
print("\nAdmin contact email(s) for " + baseurl + " are: " + str(w.emails))
return {baseurl: list(w.emails)}, True
else:
print(f"Error: Unable to collect admin email from WHOIS data for {baseurl}")
return None, False
except:
print(f"Error: Unable to collect WHOIS data for {baseurl}")
return None, False