Backups: support for encrypted repositories

- implement download, restore, delete archives of encrypted
  repositories
- change how BorgRepository and SshBorgRepository handle path
- when/before creating remote repositories, check whether the
  connection works
- updated tests

Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Michael Pimmer 2018-12-02 13:22:59 +00:00 committed by James Valleroy
parent 6651c0468e
commit eab8991b54
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
12 changed files with 244 additions and 175 deletions

View File

@ -68,8 +68,13 @@ def parse_arguments():
'get-archive-apps',
help='Get list of apps included in archive')
restore_archive = subparsers.add_parser(
'restore-archive', help='Restore files from an archive')
restore_archive.add_argument('--destination', help='Destination',
required=True)
for cmd in [info, init, list_repo, create_archive, delete_archive,
export_tar, get_archive_apps, setup]:
export_tar, get_archive_apps, restore_archive, setup]:
cmd.add_argument('--path', help='Repository or Archive path',
required=False)
cmd.add_argument('--ssh-keyfile', help='Path of private ssh key',
@ -90,86 +95,66 @@ def parse_arguments():
restore_exported_archive.add_argument('--path', help='Tarball file path',
required=True)
restore_archive = subparsers.add_parser(
'restore-archive', help='Restore files from an archive')
restore_archive.add_argument('--path', help='Archive path', required=True)
restore_archive.add_argument('--destination', help='Destination',
required=True)
subparsers.required = True
return parser.parse_args()
def subcommand_setup(arguments):
"""Create repository if it does not already exist."""
env = get_env(arguments)
try:
run(['borg', 'info', arguments.path], check=True, env=env)
run(['borg', 'info', arguments.path], arguments=arguments, check=True)
except:
path = os.path.dirname(arguments.path)
if not os.path.exists(path):
os.makedirs(path)
init(arguments.path, 'none', env=env)
init(arguments, encryption='none')
def init(path, encryption, env=None):
def init(arguments, encryption):
"""Initialize a local or remote borg repository"""
if encryption != 'none' and 'BORG_PASSPHRASE' not in env:
raise ValueError('No encryption passphrase provided')
cmd = ['borg', 'init', '--encryption', encryption, path]
run(cmd, env=env)
def get_env(arguments, read_input=True):
"""Create encryption and ssh kwargs out of given arguments"""
env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes')
if arguments.encryption_passphrase:
env['BORG_PASSPHRASE'] = arguments.encryption_passphrase
if arguments.ssh_keyfile:
env['BORG_RSH'] = "ssh -i %s" % arguments.ssh_keyfile
else:
password = read_password() if read_input else None
if password:
env['SSHPASS'] = password
env['BORG_RSH'] = 'sshpass -e ssh -o StrictHostKeyChecking=no'
return env
if encryption != 'none':
if not hasattr(arguments, 'encryption_passphrase') or not \
arguments.encryption_passphrase:
raise ValueError('No encryption passphrase provided')
cmd = ['borg', 'init', '--encryption', encryption, arguments.path]
run(cmd, arguments=arguments)
def subcommand_init(arguments):
env = get_env(arguments)
init(arguments.path, arguments.encryption, env=env)
init(arguments, encryption=arguments.encryption)
def subcommand_info(arguments):
"""Show repository information."""
env = get_env(arguments)
run(['borg', 'info', '--json', arguments.path], env=env)
run(['borg', 'info', '--json', arguments.path], arguments=arguments)
def subcommand_list_repo(arguments):
"""List repository contents."""
env = get_env(arguments)
run(['borg', 'list', '--json', arguments.path], env=env)
run(['borg', 'list', '--json', arguments.path], arguments=arguments)
def subcommand_create_archive(arguments):
"""Create archive."""
env = get_env(arguments)
paths = filter(os.path.exists, arguments.paths)
run(['borg', 'create', '--json', arguments.path] + list(paths), env=env)
run(['borg', 'create', '--json', arguments.path] + list(paths),
arguments=arguments)
def subcommand_delete_archive(arguments):
"""Delete archive."""
env = get_env(arguments)
run(['borg', 'delete', arguments.path], env=env)
run(['borg', 'delete', arguments.path], arguments)
def _extract(archive_path, destination, locations=None):
def _extract(archive_path, destination, locations=None, env=None):
"""Extract archive contents."""
if not env:
env = dict(os.environ)
# TODO: is LANG necessary?
env['LANG'] = 'C.UTF-8'
prev_dir = os.getcwd()
env = dict(os.environ, LANG='C.UTF-8')
borg_call = ['borg', 'extract', archive_path]
# do not extract any files when we get an empty locations list
if locations is not None:
@ -193,15 +178,13 @@ def _extract(archive_path, destination, locations=None):
def subcommand_export_tar(arguments):
"""Export archive contents as tar stream on stdout."""
# TODO: Get read_password to reliably detect if a password is provided
env = get_env(arguments, read_input=False)
run(['borg', 'export-tar', arguments.path, '-'], env=env)
run(['borg', 'export-tar', arguments.path, '-'], arguments=arguments)
def _read_archive_file(archive, filepath):
def _read_archive_file(archive, filepath, env=None):
"""Read the content of a file inside an archive"""
arguments = ['borg', 'extract', archive, filepath, '--stdout']
return subprocess.check_output(arguments).decode()
return subprocess.check_output(arguments, env=env).decode()
def subcommand_get_archive_apps(arguments):
@ -222,7 +205,8 @@ def subcommand_get_archive_apps(arguments):
manifest = None
if manifest_path:
manifest_data = _read_archive_file(arguments.path, manifest_path)
manifest_data = _read_archive_file(arguments.path, manifest_path,
env=env)
manifest = json.loads(manifest_data)
if manifest:
for app in _get_apps_of_manifest(manifest):
@ -262,11 +246,13 @@ def subcommand_get_exported_archive_apps(arguments):
def subcommand_restore_archive(arguments):
"""Restore files from an archive."""
env = get_env(arguments)
locations_data = ''.join(sys.stdin)
_locations = json.loads(locations_data)
locations = _locations['directories'] + _locations['files']
locations = [os.path.relpath(location, '/') for location in locations]
_extract(arguments.path, arguments.destination, locations=locations)
_extract(arguments.path, arguments.destination, locations=locations,
env=env)
def subcommand_restore_exported_archive(arguments):
@ -294,12 +280,33 @@ def read_password():
return ''.join(sys.stdin)
def run(cmd, env=None, check=True):
def get_env(arguments, use_credentials=False):
"""Create encryption and ssh kwargs out of given arguments"""
env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes')
if arguments.encryption_passphrase:
env['BORG_PASSPHRASE'] = arguments.encryption_passphrase
if use_credentials:
if arguments.ssh_keyfile:
env['BORG_RSH'] = "ssh -i %s" % arguments.ssh_keyfile
else:
password = read_password()
if password:
env['SSHPASS'] = password
env['BORG_RSH'] = 'sshpass -e ssh -o StrictHostKeyChecking=no'
else:
raise ValueError('could not find credentials')
return env
def run(cmd, arguments, check=True):
"""Wrap the command with ssh password or keyfile authentication"""
# Set a timeout to not get stuck if the remote server asks for a password.
timeout = None
if env and 'BORG_RSH' in env and 'SSHPASS' not in env:
use_credentials = False
if "@" in arguments.path:
timeout = TIMEOUT
use_credentials = True
env = get_env(arguments, use_credentials=use_credentials)
subprocess.run(cmd, check=check, env=env, timeout=timeout)

View File

@ -42,6 +42,8 @@ def parse_arguments():
mount.add_argument('--mountpoint', help='Local mountpoint', required=True)
mount.add_argument('--path', help='Remote ssh path to mount',
required=True)
mount.add_argument('--ssh-keyfile', help='Path of private ssh key',
default=None, required=False)
umount = subparsers.add_parser('umount',
help='unmount an ssh filesystem')
umount.add_argument('--mountpoint', help='Mountpoint to unmount',
@ -55,16 +57,6 @@ def parse_arguments():
return parser.parse_args()
def get_env(arguments, read_input=True):
"""Create encryption and ssh kwargs out of given arguments"""
env = dict(os.environ)
password = read_password() if read_input else None
if password:
env['SSHPASS'] = password
env['BORG_RSH'] = 'sshpass -e ssh -o StrictHostKeyChecking=no'
return env
def subcommand_mount(arguments):
"""Show repository information."""
try:
@ -72,7 +64,6 @@ def subcommand_mount(arguments):
except AlreadyMountedError:
return
env = get_env(arguments)
remote_path = arguments.path
kwargs = {}
# the shell would expand ~/ to the local home directory
@ -80,16 +71,15 @@ def subcommand_mount(arguments):
cmd = ['sshfs', remote_path, arguments.mountpoint, '-o',
'UserKnownHostsFile=/dev/null', '-o',
'StrictHostKeyChecking=no']
timeout = None
if 'SSHPASS' in env:
cmd += ['-o', 'password_stdin']
kwargs['input'] = env['SSHPASS'].encode()
elif 'SSHKEY' in env:
if arguments.ssh_keyfile:
cmd += ['-o', 'IdentityFile=$SSHKEY']
timeout = TIMEOUT
else:
raise ValueError('mount requires either SSHPASS or SSHKEY in env')
subprocess.run(cmd, check=True, env=env, timeout=timeout, **kwargs)
password = read_password()
if not password:
raise ValueError('mount requires either a password or ssh_keyfile')
cmd += ['-o', 'password_stdin']
kwargs['input'] = password.encode()
subprocess.run(cmd, check=True, timeout=TIMEOUT, **kwargs)
def subcommand_umount(arguments):

View File

@ -64,7 +64,7 @@ def setup(helper, old_version=None):
ROOT_REPOSITORY])
def _backup_handler(packet):
def _backup_handler(packet, encryption_passphrase=None):
"""Performs backup operation on packet."""
if not os.path.exists(MANIFESTS_FOLDER):
os.makedirs(MANIFESTS_FOLDER)
@ -83,9 +83,10 @@ def _backup_handler(packet):
paths = packet.directories + packet.files
paths.append(manifest_path)
actions.superuser_run(
'backups', ['create-archive', '--path', packet.path, '--paths'] +
paths)
arguments = ['create-archive', '--path', packet.path, '--paths'] + paths
if encryption_passphrase:
arguments += ['--encryption-passphrase', encryption_passphrase]
actions.superuser_run('backups', arguments)
def get_exported_archive_apps(path):
@ -104,13 +105,15 @@ def _restore_exported_archive_handler(packet):
input=locations_data.encode())
def restore_archive_handler(packet):
def restore_archive_handler(packet, encryption_passphrase=None):
"""Perform restore operation on packet."""
locations = {'directories': packet.directories, 'files': packet.files}
locations_data = json.dumps(locations)
actions.superuser_run('backups', ['restore-archive', '--path',
packet.path, '--destination', '/'],
input=locations_data.encode())
arguments = ['restore-archive', '--path', packet.path, '--destination',
'/']
if encryption_passphrase:
arguments += ['--encryption-passphrase', encryption_passphrase]
actions.superuser_run('backups', arguments, input=locations_data.encode())
def restore_from_upload(path, apps=None):

