Backups: unittests for accessing repository with borg directly

- adapt action and write tests for accessing a borg repo directly
  via borg+ssh, without mounting it
- some docstring updates

Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Michael Pimmer 2018-12-10 22:47:31 +01:00 committed by James Valleroy
parent a5ab22babf
commit bb95229a91
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
5 changed files with 65 additions and 60 deletions

View File

@ -282,8 +282,11 @@ def read_password():
def get_env(arguments, use_credentials=False):
"""Create encryption and ssh kwargs out of given arguments"""
env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes')
if arguments.encryption_passphrase:
env['BORG_PASSPHRASE'] = arguments.encryption_passphrase
# always provide BORG_PASSPHRASE (also if empty) so borg does not get stuck
# while asking for a passphrase.
passphrase = arguments.encryption_passphrase if \
arguments.encryption_passphrase else ''
env['BORG_PASSPHRASE'] = passphrase
if use_credentials:
if arguments.ssh_keyfile:
env['BORG_RSH'] = "ssh -i %s" % arguments.ssh_keyfile

View File

@ -29,7 +29,7 @@ REQUIRED_FIELDS = ['path', 'storage_type', 'added_by_module']
def get_storages(storage_type=None):
"""Get network storage"""
"""Get network storages"""
storages = kvstore.get_default(NETWORK_STORAGE_KEY, {})
if storages:
storages = json.loads(storages)
@ -45,7 +45,7 @@ def get(uuid):
def update_or_add(storage):
"""Update an existing or create a new network location"""
"""Update an existing or create a new network storage"""
for field in REQUIRED_FIELDS:
if field not in storage:
raise ValueError('missing storage parameter: %s' % field)

View File

