dynamicdns: Use privileged decorator for actions

Tests:

- Functional tests work.
- Initial setup works.
- Setting the setup version to 1 and running the service upgrades to version 2.
  During this, export_config() and clean() work successfully.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-09-02 08:39:13 -07:00 committed by James Valleroy
parent 884e0d69ef
commit 79e48310a1
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
3 changed files with 122 additions and 161 deletions

View File

@ -1,159 +1,9 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Legacy configuration helper for Dynamic DNS, kept for compatibility.
Cron jobs in the earlier implementation used to call into this script with the
sub-commands 'update' and 'success'. This action script now allows for any
arbitrary sub-command to be called and does nothing. It can be removed after
the release of Debian 12 (bookworm).
"""
Configuration helper for Dynamic DNS.
"""
import argparse
import json
import pathlib
import urllib
_conf_dir = pathlib.Path('/etc/ez-ipupdate/')
_active_config = _conf_dir / 'ez-ipupdate.conf'
_inactive_config = _conf_dir / 'ez-ipupdate.inactive'
_helper_config = _conf_dir / 'ez-ipupdate-plinth.cfg'
_cron_job = pathlib.Path('/etc/cron.d/ez-ipupdate')
def parse_arguments():
""" Return parsed command line arguments as dictionary. """
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser('export-config',
help='Print configuration in JSON format')
subparsers.add_parser('clean', help='Remove all old configuration files')
subparsers.add_parser('update', help='For backwards compatibility')
subparser = subparsers.add_parser('success',
help='For backwards compatibility')
subparser.add_argument('wan_ip_address')
subparsers.required = True
return parser.parse_args()
def _read_configuration(path, separator='='):
"""Read ez-ipupdate configuration."""
config = {}
for line in path.read_text().splitlines():
if line.startswith('#'):
continue
parts = line.partition(separator)
if parts[1]:
config[parts[0].strip()] = parts[2].strip()
else:
config[parts[0].strip()] = True
return config
def subcommand_export_config(_):
"""Print the old ez-ipupdate configuration in JSON format."""
input_config = {}
if _active_config.exists():
input_config = _read_configuration(_active_config)
elif _inactive_config.exists():
input_config = _read_configuration(_inactive_config)
helper = {}
if _helper_config.exists():
helper.update(_read_configuration(_helper_config, separator=' '))
def _clean(value):
value_map = {'enabled': True, 'disabled': False, '': None}
return value_map.get(value, value)
domain = {
'service_type': 'gnudip',
'domain': input_config.get('host'),
'server': input_config.get('server'),
'username': input_config.get('user', '').split(':')[0] or None,
'password': input_config.get('user', '').split(':')[-1] or None,
'ip_lookup_url': helper.get('IPURL'),
'update_url': _clean(helper.get('POSTURL')) or None,
'use_http_basic_auth': _clean(helper.get('POSTAUTH')),
'disable_ssl_cert_check': _clean(helper.get('POSTSSLIGNORE')),
'use_ipv6': _clean(helper.get('POSTUSEIPV6')),
}
if isinstance(domain['update_url'], bool):
# 'POSTURL ' is a line found in the configuration file
domain['update_url'] = None
if not domain['server']:
domain['service_type'] = 'other'
update_url = domain['update_url']
try:
server = urllib.parse.urlparse(update_url).netloc
service_types = {
'dynupdate.noip.com': 'noip.com',
'dynupdate.no-ip.com': 'noip.com',
'freedns.afraid.org': 'freedns.afraid.org'
}
domain['service_type'] = service_types.get(server, 'other')
except ValueError:
pass
# Old logic for 'enabling' the app is as follows: If behind NAT, add
# cronjob. If not behind NAT and type is update URL, add cronjob. If not
# behind NAT and type is GnuDIP, move inactive configuration to active
# configuration and start the ez-ipupdate daemon.
enabled = False
if _cron_job.exists() or (domain['service_type'] == 'gnudip'
and _active_config.exists()):
enabled = True
output_config = {'enabled': enabled, 'domains': {}}
if domain['domain']:
output_config['domains'][domain['domain']] = domain
print(json.dumps(output_config))
def subcommand_clean(_):
"""Remove all old configuration files."""
last_update = _conf_dir / 'last-update'
status = _conf_dir / 'ez-ipupdate.status'
current_ip = _conf_dir / 'ez-ipupdate.currentIP'
cleanup_files = [
_active_config, _inactive_config, last_update, _helper_config, status,
current_ip
]
for cleanup_file in cleanup_files:
try:
cleanup_file.rename(cleanup_file.with_suffix('.bak'))
except FileNotFoundError:
pass
_cron_job.unlink(missing_ok=True)
def subcommand_update(_):
"""Empty subcommand kept only for backwards compatibility.
Drop after stable release.
"""
def subcommand_success(_):
"""Empty subcommand kept only for backwards compatibility.
Drop after stable release.
"""
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()

