#!/usr/bin/python3 # -*- mode: python -*- # # This file is part of FreedomBox. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # """ Wrapper to handle backups using borg-backups. """ import argparse import json import os import subprocess import sys import tarfile from plinth.modules.backups import MANIFESTS_FOLDER TIMEOUT = 5 class AlreadyMountedError(Exception): pass def parse_arguments(): """Return parsed command line arguments as dictionary.""" parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') setup = subparsers.add_parser( 'setup', help='Create repository if it does not already exist') init = subparsers.add_parser('init', help='Initialize a repository') init.add_argument('--encryption', help='Enryption of the repository', required=True) info = subparsers.add_parser('info', help='Show repository information') list_repo = subparsers.add_parser('list-repo', help='List repository contents') create_archive = subparsers.add_parser('create-archive', help='Create archive') create_archive.add_argument('--paths', help='Paths to include in archive', nargs='+') delete_archive = subparsers.add_parser('delete-archive', help='Delete archive') export_help = 'Export archive contents as tar on stdout' export_tar = subparsers.add_parser('export-tar', help=export_help) get_archive_apps = subparsers.add_parser( 'get-archive-apps', help='Get list of apps included in archive') # SSH mount actions mount = subparsers.add_parser('mount', help='mount an ssh filesystem') mount.add_argument('--mountpoint', help='Local mountpoint', required=True) umount = subparsers.add_parser('umount', help='unmount an ssh filesystem') umount.add_argument('--mountpoint', help='Mountpoint to unmount', required=True) is_mounted = subparsers.add_parser('is-mounted', help='Check whether an sshfs is mouned') is_mounted.add_argument('--mountpoint', help='Mountpoint to check', required=True) for cmd in [info, init, list_repo, create_archive, delete_archive, export_tar, get_archive_apps, mount, setup]: cmd.add_argument('--path', help='Repository or Archive path', required=False) # TODO: set required to True! cmd.add_argument('--ssh-keyfile', help='Path of private ssh key', default=None) cmd.add_argument('--encryption-passphrase', help='Encryption passphrase', default=None) get_exported_archive_apps = subparsers.add_parser( 'get-exported-archive-apps', help='Get list of apps included in exported archive file') get_exported_archive_apps.add_argument( '--path', help='Tarball file path', required=True) restore_exported_archive = subparsers.add_parser( 'restore-exported-archive', help='Restore files from an exported archive') restore_exported_archive.add_argument('--path', help='Tarball file path', required=True) restore_archive = subparsers.add_parser( 'restore-archive', help='Restore files from an archive') restore_archive.add_argument('--path', help='Archive path', required=True) restore_archive.add_argument('--destination', help='Destination', required=True) subparsers.required = True return parser.parse_args() def subcommand_setup(arguments): """Create repository if it does not already exist.""" env = get_env(arguments) try: run(['borg', 'info', arguments.path], check=True, env=env) except: path = os.path.dirname(arguments.path) if not os.path.exists(path): os.makedirs(path) init(arguments.path, 'none', env=env) def init(path, encryption, env=None): """Initialize a local or remote borg repository""" # TODO: verify that the repository does not exist? # TODO: does remote borg also create folders if they don't exist? if encryption != 'none' and 'BORG_PASSPHRASE' not in env: raise ValueError('No encryption passphrase provided') cmd = ['borg', 'init', '--encryption', encryption, path] run(cmd, env=env) def get_env(arguments): """Create encryption and ssh kwargs out of given arguments""" env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes') if arguments.encryption_passphrase: env['BORG_PASSPHRASE'] = arguments.encryption_passphrase if arguments.ssh_keyfile: env['BORG_RSH'] = "ssh -i %s" % arguments.ssh_keyfile else: password = read_password() if password: env['SSHPASS'] = password env['BORG_RSH'] = 'sshpass -e ssh -o StrictHostKeyChecking=no' return env def get_sshfs_env(arguments): """Create encryption and ssh kwargs out of given arguments""" env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes') if arguments.encryption_passphrase: env['BORG_PASSPHRASE'] = arguments.encryption_passphrase if arguments.ssh_keyfile: env['SSHKEY'] = arguments.ssh_keyfile else: password = read_password() if password: env['SSHPASS'] = password return env def subcommand_init(arguments): env = get_env(arguments) init(arguments.path, arguments.encryption, env=env) def subcommand_info(arguments): """Show repository information.""" env = get_env(arguments) run(['borg', 'info', '--json', arguments.path], env=env) def subcommand_mount(arguments): """Show repository information.""" try: validate_mountpoint(arguments.mountpoint) except AlreadyMountedError: return env = get_env(arguments) remote_path = arguments.path kwargs = {} # the shell would expand ~/ to the local home directory remote_path = remote_path.replace('~/', '').replace('~', '') cmd = ['sshfs', remote_path, arguments.mountpoint, '-o', 'UserKnownHostsFile=/dev/null', '-o', 'StrictHostKeyChecking=no'] timeout = None if 'SSHPASS' in env: cmd += ['-o', 'password_stdin'] kwargs['input'] = env['SSHPASS'].encode() elif 'SSHKEY' in env: cmd += ['-o', 'IdentityFile=$SSHKEY'] timeout = TIMEOUT else: raise ValueError('mount requires either SSHPASS or SSHKEY in env') subprocess.run(cmd, check=True, env=env, timeout=timeout, **kwargs) def subcommand_umount(arguments): """Show repository information.""" run(['umount', arguments.mountpoint]) def validate_mountpoint(mountpoint): """Check that the folder is empty, and create it if it doesn't exist""" if os.path.exists(mountpoint): if _is_mounted(mountpoint): raise AlreadyMountedError('Mountpoint %s already mounted' % mountpoint) if os.listdir(mountpoint) or not os.path.isdir(mountpoint): raise ValueError('Mountpoint %s is not an empty directory' % mountpoint) else: os.makedirs(mountpoint) def _is_mounted(mountpoint): """Return boolean whether a local directory is a mountpoint.""" cmd = ['mountpoint', '-q', mountpoint] # mountpoint exits with status non-zero if it didn't find a mountpoint try: subprocess.run(cmd, check=True) return True except subprocess.CalledProcessError: return False def subcommand_is_mounted(arguments): print(json.dumps(_is_mounted(arguments.mountpoint))) def subcommand_list_repo(arguments): """List repository contents.""" env = get_env(arguments) run(['borg', 'list', '--json', arguments.path], env=env) def subcommand_create_archive(arguments): """Create archive.""" env = get_env(arguments) paths = filter(os.path.exists, arguments.paths) run(['borg', 'create', '--json', arguments.path] + list(paths), env=env) def subcommand_delete_archive(arguments): """Delete archive.""" env = get_env(arguments) run(['borg', 'delete', arguments.path], env=env) def _extract(archive_path, destination, locations=None): """Extract archive contents.""" prev_dir = os.getcwd() env = dict(os.environ, LANG='C.UTF-8') borg_call = ['borg', 'extract', archive_path] # do not extract any files when we get an empty locations list if locations is not None: borg_call.extend(locations) try: os.chdir(os.path.expanduser(destination)) # TODO: with python 3.7 use subprocess.run with the 'capture_output' # argument process = subprocess.run(borg_call, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if process.returncode != 0: error = process.stderr.decode() # Don't fail on the borg error when no files were matched if "never matched" not in error: raise subprocess.CalledProcessError(process.returncode, process.args) finally: os.chdir(prev_dir) def subcommand_export_tar(arguments): """Export archive contents as tar stream on stdout.""" env = get_env(arguments) run(['borg', 'export-tar', arguments.path, '-'], env=env) def _read_archive_file(archive, filepath): """Read the content of a file inside an archive""" arguments = ['borg', 'extract', archive, filepath, '--stdout'] return subprocess.check_output(arguments).decode() def subcommand_get_archive_apps(arguments): """Get list of apps included in archive.""" env = get_env(arguments) manifest_folder = os.path.relpath(MANIFESTS_FOLDER, '/') borg_call = ['borg', 'list', arguments.path, manifest_folder, '--format', '{path}{NEWLINE}'] timeout = None if 'BORG_RSH' in env and 'SSHPASS' not in env: timeout = TIMEOUT try: manifest_path = subprocess.check_output(borg_call, env=env, timeout=timeout).decode()\ .strip() except subprocess.CalledProcessError: sys.exit(1) manifest = None if manifest_path: manifest_data = _read_archive_file(arguments.path, manifest_path) manifest = json.loads(manifest_data) if manifest: for app in _get_apps_of_manifest(manifest): print(app['name']) def _get_apps_of_manifest(manifest): """ Get apps of a manifest. Supports both dict format as well as list format of plinth <=0.42 """ if type(manifest) is list: apps = manifest elif type(manifest) is dict and 'apps' in manifest: apps = manifest['apps'] else: raise RuntimeError('Unknown manifest format') return apps def subcommand_get_exported_archive_apps(arguments): """Get list of apps included in an exported archive file.""" manifest = None with tarfile.open(arguments.path) as t: filenames = t.getnames() for name in filenames: if 'var/lib/plinth/backups-manifests/' in name \ and name.endswith('.json'): manifest_data = t.extractfile(name).read() manifest = json.loads(manifest_data) break if manifest: for app in _get_apps_of_manifest(manifest): print(app['name']) def subcommand_restore_archive(arguments): """Restore files from an archive.""" locations_data = ''.join(sys.stdin) _locations = json.loads(locations_data) locations = _locations['directories'] + _locations['files'] locations = [os.path.relpath(location, '/') for location in locations] _extract(arguments.path, arguments.destination, locations=locations) def subcommand_restore_exported_archive(arguments): """Restore files from an exported archive.""" locations_data = ''.join(sys.stdin) locations = json.loads(locations_data) with tarfile.open(arguments.path) as tar_handle: for member in tar_handle.getmembers(): path = '/' + member.name if path in locations['files']: tar_handle.extract(member, '/') else: for directory in locations['directories']: if path.startswith(directory): tar_handle.extract(member, '/') break def read_password(): """Read the password from stdin.""" if sys.stdin.isatty(): return '' else: return ''.join(sys.stdin) def run(cmd, env=None, check=True): """Wrap the command with ssh password or keyfile authentication""" # If the remote server asks for a password but no password is # provided, we get stuck at asking the password. timeout = None if env and 'BORG_RSH' in env and 'SSHPASS' not in env: timeout = TIMEOUT subprocess.run(cmd, check=check, env=env, timeout=timeout) def main(): """Parse arguments and perform all duties.""" arguments = parse_arguments() subcommand = arguments.subcommand.replace('-', '_') subcommand_method = globals()['subcommand_' + subcommand] subcommand_method(arguments) if __name__ == '__main__': main()