#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for OpenVPN server.
"""

import argparse
import os
import subprocess

import augeas

from plinth import action_utils

# Directory in which easy-rsa keeps the PKI (CA, issued certificates, keys).
KEYS_DIRECTORY = '/etc/openvpn/freedombox-keys'

# Presence of either of these marks setup as complete (see _is_setup()).
DH_PARAMS = f'{KEYS_DIRECTORY}/pki/dh.pem'
EC_PARAMS_DIR = f'{KEYS_DIRECTORY}/pki/ecparams'

SERVER_CONFIGURATION_PATH = '/etc/openvpn/server/freedombox.conf'

SERVICE_NAME = 'openvpn-server@freedombox'

CA_CERTIFICATE_PATH = os.path.join(KEYS_DIRECTORY, 'pki', 'ca.crt')

# Templates; '{username}' is filled in with str.format() by the caller.
USER_CERTIFICATE_PATH = os.path.join(KEYS_DIRECTORY, 'pki', 'issued',
                                     '{username}.crt')
USER_KEY_PATH = os.path.join(KEYS_DIRECTORY, 'pki', 'private',
                             '{username}.key')

# easy-rsa attribute file edited through Augeas (unique_subject setting).
ATTR_FILE = os.path.join(KEYS_DIRECTORY, 'pki', 'index.txt.attr')

# Server configuration written verbatim to SERVER_CONFIGURATION_PATH. Each
# directive must be on its own line; in particular _is_using_ecc() detects
# ECC mode by looking for a line that is exactly 'dh none'.
SERVER_CONFIGURATION = '''
port 1194
proto udp
proto udp6
dev tun
client-to-client
ca /etc/openvpn/freedombox-keys/pki/ca.crt
cert /etc/openvpn/freedombox-keys/pki/issued/server.crt
key /etc/openvpn/freedombox-keys/pki/private/server.key
dh none
server 10.91.0.0 255.255.255.0
keepalive 10 120
verb 3
tls-server
tls-version-min 1.2
cipher AES-256-CBC
script-security 2
'''

# Client profile templates. {remote} is the server host name; {ca}, {cert}
# and {key} receive the contents of the corresponding PEM files. The PEM
# material is embedded using OpenVPN's inline file syntax (<ca>, <cert>,
# <key> elements); without these wrappers the PEM lines would be parsed as
# configuration directives. The closing tags are placed on their own line so
# the profile stays well formed even if a PEM file lacks a trailing newline.
CLIENT_CONFIGURATION_RSA = '''
client
remote {remote} 1194
proto udp
proto udp6
dev tun
nobind
remote-cert-tls server
cipher AES-256-CBC
comp-lzo
redirect-gateway
verb 3
<ca>
{ca}
</ca>
<cert>
{cert}
</cert>
<key>
{key}
</key>'''

# Same as the RSA profile except that 'comp-lzo' is not enabled.
CLIENT_CONFIGURATION_ECC = '''
client
remote {remote} 1194
proto udp
proto udp6
dev tun
nobind
remote-cert-tls server
cipher AES-256-CBC
redirect-gateway
verb 3
<ca>
{ca}
</ca>
<cert>
{cert}
</cert>
<key>
{key}
</key>'''

# Environment variables handed to every easy-rsa invocation. This dictionary
# is used as the complete environment of the child process.
CERTIFICATE_CONFIGURATION = {
    'EASYRSA_BATCH': '1',
    'EASYRSA_DIGEST': 'sha512',
    'KEY_CONFIG': '/usr/share/easy-rsa/openssl-easyrsa.cnf',
    'KEY_DIR': KEYS_DIRECTORY,
    'EASYRSA_OPENSSL': 'openssl',
    'EASYRSA_CA_EXPIRE': '3650',
    'EASYRSA_REQ_EXPIRE': '3650',
    'EASYRSA_REQ_COUNTRY': 'US',
    'EASYRSA_REQ_PROVINCE': 'NY',
    'EASYRSA_REQ_CITY': 'New York',
    'EASYRSA_REQ_ORG': 'FreedomBox',
    'EASYRSA_REQ_EMAIL': 'me@freedombox',
    'EASYRSA_REQ_OU': 'Home',
    'EASYRSA_REQ_NAME': 'FreedomBox'
}
# easy-rsa environment for RSA keys: the common settings plus the key size.
CERTIFICATE_CONFIGURATION_RSA = {
    'EASYRSA_KEY_SIZE': '4096',
    **CERTIFICATE_CONFIGURATION
}

# easy-rsa environment for elliptic curve keys.
CERTIFICATE_CONFIGURATION_ECC = {
    'EASYRSA_ALGO': 'ec',
    **CERTIFICATE_CONFIGURATION
}

# Keyword arguments shared by the easy-rsa calls made during setup. Setup
# always uses the ECC environment.
COMMON_ARGS = {'env': CERTIFICATE_CONFIGURATION_ECC, 'cwd': KEYS_DIRECTORY}


def parse_arguments():
    """Return parsed command line arguments as an argparse namespace.

    The selected sub command is available as the 'subcommand' attribute. A
    sub command is mandatory.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    subparsers.add_parser('is-setup', help='Return whether setup is completed')
    subparsers.add_parser('setup', help='Setup OpenVPN server configuration')

    get_profile = subparsers.add_parser(
        'get-profile', help='Return the OpenVPN profile of a user')
    get_profile.add_argument('username', help='User to get profile for')
    get_profile.add_argument('remote_server',
                             help='The server name for the user to connect')

    subparsers.required = True
    return parser.parse_args()