View File

@ -11,7 +11,6 @@ import urllib
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth import app as app_module
from plinth import cfg, glib, kvstore, menu
from plinth.modules.backups.components import BackupRestore
@ -20,7 +19,7 @@ from plinth.modules.users.components import UsersAndGroups
from plinth.signals import domain_added, domain_removed
from plinth.utils import format_lazy
from . import gnudip, manifest
from . import gnudip, manifest, privileged
logger = logging.getLogger(__name__)
@ -103,8 +102,7 @@ class DynamicDNSApp(app_module.App):
self.enable()
if old_version == 1:
config = actions.superuser_run('dynamicdns', ['export-config'])
config = json.loads(config)
config = privileged.export_config()
if config['enabled']:
self.enable()
else:
@ -112,7 +110,7 @@ class DynamicDNSApp(app_module.App):
del config['enabled']
set_config(config)
actions.superuser_run('dynamicdns', ['clean'])
privileged.clean()
def _query_external_address(domain):

View File

@ -0,0 +1,113 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Configuration helper for Dynamic DNS."""
import pathlib
import urllib
from plinth.actions import privileged
_conf_dir = pathlib.Path('/etc/ez-ipupdate/')
_active_config = _conf_dir / 'ez-ipupdate.conf'
_inactive_config = _conf_dir / 'ez-ipupdate.inactive'
_helper_config = _conf_dir / 'ez-ipupdate-plinth.cfg'
_cron_job = pathlib.Path('/etc/cron.d/ez-ipupdate')
def _read_configuration(path, separator='='):
"""Read ez-ipupdate configuration."""
config = {}
for line in path.read_text().splitlines():
if line.startswith('#'):
continue
parts = line.partition(separator)
if parts[1]:
config[parts[0].strip()] = parts[2].strip()
else:
config[parts[0].strip()] = True
return config
@privileged
def export_config() -> dict[str, object]:
"""Return the old ez-ipupdate configuration in JSON format."""
input_config = {}
if _active_config.exists():
input_config = _read_configuration(_active_config)
elif _inactive_config.exists():
input_config = _read_configuration(_inactive_config)
helper = {}
if _helper_config.exists():
helper.update(_read_configuration(_helper_config, separator=' '))
def _clean(value):
value_map = {'enabled': True, 'disabled': False, '': None}
return value_map.get(value, value)
domain = {
'service_type': 'gnudip',
'domain': input_config.get('host'),
'server': input_config.get('server'),
'username': input_config.get('user', '').split(':')[0] or None,
'password': input_config.get('user', '').split(':')[-1] or None,
'ip_lookup_url': helper.get('IPURL'),
'update_url': _clean(helper.get('POSTURL')) or None,
'use_http_basic_auth': _clean(helper.get('POSTAUTH')),
'disable_ssl_cert_check': _clean(helper.get('POSTSSLIGNORE')),
'use_ipv6': _clean(helper.get('POSTUSEIPV6')),
}
if isinstance(domain['update_url'], bool):
# 'POSTURL ' is a line found in the configuration file
domain['update_url'] = None
if not domain['server']:
domain['service_type'] = 'other'
update_url = domain['update_url']
try:
server = urllib.parse.urlparse(update_url).netloc
service_types = {
'dynupdate.noip.com': 'noip.com',
'dynupdate.no-ip.com': 'noip.com',
'freedns.afraid.org': 'freedns.afraid.org'
}
domain['service_type'] = service_types.get(server, 'other')
except ValueError:
pass
# Old logic for 'enabling' the app is as follows: If behind NAT, add
# cronjob. If not behind NAT and type is update URL, add cronjob. If not
# behind NAT and type is GnuDIP, move inactive configuration to active
# configuration and start the ez-ipupdate daemon.
enabled = False
if _cron_job.exists() or (domain['service_type'] == 'gnudip'
and _active_config.exists()):
enabled = True
output_config = {'enabled': enabled, 'domains': {}}
if domain['domain']:
output_config['domains'][domain['domain']] = domain
return output_config
@privileged
def clean():
"""Remove all old configuration files."""
last_update = _conf_dir / 'last-update'
status = _conf_dir / 'ez-ipupdate.status'
current_ip = _conf_dir / 'ez-ipupdate.currentIP'
cleanup_files = [
_active_config, _inactive_config, last_update, _helper_config, status,
current_ip
]
for cleanup_file in cleanup_files:
try:
cleanup_file.rename(cleanup_file.with_suffix('.bak'))
except FileNotFoundError:
pass
_cron_job.unlink(missing_ok=True)