bind: Use privileged decorator for actions

Tests:

- Initial setup:
  - Creates zones directory
  - Write default configuration
  - named is restarted
- Forwarders
  - Setting forwarders works as expected.
  - Current list of forwarders is shown as expected
- List of served domains is shown properly

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-08-26 16:00:54 -07:00 committed by James Valleroy
parent 212364ba2a
commit cdb04bb46a
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
5 changed files with 231 additions and 283 deletions

View File

@ -1,62 +0,0 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for BIND server.
"""
import argparse
from pathlib import Path
from plinth import action_utils
from plinth.modules.bind import (CONFIG_FILE, DEFAULT_CONFIG, ZONES_DIR,
set_dnssec, set_forwarders)
def parse_arguments():
"""Return parsed command line arguments as dictionary"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
setup = subparsers.add_parser('setup', help='Setup for BIND')
setup.add_argument(
'--old-version', type=int, required=True,
help='Earlier version of the app that is already setup.')
configure = subparsers.add_parser('configure', help='Configure BIND')
configure.add_argument('--forwarders',
help='List of IP addresses, separated by space')
configure.add_argument('--dnssec', choices=['enable', 'disable'],
help='Enable or disable DNSSEC')
subparsers.required = True
return parser.parse_args()
def subcommand_setup(arguments):
"""Setup BIND configuration."""
if arguments.old_version == 0:
with open(CONFIG_FILE, 'w', encoding='utf-8') as conf_file:
conf_file.write(DEFAULT_CONFIG)
Path(ZONES_DIR).mkdir(exist_ok=True, parents=True)
action_utils.service_restart('named')
def subcommand_configure(arguments):
"""Configure BIND."""
set_forwarders(arguments.forwarders)
set_dnssec(arguments.dnssec)
action_utils.service_restart('named')
def main():
"""Parse arguments and perform all duties"""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()

View File