def _is_using_ecc():
    """Return whether the service is using ECC.

    ECC is assumed when the server configuration file exists and contains a
    line that is exactly 'dh none' (ignoring surrounding whitespace).
    """
    if os.path.exists(SERVER_CONFIGURATION_PATH):
        with open(SERVER_CONFIGURATION_PATH, 'r') as file_handle:
            for line in file_handle:
                if line.strip() == 'dh none':
                    return True

    return False


def _is_setup():
    """Return whether setup is complete.

    Setup counts as complete when either a non-empty DH parameters file or
    the EC parameters directory exists.
    """
    return _is_non_empty_file(DH_PARAMS) or os.path.exists(EC_PARAMS_DIR)


def subcommand_is_setup(_):
    """Print 'true' or 'false' depending on whether setup is complete."""
    print('true' if _is_setup() else 'false')


def subcommand_setup(_):
    """Setup configuration, CA and certificates, then (re)start the service."""
    _write_server_config()
    _create_certificates()
    _setup_firewall()
    action_utils.service_enable(SERVICE_NAME)
    action_utils.service_restart(SERVICE_NAME)


def _write_server_config():
    """Write the server configuration, replacing any existing file."""
    with open(SERVER_CONFIGURATION_PATH, 'w') as file_handle:
        file_handle.write(SERVER_CONFIGURATION)


def _setup_firewall():
    """Add TUN devices to the internal zone in firewalld."""

    def _configure_interface(interface, operation):
        """Add or remove an interface into internal zone.

        The change is applied to both the runtime and the permanent
        configuration. Failures of firewall-cmd are ignored.
        """
        command = [
            'firewall-cmd', '--zone', 'internal',
            f'--{operation}-interface', interface
        ]
        subprocess.call(command)
        subprocess.call(command + ['--permanent'])

    def _is_tunplus_enabled():
        """Return whether tun+ interface is already added."""
        try:
            process = subprocess.run(
                ['firewall-cmd', '--zone', 'internal', '--list-interfaces'],
                stdout=subprocess.PIPE, check=True)
            return 'tun+' in process.stdout.decode().strip().split()
        except subprocess.CalledProcessError:
            # Safer: when the state cannot be determined, assume tun+ was
            # set so that firewalld gets restarted below.
            return True

    # XXX: Due to https://bugs.debian.org/919517 when tun+ interface is added,
    # firewalld is unable to handle it in nftables backend causing firewalld to
    # break while applying rules. This makes the entire system unreachable.
    # Hack around the problem by adding a few tun interfaces into the internal
    # zone. Hopefully, OpenVPN setting 'dev tun' will end up using one of those
    # if the tun devices are not used by other services. When the issue is
    # fixed, use tun+ instead.
    is_tunplus_set = _is_tunplus_enabled()
    _configure_interface('tun+', 'remove')
    for index in range(8):
        _configure_interface(f'tun{index}', 'add')

    if is_tunplus_set:
        action_utils.service_restart('firewalld')


def _init_pki():
    """Initialize easy-rsa PKI directory to create configuration file."""
    subprocess.check_call(['/usr/share/easy-rsa/easyrsa', 'init-pki'],
                          **COMMON_ARGS)


