diagnostics: Add DiagnosticCheck dataclass

- Set unique check_id for each diagnostic check.

- Result is a string-based enumeration. The default value (NOT_DONE) can be
  used for diagnostic checks that have not been completed yet.

- Result is StrEnum so that the return value of check_url can still be used
  directly as a diagnostic result.

Closes: #2375

Signed-off-by: James Valleroy <jvalleroy@mailbox.org>
Reviewed-by: Sunil Mohan Adapa <sunil@medhas.org>
This commit is contained in:
James Valleroy 2023-09-11 16:35:41 -04:00
parent d9491d5762
commit 79f36e6a0c
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
19 changed files with 284 additions and 183 deletions

View File

@ -102,6 +102,8 @@ class DropinConfigs(app_module.FollowerComponent):
def diagnose(self): def diagnose(self):
"""Check all links/copies and return generate diagnostic results.""" """Check all links/copies and return generate diagnostic results."""
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
results = [] results = []
for path in self.etc_paths: for path in self.etc_paths:
etc_path = self._get_etc_path(path) etc_path = self._get_etc_path(path)
@ -113,10 +115,12 @@ class DropinConfigs(app_module.FollowerComponent):
result = (etc_path.is_symlink() result = (etc_path.is_symlink()
and etc_path.readlink() == target) and etc_path.readlink() == target)
result_string = 'passed' if result else 'failed' check_id = f'config-{etc_path}'
result_string = Result.PASSED if result else Result.FAILED
template = _('Static configuration {etc_path} is setup properly') template = _('Static configuration {etc_path} is setup properly')
test_name = format_lazy(template, etc_path=str(etc_path)) description = format_lazy(template, etc_path=str(etc_path))
results.append([test_name, result_string]) results.append(
DiagnosticCheck(check_id, description, result_string))
return results return results

View File

@ -99,12 +99,15 @@ class Daemon(app.LeaderComponent):
def _diagnose_unit_is_running(self): def _diagnose_unit_is_running(self):
"""Check if a daemon is running.""" """Check if a daemon is running."""
result = 'passed' if self.is_running() else 'failed' from plinth.modules.diagnostics.check import DiagnosticCheck, Result
check_id = f'daemon-{self.unit}-running'
result = Result.PASSED if self.is_running() else Result.FAILED
template = gettext_lazy('Service {service_name} is running') template = gettext_lazy('Service {service_name} is running')
testname = format_lazy(template, service_name=self.unit) description = format_lazy(template, service_name=self.unit)
return [testname, result] return DiagnosticCheck(check_id, description, result)
class RelatedDaemon(app.FollowerComponent): class RelatedDaemon(app.FollowerComponent):
@ -151,18 +154,23 @@ def diagnose_port_listening(port, kind='tcp', listen_address=None):
information. information.
""" """
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
result = _check_port(port, kind, listen_address) result = _check_port(port, kind, listen_address)
if listen_address: if listen_address:
check_id = f'daemon-{kind}-{port}-{listen_address}'
template = gettext_lazy( template = gettext_lazy(
'Listening on {kind} port {listen_address}:{port}') 'Listening on {kind} port {listen_address}:{port}')
testname = format_lazy(template, kind=kind, description = format_lazy(template, kind=kind,
listen_address=listen_address, port=port) listen_address=listen_address, port=port)
else: else:
check_id = f'daemon-{kind}-{port}'
template = gettext_lazy('Listening on {kind} port {port}') template = gettext_lazy('Listening on {kind} port {port}')
testname = format_lazy(template, kind=kind, port=port) description = format_lazy(template, kind=kind, port=port)
return [testname, 'passed' if result else 'failed'] return DiagnosticCheck(check_id, description,
Result.PASSED if result else Result.FAILED)
def _check_port(port, kind='tcp', listen_address=None): def _check_port(port, kind='tcp', listen_address=None):
@ -211,6 +219,8 @@ def _check_port(port, kind='tcp', listen_address=None):
def diagnose_netcat(host, port, input='', negate=False): def diagnose_netcat(host, port, input='', negate=False):
"""Run a diagnostic using netcat.""" """Run a diagnostic using netcat."""
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
try: try:
process = subprocess.Popen(['nc', host, str(port)], process = subprocess.Popen(['nc', host, str(port)],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
@ -218,17 +228,22 @@ def diagnose_netcat(host, port, input='', negate=False):
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
process.communicate(input=input.encode()) process.communicate(input=input.encode())
if process.returncode != 0: if process.returncode != 0:
result = 'failed' result = Result.FAILED
if negate:
result = Result.PASSED
else: else:
result = 'passed' result = Result.PASSED
if negate:
result = Result.FAILED
if negate:
result = 'failed' if result == 'passed' else 'passed'
except Exception: except Exception:
result = 'failed' result = Result.FAILED
test = _('Connect to {host}:{port}') check_id = f'daemon-netcat-{host}-{port}'
description = _('Connect to {host}:{port}')
if negate: if negate:
test = _('Cannot connect to {host}:{port}') description = _('Cannot connect to {host}:{port}')
return [test.format(host=host, port=port), result] return DiagnosticCheck(check_id, description.format(host=host, port=port),
result)

View File

@ -8,6 +8,7 @@ from django.utils.text import format_lazy
from django.utils.translation import gettext_lazy from django.utils.translation import gettext_lazy
from plinth import action_utils, app from plinth import action_utils, app
from plinth.modules.diagnostics.check import DiagnosticCheck
from plinth.privileged import service as service_privileged from plinth.privileged import service as service_privileged
from . import privileged from . import privileged
@ -145,13 +146,15 @@ def diagnose_url(url, kind=None, env=None, check_certificate=True,
wrapper, expected_output) wrapper, expected_output)
if kind: if kind:
check_id = f'apache-url-{url}-{kind}'
template = gettext_lazy('Access URL {url} on tcp{kind}') template = gettext_lazy('Access URL {url} on tcp{kind}')
testname = format_lazy(template, url=url, kind=kind) description = format_lazy(template, url=url, kind=kind)
else: else:
check_id = f'apache-url-{url}'
template = gettext_lazy('Access URL {url}') template = gettext_lazy('Access URL {url}')
testname = format_lazy(template, url=url) description = format_lazy(template, url=url)
return [testname, result] return DiagnosticCheck(check_id, description, result)
def diagnose_url_on_all(url, expect_redirects=False, **kwargs): def diagnose_url_on_all(url, expect_redirects=False, **kwargs):

View File

@ -243,11 +243,13 @@ def test_diagnose_url(get_addresses, check):
} }
check.return_value = 'passed' check.return_value = 'passed'
result = diagnose_url(**args) result = diagnose_url(**args)
assert result == ['Access URL https://localhost/test on tcp4', 'passed'] assert result.description == 'Access URL https://localhost/test on tcp4'
assert result.result == 'passed'
check.return_value = 'failed' check.return_value = 'failed'
result = diagnose_url(**args) result = diagnose_url(**args)
assert result == ['Access URL https://localhost/test on tcp4', 'failed'] assert result.description == 'Access URL https://localhost/test on tcp4'
assert result.result == 'failed'
del args['kind'] del args['kind']
args['url'] = 'https://{host}/test' args['url'] = 'https://{host}/test'
@ -264,10 +266,12 @@ def test_diagnose_url(get_addresses, check):
'url_address': 'test-host-2' 'url_address': 'test-host-2'
}] }]
result = diagnose_url_on_all(**args) result = diagnose_url_on_all(**args)
assert result == [ assert result[
['Access URL https://test-host-1/test on tcp4', 'passed'], 0].description == 'Access URL https://test-host-1/test on tcp4'
['Access URL https://test-host-2/test on tcp6', 'passed'], assert result[0].result == 'passed'
] assert result[
1].description == 'Access URL https://test-host-2/test on tcp6'
assert result[1].result == 'passed'
@patch('subprocess.run') @patch('subprocess.run')