View File

@ -162,7 +162,8 @@ def restore_full(restore_handler):
_switch_to_subvolume(subvolume)
def backup_apps(backup_handler, path, app_names=None):
def backup_apps(backup_handler, path, app_names=None,
encryption_passphrase=None):
"""Backup data belonging to a set of applications."""
if not app_names:
apps = get_all_apps_for_backup()
@ -180,7 +181,8 @@ def backup_apps(backup_handler, path, app_names=None):
snapshotted = False
packet = Packet('backup', 'apps', backup_root, apps, path)
_run_operation(backup_handler, packet)
_run_operation(backup_handler, packet,
encryption_passphrase=encryption_passphrase)
if snapshotted:
_delete_snapshot(snapshot)
@ -190,7 +192,7 @@ def backup_apps(backup_handler, path, app_names=None):
def restore_apps(restore_handler, app_names=None, create_subvolume=True,
backup_file=None):
backup_file=None, encryption_passphrase=None):
"""Restore data belonging to a set of applications."""
if not app_names:
apps = get_all_apps_for_backup()
@ -208,7 +210,8 @@ def restore_apps(restore_handler, app_names=None, create_subvolume=True,
subvolume = False
packet = Packet('restore', 'apps', restore_root, apps, backup_file)
_run_operation(restore_handler, packet)
_run_operation(restore_handler, packet,
encryption_passphrase=encryption_passphrase)
if subvolume:
_switch_to_subvolume(subvolume)
@ -479,8 +482,8 @@ def _run_hooks(hook, packet):
app.run_hook(hook, packet)
def _run_operation(handler, packet):
def _run_operation(handler, packet, encryption_passphrase=None):
"""Run handler and pre/post hooks for backup/restore operations."""
_run_hooks(packet.operation + '_pre', packet)
handler(packet)
handler(packet, encryption_passphrase=encryption_passphrase)
_run_hooks(packet.operation + '_post', packet)

View File

@ -25,6 +25,8 @@ from django.utils.translation import ugettext, ugettext_lazy as _
from plinth.utils import format_lazy
from . import api, network_storage, ROOT_REPOSITORY_NAME
from .errors import BorgRepositoryDoesNotExistError
from .repository import SshBorgRepository
def _get_app_choices(apps):
@ -119,6 +121,15 @@ class AddRepositoryForm(forms.Form):
required=False
)
def get_credentials(self):
credentials = {}
for field_name in ["ssh_password", "encryption_passphrase"]:
field_value = self.cleaned_data.get(field_name, None)
if field_value:
credentials[field_name] = field_value
return credentials
def clean(self):
cleaned_data = super(AddRepositoryForm, self).clean()
passphrase = cleaned_data.get("encryption_passphrase")
@ -128,3 +139,14 @@ class AddRepositoryForm(forms.Form):
raise forms.ValidationError(
"The entered encryption passphrases do not match"
)
path = cleaned_data.get("repository")
credentials = self.get_credentials()
self.repository = SshBorgRepository(path=path, credentials=credentials)
try:
self.repository.get_info()
except BorgRepositoryDoesNotExistError:
pass
except Exception as err:
msg = _('Accessing the remote repository failed. Details: %(err)s')
raise forms.ValidationError(msg, params={'err': str(err)})

