Sunil Mohan Adapa a1070bf319
names: Show systemd-resolved status in the names page
This improves the user experience in many ways:

- Help user understand if DNSSEC is being used on the current DNS server in case
'allow-fallback' is supported.

- Nudges the user to explore enabling DNS-over-TLS and DNSSEC.

- Help user understand how global vs. link specific configuration works. Help
user understand if a global DNS is being used.

- Show the list of fallback DNS servers being used (as this poses privacy
concerns).

Also helps with debugging in problematic situations:

- Find out which DNS server is being used (and leading to problems) and show the
cycling mechanism.

Tests:

- Enable/disable fallback DNS server in privacy app. See that fallback servers
line is only shown when enabled.

- Set various global values of DNS-over-TLS and DNSSEC and see the status
changes.

- Set various values of DNS-over-TLS in the network connection settings and see
the changes in status.

- Set DNSSEC to allow-fallback. Perform a query and see that the value of
supported/unsupported changes.

- Set DNS servers with special configuration file in
/etc/systemd/resolved.conf.d/test.conf and restart systemd-resolved. See change
in status page. Notice that if connection specific DNS server is set to an
invalid server, global section has a current DNS server.

- Set SNI domain name and port for the an IPv4 DNS and an IPv6 DNS. See that the
display is as expected.

- Raise an exception in get_status() and notice that an error alert is show
properly.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewd-by: Veiko Aasa <veiko17@disroot.org>
2024-09-07 22:58:48 +03:00

195 lines
6.4 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Module to interact with systemd-resolved over DBus."""
import ipaddress
import json
import subprocess
from django.utils.translation import gettext_lazy as _
from plinth.utils import import_from_gi
gio = import_from_gi('Gio', '2.0')
RESOLVE_NAME = 'org.freedesktop.resolve1'
RESOLVE_PATH = '/org/freedesktop/resolve1'
MANAGER_INTERFACE = 'org.freedesktop.resolve1.Manager'
LINK_INTERFACE = 'org.freedesktop.resolve1.Link'
class DNSServer:
"""Representation of a DNS server in systemd-resolved state."""
def __init__(self, link_index: int, address_class: int,
address_ints: list[int], port: int = 0,
domain_name: str | None = None):
self.link_index = link_index
self.address_class = address_class
self.address = ipaddress.ip_address(bytes(address_ints))
self.port = port
self.domain_name = domain_name
def __str__(self):
if self.port:
if self.address.version == 4: # IPv4
address_str = f'{self.address.compressed}:{self.port}'
else: # IPv6
address_str = f'[{self.address.compressed}]:{self.port}'
else:
address_str = self.address.compressed
if self.domain_name:
return f'{address_str} ({self.domain_name})'
return address_str
class Link:
"""systemd-resolved state for a particular link or global context."""
def __init__(self, connection, object_path, link_index: int = 0,
interface_name: str | None = None):
"""Fetch all the relevant properties for a link over DBus."""
if not link_index: # Global
interface = MANAGER_INTERFACE
else:
interface = LINK_INTERFACE
self.proxy = gio.DBusProxy.new_sync(connection,
gio.DBusProxyFlags.NONE, None,
RESOLVE_NAME, object_path,
interface)
self.link_index = link_index
self.interface_name = interface_name
self.dns_over_tls = self.proxy.get_cached_property(
'DNSOverTLS').unpack()
self.dnssec = self.proxy.get_cached_property('DNSSEC').unpack()
self.dnssec_supported = self.proxy.get_cached_property(
'DNSSECSupported')
self.dns_servers = self._new_dns_servers(
self.proxy.get_cached_property('DNSEx'))
self.fallback_dns_servers = None
if not link_index:
self.fallback_dns_servers = self._new_dns_servers(
self.proxy.get_cached_property('FallbackDNSEx'))
self.current_dns_server = self._new_dns_server(
self.proxy.get_cached_property('CurrentDNSServerEx'))
def get_link(self, link_index):
"""Return a string path to a link's DBus object."""
return self.proxy.GetLink('(i)', link_index)
@property
def dns_over_tls_string(self):
"""Return a string representation for DNS-over-TLS status."""
value_map = {
'yes': _('yes'),
'opportunistic': _('opportunistic'),
'no': _('no')
}
return value_map.get(self.dns_over_tls, self.dns_over_tls)
@property
def dnssec_string(self):
"""Return a string representation for DNSSEC status."""
value_map = {
'yes': _('yes'),
'allow-downgrade': _('allow-downgrade'),
'no': _('no')
}
return value_map.get(self.dnssec, self.dnssec)
@property
def dnssec_supported_string(self):
"""Return a string representation for whether DNSSEC is supported."""
return _('supported') if self.dnssec_supported else _('unsupported')
def _new_dns_servers(self, dns_tuples):
"""Return list of DNS Server objects from variant tuple.
Global DNS servers list also contains individual link DNS servers.
Ignore those.
"""
return [
self._new_dns_server(dns_tuple) for dns_tuple in dns_tuples
if self.link_index != 0 or dns_tuple[0] == 0
]
def _new_dns_server(self, dns_tuple):
"""Return a DNS Server object from variant tuple.
Tuple can be prefixed by link index in case of DNS server for global
context. Handle both cases. Entire tuple may be empty. Return None in
that case.
"""
if self.link_index: # Not global
dns_tuple = (self.link_index, ) + tuple(dns_tuple)
if not dns_tuple[2]: # Empty address
return None
return DNSServer(*dns_tuple)
def __str__(self):
dnssec_supported = ('supported'
if self.dnssec_supported else 'unspported')
value = ''
if not self.link_index:
value += 'Global\n'
else:
value = f'Link {self.link_index} ({self.interface_name})\n'
if self.current_dns_server:
value += f' Current DNS Server: {str(self.current_dns_server)}\n'
if self.dns_servers:
value += ' DNS Servers:\n'
for server in self.dns_servers:
value += f' {server}\n'
if self.fallback_dns_servers:
value += ' Fallback DNS Servers: \n'
for server in self.fallback_dns_servers:
value += f' {server}\n'
value += f' DNS-over-TLS: {self.dns_over_tls}\n'
value += f' DNSSEC: {self.dnssec}/{dnssec_supported}\n'
return value
def get_links():
"""Return the list of network interfaces and their indices."""
process = subprocess.run(['ip', '--json', 'link'], stdout=subprocess.PIPE,
check=True)
output = json.loads(process.stdout.decode())
links = {} # Maintain link index order
for entry in output:
links[entry['ifindex']] = entry['ifname']
return links
def get_status():
"""Return the current status of systemd-resolved daemon."""
link_status = []
connection = gio.bus_get_sync(gio.BusType.SYSTEM)
global_link = Link(connection, RESOLVE_PATH)
link_status.append(global_link)
links = get_links()
for link_index, interface_name in links.items():
if interface_name == 'lo':
continue
link_path = global_link.get_link(link_index)
link_status.append(
Link(connection, link_path, link_index, interface_name))
return link_status