mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-28 08:03:36 +00:00
- updated unittests - implemented create/delete/restore remote archives Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
389 lines
14 KiB
Python
Executable File
389 lines
14 KiB
Python
Executable File
#!/usr/bin/python3
|
|
# -*- mode: python -*-
|
|
#
|
|
# This file is part of FreedomBox.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
"""
|
|
Wrapper to handle backups using borg-backups.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tarfile
|
|
|
|
from plinth.modules.backups import MANIFESTS_FOLDER
|
|
|
|
# Seconds a subprocess may run when SSH key authentication is in use; the
# limit keeps us from hanging if the remote server asks for a password.
TIMEOUT = 5
|
|
|
|
|
|
class AlreadyMountedError(Exception):
    """Raised when the requested mountpoint already has a filesystem mounted."""
|
|
|
|
|
|
def parse_arguments(argv=None):
    """Return the parsed command line arguments as an argparse.Namespace.

    argv is an optional list of argument strings; when it is None (the
    default) argparse falls back to sys.argv[1:].  A sub command is
    mandatory and its name is stored in the 'subcommand' attribute.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    setup = subparsers.add_parser(
        'setup', help='Create repository if it does not already exist')

    init_repo = subparsers.add_parser('init', help='Initialize a repository')
    init_repo.add_argument('--encryption',
                           help='Encryption of the repository',
                           required=True)

    info = subparsers.add_parser('info', help='Show repository information')

    list_repo = subparsers.add_parser('list-repo',
                                      help='List repository contents')

    create_archive = subparsers.add_parser('create-archive',
                                           help='Create archive')
    create_archive.add_argument('--paths', help='Paths to include in archive',
                                nargs='+')

    delete_archive = subparsers.add_parser('delete-archive',
                                           help='Delete archive')

    export_help = 'Export archive contents as tar on stdout'
    export_tar = subparsers.add_parser('export-tar', help=export_help)

    get_archive_apps = subparsers.add_parser(
        'get-archive-apps',
        help='Get list of apps included in archive')

    # SSH mount actions
    mount = subparsers.add_parser('mount', help='mount an ssh filesystem')
    mount.add_argument('--mountpoint', help='Local mountpoint', required=True)
    umount = subparsers.add_parser('umount',
                                   help='unmount an ssh filesystem')
    umount.add_argument('--mountpoint', help='Mountpoint to unmount',
                        required=True)
    is_mounted = subparsers.add_parser(
        'is-mounted', help='Check whether an sshfs is mounted')
    is_mounted.add_argument('--mountpoint', help='Mountpoint to check',
                            required=True)

    # Repository location and credentials shared by these sub commands.
    for cmd in [info, init_repo, list_repo, create_archive, delete_archive,
                export_tar, get_archive_apps, mount, setup]:
        cmd.add_argument('--path', help='Repository or Archive path',
                         required=False)  # TODO: set required to True!
        cmd.add_argument('--ssh-keyfile', help='Path of private ssh key',
                         default=None)
        cmd.add_argument('--encryption-passphrase',
                         help='Encryption passphrase',
                         default=None)

    get_exported_archive_apps = subparsers.add_parser(
        'get-exported-archive-apps',
        help='Get list of apps included in exported archive file')
    get_exported_archive_apps.add_argument(
        '--path', help='Tarball file path', required=True)

    restore_exported_archive = subparsers.add_parser(
        'restore-exported-archive',
        help='Restore files from an exported archive')
    restore_exported_archive.add_argument('--path', help='Tarball file path',
                                          required=True)

    restore_archive = subparsers.add_parser(
        'restore-archive', help='Restore files from an archive')
    restore_archive.add_argument('--path', help='Archive path', required=True)
    restore_archive.add_argument('--destination', help='Destination',
                                 required=True)

    # Fail with a usage error instead of returning subcommand=None.
    subparsers.required = True
    return parser.parse_args(argv)
|
|
|
|
|
|
def subcommand_setup(arguments):
    """Create repository if it does not already exist.

    'borg info' serves as the existence probe.  When it exits with a
    non-zero status, the parent directory of the repository path is created
    if necessary and a repository with encryption 'none' is initialized.
    """
    env = get_env(arguments)
    try:
        run(['borg', 'info', arguments.path], check=True, env=env)
    except subprocess.CalledProcessError:
        # Only a failing borg means "no repository yet"; anything else
        # (Ctrl-C, timeouts, a missing borg binary) must propagate.
        parent = os.path.dirname(arguments.path)
        # dirname is '' for a bare relative name and makedirs('') raises.
        if parent and not os.path.exists(parent):
            os.makedirs(parent)

        init(arguments.path, 'none', env=env)
