import csv
import json
import os
import socket
import time

import requests

import hardcoded_variables


def parse_username(username):
    """Reduce a Matrix user ID to its bare localpart.

    Removes every '@' character and every occurrence of
    ':<hardcoded_variables.base_url>', so '@bob:example.org' and 'bob'
    both come back as 'bob' (given base_url == 'example.org').
    """
    tail_end = ':' + hardcoded_variables.base_url
    username = username.replace('@', '')
    username = username.replace(tail_end, '')
    return username


def deactivate_account(preset_username, erase=False):
    """Deactivate a single account through the Synapse admin API.

    Args:
        preset_username: Username to deactivate. When it is '' the
            operator is prompted for one.
        erase: Value sent as the "erase" field of the request body.

    Returns:
        The raw response body as text.
    """
    if preset_username == '':
        username = input("\nPlease enter the username to deactivate: ")
        username = parse_username(username)
    else:
        username = parse_username(preset_username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/deactivate/@{username}:{hardcoded_variables.base_url}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {hardcoded_variables.access_token}",
    }
    data = {"erase": erase}

    print("\n" + url + "\n")
    response = requests.post(url, headers=headers, data=json.dumps(data), verify=True)

    if response.status_code in [200, 201]:
        print("Successfully deactivated account.")
    else:
        print(f"Error deactivating account: {response.status_code}, {response.text}")

    return response.text

# Example:
# $ curl -X POST -H "Authorization: Bearer ACCESS_TOKEN" "https://matrix.perthchat.org/_synapse/admin/v1/deactivate/@billybob:perthchat.org" --data '{"erase": true}'


def deactivate_multiple_accounts():
    """Deactivate every account listed in the first column of a CSV file.

    Prompts for the file path, for confirmation, and for whether the
    users' data should also be erased. Sleeps 5 seconds after each
    deactivation request.
    """
    print("Deactivate multiple user accounts selected")
    user_list_location = input("\nPlease enter the path of the file containing a csv list of names: ")
    with open(user_list_location, newline='') as f:
        data = list(csv.reader(f))
    print(data)

    delete_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to deactivate these users? y/n?\n")
    erase_confirmation = input("\nDo you want to also erase all these users' data? y/n?\n")

    if erase_confirmation in ["y", "Y", "yes", "Yes", "YES"]:
        erase = True
    elif erase_confirmation in ["n", "N", "no", "No", "NO"]:
        erase = False
    else:
        print("\nIncorrect input detected, please select 'y' or 'n'!\n")
        return

    # The original was missing the `in` operator here, which made this line
    # raise TypeError instead of testing membership.
    if delete_confirmation in ["y", "Y", "yes", "Yes", "YES"]:
        for row in data:
            # csv.reader yields [] for blank lines; an empty username would
            # also make deactivate_account() fall back to an interactive
            # prompt in the middle of a bulk run.
            if not row or not row[0]:
                continue
            deactivate_account(row[0], erase)
            time.sleep(5)
    elif delete_confirmation in ["n", "N", "no", "No", "NO"]:
        print("\nExiting...\n")
    else:
        print("\nIncorrect input detected, please select 'y' or 'n'!\n")


def create_account(preset_username, preset_password):
    """Create an account (or report that it exists) via the Synapse admin API.

    Args:
        preset_username: Username to create; '' together with an empty
            password triggers interactive prompts for both.
        preset_password: Password for the new account.

    Returns:
        The decoded JSON response body, or None when exactly one of the
        two preset values is empty (the entry is skipped).
    """
    if len(preset_username) == 0 and len(preset_password) == 0:
        username = input("\nPlease enter the username to create: ")
        username = parse_username(username)
        user_password = input("Please enter the password for this account: ")
    elif len(preset_username) > 0 and len(preset_password) > 0:
        username = parse_username(preset_username)
        user_password = preset_password
    else:
        print("\nError with user/pass file data, skipping...\n")
        return

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}"
    # Send the token as a Bearer header (as deactivate_account does) rather
    # than in the query string, so it does not end up in URLs and logs.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {hardcoded_variables.access_token}",
    }
    data = {"password": user_password}

    response = requests.put(url, headers=headers, data=json.dumps(data), verify=True)

    if response.status_code == 201:
        print("Successfully created account.")
    elif response.status_code == 200:
        print("Account already exists.")
    else:
        print(f"Error creating account: {response.status_code}, {response.text}")

    create_account_dict = json.loads(response.text)
    return create_account_dict

