Sunil Mohan Adapa 9bd1f80d5c
*: Always pass check= argument to subprocess.run()
- Avoid flake8 warnings.

- Makes the call more explicitly readable in case an exception is expected but
check=True is not passed by mistake.

Tests:

- Many tests are skipped since the changes are considered trivial.
check=False is already the default for subprocess.run() method.

- actions/package: Install an app when it is not installed.

- actions/upgrade: Run manual upgrades.

- actions/users: Change a user password. Login. Create/remove a user.

- actions/zoph: Restore a database.

- container: On a fresh repository, run ./container up,ssh,stop,destroy for a
testing container.

- plinth/action_utils.py: Enable/disable an app that has a running service.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2021-10-11 14:34:40 -04:00

298 lines
11 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Forms for backups module.
"""
import logging
import os
import re
import subprocess
from django import forms
from django.core.exceptions import ValidationError
from django.core.validators import (FileExtensionValidator,
validate_ipv46_address)
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
from plinth.modules.storage import get_mounts
from plinth.utils import format_lazy
from . import api, split_path
from .repository import get_repositories
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def _get_app_choices(components):
"""Return a list of check box multiple choices from list of components."""
choices = []
for component in components:
name = component.app.info.name
if not component.has_data:
name = gettext('{app} (No data to backup)').format(
app=component.app.info.name)
choices.append((component.app_id, name))
return choices
def _get_repository_choices():
    """Return (uuid, name) pairs for every repository that is usable."""
    usable = []
    for repository in get_repositories():
        if not repository.is_usable():
            continue

        usable.append((repository.uuid, repository.name))

    return usable
class ScheduleForm(forms.Form):
    """Form to edit the schedule of automatic backups.

    Holds the on/off switch, how many daily, weekly and monthly backups
    to retain, the hour at which to run and which apps to include.
    """

    enabled = forms.BooleanField(
        required=False, label=_('Enable scheduled backups'),
        help_text=_('If enabled, a backup is taken every day, every week and '
                    'every month. Older backups are removed.'))

    daily_to_keep = forms.IntegerField(
        min_value=0, required=True,
        label=_('Number of daily backups to keep'),
        help_text=_('This many latest backups are kept and the rest are '
                    'removed. A value of "0" disables backups of this type. '
                    'Triggered at specified hour every day.'))

    weekly_to_keep = forms.IntegerField(
        min_value=0, required=True,
        label=_('Number of weekly backups to keep'),
        help_text=_('This many latest backups are kept and the rest are '
                    'removed. A value of "0" disables backups of this type. '
                    'Triggered at specified hour every Sunday.'))

    monthly_to_keep = forms.IntegerField(
        min_value=0, required=True,
        label=_('Number of monthly backups to keep'),
        help_text=_('This many latest backups are kept and the rest are '
                    'removed. A value of "0" disables backups of this type. '
                    'Triggered at specified hour first day of every month.'))

    run_at_hour = forms.IntegerField(
        min_value=0, max_value=23, required=True,
        label=_('Hour of the day to trigger backup operation'),
        help_text=_('In 24 hour format.'))

    selected_apps = forms.MultipleChoiceField(
        widget=forms.CheckboxSelectMultiple(attrs={'class': 'has-select-all'}),
        label=_('Included apps'), help_text=_('Apps to include in the backup'))

    def __init__(self, *args, **kwargs):
        """Populate the app choices and pre-select the apps.

        Every app is pre-selected except those listed under the
        'unselected_apps' key of the form's initial data.
        """
        super().__init__(*args, **kwargs)

        choices = _get_app_choices(api.get_all_components_for_backup())
        excluded = self.initial.get('unselected_apps', [])

        apps_field = self.fields['selected_apps']
        apps_field.choices = choices
        apps_field.initial = [
            app_id for app_id, _label in choices if app_id not in excluded
        ]
class CreateArchiveForm(forms.Form):
    """Form to create a backup archive in a chosen repository.

    The archive name is optional and may not contain '{', '}' or '/'.
    """

    repository = forms.ChoiceField(label=_('Repository'))

    name = forms.RegexField(
        regex=r'^[^{}/]*$', strip=True, required=False, label=_('Name'),
        help_text=_('(Optional) Set a name for this backup archive'))

    selected_apps = forms.MultipleChoiceField(
        widget=forms.CheckboxSelectMultiple(attrs={'class': 'has-select-all'}),
        label=_('Included apps'), help_text=_('Apps to include in the backup'))

    def __init__(self, *args, **kwargs):
        """Populate app and repository choices; pre-select every app."""
        super().__init__(*args, **kwargs)

        choices = _get_app_choices(api.get_all_components_for_backup())
        apps_field = self.fields['selected_apps']
        apps_field.choices = choices
        apps_field.initial = [app_id for app_id, _label in choices]

        self.fields['repository'].choices = _get_repository_choices()
class RestoreForm(forms.Form):
    """Form to pick which apps to restore from a backup."""

    selected_apps = forms.MultipleChoiceField(
        widget=forms.CheckboxSelectMultiple(attrs={'class': 'has-select-all'}),
        label=_('Select the apps you want to restore'))

    def __init__(self, *args, **kwargs):
        """Populate the app choices from the given components.

        The required keyword argument 'components' is removed from kwargs
        before the remaining arguments are handed to the base form. Every
        app is pre-selected.
        """
        available = kwargs.pop('components')
        super().__init__(*args, **kwargs)

        choices = _get_app_choices(available)
        apps_field = self.fields['selected_apps']
        apps_field.choices = choices
        apps_field.initial = [app_id for app_id, _label in choices]
class UploadForm(forms.Form):
    """Form to upload a backup file; only the 'gz' extension is accepted."""

    file = forms.FileField(
        required=True, label=_('Upload File'),
        help_text=_('Select the backup file you want to upload'),
        validators=[
            FileExtensionValidator(
                allowed_extensions=['gz'],
                message=_('Backup files have to be in .tar.gz format')),
        ])
def repository_validator(path):
    """Validate an SSH repository path.

    The path must contain both '@' and ':'. It is then split with
    split_path() into user name, host name and directory, and each part
    is checked separately.

    Raises ValidationError when the overall format is wrong, when the
    user name is not a valid Unix user name, when the host name is neither
    an IP address nor an RFC 1123 host name, or when the directory
    contains a NUL character.
    """
    if not ('@' in path and ':' in path):
        raise ValidationError(_('Repository path format incorrect.'))

    username, hostname, dir_path = split_path(path)
    # Only the part before '%' is validated (presumably this drops an IPv6
    # zone identifier -- confirm against split_path()).
    hostname = hostname.split('%')[0]

    # Validate username using Unix username regex. fullmatch() is used so
    # that a trailing newline is not silently accepted by '$'.
    if not re.fullmatch(r'[a-z0-9_][a-z0-9_-]*', username):
        # Interpolate via params instead of an f-string: the msgid passed to
        # gettext must be a constant for translations to be found.
        raise ValidationError(_('Invalid username: %(username)s'),
                              params={'username': username})

    # The hostname should either be a valid IP address or hostname
    # Follows RFC1123 (hostnames can start with digits) instead of RFC952
    hostname_re = (r'^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*'
                   r'([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$')
    try:
        validate_ipv46_address(hostname)
    except ValidationError:
        if not re.fullmatch(hostname_re, hostname):
            raise ValidationError(_('Invalid hostname: %(hostname)s'),
                                  params={'hostname': hostname})

    # Validate directory path: it must not contain a NUL character. The
    # previous re.match(r'[^\0]*', ...) matched the empty prefix of any
    # string and therefore never rejected anything.
    if '\0' in dir_path:
        raise ValidationError(_('Invalid directory path: %(dir_path)s'),
                              params={'dir_path': dir_path})
class EncryptedBackupsMixin(forms.Form):
    """Mixin providing the encryption fields of a backup repository form.

    Adds the encryption type, a passphrase and its confirmation, and
    validates that they are consistent with each other.
    """

    encryption = forms.ChoiceField(
        choices=[('repokey', _('Key in Repository')), ('none', _('None'))],
        label=_('Encryption'),
        help_text=format_lazy(
            _('"Key in Repository" means that a '
              'password-protected key is stored with the backup.')))

    encryption_passphrase = forms.CharField(
        required=False, widget=forms.PasswordInput(), label=_('Passphrase'),
        help_text=_('Passphrase; Only needed when using encryption.'))

    confirm_encryption_passphrase = forms.CharField(
        required=False, widget=forms.PasswordInput(),
        label=_('Confirm Passphrase'), help_text=_('Repeat the passphrase.'))

    def clean(self):
        """Check the passphrase fields against each other.

        Raises a validation error when the two passphrases differ, or when
        an encryption type other than 'none' is chosen without a
        passphrase. Returns the cleaned data otherwise.
        """
        super().clean()
        data = self.cleaned_data

        passphrase = data.get('encryption_passphrase')
        if passphrase != data.get('confirm_encryption_passphrase'):
            raise forms.ValidationError(
                _('The entered encryption passphrases do not match'))

        wants_encryption = data.get('encryption') != 'none'
        if wants_encryption and not passphrase:
            raise forms.ValidationError(
                _('Passphrase is needed for encryption.'))

        return data
# Names of the form fields declared by EncryptedBackupsMixin, in display
# order. Appended to 'field_order' by the repository forms in this module.
encryption_fields = [
    'encryption', 'encryption_passphrase', 'confirm_encryption_passphrase'
]
def get_disk_choices():
    """Return (mount point, display name) pairs for selectable partitions.

    The root partition is left out, as is any partition whose
    'FreedomBoxBackups' directory is already the path of a repository with
    storage type 'disk'. The display name is the device label, falling
    back to the mount point when the label is empty.
    """
    taken_paths = [
        repository.path for repository in get_repositories()
        if repository.storage_type == 'disk'
    ]

    choices = []
    for device in get_mounts():
        mount_point = device['mount_point']
        if mount_point == '/':
            continue

        if os.path.join(mount_point, 'FreedomBoxBackups') in taken_paths:
            continue

        choices.append((mount_point, device['label'] or mount_point))

    return choices
class AddRepositoryForm(EncryptedBackupsMixin, forms.Form):
    """Form to create a new backups repository on a local disk.

    The disk choices are computed by get_disk_choices(), which is passed
    to the field as a callable.
    """

    disk = forms.ChoiceField(
        choices=get_disk_choices, label=_('Select Disk or Partition'),
        help_text=format_lazy(
            _('Backups will be stored in the directory FreedomBoxBackups')))

    field_order = ['disk', *encryption_fields]
class AddRemoteRepositoryForm(EncryptedBackupsMixin, forms.Form):
    """Form to add a new remote repository reached over SSH."""

    repository = forms.CharField(
        strip=True, validators=[repository_validator],
        label=_('SSH Repository Path'),
        help_text=_('Path of a new or existing repository. Example: '
                    '<i>user@host:~/path/to/repo/</i>'))

    ssh_password = forms.CharField(
        strip=True, required=False, widget=forms.PasswordInput(),
        label=_('SSH server password'),
        help_text=_('Password of the SSH Server.<br />'
                    'SSH key-based authentication is not yet possible.'))

    field_order = ['repository', 'ssh_password', *encryption_fields]

    def clean_repository(self):
        """Reject a repository path that is already stored; return it."""
        repository_path = self.cleaned_data.get('repository')
        # Avoid creation of duplicate ssh remotes
        self._check_if_duplicate_remote(repository_path)
        return repository_path

    @staticmethod
    def _check_if_duplicate_remote(path):
        """Raise validation error if a stored repository has this path."""
        already_stored = any(repository.path == path
                             for repository in get_repositories())
        if already_stored:
            raise forms.ValidationError(
                _('Remote backup repository already exists.'))
class VerifySshHostkeyForm(forms.Form):
    """Form to verify the SSH public key for a host."""

    ssh_public_key = forms.ChoiceField(
        label=_('Select verified SSH public key'), widget=forms.RadioSelect)

    def __init__(self, *args, **kwargs):
        """Initialize the form with the public keys offered by the host.

        The required keyword argument 'hostname' is removed from kwargs
        before the remaining arguments are handed to the base form. The
        host is scanned for its public keys, which become the choices of
        'ssh_public_key'. Any error output from a failed scan is stored in
        'keyscan_error' (None when the scan succeeded).
        """
        hostname = kwargs.pop('hostname')
        super().__init__(*args, **kwargs)
        (self.fields['ssh_public_key'].choices,
         self.keyscan_error) = self._get_all_public_keys(hostname)

    @staticmethod
    def _get_all_public_keys(hostname):
        """Use ssh-keyscan to get all the SSH public keys of a host.

        Return a tuple (choices, error_message). 'choices' is a list of
        (public key line, fingerprint line) pairs; 'error_message' is the
        stderr output of ssh-keyscan when it exited with a non-zero status
        and None otherwise.
        """
        # Fetch public keys of ssh remote. A failure is reported through
        # the returned error message rather than an exception.
        keyscan = subprocess.run(['ssh-keyscan', hostname],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE, check=False)
        keys = keyscan.stdout.decode().splitlines()
        error_message = keyscan.stderr.decode() if keyscan.returncode else None

        if not keys:
            # Nothing to fingerprint; running ssh-keygen on empty input
            # would only produce an error on stderr.
            return [], error_message

        # Generate user-friendly fingerprints of public keys
        keygen = subprocess.run(['ssh-keygen', '-l', '-f', '-'],
                                input=keyscan.stdout, stdout=subprocess.PIPE,
                                check=False)
        fingerprints = keygen.stdout.decode().splitlines()

        # Return a real list: a bare zip() iterator can be consumed only
        # once, after which the choices would appear empty.
        return list(zip(keys, fingerprints)), error_message