email: implemented service alert

- Better error handling
- You may retroactively diagnose and fix problems
- Shows the most important issues first

audit.models.Diagnosis
  - Added JSON serialization (needed for IPC)
  - Replaced ambiguous vocabulary
  - Provided a sorting key

audit, diagnosis
  - Support title translation
This commit is contained in:
fliu 2021-08-20 00:44:27 +00:00 committed by Sunil Mohan Adapa
parent 74bf8dc4a8
commit f400eb9439
No known key found for this signature in database
GPG Key ID: 43EA1CFF0AA7C5F2
8 changed files with 263 additions and 43 deletions

View File

@ -6,8 +6,9 @@ Provides diagnosis and repair of email server configuration issues
from . import domain
from . import home
from . import ldap
from . import models
from . import rcube
from . import spam
from . import tls
__all__ = ['domain', 'home', 'ldap', 'rcube', 'spam', 'tls']
__all__ = ['domain', 'home', 'ldap', 'models', 'rcube', 'spam', 'tls']

View File

@ -4,6 +4,7 @@ configurations"""
import logging
from django.utils.translation import ugettext_lazy as _
from plinth import actions
import plinth.modules.email_server.postconf as postconf
@ -61,11 +62,15 @@ def get():
Recommended endpoint name:
GET /audit/ldap
"""
translation_table = [
(check_sasl, _('Postfix-Dovecot SASL integration')),
(check_alias_maps, _('Postfix alias maps')),
(check_local_recipient_maps, _('Postfix local recipient maps')),
]
results = []
with postconf.mutex.lock_all():
results.append(check_sasl())
results.append(check_alias_maps())
results.append(check_local_recipient_maps())
for check, title in translation_table:
results.append(check(title))
return results
@ -85,8 +90,8 @@ def action_set_up():
action_set_ulookup()
def check_sasl():
diagnosis = models.MainCfDiagnosis('Postfix-Dovecot SASL integration')
def check_sasl(title=''):
diagnosis = models.MainCfDiagnosis(title)
diagnosis.compare(default_config, postconf.get_many_unsafe)
return diagnosis
@ -109,9 +114,9 @@ def action_set_submission():
options=default_smtps_options)
def check_alias_maps():
def check_alias_maps(title=''):
"""Check the ability to mail to usernames and user aliases"""
diagnosis = models.MainCfDiagnosis('Postfix alias maps')
diagnosis = models.MainCfDiagnosis(title)
analysis = models.AliasMapsAnalysis()
analysis.parsed = postconf.parse_maps_by_key_unsafe('alias_maps')
@ -152,8 +157,8 @@ def rearrange_alias_maps(analysis):
return ' '.join(filter(None, analysis.parsed))
def check_local_recipient_maps():
diagnosis = models.MainCfDiagnosis('Postfix local recipient maps')
def check_local_recipient_maps(title=''):
diagnosis = models.MainCfDiagnosis(title)
lrcpt_maps = postconf.parse_maps_by_key_unsafe('local_recipient_maps')
list_modified = False

View File

@ -15,18 +15,47 @@ class UnresolvedIssueError(AssertionError):
class Diagnosis:
"""Records a diagnosis: what went wrong and how to fix them"""
def __init__(self, title):
def __init__(self, title='', action=''):
"""Class constructor"""
self.title = title
self.fails = []
self.action = action
self.critical = []
self.errors = []
def to_json(self):
"""Serialize object to JSON"""
return {
'class': self.__class__.__name__,
'title': self.title,
'action': self.action,
'errors': self.errors,
'critical': self.critical
}
@classmethod
def from_json(cls, valid_dict, translate=None):
"""Construct a Diagnosis instance from a valid JSON dictionary.
:type valid_dict: dict
:param valid_dict: a valid dictionary representation
:type translate: str -> Union[str, None]
:param translate: optional; if specified, should be a function that
accepts the title and returns a new title or None.
"""
title = valid_dict['title']
if translate:
title = translate(title) or title
result = cls(title, action=valid_dict['action'])
result.errors.extend(valid_dict['errors'])
result.critical.extend(valid_dict['critical'])
return result
def critical(self, message_fmt, *args):
"""Append a message to the fails list"""
"""Append a message to the critical errors list"""
if args:
self.fails.append(message_fmt % args)
self.critical.append(message_fmt % args)
else:
self.fails.append(message_fmt)
self.critical.append(message_fmt)
def error(self, message_fmt, *args):
"""Append a message to the errors list"""
@ -40,28 +69,37 @@ class Diagnosis:
if log:
self.write_logs()
if self.errors:
if self.critical:
return [self.title, 'error']
elif self.fails:
elif self.errors:
return [self.title, 'failed']
else:
return [self.title, 'passed']
@property
def has_failed(self):
"""True if the diagnosis has failed or contains an error"""
return (self.critical or self.errors)
def write_logs(self):
"""Log errors and failures"""
logger.debug('Ran audit: %s', self.title)
for message in self.errors:
for message in self.critical:
logger.critical(message)
for message in self.fails:
for message in self.errors:
logger.error(message)
def sorting_key(self):
"""The key function for list.sort"""
return (-len(self.critical), -len(self.errors), self.title)
class MainCfDiagnosis(Diagnosis):
"""Diagnosis for a set of main.cf configuration keys"""
def __init__(self, title):
"""Class constructor"""
super().__init__(title)
def __init__(self, *args, **kwargs):
"""Class constructor. See :class:`.Diagnosis` for method signature"""
super().__init__(*args, **kwargs)
self.advice = {}
self.user = {}

