diff --git a/hardcoded_variables.py b/hardcoded_variables.py index e1e275f..c880939 100644 --- a/hardcoded_variables.py +++ b/hardcoded_variables.py @@ -9,10 +9,10 @@ ipinfo_token = "" # Leave blank to disable ipinfo.io lookups # rdlist specific rdlist_bot_username = "mod_team" # The username to perform automated room shutdowns rdlist_recommended_tags = ['hub_room_links', 'hub_room_trade', 'preban', 'degen_misc', 'beastiality', 'degen_porn', 'gore', 'snuff', 'degen_larp', 'hub_room_sussy', 'bot_spam', 'cfm', 'jailbait', 'bot_porn', 'toddlercon', 'loli', 'csam', 'tfm', 'degen_meet', 'stylized_3d_loli', '3d_loli'] -# report generator +# User report generator report_folder = "./reports" # Reports folder name testing_mode = True # For testing this report generator, set this to True -# email settings +# Incident report email settings smtp_user = "abuse@matrix.example.org" smtp_password = "strong-stmp-password" smtp_server = "smtp.provider.org" diff --git a/moderation_tool.py b/moderation_tool.py index 1f59451..c221f4d 100755 --- a/moderation_tool.py +++ b/moderation_tool.py @@ -1,4 +1,5 @@ +import json import user_commands import room_commands import server_commands @@ -35,21 +36,21 @@ while pass_token == False: print("\n----------------------------------------------") print("\n#### User Account Commands ####\t\t\t#### Room Commands ####") print("1) Deactivate a user account.\t\t\t20) List details of a room.") - print("2) Deactivate multiple user accounts.\t\t21) Export the state events of a target room.") - print("3) Create a user account.\t\t\t22) List rooms in public directory.") - print("4) Create multiple user accounts.\t\t23) Remove a room from the public directory.") - print("5) Reset a users password.\t\t\t24) Remove multiple rooms from the public directory.") - print("6) Whois user account.\t\t\t\t25) Redact a room event.") - print("7) Whois multiple user accounts.\t\t26) List/Download all media in a room.") - print("8) Query user account.\t\t\t\t27) Download media from multiple rooms.") - print("9) Query multiple user accounts.\t\t28) Quarantine all media in a room.") - print("10) List room memberships of user.\t\t29) Shutdown a room.") - print("11) Promote a user to server admin.\t\t30) Shutdown multiple rooms.") - print("12) List all user accounts.\t\t\t31) Delete a room.") - print("13) Quarantine all media a users uploaded.\t32) Delete multiple rooms.") - print("14) Collect account data.\t\t\t33) Purge the event history of a room to a specific timestamp.") - print("15) List account pushers.\t\t\t34) Purge the event history of multiple rooms to a specific timestamp.") - print("16) Get rate limit of a user account.") + print("2) Deactivate multiple user accounts.\t\t21) List the members of a room.") + print("3) Create a user account.\t\t\t22) Export the state events of a target room.") + print("4) Create multiple user accounts.\t\t23) List rooms in public directory.") + print("5) Reset a users password.\t\t\t24) Remove a room from the public directory.") + print("6) Whois user account.\t\t\t\t25) Remove multiple rooms from the public directory.") + print("7) Whois multiple user accounts.\t\t26) Redact a room event.") + print("8) Query user account.\t\t\t\t27) List/Download all media in a room.") + print("9) Query multiple user accounts.\t\t28) Download media from multiple rooms.") + print("10) List room memberships of user.\t\t29) Quarantine all media in a room.") + print("11) Promote a user to server admin.\t\t30) Shutdown a room.") + print("12) List all user accounts.\t\t\t31) Shutdown multiple rooms.") + print("13) Quarantine all media a users uploaded.\t32) Delete a room.") + print("14) Collect account data.\t\t\t33) Delete multiple rooms.") + print("15) List account pushers.\t\t\t34) Purge the event history of a room to a specific timestamp.") + print("16) Get rate limit of a user account.\t\t35) Purge the event history of multiple rooms to a specific timestamp.") print("17) Set rate limit of a user account.") print("18) Delete rate limit of a user account.") print("19) Check if user account exists.") @@ -58,7 +59,7 @@ while pass_token == False: print("41) Purge remote media repository up to a certain date.\t\t71) Decrypt user report .zip file.") print("42) Prepare database for copying events of multiple rooms.\t72) Lookup homeserver admin contact email.") print("\t\t\t\t\t\t\t\t73) Send a test email.") - print("#### rdlist ####\t\t\t\t\t\t74) Send a test incident report to yourself.") + print("#### rdlist ####\t\t\t\t\t\t74) Send test incident reports to yourself.") print("50) Block all rooms with specific rdlist tags.") print("51) Block all rooms with recommended rdlist tags.") print("\n#### ipinfo.io ####") @@ -77,23 +78,28 @@ while pass_token == False: elif menu_input == "5": user_commands.reset_password('','') elif menu_input == "6": - user_commands.whois_account('') + whois_account_dict = user_commands.whois_account('') + print(json.dumps(whois_account_dict, indent=4, sort_keys=True)) elif menu_input == "7": user_commands.whois_multiple_accounts() elif menu_input == "8": - user_commands.query_account() + query_account_dict = user_commands.query_account() + print(json.dumps(query_account_dict, indent=4, sort_keys=True)) elif menu_input == "9": user_commands.query_multiple_accounts() elif menu_input == "10": - user_commands.list_joined_rooms('') + joined_rooms_dict = user_commands.list_joined_rooms('') + print(json.dumps(joined_rooms_dict, indent=4, sort_keys=True)) elif menu_input == "11": - user_commands.set_user_server_admin('') + set_user_server_admin_dict = user_commands.set_user_server_admin('') + print(json.dumps(set_user_server_admin_dict, indent=4, sort_keys=True)) elif menu_input == "12": user_commands.list_accounts() elif menu_input == "13": user_commands.quarantine_users_media() elif menu_input == "14": - user_commands.collect_account_data('') + account_data_dict = user_commands.collect_account_data('') + print(json.dumps(account_data_dict, indent=4, sort_keys=True)) elif menu_input == "15": user_commands.list_account_pushers('') elif menu_input == "16": @@ -103,36 +109,44 @@ while pass_token == False: elif menu_input == "18": user_commands.delete_rate_limit() elif menu_input == "19": - user_commands.check_user_account_exists('') + user_account_exists = user_commands.check_user_account_exists('') + if user_account_exists == True: + print("\nUser account exists.\n") + elif user_account_exists == False: + print("\nUser account does not exist.\n") elif menu_input == "20": - room_commands.list_room_details('') + room_details_dict = room_commands.list_room_details('') + print(json.dumps(room_details_dict, indent=4, sort_keys=True)) elif menu_input == "21": - room_commands.export_room_state('') + room_members_dict = room_commands.get_room_members('',False) + print(json.dumps(room_members_dict, indent=4, sort_keys=True)) elif menu_input == "22": - room_commands.list_directory_rooms() + room_commands.export_room_state('','',True) elif menu_input == "23": - room_commands.remove_room_from_directory('') + room_commands.public_directory_rooms() elif menu_input == "24": - room_commands.remove_multiple_rooms_from_directory() + room_commands.remove_room_from_directory('') elif menu_input == "25": - room_commands.redact_room_event() + room_commands.remove_multiple_rooms_from_directory() elif menu_input == "26": - room_commands.list_and_download_media_in_room('','','','./') + room_commands.redact_room_event() elif menu_input == "27": - room_commands.download_media_from_multiple_rooms() + room_commands.list_and_download_media_in_room('','','','./') elif menu_input == "28": - room_commands.quarantine_media_in_room() + room_commands.download_media_from_multiple_rooms() elif menu_input == "29": - room_commands.shutdown_room('','','','','','') + room_commands.quarantine_media_in_room() elif menu_input == "30": - room_commands.shutdown_multiple_rooms() + room_commands.shutdown_room('','','','','','') elif menu_input == "31": - room_commands.delete_room('') + room_commands.shutdown_multiple_rooms() elif menu_input == "32": - room_commands.delete_multiple_rooms() + room_commands.delete_room('') elif menu_input == "33": - room_commands.purge_room_to_timestamp('','') + room_commands.delete_multiple_rooms() elif menu_input == "34": + room_commands.purge_room_to_timestamp('','') + elif menu_input == "35": room_commands.purge_multiple_rooms_to_timestamp() elif menu_input == "40": server_commands.delete_block_media() @@ -141,7 +155,7 @@ while pass_token == False: elif menu_input == "42": server_commands.prepare_database_copy_of_multiple_rooms() elif menu_input == "50": - rdlist_commands.block_all_rooms_with_rdlist_tags(False,'','','','','') + rdlist_commands.block_all_rooms_with_rdlist_tags(False,'','','') elif menu_input == "51": rdlist_commands.block_recommended_rdlist_tags() elif menu_input == "60": @@ -157,10 +171,10 @@ while pass_token == False: elif menu_input == "73": report_commands.test_send_email() elif menu_input == "74": - report_commands.test_send_incident_report() + report_commands.test_send_incident_reports() elif menu_input == "q" or menu_input == "Q" or menu_input == "e" or menu_input == "E": print("\nExiting...\n") pass_token = True else: - print("\nIncorrect input detected, please select a number from 1 to 41!\n") + print("\nIncorrect input detected, please select a number from 1 to 74!\n") diff --git a/rdlist_commands.py b/rdlist_commands.py index 6cb3135..d73ebb9 100644 --- a/rdlist_commands.py +++ b/rdlist_commands.py @@ -7,60 +7,108 @@ import string import time import user_commands import room_commands +import report_commands import hardcoded_variables #rdlist_bot_username = hardcoded_variables.rdlist_bot_username def sync_rdlist(): - rdlist_dir = "./rdlist" - os.makedirs(rdlist_dir, exist_ok=True) - # Check if the rdlist repo has already been cloned - if os.path.isdir("./rdlist/.git"): - print("rdlist repo already cloned...") - os.chdir("./rdlist/") - # Update git remote references and get status - subprocess.run(["git", "remote", "update"], check=True) - status = subprocess.run(["git", "status", "-uno"], stdout=subprocess.PIPE, check=True) - os.chdir("..") + rdlist_dir = "./rdlist" + os.makedirs(rdlist_dir, exist_ok=True) + # Check if the rdlist repo has already been cloned + if os.path.isdir("./rdlist/.git"): + print("\nrdlist repo already cloned...") + os.chdir("./rdlist/") + # Update git remote references and get status + subprocess.run(["git", "remote", "update"], check=True) + status = subprocess.run(["git", "status", "-uno"], stdout=subprocess.PIPE, check=True) + os.chdir("..") - # If "Your branch is up to date" is not in the status, then there are changes to pull - if "Your branch is up to date" not in status.stdout.decode(): - print("Pulling latest changes from rdlist repo...") - os.chdir("./rdlist/") - subprocess.run(["git", "pull"], check=True) - os.chdir("..") - else: - print("rdlist repo is up-to-date, no need to pull changes.") + # If "Your branch is up to date" is not in the status, then there are changes to pull + if "Your branch is up to date" not in status.stdout.decode(): + print("Pulling latest changes from rdlist repo...") + os.chdir("./rdlist/") + subprocess.run(["git", "pull"], check=True) + os.chdir("..") + else: + print("rdlist repo is up-to-date, no need to pull changes.") - else: - print("Cloning rdlist repo...") - subprocess.run(["git", "clone", "ssh://gitea@code.glowers.club:1488/loj/rdlist.git"], check=True) + else: + print("Cloning rdlist repo...") + subprocess.run(["git", "clone", "ssh://gitea@code.glowers.club:1488/loj/rdlist.git"], check=True) -def block_all_rooms_with_rdlist_tags(rdlist_use_recommended,preset_user_ID,preset_new_room_name,preset_message,preset_purge_choice,preset_block_choice): +# def build_incident_report(users_list): +# # Git clone the rdlist repo to ./rdlist/ +# sync_rdlist() + +# # Load the summaries JSON file +# summaries_path = os.path.join("rdlist", "dist", "summaries.json") +# with open(summaries_path, 'r') as file: +# data = json.load(file) + + +# return incidents_dict + +# # Example of the data structure we're trying to build/transform: +# # users_list = ["@billybob:matrix.org", "@johndoe:matrix.org", "@pedobear:perthchat.org", "@randomcreep:perthchat.org", "@fatweeb:grin.hu"] +# # +# # becomes: +# # +# # incidents_dict = { +# # f"@billybob:matrix.org": { +# # "!dummyid1:matrix.org": ["csam", "lolicon", "beastiality"], +# # "!dummyid2:matrix.org": ["csam", "anarchy"] +# # }, +# # f"@johndoe:matrix.org": { +# # "!dummyid3:matrix.org": ["csam", "lolicon", "toddlercon"], +# # "!dummyid4:matrix.org": ["csam", "terrorism"] +# # }, +# # f"@pedobear:perthchat.org": { +# # "!dummyid5:matrix.org": ["csam", "lolicon", "jailbait"], +# # "!dummyid6:matrix.org": ["csam", "hub_links"] +# # }, +# # f"@randomcreep:perthchat.org": { +# # "!dummyid7:matrix.org": ["csam", "jailbait"], +# # "!dummyid8:matrix.org": ["csam", "pre_ban"] +# # }, +# # f"@fatweeb:grin.hu": { +# # "!dummyid9:matrix.org": ["csam", "lolicon"], +# # "!dummyid10:matrix.org": ["csam", "degen"] +# # } +# # } + +def block_all_rooms_with_rdlist_tags(rdlist_use_recommended,preset_user_ID,preset_new_room_name,preset_message): # Git clone the rdlist repo to ./rdlist/ sync_rdlist() + if rdlist_use_recommended == True: - # Take input from the user and convert it to a list + # Use the hardcoded recommended tags blocked_tags = hardcoded_variables.rdlist_recommended_tags - print("\nUsing recommended rdlist tags. Rooms matching the following tags will be blocked and purged:\n\n" + str(hardcoded_variables.rdlist_recommended_tags)) + print(f"\nUsing recommended rdlist tags. Rooms matching the following tags will be purged and/or blocked:\n{hardcoded_variables.rdlist_recommended_tags}") + elif rdlist_use_recommended == False: # After the git repo has been cloned/pulled, open the file and read it into a string with open(os.path.join("rdlist", "lib", "docs", "tags.md"), 'r') as file: data = file.readlines() + # Print ./rdlist/lib/docs/tags.md README file for the user print("\nPrinting details about the current tags in rdlist:\n") for line in data: print(line, end='') # Print the contents of the file + # Take input from the user and convert it to a list print("\nPlease enter a space seperated list of tags you wish to block:\n") blocked_tags = input().split() print('') + # Load the summaries JSON file summaries_path = os.path.join("rdlist", "dist", "summaries.json") with open(summaries_path, 'r') as file: data = json.load(file) + # Create an empty list to store all the room_ids all_room_ids = [] + # Iterate over blocked_tags for tag in blocked_tags: # Filter the data to keep only the entries where the tag appears in the "tags" list @@ -73,8 +121,51 @@ def block_all_rooms_with_rdlist_tags(rdlist_use_recommended,preset_user_ID,prese if rdlist_use_recommended == False: # Print the tag and corresponding room_ids print(f"Tag: {tag}\nRoom IDs: {room_ids}\n") + # Deduplicate the list of all room_ids all_room_ids = list(set(all_room_ids)) + + # Examine these room_ids for local users + all_local_users = [] + all_remote_users = [] + for room_id in all_room_ids: + joined_local_members = room_commands.get_room_members(room_id, True) + all_local_users.extend(joined_local_members) + joined_remote_members = room_commands.get_room_members(room_id, False) + all_remote_users.extend(joined_remote_members) + + # Deduplicate the list of all local users + all_local_users = list(set(all_local_users)) + #print("all_local_users: " + str(all_local_users)) + + # If there's at least 1 local user detected, ask the admin if they want to generate a user report for every user found in rdlist rooms + if len(all_local_users) > 0: + print(f"\nWARNING! The following local users are current members of rooms tagged in rdlist: {all_local_users}") + generate_user_report_confirmation = input("\nDo you want to generate a user report file for each of these users? y/n? ") + if generate_user_report_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']: + for user_id in all_local_users: + report_commands.generate_user_report(user_id) + elif generate_user_report_confirmation.lower() in ['n', 'no', 'N', 'No']: + print("\nSkipping user report generation...\n") + elif len(all_local_users) == 0: + print(f"\nNo local users were found in rdlist rooms.") + + # Deduplicate the list of all remote users + all_remote_users = list(set(all_remote_users)) + all_remote_users = [user for user in all_remote_users if user not in all_local_users] + #print("all_remote_users: " + str(all_remote_users)) + + # Ask the admin if they would like to mail off an incident report for every remote user found in rdlist rooms + # if len(all_remote_users) > 0: + # print(f"\nThe following remote users are current members of rooms tagged in rdlist: {all_remote_users}") + # send_incident_report_confirmation = input("\nDo you want to send an incident report to the abuse email address for each of these users? y/n? ") + # if send_incident_report_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']: + # build_incident_report(all_remote_users) + # #for user_id in all_remote_users: + # # report_commands.send_incident_report(user_id) + # elif send_incident_report_confirmation.lower() in ['n', 'no', 'N', 'No']: + # print("\nSkipping incident report generation...\n") + # Ask the user if they wish to block and purge all these rooms, then collect shutdown parameters if preset_user_ID == '': user_ID = input("\nPlease enter the local username that will create a 'muted violation room' for your users (Example: michael): ") @@ -88,47 +179,84 @@ def block_all_rooms_with_rdlist_tags(rdlist_use_recommended,preset_user_ID,prese message = input("\nPlease enter the shutdown message that will be displayed to users: ") elif preset_message != '': message = preset_message - if preset_purge_choice == '': - purge_choice = input("\nDo you want to purge the room? (This deletes all the room history from your database.) y/n? ") - elif preset_purge_choice != '': - purge_choice = preset_purge_choice - if preset_block_choice == '': - block_choice = input("\nDo you want to block the room? (This prevents your server users re-entering the room.) y/n? ") - elif preset_block_choice != '': - block_choice = preset_block_choice + # Ask the user if they wish to block and purge all these rooms - shutdown_confirmation = input("\nNumber of rooms being shutdown: " + str(len(all_room_ids)) + "\n\nAre you sure you want to shutdown these rooms? y/n? ") + shutdown_confirmation = input("\nNumber of rdlist rooms being shutdown: " + str(len(all_room_ids)) + "\n\nAre you sure you want to shutdown these rooms? y/n? ") + + total_list_kicked_users = [] + num_rooms_blocked = 0 + + #print(f"all_room_ids: {all_room_ids}") if shutdown_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']: for room_id in all_room_ids: - room_commands.shutdown_room(room_id, user_ID, new_room_name, message, purge_choice, block_choice) + print(f"\n\nShutting down room: {room_id}") + room_state_dict = room_commands.export_room_state(room_id, "", False) + #print(f"\nroom_state_dict: {room_state_dict}") + if "Room not found" in room_state_dict.get('error', ''): + list_kicked_users = room_commands.shutdown_room(room_id, user_ID, new_room_name, message, False, True) + else: + list_kicked_users = room_commands.shutdown_room(room_id, user_ID, new_room_name, message, True, True) + num_rooms_blocked += 1 + total_list_kicked_users.extend(list_kicked_users) time.sleep(5) elif shutdown_confirmation.lower() in ['n', 'no', 'N', 'No']: print("\nSkipping these files...\n") + return else: print("\nInvalid input, skipping these files...\n") + return + + # Deduplicate the list of all kicked users + total_list_kicked_users = list(set(total_list_kicked_users)) + + # Print the list of all kicked users + print(f"\n\nList of all kicked users: {total_list_kicked_users}\n") + + # Return the list of all kicked users + return num_rooms_blocked, total_list_kicked_users def block_recommended_rdlist_tags(): # Check if user account already exists account_query = user_commands.query_account(hardcoded_variables.rdlist_bot_username) + # Generate random password preset_password = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(20)) + # If user is not found, create it if 'User not found' in account_query: # Create user account user_commands.create_account(hardcoded_variables.rdlist_bot_username, preset_password) else: - print("Account already exists.") + print(f"\n@{hardcoded_variables.rdlist_bot_username}:{hardcoded_variables.base_url} account already exists. Resetting account password.") user_commands.reset_password(hardcoded_variables.rdlist_bot_username, preset_password) + # Promote bot user to server admin + print(f"\nEnsuring @{hardcoded_variables.rdlist_bot_username}:{hardcoded_variables.base_url} account is a server admin.") user_commands.set_user_server_admin(hardcoded_variables.rdlist_bot_username) + # Define default valies for shutdown_room() preset_new_room_name = 'POLICY VIOLATION' preset_message = 'THIS ROOM VIOLATES SERVER POLICIES' - preset_purge_choice = 'y' - preset_block_choice = 'y' + # Block all rooms with recommended tag set - block_all_rooms_with_rdlist_tags(True, hardcoded_variables.rdlist_bot_username, preset_new_room_name, preset_message, preset_purge_choice, preset_block_choice) + num_rooms_blocked, total_list_kicked_users = block_all_rooms_with_rdlist_tags(True, hardcoded_variables.rdlist_bot_username, preset_new_room_name, preset_message) + # Print user login details - print("\n\nUser login details for your moderator account:\n") + print("\n\nRoom shutdowns completed!\n\nUser login details for your moderator account:\n") print("Username: " + hardcoded_variables.rdlist_bot_username) print("Password: " + preset_password) + + # Print statistics for the admin + print(f"\nPrint rdlist statistics:") + print(f"\nNumber of rooms blocked/purged: {num_rooms_blocked}") + print(f"Number of local users located in rdlist rooms and kicked: {len(total_list_kicked_users)}") + print(f"\nThe following users were current members of rooms tagged in rdlist: {total_list_kicked_users}") + + # Ask admin if they want to deactivate all the accounts that were kicked from rdlist rooms + deactivate_confirmation = input("\nDo you want to also deactivate all these accounts that were kicked from rdlist rooms? y/n? ") + if deactivate_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']: + for user_id in total_list_kicked_users: + user_commands.deactivate_account(user_id) + print(f"\nThese accounts have been deactivated.") + elif deactivate_confirmation.lower() in ['n', 'no', 'N', 'No']: + print("\nSkipping account deactivations...\n") \ No newline at end of file diff --git a/report_commands.py b/report_commands.py index d708cb6..1f54423 100644 --- a/report_commands.py +++ b/report_commands.py @@ -54,6 +54,11 @@ def encrypt_user_folder(user_report_folder, username): # Delete the original zip file os.remove(zip_file_name) + # Write the password to a file + password_file = open(zip_file_name + ".aes" + ".password", "w") + password_file.write(strong_password) + password_file.close() + # You can return the password if you need to use it later, or you can directly print it here return strong_password, zip_file_name + ".aes" @@ -65,10 +70,11 @@ def generate_user_report(preset_username): username = user_commands.parse_username(preset_username) # Check if user exists - if user_commands.check_user_account_exists(username) == True: - print("\nUser exists, continuing with report generation.") + if user_commands.check_user_account_exists(username) == False: + print("\nUser does not exist, exiting report generation.") return - + elif user_commands.check_user_account_exists(username) == True: + print(f"\nGenerating user report for {username}...") # If report_folder ends in a slash, remove it report_folder = get_report_folder() @@ -117,34 +123,51 @@ def generate_user_report(preset_username): ipinfo_file.write(json.dumps(ipinfo, indent=4, sort_keys=True)) ipinfo_file.close() - # For each room the user is in, get the room state and write to ./report/username/room_states/ - room_states_folder = user_report_folder + "room_states/" - if os.path.exists(room_states_folder) == False: - os.mkdir(room_states_folder) + # Prepare folder structures + room_folder = user_report_folder + "rooms/" + dm_folder = user_report_folder + "dms/" + details_folder = "details/" + states_folder = "states/" + + # For each room the user is in, get the room state and write to ./report/username/rooms/states/ + room_states_folder = room_folder + states_folder + if not os.path.exists(room_states_folder): + os.makedirs(room_states_folder, exist_ok=True) + + # For each room the user is in, get the room details and write to ./report/username/rooms/details/ + room_details_folder = room_folder + details_folder + if not os.path.exists(room_details_folder): + os.makedirs(room_details_folder, exist_ok=True) + + # For DM, get the state and write to ./report/username/dms/states/ + dm_states_folder = dm_folder + states_folder + if not os.path.exists(dm_states_folder): + os.makedirs(dm_states_folder, exist_ok=True) + + # For DM, get the details and write to ./report/username/dms/details/ + dm_details_folder = dm_folder + details_folder + if not os.path.exists(dm_details_folder): + os.makedirs(dm_details_folder, exist_ok=True) room_list = joined_rooms_dict.get('joined_rooms', []) - count = 0 - for room in room_list: - count += 1 - room = room.split(" ")[0] - room_commands.export_room_state(room, room_states_folder) - if count > 4 and hardcoded_variables.testing_mode == True: - break - - # For each room the user is in, get the room details and write to ./report/username/room_details/ - room_details_folder = user_report_folder + "room_details/" - if os.path.exists(room_details_folder) == False: - os.mkdir(room_details_folder) - count = 0 for room in room_list: count += 1 room = room.split(" ")[0] room_details = room_commands.list_room_details(room) - room_details_file = open(room_details_folder + room + ".json", "w") - room_details_file.write(str(room_details)) + + # Check the room conditions to select the proper output folders + if room_details['joined_members'] == 2 and room_details['public'] == False: + room_details_file = open(dm_details_folder + room + ".json", "w") + state_events = room_commands.export_room_state(room, dm_states_folder, True) + else: + room_details_file = open(room_details_folder + room + ".json", "w") + state_events = room_commands.export_room_state(room, room_states_folder, True) + + room_details_file.write(json.dumps(room_details, indent=4, sort_keys=True)) room_details_file.close() + if count > 4 and hardcoded_variables.testing_mode == True: break @@ -155,7 +178,7 @@ def generate_user_report(preset_username): encrypted_zip_file_size = os.path.getsize(encrypted_zip_file_name) / 1000000 # Print the password and the encrypted .zip file name - print("\nReport generated successfully on user: \"" + username + "\"\n\nYou can send this .zip file and password when reporting a user to law enforcement.") + print("Report generated successfully on user: \"" + username + "\"\n\nYou can send this .zip file and password when reporting a user to law enforcement.") print("\nPassword: " + strong_password) print("Encrypted .zip file location: " + encrypted_zip_file_name) print("Encrypted .zip file size: " + str(encrypted_zip_file_size) + " MB\n") @@ -173,136 +196,124 @@ def decrypt_zip_file(): print("\nDecrypted .zip file location: " + encrypted_zip_file_name[:-4] + "\n") def lookup_homeserver_admin_email(preset_baseurl): - if preset_baseurl == '': - baseurl = input("\nEnter the base URL to collect the admin contact details (Example: matrix.org): ") - elif preset_baseurl != '': - baseurl = preset_baseurl + if preset_baseurl == '': + baseurl = input("\nEnter the base URL to collect the admin contact details (Example: matrix.org): ") + elif preset_baseurl != '': + baseurl = preset_baseurl - # If baseurl is matrix.org, return 'abuse@matrix.org' as a hardcoded response - if baseurl == "matrix.org": - print("\nAdmin contact email(s) for " + baseurl + " are: abuse@matrix.org") - return {"matrix.org": ["abuse@matrix.org"]}, False + # If baseurl is matrix.org, return 'abuse@matrix.org' as a hardcoded response + if baseurl == "matrix.org": + print("\nAdmin contact email(s) for " + baseurl + " are: abuse@matrix.org") + return {"matrix.org": ["abuse@matrix.org"]}, False - # Check target homserver for MSC1929 support email - url = f"https://{baseurl}/.well-known/matrix/support" - try: - response = requests.get(url) - except requests.exceptions.RequestException as e: - print(f"Error: Unable to connect to server {baseurl}. Trying WHOIS data...") - response = None + # Check target homserver for MSC1929 support email + url = f"https://{baseurl}/.well-known/matrix/support" + try: + response = requests.get(url) + except requests.exceptions.RequestException as e: + print(f"Error: Unable to connect to server {baseurl}. Trying WHOIS data...") + response = None - # If the request was successful, the status code will be 200 - if response and response.status_code == 200: - # Parse the response as JSON - data = json.loads(response.text) + # If the request was successful, the status code will be 200 + if response and response.status_code == 200: + # Parse the response as JSON + data = json.loads(response.text) - # Extract the emails from the admins field and remove duplicates - admin_emails = list({admin['email_address'] for admin in data['admins']}) + # Extract the emails from the admins field and remove duplicates + admin_emails = list({admin['email_address'] for admin in data['admins']}) - print("\nAdmin contact emails for " + baseurl + " are: " + str(admin_emails)) + print("\nAdmin contact emails for " + baseurl + " are: " + str(admin_emails)) - # Create a dictionary with baseurl as key and emails as value - email_dict = {baseurl: admin_emails} + # Create a dictionary with baseurl as key and emails as value + email_dict = {baseurl: admin_emails} - return email_dict, False - else: - print(f"Error: Unable to collect admin email from server {baseurl}") - print("Attempting to collect admin email from WHOIS data...") + return email_dict, False + else: + print(f"Error: Unable to collect admin email from server {baseurl}") + print("Attempting to collect admin email from WHOIS data...") - # Get WHOIS data - try: - w = whois.whois(baseurl) - if w.emails: - print("\nAdmin contact email(s) for " + baseurl + " are: " + str(w.emails)) - return {baseurl: list(w.emails)}, True - else: - print(f"Error: Unable to collect admin email from WHOIS data for {baseurl}") - return None, False - except: - print(f"Error: Unable to collect WHOIS data for {baseurl}") - return None, False + # Get WHOIS data + try: + w = whois.whois(baseurl) + if w.emails: + print("\nAdmin contact email(s) for " + baseurl + " are: " + str(w.emails)) + return {baseurl: list(w.emails)}, True + else: + print(f"Error: Unable to collect admin email from WHOIS data for {baseurl}") + return None, False + except: + print(f"Error: Unable to collect WHOIS data for {baseurl}") + return None, False def send_email(email_address, email_subject, email_content, email_attachments): - assert isinstance(email_attachments, list) + assert isinstance(email_attachments, list) - msg = MIMEMultipart() # Create a multipart message - msg['From'] = hardcoded_variables.smtp_user - msg['To'] = COMMASPACE.join([email_address]) - msg['Subject'] = email_subject + msg = MIMEMultipart() # Create a multipart message + msg['From'] = hardcoded_variables.smtp_user + msg['To'] = COMMASPACE.join([email_address]) + msg['Subject'] = email_subject - msg.attach(MIMEText(email_content)) # Attach the email body + msg.attach(MIMEText(email_content)) # Attach the email body - # Attach files - for file in email_attachments: - part = MIMEBase('application', "octet-stream") - with open(file, 'rb') as f: - part.set_payload(f.read()) - encoders.encode_base64(part) - part.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file)) - msg.attach(part) + # Attach files + for file in email_attachments: + part = MIMEBase('application', "octet-stream") + with open(file, 'rb') as f: + part.set_payload(f.read()) + encoders.encode_base64(part) + part.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file)) + msg.attach(part) - try: - # Send the email via SMTP server - smtp = smtplib.SMTP(hardcoded_variables.smtp_server, hardcoded_variables.smtp_port) - smtp.starttls() - smtp.login(hardcoded_variables.smtp_user, hardcoded_variables.smtp_password) - smtp.sendmail(hardcoded_variables.smtp_user, email_address, msg.as_string()) - smtp.close() - return True - except Exception as e: - print(f"Failed to send email: {e}") - return False + try: + # Send the email via SMTP server + smtp = smtplib.SMTP(hardcoded_variables.smtp_server, hardcoded_variables.smtp_port) + smtp.starttls() + smtp.login(hardcoded_variables.smtp_user, hardcoded_variables.smtp_password) + smtp.sendmail(hardcoded_variables.smtp_user, email_address, msg.as_string()) + smtp.close() + return True + except Exception as e: + print(f"Failed to send email: {e}") + return False def test_send_email(): - # Ask the user for the destination email address - email_address = input("\nPlease enter the destination email address to send this test email too: ") + # Ask the user for the destination email address + email_address = input("\nPlease enter the destination email address to send this test email too: ") - # Example email parameters - email_subject = "Test Email" - email_content = "This is a test email." - email_attachments = ["./test_data/evil_clown.jpeg"] # List of file paths. Adjust this to the actual files you want to attach. + # Example email parameters + email_subject = "Test Email" + email_content = "This is a test email." + email_attachments = ["./test_data/evil_clown.jpeg"] # List of file paths. Adjust this to the actual files you want to attach. - # Try to send the email - if send_email(email_address, email_subject, email_content, email_attachments): - print("\nEmail successfully sent.") - else: - print("\nFailed to send email.") + # Try to send the email + if send_email(email_address, email_subject, email_content, email_attachments): + print("\nEmail successfully sent.") + else: + print("\nFailed to send email.") -def send_incident_report(full_username, room_id, rdlist_tags): - # First extract the baseurl from the username, for example '@billybob:matrix.org' becomes 'matrix.org' - baseurl = full_username.split(":")[1] +def prepare_email_content(user_dict, from_whois, baseurl): + email_content = f"""Dear Administrator, - # Use the lookup function to get the admin's email - if hardcoded_variables.testing_mode == True: - admin_email_dict = {hardcoded_variables.base_url: [hardcoded_variables.report_return_email]} - print("admin_email_dict: " + str(admin_email_dict)) - from_whois = True - elif hardcoded_variables.testing_mode == False: - admin_email_dict, from_whois = lookup_homeserver_admin_email(baseurl) +We regret to inform you that there have been incidents involving the following users in your homeserver: + """ - # If no admin emails are found, return False - if not admin_email_dict or baseurl not in admin_email_dict: - print(f"Unable to find any admin emails for {baseurl}") - return False + for full_username, room_dict in user_dict.items(): + email_content += f"\nUser: {full_username}\n" + for room_id, rdlist_tags in room_dict.items(): + email_content += f"Is in the room {room_id}, this room has been flagged with the following rdlist tags:\n{', '.join(rdlist_tags)}\n" - # Prepare the incident report email content - email_subject = f"Incident Report for user: {full_username}" - email_content = f"""Dear Administrator, - -We regret to inform you that your user {full_username} has been involved in an incident in the following room: {room_id} - -In that room they were exposed to the following content: {', '.join(rdlist_tags)} + email_content += f""" We request your immediate attention to this matter. It is recommended that you: -- Generate a report on this user's account and send it to law enforcement. -- Block and purge this room from your homeserver. -- Deactivate this users account. +- Generate a report on these users' accounts and send it to law enforcement. +- Block and purge these rooms from your homeserver. +- Deactivate these users' accounts. All of these actions can be done automatically using this moderation tool: https://github.com/PC-Admin/matrix-moderation-tool ********************************************************************** -THIS EMAIL IS UNMONITORED, PLEASE DO NOT REPLY TO IT +\tTHIS EMAIL IS UNMONITORED, PLEASE DO NOT REPLY TO IT ********************************************************************** To contact us please email {hardcoded_variables.report_return_email}. @@ -312,64 +323,105 @@ Thank you for helping us make Matrix safer. Best regards, Incident Report Team https://{hardcoded_variables.base_url} - """ +""" - # If email address is collected from WHOIS data, append an extra paragraph - if from_whois: - email_content += f"""\n\n********************************************************************** -ATTENTION DOMAIN REGISTRAR, YOUR ACTION IS REQUIRED HERE + if from_whois: + email_content += f"""\n\n********************************************************************** +\tATTENTION DOMAIN REGISTRAR, YOUR ACTION IS REQUIRED HERE ********************************************************************** \nThis email is intended for the owner of the domain {baseurl}, but they have not published their email address. \nAs the recipient of this email, you have a legal obligation to ensure that this email reaches them. \nTo avoid receiving these emails in the future, please ask them to configure MSC1929 for their Matrix server: https://github.com/matrix-org/matrix-spec-proposals/pull/1929 - """ + """ - # Confidentiality warning - confidentiality_warning = """\n\n********************************************************************** - WARNING! CONFIDENTIALITY NOTICE! - \nThis email message and any attached files may contain information -that is confidential and subject of legal privilege intended only for -use by the individual or entity to whom they are addressed. If you -are not the intended recipient or the person responsible for -delivering the message to the intended recipient be advised that you -have received this message in error and that any use, copying, -circulation, forwarding, printing or publication of this message or -attached files is strictly forbidden, as is the disclosure of the -information contained therein. If you have received this message in -error, please notify the sender immediately and delete it from your -inbox. - \n********************************************************************** - """ + confidentiality_warning = f"""\n\n********************************************************************** +\t\tATTENTION! CONFIDENTIALITY NOTICE! +\nThis electronic mail and any files linked to it may hold information +that is privileged, confidential, and intended exclusively for the use of +the designated recipient or entity. If you're not the expected recipient or +the individual tasked with delivering the electronic mail to the intended recipient, +be aware that you've received this mail in error. Any utilization, duplication, +distribution, forwarding, printing, or publicizing of this email or the attached files +is strictly prohibited, as is revealing the information contained within. +If you've received this email in error, please promptly inform the sender and +remove it from your electronic mailbox. + \n********************************************************************** + """ - # Append the confidentiality warning - email_content += confidentiality_warning + email_content += confidentiality_warning + return email_content - # Prepare the email attachments. This can be modified based on what you want to attach. - email_attachments = [] - # Loop over each admin email address and send them the email - success = True - for email_address in admin_email_dict[baseurl]: - if not send_email(email_address, email_subject, email_content, email_attachments): - print(f"Failed to send email to {email_address}") - success = False +def send_incident_report(incidents_dict): + success = True + homeserver_dict = {} - return success + # Aggregate incidents by homeserver. + for full_username, room_dict in incidents_dict.items(): + baseurl = full_username.split(":")[1] -def test_send_incident_report(): - # Preset the parameters - full_username = f"@billybob:{hardcoded_variables.base_url}" - room_id = "!dummyid:matrix.org" - rdlist_tags = ["csam", "lolicon", "beastiality"] + if baseurl not in homeserver_dict: + homeserver_dict[baseurl] = {} + homeserver_dict[baseurl][full_username] = room_dict - # Try to send the incident report - try: - if hardcoded_variables.testing_mode == True: - print("\nWARNING: TESTING MODE ENABLED, SENDING EMAIL TO: " + hardcoded_variables.report_return_email + "\n") - if send_incident_report(full_username, room_id, rdlist_tags): - print("\nIncident report successfully sent.") - else: - print("\nFailed to send the incident report.") - except Exception as e: - print(f"\nFailed to send incident report: {e}") + print("homeserver_dict: " + str(homeserver_dict)) + # Prepare and send one email per homeserver, including all users and rooms. + for baseurl, user_dict in homeserver_dict.items(): + if hardcoded_variables.testing_mode == True: + admin_email_dict = {baseurl: [hardcoded_variables.report_return_email]} + print("admin_email_dict: " + str(admin_email_dict)) + from_whois = True + elif hardcoded_variables.testing_mode == False: + admin_email_dict, from_whois = lookup_homeserver_admin_email(baseurl) + + if not admin_email_dict or baseurl not in admin_email_dict: + print(f"Unable to find any admin emails for {baseurl}") + success = False + continue + + # Prepare and send one email per homeserver, including all users and rooms. + for email_address in admin_email_dict[baseurl]: + email_subject = f"Incident Report for users from {baseurl}" + email_content = prepare_email_content(user_dict, from_whois, baseurl) + + email_attachments = [] + if not send_email(email_address, email_subject, email_content, email_attachments): + print(f"Failed to send email to {email_address}") + success = False + + return success + +def test_send_incident_reports(): + incidents_dict = { + f"@billybob:matrix.org": { + "!dummyid1:matrix.org": ["csam", "lolicon", "beastiality"], + "!dummyid2:matrix.org": ["csam", "anarchy"] + }, + f"@johndoe:matrix.org": { + "!dummyid3:matrix.org": ["csam", "lolicon", "toddlercon"], + "!dummyid4:matrix.org": ["csam", "terrorism"] + }, + f"@pedobear:perthchat.org": { + "!dummyid5:matrix.org": ["csam", "lolicon", "jailbait"], + "!dummyid6:matrix.org": ["csam", "hub_links"] + }, + f"@randomcreep:perthchat.org": { + "!dummyid7:matrix.org": ["csam", "jailbait"], + "!dummyid8:matrix.org": ["csam", "pre_ban"] + }, + f"@fatweeb:grin.hu": { + "!dummyid9:matrix.org": ["csam", "lolicon"], + "!dummyid10:matrix.org": ["csam", "degen"] + } + } + + try: + if hardcoded_variables.testing_mode == True: + print("\nWARNING: TESTING MODE ENABLED, SENDING EMAIL TO: " + hardcoded_variables.report_return_email + "\n") + if send_incident_report(incidents_dict): + print("\nIncident reports successfully sent.") + else: + print("\nFailed to send the incident reports.") + except Exception as e: + print(f"\nFailed to send incident reports: {e}") diff --git a/room_commands.py b/room_commands.py index 6be0a6d..2a653f7 100644 --- a/room_commands.py +++ b/room_commands.py @@ -21,18 +21,53 @@ def list_room_details(preset_internal_ID): url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/rooms/{internal_ID}" headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"} - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, headers=headers, verify=True) room_details_dict = json.loads(response.text) - print(json.dumps(room_details_dict, indent=4, sort_keys=True)) return room_details_dict # Example # $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org?access_token=ACCESS_TOKEN' -def export_room_state(preset_internal_ID, preset_directory): +def get_room_members(preset_internal_ID, local_only): + if preset_internal_ID == '': + internal_ID = input("\nEnter the internal id of the room you wish to query (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ") + elif preset_internal_ID != '': + internal_ID = preset_internal_ID + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/rooms/{internal_ID}/members" + headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"} + + response = requests.get(url, headers=headers, verify=True) + + room_members_dict = json.loads(response.text) + + # Print room_members_dict for debugging + #print("room_members_dict: " + json.dumps(room_members_dict, indent=4, sort_keys=True)) + + # Check if the 'members' key is in the response + if 'members' in room_members_dict: + # List of all members + room_members = room_members_dict['members'] + + if local_only: + # Filter to get only local members + room_members = [member for member in room_members if member.split(':')[1] == hardcoded_variables.base_url] + + else: + # If 'members' key is not found, return an empty dictionary + room_members = {} + + # Print room_members for debugging + #print("room_members: " + str(room_members)) + return room_members + +# Example +# $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org/members?access_token=ACCESS_TOKEN' + +# This function returns the state of a room as output and optionally writes it to a json file +def export_room_state(preset_internal_ID, preset_directory, save_to_file): # record the current directory location current_directory = os.getcwd() @@ -46,21 +81,28 @@ def export_room_state(preset_internal_ID, preset_directory): elif preset_directory != '': room_dir = preset_directory - os.makedirs(room_dir, exist_ok=True) - - unix_time = int(time.time()) url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/rooms/{internal_ID}/state" headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"} - filename = os.path.join(room_dir, f"{internal_ID}_state_{unix_time}.json") - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, headers=headers, verify=True) - with open(filename, 'w') as f: - f.write(response.text) - state_events_dict = json.loads(response.text) + # If save_to_file is True, write the output to a file + if save_to_file == True: + if "Room not found" not in state_events_dict.get('error', ''): + # If save_to_file is True, create the directory if it doesn't exist + os.makedirs(room_dir, exist_ok=True) + # Define the filename and write to it + unix_time = int(time.time()) + filename = os.path.join(room_dir, f"{internal_ID}_state_{unix_time}.json") + #print(f"Writing room state events to {filename}") + with open(filename, 'w') as f: + f.write(json.dumps(state_events_dict, indent=4, sort_keys=True)) + elif "Room not found" in state_events_dict.get('error', ''): + print("Room not found, skipping write to file...") + return state_events_dict # Example @@ -69,18 +111,22 @@ def export_room_state(preset_internal_ID, preset_directory): # See # https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#room-state-api -def list_directory_rooms(): - url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/publicRooms" - headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"} +def public_directory_rooms(): + url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/publicRooms" + headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"} - print("\n" + url + "\n") - response = requests.get(url, headers=headers, verify=True) - output = response.text + #print("\n" + url + "\n") + response = requests.get(url, headers=headers, verify=True) + output = response.text - output = output.replace('\"room_id\":\"','\n') - output = output.replace('\",\"name','\n\",\"name') + output = output.replace('\"room_id\":\"','\n') + output = output.replace('\",\"name','\n\",\"name') - print(json.dumps(output, indent=4, sort_keys=True)) + print(json.dumps(output, indent=4, sort_keys=True)) + + public_room_directories_dict = json.loads(response.text) + + return public_room_directories_dict # Example # $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/publicRooms?access_token=ACCESS_TOKEN @@ -94,7 +140,7 @@ def remove_room_from_directory(preset_internal_ID): headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"} data = {"visibility": "private"} - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.put(url, headers=headers, json=data, verify=True) print(response.text) @@ -127,7 +173,7 @@ def list_and_download_media_in_room(preset_internal_ID, preset_print_file_list_c internal_ID = preset_internal_ID url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/room/{internal_ID}/media" - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, headers=headers, verify=True) media_list_output = response.text @@ -268,67 +314,65 @@ def shutdown_room(preset_internal_ID,preset_user_ID,preset_new_room_name,preset_ username = parse_username(user_ID) - if purge_choice == "y" or purge_choice == "Y" or purge_choice == "yes" or purge_choice == "Yes": + if purge_choice == "y" or purge_choice == "Y" or purge_choice == "yes" or purge_choice == "Yes" or purge_choice == True: purge_choice = "true" - elif purge_choice == "n" or purge_choice == "N" or purge_choice == "no" or purge_choice == "No": + elif purge_choice == "n" or purge_choice == "N" or purge_choice == "no" or purge_choice == "No" or purge_choice == False: purge_choice = "false" else: print("Input invalid! exiting.") return - if block_choice == "y" or block_choice == "Y" or block_choice == "yes" or block_choice == "Yes": + if block_choice == "y" or block_choice == "Y" or block_choice == "yes" or block_choice == "Yes" or block_choice == True: block_choice = "true" - elif block_choice == "n" or block_choice == "N" or block_choice == "no" or block_choice == "No": + elif block_choice == "n" or block_choice == "N" or block_choice == "no" or block_choice == "No" or block_choice == False: block_choice = "false" else: print("Input invalid! exiting.") return - # First export the state events of the room to examine them later or import them to rdlist - room_status = export_room_state(internal_ID) + headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"} + data = { + "new_room_user_id": f"@{username}:{hardcoded_variables.base_url}", + "room_name": new_room_name, + "message": message, + "block": bool(block_choice), + "purge": bool(purge_choice) + } + delete_room_url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/rooms/{internal_ID}" + response = requests.delete(delete_room_url, headers=headers, json=data, verify=True) + #print(response.text) - # Convert the string to a dictionary - room_status_dict = json.loads(room_status) + status = "null" + count = 0 + sleep_time = 1 - if "Room not found" not in room_status_dict.get('error', ''): - print(f"Exported room state events to file, this data can be useful for profiling a room after you've blocked/purged it: ./state_events{internal_ID}_state.json") + while status != "complete" and count < 8: + time.sleep(sleep_time) + count += 1 + sleep_time *= 2 + check_status_url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/rooms/{internal_ID}/delete_status" + status_response = requests.get(check_status_url, headers=headers, verify=True) + #print(f"status_response: {status_response.text}") + output_json = status_response.json() + #print(f"output_json: {output_json}") + status = output_json["results"][0]["status"] + #print(f"status: {status}") + if status != "complete": + print(f"Sleeping for {sleep_time} seconds...") - headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"} - data = { - "new_room_user_id": f"@{username}:{hardcoded_variables.base_url}", - "room_name": new_room_name, - "message": message, - "block": block_choice, - "purge": purge_choice - } - delete_room_url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/rooms/{internal_ID}" - response = requests.delete(delete_room_url, headers=headers, json=data, verify=True) - - status = "null" - count = 0 - sleep_time = 1 - - while status != "complete" and count < 8: - time.sleep(sleep_time) - count += 1 - sleep_time *= 2 - check_status_url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/rooms/{internal_ID}/delete_status" - status_response = requests.get(check_status_url, headers=headers, verify=True) - output_json = status_response.json() - status = output_json["results"][0]["status"] - print(f"status: {status}") - if status != "complete": - print(f"Sleeping for {sleep_time} seconds...") - - if status == "complete": - print(f"{internal_ID} has been successfully shutdown!") - if str(output_json["results"][0]["shutdown_room"]["kicked_users"]) != '[]': - print("List of kicked users:") - for entry in output_json["results"][0]["shutdown_room"]["kicked_users"]: - print(entry) - print("") + if status == "complete": + print(f"{internal_ID} has been successfully shutdown!") + list_kicked_users = [] + if str(output_json["results"][0]["shutdown_room"]["kicked_users"]) != '[]': + print("List of kicked users:") + for entry in output_json["results"][0]["shutdown_room"]["kicked_users"]: + list_kicked_users.append(entry) + print(entry) else: - print("The room was not found.") + print(f"Failed to shutdown {internal_ID}!") + list_kicked_users = [] + + return list_kicked_users # Example: #$ curl -H "Authorization: Bearer ACCESS_TOKEN" --data '{ "new_room_user_id": "@PC-Admin:perthchat.org", "room_name": "VIOLATION ROOM", "message": "YOU HAVE BEEN NAUGHTY!", "block": true, "purge": true }' -X DELETE 'https://matrix.perthchat.org/_synapse/admin/v2/rooms/!yUykDcYIEtrbSxOyPD:perthchat.org' @@ -357,7 +401,15 @@ def shutdown_multiple_rooms(): preset_new_room_name = input("\nPlease enter the room name of the muted violation room your users will be sent to: ") preset_message = input("\nPlease enter the shutdown message that will be displayed to users: ") preset_purge_choice = input("\n Do you want to purge these rooms? (This deletes all the room history from your database.) y/n? ") + if preset_purge_choice.lower() in ["y", "yes", "Y", "Yes"]: + preset_purge_choice = True + elif preset_purge_choice.lower() in ["n", "no", "N", "No"]: + preset_purge_choice = False preset_block_choice = input("\n Do you want to block these rooms? (This prevents your server users re-entering the room.) y/n? ") + if preset_block_choice.lower() in ["y", "yes", "Y", "Yes"]: + preset_block_choice = True + elif preset_block_choice.lower() in ["n", "no", "N", "No"]: + preset_block_choice = False # Get the directory of the current script script_dir = os.path.dirname(os.path.realpath(__file__)) room_list_data = [] diff --git a/user_commands.py b/user_commands.py index b5df538..504329a 100644 --- a/user_commands.py +++ b/user_commands.py @@ -87,7 +87,7 @@ def create_account(preset_username, preset_password): "password": user_password } - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.put(url, headers=headers, data=json.dumps(data), verify=True) if response.status_code == 201: @@ -97,7 +97,9 @@ def create_account(preset_username, preset_password): else: print(f"Error creating account: {response.status_code}, {response.text}") - return response.text + create_account_dict = json.loads(response.text) + + return create_account_dict # Example: # $ curl -kX PUT -H 'Content-Type: application/json' -d '{"password": "user_password","admin": false,"deactivated": false}' https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN @@ -136,16 +138,16 @@ def reset_password(preset_username, preset_password): headers = {'Content-Type': 'application/json'} data = {'new_password': password, 'logout_devices': True} - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.post(url, headers=headers, data=json.dumps(data), verify=True) - if response.status_code == 200: - print(response.text) - else: + if response.status_code != 200: print(f"Error resetting password: {response.status_code}, {response.text}") - return response.text + reset_password_dict = json.loads(response.text) + + return reset_password_dict # Example: # $ curl -X POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo", "logout_devices": true}' https://matrix.perthchat.org/_synapse/admin/v1/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN @@ -172,20 +174,20 @@ def set_user_server_admin(preset_username): "admin": server_admin_result } - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.put(url, headers=headers, data=json.dumps(data), verify=True) - if response.status_code == 200: - print("Successfully set user as server admin.") - else: + if response.status_code != 200: print(f"Error setting user as server admin: {response.status_code}, {response.text}") - return response.text + set_user_server_admin_dict = json.loads(response.text) + + return set_user_server_admin_dict # Example: # $ curl -kX POST -H 'Content-Type: application/json' -d '{"admin": "true"}' https://matrix.perthchat.org/_synapse/admin/v2/users/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN -def whois_account(preset_username, output_file=None): +def whois_account(preset_username): if preset_username == '': username = input("\nPlease enter the username you wish to whois: ") elif preset_username != '': @@ -195,20 +197,11 @@ def whois_account(preset_username, output_file=None): url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/admin/whois/@{username}:{hardcoded_variables.base_url}" url += f"?access_token={hardcoded_variables.access_token}" - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, verify=True) - output_text = "" - if response.status_code == 200: - output_text = response.text + "\n" - else: - output_text = f"Error retrieving account info: {response.status_code}, {response.text}\n" - - if output_file: - with open(output_file, 'a') as f: - f.write(output_text) - else: - print(output_text) + if response.status_code != 200: + print(f"Error retrieving account info: {response.status_code}, {response.text}\n") whois_account_dict = json.loads(response.text) @@ -265,9 +258,7 @@ def list_joined_rooms(preset_username): response = requests.get(url, verify=True) - if response.status_code == 200: - print(response.text + "\n") - else: + if response.status_code != 200: print(f"Error querying joined rooms: {response.status_code}, {response.text}") joined_rooms_dict = json.loads(response.text) @@ -317,12 +308,10 @@ def query_account(preset_username): url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}?access_token={hardcoded_variables.access_token}" - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, verify=True) - if response.status_code == 200: - print(response.text) - else: + if response.status_code != 200: print(f"Error querying account: {response.status_code}, {response.text}") query_account_dict = json.loads(response.text) @@ -394,12 +383,10 @@ def collect_account_data(preset_username): url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/accountdata?access_token={hardcoded_variables.access_token}" - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, verify=True) - if response.status_code == 200: - print(response.text) - else: + if response.status_code != 200: print(f"Error querying account: {response.status_code}, {response.text}") account_data_dict = json.loads(response.text) @@ -418,12 +405,10 @@ def list_account_pushers(preset_username): url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/pushers?access_token={hardcoded_variables.access_token}" - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, verify=True) - if response.status_code == 200: - print(response.text) - else: + if response.status_code != 200: print(f"Error querying account: {response.status_code}, {response.text}") pusher_data_dict = json.loads(response.text) @@ -438,7 +423,7 @@ def get_rate_limit(): url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/override_ratelimit?access_token={hardcoded_variables.access_token}" - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, verify=True) if response.status_code == 200: @@ -465,7 +450,7 @@ def set_rate_limit(): "burst_count": int(burst_count) } - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.post(url, headers=headers, data=json.dumps(data), verify=True) @@ -484,7 +469,7 @@ def delete_rate_limit(): url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/override_ratelimit?access_token={hardcoded_variables.access_token}" - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.delete(url, verify=True) if response.status_code == 200: @@ -506,15 +491,15 @@ def check_user_account_exists(preset_username): url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/username_available?username={username}&access_token={hardcoded_variables.access_token}" - print("\n" + url + "\n") + #print("\n" + url + "\n") response = requests.get(url, verify=True) if response.status_code == 200: - print("User ID is available.") - return True - elif response.status_code == 400: - print(f"User ID already taken.") + #print("User ID is available.") return False + elif response.status_code == 400: + #print(f"User ID already exists.") + return True else: print(f"Error querying account: {response.status_code}, {response.text}")