@ -78,9 +78,9 @@ class BorgRepository(object):
self._path = path
self.credentials = credentials
def append_credentials(self, arguments):
def append_encryption_passphrase(self, arguments, credentials):
"""Append '--encryption-passphrase' argument to backups call"""
passphrase = self.credentials.get('encryption_passphrase', None)
passphrase = credentials.get('encryption_passphrase', None)
if passphrase:
arguments += ['--encryption-passphrase', passphrase]
return arguments
@ -138,7 +138,7 @@ class BorgRepository(object):
def get_zipstream(self, archive_name):
archive_path = self.get_archive_path(archive_name)
args = ['export-tar', '--path', archive_path]
args = self.append_credentials(args)
args = self.append_encryption_passphrase(args, self.credentials)
kwargs = {'run_in_background': True,
'bufsize': 1}
proc = self._run('backups', args, kwargs=kwargs)
@ -148,7 +148,6 @@ class BorgRepository(object):
for archive in self.list_archives():
if archive['name'] == name:
return archive
return None
def get_archive_apps(self, archive_name):
@ -203,8 +202,9 @@ class SshBorgRepository(BorgRepository):
def __init__(self, uuid=None, path=None, credentials=None, automount=True,
**kwargs):
"""
Provide a uuid to instanciate an existing repository,
or 'ssh_path' and 'credentials' for a new repository.
Instanciate a new repository.
If only a uuid is given, load the values from kvstore.
"""
is_new_instance = not bool(uuid)
if not uuid:
@ -229,7 +229,7 @@ class SshBorgRepository(BorgRepository):
"""
Return the path to use for backups actions.
This is either the mountpoint or the remote ssh path,
This could either be the mountpoint or the remote ssh path,
depending on whether borg is running on the remote server.
"""
return self.mountpoint
@ -289,7 +289,6 @@ class SshBorgRepository(BorgRepository):
def mount(self):
if self.is_mounted:
return
arguments = ['mount', '--mountpoint', self.mountpoint, '--path',
self._path]
arguments, kwargs = self._append_sshfs_arguments(arguments,
@ -316,6 +315,7 @@ class SshBorgRepository(BorgRepository):
logger.error(err)
def _append_sshfs_arguments(self, arguments, credentials, kwargs=None):
"""Add credentials to a run command and kwargs"""
if kwargs is None:
kwargs = {}
if 'ssh_password' in credentials and credentials['ssh_password']:
@ -324,29 +324,13 @@ class SshBorgRepository(BorgRepository):
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
return (arguments, kwargs)
def _append_run_arguments(self, arguments, credentials):
kwargs = {}
passphrase = credentials.get('encryption_passphrase', None)
if passphrase:
arguments += ['--encryption-passphrase', passphrase]
# TODO: use or remove
"""
if 'ssh_password' in credentials and credentials['ssh_password']:
kwargs['input'] = credentials['ssh_password'].encode()
if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']:
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
"""
return (arguments, kwargs)
def run(self, arguments, superuser=True):
"""Run a backups action script command.
Add credentials via self._append_run_arguments to the backup script.
"""
"""Add credentials and run a backups action script command."""
for key in self.credentials.keys():
if key not in self.KNOWN_CREDENTIALS:
raise ValueError('Unknown credentials entry: %s' % key)
arguments = self.append_credentials(arguments)
arguments = self.append_encryption_passphrase(arguments,
self.credentials)
return self._run('backups', arguments, superuser=superuser)

View File

@ -18,6 +18,7 @@
Test the backups action script.
"""
import json
import os
import shutil
import tempfile
@ -43,6 +44,7 @@ class TestBackups(unittest.TestCase):
dummy_credentials = {
'ssh_password': 'invalid_password'
}
repokey_encryption_passphrase = '12345'
@classmethod
def setUpClass(cls):
@ -115,17 +117,47 @@ class TestBackups(unittest.TestCase):
content = repository.list_archives()
self.assertEquals(len(content), 0)
@unittest.skipUnless(euid == 0 and test_config.backups_ssh_path,
'Needs to be root and ssh password provided')
def test_remote_backup_actions(self):
"""
Test creating an encrypted remote repository using borg directly.
This relies on borgbackups being installed on the remote machine.
"""
credentials = self.get_credentials(add_encryption_passphrase=True)
repo_path = os.path.join(test_config.backups_ssh_path,
str(uuid.uuid1()))
arguments = ['init', '--path', repo_path, '--encryption', 'repokey']
arguments, kwargs = self.append_borg_arguments(arguments, credentials)
actions.superuser_run('backups', arguments, **kwargs)
arguments = ['info', '--path', repo_path]
arguments, kwargs = self.append_borg_arguments(arguments, credentials)
info = actions.superuser_run('backups', arguments, **kwargs)
info = json.loads(info)
self.assertEquals(info['encryption']['mode'], 'repokey')
def append_borg_arguments(self, arguments, credentials):
"""Append run arguments for running borg directly"""
kwargs = {}
passphrase = credentials.get('encryption_passphrase', None)
if passphrase:
arguments += ['--encryption-passphrase', passphrase]
if 'ssh_password' in credentials and credentials['ssh_password']:
kwargs['input'] = credentials['ssh_password'].encode()
if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']:
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
return (arguments, kwargs)
@unittest.skipUnless(euid == 0 and test_config.backups_ssh_path,
'Needs to be root and ssh password provided')
def test_sshfs_mount_password(self):
"""Test (un)mounting if password for a remote location is given"""
credentials = self.get_credentials()
if not credentials:
return
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=ssh_path,
repository = SshBorgRepository(path=ssh_path,
credentials=credentials,
automount=False)
repository.mount()
@ -138,12 +170,9 @@ class TestBackups(unittest.TestCase):
def test_sshfs_mount_keyfile(self):
"""Test (un)mounting if keyfile for a remote location is given"""
credentials = self.get_credentials()
if not credentials:
return
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=ssh_path,
repository = SshBorgRepository(path=ssh_path,
credentials=credentials,
automount=False)
repository.mount()
@ -151,26 +180,9 @@ class TestBackups(unittest.TestCase):
repository.umount()
self.assertFalse(repository.is_mounted)
@unittest.skipUnless(euid == 0, 'Needs to be root')
def test_ssh_create_encrypted_repository(self):
credentials = self.get_credentials()
encrypted_repo = os.path.join(self.backup_directory.name,
'borgbackup_encrypted')
credentials['encryption_passphrase'] = '12345'
# using SshBorgRepository to provide credentials because
# BorgRepository does not allow creating encrypted repositories
# TODO: find better way to test encryption
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=encrypted_repo,
credentials=credentials,
automount=False)
repository.create_repository('repokey')
self.assertTrue(bool(repository.get_info()))
@unittest.skipUnless(euid == 0, 'Needs to be root')
def test_access_nonexisting_url(self):
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=self.nonexisting_repo_url,
repository = SshBorgRepository(path=self.nonexisting_repo_url,
credentials=self.dummy_credentials,
automount=False)
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
@ -179,14 +191,13 @@ class TestBackups(unittest.TestCase):
@unittest.skipUnless(euid == 0, 'Needs to be root')
def test_inaccessible_repo_url(self):
"""Test accessing an existing URL with wrong credentials"""
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=self.inaccessible_repo_url,
repository = SshBorgRepository(path=self.inaccessible_repo_url,
credentials=self.dummy_credentials,
automount=False)
with self.assertRaises(backups.errors.BorgError):
repository.get_info()
def get_credentials(self):
def get_credentials(self, add_encryption_passphrase=False):
"""
Get access params for a remote location.
Return an empty dict if no valid access params are found.
@ -196,4 +207,8 @@ class TestBackups(unittest.TestCase):
credentials['ssh_password'] = test_config.backups_ssh_password
elif test_config.backups_ssh_keyfile:
credentials['ssh_keyfile'] = test_config.backups_ssh_keyfile
if add_encryption_passphrase:
credentials['encryption_passphrase'] = \
self.repokey_encryption_passphrase
return credentials

View File

@ -22,7 +22,10 @@ To customize these settings, create a 'config_local.py' and override
the variables defined here.
"""
# When credentials are given, backups_ssh_path will be mounted. In the given
# folder, repositories will be created in subfolders with random uuids.
backups_ssh_path = None
# provide backups_ssh_path and either a password or a keyfile for ssh tests
backups_ssh_password = None
backups_ssh_keyfile = None
backups_ssh_repo_uuid = 'plinth_test_sshfs' # will be mounted to /media/<uuid>