mirror of
				https://github.com/PC-Admin/matrix-moderation-tool.git
				synced 2025-10-31 10:20:29 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			448 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			448 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
 | |
| import os
 | |
| import requests
 | |
| import json
 | |
| import time
 | |
| import csv
 | |
| import hardcoded_variables
 | |
| import socket
 | |
| 
 | |
| def parse_username(username):
 | |
| 	tail_end = ':' + hardcoded_variables.base_url
 | |
| 	username = username.replace('@','')
 | |
| 	username = username.replace(tail_end,'')
 | |
| 	return username
 | |
| 
 | |
| def deactivate_account(preset_username):
 | |
| 	if len(preset_username) == 0:
 | |
| 		username = input("\nPlease enter the username to deactivate: ")
 | |
| 		username = parse_username(username)
 | |
| 	else:
 | |
| 		username = parse_username(preset_username)
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/deactivate/@{username}:{hardcoded_variables.base_url}"
 | |
| 
 | |
| 	headers = {
 | |
| 		"Content-Type": "application/json",
 | |
| 		"Authorization": f"Bearer {hardcoded_variables.access_token}"
 | |
| 	}
 | |
| 
 | |
| 	data = {
 | |
| 		"erase": True
 | |
| 	}
 | |
| 
 | |
| 	print("\n" + url + "\n")
 | |
| 	response = requests.post(url, headers=headers, data=json.dumps(data), verify=True)
 | |
| 
 | |
| 	if response.status_code in [200,201]:
 | |
| 		print("Successfully deactivated account.")
 | |
| 	else:
 | |
| 		print(f"Error deactivating account: {response.status_code}, {response.text}")
 | |
| 
 | |
| 	return response.text
 | |
| 
 | |
| # Example:
 | |
| # $ curl -X POST -H "Authorization: Bearer ACCESS_TOKEN" "https://matrix.perthchat.org/_synapse/admin/v1/deactivate/@billybob:perthchat.org" --data '{"erase": true}'
 | |
| 
 | |
| def deactivate_multiple_accounts():
 | |
| 	print("Deactivate multiple user accounts selected")
 | |
| 	user_list_location = input("\nPlease enter the path of the file containing a csv list of names: ")
 | |
| 	with open(user_list_location, newline='') as f:
 | |
| 			reader = csv.reader(f)
 | |
| 			data = list(reader)
 | |
| 	delete_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to deactivate these users? y/n?\n")
 | |
| 	#print(len(data[0]))
 | |
| 	#print(data[0][0])
 | |
| 	if delete_confirmation == "y" or delete_confirmation == "Y" or  delete_confirmation == "yes" or  delete_confirmation == "Yes":  
 | |
| 		x = 0
 | |
| 		while x <= (len(data) - 1):
 | |
| 			#print(data[0][x])
 | |
| 			deactivate_account(data[x][0])
 | |
| 			x += 1
 | |
| 			#print(x)
 | |
| 			time.sleep(10)
 | |
| 	if delete_confirmation == "n" or delete_confirmation == "N" or  delete_confirmation == "no" or  delete_confirmation == "No":
 | |
| 		print("\nExiting...\n")
 | |
| 
 | |
| def create_account(preset_username, preset_password):
 | |
| 	if len(preset_username) == 0 and len(preset_password) == 0:
 | |
| 		username = input("\nPlease enter the username to create: ")
 | |
| 		username = parse_username(username)
 | |
| 		user_password = input("Please enter the password for this account: ")
 | |
| 	elif len(preset_username) > 0 and len(preset_password) > 0:
 | |
| 		username = parse_username(preset_username)
 | |
| 		user_password = preset_password
 | |
| 	else:
 | |
| 		print("\nError with user/pass file data, skipping...\n")
 | |
| 		return
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}"
 | |
| 	url += f"?access_token={hardcoded_variables.access_token}"
 | |
| 
 | |
| 	headers = {
 | |
| 		"Content-Type": "application/json"
 | |
| 	}
 | |
| 
 | |
| 	data = {
 | |
| 		"password": user_password
 | |
| 	}
 | |
| 
 | |
| 	print("\n" + url + "\n")
 | |
