#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for disks manager.
"""

import argparse
import json
import os
import re
import stat
import subprocess
import sys

from plinth import utils


def parse_arguments():
    """Return parsed command line arguments as an argparse namespace."""
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    subparsers.add_parser('setup', help='Configure storage after install')

    subparser = subparsers.add_parser(
        'is-partition-expandable',
        help='Return whether a given partition can be expanded')
    subparser.add_argument(
        'device', help='Partition for which check needs to be performed')

    subparser = subparsers.add_parser(
        'expand-partition',
        help='Expand a partition to take adjacent free space')
    subparser.add_argument('device',
                           help='Partition which needs to be resized')

    subparser = subparsers.add_parser('mount', help='Mount a filesystem')
    # Required: without it None would be passed to udisksctl's argument list
    # and subprocess would fail with an unhelpful TypeError.
    subparser.add_argument('--block-device', required=True,
                           help='Block device of the filesystem to mount')

    subparser = subparsers.add_parser('eject', help='Eject a storage device')
    subparser.add_argument('device', help='Path of the device to eject')

    subparsers.add_parser('usage-info',
                          help='Get information about disk space usage')

    subparser = subparsers.add_parser('validate-directory',
                                      help='Validate a directory')
    subparser.add_argument('--path', help='Path of the directory',
                           required=True)
    subparser.add_argument('--check-creatable', required=False, default=False,
                           action='store_true',
                           help='Check that the directory is creatable')
    subparser.add_argument('--check-writable', required=False, default=False,
                           action='store_true',
                           help='Check that the directory is writable')

    subparsers.required = True
    return parser.parse_args()


def subcommand_is_partition_expandable(arguments):
    """Print the size in bytes of the free space following a partition.

    The process exits with a non-zero code (see _get_free_space()) when the
    partition cannot be expanded.
    """
    _, _, free_space = _get_free_space(arguments.device)
    print(free_space['size'])


def subcommand_expand_partition(arguments):
    """Expand a partition to take adjacent free space."""
    # _get_free_space() exits the process for logical partitions on msdos
    # tables and for unsupported file systems, so no re-check is needed here.
    device, requested_partition, free_space = _get_free_space(
        arguments.device)

    if requested_partition['table_type'] == 'gpt':
        _move_gpt_second_header(device)

    _resize_partition(device, requested_partition, free_space)
    _resize_file_system(device, requested_partition, free_space)


def _move_gpt_second_header(device):
    """Move second header to the end of the disk.

    GPT scheme has two mostly identical partition table headers. One at the
    beginning of the disk and one at the end. When an image is written to
    larger disk, the second header is not at the end of the disk. Fix that by
    moving second header to end of the disk before attempting partition
    resize.

    Exit the process with code 6 if sgdisk fails.
    """
    command = ['sgdisk', '--move-second-header', device]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError:
        print('Error moving GPT second header to the end', file=sys.stderr)
        sys.exit(6)


def _resize_partition(device, requested_partition, free_space):
    """Resize the partition table entry to end where the free space ends.

    Exit the process with code 5 if both the scripted and the fallback
    invocation of parted fail.
    """
    command = [
        'parted', '--align=optimal', '--script', device, 'unit', 'B',
        'resizepart', requested_partition['number'],
        str(free_space['end'])
    ]
    # XXX: Remove workaround after bug in parted is fixed:
    # https://debbugs.gnu.org/cgi/bugreport.cgi?bug=24215
    # The fallback runs parted without --script and feeds the confirmation
    # and the new end position through standard input instead.
    fallback_command = [
        'parted', '--align=optimal', device, '---pretend-input-tty', 'unit',
        'B', 'resizepart', requested_partition['number']
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError:
        try:
            input_text = 'yes\n' + str(free_space['end'])
            subprocess.run(fallback_command, check=True,
                           input=input_text.encode())
        except subprocess.CalledProcessError as exception:
            print('Error expanding partition:', exception, file=sys.stderr)
            sys.exit(5)


def _resize_file_system(device, requested_partition, free_space):
    """Resize a file system inside a partition.

    Only btrfs and ext4 are handled; any other type is left untouched.
    """
    if requested_partition['type'] == 'btrfs':
        _resize_btrfs(device, requested_partition, free_space)
    elif requested_partition['type'] == 'ext4':
        _resize_ext4(device, requested_partition, free_space)


def _resize_ext4(device, requested_partition, free_space):
    """Resize an ext4 file system inside a partition.

    Exit the process with code 6 if resize2fs fails.
    """
    partition_device = _get_partition_device(device,
                                             requested_partition['number'])
    try:
        command = ['resize2fs', partition_device]
        subprocess.run(command, stdout=subprocess.DEVNULL,
                       stderr=subprocess.DEVNULL, check=True)
    except subprocess.CalledProcessError as exception:
        print('Error expanding filesystem:', exception, file=sys.stderr)
        sys.exit(6)


def _resize_btrfs(device, requested_partition, free_space):
    """Resize a btrfs file system inside a partition.

    NOTE: the resize is always applied to the file system mounted at '/',
    regardless of the device and partition passed in.

    Exit the process with code 6 if the btrfs command fails.
    """
    try:
        command = ['btrfs', 'filesystem', 'resize', 'max', '/']
        subprocess.run(command, stdout=subprocess.DEVNULL, check=True)
    except subprocess.CalledProcessError as exception:
        print('Error expanding filesystem:', exception, file=sys.stderr)
        sys.exit(6)


def _get_free_space(device):
    """Return the free space immediately following a partition.

    Return a tuple (parent device, partition details, free space details).

    Exit the process instead of returning when the partition cannot be
    expanded:

    - 1: device is not a partition (from
      _get_root_device_and_partition_number()).
    - 2: partition details could not be read or the partition was not found.
    - 3: partition is a logical partition on an msdos partition table.
    - 4: file system is neither btrfs nor ext4.
    - 5: no free space of at least 10 MiB directly follows the partition.
    """
    device, partition_number = \
        _get_root_device_and_partition_number(device)

    try:
        requested_partition, free_spaces = \
            _get_partitions_and_free_spaces(device, partition_number)
    except Exception as exception:
        print('Error getting partition details: ', exception, file=sys.stderr)
        sys.exit(2)

    # parted ran fine but did not list the partition. Without this check the
    # subscripting below would raise TypeError on None.
    if requested_partition is None:
        print('Error getting partition details: ', 'partition not found',
              file=sys.stderr)
        sys.exit(2)

    # Don't accept extended partitions for now
    if requested_partition['table_type'] == 'msdos' and \
       int(requested_partition['number']) >= 5:
        print('Expanding logical partitions currently unsupported',
              file=sys.stderr)
        sys.exit(3)

    # Don't accept anything but btrfs and ext4 filesystems
    if requested_partition['type'] not in ('btrfs', 'ext4'):
        print('Unsupported file system type: ', requested_partition['type'],
              file=sys.stderr)
        sys.exit(4)

    found_free_space = None
    for free_space in free_spaces:
        # Only free space starting right after the partition is usable
        if free_space['start'] != requested_partition['end'] + 1:
            continue

        if free_space['size'] < 10 * 1024 * 1024:  # Minimum 10MiB
            continue

        found_free_space = free_space

    if not found_free_space:
        sys.exit(5)

    return device, requested_partition, found_free_space


def _get_partition_device(device, partition_number):
    """Return the device corresponding to a partition in a given device.

    When the parent device name ends in a digit a 'p' separates it from the
    partition number, otherwise the number is appended directly.
    """
    # Slice instead of indexing so that an empty device does not raise.
    if re.match('[0-9]', device[-1:]):
        return device + 'p' + str(partition_number)

    return device + str(partition_number)


def _get_root_device_and_partition_number(device):
    """Return the parent device and number of partition separately.

    The partition number is returned as a string. Exit the process with code
    1 if the device does not look like a partition.
    """
    # Parent device ending in a digit: '<parent>p<number>'
    match = re.match(r'(.+[a-zA-Z]\d+)p(\d+)$', device)
    if not match:
        # Parent device ending in a letter: '<parent><number>'
        match = re.match(r'(.+[a-zA-Z])(\d+)$', device)
        if not match:
            print('Invalid device, must be a partition', file=sys.stderr)
            sys.exit(1)

    return match.group(1), match.group(2)


def _get_partitions_and_free_spaces(device, partition_number):
    """Run parted and return the requested partition and free spaces.

    Return a tuple (requested partition, list of free spaces). Each entry is
    a dictionary with keys 'number', 'start', 'end', 'size', 'type' and
    'table_type'; 'start', 'end' and 'size' are integers in bytes. The
    requested partition is None if parted did not list it.

    Raise subprocess.CalledProcessError if parted fails. Unexpected output
    raises IndexError, KeyError or ValueError.
    """
    command = [
        'parted', '--machine', '--script', device, 'unit', 'B', 'print', 'free'
    ]
    process = subprocess.run(command, stdout=subprocess.PIPE, check=True)

    requested_partition = None
    free_spaces = []

    keys = ('number', 'start', 'end', 'size', 'type')
    lines = process.stdout.decode().splitlines()
    # The second line describes the device; its sixth field is read as the
    # partition table type.
    partition_table_type = lines[1].split(':')[5]
    for line in lines[2:]:
        if not line.strip():
            continue

        line = line.rstrip(';')
        parts = line.split(':')

        segment = dict(zip(keys, parts[:5]))
        segment['table_type'] = partition_table_type
        segment['start'] = _interpret_unit(segment['start'])
        segment['end'] = _interpret_unit(segment['end'])
        segment['size'] = _interpret_unit(segment['size'])

        if segment['type'] == 'free':
            segment['number'] = None
            free_spaces.append(segment)
        elif segment['number'] == partition_number:
            requested_partition = segment

    return requested_partition, free_spaces


def _interpret_unit(value):
    """Return value in bytes after understanding parted unit."""
    value = value.rstrip('B')  # For now, we only need to understand bytes
    return int(value)


def subcommand_mount(arguments):
    """Mount a disk as root user.

    XXX: This is primarily to provide compatibility with older code that used
    udiskie to auto-mount all partitions as root user under /media/root/
    directory. We are setting special permissions for the directory
    /media/root and users have set shared folders using this path. This can
    be removed in favor of using DBus API once we have a migration plan in
    place. Disks can be mounted in a shared directory without the ACL
    restrictions that apply to per-user mount directories. This can be done
    by setting udev flag UDISKS_FILESYSTEM_SHARED=1 by writing a udev rule.

    The process exits with the return code of udisksctl.
    """
    process = subprocess.run([
        'udisksctl', 'mount', '--block-device', arguments.block_device,
        '--no-user-interaction'
    ])
    sys.exit(process.returncode)


def subcommand_eject(arguments):
    """Eject a device by its path.

    Print the details of the ejected drive as JSON. On any failure print the
    error to stderr and exit with code 1.
    """
    device_path = arguments.device
    try:
        drive = eject_drive_of_device(device_path)
        print(json.dumps(drive))
    except Exception as exception:
        print(exception, file=sys.stderr)
        sys.exit(1)


def _get_options():
    """Return the common options used for udisks2 operations."""
    glib = utils.import_from_gi('GLib', '2.0')
    options = glib.Variant(
        'a{sv}', {'auth.no_user_interaction': glib.Variant('b', True)})
    return options


def eject_drive_of_device(device_path):
    """Eject a device after unmounting all of its partitions.

    Return the details (model, vendor) of drives ejected, or None if the
    block device has no associated drive.

    Raise ValueError if no block device with the given path is found.
    """
    udisks = utils.import_from_gi('UDisks', '2.0')
    glib = utils.import_from_gi('GLib', '2.0')
    client = udisks.Client.new_sync()
    object_manager = client.get_object_manager()

    found_objects = [
        obj for obj in object_manager.get_objects()
        if obj.get_block() and obj.get_block().props.device == device_path
    ]

    if not found_objects:
        raise ValueError(
            'No such device - {device_path}'.format(device_path=device_path))

    obj = found_objects[0]

    # Unmount filesystems
    block_device = obj.get_block()
    drive_object_path = block_device.props.drive
    if drive_object_path != '/':
        umount_all_filesystems_of_drive(drive_object_path)
    else:
        # Block device has no associated drive
        umount_filesystem(obj.get_filesystem())

    # Eject the drive
    drive = client.get_drive_for_block(block_device)
    if drive:
        try:
            drive.call_eject_sync(_get_options(), None)
        except glib.Error:
            # Ignore error during ejection: as long as all the filesystems
            # are unmounted, the disk can be removed.
            pass

        return {
            'vendor': drive.props.vendor,
            'model': drive.props.model,
        }

    return None


def umount_filesystem(filesystem):
    """Unmount a filesystem if it is currently mounted."""
    if filesystem and filesystem.props.mount_points:
        filesystem.call_unmount_sync(_get_options())


def umount_all_filesystems_of_drive(drive_object_path):
    """Unmount all filesystems on block devices of a drive."""
    udisks = utils.import_from_gi('UDisks', '2.0')
    client = udisks.Client.new_sync()
    object_manager = client.get_object_manager()

    for obj in object_manager.get_objects():
        block_device = obj.get_block()
        if not block_device or block_device.props.drive != drive_object_path:
            continue

        umount_filesystem(obj.get_filesystem())


def subcommand_setup(_):
    """Configure storage."""
    # create udisks2 default mount directory
    mounts_directory = '/media/root'
    try:
        os.mkdir(mounts_directory)
    except FileExistsError:
        pass

    # make the directory readable and traversable by other users
    stats = os.stat(mounts_directory)
    os.chmod(mounts_directory, stats.st_mode | stat.S_IROTH | stat.S_IXOTH)


def subcommand_usage_info(_):
    """Get information about disk space usage.

    The output of df is written directly to standard output. Raise
    subprocess.CalledProcessError if df fails.
    """
    command = [
        'df', '--exclude-type=tmpfs', '--exclude-type=devtmpfs',
        '--block-size=1', '--output=source,fstype,size,used,avail,pcent,target'
    ]
    subprocess.run(command, check=True)


def subcommand_validate_directory(arguments):
    """Validate a directory.

    Print 'ValidationError: N' for the first failed check, nothing on
    success:

    - 1: directory does not exist.
    - 2: path is not a directory.
    - 3: directory is not readable.
    - 4: directory is not writable.

    With --check-creatable the checks are applied to the nearest existing
    ancestor of the path instead of the path itself.

    Raise RuntimeError when run as root, since the access checks are
    performed for the user running this command.
    """
    if os.geteuid() == 0:
        raise RuntimeError('You must not be root to run this command')

    directory = arguments.path

    def part_exists(path):
        """Returns part of the path that exists."""
        if not path or os.path.exists(path):
            return path
        return part_exists(os.path.dirname(path))

    if arguments.check_creatable:
        directory = part_exists(directory)
        if not directory:
            # No component of a relative path exists; fall back to the
            # current directory.
            directory = '.'
    else:
        if not os.path.exists(directory):
            # doesn't exist
            print('ValidationError: 1')
            return

    if not os.path.isdir(directory):
        # is not a directory
        print('ValidationError: 2')
    elif not os.access(directory, os.R_OK):
        # is not readable
        print('ValidationError: 3')
    elif arguments.check_writable or arguments.check_creatable:
        if not os.access(directory, os.W_OK):
            # is not writable
            print('ValidationError: 4')


def main():
    """Parse arguments and perform all duties."""
    arguments = parse_arguments()

    # 'expand-partition' is handled by subcommand_expand_partition() etc.
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()