add generic email send function for automated reporting, hardcode matrix.org abuse email because... well I wanna stay in their good books lol

This commit is contained in:
PC-Admin 2023-07-24 02:59:49 +08:00
parent 63dc5bd6d8
commit 492b384a92
4 changed files with 76 additions and 21 deletions

View File

@ -56,7 +56,7 @@ while pass_token == False:
print("\n#### Server Commands ####\t\t\t\t\t#### Report Generation ####")
print("40) Delete and block a specific media.\t\t\t\t70) Generate user report.")
print("41) Purge remote media repository up to a certain date.\t\t71) Decrypt user report .zip file.")
print("42) Prepare database for copying events of multiple rooms.")
print("42) Prepare database for copying events of multiple rooms.\t72) Send a test email.")
print("43) Lookup homeserver admin contact email.")
print("\n#### rdlist ####")
print("50) Block all rooms with specific rdlist tags.")
@ -154,6 +154,8 @@ while pass_token == False:
report_commands.generate_user_report('')
elif menu_input == "71":
report_commands.decrypt_zip_file()
elif menu_input == "72":
report_commands.test_send_email()
elif menu_input == "q" or menu_input == "Q" or menu_input == "e" or menu_input == "E":
print("\nExiting...\n")
pass_token = True

View File

@ -6,6 +6,12 @@ import string
import datetime
import zipfile
import pyAesCrypt
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.utils import COMMASPACE
from email import encoders
import user_commands
import room_commands
import ipinfo_commands
@ -164,3 +170,49 @@ def decrypt_zip_file():
pyAesCrypt.decryptFile(encrypted_zip_file_name, encrypted_zip_file_name[:-4], strong_password, 64 * 1024)
# Print the location of the decrypted ZIP file
print("\nDecrypted .zip file location: " + encrypted_zip_file_name[:-4] + "\n")
def send_email(email_address, email_subject, email_content, email_attachments):
assert isinstance(email_attachments, list)
msg = MIMEMultipart() # Create a multipart message
msg['From'] = hardcoded_variables.smtp_user
msg['To'] = COMMASPACE.join([email_address])
msg['Subject'] = email_subject
msg.attach(MIMEText(email_content)) # Attach the email body
# Attach files
for file in email_attachments:
part = MIMEBase('application', "octet-stream")
with open(file, 'rb') as f:
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file))
msg.attach(part)
try:
# Send the email via SMTP server
smtp = smtplib.SMTP(hardcoded_variables.smtp_server, hardcoded_variables.smtp_port)
smtp.starttls()
smtp.login(hardcoded_variables.smtp_user, hardcoded_variables.smtp_password)
smtp.sendmail(hardcoded_variables.smtp_user, email_address, msg.as_string())
smtp.close()
return True
except Exception as e:
print(f"Failed to send email: {e}")
return False
def test_send_email():
# Ask the user for the destination email address
email_address = input("\nPlease enter the destination email address to send this test email too: ")
# Example email parameters
email_subject = "Test Email"
email_content = "This is a test email."
email_attachments = ["./test_data/evil_clown.jpeg"] # List of file paths. Adjust this to the actual files you want to attach.
# Try to send the email
if send_email(email_address, email_subject, email_content, email_attachments):
print("\nEmail successfully sent.")
else:
print("\nFailed to send email.")

View File

@ -139,18 +139,23 @@ def prepare_database_copy_of_multiple_rooms():
print("\nThe sql query files have been generated, as postgres user in container run:\n# docker exec -it matrix-postgres /bin/bash\nbash-5.0$ export PGPASSWORD=your-db-password\nbash-5.0$ for f in /var/lib/postgresql/data/ramdisk/*/dump_room_data.sql; do psql --host=127.0.0.1 --port=5432 --username=synapse -w -f $f; done\n\nAfter copying the data to a cloud location law enforcement can access, clean up the ramdisk like so:\n# rm -r /matrix/postgres/data/ramdisk/*\n# umount /matrix/postgres/data/ramdisk")
def lookup_homeserver_admin_email(preset_homeserver):
if preset_homeserver == '':
homeserver = input("\nEnter the base URL to collect the admin contact details (Example: matrix.org): ")
elif preset_homeserver != '':
homeserver = preset_homeserver
def lookup_homeserver_admin_email(preset_baseurl):
if preset_baseurl == '':
baseurl = input("\nEnter the base URL to collect the admin contact details (Example: matrix.org): ")
elif preset_baseurl != '':
baseurl = preset_baseurl
# If baseurl is matrix.org, return 'abuse@matrix.org' as a hardcoded response
if baseurl == "matrix.org":
print("\nAdmin contact email(s) for " + baseurl + " are: abuse@matrix.org")
return {"matrix.org": ["abuse@matrix.org"]}, False
# Check target homserver for MSC1929 support email
url = f"https://{homeserver}/.well-known/matrix/support"
url = f"https://{baseurl}/.well-known/matrix/support"
try:
response = requests.get(url)
except requests.exceptions.RequestException as e:
print(f"Error: Unable to connect to server {homeserver}. Trying WHOIS data...")
print(f"Error: Unable to connect to server {baseurl}. Trying WHOIS data...")
response = None
# If the request was successful, the status code will be 200
@ -161,29 +166,25 @@ def lookup_homeserver_admin_email(preset_homeserver):
# Extract the emails from the admins field and remove duplicates
admin_emails = list({admin['email_address'] for admin in data['admins']})
print("Admin contact emails for " + homeserver + " are: " + str(admin_emails))
print("\nAdmin contact emails for " + baseurl + " are: " + str(admin_emails))
# Create a dictionary with homeserver as key and emails as value
email_dict = {homeserver: admin_emails}
# Convert the dictionary to a JSON string and print it
email_json = json.dumps(email_dict, indent=4)
print("Admin contact emails for " + homeserver + " in JSON format: " + email_json)
# Create a dictionary with baseurl as key and emails as value
email_dict = {baseurl: admin_emails}
return email_dict, False
else:
print(f"Error: Unable to collect admin email from server {homeserver}")
print(f"Error: Unable to collect admin email from server {baseurl}")
print("Attempting to collect admin email from WHOIS data...")
# Get WHOIS data
try:
w = whois.whois(homeserver)
w = whois.whois(baseurl)
if w.emails:
print("Admin contact email(s) for " + homeserver + " are: " + str(w.emails))
return {homeserver: list(w.emails)}, True
print("\nAdmin contact email(s) for " + baseurl + " are: " + str(w.emails))
return {baseurl: list(w.emails)}, True
else:
print(f"Error: Unable to collect admin email from WHOIS data for {homeserver}")
print(f"Error: Unable to collect admin email from WHOIS data for {baseurl}")
return None, False
except:
print(f"Error: Unable to collect WHOIS data for {homeserver}")
print(f"Error: Unable to collect WHOIS data for {baseurl}")
return None, False

BIN
test_data/evil_clown.jpeg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB