mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-03-11 09:04:54 +00:00
backups: Add regex validation for ssh_repository field
Signed-off-by: Joseph Nuthalapati <njoseph@thoughtworks.com>
This commit is contained in:
parent
2c97e1e02e
commit
bd1874d774
@ -20,6 +20,7 @@ FreedomBox app to manage backup archives.
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
import paramiko
|
||||
from django.utils.text import get_valid_filename
|
||||
@ -156,3 +157,12 @@ def is_ssh_hostkey_verified(hostname):
|
||||
known_hosts = paramiko.hostkeys.HostKeys(known_hosts_path)
|
||||
host_keys = known_hosts.lookup(hostname)
|
||||
return host_keys is not None
|
||||
|
||||
|
||||
def split_path(path):
|
||||
"""Splits the given path into username, hostname, directory.
|
||||
|
||||
Network interface information is kept in the hostname if provided.
|
||||
e.g. fe80::2078:6c26:498a:1fa5%wlp1s0
|
||||
"""
|
||||
return re.findall(r'(.*)[@].*?(.*)[:](.*)', path)[0]
|
||||
|
||||
@ -20,17 +20,20 @@ Forms for backups module.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
from django import forms
|
||||
from django.core.validators import FileExtensionValidator
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.validators import (FileExtensionValidator,
|
||||
validate_ipv46_address)
|
||||
from django.utils.translation import ugettext
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from plinth.utils import format_lazy
|
||||
|
||||
from . import ROOT_REPOSITORY_NAME, api, network_storage
|
||||
from . import ROOT_REPOSITORY_NAME, api, network_storage, split_path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -95,11 +98,41 @@ class UploadForm(forms.Form):
|
||||
], help_text=_('Select the backup file you want to upload'))
|
||||
|
||||
|
||||
def repository_validator(path):
|
||||
if not ('@' in path and ':' in path):
|
||||
raise ValidationError(_('Repository path format incorrect.'))
|
||||
|
||||
username, hostname, dir_path = split_path(path)
|
||||
hostname = hostname.split('%')[0]
|
||||
|
||||
# Validate username using Unix username regex
|
||||
if not re.match(r'[a-z_][a-z0-9_-]*$', username):
|
||||
raise ValidationError(_(f'Invalid username: {username}'))
|
||||
|
||||
# The hostname should either be a valid IP address or hostname
|
||||
# Follows RFC1123 (hostnames can start with digits) instead of RFC952
|
||||
hostname_re = (r'^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*'
|
||||
r'([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$')
|
||||
try:
|
||||
validate_ipv46_address(hostname)
|
||||
except ValidationError:
|
||||
if not re.match(hostname_re, hostname):
|
||||
raise ValidationError(_(f'Invalid hostname: {hostname}'))
|
||||
|
||||
# Validate directory path
|
||||
if not re.match(r'[^\0]+', dir_path):
|
||||
raise ValidationError(_(f'Invalid directory path: {dir_path}'))
|
||||
|
||||
# Just for tests. A validator doesn't have to return anything.
|
||||
return True
|
||||
|
||||
|
||||
class AddRepositoryForm(forms.Form):
|
||||
repository = forms.CharField(
|
||||
label=_('SSH Repository Path'), strip=True,
|
||||
help_text=_('Path of a new or existing repository. Example: '
|
||||
'<i>user@host:~/path/to/repo/</i>'))
|
||||
'<i>user@host:~/path/to/repo/</i>'),
|
||||
validators=[repository_validator])
|
||||
ssh_password = forms.CharField(
|
||||
label=_('SSH server password'), strip=True,
|
||||
help_text=_('Password of the SSH Server.<br />'
|
||||
|
||||
67
plinth/modules/backups/tests/test_validators.py
Normal file
67
plinth/modules/backups/tests/test_validators.py
Normal file
@ -0,0 +1,67 @@
|
||||
#
|
||||
# This file is part of FreedomBox.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
"""
|
||||
Tests for form field validators in backups.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
from ..forms import repository_validator
|
||||
|
||||
|
||||
def validate_repository(valid_list, invalid_list, path_string):
|
||||
assert all(
|
||||
repository_validator(path_string.format(item)) for item in valid_list)
|
||||
for item in invalid_list:
|
||||
path = path_string.format(item)
|
||||
with pytest.raises(ValidationError):
|
||||
repository_validator(path)
|
||||
|
||||
|
||||
def test_repository_paths_validation():
|
||||
valid_paths = ['sshuser@10.0.2.2:~/backups']
|
||||
invalid_paths = [
|
||||
'mary had a little lamb', 'someone@example.com', 'www.example.com',
|
||||
'sshuser@hostname'
|
||||
]
|
||||
path_string = '{}'
|
||||
validate_repository(valid_paths, invalid_paths, path_string)
|
||||
|
||||
|
||||
def test_repository_username_validation():
|
||||
valid_usernames = ['sshuser', 'cypher_punk-2077', '_user', '_-_']
|
||||
invalid_usernames = ['1two', 'somebody else']
|
||||
path_string = '{}@example.org:~/backups'
|
||||
validate_repository(valid_usernames, invalid_usernames, path_string)
|
||||
|
||||
|
||||
def test_repository_hostname_validation():
|
||||
valid_hostnames = [
|
||||
'192.168.0.1', 'fe80::2078:6c26:498a:1fa5%wlps0', 'freedombox.org',
|
||||
'1.percent.org', 'freedombox', '::1'
|
||||
]
|
||||
invalid_hostnames = ['192.fe80::2089:1fa5']
|
||||
path_string = 'user@{}:~/backups'
|
||||
validate_repository(valid_hostnames, invalid_hostnames, path_string)
|
||||
|
||||
|
||||
def test_repository_dir_path_validation():
|
||||
valid_dir_paths = ['~/backups', '/home/user/backup-folder_1/']
|
||||
invalid_dir_paths = ['']
|
||||
path_string = 'user@localhost:{}'
|
||||
validate_repository(valid_dir_paths, invalid_dir_paths, path_string)
|
||||
@ -20,7 +20,6 @@ Views for the backups app.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
from contextlib import contextmanager
|
||||
@ -44,7 +43,7 @@ from plinth.errors import PlinthError
|
||||
from plinth.modules import backups, storage
|
||||
|
||||
from . import (ROOT_REPOSITORY, SESSION_PATH_VARIABLE, api, forms,
|
||||
is_ssh_hostkey_verified, network_storage)
|
||||
is_ssh_hostkey_verified, network_storage, split_path)
|
||||
from .decorators import delete_tmp_backup_file
|
||||
from .errors import BorgRepositoryDoesNotExistError
|
||||
from .repository import (BorgRepository, SshBorgRepository, get_repository,
|
||||
@ -282,7 +281,7 @@ class AddRepositoryView(SuccessMessageMixin, FormView):
|
||||
"""
|
||||
super().form_valid(form)
|
||||
path = form.cleaned_data.get('repository')
|
||||
_, hostname, _ = re.split('[@:]', path)
|
||||
_, hostname, _ = split_path(path)
|
||||
credentials = _get_credentials(form.cleaned_data)
|
||||
repository = SshBorgRepository(path=path, credentials=credentials)
|
||||
repository.save(verified=False)
|
||||
@ -321,8 +320,12 @@ class VerifySshHostkeyView(SuccessMessageMixin, FormView):
|
||||
return self.repo_data
|
||||
|
||||
def _get_hostname(self):
|
||||
_, hostname, _ = re.split('[@:]', self._get_repo_data()['path'])
|
||||
return hostname
|
||||
"""Get the hostname of the repository.
|
||||
|
||||
Network interface information is stripped out.
|
||||
"""
|
||||
_, hostname, _ = split_path(self._get_repo_data()['path'])
|
||||
return hostname.split('%')[0]
|
||||
|
||||
@staticmethod
|
||||
def _add_ssh_hostkey(hostname, key_type):
|
||||
@ -405,10 +408,11 @@ def _validate_remote_repository(path, credentials, uuid=None):
|
||||
- if not empty, check if it's an existing backup repository
|
||||
- else throw an error
|
||||
"""
|
||||
username, hostname, dir_path = re.split('[@:]', path)
|
||||
username, hostname, dir_path = split_path(path)
|
||||
dir_path = dir_path.replace('~', '.')
|
||||
password = credentials['ssh_password']
|
||||
repository = None
|
||||
# TODO Test with IPv6 connection
|
||||
with _ssh_connection(hostname, username, password) as ssh_client:
|
||||
with ssh_client.open_sftp() as sftp_client:
|
||||
dir_contents = None
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user