| 	response = requests.put(url, headers=headers, data=json.dumps(data), verify=True)
 | |
| 
 | |
| 	if response.status_code == 201:
 | |
| 		print("Successfully created account.")
 | |
| 	elif response.status_code == 200:
 | |
| 		print("Account already exists.")
 | |
| 	else:
 | |
| 		print(f"Error creating account: {response.status_code}, {response.text}")
 | |
| 
 | |
| 	return response.text
 | |
| 
 | |
| # Example:
 | |
| # $ curl -kX PUT -H 'Content-Type: application/json' -d '{"password": "user_password","admin": false,"deactivated": false}' https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
 | |
| 
 | |
| def create_multiple_accounts():
 | |
| 	print("Create multiple user accounts selected")
 | |
| 	user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
 | |
| 	with open(user_list_location, newline='') as f:
 | |
| 		reader = csv.reader(f)
 | |
| 		data = list(reader)
 | |
| 		print(len(data))
 | |
| 	create_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to create these users? y/n?\n")
 | |
| 	if create_confirmation == "y" or create_confirmation == "Y" or  create_confirmation == "yes" or  create_confirmation == "Yes":  
 | |
| 		x = 0
 | |
| 		while x <= (len(data) - 1):
 | |
| 			print(data[x][0])
 | |
| 			create_account(data[x][0],data[x][1])
 | |
| 			x += 1
 | |
| 			#print(x)
 | |
| 			time.sleep(10)
 | |
| 	if create_confirmation == "n" or create_confirmation == "N" or  create_confirmation == "no" or  create_confirmation == "No":
 | |
| 		print("\nExiting...\n")
 | |
| 
 | |
| def reset_password(preset_username, preset_password):
 | |
| 	if len(preset_username) == 0 and len(preset_password) == 0:
 | |
| 		username = input("\nPlease enter the username for the password reset: ")
 | |
| 		password = input("Please enter the password to set: ")
 | |
| 	else:
 | |
| 		username = parse_username(preset_username)
 | |
| 		password = preset_password
 | |
| 
 | |
| 	username = parse_username(username)
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/reset_password/@" + username + f":{hardcoded_variables.base_url}?access_token={hardcoded_variables.access_token}"
 | |
| 
 | |
| 	headers = {'Content-Type': 'application/json'}
 | |
| 	data = {'new_password': password, 'logout_devices': True}
 | |
| 
 | |
| 	print("\n" + url + "\n")
 | |
| 
 | |
| 	response = requests.post(url, headers=headers, data=json.dumps(data), verify=True)
 | |
| 
 | |
| 	if response.status_code == 200:
 | |
| 		print(response.text)
 | |
| 	else:
 | |
| 		print(f"Error resetting password: {response.status_code}, {response.text}")
 | |
| 
 | |
| 	return response.text
 | |
| 
 | |
| # Example:
 | |
| # $ curl -X POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo", "logout_devices": true}' https://matrix.perthchat.org/_synapse/admin/v1/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
 | |
| 
 | |
| def set_user_server_admin(preset_username):
 | |
| 	if len(preset_username) == 0:
 | |
| 		print("\nBe aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this.")
 | |
| 		username = input("\nPlease enter the username you want to promote to server admin: ")
 | |
| 		username = parse_username(username)
 | |
| 	elif len(preset_username) > 0:
 | |
| 		username = parse_username(preset_username)
 | |
| 
 | |
| 	# tried setting 'admin: false' here but it failed and promoted the user instead!
 | |
| 	server_admin_result = "true"
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/admin"
 | |
| 	url += f"?access_token={hardcoded_variables.access_token}"
 | |
| 
 | |
| 	headers = {
 | |
| 		"Content-Type": "application/json"
 | |
| 	}
 | |
| 
 | |
| 	data = {
 | |
| 		"admin": server_admin_result
 | |
| 	}
 | |
| 
 | |
| 	print("\n" + url + "\n")
 | |
| 	response = requests.put(url, headers=headers, data=json.dumps(data), verify=True)
 | |
| 
 | |
| 	if response.status_code == 200:
 | |
| 		print("Successfully set user as server admin.")
 | |
