Files
athq-vm-management/helper_scripts/wireguard-extender.py

160 lines
4.4 KiB
Python

import argparse
import configparser
import ipaddress
import json
import os
import subprocess
from pathlib import Path
# FIXME/WARNING: don't change this order without checking the output.
# NOTE: LINK_BASE, LINK_BASE_PEER and PORT_BASE below are all computed from
# len(EXISTING_HOSTS), so adding or removing an entry changes them as well.
EXISTING_HOSTS = [
"atlantishq.de",
"katzencluster.atlantishq.de",
"atlantis-helsinki.atlantishq.de",
]
# User name placed in front of the host in the scp source path.
USER = "root"
# Directory on each remote host whose *.conf files are copied.
REMOTE_DIR = "/etc/wireguard"
# Local directory the remote *.conf files are copied into.
LOCAL_TMP_DIR = "./tmp"
# Tunnel addresses for the two ends of the new link; the third octet is
# len(EXISTING_HOSTS) + 10.
LINK_BASE = f"10.0.{len(EXISTING_HOSTS)+10}.1/32"
LINK_BASE_PEER = f"10.0.{len(EXISTING_HOSTS)+10}.2/32"
# 51820 plus the number of existing hosts; presumably the UDP port for the
# new link -- TODO confirm.
PORT_BASE = 51820 + len(EXISTING_HOSTS)
def fetch_conf_files(host):
    """Copy every ``*.conf`` file from ``REMOTE_DIR`` on *host* via ``scp``.

    The files land in ``LOCAL_TMP_DIR``, which is created when missing.
    Progress is printed before and after the transfer.

    Args:
        host: Host name placed after ``USER@`` in the scp source path.

    Raises:
        subprocess.CalledProcessError: If ``scp`` exits with a non-zero status.
    """
    print(f"Doing {host}\n")

    Path(LOCAL_TMP_DIR).mkdir(parents=True, exist_ok=True)

    # The glob is passed to scp as a single argument (no local shell is
    # involved), so it is expanded on the remote side.
    source = f"{USER}@{host}:{REMOTE_DIR}/*.conf"
    scp_command = ["scp", source, LOCAL_TMP_DIR]
    subprocess.run(scp_command, check=True)

    print(f"\n{host} retrieved successfully.")
def parse_conf_file(filepath, host):
    """Parse an INI-like WireGuard config into ``{section: {key: value}}``.

    Lines are stripped; blank lines and lines starting with ``#`` are skipped.
    A ``[Name]`` line opens section ``Name``; ``key = value`` lines are split
    on the first ``=`` and stored in the current section. Lines without ``=``
    are ignored. Keys that appear before any header go into ``"Interface"``.

    NOTE: every section header starts from an empty dict, so when a header is
    repeated (e.g. several ``[Peer]`` sections) only the last one is kept.

    Args:
        filepath: Path of the config file to read.
        host: Not used by the parser.

    Returns:
        Dict mapping section name to a dict of that section's key/value strings.
    """
    section = "Interface"
    parsed = {section: {}}

    with open(filepath) as handle:
        for raw in handle:
            entry = raw.strip()

            if entry == "" or entry.startswith("#"):
                continue

            if entry.startswith("[") and entry.endswith("]"):
                section = entry.strip("[]")
                parsed[section] = {}
                continue

            if "=" not in entry:
                continue

            # Split on the first "=" only: base64 keys end in "=" themselves.
            key, _, val = entry.partition("=")
            parsed[section][key.strip()] = val.strip()

    return parsed
def load_all_confs(host):
    """Parse and then delete every ``*.conf`` file in ``LOCAL_TMP_DIR``.

    Args:
        host: Used as the single key of the returned dict and passed through
            to ``parse_conf_file``.

    Returns:
        ``{host: {conf_stem: parsed_config}}`` where ``conf_stem`` is the file
        name without its ``.conf`` suffix.
    """
    parsed_by_name = {}
    for path in Path(LOCAL_TMP_DIR).glob("*.conf"):
        parsed_by_name[path.stem] = parse_conf_file(path, host)
        # Remove the local copy once parsed so it is not picked up again by a
        # later call that reads the same directory.
        path.unlink()
    return {host: parsed_by_name}
def _normalize_allowed_ips(allowed_ips) -> list[str]:
    """Return *allowed_ips* as a flat list of validated address strings.

    Accepts ``None``, a single value (string or ``ipaddress`` object) or a
    list/tuple/set of them; strings may hold several comma-separated entries.
    """
    if allowed_ips is None:
        return []
    # Do not iterate network objects directly: iterating an IPv4Network
    # yields every address in it.
    if not isinstance(allowed_ips, (list, tuple, set, frozenset)):
        allowed_ips = [allowed_ips]

    normalized = []
    for item in allowed_ips:
        for entry in str(item).split(","):
            entry = entry.strip()
            if not entry:
                continue
            try:
                ipaddress.IPv4Interface(entry)
            except ValueError as e:
                raise ValueError(f"Invalid IP address: {entry}") from e
            normalized.append(entry)
    return normalized


def _write_private_file(path: Path, text: str) -> None:
    """Write *text* to *path* readable by the owner only (it holds a key)."""
    path.touch(mode=0o600, exist_ok=True)
    # touch() leaves the mode of an already existing file alone.
    path.chmod(0o600)
    path.write_text(text)


