mirror of
https://github.com/PC-Admin/matrix-moderation-tool.git
synced 2024-12-19 07:00:27 -05:00
replace 4 spaces with tabs, i <3 tabs!
This commit is contained in:
parent
58ed3defee
commit
1aea8c7f93
218
user_commands.py
218
user_commands.py
@ -216,110 +216,110 @@ def whois_account(preset_username, output_file=None):
|
||||
# $ curl -kXGET https://matrix.perthchat.org/_matrix/client/r0/admin/whois/@PC-Admin:perthchat.org?access_token=ACCESS_TOKEN
|
||||
|
||||
def whois_multiple_accounts():
|
||||
print("Whois multiple user accounts selected")
|
||||
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
|
||||
with open(user_list_location, newline='') as f:
|
||||
reader = csv.reader(f)
|
||||
data = list(reader)
|
||||
print(len(data))
|
||||
print("Whois multiple user accounts selected")
|
||||
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
|
||||
with open(user_list_location, newline='') as f:
|
||||
reader = csv.reader(f)
|
||||
data = list(reader)
|
||||
print(len(data))
|
||||
|
||||
print("\n" + str(data))
|
||||
print("\n" + str(data))
|
||||
|
||||
output_file = None
|
||||
if len(data) > 10:
|
||||
file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
|
||||
if file_confirmation.lower() in ("y", "yes"):
|
||||
output_file = input("\nPlease enter the desired output file path:\n")
|
||||
output_file = None
|
||||
if len(data) > 10:
|
||||
file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
|
||||
if file_confirmation.lower() in ("y", "yes"):
|
||||
output_file = input("\nPlease enter the desired output file path:\n")
|
||||
|
||||
whois_confirmation = input("\n\nAre you sure you want to whois all of these users? y/n?\n")
|
||||
whois_confirmation = input("\n\nAre you sure you want to whois all of these users? y/n?\n")
|
||||
|
||||
if whois_confirmation.lower() in ("y", "yes"):
|
||||
x = 0
|
||||
while x <= (len(data) - 1):
|
||||
output = whois_account(data[x][0])
|
||||
if whois_confirmation.lower() in ("y", "yes"):
|
||||
x = 0
|
||||
while x <= (len(data) - 1):
|
||||
output = whois_account(data[x][0])
|
||||
|
||||
# if output file is specified, append to file
|
||||
if output_file:
|
||||
with open(output_file, 'a') as f:
|
||||
f.write(output + "\n")
|
||||
x += 1
|
||||
time.sleep(1)
|
||||
# if output file is specified, append to file
|
||||
if output_file:
|
||||
with open(output_file, 'a') as f:
|
||||
f.write(output + "\n")
|
||||
x += 1
|
||||
time.sleep(1)
|
||||
|
||||
if whois_confirmation.lower() in ("n", "no"):
|
||||
print("\nExiting...\n")
|
||||
if whois_confirmation.lower() in ("n", "no"):
|
||||
print("\nExiting...\n")
|
||||
|
||||
if output_file and os.path.isfile(output_file):
|
||||
print(f"Output saved to {output_file}")
|
||||
if output_file and os.path.isfile(output_file):
|
||||
print(f"Output saved to {output_file}")
|
||||
|
||||
def is_valid_ipv4(ip):
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET, ip)
|
||||
except socket.error: # not a valid address
|
||||
return False
|
||||
return True
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET, ip)
|
||||
except socket.error: # not a valid address
|
||||
return False
|
||||
return True
|
||||
|
||||
def analyse_account_ip(preset_username):
|
||||
if not preset_username:
|
||||
preset_username = input("\nPlease enter a username to analyse their country of origin: ")
|
||||
user_info = whois_account(preset_username=preset_username)
|
||||
if not preset_username:
|
||||
preset_username = input("\nPlease enter a username to analyse their country of origin: ")
|
||||
user_info = whois_account(preset_username=preset_username)
|
||||
|
||||
data = json.loads(user_info)
|
||||
|
||||
user_id = data['user_id']
|
||||
#print(f'user_id: {user_id}')
|
||||
device_data = data['devices']
|
||||
#print(f'device_data: {device_data}')
|
||||
data = json.loads(user_info)
|
||||
|
||||
user_id = data['user_id']
|
||||
#print(f'user_id: {user_id}')
|
||||
device_data = data['devices']
|
||||
#print(f'device_data: {device_data}')
|
||||
|
||||
countries = []
|
||||
for device_id, device_info in device_data.items():
|
||||
for session in device_info['sessions']:
|
||||
for connection in session['connections']:
|
||||
ip = connection['ip']
|
||||
if is_valid_ipv4(ip):
|
||||
res = requests.get(f"https://ipinfo.io/{ip}",
|
||||
headers={"Authorization": f"Bearer {hardcoded_variables.ipinfo_token}"})
|
||||
if res.status_code == 200:
|
||||
country = res.json().get('country')
|
||||
countries.append(country)
|
||||
countries = []
|
||||
for device_id, device_info in device_data.items():
|
||||
for session in device_info['sessions']:
|
||||
for connection in session['connections']:
|
||||
ip = connection['ip']
|
||||
if is_valid_ipv4(ip):
|
||||
res = requests.get(f"https://ipinfo.io/{ip}",
|
||||
headers={"Authorization": f"Bearer {hardcoded_variables.ipinfo_token}"})
|
||||
if res.status_code == 200:
|
||||
country = res.json().get('country')
|
||||
countries.append(country)
|
||||
|
||||
print(f"User: {user_id} from Countries: {countries}")
|
||||
return(f"User: {user_id} from Countries: {countries}")
|
||||
print(f"User: {user_id} from Countries: {countries}")
|
||||
return(f"User: {user_id} from Countries: {countries}")
|
||||
|
||||
def analyse_multiple_account_ips():
|
||||
print("Analyse multiple user IPs selected")
|
||||
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
|
||||
with open(user_list_location, newline='') as f:
|
||||
reader = csv.reader(f)
|
||||
data = list(reader)
|
||||
print(len(data))
|
||||
print("Analyse multiple user IPs selected")
|
||||
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
|
||||
with open(user_list_location, newline='') as f:
|
||||
reader = csv.reader(f)
|
||||
data = list(reader)
|
||||
print(len(data))
|
||||
|
||||
print("\n" + str(data))
|
||||
print("\n" + str(data))
|
||||
|
||||
output_file = None
|
||||
if len(data) > 10:
|
||||
file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
|
||||
if file_confirmation.lower() in ("y", "yes"):
|
||||
output_file = input("\nPlease enter the desired output file path:\n")
|
||||
output_file = None
|
||||
if len(data) > 10:
|
||||
file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
|
||||
if file_confirmation.lower() in ("y", "yes"):
|
||||
output_file = input("\nPlease enter the desired output file path:\n")
|
||||
|
||||
analyse_confirmation = input("\n\nAre you sure you want to analyse the IP of all of these users? y/n?\n")
|
||||
analyse_confirmation = input("\n\nAre you sure you want to analyse the IP of all of these users? y/n?\n")
|
||||
|
||||
if analyse_confirmation.lower() in ("y", "yes"):
|
||||
x = 0
|
||||
while x <= (len(data) - 1):
|
||||
output = analyse_account_ip(data[x][0])
|
||||
if analyse_confirmation.lower() in ("y", "yes"):
|
||||
x = 0
|
||||
while x <= (len(data) - 1):
|
||||
output = analyse_account_ip(data[x][0])
|
||||
|
||||
# if output file is specified, append to file
|
||||
if output_file:
|
||||
with open(output_file, 'a') as f:
|
||||
f.write(output + "\n")
|
||||
x += 1
|
||||
time.sleep(1)
|
||||
# if output file is specified, append to file
|
||||
if output_file:
|
||||
with open(output_file, 'a') as f:
|
||||
f.write(output + "\n")
|
||||
x += 1
|
||||
time.sleep(1)
|
||||
|
||||
if analyse_confirmation.lower() in ("n", "no"):
|
||||
print("\nExiting...\n")
|
||||
if analyse_confirmation.lower() in ("n", "no"):
|
||||
print("\nExiting...\n")
|
||||
|
||||
if output_file and os.path.isfile(output_file):
|
||||
print(f"Output saved to {output_file}")
|
||||
if output_file and os.path.isfile(output_file):
|
||||
print(f"Output saved to {output_file}")
|
||||
|
||||
def list_joined_rooms(preset_username):
|
||||
if preset_username == '':
|
||||
@ -395,47 +395,47 @@ def query_account(preset_username):
|
||||
# $ curl -kX GET https://matrix.perthchat.org/_synapse/admin/v2/users/@billybob:perthchat.org?access_token=ACCESS_TOKEN
|
||||
|
||||
def query_multiple_accounts():
|
||||
print("Query multiple user accounts selected")
|
||||
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
|
||||
with open(user_list_location, newline='') as f:
|
||||
reader = csv.reader(f)
|
||||
data = list(reader)
|
||||
print(len(data))
|
||||
print("Query multiple user accounts selected")
|
||||
user_list_location = input("\nPlease enter the path of the file containing a newline seperated list of Matrix usernames: ")
|
||||
with open(user_list_location, newline='') as f:
|
||||
reader = csv.reader(f)
|
||||
data = list(reader)
|
||||
print(len(data))
|
||||
|
||||
print("\n" + str(data))
|
||||
print("\n" + str(data))
|
||||
|
||||
output_file = None
|
||||
if len(data) > 10:
|
||||
file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
|
||||
if file_confirmation.lower() in ("y", "yes"):
|
||||
output_file = input("\nPlease enter the desired output file path:\n")
|
||||
output_file = None
|
||||
if len(data) > 10:
|
||||
file_confirmation = input("\nThere are more than 10 users. Would you like to save the output to a file? y/n?\n")
|
||||
if file_confirmation.lower() in ("y", "yes"):
|
||||
output_file = input("\nPlease enter the desired output file path:\n")
|
||||
|
||||
query_confirmation = input("\n\nAre you sure you want to query all of these users? y/n?\n")
|
||||
query_confirmation = input("\n\nAre you sure you want to query all of these users? y/n?\n")
|
||||
|
||||
if query_confirmation.lower() in ("y", "yes"):
|
||||
x = 0
|
||||
while x <= (len(data) - 1):
|
||||
output = query_account(data[x][0])
|
||||
if query_confirmation.lower() in ("y", "yes"):
|
||||
x = 0
|
||||
while x <= (len(data) - 1):
|
||||
output = query_account(data[x][0])
|
||||
|
||||
# if output file is specified, append to file
|
||||
if output_file:
|
||||
with open(output_file, 'a') as f:
|
||||
f.write(output + "\n")
|
||||
x += 1
|
||||
time.sleep(1)
|
||||
# if output file is specified, append to file
|
||||
if output_file:
|
||||
with open(output_file, 'a') as f:
|
||||
f.write(output + "\n")
|
||||
x += 1
|
||||
time.sleep(1)
|
||||
|
||||
if query_confirmation.lower() in ("n", "no"):
|
||||
print("\nExiting...\n")
|
||||
if query_confirmation.lower() in ("n", "no"):
|
||||
print("\nExiting...\n")
|
||||
|
||||
if output_file and os.path.isfile(output_file):
|
||||
print(f"Output saved to {output_file}")
|
||||
if output_file and os.path.isfile(output_file):
|
||||
print(f"Output saved to {output_file}")
|
||||
|
||||
def quarantine_users_media():
|
||||
username = input("\nPlease enter the username of the user who's media you want to quarantine: ")
|
||||
username = parse_username(username)
|
||||
|
||||
url = f"https://{hardcoded_variables.homeserver_url}/_synapse/admin/v1/user/@{username}:{hardcoded_variables.base_url}/media/quarantine?access_token={hardcoded_variables.access_token}"
|
||||
|
||||
|
||||
print("\n" + url + "\n")
|
||||
response = requests.post(url, verify=True)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user