#!/usr/bin/python3 # # This file is part of FreedomBox. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # """ Configuration helper for filesystem snapshots. """ import argparse import json import os import subprocess import augeas FSTAB = '/etc/fstab' AUG_FSTAB = '/files/etc/fstab' def parse_arguments(): """Return parsed command line arguments as dictionary.""" parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') subparsers.add_parser('setup', help='Configure snapper') subparsers.add_parser('list', help='List snapshots') subparsers.add_parser('create', help='Create snapshot') subparsers.add_parser('get-config', help='Configurations of snapshot') subparser = subparsers.add_parser('delete', help='Delete a snapshot by number') subparser.add_argument('number', help='Number of snapshot to delete') subparser = subparsers.add_parser('delete-all', help='Delete all the snapshots') subparser = subparsers.add_parser('configure', help='Configure automatic snapshots') subparser.add_argument('config') subparser = subparsers.add_parser('rollback', help='Rollback to snapshot') subparser.add_argument('number', help='Number of snapshot to rollback to') subparsers.required = True return parser.parse_args() def subcommand_setup(_): """Configure snapper.""" # Check if root config exists. command = ['snapper', 'list-configs'] process = subprocess.run(command, stdout=subprocess.PIPE, check=True) output = process.stdout.decode() # Create root config if needed. if 'root' not in output: command = ['snapper', 'create-config', '/'] subprocess.run(command, check=True) _set_default_config() _add_fstab_entry('/') def _set_default_config(): command = [ 'snapper', 'set-config', 'TIMELINE_CREATE=yes', 'TIMELINE_LIMIT_HOURLY=10', 'TIMELINE_LIMIT_MONTHLY=2', 'TIMELINE_LIMIT_WEEKLY=2', 'TIMELINE_LIMIT_YEARLY=0', 'TIMELINE_LIMIT_DAILY=3', 'NUMBER_MIN_AGE=1296000', 'NUMBER_LIMIT=0', 'NUMBER_LIMIT_IMPORTANT=4-10' ] subprocess.run(command, check=True) def _add_fstab_entry(mount_point): """Add mountpoint for subvolumes.""" snapshots_mount_point = os.path.join(mount_point, '.snapshots') aug = augeas.Augeas( flags=augeas.Augeas.NO_LOAD + augeas.Augeas.NO_MODL_AUTOLOAD) aug.set('/augeas/load/Fstab/lens', 'Fstab.lns') aug.set('/augeas/load/Fstab/incl[last() + 1]', FSTAB) aug.load() spec = None for entry in aug.match(AUG_FSTAB + '/*'): entry_mount_point = aug.get(entry + '/file') if entry_mount_point == snapshots_mount_point: return if entry_mount_point == mount_point and \ aug.get(entry + '/vfstype') == 'btrfs': spec = aug.get(entry + '/spec') if spec: aug.set(AUG_FSTAB + '/01/spec', spec) aug.set(AUG_FSTAB + '/01/file', snapshots_mount_point) aug.set(AUG_FSTAB + '/01/vfstype', 'btrfs') aug.set(AUG_FSTAB + '/01/opt', 'subvol') aug.set(AUG_FSTAB + '/01/opt/value', '.snapshots') aug.set(AUG_FSTAB + '/01/dump', '0') aug.set(AUG_FSTAB + '/01/passno', '1') aug.save() def _get_snapper_list(): command = ['snapper', 'list'] process = subprocess.run(command, stdout=subprocess.PIPE, check=True) return process.stdout.decode().splitlines() def subcommand_list(_): """List snapshots.""" lines = _get_snapper_list() keys = ('type', 'number', 'pre_number', 'date', 'user', 'cleanup', 'description') snapshots = [] for line in lines[2:]: parts = [part.strip() for part in line.split('|')] snapshots.append(dict(zip(keys, parts))) default = _get_default_snapshot() for snapshot in snapshots: snapshot['is_default'] = (snapshot['number'] == default) print(json.dumps(snapshots)) def _get_default_snapshot(): """Return the default snapshot by looking at default subvolume.""" command = ['btrfs', 'subvolume', 'get-default', '/'] process = subprocess.run(command, stdout=subprocess.PIPE, check=True) output = process.stdout.decode() output_parts = output.split() if len(output_parts) >= 9: path = output.split()[8] path_parts = path.split('/') if len(path_parts) == 3 and path_parts[0] == '.snapshots': return path_parts[1] return None def subcommand_create(_): """Create snapshot.""" command = ['snapper', 'create', '--description', 'manually created'] subprocess.run(command, check=True) def subcommand_delete(arguments): """Delete a snapshot by number.""" command = ['snapper', 'delete', arguments.number] subprocess.run(command, check=True) def subcommand_delete_all(_): """Delete all the snapshots (except the active one).""" lines = _get_snapper_list() snapshot_range = [line.split('|')[1].strip() for line in lines[3:]] default_snapshot = _get_default_snapshot() if snapshot_range: if default_snapshot: index = snapshot_range.index(default_snapshot) range_before = snapshot_range[:index] range_after = snapshot_range[index + 1:] to_delete = [range_before, range_after] else: to_delete = [snapshot_range] delete_args = filter(None, map(_get_delete_arg, to_delete)) for arg in delete_args: subprocess.run(['snapper', 'delete', arg], check=True) def _get_delete_arg(range_list): """Return 'a-b' when given ['a', ..., 'b'].""" if not range_list: return None elif len(range_list) == 1: return range_list[0] else: return range_list[0] + '-' + range_list[-1] def subcommand_configure(arguments): command = ['snapper', 'set-config', arguments.config] subprocess.run(command, check=True) def subcommand_get_config(_): command = ['snapper', 'get-config'] process = subprocess.run(command, stdout=subprocess.PIPE, check=True) lines = process.stdout.decode().splitlines() config = {} for line in lines[2:]: parts = [part.strip() for part in line.split('|')] config[parts[0]] = parts[1] print(json.dumps(config)) def subcommand_rollback(arguments): """Rollback to snapshot.""" command = [ 'snapper', 'rollback', '--description', 'created by rollback', arguments.number ] subprocess.run(command, check=True) def main(): """Parse arguments and perform all duties.""" arguments = parse_arguments() subcommand = arguments.subcommand.replace('-', '_') subcommand_method = globals()['subcommand_' + subcommand] subcommand_method(arguments) if __name__ == '__main__': main()