diff --git a/plinth/actions.py b/plinth/actions.py index d2e4503c8..0f195da73 100644 --- a/plinth/actions.py +++ b/plinth/actions.py @@ -67,6 +67,9 @@ def privileged(func): @functools.wraps(func) def wrapper(*args, **kwargs): + if getattr(func, '_skip_privileged_call', False): + return func(*args, **kwargs) + module_name = _get_privileged_action_module_name(func) action_name = func.__name__ return run_privileged_method(func, module_name, action_name, args, diff --git a/plinth/conftest.py b/plinth/conftest.py index 3c56a33fe..dee221380 100644 --- a/plinth/conftest.py +++ b/plinth/conftest.py @@ -111,17 +111,6 @@ def fixture_needs_sudo(): pytest.skip('Needs sudo command installed.') -@pytest.fixture(name='no_privileged_server', scope='module') -def fixture_no_privileged__server(): - """Don't setup for and run privileged methods on server. - - Tests on using privileged daemon are not yet implemented. - """ - with patch('plinth.actions.run_privileged_method_on_server') as mock: - mock.side_effect = NotImplementedError - yield - - @pytest.fixture(scope='session') def splinter_selenium_implicit_wait(): """Disable implicit waiting.""" @@ -164,9 +153,10 @@ def fixture_mock_privileged(request): privileged_modules_to_mock = request.module.privileged_modules_to_mock except AttributeError: raise AttributeError( - 'mock_privileged fixture requires "privileged_module_to_mock" ' + 'mock_privileged fixture requires "privileged_modules_to_mock" ' 'attribute at module level') + mocked_methods = [] for module_name in privileged_modules_to_mock: module = importlib.import_module(module_name) for name, member in module.__dict__.items(): @@ -177,19 +167,20 @@ def fixture_mock_privileged(request): if not getattr(member, '_privileged', False): continue - setattr(wrapped, '_original_wrapper', member) - module.__dict__[name] = wrapped + while getattr(wrapped, '__wrapped__', None) and getattr( + wrapped, '_privileged', True): + wrapped = getattr(wrapped, '__wrapped__', None) + + setattr(wrapped, '_skip_privileged_call', True) + mocked_methods.append(wrapped) yield - for module_name in privileged_modules_to_mock: - module = importlib.import_module(module_name) - for name, member in module.__dict__.items(): - wrapper = getattr(member, '_original_wrapper', None) - if not callable(member) or not wrapper: - continue - - module.__dict__[name] = wrapper + for mocked_method in mocked_methods: + try: + delattr(mocked_method, '_skip_privileged_call') + except AttributeError: + pass @pytest.fixture(name='mock_run_as_user') diff --git a/plinth/modules/backups/privileged.py b/plinth/modules/backups/privileged.py index c010c5f23..ced04bc8c 100644 --- a/plinth/modules/backups/privileged.py +++ b/plinth/modules/backups/privileged.py @@ -118,8 +118,8 @@ def reraise_known_errors(privileged_func): def _reraise_known_errors(err): """Look whether the caught error is known and reraise it accordingly""" - stdout = getattr(err, 'stdout', b'').decode() - stderr = getattr(err, 'stderr', b'').decode() + stdout = (getattr(err, 'stdout') or b'').decode() + stderr = (getattr(err, 'stderr') or b'').decode() caught_error = str((err, err.args, stdout, stderr)) for known_error in KNOWN_ERRORS: for error in known_error['errors']: @@ -166,14 +166,14 @@ def mount(mountpoint: str, remote_path: str, ssh_keyfile: str | None = None, cmd += ['-o', 'password_stdin'] input_ = password.encode() - subprocess.run(cmd, check=True, timeout=TIMEOUT, input=input_) + action_utils.run(cmd, check=True, timeout=TIMEOUT, input=input_) @reraise_known_errors @privileged def umount(mountpoint: str): """Unmount a mountpoint.""" - subprocess.run(['umount', mountpoint], check=True) + action_utils.run(['umount', mountpoint], check=True) def _validate_mountpoint(mountpoint): @@ -515,4 +515,4 @@ def _get_env(encryption_passphrase: str | None = None): def _run(cmd, encryption_passphrase=None, check=True, **kwargs): """Wrap the command with extra encryption passphrase handling.""" env = _get_env(encryption_passphrase) - return subprocess.run(cmd, check=check, env=env, **kwargs) + return action_utils.run(cmd, check=check, env=env, **kwargs) diff --git a/plinth/modules/backups/tests/test_backups.py b/plinth/modules/backups/tests/test_backups.py index ea5002ad9..b6d3aa707 100644 --- a/plinth/modules/backups/tests/test_backups.py +++ b/plinth/modules/backups/tests/test_backups.py @@ -16,7 +16,9 @@ from plinth.modules.backups.repository import BorgRepository, SshBorgRepository from plinth.tests import config as test_config pytestmark = pytest.mark.usefixtures('needs_root', 'needs_borg', 'load_cfg', - 'no_privileged_server') + 'mock_privileged') + +privileged_modules_to_mock = ['plinth.modules.backups.privileged'] # try to access a non-existing url and a URL that exists but does not # grant access diff --git a/plinth/tests/test_actions.py b/plinth/tests/test_actions.py index fc3f045f3..78cb4dd7e 100644 --- a/plinth/tests/test_actions.py +++ b/plinth/tests/test_actions.py @@ -7,21 +7,17 @@ description of the expectations. """ -import json import os -import subprocess import typing from unittest.mock import Mock, call, patch import pytest -from plinth import actions, cfg +from plinth import actions from plinth.actions import privileged, secret_str actions_name = 'actions' -pytestmark = pytest.mark.usefixtures('no_privileged_server') - @pytest.fixture(name='popen') def fixture_popen(): @@ -104,8 +100,11 @@ def test_privileged_argument_annotation_check(): privileged(func2_valid) +@patch('plinth.actions._read_from_server') +@patch('plinth.actions._request_to_server') @patch('plinth.actions._get_privileged_action_module_name') -def test_privileged_method_call(get_module_name, popen): +def test_privileged_method_call(get_module_name, request_to_server, + read_from_server): """Test that privileged method calls the superuser action properly.""" def func_with_args(_a: int, _b: str, _c: int = 1, _d: str = 'dval', @@ -113,43 +112,45 @@ def test_privileged_method_call(get_module_name, popen): return get_module_name.return_value = 'tests' - popen.return_value = json.dumps({'result': 'success', 'return': 'bar'}) + read_from_server.return_value = {"result": "success", "return": "bar"} wrapped_func = privileged(func_with_args) return_value = wrapped_func(1, 'bval', None, _d='dnewval') assert return_value == 'bar' - - input_ = {'args': [1, 'bval', None], 'kwargs': {'_d': 'dnewval'}} - input_ = json.dumps(input_) - write_fd = popen.called_with_write_fd[0] - close_from_fd = str(write_fd + 1) - popen.assert_has_calls([ - call([ - 'sudo', '--non-interactive', '--close-from', close_from_fd, - cfg.actions_dir + '/actions', 'tests', 'func_with_args', - '--write-fd', - str(write_fd) - ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, shell=False, pass_fds=[write_fd]) - ]) + assert request_to_server.mock_calls == [ + call({ + 'module': 'tests', + 'action': 'func_with_args', + 'args': (1, 'bval', None), + 'kwargs': { + '_d': 'dnewval' + } + }), + call().close() + ] +@patch('plinth.actions._read_from_server') +@patch('plinth.actions._request_to_server') @patch('plinth.actions._get_privileged_action_module_name') -def test_privileged_method_exceptions(get_module_name, popen): +def test_privileged_method_exceptions(get_module_name, request_to_server, + read_from_server): """Test that exceptions on privileged methods are return properly.""" def func_with_exception(): raise TypeError('type error') get_module_name.return_value = 'tests' - popen.return_value = json.dumps({ + read_from_server.return_value = { 'result': 'exception', 'exception': { 'module': 'builtins', 'name': 'TypeError', 'args': ['type error'], - 'traceback': [''] + 'traceback': [''], + 'stdout': '', + 'stderr': '' } - }) + } wrapped_func = privileged(func_with_exception) with pytest.raises(TypeError, match='type error'): wrapped_func()