actions, backups: Fix tests depending on sudo based actions

Tests:

- Mounting/unmounting of remote SSH repositories works.

- Creating repo, creating/deleting/list archives work.

- If a privileged method raises an exception after outputting to stdout (using
action_utils.run) then stdout is shown in the HTML UI message.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: Joseph Nuthalapati <njoseph@riseup.net>
This commit is contained in:
Sunil Mohan Adapa 2025-08-14 08:01:16 -07:00 committed by Joseph Nuthalapati
parent 944c427f44
commit 0c6f04b55f
No known key found for this signature in database
GPG Key ID: 5398F00A2FA43C35
5 changed files with 50 additions and 53 deletions

View File

@ -67,6 +67,9 @@ def privileged(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if getattr(func, '_skip_privileged_call', False):
return func(*args, **kwargs)
module_name = _get_privileged_action_module_name(func)
action_name = func.__name__
return run_privileged_method(func, module_name, action_name, args,

View File

@ -111,17 +111,6 @@ def fixture_needs_sudo():
pytest.skip('Needs sudo command installed.')
@pytest.fixture(name='no_privileged_server', scope='module')
def fixture_no_privileged__server():
"""Don't setup for and run privileged methods on server.
Tests on using privileged daemon are not yet implemented.
"""
with patch('plinth.actions.run_privileged_method_on_server') as mock:
mock.side_effect = NotImplementedError
yield
@pytest.fixture(scope='session')
def splinter_selenium_implicit_wait():
"""Disable implicit waiting."""
@ -164,9 +153,10 @@ def fixture_mock_privileged(request):
privileged_modules_to_mock = request.module.privileged_modules_to_mock
except AttributeError:
raise AttributeError(
'mock_privileged fixture requires "privileged_module_to_mock" '
'mock_privileged fixture requires "privileged_modules_to_mock" '
'attribute at module level')
mocked_methods = []
for module_name in privileged_modules_to_mock:
module = importlib.import_module(module_name)
for name, member in module.__dict__.items():
@ -177,19 +167,20 @@ def fixture_mock_privileged(request):
if not getattr(member, '_privileged', False):
continue
setattr(wrapped, '_original_wrapper', member)
module.__dict__[name] = wrapped
while getattr(wrapped, '__wrapped__', None) and getattr(
wrapped, '_privileged', True):
wrapped = getattr(wrapped, '__wrapped__', None)
setattr(wrapped, '_skip_privileged_call', True)
mocked_methods.append(wrapped)
yield
for module_name in privileged_modules_to_mock:
module = importlib.import_module(module_name)
for name, member in module.__dict__.items():
wrapper = getattr(member, '_original_wrapper', None)
if not callable(member) or not wrapper:
continue
module.__dict__[name] = wrapper
for mocked_method in mocked_methods:
try:
delattr(mocked_method, '_skip_privileged_call')
except AttributeError:
pass
@pytest.fixture(name='mock_run_as_user')

View File

@ -118,8 +118,8 @@ def reraise_known_errors(privileged_func):
def _reraise_known_errors(err):
"""Look whether the caught error is known and reraise it accordingly"""
stdout = getattr(err, 'stdout', b'').decode()
stderr = getattr(err, 'stderr', b'').decode()
stdout = (getattr(err, 'stdout') or b'').decode()
stderr = (getattr(err, 'stderr') or b'').decode()
caught_error = str((err, err.args, stdout, stderr))
for known_error in KNOWN_ERRORS:
for error in known_error['errors']:
@ -166,14 +166,14 @@ def mount(mountpoint: str, remote_path: str, ssh_keyfile: str | None = None,
cmd += ['-o', 'password_stdin']
input_ = password.encode()
subprocess.run(cmd, check=True, timeout=TIMEOUT, input=input_)
action_utils.run(cmd, check=True, timeout=TIMEOUT, input=input_)
@reraise_known_errors
@privileged
def umount(mountpoint: str):
"""Unmount a mountpoint."""
subprocess.run(['umount', mountpoint], check=True)
action_utils.run(['umount', mountpoint], check=True)
def _validate_mountpoint(mountpoint):
@ -515,4 +515,4 @@ def _get_env(encryption_passphrase: str | None = None):
def _run(cmd, encryption_passphrase=None, check=True, **kwargs):
"""Wrap the command with extra encryption passphrase handling."""
env = _get_env(encryption_passphrase)
return subprocess.run(cmd, check=check, env=env, **kwargs)
return action_utils.run(cmd, check=check, env=env, **kwargs)

View File

@ -16,7 +16,9 @@ from plinth.modules.backups.repository import BorgRepository, SshBorgRepository
from plinth.tests import config as test_config
pytestmark = pytest.mark.usefixtures('needs_root', 'needs_borg', 'load_cfg',
'no_privileged_server')
'mock_privileged')
privileged_modules_to_mock = ['plinth.modules.backups.privileged']
# try to access a non-existing url and a URL that exists but does not
# grant access

View File

@ -7,21 +7,17 @@ description of the expectations.
"""
import json
import os
import subprocess
import typing
from unittest.mock import Mock, call, patch
import pytest
from plinth import actions, cfg
from plinth import actions
from plinth.actions import privileged, secret_str
actions_name = 'actions'
pytestmark = pytest.mark.usefixtures('no_privileged_server')
@pytest.fixture(name='popen')
def fixture_popen():
@ -104,8 +100,11 @@ def test_privileged_argument_annotation_check():
privileged(func2_valid)
@patch('plinth.actions._read_from_server')
@patch('plinth.actions._request_to_server')
@patch('plinth.actions._get_privileged_action_module_name')
def test_privileged_method_call(get_module_name, popen):
def test_privileged_method_call(get_module_name, request_to_server,
read_from_server):
"""Test that privileged method calls the superuser action properly."""
def func_with_args(_a: int, _b: str, _c: int = 1, _d: str = 'dval',
@ -113,43 +112,45 @@ def test_privileged_method_call(get_module_name, popen):
return
get_module_name.return_value = 'tests'
popen.return_value = json.dumps({'result': 'success', 'return': 'bar'})
read_from_server.return_value = {"result": "success", "return": "bar"}
wrapped_func = privileged(func_with_args)
return_value = wrapped_func(1, 'bval', None, _d='dnewval')
assert return_value == 'bar'
input_ = {'args': [1, 'bval', None], 'kwargs': {'_d': 'dnewval'}}
input_ = json.dumps(input_)
write_fd = popen.called_with_write_fd[0]
close_from_fd = str(write_fd + 1)
popen.assert_has_calls([
call([
'sudo', '--non-interactive', '--close-from', close_from_fd,
cfg.actions_dir + '/actions', 'tests', 'func_with_args',
'--write-fd',
str(write_fd)
], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=False, pass_fds=[write_fd])
])
assert request_to_server.mock_calls == [
call({
'module': 'tests',
'action': 'func_with_args',
'args': (1, 'bval', None),
'kwargs': {
'_d': 'dnewval'
}
}),
call().close()
]
@patch('plinth.actions._read_from_server')
@patch('plinth.actions._request_to_server')
@patch('plinth.actions._get_privileged_action_module_name')
def test_privileged_method_exceptions(get_module_name, popen):
def test_privileged_method_exceptions(get_module_name, request_to_server,
read_from_server):
"""Test that exceptions on privileged methods are return properly."""
def func_with_exception():
raise TypeError('type error')
get_module_name.return_value = 'tests'
popen.return_value = json.dumps({
read_from_server.return_value = {
'result': 'exception',
'exception': {
'module': 'builtins',
'name': 'TypeError',
'args': ['type error'],
'traceback': ['']
'traceback': [''],
'stdout': '',
'stderr': ''
}
})
}
wrapped_func = privileged(func_with_exception)
with pytest.raises(TypeError, match='type error'):
wrapped_func()