diff --git a/plinth/modules/backups/__init__.py b/plinth/modules/backups/__init__.py index 7b339f296..34a0d6e65 100644 --- a/plinth/modules/backups/__init__.py +++ b/plinth/modules/backups/__init__.py @@ -20,6 +20,7 @@ FreedomBox app to manage backup archives. import json import os +import re import paramiko from django.utils.text import get_valid_filename @@ -156,3 +157,12 @@ def is_ssh_hostkey_verified(hostname): known_hosts = paramiko.hostkeys.HostKeys(known_hosts_path) host_keys = known_hosts.lookup(hostname) return host_keys is not None + + +def split_path(path): + """Splits the given path into username, hostname, directory. + + Network interface information is kept in the hostname if provided. + e.g. fe80::2078:6c26:498a:1fa5%wlp1s0 + """ + return re.findall(r'(.*)[@].*?(.*)[:](.*)', path)[0] diff --git a/plinth/modules/backups/forms.py b/plinth/modules/backups/forms.py index 21c9ca68e..120a62c34 100644 --- a/plinth/modules/backups/forms.py +++ b/plinth/modules/backups/forms.py @@ -20,17 +20,20 @@ Forms for backups module. import logging import os +import re import subprocess import tempfile from django import forms -from django.core.validators import FileExtensionValidator +from django.core.exceptions import ValidationError +from django.core.validators import (FileExtensionValidator, + validate_ipv46_address) from django.utils.translation import ugettext from django.utils.translation import ugettext_lazy as _ from plinth.utils import format_lazy -from . import ROOT_REPOSITORY_NAME, api, network_storage +from . import ROOT_REPOSITORY_NAME, api, network_storage, split_path logger = logging.getLogger(__name__) @@ -95,11 +98,41 @@ class UploadForm(forms.Form): ], help_text=_('Select the backup file you want to upload')) +def repository_validator(path): + if not ('@' in path and ':' in path): + raise ValidationError(_('Repository path format incorrect.')) + + username, hostname, dir_path = split_path(path) + hostname = hostname.split('%')[0] + + # Validate username using Unix username regex + if not re.match(r'[a-z_][a-z0-9_-]*$', username): + raise ValidationError(_(f'Invalid username: {username}')) + + # The hostname should either be a valid IP address or hostname + # Follows RFC1123 (hostnames can start with digits) instead of RFC952 + hostname_re = (r'^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*' + r'([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$') + try: + validate_ipv46_address(hostname) + except ValidationError: + if not re.match(hostname_re, hostname): + raise ValidationError(_(f'Invalid hostname: {hostname}')) + + # Validate directory path + if not re.match(r'[^\0]+', dir_path): + raise ValidationError(_(f'Invalid directory path: {dir_path}')) + + # Just for tests. A validator doesn't have to return anything. + return True + + class AddRepositoryForm(forms.Form): repository = forms.CharField( label=_('SSH Repository Path'), strip=True, help_text=_('Path of a new or existing repository. Example: ' - 'user@host:~/path/to/repo/')) + 'user@host:~/path/to/repo/'), + validators=[repository_validator]) ssh_password = forms.CharField( label=_('SSH server password'), strip=True, help_text=_('Password of the SSH Server.
' diff --git a/plinth/modules/backups/tests/test_validators.py b/plinth/modules/backups/tests/test_validators.py new file mode 100644 index 000000000..d540832fc --- /dev/null +++ b/plinth/modules/backups/tests/test_validators.py @@ -0,0 +1,67 @@ +# +# This file is part of FreedomBox. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +""" +Tests for form field validators in backups. +""" + +import pytest +from django.core.exceptions import ValidationError + +from ..forms import repository_validator + + +def validate_repository(valid_list, invalid_list, path_string): + assert all( + repository_validator(path_string.format(item)) for item in valid_list) + for item in invalid_list: + path = path_string.format(item) + with pytest.raises(ValidationError): + repository_validator(path) + + +def test_repository_paths_validation(): + valid_paths = ['sshuser@10.0.2.2:~/backups'] + invalid_paths = [ + 'mary had a little lamb', 'someone@example.com', 'www.example.com', + 'sshuser@hostname' + ] + path_string = '{}' + validate_repository(valid_paths, invalid_paths, path_string) + + +def test_repository_username_validation(): + valid_usernames = ['sshuser', 'cypher_punk-2077', '_user', '_-_'] + invalid_usernames = ['1two', 'somebody else'] + path_string = '{}@example.org:~/backups' + validate_repository(valid_usernames, invalid_usernames, path_string) + + +def test_repository_hostname_validation(): + valid_hostnames = [ + '192.168.0.1', 'fe80::2078:6c26:498a:1fa5%wlps0', 'freedombox.org', + '1.percent.org', 'freedombox', '::1' + ] + invalid_hostnames = ['192.fe80::2089:1fa5'] + path_string = 'user@{}:~/backups' + validate_repository(valid_hostnames, invalid_hostnames, path_string) + + +def test_repository_dir_path_validation(): + valid_dir_paths = ['~/backups', '/home/user/backup-folder_1/'] + invalid_dir_paths = [''] + path_string = 'user@localhost:{}' + validate_repository(valid_dir_paths, invalid_dir_paths, path_string) diff --git a/plinth/modules/backups/views.py b/plinth/modules/backups/views.py index 394cbbe73..9b567666c 100644 --- a/plinth/modules/backups/views.py +++ b/plinth/modules/backups/views.py @@ -20,7 +20,6 @@ Views for the backups app. import logging import os -import re import subprocess import tempfile from contextlib import contextmanager @@ -44,7 +43,7 @@ from plinth.errors import PlinthError from plinth.modules import backups, storage from . import (ROOT_REPOSITORY, SESSION_PATH_VARIABLE, api, forms, - is_ssh_hostkey_verified, network_storage) + is_ssh_hostkey_verified, network_storage, split_path) from .decorators import delete_tmp_backup_file from .errors import BorgRepositoryDoesNotExistError from .repository import (BorgRepository, SshBorgRepository, get_repository, @@ -282,7 +281,7 @@ class AddRepositoryView(SuccessMessageMixin, FormView): """ super().form_valid(form) path = form.cleaned_data.get('repository') - _, hostname, _ = re.split('[@:]', path) + _, hostname, _ = split_path(path) credentials = _get_credentials(form.cleaned_data) repository = SshBorgRepository(path=path, credentials=credentials) repository.save(verified=False) @@ -321,8 +320,12 @@ class VerifySshHostkeyView(SuccessMessageMixin, FormView): return self.repo_data def _get_hostname(self): - _, hostname, _ = re.split('[@:]', self._get_repo_data()['path']) - return hostname + """Get the hostname of the repository. + + Network interface information is stripped out. + """ + _, hostname, _ = split_path(self._get_repo_data()['path']) + return hostname.split('%')[0] @staticmethod def _add_ssh_hostkey(hostname, key_type): @@ -405,10 +408,11 @@ def _validate_remote_repository(path, credentials, uuid=None): - if not empty, check if it's an existing backup repository - else throw an error """ - username, hostname, dir_path = re.split('[@:]', path) + username, hostname, dir_path = split_path(path) dir_path = dir_path.replace('~', '.') password = credentials['ssh_password'] repository = None + # TODO Test with IPv6 connection with _ssh_connection(hostname, username, password) as ssh_client: with ssh_client.open_sftp() as sftp_client: dir_contents = None