mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-05-13 10:30:16 +00:00
actions: Add a decorator for marking superuser actions
Any privileged action (a method) can be marked as such with the new decorator. A call to the method will be serialized into a sudo call (or later into a D-Bus call). The method arguments are turned to JSON and method is called as superuser. Arguments are de-serialized and are verified for type before the actual call as superuser. Return values are serialized and returned where they are de-serialized. Exceptions are also serialized and de-serialized. The method must have be strictly typed and should not have keyword-only arguments. Currently supported types are int, float, str, dict/Dict, list/List and Optional. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
3ec086411b
commit
4fed6921d6
196
actions/actions
Executable file
196
actions/actions
Executable file
@ -0,0 +1,196 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import importlib
|
||||||
|
import inspect
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import typing
|
||||||
|
|
||||||
|
import plinth.log
|
||||||
|
from plinth import cfg, module_loader
|
||||||
|
|
||||||
|
EXIT_SYNTAX = 10
|
||||||
|
EXIT_PERM = 20
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Parse arguments."""
|
||||||
|
plinth.log.action_init()
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument('module', help='Module to trigger action in')
|
||||||
|
parser.add_argument('action', help='Action to trigger in module')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
arguments = json.loads(sys.stdin.read())
|
||||||
|
except json.JSONDecodeError as exception:
|
||||||
|
raise SyntaxError('Arguments on stdin not JSON.') from exception
|
||||||
|
|
||||||
|
return_value = _call(args.module, args.action, arguments)
|
||||||
|
print(json.dumps(return_value))
|
||||||
|
except PermissionError as exception:
|
||||||
|
logger.error(exception.args[0])
|
||||||
|
sys.exit(EXIT_PERM)
|
||||||
|
except SyntaxError as exception:
|
||||||
|
logger.error(exception.args[0])
|
||||||
|
sys.exit(EXIT_SYNTAX)
|
||||||
|
except TypeError as exception:
|
||||||
|
logger.error(exception.args[0])
|
||||||
|
sys.exit(EXIT_SYNTAX)
|
||||||
|
except Exception as exception:
|
||||||
|
logger.exception(exception)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def _call(module_name, action_name, arguments):
|
||||||
|
"""Import the module and run action as superuser"""
|
||||||
|
if os.getuid() != 0:
|
||||||
|
raise PermissionError('This action is reserved for root')
|
||||||
|
|
||||||
|
if '.' in module_name:
|
||||||
|
raise SyntaxError('Invalid module name')
|
||||||
|
|
||||||
|
cfg.read()
|
||||||
|
import_path = module_loader.get_module_import_path(module_name)
|
||||||
|
try:
|
||||||
|
module = importlib.import_module(import_path + '.privileged')
|
||||||
|
except ModuleNotFoundError as exception:
|
||||||
|
raise SyntaxError('Specified module not found') from exception
|
||||||
|
|
||||||
|
try:
|
||||||
|
action = getattr(module, action_name)
|
||||||
|
except AttributeError as exception:
|
||||||
|
raise SyntaxError('Specified action not found') from exception
|
||||||
|
|
||||||
|
if not getattr(action, '_privileged', None):
|
||||||
|
raise SyntaxError('Specified action is not privileged action')
|
||||||
|
|
||||||
|
func = getattr(action, '__wrapped__')
|
||||||
|
|
||||||
|
_assert_valid_arguments(func, arguments)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return_values = func(*arguments['args'], **arguments['kwargs'])
|
||||||
|
return_value = {'result': 'success', 'return': return_values}
|
||||||
|
except Exception as exception:
|
||||||
|
logger.exception('Error executing action: %s', exception)
|
||||||
|
return_value = {
|
||||||
|
'result': 'exception',
|
||||||
|
'exception': {
|
||||||
|
'module': type(exception).__module__,
|
||||||
|
'name': type(exception).__name__,
|
||||||
|
'args': exception.args
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return return_value
|
||||||
|
|
||||||
|
|
||||||
|
def _assert_valid_arguments(func, arguments):
|
||||||
|
"""Check the names, types and completeness of the arguments passed."""
|
||||||
|
# Check if arguments match types
|
||||||
|
if not isinstance(arguments, dict):
|
||||||
|
raise SyntaxError('Invalid arguments format')
|
||||||
|
|
||||||
|
if 'args' not in arguments or 'kwargs' not in arguments:
|
||||||
|
raise SyntaxError('Invalid arguments format')
|
||||||
|
|
||||||
|
args = arguments['args']
|
||||||
|
kwargs = arguments['kwargs']
|
||||||
|
if not isinstance(args, list) or not isinstance(kwargs, dict):
|
||||||
|
raise SyntaxError('Invalid arguments format')
|
||||||
|
|
||||||
|
argspec = inspect.getfullargspec(func)
|
||||||
|
if len(args) + len(kwargs) > len(argspec.args):
|
||||||
|
raise SyntaxError('Too many arguments')
|
||||||
|
|
||||||
|
no_defaults = len(argspec.args)
|
||||||
|
if argspec.defaults:
|
||||||
|
no_defaults -= len(argspec.defaults)
|
||||||
|
|
||||||
|
for key in argspec.args[len(args):no_defaults]:
|
||||||
|
if key not in kwargs:
|
||||||
|
raise SyntaxError(f'Argument not provided: {key}')
|
||||||
|
|
||||||
|
for key, value in kwargs.items():
|
||||||
|
if key not in argspec.args:
|
||||||
|
raise SyntaxError(f'Unknown argument: {key}')
|
||||||
|
|
||||||
|
if argspec.args.index(key) < len(args):
|
||||||
|
raise SyntaxError(f'Duplicate argument: {key}')
|
||||||
|
|
||||||
|
_assert_valid_type(f'arg {key}', value, argspec.annotations[key])
|
||||||
|
|
||||||
|
for index, arg in enumerate(args):
|
||||||
|
annotation = argspec.annotations[argspec.args[index]]
|
||||||
|
_assert_valid_type(f'arg #{index}', arg, annotation)
|
||||||
|
|
||||||
|
|
||||||
|
def _assert_valid_type(arg_name, value, annotation):
|
||||||
|
"""Assert that the type of argument value matches the annotation."""
|
||||||
|
if annotation == typing.Any:
|
||||||
|
return
|
||||||
|
|
||||||
|
NoneType = type(None)
|
||||||
|
if annotation == NoneType:
|
||||||
|
if value is not None:
|
||||||
|
raise TypeError('Expected None for {arg_name}')
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
basic_types = {bool, int, str, float}
|
||||||
|
if annotation in basic_types:
|
||||||
|
if not isinstance(value, annotation):
|
||||||
|
raise TypeError(
|
||||||
|
f'Expected type {annotation.__name__} for {arg_name}')
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
if not hasattr(annotation, '__origin__'):
|
||||||
|
raise TypeError('Unsupported annotation type')
|
||||||
|
|
||||||
|
if annotation.__origin__ == typing.Union:
|
||||||
|
for arg in annotation.__args__:
|
||||||
|
try:
|
||||||
|
_assert_valid_type(arg_name, value, arg)
|
||||||
|
return
|
||||||
|
except TypeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
raise TypeError(f'Expected one of unioned types for {arg_name}')
|
||||||
|
|
||||||
|
if annotation.__origin__ == list:
|
||||||
|
if not isinstance(value, list):
|
||||||
|
raise TypeError(f'Expected type list for {arg_name}')
|
||||||
|
|
||||||
|
for index, inner_item in enumerate(value):
|
||||||
|
_assert_valid_type(f'{arg_name}[{index}]', inner_item,
|
||||||
|
annotation.__args__[0])
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
if annotation.__origin__ == dict:
|
||||||
|
if not isinstance(value, dict):
|
||||||
|
raise TypeError(f'Expected type dict for {arg_name}')
|
||||||
|
|
||||||
|
for inner_key, inner_value in value.items():
|
||||||
|
_assert_valid_type(f'{arg_name}[{inner_key}]', inner_key,
|
||||||
|
annotation.__args__[0])
|
||||||
|
_assert_valid_type(f'{arg_name}[{inner_value}]', inner_value,
|
||||||
|
annotation.__args__[1])
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
raise TypeError('Unsupported annotation type')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -75,11 +75,16 @@ Actions run commands with this contract (version 1.1):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import importlib
|
||||||
|
import inspect
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import shlex
|
import shlex
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import sys
|
||||||
|
|
||||||
from plinth import cfg
|
from plinth import cfg
|
||||||
from plinth.errors import ActionError
|
from plinth.errors import ActionError
|
||||||
@ -236,3 +241,73 @@ def _log_command(cmd):
|
|||||||
cmd = ' '.join([shlex.quote(part) for part in cmd])
|
cmd = ' '.join([shlex.quote(part) for part in cmd])
|
||||||
|
|
||||||
logger.info('%s%s %s', user, prompt, cmd)
|
logger.info('%s%s %s', user, prompt, cmd)
|
||||||
|
|
||||||
|
|
||||||
|
def privileged(func):
|
||||||
|
"""Mark a method as allowed to be run as privileged method.
|
||||||
|
|
||||||
|
This decorator is to mark any method as needing to be executed with
|
||||||
|
superuser privileges. This is necessary because the primary FreedomBox
|
||||||
|
service daemon runs as a regular user and has no special privileges. When
|
||||||
|
performing system operations, FreedomBox service will either communicate
|
||||||
|
with privileged daemons such as NetworkManager and systemd, or spawns a
|
||||||
|
separate process with higher privileges. When spawning a separate process
|
||||||
|
all the action parameters need to serialized, communicated to the process
|
||||||
|
and then de-serialized inside the process. The return value also need to
|
||||||
|
undergo such serialization and de-serialization. This decorator makes this
|
||||||
|
task simpler.
|
||||||
|
|
||||||
|
A call to a decorated method will be serialized into a sudo call (or later
|
||||||
|
into a D-Bus call). The method arguments are turned to JSON and method is
|
||||||
|
called with superuser privileges. As arguments are de-serialized, they are
|
||||||
|
verified for type before the actual call as superuser. Return values are
|
||||||
|
serialized and returned where they are de-serialized. Exceptions are also
|
||||||
|
serialized and de-serialized. The decorator wrapper code will either return
|
||||||
|
the value or raise exception.
|
||||||
|
|
||||||
|
For a method to be decorated, the method must have type annotations for all
|
||||||
|
of its parameters and should not use keyword-only arguments. It must also
|
||||||
|
be in a module named privileged.py directly under the application similar
|
||||||
|
to models.py, views.py and urls.py. Currently supported types are bool,
|
||||||
|
int, float, str, dict/Dict, list/List, Optional and Union.
|
||||||
|
|
||||||
|
"""
|
||||||
|
setattr(func, '_privileged', True)
|
||||||
|
|
||||||
|
_check_privileged_action_arguments(func)
|
||||||
|
|
||||||
|
@functools.wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
module_name = _get_privileged_action_module_name(func)
|
||||||
|
action_name = func.__name__
|
||||||
|
json_args = json.dumps({'args': args, 'kwargs': kwargs})
|
||||||
|
return_value = superuser_run('actions', [module_name, action_name],
|
||||||
|
input=json_args.encode())
|
||||||
|
return_value = json.loads(return_value)
|
||||||
|
if return_value['result'] == 'success':
|
||||||
|
return return_value['return']
|
||||||
|
|
||||||
|
module = importlib.import_module(return_value['exception']['module'])
|
||||||
|
exception = getattr(module, return_value['exception']['name'])
|
||||||
|
raise exception(*return_value['exception']['args'])
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def _check_privileged_action_arguments(func):
|
||||||
|
"""Check that a privileged action has well defined types."""
|
||||||
|
argspec = inspect.getfullargspec(func)
|
||||||
|
if (argspec.varargs or argspec.varkw or argspec.kwonlyargs
|
||||||
|
or argspec.kwonlydefaults):
|
||||||
|
raise SyntaxError('Actions must not have variable args')
|
||||||
|
|
||||||
|
for arg in argspec.args:
|
||||||
|
if arg not in argspec.annotations:
|
||||||
|
raise SyntaxError('All arguments must be annotated')
|
||||||
|
|
||||||
|
|
||||||
|
def _get_privileged_action_module_name(func):
|
||||||
|
"""Figure out the module name of a privileged action."""
|
||||||
|
module_name = func.__module__
|
||||||
|
module = sys.modules[module_name]
|
||||||
|
return module.__package__.rpartition('.')[2]
|
||||||
|
|||||||
@ -8,6 +8,7 @@ import importlib
|
|||||||
import logging
|
import logging
|
||||||
import pathlib
|
import pathlib
|
||||||
import re
|
import re
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
import django
|
import django
|
||||||
|
|
||||||
@ -83,12 +84,27 @@ def get_modules_to_load():
|
|||||||
|
|
||||||
modules = []
|
modules = []
|
||||||
for file_ in files:
|
for file_ in files:
|
||||||
with file_.open() as file_handle:
|
module = _get_module_import_paths_from_file(file_)
|
||||||
for line in file_handle:
|
if module:
|
||||||
line = re.sub('#.*', '', line)
|
modules.append(module)
|
||||||
line = line.strip()
|
|
||||||
if line:
|
|
||||||
modules.append(line)
|
|
||||||
|
|
||||||
_modules_to_load = modules
|
_modules_to_load = modules
|
||||||
return modules
|
return modules
|
||||||
|
|
||||||
|
|
||||||
|
def get_module_import_path(module_name: str) -> str:
|
||||||
|
"""Return the import path for a module."""
|
||||||
|
file_path = pathlib.Path(cfg.config_dir) / 'modules-enabled' / module_name
|
||||||
|
return _get_module_import_paths_from_file(file_path)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_module_import_paths_from_file(file_path: str) -> Optional[str]:
|
||||||
|
"""Read a module's import path from a file."""
|
||||||
|
with file_path.open() as file_handle:
|
||||||
|
for line in file_handle:
|
||||||
|
line = re.sub('#.*', '', line)
|
||||||
|
line = line.strip()
|
||||||
|
if line:
|
||||||
|
return line
|
||||||
|
|
||||||
|
return None
|
||||||
|
|||||||
@ -7,18 +7,19 @@ description of the expectations.
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
import shutil
|
import shutil
|
||||||
import tempfile
|
import tempfile
|
||||||
from unittest.mock import patch
|
from unittest.mock import call, patch
|
||||||
|
|
||||||
import apt_pkg
|
import apt_pkg
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from plinth import cfg
|
from plinth import cfg
|
||||||
from plinth.actions import _log_command as log_command
|
from plinth.actions import _log_command as log_command
|
||||||
from plinth.actions import run, superuser_run
|
from plinth.actions import privileged, run, superuser_run
|
||||||
from plinth.errors import ActionError
|
from plinth.errors import ActionError
|
||||||
|
|
||||||
|
|
||||||
@ -193,3 +194,96 @@ def test_log_command(logger, cmd, message):
|
|||||||
logger.assert_called_once()
|
logger.assert_called_once()
|
||||||
args = logger.call_args[0]
|
args = logger.call_args[0]
|
||||||
assert message == args[0] % args[1:]
|
assert message == args[0] % args[1:]
|
||||||
|
|
||||||
|
|
||||||
|
def test_privileged_properties():
|
||||||
|
"""Test that privileged decorator sets proper properties on the method."""
|
||||||
|
|
||||||
|
def func():
|
||||||
|
return
|
||||||
|
|
||||||
|
wrapped_func = privileged(func)
|
||||||
|
assert wrapped_func._privileged
|
||||||
|
assert wrapped_func.__wrapped__ == func
|
||||||
|
|
||||||
|
|
||||||
|
def test_privileged_argument_vararg_check():
|
||||||
|
"""Test that privileged decorator checks for simple arguments."""
|
||||||
|
|
||||||
|
def func_with_varargs(*_args):
|
||||||
|
return
|
||||||
|
|
||||||
|
def func_with_kwargs(**_kwargs):
|
||||||
|
return
|
||||||
|
|
||||||
|
def func_with_kwonlyargs(*_args, _kwarg):
|
||||||
|
return
|
||||||
|
|
||||||
|
def func_with_kwonlydefaults(*_args, _kwargs='foo'):
|
||||||
|
return
|
||||||
|
|
||||||
|
for func in (func_with_varargs, func_with_kwargs, func_with_kwonlyargs,
|
||||||
|
func_with_kwonlydefaults):
|
||||||
|
with pytest.raises(SyntaxError):
|
||||||
|
privileged(func)
|
||||||
|
|
||||||
|
|
||||||
|
def test_privileged_argument_annotation_check():
|
||||||
|
"""Test that privileged decorator checks for annotations to arguments."""
|
||||||
|
|
||||||
|
def func1(_a):
|
||||||
|
return
|
||||||
|
|
||||||
|
def func2(_a: int, _b):
|
||||||
|
return
|
||||||
|
|
||||||
|
def func_valid(_a: int, _b: dict[int, str]):
|
||||||
|
return
|
||||||
|
|
||||||
|
for func in (func1, func2):
|
||||||
|
with pytest.raises(SyntaxError):
|
||||||
|
privileged(func)
|
||||||
|
|
||||||
|
privileged(func_valid)
|
||||||
|
|
||||||
|
|
||||||
|
@patch('plinth.actions.superuser_run')
|
||||||
|
def test_privileged_method_call(superuser_run_):
|
||||||
|
"""Test that privileged method calls the superuser action properly."""
|
||||||
|
|
||||||
|
def func_with_args(_a: int, _b: str, _c: int = 1, _d: str = 'dval',
|
||||||
|
_e: str = 'eval'):
|
||||||
|
return
|
||||||
|
|
||||||
|
superuser_run_.return_value = json.dumps({
|
||||||
|
'result': 'success',
|
||||||
|
'return': 'bar'
|
||||||
|
})
|
||||||
|
wrapped_func = privileged(func_with_args)
|
||||||
|
return_value = wrapped_func(1, 'bval', None, _d='dnewval')
|
||||||
|
assert return_value == 'bar'
|
||||||
|
|
||||||
|
input_ = {'args': [1, 'bval', None], 'kwargs': {'_d': 'dnewval'}}
|
||||||
|
input_ = json.dumps(input_)
|
||||||
|
superuser_run_.assert_has_calls(
|
||||||
|
[call('actions', ['tests', 'func_with_args'], input=input_.encode())])
|
||||||
|
|
||||||
|
|
||||||
|
@patch('plinth.actions.superuser_run')
|
||||||
|
def test_privileged_method_exceptions(superuser_run_):
|
||||||
|
"""Test that exceptions on privileged methods are return properly."""
|
||||||
|
|
||||||
|
def func_with_exception():
|
||||||
|
raise TypeError('type error')
|
||||||
|
|
||||||
|
superuser_run_.return_value = json.dumps({
|
||||||
|
'result': 'exception',
|
||||||
|
'exception': {
|
||||||
|
'module': 'builtins',
|
||||||
|
'name': 'TypeError',
|
||||||
|
'args': ['type error']
|
||||||
|
}
|
||||||
|
})
|
||||||
|
wrapped_func = privileged(func_with_exception)
|
||||||
|
with pytest.raises(TypeError, match='type error'):
|
||||||
|
wrapped_func()
|
||||||
|
|||||||
179
plinth/tests/test_actions_actions.py
Normal file
179
plinth/tests/test_actions_actions.py
Normal file
@ -0,0 +1,179 @@
|
|||||||
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
"""
|
||||||
|
Test module for code that runs methods are privileged actions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import typing
|
||||||
|
from unittest.mock import call as mock_call
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from plinth.actions import privileged
|
||||||
|
|
||||||
|
actions_name = 'actions'
|
||||||
|
|
||||||
|
|
||||||
|
@patch('importlib.import_module')
|
||||||
|
@patch('plinth.module_loader.get_module_import_path')
|
||||||
|
@patch('os.getuid')
|
||||||
|
def test_call_syntax_checks(getuid, get_module_import_path, import_module,
|
||||||
|
actions_module):
|
||||||
|
"""Test that calling a method results in proper syntax checks."""
|
||||||
|
call = actions_module._call
|
||||||
|
|
||||||
|
# Test for root permissions
|
||||||
|
getuid.return_value = 1000
|
||||||
|
with pytest.raises(PermissionError):
|
||||||
|
call('x-module', 'x-action', {})
|
||||||
|
|
||||||
|
# Module name validation
|
||||||
|
getuid.return_value = 0
|
||||||
|
with pytest.raises(SyntaxError, match='Invalid module name'):
|
||||||
|
call('foo.bar', 'x-action', {})
|
||||||
|
|
||||||
|
# Module import test
|
||||||
|
get_module_import_path.return_value = 'plinth.modules.test_module'
|
||||||
|
import_module.side_effect = ModuleNotFoundError
|
||||||
|
with pytest.raises(SyntaxError, match='Specified module not found'):
|
||||||
|
call('test_module', 'x-action', {})
|
||||||
|
|
||||||
|
import_module.assert_has_calls(
|
||||||
|
[mock_call('plinth.modules.test_module.privileged')])
|
||||||
|
|
||||||
|
# Finding action in a module
|
||||||
|
module = type('', (), {})
|
||||||
|
import_module.side_effect = None
|
||||||
|
import_module.return_value = module
|
||||||
|
with pytest.raises(SyntaxError, match='Specified action not found'):
|
||||||
|
call('test_module', 'x-action', {})
|
||||||
|
|
||||||
|
# Checking if action is privileged
|
||||||
|
def unprivileged_func():
|
||||||
|
pass
|
||||||
|
|
||||||
|
setattr(module, 'func', unprivileged_func)
|
||||||
|
with pytest.raises(SyntaxError,
|
||||||
|
match='Specified action is not privileged action'):
|
||||||
|
call('test-module', 'func', {})
|
||||||
|
|
||||||
|
# Argument validation
|
||||||
|
@privileged
|
||||||
|
def func():
|
||||||
|
return 'foo'
|
||||||
|
|
||||||
|
setattr(module, 'func', func)
|
||||||
|
with pytest.raises(SyntaxError, match='Invalid arguments format'):
|
||||||
|
call('test-module', 'func', {})
|
||||||
|
|
||||||
|
# Successful call
|
||||||
|
return_value = call('test-module', 'func', {'args': [], 'kwargs': {}})
|
||||||
|
assert return_value == {'result': 'success', 'return': 'foo'}
|
||||||
|
|
||||||
|
# Exception call
|
||||||
|
@privileged
|
||||||
|
def exception_func():
|
||||||
|
raise RuntimeError('foo exception')
|
||||||
|
|
||||||
|
setattr(module, 'func', exception_func)
|
||||||
|
return_value = call('test-module', 'func', {'args': [], 'kwargs': {}})
|
||||||
|
assert return_value == {
|
||||||
|
'result': 'exception',
|
||||||
|
'exception': {
|
||||||
|
'module': 'builtins',
|
||||||
|
'name': 'RuntimeError',
|
||||||
|
'args': ('foo exception', )
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_assert_valid_arguments(actions_module):
|
||||||
|
"""Test that checking valid arguments works."""
|
||||||
|
assert_valid = actions_module._assert_valid_arguments
|
||||||
|
|
||||||
|
values = [
|
||||||
|
None, [], 10, {}, {
|
||||||
|
'args': []
|
||||||
|
}, {
|
||||||
|
'kwargs': {}
|
||||||
|
}, {
|
||||||
|
'args': {},
|
||||||
|
'kwargs': {}
|
||||||
|
}, {
|
||||||
|
'args': [],
|
||||||
|
'kwargs': []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
for value in values:
|
||||||
|
with pytest.raises(SyntaxError, match='Invalid arguments format'):
|
||||||
|
assert_valid(lambda: None, value)
|
||||||
|
|
||||||
|
def func(a: int, b: str, c: int = 3, d: str = 'foo'):
|
||||||
|
pass
|
||||||
|
|
||||||
|
with pytest.raises(SyntaxError, match='Too many arguments'):
|
||||||
|
assert_valid(func, {'args': [1, 2, 3], 'kwargs': {'c': 3, 'd': 4}})
|
||||||
|
|
||||||
|
with pytest.raises(SyntaxError, match='Too many arguments'):
|
||||||
|
assert_valid(func, {'args': [1, 2, 3, 4, 5], 'kwargs': {}})
|
||||||
|
|
||||||
|
with pytest.raises(SyntaxError, match='Too many arguments'):
|
||||||
|
assert_valid(func, {
|
||||||
|
'args': [],
|
||||||
|
'kwargs': {
|
||||||
|
'a': 1,
|
||||||
|
'b': '2',
|
||||||
|
'c': 3,
|
||||||
|
'd': '4',
|
||||||
|
'e': 5
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
with pytest.raises(SyntaxError, match='Argument not provided: b'):
|
||||||
|
assert_valid(func, {'args': [1], 'kwargs': {}})
|
||||||
|
|
||||||
|
with pytest.raises(SyntaxError, match='Unknown argument: e'):
|
||||||
|
assert_valid(func, {'args': [1, '2'], 'kwargs': {'e': 5}})
|
||||||
|
|
||||||
|
with pytest.raises(SyntaxError, match='Duplicate argument: c'):
|
||||||
|
assert_valid(func, {'args': [1, '2', 3], 'kwargs': {'c': 4}})
|
||||||
|
|
||||||
|
with pytest.raises(TypeError, match='Expected type str for arg #1'):
|
||||||
|
assert_valid(func, {'args': [1, 2], 'kwargs': {}})
|
||||||
|
|
||||||
|
with pytest.raises(TypeError, match='Expected type int for arg c'):
|
||||||
|
assert_valid(func, {'args': [1, '2'], 'kwargs': {'c': '3'}})
|
||||||
|
|
||||||
|
|
||||||
|
def test_assert_valid_type(actions_module):
|
||||||
|
"""Test that type validation works as expected."""
|
||||||
|
assert_valid = actions_module._assert_valid_type
|
||||||
|
|
||||||
|
assert_valid(None, None, typing.Any)
|
||||||
|
|
||||||
|
# Invalid values for int, str, float and Optional
|
||||||
|
values = [[1, bool], ['foo', int], [1, str], [1, float],
|
||||||
|
[1, typing.Optional[str]], [1.1, typing.Union[int, str]],
|
||||||
|
[1, list], [1, dict], [[1], list[str]],
|
||||||
|
[{
|
||||||
|
'a': 'b'
|
||||||
|
}, dict[int, str]], [{
|
||||||
|
1: 2
|
||||||
|
}, dict[int, str]]]
|
||||||
|
for value in values:
|
||||||
|
with pytest.raises(TypeError):
|
||||||
|
assert_valid('arg', *value)
|
||||||
|
|
||||||
|
# Valid values
|
||||||
|
assert_valid('arg', True, bool)
|
||||||
|
assert_valid('arg', 1, int)
|
||||||
|
assert_valid('arg', '1', str)
|
||||||
|
assert_valid('arg', 1.1, float)
|
||||||
|
assert_valid('arg', None, typing.Optional[int])
|
||||||
|
assert_valid('arg', 1, typing.Optional[int])
|
||||||
|
assert_valid('arg', 1, typing.Union[int, str])
|
||||||
|
assert_valid('arg', '1', typing.Union[int, str])
|
||||||
|
assert_valid('arg', [], list[int])
|
||||||
|
assert_valid('arg', ['foo'], list[str])
|
||||||
|
assert_valid('arg', {}, dict[int, str])
|
||||||
|
assert_valid('arg', {1: 'foo'}, dict[int, str])
|
||||||
15
plinth/tests/test_module_loader.py
Normal file
15
plinth/tests/test_module_loader.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
"""
|
||||||
|
Test module for module loading mechanism.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from unittest.mock import mock_open, patch
|
||||||
|
|
||||||
|
from plinth import module_loader
|
||||||
|
|
||||||
|
|
||||||
|
@patch('pathlib.Path.open', mock_open(read_data='plinth.modules.apache\n'))
|
||||||
|
def test_get_module_import_path():
|
||||||
|
"""Returning the module import path."""
|
||||||
|
import_path = module_loader.get_module_import_path('apache')
|
||||||
|
assert import_path == 'plinth.modules.apache'
|
||||||
Loading…
x
Reference in New Issue
Block a user