View File

@ -19,6 +19,7 @@ from plinth.modules.apache.components import diagnose_url_on_all
from plinth.modules.backups.components import BackupRestore from plinth.modules.backups.components import BackupRestore
from . import manifest from . import manifest
from .check import Result
_description = [ _description = [
_('The system diagnostic test will run a number of checks on your ' _('The system diagnostic test will run a number of checks on your '
@ -299,13 +300,14 @@ def _run_background_diagnostics():
for _app_id, app_data in results.items(): for _app_id, app_data in results.items():
if app_data['exception']: if app_data['exception']:
exception_count += 1 exception_count += 1
continue
for _test, result in app_data['diagnosis']: for check in app_data['diagnosis']:
if result == 'error': if check.result == Result.ERROR:
error_count += 1 error_count += 1
elif result == 'failed': elif check.result == Result.FAILED:
failure_count += 1 failure_count += 1
elif cfg.develop and result == 'warning': elif check.result == Result.WARNING:
warning_count += 1 warning_count += 1
notification_id = 'diagnostics-background' notification_id = 'diagnostics-background'

View File

@ -0,0 +1,28 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Diagnostic check data type
"""
from dataclasses import dataclass
from enum import StrEnum
class Result(StrEnum):
"""The result of a diagnostic check."""
NOT_DONE = 'not_done'
PASSED = 'passed'
WARNING = 'warning'
FAILED = 'failed'
ERROR = 'error'
# TODO: Add parameters for test.
# TODO: Description should not be translated until we need to display it.
@dataclass
class DiagnosticCheck:
"""A diagnostic check and optional result."""
check_id: str
description: str
result: Result = Result.NOT_DONE

View File

@ -13,18 +13,18 @@
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
{% for test, result in results %} {% for result in results %}
<tr> <tr>
<td>{{ test }}</td> <td>{{ result.description }}</td>
<td class="diagnostics-result"> <td class="diagnostics-result">
{% if result == 'passed' %} {% if result.result == 'passed' %}
<span class="badge badge-success">{% trans result %}</span> <span class="badge badge-success">{% trans result.result %}</span>
{% elif result == 'failed' %} {% elif result.result == 'failed' %}
<span class="badge badge-danger">{% trans result %}</span> <span class="badge badge-danger">{% trans result.result %}</span>
{% elif result == 'error' or result == 'warning' %} {% elif result.result == 'error' or result.result == 'warning' %}
<span class="badge badge-warning">{% trans result %}</span> <span class="badge badge-warning">{% trans result.result %}</span>
{% else %} {% else %}
{{ result }} {{ result.result }}
{% endif %} {% endif %}
</td> </td>
</tr> </tr>

View File

@ -11,6 +11,7 @@ from plinth import app as app_module
from plinth import cfg, menu from plinth import cfg, menu
from plinth.daemon import Daemon from plinth.daemon import Daemon
from plinth.modules.backups.components import BackupRestore from plinth.modules.backups.components import BackupRestore
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
from plinth.package import Packages, install from plinth.package import Packages, install
from plinth.utils import Version, format_lazy, import_from_gi from plinth.utils import Version, format_lazy, import_from_gi
@ -266,16 +267,20 @@ def remove_passthrough(ipv, *args):
def _diagnose_default_zone(config): def _diagnose_default_zone(config):
"""Diagnose whether the default zone is external.""" """Diagnose whether the default zone is external."""
testname = gettext('Default zone is external') check_id = 'firewall-default-zone'
result = 'passed' if config['default_zone'] == 'external' else 'failed' description = gettext('Default zone is external')
return [testname, result] result = Result.PASSED if config[
'default_zone'] == 'external' else Result.FAILED
return DiagnosticCheck(check_id, description, result)
def _diagnose_firewall_backend(config): def _diagnose_firewall_backend(config):
"""Diagnose whether the firewall backend is nftables.""" """Diagnose whether the firewall backend is nftables."""
testname = gettext('Firewall backend is nftables') check_id = 'firewall-backend'
result = 'passed' if config['backend'] == 'nftables' else 'failed' description = gettext('Firewall backend is nftables')
return [testname, result] result = Result.PASSED if config['backend'] == 'nftables' \
else Result.FAILED
return DiagnosticCheck(check_id, description, result)
def _diagnose_direct_passthroughs(config): def _diagnose_direct_passthroughs(config):
@ -284,6 +289,8 @@ def _diagnose_direct_passthroughs(config):
Currently, we just check that the number of passthroughs is at least 12, Currently, we just check that the number of passthroughs is at least 12,
which are the number that are added by firewall's setup. which are the number that are added by firewall's setup.
""" """
testname = gettext('Direct passthrough rules exist') check_id = 'firewall-direct-passthroughs'
result = 'passed' if len(config['passthroughs']) >= 12 else 'failed' description = gettext('Direct passthrough rules exist')
return [testname, result] result = Result.PASSED if len(
config['passthroughs']) >= 12 else Result.FAILED
return DiagnosticCheck(check_id, description, result)

View File

@ -12,6 +12,7 @@ from django.utils.translation import gettext_lazy as _
from plinth import app from plinth import app
from plinth.modules import firewall from plinth.modules import firewall
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -130,25 +131,29 @@ class Firewall(app.FollowerComponent):
for port_number, protocol in port_detail['details'])) for port_number, protocol in port_detail['details']))
# Internal zone # Internal zone
result = 'passed' if port in internal_ports else 'failed' check_id = f'firewall-port-{port}-internal'
result = Result.PASSED if port in internal_ports else Result.FAILED
template = _( template = _(
'Port {name} ({details}) available for internal networks') 'Port {name} ({details}) available for internal networks')
testname = format_lazy(template, name=port, details=details) description = format_lazy(template, name=port, details=details)
results.append([testname, result]) results.append(DiagnosticCheck(check_id, description, result))
# External zone # External zone
check_id = f'firewall-port-{port}-external'
if self.is_external: if self.is_external:
result = 'passed' if port in external_ports else 'failed' result = Result.PASSED \
if port in external_ports else Result.FAILED
template = _( template = _(
'Port {name} ({details}) available for external networks') 'Port {name} ({details}) available for external networks')
testname = format_lazy(template, name=port, details=details) description = format_lazy(template, name=port, details=details)
else: else:
result = 'passed' if port not in external_ports else 'failed' result = Result.PASSED \
if port not in external_ports else Result.FAILED
template = _( template = _(
'Port {name} ({details}) unavailable for external networks' 'Port {name} ({details}) unavailable for external networks'
) )
testname = format_lazy(template, name=port, details=details) description = format_lazy(template, name=port, details=details)
results.append([testname, result]) results.append(DiagnosticCheck(check_id, description, result))
return results return results

View File

@ -153,37 +153,34 @@ def test_diagnose(get_enabled_services, get_port_details):
firewall = Firewall('test-firewall-1', ports=['test-port1', 'test-port2'], firewall = Firewall('test-firewall-1', ports=['test-port1', 'test-port2'],
is_external=False) is_external=False)
results = firewall.diagnose() results = firewall.diagnose()
assert results == [ assert (results[0].description == 'Port test-port1 (1234/tcp, 1234/udp) '
[ 'available for internal networks')
'Port test-port1 (1234/tcp, 1234/udp) available for internal ' assert results[0].result == 'passed'
'networks', 'passed' assert (results[1].description == 'Port test-port1 (1234/tcp, 1234/udp) '
], 'unavailable for external networks')
[ assert results[1].result == 'passed'
'Port test-port1 (1234/tcp, 1234/udp) unavailable for external ' assert (results[2].description == 'Port test-port2 (2345/udp) available '
'networks', 'passed' 'for internal networks')
], assert results[2].result == 'failed'
[ assert (results[3].description == 'Port test-port2 (2345/udp) unavailable '
'Port test-port2 (2345/udp) available for internal networks', 'for external networks')
'failed' assert results[3].result == 'failed'
],
[
'Port test-port2 (2345/udp) unavailable for external networks',
'failed'
]
]
firewall = Firewall('test-firewall-1', ports=['test-port3', 'test-port4'], firewall = Firewall('test-firewall-1', ports=['test-port3', 'test-port4'],
is_external=True) is_external=True)
results = firewall.diagnose() results = firewall.diagnose()
assert results == [[ assert (results[0].description == 'Port test-port3 (3456/tcp) available '
'Port test-port3 (3456/tcp) available for internal networks', 'passed' 'for internal networks')
], [ assert results[0].result == 'passed'
'Port test-port3 (3456/tcp) available for external networks', 'passed' assert (results[1].description == 'Port test-port3 (3456/tcp) available '
], [ 'for external networks')
'Port test-port4 (4567/udp) available for internal networks', 'failed' assert results[1].result == 'passed'
], [ assert (results[2].description == 'Port test-port4 (4567/udp) available '
'Port test-port4 (4567/udp) available for external networks', 'failed' 'for internal networks')
]] assert results[2].result == 'failed'
assert (results[3].description == 'Port test-port4 (4567/udp) available '
'for external networks')
assert results[3].result == 'failed'
def test_local_protection_init(): def test_local_protection_init():

View File

@ -111,7 +111,8 @@ def diagnose_url_with_proxy():
env = {'https_proxy': proxy} env = {'https_proxy': proxy}
result = diagnose_url(url, kind=address['kind'], env=env) result = diagnose_url(url, kind=address['kind'], env=env)
result[0] = _('Access {url} with proxy {proxy} on tcp{kind}') \ result.description = _(
'Access {url} with proxy {proxy} on tcp{kind}') \
.format(url=url, proxy=proxy, kind=address['kind']) .format(url=url, proxy=proxy, kind=address['kind'])
results.append(result) results.append(result)

View File

@ -15,6 +15,7 @@ from plinth.daemon import (Daemon, app_is_running, diagnose_netcat,
from plinth.modules import torproxy from plinth.modules import torproxy
from plinth.modules.apache.components import Webserver from plinth.modules.apache.components import Webserver
from plinth.modules.backups.components import BackupRestore from plinth.modules.backups.components import BackupRestore
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
from plinth.modules.firewall.components import Firewall from plinth.modules.firewall.components import Firewall
from plinth.modules.names.components import DomainType from plinth.modules.names.components import DomainType
from plinth.modules.torproxy.utils import is_apt_transport_tor_enabled from plinth.modules.torproxy.utils import is_apt_transport_tor_enabled
@ -131,10 +132,10 @@ class TorApp(app_module.App):
ports = status['ports'] ports = status['ports']
if status['relay_enabled']: if status['relay_enabled']:
results.append([ results.append(
_('Tor relay port available'), DiagnosticCheck(
'passed' if 'orport' in ports else 'failed' 'tor-port-relay', _('Tor relay port available'),
]) Result.PASSED if 'orport' in ports else Result.FAILED))
if 'orport' in ports: if 'orport' in ports:
results.append( results.append(
diagnose_port_listening(int(ports['orport']), 'tcp4')) diagnose_port_listening(int(ports['orport']), 'tcp4'))
@ -142,20 +143,20 @@ class TorApp(app_module.App):
diagnose_port_listening(int(ports['orport']), 'tcp6')) diagnose_port_listening(int(ports['orport']), 'tcp6'))
if status['bridge_relay_enabled']: if status['bridge_relay_enabled']:
results.append([ results.append(
_('Obfs3 transport registered'), DiagnosticCheck(
'passed' if 'obfs3' in ports else 'failed' 'tor-port-obfs3', _('Obfs3 transport registered'),
]) Result.PASSED if 'obfs3' in ports else Result.FAILED))
if 'obfs3' in ports: if 'obfs3' in ports:
results.append( results.append(
diagnose_port_listening(int(ports['obfs3']), 'tcp4')) diagnose_port_listening(int(ports['obfs3']), 'tcp4'))
results.append( results.append(
diagnose_port_listening(int(ports['obfs3']), 'tcp6')) diagnose_port_listening(int(ports['obfs3']), 'tcp6'))
results.append([ results.append(
_('Obfs4 transport registered'), DiagnosticCheck(
'passed' if 'obfs4' in ports else 'failed' 'tor-port-obfs4', _('Obfs4 transport registered'),
]) Result.PASSED if 'obfs4' in ports else Result.FAILED))
if 'obfs4' in ports: if 'obfs4' in ports:
results.append( results.append(
diagnose_port_listening(int(ports['obfs4']), 'tcp4')) diagnose_port_listening(int(ports['obfs4']), 'tcp4'))
@ -164,10 +165,11 @@ class TorApp(app_module.App):
if status['hs_enabled']: if status['hs_enabled']:
hs_hostname = status['hs_hostname'].split('.onion')[0] hs_hostname = status['hs_hostname'].split('.onion')[0]
results.append([ results.append(
_('Onion service is version 3'), DiagnosticCheck(
'passed' if len(hs_hostname) == 56 else 'failed' 'tor-onion-version', _('Onion service is version 3'),
]) Result.PASSED
if len(hs_hostname) == 56 else Result.FAILED))
return results return results

View File

@ -135,7 +135,7 @@ class TorProxyApp(app_module.App):
def _diagnose_url_via_tor(url, kind=None): def _diagnose_url_via_tor(url, kind=None):
"""Diagnose whether a URL is reachable via Tor.""" """Diagnose whether a URL is reachable via Tor."""
result = diagnose_url(url, kind=kind, wrapper='torsocks') result = diagnose_url(url, kind=kind, wrapper='torsocks')
result[0] = _('Access URL {url} on tcp{kind} via Tor') \ result.description = _('Access URL {url} on tcp{kind} via Tor') \
.format(url=url, kind=kind) .format(url=url, kind=kind)
return result return result
@ -146,7 +146,7 @@ def _diagnose_tor_use(url, kind=None):
expected_output = 'Congratulations. This browser is configured to use Tor.' expected_output = 'Congratulations. This browser is configured to use Tor.'
result = diagnose_url(url, kind=kind, wrapper='torsocks', result = diagnose_url(url, kind=kind, wrapper='torsocks',
expected_output=expected_output) expected_output=expected_output)
result[0] = _('Confirm Tor usage at {url} on tcp{kind}') \ result.description = _('Confirm Tor usage at {url} on tcp{kind}') \
.format(url=url, kind=kind) .format(url=url, kind=kind)
return result return result

View File

@ -12,6 +12,7 @@ from plinth import app as app_module
from plinth import cfg, menu from plinth import cfg, menu
from plinth.config import DropinConfigs from plinth.config import DropinConfigs
from plinth.daemon import Daemon from plinth.daemon import Daemon
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
from plinth.package import Packages from plinth.package import Packages
from plinth.privileged import service as service_privileged from plinth.privileged import service as service_privileged
@ -112,32 +113,34 @@ class UsersApp(app_module.App):
def _diagnose_ldap_entry(search_item): def _diagnose_ldap_entry(search_item):
"""Diagnose that an LDAP entry exists.""" """Diagnose that an LDAP entry exists."""
result = 'failed' check_id = f'users-ldap-entry-{search_item}'
result = Result.FAILED
try: try:
subprocess.check_output( subprocess.check_output(
['ldapsearch', '-x', '-b', 'dc=thisbox', search_item]) ['ldapsearch', '-x', '-b', 'dc=thisbox', search_item])
result = 'passed' result = Result.PASSED
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
pass pass
template = _('Check LDAP entry "{search_item}"') template = _('Check LDAP entry "{search_item}"')
testname = format_lazy(template, search_item=search_item) description = format_lazy(template, search_item=search_item)
return [testname, result] return DiagnosticCheck(check_id, description, result)
def _diagnose_nslcd_config(config, key, value): def _diagnose_nslcd_config(config, key, value):
"""Diagnose that nslcd has a configuration.""" """Diagnose that nslcd has a configuration."""
check_id = f'users-nslcd-config-{key}'
try: try:
result = 'passed' if config[key] == value else 'failed' result = Result.PASSED if config[key] == value else Result.FAILED
except KeyError: except KeyError:
result = 'failed' result = Result.FAILED
template = _('Check nslcd config "{key} {value}"') template = _('Check nslcd config "{key} {value}"')
testname = format_lazy(template, key=key, value=value) description = format_lazy(template, key=key, value=value)
return [testname, result] return DiagnosticCheck(check_id, description, result)
def _diagnose_nsswitch_config(): def _diagnose_nsswitch_config():
@ -151,22 +154,23 @@ def _diagnose_nsswitch_config():
results = [] results = []
for database in ['passwd', 'group', 'shadow']: for database in ['passwd', 'group', 'shadow']:
result = 'failed' check_id = f'users-nsswitch-config-{database}'
result = Result.FAILED
for match in aug.match('database'): for match in aug.match('database'):
if aug.get(match) != database: if aug.get(match) != database:
continue continue
for service_match in aug.match(match + '/service'): for service_match in aug.match(match + '/service'):
if 'ldap' == aug.get(service_match): if 'ldap' == aug.get(service_match):
result = 'passed' result = Result.PASSED
break break
break break
template = _('Check nsswitch config "{database}"') template = _('Check nsswitch config "{database}"')
testname = format_lazy(template, database=database) description = format_lazy(template, database=database)
results.append([testname, result]) results.append(DiagnosticCheck(check_id, description, result))
return results return results

View File

@ -202,30 +202,36 @@ class Packages(app_module.FollowerComponent):
def diagnose(self): def diagnose(self):
"""Run diagnostics and return results.""" """Run diagnostics and return results."""
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
results = super().diagnose() results = super().diagnose()
cache = apt.Cache() cache = apt.Cache()
for package_expression in self.package_expressions: for package_expression in self.package_expressions:
try: try:
package_name = package_expression.actual() package_name = package_expression.actual()
except MissingPackageError: except MissingPackageError:
message = _('Package {expression} is not available for ' check_id = f'package-{package_expression}'
'install').format(expression=package_expression) description = _(
results.append([message, 'failed']) 'Package {expression} is not available for '
'install').format(expression=package_expression)
results.append(
DiagnosticCheck(check_id, description, Result.FAILED))
continue continue
result = 'warning' result = Result.WARNING
latest_version = '?' latest_version = '?'
if package_name in cache: if package_name in cache:
package = cache[package_name] package = cache[package_name]
latest_version = package.candidate.version latest_version = package.candidate.version
if package.candidate.is_installed: if package.candidate.is_installed:
result = 'passed' result = Result.PASSED
message = _('Package {package_name} is the latest version ' check_id = f'package-{package_name}'
'({latest_version})').format( description = _('Package {package_name} is the latest version '
package_name=package_name, '({latest_version})').format(
latest_version=latest_version) package_name=package_name,
results.append([message, result]) latest_version=latest_version)
results.append(DiagnosticCheck(check_id, description, result))
return results return results

View File

@ -10,6 +10,7 @@ import pytest
from plinth.app import (App, Component, EnableState, FollowerComponent, Info, from plinth.app import (App, Component, EnableState, FollowerComponent, Info,
LeaderComponent, apps_init) LeaderComponent, apps_init)
from plinth.modules.diagnostics.check import DiagnosticCheck, Result
# pylint: disable=protected-access # pylint: disable=protected-access
@ -35,7 +36,10 @@ class LeaderTest(FollowerComponent):
def diagnose(self): def diagnose(self):
"""Return diagnostic results.""" """Return diagnostic results."""
return [('test-result-' + self.component_id, 'success')] return [
DiagnosticCheck('test-result-' + self.component_id,
'test-result-' + self.component_id, Result.PASSED)
]
@pytest.fixture(name='app_with_components') @pytest.fixture(name='app_with_components')
@ -245,8 +249,12 @@ def test_app_set_enabled(app_with_components):
def test_app_diagnose(app_with_components): def test_app_diagnose(app_with_components):
"""Test running diagnostics on an app.""" """Test running diagnostics on an app."""
results = app_with_components.diagnose() results = app_with_components.diagnose()
assert results == [('test-result-test-leader-1', 'success'), assert results[0].check_id == 'test-result-test-leader-1'
('test-result-test-leader-2', 'success')] assert results[0].description == 'test-result-test-leader-1'
assert results[0].result == Result.PASSED
assert results[1].check_id == 'test-result-test-leader-2'
assert results[1].description == 'test-result-test-leader-2'
assert results[1].result == Result.PASSED
def test_app_has_diagnostics(app_with_components): def test_app_has_diagnostics(app_with_components):
@ -262,7 +270,9 @@ def test_app_has_diagnostics(app_with_components):
assert not app.has_diagnostics() assert not app.has_diagnostics()
# App with app-level diagnostics # App with app-level diagnostics
with patch.object(AppTest, 'diagnose', return_value=[('test1', 'passed')]): with patch.object(
AppTest, 'diagnose',
return_value=[DiagnosticCheck('test1', 'test1', Result.PASSED)]):
assert app.has_diagnostics() assert app.has_diagnostics()

View File

@ -160,32 +160,32 @@ def test_dropin_config_diagnose_symlinks(dropin_configs, tmp_path):
with patch('plinth.config.DropinConfigs.ROOT', new=tmp_path): with patch('plinth.config.DropinConfigs.ROOT', new=tmp_path):
# Nothing exists # Nothing exists
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'failed' assert results[0].result == 'failed'
assert results[1][1] == 'failed' assert results[1].result == 'failed'
# Proper symlinks exist # Proper symlinks exist
dropin_configs.enable() dropin_configs.enable()
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'passed' assert results[0].result == 'passed'
assert results[1][1] == 'passed' assert results[1].result == 'passed'
# A file exists instead of symlink # A file exists instead of symlink
dropin_configs.disable() dropin_configs.disable()
etc_path = DropinConfigs._get_etc_path('/etc/test/path1') etc_path = DropinConfigs._get_etc_path('/etc/test/path1')
etc_path.touch() etc_path.touch()
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'failed' assert results[0].result == 'failed'
# Symlink points to wrong location # Symlink points to wrong location
dropin_configs.disable() dropin_configs.disable()
etc_path.symlink_to('/blah') etc_path.symlink_to('/blah')
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'failed' assert results[0].result == 'failed'
# Symlink is recreated # Symlink is recreated
dropin_configs.enable() dropin_configs.enable()
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'passed' assert results[0].result == 'passed'
def test_dropin_config_diagnose_copy_only(dropin_configs, tmp_path): def test_dropin_config_diagnose_copy_only(dropin_configs, tmp_path):
@ -199,24 +199,24 @@ def test_dropin_config_diagnose_copy_only(dropin_configs, tmp_path):
# Nothing exists # Nothing exists
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'failed' assert results[0].result == 'failed'
assert results[1][1] == 'failed' assert results[1].result == 'failed'
# Proper copies exist # Proper copies exist
dropin_configs.enable() dropin_configs.enable()
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'passed' assert results[0].result == 'passed'
assert results[1][1] == 'passed' assert results[1].result == 'passed'
# A symlink exists instead of a copied file # A symlink exists instead of a copied file
dropin_configs.disable() dropin_configs.disable()
etc_path = DropinConfigs._get_etc_path('/etc/test/path1') etc_path = DropinConfigs._get_etc_path('/etc/test/path1')
etc_path.symlink_to('/blah') etc_path.symlink_to('/blah')
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'failed' assert results[0].result == 'failed'
# Copied file contains wrong contents # Copied file contains wrong contents
dropin_configs.disable() dropin_configs.disable()
etc_path.write_text('x-invalid-contents') etc_path.write_text('x-invalid-contents')
results = dropin_configs.diagnose() results = dropin_configs.diagnose()
assert results[0][1] == 'failed' assert results[0].result == 'failed'

View File

@ -12,6 +12,7 @@ import pytest
from plinth.app import App, FollowerComponent, Info from plinth.app import App, FollowerComponent, Info
from plinth.daemon import (Daemon, RelatedDaemon, app_is_running, from plinth.daemon import (Daemon, RelatedDaemon, app_is_running,
diagnose_netcat, diagnose_port_listening) diagnose_netcat, diagnose_port_listening)
from plinth.modules.diagnostics.check import DiagnosticCheck
privileged_modules_to_mock = ['plinth.privileged.service'] privileged_modules_to_mock = ['plinth.privileged.service']
@ -140,22 +141,26 @@ def test_diagnose(port_listening, service_is_running, daemon):
"""Test running diagnostics.""" """Test running diagnostics."""
def side_effect(port, kind): def side_effect(port, kind):
return [f'test-result-{port}-{kind}', 'passed'] name = f'test-result-{port}-{kind}'
return DiagnosticCheck(name, name, 'passed')
daemon = Daemon('test-daemon', 'test-unit', listen_ports=[(8273, 'tcp4'), daemon = Daemon('test-daemon', 'test-unit', listen_ports=[(8273, 'tcp4'),
(345, 'udp')]) (345, 'udp')])
port_listening.side_effect = side_effect port_listening.side_effect = side_effect
service_is_running.return_value = True service_is_running.return_value = True
results = daemon.diagnose() results = daemon.diagnose()
assert results == [['Service test-unit is running', 'passed'], assert results[0].description == 'Service test-unit is running'
['test-result-8273-tcp4', 'passed'], assert results[0].result == 'passed'
['test-result-345-udp', 'passed']] assert results[1].description == 'test-result-8273-tcp4'
assert results[1].result == 'passed'
assert results[2].description == 'test-result-345-udp'
assert results[2].result == 'passed'
port_listening.assert_has_calls([call(8273, 'tcp4'), call(345, 'udp')]) port_listening.assert_has_calls([call(8273, 'tcp4'), call(345, 'udp')])
service_is_running.assert_has_calls([call('test-unit')]) service_is_running.assert_has_calls([call('test-unit')])
service_is_running.return_value = False service_is_running.return_value = False
results = daemon.diagnose() results = daemon.diagnose()
assert results[0][1] == 'failed' assert results[0].result == 'failed'
@patch('plinth.action_utils.service_is_running') @patch('plinth.action_utils.service_is_running')
@ -207,15 +212,19 @@ def test_diagnose_port_listening(connections):
# Check that message is correct # Check that message is correct
results = diagnose_port_listening(1234) results = diagnose_port_listening(1234)
assert results == ['Listening on tcp port 1234', 'passed'] assert results.description == 'Listening on tcp port 1234'
assert results.result == 'passed'
results = diagnose_port_listening(1234, 'tcp', '0.0.0.0') results = diagnose_port_listening(1234, 'tcp', '0.0.0.0')
assert results == ['Listening on tcp port 0.0.0.0:1234', 'passed'] assert results.description == 'Listening on tcp port 0.0.0.0:1234'
assert results.result == 'passed'
# Failed results # Failed results
results = diagnose_port_listening(4321) results = diagnose_port_listening(4321)
assert results == ['Listening on tcp port 4321', 'failed'] assert results.description == 'Listening on tcp port 4321'
assert results.result == 'failed'
results = diagnose_port_listening(4321, 'tcp', '0.0.0.0') results = diagnose_port_listening(4321, 'tcp', '0.0.0.0')
assert results == ['Listening on tcp port 0.0.0.0:4321', 'failed'] assert results.description == 'Listening on tcp port 0.0.0.0:4321'
assert results.result == 'failed'
# Check if psutil call is being made with right argument # Check if psutil call is being made with right argument
results = diagnose_port_listening(1234, 'tcp') results = diagnose_port_listening(1234, 'tcp')
@ -232,26 +241,26 @@ def test_diagnose_port_listening(connections):
connections.assert_called_with('udp6') connections.assert_called_with('udp6')
# TCP # TCP
assert diagnose_port_listening(1234)[1] == 'passed' assert diagnose_port_listening(1234).result == 'passed'
assert diagnose_port_listening(1000)[1] == 'failed' assert diagnose_port_listening(1000).result == 'failed'
assert diagnose_port_listening(2345)[1] == 'failed' assert diagnose_port_listening(2345).result == 'failed'
assert diagnose_port_listening(1234, 'tcp', '0.0.0.0')[1] == 'passed' assert diagnose_port_listening(1234, 'tcp', '0.0.0.0').result == 'passed'
assert diagnose_port_listening(1234, 'tcp', '1.1.1.1')[1] == 'failed' assert diagnose_port_listening(1234, 'tcp', '1.1.1.1').result == 'failed'
assert diagnose_port_listening(1234, 'tcp6')[1] == 'passed' assert diagnose_port_listening(1234, 'tcp6').result == 'passed'
assert diagnose_port_listening(1234, 'tcp4')[1] == 'passed' assert diagnose_port_listening(1234, 'tcp4').result == 'passed'
assert diagnose_port_listening(6789, 'tcp4')[1] == 'passed' assert diagnose_port_listening(6789, 'tcp4').result == 'passed'
assert diagnose_port_listening(5678, 'tcp4')[1] == 'failed' assert diagnose_port_listening(5678, 'tcp4').result == 'failed'
# UDP # UDP
assert diagnose_port_listening(3456, 'udp')[1] == 'passed' assert diagnose_port_listening(3456, 'udp').result == 'passed'
assert diagnose_port_listening(3000, 'udp')[1] == 'failed' assert diagnose_port_listening(3000, 'udp').result == 'failed'
assert diagnose_port_listening(4567, 'udp')[1] == 'failed' assert diagnose_port_listening(4567, 'udp').result == 'failed'
assert diagnose_port_listening(3456, 'udp', '0.0.0.0')[1] == 'passed' assert diagnose_port_listening(3456, 'udp', '0.0.0.0').result == 'passed'
assert diagnose_port_listening(3456, 'udp', '1.1.1.1')[1] == 'failed' assert diagnose_port_listening(3456, 'udp', '1.1.1.1').result == 'failed'
assert diagnose_port_listening(3456, 'udp6')[1] == 'passed' assert diagnose_port_listening(3456, 'udp6').result == 'passed'
assert diagnose_port_listening(3456, 'udp4')[1] == 'passed' assert diagnose_port_listening(3456, 'udp4').result == 'passed'
assert diagnose_port_listening(6789, 'udp4')[1] == 'passed' assert diagnose_port_listening(6789, 'udp4').result == 'passed'
assert diagnose_port_listening(5678, 'udp4')[1] == 'failed' assert diagnose_port_listening(5678, 'udp4').result == 'failed'
@patch('subprocess.Popen') @patch('subprocess.Popen')
@ -259,21 +268,25 @@ def test_diagnose_netcat(popen):
"""Test running diagnostic test using netcat.""" """Test running diagnostic test using netcat."""
popen().returncode = 0 popen().returncode = 0
result = diagnose_netcat('test-host', 3300, input='test-input') result = diagnose_netcat('test-host', 3300, input='test-input')
assert result == ['Connect to test-host:3300', 'passed'] assert result.description == 'Connect to test-host:3300'
assert result.result == 'passed'
assert popen.mock_calls[1][1] == (['nc', 'test-host', '3300'], ) assert popen.mock_calls[1][1] == (['nc', 'test-host', '3300'], )
assert popen.mock_calls[2] == call().communicate(input=b'test-input') assert popen.mock_calls[2] == call().communicate(input=b'test-input')
result = diagnose_netcat('test-host', 3300, input='test-input', result = diagnose_netcat('test-host', 3300, input='test-input',
negate=True) negate=True)
assert result == ['Cannot connect to test-host:3300', 'failed'] assert result.description == 'Cannot connect to test-host:3300'
assert result.result == 'failed'
popen().returncode = 1 popen().returncode = 1
result = diagnose_netcat('test-host', 3300, input='test-input') result = diagnose_netcat('test-host', 3300, input='test-input')
assert result == ['Connect to test-host:3300', 'failed'] assert result.description == 'Connect to test-host:3300'
assert result.result == 'failed'
result = diagnose_netcat('test-host', 3300, input='test-input', result = diagnose_netcat('test-host', 3300, input='test-input',
negate=True) negate=True)
assert result == ['Cannot connect to test-host:3300', 'passed'] assert result.description == 'Cannot connect to test-host:3300'
assert result.result == 'passed'
def test_related_daemon_initialization(): def test_related_daemon_initialization():

View File

@ -238,16 +238,16 @@ def test_diagnose(cache):
Package('package6') | Package('package7') Package('package6') | Package('package7')
]) ])
results = component.diagnose() results = component.diagnose()
assert 'not available for install' in results[0][0] assert 'not available for install' in results[0].description
assert results[0][1] == 'failed' assert results[0].result == 'failed'
assert '(2.0)' in results[1][0] assert '(2.0)' in results[1].description
assert results[1][1] == 'passed' assert results[1].result == 'passed'
assert '(3.0)' in results[2][0] assert '(3.0)' in results[2].description
assert results[2][1] == 'warning' assert results[2].result == 'warning'
assert 'not available for install' in results[3][0] assert 'not available for install' in results[3].description
assert results[3][1] == 'failed' assert results[3].result == 'failed'
assert '(4.0)' in results[4][0] assert '(4.0)' in results[4].description
assert results[4][1] == 'passed' assert results[4].result == 'passed'
@patch('plinth.package.packages_installed') @patch('plinth.package.packages_installed')