#!/usr/bin/python3
#
# This file is part of FreedomBox.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Configuration helper for WireGuard.
"""

import argparse
import json
import os
import pathlib
import subprocess
import sys

from plinth import network

PUBLIC_KEY_HELP = 'Public key for the client'

# Interface on which this machine acts as a WireGuard server.
SERVER_INTERFACE = 'wg0'

KEY_FOLDER = pathlib.Path('/var/lib/freedombox/wireguard')
PRIVATE_KEY_PATH = KEY_FOLDER / 'privatekey'
PUBLIC_KEY_PATH = KEY_FOLDER / 'publickey'


class InterfaceNotFoundError(Exception):
    """Exception raised when no matching interface is found."""


def parse_arguments():
    """Return parsed command line arguments as an argparse namespace.

    A sub command is mandatory; its name is stored in the ``subcommand``
    attribute of the returned namespace.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    subparsers.add_parser('setup', help='Setup WireGuard')

    subparsers.add_parser('get-info',
                          help='Get info for each configured interface')

    add_client = subparsers.add_parser('add-client', help='Add a client')
    add_client.add_argument('publickey', help=PUBLIC_KEY_HELP)

    remove_client = subparsers.add_parser('remove-client',
                                          help='Remove a client')
    remove_client.add_argument('publickey', help=PUBLIC_KEY_HELP)

    add_server = subparsers.add_parser('add-server', help='Add a server')
    add_server.add_argument('--endpoint', required=True,
                            help='Server endpoint')
    add_server.add_argument('--client-ip', required=True,
                            help='Client IP address provided by server')
    add_server.add_argument('--public-key', required=True,
                            help='Public key of the server')
    # NOTE(review): --all-outgoing is accepted here but no sub command in
    # this file reads it.
    add_server.add_argument(
        '--all-outgoing', action='store_true',
        help='Use this connection to send all outgoing traffic')

    modify_server = subparsers.add_parser('modify-server',
                                          help='Modify a server')
    modify_server.add_argument('--endpoint', required=True,
                               help='Server endpoint')
    modify_server.add_argument('--client-ip', required=True,
                               help='Client IP address provided by server')
    modify_server.add_argument('--public-key', required=True,
                               help='Public key of the server')
    # NOTE(review): the pre-shared key actually used by modify-server is
    # read from the JSON document on stdin, not from this option.
    modify_server.add_argument('--pre-shared-key', help='Pre-shared key')
    modify_server.add_argument(
        '--all-outgoing', action='store_true',
        help='Use this connection to send all outgoing traffic')

    remove_server = subparsers.add_parser('remove-server',
                                          help='Remove a server')
    remove_server.add_argument('publickey', help=PUBLIC_KEY_HELP)

    subparsers.required = True
    return parser.parse_args()


def _generate_key_pair():
    """Generate private/public key pair and store them under KEY_FOLDER.

    Raises subprocess.CalledProcessError if a ``wg`` invocation fails.
    """
    private_key = subprocess.check_output(['wg', 'genkey'])
    public_key = subprocess.check_output(['wg', 'pubkey'], input=private_key)

    KEY_FOLDER.mkdir(parents=True, exist_ok=True)

    with PUBLIC_KEY_PATH.open(mode='wb') as public_key_file:
        public_key_file.write(public_key)

    # Tighten the umask only while the private key file is created, then
    # restore the previous value even if writing fails.
    old_umask = os.umask(0o077)
    try:
        with PRIVATE_KEY_PATH.open(mode='wb') as private_key_file:
            private_key_file.write(private_key)
    finally:
        os.umask(old_umask)


def subcommand_setup(_):
    """Setup WireGuard.

    Create the server interface if it does not exist, generate a key pair
    if either key file is missing and configure the interface with the
    listen port and the private key.
    """
    # Create interface.
    try:
        subprocess.run(['ip', 'link', 'show', SERVER_INTERFACE],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                       check=True)
    except subprocess.CalledProcessError:
        # 'ip link show' failed, so try to create the interface.
        subprocess.run([
            'ip', 'link', 'add', 'dev', SERVER_INTERFACE, 'type', 'wireguard'
        ], check=True)

    if not (PUBLIC_KEY_PATH.exists() and PRIVATE_KEY_PATH.exists()):
        _generate_key_pair()

    # Configure interface.
    subprocess.run([
        'wg', 'set', SERVER_INTERFACE, 'listen-port', '51820', 'private-key',
        str(PRIVATE_KEY_PATH)
    ], check=True)


def _get_info():
    """Return info for each configured interface.

    Parse the output of ``wg show all dump``. The first line seen for an
    interface name is taken as the interface description; every later line
    starting with the same name is taken as one of its peers.

    The result maps interface name to a dictionary with the keys
    'interface_name', 'private_key', 'public_key', 'listen_port', 'fwmark'
    and 'peers'. All values are kept as the strings printed by ``wg``.
    """
    output = subprocess.check_output(['wg', 'show', 'all',
                                      'dump']).decode().strip()
    lines = output.split('\n')
    interfaces = {}
    for line in lines:
        if not line:
            continue

        fields = line.split()
        interface_name = fields[0]
        if interface_name in interfaces:
            peer = {
                'public_key': fields[1],
                'preshared_key': fields[2],
                'endpoint': fields[3],
                'allowed_ips': fields[4],
                'latest_handshake': fields[5],
                'transfer_rx': fields[6],
                'transfer_tx': fields[7],
                'persistent_keepalive': fields[8],
            }
            interfaces[interface_name]['peers'].append(peer)
        else:
            interfaces[interface_name] = {
                'interface_name': interface_name,
                'private_key': fields[1],
                'public_key': fields[2],
                'listen_port': fields[3],
                'fwmark': fields[4],
                'peers': [],
            }

    return interfaces


