From ebdf98b4a70b2984306cd84deb07dfec3f6f48de Mon Sep 17 00:00:00 2001 From: Yannik Schmidt Date: Tue, 11 Oct 2022 04:46:40 +0200 Subject: [PATCH] asdasd --- run.py | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 run.py diff --git a/run.py b/run.py new file mode 100644 index 0000000..79ce065 --- /dev/null +++ b/run.py @@ -0,0 +1,119 @@ +import argparse +import keycloak +import json +import time + +from keycloak import KeycloakAdmin + +import logging +import datetime +import smtplib +import ssl + +db = {} + +last_mail = None +times = [] +timesToIP = {} + +def send_mail(ipCount, times, args): + port = 465 # For SSL + + # Create a secure SSL context + context = ssl.create_default_context() + + timeString = "" + for t in times: + timeString += t.isoformat() + "\n" + + orginString = "" + for ip, count in ipCount.items(): + orginString += "{} : {}\n".format(ip.ljust(12), str(count).ljust(4)) + + with smtplib.SMTP_SSL(args.mail_server, port, context=context) as server: + sender = args.mail_user + "@" + args.mail_server + server.login(sender, args.mail_pass) + + message = "Subject: Brute Force Detected\nAt Times:{}\n\nfrom origin(s):{}".format( + timeString, orginString) + + server.sendmail(sender, args.admin_mail, message) + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser(description='Keycloak Autoconfig') + + parser.add_argument("--realm", default="master") + + parser.add_argument("--kc-admin-user", default="admin") + parser.add_argument("--kc-admin-pass", required=True) + + parser.add_argument("--kc-admin-auth-url", required=True) + parser.add_argument("--mail-user", default="keycloak") + parser.add_argument("--mail-pass", required=True) + parser.add_argument("--mail-server", required=True) + parser.add_argument("--admin-mail", required=True) + + args = parser.parse_args() + + #logging.basicConfig(level=logging.DEBUG) + + kc_admin = KeycloakAdmin(server_url=args.kc_admin_auth_url, + username=args.kc_admin_user, + password=args.kc_admin_pass, + realm_name=args.realm, + verify=True) + + startDate = datetime.datetime.now().strftime("%y-%d-%m") + endDate = datetime.datetime.now().strftime("%y-%d-%m") + filterDate = { "dateFrom" : startDate, "dateTo" : endDate, "max" : 11 } + + events = [] + counter = 1 + found_new = True + + while(found_new): + events = [] + events = kc_admin.get_events(filterDate.update({ "first" : counter * 10 })) + counter += 1 + print(events) + time.sleep(1) + print("======") + + found_new = False + for e in events: + if e["type"] == "LOGIN_ERROR": + uid = str(e["time"]) + str(e["userId"]) + if uid in db: + continue + else: + db[uid] = e + timeInJson = int(e["time"]) # careful this is unix in ms + timeParsed = datetime.datetime.fromtimestamp(timeInJson//1000) + times.append(timeParsed) + timesToIP.update({ timeParsed : e["ipAddress"] }) + found_new = True + + + # compute count # + print("Computing failures...") + times = list(filter(lambda x: x - datetime.datetime.now() < datetime.timedelta(hours=3), times)) + ipCount = {} + if len(times) > 10: + + for t in times: + ip = timesToIP[t] + if ip in ipCount: + ipCount[ip] += 1 + else: + ipCount[ip] = 1 + + print("Brutefoce detected") + print("Source IP(s)") + for ip in ipCount.keys(): + print(ip, ":", ipCount[ip]) + + send_mail(ipCount, times, args) + else: + print("Only {} failures!".format(len(times)))