# Example:
# $ curl -kX PUT -H 'Content-Type: application/json' -d '{"password": "user_password","admin": false,"deactivated": false}' \
#   https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN


def create_multiple_accounts():
    """Create every account listed in a CSV file of username,password rows.

    Prompts for the file path and for confirmation. Sleeps 10 seconds
    after each creation request.
    """
    print("Create multiple user accounts selected")
    # Each row needs two columns: create_account() is given row[0] as the
    # username and row[1] as the password.
    user_list_location = input("\nPlease enter the path of the file containing a csv list of usernames and passwords: ")
    with open(user_list_location, newline='') as f:
        data = list(csv.reader(f))
    print(len(data))

    create_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to create these users? y/n?\n")

    if create_confirmation.lower() in ("y", "yes"):
        for row in data:
            username = row[0] if len(row) > 0 else ''
            password = row[1] if len(row) > 1 else ''
            # Blank rows would otherwise raise IndexError, and passing two
            # empty strings makes create_account() prompt interactively.
            if not username and not password:
                continue
            print(username)
            # A row with only one of the two values is reported and skipped
            # by create_account() itself.
            create_account(username, password)
            time.sleep(10)
    elif create_confirmation.lower() in ("n", "no"):
        print("\nExiting...\n")
    else:
        print("\nIncorrect input detected, please select 'y' or 'n'!\n")


def reset_password(preset_username, preset_password):
    """Reset an account's password and log out its devices.

    Args:
        preset_username: Username to reset; when both arguments are empty
            the operator is prompted for username and password.
        preset_password: New password to set.

    Returns:
        The decoded JSON response body.
    """
    if len(preset_username) == 0 and len(preset_password) == 0:
        username = input("\nPlease enter the username for the password reset: ")
        password = input("Please enter the password to set: ")
    else:
        username = preset_username
        password = preset_password
    username = parse_username(username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/reset_password/@{username}:{hardcoded_variables.base_url}"
    # Bearer header instead of ?access_token=... keeps the token out of URLs.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {hardcoded_variables.access_token}",
    }
    data = {'new_password': password, 'logout_devices': True}

    response = requests.post(url, headers=headers, data=json.dumps(data), verify=True)

    if response.status_code != 200:
        print(f"Error resetting password: {response.status_code}, {response.text}")

    reset_password_dict = json.loads(response.text)
    return reset_password_dict

# Example:
# $ curl -X POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo", "logout_devices": true}' \
#   https://matrix.perthchat.org/_synapse/admin/v1/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN


def set_user_server_admin(preset_username):
    """Promote a user to server admin.

    Args:
        preset_username: Username to promote; when empty the operator is
            prompted for one.

    Returns:
        The decoded JSON response body.
    """
    if len(preset_username) == 0:
        print("\nBe aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this.")
        username = input("\nPlease enter the username you want to promote to server admin: ")
        username = parse_username(username)
    else:
        username = parse_username(preset_username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/admin"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {hardcoded_variables.access_token}",
    }
    # Send a real JSON boolean. The previous code sent the string "true";
    # an earlier attempt with the string "false" was observed to promote
    # the user anyway, which is consistent with any non-empty string being
    # treated as truthy by the server.
    data = {"admin": True}

    response = requests.put(url, headers=headers, data=json.dumps(data), verify=True)

    if response.status_code != 200:
        print(f"Error setting user as server admin: {response.status_code}, {response.text}")

    set_user_server_admin_dict = json.loads(response.text)
    return set_user_server_admin_dict

# Example:
# $ curl -kX PUT -H 'Content-Type: application/json' -d '{"admin": true}' https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/admin?access_token=ACCESS_TOKEN


def whois_account(preset_username):
    """Fetch whois (session/connection) information for one user.

    Args:
        preset_username: Username to look up; '' prompts the operator.

    Returns:
        The decoded JSON response body.
    """
    if preset_username == '':
        username = input("\nPlease enter the username you wish to whois: ")
    else:
        username = preset_username
    username = parse_username(username)

    url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/admin/whois/@{username}:{hardcoded_variables.base_url}"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.get(url, headers=headers, verify=True)

    if response.status_code != 200:
        print(f"Error retrieving account info: {response.status_code}, {response.text}\n")

    whois_account_dict = json.loads(response.text)
    return whois_account_dict

# Example:
# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN


