#!/usr/bin/python3 # # This file is part of FreedomBox. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # """ Configuration helper for disks manager. """ import argparse import json import re import subprocess import sys from plinth import utils def parse_arguments(): """Return parsed command line arguments as dictionary.""" parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') subparser = subparsers.add_parser( 'is-partition-expandable', help='Return whether a given partition can be expanded') subparser.add_argument( 'device', help='Partition for which check needs to be performed') subparser = subparsers.add_parser( 'expand-partition', help='Expand a partition to take adjacent free space') subparser.add_argument('device', help='Partition which needs to be resized') subparser = subparsers.add_parser('eject', help='Eject a storage device') subparser.add_argument('device', help='Path of the device to eject') subparsers.required = True return parser.parse_args() def subcommand_is_partition_expandable(arguments): """Return a list of partitions that can be expanded.""" _, _, free_space = _get_free_space(arguments.device) print(free_space['size']) def subcommand_expand_partition(arguments): """Expand a partition to take adjacent free space.""" device = arguments.device device, requested_partition, free_space = _get_free_space(device) if requested_partition['table_type'] == 'msdos' and \ int(requested_partition['number']) >= 5: print('Expanding logical partitions currently unsupported', file=sys.stderr) sys.exit(4) _resize_partition(device, requested_partition, free_space) _resize_file_system(device, requested_partition, free_space) def _resize_partition(device, requested_partition, free_space): """Resize the partition table entry.""" command = ['parted', '--align=optimal', '--script', device, 'unit', 'B', 'resizepart', requested_partition['number'], str(free_space['end'])] # XXX: Remove workaround after bug in parted is fixed: # https://debbugs.gnu.org/cgi/bugreport.cgi?bug=24215 fallback_command = ['parted', '--align=optimal', device, '---pretend-input-tty', 'unit', 'B', 'resizepart', requested_partition['number'], 'yes', str(free_space['end'])] try: subprocess.run(command, check=True) except subprocess.CalledProcessError as exception: try: subprocess.run(fallback_command, check=True) except subprocess.CalledProcessError as exception: print('Error expanding partition:', exception, file=sys.stderr) sys.exit(5) def _resize_file_system(device, requested_partition, free_space): """Resize a file system inside a partition.""" if requested_partition['type'] == 'btrfs': _resize_btrfs(device, requested_partition, free_space) elif requested_partition['type'] == 'ext4': _resize_ext4(device, requested_partition, free_space) def _resize_ext4(device, requested_partition, free_space): """Resize an ext4 file system inside a partition.""" partition_device = _get_partition_device( device, requested_partition['number']) try: command = ['resize2fs', partition_device] subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError as exception: print('Error expanding filesystem:', exception, file=sys.stderr) sys.exit(6) def _resize_btrfs(device, requested_partition, free_space): """Resize a btrfs file system inside a partition.""" try: command = ['btrfs', 'filesystem', 'resize', 'max', '/'] subprocess.run(command, stdout=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError as exception: print('Error expanding filesystem:', exception, file=sys.stderr) sys.exit(6) def _get_free_space(device): """Return the amount of free space after a partition.""" device, partition_number = \ _get_root_device_and_partition_number(device) try: requested_partition, free_spaces = \ _get_partitions_and_free_spaces(device, partition_number) except Exception as exception: print('Error getting partition details: ', exception, file=sys.stderr) sys.exit(2) # Don't accept extended partitions for now if requested_partition['table_type'] == 'msdos' and \ int(requested_partition['number']) >= 5: print('Expanding logical partitions currently unsupported', file=sys.stderr) sys.exit(3) # Don't accept anything but btrfs and ext4 filesystems if requested_partition['type'] not in ('btrfs', 'ext4'): print('Unsupported file system type: ', requested_partition['type'], file=sys.stderr) sys.exit(4) found_free_space = None for free_space in free_spaces: if free_space['start'] != requested_partition['end'] + 1: continue if free_space['size'] < 10 * 1024 * 1024: # Minimum 10MiB continue found_free_space = free_space if not found_free_space: sys.exit(5) return device, requested_partition, found_free_space def _get_partition_device(device, partition_number): """Return the device corresponding to a parition in a given device.""" if re.match('[0-9]', device[-1]): return device + 'p' + str(partition_number) return device + str(partition_number) def _get_root_device_and_partition_number(device): """Return the parent device and number of partition separately.""" match = re.match('(.+[a-zA-Z]\d+)p(\d+)$', device) if not match: match = re.match('(.+[a-zA-Z])(\d+)$', device) if not match: print('Invalid device, must be a partition', file=sys.stderr) sys.exit(1) return match.group(1), match.group(2) def _get_partitions_and_free_spaces(device, partition_number): """Run parted and return list of partitions and free spaces.""" command = ['parted', '--machine', '--script', device, 'unit', 'B', 'print', 'free'] process = subprocess.run(command, stdout=subprocess.PIPE, check=True) requested_partition = None free_spaces = [] lines = process.stdout.decode().splitlines() partition_table_type = lines[1].split(':')[5] for line in lines[2:]: line = line.rstrip(';') keys = ('number', 'start', 'end', 'size', 'type') parts = line.split(':') segment = dict(zip(keys, parts[:5])) segment['table_type'] = partition_table_type segment['start'] = _interpret_unit(segment['start']) segment['end'] = _interpret_unit(segment['end']) segment['size'] = _interpret_unit(segment['size']) if segment['type'] == 'free': segment['number'] = None free_spaces.append(segment) else: if segment['number'] == partition_number: requested_partition = segment return requested_partition, free_spaces def _interpret_unit(value): """Return value in bytes after understanding parted unit.""" value = value.rstrip('B') # For now, we only need to understand bytes return int(value) def subcommand_eject(arguments): """Eject a device by its path.""" device_path = arguments.device drive = eject_drive_of_device(device_path) print(json.dumps(drive)) def _get_options(): """Return the common options used for udisks2 operations.""" glib = utils.import_from_gi('GLib', '2.0') options = glib.Variant( 'a{sv}', {'auth.no_user_interaction': glib.Variant('b', True)}) return options def eject_drive_of_device(device_path): """Eject a device after unmounting all of its partitions. Return the details (model, vendor) of drives ejected. """ udisks = utils.import_from_gi('UDisks', '2.0') client = udisks.Client.new_sync() object_manager = client.get_object_manager() found_objects = [ obj for obj in object_manager.get_objects() if obj.get_block() and obj.get_block().props.device == device_path ] if not found_objects: raise ValueError( _('No such device - {device_path}').format( device_path=device_path)) obj = found_objects[0] # Unmount filesystems block_device = obj.get_block() drive_object_path = block_device.props.drive if drive_object_path != '/': umount_all_filesystems_of_drive(drive_object_path) else: # Block device has not associated drive umount_filesystem(obj.get_filesystem()) # Eject the drive drive = client.get_drive_for_block(block_device) if drive: drive.call_eject_sync(_get_options(), None) return { 'vendor': drive.props.vendor, 'model': drive.props.model, } return None def umount_filesystem(filesystem): """Unmount a filesystem """ if filesystem and filesystem.props.mount_points: filesystem.call_unmount_sync(_get_options()) def umount_all_filesystems_of_drive(drive_object_path): """Unmount all filesystems on block devices of a drive.""" udisks = utils.import_from_gi('UDisks', '2.0') client = udisks.Client.new_sync() object_manager = client.get_object_manager() for obj in object_manager.get_objects(): block_device = obj.get_block() if not block_device or block_device.props.drive != drive_object_path: continue umount_filesystem(obj.get_filesystem()) def main(): """Parse arguments and perform all duties.""" arguments = parse_arguments() subcommand = arguments.subcommand.replace('-', '_') subcommand_method = globals()['subcommand_' + subcommand] subcommand_method(arguments) if __name__ == '__main__': main()