Sunil Mohan Adapa 0c936512c4
wireguard: Us privileged decorator for actions
Tests:

- Functional tests work (uninstall test fails to no backup component,
  intermittent failure)
- Showing status information works
  - In the main app page for server and clients
  - When showing server details
  - When showing client details

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2022-10-08 18:53:36 -04:00

285 lines
9.7 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Utilities for managing WireGuard."""
import datetime
import logging
import subprocess
import time
from plinth import app as app_module
from plinth import network
from plinth.utils import import_from_gi
from . import privileged
nm = import_from_gi('NM', '1.0')
IP_TEMPLATE = '10.84.0.{}'
logger = logging.getLogger(__name__)
def get_nm_info():
"""Get information from network manager."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
client = network.get_nm_client()
connections = {}
for connection in client.get_connections():
if connection.get_connection_type() != setting_name:
continue
settings = connection.get_setting_by_name(setting_name)
secrets = connection.get_secrets(setting_name)
connection.update_secrets(setting_name, secrets)
info = {}
info['interface'] = connection.get_interface_name()
info['private_key'] = settings.get_private_key()
info['public_key'] = None
info['listen_port'] = settings.get_listen_port()
info['fwmark'] = settings.get_fwmark()
info['mtu'] = settings.get_mtu()
info['default_route'] = settings.get_ip4_auto_default_route()
info['peers'] = {}
for peer_index in range(settings.get_peers_len()):
peer = settings.get_peer(peer_index)
peer_info = {
'endpoint': peer.get_endpoint(),
'public_key': peer.get_public_key(),
'preshared_key': peer.get_preshared_key(),
'persistent_keepalive': peer.get_persistent_keepalive(),
'allowed_ips': []
}
for index in range(peer.get_allowed_ips_len()):
allowed_ip = peer.get_allowed_ip(index, None)
peer_info['allowed_ips'].append(allowed_ip)
info['peers'][peer_info['public_key']] = peer_info
settings_ipv4 = connection.get_setting_ip4_config()
if settings_ipv4 and settings_ipv4.get_num_addresses():
info['ip_address'] = settings_ipv4.get_address(0).get_address()
connections[info['interface']] = info
return connections
def get_info():
"""Return server and clients info."""
status = privileged.get_info()
nm_info = get_nm_info()
my_server_info = None
my_client_servers = {}
for interface, info in nm_info.items():
if interface == 'wg0':
my_server_info = info
else:
my_client_servers[interface] = info
# If the NM connection is not active but the device link is up, 'wg
# show' will not show any public key configured on the interface.
if interface not in status or (interface in status and
not status[interface]['public_key']):
info['public_key'] = _get_public_key_from_private_key(
info['private_key'])
continue
info['public_key'] = status[interface]['public_key']
for status_peer in status[interface]['peers']:
if status_peer['latest_handshake']:
status_peer['latest_handshake'] = \
datetime.datetime.fromtimestamp(
status_peer['latest_handshake'])
public_key = status_peer['public_key']
info_peer = info['peers'].setdefault(public_key, {})
info_peer['status'] = status_peer
return {
'my_server': my_server_info,
'my_client': {
'servers': my_client_servers,
},
}
def enable_connections(enable):
"""Activate all connections and set them to auto-connect."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
client = network.get_nm_client()
for connection in client.get_connections():
if connection.get_connection_type() != setting_name:
continue
network.edit_connection(connection,
{'common': {
'autoconnect': enable
}})
if enable:
network.activate_connection(connection.get_uuid())
else:
try:
network.deactivate_connection(connection.get_uuid())
except network.ConnectionNotFound:
pass # Connection is already inactive
def _get_public_key_from_private_key(private_key):
process = subprocess.run(['wg', 'pubkey'], check=True, capture_output=True,
input=private_key.encode())
return process.stdout.decode()
def _generate_private_key():
"""Return a private key generated by 'wg' command."""
process = subprocess.run(['wg', 'genkey'], check=True, capture_output=True)
return process.stdout.decode().strip()
def _find_next_interface():
"""Find next unused wireguard interface name."""
output = subprocess.check_output(['wg', 'show',
'interfaces']).decode().strip()
interfaces = output.split()
interface_num = 1
new_interface_name = 'wg1'
while new_interface_name in interfaces:
interface_num += 1
new_interface_name = 'wg' + str(interface_num)
return new_interface_name
def add_server(settings):
"""Add information for connecting to a server."""
app = app_module.App.get('wireguard')
interface_name = _find_next_interface()
settings['common']['name'] = 'WireGuard-Client-' + interface_name
settings['common']['interface'] = interface_name
settings['common']['autoconnect'] = app.is_enabled()
if not settings['wireguard']['private_key']:
settings['wireguard']['private_key'] = _generate_private_key()
network.add_connection(settings)
def edit_server(interface, settings):
"""Edit information for connecting to a server."""
settings['common']['interface'] = interface
settings['common']['name'] = 'WireGuard-Client-' + interface
if not settings['wireguard']['private_key']:
settings['wireguard']['private_key'] = _generate_private_key()
connection = network.get_connection_by_interface_name(interface)
network.edit_connection(connection, settings)
network.reactivate_connection(connection.get_uuid())
def setup_server():
"""Setup a server connection that clients can connect to."""
app = app_module.App.get('wireguard')
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
private_key = _generate_private_key()
settings = {
'common': {
'name': 'WireGuard-Server-wg0',
'type': setting_name,
'zone': 'internal',
'interface': 'wg0',
'autoconnect': app.is_enabled(),
},
'ipv4': {
'method': 'manual',
'address': IP_TEMPLATE.format(1),
'netmask': '255.255.255.0',
'gateway': '',
'dns': '',
'second_dns': '',
},
'wireguard': {
'private_key': private_key,
'listen_port': 51820,
}
}
network.add_connection(settings)
logger.info('Created new WireGuard server connection')
def _get_next_available_ip_address(settings):
"""Get the next available IP address to allocate to a client."""
allocated_ips = set()
for peer_index in range(settings.get_peers_len()):
peer = settings.get_peer(peer_index)
for ip_index in range(peer.get_allowed_ips_len()):
allowed_ip = peer.get_allowed_ip(ip_index)
# We assume these are simple IP addresses but they can be subnets.
allocated_ips.add(allowed_ip)
for index in range(2, 254):
ip_address = IP_TEMPLATE.format(index)
if ip_address not in allocated_ips:
return ip_address
raise IndexError('Reached client limit')
def _server_connection():
"""Return a server connection. Create one if necessary."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
connection = network.get_connection_by_interface_name('wg0')
if not connection:
setup_server()
for _ in range(10):
# XXX: Improve this waiting by doing a synchronous D-Bus operation to
# add network manager connection instead.
time.sleep(1)
connection = network.get_connection_by_interface_name('wg0')
if connection:
break
if not connection:
raise RuntimeError('Unable to create a server connection.')
# Retrieve secrets so that when the connection is changed, secrets are
# preserved properly.
secrets = connection.get_secrets(setting_name)
connection.update_secrets(setting_name, secrets)
return connection
def add_client(public_key):
"""Add a permission for a client to connect our server."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
connection = _server_connection()
settings = connection.get_setting_by_name(setting_name)
peer, _ = settings.get_peer_by_public_key(public_key)
if peer:
raise ValueError('Peer with public key already exists')
peer = nm.WireGuardPeer.new()
peer.set_public_key(public_key, False)
peer.set_persistent_keepalive(25) # To keep NAT 'connections' alive
peer.append_allowed_ip(_get_next_available_ip_address(settings), False)
settings.append_peer(peer)
connection.commit_changes(True)
network.reactivate_connection(connection.get_uuid())
def remove_client(public_key):
"""Remove permission for a client to connect our server."""
setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
connection = _server_connection()
settings = connection.get_setting_by_name(setting_name)
peer, peer_index = settings.get_peer_by_public_key(public_key)
if not peer:
raise KeyError('Client not found')
settings.remove_peer(peer_index)
connection.commit_changes(True)
network.reactivate_connection(connection.get_uuid())