diff --git a/.gitignore b/.gitignore index 1377554..65e4107 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.swp +config.yaml diff --git a/Dockerfile.smtp_check b/Dockerfile.smtp_check new file mode 100644 index 0000000..3c8eae8 --- /dev/null +++ b/Dockerfile.smtp_check @@ -0,0 +1,12 @@ +FROM alpine + +RUN apk add --no-cache py3-pip + +WORKDIR /app +RUN python3 -m pip install --no-cache-dir --break-system-packages requests PyYaml + +COPY check_smtp_imap.py . + +EXPOSE 5000/tcp +ENTRYPOINT ["python"] +CMD ["check_smtp_imap.py"] diff --git a/check_smtp_imap.py b/check_smtp_imap.py index 9c01df7..da44ea0 100644 --- a/check_smtp_imap.py +++ b/check_smtp_imap.py @@ -1,5 +1,7 @@ #!/usr/bin/python3 +import yaml +import os import smtplib, ssl import requests import sys @@ -11,41 +13,36 @@ import time import imaplib import json -args = None -def exit(status, info): +def report(args, status, info): - content = { "service" : args.monitoring_service_name, - "status" : status, - "token" : args.monitoring_token, - "info" : info } + try: + content = { + "service": args.monitoring_service_name, + "status": status, + "token": args.monitoring_server_token, + "info": info + } - r = requests.post(args.monitoring_server, json=content) - sys.exit(0) + # check for auth params # + if "monitoring_server_user" in args: + auth = (args.monitoring_server_user, args.monitoring_server_pass) + else: + auth = (None, None) + r = requests.post(args.monitoring_server, json=content, auth=auth) + print(f"Report: {args.imap_target} [{status}] - {info}", file=sys.stderr) + r.raise_for_status() -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Email STMP/IMAP Monitor', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + except requests.RequestException as e: + print(f"Warning: Report failed {e}", file=sys.stderr) - parser.add_argument("--target", required=True, help="Target Server to check") - parser.add_argument("--sender", required=True, help="Sender Email to use") - parser.add_argument("--receiver", required=True, help="Receiver Mail (must exits on target)") - parser.add_argument("--imap-user", help="IMAP User for receiver Mail") - parser.add_argument("--imap-pass", required=True, help="IMAP Password for receiver Mail") - parser.add_argument("--port", default=587, help="Target (START_TLS) port") - parser.add_argument("--monitoring-server", required=True) - parser.add_argument("--monitoring-token", required=True) - parser.add_argument("--monitoring-service-name", required=True) +def send_and_check(args): - parser.add_argument("--smtp-sender-pass", help="Sender password for SMTP if login is required") - parser.add_argument("--imap-target", help="IMAP-Target Server if different from '--target''") - - args = parser.parse_args() - - imap_user = args.imap_user - if not imap_user: - imap_user = args.receiver + if "imap_user" not in args: + imap_user = args.receiver_email + else: + imap_user = args.imap_user challenge = { "time" : datetime.datetime.now().timestamp(), @@ -54,16 +51,17 @@ if __name__ == "__main__": } message = 'From: {}\nTo: {}\nSubject: Monitoring Challenge\n\n{}'.format( - args.sender, args.receiver, json.dumps(challenge)) + args.smtp_sender_email, args.receiver_email, json.dumps(challenge)) context = ssl.create_default_context() # send mail # - server = smtplib.SMTP(args.target, args.port) + server = smtplib.SMTP(args.smtp_sender_server, args.smtp_sender_server_port) server.starttls(context=context) if args.smtp_sender_pass: - server.login(args.sender, args.smtp_sender_pass) - server.sendmail(args.sender, args.receiver, message) + server.login(args.smtp_sender_email, args.smtp_sender_pass) + + test = server.sendmail(args.smtp_sender_email, args.receiver_email, message) # give server some time to deliver # time.sleep(5) @@ -71,57 +69,97 @@ if __name__ == "__main__": # check imap # for x in range(0,5): - imap_target = args.imap_target or args.target - + imap_target = args.imap_target or args.target_server + with imaplib.IMAP4_SSL(imap_target) as imap: + imap.login(imap_user, args.imap_pass) imap.select('INBOX') status, messages = imap.search(None, 'ALL') # check search status # if not status == "OK": - exit("CRITICAL", "IMAP search failed") + report_and_exit(args, "CRITICAL", "IMAP search failed") - for message in messages[0].split(b' '): + try: + for message in messages[0].split(b' '): - if not message: - continue + if not message: + time.sleep(0.1) + continue - status, data = imap.fetch(message, '(RFC822)') + status, data = imap.fetch(message, '(RFC822)') - # check search status # - if not status == "OK": - exit("CRITICAL", "IMAP fetch failed") + # check search status # + if not status == "OK": + report(args, "CRITICAL", "IMAP fetch failed") - # parse mail - info = None - body = None + # parse mail + info = None + body = None - # ignore badly formated messages # - try: - body = data[0][1].decode("utf-8").split("\r\n")[-2] - except IndexError: - continue + # ignore badly formated messages # + try: + body = data[0][1].decode("utf-8").split("\r\n")[-2] + except IndexError: + continue - # ignore badly formated messages (json-body) # - try: - info = json.loads(body) - except json.decoder.JSONDecodeError: - continue + # ignore badly formated messages (json-body) # + try: + info = json.loads(body) + except json.decoder.JSONDecodeError: + continue - # ignore mail if it's not ours otherwise cleanup # - if info["origin"] != challenge["origin"]: - continue - else: - imap.store(message, '+FLAGS', '\\Deleted') + # ignore mail if it's not ours otherwise cleanup # + if info["origin"] != challenge["origin"]: + continue + else: + imap.store(message, '+FLAGS', '\\Deleted') - if info["token"] == challenge["token"]: - exit("OK", "") + if info["token"] == challenge["token"]: + report(args, "OK", "") + return + + # backoff and try again # + time.sleep(10) - imap.logout() + finally: + imap.expunge() + imap.logout() - # backoff and try again # - time.sleep(10) # if we didn't find anything # - exit("CRITICAL", "Challenge not found via IMAP") + report(args, "CRITICAL", "Challenge not found via IMAP") + return + + +if __name__ == "__main__": + + DEBUG = os.getenv("ENABLE_DEBUG") == 1 + + with open("config.yaml") as f: + + config = yaml.safe_load(f) + + index = 0 + for element in config: + + element |= os.environ + + if type(element) != dict: + print(f"Config at index {index} is not a valid config struct", file=sys.stderr) + continue + if "imap_target" not in element: + print(f"Config struct at {index} is missing field 'imap_target'", file=sys.stderr) + continue + + try: + args = argparse.Namespace(**element) + print(f"Checking: {args.imap_target}", file=sys.stderr) + send_and_check(args) + except AttributeError as e: + print(f"Error during check for {args.imap_target}: {e}", file=sys.stderr) + continue + + + diff --git a/check_smtp_imap_legacy.py b/check_smtp_imap_legacy.py new file mode 100644 index 0000000..9c01df7 --- /dev/null +++ b/check_smtp_imap_legacy.py @@ -0,0 +1,127 @@ +#!/usr/bin/python3 + +import smtplib, ssl +import requests +import sys +import secrets +import datetime +import argparse +import socket +import time +import imaplib +import json + +args = None + +def exit(status, info): + + content = { "service" : args.monitoring_service_name, + "status" : status, + "token" : args.monitoring_token, + "info" : info } + + r = requests.post(args.monitoring_server, json=content) + sys.exit(0) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Email STMP/IMAP Monitor', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument("--target", required=True, help="Target Server to check") + parser.add_argument("--sender", required=True, help="Sender Email to use") + parser.add_argument("--receiver", required=True, help="Receiver Mail (must exits on target)") + parser.add_argument("--imap-user", help="IMAP User for receiver Mail") + parser.add_argument("--imap-pass", required=True, help="IMAP Password for receiver Mail") + parser.add_argument("--port", default=587, help="Target (START_TLS) port") + parser.add_argument("--monitoring-server", required=True) + parser.add_argument("--monitoring-token", required=True) + parser.add_argument("--monitoring-service-name", required=True) + + parser.add_argument("--smtp-sender-pass", help="Sender password for SMTP if login is required") + parser.add_argument("--imap-target", help="IMAP-Target Server if different from '--target''") + + args = parser.parse_args() + + imap_user = args.imap_user + if not imap_user: + imap_user = args.receiver + + challenge = { + "time" : datetime.datetime.now().timestamp(), + "token" : secrets.token_urlsafe(), + "origin" : socket.gethostname(), + } + + message = 'From: {}\nTo: {}\nSubject: Monitoring Challenge\n\n{}'.format( + args.sender, args.receiver, json.dumps(challenge)) + + context = ssl.create_default_context() + + # send mail # + server = smtplib.SMTP(args.target, args.port) + server.starttls(context=context) + if args.smtp_sender_pass: + server.login(args.sender, args.smtp_sender_pass) + server.sendmail(args.sender, args.receiver, message) + + # give server some time to deliver # + time.sleep(5) + + # check imap # + for x in range(0,5): + + imap_target = args.imap_target or args.target + + with imaplib.IMAP4_SSL(imap_target) as imap: + imap.login(imap_user, args.imap_pass) + imap.select('INBOX') + status, messages = imap.search(None, 'ALL') + + # check search status # + if not status == "OK": + exit("CRITICAL", "IMAP search failed") + + for message in messages[0].split(b' '): + + if not message: + continue + + status, data = imap.fetch(message, '(RFC822)') + + # check search status # + if not status == "OK": + exit("CRITICAL", "IMAP fetch failed") + + # parse mail + info = None + body = None + + # ignore badly formated messages # + try: + body = data[0][1].decode("utf-8").split("\r\n")[-2] + except IndexError: + continue + + # ignore badly formated messages (json-body) # + try: + info = json.loads(body) + except json.decoder.JSONDecodeError: + continue + + # ignore mail if it's not ours otherwise cleanup # + if info["origin"] != challenge["origin"]: + continue + else: + imap.store(message, '+FLAGS', '\\Deleted') + + if info["token"] == challenge["token"]: + exit("OK", "") + + imap.logout() + + # backoff and try again # + time.sleep(10) + + # if we didn't find anything # + exit("CRITICAL", "Challenge not found via IMAP")