From 37aa2f99928da6efc8eaa9bfe8722ee796f5a41f Mon Sep 17 00:00:00 2001 From: Michael Pimmer Date: Fri, 30 Nov 2018 00:10:57 +0000 Subject: [PATCH] Backups, remote backups: fix unittests Reviewed-by: James Valleroy --- plinth/modules/backups/forms.py | 15 +++- plinth/modules/backups/repository.py | 103 ++++++++++++++++++--------- plinth/modules/backups/views.py | 22 +++--- plinth/tests/config.py | 4 +- plinth/tests/test_backups.py | 94 ++++++++++++------------ 5 files changed, 140 insertions(+), 98 deletions(-) diff --git a/plinth/modules/backups/forms.py b/plinth/modules/backups/forms.py index 5506dce29..6e7b5fa1c 100644 --- a/plinth/modules/backups/forms.py +++ b/plinth/modules/backups/forms.py @@ -26,7 +26,7 @@ from django.utils.translation import ugettext, ugettext_lazy as _ from plinth.utils import format_lazy from plinth import cfg -from . import api +from . import api, network_storage, ROOT_REPOSITORY_NAME def _get_app_choices(apps): @@ -43,7 +43,17 @@ def _get_app_choices(apps): return choices +def _get_repository_choices(): + """Return the list of available repositories.""" + choices = [('root', ROOT_REPOSITORY_NAME)] + storages = network_storage.get_storages() + for storage in storages: + choices += [(storage['uuid'], storage['path'])] + return choices + + class CreateArchiveForm(forms.Form): + repository = forms.ChoiceField() name = forms.CharField( label=_('Archive name'), strip=True, help_text=_('Name for new backup archive.'), validators=[ @@ -60,6 +70,7 @@ class CreateArchiveForm(forms.Form): apps = api.get_all_apps_for_backup() self.fields['selected_apps'].choices = _get_app_choices(apps) self.fields['selected_apps'].initial = [app.name for app in apps] + self.fields['repository'].choices = _get_repository_choices() class RestoreForm(forms.Form): @@ -115,7 +126,7 @@ class AddRepositoryForm(forms.Form): widget=forms.PasswordInput(), required=False ) - store_passwords = forms.BooleanField( + store_credentials = forms.BooleanField( label=_('Store passwords on FreedomBox'), help_text=format_lazy( _('Store the passwords on your {box_name}.' diff --git a/plinth/modules/backups/repository.py b/plinth/modules/backups/repository.py index db0a8df27..a09877e70 100644 --- a/plinth/modules/backups/repository.py +++ b/plinth/modules/backups/repository.py @@ -33,6 +33,7 @@ from .errors import BorgError, BorgRepositoryDoesNotExistError logger = logging.getLogger(__name__) SSHFS_MOUNTPOINT = '/media/' +SUPPORTED_BORG_ENCRYPTION = ['none', 'repokey'] # known errors that come up when remotely accessing a borg repository # 'errors' are error strings to look for in the stacktrace. KNOWN_ERRORS = [{ @@ -60,8 +61,9 @@ class BorgRepository(object): name = ROOT_REPOSITORY_NAME is_mounted = True - def __init__(self, path): + def __init__(self, path, credentials={}): self.path = path + self.credentials = credentials def get_info(self): output = self._run(['info', '--path', self.path]) @@ -73,12 +75,19 @@ class BorgRepository(object): def get_view_content(self): """Get archives with additional information as needed by the view""" - return { + repository = { 'name': self.name, - 'mounted': self.is_mounted, - 'archives': self.list_archives(), 'type': self.storage_type, + 'error': '' } + try: + repository['archives'] = self.list_archives() + repository['mounted'] = self.is_mounted + error = '' + except (BorgError, ActionError) as err: + error = str(err) + repository['error'] = error + return repository def delete_archive(self, archive_name): archive_path = self.get_archive_path(archive_name) @@ -92,7 +101,11 @@ class BorgRepository(object): api.backup_apps(_backup_handler, app_names=app_names, label=archive_name), + def create_repository(self): + self._run(['init', '--path', self.path, '--encryption', 'none']) + def download_archive(self, name): + # TODO pass def get_archive(self, name): @@ -110,13 +123,27 @@ class BorgRepository(object): return output.splitlines() def restore_archive(self): + # TODO pass def get_archive_path(self, archive_name): - return "::".join(self.path, archive_name) + return "::".join([self.path, archive_name]) + + def _get_arguments(self, arguments, credentials): + kwargs = {} + if 'encryption_passphrase' in credentials and \ + credentials['encryption_passphrase']: + arguments += ['--encryption-passphrase', + credentials['encryption_passphrase']] + return (arguments, kwargs) def _run(self, arguments, superuser=True): - """Run a backups action script command.""" + """Run a backups action script command. + + Automatically passes on credentials via self._get_arguments to the + backup script via environment variables or input, except if you + set use_credentials to False. + """ try: if superuser: return actions.superuser_run(self.command, arguments) @@ -137,6 +164,7 @@ class BorgRepository(object): class SshBorgRepository(BorgRepository): + """Borg repository that is accessed via SSH""" KNOWN_CREDENTIALS = ['ssh_keyfile', 'ssh_password', 'encryption_passphrase'] storage_type = 'ssh' @@ -176,30 +204,35 @@ class SshBorgRepository(BorgRepository): def _load_from_kvstore(self): storage = network_storage.get(self.uuid) - self.credentials = storage['credentials'] + try: + self.credentials = storage['credentials'] + except KeyError: + self.credentials = {} self.path = storage['path'] - def _get_network_storage_format(self): + def _get_network_storage_format(self, store_credentials): storage = { 'path': self.path, - 'credentials': self.credentials, 'storage_type': self.storage_type, 'added_by_module': 'backups' } if hasattr(self, 'uuid'): storage['uuid'] = self.uuid + if store_credentials: + storage['credentials'] = self.credentials return storage def create_archive(self, app_names, archive_name): - api.backup_apps(_backup_handler, app_names=app_names, - label=archive_name, credentials=self.credentials) + raise NotImplementedError + # api.backup_apps(_backup_handler, app_names=app_names, def create_repository(self, encryption): - cmd = ['init', '--path', self.path, '--encryption', encryption] - self._run(cmd) + if encryption not in SUPPORTED_BORG_ENCRYPTION: + raise ValueError('Unsupported encryption: %s' % encryption) + self._run(['init', '--path', self.path, '--encryption', encryption]) - def save(self): - storage = self._get_network_storage_format() + def save(self, store_credentials=True): + storage = self._get_network_storage_format(store_credentials) self.uuid = network_storage.update_or_add(storage) def mount(self): @@ -207,8 +240,7 @@ class SshBorgRepository(BorgRepository): self._run(cmd) def umount(self): - self._run(['umount', '--mountpoint', self.mountpoint], - use_credentials=False) + self._run(['umount', '--mountpoint', self.mountpoint]) def remove(self): """Remove a repository from the kvstore and delete its mountpoint""" @@ -224,30 +256,31 @@ class SshBorgRepository(BorgRepository): except Exception as err: logger.error(err) + def _get_arguments(self, arguments, credentials): + arguments, kwargs = super()._get_arguments(arguments, credentials) + if 'ssh_password' in credentials and credentials['ssh_password']: + kwargs['input'] = credentials['ssh_password'].encode() + if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']: + arguments += ['--ssh-keyfile', credentials['ssh_keyfile']] + return (arguments, kwargs) + def _run(self, arguments, superuser=True, use_credentials=True): """Run a backups action script command. - This automatically passes on self.credentials to the backups script - via environment variables or input, except if you set use_credentials - to False. + Automatically passes on credentials via self._get_arguments to the + backup script via environment variables or input, except if you + set use_credentials to False. """ - for key in self.credentials.keys(): - if key not in self.KNOWN_CREDENTIALS: - raise ValueError('Unknown credentials: %s' % key) - kwargs = {} if use_credentials: - if 'ssh_password' in self.credentials and \ - self.credentials['ssh_password'] is not None: - kwargs['input'] = self.credentials['ssh_password'].encode() - if 'ssh_keyfile' in self.credentials and \ - self.credentials['ssh_keyfile'] is not None: - arguments += ['--ssh-keyfile', self.credentials['ssh_keyfile']] - if 'encryption_passphrase' in self.credentials and \ - self.credentials['encryption_passphrase'] is not None: - arguments += ['--encryption-passphrase', - self.credentials['encryption_passphrase']] - + if not self.credentials: + msg = 'Cannot access ssh repo without credentials' + raise BorgError(msg) + for key in self.credentials.keys(): + if key not in self.KNOWN_CREDENTIALS: + raise ValueError('Unknown credentials entry: %s' % key) + arguments, kwargs = self._get_arguments(arguments, + self.credentials) try: if superuser: return actions.superuser_run(self.command, arguments, **kwargs) diff --git a/plinth/modules/backups/views.py b/plinth/modules/backups/views.py index 6ec226a62..b277923db 100644 --- a/plinth/modules/backups/views.py +++ b/plinth/modules/backups/views.py @@ -44,7 +44,7 @@ from plinth.modules import backups, storage from . import api, forms, SESSION_PATH_VARIABLE, ROOT_REPOSITORY from .repository import BorgRepository, SshBorgRepository, get_ssh_repositories from .decorators import delete_tmp_backup_file -from .errors import BorgRepositoryDoesNotExistError +from .errors import BorgError, BorgRepositoryDoesNotExistError logger = logging.getLogger(__name__) @@ -310,14 +310,13 @@ class AddRepositoryView(SuccessMessageMixin, FormView): """Restore files from the archive on valid form submission.""" path = form.cleaned_data['repository'] credentials = {} - if form.cleaned_data['store_passwords']: - encryption_passphrase = form.cleaned_data['encryption_passphrase'] - if encryption_passphrase: - credentials['encryption_passphrase'] = encryption_passphrase - if form.cleaned_data['ssh_password']: - credentials['ssh_password'] = form.cleaned_data['ssh_password'] - # TODO: add ssh_keyfile - # ssh_keyfile = form.cleaned_data['ssh_keyfile'] + encryption_passphrase = form.cleaned_data['encryption_passphrase'] + if encryption_passphrase: + credentials['encryption_passphrase'] = encryption_passphrase + if form.cleaned_data['ssh_password']: + credentials['ssh_password'] = form.cleaned_data['ssh_password'] + # TODO: add ssh_keyfile + # ssh_keyfile = form.cleaned_data['ssh_keyfile'] repository = SshBorgRepository(path=path, credentials=credentials) @@ -325,7 +324,8 @@ class AddRepositoryView(SuccessMessageMixin, FormView): repository.get_info() except BorgRepositoryDoesNotExistError: repository.create_repository(form.cleaned_data['encryption']) - repository.save() + repository.save(store_credentials= + form.cleaned_data['store_credentials']) return super().form_valid(form) @@ -345,6 +345,8 @@ class TestRepositoryView(TemplateView): try: repo_info = repository.get_info() context["message"] = repo_info + except BorgError as err: + context["error"] = str(err) except ActionError as err: context["error"] = str(err) diff --git a/plinth/tests/config.py b/plinth/tests/config.py index 14c2f04a3..3c5489601 100644 --- a/plinth/tests/config.py +++ b/plinth/tests/config.py @@ -25,10 +25,10 @@ the variables defined here. backups_ssh_path = None backups_ssh_password = None backups_ssh_keyfile = None -backups_ssh_mountpoint = '/mnt/plinth_test_sshfs' +backups_ssh_repo_uuid = 'plinth_test_sshfs' # will be mounted to /media/ # Import config_local to override the default variables try: - from config_local.py import * + from .config_local import * except ImportError: pass diff --git a/plinth/tests/test_backups.py b/plinth/tests/test_backups.py index 833fa9b7e..0868b8df8 100644 --- a/plinth/tests/test_backups.py +++ b/plinth/tests/test_backups.py @@ -25,17 +25,11 @@ import unittest from plinth import cfg from plinth.modules import backups -from plinth.modules.backups import sshfs +from plinth.modules.backups.repository import BorgRepository, SshBorgRepository from plinth import actions from . import config -try: - from . import config_local as config -except ImportError: - from . import config - - euid = os.geteuid() @@ -64,34 +58,26 @@ class TestBackups(unittest.TestCase): def test_nonexisting_repository(self): nonexisting_dir = os.path.join(self.backup_directory.name, 'does_not_exist') + repository = BorgRepository(nonexisting_dir) with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError): - backups.test_connection(nonexisting_dir) + repository.get_info() @unittest.skipUnless(euid == 0, 'Needs to be root') def test_empty_dir(self): empty_dir = os.path.join(self.backup_directory.name, 'empty_dir') os.mkdir(empty_dir) + repository = BorgRepository(empty_dir) with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError): - backups.test_connection(empty_dir) + repository.get_info() @unittest.skipUnless(euid == 0, 'Needs to be root') def test_create_unencrypted_repository(self): repo_path = os.path.join(self.backup_directory.name, 'borgbackup') - backups.create_repository(repo_path, 'none') - info = backups.get_info(repo_path) + repository = BorgRepository(repo_path) + repository.create_repository() + info = repository.get_info() assert 'encryption' in info - @unittest.skipUnless(euid == 0, 'Needs to be root') - def test_create_encrypted_repository(self): - repo_path = os.path.join(self.backup_directory.name, - 'borgbackup_encrypted') - # 'borg init' creates missing folders automatically - access_params = {'encryption_passphrase': '12345'} - backups.create_repository(repo_path, 'repokey', - access_params=access_params) - assert backups.get_info(repo_path, access_params) - assert backups.test_connection(repo_path, access_params) - @unittest.skipUnless(euid == 0, 'Needs to be root') def test_create_and_delete_archive(self): """ @@ -104,48 +90,58 @@ class TestBackups(unittest.TestCase): archive_name = 'first_archive' repo_path = os.path.join(self.backup_directory.name, repo_name) - backups.create_repository(repo_path, 'none') + repository = BorgRepository(repo_path) + repository.create_repository() archive_path = "::".join([repo_path, archive_name]) actions.superuser_run( 'backups', ['create-archive', '--path', archive_path, '--paths', self.data_directory]) - archive = backups.list_archives(repo_path)[0] + archive = repository.list_archives()[0] assert archive['name'] == archive_name - backups.delete_archive(archive_path) - content = backups.list_archives(repo_path) + repository.delete_archive(archive_name) + content = repository.list_archives() assert len(content) == 0 - @unittest.skipUnless(euid == 0, 'Needs to be root') - def test_is_mounted(self): - assert not sshfs.is_mounted(self.action_directory.name) - assert sshfs.is_mounted('/') - - @unittest.skipUnless(euid == 0, 'Needs to be root') - def test_mount(self): + @unittest.skipUnless(euid == 0 and config.backups_ssh_path, + 'Needs to be root and ssh credentials provided') + def test_ssh_mount(self): """Test (un)mounting if credentials for a remote location are given""" - import ipdb; ipdb.set_trace() - if config.backups_ssh_path: - access_params = self.get_remote_access_params() - if not access_params: - return - mountpoint = config.backups_ssh_mountpoint - ssh_path = config.backups_ssh_path + credentials = self.get_credentials() + if not credentials: + return + ssh_path = config.backups_ssh_path - sshfs.mount(ssh_path, mountpoint, access_params) - assert sshfs.is_mounted(mountpoint) - sshfs.umount(mountpoint) - assert not sshfs.is_mounted(mountpoint) + ssh_repo = SshBorgRepository(uuid='plinth_test_sshfs', + path=ssh_path, + credentials=credentials) + ssh_repo.mount() + assert ssh_repo.is_mounted + ssh_repo.umount() + assert not ssh_repo.is_mounted - def get_remote_access_params(self): + @unittest.skipUnless(euid == 0 and config.backups_ssh_path, + 'Needs to be root and ssh credentials provided') + def test_ssh_create_encrypted_repository(self): + credentials = self.get_credentials() + encrypted_repo = os.path.join(self.backup_directory.name, + 'borgbackup_encrypted') + credentials['encryption_passphrase'] = '12345' + repository = SshBorgRepository(path=encrypted_repo, + credentials=credentials) + # 'borg init' creates missing folders automatically + repository.create_repository(encryption='repokey') + assert repository.get_info() + + def get_credentials(self): """ Get access params for a remote location. Return an empty dict if no valid access params are found. """ - access_params = {} + credentials = {} if config.backups_ssh_password: - access_params['ssh_password'] = config.backups_ssh_password + credentials['ssh_password'] = config.backups_ssh_password elif config.backups_ssh_keyfile: - access_params['ssh_keyfile'] = config.backups_ssh_keyfile - return access_params + credentials['ssh_keyfile'] = config.backups_ssh_keyfile + return credentials