View File

@ -63,15 +63,27 @@ class BorgRepository(object):
is_mounted = True
def __init__(self, path, credentials={}):
self.path = path
self._path = path
self.credentials = credentials
def append_credentials(self, arguments):
"""Append '--encryption-passphrase' argument to backups call"""
passphrase = self.credentials.get('encryption_passphrase', None)
if passphrase:
arguments += ['--encryption-passphrase', passphrase]
return arguments
@property
def repo_path(self):
"""Return the repository that the backups action script should use."""
return self._path
def get_info(self):
output = self._run('backups', ['info', '--path', self.path])
output = self.run(['info', '--path', self.repo_path])
return json.loads(output)
def list_archives(self):
output = self._run('backups', ['list-repo', '--path', self.path])
output = self.run(['list-repo', '--path', self.repo_path])
return json.loads(output)['archives']
def get_view_content(self):
@ -82,17 +94,19 @@ class BorgRepository(object):
'error': ''
}
try:
repository['archives'] = self.list_archives()
repository['mounted'] = self.is_mounted
error = ''
except (BorgError, ActionError) as err:
repository['mounted'] = self.is_mounted
if repository['mounted']:
repository['archives'] = self.list_archives()
except (BorgError, ActionError) as \
err:
error = str(err)
repository['error'] = error
return repository
def delete_archive(self, archive_name):
archive_path = self.get_archive_path(archive_name)
self._run('backups', ['delete-archive', '--path', archive_path])
self.run(['delete-archive', '--path', archive_path])
def remove_repository(self):
"""Remove a borg repository"""
@ -100,19 +114,20 @@ class BorgRepository(object):
def create_archive(self, archive_name, app_names):
archive_path = self.get_archive_path(archive_name)
passphrase = self.credentials.get('encryption_passphrase', None)
api.backup_apps(_backup_handler, path=archive_path,
app_names=app_names)
app_names=app_names, encryption_passphrase=passphrase)
def create_repository(self):
self._run('backups', ['init', '--path', self.path, '--encryption',
'none'])
self.run(['init', '--path', self.repo_path, '--encryption', 'none'])
def get_zipstream(self, archive_name):
archive_path = self.get_archive_path(archive_name)
args = ['export-tar', '--path', archive_path]
args = self.append_credentials(args)
kwargs = {'run_in_background': True,
'bufsize': 1}
proc = self._run('backups', args, kwargs=kwargs, use_credentials=False)
proc = self._run('backups', args, kwargs=kwargs)
return zipstream.ZipStream(proc.stdout, 'readline')
def get_archive(self, name):
@ -125,24 +140,21 @@ class BorgRepository(object):
def get_archive_apps(self, archive_name):
"""Get list of apps included in an archive."""
archive_path = self.get_archive_path(archive_name)
output = self._run('backups', ['get-archive-apps', '--path',
archive_path])
output = self.run(['get-archive-apps', '--path', archive_path])
return output.splitlines()
def restore_archive(self, archive_name, apps=None):
archive_path = self.get_archive_path(archive_name)
passphrase = self.credentials.get('encryption_passphrase', None)
api.restore_apps(restore_archive_handler, app_names=apps,
create_subvolume=False, backup_file=archive_path)
create_subvolume=False, backup_file=archive_path,
encryption_passphrase=passphrase)
def get_archive_path(self, archive_name):
return "::".join([self.path, archive_name])
return "::".join([self.repo_path, archive_name])
def _get_env(self):
return dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes')
def _run(self, cmd, arguments, superuser=True, kwargs=None,
use_credentials=False):
"""Run a backups action script command."""
def _run(self, cmd, arguments, superuser=True, kwargs=None):
"""Run a backups or sshfs action script command."""
if kwargs is None:
kwargs = {}
try:
@ -153,6 +165,9 @@ class BorgRepository(object):
except ActionError as err:
self.reraise_known_error(err)
def run(self, arguments):
return self._run('backups', arguments)
def reraise_known_error(self, err):
"""Look whether the caught error is known and reraise it accordingly"""
caught_error = str(err)
@ -175,19 +190,19 @@ class SshBorgRepository(BorgRepository):
**kwargs):
"""
Provide a uuid to instanciate an existing repository,
or 'path' and 'credentials' for a new repository.
or 'ssh_path' and 'credentials' for a new repository.
"""
if uuid:
self.uuid = uuid
# If all data are given, instanciate right away.
if path and credentials:
self.path = path
self._path = path
self.credentials = credentials
else:
self._load_from_kvstore()
# No uuid given: new instance.
elif path and credentials:
self.path = path
self._path = path
self.credentials = credentials
else:
raise ValueError('Invalid arguments.')
@ -195,13 +210,23 @@ class SshBorgRepository(BorgRepository):
if self.uuid and not self.is_mounted:
self.mount()
@property
def repo_path(self):
"""
Return the path to use for backups actions.
This is either the mountpoint or the remote ssh path,
depending on whether borg is running on the remote server.
"""
return self.mountpoint
@property
def mountpoint(self):
return os.path.join(SSHFS_MOUNTPOINT, self.uuid)
@property
def name(self):
return self.path
return self._path
@property
def is_mounted(self):
@ -218,11 +243,11 @@ class SshBorgRepository(BorgRepository):
self.credentials = storage['credentials']
except KeyError:
self.credentials = {}
self.path = storage['path']
self._path = storage['path']
def _get_network_storage_format(self, store_credentials):
storage = {
'path': self.path,
'path': self._path,
'storage_type': self.storage_type,
'added_by_module': 'backups'
}
@ -236,8 +261,8 @@ class SshBorgRepository(BorgRepository):
"""Initialize / create a borg repository."""
if encryption not in SUPPORTED_BORG_ENCRYPTION:
raise ValueError('Unsupported encryption: %s' % encryption)
self._run('backups', ['init', '--path', self.path, '--encryption',
encryption])
self.run(['init', '--path', self.repo_path, '--encryption',
encryption])
def save(self, store_credentials=True):
"""
@ -249,8 +274,10 @@ class SshBorgRepository(BorgRepository):
def mount(self):
arguments = ['mount', '--mountpoint', self.mountpoint, '--path',
self.path]
self._run('sshfs', arguments)
self._path]
arguments, kwargs = self._append_sshfs_arguments(arguments,
self.credentials)
self._run('sshfs', arguments, kwargs=kwargs)
def umount(self):
self._run('sshfs', ['umount', '--mountpoint', self.mountpoint])
@ -269,44 +296,39 @@ class SshBorgRepository(BorgRepository):
except Exception as err:
logger.error(err)
def _get_run_arguments(self, arguments, credentials, kwargs=None):
def _append_sshfs_arguments(self, arguments, credentials, kwargs=None):
if kwargs is None:
kwargs = {}
if 'encryption_passphrase' in credentials and \
credentials['encryption_passphrase']:
arguments += ['--encryption-passphrase',
credentials['encryption_passphrase']]
if 'ssh_password' in credentials and credentials['ssh_password']:
kwargs['input'] = credentials['ssh_password'].encode()
if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']:
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
return (arguments, kwargs)
def _run(self, cmd, arguments, superuser=True, use_credentials=True,
kwargs=None):
def _append_run_arguments(self, arguments, credentials):
kwargs = {}
passphrase = credentials.get('encryption_passphrase', None)
if passphrase:
arguments += ['--encryption-passphrase', passphrase]
# TODO: use or remove
"""
if 'ssh_password' in credentials and credentials['ssh_password']:
kwargs['input'] = credentials['ssh_password'].encode()
if 'ssh_keyfile' in credentials and credentials['ssh_keyfile']:
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
"""
return (arguments, kwargs)
def run(self, arguments, superuser=True):
"""Run a backups action script command.
Automatically passes on credentials via self._get_run_arguments to the
backup script via environment variables or input, except if you
set use_credentials to False.
Add credentials via self._append_run_arguments to the backup script.
"""
if use_credentials:
if not self.credentials:
msg = 'Cannot access ssh repo without credentials'
raise BorgError(msg)
for key in self.credentials.keys():
if key not in self.KNOWN_CREDENTIALS:
raise ValueError('Unknown credentials entry: %s' % key)
arguments, kwargs = self._get_run_arguments(arguments,
self.credentials,
kwargs=kwargs)
try:
if superuser:
return actions.superuser_run(cmd, arguments, **kwargs)
else:
return actions.run(cmd, arguments, **kwargs)
except ActionError as err:
self.reraise_known_error(err)
for key in self.credentials.keys():
if key not in self.KNOWN_CREDENTIALS:
raise ValueError('Unknown credentials entry: %s' % key)
arguments = self.append_credentials(arguments)
return self._run('backups', arguments, superuser=superuser)
def get_ssh_repositories():

View File

@ -99,7 +99,11 @@
{% endfor %}
{% if not repository.error and not repository.archives %}
<p>{% trans 'No archives currently exist.' %}</p>
<tr>
<td>
<p>{% trans 'No archives currently exist.' %}</p>
</td>
</tr>
{% endif %}
{% endif %}

View File

@ -25,7 +25,7 @@
<h3>{{ title }}</h3>
<form class="form" method="post" target="_blank">
<form class="form" method="post">
{% csrf_token %}
{{ form|bootstrap }}

View File

@ -25,8 +25,8 @@
<h2>{{ title }}</h2>
<p>
{% trans "Are you sure that you want to remove the repository" %}<br />
<b>
{% trans "Are you sure that you want to remove the repository" %}
{{ repository.path }}?
</b>
</p>

View File

@ -234,7 +234,7 @@ class TestBackupProcesses(unittest.TestCase):
packet.apps[1].run_hook = MagicMock()
handler = MagicMock()
api._run_operation(handler, packet)
handler.assert_has_calls([call(packet)])
handler.assert_has_calls([call(packet, encryption_passphrase=None)])
calls = [call('backup_pre', packet), call('backup_post', packet)]
packet.apps[0].run_hook.assert_has_calls(calls)

View File

@ -22,6 +22,7 @@ import os
import shutil
import tempfile
import unittest
import uuid
from plinth import cfg
from plinth.modules import backups
@ -35,6 +36,13 @@ euid = os.geteuid()
class TestBackups(unittest.TestCase):
"""Test creating, reading and deleting a repository"""
# try to access a non-existing url and a URL that exists but does not
# grant access
nonexisting_repo_url = "user@%s.com.au:~/repo" % str(uuid.uuid1())
inaccessible_repo_url = "user@heise.de:~/repo"
dummy_credentials = {
'ssh_password': 'invalid_password'
}
@classmethod
def setUpClass(cls):
@ -116,14 +124,14 @@ class TestBackups(unittest.TestCase):
return
ssh_path = test_config.backups_ssh_path
ssh_repo = SshBorgRepository(uuid='plinth_test_sshfs',
path=ssh_path,
credentials=credentials,
automount=False)
ssh_repo.mount()
self.assertTrue(ssh_repo.is_mounted)
ssh_repo.umount()
self.assertFalse(ssh_repo.is_mounted)
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=ssh_path,
credentials=credentials,
automount=False)
repository.mount()
self.assertTrue(repository.is_mounted)
repository.umount()
self.assertFalse(repository.is_mounted)
@unittest.skipUnless(euid == 0, 'Needs to be root')
def test_ssh_create_encrypted_repository(self):
@ -134,11 +142,32 @@ class TestBackups(unittest.TestCase):
# using SshBorgRepository to provide credentials because
# BorgRepository does not allow creating encrypted repositories
# TODO: find better way to test encryption
repository = SshBorgRepository(path=encrypted_repo,
credentials=credentials)
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=encrypted_repo,
credentials=credentials,
automount=False)
repository.create_repository('repokey')
self.assertTrue(bool(repository.get_info()))
@unittest.skipUnless(euid == 0, 'Needs to be root')
def test_access_nonexisting_url(self):
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=self.nonexisting_repo_url,
credentials=self.dummy_credentials,
automount=False)
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
@unittest.skipUnless(euid == 0, 'Needs to be root')
def test_inaccessible_repo_url(self):
"""Test accessing an existing URL with wrong credentials"""
repository = SshBorgRepository(uuid=str(uuid.uuid1()),
path=self.inaccessible_repo_url,
credentials=self.dummy_credentials,
automount=False)
with self.assertRaises(backups.errors.BorgError):
repository.get_info()
def get_credentials(self):
"""
Get access params for a remote location.

