Backups, remote repositories: create/delete/restore of remote repos

- updated unittests
- implemented create/delete/restore remote archives

Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Michael Pimmer 2018-11-30 01:34:34 +00:00 committed by James Valleroy
parent 37aa2f9992
commit 15e26caa23
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
11 changed files with 172 additions and 154 deletions

View File

@ -127,15 +127,13 @@ def subcommand_setup(arguments):
def init(path, encryption, env=None):
"""Initialize a local or remote borg repository"""
# TODO: verify that the repository does not exist?
# TODO: does remote borg also create folders if they don't exist?
if encryption != 'none' and 'BORG_PASSPHRASE' not in env:
raise ValueError('No encryption passphrase provided')
cmd = ['borg', 'init', '--encryption', encryption, path]
run(cmd, env=env)
def get_env(arguments):
def get_env(arguments, read_input=True):
"""Create encryption and ssh kwargs out of given arguments"""
env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes')
if arguments.encryption_passphrase:
@ -143,27 +141,13 @@ def get_env(arguments):
if arguments.ssh_keyfile:
env['BORG_RSH'] = "ssh -i %s" % arguments.ssh_keyfile
else:
password = read_password()
password = read_password() if read_input else None
if password:
env['SSHPASS'] = password
env['BORG_RSH'] = 'sshpass -e ssh -o StrictHostKeyChecking=no'
return env
def get_sshfs_env(arguments):
"""Create encryption and ssh kwargs out of given arguments"""
env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes')
if arguments.encryption_passphrase:
env['BORG_PASSPHRASE'] = arguments.encryption_passphrase
if arguments.ssh_keyfile:
env['SSHKEY'] = arguments.ssh_keyfile
else:
password = read_password()
if password:
env['SSHPASS'] = password
return env
def subcommand_init(arguments):
env = get_env(arguments)
init(arguments.path, arguments.encryption, env=env)
@ -281,9 +265,9 @@ def _extract(archive_path, destination, locations=None):
def subcommand_export_tar(arguments):
"""Export archive contents as tar stream on stdout."""
env = get_env(arguments)
run(['borg', 'export-tar', arguments.path, '-'],
env=env)
# TODO: Get read_password to reliably detect if a password is provided
env = get_env(arguments, read_input=False)
run(['borg', 'export-tar', arguments.path, '-'], env=env)
def _read_archive_file(archive, filepath):
@ -384,8 +368,7 @@ def read_password():
def run(cmd, env=None, check=True):
"""Wrap the command with ssh password or keyfile authentication"""
# If the remote server asks for a password but no password is
# provided, we get stuck at asking the password.
# Set a timeout to not get stuck if the remote server asks for a password.
timeout = None
if env and 'BORG_RSH' in env and 'SSHPASS' not in env:
timeout = TIMEOUT

View File

