mirror of
https://github.com/PC-Admin/matrix-moderation-tool.git
synced 2024-12-19 07:00:27 -05:00
initial requests module mods
This commit is contained in:
parent
5839e73de9
commit
0b6c8a0845
179
modtool.py
179
modtool.py
@ -11,15 +11,17 @@
|
||||
# Make the menu prettier!
|
||||
|
||||
import subprocess
|
||||
import requests
|
||||
import json
|
||||
import csv
|
||||
import time
|
||||
import os
|
||||
|
||||
###########################################################################
|
||||
# These values can be hard coded for easier usage: #
|
||||
homeserver_url = "matrix.example.org"
|
||||
base_url = "example.org"
|
||||
access_token = ""
|
||||
homeserver_url = "matrix.perthchat.org"
|
||||
base_url = "perthchat.org"
|
||||
access_token = "syt_bWljaGFlbA_buBejnFJUvfDjtMLLDkP_3sspLa"
|
||||
###########################################################################
|
||||
|
||||
def parse_username(username):
|
||||
@ -28,47 +30,93 @@ def parse_username(username):
|
||||
username = username.replace(tail_end,'')
|
||||
return username
|
||||
|
||||
#def deactivate_account(preset_username):
|
||||
# if len(preset_username) == 0:
|
||||
# username = input("\nPlease enter the username to deactivate: ")
|
||||
# username = parse_username(username)
|
||||
# else:
|
||||
# username = parse_username(preset_username)
|
||||
# command_string = "curl -X POST -H \"Authorization: Bearer " + access_token + "\" 'https://" + homeserver_url + "/_synapse/admin/v1/deactivate/@" + username + ":" + base_url + "' --data '{\"erase\": true}'"
|
||||
# print("\n" + command_string + "\n")
|
||||
# process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
# output = process.stdout
|
||||
# print(output)
|
||||
|
||||
|
||||
def deactivate_account(preset_username):
|
||||
if len(preset_username) == 0:
|
||||
username = input("\nPlease enter the username to deactivate: ")
|
||||
username = parse_username(username)
|
||||
else:
|
||||
username = parse_username(preset_username)
|
||||
command_string = "curl -X POST -H \"Authorization: Bearer " + access_token + "\" 'https://" + homeserver_url + "/_synapse/admin/v1/deactivate/@" + username + ":" + base_url + "' --data '{\"erase\": true}'"
|
||||
print("\n" + command_string + "\n")
|
||||
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
output = process.stdout
|
||||
print(output)
|
||||
deactivate_url = "https://" + homeserver_url + "/_synapse/admin/v1/deactivate/@" + username + ":" + base_url
|
||||
request_header = { "Authorization": 'Bearer ' + access_token }
|
||||
request_data = { "erase": True }
|
||||
deactivate = requests.post(deactivate_url, headers=request_header, json=request_data)
|
||||
print( '\nRequest .status_code: ' + str(deactivate.status_code) )
|
||||
print( 'Request .text: ' + deactivate.text )
|
||||
|
||||
# Example:
|
||||
# $ curl -X POST -H "Authorization: Bearer ACCESS_TOKEN" "https://matrix.perthchat.org/_synapse/admin/v1/deactivate/@billybob:perthchat.org" --data '{"erase": true}'
|
||||
|
||||
#def reset_password():
|
||||
# username = input("\nPlease enter the username for the password reset: ")
|
||||
# password = input("Please enter the password to set: ")
|
||||
# username = parse_username(username)
|
||||
# command_string = "curl -X POST -H 'Content-Type: application/json' -d '{\"new_password\": \"" + password + "\", \"logout_devices\": true}' https://" + homeserver_url + "/_synapse/admin/v1/reset_password/@" + username + ":" + base_url + "?access_token=" + access_token
|
||||
# print("\n" + command_string + "\n")
|
||||
# process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
# output = process.stdout
|
||||
# print(output)
|
||||
|
||||
def reset_password():
|
||||
username = input("\nPlease enter the username for the password reset: ")
|
||||
password = input("Please enter the password to set: ")
|
||||
username = parse_username(username)
|
||||
command_string = "curl -X POST -H 'Content-Type: application/json' -d '{\"new_password\": \"" + password + "\", \"logout_devices\": true}' https://" + homeserver_url + "/_synapse/admin/v1/reset_password/@" + username + ":" + base_url + "?access_token=" + access_token
|
||||
print("\n" + command_string + "\n")
|
||||
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
output = process.stdout
|
||||
print(output)
|
||||
reset_url = "https://" + homeserver_url + "/_synapse/admin/v1/reset_password/@" + username + ":" + base_url
|
||||
request_header = { "Authorization": 'Bearer ' + access_token }
|
||||
request_data = { "new_password": password, "logout_devices": True }
|
||||
reset_password = requests.post(reset_url, headers=request_header, json=request_data)
|
||||
print( '\nRequest .status_code: ' + str(reset_password.status_code) )
|
||||
print( 'Request .text: ' + reset_password.text )
|
||||
|
||||
# Example:
|
||||
# $ curl -X POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo", "logout_devices": true}' https://matrix.perthchat.org/_synapse/admin/v1/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
|
||||
|
||||
#def set_user_server_admin():
|
||||
# tried setting 'admin: false' here but it failed and promoted the user instead!
|
||||
# print("\nBe aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this.")
|
||||
username = input("\nPlease enter the username you want to promote to server admin: ")
|
||||
# username = parse_username(username)
|
||||
# passthrough = 0
|
||||
# server_admin_result = "true"
|
||||
|
||||
# command_string = "curl -X PUT -H 'Content-Type: application/json' -d '{\"admin\": \"" + server_admin_result + "\"}' https://" + homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + base_url + "/admin?access_token=" + access_token
|
||||
# print("\n" + command_string + "\n")
|
||||
# process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
# output = process.stdout
|
||||
# print(output)
|
||||
|
||||
def set_user_server_admin():
|
||||
# tried setting 'admin: false' here but it failed and promoted the user instead!
|
||||
print("\nBe aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this.")
|
||||
username = input("\nPlease enter the username you want to promote to server admin: ")
|
||||
username = input("\nPlease enter the username you want to promote/demote to/from server admin: ")
|
||||
server_admin_result_raw = input("\nDo you want this user to be a server admin? yes/no? ")
|
||||
username = parse_username(username)
|
||||
passthrough = 0
|
||||
server_admin_result = "true"
|
||||
if server_admin_result_raw == "y" or server_admin_result_raw == "yes" or server_admin_result_raw == "Y" or server_admin_result_raw == "Yes" or server_admin_result_raw == "YES":
|
||||
request_data = { "admin": "true" }
|
||||
elif server_admin_result_raw == "n" or server_admin_result_raw == "no" or server_admin_result_raw == "N" or server_admin_result_raw == "No" or server_admin_result_raw == "NO":
|
||||
request_data = { "admin": "false" }
|
||||
else:
|
||||
print("Input not recognised, aborting command.")
|
||||
return
|
||||
set_server_admin_url = "https://" + homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + base_url + "/admin"
|
||||
request_header = { "Authorization": 'Bearer ' + access_token }
|
||||
set_server_admin = requests.post(set_server_admin_url, headers=request_header, json=request_data)
|
||||
print( '\nRequest .status_code: ' + str(set_server_admin.status_code) )
|
||||
print( 'Request .text: ' + set_server_admin.text )
|
||||
|
||||
command_string = "curl -X PUT -H 'Content-Type: application/json' -d '{\"admin\": \"" + server_admin_result + "\"}' https://" + homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + base_url + "/admin?access_token=" + access_token
|
||||
print("\n" + command_string + "\n")
|
||||
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
output = process.stdout
|
||||
print(output)
|
||||
#### ^ THIS ISN'T WORKING ####
|
||||
|
||||
# Example:
|
||||
# $ curl -kX POST -H 'Content-Type: application/json' -d '{"admin": "true"}' https://matrix.perthchat.org/_synapse/admin/v2/users/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
|
||||
@ -79,11 +127,17 @@ def query_account(preset_username):
|
||||
elif preset_username != '':
|
||||
username = preset_username
|
||||
username = parse_username(username)
|
||||
command_string = "curl -kXGET https://" + homeserver_url + "/_matrix/client/r0/admin/whois/@" + username + ":" + base_url + "?access_token=" + access_token
|
||||
query_account_url = "https://" + homeserver_url + "/_matrix/client/r0/admin/whois/@" + username + ":" + base_url
|
||||
request_header = { "Authorization": 'Bearer ' + access_token }
|
||||
query_account = requests.get(query_account_url, headers=request_header)
|
||||
print( '\nRequest .status_code: ' + str(query_account.status_code) )
|
||||
print( 'Request .text: ' + query_account.text )
|
||||
|
||||
# command_string = "curl -kXGET https://" + homeserver_url + "/_matrix/client/r0/admin/whois/@" + username + ":" + base_url + "?access_token=" + access_token
|
||||
#print("\n" + command_string + "\n")
|
||||
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
output = process.stdout
|
||||
print(output + "\n")
|
||||
# process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
# output = process.stdout
|
||||
# print(output + "\n")
|
||||
|
||||
# Example:
|
||||
# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@PC-Admin:perthchat.org?access_token=ACCESS_TOKEN
|
||||
@ -94,11 +148,17 @@ def list_joined_rooms(preset_username):
|
||||
elif preset_username != '':
|
||||
username = preset_username
|
||||
username = parse_username(username)
|
||||
command_string = "curl -kXGET https://" + homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + base_url + "/joined_rooms?access_token=" + access_token
|
||||
list_joined_rooms_url = "https://" + homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + base_url + "/joined_rooms"
|
||||
request_header = { "Authorization": 'Bearer ' + access_token }
|
||||
list_joined_rooms = requests.get(list_joined_rooms_url, headers=request_header)
|
||||
print( '\nRequest .status_code: ' + str(list_joined_rooms.status_code) )
|
||||
print( 'Request .text: ' + list_joined_rooms.text )
|
||||
|
||||
# command_string = "curl -kXGET https://" + homeserver_url + "/_synapse/admin/v1/users/@" + username + ":" + base_url + "/joined_rooms?access_token=" + access_token
|
||||
#print("\n" + command_string + "\n")
|
||||
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
output = process.stdout
|
||||
print(output + "\n")
|
||||
# process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
# output = process.stdout
|
||||
# print(output + "\n")
|
||||
|
||||
# Example:
|
||||
# $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/users/@PC-Admin:perthchat.org/joined_rooms?access_token=ACCESS_TOKEN
|
||||
@ -128,35 +188,49 @@ def list_accounts():
|
||||
guest_choice = input("Do you want to include guest accounts y/n? ")
|
||||
|
||||
if deactivated_choice == "y" or deactivated_choice == "Y" or deactivated_choice == "yes" or deactivated_choice == "Yes":
|
||||
deactivated_string = "deactivated=true"
|
||||
deactivated_string = "&deactivated=true"
|
||||
elif deactivated_choice == "n" or deactivated_choice == "N" or deactivated_choice == "no" or deactivated_choice == "No":
|
||||
deactivated_string = "deactivated=false"
|
||||
deactivated_string = "&deactivated=false"
|
||||
else:
|
||||
print("Input invalid! Defaulting to false.")
|
||||
deactivated_string = "deactivated=false"
|
||||
deactivated_string = "&deactivated=false"
|
||||
|
||||
if guest_choice == "y" or guest_choice == "Y" or guest_choice == "yes" or guest_choice == "Yes":
|
||||
guest_string = "guest=true"
|
||||
guest_string = "&guests=true"
|
||||
elif guest_choice == "n" or guest_choice == "N" or guest_choice == "no" or guest_choice == "No":
|
||||
guest_string = "guest=false"
|
||||
guest_string = "&guests=false"
|
||||
else:
|
||||
print("Input invalid! Defaulting to false.")
|
||||
guest_string = "guest=false"
|
||||
guest_string = "&guests=false"
|
||||
|
||||
command_string = "curl -kXGET \"https://" + homeserver_url + "/_synapse/admin/v2/users?from=0&limit=1000000&" + guest_string + "&" + deactivated_string + "&access_token=" + access_token + "\""
|
||||
print("\n" + command_string + "\n")
|
||||
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
output = process.stdout
|
||||
number_of_users = output.count("name")
|
||||
list_accounts_url = "https://" + homeserver_url + "/_synapse/admin/v2/users?from=0" + guest_string + deactivated_string
|
||||
request_header = { "Authorization": 'Bearer ' + access_token }
|
||||
#request_data = { "guest": guest_string, "deactivate": deactivated_string }
|
||||
list_accounts = requests.get(list_accounts_url, headers=request_header)
|
||||
print( '\nRequest .status_code: ' + str(list_accounts.status_code) )
|
||||
print( 'Request .text: ' + list_accounts.text )
|
||||
|
||||
# command_string = "curl -kXGET \"https://" + homeserver_url + "/_synapse/admin/v2/users?from=0&limit=1000000&" + guest_string + "&" + deactivated_string + "&access_token=" + access_token + "\""
|
||||
# print("\n" + command_string + "\n")
|
||||
# process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
# output = process.stdout
|
||||
list_accounts_json = list_accounts.json()
|
||||
number_of_users = list_accounts_json['total']
|
||||
#
|
||||
print("\nTotal amount of users: " + str(number_of_users))
|
||||
if number_of_users < 100:
|
||||
print(output)
|
||||
elif number_of_users >= 100:
|
||||
accounts_output_file = input("\nThere are too many users to list here, please specify a filename to print this data too: ")
|
||||
f = open(accounts_output_file, "w")
|
||||
f.write(output)
|
||||
f = open(accounts_output_file, "a")
|
||||
for i in range(0, number_of_users, 100):
|
||||
list_accounts_url = "https://" + homeserver_url + "/_synapse/admin/v2/users?from=" + str(i) + guest_string + deactivated_string
|
||||
#request_data = { "guest": guest_string, "deactivate": deactivated_string }
|
||||
list_accounts = requests.get(list_accounts_url, headers=request_header)
|
||||
f.write("\n")
|
||||
f.write(list_accounts.text)
|
||||
f.close()
|
||||
print("Done!\n")
|
||||
|
||||
# Example:
|
||||
# $ curl -kXGET "https://matrix.perthchat.org/_synapse/admin/v2/users?from=0&limit=10&guests=false&access_token=ACCESS_TOKEN"
|
||||
@ -171,11 +245,18 @@ def create_account(preset_username,preset_password):
|
||||
user_password = preset_password
|
||||
else:
|
||||
print("\nError with user/pass file data, skipping...\n")
|
||||
command_string = "curl -kX PUT -H 'Content-Type: application/json' -d '{\"password\": \"" + user_password + "\"}' https://" + homeserver_url + "/_synapse/admin/v2/users/@" + username + ":" + base_url + "?access_token=" + access_token
|
||||
print("\n" + command_string + "\n")
|
||||
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
output = process.stdout
|
||||
print(output)
|
||||
|
||||
create_account_url = "https://" + homeserver_url + "/_synapse/admin/v2/users/@" + username + ":" + base_url
|
||||
request_header = { "Authorization": 'Bearer ' + access_token }
|
||||
request_data = { "password": user_password ,"admin": False, "deactivated": False }
|
||||
create_account = requests.get(create_account_url, headers=request_header, data=request_data)
|
||||
print( '\nRequest .status_code: ' + str(create_account.status_code) )
|
||||
print( 'Request .text: ' + create_account.text )
|
||||
#command_string = "curl -kX PUT -H 'Content-Type: application/json' -d '{\"password\": \"" + user_password + "\"}' https://" + homeserver_url + "/_synapse/admin/v2/users/@" + username + ":" + base_url + "?access_token=" + access_token
|
||||
#print("\n" + command_string + "\n")
|
||||
#process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
||||
#output = process.stdout
|
||||
#print(output)
|
||||
|
||||
# Example:
|
||||
# $ curl -kX PUT -H 'Content-Type: application/json' -d '{"password": "user_password","admin": false,"deactivated": false}' https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
|
||||
@ -562,7 +643,7 @@ if length_access_token == 0:
|
||||
|
||||
pass_token = False
|
||||
while pass_token == False:
|
||||
menu_input = input('\nPlease select one of the following options:\n#### User Account Commands ####\n1) Deactivate a user account.\n2) Create a user account.\n3) Query user account.\n4) List room memberships of user.\n5) Query multiple user accounts.\n6) Reset a users password.\n7) Promote a user to server admin.\n8) List all user accounts.\n9) Create multiple user accounts.\n10) Deactivate multiple user accounts.\n11) Quarantine all media a users uploaded.\n#### Room Commands ####\n12) List details of a room.\n13) List rooms in public directory.\n14) Remove a room from the public directory.\n15) Remove multiple rooms from the public directory.\n16) List/Download all media in a room.\n17) Download media from multiple rooms.\n18) Quarantine all media in a room.\n19) Purge a room.\n20) Purge multiple rooms.\n#### Server Commands ####\n21) Purge remote media repository up to a certain date.\n22) Prepare database for copying events of multiple rooms.\n(\'q\' or \'e\') Exit.\n\n')
|
||||
menu_input = input('\nPlease select one of the following options:\n#### User Account Commands ####\n1) Deactivate a user account.\n2) Create a user account.\n3) Query user account.\n4) List room memberships of user.\n5) Query multiple user accounts.\n6) Reset a users password.\n7) Promote/Demote a user to/from server admin.\n8) List all user accounts.\n9) Create multiple user accounts.\n10) Deactivate multiple user accounts.\n11) Quarantine all media a users uploaded.\n#### Room Commands ####\n12) List details of a room.\n13) List rooms in public directory.\n14) Remove a room from the public directory.\n15) Remove multiple rooms from the public directory.\n16) List/Download all media in a room.\n17) Download media from multiple rooms.\n18) Quarantine all media in a room.\n19) Purge a room.\n20) Purge multiple rooms.\n#### Server Commands ####\n21) Purge remote media repository up to a certain date.\n22) Prepare database for copying events of multiple rooms.\n(\'q\' or \'e\') Exit.\n\n')
|
||||
if menu_input == "1":
|
||||
deactivate_account('')
|
||||
elif menu_input == "2":
|
||||
@ -611,4 +692,4 @@ while pass_token == False:
|
||||
print("\nExiting...\n")
|
||||
pass_token = True
|
||||
else:
|
||||
print("\nIncorrect input detected, please select a number from 1 to 21!\n")
|
||||
print("\nIncorrect input detected, please select a number from 1 to 22!\n")
|
||||
|
Loading…
Reference in New Issue
Block a user