dont promote rdlist bot user to server admin, there's no point yet. add matrix messaging for incident reports via matrix-nio. hard code rdlist bot password.

This commit is contained in:
PC-Admin 2023-07-29 23:46:32 +08:00
parent 81739a456d
commit 1598d73599
6 changed files with 221 additions and 113 deletions

View File

@ -86,7 +86,7 @@ To do:
- https://matrix-org.github.io/synapse/latest/admin_api/user_admin_api.html#find-a-user-based-on-their-third-party-id-threepid-or-3pid
- https://github.com/matrix-org/synapse/blob/master/docs/admin_api/delete_group.md
2) Add fully automated (should just return a web link and decryption password) reporting functions for users:
- Description of why the report was made (what happened)
- Description of why the report was made (what happened), include key information
- User's ID - DONE
- Whois Data - DONE
- Account Data - DONE
@ -97,18 +97,16 @@ To do:
- Any other usernames associated with that IP
- Timestamp for when illegal material was accessed
- Description of report format and contents (to guide the reader)
- Summary of key information
- Collect state event dumps of recently read rooms as well (as they may have looked at other suss rooms recently)
3) Have recommended rdlist function:
- return a list of offending accounts and the tags they accessed (for creating incident_dict's)
- add the shadowban function to prevent members alerting others after mass shutdowns
- add the shadowban function to prevent members alerting others after mass shutdowns - DONE
4) Only email reportID in incident report?
5) Add a room report function to create a properly formatted report for rdlist
6) Skip already shutdown rooms for speeding up rdlist blocking
7) Add function for probing the support email of another server automatically
8) Automated incident report email to other server owners who has users in rdlist rooms for more scalable coordination
9) Automated public room joining and reminder if reporting email is not available?
10) Refine ipinfo module to also return region/state of IP
6) Expand the incident reporting to also issue reports over Matrix
7) Automated public room joining and reminder if reporting email is not available?
8) Refine ipinfo module to also return extra details about the IP
9) Make existing functions compatible with JSON formatted inputs
***

62
bot_commands.py Normal file
View File

@ -0,0 +1,62 @@
from nio import AsyncClient, RoomCreateResponse, LoginResponse
import asyncio
import hardcoded_variables
async def create_session(username: str, password: str, homeserver: str) -> AsyncClient:
client = AsyncClient(homeserver, username)
response = await client.login(password)
if isinstance(response, LoginResponse):
await client.sync() # Perform a sync after login
return client
raise Exception(f"Failed to log in: {response}")
async def create_room(client: AsyncClient, name: str) -> str:
response = await client.room_create(name=name)
if isinstance(response, RoomCreateResponse):
return response.room_id
raise Exception(f"Failed to create room: {response}")
async def invite_user(client: AsyncClient, room_id: str, user_id: str):
response = await client.room_invite(room_id, user_id)
if not response:
raise Exception(f"Failed to invite user: {response}")
async def send_message(receiver: str, message: str):
homeserver = "https://" + hardcoded_variables.homeserver_url
client = await create_session(hardcoded_variables.rdlist_bot_username, hardcoded_variables.rdlist_bot_password, homeserver)
try:
# Check if room with the receiver already exists
for room in client.rooms.values():
if receiver in room.users and len(room.users) == 2:
room_id = room.room_id
break
else:
# Create new room if it doesn't exist
room_id = await create_room(client, "Incident Report")
await invite_user(client, room_id, receiver)
content = {
"msgtype": "m.text",
"body": message,
}
response = await client.room_send(room_id, message_type="m.room.message", content=content)
if not response:
raise Exception(f"Failed to send message: {response}")
finally:
await client.close()
def test_matrix_message():
async def main():
receiver = hardcoded_variables.report_return_mxid
message = "Hello! This is a test message. Please ignore it."
await send_message(receiver, message)
print("\nMessage successfully sent.")
asyncio.get_event_loop().run_until_complete(main())

View File