def whois_multiple_accounts():
    """Run whois_account() for every username in the first column of a CSV.

    For more than 10 users the operator may choose an output file, to which
    one JSON document per line is appended; otherwise each result is
    printed. Sleeps 1 second between requests.
    """
    print("Whois multiple user accounts selected")
    user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
    with open(user_list_location, newline='') as f:
        data = list(csv.reader(f))
    print(len(data))
    print("\n" + str(data))

    output_file = None
    if len(data) > 10:
        file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
        if file_confirmation.lower() in ("y", "yes"):
            output_file = input("\nPlease enter the desired output file path:\n")

    whois_confirmation = input("\n\nAre you sure you want to whois all of these users? y/n?\n")

    if whois_confirmation.lower() in ("y", "yes"):
        for row in data:
            # Skip blank rows: they would raise IndexError, and an empty
            # username makes whois_account() prompt interactively.
            if not row or not row[0]:
                continue
            output = whois_account(row[0])
            if output_file:
                # whois_account() returns a dict, so it must be serialised
                # before writing (dict + str raised TypeError before).
                with open(output_file, 'a') as f:
                    f.write(json.dumps(output) + "\n")
            else:
                # Without an output file the result was silently discarded.
                print(json.dumps(output, indent=4))
            time.sleep(1)
    if whois_confirmation.lower() in ("n", "no"):
        print("\nExiting...\n")

    if output_file and os.path.isfile(output_file):
        print(f"Output saved to {output_file}")


