mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-05-27 10:44:33 +00:00
Backups, remote backups: fix unittests
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
cf6bbd6bba
commit
37aa2f9992
@ -26,7 +26,7 @@ from django.utils.translation import ugettext, ugettext_lazy as _
|
|||||||
from plinth.utils import format_lazy
|
from plinth.utils import format_lazy
|
||||||
from plinth import cfg
|
from plinth import cfg
|
||||||
|
|
||||||
from . import api
|
from . import api, network_storage, ROOT_REPOSITORY_NAME
|
||||||
|
|
||||||
|
|
||||||
def _get_app_choices(apps):
|
def _get_app_choices(apps):
|
||||||
@ -43,7 +43,17 @@ def _get_app_choices(apps):
|
|||||||
return choices
|
return choices
|
||||||
|
|
||||||
|
|
||||||
|
def _get_repository_choices():
|
||||||
|
"""Return the list of available repositories."""
|
||||||
|
choices = [('root', ROOT_REPOSITORY_NAME)]
|
||||||
|
storages = network_storage.get_storages()
|
||||||
|
for storage in storages:
|
||||||
|
choices += [(storage['uuid'], storage['path'])]
|
||||||
|
return choices
|
||||||
|
|
||||||
|
|
||||||
class CreateArchiveForm(forms.Form):
|
class CreateArchiveForm(forms.Form):
|
||||||
|
repository = forms.ChoiceField()
|
||||||
name = forms.CharField(
|
name = forms.CharField(
|
||||||
label=_('Archive name'), strip=True,
|
label=_('Archive name'), strip=True,
|
||||||
help_text=_('Name for new backup archive.'), validators=[
|
help_text=_('Name for new backup archive.'), validators=[
|
||||||
@ -60,6 +70,7 @@ class CreateArchiveForm(forms.Form):
|
|||||||
apps = api.get_all_apps_for_backup()
|
apps = api.get_all_apps_for_backup()
|
||||||
self.fields['selected_apps'].choices = _get_app_choices(apps)
|
self.fields['selected_apps'].choices = _get_app_choices(apps)
|
||||||
self.fields['selected_apps'].initial = [app.name for app in apps]
|
self.fields['selected_apps'].initial = [app.name for app in apps]
|
||||||
|
self.fields['repository'].choices = _get_repository_choices()
|
||||||
|
|
||||||
|
|
||||||
class RestoreForm(forms.Form):
|
class RestoreForm(forms.Form):
|
||||||
@ -115,7 +126,7 @@ class AddRepositoryForm(forms.Form):
|
|||||||
widget=forms.PasswordInput(),
|
widget=forms.PasswordInput(),
|
||||||
required=False
|
required=False
|
||||||
)
|
)
|
||||||
store_passwords = forms.BooleanField(
|
store_credentials = forms.BooleanField(
|
||||||
label=_('Store passwords on FreedomBox'),
|
label=_('Store passwords on FreedomBox'),
|
||||||
help_text=format_lazy(
|
help_text=format_lazy(
|
||||||
_('Store the passwords on your {box_name}.'
|
_('Store the passwords on your {box_name}.'
|
||||||
|
|||||||
@ -33,6 +33,7 @@ from .errors import BorgError, BorgRepositoryDoesNotExistError
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
SSHFS_MOUNTPOINT = '/media/'
|
SSHFS_MOUNTPOINT = '/media/'
|
||||||
|
SUPPORTED_BORG_ENCRYPTION = ['none', 'repokey']
|
||||||
# known errors that come up when remotely accessing a borg repository
|
# known errors that come up when remotely accessing a borg repository
|
||||||
# 'errors' are error strings to look for in the stacktrace.
|
# 'errors' are error strings to look for in the stacktrace.
|
||||||
KNOWN_ERRORS = [{
|
KNOWN_ERRORS = [{
|
||||||
@ -60,8 +61,9 @@ class BorgRepository(object):
|
|||||||
name = ROOT_REPOSITORY_NAME
|
name = ROOT_REPOSITORY_NAME
|
||||||
is_mounted = True
|
is_mounted = True
|
||||||
|
|
||||||
def __init__(self, path):
|
def __init__(self, path, credentials={}):
|
||||||
self.path = path
|
self.path = path
|
||||||
|
self.credentials = credentials
|
||||||
|
|
||||||
def get_info(self):
|
def get_info(self):
|
||||||
output = self._run(['info', '--path', self.path])
|
output = self._run(['info', '--path', self.path])
|
||||||
@ -73,12 +75,19 @@ class BorgRepository(object):
|
|||||||
|
|
||||||
def get_view_content(self):
|
def get_view_content(self):
|
||||||
"""Get archives with additional information as needed by the view"""
|
"""Get archives with additional information as needed by the view"""
|
||||||
return {
|
repository = {
|
||||||
'name': self.name,
|
'name': self.name,
|
||||||
'mounted': self.is_mounted,
|
|
||||||
'archives': self.list_archives(),
|
|
||||||
'type': self.storage_type,
|
'type': self.storage_type,
|
||||||
|
'error': ''
|
||||||
}
|
}
|
||||||
|
try:
|
||||||
|
repository['archives'] = self.list_archives()
|
||||||
|
repository['mounted'] = self.is_mounted
|
||||||
|
error = ''
|
||||||
|
except (BorgError, ActionError) as err:
|
||||||
|
error = str(err)
|
||||||
|
repository['error'] = error
|
||||||
|
return repository
|
||||||
|
|
||||||
def delete_archive(self, archive_name):
|
def delete_archive(self, archive_name):
|
||||||
archive_path = self.get_archive_path(archive_name)
|
archive_path = self.get_archive_path(archive_name)
|
||||||
@ -92,7 +101,11 @@ class BorgRepository(object):
|
|||||||
api.backup_apps(_backup_handler, app_names=app_names,
|
api.backup_apps(_backup_handler, app_names=app_names,
|
||||||
label=archive_name),
|
label=archive_name),
|
||||||
|
|
||||||
|
def create_repository(self):
|
||||||
|
self._run(['init', '--path', self.path, '--encryption', 'none'])
|
||||||
|
|
||||||
def download_archive(self, name):
|
def download_archive(self, name):
|
||||||
|
# TODO
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_archive(self, name):
|
def get_archive(self, name):
|
||||||
@ -110,13 +123,27 @@ class BorgRepository(object):
|
|||||||
return output.splitlines()
|
return output.splitlines()
|
||||||
|
|
||||||
def restore_archive(self):
|
def restore_archive(self):
|
||||||
|
# TODO
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_archive_path(self, archive_name):
|
def get_archive_path(self, archive_name):
|
||||||
return "::".join(self.path, archive_name)
|
return "::".join([self.path, archive_name])
|
||||||
|
|
||||||
|
def _get_arguments(self, arguments, credentials):
|
||||||
|
kwargs = {}
|
||||||
|
if 'encryption_passphrase' in credentials and \
|
||||||
|
credentials['encryption_passphrase']:
|
||||||
|
arguments += ['--encryption-passphrase',
|
||||||
|
credentials['encryption_passphrase']]
|
||||||
|
return (arguments, kwargs)
|
||||||
|
|
||||||
def _run(self, arguments, superuser=True):
|
def _run(self, arguments, superuser=True):
|
||||||
"""Run a backups action script command."""
|
"""Run a backups action script command.
|
||||||
|
|
||||||
|
Automatically passes on credentials via self._get_arguments to the
|
||||||
|
backup script via environment variables or input, except if you
|
||||||
|
set use_credentials to False.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
if superuser:
|
if superuser:
|
||||||
return actions.superuser_run(self.command, arguments)
|
return actions.superuser_run(self.command, arguments)
|
||||||
@ -137,6 +164,7 @@ class BorgRepository(object):
|
|||||||
|
|
||||||
|
|
||||||
class SshBorgRepository(BorgRepository):
|
class SshBorgRepository(BorgRepository):
|
||||||
|
"""Borg repository that is accessed via SSH"""
|
||||||
KNOWN_CREDENTIALS = ['ssh_keyfile', 'ssh_password',
|
KNOWN_CREDENTIALS = ['ssh_keyfile', 'ssh_password',
|
||||||
'encryption_passphrase']
|
'encryption_passphrase']
|
||||||
storage_type = 'ssh'
|
storage_type = 'ssh'
|
||||||
@ -176,30 +204,35 @@ class SshBorgRepository(BorgRepository):
|
|||||||
|
|
||||||
def _load_from_kvstore(self):
|
def _load_from_kvstore(self):
|
||||||
storage = network_storage.get(self.uuid)
|
storage = network_storage.get(self.uuid)
|
||||||
self.credentials = storage['credentials']
|
try:
|
||||||
|
self.credentials = storage['credentials']
|
||||||
|
except KeyError:
|
||||||
|
self.credentials = {}
|
||||||
self.path = storage['path']
|
self.path = storage['path']
|
||||||
|
|
||||||
def _get_network_storage_format(self):
|
def _get_network_storage_format(self, store_credentials):
|
||||||
storage = {
|
storage = {
|
||||||
'path': self.path,
|
'path': self.path,
|
||||||
'credentials': self.credentials,
|
|
||||||
'storage_type': self.storage_type,
|
'storage_type': self.storage_type,
|
||||||
'added_by_module': 'backups'
|
'added_by_module': 'backups'
|
||||||
}
|
}
|
||||||
if hasattr(self, 'uuid'):
|
if hasattr(self, 'uuid'):
|
||||||
storage['uuid'] = self.uuid
|
storage['uuid'] = self.uuid
|
||||||
|
if store_credentials:
|
||||||
|
storage['credentials'] = self.credentials
|
||||||
return storage
|
return storage
|
||||||
|
|
||||||
def create_archive(self, app_names, archive_name):
|
def create_archive(self, app_names, archive_name):
|
||||||
api.backup_apps(_backup_handler, app_names=app_names,
|
raise NotImplementedError
|
||||||
label=archive_name, credentials=self.credentials)
|
# api.backup_apps(_backup_handler, app_names=app_names,
|
||||||
|
|
||||||
def create_repository(self, encryption):
|
def create_repository(self, encryption):
|
||||||
cmd = ['init', '--path', self.path, '--encryption', encryption]
|
if encryption not in SUPPORTED_BORG_ENCRYPTION:
|
||||||
self._run(cmd)
|
raise ValueError('Unsupported encryption: %s' % encryption)
|
||||||
|
self._run(['init', '--path', self.path, '--encryption', encryption])
|
||||||
|
|
||||||
def save(self):
|
def save(self, store_credentials=True):
|
||||||
storage = self._get_network_storage_format()
|
storage = self._get_network_storage_format(store_credentials)
|
||||||
self.uuid = network_storage.update_or_add(storage)
|
self.uuid = network_storage.update_or_add(storage)
|
||||||
|
|
||||||
def mount(self):
|
def mount(self):
|
||||||
@ -207,8 +240,7 @@ class SshBorgRepository(BorgRepository):
|
|||||||
self._run(cmd)
|
self._run(cmd)
|
||||||
|
|
||||||
def umount(self):
|
def umount(self):
|
||||||
self._run(['umount', '--mountpoint', self.mountpoint],
|
self._run(['umount', '--mountpoint', self.mountpoint])
|
||||||
use_credentials=False)
|
|
||||||
|
|
||||||
def remove(self):
|
def remove(self):
|
||||||
"""Remove a repository from the kvstore and delete its mountpoint"""
|
"""Remove a repository from the kvstore and delete its mountpoint"""
|
||||||
@ -224,30 +256,31 @@ class SshBorgRepository(BorgRepository):
|
|||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.error(err)
|
logger.error(err)
|
||||||
|
|
||||||
|
def _get_arguments(self, arguments, credentials):
|
||||||
|
arguments, kwargs = super()._get_arguments(arguments, credentials)
|
||||||
|
if 'ssh_password' in credentials and credentials['ssh_password']:
|
||||||
|
kwargs['input'] = credentials['ssh_password'].encode()
|
||||||
|
if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']:
|
||||||
|
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
|
||||||
|
return (arguments, kwargs)
|
||||||
|
|
||||||
def _run(self, arguments, superuser=True, use_credentials=True):
|
def _run(self, arguments, superuser=True, use_credentials=True):
|
||||||
"""Run a backups action script command.
|
"""Run a backups action script command.
|
||||||
|
|
||||||
This automatically passes on self.credentials to the backups script
|
Automatically passes on credentials via self._get_arguments to the
|
||||||
via environment variables or input, except if you set use_credentials
|
backup script via environment variables or input, except if you
|
||||||
to False.
|
set use_credentials to False.
|
||||||
"""
|
"""
|
||||||
for key in self.credentials.keys():
|
|
||||||
if key not in self.KNOWN_CREDENTIALS:
|
|
||||||
raise ValueError('Unknown credentials: %s' % key)
|
|
||||||
|
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
if use_credentials:
|
if use_credentials:
|
||||||
if 'ssh_password' in self.credentials and \
|
if not self.credentials:
|
||||||
self.credentials['ssh_password'] is not None:
|
msg = 'Cannot access ssh repo without credentials'
|
||||||
kwargs['input'] = self.credentials['ssh_password'].encode()
|
raise BorgError(msg)
|
||||||
if 'ssh_keyfile' in self.credentials and \
|
for key in self.credentials.keys():
|
||||||
self.credentials['ssh_keyfile'] is not None:
|
if key not in self.KNOWN_CREDENTIALS:
|
||||||
arguments += ['--ssh-keyfile', self.credentials['ssh_keyfile']]
|
raise ValueError('Unknown credentials entry: %s' % key)
|
||||||
if 'encryption_passphrase' in self.credentials and \
|
arguments, kwargs = self._get_arguments(arguments,
|
||||||
self.credentials['encryption_passphrase'] is not None:
|
self.credentials)
|
||||||
arguments += ['--encryption-passphrase',
|
|
||||||
self.credentials['encryption_passphrase']]
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if superuser:
|
if superuser:
|
||||||
return actions.superuser_run(self.command, arguments, **kwargs)
|
return actions.superuser_run(self.command, arguments, **kwargs)
|
||||||
|
|||||||
@ -44,7 +44,7 @@ from plinth.modules import backups, storage
|
|||||||
from . import api, forms, SESSION_PATH_VARIABLE, ROOT_REPOSITORY
|
from . import api, forms, SESSION_PATH_VARIABLE, ROOT_REPOSITORY
|
||||||
from .repository import BorgRepository, SshBorgRepository, get_ssh_repositories
|
from .repository import BorgRepository, SshBorgRepository, get_ssh_repositories
|
||||||
from .decorators import delete_tmp_backup_file
|
from .decorators import delete_tmp_backup_file
|
||||||
from .errors import BorgRepositoryDoesNotExistError
|
from .errors import BorgError, BorgRepositoryDoesNotExistError
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -310,14 +310,13 @@ class AddRepositoryView(SuccessMessageMixin, FormView):
|
|||||||
"""Restore files from the archive on valid form submission."""
|
"""Restore files from the archive on valid form submission."""
|
||||||
path = form.cleaned_data['repository']
|
path = form.cleaned_data['repository']
|
||||||
credentials = {}
|
credentials = {}
|
||||||
if form.cleaned_data['store_passwords']:
|
encryption_passphrase = form.cleaned_data['encryption_passphrase']
|
||||||
encryption_passphrase = form.cleaned_data['encryption_passphrase']
|
if encryption_passphrase:
|
||||||
if encryption_passphrase:
|
credentials['encryption_passphrase'] = encryption_passphrase
|
||||||
credentials['encryption_passphrase'] = encryption_passphrase
|
if form.cleaned_data['ssh_password']:
|
||||||
if form.cleaned_data['ssh_password']:
|
credentials['ssh_password'] = form.cleaned_data['ssh_password']
|
||||||
credentials['ssh_password'] = form.cleaned_data['ssh_password']
|
# TODO: add ssh_keyfile
|
||||||
# TODO: add ssh_keyfile
|
# ssh_keyfile = form.cleaned_data['ssh_keyfile']
|
||||||
# ssh_keyfile = form.cleaned_data['ssh_keyfile']
|
|
||||||
|
|
||||||
repository = SshBorgRepository(path=path, credentials=credentials)
|
repository = SshBorgRepository(path=path, credentials=credentials)
|
||||||
|
|
||||||
@ -325,7 +324,8 @@ class AddRepositoryView(SuccessMessageMixin, FormView):
|
|||||||
repository.get_info()
|
repository.get_info()
|
||||||
except BorgRepositoryDoesNotExistError:
|
except BorgRepositoryDoesNotExistError:
|
||||||
repository.create_repository(form.cleaned_data['encryption'])
|
repository.create_repository(form.cleaned_data['encryption'])
|
||||||
repository.save()
|
repository.save(store_credentials=
|
||||||
|
form.cleaned_data['store_credentials'])
|
||||||
return super().form_valid(form)
|
return super().form_valid(form)
|
||||||
|
|
||||||
|
|
||||||
@ -345,6 +345,8 @@ class TestRepositoryView(TemplateView):
|
|||||||
try:
|
try:
|
||||||
repo_info = repository.get_info()
|
repo_info = repository.get_info()
|
||||||
context["message"] = repo_info
|
context["message"] = repo_info
|
||||||
|
except BorgError as err:
|
||||||
|
context["error"] = str(err)
|
||||||
except ActionError as err:
|
except ActionError as err:
|
||||||
context["error"] = str(err)
|
context["error"] = str(err)
|
||||||
|
|
||||||
|
|||||||
@ -25,10 +25,10 @@ the variables defined here.
|
|||||||
backups_ssh_path = None
|
backups_ssh_path = None
|
||||||
backups_ssh_password = None
|
backups_ssh_password = None
|
||||||
backups_ssh_keyfile = None
|
backups_ssh_keyfile = None
|
||||||
backups_ssh_mountpoint = '/mnt/plinth_test_sshfs'
|
backups_ssh_repo_uuid = 'plinth_test_sshfs' # will be mounted to /media/<uuid>
|
||||||
|
|
||||||
# Import config_local to override the default variables
|
# Import config_local to override the default variables
|
||||||
try:
|
try:
|
||||||
from config_local.py import *
|
from .config_local import *
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|||||||
@ -25,17 +25,11 @@ import unittest
|
|||||||
|
|
||||||
from plinth import cfg
|
from plinth import cfg
|
||||||
from plinth.modules import backups
|
from plinth.modules import backups
|
||||||
from plinth.modules.backups import sshfs
|
from plinth.modules.backups.repository import BorgRepository, SshBorgRepository
|
||||||
from plinth import actions
|
from plinth import actions
|
||||||
|
|
||||||
from . import config
|
from . import config
|
||||||
|
|
||||||
try:
|
|
||||||
from . import config_local as config
|
|
||||||
except ImportError:
|
|
||||||
from . import config
|
|
||||||
|
|
||||||
|
|
||||||
euid = os.geteuid()
|
euid = os.geteuid()
|
||||||
|
|
||||||
|
|
||||||
@ -64,34 +58,26 @@ class TestBackups(unittest.TestCase):
|
|||||||
def test_nonexisting_repository(self):
|
def test_nonexisting_repository(self):
|
||||||
nonexisting_dir = os.path.join(self.backup_directory.name,
|
nonexisting_dir = os.path.join(self.backup_directory.name,
|
||||||
'does_not_exist')
|
'does_not_exist')
|
||||||
|
repository = BorgRepository(nonexisting_dir)
|
||||||
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
|
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
|
||||||
backups.test_connection(nonexisting_dir)
|
repository.get_info()
|
||||||
|
|
||||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||||
def test_empty_dir(self):
|
def test_empty_dir(self):
|
||||||
empty_dir = os.path.join(self.backup_directory.name, 'empty_dir')
|
empty_dir = os.path.join(self.backup_directory.name, 'empty_dir')
|
||||||
os.mkdir(empty_dir)
|
os.mkdir(empty_dir)
|
||||||
|
repository = BorgRepository(empty_dir)
|
||||||
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
|
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
|
||||||
backups.test_connection(empty_dir)
|
repository.get_info()
|
||||||
|
|
||||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||||
def test_create_unencrypted_repository(self):
|
def test_create_unencrypted_repository(self):
|
||||||
repo_path = os.path.join(self.backup_directory.name, 'borgbackup')
|
repo_path = os.path.join(self.backup_directory.name, 'borgbackup')
|
||||||
backups.create_repository(repo_path, 'none')
|
repository = BorgRepository(repo_path)
|
||||||
info = backups.get_info(repo_path)
|
repository.create_repository()
|
||||||
|
info = repository.get_info()
|
||||||
assert 'encryption' in info
|
assert 'encryption' in info
|
||||||
|
|
||||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
|
||||||
def test_create_encrypted_repository(self):
|
|
||||||
repo_path = os.path.join(self.backup_directory.name,
|
|
||||||
'borgbackup_encrypted')
|
|
||||||
# 'borg init' creates missing folders automatically
|
|
||||||
access_params = {'encryption_passphrase': '12345'}
|
|
||||||
backups.create_repository(repo_path, 'repokey',
|
|
||||||
access_params=access_params)
|
|
||||||
assert backups.get_info(repo_path, access_params)
|
|
||||||
assert backups.test_connection(repo_path, access_params)
|
|
||||||
|
|
||||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||||
def test_create_and_delete_archive(self):
|
def test_create_and_delete_archive(self):
|
||||||
"""
|
"""
|
||||||
@ -104,48 +90,58 @@ class TestBackups(unittest.TestCase):
|
|||||||
archive_name = 'first_archive'
|
archive_name = 'first_archive'
|
||||||
repo_path = os.path.join(self.backup_directory.name, repo_name)
|
repo_path = os.path.join(self.backup_directory.name, repo_name)
|
||||||
|
|
||||||
backups.create_repository(repo_path, 'none')
|
repository = BorgRepository(repo_path)
|
||||||
|
repository.create_repository()
|
||||||
archive_path = "::".join([repo_path, archive_name])
|
archive_path = "::".join([repo_path, archive_name])
|
||||||
actions.superuser_run(
|
actions.superuser_run(
|
||||||
'backups', ['create-archive', '--path', archive_path, '--paths',
|
'backups', ['create-archive', '--path', archive_path, '--paths',
|
||||||
self.data_directory])
|
self.data_directory])
|
||||||
|
|
||||||
archive = backups.list_archives(repo_path)[0]
|
archive = repository.list_archives()[0]
|
||||||
assert archive['name'] == archive_name
|
assert archive['name'] == archive_name
|
||||||
|
|
||||||
backups.delete_archive(archive_path)
|
repository.delete_archive(archive_name)
|
||||||
content = backups.list_archives(repo_path)
|
content = repository.list_archives()
|
||||||
assert len(content) == 0
|
assert len(content) == 0
|
||||||
|
|
||||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
@unittest.skipUnless(euid == 0 and config.backups_ssh_path,
|
||||||
def test_is_mounted(self):
|
'Needs to be root and ssh credentials provided')
|
||||||
assert not sshfs.is_mounted(self.action_directory.name)
|
def test_ssh_mount(self):
|
||||||
assert sshfs.is_mounted('/')
|
|
||||||
|
|
||||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
|
||||||
def test_mount(self):
|
|
||||||
"""Test (un)mounting if credentials for a remote location are given"""
|
"""Test (un)mounting if credentials for a remote location are given"""
|
||||||
import ipdb; ipdb.set_trace()
|
credentials = self.get_credentials()
|
||||||
if config.backups_ssh_path:
|
if not credentials:
|
||||||
access_params = self.get_remote_access_params()
|
return
|
||||||
if not access_params:
|
ssh_path = config.backups_ssh_path
|
||||||
return
|
|
||||||
mountpoint = config.backups_ssh_mountpoint
|
|
||||||
ssh_path = config.backups_ssh_path
|
|
||||||
|
|
||||||
sshfs.mount(ssh_path, mountpoint, access_params)
|
ssh_repo = SshBorgRepository(uuid='plinth_test_sshfs',
|
||||||
assert sshfs.is_mounted(mountpoint)
|
path=ssh_path,
|
||||||
sshfs.umount(mountpoint)
|
credentials=credentials)
|
||||||
assert not sshfs.is_mounted(mountpoint)
|
ssh_repo.mount()
|
||||||
|
assert ssh_repo.is_mounted
|
||||||
|
ssh_repo.umount()
|
||||||
|
assert not ssh_repo.is_mounted
|
||||||
|
|
||||||
def get_remote_access_params(self):
|
@unittest.skipUnless(euid == 0 and config.backups_ssh_path,
|
||||||
|
'Needs to be root and ssh credentials provided')
|
||||||
|
def test_ssh_create_encrypted_repository(self):
|
||||||
|
credentials = self.get_credentials()
|
||||||
|
encrypted_repo = os.path.join(self.backup_directory.name,
|
||||||
|
'borgbackup_encrypted')
|
||||||
|
credentials['encryption_passphrase'] = '12345'
|
||||||
|
repository = SshBorgRepository(path=encrypted_repo,
|
||||||
|
credentials=credentials)
|
||||||
|
# 'borg init' creates missing folders automatically
|
||||||
|
repository.create_repository(encryption='repokey')
|
||||||
|
assert repository.get_info()
|
||||||
|
|
||||||
|
def get_credentials(self):
|
||||||
"""
|
"""
|
||||||
Get access params for a remote location.
|
Get access params for a remote location.
|
||||||
Return an empty dict if no valid access params are found.
|
Return an empty dict if no valid access params are found.
|
||||||
"""
|
"""
|
||||||
access_params = {}
|
credentials = {}
|
||||||
if config.backups_ssh_password:
|
if config.backups_ssh_password:
|
||||||
access_params['ssh_password'] = config.backups_ssh_password
|
credentials['ssh_password'] = config.backups_ssh_password
|
||||||
elif config.backups_ssh_keyfile:
|
elif config.backups_ssh_keyfile:
|
||||||
access_params['ssh_keyfile'] = config.backups_ssh_keyfile
|
credentials['ssh_keyfile'] = config.backups_ssh_keyfile
|
||||||
return access_params
|
return credentials
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user