mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
diagnostics: Refactor check IDs, tests and background checks
- Ensure that each diagnostic test category can be identified by easy prefix matching on the test ID. - Give a different unique IDs each different kind of test. More specific tests of a type get a different kind of ID. - Make comparison of diagnostic test results in test cases more comprehensive. - Simplify code that shows the number if issues identified. - In many languages, there is complex logic to write plural forms. Plurals can't be handled by assuming singular = 1 item and plural is > 1. Translation of messages in Notification does not support plurals properly. Avoid this for now by using sometimes incorrect plural form. - For i18n we should avoid joining phrases/words. Words don't always maintain order after translation. - Notify about the total number of issues in diagnostics and not just the most severe category. This is likely to draw more attention and avoid i18n complexity. - Dismiss the diagnostic notification if the latest run succeeded completely. Tests: - Unit tests pass. - Diagnostics for following apps works: networks (drop-in config), apache (daemon, listen address, internal firewall, external firewall), tor (netcat), torproxy (internal only firewall, torproxy url, torproxy using tor), privoxy (privoxy url, package available, package latest), - Untested: Is release file available method in upgrades app. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
a233bbfd9b
commit
465e452daf
@ -115,7 +115,7 @@ class DropinConfigs(app_module.FollowerComponent):
|
||||
result = (etc_path.is_symlink()
|
||||
and etc_path.readlink() == target)
|
||||
|
||||
check_id = f'config-{etc_path}'
|
||||
check_id = f'dropin-config-{etc_path}'
|
||||
result_string = Result.PASSED if result else Result.FAILED
|
||||
template = _('Static configuration {etc_path} is setup properly')
|
||||
description = format_lazy(template, etc_path=str(etc_path))
|
||||
|
||||
@ -101,7 +101,7 @@ class Daemon(app.LeaderComponent):
|
||||
"""Check if a daemon is running."""
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
|
||||
|
||||
check_id = f'daemon-{self.unit}-running'
|
||||
check_id = f'daemon-running-{self.unit}'
|
||||
result = Result.PASSED if self.is_running() else Result.FAILED
|
||||
|
||||
template = gettext_lazy('Service {service_name} is running')
|
||||
@ -159,13 +159,13 @@ def diagnose_port_listening(port, kind='tcp', listen_address=None):
|
||||
result = _check_port(port, kind, listen_address)
|
||||
|
||||
if listen_address:
|
||||
check_id = f'daemon-{kind}-{port}-{listen_address}'
|
||||
check_id = f'daemon-listening-address-{kind}-{port}-{listen_address}'
|
||||
template = gettext_lazy(
|
||||
'Listening on {kind} port {listen_address}:{port}')
|
||||
description = format_lazy(template, kind=kind,
|
||||
listen_address=listen_address, port=port)
|
||||
else:
|
||||
check_id = f'daemon-{kind}-{port}'
|
||||
check_id = f'daemon-listening-{kind}-{port}'
|
||||
template = gettext_lazy('Listening on {kind} port {port}')
|
||||
description = format_lazy(template, kind=kind, port=port)
|
||||
|
||||
@ -228,21 +228,16 @@ def diagnose_netcat(host, port, input='', negate=False):
|
||||
stderr=subprocess.PIPE)
|
||||
process.communicate(input=input.encode())
|
||||
if process.returncode != 0:
|
||||
result = Result.FAILED
|
||||
if negate:
|
||||
result = Result.PASSED
|
||||
|
||||
result = Result.FAILED if not negate else Result.PASSED
|
||||
else:
|
||||
result = Result.PASSED
|
||||
if negate:
|
||||
result = Result.FAILED
|
||||
|
||||
result = Result.PASSED if not negate else Result.FAILED
|
||||
except Exception:
|
||||
result = Result.FAILED
|
||||
|
||||
check_id = f'daemon-netcat-{host}-{port}'
|
||||
description = _('Connect to {host}:{port}')
|
||||
if negate:
|
||||
check_id = f'daemon-netcat-negate-{host}-{port}'
|
||||
description = _('Cannot connect to {host}:{port}')
|
||||
|
||||
return DiagnosticCheck(check_id, description.format(host=host, port=port),
|
||||
|
||||
@ -8,7 +8,7 @@ from django.utils.text import format_lazy
|
||||
from django.utils.translation import gettext_lazy
|
||||
|
||||
from plinth import action_utils, app
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
|
||||
from plinth.privileged import service as service_privileged
|
||||
|
||||
from . import privileged
|
||||
@ -142,11 +142,15 @@ def diagnose_url(url, kind=None, env=None, check_certificate=True,
|
||||
|
||||
Kind can be '4' for IPv4 or '6' for IPv6.
|
||||
"""
|
||||
result = check_url(url, kind, env, check_certificate, extra_options,
|
||||
wrapper, expected_output)
|
||||
try:
|
||||
return_value = check_url(url, kind, env, check_certificate,
|
||||
extra_options, wrapper, expected_output)
|
||||
result = Result.PASSED if return_value else Result.FAILED
|
||||
except FileNotFoundError:
|
||||
result = Result.ERROR
|
||||
|
||||
if kind:
|
||||
check_id = f'apache-url-{url}-{kind}'
|
||||
check_id = f'apache-url-kind-{url}-{kind}'
|
||||
template = gettext_lazy('Access URL {url} on tcp{kind}')
|
||||
description = format_lazy(template, url=url, kind=kind)
|
||||
else:
|
||||
@ -201,15 +205,13 @@ def check_url(url, kind=None, env=None, check_certificate=True,
|
||||
process = subprocess.run(command, env=env, check=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
result = 'passed'
|
||||
result = True
|
||||
if expected_output and expected_output not in process.stdout.decode():
|
||||
result = 'failed'
|
||||
result = False
|
||||
except subprocess.CalledProcessError as exception:
|
||||
result = 'failed'
|
||||
result = False
|
||||
# Authorization failed is a success
|
||||
if exception.stdout.decode().strip() in ('401', '405'):
|
||||
result = 'passed'
|
||||
except FileNotFoundError:
|
||||
result = 'error'
|
||||
result = True
|
||||
|
||||
return result
|
||||
|
||||
@ -12,6 +12,7 @@ from plinth import app
|
||||
from plinth.modules.apache.components import (Uwsgi, Webserver, check_url,
|
||||
diagnose_url,
|
||||
diagnose_url_on_all)
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
|
||||
|
||||
|
||||
def test_webserver_init():
|
||||
@ -241,19 +242,21 @@ def test_diagnose_url(get_addresses, check):
|
||||
'wrapper': 'test-wrapper',
|
||||
'expected_output': 'test-expected'
|
||||
}
|
||||
check.return_value = 'passed'
|
||||
check.return_value = True
|
||||
result = diagnose_url(**args)
|
||||
assert result.description == 'Access URL https://localhost/test on tcp4'
|
||||
assert result.result == 'passed'
|
||||
assert result == DiagnosticCheck(
|
||||
'apache-url-kind-https://localhost/test-4',
|
||||
'Access URL https://localhost/test on tcp4', Result.PASSED)
|
||||
|
||||
check.return_value = 'failed'
|
||||
check.return_value = False
|
||||
result = diagnose_url(**args)
|
||||
assert result.description == 'Access URL https://localhost/test on tcp4'
|
||||
assert result.result == 'failed'
|
||||
assert result == DiagnosticCheck(
|
||||
'apache-url-kind-https://localhost/test-4',
|
||||
'Access URL https://localhost/test on tcp4', Result.FAILED)
|
||||
|
||||
del args['kind']
|
||||
args['url'] = 'https://{host}/test'
|
||||
check.return_value = 'passed'
|
||||
check.return_value = True
|
||||
get_addresses.return_value = [{
|
||||
'kind': '4',
|
||||
'address': 'test-host-1',
|
||||
@ -265,13 +268,15 @@ def test_diagnose_url(get_addresses, check):
|
||||
'numeric': False,
|
||||
'url_address': 'test-host-2'
|
||||
}]
|
||||
result = diagnose_url_on_all(**args)
|
||||
assert result[
|
||||
0].description == 'Access URL https://test-host-1/test on tcp4'
|
||||
assert result[0].result == 'passed'
|
||||
assert result[
|
||||
1].description == 'Access URL https://test-host-2/test on tcp6'
|
||||
assert result[1].result == 'passed'
|
||||
results = diagnose_url_on_all(**args)
|
||||
assert results == [
|
||||
DiagnosticCheck('apache-url-kind-https://test-host-1/test-4',
|
||||
'Access URL https://test-host-1/test on tcp4',
|
||||
Result.PASSED),
|
||||
DiagnosticCheck('apache-url-kind-https://test-host-2/test-6',
|
||||
'Access URL https://test-host-2/test on tcp6',
|
||||
Result.PASSED),
|
||||
]
|
||||
|
||||
|
||||
@patch('subprocess.run')
|
||||
@ -282,7 +287,7 @@ def test_check_url(run):
|
||||
extra_args = {'env': None, 'check': True, 'stdout': -1, 'stderr': -1}
|
||||
|
||||
# Basic
|
||||
assert check_url(url) == 'passed'
|
||||
assert check_url(url)
|
||||
run.assert_called_with(basic_command + [url], **extra_args)
|
||||
|
||||
# Wrapper
|
||||
@ -315,15 +320,16 @@ def test_check_url(run):
|
||||
exception = subprocess.CalledProcessError(returncode=1, cmd=['curl'])
|
||||
run.side_effect = exception
|
||||
run.side_effect.stdout = b'500'
|
||||
assert check_url(url) == 'failed'
|
||||
assert not check_url(url)
|
||||
|
||||
# Return code 401, 405
|
||||
run.side_effect = exception
|
||||
run.side_effect.stdout = b' 401 '
|
||||
assert check_url(url) == 'passed'
|
||||
assert check_url(url)
|
||||
run.side_effect.stdout = b'405\n'
|
||||
assert check_url(url) == 'passed'
|
||||
assert check_url(url)
|
||||
|
||||
# Error
|
||||
run.side_effect = FileNotFoundError()
|
||||
assert check_url(url) == 'error'
|
||||
with pytest.raises(FileNotFoundError):
|
||||
assert check_url(url)
|
||||
|
||||
@ -263,11 +263,16 @@ def _warn_about_low_ram_space(request):
|
||||
|
||||
def _start_background_diagnostics(request):
|
||||
"""Start daily diagnostics as a background operation."""
|
||||
operation = operation_module.manager.new(
|
||||
op_id='diagnostics-full', app_id='diagnostics',
|
||||
name=gettext_noop('Running background diagnostics'),
|
||||
target=_run_background_diagnostics, [], show_message=False,
|
||||
show_notification=False)
|
||||
try:
|
||||
operation = operation_module.manager.new(
|
||||
op_id='diagnostics-full', app_id='diagnostics',
|
||||
name=gettext_noop('Running background diagnostics'),
|
||||
target=_run_background_diagnostics, [], show_message=False,
|
||||
show_notification=False)
|
||||
except KeyError:
|
||||
logger.warning('Diagnostics are already running')
|
||||
return
|
||||
|
||||
operation.join()
|
||||
|
||||
|
||||
@ -294,71 +299,34 @@ def _run_background_diagnostics():
|
||||
with running_task_lock:
|
||||
running_task = None
|
||||
|
||||
exception_count = 0
|
||||
error_count = 0
|
||||
failure_count = 0
|
||||
warning_count = 0
|
||||
issue_count = 0
|
||||
severity = 'warning'
|
||||
for _app_id, app_data in results.items():
|
||||
if app_data['exception']:
|
||||
exception_count += 1
|
||||
continue
|
||||
|
||||
for check in app_data['diagnosis']:
|
||||
if check.result == Result.ERROR:
|
||||
error_count += 1
|
||||
elif check.result == Result.FAILED:
|
||||
failure_count += 1
|
||||
elif check.result == Result.WARNING:
|
||||
warning_count += 1
|
||||
|
||||
notification_id = 'diagnostics-background'
|
||||
if exception_count > 0:
|
||||
severity = 'error'
|
||||
issue_count = exception_count
|
||||
if exception_count > 1:
|
||||
issue_type = 'translate:exceptions'
|
||||
issue_count += 1
|
||||
severity = 'error'
|
||||
else:
|
||||
issue_type = 'translate:exception'
|
||||
for check in app_data['diagnosis']:
|
||||
if check.result != Result.PASSED:
|
||||
if check.result != Result.WARNING:
|
||||
severity = 'error'
|
||||
|
||||
elif error_count > 0:
|
||||
severity = 'error'
|
||||
issue_count = error_count
|
||||
if error_count > 1:
|
||||
issue_type = 'translate:errors'
|
||||
else:
|
||||
issue_type = 'translate:error'
|
||||
issue_count += 1
|
||||
|
||||
elif failure_count > 0:
|
||||
severity = 'error'
|
||||
issue_count = failure_count
|
||||
if failure_count > 1:
|
||||
issue_type = 'translate:failures'
|
||||
else:
|
||||
issue_type = 'translate:failure'
|
||||
if not issue_count:
|
||||
# Remove any previous notifications if there are no issues.
|
||||
try:
|
||||
Notification.get('diagnostics-background').delete()
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
elif warning_count > 0:
|
||||
severity = 'warning'
|
||||
issue_count = warning_count
|
||||
if warning_count > 1:
|
||||
issue_type = 'translate:warnings'
|
||||
else:
|
||||
issue_type = 'translate:warning'
|
||||
|
||||
else:
|
||||
# Don't display a notification if there are no issues.
|
||||
return
|
||||
|
||||
message = gettext_noop(
|
||||
# xgettext:no-python-format
|
||||
'Background diagnostics completed with {issue_count} {issue_type}')
|
||||
title = gettext_noop(
|
||||
# xgettext:no-python-format
|
||||
'Background diagnostics results')
|
||||
data = {
|
||||
'app_icon': 'fa-heartbeat',
|
||||
'issue_count': issue_count,
|
||||
'issue_type': issue_type,
|
||||
}
|
||||
'Found {issue_count} issues during routine tests.')
|
||||
title = gettext_noop('Diagnostics results')
|
||||
data = {'app_icon': 'fa-heartbeat', 'issue_count': issue_count}
|
||||
actions = [{
|
||||
'type': 'link',
|
||||
'class': 'primary',
|
||||
@ -367,7 +335,7 @@ def _run_background_diagnostics():
|
||||
}, {
|
||||
'type': 'dismiss'
|
||||
}]
|
||||
note = Notification.update_or_create(id=notification_id,
|
||||
note = Notification.update_or_create(id='diagnostics-background',
|
||||
app_id='diagnostics',
|
||||
severity=severity, title=title,
|
||||
message=message, actions=actions,
|
||||
|
||||
@ -1,7 +1,5 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
Diagnostic check data type
|
||||
"""
|
||||
"""Diagnostic check data type."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
@ -16,7 +14,6 @@ class Result(StrEnum):
|
||||
ERROR = 'error'
|
||||
|
||||
|
||||
# TODO: Add parameters for test.
|
||||
# TODO: Description should not be translated until we need to display it.
|
||||
|
||||
|
||||
|
||||
30
plinth/modules/diagnostics/tests/test_check.py
Normal file
30
plinth/modules/diagnostics/tests/test_check.py
Normal file
@ -0,0 +1,30 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""Tests for diagnostic check data type."""
|
||||
|
||||
import pytest
|
||||
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
|
||||
|
||||
|
||||
def test_result():
|
||||
"""Test result enum type."""
|
||||
assert Result.__members__['ERROR'].name == 'ERROR'
|
||||
assert Result.__members__['ERROR'].value == 'error'
|
||||
assert Result.NOT_DONE == 'not_done'
|
||||
assert Result.PASSED == 'passed'
|
||||
assert Result.WARNING == 'warning'
|
||||
assert Result.FAILED == 'failed'
|
||||
assert Result.ERROR == 'error'
|
||||
|
||||
|
||||
def test_diagnostic_check():
|
||||
"""Test the diagnostic check data class."""
|
||||
with pytest.raises(TypeError):
|
||||
DiagnosticCheck()
|
||||
|
||||
check = DiagnosticCheck('some-check-id', 'sample check')
|
||||
assert check.check_id == 'some-check-id'
|
||||
assert check.description == 'sample check'
|
||||
assert check.result == Result.NOT_DONE
|
||||
check = DiagnosticCheck('some-check-id', 'sample check', Result.PASSED)
|
||||
assert check.result == Result.PASSED
|
||||
@ -131,7 +131,7 @@ class Firewall(app.FollowerComponent):
|
||||
for port_number, protocol in port_detail['details']))
|
||||
|
||||
# Internal zone
|
||||
check_id = f'firewall-port-{port}-internal'
|
||||
check_id = f'firewall-port-internal-{port}'
|
||||
result = Result.PASSED if port in internal_ports else Result.FAILED
|
||||
template = _(
|
||||
'Port {name} ({details}) available for internal networks')
|
||||
@ -139,20 +139,22 @@ class Firewall(app.FollowerComponent):
|
||||
results.append(DiagnosticCheck(check_id, description, result))
|
||||
|
||||
# External zone
|
||||
check_id = f'firewall-port-{port}-external'
|
||||
if self.is_external:
|
||||
check_id = f'firewall-port-external-available-{port}'
|
||||
result = Result.PASSED \
|
||||
if port in external_ports else Result.FAILED
|
||||
template = _(
|
||||
'Port {name} ({details}) available for external networks')
|
||||
description = format_lazy(template, name=port, details=details)
|
||||
else:
|
||||
check_id = f'firewall-port-external-unavailable-{port}'
|
||||
result = Result.PASSED \
|
||||
if port not in external_ports else Result.FAILED
|
||||
template = _(
|
||||
'Port {name} ({details}) unavailable for external networks'
|
||||
)
|
||||
description = format_lazy(template, name=port, details=details)
|
||||
|
||||
results.append(DiagnosticCheck(check_id, description, result))
|
||||
|
||||
return results
|
||||
|
||||
@ -8,6 +8,7 @@ from unittest.mock import call, patch
|
||||
import pytest
|
||||
|
||||
from plinth.app import App
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
|
||||
from plinth.modules.firewall.components import (Firewall,
|
||||
FirewallLocalProtection)
|
||||
|
||||
@ -153,34 +154,46 @@ def test_diagnose(get_enabled_services, get_port_details):
|
||||
firewall = Firewall('test-firewall-1', ports=['test-port1', 'test-port2'],
|
||||
is_external=False)
|
||||
results = firewall.diagnose()
|
||||
assert (results[0].description == 'Port test-port1 (1234/tcp, 1234/udp) '
|
||||
'available for internal networks')
|
||||
assert results[0].result == 'passed'
|
||||
assert (results[1].description == 'Port test-port1 (1234/tcp, 1234/udp) '
|
||||
'unavailable for external networks')
|
||||
assert results[1].result == 'passed'
|
||||
assert (results[2].description == 'Port test-port2 (2345/udp) available '
|
||||
'for internal networks')
|
||||
assert results[2].result == 'failed'
|
||||
assert (results[3].description == 'Port test-port2 (2345/udp) unavailable '
|
||||
'for external networks')
|
||||
assert results[3].result == 'failed'
|
||||
assert results == [
|
||||
DiagnosticCheck(
|
||||
'firewall-port-internal-test-port1',
|
||||
'Port test-port1 (1234/tcp, 1234/udp) available for internal '
|
||||
'networks', Result.PASSED),
|
||||
DiagnosticCheck(
|
||||
'firewall-port-external-unavailable-test-port1',
|
||||
'Port test-port1 (1234/tcp, 1234/udp) unavailable for external '
|
||||
'networks', Result.PASSED),
|
||||
DiagnosticCheck(
|
||||
'firewall-port-internal-test-port2',
|
||||
'Port test-port2 (2345/udp) available for internal networks',
|
||||
Result.FAILED),
|
||||
DiagnosticCheck(
|
||||
'firewall-port-external-unavailable-test-port2',
|
||||
'Port test-port2 (2345/udp) unavailable for external networks',
|
||||
Result.FAILED),
|
||||
]
|
||||
|
||||
firewall = Firewall('test-firewall-1', ports=['test-port3', 'test-port4'],
|
||||
is_external=True)
|
||||
results = firewall.diagnose()
|
||||
assert (results[0].description == 'Port test-port3 (3456/tcp) available '
|
||||
'for internal networks')
|
||||
assert results[0].result == 'passed'
|
||||
assert (results[1].description == 'Port test-port3 (3456/tcp) available '
|
||||
'for external networks')
|
||||
assert results[1].result == 'passed'
|
||||
assert (results[2].description == 'Port test-port4 (4567/udp) available '
|
||||
'for internal networks')
|
||||
assert results[2].result == 'failed'
|
||||
assert (results[3].description == 'Port test-port4 (4567/udp) available '
|
||||
'for external networks')
|
||||
assert results[3].result == 'failed'
|
||||
assert results == [
|
||||
DiagnosticCheck(
|
||||
'firewall-port-internal-test-port3',
|
||||
'Port test-port3 (3456/tcp) available for internal networks',
|
||||
Result.PASSED),
|
||||
DiagnosticCheck(
|
||||
'firewall-port-external-available-test-port3',
|
||||
'Port test-port3 (3456/tcp) available for external networks',
|
||||
Result.PASSED),
|
||||
DiagnosticCheck(
|
||||
'firewall-port-internal-test-port4',
|
||||
'Port test-port4 (4567/udp) available for internal networks',
|
||||
Result.FAILED),
|
||||
DiagnosticCheck(
|
||||
'firewall-port-external-available-test-port4',
|
||||
'Port test-port4 (4567/udp) available for external networks',
|
||||
Result.FAILED),
|
||||
]
|
||||
|
||||
|
||||
def test_local_protection_init():
|
||||
|
||||
@ -111,6 +111,7 @@ def diagnose_url_with_proxy():
|
||||
env = {'https_proxy': proxy}
|
||||
|
||||
result = diagnose_url(url, kind=address['kind'], env=env)
|
||||
result.check_id = f'privoxy-url-proxy-kind-{url}-{address["kind"]}'
|
||||
result.description = _(
|
||||
'Access {url} with proxy {proxy} on tcp{kind}') \
|
||||
.format(url=url, proxy=proxy, kind=address['kind'])
|
||||
|
||||
@ -135,6 +135,7 @@ class TorProxyApp(app_module.App):
|
||||
def _diagnose_url_via_tor(url, kind=None):
|
||||
"""Diagnose whether a URL is reachable via Tor."""
|
||||
result = diagnose_url(url, kind=kind, wrapper='torsocks')
|
||||
result.check_id = 'torproxy-url'
|
||||
result.description = _('Access URL {url} on tcp{kind} via Tor') \
|
||||
.format(url=url, kind=kind)
|
||||
|
||||
@ -146,6 +147,7 @@ def _diagnose_tor_use(url, kind=None):
|
||||
expected_output = 'Congratulations. This browser is configured to use Tor.'
|
||||
result = diagnose_url(url, kind=kind, wrapper='torsocks',
|
||||
expected_output=expected_output)
|
||||
result.check_id = 'torproxy-using-tor'
|
||||
result.description = _('Confirm Tor usage at {url} on tcp{kind}') \
|
||||
.format(url=url, kind=kind)
|
||||
|
||||
|
||||
@ -216,8 +216,10 @@ def _is_release_file_available(protocol: str, dist: str,
|
||||
if backports:
|
||||
dist += '-backports'
|
||||
|
||||
result = check_url(RELEASE_FILE_URL.format(dist), wrapper=wrapper)
|
||||
return result == 'passed'
|
||||
try:
|
||||
return check_url(RELEASE_FILE_URL.format(dist), wrapper=wrapper)
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
|
||||
|
||||
def _add_backports_sources(sources_list: str, protocol: str, dist: str):
|
||||
|
||||
@ -210,7 +210,7 @@ class Packages(app_module.FollowerComponent):
|
||||
try:
|
||||
package_name = package_expression.actual()
|
||||
except MissingPackageError:
|
||||
check_id = f'package-{package_expression}'
|
||||
check_id = f'package-available-{package_expression}'
|
||||
description = _(
|
||||
'Package {expression} is not available for '
|
||||
'install').format(expression=package_expression)
|
||||
@ -226,7 +226,7 @@ class Packages(app_module.FollowerComponent):
|
||||
if package.candidate.is_installed:
|
||||
result = Result.PASSED
|
||||
|
||||
check_id = f'package-{package_name}'
|
||||
check_id = f'package-latest-{package_name}'
|
||||
description = _('Package {package_name} is the latest version '
|
||||
'({latest_version})').format(
|
||||
package_name=package_name,
|
||||
|
||||
@ -249,12 +249,12 @@ def test_app_set_enabled(app_with_components):
|
||||
def test_app_diagnose(app_with_components):
|
||||
"""Test running diagnostics on an app."""
|
||||
results = app_with_components.diagnose()
|
||||
assert results[0].check_id == 'test-result-test-leader-1'
|
||||
assert results[0].description == 'test-result-test-leader-1'
|
||||
assert results[0].result == Result.PASSED
|
||||
assert results[1].check_id == 'test-result-test-leader-2'
|
||||
assert results[1].description == 'test-result-test-leader-2'
|
||||
assert results[1].result == Result.PASSED
|
||||
assert results == [
|
||||
DiagnosticCheck('test-result-test-leader-1',
|
||||
'test-result-test-leader-1', Result.PASSED),
|
||||
DiagnosticCheck('test-result-test-leader-2',
|
||||
'test-result-test-leader-2', Result.PASSED),
|
||||
]
|
||||
|
||||
|
||||
def test_app_has_diagnostics(app_with_components):
|
||||
|
||||
@ -9,6 +9,7 @@ import pytest
|
||||
|
||||
from plinth.app import App
|
||||
from plinth.config import DropinConfigs
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
|
||||
|
||||
pytestmark = pytest.mark.usefixtures('mock_privileged')
|
||||
privileged_modules_to_mock = ['plinth.privileged.config']
|
||||
@ -160,8 +161,16 @@ def test_dropin_config_diagnose_symlinks(dropin_configs, tmp_path):
|
||||
with patch('plinth.config.DropinConfigs.ROOT', new=tmp_path):
|
||||
# Nothing exists
|
||||
results = dropin_configs.diagnose()
|
||||
assert results[0].result == 'failed'
|
||||
assert results[1].result == 'failed'
|
||||
assert results == [
|
||||
DiagnosticCheck(
|
||||
f'dropin-config-{tmp_path}/etc/test/path1',
|
||||
f'Static configuration {tmp_path}/etc/test/path1 is setup '
|
||||
'properly', Result.FAILED),
|
||||
DiagnosticCheck(
|
||||
f'dropin-config-{tmp_path}/etc/path2',
|
||||
f'Static configuration {tmp_path}/etc/path2 is setup properly',
|
||||
Result.FAILED),
|
||||
]
|
||||
|
||||
# Proper symlinks exist
|
||||
dropin_configs.enable()
|
||||
|
||||
@ -12,7 +12,7 @@ import pytest
|
||||
from plinth.app import App, FollowerComponent, Info
|
||||
from plinth.daemon import (Daemon, RelatedDaemon, app_is_running,
|
||||
diagnose_netcat, diagnose_port_listening)
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
|
||||
|
||||
privileged_modules_to_mock = ['plinth.privileged.service']
|
||||
|
||||
@ -142,25 +142,27 @@ def test_diagnose(port_listening, service_is_running, daemon):
|
||||
|
||||
def side_effect(port, kind):
|
||||
name = f'test-result-{port}-{kind}'
|
||||
return DiagnosticCheck(name, name, 'passed')
|
||||
return DiagnosticCheck(name, name, Result.PASSED)
|
||||
|
||||
daemon = Daemon('test-daemon', 'test-unit', listen_ports=[(8273, 'tcp4'),
|
||||
(345, 'udp')])
|
||||
port_listening.side_effect = side_effect
|
||||
service_is_running.return_value = True
|
||||
results = daemon.diagnose()
|
||||
assert results[0].description == 'Service test-unit is running'
|
||||
assert results[0].result == 'passed'
|
||||
assert results[1].description == 'test-result-8273-tcp4'
|
||||
assert results[1].result == 'passed'
|
||||
assert results[2].description == 'test-result-345-udp'
|
||||
assert results[2].result == 'passed'
|
||||
assert results == [
|
||||
DiagnosticCheck('daemon-running-test-unit',
|
||||
'Service test-unit is running', Result.PASSED),
|
||||
DiagnosticCheck('test-result-8273-tcp4', 'test-result-8273-tcp4',
|
||||
Result.PASSED),
|
||||
DiagnosticCheck('test-result-345-udp', 'test-result-345-udp',
|
||||
Result.PASSED)
|
||||
]
|
||||
port_listening.assert_has_calls([call(8273, 'tcp4'), call(345, 'udp')])
|
||||
service_is_running.assert_has_calls([call('test-unit')])
|
||||
|
||||
service_is_running.return_value = False
|
||||
results = daemon.diagnose()
|
||||
assert results[0].result == 'failed'
|
||||
assert results[0].result == Result.FAILED
|
||||
|
||||
|
||||
@patch('plinth.action_utils.service_is_running')
|
||||
@ -212,19 +214,23 @@ def test_diagnose_port_listening(connections):
|
||||
|
||||
# Check that message is correct
|
||||
results = diagnose_port_listening(1234)
|
||||
assert results.description == 'Listening on tcp port 1234'
|
||||
assert results.result == 'passed'
|
||||
assert results == DiagnosticCheck('daemon-listening-tcp-1234',
|
||||
'Listening on tcp port 1234',
|
||||
Result.PASSED)
|
||||
results = diagnose_port_listening(1234, 'tcp', '0.0.0.0')
|
||||
assert results.description == 'Listening on tcp port 0.0.0.0:1234'
|
||||
assert results.result == 'passed'
|
||||
assert results == DiagnosticCheck(
|
||||
'daemon-listening-address-tcp-1234-0.0.0.0',
|
||||
'Listening on tcp port 0.0.0.0:1234', Result.PASSED)
|
||||
|
||||
# Failed results
|
||||
results = diagnose_port_listening(4321)
|
||||
assert results.description == 'Listening on tcp port 4321'
|
||||
assert results.result == 'failed'
|
||||
assert results == DiagnosticCheck('daemon-listening-tcp-4321',
|
||||
'Listening on tcp port 4321',
|
||||
Result.FAILED)
|
||||
results = diagnose_port_listening(4321, 'tcp', '0.0.0.0')
|
||||
assert results.description == 'Listening on tcp port 0.0.0.0:4321'
|
||||
assert results.result == 'failed'
|
||||
assert results == DiagnosticCheck(
|
||||
'daemon-listening-address-tcp-4321-0.0.0.0',
|
||||
'Listening on tcp port 0.0.0.0:4321', Result.FAILED)
|
||||
|
||||
# Check if psutil call is being made with right argument
|
||||
results = diagnose_port_listening(1234, 'tcp')
|
||||
@ -241,26 +247,30 @@ def test_diagnose_port_listening(connections):
|
||||
connections.assert_called_with('udp6')
|
||||
|
||||
# TCP
|
||||
assert diagnose_port_listening(1234).result == 'passed'
|
||||
assert diagnose_port_listening(1000).result == 'failed'
|
||||
assert diagnose_port_listening(2345).result == 'failed'
|
||||
assert diagnose_port_listening(1234, 'tcp', '0.0.0.0').result == 'passed'
|
||||
assert diagnose_port_listening(1234, 'tcp', '1.1.1.1').result == 'failed'
|
||||
assert diagnose_port_listening(1234, 'tcp6').result == 'passed'
|
||||
assert diagnose_port_listening(1234, 'tcp4').result == 'passed'
|
||||
assert diagnose_port_listening(6789, 'tcp4').result == 'passed'
|
||||
assert diagnose_port_listening(5678, 'tcp4').result == 'failed'
|
||||
assert diagnose_port_listening(1234).result == Result.PASSED
|
||||
assert diagnose_port_listening(1000).result == Result.FAILED
|
||||
assert diagnose_port_listening(2345).result == Result.FAILED
|
||||
assert diagnose_port_listening(1234, 'tcp',
|
||||
'0.0.0.0').result == Result.PASSED
|
||||
assert diagnose_port_listening(1234, 'tcp',
|
||||
'1.1.1.1').result == Result.FAILED
|
||||
assert diagnose_port_listening(1234, 'tcp6').result == Result.PASSED
|
||||
assert diagnose_port_listening(1234, 'tcp4').result == Result.PASSED
|
||||
assert diagnose_port_listening(6789, 'tcp4').result == Result.PASSED
|
||||
assert diagnose_port_listening(5678, 'tcp4').result == Result.FAILED
|
||||
|
||||
# UDP
|
||||
assert diagnose_port_listening(3456, 'udp').result == 'passed'
|
||||
assert diagnose_port_listening(3000, 'udp').result == 'failed'
|
||||
assert diagnose_port_listening(4567, 'udp').result == 'failed'
|
||||
assert diagnose_port_listening(3456, 'udp', '0.0.0.0').result == 'passed'
|
||||
assert diagnose_port_listening(3456, 'udp', '1.1.1.1').result == 'failed'
|
||||
assert diagnose_port_listening(3456, 'udp6').result == 'passed'
|
||||
assert diagnose_port_listening(3456, 'udp4').result == 'passed'
|
||||
assert diagnose_port_listening(6789, 'udp4').result == 'passed'
|
||||
assert diagnose_port_listening(5678, 'udp4').result == 'failed'
|
||||
assert diagnose_port_listening(3456, 'udp').result == Result.PASSED
|
||||
assert diagnose_port_listening(3000, 'udp').result == Result.FAILED
|
||||
assert diagnose_port_listening(4567, 'udp').result == Result.FAILED
|
||||
assert diagnose_port_listening(3456, 'udp',
|
||||
'0.0.0.0').result == Result.PASSED
|
||||
assert diagnose_port_listening(3456, 'udp',
|
||||
'1.1.1.1').result == Result.FAILED
|
||||
assert diagnose_port_listening(3456, 'udp6').result == Result.PASSED
|
||||
assert diagnose_port_listening(3456, 'udp4').result == Result.PASSED
|
||||
assert diagnose_port_listening(6789, 'udp4').result == Result.PASSED
|
||||
assert diagnose_port_listening(5678, 'udp4').result == Result.FAILED
|
||||
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
@ -268,25 +278,29 @@ def test_diagnose_netcat(popen):
|
||||
"""Test running diagnostic test using netcat."""
|
||||
popen().returncode = 0
|
||||
result = diagnose_netcat('test-host', 3300, input='test-input')
|
||||
assert result.description == 'Connect to test-host:3300'
|
||||
assert result.result == 'passed'
|
||||
assert result == DiagnosticCheck('daemon-netcat-test-host-3300',
|
||||
'Connect to test-host:3300',
|
||||
Result.PASSED)
|
||||
assert popen.mock_calls[1][1] == (['nc', 'test-host', '3300'], )
|
||||
assert popen.mock_calls[2] == call().communicate(input=b'test-input')
|
||||
|
||||
result = diagnose_netcat('test-host', 3300, input='test-input',
|
||||
negate=True)
|
||||
assert result.description == 'Cannot connect to test-host:3300'
|
||||
assert result.result == 'failed'
|
||||
assert result == DiagnosticCheck('daemon-netcat-negate-test-host-3300',
|
||||
'Cannot connect to test-host:3300',
|
||||
Result.FAILED)
|
||||
|
||||
popen().returncode = 1
|
||||
result = diagnose_netcat('test-host', 3300, input='test-input')
|
||||
assert result.description == 'Connect to test-host:3300'
|
||||
assert result.result == 'failed'
|
||||
assert result == DiagnosticCheck('daemon-netcat-test-host-3300',
|
||||
'Connect to test-host:3300',
|
||||
Result.FAILED)
|
||||
|
||||
result = diagnose_netcat('test-host', 3300, input='test-input',
|
||||
negate=True)
|
||||
assert result.description == 'Cannot connect to test-host:3300'
|
||||
assert result.result == 'passed'
|
||||
assert result == DiagnosticCheck('daemon-netcat-negate-test-host-3300',
|
||||
'Cannot connect to test-host:3300',
|
||||
Result.PASSED)
|
||||
|
||||
|
||||
def test_related_daemon_initialization():
|
||||
|
||||
@ -10,6 +10,7 @@ import pytest
|
||||
|
||||
from plinth.app import App
|
||||
from plinth.errors import MissingPackageError
|
||||
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
|
||||
from plinth.package import Package, Packages, packages_installed
|
||||
|
||||
|
||||
@ -238,16 +239,24 @@ def test_diagnose(cache):
|
||||
Package('package6') | Package('package7')
|
||||
])
|
||||
results = component.diagnose()
|
||||
assert 'not available for install' in results[0].description
|
||||
assert results[0].result == 'failed'
|
||||
assert '(2.0)' in results[1].description
|
||||
assert results[1].result == 'passed'
|
||||
assert '(3.0)' in results[2].description
|
||||
assert results[2].result == 'warning'
|
||||
assert 'not available for install' in results[3].description
|
||||
assert results[3].result == 'failed'
|
||||
assert '(4.0)' in results[4].description
|
||||
assert results[4].result == 'passed'
|
||||
assert results == [
|
||||
DiagnosticCheck('package-available-package1',
|
||||
'Package package1 is not available for install',
|
||||
Result.FAILED),
|
||||
DiagnosticCheck('package-latest-package2',
|
||||
'Package package2 is the latest version (2.0)',
|
||||
Result.PASSED),
|
||||
DiagnosticCheck('package-latest-package3',
|
||||
'Package package3 is the latest version (3.0)',
|
||||
Result.WARNING),
|
||||
DiagnosticCheck(
|
||||
'package-available-package4 | package5',
|
||||
'Package package4 | package5 is not available for install',
|
||||
Result.FAILED),
|
||||
DiagnosticCheck('package-latest-package7',
|
||||
'Package package7 is the latest version (4.0)',
|
||||
Result.PASSED),
|
||||
]
|
||||
|
||||
|
||||
@patch('plinth.package.packages_installed')
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user