#!/usr/bin/python3
# -*- mode: python -*-
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Wrapper to handle backups using borg-backups.
"""

import argparse
import json
import os
import subprocess
import sys
import tarfile

from plinth.modules.backups import MANIFESTS_FOLDER
from plinth.utils import Version

TIMEOUT = 30


def parse_arguments():
    """Return parsed command line arguments as an argparse namespace.

    A sub command is mandatory; argparse exits with a usage error when it
    is missing.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    setup = subparsers.add_parser(
        'setup', help='Create repository if it does not already exist')

    init = subparsers.add_parser('init', help='Initialize a repository')
    init.add_argument('--encryption', help='Encryption of the repository',
                      required=True)

    info = subparsers.add_parser('info', help='Show repository information')

    list_repo = subparsers.add_parser('list-repo',
                                      help='List repository contents')

    create_archive = subparsers.add_parser('create-archive',
                                           help='Create archive')
    create_archive.add_argument('--paths', help='Paths to include in archive',
                                nargs='+')
    create_archive.add_argument('--comment',
                                help='Comment text to add to archive',
                                default='')

    delete_archive = subparsers.add_parser('delete-archive',
                                           help='Delete archive')

    export_help = 'Export archive contents as tar on stdout'
    export_tar = subparsers.add_parser('export-tar', help=export_help)

    get_archive_apps = subparsers.add_parser(
        'get-archive-apps', help='Get list of apps included in archive')

    restore_archive = subparsers.add_parser(
        'restore-archive', help='Restore files from an archive')
    restore_archive.add_argument('--destination', help='Destination',
                                 required=True)

    # Options shared by every sub command that talks to a borg repository.
    # NOTE(review): --ssh-keyfile is accepted here but is not read anywhere
    # in this file.
    for cmd in [
            info, init, list_repo, create_archive, delete_archive, export_tar,
            get_archive_apps, restore_archive, setup
    ]:
        cmd.add_argument('--path', help='Repository or Archive path',
                         required=False)
        cmd.add_argument('--ssh-keyfile', help='Path of private ssh key',
                         default=None)

    # The following sub commands operate on a tarball instead of a borg
    # repository, so --path is mandatory and has a different meaning.
    get_exported_archive_apps = subparsers.add_parser(
        'get-exported-archive-apps',
        help='Get list of apps included in exported archive file')
    get_exported_archive_apps.add_argument('--path', help='Tarball file path',
                                           required=True)

    restore_exported_archive = subparsers.add_parser(
        'restore-exported-archive',
        help='Restore files from an exported archive')
    restore_exported_archive.add_argument('--path', help='Tarball file path',
                                          required=True)

    subparsers.required = True
    return parser.parse_args()


def subcommand_setup(arguments):
    """Create repository if it does not already exist.

    A failing 'borg info' is taken to mean that the repository is missing,
    in which case an unencrypted repository is initialized.
    """
    try:
        run(['borg', 'info', arguments.path], arguments, check=True)
    except subprocess.CalledProcessError:
        path = os.path.dirname(arguments.path)
        # dirname is empty for a path without directory component; there is
        # nothing to create then. exist_ok avoids a check-then-create race.
        if path:
            os.makedirs(path, exist_ok=True)

        init_repository(arguments, encryption='none')


def init_repository(arguments, encryption):
    """Initialize a local or remote borg repository.

    Raise ValueError if an encryption mode other than 'none' is requested
    but no passphrase was provided on stdin.
    """
    if encryption != 'none':
        if not _read_encryption_passphrase(arguments):
            raise ValueError('No encryption passphrase provided')

    cmd = ['borg', 'init', '--encryption', encryption, arguments.path]
    run(cmd, arguments)


def subcommand_init(arguments):
    """Initialize the borg repository."""
    init_repository(arguments, encryption=arguments.encryption)


def subcommand_info(arguments):
    """Show repository information."""
    run(['borg', 'info', '--json', arguments.path], arguments)