| 	else:
 | |
| 		print(f"Error setting user as server admin: {response.status_code}, {response.text}")
 | |
| 
 | |
| 	return response.text
 | |
| 
 | |
| # Example:
 | |
| # $ curl -kX POST -H 'Content-Type: application/json' -d '{"admin": "true"}' https://matrix.perthchat.org/_synapse/admin/v2/users/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
 | |
| 
 | |
| def whois_account(preset_username, output_file=None):
 | |
| 	if preset_username == '':
 | |
| 		username = input("\nPlease enter the username you wish to whois: ")
 | |
| 	elif preset_username != '':
 | |
| 		username = preset_username
 | |
| 	username = parse_username(username)
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/admin/whois/@{username}:{hardcoded_variables.base_url}"
 | |
| 	url += f"?access_token={hardcoded_variables.access_token}"
 | |
| 
 | |
| 	print("\n" + url + "\n")
 | |
| 	response = requests.get(url, verify=True)
 | |
| 
 | |
| 	output_text = ""
 | |
| 	if response.status_code == 200:
 | |
| 		output_text = response.text + "\n"
 | |
| 	else:
 | |
| 		output_text = f"Error retrieving account info: {response.status_code}, {response.text}\n"
 | |
| 
 | |
| 	if output_file:
 | |
| 		with open(output_file, 'a') as f:
 | |
| 			f.write(output_text)
 | |
| 	else:
 | |
| 		print(output_text)
 | |
| 
 | |
| 	return response.text
 | |
| 
 | |
| # Example:
 | |
| # $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@PC-Admin:perthchat.org?access_token=ACCESS_TOKEN
 | |
| 
 | |
| def whois_multiple_accounts():
 | |
| 	print("Whois multiple user accounts selected")
 | |
| 	user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
 | |
| 	with open(user_list_location, newline='') as f:
 | |
| 		reader = csv.reader(f)
 | |
| 		data = list(reader)
 | |
| 		print(len(data))
 | |
| 
 | |
| 	print("\n" + str(data))
 | |
| 
 | |
| 	output_file = None
 | |
| 	if len(data) > 10:
 | |
| 		file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
 | |
| 		if file_confirmation.lower() in ("y", "yes"):
 | |
| 			output_file = input("\nPlease enter the desired output file path:\n")
 | |
| 
 | |
| 	whois_confirmation = input("\n\nAre you sure you want to whois all of these users? y/n?\n")
 | |
| 
 | |
| 	if whois_confirmation.lower() in ("y", "yes"):  
 | |
| 		x = 0
 | |
| 		while x <= (len(data) - 1):
 | |
| 			output = whois_account(data[x][0])
 | |
| 
 | |
| 			# if output file is specified, append to file
 | |
| 			if output_file:
 | |
| 				with open(output_file, 'a') as f:
 | |
| 					f.write(output + "\n")
 | |
| 			x += 1
 | |
| 			time.sleep(1)
 | |
| 
 | |
| 	if whois_confirmation.lower() in ("n", "no"):
 | |
| 		print("\nExiting...\n")
 | |
| 
 | |
| 	if output_file and os.path.isfile(output_file):
 | |
| 		print(f"Output saved to {output_file}")
 | |
| 
 | |
| def is_valid_ipv4(ip):
 | |
| 	try:
 | |
| 		socket.inet_pton(socket.AF_INET, ip)
 | |
| 	except socket.error:  # not a valid address
 | |
| 		return False
 | |
| 	return True
 | |
| 
 | |
| def analyse_account_ip(preset_username):
 | |
| 	if not preset_username:
 | |
| 		preset_username = input("\nPlease enter a username to analyse their country of origin: ")
 | |
| 	user_info = whois_account(preset_username=preset_username)
 | |
| 
 | |
| 	data = json.loads(user_info)
 | |
| 	
 | |
| 	user_id = data['user_id']
 | |
| 	#print(f'user_id: {user_id}')
 | |
| 	device_data = data['devices']
 | |
| 	#print(f'device_data: {device_data}')
 | |
| 
 | |
| 	countries = []
 | |
| 	for device_id, device_info in device_data.items():
 | |
