import argparse import keycloak import json import time from keycloak import KeycloakAdmin import logging import datetime import smtplib import ssl db = {} last_mail = None times = [] timesToIP = {} def send_mail(ipCount, times, args): port = 465 # For SSL # Create a secure SSL context context = ssl.create_default_context() timeString = "" for t in times: timeString += t.isoformat() + "\n" orginString = "" for ip, count in ipCount.items(): orginString += "{} : {}\n".format(ip.ljust(12), str(count).ljust(4)) with smtplib.SMTP_SSL(args.mail_server, port, context=context) as server: sender = args.mail_user + "@" + args.mail_server server.login(sender, args.mail_pass) message = "Subject: Brute Force Detected\nAt Times:{}\n\nfrom origin(s):{}".format( timeString, orginString) server.sendmail(sender, args.admin_mail, message) if __name__ == "__main__": parser = argparse.ArgumentParser(description='Keycloak Autoconfig') parser.add_argument("--realm", default="master") parser.add_argument("--kc-admin-user", default="admin") parser.add_argument("--kc-admin-pass", required=True) parser.add_argument("--kc-admin-auth-url", required=True) parser.add_argument("--mail-user", default="keycloak") parser.add_argument("--mail-pass", required=True) parser.add_argument("--mail-server", required=True) parser.add_argument("--admin-mail", required=True) args = parser.parse_args() #logging.basicConfig(level=logging.DEBUG) kc_admin = KeycloakAdmin(server_url=args.kc_admin_auth_url, username=args.kc_admin_user, password=args.kc_admin_pass, realm_name=args.realm, verify=True) startDate = datetime.datetime.now().strftime("%y-%d-%m") endDate = datetime.datetime.now().strftime("%y-%d-%m") filterDate = { "dateFrom" : startDate, "dateTo" : endDate, "max" : 11 } events = [] counter = 1 found_new = True while(found_new): events = [] events = kc_admin.get_events(filterDate.update({ "first" : counter * 10 })) counter += 1 print(events) time.sleep(1) print("======") found_new = False for e in events: if e["type"] == "LOGIN_ERROR": uid = str(e["time"]) + str(e["userId"]) if uid in db: continue else: db[uid] = e timeInJson = int(e["time"]) # careful this is unix in ms timeParsed = datetime.datetime.fromtimestamp(timeInJson//1000) times.append(timeParsed) timesToIP.update({ timeParsed : e["ipAddress"] }) found_new = True # compute count # print("Computing failures...") times = list(filter(lambda x: x - datetime.datetime.now() < datetime.timedelta(hours=3), times)) ipCount = {} if len(times) > 10: for t in times: ip = timesToIP[t] if ip in ipCount: ipCount[ip] += 1 else: ipCount[ip] = 1 print("Brutefoce detected") print("Source IP(s)") for ip in ipCount.keys(): print(ip, ":", ipCount[ip]) send_mail(ipCount, times, args) else: print("Only {} failures!".format(len(times)))