@ -1,16 +1,8 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
FreedomBox app to configure BIND server.
"""
"""FreedomBox app to configure BIND server."""
import re
from collections import defaultdict
from pathlib import Path
import augeas
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth import app as app_module
from plinth import cfg, menu
from plinth.daemon import Daemon
@ -19,7 +11,7 @@ from plinth.modules.firewall.components import Firewall
from plinth.package import Packages, install
from plinth.utils import format_lazy
from . import manifest
from . import manifest, privileged
_description = [
_('BIND enables you to publish your Domain Name System (DNS) information '
@ -32,32 +24,6 @@ _description = [
box_name=_(cfg.box_name)),
]
CONFIG_FILE = '/etc/bind/named.conf.options'
ZONES_DIR = '/var/bind/pri'
DEFAULT_CONFIG = '''
acl goodclients {
localnets;
};
options {
directory "/var/cache/bind";
recursion yes;
allow-query { goodclients; };
forwarders {
};
forward first;
dnssec-enable yes;
dnssec-validation auto;
auth-nxdomain no; # conform to RFC1035
listen-on-v6 { any; };
};
'''
class BindApp(app_module.App):
"""FreedomBox app for Bind."""
@ -100,154 +66,10 @@ class BindApp(app_module.App):
def setup(self, old_version):
"""Install and configure the app."""
super().setup(old_version)
actions.superuser_run('bind',
['setup', '--old-version',
str(old_version)])
privileged.setup(old_version)
self.enable()
def force_upgrade(self, _packages):
"""Force upgrade the managed packages to resolve conffile prompt."""
install(['bind9'], force_configuration='old')
return True
def get_config():
"""Get current configuration"""
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
forwarders = ''
dnssec_enabled = False
flag = False
for line in data:
if re.match(r'^\s*forwarders\s+{', line):
flag = True
elif re.match(r'^\s*dnssec-enable\s+yes;', line):
dnssec_enabled = True
elif flag and '//' not in line:
forwarders = re.sub('[;]', '', line)
flag = False
conf = {
'forwarders': forwarders,
'enable_dnssec': dnssec_enabled,
}
return conf
def set_forwarders(forwarders):
"""Set DNS forwarders."""
flag = 0
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
for line in data:
if re.match(r'^\s*forwarders\s+{', line):
conf_file.write(line + '\n')
for dns in forwarders.split():
conf_file.write(dns + '; ')
conf_file.write('\n')
flag = 1
elif '};' in line and flag == 1:
conf_file.write(line + '\n')
flag = 0
elif flag == 0:
conf_file.write(line + '\n')
conf_file.close()
def set_dnssec(choice):
"""Enable or disable DNSSEC."""
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
if choice == 'enable':
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
for line in data:
if re.match(r'//\s*dnssec-enable\s+yes;', line):
line = line.lstrip('/')
conf_file.write(line + '\n')
conf_file.close()
if choice == 'disable':
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
for line in data:
if re.match(r'^\s*dnssec-enable\s+yes;', line):
line = '//' + line
conf_file.write(line + '\n')
conf_file.close()
def get_served_domains():
"""
Augeas path for zone files:
===========================
augtool> print /files/var/bind/pri/local.zone
/files/var/bind/pri/local.zone
/files/var/bind/pri/local.zone/$TTL = "604800"
/files/var/bind/pri/local.zone/@[1]
/files/var/bind/pri/local.zone/@[1]/1
/files/var/bind/pri/local.zone/@[1]/1/class = "IN"
/files/var/bind/pri/local.zone/@[1]/1/type = "SOA"
/files/var/bind/pri/local.zone/@[1]/1/mname = "localhost."
/files/var/bind/pri/local.zone/@[1]/1/rname = "root.localhost."
/files/var/bind/pri/local.zone/@[1]/1/serial = "2"
/files/var/bind/pri/local.zone/@[1]/1/refresh = "604800"
/files/var/bind/pri/local.zone/@[1]/1/retry = "86400"
/files/var/bind/pri/local.zone/@[1]/1/expiry = "2419200"
/files/var/bind/pri/local.zone/@[1]/1/minimum = "604800"
/files/var/bind/pri/local.zone/@[2]
/files/var/bind/pri/local.zone/@[2]/1
/files/var/bind/pri/local.zone/@[2]/1/class = "IN"
/files/var/bind/pri/local.zone/@[2]/1/type = "NS"
/files/var/bind/pri/local.zone/@[2]/1/rdata = "localhost."
/files/var/bind/pri/local.zone/@[3]
/files/var/bind/pri/local.zone/@[3]/1
/files/var/bind/pri/local.zone/@[3]/1/class = "IN"
/files/var/bind/pri/local.zone/@[3]/1/type = "A"
/files/var/bind/pri/local.zone/@[3]/1/rdata = "127.0.0.1"
/files/var/bind/pri/local.zone/@[4]
/files/var/bind/pri/local.zone/@[4]/1
/files/var/bind/pri/local.zone/@[4]/1/class = "IN"
/files/var/bind/pri/local.zone/@[4]/1/type = "AAAA"
/files/var/bind/pri/local.zone/@[4]/1/rdata = "::1"
Need to find the related functionality to parse the A records
Retrieve from /etc/bind/db* zone files all the configured A records.
Assuming zones files in ZONES_DIR are all used.
:return: dictionary in the form 'domain_name': ['ip_address', 'ipv6_addr']
"""
RECORD_TYPES = ('A', 'AAAA')
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
augeas.Augeas.NO_MODL_AUTOLOAD)
aug.set('/augeas/load/Dns_Zone/lens', 'Dns_Zone.lns')
zone_file_path = Path(ZONES_DIR)
zone_files = [zf for zf in zone_file_path.iterdir() if zf.is_file()]
# augeas load only required files
for zone_file in zone_files:
aug.set('/augeas/load/Dns_Zone/incl[last() + 1]', str(zone_file))
aug.load()
served_domains = defaultdict(list)
for zone_file in zone_files:
base_path = '/files/%s/@[{record_order}]/1/{field}' % zone_file
count = 1
mname = aug.get(base_path.format(record_order=count, field='mname'))
while True:
record_type = aug.get(
base_path.format(record_order=count, field='type'))
# no record type ends the search
if record_type is None:
break
if record_type in RECORD_TYPES:
served_domains[mname].append(
aug.get(base_path.format(record_order=count,
field='rdata')))
count += 1
return served_domains

View File

@ -0,0 +1,198 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Configuration helper for BIND server."""
import re
from collections import defaultdict
from pathlib import Path
import augeas
from plinth import action_utils
from plinth.actions import privileged
CONFIG_FILE = '/etc/bind/named.conf.options'
ZONES_DIR = '/var/bind/pri'
DEFAULT_CONFIG = '''
acl goodclients {
localnets;
};
options {
directory "/var/cache/bind";
recursion yes;
allow-query { goodclients; };
forwarders {
};
forward first;
dnssec-enable yes;
dnssec-validation auto;
auth-nxdomain no; # conform to RFC1035
listen-on-v6 { any; };
};
'''
@privileged
def setup(old_version: int):
"""Setup BIND configuration."""
if old_version == 0:
with open(CONFIG_FILE, 'w', encoding='utf-8') as conf_file:
conf_file.write(DEFAULT_CONFIG)
Path(ZONES_DIR).mkdir(exist_ok=True, parents=True)
action_utils.service_restart('named')
@privileged
def configure(forwarders: str, dnssec: bool):
"""Configure BIND."""
_set_forwarders(forwarders)
_set_dnssec(dnssec)
action_utils.service_restart('named')
def get_config():
"""Get current configuration."""
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
forwarders = ''
dnssec_enabled = False
flag = False
for line in data:
if re.match(r'^\s*forwarders\s+{', line):
flag = True
elif re.match(r'^\s*dnssec-enable\s+yes;', line):
dnssec_enabled = True
elif flag and '//' not in line:
forwarders = re.sub('[;]', '', line)
flag = False
conf = {
'forwarders': forwarders,
'enable_dnssec': dnssec_enabled,
}
return conf
def _set_forwarders(forwarders):
"""Set DNS forwarders."""
flag = 0
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
for line in data:
if re.match(r'^\s*forwarders\s+{', line):
conf_file.write(line + '\n')
for dns in forwarders.split():
conf_file.write(dns + '; ')
conf_file.write('\n')
flag = 1
elif '};' in line and flag == 1:
conf_file.write(line + '\n')
flag = 0
elif flag == 0:
conf_file.write(line + '\n')
conf_file.close()
def _set_dnssec(choice):
"""Enable or disable DNSSEC."""
data = [line.strip() for line in open(CONFIG_FILE, 'r', encoding='utf-8')]
if choice:
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
for line in data:
if re.match(r'//\s*dnssec-enable\s+yes;', line):
line = line.lstrip('/')
conf_file.write(line + '\n')
conf_file.close()
else:
conf_file = open(CONFIG_FILE, 'w', encoding='utf-8')
for line in data:
if re.match(r'^\s*dnssec-enable\s+yes;', line):
line = '//' + line
conf_file.write(line + '\n')
conf_file.close()
def get_served_domains():
"""Return list of domains service handles.
Augeas path for zone files:
===========================
augtool> print /files/var/bind/pri/local.zone
/files/var/bind/pri/local.zone
/files/var/bind/pri/local.zone/$TTL = "604800"
/files/var/bind/pri/local.zone/@[1]
/files/var/bind/pri/local.zone/@[1]/1
/files/var/bind/pri/local.zone/@[1]/1/class = "IN"
/files/var/bind/pri/local.zone/@[1]/1/type = "SOA"
/files/var/bind/pri/local.zone/@[1]/1/mname = "localhost."
/files/var/bind/pri/local.zone/@[1]/1/rname = "root.localhost."
/files/var/bind/pri/local.zone/@[1]/1/serial = "2"
/files/var/bind/pri/local.zone/@[1]/1/refresh = "604800"
/files/var/bind/pri/local.zone/@[1]/1/retry = "86400"
/files/var/bind/pri/local.zone/@[1]/1/expiry = "2419200"
/files/var/bind/pri/local.zone/@[1]/1/minimum = "604800"
/files/var/bind/pri/local.zone/@[2]
/files/var/bind/pri/local.zone/@[2]/1
/files/var/bind/pri/local.zone/@[2]/1/class = "IN"
/files/var/bind/pri/local.zone/@[2]/1/type = "NS"
/files/var/bind/pri/local.zone/@[2]/1/rdata = "localhost."
/files/var/bind/pri/local.zone/@[3]
/files/var/bind/pri/local.zone/@[3]/1
/files/var/bind/pri/local.zone/@[3]/1/class = "IN"
/files/var/bind/pri/local.zone/@[3]/1/type = "A"
/files/var/bind/pri/local.zone/@[3]/1/rdata = "127.0.0.1"
/files/var/bind/pri/local.zone/@[4]
/files/var/bind/pri/local.zone/@[4]/1
/files/var/bind/pri/local.zone/@[4]/1/class = "IN"
/files/var/bind/pri/local.zone/@[4]/1/type = "AAAA"
/files/var/bind/pri/local.zone/@[4]/1/rdata = "::1"
Need to find the related functionality to parse the A records
Retrieve from /etc/bind/db* zone files all the configured A records.
Assuming zones files in ZONES_DIR are all used.
:return: dictionary in the form 'domain_name': ['ip_address', 'ipv6_addr']
"""
RECORD_TYPES = ('A', 'AAAA')
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
augeas.Augeas.NO_MODL_AUTOLOAD)
aug.set('/augeas/load/Dns_Zone/lens', 'Dns_Zone.lns')
zone_file_path = Path(ZONES_DIR)
zone_files = [zf for zf in zone_file_path.iterdir() if zf.is_file()]
# augeas load only required files
for zone_file in zone_files:
aug.set('/augeas/load/Dns_Zone/incl[last() + 1]', str(zone_file))
aug.load()
served_domains = defaultdict(list)
for zone_file in zone_files:
base_path = '/files/%s/@[{record_order}]/1/{field}' % zone_file
count = 1
mname = aug.get(base_path.format(record_order=count, field='mname'))
while True:
record_type = aug.get(
base_path.format(record_order=count, field='type'))
# no record type ends the search
if record_type is None:
break
if record_type in RECORD_TYPES:
served_domains[mname].append(
aug.get(base_path.format(record_order=count,
field='rdata')))
count += 1
return served_domains