def _create_certificates():
    """Generate CA and server certificates.

    Creates the keys directory (mode 0700) if it does not exist, initializes
    the PKI and then runs the easy-rsa commands for the CA and the server
    certificate. Raises subprocess.CalledProcessError if any easy-rsa command
    fails.
    """
    try:
        os.mkdir(KEYS_DIRECTORY, 0o700)
    except FileExistsError:
        pass

    _init_pki()

    easy_rsa = '/usr/share/easy-rsa/easyrsa'
    subprocess.check_call([easy_rsa, 'build-ca', 'nopass'], **COMMON_ARGS)
    # NOTE(review): 'build-server-full' presumably already creates and signs
    # the 'server' request, so the following 'gen-req'/'sign-req' pair looks
    # redundant and may fail on an existing certificate - confirm against the
    # installed easy-rsa version before changing.
    subprocess.check_call([easy_rsa, 'build-server-full', 'server', 'nopass'],
                          **COMMON_ARGS)
    subprocess.check_call([easy_rsa, 'gen-req', 'server', 'nopass'],
                          **COMMON_ARGS)
    subprocess.check_call([easy_rsa, 'sign-req', 'server', 'server'],
                          **COMMON_ARGS)


def subcommand_get_profile(arguments):
    """Print the OpenVPN client profile for a user.

    A client certificate and key are generated on first use. The profile
    template and the easy-rsa environment are chosen depending on whether the
    server is configured for ECC.

    Raises Exception('Invalid username') when the username is reserved
    ('ca', 'server') or is not a plain file name, and
    subprocess.CalledProcessError when certificate generation fails.
    """
    username = arguments.username
    remote_server = arguments.remote_server

    # The username is interpolated into paths below the PKI directory. Reject
    # anything that is not a plain file name, otherwise values such as
    # './server' would slip past the reserved name check and expose the
    # server's private key.
    is_plain_name = (username not in ('', '.', '..')
                     and os.path.basename(username) == username)
    if username in ('ca', 'server') or not is_plain_name:
        raise Exception('Invalid username')

    user_certificate = USER_CERTIFICATE_PATH.format(username=username)
    user_key = USER_KEY_PATH.format(username=username)

    # Evaluate once; used for both the easy-rsa environment and the template.
    using_ecc = _is_using_ecc()

    if not _is_non_empty_file(user_certificate) or \
       not _is_non_empty_file(user_key):
        set_unique_subject('no')  # Set unique subject in attribute file to no
        subprocess.check_call([
            '/usr/share/easy-rsa/easyrsa', 'build-client-full', username,
            'nopass'
        ], env=CERTIFICATE_CONFIGURATION_ECC
                              if using_ecc else CERTIFICATE_CONFIGURATION_RSA,
                              cwd=KEYS_DIRECTORY)

    user_certificate_string = _read_file(user_certificate)
    user_key_string = _read_file(user_key)
    ca_string = _read_file(CA_CERTIFICATE_PATH)

    client_configuration = (CLIENT_CONFIGURATION_ECC
                            if using_ecc else CLIENT_CONFIGURATION_RSA)

    profile = client_configuration.format(ca=ca_string,
                                          cert=user_certificate_string,
                                          key=user_key_string,
                                          remote=remote_server)

    print(profile)


def set_unique_subject(value):
    """Set the unique_subject entry in the easy-rsa attribute file."""
    aug = load_augeas()
    aug.set('/files' + ATTR_FILE + '/unique_subject', value)
    aug.save()


def _read_file(filename):
    """Return the entire contents of a file as string."""
    with open(filename, 'r') as file_handle:
        return file_handle.read()


def _is_non_empty_file(filepath):
    """Return whether a file exists and is not zero size."""
    return os.path.isfile(filepath) and os.path.getsize(filepath) > 0


def load_augeas():
    """Initialize Augeas with only the easy-rsa attribute file loaded."""
    aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD
                        | augeas.Augeas.NO_MODL_AUTOLOAD)

    # Parse the attribute file with the Simplevars lens.
    aug.set('/augeas/load/Simplevars/lens', 'Simplevars.lns')
    aug.set('/augeas/load/Simplevars/incl[last() + 1]', ATTR_FILE)
    aug.load()
    return aug


def main():
    """Parse arguments and perform all duties.

    Dispatches to the module level function named 'subcommand_<name>', with
    dashes in the sub command name replaced by underscores.
    """
    arguments = parse_arguments()

    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()