def list_joined_rooms(preset_username):
    """List the rooms a user has joined.

    Args:
        preset_username: Username to query; '' prompts the operator.

    Returns:
        The decoded JSON response body.
    """
    if preset_username == '':
        username = input("\nPlease enter the username you wish to query: ")
    else:
        username = preset_username
    username = parse_username(username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/joined_rooms"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.get(url, headers=headers, verify=True)

    if response.status_code != 200:
        print(f"Error querying joined rooms: {response.status_code}, {response.text}")

    joined_rooms_dict = json.loads(response.text)
    return joined_rooms_dict

# Example:
# $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/joined_rooms?access_token=ACCESS_TOKEN


def list_accounts():
    """List accounts on the homeserver, optionally with deactivated/guest users.

    Prints the response when fewer than 100 users are returned; otherwise
    asks for a filename and writes the response there as indented JSON.
    """
    deactivated_choice = input("Do you want to include deactivated accounts y/n? ")
    guest_choice = input("Do you want to include guest accounts y/n? ")

    deactivated_string = "deactivated=true" if deactivated_choice.lower() in ["y", "yes"] else "deactivated=false"
    # The query parameter is spelled "guests" (see the example below); the
    # previous "guest=" spelling did not match it.
    guest_string = "guests=true" if guest_choice.lower() in ["y", "yes"] else "guests=false"

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users?from=0&limit=1000000&{guest_string}&{deactivated_string}"
    # The token travels in a header, so the URL printed below no longer
    # exposes it on the terminal.
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    print("\n" + url + "\n")
    response = requests.get(url, headers=headers, verify=True)

    if response.status_code == 200:
        users = response.json()
        # The body is an envelope object; count the entries of its "users"
        # list rather than the number of top-level keys.
        number_of_users = len(users.get("users", []))
        print("\nTotal amount of users: " + str(number_of_users))
        if number_of_users < 100:
            print(users)
        else:
            accounts_output_file = input("\nThere are too many users to list here, please specify a filename to print this data too: ")
            with open(accounts_output_file, "w") as f:
                json.dump(users, f, indent=4)
    else:
        print(f"Error retrieving users list: {response.status_code}, {response.text}")

# Example:
# $ curl -kXGET "https://matrix.perthchat.org/_synapse/admin/v2/users?from=0&limit=10&guests=false&access_token=ACCESS_TOKEN"


# NEED A MENU OPTION FOR THIS:
def query_account(preset_username):
    """Fetch the admin-API record of one account.

    Args:
        preset_username: Username to query; when empty the operator is
            prompted for one.

    Returns:
        The decoded JSON response body.
    """
    if len(preset_username) == 0:
        username = input("\nPlease enter the username to query: ")
        username = parse_username(username)
    else:
        username = parse_username(preset_username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.get(url, headers=headers, verify=True)

    if response.status_code != 200:
        print(f"Error querying account: {response.status_code}, {response.text}")

    query_account_dict = json.loads(response.text)
    return query_account_dict

# Example:
# $ curl -kX GET https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN


def query_multiple_accounts():
    """Run query_account() for every username in the first column of a CSV.

    For more than 10 users the operator may choose an output file, to which
    one JSON document per line is appended; otherwise each result is
    printed. Sleeps 1 second between requests.
    """
    print("Query multiple user accounts selected")
    user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
    with open(user_list_location, newline='') as f:
        data = list(csv.reader(f))
    print(len(data))
    print("\n" + str(data))

    output_file = None
    if len(data) > 10:
        file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
        if file_confirmation.lower() in ("y", "yes"):
            output_file = input("\nPlease enter the desired output file path:\n")

    query_confirmation = input("\n\nAre you sure you want to query all of these users? y/n?\n")

    if query_confirmation.lower() in ("y", "yes"):
        for row in data:
            # Skip blank rows: they would raise IndexError, and an empty
            # username makes query_account() prompt interactively.
            if not row or not row[0]:
                continue
            output = query_account(row[0])
            if output_file:
                # query_account() returns a dict; serialise it before
                # writing (dict + str raised TypeError before).
                with open(output_file, 'a') as f:
                    f.write(json.dumps(output) + "\n")
            else:
                # Without an output file the result was silently discarded.
                print(json.dumps(output, indent=4))
            time.sleep(1)
    if query_confirmation.lower() in ("n", "no"):
        print("\nExiting...\n")

    if output_file and os.path.isfile(output_file):
        print(f"Output saved to {output_file}")


def quarantine_users_media():
    """Prompt for a username and quarantine all media uploaded by that user."""
    username = input("\nPlease enter the username of the user who's media you want to quarantine: ")
    username = parse_username(username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/user/@{username}:{hardcoded_variables.base_url}/media/quarantine"
    # Token in a header so the URL printed below does not leak it.
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    print("\n" + url + "\n")
    response = requests.post(url, headers=headers, verify=True)

    if response.status_code == 200:
        print(response.text)
    else:
        print(f"Error quarantining user's media: {response.status_code}, {response.text}")

# Example:
# $ curl -X POST https://matrix.perthchat.org/_synapse/admin/v1/user/@dogpoo:perthchat.org/media/quarantine?access_token=ACCESS_TOKEN


def collect_account_data(preset_username):
    """Fetch the account data stored for one user.

    Args:
        preset_username: Username to query; when empty the operator is
            prompted for one.

    Returns:
        The decoded JSON response body.
    """
    if len(preset_username) == 0:
        username = input("\nPlease enter the username to collect account data: ")
        username = parse_username(username)
    else:
        username = parse_username(preset_username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/accountdata"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.get(url, headers=headers, verify=True)

    if response.status_code != 200:
        print(f"Error querying account: {response.status_code}, {response.text}")

    account_data_dict = json.loads(response.text)
    return account_data_dict

# Example:
# $ curl -X GET https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/accountdata?access_token=ACCESS_TOKEN


def list_account_pushers(preset_username):
    """Fetch the pushers registered for one user.

    Args:
        preset_username: Username to query; when empty the operator is
            prompted for one.

    Returns:
        The decoded JSON response body.
    """
    if len(preset_username) == 0:
        username = input("\nPlease enter the username to list all pushers: ")
        username = parse_username(username)
    else:
        username = parse_username(preset_username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/pushers"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.get(url, headers=headers, verify=True)

    if response.status_code != 200:
        print(f"Error querying account: {response.status_code}, {response.text}")

    pusher_data_dict = json.loads(response.text)
    return pusher_data_dict

# Example:
# $ curl -X GET https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/pushers


def get_rate_limit():
    """Prompt for a username and show its rate-limit override.

    Returns:
        The raw response body as text.
    """
    username = input("\nPlease enter the username to get its ratelimiting: ")
    username = parse_username(username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/override_ratelimit"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.get(url, headers=headers, verify=True)

    if response.status_code == 200:
        print(response.text)
    else:
        print(f"Error querying account: {response.status_code}, {response.text}")

    return response.text

# Example:
# $ curl -X GET \
#   https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/override_ratelimit?access_token=ACCESS_TOKEN


def set_rate_limit():
    """Prompt for a username and set its rate-limit override.

    Returns:
        The raw response body as text, or None when the entered limits
        are not whole numbers (no request is sent in that case).
    """
    username = input("\nPlease enter the username to adjust its ratelimiting: ")
    username = parse_username(username)

    messages_per_second = input("\nPlease enter the desired messages per second: ")
    burst_count = input("\nPlease enter the desired burst count: ")

    # Validate before building the request so a typo produces a message
    # instead of an unhandled ValueError traceback.
    try:
        data = {
            "messages_per_second": int(messages_per_second),
            "burst_count": int(burst_count),
        }
    except ValueError:
        print("\nIncorrect input detected, messages per second and burst count must be whole numbers!\n")
        return None

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/override_ratelimit"
    # Bearer header instead of ?access_token=... keeps the token out of URLs.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {hardcoded_variables.access_token}",
    }

    response = requests.post(url, headers=headers, data=json.dumps(data), verify=True)

    if response.status_code == 200:
        print(response.text)
    else:
        print(f"Error setting rate limit: {response.status_code}, {response.text}")

    return response.text

# Example:
# $ curl -X POST -H "Content-Type: application/json" -d '{"messages_per_second": 0,"burst_count": 0}' https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/override_ratelimit?access_token=ACCESS_TOKEN


def delete_rate_limit():
    """Prompt for a username and delete its rate-limit override.

    Returns:
        The raw response body as text.
    """
    username = input("\nPlease enter the username to delete its ratelimiting: ")
    username = parse_username(username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/override_ratelimit"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.delete(url, headers=headers, verify=True)

    if response.status_code == 200:
        print(response.text)
    else:
        print(f"Error deleting rate limit: {response.status_code}, {response.text}")

    return response.text

# Example:
# $ curl -X DELETE https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/override_ratelimit?access_token=ACCESS_TOKEN


def check_user_account_exists(preset_username):
    """Report whether a username is already taken on the homeserver.

    Args:
        preset_username: Username to check; when empty the operator is
            prompted for one.

    Returns:
        False when the server answers 200 (username available), True when
        it answers 400 (username in use), None for any other status.
    """
    if len(preset_username) == 0:
        username = input("\nPlease enter the username to check if it exists: ")
        username = parse_username(username)
    else:
        username = parse_username(preset_username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/username_available"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    # Passing the username via `params` lets requests URL-encode it.
    response = requests.get(url, headers=headers, params={"username": username}, verify=True)

    if response.status_code == 200:
        return False
    if response.status_code == 400:
        return True

    print(f"Error querying account: {response.status_code}, {response.text}")
    return None

# Example:
# $ curl -X GET /_synapse/admin/v1/username_available?username=dogpoo&access_token=ACCESS_TOKEN


def shadow_ban_account(preset_username):
    """Shadow ban one user.

    Args:
        preset_username: Username to shadow ban; '' prompts the operator.

    Returns:
        The decoded JSON response body.
    """
    if preset_username == '':
        username = input("\nPlease enter the username you wish to shadow ban: ")
    else:
        username = preset_username
    username = parse_username(username)

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/shadow_ban"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.post(url, headers=headers, verify=True)

    if response.status_code != 200:
        print(f"Error shadow banning account: {response.status_code}, {response.text}\n")

    return json.loads(response.text)

# Example:
# curl -XPOST -H "Content-Type: application/json" 'https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/shadow_ban?access_token=ACCESS_TOKEN'


def find_account_with_threepid(medium="", address=""):
    """Look up the account bound to a third-party identifier.

    Args:
        medium: 'email' or 'msisdn'; prompted for when empty.
        address: The email address or mobile number; prompted for when empty.

    Returns:
        The decoded JSON body on 200, an M_NOT_FOUND error dict on 404,
        or None for any other status.
    """
    # Prompt for whichever values were not supplied by the caller.
    if medium == "":
        print("\nPlease enter the medium (either 'email' or 'msisdn' for mobile number): ")
        medium = input()
    if address == "":
        print("\nPlease enter the address (the email or mobile number): ")
        address = input()

    url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/threepid/{medium}/users/{address}"
    headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}

    response = requests.get(url, headers=headers, verify=True)

    if response.status_code == 200:
        # User exists
        return response.json()
    if response.status_code == 404:
        # User not found
        return {"errcode": "M_NOT_FOUND", "error": "User not found"}

    print(f"Error querying account: {response.status_code}, {response.text}")
    return None

# Example:
# $ curl -X GET 'https://matrix.perthchat.org/_synapse/admin/v1/threepid/email/users/dogpoo@protonmail.com?access_token=ACCESS_TOKEN'