From 79e48310a1348b9906f2f04789d4ea1e7f8cf533 Mon Sep 17 00:00:00 2001 From: Sunil Mohan Adapa Date: Fri, 2 Sep 2022 08:39:13 -0700 Subject: [PATCH] dynamicdns: Use privileged decorator for actions Tests: - Functional tests work. - Initial setup works. - Setting the setup version to 1 and running the service upgrades to version 2. During this, export_config() and clean() work successfully. Signed-off-by: Sunil Mohan Adapa Reviewed-by: James Valleroy --- actions/dynamicdns | 162 +----------------------- plinth/modules/dynamicdns/__init__.py | 8 +- plinth/modules/dynamicdns/privileged.py | 113 +++++++++++++++++ 3 files changed, 122 insertions(+), 161 deletions(-) create mode 100644 plinth/modules/dynamicdns/privileged.py diff --git a/actions/dynamicdns b/actions/dynamicdns index 3cbe07793..9f035ea7c 100755 --- a/actions/dynamicdns +++ b/actions/dynamicdns @@ -1,159 +1,9 @@ #!/usr/bin/python3 # SPDX-License-Identifier: AGPL-3.0-or-later +"""Legacy configuration helper for Dynamic DNS, kept for compatibility. + +Cron jobs in the earlier implementation used to call into this script with the +sub-commands 'update' and 'success'. This action script now allows for any +arbitrary sub-command to be called and does nothing. It can be removed after +the release of Debian 12 (bookworm). """ -Configuration helper for Dynamic DNS. -""" - -import argparse -import json -import pathlib -import urllib - -_conf_dir = pathlib.Path('/etc/ez-ipupdate/') -_active_config = _conf_dir / 'ez-ipupdate.conf' -_inactive_config = _conf_dir / 'ez-ipupdate.inactive' -_helper_config = _conf_dir / 'ez-ipupdate-plinth.cfg' -_cron_job = pathlib.Path('/etc/cron.d/ez-ipupdate') - - -def parse_arguments(): - """ Return parsed command line arguments as dictionary. """ - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') - - subparsers.add_parser('export-config', - help='Print configuration in JSON format') - subparsers.add_parser('clean', help='Remove all old configuration files') - - subparsers.add_parser('update', help='For backwards compatibility') - subparser = subparsers.add_parser('success', - help='For backwards compatibility') - subparser.add_argument('wan_ip_address') - - subparsers.required = True - return parser.parse_args() - - -def _read_configuration(path, separator='='): - """Read ez-ipupdate configuration.""" - config = {} - for line in path.read_text().splitlines(): - if line.startswith('#'): - continue - - parts = line.partition(separator) - if parts[1]: - config[parts[0].strip()] = parts[2].strip() - else: - config[parts[0].strip()] = True - - return config - - -def subcommand_export_config(_): - """Print the old ez-ipupdate configuration in JSON format.""" - input_config = {} - if _active_config.exists(): - input_config = _read_configuration(_active_config) - elif _inactive_config.exists(): - input_config = _read_configuration(_inactive_config) - - helper = {} - if _helper_config.exists(): - helper.update(_read_configuration(_helper_config, separator=' ')) - - def _clean(value): - value_map = {'enabled': True, 'disabled': False, '': None} - return value_map.get(value, value) - - domain = { - 'service_type': 'gnudip', - 'domain': input_config.get('host'), - 'server': input_config.get('server'), - 'username': input_config.get('user', '').split(':')[0] or None, - 'password': input_config.get('user', '').split(':')[-1] or None, - 'ip_lookup_url': helper.get('IPURL'), - 'update_url': _clean(helper.get('POSTURL')) or None, - 'use_http_basic_auth': _clean(helper.get('POSTAUTH')), - 'disable_ssl_cert_check': _clean(helper.get('POSTSSLIGNORE')), - 'use_ipv6': _clean(helper.get('POSTUSEIPV6')), - } - - if isinstance(domain['update_url'], bool): - # 'POSTURL ' is a line found in the configuration file - domain['update_url'] = None - - if not domain['server']: - domain['service_type'] = 'other' - update_url = domain['update_url'] - try: - server = urllib.parse.urlparse(update_url).netloc - service_types = { - 'dynupdate.noip.com': 'noip.com', - 'dynupdate.no-ip.com': 'noip.com', - 'freedns.afraid.org': 'freedns.afraid.org' - } - domain['service_type'] = service_types.get(server, 'other') - except ValueError: - pass - - # Old logic for 'enabling' the app is as follows: If behind NAT, add - # cronjob. If not behind NAT and type is update URL, add cronjob. If not - # behind NAT and type is GnuDIP, move inactive configuration to active - # configuration and start the ez-ipupdate daemon. - enabled = False - if _cron_job.exists() or (domain['service_type'] == 'gnudip' - and _active_config.exists()): - enabled = True - - output_config = {'enabled': enabled, 'domains': {}} - if domain['domain']: - output_config['domains'][domain['domain']] = domain - - print(json.dumps(output_config)) - - -def subcommand_clean(_): - """Remove all old configuration files.""" - last_update = _conf_dir / 'last-update' - status = _conf_dir / 'ez-ipupdate.status' - current_ip = _conf_dir / 'ez-ipupdate.currentIP' - - cleanup_files = [ - _active_config, _inactive_config, last_update, _helper_config, status, - current_ip - ] - for cleanup_file in cleanup_files: - try: - cleanup_file.rename(cleanup_file.with_suffix('.bak')) - except FileNotFoundError: - pass - - _cron_job.unlink(missing_ok=True) - - -def subcommand_update(_): - """Empty subcommand kept only for backwards compatibility. - - Drop after stable release. - """ - - -def subcommand_success(_): - """Empty subcommand kept only for backwards compatibility. - - Drop after stable release. - """ - - -def main(): - """Parse arguments and perform all duties.""" - arguments = parse_arguments() - - subcommand = arguments.subcommand.replace('-', '_') - subcommand_method = globals()['subcommand_' + subcommand] - subcommand_method(arguments) - - -if __name__ == '__main__': - main() diff --git a/plinth/modules/dynamicdns/__init__.py b/plinth/modules/dynamicdns/__init__.py index 79688396f..a6904e0c5 100644 --- a/plinth/modules/dynamicdns/__init__.py +++ b/plinth/modules/dynamicdns/__init__.py @@ -11,7 +11,6 @@ import urllib from django.utils.translation import gettext_lazy as _ -from plinth import actions from plinth import app as app_module from plinth import cfg, glib, kvstore, menu from plinth.modules.backups.components import BackupRestore @@ -20,7 +19,7 @@ from plinth.modules.users.components import UsersAndGroups from plinth.signals import domain_added, domain_removed from plinth.utils import format_lazy -from . import gnudip, manifest +from . import gnudip, manifest, privileged logger = logging.getLogger(__name__) @@ -103,8 +102,7 @@ class DynamicDNSApp(app_module.App): self.enable() if old_version == 1: - config = actions.superuser_run('dynamicdns', ['export-config']) - config = json.loads(config) + config = privileged.export_config() if config['enabled']: self.enable() else: @@ -112,7 +110,7 @@ class DynamicDNSApp(app_module.App): del config['enabled'] set_config(config) - actions.superuser_run('dynamicdns', ['clean']) + privileged.clean() def _query_external_address(domain): diff --git a/plinth/modules/dynamicdns/privileged.py b/plinth/modules/dynamicdns/privileged.py new file mode 100644 index 000000000..79d618a33 --- /dev/null +++ b/plinth/modules/dynamicdns/privileged.py @@ -0,0 +1,113 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Configuration helper for Dynamic DNS.""" + +import pathlib +import urllib + +from plinth.actions import privileged + +_conf_dir = pathlib.Path('/etc/ez-ipupdate/') +_active_config = _conf_dir / 'ez-ipupdate.conf' +_inactive_config = _conf_dir / 'ez-ipupdate.inactive' +_helper_config = _conf_dir / 'ez-ipupdate-plinth.cfg' +_cron_job = pathlib.Path('/etc/cron.d/ez-ipupdate') + + +def _read_configuration(path, separator='='): + """Read ez-ipupdate configuration.""" + config = {} + for line in path.read_text().splitlines(): + if line.startswith('#'): + continue + + parts = line.partition(separator) + if parts[1]: + config[parts[0].strip()] = parts[2].strip() + else: + config[parts[0].strip()] = True + + return config + + +@privileged +def export_config() -> dict[str, object]: + """Return the old ez-ipupdate configuration in JSON format.""" + input_config = {} + if _active_config.exists(): + input_config = _read_configuration(_active_config) + elif _inactive_config.exists(): + input_config = _read_configuration(_inactive_config) + + helper = {} + if _helper_config.exists(): + helper.update(_read_configuration(_helper_config, separator=' ')) + + def _clean(value): + value_map = {'enabled': True, 'disabled': False, '': None} + return value_map.get(value, value) + + domain = { + 'service_type': 'gnudip', + 'domain': input_config.get('host'), + 'server': input_config.get('server'), + 'username': input_config.get('user', '').split(':')[0] or None, + 'password': input_config.get('user', '').split(':')[-1] or None, + 'ip_lookup_url': helper.get('IPURL'), + 'update_url': _clean(helper.get('POSTURL')) or None, + 'use_http_basic_auth': _clean(helper.get('POSTAUTH')), + 'disable_ssl_cert_check': _clean(helper.get('POSTSSLIGNORE')), + 'use_ipv6': _clean(helper.get('POSTUSEIPV6')), + } + + if isinstance(domain['update_url'], bool): + # 'POSTURL ' is a line found in the configuration file + domain['update_url'] = None + + if not domain['server']: + domain['service_type'] = 'other' + update_url = domain['update_url'] + try: + server = urllib.parse.urlparse(update_url).netloc + service_types = { + 'dynupdate.noip.com': 'noip.com', + 'dynupdate.no-ip.com': 'noip.com', + 'freedns.afraid.org': 'freedns.afraid.org' + } + domain['service_type'] = service_types.get(server, 'other') + except ValueError: + pass + + # Old logic for 'enabling' the app is as follows: If behind NAT, add + # cronjob. If not behind NAT and type is update URL, add cronjob. If not + # behind NAT and type is GnuDIP, move inactive configuration to active + # configuration and start the ez-ipupdate daemon. + enabled = False + if _cron_job.exists() or (domain['service_type'] == 'gnudip' + and _active_config.exists()): + enabled = True + + output_config = {'enabled': enabled, 'domains': {}} + if domain['domain']: + output_config['domains'][domain['domain']] = domain + + return output_config + + +@privileged +def clean(): + """Remove all old configuration files.""" + last_update = _conf_dir / 'last-update' + status = _conf_dir / 'ez-ipupdate.status' + current_ip = _conf_dir / 'ez-ipupdate.currentIP' + + cleanup_files = [ + _active_config, _inactive_config, last_update, _helper_config, status, + current_ip + ] + for cleanup_file in cleanup_files: + try: + cleanup_file.rename(cleanup_file.with_suffix('.bak')) + except FileNotFoundError: + pass + + _cron_job.unlink(missing_ok=True)