Files
python-asio-smtp/run.py
Yannik Schmidt ebdf98b4a7 asdasd
2022-10-11 04:46:40 +02:00

120 lines
3.4 KiB
Python

import argparse
import keycloak
import json
import time
from keycloak import KeycloakAdmin
import logging
import datetime
import smtplib
import ssl
# Failed-login events that were already processed, keyed by
# str(event["time"]) + str(event["userId"]); used to skip duplicates.
db = {}
# NOTE(review): not referenced anywhere in the visible part of this file.
last_mail = None
# Timestamps (datetime objects) of the failed logins collected so far.
times = []
# Failure timestamp -> source IP address reported for that event.
timesToIP = {}
def send_mail(ipCount, times, args):
    """Send a brute-force alert mail to the administrator via SMTP over SSL.

    Args:
        ipCount: Mapping of source IP address (str) to the number of failed
            logins counted for that address.
        times: Iterable of ``datetime`` objects, one per failed login; each is
            rendered with ``isoformat()`` on its own line.
        args: Parsed command-line namespace. The attributes ``mail_server``,
            ``mail_user``, ``mail_pass`` and ``admin_mail`` are read.

    Raises:
        Whatever ``smtplib`` / ``ssl`` raise on connection, login or delivery
        failure is propagated unchanged.
    """
    port = 465  # For SSL

    # Create a secure SSL context
    context = ssl.create_default_context()

    timeString = "".join(t.isoformat() + "\n" for t in times)
    orginString = "".join(
        "{} : {}\n".format(ip.ljust(12), str(count).ljust(4))
        for ip, count in ipCount.items()
    )

    # The blank line after the Subject header is required: it separates the
    # header section from the body. Without it the timestamp lines (which
    # contain colons) end up in the header section instead of the mail text.
    message = "Subject: Brute Force Detected\n\nAt Times:{}\n\nfrom origin(s):{}".format(
        timeString, orginString)

    with smtplib.SMTP_SSL(args.mail_server, port, context=context) as server:
        # NOTE(review): the sender address is built as <mail_user>@<mail_server>,
        # which assumes the server host name is also the mail domain - confirm.
        sender = args.mail_user + "@" + args.mail_server
        server.login(sender, args.mail_pass)
        server.sendmail(sender, args.admin_mail, message)
if __name__ == "__main__":
    # Script entry point: fetch recent LOGIN_ERROR events from Keycloak, count
    # the failures inside a sliding time window per source IP and mail the
    # administrator when the count exceeds a threshold.

    parser = argparse.ArgumentParser(description='Keycloak Autoconfig')
    parser.add_argument("--realm", default="master")
    parser.add_argument("--kc-admin-user", default="admin")
    parser.add_argument("--kc-admin-pass", required=True)
    parser.add_argument("--kc-admin-auth-url", required=True)
    parser.add_argument("--mail-user", default="keycloak")
    parser.add_argument("--mail-pass", required=True)
    parser.add_argument("--mail-server", required=True)
    parser.add_argument("--admin-mail", required=True)

    args = parser.parse_args()

    kc_admin = KeycloakAdmin(server_url=args.kc_admin_auth_url,
                             username=args.kc_admin_user,
                             password=args.kc_admin_pass,
                             realm_name=args.realm,
                             verify=True)

    PAGE_SIZE = 10                          # events requested per call
    FAILURE_THRESHOLD = 10                  # alert when strictly more failures
    WINDOW = datetime.timedelta(hours=3)    # look-back period

    # Start the server-side date filter at the day the window begins, so that
    # shortly after midnight the failures from the previous evening are still
    # included. Keycloak expects the date as yyyy-MM-dd. No upper bound is
    # sent because the window ends "now".
    # NOTE(review): the date is interpreted by the server; if server and this
    # host run in different time zones the day boundary may differ - confirm.
    window_start = datetime.datetime.now() - WINDOW
    filterDate = {
        "dateFrom": window_start.strftime("%Y-%m-%d"),
        # Only ask for failed logins; otherwise a page consisting solely of
        # other event types would end the paging loop prematurely.
        "type": "LOGIN_ERROR",
        "max": PAGE_SIZE,
    }

    failures = []   # (timestamp, source ip) per newly seen failed login
    page = 0        # start at offset 0 so the newest events are not skipped
    found_new = True
    while found_new:
        # Build a fresh query dict. dict.update() returns None, so passing its
        # result to get_events() would silently drop every filter.
        query = dict(filterDate, first=page * PAGE_SIZE)
        events = kc_admin.get_events(query)
        page += 1
        print(events)
        time.sleep(1)   # throttle requests against the admin API
        print("======")

        # Keep paging only while a page still yields unseen failures.
        found_new = False
        for e in events:
            if e["type"] != "LOGIN_ERROR":
                continue
            # .get(): presumably failures for unknown accounts carry no
            # userId - TODO confirm against the event representation.
            uid = str(e["time"]) + str(e.get("userId"))
            if uid in db:
                continue
            db[uid] = e
            timeInJson = int(e["time"])  # careful this is unix in ms
            timeParsed = datetime.datetime.fromtimestamp(timeInJson // 1000)
            # Store time and IP together: a dict keyed by the (second
            # resolution) timestamp would let simultaneous failures from
            # different addresses overwrite each other.
            failures.append((timeParsed, e["ipAddress"]))
            found_new = True

    # compute count #
    print("Computing failures...")
    now = datetime.datetime.now()
    # Age of the event is now - t (t - now is negative for every past event
    # and would therefore never filter anything out).
    recent = [(t, ip) for t, ip in failures if now - t < WINDOW]
    times = [t for t, _ in recent]

    ipCount = {}
    if len(times) > FAILURE_THRESHOLD:
        for _, ip in recent:
            ipCount[ip] = ipCount.get(ip, 0) + 1
        print("Brutefoce detected")
        print("Source IP(s)")
        for ip, count in ipCount.items():
            print(ip, ":", count)
        send_mail(ipCount, times, args)
    else:
        print("Only {} failures!".format(len(times)))