def subcommand_list_repo(arguments):
    """List repository contents."""
    # NOTE(review): no shell is involved, so the double quotes around
    # {comment} reach borg literally -- confirm this is intended.
    run(['borg', 'list', '--json', '--format="{comment}"', arguments.path],
        arguments)


def _get_borg_version(arguments):
    """Return the version of borgbackup."""
    process = run(['borg', '--version'], arguments, stdout=subprocess.PIPE)
    return process.stdout.decode().split()[1]  # Example: "borg 1.1.9"


def subcommand_create_archive(arguments):
    """Create archive.

    Only paths that currently exist are handed to borg.
    """
    # --paths is optional, so it may be None.
    paths = [path for path in arguments.paths or [] if os.path.exists(path)]
    command = ['borg', 'create', '--json']
    if arguments.comment:
        comment = arguments.comment
        if Version(_get_borg_version(arguments)) < Version('1.1.10'):
            # Undo any placeholder escape sequences in comments as this
            # version of borg does not support placeholders. XXX: Drop this
            # code when support for borg < 1.1.10 is dropped.
            comment = comment.replace('{{', '{').replace('}}', '}')

        command += ['--comment', comment]

    command += [arguments.path] + paths
    run(command, arguments)


def subcommand_delete_archive(arguments):
    """Delete archive."""
    run(['borg', 'delete', arguments.path], arguments)