@ -46,6 +46,7 @@ MANIFESTS_FOLDER = '/var/lib/plinth/backups-manifests/'
ROOT_REPOSITORY = '/var/lib/freedombox/borgbackup'
ROOT_REPOSITORY_NAME = format_lazy(_('{box_name} storage'),
box_name=cfg.box_name)
ROOT_REPOSITORY_UUID = 'root'
# session variable name that stores when a backup file should be deleted
SESSION_PATH_VARIABLE = 'fbx-backups-upload-path'
@ -69,7 +70,7 @@ def _backup_handler(packet):
os.makedirs(MANIFESTS_FOLDER)
manifest_path = os.path.join(MANIFESTS_FOLDER,
get_valid_filename(packet.label) + '.json')
get_valid_filename(packet.path) + '.json')
manifests = {
'apps': [{
'name': app.name,
@ -83,8 +84,7 @@ def _backup_handler(packet):
paths = packet.directories + packet.files
paths.append(manifest_path)
actions.superuser_run(
'backups', ['create-archive', '--name', packet.label, '--paths'] +
# TODO: add ssh_keyfile
'backups', ['create-archive', '--path', packet.path, '--paths'] +
paths)
@ -100,16 +100,16 @@ def _restore_exported_archive_handler(packet):
locations = {'directories': packet.directories, 'files': packet.files}
locations_data = json.dumps(locations)
actions.superuser_run('backups', ['restore-exported-archive', '--path',
packet.label],
packet.path],
input=locations_data.encode())
def _restore_archive_handler(packet):
def restore_archive_handler(packet):
"""Perform restore operation on packet."""
locations = {'directories': packet.directories, 'files': packet.files}
locations_data = json.dumps(locations)
actions.superuser_run('backups', ['restore-archive', '--path',
packet.label, '--destination', '/'],
packet.path, '--destination', '/'],
input=locations_data.encode())
@ -117,9 +117,3 @@ def restore_from_upload(path, apps=None):
"""Restore files from an uploaded .tar.gz backup file"""
api.restore_apps(_restore_exported_archive_handler, app_names=apps,
create_subvolume=False, backup_file=path)
def restore(archive_path, apps=None):
"""Restore files from a backup archive."""
api.restore_apps(_restore_archive_handler, app_names=apps,
create_subvolume=False, backup_file=archive_path)

View File

@ -98,7 +98,7 @@ class BackupError:
class Packet:
"""Information passed to a handlers for backup/restore operations."""
def __init__(self, operation, scope, root, apps=None, label=None):
def __init__(self, operation, scope, root, apps=None, path=None):
"""Initialize the packet.
operation is either 'backup' or 'restore.
@ -110,8 +110,7 @@ class Packet:
All paths populated are relative to the 'root' path. The root path
itself must not be stored in the backup.
label is either an archive name (w/o path), or the full path of an
exported archive.
path is the full path of an (possibly exported) archive.
TODO: create two variables out of it as it's distinct information.
"""
@ -119,7 +118,7 @@ class Packet:
self.scope = scope
self.root = root
self.apps = apps
self.label = label
self.path = path
self.errors = []
self.directories = []
@ -136,7 +135,7 @@ class Packet:
self.files += app.manifest.get(section, {}).get('files', [])
def backup_full(backup_handler, label=None):
def backup_full(backup_handler, path=None):
"""Backup the entire system."""
if not _is_snapshot_available():
raise Exception('Full backup is not supported without snapshots.')
@ -144,7 +143,7 @@ def backup_full(backup_handler, label=None):
snapshot = _take_snapshot()
backup_root = snapshot['mount_path']
packet = Packet('backup', 'full', backup_root, label)
packet = Packet('backup', 'full', backup_root, path)
_run_operation(backup_handler, packet)
_delete_snapshot(snapshot)
@ -163,7 +162,7 @@ def restore_full(restore_handler):
_switch_to_subvolume(subvolume)
def backup_apps(backup_handler, app_names=None, label=None):
def backup_apps(backup_handler, path, app_names=None):
"""Backup data belonging to a set of applications."""
if not app_names:
apps = get_all_apps_for_backup()
@ -180,7 +179,7 @@ def backup_apps(backup_handler, app_names=None, label=None):
backup_root = '/'
snapshotted = False
packet = Packet('backup', 'apps', backup_root, apps, label)
packet = Packet('backup', 'apps', backup_root, apps, path)
_run_operation(backup_handler, packet)
if snapshotted:

View File

@ -19,7 +19,7 @@ from plinth.errors import PlinthError
class BorgError(PlinthError):
"""Generic class for borg errors that """
"""Generic borg errors"""
pass

View File

@ -19,15 +19,17 @@ Remote and local Borg backup repositories
"""
import json
import os
import logging
import os
from django.utils.translation import ugettext_lazy as _
from plinth import actions
from plinth.errors import ActionError
from . import api, network_storage, _backup_handler, ROOT_REPOSITORY_NAME
from . import api, network_storage, _backup_handler, ROOT_REPOSITORY_NAME, \
ROOT_REPOSITORY_UUID, ROOT_REPOSITORY, restore_archive_handler, \
zipstream
from .errors import BorgError, BorgRepositoryDoesNotExistError
logger = logging.getLogger(__name__)
@ -93,20 +95,25 @@ class BorgRepository(object):
archive_path = self.get_archive_path(archive_name)
self._run(['delete-archive', '--path', archive_path])
def remove(self):
def remove_repository(self):
"""Remove a borg repository"""
raise NotImplementedError
def create_archive(self, app_names, archive_name):
api.backup_apps(_backup_handler, app_names=app_names,
label=archive_name),
def create_archive(self, archive_name, app_names):
archive_path = self.get_archive_path(archive_name)
api.backup_apps(_backup_handler, path=archive_path,
app_names=app_names)
def create_repository(self):
self._run(['init', '--path', self.path, '--encryption', 'none'])
def download_archive(self, name):
# TODO
pass
def get_zipstream(self, archive_name):
archive_path = self.get_archive_path(archive_name)
args = ['export-tar', '--path', archive_path]
kwargs = {'run_in_background': True,
'bufsize': 1}
proc = self._run(args, kwargs=kwargs, use_credentials=False)
return zipstream.ZipStream(proc.stdout, 'readline')
def get_archive(self, name):
# TODO: can't we get this archive directly?
@ -122,33 +129,27 @@ class BorgRepository(object):
output = self._run(['get-archive-apps', '--path', archive_path])
return output.splitlines()
def restore_archive(self):
# TODO
pass
def restore_archive(self, archive_name, apps=None):
archive_path = self.get_archive_path(archive_name)
api.restore_apps(restore_archive_handler, app_names=apps,
create_subvolume=False, backup_file=archive_path)
def get_archive_path(self, archive_name):
return "::".join([self.path, archive_name])
def _get_arguments(self, arguments, credentials):
kwargs = {}
if 'encryption_passphrase' in credentials and \
credentials['encryption_passphrase']:
arguments += ['--encryption-passphrase',
credentials['encryption_passphrase']]
return (arguments, kwargs)
def _get_env(self):
return dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes')
def _run(self, arguments, superuser=True):
"""Run a backups action script command.
Automatically passes on credentials via self._get_arguments to the
backup script via environment variables or input, except if you
set use_credentials to False.
"""
def _run(self, arguments, superuser=True, kwargs=None,
use_credentials=False):
"""Run a backups action script command."""
if kwargs is None:
kwargs = {}
try:
if superuser:
return actions.superuser_run(self.command, arguments)
return actions.superuser_run(self.command, arguments, **kwargs)
else:
return actions.run(self.command, arguments)
return actions.run(self.command, arguments, **kwargs)
except ActionError as err:
self.reraise_known_error(err)
@ -168,8 +169,10 @@ class SshBorgRepository(BorgRepository):
KNOWN_CREDENTIALS = ['ssh_keyfile', 'ssh_password',
'encryption_passphrase']
storage_type = 'ssh'
uuid = None
def __init__(self, uuid=None, path=None, credentials=None, **kwargs):
def __init__(self, uuid=None, path=None, credentials=None, automount=True,
**kwargs):
"""
Provide a uuid to instanciate an existing repository,
or 'path' and 'credentials' for a new repository.
@ -188,6 +191,9 @@ class SshBorgRepository(BorgRepository):
self.credentials = credentials
else:
raise ValueError('Invalid arguments.')
if automount:
if self.uuid and not self.is_mounted:
self.mount()
@property
def mountpoint(self):
@ -202,6 +208,9 @@ class SshBorgRepository(BorgRepository):
output = self._run(['is-mounted', '--mountpoint', self.mountpoint])
return json.loads(output)
def get_archive_path(self, archive_name):
return "::".join([self.mountpoint, archive_name])
def _load_from_kvstore(self):
storage = network_storage.get(self.uuid)
try:
@ -216,16 +225,12 @@ class SshBorgRepository(BorgRepository):
'storage_type': self.storage_type,
'added_by_module': 'backups'
}
if hasattr(self, 'uuid'):
if self.uuid:
storage['uuid'] = self.uuid
if store_credentials:
storage['credentials'] = self.credentials
return storage
def create_archive(self, app_names, archive_name):
raise NotImplementedError
# api.backup_apps(_backup_handler, app_names=app_names,
def create_repository(self, encryption):
if encryption not in SUPPORTED_BORG_ENCRYPTION:
raise ValueError('Unsupported encryption: %s' % encryption)
@ -242,7 +247,7 @@ class SshBorgRepository(BorgRepository):
def umount(self):
self._run(['umount', '--mountpoint', self.mountpoint])
def remove(self):
def remove_repository(self):
"""Remove a repository from the kvstore and delete its mountpoint"""
network_storage.delete(self.uuid)
try:
@ -256,22 +261,27 @@ class SshBorgRepository(BorgRepository):
except Exception as err:
logger.error(err)
def _get_arguments(self, arguments, credentials):
arguments, kwargs = super()._get_arguments(arguments, credentials)
def _get_run_arguments(self, arguments, credentials, kwargs=None):
if kwargs is None:
kwargs = {}
if 'encryption_passphrase' in credentials and \
credentials['encryption_passphrase']:
arguments += ['--encryption-passphrase',
credentials['encryption_passphrase']]
if 'ssh_password' in credentials and credentials['ssh_password']:
kwargs['input'] = credentials['ssh_password'].encode()
if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']:
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
return (arguments, kwargs)
def _run(self, arguments, superuser=True, use_credentials=True):
def _run(self, arguments, superuser=True, use_credentials=True,
kwargs=None):
"""Run a backups action script command.
Automatically passes on credentials via self._get_arguments to the
Automatically passes on credentials via self._get_run_arguments to the
backup script via environment variables or input, except if you
set use_credentials to False.
"""
kwargs = {}
if use_credentials:
if not self.credentials:
msg = 'Cannot access ssh repo without credentials'
@ -279,8 +289,9 @@ class SshBorgRepository(BorgRepository):
for key in self.credentials.keys():
if key not in self.KNOWN_CREDENTIALS:
raise ValueError('Unknown credentials entry: %s' % key)
arguments, kwargs = self._get_arguments(arguments,
self.credentials)
arguments, kwargs = self._get_run_arguments(arguments,
self.credentials,
kwargs=kwargs)
try:
if superuser:
return actions.superuser_run(self.command, arguments, **kwargs)
@ -294,6 +305,13 @@ def get_ssh_repositories():
"""Get all SSH Repositories including the archive content"""
repositories = {}
for storage in network_storage.get_storages():
repository = SshBorgRepository(**storage)
repository = SshBorgRepository(automount=False, **storage)
repositories[storage['uuid']] = repository.get_view_content()
return repositories
def get_repository(uuid):
if uuid == ROOT_REPOSITORY_UUID:
return BorgRepository(path=ROOT_REPOSITORY)
else:
return SshBorgRepository(uuid=uuid, automount=False)

