#!/usr/bin/python3 # # This file is part of FreedomBox. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # """ Configuration helper for WireGuard. """ import argparse import json import os import pathlib import subprocess PUBLIC_KEY_HELP = 'Public key for the client' SERVER_INTERFACE = 'wg0' def parse_arguments(): """Return parsed command line arguments as dictionary.""" parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') subparsers.add_parser('setup', help='Setup WireGuard') subparsers.add_parser('get-info', help='Get info for each configured interface') add_client = subparsers.add_parser('add-client', help='Add a client') add_client.add_argument('publickey', help=PUBLIC_KEY_HELP) remove_client = subparsers.add_parser('remove-client', help='Remove a client') remove_client.add_argument('publickey', help=PUBLIC_KEY_HELP) add_server = subparsers.add_parser('add-server', help='Add a server') add_server.add_argument('--endpoint', required=True, help='Server endpoint') add_server.add_argument('--client-ip', required=True, help='Client IP address provided by server') add_server.add_argument('--public-key', required=True, help='Public key of the server') add_server.add_argument('--pre-shared-key', help='Pre-shared key') add_server.add_argument( '--all-outgoing', action='store_true', help='Use this connection to send all outgoing traffic') subparsers.required = True return parser.parse_args() def subcommand_setup(_): """Setup WireGuard.""" key_folder = pathlib.Path('/var/lib/freedombox/wireguard') private_key_path = key_folder / 'privatekey' public_key_path = key_folder / 'publickey' # TODO: make idempotent # create interface subprocess.run( ['ip', 'link', 'add', 'dev', SERVER_INTERFACE, 'type', 'wireguard'], check=True) # generate key pair private_key = subprocess.check_output(['wg', 'genkey']) public_key = subprocess.check_output(['wg', 'pubkey'], input=private_key) key_folder.mkdir(parents=True, exist_ok=True) with public_key_path.open(mode='wb') as public_key_file: public_key_file.write(public_key) old_umask = os.umask(0o077) try: with private_key_path.open(mode='wb') as private_key_file: private_key_file.write(private_key) finally: os.umask(old_umask) subprocess.run( ['wg', 'set', SERVER_INTERFACE, 'listen-port', '51820', 'private-key', str(private_key_path)], check=True) def subcommand_get_info(_): """Get info for each configured interface.""" output = subprocess.check_output( ['wg', 'show', 'all', 'dump']).decode().strip() lines = output.split('\n') interfaces = {} for line in lines: fields = line.split() interface_name = fields[0] if interface_name in interfaces: peer = { 'public_key': fields[1], 'preshared_key': fields[2], 'endpoint': fields[3], 'allowed_ips': fields[4], 'latest_handshake': fields[5], 'transfer_rx': fields[6], 'transfer_tx': fields[7], 'persistent_keepalive': fields[8], } interfaces[interface_name]['peers'].append(peer) else: interfaces[interface_name] = { 'interface_name': interface_name, 'private_key': fields[1], 'public_key': fields[2], 'listen_port': fields[3], 'fwmark': fields[4], 'peers': [], } print(json.dumps(interfaces)) def subcommand_add_client(arguments): """Add a client.""" subprocess.run( ['wg', 'set', SERVER_INTERFACE, 'peer', arguments.publickey], check=True) def subcommand_remove_client(arguments): """Remove a client.""" subprocess.run( ['wg', 'set', SERVER_INTERFACE, 'peer', arguments.publickey, 'remove'], check=True) def subcommand_add_server(arguments): """Add a server.""" output = subprocess.check_output( ['wg', 'show', 'interfaces']).decode().strip() interfaces = output.split() interface_num = 1 new_interface_name = 'wg1' while new_interface_name in interfaces: interface_num += 1 new_interface_name = 'wg' + str(interface_num) subprocess.run( ['ip', 'link', 'add', 'dev', new_interface_name, 'type', 'wireguard'], check=True) args = ['wg', 'set', new_interface_name, 'peer', arguments.public_key] if arguments.pre_shared_key: args += ['preshared-key', arguments.pre_shared_key] args += ['endpoint', arguments.endpoint] subprocess.run(args, check=True) def main(): """Parse arguments and perform all duties.""" arguments = parse_arguments() subcommand = arguments.subcommand.replace('-', '_') subcommand_method = globals()['subcommand_' + subcommand] subcommand_method(arguments) if __name__ == '__main__': main()