def _extract(archive_path, destination, arguments, locations=None):
    """Extract archive contents into destination.

    locations is an optional list of paths inside the archive to extract.
    When it is None the whole archive is extracted; when it is an empty
    list nothing is extracted.

    Raise subprocess.CalledProcessError if borg fails for any reason other
    than a requested path not matching anything in the archive.
    """
    # Do not extract any files when we get an empty locations list. Calling
    # borg without any path would extract the entire archive instead.
    if locations is not None and not locations:
        return

    borg_call = ['borg', 'extract', archive_path]
    if locations is not None:
        borg_call.extend(locations)

    # borg extracts into the current working directory.
    prev_dir = os.getcwd()
    try:
        os.chdir(os.path.expanduser(destination))
        # TODO: with python 3.7 use subprocess.run with the 'capture_output'
        # argument
        process = run(borg_call, arguments, check=False,
                      stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if process.returncode != 0:
            error = process.stderr.decode()
            # Don't fail on the borg error when no files were matched
            if "never matched" not in error:
                raise subprocess.CalledProcessError(process.returncode,
                                                    process.args)
    finally:
        os.chdir(prev_dir)


def subcommand_export_tar(arguments):
    """Export archive contents as tar stream on stdout."""
    run(['borg', 'export-tar', arguments.path, '-', '--tar-filter=gzip'],
        arguments)


def _read_archive_file(archive, filepath, arguments):
    """Read the content of a file inside an archive."""
    borg_call = ['borg', 'extract', archive, filepath, '--stdout']
    return run(borg_call, arguments, stdout=subprocess.PIPE).stdout.decode()


def subcommand_get_archive_apps(arguments):
    """Get list of apps included in archive.

    Print one app name per line. Exit with status 1 if listing the archive
    fails.
    """
    # Paths inside the archive are relative to the file system root.
    manifest_folder = os.path.relpath(MANIFESTS_FOLDER, '/')
    borg_call = [
        'borg', 'list', arguments.path, manifest_folder, '--format',
        '{path}{NEWLINE}'
    ]
    try:
        borg_process = run(borg_call, arguments, stdout=subprocess.PIPE)
        manifest_path = borg_process.stdout.decode().strip()
    except subprocess.CalledProcessError:
        sys.exit(1)

    manifest = None
    if manifest_path:
        manifest_data = _read_archive_file(arguments.path, manifest_path,
                                           arguments)
        manifest = json.loads(manifest_data)

    if manifest:
        for app in _get_apps_of_manifest(manifest):
            print(app['name'])


def _get_apps_of_manifest(manifest):
    """Get apps of a manifest.

    Supports both dict format as well as list format of plinth <=0.42

    Raise RuntimeError if the manifest has neither format.
    """
    if isinstance(manifest, list):
        apps = manifest
    elif isinstance(manifest, dict) and 'apps' in manifest:
        apps = manifest['apps']
    else:
        raise RuntimeError('Unknown manifest format')

    return apps


def subcommand_get_exported_archive_apps(arguments):
    """Get list of apps included in an exported archive file.

    The first '.json' member found inside the manifests folder of the
    tarball is used as manifest. Print one app name per line.
    """
    manifest = None
    with tarfile.open(arguments.path) as tar_handle:
        filenames = tar_handle.getnames()
        for name in filenames:
            if 'var/lib/plinth/backups-manifests/' in name \
               and name.endswith('.json'):
                manifest_data = tar_handle.extractfile(name).read()
                manifest = json.loads(manifest_data)
                break

    if manifest:
        for app in _get_apps_of_manifest(manifest):
            print(app['name'])


def subcommand_restore_archive(arguments):
    """Restore files from an archive.

    The locations to restore are read from stdin as JSON object with the
    keys 'directories' and 'files'.
    """
    _locations = json.loads(arguments.stdin)
    locations = _locations['directories'] + _locations['files']
    # Paths inside the archive are relative to the file system root.
    locations = [os.path.relpath(location, '/') for location in locations]
    _extract(arguments.path, arguments.destination, arguments,
             locations=locations)


def _is_location_selected(path, locations):
    """Return whether a normalized absolute path is covered by locations.

    A path is covered when it is listed in locations['files'], or when it
    is one of locations['directories'] or lies below one of them.
    """
    if path in locations['files']:
        return True

    for directory in locations['directories']:
        base = directory.rstrip('/')
        # Compare on a path component boundary so that selecting '/etc/foo'
        # does not also select '/etc/foobar'.
        if path == base or path.startswith(base + '/'):
            return True

    return False


def subcommand_restore_exported_archive(arguments):
    """Restore files from an exported archive.

    The locations to restore are read from stdin as JSON object with the
    keys 'directories' and 'files'. Matching members of the tarball are
    extracted relative to the file system root.
    """
    locations = json.loads(arguments.stdin)

    with tarfile.open(arguments.path) as tar_handle:
        for member in tar_handle.getmembers():
            # Normalize before matching so that a member name containing
            # '..' components is judged by the location it would actually be
            # written to and cannot escape the selected locations.
            # NOTE(review): link members are not inspected here; tarfile
            # extraction filters could harden this further where available.
            path = os.path.normpath('/' + member.name)
            if _is_location_selected(path, locations):
                tar_handle.extract(member, '/')


def _read_encryption_passphrase(arguments):
    """Read encryption passphrase from stdin.

    Return None if stdin is empty or its JSON content has no
    'encryption_passphrase' key.
    """
    if arguments.stdin:
        try:
            return json.loads(arguments.stdin)['encryption_passphrase']
        except KeyError:
            pass

    return None


def get_env(arguments):
    """Return the environment variables to run borg with.

    The result is a copy of the current environment with borg specific
    settings and the encryption passphrase added.
    """
    env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes',
               LANG='C.UTF-8')
    # Always provide BORG_PASSPHRASE (also if empty) so borg does not get stuck
    # while asking for a passphrase.
    encryption_passphrase = _read_encryption_passphrase(arguments)
    env['BORG_PASSPHRASE'] = encryption_passphrase or ''

    return env


def run(cmd, arguments, check=True, **kwargs):
    """Wrap the command with extra encryption passphrase handling.

    Additional keyword arguments are passed on to subprocess.run, whose
    result is returned.
    """
    env = get_env(arguments)
    return subprocess.run(cmd, check=check, env=env, **kwargs)


def main():
    """Parse arguments and perform all duties."""
    arguments = parse_arguments()
    # stdin is read once, in full, and kept on the namespace for the sub
    # commands and for passphrase handling.
    arguments.stdin = sys.stdin.read()

    # Sub command 'list-repo' is handled by subcommand_list_repo() etc.
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()