2020-05-12 00:43:03 -04:00
# modtool.py
# an easy moderation tool for matrix/synapse
#
# created by @PC-Admin:perthchat.org
#
# This work is licensed under LGPLv3, for more information see: https://www.gnu.org/licenses/lgpl-3.0.txt
#
# To do:
# https://github.com/matrix-org/synapse/blob/master/docs/admin_api/delete_group.md
# https://github.com/matrix-org/synapse/blob/master/docs/admin_api/purge_room.md
# https://github.com/matrix-org/synapse/blob/master/docs/admin_api/purge_remote_media.rst
2020-08-25 06:09:57 -04:00
# https://github.com/matrix-org/synapse/blob/develop/docs/admin_api/media_admin_api.md#list-all-media-in-a-room
2020-05-12 00:43:03 -04:00
import subprocess
import csv
import time
2020-08-25 06:09:57 -04:00
import os
2020-05-12 00:43:03 -04:00
2020-06-01 06:50:01 -04:00
###########################################################################
# These values can be hard coded for easier usage: #
2020-08-25 19:12:56 -04:00
server_url = " example.org "
access_token = " "
2020-08-25 06:09:57 -04:00
federation_port = " 8448 "
2020-06-01 06:50:01 -04:00
###########################################################################
2020-05-12 00:43:03 -04:00
2020-08-25 06:09:57 -04:00
def parse_username ( username ) :
tail_end = ' : ' + server_url
username = username . replace ( ' @ ' , ' ' )
username = username . replace ( tail_end , ' ' )
return username
2020-05-12 00:43:03 -04:00
def deactivate_account ( preset_username ) :
if len ( preset_username ) == 0 :
username = input ( " \n Please enter the username to deactivate: " )
2020-08-25 06:09:57 -04:00
username = parse_username ( username )
2020-05-12 00:43:03 -04:00
else :
2020-08-25 06:09:57 -04:00
username = parse_username ( preset_username )
command_string = " curl -kX POST -H ' Content-Type: application/json ' -d ' { \" erase \" : true} ' https:// " + server_url + " : " + str ( federation_port ) + " /_matrix/client/r0/admin/deactivate/@ " + username + " : " + server_url + " ?access_token= " + access_token
2020-05-12 00:43:03 -04:00
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# Example:
# $ curl -kX POST -H 'Content-Type: application/json' -d '{"erase": true}' https://perthchat.org/_matrix/client/r0/admin/deactivate/@billybob:perthchat.org?access_token=ACCESS_TOKEN
def reset_password ( ) :
username = input ( " \n Please enter the username for the password reset: " )
password = input ( " Please enter the password to set: " )
2020-08-25 06:09:57 -04:00
username = parse_username ( username )
command_string = " curl -kX POST -H ' Content-Type: application/json ' -d ' { \" new_password \" : \" " + password + " \" } ' https:// " + server_url + " : " + str ( federation_port ) + " /_matrix/client/r0/admin/reset_password/@ " + username + " : " + server_url + " ?access_token= " + access_token
2020-05-12 00:43:03 -04:00
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# Example:
# $ curl -kX POST -H 'Content-Type: application/json' -d '{"new_password": "dogpoo"}' https://perthchat.org/_matrix/client/r0/admin/reset_password/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
2020-05-27 22:17:16 -04:00
def set_user_server_admin ( ) :
# tried setting 'admin: false' here but it failed and promoted the user instead!
print ( " \n Be aware that you need to set at least 1 user to server admin already by editing the database in order to use this command. See https://github.com/PC-Admin/PC-Admins-Synapse-Moderation-Tool/blob/master/README.md for details on how to do this. " )
username = input ( " \n Please enter the username you want to promote to server admin: " )
2020-08-25 06:09:57 -04:00
username = parse_username ( username )
2020-05-27 22:17:16 -04:00
passthrough = 0
server_admin_result = " true "
2020-08-25 06:09:57 -04:00
command_string = " curl -kX PUT -H ' Content-Type: application/json ' -d ' { \" admin \" : \" " + server_admin_result + " \" } ' https:// " + server_url + " : " + str ( federation_port ) + " /_synapse/admin/v2/users/@ " + username + " : " + server_url + " ?access_token= " + access_token
2020-05-27 22:17:16 -04:00
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# Example:
# $ curl -kX POST -H 'Content-Type: application/json' -d '{"admin": "true"}' https://perthchat.org/_synapse/admin/v2/users/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
2020-05-12 00:43:03 -04:00
def query_account ( ) :
username = input ( " \n Please enter the username you wish to query: " )
2020-08-25 06:09:57 -04:00
username = parse_username ( username )
command_string = " curl -kXGET https:// " + server_url + " : " + str ( federation_port ) + " /_matrix/client/r0/admin/whois/@ " + username + " : " + server_url + " ?access_token= " + access_token
2020-05-12 00:43:03 -04:00
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# Example:
# $ curl -kXGET https://perthchat.org/_matrix/client/r0/admin/whois/@PC-Admin:perthchat.org?access_token=ACCESS_TOKEN
def list_accounts ( ) :
deactivated_choice = input ( " Do you want to include deactivated accounts y/n? " )
guest_choice = input ( " Do you want to include guest accounts y/n? " )
if deactivated_choice == " y " or deactivated_choice == " Y " or deactivated_choice == " yes " or deactivated_choice == " Yes " :
deactivated_string = " deactivated=true "
elif deactivated_choice == " n " or deactivated_choice == " N " or deactivated_choice == " no " or deactivated_choice == " No " :
deactivated_string = " deactivated=false "
else :
print ( " Input invalid! Defaulting to false. " )
deactivated_string = " deactivated=false "
if guest_choice == " y " or guest_choice == " Y " or guest_choice == " yes " or guest_choice == " Yes " :
guest_string = " guest=true "
elif guest_choice == " n " or guest_choice == " N " or guest_choice == " no " or guest_choice == " No " :
guest_string = " guest=false "
else :
print ( " Input invalid! Defaulting to false. " )
guest_string = " guest=false "
command_string = " curl -kXGET \" https:// " + server_url + " : " + str ( federation_port ) + " /_synapse/admin/v2/users?from=0&limit=1000000& " + guest_string + " & " + deactivated_string + " &access_token= " + access_token + " \" "
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
number_of_users = output . count ( " name " )
#
print ( " \n Total amount of users: " + str ( number_of_users ) )
if number_of_users < 100 :
print ( output )
elif number_of_users > = 100 :
accounts_output_file = input ( " \n There are too many users to list here, please specify a filename to print this data too: " )
f = open ( accounts_output_file , " w " )
f . write ( output )
f . close ( )
# Example:
# $ curl -kXGET "https://perthchat.org/_synapse/admin/v2/users?from=0&limit=10&guests=false&access_token=ACCESS_TOKEN"
def create_account ( preset_username , preset_password ) :
if len ( preset_username ) == 0 and len ( preset_password ) == 0 :
username = input ( " \n Please enter the username to create: " )
2020-08-25 06:09:57 -04:00
username = parse_username ( username )
2020-05-12 00:43:03 -04:00
user_password = input ( " Please enter the password for this account: " )
elif len ( preset_username ) > 0 and len ( preset_password ) > 0 :
2020-08-25 06:09:57 -04:00
username = parse_username ( preset_username )
2020-05-12 00:43:03 -04:00
user_password = preset_password
else :
print ( " \n Error with user/pass file data, skipping... \n " )
2020-08-25 06:09:57 -04:00
command_string = " curl -kX PUT -H ' Content-Type: application/json ' -d ' { \" password \" : \" " + user_password + " \" } ' https:// " + server_url + " : " + str ( federation_port ) + " /_synapse/admin/v2/users/@ " + username + " : " + server_url + " ?access_token= " + access_token
2020-05-12 00:43:03 -04:00
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# Example:
# $ curl -kX PUT -H 'Content-Type: application/json' -d '{"password": "user_password","admin": false,"deactivated": false}' https://perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
# needed to map /_synapse/ too!
def create_multiple_accounts ( ) :
print ( " Create multiple user accounts selected " )
user_list_location = input ( " \n Please enter the path of the file containing a csv list of names: " )
with open ( user_list_location , newline = ' ' ) as f :
reader = csv . reader ( f )
data = list ( reader )
print ( len ( data ) )
create_confirmation = input ( " \n " + str ( data ) + " \n \n Are you sure you want to create these users? y/n? \n " )
if create_confirmation == " y " or create_confirmation == " Y " or create_confirmation == " yes " or create_confirmation == " Yes " :
x = 0
while x < = ( len ( data ) - 1 ) :
print ( data [ x ] [ 0 ] )
create_account ( data [ x ] [ 0 ] , data [ x ] [ 1 ] )
x + = 1
#print(x)
time . sleep ( 1 )
if create_confirmation == " n " or create_confirmation == " N " or create_confirmation == " no " or create_confirmation == " No " :
print ( " \n Exiting... \n " )
def deactivate_multiple_accounts ( ) :
print ( " Deactivate multiple user accounts selected " )
user_list_location = input ( " \n Please enter the path of the file containing a csv list of names: " )
with open ( user_list_location , newline = ' ' ) as f :
reader = csv . reader ( f )
data = list ( reader )
delete_confirmation = input ( " \n " + str ( data ) + " \n \n Are you sure you want to deactivate these users? y/n? \n " )
#print(len(data[0]))
#print(data[0][0])
if delete_confirmation == " y " or delete_confirmation == " Y " or delete_confirmation == " yes " or delete_confirmation == " Yes " :
x = 0
while x < = ( len ( data [ 0 ] ) - 1 ) :
#print(data[0][x])
deactivate_account ( data [ 0 ] [ x ] )
x + = 1
#print(x)
time . sleep ( 1 )
if delete_confirmation == " n " or delete_confirmation == " N " or delete_confirmation == " no " or delete_confirmation == " No " :
print ( " \n Exiting... \n " )
def list_directory_rooms ( ) :
command_string = " curl -kX GET https:// " + server_url + " : " + str ( federation_port ) + " /_matrix/client/r0/publicRooms?access_token= " + access_token
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# Example
# $ curl -kX GET https://perthchat.org/_matrix/client/r0/publicRooms?access_token=ACCESS_TOKEN
def remove_room_from_directory ( ) :
2020-06-01 06:50:01 -04:00
internal_ID = input ( " \n Enter the internal id of the room you wish to remove from the directory (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): " )
command_string = " curl -kX PUT -H \' Content-Type: application/json \' -d \' { \" visibility \" : \" private \" } \' \' https:// " + server_url + " : " + str ( federation_port ) + " /_matrix/client/r0/directory/list/room/ " + internal_ID + " ?access_token= " + access_token + " \' "
2020-05-12 00:43:03 -04:00
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# Example
# $ curl -kX PUT -H 'Content-Type: application/json' -d '{"visibility": "private"}' 'https://perthchat.org/_matrix/client/r0/directory/list/room/!DwUPBvNapIVecNllgt:perthchat.org?access_token=ACCESS_TOKEN'
2020-08-25 06:09:57 -04:00
def list_media_in_room ( ) :
internal_ID = input ( " \n Enter the internal id of the room you want to get a list of media for (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): " )
command_string = " curl -kX GET https:// " + server_url + " : " + str ( federation_port ) + " /_synapse/admin/v1/room/ " + internal_ID + " /media?access_token= " + access_token
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
media_list_output = process . stdout
print ( media_list_output )
print_file_list_choice = input ( " \n Do you want to write this list to a file? y/n? " )
if print_file_list_choice == " y " or print_file_list_choice == " Y " or print_file_list_choice == " yes " or print_file_list_choice == " Yes " :
print_file_list_choice = " true "
elif print_file_list_choice == " n " or print_file_list_choice == " N " or print_file_list_choice == " no " or print_file_list_choice == " No " :
print_file_list_choice = " false "
else :
print ( " Input invalid! Defaulting to ' No ' . " )
print_file_list_choice = " false "
if print_file_list_choice == " true " :
media_list_filename = open ( " media_list.txt " , " w+ " )
media_list_filename . write ( media_list_output )
media_list_filename . close ( )
download_files_choice = input ( " \n Do you also want to download a copy of these media files? y/n? " )
if download_files_choice == " y " or download_files_choice == " Y " or download_files_choice == " yes " or download_files_choice == " Yes " :
download_files_choice = " true "
elif download_files_choice == " n " or download_files_choice == " N " or download_files_choice == " no " or download_files_choice == " No " :
download_files_choice = " false "
else :
print ( " Input invalid! Defaulting to ' No ' . " )
download_files_choice = " false "
if download_files_choice == " true " :
media_list_filename = open ( " media_list.txt " , " r " )
media_list_filename_lines = media_list_filename . readlines ( )
os . mkdir ( " ./media-files " )
os . chdir ( " ./media-files " )
count = 0
# Strips the newline character
for line in media_list_filename_lines :
if " mxc " in line :
print ( line )
cleaned_line = line . replace ( ' " mxc:// ' , ' ' )
cleaned_line = cleaned_line . replace ( ' " , ' , ' ' )
cleaned_line = cleaned_line . replace ( ' " ' , ' ' )
cleaned_line = cleaned_line . replace ( ' \n ' , ' ' )
print ( cleaned_line )
download_command = " wget https:// " + server_url + " /_matrix/media/r0/download/ " + cleaned_line
print ( download_command )
download_process = subprocess . run ( [ download_command ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
os . chdir ( " ../ " )
# Example
# $ curl -kX GET https://perthchat.org/_synapse/admin/v1/room/<room_id>/media?access_token=ACCESS_TOKEN
# To access via web:
# https://perthchat.org/_matrix/media/r0/download/ + server_name + "/" + media_id
def quarantine_media_in_room ( ) :
internal_ID = input ( " \n Enter the internal id of the room you want to quarantine, this makes local and remote data inaccessible (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): " )
command_string = " curl -kX PUT \' https:// " + server_url + " : " + str ( federation_port ) + " /_synapse/admin/v1/room/ " + internal_ID + " /media/quarantine?access_token= " + access_token
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# UNTESTED
# Example
# $ curl -kX PUT 'https://perthchat.org/_synapse/admin/v1/room/!DwUPBvNapIVecNllgt:perthchat.org/media/quarantine?access_token=ACCESS_TOKEN'
def quarantine_users_media ( ) :
username = input ( " \n Please enter the username of the user who ' s media you want to quarantine: " )
username = parse_username ( username )
command_string = " curl -kX PUT https:// " + server_url + " : " + str ( federation_port ) + " /_synapse/admin/v1/user/@ " + username + " : " + server_url + " /media/quarantine?access_token= " + access_token
print ( " \n " + command_string + " \n " )
process = subprocess . run ( [ command_string ] , shell = True , stdout = subprocess . PIPE , universal_newlines = True )
output = process . stdout
print ( output )
# UNTESTED
# Example:
# $ curl -kX PUT https://perthchat.org/_synapse/admin/v1/user/@PC-Admin:perthchat.org/media/quarantine?access_token=ACCESS_TOKEN
2020-05-12 00:43:03 -04:00
# check if url is hard coded, if not set it
if server_url == " example.org " :
server_url = input ( " What is the URL of your server? Eg: example.org " )
# check if access token is hard coded, if not set it
length_access_token = len ( access_token )
if length_access_token == 0 :
access_token = input ( " Please enter access token for server admin account: " )
# check is federation port is hard coded, if not set it
2020-06-01 06:50:01 -04:00
if len ( federation_port ) == 0 :
2020-05-12 00:43:03 -04:00
federation_port = input ( " Please enter the federation port for the server (default is 8448): " )
# loop menu for various moderation actions
pass_token = False
while pass_token == False :
2020-08-25 06:09:57 -04:00
menu_input = input ( ' \n Please select one of the following options: \n \n 1) Deactivate a user account. \n 2) Create a user account. \n 3) Query user account. \n 4) Reset a users password. \n 5) Promote a user to server admin. \n 6) List all user accounts. \n 7) Create multiple user accounts. \n 8) Deactivate multiple user accounts. \n 9) List rooms in public directory. \n 10) Remove a room from the public directory. \n 11) List/Download all media in a room. \n 12) Quarantine all media in a room. \n 13) Quarantine all media a users uploaded. \n ( \' q \' or \' e \' ) Exit. \n \n ' )
2020-05-12 00:43:03 -04:00
if menu_input == " 1 " :
deactivate_account ( ' ' )
elif menu_input == " 2 " :
create_account ( ' ' , ' ' )
elif menu_input == " 3 " :
query_account ( )
elif menu_input == " 4 " :
reset_password ( )
elif menu_input == " 5 " :
2020-05-27 22:17:16 -04:00
set_user_server_admin ( )
2020-05-12 00:43:03 -04:00
elif menu_input == " 6 " :
2020-05-27 22:17:16 -04:00
list_accounts ( )
2020-05-12 00:43:03 -04:00
elif menu_input == " 7 " :
2020-05-27 22:17:16 -04:00
create_multiple_accounts ( )
2020-05-12 00:43:03 -04:00
elif menu_input == " 8 " :
2020-05-27 22:17:16 -04:00
deactivate_multiple_accounts ( )
2020-05-12 00:43:03 -04:00
elif menu_input == " 9 " :
2020-05-27 22:17:16 -04:00
list_directory_rooms ( )
2020-05-12 00:43:03 -04:00
elif menu_input == " 10 " :
2020-05-27 22:17:16 -04:00
remove_room_from_directory ( )
2020-08-25 06:09:57 -04:00
elif menu_input == " 11 " :
list_media_in_room ( )
elif menu_input == " 12 " :
quarantine_media_in_room ( )
elif menu_input == " 13 " :
quarantine_users_media ( )
2020-06-01 06:50:01 -04:00
elif menu_input == " q " or menu_input == " Q " or menu_input == " e " or menu_input == " E " :
2020-05-12 00:43:03 -04:00
print ( " \n Exiting... \n " )
pass_token = True
else :
2020-05-27 22:17:16 -04:00
print ( " \n Incorrect input detected, please select a number from 1 to 11! \n " )