View File

@ -13,11 +13,11 @@ from plinth.modules import bind
def fixture_configuration_file(tmp_path):
"""Setup the a bind configuration file temporary directory."""
conf_file = tmp_path / 'named.conf.options'
conf_file.write_text(bind.DEFAULT_CONFIG)
old_config_file = bind.CONFIG_FILE
bind.CONFIG_FILE = str(conf_file)
conf_file.write_text(bind.privileged.DEFAULT_CONFIG)
old_config_file = bind.privileged.CONFIG_FILE
bind.privileged.CONFIG_FILE = str(conf_file)
yield
bind.CONFIG_FILE = old_config_file
bind.privileged.CONFIG_FILE = old_config_file
@pytest.fixture
@ -40,51 +40,49 @@ $TTL 604800
@ IN AAAA {aaaa_record}
""" # noqa
old_zones_dir = bind.ZONES_DIR
bind.ZONES_DIR = tmp_path
zones_dir_path = Path(bind.ZONES_DIR)
old_zones_dir = bind.privileged.ZONES_DIR
bind.privileged.ZONES_DIR = tmp_path
zones_dir_path = Path(bind.privileged.ZONES_DIR)
zones_dir_path.mkdir(exist_ok=True, parents=True)
local_path = zones_dir_path / "local.zone"
local_path.write_text(
test_zone_file.format(name='localhost.', a_record="127.0.0.1",
aaaa_record="::1")
)
aaaa_record="::1"))
custom_zone_path = zones_dir_path / "custom.zone"
custom_zone_path.write_text(
test_zone_file.format(name='custom.domain.', a_record="10.10.10.1",
aaaa_record="fe80::c6e9:84ff:fe16:95da")
)
aaaa_record="fe80::c6e9:84ff:fe16:95da"))
yield
local_path.unlink()
custom_zone_path.unlink()
bind.ZONES_DIR = old_zones_dir
bind.privileged.ZONES_DIR = old_zones_dir
@pytest.mark.usefixtures('configuration_file')
def test_set_forwarders():
"""Test that setting forwarders works."""
bind.set_forwarders('8.8.8.8 8.8.4.4')
conf = bind.get_config()
bind.privileged._set_forwarders('8.8.8.8 8.8.4.4')
conf = bind.privileged.get_config()
assert conf['forwarders'] == '8.8.8.8 8.8.4.4'
bind.set_forwarders('')
conf = bind.get_config()
bind.privileged._set_forwarders('')
conf = bind.privileged.get_config()
assert conf['forwarders'] == ''
@pytest.mark.usefixtures('configuration_file')
def test_enable_dnssec():
"""Test that enabling DNSSEC works."""
bind.set_dnssec('enable')
conf = bind.get_config()
bind.privileged._set_dnssec(True)
conf = bind.privileged.get_config()
assert conf['enable_dnssec']
bind.set_dnssec('disable')
conf = bind.get_config()
bind.privileged._set_dnssec(False)
conf = bind.privileged.get_config()
assert not conf['enable_dnssec']
@ -94,8 +92,9 @@ def test_get_correct_served_domains():
Test that get_served_domains collects the right a/aaaa records from zone
files
"""
served_domains = bind.get_served_domains()
served_domains = bind.privileged.get_served_domains()
assert served_domains['localhost.'] == ["127.0.0.1", "::1"]
assert served_domains['custom.domain.'] == [
"10.10.10.1", "fe80::c6e9:84ff:fe16:95da"]
"10.10.10.1", "fe80::c6e9:84ff:fe16:95da"
]

