feat: split imap monitoring into k3s & legacy

This commit is contained in:
2026-03-29 04:01:53 +02:00
parent de0ba6d093
commit 7ca3308e64
4 changed files with 244 additions and 66 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
*.swp *.swp
config.yaml

12
Dockerfile.smtp_check Normal file
View File

@@ -0,0 +1,12 @@
FROM alpine
RUN apk add --no-cache py3-pip
WORKDIR /app
RUN python3 -m pip install --no-cache-dir --break-system-packages requests PyYaml
COPY check_smtp_imap.py .
EXPOSE 5000/tcp
ENTRYPOINT ["python"]
CMD ["check_smtp_imap.py"]

View File

@@ -1,5 +1,7 @@
#!/usr/bin/python3 #!/usr/bin/python3
import yaml
import os
import smtplib, ssl import smtplib, ssl
import requests import requests
import sys import sys
@@ -11,41 +13,36 @@ import time
import imaplib import imaplib
import json import json
args = None
def exit(status, info): def report(args, status, info):
content = { "service" : args.monitoring_service_name, try:
"status" : status, content = {
"token" : args.monitoring_token, "service": args.monitoring_service_name,
"info" : info } "status": status,
"token": args.monitoring_server_token,
"info": info
}
r = requests.post(args.monitoring_server, json=content) # check for auth params #
sys.exit(0) if "monitoring_server_user" in args:
auth = (args.monitoring_server_user, args.monitoring_server_pass)
else:
auth = (None, None)
r = requests.post(args.monitoring_server, json=content, auth=auth)
print(f"Report: {args.imap_target} [{status}] - {info}", file=sys.stderr)
r.raise_for_status()
if __name__ == "__main__": except requests.RequestException as e:
parser = argparse.ArgumentParser(description='Email STMP/IMAP Monitor', print(f"Warning: Report failed {e}", file=sys.stderr)
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--target", required=True, help="Target Server to check") def send_and_check(args):
parser.add_argument("--sender", required=True, help="Sender Email to use")
parser.add_argument("--receiver", required=True, help="Receiver Mail (must exits on target)")
parser.add_argument("--imap-user", help="IMAP User for receiver Mail")
parser.add_argument("--imap-pass", required=True, help="IMAP Password for receiver Mail")
parser.add_argument("--port", default=587, help="Target (START_TLS) port")
parser.add_argument("--monitoring-server", required=True)
parser.add_argument("--monitoring-token", required=True)
parser.add_argument("--monitoring-service-name", required=True)
parser.add_argument("--smtp-sender-pass", help="Sender password for SMTP if login is required")
parser.add_argument("--imap-target", help="IMAP-Target Server if different from '--target''")
args = parser.parse_args()
if "imap_user" not in args:
imap_user = args.receiver_email
else:
imap_user = args.imap_user imap_user = args.imap_user
if not imap_user:
imap_user = args.receiver
challenge = { challenge = {
"time" : datetime.datetime.now().timestamp(), "time" : datetime.datetime.now().timestamp(),
@@ -54,16 +51,17 @@ if __name__ == "__main__":
} }
message = 'From: {}\nTo: {}\nSubject: Monitoring Challenge\n\n{}'.format( message = 'From: {}\nTo: {}\nSubject: Monitoring Challenge\n\n{}'.format(
args.sender, args.receiver, json.dumps(challenge)) args.smtp_sender_email, args.receiver_email, json.dumps(challenge))
context = ssl.create_default_context() context = ssl.create_default_context()
# send mail # # send mail #
server = smtplib.SMTP(args.target, args.port) server = smtplib.SMTP(args.smtp_sender_server, args.smtp_sender_server_port)
server.starttls(context=context) server.starttls(context=context)
if args.smtp_sender_pass: if args.smtp_sender_pass:
server.login(args.sender, args.smtp_sender_pass) server.login(args.smtp_sender_email, args.smtp_sender_pass)
server.sendmail(args.sender, args.receiver, message)
test = server.sendmail(args.smtp_sender_email, args.receiver_email, message)
# give server some time to deliver # # give server some time to deliver #
time.sleep(5) time.sleep(5)
@@ -71,27 +69,30 @@ if __name__ == "__main__":
# check imap # # check imap #
for x in range(0,5): for x in range(0,5):
imap_target = args.imap_target or args.target imap_target = args.imap_target or args.target_server
with imaplib.IMAP4_SSL(imap_target) as imap: with imaplib.IMAP4_SSL(imap_target) as imap:
imap.login(imap_user, args.imap_pass) imap.login(imap_user, args.imap_pass)
imap.select('INBOX') imap.select('INBOX')
status, messages = imap.search(None, 'ALL') status, messages = imap.search(None, 'ALL')
# check search status # # check search status #
if not status == "OK": if not status == "OK":
exit("CRITICAL", "IMAP search failed") report_and_exit(args, "CRITICAL", "IMAP search failed")
try:
for message in messages[0].split(b' '): for message in messages[0].split(b' '):
if not message: if not message:
time.sleep(0.1)
continue continue
status, data = imap.fetch(message, '(RFC822)') status, data = imap.fetch(message, '(RFC822)')
# check search status # # check search status #
if not status == "OK": if not status == "OK":
exit("CRITICAL", "IMAP fetch failed") report(args, "CRITICAL", "IMAP fetch failed")
# parse mail # parse mail
info = None info = None
@@ -116,12 +117,49 @@ if __name__ == "__main__":
imap.store(message, '+FLAGS', '\\Deleted') imap.store(message, '+FLAGS', '\\Deleted')
if info["token"] == challenge["token"]: if info["token"] == challenge["token"]:
exit("OK", "") report(args, "OK", "")
return
imap.logout()
# backoff and try again # # backoff and try again #
time.sleep(10) time.sleep(10)
finally:
imap.expunge()
imap.logout()
# if we didn't find anything # # if we didn't find anything #
exit("CRITICAL", "Challenge not found via IMAP") report(args, "CRITICAL", "Challenge not found via IMAP")
return
if __name__ == "__main__":
DEBUG = os.getenv("ENABLE_DEBUG") == 1
with open("config.yaml") as f:
config = yaml.safe_load(f)
index = 0
for element in config:
element |= os.environ
if type(element) != dict:
print(f"Config at index {index} is not a valid config struct", file=sys.stderr)
continue
if "imap_target" not in element:
print(f"Config struct at {index} is missing field 'imap_target'", file=sys.stderr)
continue
try:
args = argparse.Namespace(**element)
print(f"Checking: {args.imap_target}", file=sys.stderr)
send_and_check(args)
except AttributeError as e:
print(f"Error during check for {args.imap_target}: {e}", file=sys.stderr)
continue

127
check_smtp_imap_legacy.py Normal file
View File

@@ -0,0 +1,127 @@
#!/usr/bin/python3
import smtplib, ssl
import requests
import sys
import secrets
import datetime
import argparse
import socket
import time
import imaplib
import json
args = None
def exit(status, info):
content = { "service" : args.monitoring_service_name,
"status" : status,
"token" : args.monitoring_token,
"info" : info }
r = requests.post(args.monitoring_server, json=content)
sys.exit(0)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Email STMP/IMAP Monitor',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--target", required=True, help="Target Server to check")
parser.add_argument("--sender", required=True, help="Sender Email to use")
parser.add_argument("--receiver", required=True, help="Receiver Mail (must exits on target)")
parser.add_argument("--imap-user", help="IMAP User for receiver Mail")
parser.add_argument("--imap-pass", required=True, help="IMAP Password for receiver Mail")
parser.add_argument("--port", default=587, help="Target (START_TLS) port")
parser.add_argument("--monitoring-server", required=True)
parser.add_argument("--monitoring-token", required=True)
parser.add_argument("--monitoring-service-name", required=True)
parser.add_argument("--smtp-sender-pass", help="Sender password for SMTP if login is required")
parser.add_argument("--imap-target", help="IMAP-Target Server if different from '--target''")
args = parser.parse_args()
imap_user = args.imap_user
if not imap_user:
imap_user = args.receiver
challenge = {
"time" : datetime.datetime.now().timestamp(),
"token" : secrets.token_urlsafe(),
"origin" : socket.gethostname(),
}
message = 'From: {}\nTo: {}\nSubject: Monitoring Challenge\n\n{}'.format(
args.sender, args.receiver, json.dumps(challenge))
context = ssl.create_default_context()
# send mail #
server = smtplib.SMTP(args.target, args.port)
server.starttls(context=context)
if args.smtp_sender_pass:
server.login(args.sender, args.smtp_sender_pass)
server.sendmail(args.sender, args.receiver, message)
# give server some time to deliver #
time.sleep(5)
# check imap #
for x in range(0,5):
imap_target = args.imap_target or args.target
with imaplib.IMAP4_SSL(imap_target) as imap:
imap.login(imap_user, args.imap_pass)
imap.select('INBOX')
status, messages = imap.search(None, 'ALL')
# check search status #
if not status == "OK":
exit("CRITICAL", "IMAP search failed")
for message in messages[0].split(b' '):
if not message:
continue
status, data = imap.fetch(message, '(RFC822)')
# check search status #
if not status == "OK":
exit("CRITICAL", "IMAP fetch failed")
# parse mail
info = None
body = None
# ignore badly formated messages #
try:
body = data[0][1].decode("utf-8").split("\r\n")[-2]
except IndexError:
continue
# ignore badly formated messages (json-body) #
try:
info = json.loads(body)
except json.decoder.JSONDecodeError:
continue
# ignore mail if it's not ours otherwise cleanup #
if info["origin"] != challenge["origin"]:
continue
else:
imap.store(message, '+FLAGS', '\\Deleted')
if info["token"] == challenge["token"]:
exit("OK", "")
imap.logout()
# backoff and try again #
time.sleep(10)
# if we didn't find anything #
exit("CRITICAL", "Challenge not found via IMAP")