@ -8,15 +8,17 @@ access_token = "" # Your homeserver admin access token
ipinfo_token = "" # Leave blank to disable ipinfo.io lookups
# rdlist specific
rdlist_bot_username = "mod_team" # The username to perform automated room shutdowns
rdlist_bot_username = "strong-password" # The password for this user
rdlist_recommended_tags = ['hub_room_links', 'hub_room_trade', 'preban', 'degen_misc', 'beastiality', 'degen_porn', 'gore', 'snuff', 'degen_larp', 'hub_room_sussy', 'bot_spam', 'cfm', 'jailbait', 'bot_porn', 'toddlercon', 'loli', 'csam', 'tfm', 'degen_meet', 'stylized_3d_loli', '3d_loli']
# User report generator
report_folder = "./reports" # Reports folder name
testing_mode = True # For testing this report generator, set this to True
testing_mode = True # Prevents the incident report feature from messaging/emailing anyone besides you, also limits the number of room states are exported when generating user reports.
# Incident report email settings
smtp_user = "abuse@matrix.example.org"
smtp_password = "strong-stmp-password"
smtp_server = "smtp.provider.org"
smtp_port = 587
report_return_email = "youradminemail@example.org"
incident_report_return_email = "youradminemail@example.org"
incident_report_return_mxid = "@yourmxid:example.com
###########################################################################

View File

@ -6,6 +6,7 @@ import server_commands
import ipinfo_commands
import rdlist_commands
import report_commands
import bot_commands
import hardcoded_variables
# check if homeserver url is hard coded, if not set it
@ -59,9 +60,9 @@ while pass_token == False:
print("100) Delete and block a specific media.\t\t\t\t150) Generate user report.")
print("101) Purge remote media repository up to a certain date.\t151) Decrypt user report .zip file.")
print("102) Prepare database for copying events of multiple rooms.\t152) Lookup homeserver admin contact email.")
print("\t\t\t\t\t\t\t\t153) Send a test email.")
print("#### rdlist ####\t\t\t\t\t\t154) Send test incident reports to yourself.")
print("120) Block all rooms with specific rdlist tags.")
print("\t\t\t\t\t\t\t\t153) Send a test email (to yourself).")
print("#### rdlist ####\t\t\t\t\t\t154) Sent a test Matrix message (to yourself).")
print("120) Block all rooms with specific rdlist tags.\t\t\t155) Send test incident reports (to yourself).")
print("121) Block all rooms with recommended rdlist tags.")
print("122) Get rdlist tags for a room.")
print("\n#### ipinfo.io ####")
@ -188,10 +189,13 @@ while pass_token == False:
elif menu_input == "151":
report_commands.decrypt_zip_file()
elif menu_input == "152":
report_commands.lookup_homeserver_admin_email('')
admin_contact_dict, is_whois = report_commands.lookup_homeserver_admin('')
print(f"\nAdmin contacts: {json.dumps(admin_contact_dict, indent=4, sort_keys=True)}\nWhois: {str(is_whois)}")
elif menu_input == "153":
report_commands.test_send_email()
elif menu_input == "154":
bot_commands.test_matrix_message()
elif menu_input == "155":
report_commands.test_send_incident_reports()
elif menu_input == "q" or menu_input == "Q" or menu_input == "e" or menu_input == "E":
print("\nExiting...\n")

View File

