mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
This improves the user experience in many ways: - Help user understand if DNSSEC is being used on the current DNS server in case 'allow-downgrade' is supported. - Nudges the user to explore enabling DNS-over-TLS and DNSSEC. - Help user understand how global vs. link specific configuration works. Help user understand if a global DNS is being used. - Show the list of fallback DNS servers being used (as this poses privacy concerns). Also helps with debugging in problematic situations: - Find out which DNS server is being used (and leading to problems) and show the cycling mechanism. Tests: - Enable/disable fallback DNS server in privacy app. See that fallback servers line is only shown when enabled. - Set various global values of DNS-over-TLS and DNSSEC and see the status changes. - Set various values of DNS-over-TLS in the network connection settings and see the changes in status. - Set DNSSEC to allow-downgrade. Perform a query and see that the value of supported/unsupported changes. - Set DNS servers with special configuration file in /etc/systemd/resolved.conf.d/test.conf and restart systemd-resolved. See change in status page. Notice that if connection specific DNS server is set to an invalid server, global section has a current DNS server. - Set SNI domain name and port for an IPv4 DNS and an IPv6 DNS. See that the display is as expected. - Raise an exception in get_status() and notice that an error alert is shown properly. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: Veiko Aasa <veiko17@disroot.org>
195 lines
6.4 KiB
Python
195 lines
6.4 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Module to interact with systemd-resolved over DBus."""
|
|
|
|
import ipaddress
|
|
import json
|
|
import subprocess
|
|
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from plinth.utils import import_from_gi
|
|
|
|
# Gio namespace (version '2.0') obtained through plinth's import_from_gi
# helper. It is used below to get the system bus connection and to create
# DBus proxies.
gio = import_from_gi('Gio', '2.0')
|
|
|
|
# DBus bus name and root object path used to talk to systemd-resolved.
RESOLVE_NAME = 'org.freedesktop.resolve1'
RESOLVE_PATH = '/org/freedesktop/resolve1'

# DBus interfaces: Manager is used for the global context, Link for an
# individual network link. Both live under the bus name's namespace.
MANAGER_INTERFACE = f'{RESOLVE_NAME}.Manager'
LINK_INTERFACE = f'{RESOLVE_NAME}.Link'
|
|
|
|
|
|
class DNSServer:
|
|
"""Representation of a DNS server in systemd-resolved state."""
|
|
|
|
def __init__(self, link_index: int, address_class: int,
|
|
address_ints: list[int], port: int = 0,
|
|
domain_name: str | None = None):
|
|
self.link_index = link_index
|
|
self.address_class = address_class
|
|
self.address = ipaddress.ip_address(bytes(address_ints))
|
|
self.port = port
|
|
self.domain_name = domain_name
|
|
|
|
def __str__(self):
|
|
if self.port:
|
|
if self.address.version == 4: # IPv4
|
|
address_str = f'{self.address.compressed}:{self.port}'
|
|
else: # IPv6
|
|
address_str = f'[{self.address.compressed}]:{self.port}'
|
|
else:
|
|
address_str = self.address.compressed
|
|
|
|
if self.domain_name:
|
|
return f'{address_str} ({self.domain_name})'
|
|
|
|
return address_str
|
|
|
|
|
|
class Link:
|
|
"""systemd-resolved state for a particular link or global context."""
|
|
|
|
def __init__(self, connection, object_path, link_index: int = 0,
|
|
interface_name: str | None = None):
|
|
"""Fetch all the relevant properties for a link over DBus."""
|
|
if not link_index: # Global
|
|
interface = MANAGER_INTERFACE
|
|
else:
|
|
interface = LINK_INTERFACE
|
|
|
|
self.proxy = gio.DBusProxy.new_sync(connection,
|
|
gio.DBusProxyFlags.NONE, None,
|
|
RESOLVE_NAME, object_path,
|
|
interface)
|
|
|
|
self.link_index = link_index
|
|
self.interface_name = interface_name
|
|
|
|
self.dns_over_tls = self.proxy.get_cached_property(
|
|
'DNSOverTLS').unpack()
|
|
self.dnssec = self.proxy.get_cached_property('DNSSEC').unpack()
|
|
self.dnssec_supported = self.proxy.get_cached_property(
|
|
'DNSSECSupported')
|
|
|
|
self.dns_servers = self._new_dns_servers(
|
|
self.proxy.get_cached_property('DNSEx'))
|
|
|
|
self.fallback_dns_servers = None
|
|
if not link_index:
|
|
self.fallback_dns_servers = self._new_dns_servers(
|
|
self.proxy.get_cached_property('FallbackDNSEx'))
|
|
|
|
self.current_dns_server = self._new_dns_server(
|
|
self.proxy.get_cached_property('CurrentDNSServerEx'))
|
|
|
|
def get_link(self, link_index):
|
|
"""Return a string path to a link's DBus object."""
|
|
return self.proxy.GetLink('(i)', link_index)
|
|
|
|
@property
|
|
def dns_over_tls_string(self):
|
|
"""Return a string representation for DNS-over-TLS status."""
|
|
value_map = {
|
|
'yes': _('yes'),
|
|
'opportunistic': _('opportunistic'),
|
|
'no': _('no')
|
|
}
|
|
return value_map.get(self.dns_over_tls, self.dns_over_tls)
|
|
|
|
@property
|
|
def dnssec_string(self):
|
|
"""Return a string representation for DNSSEC status."""
|
|
value_map = {
|
|
'yes': _('yes'),
|
|
'allow-downgrade': _('allow-downgrade'),
|
|
'no': _('no')
|
|
}
|
|
return value_map.get(self.dnssec, self.dnssec)
|
|
|
|
@property
|
|
def dnssec_supported_string(self):
|
|
"""Return a string representation for whether DNSSEC is supported."""
|
|
return _('supported') if self.dnssec_supported else _('unsupported')
|
|
|
|
def _new_dns_servers(self, dns_tuples):
|
|
"""Return list of DNS Server objects from variant tuple.
|
|
|
|
Global DNS servers list also contains individual link DNS servers.
|
|
Ignore those.
|
|
"""
|
|
return [
|
|
self._new_dns_server(dns_tuple) for dns_tuple in dns_tuples
|
|
if self.link_index != 0 or dns_tuple[0] == 0
|
|
]
|
|
|
|
def _new_dns_server(self, dns_tuple):
|
|
"""Return a DNS Server object from variant tuple.
|
|
|
|
Tuple can be prefixed by link index in case of DNS server for global
|
|
context. Handle both cases. Entire tuple may be empty. Return None in
|
|
that case.
|
|
"""
|
|
if self.link_index: # Not global
|
|
dns_tuple = (self.link_index, ) + tuple(dns_tuple)
|
|
|
|
if not dns_tuple[2]: # Empty address
|
|
return None
|
|
|
|
return DNSServer(*dns_tuple)
|
|
|
|
def __str__(self):
|
|
dnssec_supported = ('supported'
|
|
if self.dnssec_supported else 'unspported')
|
|
value = ''
|
|
if not self.link_index:
|
|
value += 'Global\n'
|
|
else:
|
|
value = f'Link {self.link_index} ({self.interface_name})\n'
|
|
|
|
if self.current_dns_server:
|
|
value += f' Current DNS Server: {str(self.current_dns_server)}\n'
|
|
|
|
if self.dns_servers:
|
|
value += ' DNS Servers:\n'
|
|
for server in self.dns_servers:
|
|
value += f' {server}\n'
|
|
|
|
if self.fallback_dns_servers:
|
|
value += ' Fallback DNS Servers: \n'
|
|
for server in self.fallback_dns_servers:
|
|
value += f' {server}\n'
|
|
|
|
value += f' DNS-over-TLS: {self.dns_over_tls}\n'
|
|
value += f' DNSSEC: {self.dnssec}/{dnssec_supported}\n'
|
|
return value
|
|
|
|
|
|
def get_links():
    """Return a mapping of link index to interface name.

    The data comes from the JSON output of ``ip --json link``. A non-zero
    exit status of the command raises ``subprocess.CalledProcessError``.
    """
    command = ['ip', '--json', 'link']
    completed = subprocess.run(command, stdout=subprocess.PIPE, check=True)
    entries = json.loads(completed.stdout.decode())

    # Dicts keep insertion order, so the order reported by `ip` is preserved.
    return {entry['ifindex']: entry['ifname'] for entry in entries}
|
|
|
|
|
|
def get_status():
    """Return the current status of systemd-resolved daemon.

    The result is a list of Link objects: the global context first, followed
    by one entry per network interface except the loopback interface 'lo'.
    """
    connection = gio.bus_get_sync(gio.BusType.SYSTEM)
    global_link = Link(connection, RESOLVE_PATH)
    link_status = [global_link]

    for link_index, interface_name in get_links().items():
        if interface_name != 'lo':
            # The global (manager) object resolves an index to an object path.
            link_path = global_link.get_link(link_index)
            link = Link(connection, link_path, link_index, interface_name)
            link_status.append(link)

    return link_status
|