diff --git a/plinth/modules/storage/__init__.py b/plinth/modules/storage/__init__.py index a823ccba6..7b29265e6 100644 --- a/plinth/modules/storage/__init__.py +++ b/plinth/modules/storage/__init__.py @@ -1,25 +1,21 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -""" -FreedomBox app to manage storage. -""" +"""FreedomBox app to manage storage.""" import base64 import logging -import subprocess import psutil from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_noop -from plinth import actions from plinth import app as app_module from plinth import cfg, glib, menu -from plinth.errors import ActionError, PlinthError +from plinth.errors import PlinthError from plinth.modules.backups.components import BackupRestore from plinth.package import Packages from plinth.utils import format_lazy -from . import manifest, udisks2 +from . import manifest, privileged, udisks2 _description = [ format_lazy( @@ -77,19 +73,19 @@ class StorageApp(app_module.App): def setup(self, old_version): """Install and configure the app.""" super().setup(old_version) - actions.superuser_run('storage', ['setup']) + privileged.setup() self.enable() disks = get_disks() root_device = get_root_device(disks) if is_expandable(root_device): try: - expand_partition(root_device) - except ActionError: + privileged.expand_partition(root_device) + except Exception: pass def get_disks(): - """Returns list of disks and their free space. + """Return list of disks and their free space. The primary source of information is UDisks' list of block devices. Information from df is used for free space available. @@ -136,8 +132,8 @@ def get_mounts(): def _get_disks_from_df(): """Return the list of disks and free space available using 'df'.""" try: - output = actions.superuser_run('storage', ['usage-info']) - except subprocess.CalledProcessError as exception: + output = privileged.usage_info() + except Exception as exception: logger.exception('Error getting disk information: %s', exception) return [] @@ -162,7 +158,7 @@ def _get_disks_from_df(): def get_filesystem_type(mount_point='/'): - """Returns the type of the filesystem mounted at mountpoint.""" + """Return the type of the filesystem mounted at mountpoint.""" for partition in psutil.disk_partitions(): if partition.mountpoint == mount_point: return partition.fstype @@ -204,19 +200,10 @@ def is_expandable(device): return False try: - output = actions.superuser_run('storage', - ['is-partition-expandable', device], - log_error=False) - except actions.ActionError: + return privileged.is_partition_expandable(device, _log_error=False) + except Exception: return False - return int(output.strip()) - - -def expand_partition(device): - """Expand a partition.""" - actions.superuser_run('storage', ['expand-partition', device]) - def format_bytes(size): """Return human readable disk size from bytes.""" diff --git a/plinth/modules/storage/forms.py b/plinth/modules/storage/forms.py index be419a827..232cdc9bb 100644 --- a/plinth/modules/storage/forms.py +++ b/plinth/modules/storage/forms.py @@ -1,7 +1,5 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -""" -Forms for directory selection. -""" +"""Forms for directory selection.""" import os @@ -9,11 +7,12 @@ from django import forms from django.core.exceptions import ValidationError from django.utils.translation import gettext_lazy as _ -from plinth import actions from plinth import app as app_module from plinth.modules import storage from plinth.modules.samba import privileged as samba_privileged +from . import privileged + def get_available_samba_shares(): """Get available samba shares.""" @@ -40,6 +39,8 @@ def _is_app_enabled(app_id): class DirectoryValidator: + """Validation helper to check a directory.""" + username = None check_writable = False check_creatable = False @@ -48,6 +49,7 @@ class DirectoryValidator: def __init__(self, username=None, check_writable=None, check_creatable=None): + """Initialize the validator.""" if username is not None: self.username = username if check_writable is not None: @@ -60,29 +62,22 @@ class DirectoryValidator: if not value.startswith('/'): raise ValidationError(_('Invalid directory name.'), 'invalid') - command = ['validate-directory', '--path', value] - if self.check_creatable: - command.append('--check-creatable') - elif self.check_writable: - command.append('--check-writable') + try: + if not self.username: + raise ValueError('Invalid username for directory validator') - if self.username: - output = actions.run_as_user('storage', command, - become_user=self.username) - else: - output = actions.run('storage', command) - - if 'ValidationError' in output: - error_nr = int(output.strip().split()[1]) - if error_nr == 1: - raise ValidationError(_('Directory does not exist.'), - 'invalid') - elif error_nr == 2: - raise ValidationError(_('Path is not a directory.'), 'invalid') - elif error_nr == 3: + privileged.validate_directory(value, self.check_creatable, + self.check_writable, + _run_as_user=self.username) + except FileNotFoundError: + raise ValidationError(_('Directory does not exist.'), 'invalid') + except NotADirectoryError: + raise ValidationError(_('Path is not a directory.'), 'invalid') + except PermissionError as exception: + if exception.args[0] == 'read': raise ValidationError( _('Directory is not readable by the user.'), 'invalid') - elif error_nr == 4: + else: raise ValidationError( _('Directory is not writable by the user.'), 'invalid') diff --git a/actions/storage b/plinth/modules/storage/privileged.py old mode 100755 new mode 100644 similarity index 66% rename from actions/storage rename to plinth/modules/storage/privileged.py index ec682ec17..9138a7373 --- a/actions/storage +++ b/plinth/modules/storage/privileged.py @@ -1,85 +1,31 @@ -#!/usr/bin/python3 # SPDX-License-Identifier: AGPL-3.0-or-later -""" -Configuration helper for disks manager. -""" +"""Configure disks manager.""" -import argparse -import json import os import re import stat import subprocess -import sys from plinth import utils +from plinth.actions import privileged -def parse_arguments(): - """Return parsed command line arguments as dictionary.""" - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') - - subparsers.add_parser('setup', help='Configure storage after install') - - subparser = subparsers.add_parser( - 'is-partition-expandable', - help='Return whether a given partition can be expanded') - subparser.add_argument( - 'device', help='Partition for which check needs to be performed') - - subparser = subparsers.add_parser( - 'expand-partition', - help='Expand a partition to take adjacent free space') - subparser.add_argument('device', - help='Partition which needs to be resized') - subparser.add_argument( - '--mount-point', default='/', - help=('Mount point which the device is mounted. ' - 'Needed for btrfs filesystems')) - - subparser = subparsers.add_parser('mount', help='Mount a filesystem') - subparser.add_argument('--block-device', - help='Block device of the filesystem to mount') - - subparser = subparsers.add_parser('eject', help='Eject a storage device') - subparser.add_argument('device', help='Path of the device to eject') - - subparsers.add_parser('usage-info', - help='Get information about disk space usage') - - subparser = subparsers.add_parser('validate-directory', - help='Validate a directory') - subparser.add_argument('--path', help='Path of the directory', - required=True) - subparser.add_argument('--check-creatable', required=False, default=False, - action='store_true', - help='Check that the directory is creatable') - subparser.add_argument('--check-writable', required=False, default=False, - action='store_true', - help='Check that the directory is writable') - - subparsers.required = True - return parser.parse_args() - - -def subcommand_is_partition_expandable(arguments): +@privileged +def is_partition_expandable(device: str) -> int: """Return a list of partitions that can be expanded.""" - _, _, free_space = _get_free_space(arguments.device) - print(free_space['size']) + _, _, free_space = _get_free_space(device) + return int(free_space['size']) -def subcommand_expand_partition(arguments): +@privileged +def expand_partition(device: str, mount_point: str = '/'): """Expand a partition to take adjacent free space.""" - device = arguments.device - mount_point = arguments.mount_point device, requested_partition, free_space = _get_free_space(device) if requested_partition['table_type'] == 'msdos' and \ int(requested_partition['number']) >= 5: - print('Expanding logical partitions currently unsupported', - file=sys.stderr) - sys.exit(4) + raise RuntimeError( + 'Expanding logical partitions currently unsupported') if requested_partition['table_type'] == 'gpt': _move_gpt_second_header(device) @@ -102,8 +48,7 @@ def _move_gpt_second_header(device): try: subprocess.run(command, check=True) except subprocess.CalledProcessError: - print('Error moving GPT second header to the end') - sys.exit(6) + raise RuntimeError('Error moving GPT second header to the end') def _resize_partition(device, requested_partition, free_space): @@ -127,8 +72,7 @@ def _resize_partition(device, requested_partition, free_space): subprocess.run(fallback_command, check=True, input=input_text.encode()) except subprocess.CalledProcessError as exception: - print('Error expanding partition:', exception, file=sys.stderr) - sys.exit(5) + raise RuntimeError(f'Error expanding partition: {exception}') def _resize_file_system(device, requested_partition, free_space, @@ -149,8 +93,7 @@ def _resize_ext4(device, requested_partition, _free_space, _mount_point): subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError as exception: - print('Error expanding filesystem:', exception, file=sys.stderr) - sys.exit(6) + raise RuntimeError(f'Error expanding filesystem: {exception}') def _resize_btrfs(_device, _requested_partition, _free_space, mount_point='/'): @@ -159,8 +102,7 @@ def _resize_btrfs(_device, _requested_partition, _free_space, mount_point='/'): command = ['btrfs', 'filesystem', 'resize', 'max', mount_point] subprocess.run(command, stdout=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError as exception: - print('Error expanding filesystem:', exception, file=sys.stderr) - sys.exit(6) + raise RuntimeError(f'Error expanding filesystem: {exception}') def _get_free_space(device): @@ -172,21 +114,18 @@ def _get_free_space(device): requested_partition, free_spaces = \ _get_partitions_and_free_spaces(device, partition_number) except Exception as exception: - print('Error getting partition details: ', exception, file=sys.stderr) - sys.exit(2) + raise RuntimeError(f'Error getting partition details: {exception}') # Don't accept extended partitions for now if requested_partition['table_type'] == 'msdos' and \ int(requested_partition['number']) >= 5: - print('Expanding logical partitions currently unsupported', - file=sys.stderr) - sys.exit(3) + raise RuntimeError( + 'Expanding logical partitions currently unsupported') # Don't accept anything but btrfs and ext4 filesystems if requested_partition['type'] not in ('btrfs', 'ext4'): - print('Unsupported file system type: ', requested_partition['type'], - file=sys.stderr) - sys.exit(4) + raise RuntimeError( + f'Unsupported file system type: {requested_partition["type"]}') found_free_space = None for free_space in free_spaces: @@ -199,7 +138,7 @@ def _get_free_space(device): found_free_space = free_space if not found_free_space: - sys.exit(5) + raise RuntimeError('No free space available') return device, requested_partition, found_free_space @@ -218,8 +157,7 @@ def _get_root_device_and_partition_number(device): if not match: match = re.match(r'(.+[a-zA-Z])(\d+)$', device) if not match: - print('Invalid device, must be a partition', file=sys.stderr) - sys.exit(1) + raise ValueError('Invalid device, must be a partition') return match.group(1), match.group(2) @@ -263,7 +201,8 @@ def _interpret_unit(value): return int(value) -def subcommand_mount(arguments): +@privileged +def mount(block_device: str): """Mount a disk are root user. XXX: This is primarily to provide compatibility with older code that used @@ -276,22 +215,16 @@ def subcommand_mount(arguments): UDISKS_FILESYSTEM_SHARED=1 by writing a udev rule. """ - process = subprocess.run([ - 'udisksctl', 'mount', '--block-device', arguments.block_device, + subprocess.run([ + 'udisksctl', 'mount', '--block-device', block_device, '--no-user-interaction' - ], check=False) - sys.exit(process.returncode) + ], check=True) -def subcommand_eject(arguments): +@privileged +def eject(device_path: str) -> str: """Eject a device by its path.""" - device_path = arguments.device - try: - drive = eject_drive_of_device(device_path) - print(json.dumps(drive)) - except Exception as exception: - print(exception, file=sys.stderr) - sys.exit(1) + return _eject_drive_of_device(device_path) def _get_options(): @@ -302,7 +235,7 @@ def _get_options(): return options -def eject_drive_of_device(device_path): +def _eject_drive_of_device(device_path): """Eject a device after unmounting all of its partitions. Return the details (model, vendor) of drives ejected. @@ -327,10 +260,10 @@ def eject_drive_of_device(device_path): block_device = obj.get_block() drive_object_path = block_device.props.drive if drive_object_path != '/': - umount_all_filesystems_of_drive(drive_object_path) + _umount_all_filesystems_of_drive(drive_object_path) else: # Block device has not associated drive - umount_filesystem(obj.get_filesystem()) + _umount_filesystem(obj.get_filesystem()) # Eject the drive drive = client.get_drive_for_block(block_device) @@ -350,13 +283,13 @@ def eject_drive_of_device(device_path): return None -def umount_filesystem(filesystem): - """Unmount a filesystem """ +def _umount_filesystem(filesystem): + """Unmount a filesystem.""" if filesystem and filesystem.props.mount_points: filesystem.call_unmount_sync(_get_options()) -def umount_all_filesystems_of_drive(drive_object_path): +def _umount_all_filesystems_of_drive(drive_object_path): """Unmount all filesystems on block devices of a drive.""" udisks = utils.import_from_gi('UDisks', '2.0') client = udisks.Client.new_sync() @@ -367,10 +300,11 @@ def umount_all_filesystems_of_drive(drive_object_path): if not block_device or block_device.props.drive != drive_object_path: continue - umount_filesystem(obj.get_filesystem()) + _umount_filesystem(obj.get_filesystem()) -def subcommand_setup(_): +@privileged +def setup(): """Configure storage.""" # create udisks2 default mount directory mounts_directory = '/media/root' @@ -384,58 +318,43 @@ def subcommand_setup(_): os.chmod(mounts_directory, stats.st_mode | stat.S_IROTH | stat.S_IXOTH) -def subcommand_usage_info(_): +@privileged +def usage_info() -> str: """Get information about disk space usage.""" command = [ 'df', '--exclude-type=tmpfs', '--exclude-type=devtmpfs', '--block-size=1', '--output=source,fstype,size,used,avail,pcent,target' ] - subprocess.run(command, check=True) + return subprocess.check_output(command).decode() -def subcommand_validate_directory(arguments): - """Validate a directory""" +@privileged +def validate_directory(directory: str, check_creatable: bool, + check_writable: bool): + """Validate a directory.""" if os.geteuid() == 0: raise RuntimeError('You must not be root to run this command') - directory = arguments.path - def part_exists(path): - """Returns part of the path that exists.""" + """Return part of the path that exists.""" if not path or os.path.exists(path): return path return part_exists(os.path.dirname(path)) - if arguments.check_creatable: + if check_creatable: directory = part_exists(directory) if not directory: directory = '.' else: if not os.path.exists(directory): - # doesn't exist - print('ValidationError: 1') - return + raise FileNotFoundError if not os.path.isdir(directory): - # is not a directory - print('ValidationError: 2') - elif not os.access(directory, os.R_OK): - # is not readable - print('ValidationError: 3') - elif arguments.check_writable or arguments.check_creatable: + raise NotADirectoryError + + if not os.access(directory, os.R_OK): + raise PermissionError('read') + + if check_writable or check_creatable: if not os.access(directory, os.W_OK): - # is not writable - print('ValidationError: 4') - - -def main(): - """Parse arguments and perform all duties.""" - arguments = parse_arguments() - - subcommand = arguments.subcommand.replace('-', '_') - subcommand_method = globals()['subcommand_' + subcommand] - subcommand_method(arguments) - - -if __name__ == '__main__': - main() + raise PermissionError('write') diff --git a/plinth/modules/storage/tests/test_storage.py b/plinth/modules/storage/tests/test_storage.py index 4877ee81b..cb2672bd4 100644 --- a/plinth/modules/storage/tests/test_storage.py +++ b/plinth/modules/storage/tests/test_storage.py @@ -4,14 +4,17 @@ Test module for storage module operations. """ import contextlib -import os -import pathlib import re import subprocess import tempfile import pytest +from plinth.modules.storage import privileged + +pytestmark = pytest.mark.usefixtures('mock_privileged') +privileged_modules_to_mock = ['plinth.modules.storage.privileged'] + class Disk(): """Context manager to create/destroy a disk.""" @@ -255,8 +258,11 @@ def test_ext4_expansion(partition_table_type): def _assert_free_space(disk, partition_number, space=True): """Verify that free is available/not available after a partition.""" device = disk.get_partition_device(partition_number) - result = _check_action(['storage', 'is-partition-expandable', device]) - assert result == space + if space: + privileged.is_partition_expandable(device) + else: + with pytest.raises(RuntimeError): + privileged.is_partition_expandable(device) def _expand_partition(disk, partition_number, success=True): @@ -264,34 +270,15 @@ def _expand_partition(disk, partition_number, success=True): _assert_aligned(disk, partition_number) with disk.mount_partition(partition_number) as mount_point: device = disk.get_partition_device(partition_number) - result = _check_action([ - 'storage', 'expand-partition', device, '--mount-point', mount_point - ]) + if success: + privileged.expand_partition(device, mount_point) + else: + with pytest.raises(RuntimeError): + privileged.expand_partition(device, mount_point) - assert result == success _assert_aligned(disk, partition_number) -def _call_action(action_command, check=True, **kwargs): - """Call the action script.""" - test_directory = pathlib.Path(__file__).parent - top_directory = (test_directory / '..' / '..' / '..' / '..').resolve() - action_command[0] = top_directory / 'actions' / action_command[0] - kwargs['stdout'] = kwargs.get('stdout', subprocess.DEVNULL) - kwargs['stderr'] = kwargs.get('stderr', subprocess.DEVNULL) - env = dict(os.environ, PYTHONPATH=str(top_directory)) - return subprocess.run(action_command, env=env, check=check, **kwargs) - - -def _check_action(action_command): - """Return success/failure result of the action command.""" - try: - _call_action(action_command) - return True - except subprocess.CalledProcessError: - return False - - def _assert_aligned(disk, partition_number): """Test that partition is optimally aligned.""" subprocess.run([ @@ -319,43 +306,40 @@ def _assert_ext4_file_system_healthy(disk, partition_number): def _assert_validate_directory(path, error, check_writable=False, check_creatable=False): """Perform directory validation checks.""" - action_command = ['storage', 'validate-directory', '--path', path] - if check_writable: - action_command += ['--check-writable'] - if check_creatable: - action_command += ['--check-creatable'] - proc = _call_action(action_command, stderr=subprocess.PIPE, - stdout=subprocess.PIPE) - output = proc.stdout.decode() - if 'ValidationError' in output: - error_nr = output.strip().split()[1] - assert error_nr == error + if error: + match = None if not error.args else error.args[0] + with pytest.raises(error.__class__, match=match): + privileged.validate_directory(path, check_creatable, + check_writable) else: - assert output == error + privileged.validate_directory(path, check_creatable, check_writable) @pytest.mark.usefixtures('needs_not_root') -@pytest.mark.parametrize('path,error', [('/missing', '1'), - ('/etc/os-release', '2'), - ('/root', '3'), ('/', ''), - ('/etc/..', '')]) +@pytest.mark.parametrize('path,error', + [('/missing', FileNotFoundError()), + ('/etc/os-release', NotADirectoryError()), + ('/root', PermissionError('read')), ('/', None), + ('/etc/..', None)]) def test_validate_directory(path, error): """Test that directory validation returns expected output.""" _assert_validate_directory(path, error) @pytest.mark.usefixtures('needs_not_root') -@pytest.mark.parametrize('path,error', [('/', '4'), ('/tmp', '')]) +@pytest.mark.parametrize('path,error', [('/', PermissionError('write')), + ('/tmp', None)]) def test_validate_directory_writable(path, error): """Test that directory writable validation returns expected output.""" _assert_validate_directory(path, error, check_writable=True) @pytest.mark.usefixtures('needs_not_root') -@pytest.mark.parametrize('path,error', - [('/var/lib/plinth_storage_test_not_exists', '4'), - ('/tmp/plint_storage_test_not_exists', ''), - ('/var/../tmp/plint_storage_test_not_exists', '')]) +@pytest.mark.parametrize( + 'path,error', + [('/var/lib/plinth_storage_test_not_exists', PermissionError('write')), + ('/tmp/plint_storage_test_not_exists', None), + ('/var/../tmp/plint_storage_test_not_exists', None)]) def test_validate_directory_creatable(path, error): """Test that directory creatable validation returns expected output.""" _assert_validate_directory(path, error, check_creatable=True) diff --git a/plinth/modules/storage/udisks2.py b/plinth/modules/storage/udisks2.py index a98d985c8..9b536b40e 100644 --- a/plinth/modules/storage/udisks2.py +++ b/plinth/modules/storage/udisks2.py @@ -1,15 +1,14 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -""" -Handle disk operations using UDisk2 DBus API. -""" +"""Handle disk operations using UDisk2 DBus API.""" import logging import threading -from plinth import actions, cfg -from plinth.errors import ActionError +from plinth import cfg from plinth.utils import import_from_gi +from . import privileged + glib = import_from_gi('GLib', '2.0') gio = import_from_gi('Gio', '2.0') @@ -195,11 +194,8 @@ def _mount(object_path): logger.info('Auto-mounting device: %s %s', block_device.id, block_device.preferred_device) try: - actions.superuser_run( - 'storage', - ['mount', '--block-device', block_device.preferred_device], - log_error=False) - except ActionError as exception: + privileged.mount(block_device.preferred_device, _log_error=False) + except Exception as exception: parts = exception.args[2].split(':') if parts[1].strip() != 'GDBus.Error': raise diff --git a/plinth/modules/storage/views.py b/plinth/modules/storage/views.py index 749c27e8d..851929cb7 100644 --- a/plinth/modules/storage/views.py +++ b/plinth/modules/storage/views.py @@ -1,9 +1,6 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -""" -Views for storage module. -""" +"""Views for storage module.""" -import json import logging import urllib.parse @@ -14,17 +11,17 @@ from django.urls import reverse from django.utils.translation import gettext as _ from django.views.decorators.http import require_POST -from plinth import actions, views -from plinth.errors import ActionError +from plinth import views from plinth.modules import storage -from . import get_error_message +from . import get_error_message, privileged logger = logging.getLogger(__name__) class StorageAppView(views.AppView): """Show storage information.""" + app_id = 'storage' template_name = 'storage.html' @@ -63,7 +60,7 @@ def expand(request): def expand_partition(request, device): """Expand the partition.""" try: - storage.expand_partition(device) + privileged.expand_partition(device) except Exception as exception: messages.error( request, @@ -83,8 +80,7 @@ def eject(request, device_path): device_path = urllib.parse.unquote(device_path) try: - drive = json.loads( - actions.superuser_run('storage', ['eject', device_path])) + drive = privileged.eject(device_path) if drive: messages.success( request, @@ -93,8 +89,8 @@ def eject(request, device_path): drive_model=drive['model'])) else: messages.success(request, _('Device can be safely unplugged.')) - except ActionError as exception: - message = get_error_message(exception.args[2]) + except Exception as exception: + message = get_error_message(exception.args[-2].decode()) # stdout logger.error('Error ejecting device - %s', message) messages.error(