View File

@ -23,7 +23,13 @@
id="archives-list">
<thead>
<tr>
<th>
<th colspan="2">
{% if repository.error %}
<span class="glyphicon glyphicon-warning-sign mount-error"
aria-hidden="true" title="{{ repository.error }}">
</span>
{% endif %}
{{ repository.name }}
@ -31,8 +37,8 @@
{% if repository.mounted %}
<!-- With GET redirects the browser URL would be pointing to the
redirected page - use POST instead.
<!-- With GET redirects, the browser URL points to the
redirected page (bad when reloading) - use POST instead.
-->
<form action="{% url 'backups:repository-umount' uuid %}" method="POST"
class="inline-block" >
@ -65,8 +71,6 @@
{% endif %}
</th>
<th>
</th>
</tr>
</thead>
@ -82,11 +86,11 @@
{% trans "Download" %}
</a>
<a class="archive-export btn btn-sm btn-default"
href="{% url 'backups:restore-archive' archive.name %}">
href="{% url 'backups:restore-archive' uuid archive.name %}">
{% trans "Restore" %}
</a>
<a class="archive-delete btn btn-sm btn-default"
href="{% url 'backups:delete' archive.name %}">
href="{% url 'backups:delete' uuid archive.name %}">
<span class="glyphicon glyphicon-trash" aria-hidden="true">
</span>
</a>
@ -94,7 +98,7 @@
</tr>
{% endfor %}
{% if not repository.archives %}
{% if not repository.error and not repository.archives %}
<p>{% trans 'No archives currently exist.' %}</p>
{% endif %}