View File

@ -235,6 +235,7 @@ class RestoreArchiveView(BaseRestoreView):
def form_valid(self, form):
"""Restore files from the archive on valid form submission."""
repository = get_repository(self.kwargs['uuid'])
import ipdb; ipdb.set_trace()
repository.restore_archive(self.kwargs['name'],
form.cleaned_data['selected_apps'])
return super().form_valid(form)
@ -269,24 +270,12 @@ class AddRepositoryView(SuccessMessageMixin, FormView):
return context
def form_valid(self, form):
"""Restore files from the archive on valid form submission."""
path = form.cleaned_data['repository']
credentials = {}
encryption_passphrase = form.cleaned_data['encryption_passphrase']
if encryption_passphrase:
credentials['encryption_passphrase'] = encryption_passphrase
if form.cleaned_data['ssh_password']:
credentials['ssh_password'] = form.cleaned_data['ssh_password']
# TODO: add ssh_keyfile
# ssh_keyfile = form.cleaned_data['ssh_keyfile']
repository = SshBorgRepository(path=path, credentials=credentials)
"""Create and store the repository."""
try:
repository.get_info()
form.repository.get_info()
except BorgRepositoryDoesNotExistError:
repository.create_repository(form.cleaned_data['encryption'])
repository.save(store_credentials=True)
form.repository.create_repository(form.cleaned_data['encryption'])
form.repository.save(store_credentials=True)
return super().form_valid(form)
@ -322,12 +311,12 @@ class RemoveRepositoryView(SuccessMessageMixin, TemplateView):
"""Return additional context for rendering the template."""
context = super().get_context_data(**kwargs)
context['title'] = _('Remove Repository')
context['repository'] = SshBorgRepository(uuid=uuid)
context['repository'] = SshBorgRepository(uuid=uuid, automount=False)
return context
def post(self, request, uuid):
"""Delete the archive."""
repository = SshBorgRepository(uuid)
repository = SshBorgRepository(uuid, automount=False)
repository.remove_repository()
messages.success(request, _('Repository removed. The remote backup '
'itself was not deleted.'))
@ -343,7 +332,7 @@ def umount_repository(request, uuid):
def mount_repository(request, uuid):
repository = SshBorgRepository(uuid=uuid)
repository = SshBorgRepository(uuid=uuid, automount=False)
try:
repository.mount()
except Exception as err: