diff --git a/actions/dynamicdns b/actions/dynamicdns index 46118a7b3..0e111b45b 100755 --- a/actions/dynamicdns +++ b/actions/dynamicdns @@ -1,436 +1,156 @@ -#!/bin/bash +#!/usr/bin/python3 # SPDX-License-Identifier: AGPL-3.0-or-later +""" +Configuration helper for Dynamic DNS. +""" -############################################################################ -# # -# This script is a wrapper around ez-ipupdate and/or wget # -# to update a Dynamic DNS account. The script is used as an # -# interface between plinth and ez-ipupdate # -# the script will store configuration, return configuration # -# to plinth UI and do a dynamic DNS update. The script will # -# also determe if we are behind a NAT device, if we can use # -# ez-ipupdate tool or if we need to do some wget magic # -# # -# Todo: IPv6 # -# Todo: GET WAN IP from Router via UPnP if supported # -# Todo: licence string? # -# author: Daniel Steglich # -# # -############################################################################ +import argparse +import json +import pathlib +import urllib -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin +_conf_dir = pathlib.Path('/etc/ez-ipupdate/') +_active_config = _conf_dir / 'ez-ipupdate.conf' +_inactive_config = _conf_dir / 'ez-ipupdate.inactive' +_helper_config = _conf_dir / 'ez-ipupdate-plinth.cfg' +_cron_job = pathlib.Path('/etc/cron.d/ez-ipupdate') -# static values -WGET=$(which wget) -WGETOPTIONS="-o /dev/null -t 3 -T 3" -EMPTYSTRING="none" -NOIP="0.0.0.0" -# how often do we poll for IP changes -UPDATEMINUTES=5 -# if we do not have a URL to look up public IP, how often should we do -# a "blind" update -UPDATEMINUTESUNKNOWN=3600 -TOOLNAME=ez-ipupdate -GNUDIP_ACTION=/usr/share/plinth/actions/gnudip -DISABLED_STRING='disabled' -ENABLED_STRING='enabled' -# Dirs and filenames -CFGDIR="/etc/${TOOLNAME}/" -CFG="${CFGDIR}${TOOLNAME}.conf" -CFG_disabled="${CFGDIR}${TOOLNAME}.inactive" -IPFILE="${CFGDIR}${TOOLNAME}.currentIP" -STATUSFILE="${CFGDIR}${TOOLNAME}.status" -LASTUPDATE="${CFGDIR}/last-update" -HELPERCFG="${CFGDIR}${TOOLNAME}-plinth.cfg" -CRONJOB="/etc/cron.d/${TOOLNAME}" -PIDFILE="/var/run/ez-ipupdate.pid" +def parse_arguments(): + """ Return parsed command line arguments as dictionary. """ + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') -# this function will parse commandline options -doGetOpt() -{ - basicauth=0 - ignoreCertError=0 - useIPv6=0 + subparsers.add_parser('export-config', + help='Print configuration in JSON format') + subparsers.add_parser('clean', help='Remove all old configuration files') - while getopts ":s:d:u:P:I:U:c:b:6:p" opt; do - case ${opt} in - s) - if [ "${OPTARG}" != "${EMPTYSTRING}" ];then - server=${OPTARG} - else - server="" - fi - ;; - d) - host=${OPTARG} - ;; - u) - user=${OPTARG} - ;; - P) - pass=${OPTARG} - ;; - p) - if read -t 0; then - IFS= read -r pass - fi - ;; - I) - if [ "${OPTARG}" != "${EMPTYSTRING}" ];then - ipurl=${OPTARG} - else - ipurl="" - fi - ;; - U) - if [ "${OPTARG}" != "${EMPTYSTRING}" ];then - updateurl=${OPTARG} - else - updateurl="" - fi - ;; - b) - basicauth=${OPTARG} - ;; - c) - ignoreCertError=${OPTARG} - ;; - 6) - useIPv6=${OPTARG} - ;; - \?) - echo "Invalid option: -${OPTARG}" >&2 - exit 1 - ;; - esac - done -} + subparsers.add_parser('update', help='For backwards compatibility') + subparser = subparsers.add_parser('success', + help='For backwards compatibility') + subparser.add_argument('wan_ip_address') -# this function will write a persistent config file to disk -doWriteCFG() -{ - mkdir ${CFGDIR} 2> /dev/null - # always write to the inactive config - needs to be enabled via "start" command later - local out_file=${CFG_disabled} + subparsers.required = True + return parser.parse_args() - # reset the last update time - echo 0 > ${LASTUPDATE} - # reset the last updated IP - echo "0.0.0.0" > ${IPFILE} +def _read_configuration(path, separator='='): + """Read ez-ipupdate configuration.""" + config = {} + for line in path.read_text().splitlines(): + if line.startswith('#'): + continue - # reset last update (if there is one) - rm ${STATUSFILE} 2> /dev/null + parts = line.partition(separator) + if parts[1]: + config[parts[0].strip()] = parts[2].strip() + else: + config[parts[0].strip()] = True - # find the interface (always the default gateway interface) - default_interface=$(ip route |grep default |awk '{print $5}') + return config - # store the given options in ez-ipupdate compatible config file - { - echo "host=${host}" - echo "server=${server}" - echo "user=${user}:${pass}" - echo "service-type=gnudip" - echo "retrys=3" - echo "wildcard" - } > ${out_file} - - # store UPDATE URL params - { - echo "POSTURL ${updateurl}" - echo "POSTAUTH ${basicauth}" - echo "POSTSSLIGNORE ${ignoreCertError}" - echo "POSTUSEIPV6 ${useIPv6}" - } > ${HELPERCFG} - - # check if we are behind a NAT Router - echo "IPURL ${ipurl}" >> ${HELPERCFG} -} -# this function will read the config file from disk -# special treatment for empty strings is done here: -# plinth will give empty strings like: '' -# but we don't want this single quotes to be used -doReadCFG() -{ - host="" - server="" - user="" - pass="" - ipurl="" +def subcommand_export_config(_): + """Print the old ez-ipupdate configuration in JSON format.""" + input_config = {} + if _active_config.exists(): + input_config = _read_configuration(_active_config) + elif _inactive_config.exists(): + input_config = _read_configuration(_inactive_config) - if [ ! -z "${cfgfile}" ];then - host=$(grep ^host= "${cfgfile}" 2> /dev/null | cut -d = -f 2-) - server=$(grep ^server= "${cfgfile}" 2> /dev/null | cut -d = -f 2- | grep -v ^\'\') - user=$(grep ^user= "${cfgfile}" 2> /dev/null | cut -d = -f 2- | cut -d : -f 1 ) - pass=$(grep ^user= "${cfgfile}" 2> /dev/null | cut -d = -f 2- | cut -d : -f 2-) - fi + helper = {} + if _helper_config.exists(): + helper.update(_read_configuration(_helper_config, separator=' ')) - if [ ! -z ${HELPERCFG} ];then - ipurl=$(grep ^IPURL "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\') - updateurl=$(grep POSTURL "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\') - basicauth=$(grep POSTAUTH "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\') - ignoreCertError=$(grep POSTSSLIGNORE "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\') - useIPv6=$(grep POSTUSEIPV6 "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\') - fi -} + def _clean(value): + value_map = {'enabled': True, 'disabled': False, '': None} + return value_map.get(value, value) -# replace vars from url: i.e.: -# https://example.com/update.php?domain=&User=&Pass= -# also this function will remove the surounding single quotes from the URL string -# as plinth will add them -doReplaceVars() -{ - local url=$(echo "${updateurl}" | sed "s//${wanip}/g") - url=$(echo "${url}" | sed "s//${host}/g") - url=$(echo "${url}" | sed "s//${user}/g") - url=$(echo "${url}" | sed "s//${pass}/g") - url=$(echo "${url}" | sed "s/'//g") - updateurl=${url} - logger "expanded update URL as ${url}" -} + domain = { + 'service_type': 'gnudip', + 'domain': input_config.get('host'), + 'server': input_config.get('server'), + 'username': input_config.get('user', '').split(':')[0] or None, + 'password': input_config.get('user', '').split(':')[-1] or None, + 'ip_lookup_url': helper.get('IPURL'), + 'update_url': _clean(helper.get('POSTURL')) or None, + 'use_http_basic_auth': _clean(helper.get('POSTAUTH')), + 'disable_ssl_cert_check': _clean(helper.get('POSTSSLIGNORE')), + 'use_ipv6': _clean(helper.get('POSTUSEIPV6')), + } -# doReadCFG() needs to be run before this -# this function will return all configured parameters in a way that -# plinth will understand (plinth know the order of -# parameters this function will return) -doStatus() -{ - PROC=$(pgrep ${TOOLNAME}) - if [ -f "${CRONJOB}" ];then - echo "${ENABLED_STRING}" - elif [ ! -z "${PROC}" ];then - echo "${ENABLED_STRING}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${server}" ];then - echo "${server}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${host}" ];then - echo "${host}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${user}" ];then - echo "${user}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${pass}" ];then - echo "${pass}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${ipurl}" ];then - echo "${ipurl}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${updateurl}" ];then - echo "${updateurl}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${ignoreCertError}" ];then - echo "${ignoreCertError}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${basicauth}" ];then - echo "${basicauth}" - else - echo "${DISABLED_STRING}" - fi - if [ ! -z "${useIPv6}" ];then - echo "${useIPv6}" - else - echo "${DISABLED_STRING}" - fi -} + if isinstance(domain['update_url'], bool): + # 'POSTURL ' is a line found in the configuration file + domain['update_url'] = None -# ask a public WEB Server for the WAN IP we are comming from -# and store this ip within $wanip -doGetWANIP() -{ - if [ ! -z "${ipurl}" ];then - local wgetoptions="${WGETOPTIONS}" - if [ "${useIPv6}" = "enabled" ];then - wgetoptions="${wgetoptions} -6" - else - wgetoptions="${wgetoptions} -4" - fi - local cmd="${WGET} ${wgetoptions} -O - ${ipurl}" - if [ "${useIPv6}" = "enabled" ];then - wanip=$($cmd | tr A-F a-f | sed s/[^0-9a-f:]//g) - else - wanip=$($cmd | sed s/[^0-9.]//g) - fi - [ -z "${wanip}" ] && wanip=${NOIP} - else - # no WAN IP found because of missing check URL - wanip=${NOIP} - fi -} + if not domain['server']: + domain['service_type'] = 'other' + update_url = domain['update_url'] + try: + server = urllib.parse.urlparse(update_url).netloc + service_types = { + 'dynupdate.noip.com': 'noip.com', + 'dynupdate.no-ip.com': 'noip.com', + 'freedns.afraid.org': 'freedns.afraid.org' + } + domain['service_type'] = service_types.get(server, 'other') + except ValueError: + pass -# actualy do the update (using wget or gnudip action or even both) -# this function is called via cronjob -doUpdate() -{ - if [ "${useIPv6}" = "enabled" ];then - local dnsentry="$(host -t AAAA "${host}" | sed 's/.*address\s*//')" - else - local dnsentry="$(host -t A "${host}" | sed 's/.*address\s*//')" - fi - if [ "${dnsentry}" = "${wanip}" ];then - return - fi - if [ ! -z "${server}" ];then - if "${GNUDIP_ACTION}" update; then - ${0} success ${wanip} - else - ${0} failed - fi - fi - if [ ! -z "${updateurl}" ];then - doReplaceVars - if [ "${basicauth}" = "enabled" ];then - WGETOPTIONS="${WGETOPTIONS} --user ${user} --password ${pass} " - fi - if [ "${ignoreCertError}" = "enabled" ];then - WGETOPTIONS="${WGETOPTIONS} --no-check-certificate " - fi - if [ "${useIPv6}" = "enabled" ];then - WGETOPTIONS="${WGETOPTIONS} -6" - else - WGETOPTIONS="${WGETOPTIONS} -4" - fi - local cmd="${WGET} -O /dev/null ${WGETOPTIONS} ${updateurl}" - $cmd - # ToDo: check the returning text from WEB Server. User need to give expected string. - if [ ${?} -eq 0 ];then - ${0} success ${wanip} - else - ${0} failed - fi - fi -} + # Old logic for 'enabling' the app is as follows: If behind NAT, add + # cronjob. If not behind NAT and type is update URL, add cronjob. If not + # behind NAT and type is GnuDIP, move inactive configuration to active + # configuration and start the ez-ipupdate daemon. + enabled = False + if _cron_job.exists() or (domain['service_type'] == 'gnudip' + and _active_config.exists()): + enabled = True -umask u=rw,g=,o= -cmd=${1} -shift -# decide which config to use -cfgfile="/tmp/none" -[ -f ${CFG_disabled} ] && cfgfile=${CFG_disabled} -[ -f ${CFG} ] && cfgfile=${CFG} + output_config = {'enabled': enabled, 'domains': {domain['domain']: domain}} + print(json.dumps(output_config)) -# check what action is requested -case ${cmd} in - configure) - doGetOpt "${@}" - doWriteCFG - ;; - start) - doGetWANIP - # if we use gnudip, enable gnudip. - gnudipServer=$(grep ^server= ${cfgfile} 2> /dev/null | cut -d = -f 2- |grep -v ^\'\') - if [ ! -f ${CFG} -a ! -z "${gnudipServer}" ];then - mv ${CFG_disabled} ${CFG} - fi - # add a cronjob - echo "*/${UPDATEMINUTES} * * * * root ${0} update" > $CRONJOB - $0 update - ;; - update) - doReadCFG - dnsentry=$(nslookup "${host}"|tail -n2|grep A|sed s/[^0-9.]//g) - doGetWANIP - echo ${wanip} > ${IPFILE} - grep -v execute ${cfgfile} > ${cfgfile}.tmp - mv ${cfgfile}.tmp ${cfgfile} - echo "execute=${0} success ${wanip}" >> ${cfgfile} - # if we know our WAN IP, only update if IP changes - if [ "${dnsentry}" != "${wanip}" -a "${wanip}" != ${NOIP} ];then - doUpdate - else - # If nothing has changed, nothing needs to be done but we - # need to write the success status (if no success was - # recorded yet because maybe DNS record was up to date - # when script is executed for the first time) - ${0} success ${wanip} - fi - # if we don't know our WAN IP do a blind update once a hour - if [ "${wanip}" = ${NOIP} ];then - currenttime=$(date +%s) - LAST=0 - [ -f ${LASTUPDATE} ] && LAST=$(cat ${LASTUPDATE}) - diff=$((currenttime - LAST)) - if [ ${diff} -gt ${UPDATEMINUTESUNKNOWN} ];then - doUpdate - fi - fi - ;; - stop) - rm ${CRONJOB} 2> /dev/null - /etc/init.d/${TOOLNAME} stop - kill "$(cat ${PIDFILE})" 2> /dev/null - mv ${CFG} ${CFG_disabled} - ;; - success) - date=$(date) - echo "DNS record is up to date (${date})" > ${STATUSFILE} - date +%s > ${LASTUPDATE} - # if called from cronjob, the current IP is given as parameter - if [ $# -eq 1 ];then - echo "${1}" > ${IPFILE} - else - # if called from ez-ipupdate daemon, no WAN IP is given as parameter - doGetWANIP - echo ${wanip} > ${IPFILE} - fi - ;; - failed) - date=$(date) - echo "last update failed (${date})" > ${STATUSFILE} - ;; - get-last-success) - if [ -f ${STATUSFILE} ];then - cat ${STATUSFILE} - else - echo "no successful update recorded since last config change" - fi - ;; - status) - doReadCFG - doStatus - ;; - clean) - rm ${CFGDIR}/* - rm ${CRONJOB} - ;; - *) - echo "usage: status|configure |start|stop|update|clean|success [updated IP]|failed|get-last-success" - echo "" - echo "options are:" - echo "-s Gnudip Server address" - echo "-d Domain to be updated" - echo "-u Account username" - echo "-P Account password" - echo "-p Read Account Password from stdin" - echo "-I A URL which returns the IP of the client who is requesting" - echo "-U The update URL (a HTTP GET on this URL will be done)" - echo "-c <1|0> disable SSL check on Update URL" - echo "-b <1|0> use HTTP basic auth on Update URL" - echo "-6 use IPv6 type address" - echo "" - echo "update do a one time update" - echo "clean delete configuration" - echo "success store update success and optional the updated IP" - echo "failed store update failure" - echo "get-last-success return date of last successful update" - ;; -esac -exit 0 + +def subcommand_clean(_): + """Remove all old configuration files.""" + last_update = _conf_dir / 'last-update' + status = _conf_dir / 'ez-ipupdate.status' + current_ip = _conf_dir / 'ez-ipupdate.currentIP' + + cleanup_files = [ + _active_config, _inactive_config, last_update, _helper_config, status, + current_ip + ] + for cleanup_file in cleanup_files: + try: + cleanup_file.rename(cleanup_file.with_suffix('.bak')) + except FileNotFoundError: + pass + + _cron_job.unlink(missing_ok=True) + + +def subcommand_update(_): + """Empty subcommand kept only for backwards compatibility. + + Drop after stable release. + """ + + +def subcommand_success(_): + """Empty subcommand kept only for backwards compatibility. + + Drop after stable release. + """ + + +def main(): + """Parse arguments and perform all duties.""" + arguments = parse_arguments() + + subcommand = arguments.subcommand.replace('-', '_') + subcommand_method = globals()['subcommand_' + subcommand] + subcommand_method(arguments) + + +if __name__ == '__main__': + main() diff --git a/actions/gnudip b/actions/gnudip deleted file mode 100755 index fe112d429..000000000 --- a/actions/gnudip +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/python3 -# -*- mode: python -*- -# SPDX-License-Identifier: AGPL-3.0-or-later -""" -GnuDIP client for updating Dynamic DNS records. -""" - -import argparse -import logging -import pathlib -import sys - -from plinth.modules.dynamicdns import gnudip - -logger = logging.getLogger(__name__) - -CONFIG_FILE = pathlib.Path('/etc/ez-ipupdate/ez-ipupdate.conf') - - -def parse_arguments(): - """Return parsed command line arguments as dictionary.""" - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') - - subparsers.add_parser('update', help='Update Dynamic DNS record') - - subparsers.required = True - return parser.parse_args() - - -def subcommand_update(_): - """Update Dynamic DNS record. - - Uses settings from ez-ipupdate config file. - """ - if not CONFIG_FILE.is_file(): - logger.info('GnuDIP configuration not found.') - return - - config = {} - with CONFIG_FILE.open() as cf: - lines = cf.readlines() - for line in lines: - if '=' in line: - items = line.split('=') - key = items[0].strip() - value = items[1].strip() - else: - key = line.strip() - value = True - - config[key] = value - - if not all(['host' in config, 'server' in config, 'user' in config]): - logger.warn('GnuDIP configuration is not complete.') - return - - username, password = config['user'].split(':') - result, new_ip = gnudip.update(config['server'], config['host'], username, - password) - if result == 0 and new_ip: - print(new_ip) - - return result - - -def main(): - """Parse arguments and perform all duties.""" - arguments = parse_arguments() - - subcommand = arguments.subcommand.replace('-', '_') - subcommand_method = globals()['subcommand_' + subcommand] - sys.exit(subcommand_method(arguments)) - - -if __name__ == '__main__': - main() diff --git a/plinth/app.py b/plinth/app.py index e18a824be..df90dc2aa 100644 --- a/plinth/app.py +++ b/plinth/app.py @@ -485,7 +485,7 @@ class EnableState(LeaderComponent): kvstore.set(self.key, True) def disable(self): - """Store that the app/component is enabled.""" + """Store that the app/component is disabled.""" from plinth import kvstore kvstore.set(self.key, False) diff --git a/plinth/modules/dynamicdns/__init__.py b/plinth/modules/dynamicdns/__init__.py index 11f03f8ce..9fc18467c 100644 --- a/plinth/modules/dynamicdns/__init__.py +++ b/plinth/modules/dynamicdns/__init__.py @@ -3,18 +3,26 @@ FreedomBox app to configure ez-ipupdate client. """ +import json +import logging +import subprocess +import time +import urllib + from django.utils.translation import gettext_lazy as _ from plinth import actions from plinth import app as app_module -from plinth import cfg, menu +from plinth import cfg, glib, kvstore, menu from plinth.modules.backups.components import BackupRestore from plinth.modules.names.components import DomainType from plinth.modules.users.components import UsersAndGroups -from plinth.signals import domain_added +from plinth.signals import domain_added, domain_removed from plinth.utils import format_lazy -from . import manifest +from . import gnudip, manifest + +logger = logging.getLogger(__name__) _description = [ format_lazy( @@ -45,7 +53,7 @@ class DynamicDNSApp(app_module.App): app_id = 'dynamicdns' - _version = 1 + _version = 2 def __init__(self): """Create components for the app.""" @@ -62,6 +70,9 @@ class DynamicDNSApp(app_module.App): 'dynamicdns:index', parent_url_name='system') self.add(menu_item) + enable_state = app_module.EnableState('enable-state-dynamicdns') + self.add(enable_state) + domain_type = DomainType('domain-type-dynamic', _('Dynamic Domain Name'), 'dynamicdns:index', can_have_certificate=True) @@ -75,106 +86,195 @@ class DynamicDNSApp(app_module.App): **manifest.backup) self.add(backup_restore) - @staticmethod - def post_init(): + def post_init(self): """Perform post initialization operations.""" - current_status = get_status() - if current_status['enabled']: - domain_added.send_robust(sender='dynamicdns', - domain_type='domain-type-dynamic', - name=current_status['dynamicdns_domain'], - services='__all__') + config = get_config() + if self.is_enabled(): + for domain_name in config['domains']: + notify_domain_added(domain_name) - def is_enabled(self): - """Return whether all the leader components are enabled. - - Return True when there are no leader components and DynamicDNS setup - is done. - """ - return super().is_enabled() and get_status()['enabled'] + # Check every 5 minutes (every 3 minutes in debug mode) to perform + # dynamic DNS updates. + interval = 180 if cfg.develop else 300 + glib.schedule(interval, update_dns) def setup(helper, old_version=None): """Install and configure the module.""" app.setup(old_version) + if not old_version: + helper.call('post', app.enable) + + if old_version == 1: + config = actions.superuser_run('dynamicdns', ['export-config']) + config = json.loads(config) + if config['enabled']: + app.enable() + else: + app.disable() + + del config['enabled'] + set_config(config) + actions.superuser_run('dynamicdns', ['clean']) + + +def _query_external_address(domain): + """Return the IP address by querying an external server.""" + if not domain['ip_lookup_url']: + return None + + ip_option = '-6' if domain['use_ipv6'] else '-4' + try: + ip_address = subprocess.check_output([ + 'wget', ip_option, '-o', '/dev/null', '-t', '3', '-T', '3', '-O', + '-', domain['ip_lookup_url'] + ]) + return ip_address.decode().strip().lower() + except subprocess.CalledProcessError as exception: + logger.warning('Unable to lookup external IP with URL %s: %s', + domain['ip_lookup_url'], exception) + return None + + +def _query_dns_address(domain): + """Return the IP address in the DNS records.""" + ip_option = 'AAAA' if domain['use_ipv6'] else 'A' + try: + output = subprocess.check_output( + ['host', '-t', ip_option, domain['domain']]) + return output.decode().split(' ')[-1].strip().lower() + except subprocess.CalledProcessError as exception: + logger.warning('Unable to lookup DNS for host %s: %s', + domain['domain'], exception) + return None + + +def _update_using_url(domain, external_address): + """Update DNS entry using an update URL.""" + update_url = domain['update_url'] + quote = urllib.parse.quote + if external_address: + update_url = update_url.replace('', quote(external_address)) + + if domain['domain']: + update_url = update_url.replace('', quote(domain['domain'])) + + if domain['username']: + update_url = update_url.replace('', quote(domain['username'])) + + if domain['password']: + update_url = update_url.replace('', quote(domain['password'])) + + options = ['-o', '/dev/null', '-t', '3', '-T', '3'] + if domain['use_http_basic_auth']: + options += [ + '--user', domain['username'], '--password', domain['password'] + ] + + if domain['disable_ssl_cert_check']: + options += ['--no-check-certificate'] + + if domain['use_ipv6']: + options += ['-6'] + else: + options += ['-4'] + + command = ['wget', '-O', '/dev/null'] + options + [update_url] + process = subprocess.run(command, check=False) + return process.returncode == 0, external_address + + +def _update_dns_for_domain(domain): + """Update DNS records for a single domain.""" + result = False + ip_address = None + error = None + + try: + dns_address = _query_dns_address(domain) + external_address = _query_external_address(domain) + if dns_address == external_address and dns_address is not None: + logger.info('Dynamic domain %s is up-to-date: %s', + domain['domain'], dns_address) + result = True + ip_address = dns_address + error = ValueError('up-to-date') + else: + logger.info( + 'Updating dynamic domain %s, DNS address %s, looked up ' + 'external address %s', domain['domain'], dns_address, + external_address) + if domain['service_type'] == 'gnudip': + result, ip_address = gnudip.update(domain['server'], + domain['domain'], + domain['username'], + domain['password']) + else: + result, ip_address = _update_using_url(domain, + external_address) + except Exception as exception: + logger.exception('Failed to be update Dynamic DNS - %s', exception) + error = exception + + set_status(domain, result, ip_address, error) + + +def update_dns(_data): + """For all configured domains, check and up to date DNS records.""" + config = get_config() + if not app.is_enabled(): + return + + # Update for each domain + for domain in config['domains'].values(): + _update_dns_for_domain(domain) def get_status(): - """Return the current status.""" - # TODO: use key/value instead of hard coded value list - status = {} - output = actions.superuser_run('dynamicdns', ['status']) - details = output.split() - status['enabled'] = (output.split()[0] == 'enabled') - - if len(details) > 1: - if details[1] == 'disabled': - status['dynamicdns_server'] = '' - else: - status['dynamicdns_server'] = details[1].replace("'", "") - else: - status['dynamicdns_server'] = '' - - if len(details) > 2: - if details[2] == 'disabled': - status['dynamicdns_domain'] = '' - else: - status['dynamicdns_domain'] = details[2].replace("'", "") - else: - status['dynamicdns_domain'] = '' - - if len(details) > 3: - if details[3] == 'disabled': - status['dynamicdns_user'] = '' - else: - status['dynamicdns_user'] = details[3].replace("'", "") - else: - status['dynamicdns_user'] = '' - - if len(details) > 4: - if details[4] == 'disabled': - status['dynamicdns_secret'] = '' - else: - status['dynamicdns_secret'] = details[4].replace("'", "") - else: - status['dynamicdns_secret'] = '' - - if len(details) > 5: - if details[5] == 'disabled': - status['dynamicdns_ipurl'] = '' - else: - status['dynamicdns_ipurl'] = details[5].replace("'", "") - else: - status['dynamicdns_ipurl'] = '' - - if len(details) > 6: - if details[6] == 'disabled': - status['dynamicdns_update_url'] = '' - else: - status['dynamicdns_update_url'] = details[6].replace("'", "") - else: - status['dynamicdns_update_url'] = '' - - if len(details) > 7: - status['disable_SSL_cert_check'] = (output.split()[7] == 'enabled') - else: - status['disable_SSL_cert_check'] = False - - if len(details) > 8: - status['use_http_basic_auth'] = (output.split()[8] == 'enabled') - else: - status['use_http_basic_auth'] = False - - if len(details) > 9: - status['use_ipv6'] = (output.split()[9] == 'enabled') - else: - status['use_ipv6'] = False - - if not status['dynamicdns_server'] and not status['dynamicdns_update_url']: - status['service_type'] = 'GnuDIP' - elif not status['dynamicdns_server'] and status['dynamicdns_update_url']: - status['service_type'] = 'other' - else: - status['service_type'] = 'GnuDIP' - + """Return the status of recent update for each domain.""" + status = kvstore.get_default('dynamicdns_status', '{}') + status = json.loads(status) + status.setdefault('domains', {}) return status + + +def set_status(domain, result, ip_address, error=None): + """Set the status of most recent update.""" + status = kvstore.get_default('dynamicdns_status', '{}') + status = json.loads(status) + domains = status.setdefault('domains', {}) + domains[domain['domain']] = { + 'domain': domain['domain'], + 'result': result, + 'ip_address': ip_address, + 'error_code': error.__class__.__name__ if error else None, + 'error_message': error.args[0] if error and error.args else None, + 'timestamp': int(time.time()), + } + kvstore.set('dynamicdns_status', json.dumps(status)) + + +def get_config(): + """Return the current configuration.""" + default_config = {'domains': {}} + config = kvstore.get_default('dynamicdns_config', '{}') + return json.loads(config) or default_config + + +def set_config(config): + """Set a new configuration.""" + kvstore.set('dynamicdns_config', json.dumps(config)) + + +def notify_domain_added(domain_name): + """Send a signal that domain has been added.""" + domain_added.send_robust(sender='dynamicdns', + domain_type='domain-type-dynamic', + name=domain_name, services='__all__') + + +def notify_domain_removed(domain_name): + """Send a signal that domain has been removed.""" + domain_removed.send_robust(sender='dynamicdns', + domain_type='domain-type-dynamic', + name=domain_name) diff --git a/plinth/modules/dynamicdns/forms.py b/plinth/modules/dynamicdns/forms.py index 60454a9df..51693af62 100644 --- a/plinth/modules/dynamicdns/forms.py +++ b/plinth/modules/dynamicdns/forms.py @@ -12,24 +12,13 @@ from plinth import cfg from plinth.utils import format_lazy -class TrimmedCharField(forms.CharField): - """Trim the contents of a CharField.""" - - def clean(self, value): - """Clean and validate the field value""" - if value: - value = value.strip() - - return super(TrimmedCharField, self).clean(value) - - class ConfigureForm(forms.Form): """Form to configure the Dynamic DNS client.""" help_update_url = \ gettext_lazy('The Variables <User>, <Pass>, <Ip>, ' '<Domain> may be used within the URL. For details ' 'see the update URL templates of the example providers.') - help_services = \ + help_service_type = \ gettext_lazy('Please choose an update protocol according to your ' 'provider. If your provider does not support the GnuDIP ' 'protocol or your provider is not listed you may use ' @@ -41,16 +30,16 @@ class ConfigureForm(forms.Form): help_domain = format_lazy( gettext_lazy('The public domain name you want to use to reach your ' '{box_name}.'), box_name=gettext_lazy(cfg.box_name)) - help_disable_ssl = \ + help_disable_ssl_cert_check = \ gettext_lazy('Use this option if your provider uses self signed ' 'certificates.') - help_http_auth = \ + help_use_http_basic_auth = \ gettext_lazy('If this option is selected, your username and password ' 'will be used for HTTP basic authentication.') - help_secret = \ + help_password = \ gettext_lazy('Leave this field empty if you want to keep your ' 'current password.') - help_ip_url = format_lazy( + help_ip_lookup_url = format_lazy( gettext_lazy('Optional Value. If your {box_name} is not connected ' 'directly to the Internet (i.e. connected to a NAT ' 'router) this URL is used to determine the real ' @@ -58,98 +47,99 @@ class ConfigureForm(forms.Form): 'the client comes from (example: ' 'https://ddns.freedombox.org/ip/).'), box_name=gettext_lazy(cfg.box_name)) - help_user = \ + help_username = \ gettext_lazy('The username that was used when the account was ' 'created.') - provider_choices = (('GnuDIP', gettext_lazy('GnuDIP')), - ('noip', 'noip.com'), ('selfhost', 'selfhost.bz'), - ('freedns', 'freedns.afraid.org'), - ('other', gettext_lazy('other update URL'))) - - enabled = forms.BooleanField(label=gettext_lazy('Enable Dynamic DNS'), - required=False) + provider_choices = (('gnudip', gettext_lazy('GnuDIP')), + ('noip.com', 'noip.com'), ('freedns.afraid.org', + 'freedns.afraid.org'), + ('other', gettext_lazy('Other update URL'))) service_type = forms.ChoiceField(label=gettext_lazy('Service Type'), - help_text=help_services, + help_text=help_service_type, choices=provider_choices) - dynamicdns_server = TrimmedCharField( + server = forms.CharField( label=gettext_lazy('GnuDIP Server Address'), required=False, help_text=help_server, validators=[ validators.RegexValidator(r'^[\w-]{1,63}(\.[\w-]{1,63})*$', gettext_lazy('Invalid server name')) ]) - dynamicdns_update_url = TrimmedCharField(label=gettext_lazy('Update URL'), - required=False, - help_text=help_update_url) + update_url = forms.CharField(label=gettext_lazy('Update URL'), + required=False, help_text=help_update_url) - disable_SSL_cert_check = forms.BooleanField( + disable_ssl_cert_check = forms.BooleanField( label=gettext_lazy('Accept all SSL certificates'), - help_text=help_disable_ssl, required=False) + help_text=help_disable_ssl_cert_check, required=False) use_http_basic_auth = forms.BooleanField( label=gettext_lazy('Use HTTP basic authentication'), - help_text=help_http_auth, required=False) + help_text=help_use_http_basic_auth, required=False) - dynamicdns_domain = TrimmedCharField( + domain = forms.CharField( label=gettext_lazy('Domain Name'), help_text=help_domain, - required=False, validators=[ + required=True, validators=[ validators.RegexValidator(r'^[\w-]{1,63}(\.[\w-]{1,63})*$', gettext_lazy('Invalid domain name')) ]) - dynamicdns_user = TrimmedCharField(label=gettext_lazy('Username'), - required=False, help_text=help_user) + username = forms.CharField(label=gettext_lazy('Username'), required=False, + help_text=help_username) - dynamicdns_secret = TrimmedCharField(label=gettext_lazy('Password'), - widget=forms.PasswordInput(), - required=False, help_text=help_secret) + password = forms.CharField(label=gettext_lazy('Password'), + widget=forms.PasswordInput(), required=False, + help_text=help_password) - showpw = forms.BooleanField(label=gettext_lazy('Show password'), - required=False) + show_password = forms.BooleanField(label=gettext_lazy('Show password'), + required=False) - dynamicdns_ipurl = TrimmedCharField( + ip_lookup_url = forms.CharField( label=gettext_lazy('URL to look up public IP'), required=False, - help_text=help_ip_url, - validators=[validators.URLValidator(schemes=['http', 'https', 'ftp'])]) + help_text=help_ip_lookup_url, + validators=[validators.URLValidator(schemes=['http', 'https'])]) use_ipv6 = forms.BooleanField( label=gettext_lazy('Use IPv6 instead of IPv4'), required=False) def clean(self): - cleaned_data = super(ConfigureForm, self).clean() - dynamicdns_secret = cleaned_data.get('dynamicdns_secret') - dynamicdns_update_url = cleaned_data.get('dynamicdns_update_url') - dynamicdns_user = cleaned_data.get('dynamicdns_user') - dynamicdns_domain = cleaned_data.get('dynamicdns_domain') - dynamicdns_server = cleaned_data.get('dynamicdns_server') + """Further validate and transform field data.""" + cleaned_data = super().clean() + + # Domain name is not case sensitive, but Let's Encrypt + # certificate paths use lower-case domain name. + cleaned_data['domain'] = cleaned_data['domain'].lower() + + update_url = cleaned_data.get('update_url') + password = cleaned_data.get('password') service_type = cleaned_data.get('service_type') - old_dynamicdns_secret = self.initial['dynamicdns_secret'] + old_password = self.initial.get('password') - # Clear the fields which are not in use - if service_type == 'GnuDIP': - dynamicdns_update_url = '' + if not password: + # If password is not set, use old password + cleaned_data['password'] = old_password + + message = _('This field is required.') + if service_type == 'gnudip': + for field_name in ['server', 'username', 'password']: + if not cleaned_data.get(field_name): + self.add_error(field_name, message) else: - dynamicdns_server = '' + if not update_url: + self.add_error('update_url', message) - if cleaned_data.get('enabled'): - # Check if gnudip server or update URL is filled - if not dynamicdns_update_url and not dynamicdns_server: - raise forms.ValidationError( - _('Please provide an update URL or a GnuDIP server ' - 'address')) + param_map = (('username', ''), ('password', ''), + ('ip_lookup_url', '')) + for field_name, param in param_map: + if (update_url and param in update_url + and not cleaned_data.get(field_name)): + self.add_error(field_name, message) - if dynamicdns_server and not dynamicdns_user: - raise forms.ValidationError( - _('Please provide a GnuDIP username')) + if cleaned_data.get('use_http_basic_auth'): + for field_name in ('username', 'password'): + if not cleaned_data.get(field_name): + self.add_error(field_name, message) - if dynamicdns_server and not dynamicdns_domain: - raise forms.ValidationError( - _('Please provide a GnuDIP domain name')) - - # Check if a password was set before or a password is set now - if dynamicdns_server and \ - not dynamicdns_secret and not old_dynamicdns_secret: - raise forms.ValidationError(_('Please provide a password')) + del cleaned_data['show_password'] + return cleaned_data diff --git a/plinth/modules/dynamicdns/gnudip.py b/plinth/modules/dynamicdns/gnudip.py index 58ad15045..7b96eef2e 100644 --- a/plinth/modules/dynamicdns/gnudip.py +++ b/plinth/modules/dynamicdns/gnudip.py @@ -5,10 +5,11 @@ GnuDIP client for updating Dynamic DNS records. import hashlib import logging -import socket +import socket as socket_module -BUF_SIZE = 50 +BUF_SIZE = 128 GNUDIP_PORT = 3495 +TIMEOUT = 10 logger = logging.getLogger(__name__) @@ -18,22 +19,22 @@ def update(server, domain, username, password): domain = domain.removeprefix(username + '.') password_digest = hashlib.md5(password.encode()).hexdigest() - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - logger.debug('Connecting to %s:%d', server, GNUDIP_PORT) - s.connect((server, GNUDIP_PORT)) - salt = s.recv(BUF_SIZE).decode().strip() + with socket_module.socket(socket_module.AF_INET, + socket_module.SOCK_STREAM) as socket: + logger.debug('Connecting to %s:%d, timeout %ss', server, GNUDIP_PORT, + TIMEOUT) + socket.settimeout(TIMEOUT) + socket.connect((server, GNUDIP_PORT)) + salt = socket.recv(BUF_SIZE).decode().strip() salted_digest = password_digest + '.' + salt final_digest = hashlib.md5(salted_digest.encode()).hexdigest() update_request = username + ':' + final_digest + ':' + domain + ':2\n' - s.sendall(update_request.encode()) + socket.sendall(update_request.encode()) - response = s.recv(BUF_SIZE).decode().strip() - result, new_ip = response.split(':') - result = int(result) - if result == 0: - logger.info('GnuDIP update success: %s', new_ip) - else: - logger.warn('GnuDIP update error: %s', response) + response = socket.recv(BUF_SIZE).decode().strip() + result, _, new_ip = response.partition(':') + result = (int(result) == 0) + new_ip = new_ip if result else None return result, new_ip diff --git a/plinth/modules/dynamicdns/manifest.py b/plinth/modules/dynamicdns/manifest.py index 15151580d..8be41ecda 100644 --- a/plinth/modules/dynamicdns/manifest.py +++ b/plinth/modules/dynamicdns/manifest.py @@ -1,3 +1,10 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -backup = {'config': {'directories': ['/etc/ez-ipupdate/']}} +backup = { + 'config': { + 'directories': ['/etc/ez-ipupdate/'] + }, + 'settings': [ + 'dynamicdns_enable', 'dynamicdns_config', 'dynamicdns_status' + ], +} diff --git a/plinth/modules/dynamicdns/static/dynamicdns.js b/plinth/modules/dynamicdns/static/dynamicdns.js index b37d1b100..7ee0bcbb0 100644 --- a/plinth/modules/dynamicdns/static/dynamicdns.js +++ b/plinth/modules/dynamicdns/static/dynamicdns.js @@ -23,123 +23,57 @@ */ (function($) { - var SELFHOST = 'https://carol.selfhost.de/update?username=&' + - 'password=&myip='; - var NOIP = 'http://dynupdate.no-ip.com/nic/update?hostname=' + - '&myip='; + var NOIP = 'https://:@dynupdate.no-ip.com/nic/update?' + + 'hostname='; var FREEDNS = 'https://freedns.afraid.org/dynamic/update.php?' + '_YOURAPIKEYHERE_'; - // Hide all form fields - $('.form-group').hide(); - // Show the enable checkbox - $('#id_enabled').closest('.form-group').show(); - if ($('#id_enabled').prop('checked')) { - // Show all form fields - show_all(); - // Set the selectbox to the last configured value - select_service(); - } - - $('#id_enabled').change(function() { - if ($('#id_enabled').prop('checked')) { - show_all(); - if ($("#id_service_type option:selected").text() == "GnuDIP") { - set_gnudip_mode(); - } else { - set_update_url_mode(); - } - } else { - $('.form-group').hide(); - $('#id_enabled').closest('.form-group').show(); - } - }); - $('#id_service_type').change(function() { - var service_type = $("#id_service_type option:selected").text(); - if (service_type == "GnuDIP") { - set_gnudip_mode(); - } else { - set_update_url_mode(); - if (service_type == "noip.com") { - $('#id_dynamicdns_update_url').val(NOIP); - $('#id_use_http_basic_auth').prop('checked', true); - } else { - $('#id_use_http_basic_auth').prop('checked', false); - } - if (service_type == "selfhost.bz") { - $('#id_dynamicdns_update_url').val(SELFHOST); - } - if (service_type == "freedns.afraid.org") { - $('#id_dynamicdns_update_url').val(FREEDNS); - } - if (service_type == "other update URL") { - $('#id_dynamicdns_update_url').val(''); - } + set_mode(); + + var service_type = $("#id_service_type").val(); + if (service_type == "noip.com") { + $('#id_update_url').val(NOIP); + } else if (service_type == "freedns.afraid.org") { + $('#id_update_url').val(FREEDNS); + } else { // GnuDIP and other + $('#id_update_url').val(''); } }); - $('#id_showpw').change(function() { - // Changing type attribute from password to text is prevented by most - // browsers make a new form field works for me - if ($('#id_showpw').prop('checked')) { - $('#id_dynamicdns_secret').replaceWith( - $('#id_dynamicdns_secret').clone().attr( - 'type', 'text')); + $('#id_show_password').change(function() { + if ($('#id_show_password').prop('checked')) { + $('#id_password').prop('type', 'text'); } else { - $('#id_dynamicdns_secret').replaceWith( - $('#id_dynamicdns_secret').clone().attr( - 'type', 'password')); + $('#id_password').prop('type', 'password'); } }); - function select_service() { - var update_url = $("#id_dynamicdns_update_url").val(); - if ($("#id_dynamicdns_server").val().length == 0) { - set_update_url_mode(); - if (update_url == NOIP) { - $("#id_service_type").val("noip"); - } else if (update_url == SELFHOST) { - $("#id_service_type").val("selfhost"); - } else if (update_url == FREEDNS) { - $("#id_service_type").val("freedns"); - } else { - $("#id_service_type").val("other"); - } - } else { - $("#id_service_type").val("GnuDIP"); + function set_mode() { + var service_type = $("#id_service_type").val(); + if (service_type == "gnudip") { set_gnudip_mode(); + } else { + set_update_url_mode(); } } function set_gnudip_mode() { - $('#id_dynamicdns_update_url').closest('.form-group').hide(); - $('#id_disable_SSL_cert_check').closest('.form-group').hide(); + $('.form-group').show(); + $('#id_update_url').closest('.form-group').hide(); + $('#id_disable_ssl_cert_check').closest('.form-group').hide(); $('#id_use_http_basic_auth').closest('.form-group').hide(); $('#id_use_ipv6').closest('.form-group').hide(); - $('#id_dynamicdns_server').closest('.form-group').show(); + $('#id_server').closest('.form-group').show(); } function set_update_url_mode() { - $('#id_dynamicdns_update_url').closest('.form-group').show(); - $('#id_disable_SSL_cert_check').closest('.form-group').show(); + $('#id_update_url').closest('.form-group').show(); + $('#id_disable_ssl_cert_check').closest('.form-group').show(); $('#id_use_http_basic_auth').closest('.form-group').show(); $('#id_use_ipv6').closest('.form-group').show(); - $('#id_dynamicdns_server').closest('.form-group').hide(); + $('#id_server').closest('.form-group').hide(); } - function show_all() { - $('#id_enabled').closest('.form-group').show(); - $('#id_service_type').closest('.form-group').show(); - $('#id_dynamicdns_server').closest('.form-group').show(); - $('#id_dynamicdns_update_url').closest('.form-group').show(); - $('#id_disable_SSL_cert_check').closest('.form-group').show(); - $('#id_use_http_basic_auth').closest('.form-group').show(); - $('#id_dynamicdns_domain').closest('.form-group').show(); - $('#id_dynamicdns_user').closest('.form-group').show(); - $('#id_dynamicdns_secret').closest('.form-group').show(); - $('#id_showpw').closest('.form-group').show(); - $('#id_dynamicdns_ipurl').closest('.form-group').show(); - $('#id_use_ipv6').closest('.form-group').show(); - } + set_mode(); })(jQuery); diff --git a/plinth/modules/dynamicdns/templates/dynamicdns.html b/plinth/modules/dynamicdns/templates/dynamicdns.html index 37296190f..169420eb2 100644 --- a/plinth/modules/dynamicdns/templates/dynamicdns.html +++ b/plinth/modules/dynamicdns/templates/dynamicdns.html @@ -7,29 +7,50 @@ {% load i18n %} {% load static %} -{% block configuration %} -
- {% csrf_token %} - - - - {{ form|bootstrap }} - - -
-{% endblock %} - {% block extra_content %}