@ -10,33 +10,6 @@ import room_commands
import report_commands
import hardcoded_variables
rdlist_tag_descriptions = {
"csam": "Child Sexual Abuse Material",
"cfm": "An abundance of content which would directly appeal to those seeking csam.",
"jailbait": "Photos which contain underage individuals in questionable or suggestive situations.",
"tfm": "An abundance of content which would directly appeal to those seeking jailbait.",
"beastiality": "Self explanatory.",
"3d_loli": "Pornography which depicts photorealistic underage characters.",
"stylized_3d_loli": "Pornography which depicts underage characters that are not depicted in a realistic style.",
"gore": "Self explanatory.",
"snuff": "Self explanatory.",
"degen_misc": "Other types of coomers rooms.",
"degen_larp": "Coomer larp rooms.",
"degen_meet": "Coomer socializing rooms.",
"degen_porn": "Rooms dedicated to pornography, excluding types which have dedicated tags.",
"bot_porn": "Rooms which contain bots that spam pornographic content.",
"bot_spam": "Rooms which contain bots that spam content. Primarily for malvertising and cryptospam",
"preban": "Rooms which may not contain tagged content, however have clear intent. i.e: Rooms with names like 'CP Room', 'Child Porn', etc",
"hub_room_trade": "Rooms which exist solely to trade illegal or questionable content. i.e: csam, jailbait",
"hub_room_sussy": "A room which is sussy. This tag does not have a solid definition, see existing tagged rooms",
"abandoned": "Similar to 'anarchy', primarily for rooms which have automated spam bots.",
"anarchy": "Unmoderated rooms.",
"hub_room_underage": "Rooms which contain a disproportionate amount of underage users.",
"hub_room_links": "Rooms which exist to share links to other rooms.",
"toddlercon": "Lolicon but younger.",
"loli": "Rooms which exist to host lolicon.",
}
def sync_rdlist():
rdlist_dir = "./rdlist"
os.makedirs(rdlist_dir, exist_ok=True)
@ -290,20 +263,13 @@ def block_recommended_rdlist_tags():
# Check if user account already exists
account_query = user_commands.query_account(hardcoded_variables.rdlist_bot_username)
# Generate random password
rdlist_bot_password = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(20))
# If user is not found, create it
if 'User not found' in account_query:
# Create user account
user_commands.create_account(hardcoded_variables.rdlist_bot_username, rdlist_bot_password)
user_commands.create_account(hardcoded_variables.rdlist_bot_username, hardcoded_variables.rdlist_bot_password)
else:
print(f"\n@{hardcoded_variables.rdlist_bot_username}:{hardcoded_variables.base_url} account already exists. Resetting account password.")
user_commands.reset_password(hardcoded_variables.rdlist_bot_username, rdlist_bot_password)
# Promote bot user to server admin
print(f"\nEnsuring @{hardcoded_variables.rdlist_bot_username}:{hardcoded_variables.base_url} account is a server admin.")
user_commands.set_user_server_admin(hardcoded_variables.rdlist_bot_username)
user_commands.reset_password(hardcoded_variables.rdlist_bot_username, hardcoded_variables.rdlist_bot_password)
# Define default valies for shutdown_room()
preset_new_room_name = 'POLICY VIOLATION'
@ -315,7 +281,7 @@ def block_recommended_rdlist_tags():
# Print user login details
print("\n\nRoom shutdowns completed!\n\nUser login details for your moderator account:\n")
print("Username: " + hardcoded_variables.rdlist_bot_username)
print("Password: " + rdlist_bot_password)
print("Password: " + hardcoded_variables.rdlist_bot_password)
# Print statistics for the admin
print(f"\nPrint rdlist statistics:")

View File

