move ipinfo functions to seperate file, add rudimentary user reporting function

This commit is contained in:
PC-Admin 2023-07-23 04:21:50 +08:00
parent 94740aceb0
commit 3ad8469b5f
7 changed files with 341 additions and 109 deletions

View File

@ -88,21 +88,23 @@ To do:
5) Add more automated rdlist function with sane defaults - DONE
6) Add fully automated (should just return a web link and decryption password) reporting functions for users:
- Description of why the report was made (what happened)
- User's ID
- Whois Data
- Account Data
- Query Data
- Pushers List
- User's ID - DONE
- Whois Data - DONE
- Account Data - DONE
- Query Data - DONE
- Pushers List - DONE
- IPs + ipinfo Data - DONE
- List of the rooms the user is participating in, divided into 1:1 conversations and larger rooms
- The content of the messages they've sent (if they were sent to rooms your server is participating in)
- Copies of any media they've sent?
- Any other usernames associated with that IP
- Timestamp for when illegal material was accessed
- Description of report format and contents (to guide the reader)
7) Add a room report function to create a properly formatted report for rdlist
8) Add a function to extract a users email or 3PID
9) Do room shutdowns in parallel?
10) Add function for probing the support email of another server automatically
11) Automated incident report email to other server owners for more scalable coordination
12) Automated public room joining and reminder if reporting email is not available
12) Automated public room joining and reminder if reporting email is not available?
***
## rdlist Functionality
@ -161,3 +163,31 @@ The room was not found.
```
Note that this script before shutting these rooms down will save the state events to the "./state_events" folder, please keep this data as it's important for law enforcement.
***
## One-touch Reporting
WARNING: This section is under heavy development and shouldn't be used by anyone!!!
This script can automatically generate reports about user accounts for law enforcement.
It collects as much data about the target user account as possible, then packages it into an encrypted ZIP file that can be shared:
```
Please enter a number from the above menu, or enter 'q' or 'e' to exit.
70
Please enter the username to automatically generate a report: michael
...
Report generated successfully on user: "michael"
You can send this .zip file and password when reporting a user to law enforcement.
Password: RwiFrw9zouhVO7Dy9kW7
Encrypted .zip file location: ./reports/michael_2023-07-23_02-21-56.zip.aes
Encrypted .zip file size: 0.503927 MB
```

View File

@ -1,12 +1,14 @@
###########################################################################
# These values can be hard coded for easier usage: #
homeserver_url = "matrix.example.org"
base_url = "example.org"
access_token = ""
homeserver_url = "matrix.example.org" # Your homeserver URL
base_url = "example.org" # Your base URL (appears in usernames)
access_token = "" # Your homeserver admin access token
# ipinfo.io token
ipinfo_token = ""
ipinfo_token = "" # Leave blank to disable ipinfo.io lookups
# rdlist specific
rdlist_bot_username = "mod_team"
rdlist_bot_username = "mod_team" # The username to perform automated room shutdowns
rdlist_recommended_tags = ['hub_room_links', 'hub_room_trade', 'preban', 'degen_misc', 'beastiality', 'degen_porn', 'gore', 'snuff', 'degen_larp', 'hub_room_sussy', 'bot_spam', 'cfm', 'jailbait', 'bot_porn', 'toddlercon', 'loli', 'csam', 'tfm', 'degen_meet', 'stylized_3d_loli', '3d_loli']
# report generator
report_folder = "./reports" # Reports folder name
###########################################################################

77
ipinfo_commands.py Normal file
View File

@ -0,0 +1,77 @@
import os
import requests
import json
import csv
import time
import socket
import hardcoded_variables
import user_commands
def is_valid_ipv4(ip):
try:
socket.inet_pton(socket.AF_INET, ip)
except socket.error: # not a valid address
return False
return True
def analyse_account_ip(preset_username):
if not preset_username:
preset_username = input("\nPlease enter a username to analyse their country of origin: ")
data = user_commands.whois_account(preset_username=preset_username)
user_id = data['user_id']
device_data = data['devices']
ip_info = {}
for device_id, device_info in device_data.items():
for session in device_info['sessions']:
for connection in session['connections']:
ip = connection['ip']
if is_valid_ipv4(ip) and len(hardcoded_variables.ipinfo_token) > 0:
res = requests.get(f"https://ipinfo.io/{ip}",
headers={"Authorization": f"Bearer {hardcoded_variables.ipinfo_token}"})
if res.status_code == 200:
country = res.json().get('country')
ip_info[ip] = country
if len(hardcoded_variables.ipinfo_token) == 0:
return {"user_id": user_id, "ip_info": "IPINFO DISABLED"}
else:
return {"user_id": user_id, "ip_info": ip_info}
def analyse_multiple_account_ips():
print("Analyse multiple user IPs selected")
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
with open(user_list_location, newline='') as f:
reader = csv.reader(f)
data = list(reader)
print(len(data))
print("\n" + str(data))
output_file = None
if len(data) > 10:
file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
if file_confirmation.lower() in ("y", "yes"):
output_file = input("\nPlease enter the desired output file path:\n")
analyse_confirmation = input("\n\nAre you sure you want to analyse the IP of all of these users? y/n?\n")
if analyse_confirmation.lower() in ("y", "yes"):
x = 0
while x <= (len(data) - 1):
output = analyse_account_ip(data[x][0])
# if output file is specified, append to file
if output_file:
with open(output_file, 'a') as f:
f.write(output + "\n")
x += 1
time.sleep(1)
if analyse_confirmation.lower() in ("n", "no"):
print("\nExiting...\n")
if output_file and os.path.isfile(output_file):
print(f"Output saved to {output_file}")

View File

@ -2,7 +2,9 @@
import user_commands
import room_commands
import server_commands
import ipinfo_commands
import rdlist_commands
import report_commands
import hardcoded_variables
# check if homeserver url is hard coded, if not set it
@ -51,9 +53,9 @@ while pass_token == False:
print("17) Set rate limit of a user account.")
print("18) Delete rate limit of a user account.")
print("19) Check if user account exists.")
print("\n#### Server Commands ####")
print("40) Delete and block a specific media.")
print("41) Purge remote media repository up to a certain date.")
print("\n#### Server Commands ####\t\t\t\t\t#### Report Generation ####")
print("40) Delete and block a specific media.\t\t\t\t70) Generate user report.")
print("41) Purge remote media repository up to a certain date.\t\t71) Decrypt user report .zip file.")
print("42) Prepare database for copying events of multiple rooms.")
print("\n#### rdlist ####")
print("50) Block all rooms with specific rdlist tags.")
@ -142,9 +144,13 @@ while pass_token == False:
elif menu_input == "51":
rdlist_commands.block_recommended_rdlist_tags()
elif menu_input == "60":
user_commands.analyse_account_ip('')
ipinfo_commands.analyse_account_ip('')
elif menu_input == "61":
user_commands.analyse_multiple_account_ips()
ipinfo_commands.analyse_multiple_account_ips()
elif menu_input == "70":
report_commands.generate_user_report('')
elif menu_input == "71":
report_commands.decrypt_zip_file()
elif menu_input == "q" or menu_input == "Q" or menu_input == "e" or menu_input == "E":
print("\nExiting...\n")
pass_token = True

166
report_commands.py Normal file
View File

@ -0,0 +1,166 @@
import os
import json
import random
import string
import datetime
import zipfile
import pyAesCrypt
import user_commands
import room_commands
import ipinfo_commands
import hardcoded_variables
# For testing the Report Generator, set this to True
testing_mode = False
def get_report_folder():
# Get report_folder from hardcoded_variables
report_folder = hardcoded_variables.report_folder
# If report_folder ends with a slash, remove it
if report_folder.endswith(os.sep):
report_folder = report_folder[:-1]
return report_folder
def encrypt_user_folder(user_report_folder, username):
# Generate a strong random password
strong_password = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(20))
# Get parent directory of user_report_folder
parent_directory = os.path.dirname(os.path.abspath(user_report_folder))
# Create the name of the .zip file including timestamp
zip_file_name = os.path.join(parent_directory, username + "_" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".zip")
# Create a .zip file of the specified folder
with zipfile.ZipFile(zip_file_name, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for root, dirs, files in os.walk(user_report_folder):
for file in files:
zip_file.write(os.path.join(root, file), arcname=os.path.relpath(os.path.join(root, file), user_report_folder))
# Buffer size - 64K
bufferSize = 64 * 1024
# Encrypt the .zip file
pyAesCrypt.encryptFile(zip_file_name, zip_file_name + ".aes", strong_password, bufferSize)
# Delete the original zip file
os.remove(zip_file_name)
# You can return the password if you need to use it later, or you can directly print it here
return strong_password, zip_file_name + ".aes"
def generate_user_report(preset_username):
if len(preset_username) == 0:
username = input("\nPlease enter the username to automatically generate a report: ")
username = user_commands.parse_username(username)
else:
username = user_commands.parse_username(preset_username)
# Check if user exists
if user_commands.check_user_account_exists(username) == True:
print("\nUser exists, continuing with report generation.")
return
# If report_folder ends in a slash, remove it
report_folder = get_report_folder()
# Create report folders
user_report_folder = report_folder + "/" + username + "/"
if os.path.exists(report_folder) == False:
os.mkdir(report_folder)
if os.path.exists(user_report_folder) == False:
os.mkdir(user_report_folder)
# Get user account data and write to ./report/username/account_data.json
account_data = user_commands.collect_account_data(username)
account_data_file = open(user_report_folder + "account_data.json", "w")
account_data_file.write(json.dumps(account_data, indent=4, sort_keys=True))
account_data_file.close()
# Get user pushers and write to ./report/username/pushers.json
pushers_data = user_commands.list_account_pushers(username)
pushers_file = open(user_report_folder + "pushers.json", "w")
pushers_file.write(json.dumps(pushers_data, indent=4, sort_keys=True))
pushers_file.close()
# Get whois data and write to ./report/username/whois.json
whois_data = user_commands.whois_account(username)
whois_file = open(user_report_folder + "whois.json", "w")
whois_file.write(json.dumps(whois_data, indent=4, sort_keys=True))
whois_file.close()
# Get query data and write to ./report/username/query.json
query_data = user_commands.query_account(username)
query_file = open(user_report_folder + "query.json", "w")
query_file.write(json.dumps(query_data, indent=4, sort_keys=True))
query_file.close()
# Get user joined rooms and write to ./report/username/joined_rooms.json
joined_rooms_dict = user_commands.list_joined_rooms(username)
joined_rooms_file = open(user_report_folder + "joined_rooms.json", "w")
joined_rooms_file.write(json.dumps(joined_rooms_dict, indent=4, sort_keys=True))
joined_rooms_file.close()
# Get user ipinfo and write to ./report/username/ipinfo.json
ipinfo = ipinfo_commands.analyse_account_ip(username)
ipinfo_file = open(user_report_folder + "ipinfo.json", "w")
ipinfo_file.write(json.dumps(ipinfo, indent=4, sort_keys=True))
ipinfo_file.close()
# For each room the user is in, get the room state and write to ./report/username/room_states/
room_states_folder = user_report_folder + "room_states/"
if os.path.exists(room_states_folder) == False:
os.mkdir(room_states_folder)
room_list = joined_rooms_dict.get('joined_rooms', [])
count = 0
for room in room_list:
count += 1
room = room.split(" ")[0]
room_commands.export_room_state(room, room_states_folder)
if count > 4 and testing_mode == True:
break
# For each room the user is in, get the room details and write to ./report/username/room_details/
room_details_folder = user_report_folder + "room_details/"
if os.path.exists(room_details_folder) == False:
os.mkdir(room_details_folder)
count = 0
for room in room_list:
count += 1
room = room.split(" ")[0]
room_details = room_commands.list_room_details(room)
room_details_file = open(room_details_folder + room + ".json", "w")
room_details_file.write(str(room_details))
room_details_file.close()
if count > 4 and testing_mode == True:
break
# Generate a random password, then encrypt the ./report/username/ folder to a timestamped .zip file
strong_password, encrypted_zip_file_name = encrypt_user_folder(user_report_folder, username)
# Measure the size of the encrypted .zip file in MB
encrypted_zip_file_size = os.path.getsize(encrypted_zip_file_name) / 1000000
# Print the password and the encrypted .zip file name
print("\nReport generated successfully on user: \"" + username + "\"\n\nYou can send this .zip file and password when reporting a user to law enforcement.")
print("\nPassword: " + strong_password)
print("Encrypted .zip file location: " + encrypted_zip_file_name)
print("Encrypted .zip file size: " + str(encrypted_zip_file_size) + " MB\n")
def decrypt_zip_file():
# Ask user for the location of the encrypted .zip file
encrypted_zip_file_name = input("\nPlease enter the location of the encrypted .zip file: ")
# Ask user for the password
strong_password = input("Please enter the password: ")
# Decrypt the ZIP file into the same location as the encrypted ZIP file
pyAesCrypt.decryptFile(encrypted_zip_file_name, encrypted_zip_file_name[:-4], strong_password, 64 * 1024)
# Print the location of the decrypted ZIP file
print("\nDecrypted .zip file location: " + encrypted_zip_file_name[:-4] + "\n")

View File

@ -24,12 +24,15 @@ def list_room_details(preset_internal_ID):
print("\n" + url + "\n")
response = requests.get(url, headers=headers, verify=True)
print(response.text)
room_details_dict = json.loads(response.text)
print(json.dumps(room_details_dict, indent=4, sort_keys=True))
return room_details_dict
# Example
# $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org?access_token=ACCESS_TOKEN'
def export_room_state(preset_internal_ID):
def export_room_state(preset_internal_ID, preset_directory):
# record the current directory location
current_directory = os.getcwd()
@ -38,7 +41,11 @@ def export_room_state(preset_internal_ID):
elif preset_internal_ID != '':
internal_ID = preset_internal_ID
if preset_directory == '':
room_dir = os.path.join(current_directory, "state_events")
elif preset_directory != '':
room_dir = preset_directory
os.makedirs(room_dir, exist_ok=True)
unix_time = int(time.time())
@ -52,8 +59,9 @@ def export_room_state(preset_internal_ID):
with open(filename, 'w') as f:
f.write(response.text)
print(response.text)
return(response.text)
state_events_dict = json.loads(response.text)
return state_events_dict
# Example
# $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org/state?access_token=ACCESS_TOKEN'
@ -71,7 +79,8 @@ def list_directory_rooms():
output = output.replace('\"room_id\":\"','\n')
output = output.replace('\",\"name','\n\",\"name')
print(output)
print(json.dumps(output, indent=4, sort_keys=True))
# Example
# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/publicRooms?access_token=ACCESS_TOKEN

View File

@ -210,7 +210,9 @@ def whois_account(preset_username, output_file=None):
else:
print(output_text)
return response.text
whois_account_dict = json.loads(response.text)
return whois_account_dict
# Example:
# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@dogpoo:perthchat.org?access_token=ACCESS_TOKEN
@ -251,76 +253,6 @@ def whois_multiple_accounts():
if output_file and os.path.isfile(output_file):
print(f"Output saved to {output_file}")
def is_valid_ipv4(ip):
try:
socket.inet_pton(socket.AF_INET, ip)
except socket.error: # not a valid address
return False
return True
def analyse_account_ip(preset_username):
if not preset_username:
preset_username = input("\nPlease enter a username to analyse their country of origin: ")
user_info = whois_account(preset_username=preset_username)
data = json.loads(user_info)
user_id = data['user_id']
#print(f'user_id: {user_id}')
device_data = data['devices']
#print(f'device_data: {device_data}')
countries = []
for device_id, device_info in device_data.items():
for session in device_info['sessions']:
for connection in session['connections']:
ip = connection['ip']
if is_valid_ipv4(ip):
res = requests.get(f"https://ipinfo.io/{ip}",
headers={"Authorization": f"Bearer {hardcoded_variables.ipinfo_token}"})
if res.status_code == 200:
country = res.json().get('country')
countries.append(country)
print(f"User: {user_id} from Countries: {countries}")
return(f"User: {user_id} from Countries: {countries}")
def analyse_multiple_account_ips():
print("Analyse multiple user IPs selected")
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
with open(user_list_location, newline='') as f:
reader = csv.reader(f)
data = list(reader)
print(len(data))
print("\n" + str(data))
output_file = None
if len(data) > 10:
file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
if file_confirmation.lower() in ("y", "yes"):
output_file = input("\nPlease enter the desired output file path:\n")
analyse_confirmation = input("\n\nAre you sure you want to analyse the IP of all of these users? y/n?\n")
if analyse_confirmation.lower() in ("y", "yes"):
x = 0
while x <= (len(data) - 1):
output = analyse_account_ip(data[x][0])
# if output file is specified, append to file
if output_file:
with open(output_file, 'a') as f:
f.write(output + "\n")
x += 1
time.sleep(1)
if analyse_confirmation.lower() in ("n", "no"):
print("\nExiting...\n")
if output_file and os.path.isfile(output_file):
print(f"Output saved to {output_file}")
def list_joined_rooms(preset_username):
if preset_username == '':
username = input("\nPlease enter the username you wish to query: ")
@ -338,6 +270,10 @@ def list_joined_rooms(preset_username):
else:
print(f"Error querying joined rooms: {response.status_code}, {response.text}")
joined_rooms_dict = json.loads(response.text)
return joined_rooms_dict
# Example:
# $ curl -kXGET https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/joined_rooms?access_token=ACCESS_TOKEN
@ -389,7 +325,9 @@ def query_account(preset_username):
else:
print(f"Error querying account: {response.status_code}, {response.text}")
return response.text
query_account_dict = json.loads(response.text)
return query_account_dict
# Example:
# $ curl -kX GET https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
@ -464,7 +402,9 @@ def collect_account_data(preset_username):
else:
print(f"Error querying account: {response.status_code}, {response.text}")
return response.text
account_data_dict = json.loads(response.text)
return account_data_dict
# Example:
# $ curl -X GET https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/accountdata?access_token=ACCESS_TOKEN
@ -486,7 +426,9 @@ def list_account_pushers(preset_username):
else:
print(f"Error querying account: {response.status_code}, {response.text}")
return response.text
pusher_data_dict = json.loads(response.text)
return pusher_data_dict
# Example:
# $ curl -X GET https://matrix.perthchat.org/_synapse/admin/v1/users/@dogpoo:perthchat.org/pushers
@ -569,12 +511,12 @@ def check_user_account_exists(preset_username):
if response.status_code == 200:
print("User ID is available.")
return True
elif response.status_code == 400:
print(f"User ID already taken.")
return False
else:
print(f"Error querying account: {response.status_code}, {response.text}")
return response.text
# Example:
# $ curl -X GET /_synapse/admin/v1/username_available?username=dogpoo&access_token=ACCESS_TOKEN