{% trans "Status" %}

-

{% trans "Last update" %}: {{ status.last_update }}

+ {% if domains_status %} +
+ + + + + + + + + + + {% for domain in domains_status.values %} + + + + + + + {% endfor %} + +
{% trans "Domain" %}{% trans "Last update" %}{% trans "Result" %}{% trans "IP Address" %}
{{ domain.domain }}{{ domain.timestamp|timesince }} + {% if domain.result %} + + {% trans "Success" %} + + {% else %} + + {% trans "Failed" %} + + {% endif %} + {% if domain.error_message %} + ({{ domain.error_message }}) + {% elif domain.error_code %} + ({{ domain.error_code }}) + {% endif %} + {{ domain.ip_address|default_if_none:'-' }}
+
+ {% else %} + {% trans "No status available." %} + {% endif %} {% endblock %} {% block page_js %} diff --git a/plinth/modules/dynamicdns/tests/test_functional.py b/plinth/modules/dynamicdns/tests/test_functional.py index 8f2655532..4f6af00a2 100644 --- a/plinth/modules/dynamicdns/tests/test_functional.py +++ b/plinth/modules/dynamicdns/tests/test_functional.py @@ -3,8 +3,6 @@ Functional, browser based tests for dynamicdns app. """ -import time - import pytest from plinth.tests import functional @@ -14,6 +12,58 @@ pytestmark = [ pytest.mark.dynamicdns ] +_configs = { + 'gnudip1': { + 'service_type': 'gnudip', + 'server': 'localhost', + 'domain': 'freedombox.example.com', + 'username': 'tester', + 'password': 'testingtesting', + 'ip_lookup_url': 'https://ddns.freedombox.org/ip/', + }, + 'gnudip2': { + 'service_type': 'gnudip', + 'server': '127.0.0.1', + 'domain': 'freedombox2.example.com', + 'username': 'tester2', + 'password': 'testingtesting2', + 'ip_lookup_url': 'https://ddns2.freedombox.org/ip/', + }, + 'noip.com': { + 'service_type': 'noip.com', + 'update_url': 'https://localhost/update3/', + 'disable_ssl_cert_check': True, + 'use_http_basic_auth': True, + 'domain': 'freedombox3.example.com', + 'username': 'tester3', + 'password': 'testingtesting3', + 'ip_lookup_url': 'https://ddns3.freedombox.org/ip/', + 'use_ipv6': True, + }, + 'freedns.afraid.org': { + 'service_type': 'freedns.afraid.org', + 'update_url': 'https://localhost/update5/', + 'disable_ssl_cert_check': False, + 'use_http_basic_auth': False, + 'domain': 'freedombox5.example.com', + 'username': '', + 'password': '', + 'ip_lookup_url': '', + 'use_ipv6': False, + }, + 'other': { + 'service_type': 'other', + 'update_url': 'https://localhost/update6/', + 'disable_ssl_cert_check': False, + 'use_http_basic_auth': False, + 'domain': 'freedombox6.example.com', + 'username': 'tester6', + 'password': 'testingtesting6', + 'ip_lookup_url': 'https://ddns6.freedombox.org/ip/', + 'use_ipv6': False, + }, +} + @pytest.fixture(scope='module', autouse=True) def fixture_background(session_browser): @@ -21,104 +71,70 @@ def fixture_background(session_browser): functional.login(session_browser) -def test_capitalized_domain_name(session_browser): - """Test handling of capitalized domain name.""" - _configure(session_browser) - _configure_domain(session_browser, 'FreedomBox.example.com') - assert _get_domain(session_browser) == 'freedombox.example.com' +class TestDynamicDNSApp(functional.BaseAppTests): + app_name = 'dynamicdns' + has_service = False + has_web = False + check_diagnostics = False + + @staticmethod + def test_capitalized_domain_name(session_browser): + """Test handling of capitalized domain name.""" + _configure(session_browser, _configs['gnudip1']) + _configure(session_browser, {'domain': 'FreedomBox.example.com'}) + _assert_has_config(session_browser, + {'domain': 'freedombox.example.com'}) + + @staticmethod + @pytest.mark.parametrize('config_name', list(_configs.keys())) + def test_various_form_values(session_browser, config_name): + """Test feeding various values and check that they are saved.""" + _configure(session_browser, _configs[config_name]) + _assert_has_config(session_browser, _configs[config_name]) + + @staticmethod + @pytest.mark.backups + def test_backup_restore(session_browser): + """Test backup and restore of configuration.""" + _configure(session_browser, _configs['gnudip1']) + functional.backup_create(session_browser, 'dynamicdns', + 'test_dynamicdns') + + _configure(session_browser, _configs['gnudip2']) + functional.backup_restore(session_browser, 'dynamicdns', + 'test_dynamicdns') + + _assert_has_config(session_browser, _configs['gnudip1']) -def test_backup_and_restore(session_browser): - """Test backup and restore of configuration.""" - _configure(session_browser) - functional.backup_create(session_browser, 'dynamicdns', 'test_dynamicdns') - - _change_config(session_browser) - functional.backup_restore(session_browser, 'dynamicdns', 'test_dynamicdns') - - assert _has_original_config(session_browser) - - -# TODO Scenario: Configure GnuDIP service -# TODO Scenario: Configure noip.com service -# TODO Scenario: Configure selfhost.bz service -# TODO Scenario: Configure freedns.afraid.org service -# TODO Scenario: Configure other update URL service - - -def _configure(browser): +def _configure(browser, config): functional.nav_to_module(browser, 'dynamicdns') - browser.links.find_by_href( - '/plinth/sys/dynamicdns/configure/').first.click() - browser.find_by_id('id_enabled').check() - browser.find_by_id('id_service_type').select('GnuDIP') - browser.find_by_id('id_dynamicdns_server').fill('example.com') - browser.find_by_id('id_dynamicdns_domain').fill('freedombox.example.com') - browser.find_by_id('id_dynamicdns_user').fill('tester') - browser.find_by_id('id_dynamicdns_secret').fill('testingtesting') - browser.find_by_id('id_dynamicdns_ipurl').fill( - 'https://ddns.freedombox.org/ip/') - functional.submit(browser) + for key, value in config.items(): + if key == 'service_type': + browser.find_by_id(f'id_{key}').select(value) + elif isinstance(value, bool): + if value: + browser.find_by_id(f'id_{key}').check() + else: + browser.find_by_id(f'id_{key}').uncheck() + else: + browser.find_by_id(f'id_{key}').fill(value) - # After a domain name change, Let's Encrypt will restart the web - # server and could cause a connection failure. - time.sleep(1) - functional.eventually(functional.nav_to_module, [browser, 'dynamicdns']) + functional.submit(browser, form_class='form-configuration') -def _has_original_config(browser): +def _assert_has_config(browser, config): functional.nav_to_module(browser, 'dynamicdns') - browser.links.find_by_href( - '/plinth/sys/dynamicdns/configure/').first.click() - enabled = browser.find_by_id('id_enabled').value - service_type = browser.find_by_id('id_service_type').value - server = browser.find_by_id('id_dynamicdns_server').value - domain = browser.find_by_id('id_dynamicdns_domain').value - user = browser.find_by_id('id_dynamicdns_user').value - ipurl = browser.find_by_id('id_dynamicdns_ipurl').value - if enabled and service_type == 'GnuDIP' and server == 'example.com' \ - and domain == 'freedombox.example.com' and user == 'tester' \ - and ipurl == 'https://ddns.freedombox.org/ip/': - return True - else: - return False + for key, value in config.items(): + if key == 'password': + continue - -def _change_config(browser): - functional.nav_to_module(browser, 'dynamicdns') - browser.links.find_by_href( - '/plinth/sys/dynamicdns/configure/').first.click() - browser.find_by_id('id_enabled').check() - browser.find_by_id('id_service_type').select('GnuDIP') - browser.find_by_id('id_dynamicdns_server').fill('2.example.com') - browser.find_by_id('id_dynamicdns_domain').fill('freedombox2.example.com') - browser.find_by_id('id_dynamicdns_user').fill('tester2') - browser.find_by_id('id_dynamicdns_secret').fill('testingtesting2') - browser.find_by_id('id_dynamicdns_ipurl').fill( - 'https://ddns2.freedombox.org/ip/') - functional.submit(browser) - - # After a domain name change, Let's Encrypt will restart the web - # server and could cause a connection failure. - time.sleep(1) - functional.eventually(functional.nav_to_module, [browser, 'dynamicdns']) - - -def _configure_domain(browser, domain): - functional.nav_to_module(browser, 'dynamicdns') - browser.links.find_by_href( - '/plinth/sys/dynamicdns/configure/').first.click() - browser.find_by_id('id_dynamicdns_domain').fill(domain) - functional.submit(browser) - - # After a domain name change, Let's Encrypt will restart the web - # server and could cause a connection failure. - time.sleep(1) - functional.eventually(functional.nav_to_module, [browser, 'dynamicdns']) + if isinstance(value, bool): + assert browser.find_by_id(f'id_{key}').checked == value + else: + assert value == browser.find_by_id(f'id_{key}').value def _get_domain(browser): functional.nav_to_module(browser, 'dynamicdns') - browser.links.find_by_href( - '/plinth/sys/dynamicdns/configure/').first.click() - return browser.find_by_id('id_dynamicdns_domain').value + return browser.find_by_id('id_domain').value diff --git a/plinth/modules/dynamicdns/urls.py b/plinth/modules/dynamicdns/urls.py index 9ab63d297..af9f805d6 100644 --- a/plinth/modules/dynamicdns/urls.py +++ b/plinth/modules/dynamicdns/urls.py @@ -8,5 +8,6 @@ from django.urls import re_path from . import views urlpatterns = [ - re_path(r'^sys/dynamicdns/$', views.index, name='index'), + re_path(r'^sys/dynamicdns/$', views.DynamicDNSAppView.as_view(), + name='index'), ] diff --git a/plinth/modules/dynamicdns/views.py b/plinth/modules/dynamicdns/views.py index 85e8c2c73..9d0d0034b 100644 --- a/plinth/modules/dynamicdns/views.py +++ b/plinth/modules/dynamicdns/views.py @@ -3,129 +3,81 @@ Views for the dynamicsdns module. """ -import logging +import datetime from django.contrib import messages -from django.template.response import TemplateResponse -from django.utils.translation import gettext as _ +from django.utils.translation import gettext_lazy as _ -from plinth import actions +from plinth import views from plinth.modules import dynamicdns -from plinth.signals import domain_added, domain_removed from .forms import ConfigureForm -logger = logging.getLogger(__name__) -EMPTYSTRING = 'none' +class DynamicDNSAppView(views.AppView): + """Serve configuration page.""" + app_id = 'dynamicdns' + template_name = 'dynamicdns.html' + form_class = ConfigureForm + _error_messages = { + 'timeout': _('Connection timed out'), + 'gaierror': _('Could not find server'), + 'TimeoutError': _('Connection timed out'), + 'ConnectionRefusedError': _('Server refused connection'), + 'ValueError': _('Already up-to-date') + } -def index(request): - """Serve the configuration form.""" - status = dynamicdns.get_status() - form = None + def get_context_data(self, **kwargs): + """Return the context data for rendering the template view.""" + context = super().get_context_data(**kwargs) + status = dynamicdns.get_status() + config = dynamicdns.get_config() + domains_status = {} + for domain_name, domain in status['domains'].items(): + if domain_name not in config['domains']: + continue - if request.method == 'POST': - form = ConfigureForm(request.POST, initial=status) - if form.is_valid(): - _apply_changes(request, status, form.cleaned_data) - status = dynamicdns.get_status() - form = ConfigureForm(initial=status) - else: - form = ConfigureForm(initial=status) + # Create naive datetime object in local timezone + domain['timestamp'] = datetime.datetime.fromtimestamp( + domain['timestamp']) + domains_status[domain_name] = domain + if domain['error_code'] in self._error_messages: + domain['error_message'] = self._error_messages[ + domain['error_code']] - return TemplateResponse( - request, 'dynamicdns.html', { - 'title': _('Configure Dynamic DNS'), - 'app_info': dynamicdns.app.info, - 'form': form, - 'status': get_status() - }) + context['domains_status'] = domains_status + return context + def get_initial(self): + """Get the current values for the form.""" + initial = super().get_initial() + domains = dynamicdns.get_config()['domains'] + domain = list(domains.values())[0] if domains else {} + initial.update(domain) + return domain -def get_status(): - """Return the current status.""" - return {'last_update': _run(['get-last-success'])} + def form_valid(self, form): + """Apply the changes submitted in the form.""" + old_status = form.initial + new_status = form.cleaned_data + if old_status != new_status: + config = dynamicdns.get_config() + try: + del config['domains'][old_status['domain']] + except KeyError: + pass -def _apply_changes(request, old_status, new_status): - """Apply the changes to Dynamic DNS client.""" - logger.info('New status is - %s', new_status) - logger.info('Old status was - %s', old_status) + config['domains'][new_status['domain']] = new_status + dynamicdns.set_config(config) + if old_status.get('domain'): + dynamicdns.notify_domain_removed(old_status['domain']) - if new_status['dynamicdns_secret'] == '': - new_status['dynamicdns_secret'] = old_status['dynamicdns_secret'] + dynamicdns.notify_domain_added(new_status['domain']) + messages.success(self.request, _('Configuration updated')) - if new_status['dynamicdns_ipurl'] == '': - new_status['dynamicdns_ipurl'] = EMPTYSTRING + # Perform an immediate update, even when configuration is not changed. + dynamicdns.update_dns(None) - if new_status['dynamicdns_update_url'] == '': - new_status['dynamicdns_update_url'] = EMPTYSTRING - - if new_status['dynamicdns_server'] == '': - new_status['dynamicdns_server'] = EMPTYSTRING - - if new_status['service_type'] == 'GnuDIP': - new_status['dynamicdns_update_url'] = EMPTYSTRING - else: - new_status['dynamicdns_server'] = EMPTYSTRING - - if old_status != new_status: - disable_ssl_check = "disabled" - use_http_basic_auth = "disabled" - use_ipv6 = "disabled" - - if new_status['disable_SSL_cert_check']: - disable_ssl_check = "enabled" - - if new_status['use_http_basic_auth']: - use_http_basic_auth = "enabled" - - if new_status.get('use_ipv6'): - use_ipv6 = "enabled" - - # Domain name is not case sensitive, but Let's Encrypt - # certificate paths use lower-case domain name. - new_domain_name = new_status['dynamicdns_domain'].lower() - - _run([ - 'configure', - '-s', - new_status['dynamicdns_server'], - '-d', - new_domain_name, - '-u', - new_status['dynamicdns_user'], - '-p', - '-I', - new_status['dynamicdns_ipurl'], - '-U', - new_status['dynamicdns_update_url'], - '-c', - disable_ssl_check, - '-b', - use_http_basic_auth, - '-6', - use_ipv6, - ], input=new_status['dynamicdns_secret'].encode()) - - if old_status['enabled']: - domain_removed.send_robust(sender='dynamicdns', - domain_type='domain-type-dynamic', - name=old_status['dynamicdns_domain']) - _run(['stop']) - - if new_status['enabled']: - domain_added.send_robust(sender='dynamicdns', - domain_type='domain-type-dynamic', - name=new_domain_name, services='__all__') - _run(['start']) - - messages.success(request, _('Configuration updated')) - else: - logger.info('Nothing changed') - - -def _run(arguments, input=None): - """Run a given command and raise exception if there was an error.""" - return actions.superuser_run('dynamicdns', arguments, input=input) + return super().form_valid(form)