| 		for session in device_info['sessions']:
 | |
| 			for connection in session['connections']:
 | |
| 				ip = connection['ip']
 | |
| 				if is_valid_ipv4(ip):
 | |
| 					res = requests.get(f"https://ipinfo.io/{ip}", 
 | |
| 								   headers={"Authorization": f"Bearer {hardcoded_variables.ipinfo_token}"})
 | |
| 					if res.status_code == 200:
 | |
| 						country = res.json().get('country')
 | |
| 						countries.append(country)
 | |
| 
 | |
| 	print(f"User: {user_id} from Countries: {countries}")
 | |
| 	return(f"User: {user_id} from Countries: {countries}")
 | |
| 
 | |
| def analyse_multiple_account_ips():
 | |
| 	print("Analyse multiple user IPs selected")
 | |
| 	user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
 | |
| 	with open(user_list_location, newline='') as f:
 | |
| 		reader = csv.reader(f)
 | |
| 		data = list(reader)
 | |
| 		print(len(data))
 | |
| 
 | |
| 	print("\n" + str(data))
 | |
| 
 | |
| 	output_file = None
 | |
| 	if len(data) > 10:
 | |
| 		file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
 | |
| 		if file_confirmation.lower() in ("y", "yes"):
 | |
| 			output_file = input("\nPlease enter the desired output file path:\n")
 | |
| 
 | |
| 	analyse_confirmation = input("\n\nAre you sure you want to analyse the IP of all of these users? y/n?\n")
 | |
| 
 | |
| 	if analyse_confirmation.lower() in ("y", "yes"):  
 | |
| 		x = 0
 | |
| 		while x <= (len(data) - 1):
 | |
| 			output = analyse_account_ip(data[x][0])
 | |
| 
 | |
| 			# if output file is specified, append to file
 | |
| 			if output_file:
 | |
| 				with open(output_file, 'a') as f:
 | |
| 					f.write(output + "\n")
 | |
| 			x += 1
 | |
| 			time.sleep(1)
 | |
| 
 | |
| 	if analyse_confirmation.lower() in ("n", "no"):
 | |
| 		print("\nExiting...\n")
 | |
| 
 | |
| 	if output_file and os.path.isfile(output_file):
 | |
| 		print(f"Output saved to {output_file}")
 | |
| 
 | |
| def list_joined_rooms(preset_username):
 | |
| 	if preset_username == '':
 | |
| 		username = input("\nPlease enter the username you wish to query: ")
 | |
| 	elif preset_username != '':
 | |
| 		username = preset_username
 | |
| 
 | |
| 	username = parse_username(username)
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@" + username + f":{hardcoded_variables.base_url}/joined_rooms?access_token={hardcoded_variables.access_token}"
 | |
| 
 | |
| 	response = requests.get(url, verify=True)
 | |
| 
 | |
| 	if response.status_code == 200:
 | |
| 		print(response.text + "\n")
 | |
| 	else:
 | |
| 		print(f"Error querying joined rooms: {response.status_code}, {response.text}")
 | |
| 
 | |
| # Example:
 | |
| # $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/users/@PC-Admin:perthchat.org/joined_rooms?access_token=ACCESS_TOKEN
 | |
| 
 | |
| def list_accounts():
 | |
| 	deactivated_choice = input("Do you want to include deactivated accounts y/n? ")
 | |
| 	guest_choice = input("Do you want to include guest accounts y/n? ")
 | |
| 
 | |
| 	deactivated_string = "deactivated=false" if deactivated_choice.lower() not in ["y", "yes"] else "deactivated=true"
 | |
| 	guest_string = "guest=false" if guest_choice.lower() not in ["y", "yes"] else "guest=true"
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users?from=0&limit=1000000&{guest_string}&{deactivated_string}&access_token={hardcoded_variables.access_token}"
 | |
| 
 | |
| 	print("\n" + url + "\n")
 | |
| 	response = requests.get(url, verify=True)
 | |
| 
 | |
| 	if response.status_code == 200:
 | |
| 		users = response.json()
 | |
| 		number_of_users = len(users)
 | |
