Michael Pimmer c770a7adfb
Backups: Restore directly from archive
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2018-11-17 08:59:29 -05:00

273 lines
8.9 KiB
Python
Executable File

#!/usr/bin/python3
# -*- mode: python -*-
#
# This file is part of FreedomBox.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Configuration helper for backups.
"""
import argparse
import glob
import json
import os
import shutil
import subprocess
import sys
import tarfile
from plinth.modules.backups import MANIFESTS_FOLDER, REPOSITORY
def parse_arguments():
"""Return parsed command line arguments as dictionary."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser(
'setup', help='Create repository if it does not already exist')
subparsers.add_parser('info', help='Show repository information')
subparsers.add_parser('list', help='List repository contents')
create = subparsers.add_parser('create', help='Create archive')
create.add_argument('--name', help='Archive name', required=True)
create.add_argument('--paths', help='Paths to include in archive',
nargs='+')
delete = subparsers.add_parser('delete', help='Delete archive')
delete.add_argument('--name', help='Archive name', required=True)
extract = subparsers.add_parser('extract', help='Extract archive contents')
extract.add_argument('--name', help='Archive name', required=True)
extract.add_argument('--destination', help='Extract destination',
required=True)
export_tar = subparsers.add_parser('export-tar',
help='Export archive contents as tarball')
export_tar.add_argument('--name', help='Archive name', required=True)
export_tar.add_argument('--filename', help='Tarball file name', required=True)
list_exports = subparsers.add_parser(
'list-exports', help='List exported backup archive files')
list_exports.add_argument('--location', required=True,
help='location to check')
get_export_apps = subparsers.add_parser(
'get-export-apps',
help='Get list of apps included in exported archive file')
get_export_apps.add_argument(
'--filename', help='Tarball file name', required=True)
get_archive_apps = subparsers.add_parser(
'get-archive-apps',
help='Get list of apps included in archive')
get_archive_apps.add_argument(
'--path', help='Path of the archive', required=True)
restore_exported_archive = subparsers.add_parser(
'restore-exported-archive',
help='Restore files from an exported archive')
# TODO: rename filename to filepath (or just path)
restore_exported_archive.add_argument('--filename',
help='Tarball file name', required=True)
restore_archive = subparsers.add_parser(
'restore-archive', help='Restore files from an archive')
restore_archive.add_argument('--path', help='Archive path', required=True)
restore_archive.add_argument('--destination', help='Destination',
required=True)
subparsers.required = True
return parser.parse_args()
def subcommand_setup(_):
"""Create repository if it does not already exist."""
try:
subprocess.run(['borg', 'info', REPOSITORY], check=True)
except:
path = os.path.dirname(REPOSITORY)
if not os.path.exists(path):
os.makedirs(path)
subprocess.run(['borg', 'init', '--encryption', 'none', REPOSITORY])
def subcommand_info(_):
"""Show repository information."""
subprocess.run(['borg', 'info', '--json', REPOSITORY], check=True)
def subcommand_list(_):
"""List repository contents."""
subprocess.run(['borg', 'list', '--json', REPOSITORY], check=True)
def subcommand_create(arguments):
"""Create archive."""
paths = filter(os.path.exists, arguments.paths)
subprocess.run([
'borg',
'create',
'--json',
REPOSITORY + '::' + arguments.name,
] + list(paths), check=True)
def subcommand_delete(arguments):
"""Delete archive."""
subprocess.run(['borg', 'delete', REPOSITORY + '::' + arguments.name],
check=True)
def subcommand_extract(arguments):
"""Extract archive contents."""
path = REPOSITORY + '::' + arguments.name
return _extract(path, arguments.destination)
def _extract(archive_path, destination, locations=None):
"""Extract archive contents."""
prev_dir = os.getcwd()
env = dict(os.environ, LANG='C.UTF-8')
borg_call = ['borg', 'extract', archive_path]
if locations is not None:
borg_call.append(locations)
print(borg_call)
try:
os.chdir(os.path.expanduser(destination))
subprocess.run(borg_call, env=env, check=True)
finally:
os.chdir(prev_dir)
def subcommand_export_tar(arguments):
"""Export archive contents as tarball."""
# TODO: if this is only used for files in /tmp, add checks to verify that
# arguments.filename is within /tmp (does this actually increase security?)
# TODO: arguments.filename is not a filename but a path
path = os.path.dirname(arguments.filename)
if not os.path.exists(path):
os.makedirs(path)
subprocess.run([
'borg', 'export-tar', REPOSITORY + '::' + arguments.name,
arguments.filename
], check=True)
try:
shutil.chown(arguments.filename, user='plinth', group='plinth')
except PermissionError:
pass
def subcommand_list_exports(arguments):
"""List exported backup archive files."""
exports = []
path = arguments.location
if path[-1] != '/':
path += '/'
path += 'FreedomBox-backups/'
if os.path.exists(path):
for filename in glob.glob(path + '*.tar.gz'):
exports.append(os.path.basename(filename))
print(json.dumps(exports))
def _read_archive_file(archive, filepath):
"""Read the content of a file inside an archive"""
arguments = ['borg', 'extract', archive, filepath, '--stdout']
return subprocess.check_output(arguments).decode()
def subcommand_get_archive_apps(arguments):
"""Get list of apps included in archive."""
manifest_folder = os.path.relpath(MANIFESTS_FOLDER, '/')
borg_call = ['borg', 'list', arguments.path, manifest_folder,
'--format', '{path}{NEWLINE}']
try:
manifest_path = subprocess.check_output(borg_call).decode().strip()
except subprocess.CalledProcessError:
sys.exit(1)
manifest = None
if manifest_path:
manifest_data = _read_archive_file(arguments.path,
manifest_path)
manifest = json.loads(manifest_data)
if manifest:
for app in manifest:
print(app['name'])
def subcommand_get_export_apps(arguments):
"""Get list of apps included in exported archive file."""
manifest = None
with tarfile.open(arguments.filename) as t:
filenames = t.getnames()
for name in filenames:
if 'var/lib/plinth/backups-manifests/' in name \
and name.endswith('.json'):
manifest_data = t.extractfile(name).read()
manifest = json.loads(manifest_data)
break
if manifest:
for app in manifest:
print(app['name'])
def subcommand_restore_archive(arguments):
"""Restore files from an archive."""
locations_data = ''.join(sys.stdin)
locations = json.loads(locations_data)
locations_string = " ".join(locations['directories'])
locations_string += " ".join(locations['files'])
_extract(arguments.path, arguments.destination, locations=locations_string)
def subcommand_restore_exported_archive(arguments):
"""Restore files from an exported archive."""
locations_data = ''.join(sys.stdin)
locations = json.loads(locations_data)
with tarfile.open(arguments.filename) as tar_handle:
for member in tar_handle.getmembers():
path = '/' + member.name
if path in locations['files']:
tar_handle.extract(member, '/')
else:
for directory in locations['directories']:
if path.startswith(directory):
tar_handle.extract(member, '/')
break
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()