mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
backups: Don't rely on disk labels during export/restore
Disk labels are unreliable. They may not exist. There are not unique and two disks in the system may have the same label. Instead use the device path of the disk/partition. Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
b29ccfc060
commit
c33b192f71
@ -213,8 +213,8 @@ def backup_restore(browser, app_name):
|
||||
browser.visit(default_url)
|
||||
nav_to_module(browser, 'backups')
|
||||
browser.find_link_by_href(
|
||||
'/plinth/sys/backups/restore/Root%2520Filesystem/_functional_test_' +
|
||||
app_name + '.tar.gz/').first.click()
|
||||
'/plinth/sys/backups/restore/%252F/_functional_test_' + app_name +
|
||||
'.tar.gz/').first.click()
|
||||
submit(browser)
|
||||
|
||||
|
||||
|
||||
@ -109,7 +109,9 @@ def delete_archive(name):
|
||||
|
||||
|
||||
def export_archive(name, location):
|
||||
filepath = get_archive_path(location, get_valid_filename(name) + '.tar.gz')
|
||||
location_path = get_location_path(location)
|
||||
filepath = get_archive_path(location_path,
|
||||
get_valid_filename(name) + '.tar.gz')
|
||||
# TODO: that's a full path, not a filename; rename argument
|
||||
actions.superuser_run('backups',
|
||||
['export', '--name', name, '--filename', filepath])
|
||||
@ -117,35 +119,46 @@ def export_archive(name, location):
|
||||
|
||||
def get_export_locations():
|
||||
"""Return a list of storage locations for exported backup archives."""
|
||||
locations = [('/var/lib/freedombox/', _('Root Filesystem'))]
|
||||
locations = [{
|
||||
'path': '/var/lib/freedombox/',
|
||||
'label': _('Root Filesystem'),
|
||||
'device': '/'
|
||||
}]
|
||||
if storage.is_running():
|
||||
devices = storage.udisks2.list_devices()
|
||||
for device in devices:
|
||||
if 'mount_points' in device and len(device['mount_points']) > 0:
|
||||
name = device['label'] or device['device']
|
||||
locations.append((device['mount_points'][0], name))
|
||||
location = {
|
||||
'path': device['mount_points'][0],
|
||||
'label': device['label'] or device['device'],
|
||||
'device': device['device']
|
||||
}
|
||||
locations.append(location)
|
||||
|
||||
return locations
|
||||
|
||||
|
||||
def get_location_path(label, locations=None):
|
||||
def get_location_path(device, locations=None):
|
||||
"""Returns the location path given a disk label"""
|
||||
if locations is None:
|
||||
locations = get_export_locations()
|
||||
for (location_path, location_label) in locations:
|
||||
if location_label == label:
|
||||
return location_path
|
||||
raise PlinthError("Could not find path of location %s" % label)
|
||||
|
||||
for location in locations:
|
||||
if location['device'] == device:
|
||||
return location['path']
|
||||
|
||||
raise PlinthError('Could not find path of location %s' % device)
|
||||
|
||||
|
||||
def get_export_files():
|
||||
"""Return a dict of exported backup archives found in storage locations."""
|
||||
locations = get_export_locations()
|
||||
export_files = {}
|
||||
export_files = []
|
||||
for location in locations:
|
||||
output = actions.superuser_run(
|
||||
'backups', ['list-exports', '--location', location[0]])
|
||||
export_files[location[1]] = json.loads(output)
|
||||
'backups', ['list-exports', '--location', location['path']])
|
||||
location['files'] = json.loads(output)
|
||||
export_files.append(location)
|
||||
|
||||
return export_files
|
||||
|
||||
@ -154,9 +167,9 @@ def get_archive_path(location, archive_name):
|
||||
return os.path.join(location, BACKUP_FOLDER_NAME, archive_name)
|
||||
|
||||
|
||||
def find_exported_archive(disk_label, archive_name):
|
||||
def find_exported_archive(device, archive_name):
|
||||
"""Return the full path for the exported archive file."""
|
||||
location_path = get_location_path(disk_label)
|
||||
location_path = get_location_path(device)
|
||||
return get_archive_path(location_path, archive_name)
|
||||
|
||||
|
||||
@ -175,8 +188,8 @@ def _restore_handler(packet):
|
||||
input=locations_data.encode())
|
||||
|
||||
|
||||
def restore_exported(label, archive_name, apps=None):
|
||||
def restore_exported(device, archive_name, apps=None):
|
||||
"""Restore files from exported backup archive."""
|
||||
filename = find_exported_archive(label, archive_name)
|
||||
filename = find_exported_archive(device, archive_name)
|
||||
api.restore_apps(_restore_handler, app_names=apps, create_subvolume=False,
|
||||
backup_file=filename)
|
||||
|
||||
@ -71,7 +71,8 @@ class ExportArchiveForm(forms.Form):
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Initialize the form with disk choices."""
|
||||
super().__init__(*args, **kwargs)
|
||||
self.fields['disk'].choices = get_export_locations()
|
||||
self.fields['disk'].choices = [(location['device'], location['label'])
|
||||
for location in get_export_locations()]
|
||||
|
||||
|
||||
class RestoreForm(forms.Form):
|
||||
@ -104,7 +105,7 @@ class UploadForm(forms.Form):
|
||||
locations = get_export_locations()
|
||||
# users should only be able to select a location name -- don't
|
||||
# provide paths as a form input for security reasons
|
||||
location_labels = [(location[1], location[1])
|
||||
location_labels = [(location['device'], location['label'])
|
||||
for location in locations]
|
||||
self.fields['location'].choices = location_labels
|
||||
|
||||
@ -112,8 +113,8 @@ class UploadForm(forms.Form):
|
||||
"""Check that the uploaded file does not yet exist."""
|
||||
cleaned_data = super().clean()
|
||||
file = cleaned_data.get('file')
|
||||
location_label = cleaned_data.get('location')
|
||||
location_path = get_location_path(location_label)
|
||||
location_device = cleaned_data.get('location')
|
||||
location_path = get_location_path(location_device)
|
||||
# if other errors occured before, 'file' won't be in cleaned_data
|
||||
if (file and file.name):
|
||||
filepath = get_archive_path(location_path, file.name)
|
||||
|
||||
@ -113,18 +113,18 @@
|
||||
</thead>
|
||||
|
||||
<tbody>
|
||||
{% for label, file_list in exports.items %}
|
||||
{% for name in file_list %}
|
||||
{% for export in exports %}
|
||||
{% for name in export.files %}
|
||||
<tr id="export-{{ label }}-{{ name }}" class="export">
|
||||
<td class="export-label">{{ label }}</td>
|
||||
<td class="export-label">{{ export.label }}</td>
|
||||
<td class="export-name">{{ name }}</td>
|
||||
<td class="export-operations">
|
||||
<a class="download btn btn-sm btn-default" target="_blank"
|
||||
href="{% url 'backups:download' label|urlencode:'' name|urlencode:'' %}">
|
||||
href="{% url 'backups:download' export.device|urlencode:'' name|urlencode:'' %}">
|
||||
{% trans "Download" %}
|
||||
</a>
|
||||
<a class="restore btn btn-sm btn-default"
|
||||
href="{% url 'backups:restore' label|urlencode:'' name|urlencode:'' %}">
|
||||
href="{% url 'backups:restore' export.device|urlencode:'' name|urlencode:'' %}">
|
||||
{% trans "Restore" %}
|
||||
</a>
|
||||
</td>
|
||||
|
||||
@ -60,6 +60,7 @@ def _get_backup_app(name):
|
||||
|
||||
class TestBackupApp(unittest.TestCase):
|
||||
"""Test the BackupApp class."""
|
||||
|
||||
def test_run_hook(self):
|
||||
"""Test running a hook on an application."""
|
||||
packet = api.Packet('backup', 'apps', '/', [])
|
||||
@ -73,7 +74,8 @@ class TestBackupApp(unittest.TestCase):
|
||||
app.testhook_pre.reset_mock()
|
||||
app.testhook_pre.side_effect = Exception()
|
||||
backup_app.run_hook(hook, packet)
|
||||
self.assertEqual(packet.errors, [api.BackupError('hook', app, hook=hook)])
|
||||
self.assertEqual(packet.errors,
|
||||
[api.BackupError('hook', app, hook=hook)])
|
||||
|
||||
del app.testhook_pre
|
||||
backup_app.run_hook(hook, packet)
|
||||
@ -137,10 +139,14 @@ class TestBackupProcesses(unittest.TestCase):
|
||||
|
||||
@staticmethod
|
||||
def test_export_locations():
|
||||
"""Check get_export_locations returns a list of tuples of length 2."""
|
||||
"""Check get_export_locations returns a list of locations."""
|
||||
locations = get_export_locations()
|
||||
assert locations
|
||||
assert len(locations[0]) == 2
|
||||
for location in locations:
|
||||
assert isinstance(location, dict)
|
||||
assert isinstance(location['path'], str)
|
||||
assert isinstance(location['device'], str)
|
||||
assert str(location['label'])
|
||||
|
||||
@staticmethod
|
||||
@patch('plinth.module_loader.loaded_modules.items')
|
||||
@ -251,16 +257,22 @@ class TestBackupModule(unittest.TestCase):
|
||||
|
||||
def test_get_location_path(self):
|
||||
"""Test the 'get_location_path' method"""
|
||||
locations = [('/var/www', 'dummy location'), ('/etc', 'dangerous')]
|
||||
location = get_location_path('dummy location', locations)
|
||||
self.assertEquals(location, locations[0][0])
|
||||
locations = [{
|
||||
'path': '/var/www',
|
||||
'device': '/dummy/device'
|
||||
}, {
|
||||
'path': '/etc',
|
||||
'device': '/dangerous'
|
||||
}]
|
||||
location_path = get_location_path('/dummy/device', locations)
|
||||
self.assertEqual(location_path, locations[0]['path'])
|
||||
# verify that an unknown location raises an error
|
||||
with self.assertRaises(PlinthError):
|
||||
get_location_path('unknown location', locations)
|
||||
get_location_path('/unknown/device', locations)
|
||||
|
||||
def test_file_upload(self):
|
||||
locations = get_export_locations()
|
||||
location_name = locations[0][1]
|
||||
location_name = locations[0]['device']
|
||||
post_data = {'location': location_name}
|
||||
|
||||
# posting a video should fail
|
||||
|
||||
@ -28,11 +28,11 @@ urlpatterns = [
|
||||
url(r'^sys/backups/create/$', CreateArchiveView.as_view(), name='create'),
|
||||
url(r'^sys/backups/export/(?P<name>[^/]+)/$',
|
||||
ExportArchiveView.as_view(), name='export'),
|
||||
url(r'^sys/backups/download/(?P<label>[^/]+)/(?P<name>[^/]+)/$',
|
||||
url(r'^sys/backups/download/(?P<device>[^/]+)/(?P<name>[^/]+)/$',
|
||||
DownloadArchiveView.as_view(), name='download'),
|
||||
url(r'^sys/backups/delete/(?P<name>[^/]+)/$',
|
||||
DeleteArchiveView.as_view(), name='delete'),
|
||||
url(r'^sys/backups/upload/$', UploadArchiveView.as_view(), name='upload'),
|
||||
url(r'^sys/backups/restore/(?P<label>[^/]+)/(?P<name>[^/]+)/$',
|
||||
url(r'^sys/backups/restore/(?P<device>[^/]+)/(?P<name>[^/]+)/$',
|
||||
RestoreView.as_view(), name='restore'),
|
||||
]
|
||||
|
||||
@ -115,10 +115,10 @@ class DeleteArchiveView(SuccessMessageMixin, TemplateView):
|
||||
|
||||
class DownloadArchiveView(View):
|
||||
"""View to download an archive."""
|
||||
def get(self, request, label, name):
|
||||
label = unquote(label)
|
||||
def get(self, request, device, name):
|
||||
device = unquote(device)
|
||||
name = unquote(name)
|
||||
filepath = find_exported_archive(label, name)
|
||||
filepath = find_exported_archive(device, name)
|
||||
return _get_file_response(filepath, name)
|
||||
|
||||
|
||||
@ -187,9 +187,9 @@ class RestoreView(SuccessMessageMixin, FormView):
|
||||
|
||||
def _get_included_apps(self):
|
||||
"""Save some data used to instantiate the form."""
|
||||
label = unquote(self.kwargs['label'])
|
||||
device = unquote(self.kwargs['device'])
|
||||
name = unquote(self.kwargs['name'])
|
||||
filename = backups.find_exported_archive(label, name)
|
||||
filename = backups.find_exported_archive(device, name)
|
||||
return backups.get_export_apps(filename)
|
||||
|
||||
def get_form_kwargs(self):
|
||||
@ -206,13 +206,13 @@ class RestoreView(SuccessMessageMixin, FormView):
|
||||
"""Return additional context for rendering the template."""
|
||||
context = super().get_context_data(**kwargs)
|
||||
context['title'] = _('Restore from backup')
|
||||
context['label'] = unquote(self.kwargs['label'])
|
||||
context['device'] = unquote(self.kwargs['device'])
|
||||
context['name'] = self.kwargs['name']
|
||||
return context
|
||||
|
||||
def form_valid(self, form):
|
||||
"""Restore files from the archive on valid form submission."""
|
||||
backups.restore_exported(
|
||||
unquote(self.kwargs['label']), self.kwargs['name'],
|
||||
unquote(self.kwargs['device']), self.kwargs['name'],
|
||||
form.cleaned_data['selected_apps'])
|
||||
return super().form_valid(form)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user