dynamicdns: Rewrite configuration handling and update using URL

- Drop all the bash code.

- Run the update URL code with same logic as before. Don't need to use action
code for it.

- Completely new way to handle configuration: using key/value store. Import old
configuration once and delete it.

- Use a glib scheduler instead of creating a cron job.

- Store and show status from key/value store.

- Handle multiple domains when getting/setting configuration and status. The UI
still shows a single configuration form. To be improved later.

- Catch and report all errors during the update process.

- Drop all NAT detection code.

- Drop selfhost.bz. German only, no free account, no proper TLS on domain, no
easy to test. Existing accounts will continue to work with "other" as the
service type.

- For gnudip update code, add a timeout of 10 seconds, set a buffer size of two
powers and fix handling error messages from server.

Tests:

- GnuDIP:

  - Upon submission of the form, the IP is updated if app is enabled. IP is not
  updated if app is disabled.

  - Every 5 minutes, check is made again and IP is updated.

  - If IP lookup URL is available, update calls are not made if the DNS is
  already up-to-date.

  - If IP lookup URL is not available, update calls are made unconditionally
  every 5 minutes.

- For each of noip.com, freedns.afraid.org and other service:

  - Upon submission of the form, the IP is updated if app is enabled. IP is not
  updated if app is disabled.

  - Every 5 minutes, check is made again and IP is updated.

  - If IP lookup URL is available, update calls are not made if the DNS is
  already up-to-date.

  - If IP lookup URL is not available, update calls are made unconditionally
  every 5 minutes.

- Form validation:

  - Domain field is always mandatory.

  - When type is selected as gnudip, the fields server, username, and password
  are mandatory.

  - When type is selected other than gnudip, the field update URL is mandatory.
  The rest are optional.

  - When the update URL contains a field contains <User>, username is mandatory.
  For <Pass>, password is mandatory. For <Ip>, ip_lookup_url is mandatory.

  - When use HTTP basic auth is checked, the fields username and password are
  mandatory.

  - Password is optional only if a previous password exists. If configuration is
  deleted from kvstore, password is mandatory.

- Configuration import:

  Install dynamicdns without the patch. Add configuration with each of the
  service types. For GnuDIP service type, set two configurations with one with
  and without IP lookup URL. Update to code with the patch. Setup should run.

  - All fields in the configuration should be imported properly.

  - If the previous configuration is disabled, app should be disabled after
  import. Enabled otherwise.

  - Updating the IP address should work immediately after import.

- Enable/Disable: when enabled, IP URL should be enabled every 5 minutes.
When disabled, updates should not happen.

- Status:

  - When status is removed from the DB, it should show that no status is
  available yet.

  - When the form is updated or update happens via the timer, the status is
  shown. It should show success for a proper update. Proper external IP address
  should be shown.

  - Set the server to localhost and submit. Status should show 'Server refused
  connection' message. IP address should be '-'.

  - Set the server to an unknown domain. Status should show 'Could not find
  server' message. IP address should be '-'.

  - Set the server to a known domain. Status should show 'Connection timed out'
  message. IP address should be '-'.

  - Last update time should keep increasing as time passes.

- Backup/restore:

  - Functional tests.

- Javascript:

  - When GnuDIP is selected as the type, the fields server, username, password,
  domain, show password, and IP lookup URL should be shown while other fields
  should be hidden. Same on page load with GnuDIP as pre-selected type.

  - When GnuDIP is not selected as the type, the fields update URL, accept all
  SSL certificates, use basic HTTP auth, domain name, username, password, show
  password, IP lookup URL and use IPv6 fields should be shown and rest of the
  fields should be hidden. Same on page load with non-GnuDIP as pre-selected
  type.

  - When show password is checked, password should be shown and when it is
  unchecked, password is masked.

  - When other service types are selected, the update URL values changes to the
  respective service's URL.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-02-03 19:12:49 -08:00 committed by James Valleroy
parent 59c5e58549
commit 7e0156adbe
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
12 changed files with 653 additions and 988 deletions

View File

@ -1,436 +1,156 @@
#!/bin/bash
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for Dynamic DNS.
"""
############################################################################
# #
# This script is a wrapper around ez-ipupdate and/or wget #
# to update a Dynamic DNS account. The script is used as an #
# interface between plinth and ez-ipupdate #
# the script will store configuration, return configuration #
# to plinth UI and do a dynamic DNS update. The script will #
# also determe if we are behind a NAT device, if we can use #
# ez-ipupdate tool or if we need to do some wget magic #
# #
# Todo: IPv6 #
# Todo: GET WAN IP from Router via UPnP if supported #
# Todo: licence string? #
# author: Daniel Steglich <steglich@datasystems24.de> #
# #
############################################################################
import argparse
import json
import pathlib
import urllib
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
_conf_dir = pathlib.Path('/etc/ez-ipupdate/')
_active_config = _conf_dir / 'ez-ipupdate.conf'
_inactive_config = _conf_dir / 'ez-ipupdate.inactive'
_helper_config = _conf_dir / 'ez-ipupdate-plinth.cfg'
_cron_job = pathlib.Path('/etc/cron.d/ez-ipupdate')
# static values
WGET=$(which wget)
WGETOPTIONS="-o /dev/null -t 3 -T 3"
EMPTYSTRING="none"
NOIP="0.0.0.0"
# how often do we poll for IP changes
UPDATEMINUTES=5
# if we do not have a URL to look up public IP, how often should we do
# a "blind" update
UPDATEMINUTESUNKNOWN=3600
TOOLNAME=ez-ipupdate
GNUDIP_ACTION=/usr/share/plinth/actions/gnudip
DISABLED_STRING='disabled'
ENABLED_STRING='enabled'
# Dirs and filenames
CFGDIR="/etc/${TOOLNAME}/"
CFG="${CFGDIR}${TOOLNAME}.conf"
CFG_disabled="${CFGDIR}${TOOLNAME}.inactive"
IPFILE="${CFGDIR}${TOOLNAME}.currentIP"
STATUSFILE="${CFGDIR}${TOOLNAME}.status"
LASTUPDATE="${CFGDIR}/last-update"
HELPERCFG="${CFGDIR}${TOOLNAME}-plinth.cfg"
CRONJOB="/etc/cron.d/${TOOLNAME}"
PIDFILE="/var/run/ez-ipupdate.pid"
def parse_arguments():
""" Return parsed command line arguments as dictionary. """
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
# this function will parse commandline options
doGetOpt()
{
basicauth=0
ignoreCertError=0
useIPv6=0
subparsers.add_parser('export-config',
help='Print configuration in JSON format')
subparsers.add_parser('clean', help='Remove all old configuration files')
while getopts ":s:d:u:P:I:U:c:b:6:p" opt; do
case ${opt} in
s)
if [ "${OPTARG}" != "${EMPTYSTRING}" ];then
server=${OPTARG}
else
server=""
fi
;;
d)
host=${OPTARG}
;;
u)
user=${OPTARG}
;;
P)
pass=${OPTARG}
;;
p)
if read -t 0; then
IFS= read -r pass
fi
;;
I)
if [ "${OPTARG}" != "${EMPTYSTRING}" ];then
ipurl=${OPTARG}
else
ipurl=""
fi
;;
U)
if [ "${OPTARG}" != "${EMPTYSTRING}" ];then
updateurl=${OPTARG}
else
updateurl=""
fi
;;
b)
basicauth=${OPTARG}
;;
c)
ignoreCertError=${OPTARG}
;;
6)
useIPv6=${OPTARG}
;;
\?)
echo "Invalid option: -${OPTARG}" >&2
exit 1
;;
esac
done
}
subparsers.add_parser('update', help='For backwards compatibility')
subparser = subparsers.add_parser('success',
help='For backwards compatibility')
subparser.add_argument('wan_ip_address')
# this function will write a persistent config file to disk
doWriteCFG()
{
mkdir ${CFGDIR} 2> /dev/null
# always write to the inactive config - needs to be enabled via "start" command later
local out_file=${CFG_disabled}
subparsers.required = True
return parser.parse_args()
# reset the last update time
echo 0 > ${LASTUPDATE}
# reset the last updated IP
echo "0.0.0.0" > ${IPFILE}
def _read_configuration(path, separator='='):
"""Read ez-ipupdate configuration."""
config = {}
for line in path.read_text().splitlines():
if line.startswith('#'):
continue
# reset last update (if there is one)
rm ${STATUSFILE} 2> /dev/null
parts = line.partition(separator)
if parts[1]:
config[parts[0].strip()] = parts[2].strip()
else:
config[parts[0].strip()] = True
# find the interface (always the default gateway interface)
default_interface=$(ip route |grep default |awk '{print $5}')
return config
# store the given options in ez-ipupdate compatible config file
{
echo "host=${host}"
echo "server=${server}"
echo "user=${user}:${pass}"
echo "service-type=gnudip"
echo "retrys=3"
echo "wildcard"
} > ${out_file}
# store UPDATE URL params
{
echo "POSTURL ${updateurl}"
echo "POSTAUTH ${basicauth}"
echo "POSTSSLIGNORE ${ignoreCertError}"
echo "POSTUSEIPV6 ${useIPv6}"
} > ${HELPERCFG}
# check if we are behind a NAT Router
echo "IPURL ${ipurl}" >> ${HELPERCFG}
}
# this function will read the config file from disk
# special treatment for empty strings is done here:
# plinth will give empty strings like: ''
# but we don't want this single quotes to be used
doReadCFG()
{
host=""
server=""
user=""
pass=""
ipurl=""
def subcommand_export_config(_):
"""Print the old ez-ipupdate configuration in JSON format."""
input_config = {}
if _active_config.exists():
input_config = _read_configuration(_active_config)
elif _inactive_config.exists():
input_config = _read_configuration(_inactive_config)
if [ ! -z "${cfgfile}" ];then
host=$(grep ^host= "${cfgfile}" 2> /dev/null | cut -d = -f 2-)
server=$(grep ^server= "${cfgfile}" 2> /dev/null | cut -d = -f 2- | grep -v ^\'\')
user=$(grep ^user= "${cfgfile}" 2> /dev/null | cut -d = -f 2- | cut -d : -f 1 )
pass=$(grep ^user= "${cfgfile}" 2> /dev/null | cut -d = -f 2- | cut -d : -f 2-)
fi
helper = {}
if _helper_config.exists():
helper.update(_read_configuration(_helper_config, separator=' '))
if [ ! -z ${HELPERCFG} ];then
ipurl=$(grep ^IPURL "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\')
updateurl=$(grep POSTURL "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\')
basicauth=$(grep POSTAUTH "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\')
ignoreCertError=$(grep POSTSSLIGNORE "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\')
useIPv6=$(grep POSTUSEIPV6 "${HELPERCFG}" 2> /dev/null |awk '{print $2}' |grep -v ^\'\')
fi
}
def _clean(value):
value_map = {'enabled': True, 'disabled': False, '': None}
return value_map.get(value, value)
# replace vars from url: i.e.:
# https://example.com/update.php?domain=<Domain>&User=<User>&Pass=<Pass>
# also this function will remove the surounding single quotes from the URL string
# as plinth will add them
doReplaceVars()
{
local url=$(echo "${updateurl}" | sed "s/<Ip>/${wanip}/g")
url=$(echo "${url}" | sed "s/<Domain>/${host}/g")
url=$(echo "${url}" | sed "s/<User>/${user}/g")
url=$(echo "${url}" | sed "s/<Pass>/${pass}/g")
url=$(echo "${url}" | sed "s/'//g")
updateurl=${url}
logger "expanded update URL as ${url}"
}
domain = {
'service_type': 'gnudip',
'domain': input_config.get('host'),
'server': input_config.get('server'),
'username': input_config.get('user', '').split(':')[0] or None,
'password': input_config.get('user', '').split(':')[-1] or None,
'ip_lookup_url': helper.get('IPURL'),
'update_url': _clean(helper.get('POSTURL')) or None,
'use_http_basic_auth': _clean(helper.get('POSTAUTH')),
'disable_ssl_cert_check': _clean(helper.get('POSTSSLIGNORE')),
'use_ipv6': _clean(helper.get('POSTUSEIPV6')),
}
# doReadCFG() needs to be run before this
# this function will return all configured parameters in a way that
# plinth will understand (plinth know the order of
# parameters this function will return)
doStatus()
{
PROC=$(pgrep ${TOOLNAME})
if [ -f "${CRONJOB}" ];then
echo "${ENABLED_STRING}"
elif [ ! -z "${PROC}" ];then
echo "${ENABLED_STRING}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${server}" ];then
echo "${server}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${host}" ];then
echo "${host}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${user}" ];then
echo "${user}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${pass}" ];then
echo "${pass}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${ipurl}" ];then
echo "${ipurl}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${updateurl}" ];then
echo "${updateurl}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${ignoreCertError}" ];then
echo "${ignoreCertError}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${basicauth}" ];then
echo "${basicauth}"
else
echo "${DISABLED_STRING}"
fi
if [ ! -z "${useIPv6}" ];then
echo "${useIPv6}"
else
echo "${DISABLED_STRING}"
fi
}
if isinstance(domain['update_url'], bool):
# 'POSTURL ' is a line found in the configuration file
domain['update_url'] = None
# ask a public WEB Server for the WAN IP we are comming from
# and store this ip within $wanip
doGetWANIP()
{
if [ ! -z "${ipurl}" ];then
local wgetoptions="${WGETOPTIONS}"
if [ "${useIPv6}" = "enabled" ];then
wgetoptions="${wgetoptions} -6"
else
wgetoptions="${wgetoptions} -4"
fi
local cmd="${WGET} ${wgetoptions} -O - ${ipurl}"
if [ "${useIPv6}" = "enabled" ];then
wanip=$($cmd | tr A-F a-f | sed s/[^0-9a-f:]//g)
else
wanip=$($cmd | sed s/[^0-9.]//g)
fi
[ -z "${wanip}" ] && wanip=${NOIP}
else
# no WAN IP found because of missing check URL
wanip=${NOIP}
fi
}
if not domain['server']:
domain['service_type'] = 'other'
update_url = domain['update_url']
try:
server = urllib.parse.urlparse(update_url).netloc
service_types = {
'dynupdate.noip.com': 'noip.com',
'dynupdate.no-ip.com': 'noip.com',
'freedns.afraid.org': 'freedns.afraid.org'
}
domain['service_type'] = service_types.get(server, 'other')
except ValueError:
pass
# actualy do the update (using wget or gnudip action or even both)
# this function is called via cronjob
doUpdate()
{
if [ "${useIPv6}" = "enabled" ];then
local dnsentry="$(host -t AAAA "${host}" | sed 's/.*address\s*//')"
else
local dnsentry="$(host -t A "${host}" | sed 's/.*address\s*//')"
fi
if [ "${dnsentry}" = "${wanip}" ];then
return
fi
if [ ! -z "${server}" ];then
if "${GNUDIP_ACTION}" update; then
${0} success ${wanip}
else
${0} failed
fi
fi
if [ ! -z "${updateurl}" ];then
doReplaceVars
if [ "${basicauth}" = "enabled" ];then
WGETOPTIONS="${WGETOPTIONS} --user ${user} --password ${pass} "
fi
if [ "${ignoreCertError}" = "enabled" ];then
WGETOPTIONS="${WGETOPTIONS} --no-check-certificate "
fi
if [ "${useIPv6}" = "enabled" ];then
WGETOPTIONS="${WGETOPTIONS} -6"
else
WGETOPTIONS="${WGETOPTIONS} -4"
fi
local cmd="${WGET} -O /dev/null ${WGETOPTIONS} ${updateurl}"
$cmd
# ToDo: check the returning text from WEB Server. User need to give expected string.
if [ ${?} -eq 0 ];then
${0} success ${wanip}
else
${0} failed
fi
fi
}
# Old logic for 'enabling' the app is as follows: If behind NAT, add
# cronjob. If not behind NAT and type is update URL, add cronjob. If not
# behind NAT and type is GnuDIP, move inactive configuration to active
# configuration and start the ez-ipupdate daemon.
enabled = False
if _cron_job.exists() or (domain['service_type'] == 'gnudip'
and _active_config.exists()):
enabled = True
umask u=rw,g=,o=
cmd=${1}
shift
# decide which config to use
cfgfile="/tmp/none"
[ -f ${CFG_disabled} ] && cfgfile=${CFG_disabled}
[ -f ${CFG} ] && cfgfile=${CFG}
output_config = {'enabled': enabled, 'domains': {domain['domain']: domain}}
print(json.dumps(output_config))
# check what action is requested
case ${cmd} in
configure)
doGetOpt "${@}"
doWriteCFG
;;
start)
doGetWANIP
# if we use gnudip, enable gnudip.
gnudipServer=$(grep ^server= ${cfgfile} 2> /dev/null | cut -d = -f 2- |grep -v ^\'\')
if [ ! -f ${CFG} -a ! -z "${gnudipServer}" ];then
mv ${CFG_disabled} ${CFG}
fi
# add a cronjob
echo "*/${UPDATEMINUTES} * * * * root ${0} update" > $CRONJOB
$0 update
;;
update)
doReadCFG
dnsentry=$(nslookup "${host}"|tail -n2|grep A|sed s/[^0-9.]//g)
doGetWANIP
echo ${wanip} > ${IPFILE}
grep -v execute ${cfgfile} > ${cfgfile}.tmp
mv ${cfgfile}.tmp ${cfgfile}
echo "execute=${0} success ${wanip}" >> ${cfgfile}
# if we know our WAN IP, only update if IP changes
if [ "${dnsentry}" != "${wanip}" -a "${wanip}" != ${NOIP} ];then
doUpdate
else
# If nothing has changed, nothing needs to be done but we
# need to write the success status (if no success was
# recorded yet because maybe DNS record was up to date
# when script is executed for the first time)
${0} success ${wanip}
fi
# if we don't know our WAN IP do a blind update once a hour
if [ "${wanip}" = ${NOIP} ];then
currenttime=$(date +%s)
LAST=0
[ -f ${LASTUPDATE} ] && LAST=$(cat ${LASTUPDATE})
diff=$((currenttime - LAST))
if [ ${diff} -gt ${UPDATEMINUTESUNKNOWN} ];then
doUpdate
fi
fi
;;
stop)
rm ${CRONJOB} 2> /dev/null
/etc/init.d/${TOOLNAME} stop
kill "$(cat ${PIDFILE})" 2> /dev/null
mv ${CFG} ${CFG_disabled}
;;
success)
date=$(date)
echo "DNS record is up to date (${date})" > ${STATUSFILE}
date +%s > ${LASTUPDATE}
# if called from cronjob, the current IP is given as parameter
if [ $# -eq 1 ];then
echo "${1}" > ${IPFILE}
else
# if called from ez-ipupdate daemon, no WAN IP is given as parameter
doGetWANIP
echo ${wanip} > ${IPFILE}
fi
;;
failed)
date=$(date)
echo "last update failed (${date})" > ${STATUSFILE}
;;
get-last-success)
if [ -f ${STATUSFILE} ];then
cat ${STATUSFILE}
else
echo "no successful update recorded since last config change"
fi
;;
status)
doReadCFG
doStatus
;;
clean)
rm ${CFGDIR}/*
rm ${CRONJOB}
;;
*)
echo "usage: status|configure <options>|start|stop|update|clean|success [updated IP]|failed|get-last-success"
echo ""
echo "options are:"
echo "-s <server> Gnudip Server address"
echo "-d <domain> Domain to be updated"
echo "-u <user> Account username"
echo "-P <password> Account password"
echo "-p Read Account Password from stdin"
echo "-I <URL to look up public IP> A URL which returns the IP of the client who is requesting"
echo "-U <update URL> The update URL (a HTTP GET on this URL will be done)"
echo "-c <1|0> disable SSL check on Update URL"
echo "-b <1|0> use HTTP basic auth on Update URL"
echo "-6 use IPv6 type address"
echo ""
echo "update do a one time update"
echo "clean delete configuration"
echo "success store update success and optional the updated IP"
echo "failed store update failure"
echo "get-last-success return date of last successful update"
;;
esac
exit 0
def subcommand_clean(_):
"""Remove all old configuration files."""
last_update = _conf_dir / 'last-update'
status = _conf_dir / 'ez-ipupdate.status'
current_ip = _conf_dir / 'ez-ipupdate.currentIP'
cleanup_files = [
_active_config, _inactive_config, last_update, _helper_config, status,
current_ip
]
for cleanup_file in cleanup_files:
try:
cleanup_file.rename(cleanup_file.with_suffix('.bak'))
except FileNotFoundError:
pass
_cron_job.unlink(missing_ok=True)
def subcommand_update(_):
"""Empty subcommand kept only for backwards compatibility.
Drop after stable release.
"""
def subcommand_success(_):
"""Empty subcommand kept only for backwards compatibility.
Drop after stable release.
"""
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()

View File

@ -1,77 +0,0 @@
#!/usr/bin/python3
# -*- mode: python -*-
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
GnuDIP client for updating Dynamic DNS records.
"""
import argparse
import logging
import pathlib
import sys
from plinth.modules.dynamicdns import gnudip
logger = logging.getLogger(__name__)
CONFIG_FILE = pathlib.Path('/etc/ez-ipupdate/ez-ipupdate.conf')
def parse_arguments():
"""Return parsed command line arguments as dictionary."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser('update', help='Update Dynamic DNS record')
subparsers.required = True
return parser.parse_args()
def subcommand_update(_):
"""Update Dynamic DNS record.
Uses settings from ez-ipupdate config file.
"""
if not CONFIG_FILE.is_file():
logger.info('GnuDIP configuration not found.')
return
config = {}
with CONFIG_FILE.open() as cf:
lines = cf.readlines()
for line in lines:
if '=' in line:
items = line.split('=')
key = items[0].strip()
value = items[1].strip()
else:
key = line.strip()
value = True
config[key] = value
if not all(['host' in config, 'server' in config, 'user' in config]):
logger.warn('GnuDIP configuration is not complete.')
return
username, password = config['user'].split(':')
result, new_ip = gnudip.update(config['server'], config['host'], username,
password)
if result == 0 and new_ip:
print(new_ip)
return result
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
sys.exit(subcommand_method(arguments))
if __name__ == '__main__':
main()

View File

@ -485,7 +485,7 @@ class EnableState(LeaderComponent):
kvstore.set(self.key, True)
def disable(self):
"""Store that the app/component is enabled."""
"""Store that the app/component is disabled."""
from plinth import kvstore
kvstore.set(self.key, False)

View File

@ -3,18 +3,26 @@
FreedomBox app to configure ez-ipupdate client.
"""
import json
import logging
import subprocess
import time
import urllib
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth import app as app_module
from plinth import cfg, menu
from plinth import cfg, glib, kvstore, menu
from plinth.modules.backups.components import BackupRestore
from plinth.modules.names.components import DomainType
from plinth.modules.users.components import UsersAndGroups
from plinth.signals import domain_added
from plinth.signals import domain_added, domain_removed
from plinth.utils import format_lazy
from . import manifest
from . import gnudip, manifest
logger = logging.getLogger(__name__)
_description = [
format_lazy(
@ -45,7 +53,7 @@ class DynamicDNSApp(app_module.App):
app_id = 'dynamicdns'
_version = 1
_version = 2
def __init__(self):
"""Create components for the app."""
@ -62,6 +70,9 @@ class DynamicDNSApp(app_module.App):
'dynamicdns:index', parent_url_name='system')
self.add(menu_item)
enable_state = app_module.EnableState('enable-state-dynamicdns')
self.add(enable_state)
domain_type = DomainType('domain-type-dynamic',
_('Dynamic Domain Name'), 'dynamicdns:index',
can_have_certificate=True)
@ -75,106 +86,195 @@ class DynamicDNSApp(app_module.App):
**manifest.backup)
self.add(backup_restore)
@staticmethod
def post_init():
def post_init(self):
"""Perform post initialization operations."""
current_status = get_status()
if current_status['enabled']:
domain_added.send_robust(sender='dynamicdns',
domain_type='domain-type-dynamic',
name=current_status['dynamicdns_domain'],
services='__all__')
config = get_config()
if self.is_enabled():
for domain_name in config['domains']:
notify_domain_added(domain_name)
def is_enabled(self):
"""Return whether all the leader components are enabled.
Return True when there are no leader components and DynamicDNS setup
is done.
"""
return super().is_enabled() and get_status()['enabled']
# Check every 5 minutes (every 3 minutes in debug mode) to perform
# dynamic DNS updates.
interval = 180 if cfg.develop else 300
glib.schedule(interval, update_dns)
def setup(helper, old_version=None):
"""Install and configure the module."""
app.setup(old_version)
if not old_version:
helper.call('post', app.enable)
if old_version == 1:
config = actions.superuser_run('dynamicdns', ['export-config'])
config = json.loads(config)
if config['enabled']:
app.enable()
else:
app.disable()
del config['enabled']
set_config(config)
actions.superuser_run('dynamicdns', ['clean'])
def _query_external_address(domain):
"""Return the IP address by querying an external server."""
if not domain['ip_lookup_url']:
return None
ip_option = '-6' if domain['use_ipv6'] else '-4'
try:
ip_address = subprocess.check_output([
'wget', ip_option, '-o', '/dev/null', '-t', '3', '-T', '3', '-O',
'-', domain['ip_lookup_url']
])
return ip_address.decode().strip().lower()
except subprocess.CalledProcessError as exception:
logger.warning('Unable to lookup external IP with URL %s: %s',
domain['ip_lookup_url'], exception)
return None
def _query_dns_address(domain):
"""Return the IP address in the DNS records."""
ip_option = 'AAAA' if domain['use_ipv6'] else 'A'
try:
output = subprocess.check_output(
['host', '-t', ip_option, domain['domain']])
return output.decode().split(' ')[-1].strip().lower()
except subprocess.CalledProcessError as exception:
logger.warning('Unable to lookup DNS for host %s: %s',
domain['domain'], exception)
return None
def _update_using_url(domain, external_address):
"""Update DNS entry using an update URL."""
update_url = domain['update_url']
quote = urllib.parse.quote
if external_address:
update_url = update_url.replace('<Ip>', quote(external_address))
if domain['domain']:
update_url = update_url.replace('<Domain>', quote(domain['domain']))
if domain['username']:
update_url = update_url.replace('<User>', quote(domain['username']))
if domain['password']:
update_url = update_url.replace('<Pass>', quote(domain['password']))
options = ['-o', '/dev/null', '-t', '3', '-T', '3']
if domain['use_http_basic_auth']:
options += [
'--user', domain['username'], '--password', domain['password']
]
if domain['disable_ssl_cert_check']:
options += ['--no-check-certificate']
if domain['use_ipv6']:
options += ['-6']
else:
options += ['-4']
command = ['wget', '-O', '/dev/null'] + options + [update_url]
process = subprocess.run(command, check=False)
return process.returncode == 0, external_address
def _update_dns_for_domain(domain):
"""Update DNS records for a single domain."""
result = False
ip_address = None
error = None
try:
dns_address = _query_dns_address(domain)
external_address = _query_external_address(domain)
if dns_address == external_address and dns_address is not None:
logger.info('Dynamic domain %s is up-to-date: %s',
domain['domain'], dns_address)
result = True
ip_address = dns_address
error = ValueError('up-to-date')
else:
logger.info(
'Updating dynamic domain %s, DNS address %s, looked up '
'external address %s', domain['domain'], dns_address,
external_address)
if domain['service_type'] == 'gnudip':
result, ip_address = gnudip.update(domain['server'],
domain['domain'],
domain['username'],
domain['password'])
else:
result, ip_address = _update_using_url(domain,
external_address)
except Exception as exception:
logger.exception('Failed to be update Dynamic DNS - %s', exception)
error = exception
set_status(domain, result, ip_address, error)
def update_dns(_data):
"""For all configured domains, check and up to date DNS records."""
config = get_config()
if not app.is_enabled():
return
# Update for each domain
for domain in config['domains'].values():
_update_dns_for_domain(domain)
def get_status():
"""Return the current status."""
# TODO: use key/value instead of hard coded value list
status = {}
output = actions.superuser_run('dynamicdns', ['status'])
details = output.split()
status['enabled'] = (output.split()[0] == 'enabled')
if len(details) > 1:
if details[1] == 'disabled':
status['dynamicdns_server'] = ''
else:
status['dynamicdns_server'] = details[1].replace("'", "")
else:
status['dynamicdns_server'] = ''
if len(details) > 2:
if details[2] == 'disabled':
status['dynamicdns_domain'] = ''
else:
status['dynamicdns_domain'] = details[2].replace("'", "")
else:
status['dynamicdns_domain'] = ''
if len(details) > 3:
if details[3] == 'disabled':
status['dynamicdns_user'] = ''
else:
status['dynamicdns_user'] = details[3].replace("'", "")
else:
status['dynamicdns_user'] = ''
if len(details) > 4:
if details[4] == 'disabled':
status['dynamicdns_secret'] = ''
else:
status['dynamicdns_secret'] = details[4].replace("'", "")
else:
status['dynamicdns_secret'] = ''
if len(details) > 5:
if details[5] == 'disabled':
status['dynamicdns_ipurl'] = ''
else:
status['dynamicdns_ipurl'] = details[5].replace("'", "")
else:
status['dynamicdns_ipurl'] = ''
if len(details) > 6:
if details[6] == 'disabled':
status['dynamicdns_update_url'] = ''
else:
status['dynamicdns_update_url'] = details[6].replace("'", "")
else:
status['dynamicdns_update_url'] = ''
if len(details) > 7:
status['disable_SSL_cert_check'] = (output.split()[7] == 'enabled')
else:
status['disable_SSL_cert_check'] = False
if len(details) > 8:
status['use_http_basic_auth'] = (output.split()[8] == 'enabled')
else:
status['use_http_basic_auth'] = False
if len(details) > 9:
status['use_ipv6'] = (output.split()[9] == 'enabled')
else:
status['use_ipv6'] = False
if not status['dynamicdns_server'] and not status['dynamicdns_update_url']:
status['service_type'] = 'GnuDIP'
elif not status['dynamicdns_server'] and status['dynamicdns_update_url']:
status['service_type'] = 'other'
else:
status['service_type'] = 'GnuDIP'
"""Return the status of recent update for each domain."""
status = kvstore.get_default('dynamicdns_status', '{}')
status = json.loads(status)
status.setdefault('domains', {})
return status
def set_status(domain, result, ip_address, error=None):
"""Set the status of most recent update."""
status = kvstore.get_default('dynamicdns_status', '{}')
status = json.loads(status)
domains = status.setdefault('domains', {})
domains[domain['domain']] = {
'domain': domain['domain'],
'result': result,
'ip_address': ip_address,
'error_code': error.__class__.__name__ if error else None,
'error_message': error.args[0] if error and error.args else None,
'timestamp': int(time.time()),
}
kvstore.set('dynamicdns_status', json.dumps(status))
def get_config():
"""Return the current configuration."""
default_config = {'domains': {}}
config = kvstore.get_default('dynamicdns_config', '{}')
return json.loads(config) or default_config
def set_config(config):
"""Set a new configuration."""
kvstore.set('dynamicdns_config', json.dumps(config))
def notify_domain_added(domain_name):
"""Send a signal that domain has been added."""
domain_added.send_robust(sender='dynamicdns',
domain_type='domain-type-dynamic',
name=domain_name, services='__all__')
def notify_domain_removed(domain_name):
"""Send a signal that domain has been removed."""
domain_removed.send_robust(sender='dynamicdns',
domain_type='domain-type-dynamic',
name=domain_name)

View File

@ -12,24 +12,13 @@ from plinth import cfg
from plinth.utils import format_lazy
class TrimmedCharField(forms.CharField):
"""Trim the contents of a CharField."""
def clean(self, value):
"""Clean and validate the field value"""
if value:
value = value.strip()
return super(TrimmedCharField, self).clean(value)
class ConfigureForm(forms.Form):
"""Form to configure the Dynamic DNS client."""
help_update_url = \
gettext_lazy('The Variables &lt;User&gt;, &lt;Pass&gt;, &lt;Ip&gt;, '
'&lt;Domain&gt; may be used within the URL. For details '
'see the update URL templates of the example providers.')
help_services = \
help_service_type = \
gettext_lazy('Please choose an update protocol according to your '
'provider. If your provider does not support the GnuDIP '
'protocol or your provider is not listed you may use '
@ -41,16 +30,16 @@ class ConfigureForm(forms.Form):
help_domain = format_lazy(
gettext_lazy('The public domain name you want to use to reach your '
'{box_name}.'), box_name=gettext_lazy(cfg.box_name))
help_disable_ssl = \
help_disable_ssl_cert_check = \
gettext_lazy('Use this option if your provider uses self signed '
'certificates.')
help_http_auth = \
help_use_http_basic_auth = \
gettext_lazy('If this option is selected, your username and password '
'will be used for HTTP basic authentication.')
help_secret = \
help_password = \
gettext_lazy('Leave this field empty if you want to keep your '
'current password.')
help_ip_url = format_lazy(
help_ip_lookup_url = format_lazy(
gettext_lazy('Optional Value. If your {box_name} is not connected '
'directly to the Internet (i.e. connected to a NAT '
'router) this URL is used to determine the real '
@ -58,98 +47,99 @@ class ConfigureForm(forms.Form):
'the client comes from (example: '
'https://ddns.freedombox.org/ip/).'),
box_name=gettext_lazy(cfg.box_name))
help_user = \
help_username = \
gettext_lazy('The username that was used when the account was '
'created.')
provider_choices = (('GnuDIP', gettext_lazy('GnuDIP')),
('noip', 'noip.com'), ('selfhost', 'selfhost.bz'),
('freedns', 'freedns.afraid.org'),
('other', gettext_lazy('other update URL')))
enabled = forms.BooleanField(label=gettext_lazy('Enable Dynamic DNS'),
required=False)
provider_choices = (('gnudip', gettext_lazy('GnuDIP')),
('noip.com', 'noip.com'), ('freedns.afraid.org',
'freedns.afraid.org'),
('other', gettext_lazy('Other update URL')))
service_type = forms.ChoiceField(label=gettext_lazy('Service Type'),
help_text=help_services,
help_text=help_service_type,
choices=provider_choices)
dynamicdns_server = TrimmedCharField(
server = forms.CharField(
label=gettext_lazy('GnuDIP Server Address'), required=False,
help_text=help_server, validators=[
validators.RegexValidator(r'^[\w-]{1,63}(\.[\w-]{1,63})*$',
gettext_lazy('Invalid server name'))
])
dynamicdns_update_url = TrimmedCharField(label=gettext_lazy('Update URL'),
required=False,
help_text=help_update_url)
update_url = forms.CharField(label=gettext_lazy('Update URL'),
required=False, help_text=help_update_url)
disable_SSL_cert_check = forms.BooleanField(
disable_ssl_cert_check = forms.BooleanField(
label=gettext_lazy('Accept all SSL certificates'),
help_text=help_disable_ssl, required=False)
help_text=help_disable_ssl_cert_check, required=False)
use_http_basic_auth = forms.BooleanField(
label=gettext_lazy('Use HTTP basic authentication'),
help_text=help_http_auth, required=False)
help_text=help_use_http_basic_auth, required=False)
dynamicdns_domain = TrimmedCharField(
domain = forms.CharField(
label=gettext_lazy('Domain Name'), help_text=help_domain,
required=False, validators=[
required=True, validators=[
validators.RegexValidator(r'^[\w-]{1,63}(\.[\w-]{1,63})*$',
gettext_lazy('Invalid domain name'))
])
dynamicdns_user = TrimmedCharField(label=gettext_lazy('Username'),
required=False, help_text=help_user)
username = forms.CharField(label=gettext_lazy('Username'), required=False,
help_text=help_username)
dynamicdns_secret = TrimmedCharField(label=gettext_lazy('Password'),
widget=forms.PasswordInput(),
required=False, help_text=help_secret)
password = forms.CharField(label=gettext_lazy('Password'),
widget=forms.PasswordInput(), required=False,
help_text=help_password)
showpw = forms.BooleanField(label=gettext_lazy('Show password'),
required=False)
show_password = forms.BooleanField(label=gettext_lazy('Show password'),
required=False)
dynamicdns_ipurl = TrimmedCharField(
ip_lookup_url = forms.CharField(
label=gettext_lazy('URL to look up public IP'), required=False,
help_text=help_ip_url,
validators=[validators.URLValidator(schemes=['http', 'https', 'ftp'])])
help_text=help_ip_lookup_url,
validators=[validators.URLValidator(schemes=['http', 'https'])])
use_ipv6 = forms.BooleanField(
label=gettext_lazy('Use IPv6 instead of IPv4'), required=False)
def clean(self):
cleaned_data = super(ConfigureForm, self).clean()
dynamicdns_secret = cleaned_data.get('dynamicdns_secret')
dynamicdns_update_url = cleaned_data.get('dynamicdns_update_url')
dynamicdns_user = cleaned_data.get('dynamicdns_user')
dynamicdns_domain = cleaned_data.get('dynamicdns_domain')
dynamicdns_server = cleaned_data.get('dynamicdns_server')
"""Further validate and transform field data."""
cleaned_data = super().clean()
# Domain name is not case sensitive, but Let's Encrypt
# certificate paths use lower-case domain name.
cleaned_data['domain'] = cleaned_data['domain'].lower()
update_url = cleaned_data.get('update_url')
password = cleaned_data.get('password')
service_type = cleaned_data.get('service_type')
old_dynamicdns_secret = self.initial['dynamicdns_secret']
old_password = self.initial.get('password')
# Clear the fields which are not in use
if service_type == 'GnuDIP':
dynamicdns_update_url = ''
if not password:
# If password is not set, use old password
cleaned_data['password'] = old_password
message = _('This field is required.')
if service_type == 'gnudip':
for field_name in ['server', 'username', 'password']:
if not cleaned_data.get(field_name):
self.add_error(field_name, message)
else:
dynamicdns_server = ''
if not update_url:
self.add_error('update_url', message)
if cleaned_data.get('enabled'):
# Check if gnudip server or update URL is filled
if not dynamicdns_update_url and not dynamicdns_server:
raise forms.ValidationError(
_('Please provide an update URL or a GnuDIP server '
'address'))
param_map = (('username', '<User>'), ('password', '<Pass>'),
('ip_lookup_url', '<Ip>'))
for field_name, param in param_map:
if (update_url and param in update_url
and not cleaned_data.get(field_name)):
self.add_error(field_name, message)
if dynamicdns_server and not dynamicdns_user:
raise forms.ValidationError(
_('Please provide a GnuDIP username'))
if cleaned_data.get('use_http_basic_auth'):
for field_name in ('username', 'password'):
if not cleaned_data.get(field_name):
self.add_error(field_name, message)
if dynamicdns_server and not dynamicdns_domain:
raise forms.ValidationError(
_('Please provide a GnuDIP domain name'))
# Check if a password was set before or a password is set now
if dynamicdns_server and \
not dynamicdns_secret and not old_dynamicdns_secret:
raise forms.ValidationError(_('Please provide a password'))
del cleaned_data['show_password']
return cleaned_data

View File

@ -5,10 +5,11 @@ GnuDIP client for updating Dynamic DNS records.
import hashlib
import logging
import socket
import socket as socket_module
BUF_SIZE = 50
BUF_SIZE = 128
GNUDIP_PORT = 3495
TIMEOUT = 10
logger = logging.getLogger(__name__)
@ -18,22 +19,22 @@ def update(server, domain, username, password):
domain = domain.removeprefix(username + '.')
password_digest = hashlib.md5(password.encode()).hexdigest()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
logger.debug('Connecting to %s:%d', server, GNUDIP_PORT)
s.connect((server, GNUDIP_PORT))
salt = s.recv(BUF_SIZE).decode().strip()
with socket_module.socket(socket_module.AF_INET,
socket_module.SOCK_STREAM) as socket:
logger.debug('Connecting to %s:%d, timeout %ss', server, GNUDIP_PORT,
TIMEOUT)
socket.settimeout(TIMEOUT)
socket.connect((server, GNUDIP_PORT))
salt = socket.recv(BUF_SIZE).decode().strip()
salted_digest = password_digest + '.' + salt
final_digest = hashlib.md5(salted_digest.encode()).hexdigest()
update_request = username + ':' + final_digest + ':' + domain + ':2\n'
s.sendall(update_request.encode())
socket.sendall(update_request.encode())
response = s.recv(BUF_SIZE).decode().strip()
result, new_ip = response.split(':')
result = int(result)
if result == 0:
logger.info('GnuDIP update success: %s', new_ip)
else:
logger.warn('GnuDIP update error: %s', response)
response = socket.recv(BUF_SIZE).decode().strip()
result, _, new_ip = response.partition(':')
result = (int(result) == 0)
new_ip = new_ip if result else None
return result, new_ip

View File

@ -1,3 +1,10 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
backup = {'config': {'directories': ['/etc/ez-ipupdate/']}}
backup = {
'config': {
'directories': ['/etc/ez-ipupdate/']
},
'settings': [
'dynamicdns_enable', 'dynamicdns_config', 'dynamicdns_status'
],
}

View File

@ -23,123 +23,57 @@
*/
(function($) {
var SELFHOST = 'https://carol.selfhost.de/update?username=<User>&' +
'password=<Pass>&myip=<Ip>';
var NOIP = 'http://dynupdate.no-ip.com/nic/update?hostname=' +
'<Domain>&myip=<Ip>';
var NOIP = 'https://<User>:<Pass>@dynupdate.no-ip.com/nic/update?' +
'hostname=<Domain>';
var FREEDNS = 'https://freedns.afraid.org/dynamic/update.php?' +
'_YOURAPIKEYHERE_';
// Hide all form fields
$('.form-group').hide();
// Show the enable checkbox
$('#id_enabled').closest('.form-group').show();
if ($('#id_enabled').prop('checked')) {
// Show all form fields
show_all();
// Set the selectbox to the last configured value
select_service();
}
$('#id_enabled').change(function() {
if ($('#id_enabled').prop('checked')) {
show_all();
if ($("#id_service_type option:selected").text() == "GnuDIP") {
set_gnudip_mode();
} else {
set_update_url_mode();
}
} else {
$('.form-group').hide();
$('#id_enabled').closest('.form-group').show();
}
});
$('#id_service_type').change(function() {
var service_type = $("#id_service_type option:selected").text();
if (service_type == "GnuDIP") {
set_gnudip_mode();
} else {
set_update_url_mode();
if (service_type == "noip.com") {
$('#id_dynamicdns_update_url').val(NOIP);
$('#id_use_http_basic_auth').prop('checked', true);
} else {
$('#id_use_http_basic_auth').prop('checked', false);
}
if (service_type == "selfhost.bz") {
$('#id_dynamicdns_update_url').val(SELFHOST);
}
if (service_type == "freedns.afraid.org") {
$('#id_dynamicdns_update_url').val(FREEDNS);
}
if (service_type == "other update URL") {
$('#id_dynamicdns_update_url').val('');
}
set_mode();
var service_type = $("#id_service_type").val();
if (service_type == "noip.com") {
$('#id_update_url').val(NOIP);
} else if (service_type == "freedns.afraid.org") {
$('#id_update_url').val(FREEDNS);
} else { // GnuDIP and other
$('#id_update_url').val('');
}
});
$('#id_showpw').change(function() {
// Changing type attribute from password to text is prevented by most
// browsers make a new form field works for me
if ($('#id_showpw').prop('checked')) {
$('#id_dynamicdns_secret').replaceWith(
$('#id_dynamicdns_secret').clone().attr(
'type', 'text'));
$('#id_show_password').change(function() {
if ($('#id_show_password').prop('checked')) {
$('#id_password').prop('type', 'text');
} else {
$('#id_dynamicdns_secret').replaceWith(
$('#id_dynamicdns_secret').clone().attr(
'type', 'password'));
$('#id_password').prop('type', 'password');
}
});
function select_service() {
var update_url = $("#id_dynamicdns_update_url").val();
if ($("#id_dynamicdns_server").val().length == 0) {
set_update_url_mode();
if (update_url == NOIP) {
$("#id_service_type").val("noip");
} else if (update_url == SELFHOST) {
$("#id_service_type").val("selfhost");
} else if (update_url == FREEDNS) {
$("#id_service_type").val("freedns");
} else {
$("#id_service_type").val("other");
}
} else {
$("#id_service_type").val("GnuDIP");
function set_mode() {
var service_type = $("#id_service_type").val();
if (service_type == "gnudip") {
set_gnudip_mode();
} else {
set_update_url_mode();
}
}
function set_gnudip_mode() {
$('#id_dynamicdns_update_url').closest('.form-group').hide();
$('#id_disable_SSL_cert_check').closest('.form-group').hide();
$('.form-group').show();
$('#id_update_url').closest('.form-group').hide();
$('#id_disable_ssl_cert_check').closest('.form-group').hide();
$('#id_use_http_basic_auth').closest('.form-group').hide();
$('#id_use_ipv6').closest('.form-group').hide();
$('#id_dynamicdns_server').closest('.form-group').show();
$('#id_server').closest('.form-group').show();
}
function set_update_url_mode() {
$('#id_dynamicdns_update_url').closest('.form-group').show();
$('#id_disable_SSL_cert_check').closest('.form-group').show();
$('#id_update_url').closest('.form-group').show();
$('#id_disable_ssl_cert_check').closest('.form-group').show();
$('#id_use_http_basic_auth').closest('.form-group').show();
$('#id_use_ipv6').closest('.form-group').show();
$('#id_dynamicdns_server').closest('.form-group').hide();
$('#id_server').closest('.form-group').hide();
}
function show_all() {
$('#id_enabled').closest('.form-group').show();
$('#id_service_type').closest('.form-group').show();
$('#id_dynamicdns_server').closest('.form-group').show();
$('#id_dynamicdns_update_url').closest('.form-group').show();
$('#id_disable_SSL_cert_check').closest('.form-group').show();
$('#id_use_http_basic_auth').closest('.form-group').show();
$('#id_dynamicdns_domain').closest('.form-group').show();
$('#id_dynamicdns_user').closest('.form-group').show();
$('#id_dynamicdns_secret').closest('.form-group').show();
$('#id_showpw').closest('.form-group').show();
$('#id_dynamicdns_ipurl').closest('.form-group').show();
$('#id_use_ipv6').closest('.form-group').show();
}
set_mode();
})(jQuery);

View File

@ -7,29 +7,50 @@
{% load i18n %}
{% load static %}
{% block configuration %}
<form class="form" method="post">
{% csrf_token %}
<noscript>
{% blocktrans trimmed %}
You have disabled Javascript. Dynamic form mode is disabled
and some helper functions may not work (but the main
functionality should work).
{% endblocktrans %}
</noscript>
{{ form|bootstrap }}
<input type="submit" class="btn btn-primary"
value="{% trans "Update setup" %}"/>
</form>
{% endblock %}
{% block extra_content %}
<h3>{% trans "Status" %}</h3>
<p>{% trans "Last update" %}: {{ status.last_update }}</p>
{% if domains_status %}
<div class="table-responsive">
<table class="table">
<thead>
<tr>
<th>{% trans "Domain" %}</th>
<th>{% trans "Last update" %}</th>
<th>{% trans "Result" %}</th>
<th>{% trans "IP Address" %}</th>
</tr>
</thead>
<tbody>
{% for domain in domains_status.values %}
<tr>
<td>{{ domain.domain }}</td>
<td>{{ domain.timestamp|timesince }}</td>
<td>
{% if domain.result %}
<span class="badge badge-success">
{% trans "Success" %}
</span>
{% else %}
<span class="badge badge-warning">
{% trans "Failed" %}
</span>
{% endif %}
{% if domain.error_message %}
({{ domain.error_message }})
{% elif domain.error_code %}
({{ domain.error_code }})
{% endif %}
</td>
<td>{{ domain.ip_address|default_if_none:'-' }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
{% trans "No status available." %}
{% endif %}
{% endblock %}
{% block page_js %}

View File

@ -3,8 +3,6 @@
Functional, browser based tests for dynamicdns app.
"""
import time
import pytest
from plinth.tests import functional
@ -14,6 +12,58 @@ pytestmark = [
pytest.mark.dynamicdns
]
_configs = {
'gnudip1': {
'service_type': 'gnudip',
'server': 'localhost',
'domain': 'freedombox.example.com',
'username': 'tester',
'password': 'testingtesting',
'ip_lookup_url': 'https://ddns.freedombox.org/ip/',
},
'gnudip2': {
'service_type': 'gnudip',
'server': '127.0.0.1',
'domain': 'freedombox2.example.com',
'username': 'tester2',
'password': 'testingtesting2',
'ip_lookup_url': 'https://ddns2.freedombox.org/ip/',
},
'noip.com': {
'service_type': 'noip.com',
'update_url': 'https://localhost/update3/',
'disable_ssl_cert_check': True,
'use_http_basic_auth': True,
'domain': 'freedombox3.example.com',
'username': 'tester3',
'password': 'testingtesting3',
'ip_lookup_url': 'https://ddns3.freedombox.org/ip/',
'use_ipv6': True,
},
'freedns.afraid.org': {
'service_type': 'freedns.afraid.org',
'update_url': 'https://localhost/update5/',
'disable_ssl_cert_check': False,
'use_http_basic_auth': False,
'domain': 'freedombox5.example.com',
'username': '',
'password': '',
'ip_lookup_url': '',
'use_ipv6': False,
},
'other': {
'service_type': 'other',
'update_url': 'https://localhost/update6/',
'disable_ssl_cert_check': False,
'use_http_basic_auth': False,
'domain': 'freedombox6.example.com',
'username': 'tester6',
'password': 'testingtesting6',
'ip_lookup_url': 'https://ddns6.freedombox.org/ip/',
'use_ipv6': False,
},
}
@pytest.fixture(scope='module', autouse=True)
def fixture_background(session_browser):
@ -21,104 +71,70 @@ def fixture_background(session_browser):
functional.login(session_browser)
def test_capitalized_domain_name(session_browser):
"""Test handling of capitalized domain name."""
_configure(session_browser)
_configure_domain(session_browser, 'FreedomBox.example.com')
assert _get_domain(session_browser) == 'freedombox.example.com'
class TestDynamicDNSApp(functional.BaseAppTests):
app_name = 'dynamicdns'
has_service = False
has_web = False
check_diagnostics = False
@staticmethod
def test_capitalized_domain_name(session_browser):
"""Test handling of capitalized domain name."""
_configure(session_browser, _configs['gnudip1'])
_configure(session_browser, {'domain': 'FreedomBox.example.com'})
_assert_has_config(session_browser,
{'domain': 'freedombox.example.com'})
@staticmethod
@pytest.mark.parametrize('config_name', list(_configs.keys()))
def test_various_form_values(session_browser, config_name):
"""Test feeding various values and check that they are saved."""
_configure(session_browser, _configs[config_name])
_assert_has_config(session_browser, _configs[config_name])
@staticmethod
@pytest.mark.backups
def test_backup_restore(session_browser):
"""Test backup and restore of configuration."""
_configure(session_browser, _configs['gnudip1'])
functional.backup_create(session_browser, 'dynamicdns',
'test_dynamicdns')
_configure(session_browser, _configs['gnudip2'])
functional.backup_restore(session_browser, 'dynamicdns',
'test_dynamicdns')
_assert_has_config(session_browser, _configs['gnudip1'])
def test_backup_and_restore(session_browser):
"""Test backup and restore of configuration."""
_configure(session_browser)
functional.backup_create(session_browser, 'dynamicdns', 'test_dynamicdns')
_change_config(session_browser)
functional.backup_restore(session_browser, 'dynamicdns', 'test_dynamicdns')
assert _has_original_config(session_browser)
# TODO Scenario: Configure GnuDIP service
# TODO Scenario: Configure noip.com service
# TODO Scenario: Configure selfhost.bz service
# TODO Scenario: Configure freedns.afraid.org service
# TODO Scenario: Configure other update URL service
def _configure(browser):
def _configure(browser, config):
functional.nav_to_module(browser, 'dynamicdns')
browser.links.find_by_href(
'/plinth/sys/dynamicdns/configure/').first.click()
browser.find_by_id('id_enabled').check()
browser.find_by_id('id_service_type').select('GnuDIP')
browser.find_by_id('id_dynamicdns_server').fill('example.com')
browser.find_by_id('id_dynamicdns_domain').fill('freedombox.example.com')
browser.find_by_id('id_dynamicdns_user').fill('tester')
browser.find_by_id('id_dynamicdns_secret').fill('testingtesting')
browser.find_by_id('id_dynamicdns_ipurl').fill(
'https://ddns.freedombox.org/ip/')
functional.submit(browser)
for key, value in config.items():
if key == 'service_type':
browser.find_by_id(f'id_{key}').select(value)
elif isinstance(value, bool):
if value:
browser.find_by_id(f'id_{key}').check()
else:
browser.find_by_id(f'id_{key}').uncheck()
else:
browser.find_by_id(f'id_{key}').fill(value)
# After a domain name change, Let's Encrypt will restart the web
# server and could cause a connection failure.
time.sleep(1)
functional.eventually(functional.nav_to_module, [browser, 'dynamicdns'])
functional.submit(browser, form_class='form-configuration')
def _has_original_config(browser):
def _assert_has_config(browser, config):
functional.nav_to_module(browser, 'dynamicdns')
browser.links.find_by_href(
'/plinth/sys/dynamicdns/configure/').first.click()
enabled = browser.find_by_id('id_enabled').value
service_type = browser.find_by_id('id_service_type').value
server = browser.find_by_id('id_dynamicdns_server').value
domain = browser.find_by_id('id_dynamicdns_domain').value
user = browser.find_by_id('id_dynamicdns_user').value
ipurl = browser.find_by_id('id_dynamicdns_ipurl').value
if enabled and service_type == 'GnuDIP' and server == 'example.com' \
and domain == 'freedombox.example.com' and user == 'tester' \
and ipurl == 'https://ddns.freedombox.org/ip/':
return True
else:
return False
for key, value in config.items():
if key == 'password':
continue
def _change_config(browser):
functional.nav_to_module(browser, 'dynamicdns')
browser.links.find_by_href(
'/plinth/sys/dynamicdns/configure/').first.click()
browser.find_by_id('id_enabled').check()
browser.find_by_id('id_service_type').select('GnuDIP')
browser.find_by_id('id_dynamicdns_server').fill('2.example.com')
browser.find_by_id('id_dynamicdns_domain').fill('freedombox2.example.com')
browser.find_by_id('id_dynamicdns_user').fill('tester2')
browser.find_by_id('id_dynamicdns_secret').fill('testingtesting2')
browser.find_by_id('id_dynamicdns_ipurl').fill(
'https://ddns2.freedombox.org/ip/')
functional.submit(browser)
# After a domain name change, Let's Encrypt will restart the web
# server and could cause a connection failure.
time.sleep(1)
functional.eventually(functional.nav_to_module, [browser, 'dynamicdns'])
def _configure_domain(browser, domain):
functional.nav_to_module(browser, 'dynamicdns')
browser.links.find_by_href(
'/plinth/sys/dynamicdns/configure/').first.click()
browser.find_by_id('id_dynamicdns_domain').fill(domain)
functional.submit(browser)
# After a domain name change, Let's Encrypt will restart the web
# server and could cause a connection failure.
time.sleep(1)
functional.eventually(functional.nav_to_module, [browser, 'dynamicdns'])
if isinstance(value, bool):
assert browser.find_by_id(f'id_{key}').checked == value
else:
assert value == browser.find_by_id(f'id_{key}').value
def _get_domain(browser):
functional.nav_to_module(browser, 'dynamicdns')
browser.links.find_by_href(
'/plinth/sys/dynamicdns/configure/').first.click()
return browser.find_by_id('id_dynamicdns_domain').value
return browser.find_by_id('id_domain').value

View File

@ -8,5 +8,6 @@ from django.urls import re_path
from . import views
urlpatterns = [
re_path(r'^sys/dynamicdns/$', views.index, name='index'),
re_path(r'^sys/dynamicdns/$', views.DynamicDNSAppView.as_view(),
name='index'),
]

View File

@ -3,129 +3,81 @@
Views for the dynamicsdns module.
"""
import logging
import datetime
from django.contrib import messages
from django.template.response import TemplateResponse
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth import views
from plinth.modules import dynamicdns
from plinth.signals import domain_added, domain_removed
from .forms import ConfigureForm
logger = logging.getLogger(__name__)
EMPTYSTRING = 'none'
class DynamicDNSAppView(views.AppView):
"""Serve configuration page."""
app_id = 'dynamicdns'
template_name = 'dynamicdns.html'
form_class = ConfigureForm
_error_messages = {
'timeout': _('Connection timed out'),
'gaierror': _('Could not find server'),
'TimeoutError': _('Connection timed out'),
'ConnectionRefusedError': _('Server refused connection'),
'ValueError': _('Already up-to-date')
}
def index(request):
"""Serve the configuration form."""
status = dynamicdns.get_status()
form = None
def get_context_data(self, **kwargs):
"""Return the context data for rendering the template view."""
context = super().get_context_data(**kwargs)
status = dynamicdns.get_status()
config = dynamicdns.get_config()
domains_status = {}
for domain_name, domain in status['domains'].items():
if domain_name not in config['domains']:
continue
if request.method == 'POST':
form = ConfigureForm(request.POST, initial=status)
if form.is_valid():
_apply_changes(request, status, form.cleaned_data)
status = dynamicdns.get_status()
form = ConfigureForm(initial=status)
else:
form = ConfigureForm(initial=status)
# Create naive datetime object in local timezone
domain['timestamp'] = datetime.datetime.fromtimestamp(
domain['timestamp'])
domains_status[domain_name] = domain
if domain['error_code'] in self._error_messages:
domain['error_message'] = self._error_messages[
domain['error_code']]
return TemplateResponse(
request, 'dynamicdns.html', {
'title': _('Configure Dynamic DNS'),
'app_info': dynamicdns.app.info,
'form': form,
'status': get_status()
})
context['domains_status'] = domains_status
return context
def get_initial(self):
"""Get the current values for the form."""
initial = super().get_initial()
domains = dynamicdns.get_config()['domains']
domain = list(domains.values())[0] if domains else {}
initial.update(domain)
return domain
def get_status():
"""Return the current status."""
return {'last_update': _run(['get-last-success'])}
def form_valid(self, form):
"""Apply the changes submitted in the form."""
old_status = form.initial
new_status = form.cleaned_data
if old_status != new_status:
config = dynamicdns.get_config()
try:
del config['domains'][old_status['domain']]
except KeyError:
pass
def _apply_changes(request, old_status, new_status):
"""Apply the changes to Dynamic DNS client."""
logger.info('New status is - %s', new_status)
logger.info('Old status was - %s', old_status)
config['domains'][new_status['domain']] = new_status
dynamicdns.set_config(config)
if old_status.get('domain'):
dynamicdns.notify_domain_removed(old_status['domain'])
if new_status['dynamicdns_secret'] == '':
new_status['dynamicdns_secret'] = old_status['dynamicdns_secret']
dynamicdns.notify_domain_added(new_status['domain'])
messages.success(self.request, _('Configuration updated'))
if new_status['dynamicdns_ipurl'] == '':
new_status['dynamicdns_ipurl'] = EMPTYSTRING
# Perform an immediate update, even when configuration is not changed.
dynamicdns.update_dns(None)
if new_status['dynamicdns_update_url'] == '':
new_status['dynamicdns_update_url'] = EMPTYSTRING
if new_status['dynamicdns_server'] == '':
new_status['dynamicdns_server'] = EMPTYSTRING
if new_status['service_type'] == 'GnuDIP':
new_status['dynamicdns_update_url'] = EMPTYSTRING
else:
new_status['dynamicdns_server'] = EMPTYSTRING
if old_status != new_status:
disable_ssl_check = "disabled"
use_http_basic_auth = "disabled"
use_ipv6 = "disabled"
if new_status['disable_SSL_cert_check']:
disable_ssl_check = "enabled"
if new_status['use_http_basic_auth']:
use_http_basic_auth = "enabled"
if new_status.get('use_ipv6'):
use_ipv6 = "enabled"
# Domain name is not case sensitive, but Let's Encrypt
# certificate paths use lower-case domain name.
new_domain_name = new_status['dynamicdns_domain'].lower()
_run([
'configure',
'-s',
new_status['dynamicdns_server'],
'-d',
new_domain_name,
'-u',
new_status['dynamicdns_user'],
'-p',
'-I',
new_status['dynamicdns_ipurl'],
'-U',
new_status['dynamicdns_update_url'],
'-c',
disable_ssl_check,
'-b',
use_http_basic_auth,
'-6',
use_ipv6,
], input=new_status['dynamicdns_secret'].encode())
if old_status['enabled']:
domain_removed.send_robust(sender='dynamicdns',
domain_type='domain-type-dynamic',
name=old_status['dynamicdns_domain'])
_run(['stop'])
if new_status['enabled']:
domain_added.send_robust(sender='dynamicdns',
domain_type='domain-type-dynamic',
name=new_domain_name, services='__all__')
_run(['start'])
messages.success(request, _('Configuration updated'))
else:
logger.info('Nothing changed')
def _run(arguments, input=None):
"""Run a given command and raise exception if there was an error."""
return actions.superuser_run('dynamicdns', arguments, input=input)
return super().form_valid(form)