View File

@ -27,18 +27,15 @@ rcube_mutex = Mutex('rcube-config')
def get():
translation_table = {
translation = {
'rc_installed': _('RoundCube availability'),
'rc_config_header': _('FreedomBox header in RoundCube config'),
'rc_config_header': _('RoundCube configured for FreedomBox email'),
}
output = actions.superuser_run('email_server', ['-i', 'rcube', 'check'])
results = json.loads(output)
for i in range(0, len(results)):
name = translation_table.get(results[i][0], results[i][0])
diagnosis = models.Diagnosis(name)
if results[i][1] == 'error':
diagnosis.error('Failed')
results[i] = diagnosis
results[i] = models.Diagnosis.from_json(results[i], translation.get)
return results
@ -47,23 +44,35 @@ def repair():
actions.superuser_run('email_server', ['-i', 'rcube', 'set_up'])
def repair_component(action):
action_to_services = {'set_up': []}
if action not in action_to_services:
return
actions.superuser_run('email_server', ['-i', 'rcube', action])
return action_to_services[action]
def action_check():
results = _action_check()
for i in range(0, len(results)):
results[i] = results[i].to_json()
print(json.dumps(results))
def _action_check():
results = []
if not os.path.exists(config_path):
results.append(['rc_installed', 'error'])
diagnosis = models.Diagnosis('rc_installed')
diagnosis.error('Config file was missing')
diagnosis.error('Check that RoundCube has been installed')
results.append(diagnosis)
return results
diagnosis = models.Diagnosis('rc_config_header', action='set_up')
injector = ConfigInjector(boundary_pattern, boundary_format)
if injector.has_header_line(config_path):
results.append(['rc_config_header', 'pass'])
else:
results.append(['rc_config_header', 'error'])
if not injector.has_header_line(config_path):
diagnosis.error('FreedomBox header line was missing')
results.append(diagnosis)
return results

View File

@ -6,6 +6,7 @@ import logging
import re
import subprocess
from django.utils.translation import ugettext_lazy as _
from plinth import actions
from . import models
@ -81,9 +82,13 @@ logger = logging.getLogger(__name__)
def get():
translation_table = [
(check_filter, _('Inbound and outbound mail filters')),
]
results = []
with postconf.mutex.lock_all():
results.append(check_filter())
for check, title in translation_table:
results.append(check(title))
return results
@ -91,8 +96,8 @@ def repair():
actions.superuser_run('email_server', ['-i', 'spam', 'set_filter'])
def check_filter():
diagnosis = models.MainCfDiagnosis('Inbound and outbound mail filters')
def check_filter(title=''):
diagnosis = models.MainCfDiagnosis(title)
diagnosis.compare(milter_config, postconf.get_many_unsafe)
return diagnosis

View File

@ -1,9 +1,12 @@
"""TLS configuration"""
# SPDX-License-Identifier: AGPL-3.0-or-later
import json
import logging
import os
import sys
from django.utils.translation import ugettext_lazy as _
from plinth import actions
from . import models
@ -67,17 +70,44 @@ logger = logging.getLogger(__name__)
def get():
results = []
with postconf.mutex.lock_all():
results.append(check_tls())
_get_regular_results(results)
_get_superuser_results(results)
return results
def _get_regular_results(results):
translation_table = [
(check_tls, _('Postfix TLS parameters')),
(check_postfix_cert_usage, _('Postfix uses a TLS certificate')),
]
with postconf.mutex.lock_all():
for check, title in translation_table:
results.append(check(title))
def _get_superuser_results(results):
translation = {
'cert_availability': _('Has a TLS certificate'),
}
dump = actions.superuser_run('email_server', ['-i', 'tls', 'check'])
for jmap in json.loads(dump):
results.append(models.Diagnosis.from_json(jmap, translation.get))
def repair():
actions.superuser_run('email_server', ['-i', 'tls', 'set_up'])
def check_tls():
diagnosis = models.MainCfDiagnosis('Postfix TLS')
def repair_component(action):
action_to_services = {'set_cert': ['dovecot', 'postfix']}
if action not in action_to_services: # action not allowed
return
actions.superuser_run('email_server', ['-i', 'tls', action])
return action_to_services[action]
def check_tls(title=''):
diagnosis = models.MainCfDiagnosis(title)
diagnosis.compare(postfix_config, postconf.get_many_unsafe)
return diagnosis
@ -102,7 +132,12 @@ def try_set_up_certificates():
def find_cert_folder() -> str:
directory = '/etc/letsencrypt/live'
domains_available = []
for item in os.listdir(directory):
try:
listdir_result = os.listdir(directory)
except OSError:
return ''
for item in listdir_result:
if item[0] != '.' and os.path.isdir(directory + '/' + item):
domains_available.append(item)
domains_available.sort()
@ -129,7 +164,41 @@ def write_dovecot_cert_config(cert, key):
fd.write(content)
def check_postfix_cert_usage(title=''):
prefix = '/etc/letsencrypt/live/'
diagnosis = models.Diagnosis(title, action='set_cert')
conf = postconf.get_many_unsafe(['smtpd_tls_cert_file',
'smtpd_tls_key_file'])
if not conf['smtpd_tls_cert_file'].startswith(prefix):
diagnosis.error("Cert file not in Let's Encrypt directory")
if not conf['smtpd_tls_key_file'].startswith(prefix):
diagnosis.error("Privkey file not in Let's Encrypt directory")
return diagnosis
def su_check_cert_availability(title=''):
diagnosis = models.Diagnosis(title)
if find_cert_folder() == '':
diagnosis.error("Could not find a Let's Encrypt certificate")
return diagnosis
def action_set_up():
with postconf.mutex.lock_all():
repair_tls(check_tls())
try_set_up_certificates()
def action_set_cert():
with postconf.mutex.lock_all():
try_set_up_certificates()
def action_check():
checks = ('cert_availability',)
results = []
for check_name in checks:
check_function = globals()['su_check_' + check_name]
results.append(check_function(check_name).to_json())
json.dump(results, sys.stdout, indent=0) # indent=0 adds a new line

View File

@ -14,5 +14,44 @@
{% trans "Visit Rspamd administration interface" %}
</a>
</p>
{% if related_diagnostics %}
<h3>{% trans "Service Alert" %}</h3>
<ul class="list-group">
{% for model in related_diagnostics %}
<li class="list-group-item clearfix">
<span>{{ model.title }}</span>
{% if model.critical %}
<span class="badge badge-danger">{% trans "error" %}</span>
{% elif model.errors %}
<span class="badge badge-warning">{% trans "failed" %}</span>
{% else %}
<span class="badge badge-success">{% trans "passed" %}</span>
{% endif %}
{% if model.has_failed and model.action %}
<form method="post" class="float-right"
action="{{ request.path }}" >
{% csrf_token %}
<button type="submit" class="btn btn-sm btn-outline-primary"
name="repair" value="{{ model.action }}">
{% trans "Repair" %}
</button>
</form>
{% endif %}
<ul>
{% for message in model.critical %}
<li>{{ message }}</li>
{% endfor %}
{% for message in model.errors %}
<li>{{ message }}</li>
{% endfor %}
</ul>
</li>
{% endfor %}
</ul>
{% endif %}
{{ block.super }}
{% endblock %}

View File

@ -3,9 +3,12 @@ import io
import itertools
import pwd
import plinth.actions
import plinth.utils
from django.core.exceptions import ValidationError
from django.http import HttpResponseBadRequest
from django.shortcuts import redirect
from django.utils.html import escape
from django.utils.translation import ugettext_lazy as _
from django.views.generic.base import TemplateView, View
@ -82,6 +85,57 @@ class EmailServerView(TabMixin, AppView):
"""Server configuration page"""
app_id = 'email_server'
template_name = 'email_server.html'
audit_modules = ('tls', 'rcube')
def get_context_data(self, *args, **kwargs):
dlist = []
for module_name in self.audit_modules:
self._get_audit_results(module_name, dlist)
dlist.sort(key=audit.models.Diagnosis.sorting_key)
context = super().get_context_data(*args, **kwargs)
context['related_diagnostics'] = dlist
return context
def _get_audit_results(self, module_name, dlist):
try:
results = getattr(audit, module_name).get()
except Exception as e:
title = _('Internal error in {0}').format('audit.' + module_name)
diagnosis = audit.models.Diagnosis(title)
diagnosis.critical(str(e))
diagnosis.critical(_('Check syslog for more information'))
results = [diagnosis]
for diagnosis in results:
if diagnosis.action:
diagnosis.action = '%s.%s' % (module_name, diagnosis.action)
if diagnosis.has_failed:
dlist.append(diagnosis)
def post(self, request):
repair_field = request.POST.get('repair')
module_name, sep, action_name = repair_field.partition('.')
if not sep or module_name not in self.audit_modules:
return HttpResponseBadRequest('Bad post data')
self._repair(module_name, action_name)
return redirect(request.path)
def _repair(self, module_name, action_name):
module = getattr(audit, module_name)
if not hasattr(module, 'repair_component'):
return
reload_list = []
try:
reload_list = module.repair_component(action_name)
except Exception:
pass
for service in reload_list:
# plinth.action_utils.service_reload(service)
plinth.actions.superuser_run('service', ['reload', service])
class MyMailView(TabMixin, TemplateView):