def subcommand_get_info(_):
    """Print info for each configured interface as JSON."""
    print(json.dumps(_get_info()))


def subcommand_add_client(arguments):
    """Add a client as a peer on the server interface."""
    subprocess.run(
        ['wg', 'set', SERVER_INTERFACE, 'peer', arguments.publickey],
        check=True)


def subcommand_remove_client(arguments):
    """Remove a client peer from the server interface."""
    subprocess.run(
        ['wg', 'set', SERVER_INTERFACE, 'peer', arguments.publickey, 'remove'],
        check=True)


def _find_next_interface():
    """Find next unused wireguard interface name.

    Names are tried in the order wg1, wg2, ... against the interfaces
    listed by ``wg show interfaces``.
    """
    output = subprocess.check_output(['wg', 'show',
                                      'interfaces']).decode().strip()
    interfaces = output.split()
    interface_num = 1
    new_interface_name = 'wg1'
    while new_interface_name in interfaces:
        interface_num += 1
        new_interface_name = 'wg' + str(interface_num)

    return new_interface_name


def _find_interface_by_peer(public_key):
    """Return name of the client interface whose first peer has public_key.

    The server interface is excluded from the search. If several
    interfaces match, the last one reported by ``wg`` is returned.

    Raises InterfaceNotFoundError if no interface matches.
    """
    interfaces = _get_info()
    interfaces.pop(SERVER_INTERFACE, None)

    found = None
    for interface in interfaces.values():
        peers = interface['peers']
        # Only the first peer of each interface is compared.
        if peers and peers[0]['public_key'] == public_key:
            found = interface['interface_name']

    if not found:
        raise InterfaceNotFoundError(
            'Interface with peer %s not found' % public_key)

    return found


def _get_connection_settings(name, interface, endpoint, client_ip, public_key,
                             client_private_key, pre_shared_key):
    """Return settings for Network Manager connection.

    If client_private_key is empty or None, the private key stored at
    PRIVATE_KEY_PATH is used instead.
    """
    if not client_private_key:
        with PRIVATE_KEY_PATH.open() as private_key_file:
            client_private_key = private_key_file.read().strip()

    return {
        'common': {
            'name': name,
            'type': 'wireguard',
            'interface': interface,
            'zone': 'internal',
        },
        'ipv4': {
            'method': 'manual',
            'address': client_ip,
            'netmask': '',
            'gateway': '',
            'dns': '',
            'second_dns': '',
        },
        'wireguard': {
            'private_key': client_private_key,
            'peer_endpoint': endpoint,
            'peer_public_key': public_key,
            'preshared_key': pre_shared_key,
        },
    }


def subcommand_add_server(arguments):
    """Add a server.

    Secrets ('client_private_key' and 'pre_shared_key') are read as a JSON
    object from stdin; empty stdin is treated as an empty object.
    """
    secret_args = json.loads(sys.stdin.read() or '{}')

    new_interface_name = _find_next_interface()
    subprocess.run(
        ['ip', 'link', 'add', 'dev', new_interface_name, 'type', 'wireguard'],
        check=True)

    connection_name = 'WireGuard-' + new_interface_name
    settings = _get_connection_settings(connection_name, new_interface_name,
                                        arguments.endpoint,
                                        arguments.client_ip,
                                        arguments.public_key,
                                        secret_args.get('client_private_key'),
                                        secret_args.get('pre_shared_key'))
    network.add_connection(settings)


def subcommand_modify_server(arguments):
    """Modify a server.

    The interface is located by the server's public key. Secrets are read
    as a JSON object from stdin, as for add-server.

    Raises InterfaceNotFoundError if no interface has the given peer.
    """
    secret_args = json.loads(sys.stdin.read() or '{}')

    interface_to_modify = _find_interface_by_peer(arguments.public_key)

    connection = network.get_connection_by_interface_name(interface_to_modify)
    settings = _get_connection_settings(
        'WireGuard-' + interface_to_modify, interface_to_modify,
        arguments.endpoint, arguments.client_ip, arguments.public_key,
        secret_args.get('client_private_key'),
        secret_args.get('pre_shared_key'))

    if connection:
        network.edit_connection(connection, settings)
    else:
        # XXX: raise error?
        network.add_connection(settings)


def subcommand_remove_server(arguments):
    """Remove a server.

    Delete the Network Manager connection for the matching interface, if
    there is one, and then delete the interface itself.

    Raises InterfaceNotFoundError if no interface has the given peer.
    """
    interface_to_remove = _find_interface_by_peer(arguments.publickey)

    connection = network.get_connection_by_interface_name(interface_to_remove)
    if connection:
        network.delete_connection(connection.get_uuid())

    subprocess.run(['ip', 'link', 'delete', interface_to_remove], check=True)


def main():
    """Parse arguments and perform all duties."""
    arguments = parse_arguments()

    # Sub command 'add-client' is handled by subcommand_add_client(), etc.
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()