#!/usr/bin/python3
"""Flask-VSFTP-User-Tool: a small web front end for managing FTP users.

Users created through the web form are tracked in a local SQLite table so that
only users created by this tool can be listed and deleted.  The actual account
work is delegated to ``./scripts/create_user.sh`` and
``./scripts/delete_user.sh``, which are invoked through ``sudo``.
"""
import flask
import sys
import argparse
import os
import subprocess
import crypt

from sqlalchemy import Column, Integer, String, Boolean, or_, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import func
import sqlalchemy
from flask_sqlalchemy import SQLAlchemy

app = flask.Flask("Flask-VSFTP-User-Tool")
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.sqlite'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# NOTE(review): 401 is "Unauthorized"; "Forbidden" is 403.  The constant is not
# used in this file, so its value is left untouched.
HTTP_FORBIDDEN = 401
HTTP_NOT_FOUND = 404
HTTP_UNPROCESSABLE = 422
HTTP_INTERNAL_ERR = 500
HTTP_EMPTY = 204

EMPTY = ""

# Account names that must never be created through the web form.
FORBIDDEN_USERNAMES = ["root", "bin", "daemon", "sys", "sync", "games", "man",
                       "news", "uucp", "proxy", "www-data", "backup", "list",
                       "irc", "gnats", "nobody", "_apt", "systemd-timesync",
                       "systemd-network", "systemd-resolve", "messagebus",
                       "docker", "nginx", "sshd", "flask"]

# unix useradd requires exactly this salt, do not change
PAM_PASSWD_SALT = "22"


@app.route('/')
def index():
    """Render the landing page, forwarding an optional ``code`` query argument."""
    previousResponseCode = flask.request.args.get("code")
    return flask.render_template("index.html", code=previousResponseCode)


@app.route('/list-users')
def listUsers():
    """Render the list of users tracked in the database."""
    users = db.session.query(FTPUser)
    previousResponseCode = flask.request.args.get("code")
    return flask.render_template("list_users.html", users=users,
                                 code=previousResponseCode)


# The view has its own Python name so it is no longer shadowed by the
# ``createUser(webform)`` helper defined further down; the explicit endpoint
# keeps the Flask endpoint name identical to the original ("createUser").
@app.route('/create-user', methods=["POST"], endpoint="createUser")
def createUserRoute():
    """Handle the create-user form.

    Returns an empty 204 response on success, or the error text with a 500
    status when ``createUser`` reports a problem.
    """
    error = createUser(flask.request.form)
    if error:
        return (error, HTTP_INTERNAL_ERR)
    return (EMPTY, HTTP_EMPTY)


@app.route('/delete-user', methods=["POST"])
def deleteUser():
    """Delete a tracked user from the database and run the delete script.

    Only users present in the tracking table can be deleted.  Returns the
    string ``/list-users`` with status 200 on success.
    """
    userToDelete = flask.request.form['username']
    user = db.session.query(FTPUser).filter(FTPUser.username == userToDelete).first()
    if not user:
        # NOTE(review): 405 means "Method Not Allowed"; HTTP_NOT_FOUND would be
        # the natural status here.  Kept as-is because client code may rely on it.
        return ("User doesn't exist.", 405)

    # be extra safe and use value from database; read it before the row is
    # deleted so we never touch attributes of a deleted/detached instance
    storedUsername = user.username

    db.session.delete(user)
    db.session.commit()

    result = subprocess.run(["/usr/bin/sudo", "./scripts/delete_user.sh", storedUsername])
    if result.returncode != 0:
        # Do not report success when the script failed: the tracking row is
        # already gone, so the operator has to be told to clean up manually.
        app.logger.error("delete_user.sh failed for %s with exit code %s",
                         storedUsername, result.returncode)
        return ("Error: deleting the system user failed (exit code {})".format(result.returncode),
                HTTP_INTERNAL_ERR)

    return ("/list-users", 200)


def sanityCheckInputString(string, stringName):
    """Validate that ``string`` consists solely of ASCII letters.

    Args:
        string: the value to validate.
        stringName: human-readable field name used in the error message.

    Returns:
        An error message, or ``None`` when the value is acceptable.  The empty
        string is rejected as "non-alpha".
    """
    # sanity check input, let's not build RCE
    try:
        string.encode("ascii")
    except UnicodeEncodeError:
        return "Error: {} contains non-ascii characters".format(stringName)

    if not string.isalpha():
        return "Error: {} contains non-alpha characters".format(stringName)

    return None


def createUser(webform):
    """Create a user from the submitted form data.

    Args:
        webform: mapping with the keys ``username`` and ``password``.

    Returns:
        ``None`` on success, otherwise an error message string.
    """
    # Fetch both fields up front so a missing field fails before any work is done.
    password = webform['password']
    username = webform['username']

    error = sanityCheckInputString(username, "username")
    if error:
        return error

    # forbid system users
    if username in FORBIDDEN_USERNAMES:
        return "Error: Username {} is forbidden because it is a special user.".format(username)

    # command line useradd requires a pre-encrypted password
    # NOTE(review): a two-character salt selects traditional DES crypt, and the
    # hash is passed on the command line below -- confirm this is acceptable.
    cryptPass = crypt.crypt(password, PAM_PASSWD_SALT)

    # track added users to prevent deletion of other users and listing
    record = FTPUser(username=username)
    try:
        db.session.add(record)
        db.session.commit()
    except sqlalchemy.exc.IntegrityError as e:
        # A failed commit leaves the session unusable until it is rolled back.
        db.session.rollback()
        if "UNIQUE" in str(e):
            return "Error: User with this name already exists"
        return str(e)

    result = subprocess.run(["/usr/bin/sudo", "./scripts/create_user.sh", cryptPass, username])
    if result.returncode != 0:
        # Keep the tracking table in sync: without the system account the row
        # would only block re-creating the same name.
        db.session.delete(record)
        db.session.commit()
        app.logger.error("create_user.sh failed for %s with exit code %s",
                         username, result.returncode)
        return "Error: creating the system user failed (exit code {})".format(result.returncode)

    return None


class FTPUser(db.Model):
    """Row in the ``users`` table recording a user created through this tool."""
    __tablename__ = 'users'
    # The username doubles as the primary key, so duplicates are rejected.
    username = Column(String, primary_key=True)


def init():
    """Expose the database handle in the app config and create missing tables."""
    app.config["DB"] = db
    db.create_all()


# ``before_first_request`` was removed in Flask 2.3.  Use it when available to
# keep the original lazy initialisation; otherwise initialise right away inside
# an application context.
if hasattr(app, "before_first_request"):
    app.before_first_request(init)
else:
    with app.app_context():
        init()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Flask-VSFTP-User-Tool",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-i", "--interface", default="0.0.0.0", help="Interface to listen on")
    parser.add_argument("-p", "--port", type=int, default=5000, help="Port to listen on")
    args = parser.parse_args()

    app.run(host=args.interface, port=args.port)