View File

@ -1,33 +1,28 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Views for BIND module.
"""
"""Views for BIND module."""
from django.contrib import messages
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth.modules import bind, names
from plinth.modules import names
from plinth.views import AppView
from . import get_config
from . import privileged
from .forms import BindForm
class BindAppView(AppView): # pylint: disable=too-many-ancestors
"""A specialized view for configuring Bind."""
app_id = 'bind'
form_class = BindForm
template_name = 'bind.html'
def get_context_data(self, *args, **kwargs):
"""
Get/append information for domains bind is configured to respond for
and additional names from the names module
"""
"""Get/append information for domains and additional names."""
context = super().get_context_data(**kwargs)
served_domains = bind.get_served_domains()
served_domains = privileged.get_served_domains()
context['domains_table'] = []
for key, val in served_domains.items():
if key == 'localhost.':
@ -53,21 +48,17 @@ class BindAppView(AppView): # pylint: disable=too-many-ancestors
def get_initial(self):
"""Return the values to fill in the form."""
initial = super().get_initial()
initial.update(get_config())
initial.update(privileged.get_config())
return initial
def form_valid(self, form):
"""Change the configurations of Bind service."""
data = form.cleaned_data
old_config = get_config()
old_config = privileged.get_config()
if old_config['forwarders'] != data['forwarders'] \
or old_config['enable_dnssec'] != data['enable_dnssec']:
dnssec_setting = 'enable' if data['enable_dnssec'] else 'disable'
actions.superuser_run('bind', [
'configure', '--forwarders', data['forwarders'], '--dnssec',
dnssec_setting
])
privileged.configure(data['forwarders'], data['enable_dnssec'])
messages.success(self.request, _('Configuration updated'))
return super().form_valid(form)