Veiko Aasa e7afa69155
storage: Directory selection form improvements
- Action script:
  - must not be root when validating directory
  - return only first validation error
- Directory selection form, transmission, deluge:
  show the download path as it is in the configuration,
  the path is resolved only on form submit.
- Tests: add relative path checks, refactor parametrize code

Signed-off-by: Veiko Aasa <veiko17@disroot.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2020-03-02 20:51:41 -05:00

380 lines
12 KiB
Python
Executable File

#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for disks manager.
"""
import argparse
import json
import os
import re
import stat
import subprocess
import sys
from plinth import utils
def parse_arguments():
"""Return parsed command line arguments as dictionary."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser('setup', help='Configure storage after install')
subparser = subparsers.add_parser(
'is-partition-expandable',
help='Return whether a given partition can be expanded')
subparser.add_argument(
'device', help='Partition for which check needs to be performed')
subparser = subparsers.add_parser(
'expand-partition',
help='Expand a partition to take adjacent free space')
subparser.add_argument('device',
help='Partition which needs to be resized')
subparser = subparsers.add_parser('eject', help='Eject a storage device')
subparser.add_argument('device', help='Path of the device to eject')
subparsers.add_parser('usage-info',
help='Get information about disk space usage')
subparser = subparsers.add_parser('validate-directory',
help='Validate a directory')
subparser.add_argument('--path', help='Path of the directory',
required=True)
subparser.add_argument('--check-creatable', required=False, default=False,
action='store_true',
help='Check that the directory is creatable')
subparser.add_argument('--check-writable', required=False, default=False,
action='store_true',
help='Check that the directory is writable')
subparsers.required = True
return parser.parse_args()
def subcommand_is_partition_expandable(arguments):
"""Return a list of partitions that can be expanded."""
_, _, free_space = _get_free_space(arguments.device)
print(free_space['size'])
def subcommand_expand_partition(arguments):
"""Expand a partition to take adjacent free space."""
device = arguments.device
device, requested_partition, free_space = _get_free_space(device)
if requested_partition['table_type'] == 'msdos' and \
int(requested_partition['number']) >= 5:
print('Expanding logical partitions currently unsupported',
file=sys.stderr)
sys.exit(4)
_resize_partition(device, requested_partition, free_space)
_resize_file_system(device, requested_partition, free_space)
def _resize_partition(device, requested_partition, free_space):
"""Resize the partition table entry."""
command = [
'parted', '--align=optimal', '--script', device, 'unit', 'B',
'resizepart', requested_partition['number'],
str(free_space['end'])
]
# XXX: Remove workaround after bug in parted is fixed:
# https://debbugs.gnu.org/cgi/bugreport.cgi?bug=24215
fallback_command = [
'parted', '--align=optimal', device, '---pretend-input-tty', 'unit',
'B', 'resizepart', requested_partition['number']
]
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError:
try:
input_text = 'yes\n' + str(free_space['end'])
subprocess.run(fallback_command, check=True,
input=input_text.encode())
except subprocess.CalledProcessError as exception:
print('Error expanding partition:', exception, file=sys.stderr)
sys.exit(5)
def _resize_file_system(device, requested_partition, free_space):
"""Resize a file system inside a partition."""
if requested_partition['type'] == 'btrfs':
_resize_btrfs(device, requested_partition, free_space)
elif requested_partition['type'] == 'ext4':
_resize_ext4(device, requested_partition, free_space)
def _resize_ext4(device, requested_partition, free_space):
"""Resize an ext4 file system inside a partition."""
partition_device = _get_partition_device(device,
requested_partition['number'])
try:
command = ['resize2fs', partition_device]
subprocess.run(command, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, check=True)
except subprocess.CalledProcessError as exception:
print('Error expanding filesystem:', exception, file=sys.stderr)
sys.exit(6)
def _resize_btrfs(device, requested_partition, free_space):
"""Resize a btrfs file system inside a partition."""
try:
command = ['btrfs', 'filesystem', 'resize', 'max', '/']
subprocess.run(command, stdout=subprocess.DEVNULL, check=True)
except subprocess.CalledProcessError as exception:
print('Error expanding filesystem:', exception, file=sys.stderr)
sys.exit(6)
def _get_free_space(device):
"""Return the amount of free space after a partition."""
device, partition_number = \
_get_root_device_and_partition_number(device)
try:
requested_partition, free_spaces = \
_get_partitions_and_free_spaces(device, partition_number)
except Exception as exception:
print('Error getting partition details: ', exception, file=sys.stderr)
sys.exit(2)
# Don't accept extended partitions for now
if requested_partition['table_type'] == 'msdos' and \
int(requested_partition['number']) >= 5:
print('Expanding logical partitions currently unsupported',
file=sys.stderr)
sys.exit(3)
# Don't accept anything but btrfs and ext4 filesystems
if requested_partition['type'] not in ('btrfs', 'ext4'):
print('Unsupported file system type: ', requested_partition['type'],
file=sys.stderr)
sys.exit(4)
found_free_space = None
for free_space in free_spaces:
if free_space['start'] != requested_partition['end'] + 1:
continue
if free_space['size'] < 10 * 1024 * 1024: # Minimum 10MiB
continue
found_free_space = free_space
if not found_free_space:
sys.exit(5)
return device, requested_partition, found_free_space
def _get_partition_device(device, partition_number):
"""Return the device corresponding to a parition in a given device."""
if re.match('[0-9]', device[-1]):
return device + 'p' + str(partition_number)
return device + str(partition_number)
def _get_root_device_and_partition_number(device):
"""Return the parent device and number of partition separately."""
match = re.match(r'(.+[a-zA-Z]\d+)p(\d+)$', device)
if not match:
match = re.match(r'(.+[a-zA-Z])(\d+)$', device)
if not match:
print('Invalid device, must be a partition', file=sys.stderr)
sys.exit(1)
return match.group(1), match.group(2)
def _get_partitions_and_free_spaces(device, partition_number):
"""Run parted and return list of partitions and free spaces."""
command = [
'parted', '--machine', '--script', device, 'unit', 'B', 'print', 'free'
]
process = subprocess.run(command, stdout=subprocess.PIPE, check=True)
requested_partition = None
free_spaces = []
lines = process.stdout.decode().splitlines()
partition_table_type = lines[1].split(':')[5]
for line in lines[2:]:
line = line.rstrip(';')
keys = ('number', 'start', 'end', 'size', 'type')
parts = line.split(':')
segment = dict(zip(keys, parts[:5]))
segment['table_type'] = partition_table_type
segment['start'] = _interpret_unit(segment['start'])
segment['end'] = _interpret_unit(segment['end'])
segment['size'] = _interpret_unit(segment['size'])
if segment['type'] == 'free':
segment['number'] = None
free_spaces.append(segment)
else:
if segment['number'] == partition_number:
requested_partition = segment
return requested_partition, free_spaces
def _interpret_unit(value):
"""Return value in bytes after understanding parted unit."""
value = value.rstrip('B') # For now, we only need to understand bytes
return int(value)
def subcommand_eject(arguments):
"""Eject a device by its path."""
device_path = arguments.device
drive = eject_drive_of_device(device_path)
print(json.dumps(drive))
def _get_options():
"""Return the common options used for udisks2 operations."""
glib = utils.import_from_gi('GLib', '2.0')
options = glib.Variant(
'a{sv}', {'auth.no_user_interaction': glib.Variant('b', True)})
return options
def eject_drive_of_device(device_path):
"""Eject a device after unmounting all of its partitions.
Return the details (model, vendor) of drives ejected.
"""
udisks = utils.import_from_gi('UDisks', '2.0')
client = udisks.Client.new_sync()
object_manager = client.get_object_manager()
found_objects = [
obj for obj in object_manager.get_objects()
if obj.get_block() and obj.get_block().props.device == device_path
]
if not found_objects:
raise ValueError(
'No such device - {device_path}'.format(device_path=device_path))
obj = found_objects[0]
# Unmount filesystems
block_device = obj.get_block()
drive_object_path = block_device.props.drive
if drive_object_path != '/':
umount_all_filesystems_of_drive(drive_object_path)
else:
# Block device has not associated drive
umount_filesystem(obj.get_filesystem())
# Eject the drive
drive = client.get_drive_for_block(block_device)
if drive:
drive.call_eject_sync(_get_options(), None)
return {
'vendor': drive.props.vendor,
'model': drive.props.model,
}
return None
def umount_filesystem(filesystem):
"""Unmount a filesystem """
if filesystem and filesystem.props.mount_points:
filesystem.call_unmount_sync(_get_options())
def umount_all_filesystems_of_drive(drive_object_path):
"""Unmount all filesystems on block devices of a drive."""
udisks = utils.import_from_gi('UDisks', '2.0')
client = udisks.Client.new_sync()
object_manager = client.get_object_manager()
for obj in object_manager.get_objects():
block_device = obj.get_block()
if not block_device or block_device.props.drive != drive_object_path:
continue
umount_filesystem(obj.get_filesystem())
def subcommand_setup(_):
"""Configure storage."""
# create udisks2 default mount directory
mounts_directory = '/media/root'
try:
os.mkdir(mounts_directory)
except FileExistsError:
pass
# make the directory readable and traversible by other users
stats = os.stat(mounts_directory)
os.chmod(mounts_directory, stats.st_mode | stat.S_IROTH | stat.S_IXOTH)
def subcommand_usage_info(_):
"""Get information about disk space usage."""
command = [
'df', '--exclude-type=tmpfs', '--exclude-type=devtmpfs',
'--block-size=1', '--output=source,fstype,size,used,avail,pcent,target'
]
subprocess.run(command, check=True)
def subcommand_validate_directory(arguments):
"""Validate a directory"""
if os.geteuid() == 0:
raise RuntimeError('You must not be root to run this command')
directory = arguments.path
def part_exists(path):
"""Returns part of the path that exists."""
if not path or os.path.exists(path):
return path
return part_exists(os.path.dirname(path))
if arguments.check_creatable:
directory = part_exists(directory)
if not directory:
directory = '.'
else:
if not os.path.exists(directory):
# doesn't exist
print('ValidationError: 1')
return
if not os.path.isdir(directory):
# is not a directory
print('ValidationError: 2')
elif not os.access(directory, os.R_OK):
# is not readable
print('ValidationError: 3')
elif arguments.check_writable or arguments.check_creatable:
if not os.access(directory, os.W_OK):
# is not writable
print('ValidationError: 4')
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()