fix busted recommended tags module. generate primitive report summaries. use incident_dict like disctionary for all_local_users and all_remote_users.

This commit is contained in:
PC-Admin 2023-07-31 21:36:44 +08:00
parent 65f297f594
commit c1bfa32e2d
4 changed files with 64 additions and 85 deletions

View File

@ -59,7 +59,7 @@ while pass_token == False:
print("\n#### Server Commands ####\t\t\t\t\t#### Report Generation ####")
print("100) Delete and block a specific media.\t\t\t\t150) Generate user report.")
print("101) Purge remote media repository up to a certain date.\t151) Decrypt user report .zip file.")
print("102) Prepare database for copying events of multiple rooms.\t152) Lookup homeserver admin contact email.")
print("102) Prepare database for copying events of multiple rooms.\t152) Lookup homeserver admin contact details.")
print("\t\t\t\t\t\t\t\t153) Send a test email (to yourself).")
print("#### rdlist ####\t\t\t\t\t\t154) Sent a test Matrix message (to yourself).")
print("120) Block all rooms with specific rdlist tags.\t\t\t155) Send test incident reports (to yourself).")

View File

@ -26,6 +26,8 @@ def sync_rdlist():
if "Your branch is up to date" not in status.stdout.decode():
print("Pulling latest changes from rdlist repo...")
os.chdir("./rdlist/")
# Avoid pulling changes if testing mode is enabled
if hardcoded_variables.testing_mode == False:
subprocess.run(["git", "pull"], check=True)
os.chdir("..")
else:
@ -35,46 +37,6 @@ def sync_rdlist():
print("Cloning rdlist repo...")
subprocess.run(["git", "clone", "ssh://gitea@code.glowers.club:1488/loj/rdlist.git"], check=True)
# def build_incident_report(users_list):
# # Git clone the rdlist repo to ./rdlist/
# sync_rdlist()
# # Load the summaries JSON file
# summaries_path = os.path.join("rdlist", "dist", "summaries.json")
# with open(summaries_path, 'r') as file:
# data = json.load(file)
# return incidents_dict
# # Example of the data structure we're trying to build/transform:
# # users_list = ["@billybob:matrix.org", "@johndoe:matrix.org", "@pedobear:perthchat.org", "@randomcreep:perthchat.org", "@fatweeb:grin.hu"]
# #
# # becomes:
# #
# # incidents_dict = {
# # f"@billybob:matrix.org": {
# # "!dummyid1:matrix.org": ["csam", "lolicon", "beastiality"],
# # "!dummyid2:matrix.org": ["csam", "anarchy"]
# # },
# # f"@johndoe:matrix.org": {
# # "!dummyid3:matrix.org": ["csam", "lolicon", "toddlercon"],
# # "!dummyid4:matrix.org": ["csam", "terrorism"]
# # },
# # f"@pedobear:perthchat.org": {
# # "!dummyid5:matrix.org": ["csam", "lolicon", "jailbait"],
# # "!dummyid6:matrix.org": ["csam", "hub_links"]
# # },
# # f"@randomcreep:perthchat.org": {
# # "!dummyid7:matrix.org": ["csam", "jailbait"],
# # "!dummyid8:matrix.org": ["csam", "pre_ban"]
# # },
# # f"@fatweeb:grin.hu": {
# # "!dummyid9:matrix.org": ["csam", "lolicon"],
# # "!dummyid10:matrix.org": ["csam", "degen"]
# # }
# # }
# A function to return the rdlist tags associated with a room
def get_rdlist_tags(preset_internal_ID):
if preset_internal_ID == '':
@ -127,65 +89,70 @@ def block_all_rooms_with_rdlist_tags(rdlist_use_recommended,preset_user_ID,prese
with open(summaries_path, 'r') as file:
data = json.load(file)
# Create an empty list to store all the room_ids
all_room_ids = []
# Create an empty dictionary to store all the room_ids for each user
all_local_users = dict()
all_remote_users = dict()
# Create a set to store all room_ids
all_room_ids = set()
# Create a dictionary to store the tags for each room
room_tags = dict()
# Iterate over blocked_tags
for tag in blocked_tags:
# Filter the data to keep only the entries where the tag appears in the "tags" list
filtered_data = [item for item in data if 'report_info' in item and 'tags' in item['report_info'] and tag in item['report_info']['tags']]
# Store the tags for each room
for item in filtered_data:
if 'room' in item and 'room_id' in item['room']:
room_id = item['room']['room_id']
all_room_ids.add(room_id) # add the room_id to the set
if room_id not in room_tags:
room_tags[room_id] = set()
room_tags[room_id].update(item['report_info']['tags'])
# Extract the room_ids
room_ids = [item['room']['room_id'] for item in filtered_data if 'room' in item and 'room_id' in item['room']]
# Add the room_ids to the list of all room_ids
all_room_ids.extend(room_ids)
# If choosing specific tags, print the tag and corresponding room_ids
if rdlist_use_recommended == False:
# Print the tag and corresponding room_ids
print(f"Tag: {tag}\nRoom IDs: {room_ids}\n")
# Deduplicate the list of all room_ids
all_room_ids = list(set(all_room_ids))
# Examine these room_ids for local and remote users
all_local_users = []
all_remote_users = []
for room_id in all_room_ids:
joined_local_members = room_commands.get_room_members(room_id, True)
all_local_users.extend(joined_local_members)
joined_remote_members = room_commands.get_room_members(room_id, False)
all_remote_users.extend(joined_remote_members)
for room_id in room_ids:
joined_members = room_commands.get_room_members(room_id)
# Deduplicate the list of all local users
all_local_users = list(set(all_local_users))
#print("all_local_users: " + str(all_local_users))
# For each user, add the room_id and tags to the dictionary
for user_id in joined_members:
if user_id.endswith(hardcoded_variables.base_url):
if user_id not in all_local_users:
all_local_users[user_id] = dict()
all_local_users[user_id][room_id] = list(room_tags.get(room_id, []))
else:
if user_id not in all_remote_users:
all_remote_users[user_id] = dict()
all_remote_users[user_id][room_id] = list(room_tags.get(room_id, []))
all_room_ids = list(all_room_ids) # convert the set to a list
print(f"all_local_users: {all_local_users}")
print(f"all_remote_users: {all_remote_users}")
print(f"all_room_ids: {all_room_ids}")
# If there's at least 1 local user detected, ask the admin if they want to generate a user report for every user found in rdlist rooms
if len(all_local_users) > 0:
print(f"\nWARNING! The following local users are current members of rooms tagged in rdlist: {all_local_users}")
print(f"\nWARNING! The following local users are current members of rooms tagged in rdlist: {list(all_local_users.keys())}")
generate_user_report_confirmation = input("\nDo you want to generate a user report file for each of these users? y/n? ")
if generate_user_report_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']:
for user_id in all_local_users:
report_commands.generate_user_report(user_id)
print(f"\nGenerating user report for user: {user_id}")
# Generate report_dict for each user
report_content = report_commands.generate_rdlist_report_summary(all_local_users[user_id], user_id)
report_commands.generate_user_report(user_id, report_content)
elif generate_user_report_confirmation.lower() in ['n', 'no', 'N', 'No']:
print("\nSkipping user report generation...\n")
elif len(all_local_users) == 0:
print(f"\nNo local users were found in rdlist rooms.")
# Deduplicate the list of all remote users
all_remote_users = list(set(all_remote_users))
all_remote_users = [user for user in all_remote_users if user not in all_local_users]
#print("all_remote_users: " + str(all_remote_users))
# Ask the admin if they would like to mail off an incident report for every remote user found in rdlist rooms
# if len(all_remote_users) > 0:
# print(f"\nThe following remote users are current members of rooms tagged in rdlist: {all_remote_users}")
# send_incident_report_confirmation = input("\nDo you want to send an incident report to the abuse email address for each of these users? y/n? ")
# if send_incident_report_confirmation.lower() in ['y', 'yes', 'Y', 'Yes']:
# build_incident_report(all_remote_users)
# #for user_id in all_remote_users:
# # report_commands.send_incident_report(user_id)
# elif send_incident_report_confirmation.lower() in ['n', 'no', 'N', 'No']:
# print("\nSkipping incident report generation...\n")
# Todo: Add Incident Report section
# Ask the user if they wish to block and purge all these rooms, then collect shutdown parameters
if preset_user_ID == '':
@ -201,8 +168,6 @@ def block_all_rooms_with_rdlist_tags(rdlist_use_recommended,preset_user_ID,prese
elif preset_message != '':
message = preset_message
#print(f"all_room_ids: {all_room_ids}")
# Ask the user if they wish to block and purge all these rooms
shutdown_confirmation = input("\nNumber of rdlist rooms being shutdown: " + str(len(all_room_ids)) + "\n\nAre you sure you want to block/shutdown these rooms? y/n? ")
@ -268,7 +233,7 @@ def block_recommended_rdlist_tags():
# Create user account
user_commands.create_account(hardcoded_variables.rdlist_bot_username, hardcoded_variables.rdlist_bot_password)
else:
print(f"\n@{hardcoded_variables.rdlist_bot_username}:{hardcoded_variables.base_url} account already exists. Resetting account password.")
print(f"@{hardcoded_variables.rdlist_bot_username}:{hardcoded_variables.base_url} account already exists. Resetting account password.\n")
user_commands.reset_password(hardcoded_variables.rdlist_bot_username, hardcoded_variables.rdlist_bot_password)
# Define default valies for shutdown_room()

View File

@ -95,7 +95,7 @@ def encrypt_user_folder(user_report_folder, username):
pyAesCrypt.encryptFile(zip_file_name, zip_file_name + ".aes", strong_password, bufferSize)
# Delete the original zip file
os.remove(zip_file_name)
#os.remove(zip_file_name)
# Write the password to a file
password_file = open(zip_file_name + ".aes" + ".password", "w")
@ -337,6 +337,20 @@ def test_send_email():
else:
print("\nFailed to send email.")
def generate_rdlist_report_summary(room_dict, user_id):
print(f"user_dict: {room_dict}")
report_content = f"""\n~~~User Report~~~\n\nUsername: {user_id}\n"""
for room_id, rdlist_tags in room_dict.items():
report_content += f"\nWas a member of this room: {room_id}\nThis room has been flagged with the following rdlist tags:\n"
for tag in rdlist_tags:
tag_description = rdlist_tag_descriptions.get(tag, "No description available.")
report_content += f" - {tag} ({tag_description})\n"
report_content
return report_content
def prepare_email_content(user_dict, from_whois, baseurl):
email_content = f"""Dear Administrator,

View File

@ -31,7 +31,7 @@ def get_room_details(preset_internal_ID):
# Example
# $ curl -kXGET 'https://matrix.perthchat.org/_synapse/admin/v1/rooms/!OeqILBxiHahidSQQoC:matrix.org?access_token=ACCESS_TOKEN'
def get_room_members(preset_internal_ID, local_only):
def get_room_members(preset_internal_ID, local_only=False):
if preset_internal_ID == '':
internal_ID = input("\nEnter the internal id of the room you wish to query (Example: !OLkDvaYjpNrvmwnwdj:matrix.org): ")
elif preset_internal_ID != '':