mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
Tests: - If there is a syntax error in communication with privileged server. 'stdout' and 'stderr' keys are present in 'exception' dictionary of the reply. - If there is a error in the privileged method in communication with privileged server. 'stdout' and 'stderr' keys are present in 'exception' dictionary of the reply. The values are filled with output of the command that have been run. - If a privileged method uses action_utils.run, then raising an exception in the method shows proper stdout and stderr in the UI HTML message. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: Joseph Nuthalapati <njoseph@riseup.net>
301 lines
11 KiB
Python
301 lines
11 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""
|
|
Test module for key/value store.
|
|
"""
|
|
|
|
import json
|
|
import pathlib
|
|
import subprocess
|
|
from unittest.mock import Mock, call, patch
|
|
|
|
import pytest
|
|
|
|
from plinth.action_utils import (get_addresses, get_hostname,
|
|
is_systemd_running, move_uploaded_file, run,
|
|
run_as_user, service_action, service_disable,
|
|
service_enable, service_is_enabled,
|
|
service_is_running, service_reload,
|
|
service_restart, service_start, service_stop,
|
|
service_try_reload_or_restart,
|
|
service_try_restart, service_unmask)
|
|
|
|
UNKNOWN = 'unknowndeamon'
|
|
|
|
systemctl_path = pathlib.Path('/usr/bin/systemctl')
|
|
systemd_installed = pytest.mark.skipif(not systemctl_path.exists(),
|
|
reason='systemd not available')
|
|
|
|
ip_path = pathlib.Path('/usr/bin/ip')
|
|
ip_installed = pytest.mark.skipif(not ip_path.exists(),
|
|
reason='ip command not available')
|
|
|
|
|
|
@patch('os.path.exists')
|
|
def test_is_systemd_running(mock):
|
|
"""Trivial white box test for a trivial implementation."""
|
|
mock.return_value = True
|
|
assert is_systemd_running()
|
|
mock.return_value = False
|
|
assert not is_systemd_running()
|
|
|
|
|
|
@systemd_installed
|
|
def test_service_checks():
|
|
"""Test basic checks on status of an arbitrary service."""
|
|
assert not service_is_running(UNKNOWN)
|
|
assert not service_is_enabled(UNKNOWN)
|
|
|
|
# expected is best if: generic. Alternatives: systemd-sysctl, logrotate
|
|
expected = 'networking'
|
|
if not service_is_running(expected):
|
|
pytest.skip(f'Needs service {expected} running.')
|
|
|
|
assert service_is_enabled(expected)
|
|
|
|
|
|
@pytest.mark.usefixtures('needs_root')
|
|
@systemd_installed
|
|
def test_service_enable_and_disable():
|
|
"""Test enabling and disabling of an arbitrary service."""
|
|
# service is best if: non-essential part of FreedomBox that restarts fast
|
|
service = 'unattended-upgrades'
|
|
if not service_is_enabled(service):
|
|
reason = f'Needs service {service} enabled.'
|
|
pytest.skip(reason)
|
|
|
|
service_disable(service)
|
|
assert not service_is_running(service)
|
|
service_enable(service)
|
|
assert service_is_running(service)
|
|
|
|
# Ignore unknown services, don't fail:
|
|
service_disable(UNKNOWN)
|
|
service_enable(UNKNOWN)
|
|
|
|
|
|
@patch('plinth.action_utils.service_action')
|
|
@systemd_installed
|
|
def test_service_actions(mock):
|
|
"""Trivial white box test for trivial implementations."""
|
|
service_start(UNKNOWN)
|
|
mock.assert_called_with(UNKNOWN, 'start', check=False)
|
|
service_stop(UNKNOWN)
|
|
mock.assert_called_with(UNKNOWN, 'stop', check=False)
|
|
service_restart(UNKNOWN)
|
|
mock.assert_called_with(UNKNOWN, 'restart', check=False)
|
|
service_try_restart(UNKNOWN)
|
|
mock.assert_called_with(UNKNOWN, 'try-restart', check=False)
|
|
service_reload(UNKNOWN)
|
|
mock.assert_called_with(UNKNOWN, 'reload', check=False)
|
|
service_try_reload_or_restart(UNKNOWN)
|
|
mock.assert_called_with(UNKNOWN, 'try-reload-or-restart', check=False)
|
|
|
|
|
|
@pytest.mark.usefixtures('needs_root')
|
|
@systemd_installed
|
|
def test_service_unmask():
|
|
"""Test unmasking of an arbitrary masked service."""
|
|
|
|
def is_masked(service):
|
|
process = subprocess.run([
|
|
'systemctl', 'list-unit-files', '--output=json',
|
|
service + '.service'
|
|
], stdout=subprocess.PIPE, check=False)
|
|
output = json.loads(process.stdout)
|
|
return output[0]['state'] == 'masked' if output else False
|
|
|
|
# SERVICE is best if: part of FreedomBox, so we can mess with least risk.
|
|
service = 'samba-ad-dc'
|
|
if not is_masked(service):
|
|
pytest.skip(f'Needs service {service} masked.')
|
|
|
|
service_unmask(service)
|
|
assert not is_masked(service)
|
|
|
|
service_action(service, 'mask')
|
|
assert is_masked(service)
|
|
|
|
|
|
def test_get_hostname():
|
|
"""get_hostname returns a string.
|
|
|
|
In fact, the maximum length for a hostname is 253 characters, but
|
|
anything longer than 80 is very suspicious, so we fail the test.
|
|
|
|
To avoid error messages pass as hostnames we seek and fail if we find
|
|
some unexpected characters.
|
|
"""
|
|
hostname = get_hostname()
|
|
assert hostname
|
|
assert isinstance(hostname, str)
|
|
assert len(hostname) < 80
|
|
for char in ' ,:;!?=$%&@*+()[]{}<>"\'':
|
|
assert char not in hostname
|
|
|
|
|
|
@ip_installed
|
|
def test_get_addresses():
|
|
"""Test that any FreedomBox has some addresses."""
|
|
ips = get_addresses()
|
|
assert len(ips) > 3 # min: ip, 2x'localhost', hostname
|
|
for address in ips:
|
|
assert address['kind'] in ('4', '6')
|
|
|
|
|
|
@pytest.fixture(name='upload_dir')
|
|
def fixture_update_dir(tmp_path):
|
|
"""Patch Django file upload directory."""
|
|
tmp_path /= 'source'
|
|
tmp_path.mkdir()
|
|
|
|
import plinth.settings
|
|
old_value = plinth.settings.FILE_UPLOAD_TEMP_DIR
|
|
plinth.settings.FILE_UPLOAD_TEMP_DIR = tmp_path
|
|
yield tmp_path
|
|
plinth.settings.FILE_UPLOAD_TEMP_DIR = old_value
|
|
|
|
|
|
def test_move_uploaded_file(tmp_path, upload_dir):
|
|
"""Test moving Django uploaded file to destination directory."""
|
|
tmp_path /= 'destination'
|
|
tmp_path.mkdir()
|
|
|
|
# Source file does not exist
|
|
source = tmp_path / 'does-non-exist'
|
|
destination = tmp_path / 'destination'
|
|
destination_file_name = 'destination-file-name'
|
|
with pytest.raises(FileNotFoundError):
|
|
move_uploaded_file(source, destination, destination_file_name)
|
|
|
|
# Source is not a file
|
|
source = tmp_path / 'source-dir'
|
|
source.mkdir()
|
|
with pytest.raises(ValueError, match='Source is not a file'):
|
|
move_uploaded_file(source, destination, destination_file_name)
|
|
|
|
# Source is not in expected temporary upload directory
|
|
source = tmp_path / 'source-file'
|
|
source.touch()
|
|
with pytest.raises(
|
|
ValueError,
|
|
match='Uploaded file is not in expected temp directory'):
|
|
move_uploaded_file(source, destination, destination_file_name)
|
|
|
|
# Destination does not exist
|
|
source = upload_dir / 'source-file'
|
|
source.touch()
|
|
with pytest.raises(ValueError, match='Destination is not a directory'):
|
|
move_uploaded_file(source, destination, destination_file_name)
|
|
|
|
# Destination is not a file
|
|
destination.touch()
|
|
with pytest.raises(ValueError, match='Destination is not a directory'):
|
|
move_uploaded_file(source, destination, destination_file_name)
|
|
|
|
# Destination file name is a multi-component path
|
|
destination.unlink()
|
|
destination.mkdir()
|
|
destination_file_name = '../destination-file-name'
|
|
with pytest.raises(ValueError, match='Invalid destination file name'):
|
|
move_uploaded_file(source, destination, destination_file_name)
|
|
|
|
# Destination file exists and override is not allowed
|
|
destination_file_name = 'destination-file-exists'
|
|
(destination / destination_file_name).touch()
|
|
with pytest.raises(FileExistsError, match='Destination already exists'):
|
|
move_uploaded_file(source, destination, destination_file_name)
|
|
|
|
with pytest.raises(FileExistsError, match='Destination already exists'):
|
|
move_uploaded_file(source, destination, destination_file_name,
|
|
allow_overwrite=False)
|
|
|
|
# Successful move
|
|
with patch('shutil.chown') as chown:
|
|
destination_file = destination / destination_file_name
|
|
destination_file.unlink()
|
|
source.write_text('x-contents-1')
|
|
move_uploaded_file(source, destination, destination_file_name)
|
|
chown.mock_calls = [call(destination_file, 'root', 'root')]
|
|
assert destination_file.stat().st_mode & 0o777 == 0o644
|
|
assert destination_file.read_text() == 'x-contents-1'
|
|
assert not source.exists()
|
|
|
|
chown.reset_mock()
|
|
source.write_text('x-contents-2')
|
|
move_uploaded_file(source, destination, destination_file_name,
|
|
allow_overwrite=True, user='x-user',
|
|
group='x-group', permissions=0o600)
|
|
chown.mock_calls = [call(destination_file, 'x-user', 'x-group')]
|
|
assert destination_file.stat().st_mode & 0o777 == 0o600
|
|
assert destination_file.read_text() == 'x-contents-2'
|
|
assert not source.exists()
|
|
|
|
|
|
@patch('subprocess.run')
|
|
def test_run_as_user(subprocess_run):
|
|
"""Test running a command as another user works."""
|
|
subprocess_run.return_value = 'test-return-value'
|
|
return_value = run_as_user(['command', 'arg1', '--foo'],
|
|
username='foouser', stdout=subprocess.PIPE,
|
|
check=True)
|
|
assert return_value == 'test-return-value'
|
|
assert subprocess_run.mock_calls == [
|
|
call(
|
|
['runuser', '--user', 'foouser', '--', 'command', 'arg1', '--foo'],
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
|
|
]
|
|
|
|
|
|
@patch('plinth.actions.thread_storage')
|
|
@patch('subprocess.run')
|
|
def test_run_capture(subprocess_run, thread_storage):
|
|
"""Test running a command with stdin/stdout capture works."""
|
|
thread_storage.stdout = 'initial-stdout'
|
|
thread_storage.stderr = 'initial-stderr'
|
|
|
|
subprocess_run.return_value = Mock()
|
|
subprocess_run.return_value.stdout = 'test-stdout'
|
|
subprocess_run.return_value.stderr = 'test-stderr'
|
|
|
|
return_value = run(['command', 'arg1', '--foo'], check=True)
|
|
assert return_value == subprocess_run.return_value
|
|
assert subprocess_run.mock_calls == [
|
|
call(['command', 'arg1', '--foo'], stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE, check=True)
|
|
]
|
|
assert thread_storage.stdout == 'initial-stdouttest-stdout'
|
|
assert thread_storage.stderr == 'initial-stderrtest-stderr'
|
|
|
|
|
|
@patch('plinth.actions.thread_storage')
|
|
@patch('subprocess.run')
|
|
def test_run_no_capture(subprocess_run, thread_storage):
|
|
"""Test running a command without stdin/stdout capture works."""
|
|
thread_storage.stdout = 'initial-stdout'
|
|
thread_storage.stderr = 'initial-stderr'
|
|
|
|
subprocess_run.return_value = Mock()
|
|
subprocess_run.return_value.stdout = 'test-stdout'
|
|
subprocess_run.return_value.stderr = 'test-stderr'
|
|
|
|
return_value = run(['command', 'arg1', '--foo'], stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE, check=True)
|
|
assert return_value == subprocess_run.return_value
|
|
assert subprocess_run.mock_calls == [
|
|
call(['command', 'arg1', '--foo'], stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE, check=True)
|
|
]
|
|
assert thread_storage.stdout == 'initial-stdout'
|
|
assert thread_storage.stderr == 'initial-stderr'
|
|
|
|
|
|
@patch('plinth.actions.thread_storage', None)
|
|
@patch('subprocess.run')
|
|
def test_run_no_storage(subprocess_run):
|
|
"""Test running a command without thread storage."""
|
|
subprocess_run.return_value = Mock()
|
|
subprocess_run.return_value.stdout = 'test-stdout'
|
|
subprocess_run.return_value.stderr = 'test-stderr'
|
|
|
|
run(['command', 'arg1', '--foo'], check=True)
|