change user commands function list to use requests module instead of curl (safer)

This commit is contained in:
PC-Admin 2023-07-10 04:48:14 +08:00
parent f1a2e58dd7
commit 96c1d5b94e
2 changed files with 261 additions and 185 deletions

View File

@ -32,27 +32,27 @@ if length_access_token == 0:
pass_token = False
while pass_token == False:
menu_input = input('\nPlease select one of the following options:\n#### User Account Commands ####\n1) Deactivate a user account.\n2) Create a user account.\n3) Query user account.\n4) List room memberships of user.\n5) Query multiple user accounts.\n6) Reset a users password.\n7) Promote a user to server admin.\n8) List all user accounts.\n9) Create multiple user accounts.\n10) Deactivate multiple user accounts.\n11) Quarantine all media a users uploaded.\n#### Room Commands ####\n12) List details of a room.\n13) Export the state events of a target room.\n14) List rooms in public directory.\n15) Remove a room from the public directory.\n16) Remove multiple rooms from the public directory.\n17) Redact a room event. (Like abusive avatars or display names.) \n18) List/Download all media in a room.\n19) Download media from multiple rooms.\n20) Quarantine all media in a room.\n21) Shutdown a room.\n22) Shutdown multiple rooms.\n23) Delete a room.\n24) Delete multiple rooms.\n25) Purge the event history of a room to a specific timestamp.\n26) Purge the event history of multiple rooms to a specific timestamp.\n#### Server Commands ####\n27) Delete and block a specific media. (Like an abusive avatar.) \n28) Purge remote media repository up to a certain date.\n29) Prepare database for copying events of multiple rooms.\n#### rdlist ####\n30) Block all rooms with specific rdlist tags.\n34) Block all rooms with recommended rdlist tags.\n(\'q\' or \'e\') Exit.\n\n')
menu_input = input('\nPlease select one of the following options:\n#### User Account Commands ####\n1) Deactivate a user account.\n2) Deactivate multiple user accounts.\n3) Create a user account.\n4) Create multiple user accounts.\n5) Reset a users password.\n6) Whois user account.\n7) Whois multiple user accounts.\n8) List room memberships of user.\n9) Promote a user to server admin.\n10) List all user accounts.\n11) Quarantine all media a users uploaded.\n#### Room Commands ####\n12) List details of a room.\n13) Export the state events of a target room.\n14) List rooms in public directory.\n15) Remove a room from the public directory.\n16) Remove multiple rooms from the public directory.\n17) Redact a room event. (Like abusive avatars or display names.) \n18) List/Download all media in a room.\n19) Download media from multiple rooms.\n20) Quarantine all media in a room.\n21) Shutdown a room.\n22) Shutdown multiple rooms.\n23) Delete a room.\n24) Delete multiple rooms.\n25) Purge the event history of a room to a specific timestamp.\n26) Purge the event history of multiple rooms to a specific timestamp.\n#### Server Commands ####\n27) Delete and block a specific media. (Like an abusive avatar.) \n28) Purge remote media repository up to a certain date.\n29) Prepare database for copying events of multiple rooms.\n#### rdlist ####\n30) Block all rooms with specific rdlist tags.\n34) Block all rooms with recommended rdlist tags.\n(\'q\' or \'e\') Exit.\n\n')
if menu_input == "1":
user_commands.deactivate_account('')
elif menu_input == "2":
user_commands.create_account('','')
elif menu_input == "3":
user_commands.whois_account('')
elif menu_input == "4":
user_commands.list_joined_rooms('')
elif menu_input == "5":
user_commands.whois_multiple_accounts()
elif menu_input == "6":
user_commands.reset_password()
elif menu_input == "7":
user_commands.set_user_server_admin()
elif menu_input == "8":
user_commands.list_accounts()
elif menu_input == "9":
user_commands.create_multiple_accounts()
elif menu_input == "10":
user_commands.deactivate_multiple_accounts()
elif menu_input == "3":
user_commands.create_account('','')
elif menu_input == "4":
user_commands.create_multiple_accounts()
elif menu_input == "6":
user_commands.reset_password('','')
elif menu_input == "6":
user_commands.whois_account('')
elif menu_input == "7":
user_commands.whois_multiple_accounts()
elif menu_input == "8":
user_commands.list_joined_rooms('')
elif menu_input == "9":
user_commands.set_user_server_admin('')
elif menu_input == "10":
user_commands.list_accounts()
elif menu_input == "11":
user_commands.quarantine_users_media()
elif menu_input == "12":

