From 4fed6921d686cf1e7aefc5adaa9b1a589e3eb7ad Mon Sep 17 00:00:00 2001 From: Sunil Mohan Adapa Date: Tue, 14 Jun 2022 10:25:52 -0700 Subject: [PATCH] actions: Add a decorator for marking superuser actions Any privileged action (a method) can be marked as such with the new decorator. A call to the method will be serialized into a sudo call (or later into a D-Bus call). The method arguments are turned to JSON and method is called as superuser. Arguments are de-serialized and are verified for type before the actual call as superuser. Return values are serialized and returned where they are de-serialized. Exceptions are also serialized and de-serialized. The method must have be strictly typed and should not have keyword-only arguments. Currently supported types are int, float, str, dict/Dict, list/List and Optional. Signed-off-by: Sunil Mohan Adapa Reviewed-by: James Valleroy --- actions/actions | 196 +++++++++++++++++++++++++++ plinth/actions.py | 75 ++++++++++ plinth/module_loader.py | 28 +++- plinth/tests/test_actions.py | 98 +++++++++++++- plinth/tests/test_actions_actions.py | 179 ++++++++++++++++++++++++ plinth/tests/test_module_loader.py | 15 ++ 6 files changed, 583 insertions(+), 8 deletions(-) create mode 100755 actions/actions create mode 100644 plinth/tests/test_actions_actions.py create mode 100644 plinth/tests/test_module_loader.py diff --git a/actions/actions b/actions/actions new file mode 100755 index 000000000..1cebe5eae --- /dev/null +++ b/actions/actions @@ -0,0 +1,196 @@ +#!/usr/bin/python3 +# SPDX-License-Identifier: AGPL-3.0-or-later + +import argparse +import importlib +import inspect +import json +import logging +import os +import sys +import typing + +import plinth.log +from plinth import cfg, module_loader + +EXIT_SYNTAX = 10 +EXIT_PERM = 20 + +logger = logging.getLogger(__name__) + + +def main(): + """Parse arguments.""" + plinth.log.action_init() + + parser = argparse.ArgumentParser() + parser.add_argument('module', help='Module to trigger action in') + parser.add_argument('action', help='Action to trigger in module') + args = parser.parse_args() + + try: + try: + arguments = json.loads(sys.stdin.read()) + except json.JSONDecodeError as exception: + raise SyntaxError('Arguments on stdin not JSON.') from exception + + return_value = _call(args.module, args.action, arguments) + print(json.dumps(return_value)) + except PermissionError as exception: + logger.error(exception.args[0]) + sys.exit(EXIT_PERM) + except SyntaxError as exception: + logger.error(exception.args[0]) + sys.exit(EXIT_SYNTAX) + except TypeError as exception: + logger.error(exception.args[0]) + sys.exit(EXIT_SYNTAX) + except Exception as exception: + logger.exception(exception) + sys.exit(1) + + +def _call(module_name, action_name, arguments): + """Import the module and run action as superuser""" + if os.getuid() != 0: + raise PermissionError('This action is reserved for root') + + if '.' in module_name: + raise SyntaxError('Invalid module name') + + cfg.read() + import_path = module_loader.get_module_import_path(module_name) + try: + module = importlib.import_module(import_path + '.privileged') + except ModuleNotFoundError as exception: + raise SyntaxError('Specified module not found') from exception + + try: + action = getattr(module, action_name) + except AttributeError as exception: + raise SyntaxError('Specified action not found') from exception + + if not getattr(action, '_privileged', None): + raise SyntaxError('Specified action is not privileged action') + + func = getattr(action, '__wrapped__') + + _assert_valid_arguments(func, arguments) + + try: + return_values = func(*arguments['args'], **arguments['kwargs']) + return_value = {'result': 'success', 'return': return_values} + except Exception as exception: + logger.exception('Error executing action: %s', exception) + return_value = { + 'result': 'exception', + 'exception': { + 'module': type(exception).__module__, + 'name': type(exception).__name__, + 'args': exception.args + } + } + + return return_value + + +def _assert_valid_arguments(func, arguments): + """Check the names, types and completeness of the arguments passed.""" + # Check if arguments match types + if not isinstance(arguments, dict): + raise SyntaxError('Invalid arguments format') + + if 'args' not in arguments or 'kwargs' not in arguments: + raise SyntaxError('Invalid arguments format') + + args = arguments['args'] + kwargs = arguments['kwargs'] + if not isinstance(args, list) or not isinstance(kwargs, dict): + raise SyntaxError('Invalid arguments format') + + argspec = inspect.getfullargspec(func) + if len(args) + len(kwargs) > len(argspec.args): + raise SyntaxError('Too many arguments') + + no_defaults = len(argspec.args) + if argspec.defaults: + no_defaults -= len(argspec.defaults) + + for key in argspec.args[len(args):no_defaults]: + if key not in kwargs: + raise SyntaxError(f'Argument not provided: {key}') + + for key, value in kwargs.items(): + if key not in argspec.args: + raise SyntaxError(f'Unknown argument: {key}') + + if argspec.args.index(key) < len(args): + raise SyntaxError(f'Duplicate argument: {key}') + + _assert_valid_type(f'arg {key}', value, argspec.annotations[key]) + + for index, arg in enumerate(args): + annotation = argspec.annotations[argspec.args[index]] + _assert_valid_type(f'arg #{index}', arg, annotation) + + +def _assert_valid_type(arg_name, value, annotation): + """Assert that the type of argument value matches the annotation.""" + if annotation == typing.Any: + return + + NoneType = type(None) + if annotation == NoneType: + if value is not None: + raise TypeError('Expected None for {arg_name}') + + return + + basic_types = {bool, int, str, float} + if annotation in basic_types: + if not isinstance(value, annotation): + raise TypeError( + f'Expected type {annotation.__name__} for {arg_name}') + + return + + if not hasattr(annotation, '__origin__'): + raise TypeError('Unsupported annotation type') + + if annotation.__origin__ == typing.Union: + for arg in annotation.__args__: + try: + _assert_valid_type(arg_name, value, arg) + return + except TypeError: + pass + + raise TypeError(f'Expected one of unioned types for {arg_name}') + + if annotation.__origin__ == list: + if not isinstance(value, list): + raise TypeError(f'Expected type list for {arg_name}') + + for index, inner_item in enumerate(value): + _assert_valid_type(f'{arg_name}[{index}]', inner_item, + annotation.__args__[0]) + + return + + if annotation.__origin__ == dict: + if not isinstance(value, dict): + raise TypeError(f'Expected type dict for {arg_name}') + + for inner_key, inner_value in value.items(): + _assert_valid_type(f'{arg_name}[{inner_key}]', inner_key, + annotation.__args__[0]) + _assert_valid_type(f'{arg_name}[{inner_value}]', inner_value, + annotation.__args__[1]) + + return + + raise TypeError('Unsupported annotation type') + + +if __name__ == '__main__': + main() diff --git a/plinth/actions.py b/plinth/actions.py index fb4977e72..14d3599c1 100644 --- a/plinth/actions.py +++ b/plinth/actions.py @@ -75,11 +75,16 @@ Actions run commands with this contract (version 1.1): """ +import functools +import importlib +import inspect +import json import logging import os import re import shlex import subprocess +import sys from plinth import cfg from plinth.errors import ActionError @@ -236,3 +241,73 @@ def _log_command(cmd): cmd = ' '.join([shlex.quote(part) for part in cmd]) logger.info('%s%s %s', user, prompt, cmd) + + +def privileged(func): + """Mark a method as allowed to be run as privileged method. + + This decorator is to mark any method as needing to be executed with + superuser privileges. This is necessary because the primary FreedomBox + service daemon runs as a regular user and has no special privileges. When + performing system operations, FreedomBox service will either communicate + with privileged daemons such as NetworkManager and systemd, or spawns a + separate process with higher privileges. When spawning a separate process + all the action parameters need to serialized, communicated to the process + and then de-serialized inside the process. The return value also need to + undergo such serialization and de-serialization. This decorator makes this + task simpler. + + A call to a decorated method will be serialized into a sudo call (or later + into a D-Bus call). The method arguments are turned to JSON and method is + called with superuser privileges. As arguments are de-serialized, they are + verified for type before the actual call as superuser. Return values are + serialized and returned where they are de-serialized. Exceptions are also + serialized and de-serialized. The decorator wrapper code will either return + the value or raise exception. + + For a method to be decorated, the method must have type annotations for all + of its parameters and should not use keyword-only arguments. It must also + be in a module named privileged.py directly under the application similar + to models.py, views.py and urls.py. Currently supported types are bool, + int, float, str, dict/Dict, list/List, Optional and Union. + + """ + setattr(func, '_privileged', True) + + _check_privileged_action_arguments(func) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + module_name = _get_privileged_action_module_name(func) + action_name = func.__name__ + json_args = json.dumps({'args': args, 'kwargs': kwargs}) + return_value = superuser_run('actions', [module_name, action_name], + input=json_args.encode()) + return_value = json.loads(return_value) + if return_value['result'] == 'success': + return return_value['return'] + + module = importlib.import_module(return_value['exception']['module']) + exception = getattr(module, return_value['exception']['name']) + raise exception(*return_value['exception']['args']) + + return wrapper + + +def _check_privileged_action_arguments(func): + """Check that a privileged action has well defined types.""" + argspec = inspect.getfullargspec(func) + if (argspec.varargs or argspec.varkw or argspec.kwonlyargs + or argspec.kwonlydefaults): + raise SyntaxError('Actions must not have variable args') + + for arg in argspec.args: + if arg not in argspec.annotations: + raise SyntaxError('All arguments must be annotated') + + +def _get_privileged_action_module_name(func): + """Figure out the module name of a privileged action.""" + module_name = func.__module__ + module = sys.modules[module_name] + return module.__package__.rpartition('.')[2] diff --git a/plinth/module_loader.py b/plinth/module_loader.py index 02a6bdd68..190601520 100644 --- a/plinth/module_loader.py +++ b/plinth/module_loader.py @@ -8,6 +8,7 @@ import importlib import logging import pathlib import re +from typing import Optional import django @@ -83,12 +84,27 @@ def get_modules_to_load(): modules = [] for file_ in files: - with file_.open() as file_handle: - for line in file_handle: - line = re.sub('#.*', '', line) - line = line.strip() - if line: - modules.append(line) + module = _get_module_import_paths_from_file(file_) + if module: + modules.append(module) _modules_to_load = modules return modules + + +def get_module_import_path(module_name: str) -> str: + """Return the import path for a module.""" + file_path = pathlib.Path(cfg.config_dir) / 'modules-enabled' / module_name + return _get_module_import_paths_from_file(file_path) + + +def _get_module_import_paths_from_file(file_path: str) -> Optional[str]: + """Read a module's import path from a file.""" + with file_path.open() as file_handle: + for line in file_handle: + line = re.sub('#.*', '', line) + line = line.strip() + if line: + return line + + return None diff --git a/plinth/tests/test_actions.py b/plinth/tests/test_actions.py index f45cb1b2b..ca7335747 100644 --- a/plinth/tests/test_actions.py +++ b/plinth/tests/test_actions.py @@ -7,18 +7,19 @@ description of the expectations. """ +import json import os import pathlib import shutil import tempfile -from unittest.mock import patch +from unittest.mock import call, patch import apt_pkg import pytest from plinth import cfg from plinth.actions import _log_command as log_command -from plinth.actions import run, superuser_run +from plinth.actions import privileged, run, superuser_run from plinth.errors import ActionError @@ -193,3 +194,96 @@ def test_log_command(logger, cmd, message): logger.assert_called_once() args = logger.call_args[0] assert message == args[0] % args[1:] + + +def test_privileged_properties(): + """Test that privileged decorator sets proper properties on the method.""" + + def func(): + return + + wrapped_func = privileged(func) + assert wrapped_func._privileged + assert wrapped_func.__wrapped__ == func + + +def test_privileged_argument_vararg_check(): + """Test that privileged decorator checks for simple arguments.""" + + def func_with_varargs(*_args): + return + + def func_with_kwargs(**_kwargs): + return + + def func_with_kwonlyargs(*_args, _kwarg): + return + + def func_with_kwonlydefaults(*_args, _kwargs='foo'): + return + + for func in (func_with_varargs, func_with_kwargs, func_with_kwonlyargs, + func_with_kwonlydefaults): + with pytest.raises(SyntaxError): + privileged(func) + + +def test_privileged_argument_annotation_check(): + """Test that privileged decorator checks for annotations to arguments.""" + + def func1(_a): + return + + def func2(_a: int, _b): + return + + def func_valid(_a: int, _b: dict[int, str]): + return + + for func in (func1, func2): + with pytest.raises(SyntaxError): + privileged(func) + + privileged(func_valid) + + +@patch('plinth.actions.superuser_run') +def test_privileged_method_call(superuser_run_): + """Test that privileged method calls the superuser action properly.""" + + def func_with_args(_a: int, _b: str, _c: int = 1, _d: str = 'dval', + _e: str = 'eval'): + return + + superuser_run_.return_value = json.dumps({ + 'result': 'success', + 'return': 'bar' + }) + wrapped_func = privileged(func_with_args) + return_value = wrapped_func(1, 'bval', None, _d='dnewval') + assert return_value == 'bar' + + input_ = {'args': [1, 'bval', None], 'kwargs': {'_d': 'dnewval'}} + input_ = json.dumps(input_) + superuser_run_.assert_has_calls( + [call('actions', ['tests', 'func_with_args'], input=input_.encode())]) + + +@patch('plinth.actions.superuser_run') +def test_privileged_method_exceptions(superuser_run_): + """Test that exceptions on privileged methods are return properly.""" + + def func_with_exception(): + raise TypeError('type error') + + superuser_run_.return_value = json.dumps({ + 'result': 'exception', + 'exception': { + 'module': 'builtins', + 'name': 'TypeError', + 'args': ['type error'] + } + }) + wrapped_func = privileged(func_with_exception) + with pytest.raises(TypeError, match='type error'): + wrapped_func() diff --git a/plinth/tests/test_actions_actions.py b/plinth/tests/test_actions_actions.py new file mode 100644 index 000000000..9a5a84673 --- /dev/null +++ b/plinth/tests/test_actions_actions.py @@ -0,0 +1,179 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +""" +Test module for code that runs methods are privileged actions. +""" + +import typing +from unittest.mock import call as mock_call +from unittest.mock import patch + +import pytest + +from plinth.actions import privileged + +actions_name = 'actions' + + +@patch('importlib.import_module') +@patch('plinth.module_loader.get_module_import_path') +@patch('os.getuid') +def test_call_syntax_checks(getuid, get_module_import_path, import_module, + actions_module): + """Test that calling a method results in proper syntax checks.""" + call = actions_module._call + + # Test for root permissions + getuid.return_value = 1000 + with pytest.raises(PermissionError): + call('x-module', 'x-action', {}) + + # Module name validation + getuid.return_value = 0 + with pytest.raises(SyntaxError, match='Invalid module name'): + call('foo.bar', 'x-action', {}) + + # Module import test + get_module_import_path.return_value = 'plinth.modules.test_module' + import_module.side_effect = ModuleNotFoundError + with pytest.raises(SyntaxError, match='Specified module not found'): + call('test_module', 'x-action', {}) + + import_module.assert_has_calls( + [mock_call('plinth.modules.test_module.privileged')]) + + # Finding action in a module + module = type('', (), {}) + import_module.side_effect = None + import_module.return_value = module + with pytest.raises(SyntaxError, match='Specified action not found'): + call('test_module', 'x-action', {}) + + # Checking if action is privileged + def unprivileged_func(): + pass + + setattr(module, 'func', unprivileged_func) + with pytest.raises(SyntaxError, + match='Specified action is not privileged action'): + call('test-module', 'func', {}) + + # Argument validation + @privileged + def func(): + return 'foo' + + setattr(module, 'func', func) + with pytest.raises(SyntaxError, match='Invalid arguments format'): + call('test-module', 'func', {}) + + # Successful call + return_value = call('test-module', 'func', {'args': [], 'kwargs': {}}) + assert return_value == {'result': 'success', 'return': 'foo'} + + # Exception call + @privileged + def exception_func(): + raise RuntimeError('foo exception') + + setattr(module, 'func', exception_func) + return_value = call('test-module', 'func', {'args': [], 'kwargs': {}}) + assert return_value == { + 'result': 'exception', + 'exception': { + 'module': 'builtins', + 'name': 'RuntimeError', + 'args': ('foo exception', ) + } + } + + +def test_assert_valid_arguments(actions_module): + """Test that checking valid arguments works.""" + assert_valid = actions_module._assert_valid_arguments + + values = [ + None, [], 10, {}, { + 'args': [] + }, { + 'kwargs': {} + }, { + 'args': {}, + 'kwargs': {} + }, { + 'args': [], + 'kwargs': [] + } + ] + for value in values: + with pytest.raises(SyntaxError, match='Invalid arguments format'): + assert_valid(lambda: None, value) + + def func(a: int, b: str, c: int = 3, d: str = 'foo'): + pass + + with pytest.raises(SyntaxError, match='Too many arguments'): + assert_valid(func, {'args': [1, 2, 3], 'kwargs': {'c': 3, 'd': 4}}) + + with pytest.raises(SyntaxError, match='Too many arguments'): + assert_valid(func, {'args': [1, 2, 3, 4, 5], 'kwargs': {}}) + + with pytest.raises(SyntaxError, match='Too many arguments'): + assert_valid(func, { + 'args': [], + 'kwargs': { + 'a': 1, + 'b': '2', + 'c': 3, + 'd': '4', + 'e': 5 + } + }) + + with pytest.raises(SyntaxError, match='Argument not provided: b'): + assert_valid(func, {'args': [1], 'kwargs': {}}) + + with pytest.raises(SyntaxError, match='Unknown argument: e'): + assert_valid(func, {'args': [1, '2'], 'kwargs': {'e': 5}}) + + with pytest.raises(SyntaxError, match='Duplicate argument: c'): + assert_valid(func, {'args': [1, '2', 3], 'kwargs': {'c': 4}}) + + with pytest.raises(TypeError, match='Expected type str for arg #1'): + assert_valid(func, {'args': [1, 2], 'kwargs': {}}) + + with pytest.raises(TypeError, match='Expected type int for arg c'): + assert_valid(func, {'args': [1, '2'], 'kwargs': {'c': '3'}}) + + +def test_assert_valid_type(actions_module): + """Test that type validation works as expected.""" + assert_valid = actions_module._assert_valid_type + + assert_valid(None, None, typing.Any) + + # Invalid values for int, str, float and Optional + values = [[1, bool], ['foo', int], [1, str], [1, float], + [1, typing.Optional[str]], [1.1, typing.Union[int, str]], + [1, list], [1, dict], [[1], list[str]], + [{ + 'a': 'b' + }, dict[int, str]], [{ + 1: 2 + }, dict[int, str]]] + for value in values: + with pytest.raises(TypeError): + assert_valid('arg', *value) + + # Valid values + assert_valid('arg', True, bool) + assert_valid('arg', 1, int) + assert_valid('arg', '1', str) + assert_valid('arg', 1.1, float) + assert_valid('arg', None, typing.Optional[int]) + assert_valid('arg', 1, typing.Optional[int]) + assert_valid('arg', 1, typing.Union[int, str]) + assert_valid('arg', '1', typing.Union[int, str]) + assert_valid('arg', [], list[int]) + assert_valid('arg', ['foo'], list[str]) + assert_valid('arg', {}, dict[int, str]) + assert_valid('arg', {1: 'foo'}, dict[int, str]) diff --git a/plinth/tests/test_module_loader.py b/plinth/tests/test_module_loader.py new file mode 100644 index 000000000..90e8f27bc --- /dev/null +++ b/plinth/tests/test_module_loader.py @@ -0,0 +1,15 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +""" +Test module for module loading mechanism. +""" + +from unittest.mock import mock_open, patch + +from plinth import module_loader + + +@patch('pathlib.Path.open', mock_open(read_data='plinth.modules.apache\n')) +def test_get_module_import_path(): + """Returning the module import path.""" + import_path = module_loader.get_module_import_path('apache') + assert import_path == 'plinth.modules.apache'