| 		print("\nTotal amount of users: " + str(number_of_users))
 | |
| 
 | |
| 		if number_of_users < 100:    
 | |
| 			print(users)
 | |
| 		else:
 | |
| 			accounts_output_file = input("\nThere are too many users to list here, please specify a filename to print this data too: ")
 | |
| 			with open(accounts_output_file, "w") as f:
 | |
| 				json.dump(users, f, indent=4)
 | |
| 	else:
 | |
| 		print(f"Error retrieving users list: {response.status_code}, {response.text}")
 | |
| 
 | |
| # Example:
 | |
| # $ curl -kXGET "https://matrix.perthchat.org/_synapse/admin/v2/users?from=0&limit=10&guests=false&access_token=ACCESS_TOKEN"
 | |
| 
 | |
| # NEED A MENU OPTION FOR THIS:
 | |
| 
 | |
| def query_account(preset_username):
 | |
| 	if len(preset_username) == 0:
 | |
| 		username = input("\nPlease enter the username to query: ")
 | |
| 		username = parse_username(username)
 | |
| 	elif len(preset_username) > 0:
 | |
| 		username = parse_username(preset_username)
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}?access_token={hardcoded_variables.access_token}"
 | |
| 
 | |
| 	print("\n" + url + "\n")
 | |
| 	response = requests.get(url, verify=True)
 | |
| 
 | |
| 	if response.status_code == 200:
 | |
| 		print(response.text)
 | |
| 	else:
 | |
| 		print(f"Error querying account: {response.status_code}, {response.text}")
 | |
| 
 | |
| 	return response.text
 | |
| 
 | |
| # Example:
 | |
| # $ curl -kX GET https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
 | |
| 
 | |
| def query_multiple_accounts():
 | |
| 	print("Query multiple user accounts selected")
 | |
| 	user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
 | |
| 	with open(user_list_location, newline='') as f:
 | |
| 		reader = csv.reader(f)
 | |
| 		data = list(reader)
 | |
| 		print(len(data))
 | |
| 
 | |
| 	print("\n" + str(data))
 | |
| 
 | |
| 	output_file = None
 | |
| 	if len(data) > 10:
 | |
| 		file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
 | |
| 		if file_confirmation.lower() in ("y", "yes"):
 | |
| 			output_file = input("\nPlease enter the desired output file path:\n")
 | |
| 
 | |
| 	query_confirmation = input("\n\nAre you sure you want to query all of these users? y/n?\n")
 | |
| 
 | |
| 	if query_confirmation.lower() in ("y", "yes"):  
 | |
| 		x = 0
 | |
| 		while x <= (len(data) - 1):
 | |
| 			output = query_account(data[x][0])
 | |
| 
 | |
| 			# if output file is specified, append to file
 | |
| 			if output_file:
 | |
| 				with open(output_file, 'a') as f:
 | |
| 					f.write(output + "\n")
 | |
| 			x += 1
 | |
| 			time.sleep(1)
 | |
| 
 | |
| 	if query_confirmation.lower() in ("n", "no"):
 | |
| 		print("\nExiting...\n")
 | |
| 
 | |
| 	if output_file and os.path.isfile(output_file):
 | |
| 		print(f"Output saved to {output_file}")
 | |
| 
 | |
| def quarantine_users_media():
 | |
| 	username = input("\nPlease enter the username of the user who's media you want to quarantine: ")
 | |
| 	username = parse_username(username)
 | |
| 
 | |
| 	url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/user/@{username}:{hardcoded_variables.base_url}/media/quarantine?access_token={hardcoded_variables.access_token}"
 | |
| 	
 | |
| 	print("\n" + url + "\n")
 | |
| 	response = requests.post(url, verify=True)
 | |
| 
 | |
| 	if response.status_code == 200:
 | |
| 		print(response.text)
 | |
| 	else:
 | |
| 		print(f"Error quarantining user's media: {response.status_code}, {response.text}")
 | |
| 
 | |
| # Example:
 | |
| # $ curl -X POST https://matrix.perthchat.org/_synapse/admin/v1/user/@PC-Admin:perthchat.org/media/quarantine?access_token=ACCESS_TOKEN |