Sunil Mohan Adapa 419559a86f
wireguard: Show public key even when connection is not active
When wireguard interface is not active 'wg show' does not provide any
information. In such case, get the public key by computing it from private key
by calling 'wg pubkey'.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2020-01-18 13:53:55 -05:00

303 lines
10 KiB
Python

#
# This file is part of FreedomBox.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Utilities for managing WireGuard.
"""
import datetime
import json
import logging
import subprocess
import time
from plinth import actions, network
from plinth.utils import import_from_gi
nm = import_from_gi('NM', '1.0')
IP_TEMPLATE = '10.84.0.{}'
logger = logging.getLogger(__name__)
def get_nm_info():
"""Get information from network manager."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
client = network.get_nm_client()
connections = {}
for connection in client.get_connections():
if connection.get_connection_type() != setting_name:
continue
settings = connection.get_setting_by_name(setting_name)
secrets = connection.get_secrets(setting_name)
connection.update_secrets(setting_name, secrets)
info = {}
info['interface'] = connection.get_interface_name()
info['private_key'] = settings.get_private_key()
info['public_key'] = None
info['listen_port'] = settings.get_listen_port()
info['fwmark'] = settings.get_fwmark()
info['mtu'] = settings.get_mtu()
info['default_route'] = settings.get_ip4_auto_default_route()
info['peers'] = {}
for peer_index in range(settings.get_peers_len()):
peer = settings.get_peer(peer_index)
peer_info = {
'endpoint': peer.get_endpoint(),
'public_key': peer.get_public_key(),
'preshared_key': peer.get_preshared_key(),
'persistent_keepalive': peer.get_persistent_keepalive(),
'allowed_ips': []
}
for index in range(peer.get_allowed_ips_len()):
allowed_ip = peer.get_allowed_ip(index, None)
peer_info['allowed_ips'].append(allowed_ip)
info['peers'][peer_info['public_key']] = peer_info
settings_ipv4 = connection.get_setting_ip4_config()
if settings_ipv4 and settings_ipv4.get_num_addresses():
info['ip_address'] = settings_ipv4.get_address(0).get_address()
connections[info['interface']] = info
return connections
def get_info():
"""Return server and clients info."""
output = actions.superuser_run('wireguard', ['get-info'])
status = json.loads(output)
nm_info = get_nm_info()
my_server_info = None
my_client_servers = {}
for interface, info in nm_info.items():
if interface == 'wg0':
my_server_info = info
else:
my_client_servers[interface] = info
# If the NM connection is not active but the device link is up, 'wg
# show' will not show any public key configured on the interface.
if interface not in status or (interface in status and
not status[interface]['public_key']):
info['public_key'] = _get_public_key_from_private_key(
info['private_key'])
continue
info['public_key'] = status[interface]['public_key']
for status_peer in status[interface]['peers']:
if status_peer['latest_handshake']:
status_peer['latest_handshake'] = \
datetime.datetime.fromtimestamp(
status_peer['latest_handshake'])
public_key = status_peer['public_key']
info_peer = info['peers'].setdefault(public_key, {})
info_peer['status'] = status_peer
return {
'my_server': my_server_info,
'my_client': {
'servers': my_client_servers,
},
}
def enable_connections(enable):
"""Activate all connections and set them to auto-connect."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
client = network.get_nm_client()
for connection in client.get_connections():
if connection.get_connection_type() != setting_name:
continue
network.edit_connection(connection,
{'common': {
'autoconnect': enable
}})
if enable:
network.activate_connection(connection.get_uuid())
else:
try:
network.deactivate_connection(connection.get_uuid())
except network.ConnectionNotFound:
pass # Connection is already inactive
def _get_public_key_from_private_key(private_key):
process = subprocess.run(['wg', 'pubkey'], check=True, capture_output=True,
input=private_key.encode())
return process.stdout.decode()
def _generate_private_key():
"""Return a private key generated by 'wg' command."""
process = subprocess.run(['wg', 'genkey'], check=True, capture_output=True)
return process.stdout.decode().strip()
def _find_next_interface():
"""Find next unused wireguard interface name."""
output = subprocess.check_output(['wg', 'show',
'interfaces']).decode().strip()
interfaces = output.split()
interface_num = 1
new_interface_name = 'wg1'
while new_interface_name in interfaces:
interface_num += 1
new_interface_name = 'wg' + str(interface_num)
return new_interface_name
def add_server(settings):
"""Add information for connecting to a server."""
from plinth.modules.wireguard import app
interface_name = _find_next_interface()
settings['common']['name'] = 'WireGuard-Client-' + interface_name
settings['common']['interface'] = interface_name
settings['common']['autoconnect'] = app.is_enabled()
if not settings['wireguard']['private_key']:
settings['wireguard']['private_key'] = _generate_private_key()
network.add_connection(settings)
def edit_server(interface, settings):
"""Edit information for a connecting to a server."""
settings['common']['interface'] = interface
settings['common']['name'] = 'WireGuard-Client-' + interface
if not settings['wireguard']['private_key']:
settings['wireguard']['private_key'] = _generate_private_key()
connection = network.get_connection_by_interface_name(interface)
network.edit_connection(connection, settings)
network.reactivate_connection(connection.get_uuid())
def setup_server():
"""Setup a server connection that clients can connect to."""
from plinth.modules.wireguard import app
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
private_key = _generate_private_key()
settings = {
'common': {
'name': 'WireGuard-Server-wg0',
'type': setting_name,
'zone': 'internal',
'interface': 'wg0',
'autoconnect': app.is_enabled(),
},
'ipv4': {
'method': 'manual',
'address': IP_TEMPLATE.format(1),
'netmask': '255.255.255.0',
'gateway': '',
'dns': '',
'second_dns': '',
},
'wireguard': {
'private_key': private_key,
'listen_port': 51820,
}
}
network.add_connection(settings)
logger.info('Created new WireGuard server connection')
def _get_next_available_ip_address(settings):
"""Get the next available IP address to allocate to a client."""
allocated_ips = set()
for peer_index in range(settings.get_peers_len()):
peer = settings.get_peer(peer_index)
for ip_index in range(peer.get_allowed_ips_len()):
allowed_ip = peer.get_allowed_ip(ip_index)
# We assume these are simple IP addresses but they can be subnets.
allocated_ips.add(allowed_ip)
for index in range(2, 254):
ip_address = IP_TEMPLATE.format(index)
if ip_address not in allocated_ips:
return ip_address
raise IndexError('Reached client limit')
def _server_connection():
"""Return a server connection. Create one if necessary."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
connection = network.get_connection_by_interface_name('wg0')
if not connection:
setup_server()
for _ in range(10):
# XXX: Improve this waiting by doing a synchronous D-Bus operation to
# add network manager connection instead.
time.sleep(1)
connection = network.get_connection_by_interface_name('wg0')
if connection:
break
if not connection:
raise RuntimeError('Unable to create a server connection.')
# Retrieve secrets so that when the connection is changed, secrets are
# preserved properly.
secrets = connection.get_secrets(setting_name)
connection.update_secrets(setting_name, secrets)
return connection
def add_client(public_key):
"""Add a permission for a client to connect our server."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
connection = _server_connection()
settings = connection.get_setting_by_name(setting_name)
peer, _ = settings.get_peer_by_public_key(public_key)
if peer:
raise ValueError('Peer with public key already exists')
peer = nm.WireGuardPeer.new()
peer.set_public_key(public_key, False)
peer.set_persistent_keepalive(25) # To keep NAT 'connections' alive
peer.append_allowed_ip(_get_next_available_ip_address(settings), False)
settings.append_peer(peer)
connection.commit_changes(True)
network.reactivate_connection(connection.get_uuid())
def remove_client(public_key):
"""Remove permission for a client to connect our server."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
connection = _server_connection()
settings = connection.get_setting_by_name(setting_name)
peer, peer_index = settings.get_peer_by_public_key(public_key)
if not peer:
raise KeyError('Client not found')
settings.remove_peer(peer_index)
connection.commit_changes(True)
network.reactivate_connection(connection.get_uuid())