mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-05-27 10:44:33 +00:00
storage: Use privileged decorator for actions
Tests: - SKIPPED: Functional tests work - DONE: Initial setup works - DONE: Root partition is expanded when space is available - DONE: When there is free space for root partition it shows up in the interface - DONE: Expand partition from user interface works - DONE: Getting storage usage information works - DONE: Disks and free space shown in app page - DONE: Showing share mounts in samba works - DONE: Backups add repository form shows disk choices - DONE: Samba shows proper list of mounted shares and unavailable shares - DONE: Directory validator works - DONE: In deluge and transmission - DONE: Auto-mounting a device works - DONE: Ejecting a mounted disk from UI works - DONE: Error are graciously handled Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
49e4ebf8f9
commit
e3d0be2885
@ -1,25 +1,21 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
FreedomBox app to manage storage.
|
||||
"""
|
||||
"""FreedomBox app to manage storage."""
|
||||
|
||||
import base64
|
||||
import logging
|
||||
import subprocess
|
||||
|
||||
import psutil
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.utils.translation import gettext_noop
|
||||
|
||||
from plinth import actions
|
||||
from plinth import app as app_module
|
||||
from plinth import cfg, glib, menu
|
||||
from plinth.errors import ActionError, PlinthError
|
||||
from plinth.errors import PlinthError
|
||||
from plinth.modules.backups.components import BackupRestore
|
||||
from plinth.package import Packages
|
||||
from plinth.utils import format_lazy
|
||||
|
||||
from . import manifest, udisks2
|
||||
from . import manifest, privileged, udisks2
|
||||
|
||||
_description = [
|
||||
format_lazy(
|
||||
@ -77,19 +73,19 @@ class StorageApp(app_module.App):
|
||||
def setup(self, old_version):
|
||||
"""Install and configure the app."""
|
||||
super().setup(old_version)
|
||||
actions.superuser_run('storage', ['setup'])
|
||||
privileged.setup()
|
||||
self.enable()
|
||||
disks = get_disks()
|
||||
root_device = get_root_device(disks)
|
||||
if is_expandable(root_device):
|
||||
try:
|
||||
expand_partition(root_device)
|
||||
except ActionError:
|
||||
privileged.expand_partition(root_device)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def get_disks():
|
||||
"""Returns list of disks and their free space.
|
||||
"""Return list of disks and their free space.
|
||||
|
||||
The primary source of information is UDisks' list of block devices.
|
||||
Information from df is used for free space available.
|
||||
@ -136,8 +132,8 @@ def get_mounts():
|
||||
def _get_disks_from_df():
|
||||
"""Return the list of disks and free space available using 'df'."""
|
||||
try:
|
||||
output = actions.superuser_run('storage', ['usage-info'])
|
||||
except subprocess.CalledProcessError as exception:
|
||||
output = privileged.usage_info()
|
||||
except Exception as exception:
|
||||
logger.exception('Error getting disk information: %s', exception)
|
||||
return []
|
||||
|
||||
@ -162,7 +158,7 @@ def _get_disks_from_df():
|
||||
|
||||
|
||||
def get_filesystem_type(mount_point='/'):
|
||||
"""Returns the type of the filesystem mounted at mountpoint."""
|
||||
"""Return the type of the filesystem mounted at mountpoint."""
|
||||
for partition in psutil.disk_partitions():
|
||||
if partition.mountpoint == mount_point:
|
||||
return partition.fstype
|
||||
@ -204,19 +200,10 @@ def is_expandable(device):
|
||||
return False
|
||||
|
||||
try:
|
||||
output = actions.superuser_run('storage',
|
||||
['is-partition-expandable', device],
|
||||
log_error=False)
|
||||
except actions.ActionError:
|
||||
return privileged.is_partition_expandable(device, _log_error=False)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
return int(output.strip())
|
||||
|
||||
|
||||
def expand_partition(device):
|
||||
"""Expand a partition."""
|
||||
actions.superuser_run('storage', ['expand-partition', device])
|
||||
|
||||
|
||||
def format_bytes(size):
|
||||
"""Return human readable disk size from bytes."""
|
||||
|
||||
@ -1,7 +1,5 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
Forms for directory selection.
|
||||
"""
|
||||
"""Forms for directory selection."""
|
||||
|
||||
import os
|
||||
|
||||
@ -9,11 +7,12 @@ from django import forms
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from plinth import actions
|
||||
from plinth import app as app_module
|
||||
from plinth.modules import storage
|
||||
from plinth.modules.samba import privileged as samba_privileged
|
||||
|
||||
from . import privileged
|
||||
|
||||
|
||||
def get_available_samba_shares():
|
||||
"""Get available samba shares."""
|
||||
@ -40,6 +39,8 @@ def _is_app_enabled(app_id):
|
||||
|
||||
|
||||
class DirectoryValidator:
|
||||
"""Validation helper to check a directory."""
|
||||
|
||||
username = None
|
||||
check_writable = False
|
||||
check_creatable = False
|
||||
@ -48,6 +49,7 @@ class DirectoryValidator:
|
||||
|
||||
def __init__(self, username=None, check_writable=None,
|
||||
check_creatable=None):
|
||||
"""Initialize the validator."""
|
||||
if username is not None:
|
||||
self.username = username
|
||||
if check_writable is not None:
|
||||
@ -60,29 +62,22 @@ class DirectoryValidator:
|
||||
if not value.startswith('/'):
|
||||
raise ValidationError(_('Invalid directory name.'), 'invalid')
|
||||
|
||||
command = ['validate-directory', '--path', value]
|
||||
if self.check_creatable:
|
||||
command.append('--check-creatable')
|
||||
elif self.check_writable:
|
||||
command.append('--check-writable')
|
||||
try:
|
||||
if not self.username:
|
||||
raise ValueError('Invalid username for directory validator')
|
||||
|
||||
if self.username:
|
||||
output = actions.run_as_user('storage', command,
|
||||
become_user=self.username)
|
||||
else:
|
||||
output = actions.run('storage', command)
|
||||
|
||||
if 'ValidationError' in output:
|
||||
error_nr = int(output.strip().split()[1])
|
||||
if error_nr == 1:
|
||||
raise ValidationError(_('Directory does not exist.'),
|
||||
'invalid')
|
||||
elif error_nr == 2:
|
||||
raise ValidationError(_('Path is not a directory.'), 'invalid')
|
||||
elif error_nr == 3:
|
||||
privileged.validate_directory(value, self.check_creatable,
|
||||
self.check_writable,
|
||||
_run_as_user=self.username)
|
||||
except FileNotFoundError:
|
||||
raise ValidationError(_('Directory does not exist.'), 'invalid')
|
||||
except NotADirectoryError:
|
||||
raise ValidationError(_('Path is not a directory.'), 'invalid')
|
||||
except PermissionError as exception:
|
||||
if exception.args[0] == 'read':
|
||||
raise ValidationError(
|
||||
_('Directory is not readable by the user.'), 'invalid')
|
||||
elif error_nr == 4:
|
||||
else:
|
||||
raise ValidationError(
|
||||
_('Directory is not writable by the user.'), 'invalid')
|
||||
|
||||
|
||||
191
actions/storage → plinth/modules/storage/privileged.py
Executable file → Normal file
191
actions/storage → plinth/modules/storage/privileged.py
Executable file → Normal file
@ -1,85 +1,31 @@
|
||||
#!/usr/bin/python3
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
Configuration helper for disks manager.
|
||||
"""
|
||||
"""Configure disks manager."""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from plinth import utils
|
||||
from plinth.actions import privileged
|
||||
|
||||
|
||||
def parse_arguments():
|
||||
"""Return parsed command line arguments as dictionary."""
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
|
||||
|
||||
subparsers.add_parser('setup', help='Configure storage after install')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'is-partition-expandable',
|
||||
help='Return whether a given partition can be expanded')
|
||||
subparser.add_argument(
|
||||
'device', help='Partition for which check needs to be performed')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'expand-partition',
|
||||
help='Expand a partition to take adjacent free space')
|
||||
subparser.add_argument('device',
|
||||
help='Partition which needs to be resized')
|
||||
subparser.add_argument(
|
||||
'--mount-point', default='/',
|
||||
help=('Mount point which the device is mounted. '
|
||||
'Needed for btrfs filesystems'))
|
||||
|
||||
subparser = subparsers.add_parser('mount', help='Mount a filesystem')
|
||||
subparser.add_argument('--block-device',
|
||||
help='Block device of the filesystem to mount')
|
||||
|
||||
subparser = subparsers.add_parser('eject', help='Eject a storage device')
|
||||
subparser.add_argument('device', help='Path of the device to eject')
|
||||
|
||||
subparsers.add_parser('usage-info',
|
||||
help='Get information about disk space usage')
|
||||
|
||||
subparser = subparsers.add_parser('validate-directory',
|
||||
help='Validate a directory')
|
||||
subparser.add_argument('--path', help='Path of the directory',
|
||||
required=True)
|
||||
subparser.add_argument('--check-creatable', required=False, default=False,
|
||||
action='store_true',
|
||||
help='Check that the directory is creatable')
|
||||
subparser.add_argument('--check-writable', required=False, default=False,
|
||||
action='store_true',
|
||||
help='Check that the directory is writable')
|
||||
|
||||
subparsers.required = True
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def subcommand_is_partition_expandable(arguments):
|
||||
@privileged
|
||||
def is_partition_expandable(device: str) -> int:
|
||||
"""Return a list of partitions that can be expanded."""
|
||||
_, _, free_space = _get_free_space(arguments.device)
|
||||
print(free_space['size'])
|
||||
_, _, free_space = _get_free_space(device)
|
||||
return int(free_space['size'])
|
||||
|
||||
|
||||
def subcommand_expand_partition(arguments):
|
||||
@privileged
|
||||
def expand_partition(device: str, mount_point: str = '/'):
|
||||
"""Expand a partition to take adjacent free space."""
|
||||
device = arguments.device
|
||||
mount_point = arguments.mount_point
|
||||
device, requested_partition, free_space = _get_free_space(device)
|
||||
|
||||
if requested_partition['table_type'] == 'msdos' and \
|
||||
int(requested_partition['number']) >= 5:
|
||||
print('Expanding logical partitions currently unsupported',
|
||||
file=sys.stderr)
|
||||
sys.exit(4)
|
||||
raise RuntimeError(
|
||||
'Expanding logical partitions currently unsupported')
|
||||
|
||||
if requested_partition['table_type'] == 'gpt':
|
||||
_move_gpt_second_header(device)
|
||||
@ -102,8 +48,7 @@ def _move_gpt_second_header(device):
|
||||
try:
|
||||
subprocess.run(command, check=True)
|
||||
except subprocess.CalledProcessError:
|
||||
print('Error moving GPT second header to the end')
|
||||
sys.exit(6)
|
||||
raise RuntimeError('Error moving GPT second header to the end')
|
||||
|
||||
|
||||
def _resize_partition(device, requested_partition, free_space):
|
||||
@ -127,8 +72,7 @@ def _resize_partition(device, requested_partition, free_space):
|
||||
subprocess.run(fallback_command, check=True,
|
||||
input=input_text.encode())
|
||||
except subprocess.CalledProcessError as exception:
|
||||
print('Error expanding partition:', exception, file=sys.stderr)
|
||||
sys.exit(5)
|
||||
raise RuntimeError(f'Error expanding partition: {exception}')
|
||||
|
||||
|
||||
def _resize_file_system(device, requested_partition, free_space,
|
||||
@ -149,8 +93,7 @@ def _resize_ext4(device, requested_partition, _free_space, _mount_point):
|
||||
subprocess.run(command, stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL, check=True)
|
||||
except subprocess.CalledProcessError as exception:
|
||||
print('Error expanding filesystem:', exception, file=sys.stderr)
|
||||
sys.exit(6)
|
||||
raise RuntimeError(f'Error expanding filesystem: {exception}')
|
||||
|
||||
|
||||
def _resize_btrfs(_device, _requested_partition, _free_space, mount_point='/'):
|
||||
@ -159,8 +102,7 @@ def _resize_btrfs(_device, _requested_partition, _free_space, mount_point='/'):
|
||||
command = ['btrfs', 'filesystem', 'resize', 'max', mount_point]
|
||||
subprocess.run(command, stdout=subprocess.DEVNULL, check=True)
|
||||
except subprocess.CalledProcessError as exception:
|
||||
print('Error expanding filesystem:', exception, file=sys.stderr)
|
||||
sys.exit(6)
|
||||
raise RuntimeError(f'Error expanding filesystem: {exception}')
|
||||
|
||||
|
||||
def _get_free_space(device):
|
||||
@ -172,21 +114,18 @@ def _get_free_space(device):
|
||||
requested_partition, free_spaces = \
|
||||
_get_partitions_and_free_spaces(device, partition_number)
|
||||
except Exception as exception:
|
||||
print('Error getting partition details: ', exception, file=sys.stderr)
|
||||
sys.exit(2)
|
||||
raise RuntimeError(f'Error getting partition details: {exception}')
|
||||
|
||||
# Don't accept extended partitions for now
|
||||
if requested_partition['table_type'] == 'msdos' and \
|
||||
int(requested_partition['number']) >= 5:
|
||||
print('Expanding logical partitions currently unsupported',
|
||||
file=sys.stderr)
|
||||
sys.exit(3)
|
||||
raise RuntimeError(
|
||||
'Expanding logical partitions currently unsupported')
|
||||
|
||||
# Don't accept anything but btrfs and ext4 filesystems
|
||||
if requested_partition['type'] not in ('btrfs', 'ext4'):
|
||||
print('Unsupported file system type: ', requested_partition['type'],
|
||||
file=sys.stderr)
|
||||
sys.exit(4)
|
||||
raise RuntimeError(
|
||||
f'Unsupported file system type: {requested_partition["type"]}')
|
||||
|
||||
found_free_space = None
|
||||
for free_space in free_spaces:
|
||||
@ -199,7 +138,7 @@ def _get_free_space(device):
|
||||
found_free_space = free_space
|
||||
|
||||
if not found_free_space:
|
||||
sys.exit(5)
|
||||
raise RuntimeError('No free space available')
|
||||
|
||||
return device, requested_partition, found_free_space
|
||||
|
||||
@ -218,8 +157,7 @@ def _get_root_device_and_partition_number(device):
|
||||
if not match:
|
||||
match = re.match(r'(.+[a-zA-Z])(\d+)$', device)
|
||||
if not match:
|
||||
print('Invalid device, must be a partition', file=sys.stderr)
|
||||
sys.exit(1)
|
||||
raise ValueError('Invalid device, must be a partition')
|
||||
|
||||
return match.group(1), match.group(2)
|
||||
|
||||
@ -263,7 +201,8 @@ def _interpret_unit(value):
|
||||
return int(value)
|
||||
|
||||
|
||||
def subcommand_mount(arguments):
|
||||
@privileged
|
||||
def mount(block_device: str):
|
||||
"""Mount a disk are root user.
|
||||
|
||||
XXX: This is primarily to provide compatibility with older code that used
|
||||
@ -276,22 +215,16 @@ def subcommand_mount(arguments):
|
||||
UDISKS_FILESYSTEM_SHARED=1 by writing a udev rule.
|
||||
|
||||
"""
|
||||
process = subprocess.run([
|
||||
'udisksctl', 'mount', '--block-device', arguments.block_device,
|
||||
subprocess.run([
|
||||
'udisksctl', 'mount', '--block-device', block_device,
|
||||
'--no-user-interaction'
|
||||
], check=False)
|
||||
sys.exit(process.returncode)
|
||||
], check=True)
|
||||
|
||||
|
||||
def subcommand_eject(arguments):
|
||||
@privileged
|
||||
def eject(device_path: str) -> str:
|
||||
"""Eject a device by its path."""
|
||||
device_path = arguments.device
|
||||
try:
|
||||
drive = eject_drive_of_device(device_path)
|
||||
print(json.dumps(drive))
|
||||
except Exception as exception:
|
||||
print(exception, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return _eject_drive_of_device(device_path)
|
||||
|
||||
|
||||
def _get_options():
|
||||
@ -302,7 +235,7 @@ def _get_options():
|
||||
return options
|
||||
|
||||
|
||||
def eject_drive_of_device(device_path):
|
||||
def _eject_drive_of_device(device_path):
|
||||
"""Eject a device after unmounting all of its partitions.
|
||||
|
||||
Return the details (model, vendor) of drives ejected.
|
||||
@ -327,10 +260,10 @@ def eject_drive_of_device(device_path):
|
||||
block_device = obj.get_block()
|
||||
drive_object_path = block_device.props.drive
|
||||
if drive_object_path != '/':
|
||||
umount_all_filesystems_of_drive(drive_object_path)
|
||||
_umount_all_filesystems_of_drive(drive_object_path)
|
||||
else:
|
||||
# Block device has not associated drive
|
||||
umount_filesystem(obj.get_filesystem())
|
||||
_umount_filesystem(obj.get_filesystem())
|
||||
|
||||
# Eject the drive
|
||||
drive = client.get_drive_for_block(block_device)
|
||||
@ -350,13 +283,13 @@ def eject_drive_of_device(device_path):
|
||||
return None
|
||||
|
||||
|
||||
def umount_filesystem(filesystem):
|
||||
"""Unmount a filesystem """
|
||||
def _umount_filesystem(filesystem):
|
||||
"""Unmount a filesystem."""
|
||||
if filesystem and filesystem.props.mount_points:
|
||||
filesystem.call_unmount_sync(_get_options())
|
||||
|
||||
|
||||
def umount_all_filesystems_of_drive(drive_object_path):
|
||||
def _umount_all_filesystems_of_drive(drive_object_path):
|
||||
"""Unmount all filesystems on block devices of a drive."""
|
||||
udisks = utils.import_from_gi('UDisks', '2.0')
|
||||
client = udisks.Client.new_sync()
|
||||
@ -367,10 +300,11 @@ def umount_all_filesystems_of_drive(drive_object_path):
|
||||
if not block_device or block_device.props.drive != drive_object_path:
|
||||
continue
|
||||
|
||||
umount_filesystem(obj.get_filesystem())
|
||||
_umount_filesystem(obj.get_filesystem())
|
||||
|
||||
|
||||
def subcommand_setup(_):
|
||||
@privileged
|
||||
def setup():
|
||||
"""Configure storage."""
|
||||
# create udisks2 default mount directory
|
||||
mounts_directory = '/media/root'
|
||||
@ -384,58 +318,43 @@ def subcommand_setup(_):
|
||||
os.chmod(mounts_directory, stats.st_mode | stat.S_IROTH | stat.S_IXOTH)
|
||||
|
||||
|
||||
def subcommand_usage_info(_):
|
||||
@privileged
|
||||
def usage_info() -> str:
|
||||
"""Get information about disk space usage."""
|
||||
command = [
|
||||
'df', '--exclude-type=tmpfs', '--exclude-type=devtmpfs',
|
||||
'--block-size=1', '--output=source,fstype,size,used,avail,pcent,target'
|
||||
]
|
||||
subprocess.run(command, check=True)
|
||||
return subprocess.check_output(command).decode()
|
||||
|
||||
|
||||
def subcommand_validate_directory(arguments):
|
||||
"""Validate a directory"""
|
||||
@privileged
|
||||
def validate_directory(directory: str, check_creatable: bool,
|
||||
check_writable: bool):
|
||||
"""Validate a directory."""
|
||||
if os.geteuid() == 0:
|
||||
raise RuntimeError('You must not be root to run this command')
|
||||
|
||||
directory = arguments.path
|
||||
|
||||
def part_exists(path):
|
||||
"""Returns part of the path that exists."""
|
||||
"""Return part of the path that exists."""
|
||||
if not path or os.path.exists(path):
|
||||
return path
|
||||
return part_exists(os.path.dirname(path))
|
||||
|
||||
if arguments.check_creatable:
|
||||
if check_creatable:
|
||||
directory = part_exists(directory)
|
||||
if not directory:
|
||||
directory = '.'
|
||||
else:
|
||||
if not os.path.exists(directory):
|
||||
# doesn't exist
|
||||
print('ValidationError: 1')
|
||||
return
|
||||
raise FileNotFoundError
|
||||
|
||||
if not os.path.isdir(directory):
|
||||
# is not a directory
|
||||
print('ValidationError: 2')
|
||||
elif not os.access(directory, os.R_OK):
|
||||
# is not readable
|
||||
print('ValidationError: 3')
|
||||
elif arguments.check_writable or arguments.check_creatable:
|
||||
raise NotADirectoryError
|
||||
|
||||
if not os.access(directory, os.R_OK):
|
||||
raise PermissionError('read')
|
||||
|
||||
if check_writable or check_creatable:
|
||||
if not os.access(directory, os.W_OK):
|
||||
# is not writable
|
||||
print('ValidationError: 4')
|
||||
|
||||
|
||||
def main():
|
||||
"""Parse arguments and perform all duties."""
|
||||
arguments = parse_arguments()
|
||||
|
||||
subcommand = arguments.subcommand.replace('-', '_')
|
||||
subcommand_method = globals()['subcommand_' + subcommand]
|
||||
subcommand_method(arguments)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
raise PermissionError('write')
|
||||
@ -4,14 +4,17 @@ Test module for storage module operations.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from plinth.modules.storage import privileged
|
||||
|
||||
pytestmark = pytest.mark.usefixtures('mock_privileged')
|
||||
privileged_modules_to_mock = ['plinth.modules.storage.privileged']
|
||||
|
||||
|
||||
class Disk():
|
||||
"""Context manager to create/destroy a disk."""
|
||||
@ -255,8 +258,11 @@ def test_ext4_expansion(partition_table_type):
|
||||
def _assert_free_space(disk, partition_number, space=True):
|
||||
"""Verify that free is available/not available after a partition."""
|
||||
device = disk.get_partition_device(partition_number)
|
||||
result = _check_action(['storage', 'is-partition-expandable', device])
|
||||
assert result == space
|
||||
if space:
|
||||
privileged.is_partition_expandable(device)
|
||||
else:
|
||||
with pytest.raises(RuntimeError):
|
||||
privileged.is_partition_expandable(device)
|
||||
|
||||
|
||||
def _expand_partition(disk, partition_number, success=True):
|
||||
@ -264,34 +270,15 @@ def _expand_partition(disk, partition_number, success=True):
|
||||
_assert_aligned(disk, partition_number)
|
||||
with disk.mount_partition(partition_number) as mount_point:
|
||||
device = disk.get_partition_device(partition_number)
|
||||
result = _check_action([
|
||||
'storage', 'expand-partition', device, '--mount-point', mount_point
|
||||
])
|
||||
if success:
|
||||
privileged.expand_partition(device, mount_point)
|
||||
else:
|
||||
with pytest.raises(RuntimeError):
|
||||
privileged.expand_partition(device, mount_point)
|
||||
|
||||
assert result == success
|
||||
_assert_aligned(disk, partition_number)
|
||||
|
||||
|
||||
def _call_action(action_command, check=True, **kwargs):
|
||||
"""Call the action script."""
|
||||
test_directory = pathlib.Path(__file__).parent
|
||||
top_directory = (test_directory / '..' / '..' / '..' / '..').resolve()
|
||||
action_command[0] = top_directory / 'actions' / action_command[0]
|
||||
kwargs['stdout'] = kwargs.get('stdout', subprocess.DEVNULL)
|
||||
kwargs['stderr'] = kwargs.get('stderr', subprocess.DEVNULL)
|
||||
env = dict(os.environ, PYTHONPATH=str(top_directory))
|
||||
return subprocess.run(action_command, env=env, check=check, **kwargs)
|
||||
|
||||
|
||||
def _check_action(action_command):
|
||||
"""Return success/failure result of the action command."""
|
||||
try:
|
||||
_call_action(action_command)
|
||||
return True
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
|
||||
def _assert_aligned(disk, partition_number):
|
||||
"""Test that partition is optimally aligned."""
|
||||
subprocess.run([
|
||||
@ -319,43 +306,40 @@ def _assert_ext4_file_system_healthy(disk, partition_number):
|
||||
def _assert_validate_directory(path, error, check_writable=False,
|
||||
check_creatable=False):
|
||||
"""Perform directory validation checks."""
|
||||
action_command = ['storage', 'validate-directory', '--path', path]
|
||||
if check_writable:
|
||||
action_command += ['--check-writable']
|
||||
if check_creatable:
|
||||
action_command += ['--check-creatable']
|
||||
proc = _call_action(action_command, stderr=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE)
|
||||
output = proc.stdout.decode()
|
||||
if 'ValidationError' in output:
|
||||
error_nr = output.strip().split()[1]
|
||||
assert error_nr == error
|
||||
if error:
|
||||
match = None if not error.args else error.args[0]
|
||||
with pytest.raises(error.__class__, match=match):
|
||||
privileged.validate_directory(path, check_creatable,
|
||||
check_writable)
|
||||
else:
|
||||
assert output == error
|
||||
privileged.validate_directory(path, check_creatable, check_writable)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('needs_not_root')
|
||||
@pytest.mark.parametrize('path,error', [('/missing', '1'),
|
||||
('/etc/os-release', '2'),
|
||||
('/root', '3'), ('/', ''),
|
||||
('/etc/..', '')])
|
||||
@pytest.mark.parametrize('path,error',
|
||||
[('/missing', FileNotFoundError()),
|
||||
('/etc/os-release', NotADirectoryError()),
|
||||
('/root', PermissionError('read')), ('/', None),
|
||||
('/etc/..', None)])
|
||||
def test_validate_directory(path, error):
|
||||
"""Test that directory validation returns expected output."""
|
||||
_assert_validate_directory(path, error)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('needs_not_root')
|
||||
@pytest.mark.parametrize('path,error', [('/', '4'), ('/tmp', '')])
|
||||
@pytest.mark.parametrize('path,error', [('/', PermissionError('write')),
|
||||
('/tmp', None)])
|
||||
def test_validate_directory_writable(path, error):
|
||||
"""Test that directory writable validation returns expected output."""
|
||||
_assert_validate_directory(path, error, check_writable=True)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('needs_not_root')
|
||||
@pytest.mark.parametrize('path,error',
|
||||
[('/var/lib/plinth_storage_test_not_exists', '4'),
|
||||
('/tmp/plint_storage_test_not_exists', ''),
|
||||
('/var/../tmp/plint_storage_test_not_exists', '')])
|
||||
@pytest.mark.parametrize(
|
||||
'path,error',
|
||||
[('/var/lib/plinth_storage_test_not_exists', PermissionError('write')),
|
||||
('/tmp/plint_storage_test_not_exists', None),
|
||||
('/var/../tmp/plint_storage_test_not_exists', None)])
|
||||
def test_validate_directory_creatable(path, error):
|
||||
"""Test that directory creatable validation returns expected output."""
|
||||
_assert_validate_directory(path, error, check_creatable=True)
|
||||
|
||||
@ -1,15 +1,14 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
Handle disk operations using UDisk2 DBus API.
|
||||
"""
|
||||
"""Handle disk operations using UDisk2 DBus API."""
|
||||
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from plinth import actions, cfg
|
||||
from plinth.errors import ActionError
|
||||
from plinth import cfg
|
||||
from plinth.utils import import_from_gi
|
||||
|
||||
from . import privileged
|
||||
|
||||
glib = import_from_gi('GLib', '2.0')
|
||||
gio = import_from_gi('Gio', '2.0')
|
||||
|
||||
@ -195,11 +194,8 @@ def _mount(object_path):
|
||||
logger.info('Auto-mounting device: %s %s', block_device.id,
|
||||
block_device.preferred_device)
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'storage',
|
||||
['mount', '--block-device', block_device.preferred_device],
|
||||
log_error=False)
|
||||
except ActionError as exception:
|
||||
privileged.mount(block_device.preferred_device, _log_error=False)
|
||||
except Exception as exception:
|
||||
parts = exception.args[2].split(':')
|
||||
if parts[1].strip() != 'GDBus.Error':
|
||||
raise
|
||||
|
||||
@ -1,9 +1,6 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
Views for storage module.
|
||||
"""
|
||||
"""Views for storage module."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import urllib.parse
|
||||
|
||||
@ -14,17 +11,17 @@ from django.urls import reverse
|
||||
from django.utils.translation import gettext as _
|
||||
from django.views.decorators.http import require_POST
|
||||
|
||||
from plinth import actions, views
|
||||
from plinth.errors import ActionError
|
||||
from plinth import views
|
||||
from plinth.modules import storage
|
||||
|
||||
from . import get_error_message
|
||||
from . import get_error_message, privileged
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StorageAppView(views.AppView):
|
||||
"""Show storage information."""
|
||||
|
||||
app_id = 'storage'
|
||||
template_name = 'storage.html'
|
||||
|
||||
@ -63,7 +60,7 @@ def expand(request):
|
||||
def expand_partition(request, device):
|
||||
"""Expand the partition."""
|
||||
try:
|
||||
storage.expand_partition(device)
|
||||
privileged.expand_partition(device)
|
||||
except Exception as exception:
|
||||
messages.error(
|
||||
request,
|
||||
@ -83,8 +80,7 @@ def eject(request, device_path):
|
||||
device_path = urllib.parse.unquote(device_path)
|
||||
|
||||
try:
|
||||
drive = json.loads(
|
||||
actions.superuser_run('storage', ['eject', device_path]))
|
||||
drive = privileged.eject(device_path)
|
||||
if drive:
|
||||
messages.success(
|
||||
request,
|
||||
@ -93,8 +89,8 @@ def eject(request, device_path):
|
||||
drive_model=drive['model']))
|
||||
else:
|
||||
messages.success(request, _('Device can be safely unplugged.'))
|
||||
except ActionError as exception:
|
||||
message = get_error_message(exception.args[2])
|
||||
except Exception as exception:
|
||||
message = get_error_message(exception.args[-2].decode()) # stdout
|
||||
|
||||
logger.error('Error ejecting device - %s', message)
|
||||
messages.error(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user