Files
jeffrey_miller_flask_ftp/server.py
2021-08-31 23:00:59 +02:00

130 lines
4.0 KiB
Python
Executable File

#!/usr/bin/python3
import flask
import sys
import argparse
import os
import subprocess
import crypt
from sqlalchemy import Column, Integer, String, Boolean, or_, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import func
import sqlalchemy
from flask_sqlalchemy import SQLAlchemy
app = flask.Flask("Flask-VSFTP-User-Tool")
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.sqlite'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
HTTP_FORBIDDEN = 401
HTTP_NOT_FOUND = 404
HTTP_UNPROCESSABLE = 422
HTTP_INTERNAL_ERR = 500
HTTP_EMPTY = 204
EMPTY = ""
FORBIDDEN_USERNAMES = ["root", "bin", "daemon", "sys", "sync", "games", "man", "news", "uucp",
"proxy", "www-data", "backup", "list", "irc", "gnats", "nobody", "_apt",
"systemd-timesync", "systemd-network", "systemd-resolve", "messagebus",
"docker", "nginx" , "sshd", "flask"]
# unix useradd requires exactly this salt, do not change
PAM_PASSWD_SALT = "22"
@app.route('/')
def index():
previousResponseCode = flask.request.args.get("code")
return flask.render_template("index.html", code=previousResponseCode)
@app.route('/list-users')
def listUsers():
users = db.session.query(FTPUser)
previousResponseCode = flask.request.args.get("code")
return flask.render_template("list_users.html", users=users, code=previousResponseCode)
@app.route('/create-user', methods=["POST"])
def createUser():
error = createUser(flask.request.form)
if error:
return (error, HTTP_INTERNAL_ERR)
return (EMPTY, HTTP_EMPTY)
@app.route('/delete-user', methods=["POST"])
def deleteUser():
userToDelete = flask.request.form['username']
user = db.session.query(FTPUser).filter(FTPUser.username == userToDelete).first()
if not user:
return ("User doesn't exist.", 405)
db.session.delete(user)
db.session.commit()
# be extra safe and use value from database
subprocess.run(["/usr/bin/sudo", "./scripts/delete_user.sh", user.username])
return ("/list-users", 200)
def sanityCheckInputString(string, stringName):
# sanity check input, let's not built RCE #
try:
string = string.encode("ascii").decode("ascii")
except UnicodeEncodeError:
return "Error: {} contains non-ascii characters".format(stringName)
if not string.isalpha():
return "Error: {} contains non-alpha characters".format(stringName)
return None
def createUser(webform):
# command line useradd requires a pre-encrypted password
cryptPass = crypt.crypt(webform['password'], PAM_PASSWD_SALT)
username = webform['username']
error = sanityCheckInputString(username, "username")
if error:
return error
# forbid system users
if username in FORBIDDEN_USERNAMES:
return "Error: Username {} is forbidden because it is a special user.".format(username)
# track added users to prevent deletion of other users and listing #
try:
db.session.add(FTPUser(username=webform['username']))
db.session.commit()
except sqlalchemy.exc.IntegrityError as e:
if "UNIQUE" in str(e):
return "Error: User with this name already exists"
else:
return str(e)
subprocess.run(["/usr/bin/sudo", "./scripts/create_user.sh", cryptPass, username])
return None
class FTPUser(db.Model):
__tablename__ = 'users'
username = Column(String, primary_key=True)
@app.before_first_request
def init():
app.config["DB"] = db
db.create_all()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Flask-VSFTP-User-Tool", \
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-i", "--interface", default="0.0.0.0", help="Interface to listen on")
parser.add_argument("-p", "--port", default="5000", help="Port to listen on")
args = parser.parse_args()
app.run(host=args.interface, port=args.port)