View File

@ -26,6 +26,7 @@ from django.core.files.uploadedfile import SimpleUploadedFile
from plinth import cfg, module_loader
from .. import api, forms
from .. import ROOT_REPOSITORY
# pylint: disable=protected-access
@ -105,7 +106,7 @@ class TestBackupProcesses(unittest.TestCase):
def test_backup_apps():
"""Test that backup_handler is called."""
backup_handler = MagicMock()
api.backup_apps(backup_handler)
api.backup_apps(backup_handler, path=ROOT_REPOSITORY)
backup_handler.assert_called_once()
@staticmethod

View File

@ -30,10 +30,10 @@ urlpatterns = [
url(r'^sys/backups/create/$', CreateArchiveView.as_view(), name='create'),
url(r'^sys/backups/export-and-download/(?P<uuid>[^/]+)/(?P<name>[^/]+)/$',
ExportAndDownloadView.as_view(), name='export-and-download'),
url(r'^sys/backups/delete/(?P<name>[^/]+)/$',
url(r'^sys/backups/delete/(?P<uuid>[^/]+)/(?P<name>[^/]+)/$',
DeleteArchiveView.as_view(), name='delete'),
url(r'^sys/backups/upload/$', UploadArchiveView.as_view(), name='upload'),
url(r'^sys/backups/restore-archive/(?P<name>[^/]+)/$',
url(r'^sys/backups/restore-archive/(?P<uuid>[^/]+)/(?P<name>[^/]+)/$',
RestoreArchiveView.as_view(), name='restore-archive'),
url(r'^sys/backups/restore-from-upload/$',
RestoreFromUploadView.as_view(), name='restore-from-upload'),

View File

@ -19,8 +19,6 @@ Views for the backups app.
"""
from datetime import datetime
import gzip
from io import BytesIO
import logging
import mimetypes
import os
@ -37,12 +35,12 @@ from django.utils.translation import ugettext as _
from django.utils.translation import ugettext_lazy
from django.views.generic import View, FormView, TemplateView
from plinth import actions
from plinth.errors import PlinthError, ActionError
from plinth.modules import backups, storage
from . import api, forms, SESSION_PATH_VARIABLE, ROOT_REPOSITORY
from .repository import BorgRepository, SshBorgRepository, get_ssh_repositories
from .repository import BorgRepository, SshBorgRepository, get_repository, \
get_ssh_repositories
from .decorators import delete_tmp_backup_file
from .errors import BorgError, BorgRepositoryDoesNotExistError
@ -104,8 +102,9 @@ class CreateArchiveView(SuccessMessageMixin, FormView):
def form_valid(self, form):
"""Create the archive on valid form submission."""
backups.create_archive(form.cleaned_data['name'],
form.cleaned_data['selected_apps'])
repository = get_repository(form.cleaned_data['repository'])
repository.create_archive(form.cleaned_data['name'],
form.cleaned_data['selected_apps'])
return super().form_valid(form)
@ -117,15 +116,17 @@ class DeleteArchiveView(SuccessMessageMixin, TemplateView):
"""Return additional context for rendering the template."""
context = super().get_context_data(**kwargs)
context['title'] = _('Delete Archive')
context['archive'] = backups.get_archive(self.kwargs['name'])
repository = get_repository(self.kwargs['uuid'])
context['archive'] = repository.get_archive(self.kwargs['name'])
if context['archive'] is None:
raise Http404
return context
def post(self, request, name):
def post(self, request, uuid, name):
"""Delete the archive."""
backups.delete_archive(name)
repository = get_repository(uuid)
repository.delete_archive(name)
messages.success(request, _('Archive deleted.'))
return redirect('backups:index')
@ -198,6 +199,7 @@ class BaseRestoreView(SuccessMessageMixin, FormView):
context = super().get_context_data(**kwargs)
context['title'] = _('Restore')
context['name'] = self.kwargs.get('name', None)
context['uuid'] = self.kwargs.get('uuid', None)
return context
@ -236,55 +238,25 @@ class RestoreArchiveView(BaseRestoreView):
def _get_included_apps(self):
"""Save some data used to instantiate the form."""
name = unquote(self.kwargs['name'])
archive_path = backups.get_archive_path(name)
return backups.get_archive_apps(archive_path)
uuid = self.kwargs['uuid']
repository = get_repository(uuid)
return repository.get_archive_apps(name)
def form_valid(self, form):
"""Restore files from the archive on valid form submission."""
archive_path = backups.get_archive_path(self.kwargs['name'])
backups.restore(archive_path, form.cleaned_data['selected_apps'])
repository = get_repository(self.kwargs['uuid'])
repository.restore_archive(self.kwargs['name'],
form.cleaned_data['selected_apps'])
return super().form_valid(form)
class ZipStream(object):
"""Zip a stream that yields binary data"""
def __init__(self, stream, get_chunk_method):
"""
- stream: the input stream
- get_chunk_method: name of the method to get a chunk of the stream
"""
self.stream = stream
self.buffer = BytesIO()
self.zipfile = gzip.GzipFile(mode='wb', fileobj=self.buffer)
self.get_chunk = getattr(self.stream, get_chunk_method)
def __next__(self):
line = self.get_chunk()
if not len(line):
raise StopIteration
self.zipfile.write(line)
self.zipfile.flush()
zipped = self.buffer.getvalue()
self.buffer.truncate(0)
self.buffer.seek(0)
return zipped
def __iter__(self):
return self
class ExportAndDownloadView(View):
"""View to export and download an archive as stream."""
def get(self, request, uuid, name):
# The uuid is 'root' for the root repository
name = unquote(name)
repository = get_repository(uuid)
filename = "%s.tar.gz" % name
args = ['export-tar', '--name', name]
proc = actions.superuser_run('backups', args, run_in_background=True,
bufsize=1)
zipStream = ZipStream(proc.stdout, 'readline')
response = StreamingHttpResponse(zipStream,
response = StreamingHttpResponse(repository.get_zipstream(name),
content_type="application/x-gzip")
response['Content-Disposition'] = 'attachment; filename="%s"' % \
filename
@ -324,8 +296,8 @@ class AddRepositoryView(SuccessMessageMixin, FormView):
repository.get_info()
except BorgRepositoryDoesNotExistError:
repository.create_repository(form.cleaned_data['encryption'])
repository.save(store_credentials=
form.cleaned_data['store_credentials'])
repository.save(
store_credentials=form.cleaned_data['store_credentials'])
return super().form_valid(form)
@ -354,7 +326,7 @@ class TestRepositoryView(TemplateView):
class RemoveRepositoryView(SuccessMessageMixin, TemplateView):
"""View to delete an archive."""
"""View to delete a repository."""
template_name = 'backups_repository_remove.html'
def get_context_data(self, uuid, **kwargs):
@ -367,7 +339,7 @@ class RemoveRepositoryView(SuccessMessageMixin, TemplateView):
def post(self, request, uuid):
"""Delete the archive."""
repository = SshBorgRepository(uuid)
repository.remove()
repository.remove_repository()
messages.success(request, _('Repository removed. The remote backup '
'itself was not deleted.'))
return redirect('backups:index')

View File

@ -0,0 +1,47 @@
#
# This file is part of FreedomBox.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import gzip
from io import BytesIO
class ZipStream(object):
"""Zip a stream that yields binary data"""
def __init__(self, stream, get_chunk_method):
"""
- stream: the input stream
- get_chunk_method: name of the method to get a chunk of the stream
"""
self.stream = stream
self.buffer = BytesIO()
self.zipfile = gzip.GzipFile(mode='wb', fileobj=self.buffer)
self.get_chunk = getattr(self.stream, get_chunk_method)
def __next__(self):
line = self.get_chunk()
if not len(line):
raise StopIteration
self.zipfile.write(line)
self.zipfile.flush()
zipped = self.buffer.getvalue()
self.buffer.truncate(0)
self.buffer.seek(0)
return zipped
def __iter__(self):
return self

View File

@ -121,17 +121,17 @@ class TestBackups(unittest.TestCase):
ssh_repo.umount()
assert not ssh_repo.is_mounted
@unittest.skipUnless(euid == 0 and config.backups_ssh_path,
'Needs to be root and ssh credentials provided')
@unittest.skipUnless(euid == 0, 'Needs to be root')
def test_ssh_create_encrypted_repository(self):
credentials = self.get_credentials()
encrypted_repo = os.path.join(self.backup_directory.name,
'borgbackup_encrypted')
credentials['encryption_passphrase'] = '12345'
# using SshBorgRepository to provide credentials because
# BorgRepository does not allow creating encrypted repositories
repository = SshBorgRepository(path=encrypted_repo,
credentials=credentials)
# 'borg init' creates missing folders automatically
repository.create_repository(encryption='repokey')
repository.create_repository('repokey')
assert repository.get_info()
def get_credentials(self):