def create_wireguard_config_pair(
    new_host: str,
    old_host: str,
    new_host_allowed_ips: list[str],
    old_host_allowed_ips,
    old_host_public_key,
    old_host_private_key,
    output_dir: str = "./tmp",
) -> tuple[str, str]:
    """Write the two WireGuard configs that link *new_host* to *old_host*.

    A fresh key pair is generated for the new host with
    ``generate_wireguard_keys``; the old host keeps the keys passed in.
    The new host gets ``LINK_BASE`` as its address and the old host
    ``LINK_BASE_PEER``. Each side's ``AllowedIPs`` is the other side's link
    address followed by the other side's allowed IPs.

    Args:
        new_host: Name of the new host; only used to build the file names.
        old_host: Endpoint of the existing host as seen from the new host.
            When it has no ``:port`` suffix, ``PORT_BASE`` is appended and
            also written as the old host's ``ListenPort``.
        new_host_allowed_ips: Networks routed to the new host.
        old_host_allowed_ips: Networks routed to the old host (a single
            value or a list).
        old_host_public_key: Public key of the old host.
        old_host_private_key: Private key of the old host.
        output_dir: Directory the two files are written to; created if
            missing.

    Returns:
        ``(local_path, peer_path)``: the paths of ``<new_host>_local.conf``
        (the old host's side) and ``<new_host>_peer.conf`` (the new host's
        side), as strings.

    Raises:
        ValueError: If any allowed-IP entry is not a valid IPv4
            address/network.
    """
    # Validate before touching the filesystem or generating keys.
    new_side_routes = _normalize_allowed_ips(new_host_allowed_ips)
    old_side_routes = _normalize_allowed_ips(old_host_allowed_ips)

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Only the new host needs new keys; the old host's keys are arguments.
    new_private, new_public = generate_wireguard_keys()

    # wg requires "host:port" for Endpoint, and the old host has to listen on
    # that same port for the new host to reach it.
    _, sep, port_part = old_host.rpartition(":")
    if sep and port_part.isdigit():
        endpoint, listen_port = old_host, port_part
    else:
        endpoint, listen_port = f"{old_host}:{PORT_BASE}", PORT_BASE

    new_host_routes = ", ".join([LINK_BASE_PEER, *old_side_routes])
    old_host_routes = ", ".join([LINK_BASE, *new_side_routes])

    new_host_config = (
        "[Interface]\n"
        f"PrivateKey = {new_private}\n"
        f"Address = {LINK_BASE}\n"
        "[Peer]\n"
        f"Endpoint = {endpoint}\n"
        f"PublicKey = {old_host_public_key}\n"
        f"AllowedIPs = {new_host_routes}\n"
    )
    old_host_config = (
        "[Interface]\n"
        f"PrivateKey = {old_host_private_key}\n"
        f"ListenPort = {listen_port}\n"
        f"Address = {LINK_BASE_PEER}\n"
        "[Peer]\n"
        f"PublicKey = {new_public}\n"
        f"AllowedIPs = {old_host_routes}\n"
    )

    # NOTE(review): the suffix-to-side mapping (old host -> "_local", new
    # host -> "_peer") is kept as it was assigned originally -- confirm it is
    # the intended naming.
    old_host_file = out_dir / f"{new_host}_local.conf"
    new_host_file = out_dir / f"{new_host}_peer.conf"
    _write_private_file(new_host_file, new_host_config)
    _write_private_file(old_host_file, old_host_config)

    return str(old_host_file), str(new_host_file)
def generate_wireguard_keys():
    """Create a WireGuard key pair by calling the ``wg`` command-line tool.

    ``wg genkey`` produces the private key, which is then fed on stdin to
    ``wg pubkey`` to derive the public key.

    Returns:
        ``(private_key, public_key)`` as decoded, whitespace-stripped strings
        -- the private key comes first.

    Raises:
        subprocess.CalledProcessError: If either ``wg`` call exits non-zero.
    """
    genkey = subprocess.run(["wg", "genkey"], stdout=subprocess.PIPE, check=True)
    private_bytes = genkey.stdout.strip()

    pubkey = subprocess.run(
        ["wg", "pubkey"],
        input=private_bytes,
        stdout=subprocess.PIPE,
        check=True,
    )
    public_bytes = pubkey.stdout.strip()

    return private_bytes.decode(), public_bytes.decode()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WireGuard utility script")
parser.add_argument("--new-host", help="Hostname or IP of the new host")
parser.add_argument("--new-host-subnet", type=ipaddress.IPv4Network, help="Subnet of new host")
parser.add_argument("--write", action=argparse.BooleanOptionalAction, default=False)
parser.add_argument("--allow-overwrite", action=argparse.BooleanOptionalAction, default=False)
args = parser.parse_args()
configs = {}
for host in EXISTING_HOSTS:
fetch_conf_files(host)
configs |= load_all_confs(host)
# output current state #
print(json.dumps(configs, indent=2))
if args.new_host and args.new_host_subnet:
pub, priv = generate_wireguard_keys()
create_wireguard_config_pair(LINK_BASE, new_host