matrix-moderation-tool/rdlist_commands.py

335 lines
15 KiB
Python

import os
import subprocess
import json
import random
import string
import time
import user_commands
import room_commands
import report_commands
import hardcoded_variables
rdlist_tag_descriptions = {
"csam": "Child Sexual Abuse Material",
"cfm": "An abundance of content which would directly appeal to those seeking csam.",
"jailbait": "Photos which contain underage individuals in questionable or suggestive situations.",
"tfm": "An abundance of content which would directly appeal to those seeking jailbait.",
"beastiality": "Self explanatory.",
"3d_loli": "Pornography which depicts photorealistic underage characters.",
"stylized_3d_loli": "Pornography which depicts underage characters that are not depicted in a realistic style.",
"gore": "Self explanatory.",
"snuff": "Self explanatory.",
"degen_misc": "Other types of coomers rooms.",
"degen_larp": "Coomer larp rooms.",
"degen_meet": "Coomer socializing rooms.",
"degen_porn": "Rooms dedicated to pornography, excluding types which have dedicated tags.",
"bot_porn": "Rooms which contain bots that spam pornographic content.",
"bot_spam": "Rooms which contain bots that spam content. Primarily for malvertising and cryptospam",
"preban": "Rooms which may not contain tagged content, however have clear intent. i.e: Rooms with names like 'CP Room', 'Child Porn', etc",
"hub_room_trade": "Rooms which exist solely to trade illegal or questionable content. i.e: csam, jailbait",
"hub_room_sussy": "A room which is sussy. This tag does not have a solid definition, see existing tagged rooms",
"abandoned": "Similar to 'anarchy', primarily for rooms which have automated spam bots.",
"anarchy": "Unmoderated rooms.",
"hub_room_underage": "Rooms which contain a disproportionate amount of underage users.",
"hub_room_links": "Rooms which exist to share links to other rooms.",
"toddlercon": "Lolicon but younger.",
"loli": "Rooms which exist to host lolicon.",
}
def sync_rdlist():
rdlist_dir = "./rdlist"
os.makedirs(rdlist_dir, exist_ok=True)
# Check if the rdlist repo has already been cloned
if os.path.isdir("./rdlist/.git"):
print("\nrdlist repo already cloned...")
os.chdir("./rdlist/")
# Update git remote references and get status
subprocess.run(["git", "remote", "update"], check=True)
status = subprocess.run(["git", "status", "-uno"], stdout=subprocess.PIPE, check=True)
os.chdir("..")
# If "Your branch is up to date" is not in the status, then there are changes to pull
if "Your branch is up to date" not in status.stdout.decode():
print("Pulling latest changes from rdlist repo...")
os.chdir("./rdlist/")
subprocess.run(["git", "pull"], check=True)
os.chdir("..")
else:
print("rdlist repo is up-to-date, no need to pull changes.")
else:
print("Cloning rdlist repo...")
subprocess.run(["git", "clone", "ssh://gitea@code.glowers.club:1488/loj/rdlist.git"], check=True)
# def build_incident_report(users_list):
# # Git clone the rdlist repo to ./rdlist/
# sync_rdlist()
# # Load the summaries JSON file
# summaries_path = os.path.join("rdlist", "dist", "summaries.json")
# with open(summaries_path, 'r') as file:
# data = json.load(file)
# return incidents_dict
# # Example of the data structure we're trying to build/transform:
# # users_list = ["@billybob:matrix.org", "@johndoe:matrix.org", "@pedobear:perthchat.org", "@randomcreep:perthchat.org", "@fatweeb:grin.hu"]
# #
# # becomes:
# #
# # incidents_dict = {
# # f"@billybob:matrix.org": {
# # "!dummyid1:matrix.org": ["csam", "lolicon", "beastiality"],
# # "!dummyid2:matrix.org": ["csam", "anarchy"]
# # },
# # f"@johndoe:matrix.org": {
# # "!dummyid3:matrix.org": ["csam", "lolicon", "toddlercon"],
# # "!dummyid4:matrix.org": ["csam", "terrorism"]
# # },
# # f"@pedobear:perthchat.org": {
# # "!dummyid5:matrix.org": ["csam", "lolicon", "jailbait"],
# # "!dummyid6:matrix.org": ["csam", "hub_links"]
# # },
# # f"@randomcreep:perthchat.org": {
# # "!dummyid7:matrix.org": ["csam", "jailbait"],
# # "!dummyid8:matrix.org": ["csam", "pre_ban"]
# # },
# # f"@fatweeb:grin.hu": {
# # "!dummyid9:matrix.org": ["csam", "lolicon"],
# # "!dummyid10:matrix.org": ["csam", "degen"]
# # }
# # }
# A function to return the rdlist tags associated with a room
def get_rdlist_tags(preset_internal_ID):
if preset_internal_ID == '':
internal_ID = input("\nEnter the internal id of the room you wish to query (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ")
elif preset_internal_ID != '':
internal_ID = preset_internal_ID
# Git clone the rdlist repo to ./rdlist/
sync_rdlist()
# Load the summaries JSON file
summaries_path = os.path.join("rdlist", "dist", "summaries.json")
with open(summaries_path, 'r') as file:
data = json.load(file)
# Find the room with the given id and return its tags
for item in data:
if 'room' in item and 'room_id' in item['room'] and item['room']['room_id'] == internal_ID:
if 'report_info' in item and 'tags' in item['report_info']:
return item['report_info']['tags']
return None
def block_all_rooms_with_rdlist_tags(rdlist_use_recommended,preset_user_ID,preset_new_room_name,preset_message):
# Git clone the rdlist repo to ./rdlist/
sync_rdlist()
if rdlist_use_recommended == True:
# Use the hardcoded recommended tags
blocked_tags = hardcoded_variables.rdlist_recommended_tags
print(f"\nUsing recommended rdlist tags. Rooms matching the following tags will be purged and/or blocked:\n{hardcoded_variables.rdlist_recommended_tags}")
elif rdlist_use_recommended == False:
# After the git repo has been cloned/pulled, open the file and read it into a string
with open(os.path.join("rdlist", "lib", "docs", "tags.md"), 'r') as file:
data = file.readlines()
# Print ./rdlist/lib/docs/tags.md README file for the user
print("\nPrinting details about the current tags in rdlist:\n")
for line in data:
print(line, end='') # Print the contents of the file
# Take input from the user and convert it to a list
print("\nPlease enter a space seperated list of tags you wish to block:\n")
blocked_tags = input().split()
print('')
# Load the summaries JSON file
summaries_path = os.path.join("rdlist", "dist", "summaries.json")
with open(summaries_path, 'r') as file:
data = json.load(file)
# Create an empty list to store all the room_ids
all_room_ids = []
# Iterate over blocked_tags
for tag in blocked_tags:
# Filter the data to keep only the entries where the tag appears in the "tags" list
filtered_data = [item for item in data if 'report_info' in item and 'tags' in item['report_info'] and tag in item['report_info']['tags']]
# Extract the room_ids
room_ids = [item['room']['room_id'] for item in filtered_data if 'room' in item and 'room_id' in item['room']]
# Add the room_ids to the list of all room_ids
all_room_ids.extend(room_ids)
# If choosing specific tags, print the tag and corresponding room_ids
if rdlist_use_recommended == False:
# Print the tag and corresponding room_ids
print(f"Tag: {tag}\nRoom IDs: {room_ids}\n")
# Deduplicate the list of all room_ids
all_room_ids = list(set(all_room_ids))
# Examine these room_ids for local and remote users
all_local_users = []
all_remote_users = []
for room_id in all_room_ids:
joined_local_members = room_commands.get_room_members(room_id, True)
all_local_users.extend(joined_local_members)
joined_remote_members = room_commands.get_room_members(room_id, False)
all_remote_users.extend(joined_remote_members)
# Deduplicate the list of all local users
all_local_users = list(set(all_local_users))
#print("all_local_users: " + str(all_local_users))
# If there's at least 1 local user detected, ask the admin if they want to generate a user report for every user found in rdlist rooms
if len(all_local_users) > 0:
print(f"\nWARNING! The following local users are current members of rooms tagged in rdlist: {all_local_users}")
generate_user_report_confirmation = input("\nDo you want to generate a user report file for each of these users? y/n? ")
if generate_user_report_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']:
for user_id in all_local_users:
report_commands.generate_user_report(user_id)
elif generate_user_report_confirmation.lower() in ['n', 'no', 'N', 'No']:
print("\nSkipping user report generation...\n")
elif len(all_local_users) == 0:
print(f"\nNo local users were found in rdlist rooms.")
# Deduplicate the list of all remote users
all_remote_users = list(set(all_remote_users))
all_remote_users = [user for user in all_remote_users if user not in all_local_users]
#print("all_remote_users: " + str(all_remote_users))
# Ask the admin if they would like to mail off an incident report for every remote user found in rdlist rooms
# if len(all_remote_users) > 0:
# print(f"\nThe following remote users are current members of rooms tagged in rdlist: {all_remote_users}")
# send_incident_report_confirmation = input("\nDo you want to send an incident report to the abuse email address for each of these users? y/n? ")
# if send_incident_report_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']:
# build_incident_report(all_remote_users)
# #for user_id in all_remote_users:
# # report_commands.send_incident_report(user_id)
# elif send_incident_report_confirmation.lower() in ['n', 'no', 'N', 'No']:
# print("\nSkipping incident report generation...\n")
# Ask the user if they wish to block and purge all these rooms, then collect shutdown parameters
if preset_user_ID == '':
user_ID = input("\nPlease enter the local username that will create a 'muted violation room' for your users (Example: michael): ")
elif preset_user_ID != '':
user_ID = preset_user_ID
if preset_new_room_name == '':
new_room_name = input("\nPlease enter the room name of the muted violation room your users will be sent to: ")
elif preset_new_room_name != '':
new_room_name = preset_new_room_name
if preset_message == '':
message = input("\nPlease enter the shutdown message that will be displayed to users: ")
elif preset_message != '':
message = preset_message
#print(f"all_room_ids: {all_room_ids}")
# Ask the user if they wish to block and purge all these rooms
shutdown_confirmation = input("\nNumber of rdlist rooms being shutdown: " + str(len(all_room_ids)) + "\n\nAre you sure you want to block/shutdown these rooms? y/n? ")
total_list_kicked_users = []
num_rooms_blocked = 0
num_rooms_purged = 0
if shutdown_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']:
# Ask the user if they wish to shadow ban all local users in these rooms
shadow_ban_confirmation = input("\nDo you want to also shadow ban all your local users in these rooms before performing these shutdowns? (This is recommended as it prevents them from alerting others about these mass shutdown.) y/n? ")
# Perform shadow bans if admin confirms
if shadow_ban_confirmation in ['y', 'yes', 'Y', 'Yes']:
for user in all_local_users:
print(f"\nShadow banning user: {user}")
user_commands.shadow_ban_account(user)
for room_id in all_room_ids:
blocked_status = room_commands.get_block_status(room_id)
#print(f"\nroom_details_dict: {room_details_dict}")
#print(f"\nblock_status: {blocked_status}")
# If room is already blocked, skip it
if blocked_status == False:
# Examine if unblocked room is known, if not block it
room_details_dict = room_commands.get_room_details(room_id)
if "Room not found" in room_details_dict.get('error', ''):
print(f"\n\nBlocking unknown room: {room_id}")
room_commands.set_block_status(room_id, True)
num_rooms_blocked += 1
# If unblocked room is known, perform a shutdown of the room
else:
print(f"\n\nShutting down known room: {room_id}")
list_kicked_users = room_commands.shutdown_room(room_id, user_ID, new_room_name, message, True, True)
num_rooms_purged += 1
total_list_kicked_users.extend(list_kicked_users)
if hardcoded_variables.testing_mode == True:
time.sleep(5)
elif blocked_status == True:
print(f"\n\nSkipping already blocked room: {room_id}")
if hardcoded_variables.testing_mode == True:
time.sleep(5)
elif shutdown_confirmation.lower() in ['n', 'no', 'N', 'No']:
print("\nSkipping blocking/shutdown of rooms...\n")
return
else:
print("\nInvalid input, skipping these files...\n")
return
# Deduplicate the list of all kicked users
total_list_kicked_users = list(set(total_list_kicked_users))
# Return the list of all kicked users
return num_rooms_blocked, num_rooms_purged, total_list_kicked_users
def block_recommended_rdlist_tags():
# Print warning if testing mode is enabled
if hardcoded_variables.testing_mode == True:
print("\nWARNING! Testing mode is enabled, this will reduce the amount of data generated in reports and greatly slow down rdlist blocking!\n")
# Check if user account already exists
account_query = user_commands.query_account(hardcoded_variables.rdlist_bot_username)
# Generate random password
rdlist_bot_password = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(20))
# If user is not found, create it
if 'User not found' in account_query:
# Create user account
user_commands.create_account(hardcoded_variables.rdlist_bot_username, rdlist_bot_password)
else:
print(f"\n@{hardcoded_variables.rdlist_bot_username}:{hardcoded_variables.base_url} account already exists. Resetting account password.")
user_commands.reset_password(hardcoded_variables.rdlist_bot_username, rdlist_bot_password)
# Promote bot user to server admin
print(f"\nEnsuring @{hardcoded_variables.rdlist_bot_username}:{hardcoded_variables.base_url} account is a server admin.")
user_commands.set_user_server_admin(hardcoded_variables.rdlist_bot_username)
# Define default valies for shutdown_room()
preset_new_room_name = 'POLICY VIOLATION'
preset_message = 'THIS ROOM VIOLATES SERVER POLICIES'
# Block all rooms with recommended tag set
num_rooms_blocked, num_rooms_purged, total_list_kicked_users = block_all_rooms_with_rdlist_tags(True, hardcoded_variables.rdlist_bot_username, preset_new_room_name, preset_message)
# Print user login details
print("\n\nRoom shutdowns completed!\n\nUser login details for your moderator account:\n")
print("Username: " + hardcoded_variables.rdlist_bot_username)
print("Password: " + rdlist_bot_password)
# Print statistics for the admin
print(f"\nPrint rdlist statistics:")
print(f"\nNumber of rooms blocked: {num_rooms_blocked}")
print(f"Number of rooms purged: {num_rooms_purged}")
print(f"Number of local users located in rdlist rooms and kicked: {len(total_list_kicked_users)}")
print(f"\nThe following users were current members of rooms tagged in rdlist: {total_list_kicked_users}")
# Ask admin if they want to deactivate all the accounts that were kicked from rdlist rooms
deactivate_confirmation = input("\nDo you want to also deactivate all these accounts that were kicked from rdlist rooms? y/n? ")
if deactivate_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']:
for user_id in total_list_kicked_users:
user_commands.deactivate_account(user_id)
print(f"\nThese accounts have been deactivated.")
elif deactivate_confirmation.lower() in ['n', 'no', 'N', 'No']:
print("\nSkipping account deactivations...\n")