From 2d832ace3669b0275ff8fe9ba52eb65a03e3f839 Mon Sep 17 00:00:00 2001 From: Sunil Mohan Adapa Date: Wed, 1 May 2019 16:04:43 -0700 Subject: [PATCH] backups: Convert tests to pytest style Signed-off-by: Sunil Mohan Adapa Reviewed-by: Joseph Nuthalapati --- plinth/modules/backups/tests/test_api.py | 43 ++- plinth/modules/backups/tests/test_backups.py | 331 +++++++++--------- .../backups/tests/test_network_storage.py | 95 ++--- 3 files changed, 231 insertions(+), 238 deletions(-) diff --git a/plinth/modules/backups/tests/test_api.py b/plinth/modules/backups/tests/test_api.py index f19205a47..222d1c714 100644 --- a/plinth/modules/backups/tests/test_api.py +++ b/plinth/modules/backups/tests/test_api.py @@ -18,15 +18,14 @@ Tests for backups module API. """ -import unittest from unittest.mock import MagicMock, call, patch +import pytest from django.core.files.uploadedfile import SimpleUploadedFile from plinth import cfg, module_loader -from .. import api, forms -from .. import ROOT_REPOSITORY +from .. import ROOT_REPOSITORY, api, forms # pylint: disable=protected-access @@ -58,38 +57,34 @@ def _get_backup_app(name): return api.BackupApp(name, MagicMock(backup=_get_test_manifest(name))) -class TestBackupApp(unittest.TestCase): +class TestBackupApp: """Test the BackupApp class.""" - def test_run_hook(self): + @staticmethod + def test_run_hook(): """Test running a hook on an application.""" packet = api.Packet('backup', 'apps', '/', []) hook = 'testhook_pre' app = MagicMock() backup_app = api.BackupApp('app_name', app) backup_app.run_hook(hook, packet) + app.testhook_pre.assert_has_calls([call(packet)]) assert not packet.errors app.testhook_pre.reset_mock() app.testhook_pre.side_effect = Exception() backup_app.run_hook(hook, packet) - self.assertEqual(packet.errors, - [api.BackupError('hook', app, hook=hook)]) + assert packet.errors == [api.BackupError('hook', app, hook=hook)] del app.testhook_pre backup_app.run_hook(hook, packet) -class TestBackupProcesses(unittest.TestCase): +@pytest.mark.usefixtures('load_cfg') +class TestBackupProcesses: """Test cases for backup processes""" - @classmethod - def setUpClass(cls): - """Setup all the test cases.""" - super().setUpClass() - cfg.read() - @staticmethod def test_packet_process_manifests(): """Test that directories/files are collected from manifests.""" @@ -119,8 +114,9 @@ class TestBackupProcesses(unittest.TestCase): api.restore_apps(restore_handler) restore_handler.assert_called_once() + @staticmethod @patch('plinth.module_loader.loaded_modules.items') - def test_get_all_apps_for_backup(self, modules): + def test_get_all_apps_for_backup(modules): """Test listing apps supporting backup and needing backup.""" apps = [ ('a', MagicMock(backup=_get_test_manifest('a'))), @@ -137,7 +133,7 @@ class TestBackupProcesses(unittest.TestCase): api.BackupApp('b', apps[1][1]), api.BackupApp('c', apps[2][1]) ] - self.assertEqual(returned_apps, expected_apps) + assert returned_apps == expected_apps @staticmethod @patch('plinth.module_loader.loaded_modules.items') @@ -164,11 +160,11 @@ class TestBackupProcesses(unittest.TestCase): assert app_a.locked is True assert app_b.locked is True + @staticmethod @patch('plinth.action_utils.webserver_is_enabled') @patch('plinth.action_utils.service_is_running') @patch('plinth.actions.superuser_run') - def test__shutdown_services(self, run, service_is_running, - webserver_is_enabled): + def test__shutdown_services(run, service_is_running, webserver_is_enabled): """Test that services are stopped in correct order.""" apps = [_get_backup_app('a'), _get_backup_app('b')] service_is_running.return_value = True @@ -184,7 +180,7 @@ class TestBackupProcesses(unittest.TestCase): apps[1].manifest['services'][0]), api.ServiceHandler.create(apps[1], apps[1].manifest['services'][1]) ] - self.assertEqual(state, expected_state) + assert state == expected_state service_is_running.assert_has_calls([call('b'), call('a')]) webserver_is_enabled.assert_has_calls( @@ -242,19 +238,20 @@ class TestBackupProcesses(unittest.TestCase): packet.apps[1].run_hook.assert_has_calls(calls) -class TestBackupModule(unittest.TestCase): +class TestBackupModule: """Tests of the backups django module, like views or forms.""" - def test_file_upload(self): + @staticmethod + def test_file_upload(): # posting a video should fail video_file = SimpleUploadedFile("video.mp4", b"file_content", content_type="video/mp4") form = forms.UploadForm({}, {'file': video_file}) - self.assertFalse(form.is_valid()) + assert not form.is_valid() # posting an archive file should work archive_file = SimpleUploadedFile("backup.tar.gz", b"file_content", content_type="application/gzip") form = forms.UploadForm({}, {'file': archive_file}) form.is_valid() - self.assertTrue(form.is_valid()) + assert form.is_valid() diff --git a/plinth/modules/backups/tests/test_backups.py b/plinth/modules/backups/tests/test_backups.py index e4b875842..0e6c5048f 100644 --- a/plinth/modules/backups/tests/test_backups.py +++ b/plinth/modules/backups/tests/test_backups.py @@ -20,212 +20,203 @@ Test the backups action script. import json import os -import shutil +import pathlib import subprocess -import tempfile -import unittest import uuid -from plinth import cfg +import pytest + from plinth.modules import backups from plinth.modules.backups.repository import BorgRepository, SshBorgRepository from plinth import actions from plinth.tests import config as test_config -euid = os.geteuid() +pytestmark = pytest.mark.usefixtures('needs_root', 'needs_borg', 'load_cfg') + +# try to access a non-existing url and a URL that exists but does not +# grant access +_dummy_credentials = {'ssh_password': 'invalid_password'} +_repokey_encryption_passphrase = '12345' -def _borg_is_installed(): +@pytest.fixture(name='needs_borg') +def fixture_needs_borg(): """Return whether borg is installed on the system.""" try: subprocess.run(['borg', '--version'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) - return True except FileNotFoundError: - return False + pytest.skip('Needs borg installed') -class TestBackups(unittest.TestCase): - """Test creating, reading and deleting a repository""" - # try to access a non-existing url and a URL that exists but does not - # grant access - nonexisting_repo_url = "user@%s.com.au:~/repo" % str(uuid.uuid1()) - inaccessible_repo_url = "user@heise.de:~/repo" - dummy_credentials = {'ssh_password': 'invalid_password'} - repokey_encryption_passphrase = '12345' +@pytest.fixture(name='needs_ssh_config') +def fixture_needs_ssh_config(): + """Skip test if SSH details is not available in test configuration.""" + if not test_config.backups_ssh_path: + pytest.skip('Needs SSH password provided') - @classmethod - def setUpClass(cls): - """Initial setup for all the classes.""" - cls.action_directory = tempfile.TemporaryDirectory() - cls.backup_directory = tempfile.TemporaryDirectory() - cls.data_directory = os.path.join( - os.path.dirname(os.path.realpath(__file__)), 'backup_data') - cls.actions_dir_factory = cfg.actions_dir - cfg.actions_dir = cls.action_directory.name - actions_dir = os.path.join( - os.path.dirname(__file__), '..', '..', '..', '..', 'actions') - shutil.copy(os.path.join(actions_dir, 'backups'), cfg.actions_dir) - shutil.copy(os.path.join(actions_dir, 'sshfs'), cfg.actions_dir) - @classmethod - def tearDownClass(cls): - """Cleanup after all the tests are completed.""" - cls.action_directory.cleanup() - cls.backup_directory.cleanup() - cfg.actions_dir = cls.actions_dir_factory +@pytest.fixture(name='data_directory') +def fixture_data_directory(): + """Return directory where backup data is stored.""" + return pathlib.Path(__file__).parent / 'backup_data' - @unittest.skipUnless(euid == 0 and _borg_is_installed(), - 'Needs to be root') - def test_nonexisting_repository(self): - nonexisting_dir = os.path.join(self.backup_directory.name, - 'does_not_exist') - repository = BorgRepository(nonexisting_dir) - with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError): - repository.get_info() - @unittest.skipUnless(euid == 0 and _borg_is_installed(), - 'Needs to be root') - def test_empty_dir(self): - empty_dir = os.path.join(self.backup_directory.name, 'empty_dir') - os.mkdir(empty_dir) - repository = BorgRepository(empty_dir) - with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError): - repository.get_info() +@pytest.fixture(name='backup_directory') +def fixture_backup_directory(tmp_path): + """Create and cleanup a backup directory.""" + return tmp_path - @unittest.skipUnless(euid == 0 and _borg_is_installed(), - 'Needs to be root') - def test_create_unencrypted_repository(self): - repo_path = os.path.join(self.backup_directory.name, 'borgbackup') - repository = BorgRepository(repo_path) - repository.create_repository() - info = repository.get_info() - self.assertTrue('encryption' in info) - @unittest.skipUnless(euid == 0 and _borg_is_installed(), - 'Needs to be root') - def test_create_export_delete_archive(self): - """ - - Create a repo - - Create an archive - - Verify archive content - - Delete archive - """ - repo_name = 'test_create_and_delete' - archive_name = 'first_archive' - repo_path = os.path.join(self.backup_directory.name, repo_name) +def test_nonexisting_repository(backup_directory): + """Test that non-existent directory as borg repository throws error.""" + nonexisting_dir = backup_directory / 'does_not_exist' + repository = BorgRepository(str(nonexisting_dir)) + with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError): + repository.get_info() - repository = BorgRepository(repo_path) - repository.create_repository() - archive_path = "::".join([repo_path, archive_name]) - actions.superuser_run('backups', [ - 'create-archive', '--path', archive_path, '--paths', - self.data_directory - ]) - archive = repository.list_archives()[0] - self.assertEquals(archive['name'], archive_name) +def test_empty_dir(backup_directory): + """Test that empty directory as borg repository throws error.""" + empty_dir = backup_directory / 'empty_dir' + empty_dir.mkdir() + repository = BorgRepository(str(empty_dir)) + with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError): + repository.get_info() - repository.delete_archive(archive_name) - content = repository.list_archives() - self.assertEquals(len(content), 0) - @unittest.skipUnless(euid == 0 and _borg_is_installed() - and test_config.backups_ssh_path, - 'Needs to be root and ssh password provided') - def test_remote_backup_actions(self): - """ - Test creating an encrypted remote repository using borg directly. +def test_create_unencrypted_repository(backup_directory): + """Test creating an unencrypted repository.""" + repo_path = backup_directory / 'borgbackup' + repository = BorgRepository(str(repo_path)) + repository.create_repository() + info = repository.get_info() + assert 'encryption' in info - This relies on borgbackups being installed on the remote machine. - """ - credentials = self.get_credentials(add_encryption_passphrase=True) - repo_path = os.path.join(test_config.backups_ssh_path, - str(uuid.uuid1())) - arguments = ['init', '--path', repo_path, '--encryption', 'repokey'] - arguments, kwargs = self.append_borg_arguments(arguments, credentials) - actions.superuser_run('backups', arguments, **kwargs) - arguments = ['info', '--path', repo_path] - arguments, kwargs = self.append_borg_arguments(arguments, credentials) - info = actions.superuser_run('backups', arguments, **kwargs) - info = json.loads(info) - self.assertEquals(info['encryption']['mode'], 'repokey') +def test_create_export_delete_archive(data_directory, backup_directory): + """ + - Create a repo + - Create an archive + - Verify archive content + - Delete archive + """ + repo_name = 'test_create_and_delete' + archive_name = 'first_archive' + repo_path = backup_directory / repo_name - def append_borg_arguments(self, arguments, credentials): - """Append run arguments for running borg directly""" - kwargs = {} - passphrase = credentials.get('encryption_passphrase', None) - if passphrase: - arguments += ['--encryption-passphrase', passphrase] - if 'ssh_password' in credentials and credentials['ssh_password']: - kwargs['input'] = credentials['ssh_password'].encode() - if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']: - arguments += ['--ssh-keyfile', credentials['ssh_keyfile']] - return (arguments, kwargs) + repository = BorgRepository(str(repo_path)) + repository.create_repository() + archive_path = "::".join([str(repo_path), archive_name]) + actions.superuser_run('backups', [ + 'create-archive', '--path', archive_path, '--paths', + str(data_directory) + ]) - @unittest.skipUnless(euid == 0 and _borg_is_installed() - and test_config.backups_ssh_path, - 'Needs to be root and ssh password provided') - def test_sshfs_mount_password(self): - """Test (un)mounting if password for a remote location is given""" - credentials = self.get_credentials() - ssh_path = test_config.backups_ssh_path + archive = repository.list_archives()[0] + assert archive['name'] == archive_name - repository = SshBorgRepository(path=ssh_path, credentials=credentials, - automount=False) - repository.mount() - self.assertTrue(repository.is_mounted) - repository.umount() - self.assertFalse(repository.is_mounted) + repository.delete_archive(archive_name) + content = repository.list_archives() + assert not content - @unittest.skipUnless(euid == 0 and _borg_is_installed() - and test_config.backups_ssh_keyfile, - 'Needs to be root and ssh keyfile provided') - def test_sshfs_mount_keyfile(self): - """Test (un)mounting if keyfile for a remote location is given""" - credentials = self.get_credentials() - ssh_path = test_config.backups_ssh_path - repository = SshBorgRepository(path=ssh_path, credentials=credentials, - automount=False) - repository.mount() - self.assertTrue(repository.is_mounted) - repository.umount() - self.assertFalse(repository.is_mounted) +@pytest.mark.usefixtures('needs_ssh_config') +def test_remote_backup_actions(): + """ + Test creating an encrypted remote repository using borg directly. - @unittest.skipUnless(euid == 0 and _borg_is_installed(), - 'Needs to be root') - def test_access_nonexisting_url(self): - repository = SshBorgRepository(path=self.nonexisting_repo_url, - credentials=self.dummy_credentials, - automount=False) - with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError): - repository.get_info() + This relies on borgbackups being installed on the remote machine. + """ + credentials = _get_credentials(add_encryption_passphrase=True) + repo_path = os.path.join(test_config.backups_ssh_path, str(uuid.uuid1())) + arguments = ['init', '--path', repo_path, '--encryption', 'repokey'] + arguments, kwargs = _append_borg_arguments(arguments, credentials) + actions.superuser_run('backups', arguments, **kwargs) - @unittest.skipUnless(euid == 0 and _borg_is_installed(), - 'Needs to be root') - def test_inaccessible_repo_url(self): - """Test accessing an existing URL with wrong credentials""" - repository = SshBorgRepository(path=self.inaccessible_repo_url, - credentials=self.dummy_credentials, - automount=False) - with self.assertRaises(backups.errors.BorgError): - repository.get_info() + arguments = ['info', '--path', repo_path] + arguments, kwargs = _append_borg_arguments(arguments, credentials) + info = actions.superuser_run('backups', arguments, **kwargs) + info = json.loads(info) + assert info['encryption']['mode'] == 'repokey' - def get_credentials(self, add_encryption_passphrase=False): - """ - Get access params for a remote location. - Return an empty dict if no valid access params are found. - """ - credentials = {} - if test_config.backups_ssh_password: - credentials['ssh_password'] = test_config.backups_ssh_password - elif test_config.backups_ssh_keyfile: - credentials['ssh_keyfile'] = test_config.backups_ssh_keyfile - if add_encryption_passphrase: - credentials['encryption_passphrase'] = \ - self.repokey_encryption_passphrase - return credentials +def _append_borg_arguments(arguments, credentials): + """Append run arguments for running borg directly""" + kwargs = {} + passphrase = credentials.get('encryption_passphrase', None) + if passphrase: + arguments += ['--encryption-passphrase', passphrase] + + if 'ssh_password' in credentials and credentials['ssh_password']: + kwargs['input'] = credentials['ssh_password'].encode() + + if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']: + arguments += ['--ssh-keyfile', credentials['ssh_keyfile']] + + return (arguments, kwargs) + + +@pytest.mark.usefixtures('needs_ssh_config') +def test_sshfs_mount_password(): + """Test (un)mounting if password for a remote location is given""" + credentials = _get_credentials() + ssh_path = test_config.backups_ssh_path + + repository = SshBorgRepository(path=ssh_path, credentials=credentials, + automount=False) + repository.mount() + assert repository.is_mounted + repository.umount() + assert not repository.is_mounted + + +@pytest.mark.usefixtures('needs_ssh_config') +def test_sshfs_mount_keyfile(): + """Test (un)mounting if keyfile for a remote location is given""" + credentials = _get_credentials() + ssh_path = test_config.backups_ssh_path + + repository = SshBorgRepository(path=ssh_path, credentials=credentials, + automount=False) + repository.mount() + assert repository.is_mounted + repository.umount() + assert not repository.is_mounted + + +def test_access_nonexisting_url(): + """Test accessing a non-existent URL.""" + repo_url = "user@%s.com.au:~/repo" % str(uuid.uuid1()) + repository = SshBorgRepository( + path=repo_url, credentials=_dummy_credentials, automount=False) + with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError): + repository.get_info() + + +def test_inaccessible_repo_url(): + """Test accessing an existing URL with wrong credentials.""" + repo_url = 'user@heise.de:~/repo' + repository = SshBorgRepository( + path=repo_url, credentials=_dummy_credentials, automount=False) + with pytest.raises(backups.errors.BorgError): + repository.get_info() + + +def _get_credentials(add_encryption_passphrase=False): + """ + Get access params for a remote location. + Return an empty dict if no valid access params are found. + """ + credentials = {} + if test_config.backups_ssh_password: + credentials['ssh_password'] = test_config.backups_ssh_password + elif test_config.backups_ssh_keyfile: + credentials['ssh_keyfile'] = test_config.backups_ssh_keyfile + + if add_encryption_passphrase: + credentials['encryption_passphrase'] = \ + _repokey_encryption_passphrase + + return credentials diff --git a/plinth/modules/backups/tests/test_network_storage.py b/plinth/modules/backups/tests/test_network_storage.py index be604414b..1e6e779f6 100644 --- a/plinth/modules/backups/tests/test_network_storage.py +++ b/plinth/modules/backups/tests/test_network_storage.py @@ -18,59 +18,64 @@ Test network storage. """ -from django.test import TestCase +import pytest from plinth.modules.backups import network_storage +pytestmark = pytest.mark.django_db -class TestNetworkStorage(TestCase): - """Test handling network storage in kvstore""" - storages = [{ - 'path': 'test@nonexistent.org:~/', - 'storage_type': 'ssh', - 'added_by_module': 'test' - }, { +_storages = [{ + 'path': 'test@nonexistent.org:~/', + 'storage_type': 'ssh', + 'added_by_module': 'test' +}, { + 'path': 'test@nonexistent.org:~/tmp/repo/', + 'storage_type': 'ssh', + 'added_by_module': 'test' +}] + + +def test_add(): + """Add a storage item""" + storage = _storages[0] + uuid = network_storage.update_or_add(storage) + _storage = network_storage.get(uuid) + assert _storage['path'] == storage['path'] + + +def test_add_invalid(): + """Add a storage item""" + storage_with_missing_type = { 'path': 'test@nonexistent.org:~/tmp/repo/', - 'storage_type': 'ssh', 'added_by_module': 'test' - }] + } + with pytest.raises(ValueError): + network_storage.update_or_add(storage_with_missing_type) - def test_add(self): - """Add a storage item""" - storage = self.storages[0] + +def test_remove(): + """Add and remove storage items""" + storage = _storages[0] + uuid = None + for storage in _storages: uuid = network_storage.update_or_add(storage) - _storage = network_storage.get(uuid) - self.assertEqual(_storage['path'], storage['path']) - def test_add_invalid(self): - """Add a storage item""" - storage_with_missing_type = { - 'path': 'test@nonexistent.org:~/tmp/repo/', - 'added_by_module': 'test' - } - with self.assertRaises(ValueError): - network_storage.update_or_add(storage_with_missing_type) + storages = network_storage.get_storages() + assert len(storages) == 2 + network_storage.delete(uuid) + storages = network_storage.get_storages() + assert len(storages) == 1 - def test_remove(self): - """Add and remove storage items""" - storage = self.storages[0] - uuid = None - for storage in self.storages: - uuid = network_storage.update_or_add(storage) - storages = network_storage.get_storages() - self.assertEqual(len(storages), 2) - network_storage.delete(uuid) - storages = network_storage.get_storages() - self.assertEqual(len(storages), 1) - def test_update(self): - """Update existing storage items""" - uuid = None - for storage in self.storages: - uuid = network_storage.update_or_add(storage) - storage = network_storage.get(uuid) - new_path = 'test@nonexistent.org:~/tmp/repo_new/' - storage['path'] = new_path - network_storage.update_or_add(storage) - _storage = network_storage.get(uuid) - self.assertEquals(_storage['path'], new_path) +def test_update(): + """Update existing storage items""" + uuid = None + for storage in _storages: + uuid = network_storage.update_or_add(storage) + + storage = network_storage.get(uuid) + new_path = 'test@nonexistent.org:~/tmp/repo_new/' + storage['path'] = new_path + network_storage.update_or_add(storage) + _storage = network_storage.get(uuid) + assert _storage['path'] == new_path