diff --git a/moderation_tool.py b/moderation_tool.py index fe7c81e..7bea639 100755 --- a/moderation_tool.py +++ b/moderation_tool.py @@ -32,27 +32,27 @@ if length_access_token == 0: pass_token = False while pass_token == False: - menu_input = input('\nPlease select one of the following options:\n#### User Account Commands ####\n1) Deactivate a user account.\n2) Create a user account.\n3) Query user account.\n4) List room memberships of user.\n5) Query multiple user accounts.\n6) Reset a users password.\n7) Promote a user to server admin.\n8) List all user accounts.\n9) Create multiple user accounts.\n10) Deactivate multiple user accounts.\n11) Quarantine all media a users uploaded.\n#### Room Commands ####\n12) List details of a room.\n13) Export the state events of a target room.\n14) List rooms in public directory.\n15) Remove a room from the public directory.\n16) Remove multiple rooms from the public directory.\n17) Redact a room event. (Like abusive avatars or display names.) \n18) List/Download all media in a room.\n19) Download media from multiple rooms.\n20) Quarantine all media in a room.\n21) Shutdown a room.\n22) Shutdown multiple rooms.\n23) Delete a room.\n24) Delete multiple rooms.\n25) Purge the event history of a room to a specific timestamp.\n26) Purge the event history of multiple rooms to a specific timestamp.\n#### Server Commands ####\n27) Delete and block a specific media. (Like an abusive avatar.) \n28) Purge remote media repository up to a certain date.\n29) Prepare database for copying events of multiple rooms.\n#### rdlist ####\n30) Block all rooms with specific rdlist tags.\n34) Block all rooms with recommended rdlist tags.\n(\'q\' or \'e\') Exit.\n\n') + menu_input = input('\nPlease select one of the following options:\n#### User Account Commands ####\n1) Deactivate a user account.\n2) Deactivate multiple user accounts.\n3) Create a user account.\n4) Create multiple user accounts.\n5) Reset a users password.\n6) Whois user account.\n7) Whois multiple user accounts.\n8) List room memberships of user.\n9) Promote a user to server admin.\n10) List all user accounts.\n11) Quarantine all media a users uploaded.\n#### Room Commands ####\n12) List details of a room.\n13) Export the state events of a target room.\n14) List rooms in public directory.\n15) Remove a room from the public directory.\n16) Remove multiple rooms from the public directory.\n17) Redact a room event. (Like abusive avatars or display names.) \n18) List/Download all media in a room.\n19) Download media from multiple rooms.\n20) Quarantine all media in a room.\n21) Shutdown a room.\n22) Shutdown multiple rooms.\n23) Delete a room.\n24) Delete multiple rooms.\n25) Purge the event history of a room to a specific timestamp.\n26) Purge the event history of multiple rooms to a specific timestamp.\n#### Server Commands ####\n27) Delete and block a specific media. (Like an abusive avatar.) \n28) Purge remote media repository up to a certain date.\n29) Prepare database for copying events of multiple rooms.\n#### rdlist ####\n30) Block all rooms with specific rdlist tags.\n34) Block all rooms with recommended rdlist tags.\n(\'q\' or \'e\') Exit.\n\n') if menu_input == "1": user_commands.deactivate_account('') elif menu_input == "2": - user_commands.create_account('','') - elif menu_input == "3": - user_commands.whois_account('') - elif menu_input == "4": - user_commands.list_joined_rooms('') - elif menu_input == "5": - user_commands.whois_multiple_accounts() - elif menu_input == "6": - user_commands.reset_password() - elif menu_input == "7": - user_commands.set_user_server_admin() - elif menu_input == "8": - user_commands.list_accounts() - elif menu_input == "9": - user_commands.create_multiple_accounts() - elif menu_input == "10": user_commands.deactivate_multiple_accounts() + elif menu_input == "3": + user_commands.create_account('','') + elif menu_input == "4": + user_commands.create_multiple_accounts() + elif menu_input == "6": + user_commands.reset_password('','') + elif menu_input == "6": + user_commands.whois_account('') + elif menu_input == "7": + user_commands.whois_multiple_accounts() + elif menu_input == "8": + user_commands.list_joined_rooms('') + elif menu_input == "9": + user_commands.set_user_server_admin('') + elif menu_input == "10": + user_commands.list_accounts() elif menu_input == "11": user_commands.quarantine_users_media() elif menu_input == "12": diff --git a/user_commands.py b/user_commands.py index 95669fc..028321a 100644 --- a/user_commands.py +++ b/user_commands.py @@ -1,5 +1,6 @@ -import subprocess +import requests +import json import time import csv import hardcoded_variables @@ -16,160 +17,52 @@ def deactivate_account(preset_username): username = parse_username(username) else: username = parse_username(preset_username) - command_string = "curl -X POST -H \"Authorization: Bearer " + hardcoded_variables.access_token + "\" 'https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/deactivate/@" + username + ":" + hardcoded_variables.base_url + "' --data '{\"erase\": true}'" - print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - print(output) + + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/deactivate/@{username}:{hardcoded_variables.base_url}" + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {hardcoded_variables.access_token}" + } + + data = { + "erase": True + } + + print("\n" + url + "\n") + response = requests.post(url, headers=headers, data=json.dumps(data), verify=True) + + if response.status_code in [200,201]: + print("Successfully deactivated account.") + else: + print(f"Error deactivating account: {response.status_code}, {response.text}") + + return response.text # Example: # $ curl -X POST -H "Authorization: Bearer ACCESS_TOKEN" "https://matrix.perthchat.org/_synapse/admin/v1/deactivate/@billybob:perthchat.org" --data '{"erase": true}' -def reset_password(preset_username,preset_password): - if len(preset_username) == 0 and len(preset_password) == 0: - username = input("\nPlease enter the username for the password reset: ") - password = input("Please enter the password to set: ") - else: - username = parse_username(preset_username) - password = preset_password - username = parse_username(username) - command_string = "curl -X POST -H 'Content-Type: application/json' -d '{\"new_password\": \"" + password + "\", \"logout_devices\": true}' https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/reset_password/@" + username + ":" + hardcoded_variables.base_url + "?access_token=" + hardcoded_variables.access_token - print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - print(output) - return output - -# Example: -# $ curl -X POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo", "logout_devices": true}' https://matrix.perthchat.org/_synapse/admin/v1/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN - -def set_user_server_admin(preset_username): - if len(preset_username) == 0: - # tried setting 'admin: false' here but it failed and promoted the user instead! - print("\nBe aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this.") - username = input("\nPlease enter the username you want to promote to server admin: ") - username = parse_username(username) - elif len(preset_username) > 0: - username = parse_username(preset_username) - #passthrough = 0 - server_admin_result = "true" - - command_string = "curl -X PUT -H 'Content-Type: application/json' -d '{\"admin\": \"" + server_admin_result + "\"}' https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + hardcoded_variables.base_url + "/admin?access_token=" + hardcoded_variables.access_token - print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - print(output) - return output - -# Example: -# $ curl -kX POST -H 'Content-Type: application/json' -d '{"admin": "true"}' https://matrix.perthchat.org/_synapse/admin/v2/users/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN - -def whois_account(preset_username): - if preset_username == '': - username = input("\nPlease enter the username you wish to whois: ") - elif preset_username != '': - username = preset_username - username = parse_username(username) - command_string = "curl -kXGET https://" + hardcoded_variables.homeserver_url + "/_matrix/client/r0/admin/whois/@" + username + ":" + hardcoded_variables.base_url + "?access_token=" + hardcoded_variables.access_token - #print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - print(output + "\n") - return(output) - -# Example: -# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@PC-Admin:perthchat.org?access_token=ACCESS_TOKEN - -def list_joined_rooms(preset_username): - if preset_username == '': - username = input("\nPlease enter the username you wish to query: ") - elif preset_username != '': - username = preset_username - username = parse_username(username) - command_string = "curl -kXGET https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + hardcoded_variables.base_url + "/joined_rooms?access_token=" + hardcoded_variables.access_token - #print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - print(output + "\n") - -# Example: -# $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/users/@PC-Admin:perthchat.org/joined_rooms?access_token=ACCESS_TOKEN - -def whois_multiple_accounts(): - print("Whois multiple user accounts selected") - user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ") +def deactivate_multiple_accounts(): + print("Deactivate multiple user accounts selected") + user_list_location = input("\nPlease enter the path of the file containing a csv list of names: ") with open(user_list_location, newline='') as f: - reader = csv.reader(f) - data = list(reader) - print(len(data)) - whois_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to whois all of these users? y/n?\n") - if whois_confirmation == "y" or whois_confirmation == "Y" or whois_confirmation == "yes" or whois_confirmation == "Yes": + reader = csv.reader(f) + data = list(reader) + delete_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to deactivate these users? y/n?\n") + #print(len(data[0])) + #print(data[0][0]) + if delete_confirmation == "y" or delete_confirmation == "Y" or delete_confirmation == "yes" or delete_confirmation == "Yes": x = 0 while x <= (len(data) - 1): - print(data[x][0]) - query_account(data[x][0]) - list_joined_rooms(data[x][0]) + #print(data[0][x]) + deactivate_account(data[x][0]) x += 1 #print(x) - time.sleep(1) - if whois_confirmation == "n" or whois_confirmation == "N" or whois_confirmation == "no" or whois_confirmation == "No": + time.sleep(10) + if delete_confirmation == "n" or delete_confirmation == "N" or delete_confirmation == "no" or delete_confirmation == "No": print("\nExiting...\n") -def list_accounts(): - deactivated_choice = input("Do you want to include deactivated accounts y/n? ") - guest_choice = input("Do you want to include guest accounts y/n? ") - - if deactivated_choice == "y" or deactivated_choice == "Y" or deactivated_choice == "yes" or deactivated_choice == "Yes": - deactivated_string = "deactivated=true" - elif deactivated_choice == "n" or deactivated_choice == "N" or deactivated_choice == "no" or deactivated_choice == "No": - deactivated_string = "deactivated=false" - else: - print("Input invalid! Defaulting to false.") - deactivated_string = "deactivated=false" - - if guest_choice == "y" or guest_choice == "Y" or guest_choice == "yes" or guest_choice == "Yes": - guest_string = "guest=true" - elif guest_choice == "n" or guest_choice == "N" or guest_choice == "no" or guest_choice == "No": - guest_string = "guest=false" - else: - print("Input invalid! Defaulting to false.") - guest_string = "guest=false" - - command_string = "curl -kXGET \"https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v2/users?from=0&limit=1000000&" + guest_string + "&" + deactivated_string + "&access_token=" + hardcoded_variables.access_token + "\"" - print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - number_of_users = output.count("name") - # - print("\nTotal amount of users: " + str(number_of_users)) - if number_of_users < 100: - print(output) - elif number_of_users >= 100: - accounts_output_file = input("\nThere are too many users to list here, please specify a filename to print this data too: ") - f = open(accounts_output_file, "w") - f.write(output) - f.close() - -# Example: -# $ curl -kXGET "https://matrix.perthchat.org/_synapse/admin/v2/users?from=0&limit=10&guests=false&access_token=ACCESS_TOKEN" - -def query_account(preset_username): - if len(preset_username) == 0: - username = input("\nPlease enter the username to query: ") - username = parse_username(username) - elif len(preset_username) > 0: - username = parse_username(preset_username) - command_string = "curl -kX GET https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v2/users/@" + username + ":" + hardcoded_variables.base_url + "?access_token=" + hardcoded_variables.access_token - print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - print(output) - return output - -# Example: -# $ curl -kX GET https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN - -def create_account(preset_username,preset_password): +def create_account(preset_username, preset_password): if len(preset_username) == 0 and len(preset_password) == 0: username = input("\nPlease enter the username to create: ") username = parse_username(username) @@ -179,12 +72,30 @@ def create_account(preset_username,preset_password): user_password = preset_password else: print("\nError with user/pass file data, skipping...\n") - command_string = "curl -kX PUT -H 'Content-Type: application/json' -d '{\"password\": \"" + user_password + "\"}' https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v2/users/@" + username + ":" + hardcoded_variables.base_url + "?access_token=" + hardcoded_variables.access_token - print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - print(output) - return output + return + + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}" + url += f"?access_token={hardcoded_variables.access_token}" + + headers = { + "Content-Type": "application/json" + } + + data = { + "password": user_password + } + + print("\n" + url + "\n") + response = requests.put(url, headers=headers, data=json.dumps(data), verify=True) + + if response.status_code == 201: + print("Successfully created account.") + elif response.status_code == 200: + print("Account already exists.") + else: + print(f"Error creating account: {response.status_code}, {response.text}") + + return response.text # Example: # $ curl -kX PUT -H 'Content-Type: application/json' -d '{"password": "user_password","admin": false,"deactivated": false}' https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN @@ -208,34 +119,199 @@ def create_multiple_accounts(): if create_confirmation == "n" or create_confirmation == "N" or create_confirmation == "no" or create_confirmation == "No": print("\nExiting...\n") -def deactivate_multiple_accounts(): - print("Deactivate multiple user accounts selected") - user_list_location = input("\nPlease enter the path of the file containing a csv list of names: ") +def reset_password(preset_username, preset_password): + if len(preset_username) == 0 and len(preset_password) == 0: + username = input("\nPlease enter the username for the password reset: ") + password = input("Please enter the password to set: ") + else: + username = parse_username(preset_username) + password = preset_password + + username = parse_username(username) + + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/reset_password/@" + username + f":{hardcoded_variables.base_url}?access_token={hardcoded_variables.access_token}" + + headers = {'Content-Type': 'application/json'} + data = {'new_password': password, 'logout_devices': True} + + print("\n" + url + "\n") + + response = requests.post(url, headers=headers, data=json.dumps(data), verify=True) + + if response.status_code == 200: + print(response.text) + else: + print(f"Error resetting password: {response.status_code}, {response.text}") + + return response.text + +# Example: +# $ curl -X POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo", "logout_devices": true}' https://matrix.perthchat.org/_synapse/admin/v1/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN + +def set_user_server_admin(preset_username): + if len(preset_username) == 0: + print("\nBe aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this.") + username = input("\nPlease enter the username you want to promote to server admin: ") + username = parse_username(username) + elif len(preset_username) > 0: + username = parse_username(preset_username) + + # tried setting 'admin: false' here but it failed and promoted the user instead! + server_admin_result = "true" + + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/admin" + url += f"?access_token={hardcoded_variables.access_token}" + + headers = { + "Content-Type": "application/json" + } + + data = { + "admin": server_admin_result + } + + print("\n" + url + "\n") + response = requests.put(url, headers=headers, data=json.dumps(data), verify=True) + + if response.status_code == 200: + print("Successfully set user as server admin.") + else: + print(f"Error setting user as server admin: {response.status_code}, {response.text}") + + return response.text + +# Example: +# $ curl -kX POST -H 'Content-Type: application/json' -d '{"admin": "true"}' https://matrix.perthchat.org/_synapse/admin/v2/users/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN + +def whois_account(preset_username): + if preset_username == '': + username = input("\nPlease enter the username you wish to whois: ") + elif preset_username != '': + username = preset_username + username = parse_username(username) + + url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/admin/whois/@{username}:{hardcoded_variables.base_url}" + url += f"?access_token={hardcoded_variables.access_token}" + + print("\n" + url + "\n") + response = requests.get(url, verify=True) + + if response.status_code == 200: + print(response.text + "\n") + else: + print(f"Error retrieving account info: {response.status_code}, {response.text}\n") + + return response.text + +# Example: +# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@PC-Admin:perthchat.org?access_token=ACCESS_TOKEN + +def whois_multiple_accounts(): + print("Whois multiple user accounts selected") + user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ") with open(user_list_location, newline='') as f: - reader = csv.reader(f) - data = list(reader) - delete_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to deactivate these users? y/n?\n") - #print(len(data[0])) - #print(data[0][0]) - if delete_confirmation == "y" or delete_confirmation == "Y" or delete_confirmation == "yes" or delete_confirmation == "Yes": + reader = csv.reader(f) + data = list(reader) + print(len(data)) + whois_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to whois all of these users? y/n?\n") + if whois_confirmation == "y" or whois_confirmation == "Y" or whois_confirmation == "yes" or whois_confirmation == "Yes": x = 0 while x <= (len(data) - 1): - #print(data[0][x]) - deactivate_account(data[x][0]) + print(data[x][0]) + query_account(data[x][0]) + list_joined_rooms(data[x][0]) x += 1 #print(x) - time.sleep(10) - if delete_confirmation == "n" or delete_confirmation == "N" or delete_confirmation == "no" or delete_confirmation == "No": + time.sleep(1) + if whois_confirmation == "n" or whois_confirmation == "N" or whois_confirmation == "no" or whois_confirmation == "No": print("\nExiting...\n") +def list_joined_rooms(preset_username): + if preset_username == '': + username = input("\nPlease enter the username you wish to query: ") + elif preset_username != '': + username = preset_username + + username = parse_username(username) + + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@" + username + f":{hardcoded_variables.base_url}/joined_rooms?access_token={hardcoded_variables.access_token}" + + response = requests.get(url, verify=True) + + if response.status_code == 200: + print(response.text + "\n") + else: + print(f"Error querying joined rooms: {response.status_code}, {response.text}") + +# Example: +# $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/users/@PC-Admin:perthchat.org/joined_rooms?access_token=ACCESS_TOKEN + +def list_accounts(): + deactivated_choice = input("Do you want to include deactivated accounts y/n? ") + guest_choice = input("Do you want to include guest accounts y/n? ") + + deactivated_string = "deactivated=false" if deactivated_choice.lower() not in ["y", "yes"] else "deactivated=true" + guest_string = "guest=false" if guest_choice.lower() not in ["y", "yes"] else "guest=true" + + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users?from=0&limit=1000000&{guest_string}&{deactivated_string}&access_token={hardcoded_variables.access_token}" + + print("\n" + url + "\n") + response = requests.get(url, verify=True) + + if response.status_code == 200: + users = response.json() + number_of_users = len(users) + print("\nTotal amount of users: " + str(number_of_users)) + + if number_of_users < 100: + print(users) + else: + accounts_output_file = input("\nThere are too many users to list here, please specify a filename to print this data too: ") + with open(accounts_output_file, "w") as f: + json.dump(users, f, indent=4) + else: + print(f"Error retrieving users list: {response.status_code}, {response.text}") + +# Example: +# $ curl -kXGET "https://matrix.perthchat.org/_synapse/admin/v2/users?from=0&limit=10&guests=false&access_token=ACCESS_TOKEN" + +# NEED A MENU OPTION FOR THIS: + +def query_account(preset_username): + if len(preset_username) == 0: + username = input("\nPlease enter the username to query: ") + username = parse_username(username) + elif len(preset_username) > 0: + username = parse_username(preset_username) + + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}?access_token={hardcoded_variables.access_token}" + + print("\n" + url + "\n") + response = requests.get(url, verify=True) + + if response.status_code == 200: + print(response.text) + else: + print(f"Error querying account: {response.status_code}, {response.text}") + + return response.text + +# Example: +# $ curl -kX GET https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN + def quarantine_users_media(): username = input("\nPlease enter the username of the user who's media you want to quarantine: ") username = parse_username(username) - command_string = "curl -X POST \'https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/user/@" + username + ":" + hardcoded_variables.base_url + "/media/quarantine?access_token=" + hardcoded_variables.access_token + "\'" - print("\n" + command_string + "\n") - process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) - output = process.stdout - print(output) + + url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/user/@{username}:{hardcoded_variables.base_url}/media/quarantine?access_token={hardcoded_variables.access_token}" + + print("\n" + url + "\n") + response = requests.post(url, verify=True) + + if response.status_code == 200: + print(response.text) + else: + print(f"Error quarantining user's media: {response.status_code}, {response.text}") # Example: # $ curl -X POST https://matrix.perthchat.org/_synapse/admin/v1/user/@PC-Admin:perthchat.org/media/quarantine?access_token=ACCESS_TOKEN \ No newline at end of file