|
|
|
|
|
|
def init(path, encryption, env=None):
    """Initialize a local or remote borg repository.

    path is the repository location and encryption the value handed to
    'borg init --encryption'.  env is the environment for the borg process;
    None means the current process environment is inherited.

    Raises ValueError when encryption other than 'none' is requested and the
    effective environment carries no BORG_PASSPHRASE.
    """
    # With env=None the child inherits os.environ, so look there instead of
    # failing with a TypeError on "in None".
    effective_env = os.environ if env is None else env
    if encryption != 'none' and 'BORG_PASSPHRASE' not in effective_env:
        raise ValueError('No encryption passphrase provided')
    cmd = ['borg', 'init', '--encryption', encryption, path]
    run(cmd, env=env)
|
|
|
|
|
|
def get_env(arguments, read_input=True):
    """Build the environment for borg/ssh subprocesses from the arguments.

    The result is a copy of os.environ with
    BORG_RELOCATED_REPO_ACCESS_IS_OK=yes, plus:

    - BORG_PASSPHRASE when an encryption passphrase was given;
    - BORG_RSH using the private key when an ssh key file was given;
    - otherwise, if read_input is true and a password can be read from
      stdin, SSHPASS and a BORG_RSH that runs ssh through sshpass.
    """
    import shlex

    env = dict(os.environ, BORG_RELOCATED_REPO_ACCESS_IS_OK='yes')
    if arguments.encryption_passphrase:
        env['BORG_PASSPHRASE'] = arguments.encryption_passphrase
    if arguments.ssh_keyfile:
        # Quote the path so a key file containing spaces or shell
        # metacharacters survives when the command string is split again.
        env['BORG_RSH'] = 'ssh -i %s' % shlex.quote(arguments.ssh_keyfile)
    else:
        password = read_password() if read_input else None
        if password:
            env['SSHPASS'] = password
            env['BORG_RSH'] = 'sshpass -e ssh -o StrictHostKeyChecking=no'
    return env
|
|
|
|
|
|
def subcommand_init(arguments):
    """Initialize the repository at --path using the --encryption mode."""
    borg_env = get_env(arguments)
    init(path=arguments.path, encryption=arguments.encryption, env=borg_env)
|
|
|
|
|
|
def subcommand_info(arguments):
    """Run 'borg info --json' on the repository given by --path."""
    borg_env = get_env(arguments)
    info_cmd = ['borg', 'info', '--json', arguments.path]
    run(info_cmd, env=borg_env)
|
|
|
|
|
|
def subcommand_mount(arguments):
    """Mount a remote directory on a local mountpoint using sshfs.

    Returns without doing anything when the mountpoint is already mounted.
    Authentication uses the password found in SSHPASS (fed to sshfs on
    stdin) or else an ssh private key, taken from --ssh-keyfile or from an
    SSHKEY environment variable.

    Raises ValueError when the mountpoint is not usable or when neither a
    password nor a key is available.
    """
    try:
        validate_mountpoint(arguments.mountpoint)
    except AlreadyMountedError:
        return

    env = get_env(arguments)
    kwargs = {}
    # the shell would expand ~/ to the local home directory
    remote_path = arguments.path.replace('~/', '').replace('~', '')
    cmd = ['sshfs', remote_path, arguments.mountpoint, '-o',
           'UserKnownHostsFile=/dev/null', '-o',
           'StrictHostKeyChecking=no']
    timeout = None
    # get_env() never exports SSHKEY, so the key file has to come from the
    # arguments; an inherited SSHKEY variable is still honoured.
    keyfile = arguments.ssh_keyfile or env.get('SSHKEY')
    if 'SSHPASS' in env:
        cmd += ['-o', 'password_stdin']
        kwargs['input'] = env['SSHPASS'].encode()
    elif keyfile:
        # No shell is involved, so the path must be passed literally; a
        # '$SSHKEY' placeholder would never be expanded.
        cmd += ['-o', 'IdentityFile=' + keyfile]
        # Do not hang if the server falls back to asking for a password.
        timeout = TIMEOUT
    else:
        raise ValueError('mount requires either SSHPASS or SSHKEY in env')
    subprocess.run(cmd, check=True, env=env, timeout=timeout, **kwargs)
|
|
|
|
|
|
def subcommand_umount(arguments):
    """Unmount whatever is mounted at --mountpoint."""
    umount_cmd = ['umount', arguments.mountpoint]
    run(umount_cmd)
