re-write half the room functions to use requests module

This commit is contained in:
PC-Admin 2023-07-15 03:34:10 +08:00
parent 6e2e8a28d6
commit 07ec96334b

View File

@ -4,6 +4,7 @@ import json
import time import time
import os import os
import csv import csv
import requests
import hardcoded_variables import hardcoded_variables
def parse_username(username): def parse_username(username):
@ -17,11 +18,13 @@ def list_room_details(preset_internal_ID):
internal_ID = input("\nEnter the internal id of the room you wish to query (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ") internal_ID = input("\nEnter the internal id of the room you wish to query (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ")
elif preset_internal_ID != '': elif preset_internal_ID != '':
internal_ID = preset_internal_ID internal_ID = preset_internal_ID
command_string = "curl -kXGET 'https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/rooms/" + internal_ID + "?access_token=" + hardcoded_variables.access_token + "'" url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/rooms/{internal_ID}"
print("\n" + command_string + "\n") headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout print("\n" + url + "\n")
print(output) response = requests.get(url, headers=headers, verify=True)
print(response.text)
# Example # Example
# $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org?access_token=ACCESS_TOKEN' # $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org?access_token=ACCESS_TOKEN'
@ -31,21 +34,25 @@ def export_room_state(preset_internal_ID):
current_directory = os.getcwd() current_directory = os.getcwd()
if preset_internal_ID == '': if preset_internal_ID == '':
internal_ID = input("\nEnter the internal id of the room with with to export the 'state' of (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ") internal_ID = input("\nEnter the internal id of the room with which to export the 'state' of (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ")
elif preset_internal_ID != '': elif preset_internal_ID != '':
internal_ID = preset_internal_ID internal_ID = preset_internal_ID
room_dir = current_directory + "/state_events" room_dir = os.path.join(current_directory, "state_events")
os.makedirs(room_dir, exist_ok=True) os.makedirs(room_dir, exist_ok=True)
os.chdir(room_dir)
unix_time = int(time.time()) unix_time = int(time.time())
command_string = "curl -kXGET 'https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/rooms/" + internal_ID + "/state?access_token=" + hardcoded_variables.access_token + "' > ./" + internal_ID + "_state_" + str(unix_time) + ".json" url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/rooms/{internal_ID}/state"
print("\n" + command_string + "\n") headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) filename = os.path.join(room_dir, f"{internal_ID}_state_{unix_time}.json")
output = process.stdout
os.chdir(current_directory) print("\n" + url + "\n")
print(output) response = requests.get(url, headers=headers, verify=True)
with open(filename, 'w') as f:
f.write(response.text)
print(response.text)
# Example # Example
# $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org/state?access_token=ACCESS_TOKEN' # $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org/state?access_token=ACCESS_TOKEN'
@ -54,10 +61,13 @@ def export_room_state(preset_internal_ID):
# https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#room-state-api # https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#room-state-api
def list_directory_rooms(): def list_directory_rooms():
command_string = "curl -kXGET https://" + hardcoded_variables.homeserver_url + "/_matrix/client/r0/publicRooms?access_token=" + hardcoded_variables.access_token url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/publicRooms"
print("\n" + command_string + "\n") headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout print("\n" + url + "\n")
response = requests.get(url, headers=headers, verify=True)
output = response.text
output = output.replace('\"room_id\":\"','\n') output = output.replace('\"room_id\":\"','\n')
output = output.replace('\",\"name','\n\",\"name') output = output.replace('\",\"name','\n\",\"name')
print(output) print(output)
@ -70,11 +80,14 @@ def remove_room_from_directory(preset_internal_ID):
internal_ID = input("\nEnter the internal id of the room you wish to remove from the directory (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ") internal_ID = input("\nEnter the internal id of the room you wish to remove from the directory (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ")
elif preset_internal_ID != '': elif preset_internal_ID != '':
internal_ID = preset_internal_ID internal_ID = preset_internal_ID
command_string = "curl -kX PUT -H \'Content-Type: application/json\' -d \'{\"visibility\": \"private\"}\' \'https://" + hardcoded_variables.homeserver_url + "/_matrix/client/r0/directory/list/room/" + internal_ID + "?access_token=" + hardcoded_variables.access_token + "\'" url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/r0/directory/list/room/{internal_ID}"
print("\n" + command_string + "\n") headers = {"Authorization": f"Bearer {hardcoded_variables.access_token}"}
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) data = {"visibility": "private"}
output = process.stdout
print(output) print("\n" + url + "\n")
response = requests.put(url, headers=headers, json=data, verify=True)
print(response.text)
# Example # Example
# $ curl -kX PUT -H 'Content-Type: application/json' -d '{"visibility": "private"}' 'https://matrix.perthchat.org/_matrix/client/r0/directory/list/room/!DwUPBvNapIVecNllgt:perthchat.org?access_token=ACCESS_TOKEN' # $ curl -kX PUT -H 'Content-Type: application/json' -d '{"visibility": "private"}' 'https://matrix.perthchat.org/_matrix/client/r0/directory/list/room/!DwUPBvNapIVecNllgt:perthchat.org?access_token=ACCESS_TOKEN'
@ -94,69 +107,73 @@ def remove_multiple_rooms_from_directory():
time.sleep(1) time.sleep(1)
def list_and_download_media_in_room(preset_internal_ID, preset_print_file_list_choice, preset_download_files_choice, base_directory): def list_and_download_media_in_room(preset_internal_ID, preset_print_file_list_choice, preset_download_files_choice, base_directory):
headers = {
'Authorization': f"Bearer {hardcoded_variables.access_token}"
}
if preset_internal_ID == '': if preset_internal_ID == '':
internal_ID = input("\nEnter the internal id of the room you want to get a list of media for (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ") internal_ID = input("\nEnter the internal id of the room you want to get a list of media for (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ")
elif preset_internal_ID != '': else:
internal_ID = preset_internal_ID internal_ID = preset_internal_ID
command_string = "curl -kXGET https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/room/" + internal_ID + "/media?access_token=" + hardcoded_variables.access_token
print("\n" + command_string + "\n") url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/room/{internal_ID}/media"
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) print("\n" + url + "\n")
media_list_output = process.stdout
#print("Full media list:\n" + media_list_output) response = requests.get(url, headers=headers, verify=True)
media_list_output = response.text
print("Full media list:\n" + media_list_output)
if preset_print_file_list_choice == '': if preset_print_file_list_choice == '':
print_file_list_choice = input("\nDo you want to write this list to a file? y/n? ") print_file_list_choice = input("\nDo you want to write this list to a file? y/n? ")
elif preset_print_file_list_choice != '': else:
print_file_list_choice = preset_print_file_list_choice print_file_list_choice = preset_print_file_list_choice
if print_file_list_choice == "y" or print_file_list_choice == "Y" or print_file_list_choice == "yes" or print_file_list_choice == "Yes": if print_file_list_choice.lower() in ["y", "yes", "Y", "Yes"]:
print_file_list_choice = "true" print_file_list_choice = "true"
elif print_file_list_choice == "n" or print_file_list_choice == "N" or print_file_list_choice == "no" or print_file_list_choice == "No": elif print_file_list_choice.lower() in ["n", "no", "N", "No"]:
print_file_list_choice = "false" print_file_list_choice = "false"
else: else:
print("Input invalid! Defaulting to 'No'.") print("Input invalid! Defaulting to 'No'.")
print_file_list_choice = "false" print_file_list_choice = "false"
room_dir = "./" + internal_ID room_dir = os.path.join(base_directory, internal_ID.replace('!', '').replace(':', '-'))
room_dir = room_dir.replace('!', '') os.makedirs(room_dir, exist_ok=True)
room_dir = room_dir.replace(':', '-')
os.mkdir(room_dir)
os.chdir(room_dir)
if print_file_list_choice == "true": if print_file_list_choice == "true":
media_list_filename_location = "./media_list.txt" media_list_filename_location = os.path.join(room_dir, "media_list.txt")
media_list_filename = open(media_list_filename_location,"w+") with open(media_list_filename_location,"w+") as media_list_filename:
media_list_filename.write(media_list_output) media_list_filename.write(media_list_output)
media_list_filename.close()
if preset_download_files_choice == '': if preset_download_files_choice == '':
download_files_choice = input("\nDo you also want to download a copy of these media files? y/n? ") download_files_choice = input("\nDo you also want to download a copy of these media files? y/n? ")
if preset_download_files_choice != '': else:
download_files_choice = preset_download_files_choice download_files_choice = preset_download_files_choice
if download_files_choice == "y" or download_files_choice == "Y" or download_files_choice == "yes" or download_files_choice == "Yes": if download_files_choice.lower() in ["y", "yes", "Y", "Yes"]:
download_files_choice = "true" download_files_choice = "true"
elif download_files_choice == "n" or download_files_choice == "N" or download_files_choice == "no" or download_files_choice == "No": elif download_files_choice.lower() in ["n", "no", "N", "No"]:
download_files_choice = "false" download_files_choice = "false"
else: else:
print("Input invalid! Defaulting to 'No'.") print("Input invalid! Defaulting to 'No'.")
download_files_choice = "false" download_files_choice = "false"
if download_files_choice == "true": if download_files_choice == "true":
media_list_output = media_list_output.split('\"') media_files_dir = os.path.join(room_dir, "media-files")
#print("New media list:\n" + str(media_list_output)) os.makedirs(media_files_dir, exist_ok=True)
os.mkdir("./media-files")
os.chdir("./media-files") media_list_output = json.loads(media_list_output)
count = 0 for key in ['local', 'remote']:
# Strips the newline character for media in media_list_output.get(key, []):
for line in media_list_output: media_url = media.replace('mxc://', f"https://{hardcoded_variables.homeserver_url}/_matrix/media/r0/download/")
if "mxc" in line: media_response = requests.get(media_url, stream=True, headers=headers, verify=True)
#print("Line is 1: \n\n" + line + "\n")
line = line.replace('mxc://','') if media_response.status_code == 200:
download_command = "wget https://" + hardcoded_variables.homeserver_url + "/_matrix/media/r0/download/" + line media_file_path = os.path.join(media_files_dir, media.split('/')[-1])
print(download_command) with open(media_file_path, 'wb') as media_file:
download_process = subprocess.run([download_command], shell=True, stdout=subprocess.PIPE, universal_newlines=True) media_file.write(media_response.content)
os.chdir(base_directory) print(f"Downloaded {media_url} to {media_file_path}")
else:
print(f"Failed to download {media_url}, status code: {media_response.status_code}")
# Example # Example
# $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/room/<room_id>/media?access_token=ACCESS_TOKEN # $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/room/<room_id>/media?access_token=ACCESS_TOKEN
@ -164,18 +181,6 @@ def list_and_download_media_in_room(preset_internal_ID,preset_print_file_list_ch
# To access via web: # To access via web:
# https://matrix.perthchat.org/_matrix/media/r0/download/ + server_name + "/" + media_id # https://matrix.perthchat.org/_matrix/media/r0/download/ + server_name + "/" + media_id
def redact_room_event():
internal_ID = input("\nEnter the internal id of the room the event is in (Example: !rapAelwZkajRyeZIpm:perthchat.org): ")
event_ID = input("\nEnter the event id of the event you wish to redact (Example: $lQT7NYYyVvwoVpZWcj7wceYQqeOzsJg1N6aXIecys4s): ")
redaction_reason = input("\nEnter the reason you're redacting this content: ")
command_string = "curl -X POST --header \"Authorization: Bearer " + hardcoded_variables.access_token + "\" --data-raw '{\"reason\": \"" + redaction_reason + "\"}' 'https://matrix.perthchat.org/_matrix/client/v3/rooms/" + internal_ID + "/redact/" + event_ID + "'"
print("\n" + command_string + "\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print(output)
# $ curl -X POST --header "Authorization: Bearer syt_..." --data-raw '{"reason": "Indecent material"}' 'https://matrix.perthchat.org/_matrix/client/v3/rooms/!fuYHAYyXqNLDxlKsWP:perthchat.org/redact/$nyjgZguQGadRRy8MdYtIgwbAeFcUAPqOPiaj_E60XZs'
# {"event_id":"$_m1gFtPg-5DiTyCvGfeveAX2xaA8gAv0BYLpjC8xe64"}
def download_media_from_multiple_rooms(): def download_media_from_multiple_rooms():
print("Download media from multiple rooms selected") print("Download media from multiple rooms selected")
download_media_list_location = input("\nPlease enter the path of the file containing a newline seperated list of room ids: ") download_media_list_location = input("\nPlease enter the path of the file containing a newline seperated list of room ids: ")
@ -202,13 +207,25 @@ def download_media_from_multiple_rooms():
#print(x) #print(x)
time.sleep(1) time.sleep(1)
def redact_room_event():
internal_ID = input("\nEnter the internal id of the room the event is in (Example: !rapAelwZkajRyeZIpm:perthchat.org): ")
event_ID = input("\nEnter the event id of the event you wish to redact (Example: $lQT7NYYyVvwoVpZWcj7wceYQqeOzsJg1N6aXIecys4s): ")
redaction_reason = input("\nEnter the reason you're redacting this content: ")
url = f"https://{hardcoded_variables.homeserver_url}/_matrix/client/v3/rooms/{internal_ID}/redact/{event_ID}"
print(f"\nRequesting: {url}\n")
output = requests.post(url, headers={'Authorization': f"Bearer {hardcoded_variables.access_token}"}, json={'reason': redaction_reason}, verify=True).text
print(output)
# $ curl -X POST --header "Authorization: Bearer syt_..." --data-raw '{"reason": "Indecent material"}' 'https://matrix.perthchat.org/_matrix/client/v3/rooms/!fuYHAYyXqNLDxlKsWP:perthchat.org/redact/$nyjgZguQGadRRy8MdYtIgwbAeFcUAPqOPiaj_E60XZs'
# {"event_id":"$_m1gFtPg-5DiTyCvGfeveAX2xaA8gAv0BYLpjC8xe64"}
def quarantine_media_in_room(): def quarantine_media_in_room():
internal_ID = input("\nEnter the internal id of the room you want to quarantine, this makes local and remote data inaccessible (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ") internal_ID = input("\nEnter the internal id of the room you want to quarantine, this makes local and remote data inaccessible (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ")
command_string = "curl -X POST \'https://" + hardcoded_variables.homeserver_url + "/_synapse/admin/v1/room/" + internal_ID + "/media/quarantine?access_token=" + hardcoded_variables.access_token + "\'" url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/room/{internal_ID}/media/quarantine"
print("\n" + command_string + "\n") print(f"\nRequesting: {url}\n")
process = subprocess.run([command_string], shell=True, stdout=subprocess.PIPE, universal_newlines=True) headers = {'Authorization': f'Bearer {hardcoded_variables.access_token}'}
output = process.stdout response = requests.post(url, headers=headers, verify=True)
print(output) print(response.text)
# Example # Example
# $ curl -X POST 'https://matrix.perthchat.org/_synapse/admin/v1/room/!DwUPBvNapIVecNllgt:perthchat.org/media/quarantine?access_token=ACCESS_TOKEN' # $ curl -X POST 'https://matrix.perthchat.org/_synapse/admin/v1/room/!DwUPBvNapIVecNllgt:perthchat.org/media/quarantine?access_token=ACCESS_TOKEN'