Sunil Mohan Adapa 7175a05733
backups: Mark secret strings in privileged actions
Tests:

- Run affected privileged actions through UI and notice that secret strings are
not logged (except initializing init/info a new SSH repository).

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2024-08-11 12:57:45 -04:00

357 lines
12 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Configure backups (with borg) and sshfs."""
import json
import os
import pathlib
import re
import subprocess
import tarfile
from plinth.actions import privileged, secret_str
from plinth.utils import Version
TIMEOUT = 30
BACKUPS_DATA_PATH = pathlib.Path('/var/lib/plinth/backups-data/')
MANIFESTS_FOLDER = '/var/lib/plinth/backups-manifests/'
class AlreadyMountedError(Exception):
"""Exception raised when mount point is already mounted."""
@privileged
def mount(mountpoint: str, remote_path: str, ssh_keyfile: str | None = None,
password: secret_str | None = None,
user_known_hosts_file: str = '/dev/null'):
"""Mount a remote ssh path via sshfs."""
try:
_validate_mountpoint(mountpoint)
except AlreadyMountedError:
return
input_ = None
# the shell would expand ~/ to the local home directory
remote_path = remote_path.replace('~/', '').replace('~', '')
# 'reconnect', 'ServerAliveInternal' and 'ServerAliveCountMax' allow the
# client (FreedomBox) to keep control of the SSH connection even when the
# SSH server misbehaves. Without these options, other commands such as
# '/usr/share/plinth/actions/actions storage usage_info --no-args', 'df',
# '/usr/share/plinth/actions/actions sshfs is_mounted --no-args', or
# 'mountpoint' might block indefinitely (even when manually invoked from
# the command line). This situation has some lateral effects, causing major
# system instability in the course of ~11 days, and leaving the system in
# such state that the only solution is a reboot.
cmd = [
'sshfs', remote_path, mountpoint, '-o',
f'UserKnownHostsFile={user_known_hosts_file}', '-o',
'StrictHostKeyChecking=yes', '-o', 'reconnect', '-o',
'ServerAliveInterval=15', '-o', 'ServerAliveCountMax=3'
]
if ssh_keyfile:
cmd += ['-o', 'IdentityFile=' + ssh_keyfile]
else:
if not password:
raise ValueError('mount requires either a password or ssh_keyfile')
cmd += ['-o', 'password_stdin']
input_ = password.encode()
subprocess.run(cmd, check=True, timeout=TIMEOUT, input=input_)
@privileged
def umount(mountpoint: str):
"""Unmount a mountpoint."""
subprocess.run(['umount', mountpoint], check=True)
def _validate_mountpoint(mountpoint):
"""Check that the folder is empty, and create it if it doesn't exist."""
if os.path.exists(mountpoint):
if _is_mounted(mountpoint):
raise AlreadyMountedError('Mountpoint %s already mounted' %
mountpoint)
if os.listdir(mountpoint) or not os.path.isdir(mountpoint):
raise ValueError('Mountpoint %s is not an empty directory' %
mountpoint)
else:
os.makedirs(mountpoint)
def _is_mounted(mountpoint):
"""Return boolean whether a local directory is a mountpoint."""
cmd = ['mountpoint', '-q', mountpoint]
# mountpoint exits with status non-zero if it didn't find a mountpoint
try:
subprocess.run(cmd, check=True)
return True
except subprocess.CalledProcessError:
return False
@privileged
def is_mounted(mount_point: str) -> bool:
"""Return whether a path is already mounted."""
return _is_mounted(mount_point)
@privileged
def setup(path: str):
"""Create repository if it does not already exist."""
try:
_run(['borg', 'info', path], check=True)
except subprocess.CalledProcessError:
parent = os.path.dirname(path)
if not os.path.exists(parent):
os.makedirs(parent)
_init_repository(path, encryption='none')
def _init_repository(path: str, encryption: str,
encryption_passphrase: str | None = None):
"""Initialize a local or remote borg repository."""
if encryption != 'none':
if not encryption_passphrase:
raise ValueError('No encryption passphrase provided')
cmd = ['borg', 'init', '--encryption', encryption, path]
_run(cmd, encryption_passphrase)
@privileged
def init(path: str, encryption: str,
encryption_passphrase: secret_str | None = None):
"""Initialize the borg repository."""
_init_repository(path, encryption, encryption_passphrase)
@privileged
def info(path: str, encryption_passphrase: secret_str | None = None) -> dict:
"""Show repository information."""
process = _run(['borg', 'info', '--json', path], encryption_passphrase,
stdout=subprocess.PIPE)
return json.loads(process.stdout.decode())
@privileged
def list_repo(path: str,
encryption_passphrase: secret_str | None = None) -> dict:
"""List repository contents."""
process = _run(['borg', 'list', '--json', '--format="{comment}"', path],
encryption_passphrase, stdout=subprocess.PIPE)
return json.loads(process.stdout.decode())
def _get_borg_version():
"""Return the version of borgbackup."""
process = _run(['borg', '--version'], stdout=subprocess.PIPE)
return process.stdout.decode().split()[1] # Example: "borg 1.1.9"
@privileged
def create_archive(path: str, paths: list[str], comment: str | None = None,
encryption_passphrase: secret_str | None = None):
"""Create archive."""
existing_paths = filter(os.path.exists, paths)
command = ['borg', 'create', '--json']
if comment:
if Version(_get_borg_version()) < Version('1.1.10'):
# Undo any placeholder escape sequences in comments as this version
# of borg does not support placeholders. XXX: Drop this code when
# support for borg < 1.1.10 is dropped.
comment = comment.replace('{{', '{').replace('}}', '}')
command += ['--comment', comment]
command += [path] + list(existing_paths)
_run(command, encryption_passphrase)
@privileged
def delete_archive(path: str, encryption_passphrase: secret_str | None = None):
"""Delete archive."""
_run(['borg', 'delete', path], encryption_passphrase)
def _extract(archive_path, destination, encryption_passphrase, locations=None):
"""Extract archive contents."""
prev_dir = os.getcwd()
borg_call = ['borg', 'extract', archive_path]
# do not extract any files when we get an empty locations list
if locations is not None:
borg_call.extend(locations)
try:
os.chdir(os.path.expanduser(destination))
# TODO: with python 3.7 use subprocess.run with the 'capture_output'
# argument
process = _run(borg_call, encryption_passphrase, check=False,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if process.returncode != 0:
error = process.stderr.decode()
# Don't fail on the borg error when no files were matched
if "never matched" not in error:
raise subprocess.CalledProcessError(process.returncode,
process.args)
finally:
os.chdir(prev_dir)
@privileged
def export_tar(path: str, encryption_passphrase: secret_str | None = None):
"""Export archive contents as tar stream on stdout."""
_run(['borg', 'export-tar', path, '-', '--tar-filter=gzip'],
encryption_passphrase)
def _read_archive_file(archive, filepath, encryption_passphrase):
"""Read the content of a file inside an archive."""
borg_call = ['borg', 'extract', archive, filepath, '--stdout']
return _run(borg_call, encryption_passphrase,
stdout=subprocess.PIPE).stdout.decode()
@privileged
def get_archive_apps(
path: str,
encryption_passphrase: secret_str | None = None) -> list[str]:
"""Get list of apps included in archive."""
manifest_folder = os.path.relpath(MANIFESTS_FOLDER, '/')
borg_call = [
'borg', 'list', path, manifest_folder, '--format', '{path}{NEWLINE}'
]
try:
borg_process = _run(borg_call, encryption_passphrase,
stdout=subprocess.PIPE)
manifest_path = borg_process.stdout.decode().strip()
except subprocess.CalledProcessError:
raise RuntimeError('Borg exited unsuccessfully')
manifest = None
if manifest_path:
manifest_data = _read_archive_file(path, manifest_path,
encryption_passphrase)
manifest = json.loads(manifest_data)
archive_apps = []
if manifest:
for app in _get_apps_of_manifest(manifest):
archive_apps.append(app['name'])
return archive_apps
def _get_apps_of_manifest(manifest):
"""Get apps of a manifest.
Supports both dict format as well as list format of plinth <=0.42
"""
if isinstance(manifest, list):
apps = manifest
elif isinstance(manifest, dict) and 'apps' in manifest:
apps = manifest['apps']
else:
raise RuntimeError('Unknown manifest format')
return apps
@privileged
def get_exported_archive_apps(path: str) -> list[str]:
"""Get list of apps included in an exported archive file."""
manifest = None
with tarfile.open(path) as tar_handle:
filenames = tar_handle.getnames()
for name in filenames:
if 'var/lib/plinth/backups-manifests/' in name \
and name.endswith('.json'):
file_handle = tar_handle.extractfile(name)
if not file_handle:
raise RuntimeError(
'Unable to extract app manifest from backup file.')
manifest_data = file_handle.read()
manifest = json.loads(manifest_data)
break
app_names = []
if manifest:
for app in _get_apps_of_manifest(manifest):
app_names.append(app['name'])
return app_names
@privileged
def restore_archive(archive_path: str, destination: str,
directories: list[str], files: list[str],
encryption_passphrase: secret_str | None = None):
"""Restore files from an archive."""
locations_all = directories + files
locations_all = [
os.path.relpath(location, '/') for location in locations_all
]
_extract(archive_path, destination, encryption_passphrase,
locations=locations_all)
@privileged
def restore_exported_archive(path: str, directories: list[str],
files: list[str]):
"""Restore files from an exported archive."""
with tarfile.open(path) as tar_handle:
for member in tar_handle.getmembers():
path = '/' + member.name
if path in files:
tar_handle.extract(member, '/')
else:
for directory in directories:
if path.startswith(directory):
tar_handle.extract(member, '/')
break
def _assert_app_id(app_id):
"""Check that app ID is correct."""
if not re.fullmatch(r'[a-z0-9_]+', app_id):
raise Exception('Invalid App ID')
@privileged
def dump_settings(app_id: str, settings: dict[str, int | float | bool | str]):
"""Dump an app's settings to a JSON file."""
_assert_app_id(app_id)
BACKUPS_DATA_PATH.mkdir(exist_ok=True)
settings_path = BACKUPS_DATA_PATH / f'{app_id}-settings.json'
settings_path.write_text(json.dumps(settings))
@privileged
def load_settings(app_id: str) -> dict[str, int | float | bool | str]:
"""Load an app's settings from a JSON file."""
_assert_app_id(app_id)
settings_path = BACKUPS_DATA_PATH / f'{app_id}-settings.json'
try:
return json.loads(settings_path.read_text())
except FileNotFoundError:
return {}
def _get_env(encryption_passphrase: str | None = None):
"""Create encryption and ssh kwargs out of given arguments."""
env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes',
BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK='yes',
LANG='C.UTF-8')
# Always provide BORG_PASSPHRASE (also if empty) so borg does not get stuck
# while asking for a passphrase.
env['BORG_PASSPHRASE'] = encryption_passphrase or ''
return env
def _run(cmd, encryption_passphrase=None, check=True, **kwargs):
"""Wrap the command with extra encryption passphrase handling."""
env = _get_env(encryption_passphrase)
return subprocess.run(cmd, check=check, env=env, **kwargs)