backups: Convert tests to pytest style

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: Joseph Nuthalapati <njoseph@thoughtworks.com>
This commit is contained in:
Sunil Mohan Adapa 2019-05-01 16:04:43 -07:00 committed by Joseph Nuthalapati
parent 9040b26b4e
commit 2d832ace36
No known key found for this signature in database
GPG Key ID: 5398F00A2FA43C35
3 changed files with 231 additions and 238 deletions

View File

@ -18,15 +18,14 @@
Tests for backups module API.
"""
import unittest
from unittest.mock import MagicMock, call, patch
import pytest
from django.core.files.uploadedfile import SimpleUploadedFile
from plinth import cfg, module_loader
from .. import api, forms
from .. import ROOT_REPOSITORY
from .. import ROOT_REPOSITORY, api, forms
# pylint: disable=protected-access
@ -58,38 +57,34 @@ def _get_backup_app(name):
return api.BackupApp(name, MagicMock(backup=_get_test_manifest(name)))
class TestBackupApp(unittest.TestCase):
class TestBackupApp:
"""Test the BackupApp class."""
def test_run_hook(self):
@staticmethod
def test_run_hook():
"""Test running a hook on an application."""
packet = api.Packet('backup', 'apps', '/', [])
hook = 'testhook_pre'
app = MagicMock()
backup_app = api.BackupApp('app_name', app)
backup_app.run_hook(hook, packet)
app.testhook_pre.assert_has_calls([call(packet)])
assert not packet.errors
app.testhook_pre.reset_mock()
app.testhook_pre.side_effect = Exception()
backup_app.run_hook(hook, packet)
self.assertEqual(packet.errors,
[api.BackupError('hook', app, hook=hook)])
assert packet.errors == [api.BackupError('hook', app, hook=hook)]
del app.testhook_pre
backup_app.run_hook(hook, packet)
class TestBackupProcesses(unittest.TestCase):
@pytest.mark.usefixtures('load_cfg')
class TestBackupProcesses:
"""Test cases for backup processes"""
@classmethod
def setUpClass(cls):
"""Setup all the test cases."""
super().setUpClass()
cfg.read()
@staticmethod
def test_packet_process_manifests():
"""Test that directories/files are collected from manifests."""
@ -119,8 +114,9 @@ class TestBackupProcesses(unittest.TestCase):
api.restore_apps(restore_handler)
restore_handler.assert_called_once()
@staticmethod
@patch('plinth.module_loader.loaded_modules.items')
def test_get_all_apps_for_backup(self, modules):
def test_get_all_apps_for_backup(modules):
"""Test listing apps supporting backup and needing backup."""
apps = [
('a', MagicMock(backup=_get_test_manifest('a'))),
@ -137,7 +133,7 @@ class TestBackupProcesses(unittest.TestCase):
api.BackupApp('b', apps[1][1]),
api.BackupApp('c', apps[2][1])
]
self.assertEqual(returned_apps, expected_apps)
assert returned_apps == expected_apps
@staticmethod
@patch('plinth.module_loader.loaded_modules.items')
@ -164,11 +160,11 @@ class TestBackupProcesses(unittest.TestCase):
assert app_a.locked is True
assert app_b.locked is True
@staticmethod
@patch('plinth.action_utils.webserver_is_enabled')
@patch('plinth.action_utils.service_is_running')
@patch('plinth.actions.superuser_run')
def test__shutdown_services(self, run, service_is_running,
webserver_is_enabled):
def test__shutdown_services(run, service_is_running, webserver_is_enabled):
"""Test that services are stopped in correct order."""
apps = [_get_backup_app('a'), _get_backup_app('b')]
service_is_running.return_value = True
@ -184,7 +180,7 @@ class TestBackupProcesses(unittest.TestCase):
apps[1].manifest['services'][0]),
api.ServiceHandler.create(apps[1], apps[1].manifest['services'][1])
]
self.assertEqual(state, expected_state)
assert state == expected_state
service_is_running.assert_has_calls([call('b'), call('a')])
webserver_is_enabled.assert_has_calls(
@ -242,19 +238,20 @@ class TestBackupProcesses(unittest.TestCase):
packet.apps[1].run_hook.assert_has_calls(calls)
class TestBackupModule(unittest.TestCase):
class TestBackupModule:
"""Tests of the backups django module, like views or forms."""
def test_file_upload(self):
@staticmethod
def test_file_upload():
# posting a video should fail
video_file = SimpleUploadedFile("video.mp4", b"file_content",
content_type="video/mp4")
form = forms.UploadForm({}, {'file': video_file})
self.assertFalse(form.is_valid())
assert not form.is_valid()
# posting an archive file should work
archive_file = SimpleUploadedFile("backup.tar.gz", b"file_content",
content_type="application/gzip")
form = forms.UploadForm({}, {'file': archive_file})
form.is_valid()
self.assertTrue(form.is_valid())
assert form.is_valid()

View File

@ -20,212 +20,203 @@ Test the backups action script.
import json
import os
import shutil
import pathlib
import subprocess
import tempfile
import unittest
import uuid
from plinth import cfg
import pytest
from plinth.modules import backups
from plinth.modules.backups.repository import BorgRepository, SshBorgRepository
from plinth import actions
from plinth.tests import config as test_config
euid = os.geteuid()
pytestmark = pytest.mark.usefixtures('needs_root', 'needs_borg', 'load_cfg')
# try to access a non-existing url and a URL that exists but does not
# grant access
_dummy_credentials = {'ssh_password': 'invalid_password'}
_repokey_encryption_passphrase = '12345'
def _borg_is_installed():
@pytest.fixture(name='needs_borg')
def fixture_needs_borg():
"""Return whether borg is installed on the system."""
try:
subprocess.run(['borg', '--version'], stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, check=True)
return True
except FileNotFoundError:
return False
pytest.skip('Needs borg installed')
class TestBackups(unittest.TestCase):
"""Test creating, reading and deleting a repository"""
# try to access a non-existing url and a URL that exists but does not
# grant access
nonexisting_repo_url = "user@%s.com.au:~/repo" % str(uuid.uuid1())
inaccessible_repo_url = "user@heise.de:~/repo"
dummy_credentials = {'ssh_password': 'invalid_password'}
repokey_encryption_passphrase = '12345'
@pytest.fixture(name='needs_ssh_config')
def fixture_needs_ssh_config():
"""Skip test if SSH details is not available in test configuration."""
if not test_config.backups_ssh_path:
pytest.skip('Needs SSH password provided')
@classmethod
def setUpClass(cls):
"""Initial setup for all the classes."""
cls.action_directory = tempfile.TemporaryDirectory()
cls.backup_directory = tempfile.TemporaryDirectory()
cls.data_directory = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'backup_data')
cls.actions_dir_factory = cfg.actions_dir
cfg.actions_dir = cls.action_directory.name
actions_dir = os.path.join(
os.path.dirname(__file__), '..', '..', '..', '..', 'actions')
shutil.copy(os.path.join(actions_dir, 'backups'), cfg.actions_dir)
shutil.copy(os.path.join(actions_dir, 'sshfs'), cfg.actions_dir)
@classmethod
def tearDownClass(cls):
"""Cleanup after all the tests are completed."""
cls.action_directory.cleanup()
cls.backup_directory.cleanup()
cfg.actions_dir = cls.actions_dir_factory
@pytest.fixture(name='data_directory')
def fixture_data_directory():
"""Return directory where backup data is stored."""
return pathlib.Path(__file__).parent / 'backup_data'
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_nonexisting_repository(self):
nonexisting_dir = os.path.join(self.backup_directory.name,
'does_not_exist')
repository = BorgRepository(nonexisting_dir)
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_empty_dir(self):
empty_dir = os.path.join(self.backup_directory.name, 'empty_dir')
os.mkdir(empty_dir)
repository = BorgRepository(empty_dir)
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
@pytest.fixture(name='backup_directory')
def fixture_backup_directory(tmp_path):
"""Create and cleanup a backup directory."""
return tmp_path
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_create_unencrypted_repository(self):
repo_path = os.path.join(self.backup_directory.name, 'borgbackup')
repository = BorgRepository(repo_path)
repository.create_repository()
info = repository.get_info()
self.assertTrue('encryption' in info)
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_create_export_delete_archive(self):
"""
- Create a repo
- Create an archive
- Verify archive content
- Delete archive
"""
repo_name = 'test_create_and_delete'
archive_name = 'first_archive'
repo_path = os.path.join(self.backup_directory.name, repo_name)
def test_nonexisting_repository(backup_directory):
"""Test that non-existent directory as borg repository throws error."""
nonexisting_dir = backup_directory / 'does_not_exist'
repository = BorgRepository(str(nonexisting_dir))
with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
repository = BorgRepository(repo_path)
repository.create_repository()
archive_path = "::".join([repo_path, archive_name])
actions.superuser_run('backups', [
'create-archive', '--path', archive_path, '--paths',
self.data_directory
])
archive = repository.list_archives()[0]
self.assertEquals(archive['name'], archive_name)
def test_empty_dir(backup_directory):
"""Test that empty directory as borg repository throws error."""
empty_dir = backup_directory / 'empty_dir'
empty_dir.mkdir()
repository = BorgRepository(str(empty_dir))
with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
repository.delete_archive(archive_name)
content = repository.list_archives()
self.assertEquals(len(content), 0)
@unittest.skipUnless(euid == 0 and _borg_is_installed()
and test_config.backups_ssh_path,
'Needs to be root and ssh password provided')
def test_remote_backup_actions(self):
"""
Test creating an encrypted remote repository using borg directly.
def test_create_unencrypted_repository(backup_directory):
"""Test creating an unencrypted repository."""
repo_path = backup_directory / 'borgbackup'
repository = BorgRepository(str(repo_path))
repository.create_repository()
info = repository.get_info()
assert 'encryption' in info
This relies on borgbackups being installed on the remote machine.
"""
credentials = self.get_credentials(add_encryption_passphrase=True)
repo_path = os.path.join(test_config.backups_ssh_path,
str(uuid.uuid1()))
arguments = ['init', '--path', repo_path, '--encryption', 'repokey']
arguments, kwargs = self.append_borg_arguments(arguments, credentials)
actions.superuser_run('backups', arguments, **kwargs)
arguments = ['info', '--path', repo_path]
arguments, kwargs = self.append_borg_arguments(arguments, credentials)
info = actions.superuser_run('backups', arguments, **kwargs)
info = json.loads(info)
self.assertEquals(info['encryption']['mode'], 'repokey')
def test_create_export_delete_archive(data_directory, backup_directory):
"""
- Create a repo
- Create an archive
- Verify archive content
- Delete archive
"""
repo_name = 'test_create_and_delete'
archive_name = 'first_archive'
repo_path = backup_directory / repo_name
def append_borg_arguments(self, arguments, credentials):
"""Append run arguments for running borg directly"""
kwargs = {}
passphrase = credentials.get('encryption_passphrase', None)
if passphrase:
arguments += ['--encryption-passphrase', passphrase]
if 'ssh_password' in credentials and credentials['ssh_password']:
kwargs['input'] = credentials['ssh_password'].encode()
if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']:
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
return (arguments, kwargs)
repository = BorgRepository(str(repo_path))
repository.create_repository()
archive_path = "::".join([str(repo_path), archive_name])
actions.superuser_run('backups', [
'create-archive', '--path', archive_path, '--paths',
str(data_directory)
])
@unittest.skipUnless(euid == 0 and _borg_is_installed()
and test_config.backups_ssh_path,
'Needs to be root and ssh password provided')
def test_sshfs_mount_password(self):
"""Test (un)mounting if password for a remote location is given"""
credentials = self.get_credentials()
ssh_path = test_config.backups_ssh_path
archive = repository.list_archives()[0]
assert archive['name'] == archive_name
repository = SshBorgRepository(path=ssh_path, credentials=credentials,
automount=False)
repository.mount()
self.assertTrue(repository.is_mounted)
repository.umount()
self.assertFalse(repository.is_mounted)
repository.delete_archive(archive_name)
content = repository.list_archives()
assert not content
@unittest.skipUnless(euid == 0 and _borg_is_installed()
and test_config.backups_ssh_keyfile,
'Needs to be root and ssh keyfile provided')
def test_sshfs_mount_keyfile(self):
"""Test (un)mounting if keyfile for a remote location is given"""
credentials = self.get_credentials()
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(path=ssh_path, credentials=credentials,
automount=False)
repository.mount()
self.assertTrue(repository.is_mounted)
repository.umount()
self.assertFalse(repository.is_mounted)
@pytest.mark.usefixtures('needs_ssh_config')
def test_remote_backup_actions():
"""
Test creating an encrypted remote repository using borg directly.
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_access_nonexisting_url(self):
repository = SshBorgRepository(path=self.nonexisting_repo_url,
credentials=self.dummy_credentials,
automount=False)
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
This relies on borgbackups being installed on the remote machine.
"""
credentials = _get_credentials(add_encryption_passphrase=True)
repo_path = os.path.join(test_config.backups_ssh_path, str(uuid.uuid1()))
arguments = ['init', '--path', repo_path, '--encryption', 'repokey']
arguments, kwargs = _append_borg_arguments(arguments, credentials)
actions.superuser_run('backups', arguments, **kwargs)
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_inaccessible_repo_url(self):
"""Test accessing an existing URL with wrong credentials"""
repository = SshBorgRepository(path=self.inaccessible_repo_url,
credentials=self.dummy_credentials,
automount=False)
with self.assertRaises(backups.errors.BorgError):
repository.get_info()
arguments = ['info', '--path', repo_path]
arguments, kwargs = _append_borg_arguments(arguments, credentials)
info = actions.superuser_run('backups', arguments, **kwargs)
info = json.loads(info)
assert info['encryption']['mode'] == 'repokey'
def get_credentials(self, add_encryption_passphrase=False):
"""
Get access params for a remote location.
Return an empty dict if no valid access params are found.
"""
credentials = {}
if test_config.backups_ssh_password:
credentials['ssh_password'] = test_config.backups_ssh_password
elif test_config.backups_ssh_keyfile:
credentials['ssh_keyfile'] = test_config.backups_ssh_keyfile
if add_encryption_passphrase:
credentials['encryption_passphrase'] = \
self.repokey_encryption_passphrase
return credentials
def _append_borg_arguments(arguments, credentials):
"""Append run arguments for running borg directly"""
kwargs = {}
passphrase = credentials.get('encryption_passphrase', None)
if passphrase:
arguments += ['--encryption-passphrase', passphrase]
if 'ssh_password' in credentials and credentials['ssh_password']:
kwargs['input'] = credentials['ssh_password'].encode()
if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']:
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
return (arguments, kwargs)
@pytest.mark.usefixtures('needs_ssh_config')
def test_sshfs_mount_password():
"""Test (un)mounting if password for a remote location is given"""
credentials = _get_credentials()
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(path=ssh_path, credentials=credentials,
automount=False)
repository.mount()
assert repository.is_mounted
repository.umount()
assert not repository.is_mounted
@pytest.mark.usefixtures('needs_ssh_config')
def test_sshfs_mount_keyfile():
"""Test (un)mounting if keyfile for a remote location is given"""
credentials = _get_credentials()
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(path=ssh_path, credentials=credentials,
automount=False)
repository.mount()
assert repository.is_mounted
repository.umount()
assert not repository.is_mounted
def test_access_nonexisting_url():
"""Test accessing a non-existent URL."""
repo_url = "user@%s.com.au:~/repo" % str(uuid.uuid1())
repository = SshBorgRepository(
path=repo_url, credentials=_dummy_credentials, automount=False)
with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
def test_inaccessible_repo_url():
"""Test accessing an existing URL with wrong credentials."""
repo_url = 'user@heise.de:~/repo'
repository = SshBorgRepository(
path=repo_url, credentials=_dummy_credentials, automount=False)
with pytest.raises(backups.errors.BorgError):
repository.get_info()
def _get_credentials(add_encryption_passphrase=False):
"""
Get access params for a remote location.
Return an empty dict if no valid access params are found.
"""
credentials = {}
if test_config.backups_ssh_password:
credentials['ssh_password'] = test_config.backups_ssh_password
elif test_config.backups_ssh_keyfile:
credentials['ssh_keyfile'] = test_config.backups_ssh_keyfile
if add_encryption_passphrase:
credentials['encryption_passphrase'] = \
_repokey_encryption_passphrase
return credentials

View File

@ -18,59 +18,64 @@
Test network storage.
"""
from django.test import TestCase
import pytest
from plinth.modules.backups import network_storage
pytestmark = pytest.mark.django_db
class TestNetworkStorage(TestCase):
"""Test handling network storage in kvstore"""
storages = [{
'path': 'test@nonexistent.org:~/',
'storage_type': 'ssh',
'added_by_module': 'test'
}, {
_storages = [{
'path': 'test@nonexistent.org:~/',
'storage_type': 'ssh',
'added_by_module': 'test'
}, {
'path': 'test@nonexistent.org:~/tmp/repo/',
'storage_type': 'ssh',
'added_by_module': 'test'
}]
def test_add():
"""Add a storage item"""
storage = _storages[0]
uuid = network_storage.update_or_add(storage)
_storage = network_storage.get(uuid)
assert _storage['path'] == storage['path']
def test_add_invalid():
"""Add a storage item"""
storage_with_missing_type = {
'path': 'test@nonexistent.org:~/tmp/repo/',
'storage_type': 'ssh',
'added_by_module': 'test'
}]
}
with pytest.raises(ValueError):
network_storage.update_or_add(storage_with_missing_type)
def test_add(self):
"""Add a storage item"""
storage = self.storages[0]
def test_remove():
"""Add and remove storage items"""
storage = _storages[0]
uuid = None
for storage in _storages:
uuid = network_storage.update_or_add(storage)
_storage = network_storage.get(uuid)
self.assertEqual(_storage['path'], storage['path'])
def test_add_invalid(self):
"""Add a storage item"""
storage_with_missing_type = {
'path': 'test@nonexistent.org:~/tmp/repo/',
'added_by_module': 'test'
}
with self.assertRaises(ValueError):
network_storage.update_or_add(storage_with_missing_type)
storages = network_storage.get_storages()
assert len(storages) == 2
network_storage.delete(uuid)
storages = network_storage.get_storages()
assert len(storages) == 1
def test_remove(self):
"""Add and remove storage items"""
storage = self.storages[0]
uuid = None
for storage in self.storages:
uuid = network_storage.update_or_add(storage)
storages = network_storage.get_storages()
self.assertEqual(len(storages), 2)
network_storage.delete(uuid)
storages = network_storage.get_storages()
self.assertEqual(len(storages), 1)
def test_update(self):
"""Update existing storage items"""
uuid = None
for storage in self.storages:
uuid = network_storage.update_or_add(storage)
storage = network_storage.get(uuid)
new_path = 'test@nonexistent.org:~/tmp/repo_new/'
storage['path'] = new_path
network_storage.update_or_add(storage)
_storage = network_storage.get(uuid)
self.assertEquals(_storage['path'], new_path)
def test_update():
"""Update existing storage items"""
uuid = None
for storage in _storages:
uuid = network_storage.update_or_add(storage)
storage = network_storage.get(uuid)
new_path = 'test@nonexistent.org:~/tmp/repo_new/'
storage['path'] = new_path
network_storage.update_or_add(storage)
_storage = network_storage.get(uuid)
assert _storage['path'] == new_path