|
|
|
|
|
|
def validate_mountpoint(mountpoint):
    """Ensure mountpoint is an empty, unmounted directory; create it if missing.

    Raises AlreadyMountedError when a filesystem is already mounted there and
    ValueError when the path exists but is not an empty directory.
    """
    if not os.path.exists(mountpoint):
        os.makedirs(mountpoint)
        return

    if _is_mounted(mountpoint):
        raise AlreadyMountedError('Mountpoint %s already mounted' %
                                  mountpoint)

    # Test isdir first: os.listdir() on a regular file would raise
    # NotADirectoryError instead of the documented ValueError.
    if not os.path.isdir(mountpoint) or os.listdir(mountpoint):
        raise ValueError('Mountpoint %s is not an empty directory' %
                         mountpoint)
|
|
|
|
|
|
def _is_mounted(mountpoint):
    """Tell whether a local directory currently is a mountpoint."""
    # 'mountpoint -q' is silent and reports the answer via its exit status:
    # zero when the directory is a mountpoint, non-zero otherwise.
    probe = subprocess.run(['mountpoint', '-q', mountpoint])
    return probe.returncode == 0
|
|
|
|
|
|
def subcommand_is_mounted(arguments):
    """Print, as a JSON boolean, whether --mountpoint is mounted."""
    mounted = _is_mounted(arguments.mountpoint)
    print(json.dumps(mounted))
|
|
|
|
|
|
def subcommand_list_repo(arguments):
    """Run 'borg list --json' on the repository given by --path."""
    borg_env = get_env(arguments)
    list_cmd = ['borg', 'list', '--json', arguments.path]
    run(list_cmd, env=borg_env)
|
|
|
|
|
|
def subcommand_create_archive(arguments):
    """Create an archive at --path from those --paths that exist locally."""
    borg_env = get_env(arguments)
    # Paths that are missing on disk are left out of the borg call.
    existing = [path for path in arguments.paths if os.path.exists(path)]
    create_cmd = ['borg', 'create', '--json', arguments.path]
    create_cmd.extend(existing)
    run(create_cmd, env=borg_env)
|
|
|
|
|
|
def subcommand_delete_archive(arguments):
    """Run 'borg delete' on the archive given by --path."""
    borg_env = get_env(arguments)
    delete_cmd = ['borg', 'delete', arguments.path]
    run(delete_cmd, env=borg_env)