View File

@ -1,5 +1,6 @@
import subprocess
import requests
import json
import time
import csv
import hardcoded_variables
@ -16,160 +17,52 @@ def deactivate_account(preset_username):
username = parse_username(username)
else:
username = parse_username(preset_username)
command_string = "curl -X POST -H \"Authorization: Bearer " + hardcoded_variables.access_token + "\" 'https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/deactivate/@" + username + ":" + hardcoded_variables.base_url + "' --data '{\"erase\": true}'"
print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output)
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/deactivate/@{username}:{hardcoded_variables.base_url}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {hardcoded_variables.access_token}"
}
data = {
"erase": True
}
print("\n" + url + "\n")
response = requests.post(url, headers=headers, data=json.dumps(data), verify=True)
if response.status_code in [200,201]:
print("Successfully deactivated account.")
else:
print(f"Error deactivating account: {response.status_code}, {response.text}")
return response.text
# Example:
# $ curl -X POST -H "Authorization: Bearer ACCESS_TOKEN" "https://matrix.perthchat.org/_synapse/admin/v1/deactivate/@billybob:perthchat.org" --data '{"erase": true}'
def reset_password(preset_username,preset_password):
if len(preset_username) == 0 and len(preset_password) == 0:
username = input("\nPlease enter the username for the password reset: ")
password = input("Please enter the password to set: ")
else:
username = parse_username(preset_username)
password = preset_password
username = parse_username(username)
command_string = "curl -X POST -H 'Content-Type: application/json' -d '{\"new_password\": \"" + password + "\", \"logout_devices\": true}' https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/reset_password/@" + username + ":" + hardcoded_variables.base_url + "?access_token=" + hardcoded_variables.access_token
print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output)
return output
# Example:
# $ curl -X POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo", "logout_devices": true}' https://matrix.perthchat.org/_synapse/admin/v1/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
def set_user_server_admin(preset_username):
if len(preset_username) == 0:
# tried setting 'admin: false' here but it failed and promoted the user instead!
print("\nBe aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this.")
username = input("\nPlease enter the username you want to promote to server admin: ")
username = parse_username(username)
elif len(preset_username) > 0:
username = parse_username(preset_username)
#passthrough = 0
server_admin_result = "true"
command_string = "curl -X PUT -H 'Content-Type: application/json' -d '{\"admin\": \"" + server_admin_result + "\"}' https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + hardcoded_variables.base_url + "/admin?access_token=" + hardcoded_variables.access_token
print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output)
return output
# Example:
# $ curl -kX POST -H 'Content-Type: application/json' -d '{"admin": "true"}' https://matrix.perthchat.org/_synapse/admin/v2/users/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
def whois_account(preset_username):
if preset_username == '':
username = input("\nPlease enter the username you wish to whois: ")
elif preset_username != '':
username = preset_username
username = parse_username(username)
command_string = "curl -kXGET https://" + hardcoded_variables.homeserver_url + "/_matrix/client/r0/admin/whois/@" + username + ":" + hardcoded_variables.base_url + "?access_token=" + hardcoded_variables.access_token
#print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output + "\n")
return(output)
# Example:
# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@PC-Admin:perthchat.org?access_token=ACCESS_TOKEN
def list_joined_rooms(preset_username):
if preset_username == '':
username = input("\nPlease enter the username you wish to query: ")
elif preset_username != '':
username = preset_username
username = parse_username(username)
command_string = "curl -kXGET https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + hardcoded_variables.base_url + "/joined_rooms?access_token=" + hardcoded_variables.access_token
#print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output + "\n")
# Example:
# $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/users/@PC-Admin:perthchat.org/joined_rooms?access_token=ACCESS_TOKEN
def whois_multiple_accounts():
print("Whois multiple user accounts selected")
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
def deactivate_multiple_accounts():
print("Deactivate multiple user accounts selected")
user_list_location = input("\nPlease enter the path of the file containing a csv list of names: ")
with open(user_list_location, newline='') as f:
reader = csv.reader(f)
data = list(reader)
print(len(data))
whois_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to whois all of these users? y/n?\n")
if whois_confirmation == "y" or whois_confirmation == "Y" or whois_confirmation == "yes" or whois_confirmation == "Yes":
reader = csv.reader(f)
data = list(reader)
delete_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to deactivate these users? y/n?\n")
#print(len(data[0]))
#print(data[0][0])
if delete_confirmation == "y" or delete_confirmation == "Y" or delete_confirmation == "yes" or delete_confirmation == "Yes":
x = 0
while x <= (len(data) - 1):
print(data[x][0])
query_account(data[x][0])
list_joined_rooms(data[x][0])
#print(data[0][x])
deactivate_account(data[x][0])
x += 1
#print(x)
time.sleep(1)
if whois_confirmation == "n" or whois_confirmation == "N" or whois_confirmation == "no" or whois_confirmation == "No":
time.sleep(10)
if delete_confirmation == "n" or delete_confirmation == "N" or delete_confirmation == "no" or delete_confirmation == "No":
print("\nExiting...\n")
def list_accounts():
deactivated_choice = input("Do you want to include deactivated accounts y/n? ")
guest_choice = input("Do you want to include guest accounts y/n? ")
if deactivated_choice == "y" or deactivated_choice == "Y" or deactivated_choice == "yes" or deactivated_choice == "Yes":
deactivated_string = "deactivated=true"
elif deactivated_choice == "n" or deactivated_choice == "N" or deactivated_choice == "no" or deactivated_choice == "No":
deactivated_string = "deactivated=false"
else:
print("Input invalid! Defaulting to false.")
deactivated_string = "deactivated=false"
if guest_choice == "y" or guest_choice == "Y" or guest_choice == "yes" or guest_choice == "Yes":
guest_string = "guest=true"
elif guest_choice == "n" or guest_choice == "N" or guest_choice == "no" or guest_choice == "No":
guest_string = "guest=false"
else:
print("Input invalid! Defaulting to false.")
guest_string = "guest=false"
command_string = "curl -kXGET \"https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v2/users?from=0&limit=1000000&" + guest_string + "&" + deactivated_string + "&access_token=" + hardcoded_variables.access_token + "\""
print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
number_of_users = output.count("name")
#
print("\nTotal amount of users: " + str(number_of_users))
if number_of_users < 100:
print(output)
elif number_of_users >= 100:
accounts_output_file = input("\nThere are too many users to list here, please specify a filename to print this data too: ")
f = open(accounts_output_file, "w")
f.write(output)
f.close()
# Example:
# $ curl -kXGET "https://matrix.perthchat.org/_synapse/admin/v2/users?from=0&limit=10&guests=false&access_token=ACCESS_TOKEN"
def query_account(preset_username):
if len(preset_username) == 0:
username = input("\nPlease enter the username to query: ")
username = parse_username(username)
elif len(preset_username) > 0:
username = parse_username(preset_username)
command_string = "curl -kX GET https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v2/users/@" + username + ":" + hardcoded_variables.base_url + "?access_token=" + hardcoded_variables.access_token
print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output)
return output
# Example:
# $ curl -kX GET https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
def create_account(preset_username,preset_password):
def create_account(preset_username, preset_password):
if len(preset_username) == 0 and len(preset_password) == 0:
username = input("\nPlease enter the username to create: ")
username = parse_username(username)
@ -179,12 +72,30 @@ def create_account(preset_username,preset_password):
user_password = preset_password
else:
print("\nError with user/pass file data, skipping...\n")
command_string = "curl -kX PUT -H 'Content-Type: application/json' -d '{\"password\": \"" + user_password + "\"}' https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v2/users/@" + username + ":" + hardcoded_variables.base_url + "?access_token=" + hardcoded_variables.access_token
print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output)
return output
return
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}"
url += f"?access_token={hardcoded_variables.access_token}"
headers = {
"Content-Type": "application/json"
}
data = {
"password": user_password
}
print("\n" + url + "\n")
response = requests.put(url, headers=headers, data=json.dumps(data), verify=True)
if response.status_code == 201:
print("Successfully created account.")
elif response.status_code == 200:
print("Account already exists.")
else:
print(f"Error creating account: {response.status_code}, {response.text}")
return response.text
# Example:
# $ curl -kX PUT -H 'Content-Type: application/json' -d '{"password": "user_password","admin": false,"deactivated": false}' https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
@ -208,34 +119,199 @@ def create_multiple_accounts():
if create_confirmation == "n" or create_confirmation == "N" or create_confirmation == "no" or create_confirmation == "No":
print("\nExiting...\n")
def deactivate_multiple_accounts():
print("Deactivate multiple user accounts selected")
user_list_location = input("\nPlease enter the path of the file containing a csv list of names: ")
def reset_password(preset_username, preset_password):
if len(preset_username) == 0 and len(preset_password) == 0:
username = input("\nPlease enter the username for the password reset: ")
password = input("Please enter the password to set: ")
else:
username = parse_username(preset_username)
password = preset_password
username = parse_username(username)
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/reset_password/@" + username + f":{hardcoded_variables.base_url}?access_token={hardcoded_variables.access_token}"
headers = {'Content-Type': 'application/json'}
data = {'new_password': password, 'logout_devices': True}
print("\n" + url + "\n")
response = requests.post(url, headers=headers, data=json.dumps(data), verify=True)
if response.status_code == 200:
print(response.text)
else:
print(f"Error resetting password: {response.status_code}, {response.text}")
return response.text
# Example:
# $ curl -X POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo", "logout_devices": true}' https://matrix.perthchat.org/_synapse/admin/v1/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
def set_user_server_admin(preset_username):
if len(preset_username) == 0:
print("\nBe aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this.")
username = input("\nPlease enter the username you want to promote to server admin: ")
username = parse_username(username)
elif len(preset_username) > 0:
username = parse_username(preset_username)
# tried setting 'admin: false' here but it failed and promoted the user instead!
server_admin_result = "true"
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@{username}:{hardcoded_variables.base_url}/admin"
url += f"?access_token={hardcoded_variables.access_token}"
headers = {
"Content-Type": "application/json"
}
data = {
"admin": server_admin_result
}
print("\n" + url + "\n")
response = requests.put(url, headers=headers, data=json.dumps(data), verify=True)
if response.status_code == 200:
print("Successfully set user as server admin.")
else:
print(f"Error setting user as server admin: {response.status_code}, {response.text}")
return response.text
# Example:
# $ curl -kX POST -H 'Content-Type: application/json' -d '{"admin": "true"}' https://matrix.perthchat.org/_synapse/admin/v2/users/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
def whois_account(preset_username):
if preset_username == '':
username = input("\nPlease enter the username you wish to whois: ")
elif preset_username != '':
username = preset_username
username = parse_username(username)
url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/admin/whois/@{username}:{hardcoded_variables.base_url}"
url += f"?access_token={hardcoded_variables.access_token}"
print("\n" + url + "\n")
response = requests.get(url, verify=True)
if response.status_code == 200:
print(response.text + "\n")
else:
print(f"Error retrieving account info: {response.status_code}, {response.text}\n")
return response.text
# Example:
# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@PC-Admin:perthchat.org?access_token=ACCESS_TOKEN
def whois_multiple_accounts():
print("Whois multiple user accounts selected")
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
with open(user_list_location, newline='') as f:
reader = csv.reader(f)
data = list(reader)
delete_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to deactivate these users? y/n?\n")
#print(len(data[0]))
#print(data[0][0])
if delete_confirmation == "y" or delete_confirmation == "Y" or delete_confirmation == "yes" or delete_confirmation == "Yes":
reader = csv.reader(f)
data = list(reader)
print(len(data))
whois_confirmation = input("\n" + str(data) + "\n\nAre you sure you want to whois all of these users? y/n?\n")
if whois_confirmation == "y" or whois_confirmation == "Y" or whois_confirmation == "yes" or whois_confirmation == "Yes":
x = 0
while x <= (len(data) - 1):
#print(data[0][x])
deactivate_account(data[x][0])
print(data[x][0])
query_account(data[x][0])
list_joined_rooms(data[x][0])
x += 1
#print(x)
time.sleep(10)
if delete_confirmation == "n" or delete_confirmation == "N" or delete_confirmation == "no" or delete_confirmation == "No":
time.sleep(1)
if whois_confirmation == "n" or whois_confirmation == "N" or whois_confirmation == "no" or whois_confirmation == "No":
print("\nExiting...\n")
def list_joined_rooms(preset_username):
if preset_username == '':
username = input("\nPlease enter the username you wish to query: ")
elif preset_username != '':
username = preset_username
username = parse_username(username)
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/users/@" + username + f":{hardcoded_variables.base_url}/joined_rooms?access_token={hardcoded_variables.access_token}"
response = requests.get(url, verify=True)
if response.status_code == 200:
print(response.text + "\n")
else:
print(f"Error querying joined rooms: {response.status_code}, {response.text}")
# Example:
# $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/users/@PC-Admin:perthchat.org/joined_rooms?access_token=ACCESS_TOKEN
def list_accounts():
deactivated_choice = input("Do you want to include deactivated accounts y/n? ")
guest_choice = input("Do you want to include guest accounts y/n? ")
deactivated_string = "deactivated=false" if deactivated_choice.lower() not in ["y", "yes"] else "deactivated=true"
guest_string = "guest=false" if guest_choice.lower() not in ["y", "yes"] else "guest=true"
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users?from=0&limit=1000000&{guest_string}&{deactivated_string}&access_token={hardcoded_variables.access_token}"
print("\n" + url + "\n")
response = requests.get(url, verify=True)
if response.status_code == 200:
users = response.json()
number_of_users = len(users)
print("\nTotal amount of users: " + str(number_of_users))
if number_of_users < 100:
print(users)
else:
accounts_output_file = input("\nThere are too many users to list here, please specify a filename to print this data too: ")
with open(accounts_output_file, "w") as f:
json.dump(users, f, indent=4)
else:
print(f"Error retrieving users list: {response.status_code}, {response.text}")
# Example:
# $ curl -kXGET "https://matrix.perthchat.org/_synapse/admin/v2/users?from=0&limit=10&guests=false&access_token=ACCESS_TOKEN"
# NEED A MENU OPTION FOR THIS:
def query_account(preset_username):
if len(preset_username) == 0:
username = input("\nPlease enter the username to query: ")
username = parse_username(username)
elif len(preset_username) > 0:
username = parse_username(preset_username)
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v2/users/@{username}:{hardcoded_variables.base_url}?access_token={hardcoded_variables.access_token}"
print("\n" + url + "\n")
response = requests.get(url, verify=True)
if response.status_code == 200:
print(response.text)
else:
print(f"Error querying account: {response.status_code}, {response.text}")
return response.text
# Example:
# $ curl -kX GET https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
def quarantine_users_media():
username = input("\nPlease enter the username of the user who's media you want to quarantine: ")
username = parse_username(username)
command_string = "curl -X POST \'https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/user/@" + username + ":" + hardcoded_variables.base_url + "/media/quarantine?access_token=" + hardcoded_variables.access_token + "\'"
print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output)
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/user/@{username}:{hardcoded_variables.base_url}/media/quarantine?access_token={hardcoded_variables.access_token}"
print("\n" + url + "\n")
response = requests.post(url, verify=True)
if response.status_code == 200:
print(response.text)
else:
print(f"Error quarantining user's media: {response.status_code}, {response.text}")
# Example:
# $ curl -X POST https://matrix.perthchat.org/_synapse/admin/v1/user/@PC-Admin:perthchat.org/media/quarantine?access_token=ACCESS_TOKEN