@ -9,6 +9,7 @@ import zipfile
import pyAesCrypt
import smtplib
import requests
import asyncio
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
@ -17,8 +18,50 @@ from email import encoders
import user_commands
import room_commands
import ipinfo_commands
import bot_commands
import hardcoded_variables
rdlist_tag_descriptions = {
"csam": "Child Sexual Abuse Material",
"cfm": "An abundance of content which would directly appeal to those seeking csam.",
"jailbait": "Photos which contain underage individuals in questionable or suggestive situations.",
"tfm": "An abundance of content which would directly appeal to those seeking jailbait.",
"beastiality": "Self explanatory.",
"3d_loli": "Pornography which depicts photorealistic underage characters.",
"stylized_3d_loli": "Pornography which depicts underage characters that are not depicted in a realistic style.",
"gore": "Self explanatory.",
"snuff": "Self explanatory.",
"degen_misc": "Other types of coomers rooms.",
"degen_larp": "Coomer larp rooms.",
"degen_meet": "Coomer socializing rooms.",
"degen_porn": "Rooms dedicated to pornography, excluding types which have dedicated tags.",
"bot_porn": "Rooms which contain bots that spam pornographic content.",
"bot_spam": "Rooms which contain bots that spam content. Primarily for malvertising and cryptospam",
"preban": "Rooms which may not contain tagged content, however have clear intent. i.e: Rooms with names like 'CP Room', 'Child Porn', etc",
"hub_room_trade": "Rooms which exist solely to trade illegal or questionable content. i.e: csam, jailbait",
"hub_room_sussy": "A room which is sussy. This tag does not have a solid definition, see existing tagged rooms",
"abandoned": "Similar to 'anarchy', primarily for rooms which have automated spam bots.",
"anarchy": "Unmoderated rooms.",
"hub_room_underage": "Rooms which contain a disproportionate amount of underage users.",
"hub_room_links": "Rooms which exist to share links to other rooms.",
"toddlercon": "Lolicon but younger.",
"loli": "Rooms which exist to host lolicon.",
}
confidentiality_warning = f"""\n\n**********************************************************************
\t\tATTENTION! CONFIDENTIALITY NOTICE!
\nThis electronic mail and any files linked to it may hold information
that is privileged, confidential, and intended exclusively for the use of
the designated recipient or entity. If you're not the expected recipient or
the individual tasked with delivering the electronic mail to the intended recipient,
be aware that you've received this mail in error. Any utilization, duplication,
distribution, forwarding, printing, or publicizing of this email or the attached files
is strictly prohibited, as is revealing the information contained within.
If you've received this email in error, please promptly inform the sender and
remove it from your electronic mailbox.
\n**********************************************************************
"""
def get_report_folder():
# Get report_folder from hardcoded_variables
report_folder = hardcoded_variables.report_folder
@ -202,8 +245,10 @@ def decrypt_zip_file():
# Print the location of the decrypted ZIP file
print("\nDecrypted .zip file location: " + encrypted_zip_file_name[:-4] + "\n")
def lookup_homeserver_admin_email(preset_baseurl):
if preset_baseurl == '':
def lookup_homeserver_admin(preset_baseurl):
if hardcoded_variables.testing_mode == True:
baseurl = hardcoded_variables.base_url
elif preset_baseurl == '':
baseurl = input("\nEnter the base URL to collect the admin contact details (Example: matrix.org): ")
elif preset_baseurl != '':
baseurl = preset_baseurl
@ -211,7 +256,7 @@ def lookup_homeserver_admin_email(preset_baseurl):
# If baseurl is matrix.org, return 'abuse@matrix.org' as a hardcoded response
if baseurl == "matrix.org":
print("\nAdmin contact email(s) for " + baseurl + " are: abuse@matrix.org")
return {"matrix.org": ["abuse@matrix.org"]}, False
return {"admins": {"email_address": "abuse@matrix.org"}}, False
# Check target homserver for MSC1929 support email
url = f"https://{baseurl}/.well-known/matrix/support"
@ -222,28 +267,22 @@ def lookup_homeserver_admin_email(preset_baseurl):
response = None
# If the request was successful, the status code will be 200
if response.status_code == 200 and "email_address" in response.text:
if response.status_code == 200 and ( "email_address" in response.text or "matrix_id" in response.text ):
# Parse the response as JSON
data = json.loads(response.text)
# Extract the emails from the admins field and remove duplicates
admin_emails = list({admin['email_address'] for admin in data['admins']})
#print("\nAdmin contact details for " + baseurl + " are: " + str(data))
print("\nAdmin contact emails for " + baseurl + " are: " + str(admin_emails))
# Create a dictionary with baseurl as key and emails as value
email_dict = {baseurl: admin_emails}
return email_dict, False
return data, False
else:
print(f"Error: Unable to collect admin email from server {baseurl}")
print(f"Error: Unable to collect admin contact details from server {baseurl}")
print("Attempting to collect admin email from WHOIS data...")
# Get WHOIS data
try:
w = whois.whois(baseurl)
if w.emails:
print("\nAdmin contact email(s) for " + baseurl + " are: " + str(w.emails))
#print("\nAdmin contact email(s) for " + baseurl + " are: " + str(w.emails))
return {baseurl: list(w.emails)}, True
else:
print(f"Error: Unable to collect admin email from WHOIS data for {baseurl}")
@ -285,11 +324,11 @@ def send_email(email_address, email_subject, email_content, email_attachments):
def test_send_email():
# Ask the user for the destination email address
email_address = input("\nPlease enter the destination email address to send this test email too: ")
email_address = hardcoded_variables.incident_report_return_email
# Example email parameters
email_subject = "Test Email"
email_content = "This is a test email."
email_subject = "Incident Report"
email_content = "Hello! This is a test email. Please ignore it."
email_attachments = ["./test_data/evil_clown.jpeg"] # List of file paths. Adjust this to the actual files you want to attach.
# Try to send the email
@ -307,14 +346,17 @@ We regret to inform you that there have been incidents involving the following u
for full_username, room_dict in user_dict.items():
email_content += f"\nUser: {full_username}\n"
for room_id, rdlist_tags in room_dict.items():
email_content += f"Is in the room {room_id}, this room has been flagged with the following rdlist tags:\n{', '.join(rdlist_tags)}\n"
email_content += f"\n Is in the room: {room_id}\n This room has been flagged with the following rdlist tags:\n"
for tag in rdlist_tags:
tag_description = rdlist_tag_descriptions.get(tag, "No description available.")
email_content += f" - {tag} ({tag_description})\n"
email_content += f"""
We request your immediate attention to this matter. It is recommended that you:
- Generate a report on these users' accounts and send it to law enforcement.
- Block and purge these rooms from your homeserver.
- Deactivate these users' accounts.
- Deactivate these users' accounts, or retain them for further observation.
All of these actions can be done automatically using this moderation tool:
https://github.com/PC-Admin/matrix-moderation-tool
@ -323,15 +365,14 @@ https://github.com/PC-Admin/matrix-moderation-tool
\tTHIS EMAIL IS UNMONITORED, PLEASE DO NOT REPLY TO IT
**********************************************************************
To contact us please email {hardcoded_variables.report_return_email}.
To contact us please email {hardcoded_variables.incident_report_return_email}.
Thank you for helping us make Matrix safer.
Best regards,
Incident Report Team
Abuse Management Team
https://{hardcoded_variables.base_url}
"""
if from_whois:
email_content += f"""\n\n**********************************************************************
\tATTENTION DOMAIN REGISTRAR, YOUR ACTION IS REQUIRED HERE
@ -342,25 +383,49 @@ https://{hardcoded_variables.base_url}
https://github.com/matrix-org/matrix-spec-proposals/pull/1929
"""
confidentiality_warning = f"""\n\n**********************************************************************
\t\tATTENTION! CONFIDENTIALITY NOTICE!
\nThis electronic mail and any files linked to it may hold information
that is privileged, confidential, and intended exclusively for the use of
the designated recipient or entity. If you're not the expected recipient or
the individual tasked with delivering the electronic mail to the intended recipient,
be aware that you've received this mail in error. Any utilization, duplication,
distribution, forwarding, printing, or publicizing of this email or the attached files
is strictly prohibited, as is revealing the information contained within.
If you've received this email in error, please promptly inform the sender and
remove it from your electronic mailbox.
\n**********************************************************************
"""
email_content += confidentiality_warning
return email_content
def prepare_message_content(user_dict, baseurl):
message_content = f"""Dear Administrator,
def send_incident_report(incidents_dict):
We regret to inform you that there have been incidents involving the following users in your homeserver:
"""
for full_username, room_dict in user_dict.items():
message_content += f"\nUser: {full_username}\n"
for room_id, rdlist_tags in room_dict.items():
message_content += f"\n Is in the room: {room_id}\n This room has been flagged with the following rdlist tags:\n"
for tag in rdlist_tags:
tag_description = rdlist_tag_descriptions.get(tag, "No description available.")
message_content += f" - {tag} ({tag_description})\n"
message_content += f"""
We request your immediate attention to this matter. It is recommended that you:
- Generate a report on these users' accounts and send it to law enforcement.
- Block and purge these rooms from your homeserver.
- Deactivate these users' accounts, or retain them for further observation.
All of these actions can be done automatically using this moderation tool:
https://github.com/PC-Admin/matrix-moderation-tool
**********************************************************************
\tTHIS ACCOUNT IS UNMONITORED, PLEASE DO NOT REPLY TO IT
**********************************************************************
To contact us please message {hardcoded_variables.incident_report_return_mxid}.
Thank you for helping us make Matrix safer.
Best regards,
Abuse Management Team
https://{hardcoded_variables.base_url}
"""
return message_content
async def send_incident_report(incidents_dict):
success = True
homeserver_dict = {}
@ -372,27 +437,38 @@ def send_incident_report(incidents_dict):
homeserver_dict[baseurl] = {}
homeserver_dict[baseurl][full_username] = room_dict
print("homeserver_dict: " + str(homeserver_dict))
# Prepare and send one email per homeserver, including all users and rooms.
# Prepare and send one incident report per homeserver, including all users and rooms.
for baseurl, user_dict in homeserver_dict.items():
if hardcoded_variables.testing_mode == True:
admin_email_dict = {baseurl: [hardcoded_variables.report_return_email]}
print("admin_email_dict: " + str(admin_email_dict))
from_whois = True
elif hardcoded_variables.testing_mode == False:
admin_email_dict, from_whois = lookup_homeserver_admin_email(baseurl)
if not admin_email_dict or baseurl not in admin_email_dict:
admin_contact_dict, from_whois = lookup_homeserver_admin(baseurl)
if not admin_contact_dict or "admins" not in admin_contact_dict:
print(f"Unable to find any admin emails for {baseurl}")
success = False
continue
# Prepare and send one email per homeserver, including all users and rooms.
for email_address in admin_email_dict[baseurl]:
# Prepare and send one message or email per homeserver, including all users and rooms.
for admin in admin_contact_dict["admins"]:
#print(f"DEBUG: {type(admin)}")
#print(f"DEBUG: {admin}") # this will print the content of each admin dict
if "matrix_id" in admin: # If matrix_id exists
message_content = prepare_message_content(user_dict, baseurl)
try:
print(f"Sending Incident Report message to {admin['matrix_id']}")
await bot_commands.send_message(admin["matrix_id"], message_content)
except Exception as e:
print(f"Failed to send message to {admin['matrix_id']}: {str(e)}")
success = False
# If email_address exists, or if message send failed, send Incident report via email
elif "email_address" in admin or success == False:
email_address = admin.get("email_address")
if email_address: # If email_address exists
email_subject = f"Incident Report for users from {baseurl}"
email_content = prepare_email_content(user_dict, from_whois, baseurl)
email_attachments = []
print(f"Sending Incident Report email to {email_address}")
if not send_email(email_address, email_subject, email_content, email_attachments):
print(f"Failed to send email to {email_address}")
success = False
@ -402,31 +478,31 @@ def send_incident_report(incidents_dict):
def test_send_incident_reports():
incidents_dict = {
f"@billybob:matrix.org": {
"!dummyid1:matrix.org": ["csam", "lolicon", "beastiality"],
"!dummyid1:matrix.org": ["csam", "loli", "beastiality"],
"!dummyid2:matrix.org": ["csam", "anarchy"]
},
f"@johndoe:matrix.org": {
"!dummyid3:matrix.org": ["csam", "lolicon", "toddlercon"],
"!dummyid4:matrix.org": ["csam", "terrorism"]
"!dummyid3:matrix.org": ["csam", "loli", "toddlercon"],
"!dummyid4:matrix.org": ["anarchy", "terrorism"]
},
f"@pedobear:perthchat.org": {
"!dummyid5:matrix.org": ["csam", "lolicon", "jailbait"],
"!dummyid5:matrix.org": ["csam", "loli", "jailbait"],
"!dummyid6:matrix.org": ["csam", "hub_links"]
},
f"@randomcreep:perthchat.org": {
"!dummyid7:matrix.org": ["csam", "jailbait"],
"!dummyid8:matrix.org": ["csam", "pre_ban"]
"!dummyid8:matrix.org": ["csam", "preban"]
},
f"@fatweeb:grin.hu": {
"!dummyid9:matrix.org": ["csam", "lolicon"],
"!dummyid9:matrix.org": ["csam", "loli"],
"!dummyid10:matrix.org": ["csam", "degen"]
}
}
try:
if hardcoded_variables.testing_mode == True:
print("\nWARNING: TESTING MODE ENABLED, SENDING EMAIL TO: " + hardcoded_variables.report_return_email + "\n")
if send_incident_report(incidents_dict):
print("\nNOTE: Testing mode is enabled, sending Incident Reports to you! :)\n")
if asyncio.run(send_incident_report(incidents_dict)):
print("\nIncident reports successfully sent.")
else:
print("\nFailed to send the incident reports.")