|
|
|
|
|
|
def _extract(archive_path, destination, locations=None):
|
|
"""Extract archive contents."""
|
|
prev_dir = os.getcwd()
|
|
env = dict(os.environ, LANG='C.UTF-8')
|
|
borg_call = ['borg', 'extract', archive_path]
|
|
# do not extract any files when we get an empty locations list
|
|
if locations is not None:
|
|
borg_call.extend(locations)
|
|
try:
|
|
os.chdir(os.path.expanduser(destination))
|
|
# TODO: with python 3.7 use subprocess.run with the 'capture_output'
|
|
# argument
|
|
process = subprocess.run(borg_call, env=env,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
if process.returncode != 0:
|
|
error = process.stderr.decode()
|
|
# Don't fail on the borg error when no files were matched
|
|
if "never matched" not in error:
|
|
raise subprocess.CalledProcessError(process.returncode,
|
|
process.args)
|
|
finally:
|
|
os.chdir(prev_dir)
|
|
|
|
|
|
def subcommand_export_tar(arguments):
    """Stream the contents of the archive at --path as tar on stdout."""
    # TODO: Get read_password to reliably detect if a password is provided
    # Until then, stdin is not consulted for a password here.
    borg_env = get_env(arguments, read_input=False)
    export_cmd = ['borg', 'export-tar', arguments.path, '-']
    run(export_cmd, env=borg_env)
|
|
|
|
|
|
def _read_archive_file(archive, filepath, env=None, timeout=None):
    """Return the decoded content of a file inside an archive.

    env and timeout are handed to the borg subprocess; with the defaults the
    current environment is inherited and no time limit applies.
    """
    arguments = ['borg', 'extract', archive, filepath, '--stdout']
    return subprocess.check_output(arguments, env=env,
                                   timeout=timeout).decode()


def subcommand_get_archive_apps(arguments):
    """Print the name of every app included in the archive, one per line.

    The manifest is located by listing the manifests folder inside the
    archive, then read and parsed as JSON.  Exits with status 1 when the
    listing fails; prints nothing when no manifest is found.
    """
    env = get_env(arguments)
    manifest_folder = os.path.relpath(MANIFESTS_FOLDER, '/')
    borg_call = ['borg', 'list', arguments.path, manifest_folder,
                 '--format', '{path}{NEWLINE}']
    # Bound the runtime with key authentication so a server that asks for a
    # password cannot block us.
    timeout = None
    if 'BORG_RSH' in env and 'SSHPASS' not in env:
        timeout = TIMEOUT
    try:
        listing = subprocess.check_output(borg_call, env=env,
                                          timeout=timeout)
    except subprocess.CalledProcessError:
        sys.exit(1)

    manifest_path = listing.decode().strip()
    if not manifest_path:
        return

    # Reuse the same credentials and time limit as for the listing,
    # otherwise encrypted or remote repositories cannot be read.
    manifest_data = _read_archive_file(arguments.path, manifest_path,
                                       env=env, timeout=timeout)
    manifest = json.loads(manifest_data)
    if manifest:
        for app in _get_apps_of_manifest(manifest):
            print(app['name'])
|
|
|
|
|
|
def _get_apps_of_manifest(manifest):
|
|
"""
|
|
Get apps of a manifest.
|
|
Supports both dict format as well as list format of plinth <=0.42
|
|
"""
|
|
if type(manifest) is list:
|
|
apps = manifest
|
|
elif type(manifest) is dict and 'apps' in manifest:
|
|
apps = manifest['apps']
|
|
else:
|
|
raise RuntimeError('Unknown manifest format')
|
|
return apps
|
|
|
|
|
|
def subcommand_get_exported_archive_apps(arguments):
    """Print the name of every app listed in an exported tarball's manifest.

    The first '.json' member found below the backups-manifests folder is
    taken as the manifest; nothing is printed when there is none or when it
    is empty.
    """
    manifest_marker = 'var/lib/plinth/backups-manifests/'
    with tarfile.open(arguments.path) as archive:
        manifest_name = next(
            (member_name for member_name in archive.getnames()
             if manifest_marker in member_name
             and member_name.endswith('.json')),
            None)
        if manifest_name is None:
            return

        manifest = json.loads(archive.extractfile(manifest_name).read())

    if not manifest:
        return

    for app in _get_apps_of_manifest(manifest):
        print(app['name'])
|
|
|
|
|
|
def subcommand_restore_archive(arguments):
    """Restore the files and directories named on stdin from an archive.

    stdin carries a JSON object with 'directories' and 'files' lists of
    paths; they are made relative to '/' before being passed on for
    extraction into --destination.
    """
    selection = json.loads(''.join(sys.stdin))
    relative_paths = []
    for absolute_path in selection['directories'] + selection['files']:
        relative_paths.append(os.path.relpath(absolute_path, '/'))
    _extract(arguments.path, arguments.destination, locations=relative_paths)
|
|
|
|
|
|
def subcommand_restore_exported_archive(arguments):
    """Restore files from an exported archive.

    stdin carries a JSON object with 'files' and 'directories' lists of
    absolute paths.  Every tarball member that is one of the files, or that
    is one of the directories or lies below one, is extracted relative
    to '/'.
    """
    locations = json.loads(''.join(sys.stdin))
    files = set(locations['files'])
    # Drop trailing slashes so the directory test below can compare whole
    # path components.
    directories = [directory.rstrip('/')
                   for directory in locations['directories']]

    with tarfile.open(arguments.path) as tar_handle:
        for member in tar_handle.getmembers():
            # Normalise first so that names containing '..' cannot pose as
            # being inside a selected directory.
            path = os.path.normpath('/' + member.name)
            # A bare startswith() would let '/a/b' also match '/a/bc/...'.
            in_directory = any(
                path == directory or path.startswith(directory + '/')
                for directory in directories)
            if path in files or in_directory:
                tar_handle.extract(member, '/')
|
|
|
|
|
|
def read_password():
    """Return everything available on stdin, or '' when stdin is a terminal."""
    # An interactive terminal means nobody piped a password to us.
    return '' if sys.stdin.isatty() else ''.join(sys.stdin)
|
|
|
|
|
|
def run(cmd, env=None, check=True):
    """Execute cmd, limiting its runtime when ssh key authentication is used.

    env is the environment for the child process and check is forwarded to
    subprocess.run.
    """
    # With a custom ssh command but no password to supply, a server asking
    # for a password would block forever, so cap the runtime in that case.
    key_auth = bool(env) and 'BORG_RSH' in env and 'SSHPASS' not in env
    subprocess.run(cmd, check=check, env=env,
                   timeout=TIMEOUT if key_auth else None)
|
|
|
|
|
|
def main():
    """Parse the command line and dispatch to the matching sub command."""
    arguments = parse_arguments()

    # 'list-repo' on the command line maps to subcommand_list_repo() here.
    handler_name = 'subcommand_' + arguments.subcommand.replace('-', '_')
    handler = globals()[handler_name]
    handler(arguments)
|
|
|
|
|
|
# Run the command line interface only when executed as a script.
if __name__ == '__main__':
    main()
|