mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
- Helps test cases. - In future, we can resize non-root partitions. Tests: - On an amd64 disk image, apply this patch. Increase the image size. Boot the image. During first setup. The root partition should get expanded successfully and show full disk size. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
442 lines
15 KiB
Python
Executable File
442 lines
15 KiB
Python
Executable File
#!/usr/bin/python3
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""
|
|
Configuration helper for disks manager.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
|
|
from plinth import utils
|
|
|
|
|
|
def parse_arguments():
|
|
"""Return parsed command line arguments as dictionary."""
|
|
parser = argparse.ArgumentParser()
|
|
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
|
|
|
|
subparsers.add_parser('setup', help='Configure storage after install')
|
|
|
|
subparser = subparsers.add_parser(
|
|
'is-partition-expandable',
|
|
help='Return whether a given partition can be expanded')
|
|
subparser.add_argument(
|
|
'device', help='Partition for which check needs to be performed')
|
|
|
|
subparser = subparsers.add_parser(
|
|
'expand-partition',
|
|
help='Expand a partition to take adjacent free space')
|
|
subparser.add_argument('device',
|
|
help='Partition which needs to be resized')
|
|
subparser.add_argument(
|
|
'--mount-point', default='/',
|
|
help=('Mount point which the device is mounted. '
|
|
'Needed for btrfs filesystems'))
|
|
|
|
subparser = subparsers.add_parser('mount', help='Mount a filesystem')
|
|
subparser.add_argument('--block-device',
|
|
help='Block device of the filesystem to mount')
|
|
|
|
subparser = subparsers.add_parser('eject', help='Eject a storage device')
|
|
subparser.add_argument('device', help='Path of the device to eject')
|
|
|
|
subparsers.add_parser('usage-info',
|
|
help='Get information about disk space usage')
|
|
|
|
subparser = subparsers.add_parser('validate-directory',
|
|
help='Validate a directory')
|
|
subparser.add_argument('--path', help='Path of the directory',
|
|
required=True)
|
|
subparser.add_argument('--check-creatable', required=False, default=False,
|
|
action='store_true',
|
|
help='Check that the directory is creatable')
|
|
subparser.add_argument('--check-writable', required=False, default=False,
|
|
action='store_true',
|
|
help='Check that the directory is writable')
|
|
|
|
subparsers.required = True
|
|
return parser.parse_args()
|
|
|
|
|
|
def subcommand_is_partition_expandable(arguments):
|
|
"""Return a list of partitions that can be expanded."""
|
|
_, _, free_space = _get_free_space(arguments.device)
|
|
print(free_space['size'])
|
|
|
|
|
|
def subcommand_expand_partition(arguments):
|
|
"""Expand a partition to take adjacent free space."""
|
|
device = arguments.device
|
|
mount_point = arguments.mount_point
|
|
device, requested_partition, free_space = _get_free_space(device)
|
|
|
|
if requested_partition['table_type'] == 'msdos' and \
|
|
int(requested_partition['number']) >= 5:
|
|
print('Expanding logical partitions currently unsupported',
|
|
file=sys.stderr)
|
|
sys.exit(4)
|
|
|
|
if requested_partition['table_type'] == 'gpt':
|
|
_move_gpt_second_header(device)
|
|
|
|
_resize_partition(device, requested_partition, free_space)
|
|
_resize_file_system(device, requested_partition, free_space, mount_point)
|
|
|
|
|
|
def _move_gpt_second_header(device):
|
|
"""Move second header to the end of the disk.
|
|
|
|
GPT scheme has two mostly identical partition table headers. One at the
|
|
beginning of the disk and one at the end. When an image is written to
|
|
larger disk, the second header is not at the end of the disk. Fix that by
|
|
moving second partition to end of the disk before attempting partition
|
|
resize.
|
|
|
|
"""
|
|
command = ['sgdisk', '--move-second-header', device]
|
|
try:
|
|
subprocess.run(command, check=True)
|
|
except subprocess.CalledProcessError:
|
|
print('Error moving GPT second header to the end')
|
|
sys.exit(6)
|
|
|
|
|
|
def _resize_partition(device, requested_partition, free_space):
|
|
"""Resize the partition table entry."""
|
|
command = [
|
|
'parted', '--align=optimal', '--script', device, 'unit', 'B',
|
|
'resizepart', requested_partition['number'],
|
|
str(free_space['end'])
|
|
]
|
|
# XXX: Remove workaround after bug in parted is fixed:
|
|
# https://debbugs.gnu.org/cgi/bugreport.cgi?bug=24215
|
|
fallback_command = [
|
|
'parted', '--align=optimal', device, '---pretend-input-tty', 'unit',
|
|
'B', 'resizepart', requested_partition['number']
|
|
]
|
|
try:
|
|
subprocess.run(command, check=True)
|
|
except subprocess.CalledProcessError:
|
|
try:
|
|
input_text = 'yes\n' + str(free_space['end'])
|
|
subprocess.run(fallback_command, check=True,
|
|
input=input_text.encode())
|
|
except subprocess.CalledProcessError as exception:
|
|
print('Error expanding partition:', exception, file=sys.stderr)
|
|
sys.exit(5)
|
|
|
|
|
|
def _resize_file_system(device, requested_partition, free_space,
|
|
mount_point='/'):
|
|
"""Resize a file system inside a partition."""
|
|
if requested_partition['type'] == 'btrfs':
|
|
_resize_btrfs(device, requested_partition, free_space, mount_point)
|
|
elif requested_partition['type'] == 'ext4':
|
|
_resize_ext4(device, requested_partition, free_space, mount_point)
|
|
|
|
|
|
def _resize_ext4(device, requested_partition, _free_space, _mount_point):
|
|
"""Resize an ext4 file system inside a partition."""
|
|
partition_device = _get_partition_device(device,
|
|
requested_partition['number'])
|
|
try:
|
|
command = ['resize2fs', partition_device]
|
|
subprocess.run(command, stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL, check=True)
|
|
except subprocess.CalledProcessError as exception:
|
|
print('Error expanding filesystem:', exception, file=sys.stderr)
|
|
sys.exit(6)
|
|
|
|
|
|
def _resize_btrfs(_device, _requested_partition, _free_space, mount_point='/'):
|
|
"""Resize a btrfs file system inside a partition."""
|
|
try:
|
|
command = ['btrfs', 'filesystem', 'resize', 'max', mount_point]
|
|
subprocess.run(command, stdout=subprocess.DEVNULL, check=True)
|
|
except subprocess.CalledProcessError as exception:
|
|
print('Error expanding filesystem:', exception, file=sys.stderr)
|
|
sys.exit(6)
|
|
|
|
|
|
def _get_free_space(device):
|
|
"""Return the amount of free space after a partition."""
|
|
device, partition_number = \
|
|
_get_root_device_and_partition_number(device)
|
|
|
|
try:
|
|
requested_partition, free_spaces = \
|
|
_get_partitions_and_free_spaces(device, partition_number)
|
|
except Exception as exception:
|
|
print('Error getting partition details: ', exception, file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
# Don't accept extended partitions for now
|
|
if requested_partition['table_type'] == 'msdos' and \
|
|
int(requested_partition['number']) >= 5:
|
|
print('Expanding logical partitions currently unsupported',
|
|
file=sys.stderr)
|
|
sys.exit(3)
|
|
|
|
# Don't accept anything but btrfs and ext4 filesystems
|
|
if requested_partition['type'] not in ('btrfs', 'ext4'):
|
|
print('Unsupported file system type: ', requested_partition['type'],
|
|
file=sys.stderr)
|
|
sys.exit(4)
|
|
|
|
found_free_space = None
|
|
for free_space in free_spaces:
|
|
if free_space['start'] != requested_partition['end'] + 1:
|
|
continue
|
|
|
|
if free_space['size'] < 10 * 1024 * 1024: # Minimum 10MiB
|
|
continue
|
|
|
|
found_free_space = free_space
|
|
|
|
if not found_free_space:
|
|
sys.exit(5)
|
|
|
|
return device, requested_partition, found_free_space
|
|
|
|
|
|
def _get_partition_device(device, partition_number):
|
|
"""Return the device corresponding to a parition in a given device."""
|
|
if re.match('[0-9]', device[-1]):
|
|
return device + 'p' + str(partition_number)
|
|
|
|
return device + str(partition_number)
|
|
|
|
|
|
def _get_root_device_and_partition_number(device):
|
|
"""Return the parent device and number of partition separately."""
|
|
match = re.match(r'(.+[a-zA-Z]\d+)p(\d+)$', device)
|
|
if not match:
|
|
match = re.match(r'(.+[a-zA-Z])(\d+)$', device)
|
|
if not match:
|
|
print('Invalid device, must be a partition', file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
return match.group(1), match.group(2)
|
|
|
|
|
|
def _get_partitions_and_free_spaces(device, partition_number):
|
|
"""Run parted and return list of partitions and free spaces."""
|
|
command = [
|
|
'parted', '--machine', '--script', device, 'unit', 'B', 'print', 'free'
|
|
]
|
|
process = subprocess.run(command, stdout=subprocess.PIPE, check=True)
|
|
|
|
requested_partition = None
|
|
free_spaces = []
|
|
|
|
lines = process.stdout.decode().splitlines()
|
|
partition_table_type = lines[1].split(':')[5]
|
|
for line in lines[2:]:
|
|
line = line.rstrip(';')
|
|
keys = ('number', 'start', 'end', 'size', 'type')
|
|
parts = line.split(':')
|
|
segment = dict(zip(keys, parts[:5]))
|
|
|
|
segment['table_type'] = partition_table_type
|
|
segment['start'] = _interpret_unit(segment['start'])
|
|
segment['end'] = _interpret_unit(segment['end'])
|
|
segment['size'] = _interpret_unit(segment['size'])
|
|
|
|
if segment['type'] == 'free':
|
|
segment['number'] = None
|
|
free_spaces.append(segment)
|
|
else:
|
|
if segment['number'] == partition_number:
|
|
requested_partition = segment
|
|
|
|
return requested_partition, free_spaces
|
|
|
|
|
|
def _interpret_unit(value):
|
|
"""Return value in bytes after understanding parted unit."""
|
|
value = value.rstrip('B') # For now, we only need to understand bytes
|
|
return int(value)
|
|
|
|
|
|
def subcommand_mount(arguments):
|
|
"""Mount a disk are root user.
|
|
|
|
XXX: This is primarily to provide compatibility with older code that used
|
|
udiskie to auto-mount all partitions as root user under /media/root/
|
|
directory. We are setting special permissions for the directory /media/root
|
|
and users have set shared folders using this path. This can be removed in
|
|
favor of using DBus API once we have a migration plan in place. Disks can
|
|
be mounted directly /mount without ACL restrictions that apply to
|
|
/mount/<user> directories. This can be done by setting udev flag
|
|
UDISKS_FILESYSTEM_SHARED=1 by writing a udev rule.
|
|
|
|
"""
|
|
process = subprocess.run([
|
|
'udisksctl', 'mount', '--block-device', arguments.block_device,
|
|
'--no-user-interaction'
|
|
], check=False)
|
|
sys.exit(process.returncode)
|
|
|
|
|
|
def subcommand_eject(arguments):
|
|
"""Eject a device by its path."""
|
|
device_path = arguments.device
|
|
try:
|
|
drive = eject_drive_of_device(device_path)
|
|
print(json.dumps(drive))
|
|
except Exception as exception:
|
|
print(exception, file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def _get_options():
|
|
"""Return the common options used for udisks2 operations."""
|
|
glib = utils.import_from_gi('GLib', '2.0')
|
|
options = glib.Variant(
|
|
'a{sv}', {'auth.no_user_interaction': glib.Variant('b', True)})
|
|
return options
|
|
|
|
|
|
def eject_drive_of_device(device_path):
|
|
"""Eject a device after unmounting all of its partitions.
|
|
|
|
Return the details (model, vendor) of drives ejected.
|
|
"""
|
|
udisks = utils.import_from_gi('UDisks', '2.0')
|
|
glib = utils.import_from_gi('GLib', '2.0')
|
|
client = udisks.Client.new_sync()
|
|
object_manager = client.get_object_manager()
|
|
|
|
found_objects = [
|
|
obj for obj in object_manager.get_objects()
|
|
if obj.get_block() and obj.get_block().props.device == device_path
|
|
]
|
|
|
|
if not found_objects:
|
|
raise ValueError(
|
|
'No such device - {device_path}'.format(device_path=device_path))
|
|
|
|
obj = found_objects[0]
|
|
|
|
# Unmount filesystems
|
|
block_device = obj.get_block()
|
|
drive_object_path = block_device.props.drive
|
|
if drive_object_path != '/':
|
|
umount_all_filesystems_of_drive(drive_object_path)
|
|
else:
|
|
# Block device has not associated drive
|
|
umount_filesystem(obj.get_filesystem())
|
|
|
|
# Eject the drive
|
|
drive = client.get_drive_for_block(block_device)
|
|
if drive:
|
|
try:
|
|
drive.call_eject_sync(_get_options(), None)
|
|
except glib.Error:
|
|
# Ignore error during ejection as along as all the filesystems are
|
|
# unmounted, the disk can be removed.
|
|
pass
|
|
|
|
return {
|
|
'vendor': drive.props.vendor,
|
|
'model': drive.props.model,
|
|
}
|
|
|
|
return None
|
|
|
|
|
|
def umount_filesystem(filesystem):
|
|
"""Unmount a filesystem """
|
|
if filesystem and filesystem.props.mount_points:
|
|
filesystem.call_unmount_sync(_get_options())
|
|
|
|
|
|
def umount_all_filesystems_of_drive(drive_object_path):
|
|
"""Unmount all filesystems on block devices of a drive."""
|
|
udisks = utils.import_from_gi('UDisks', '2.0')
|
|
client = udisks.Client.new_sync()
|
|
object_manager = client.get_object_manager()
|
|
|
|
for obj in object_manager.get_objects():
|
|
block_device = obj.get_block()
|
|
if not block_device or block_device.props.drive != drive_object_path:
|
|
continue
|
|
|
|
umount_filesystem(obj.get_filesystem())
|
|
|
|
|
|
def subcommand_setup(_):
|
|
"""Configure storage."""
|
|
# create udisks2 default mount directory
|
|
mounts_directory = '/media/root'
|
|
try:
|
|
os.mkdir(mounts_directory)
|
|
except FileExistsError:
|
|
pass
|
|
|
|
# make the directory readable and traversible by other users
|
|
stats = os.stat(mounts_directory)
|
|
os.chmod(mounts_directory, stats.st_mode | stat.S_IROTH | stat.S_IXOTH)
|
|
|
|
|
|
def subcommand_usage_info(_):
|
|
"""Get information about disk space usage."""
|
|
command = [
|
|
'df', '--exclude-type=tmpfs', '--exclude-type=devtmpfs',
|
|
'--block-size=1', '--output=source,fstype,size,used,avail,pcent,target'
|
|
]
|
|
subprocess.run(command, check=True)
|
|
|
|
|
|
def subcommand_validate_directory(arguments):
|
|
"""Validate a directory"""
|
|
if os.geteuid() == 0:
|
|
raise RuntimeError('You must not be root to run this command')
|
|
|
|
directory = arguments.path
|
|
|
|
def part_exists(path):
|
|
"""Returns part of the path that exists."""
|
|
if not path or os.path.exists(path):
|
|
return path
|
|
return part_exists(os.path.dirname(path))
|
|
|
|
if arguments.check_creatable:
|
|
directory = part_exists(directory)
|
|
if not directory:
|
|
directory = '.'
|
|
else:
|
|
if not os.path.exists(directory):
|
|
# doesn't exist
|
|
print('ValidationError: 1')
|
|
return
|
|
|
|
if not os.path.isdir(directory):
|
|
# is not a directory
|
|
print('ValidationError: 2')
|
|
elif not os.access(directory, os.R_OK):
|
|
# is not readable
|
|
print('ValidationError: 3')
|
|
elif arguments.check_writable or arguments.check_creatable:
|
|
if not os.access(directory, os.W_OK):
|
|
# is not writable
|
|
print('ValidationError: 4')
|
|
|
|
|
|
def main():
|
|
"""Parse arguments and perform all duties."""
|
|
arguments = parse_arguments()
|
|
|
|
subcommand = arguments.subcommand.replace('-', '_')
|
|
subcommand_method = globals()['subcommand_' + subcommand]
|
|
subcommand_method(arguments)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|