letsencrypt: Allow reloading daemons after cert changes

- Instead of restarting them.

Tests:

- Changing a domain name leads restarting of services postfix/dovecot services.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
This commit is contained in:
Sunil Mohan Adapa 2024-09-16 12:02:35 -07:00 committed by Veiko Aasa
parent 92a61f422c
commit f8d2cc7b0d
No known key found for this signature in database
GPG Key ID: 478539CAE680674E
2 changed files with 132 additions and 96 deletions

View File

@ -44,7 +44,7 @@ class LetsEncrypt(app.FollowerComponent):
def __init__(self, component_id, domains=None, daemons=None,
should_copy_certificates=False, private_key_path=None,
certificate_path=None, user_owner=None, group_owner=None,
managing_app=None):
managing_app=None, reload_daemons=False):
"""Initialize the Let's Encrypt component.
component_id should be a unique ID across all components of an app and
@ -107,6 +107,8 @@ class LetsEncrypt(app.FollowerComponent):
objects to which the app is allowed to write certificates and other
files to.
reload_daemons is boolean to indicate if the daemons should be reloaded
instead of restarted (which is default).
"""
if should_copy_certificates:
if (not private_key_path or not certificate_path or not user_owner
@ -116,6 +118,7 @@ class LetsEncrypt(app.FollowerComponent):
super().__init__(component_id)
self._domains = domains
self.daemons = daemons
self.reload_daemons = reload_daemons
self.should_copy_certificates = should_copy_certificates
self.private_key_path = private_key_path
self.certificate_path = certificate_path
@ -170,7 +173,10 @@ class LetsEncrypt(app.FollowerComponent):
self._copy_self_signed_certificates([domain])
for daemon in self.daemons:
service_privileged.try_restart(daemon)
if self.reload_daemons:
service_privileged.try_reload_or_restart(daemon)
else:
service_privileged.try_restart(daemon)
def get_status(self):
"""Return the status of certificates for all interested domains.
@ -215,7 +221,10 @@ class LetsEncrypt(app.FollowerComponent):
self._copy_letsencrypt_certificates(interested_domains, lineage)
for daemon in self.daemons:
service_privileged.try_restart(daemon)
if self.reload_daemons:
service_privileged.try_reload_or_restart(daemon)
else:
service_privileged.try_restart(daemon)
def on_certificate_renewed(self, domains, lineage):
"""Handle event when a certificate is renewed.
@ -249,7 +258,10 @@ class LetsEncrypt(app.FollowerComponent):
self._copy_self_signed_certificates(interested_domains)
for daemon in self.daemons:
service_privileged.try_restart(daemon)
if self.reload_daemons:
service_privileged.try_reload_or_restart(daemon)
else:
service_privileged.try_restart(daemon)
def on_certificate_deleted(self, domains, lineage):
"""Handle event when a certificate is deleted.

View File

@ -3,6 +3,8 @@
Test the Let's Encrypt component for managing certificates.
"""
import contextlib
import random
from unittest.mock import call, patch
import pytest
@ -20,20 +22,16 @@ def fixture_empty_letsencrypt_list():
@pytest.fixture(name='component')
def fixture_component():
"""Create a new component for testing."""
return LetsEncrypt(
reload_daemons = random.choice([True, False])
component = LetsEncrypt(
'test-component', domains=['valid.example', 'invalid.example'],
daemons=['test-daemon'], should_copy_certificates=True,
private_key_path='/etc/test-app/{domain}/private.path',
certificate_path='/etc/test-app/{domain}/certificate.path',
user_owner='test-user', group_owner='test-group',
managing_app='test-app')
@pytest.fixture(name='try_restart')
def fixture_try_restart():
"""Patch and return service.try_restart privileged call."""
with patch('plinth.privileged.service.try_restart') as try_restart:
yield try_restart
managing_app='test-app', reload_daemons=reload_daemons)
assert component.reload_daemons == reload_daemons
return component
@pytest.fixture(name='copy_certificate')
@ -100,6 +98,7 @@ def test_init_without_arguments():
assert component.user_owner is None
assert component.group_owner is None
assert component.managing_app is None
assert not component.reload_daemons
assert len(component._all) == 1
assert component._all['test-component'] == component
@ -173,58 +172,74 @@ def _assert_copy_certificate_called(component, copy_certificate, domains):
copy_certificate.assert_has_calls(expected_calls, any_order=True)
def _assert_restarted_daemons(daemons, try_restart):
@contextlib.contextmanager
def _assert_restarted_daemons(component, daemons=None):
"""Check that a call has restarted the daemons of a component."""
daemons = daemons if daemons is not None else component.daemons
expected_calls = [call(daemon) for daemon in daemons]
try_restart.assert_has_calls(expected_calls, any_order=True)
with patch('plinth.privileged.service.try_reload_or_restart'
) as try_reload_or_restart, patch(
'plinth.privileged.service.try_restart') as try_restart:
yield
if component.reload_daemons:
try_reload_or_restart.assert_has_calls(expected_calls,
any_order=True)
try_restart.assert_not_called()
else:
try_restart.assert_has_calls(expected_calls, any_order=True)
try_reload_or_restart.assert_not_called()
def test_setup_certificates(copy_certificate, try_restart, get_status,
component):
def test_setup_certificates(copy_certificate, get_status, component):
"""Test that initial copying of certs for an app works."""
component.setup_certificates()
with _assert_restarted_daemons(component):
component.setup_certificates()
_assert_copy_certificate_called(component, copy_certificate, {
'valid.example': 'valid',
'invalid.example': 'invalid'
})
_assert_restarted_daemons(component.daemons, try_restart)
def test_setup_certificates_without_copy(copy_certificate, try_restart,
get_status, component):
def test_setup_certificates_without_copy(copy_certificate, get_status,
component):
"""Test that initial copying of certs for an app works."""
component.should_copy_certificates = False
component.setup_certificates()
with _assert_restarted_daemons(component):
component.setup_certificates()
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons(component.daemons, try_restart)
def test_setup_certificates_with_app_domains(copy_certificate, try_restart,
get_status, component):
def test_setup_certificates_with_app_domains(copy_certificate, get_status,
component):
"""Test that initial copying of certs for an app works."""
component._domains = ['irrelevant1.example', 'irrelevant2.example']
component.setup_certificates(
app_domains=['valid.example', 'invalid.example'])
with _assert_restarted_daemons(component):
component.setup_certificates(
app_domains=['valid.example', 'invalid.example'])
_assert_copy_certificate_called(component, copy_certificate, {
'valid.example': 'valid',
'invalid.example': 'invalid'
})
_assert_restarted_daemons(component.daemons, try_restart)
def test_setup_certificates_with_all_domains(domain_list, copy_certificate,
try_restart, get_status,
component):
get_status, component):
"""Test that initial copying for certs works when app domains is '*'."""
component._domains = '*'
component.setup_certificates()
with _assert_restarted_daemons(component):
component.setup_certificates()
_assert_copy_certificate_called(
component, copy_certificate, {
'valid.example': 'valid',
'invalid1.example': 'invalid',
'invalid2.example': 'invalid'
})
_assert_restarted_daemons(component.daemons, try_restart)
def _assert_compare_certificate_called(component, compare_certificate,
@ -278,139 +293,148 @@ def test_get_status_without_copy(component, get_status):
}
def test_on_certificate_obtained(copy_certificate, try_restart, component):
def test_on_certificate_obtained(copy_certificate, component):
"""Test that certificate obtained event handler works."""
component.on_certificate_obtained(['valid.example', 'irrelevant.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_obtained(
['valid.example', 'irrelevant.example'],
'/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {
'valid.example': 'valid',
})
_assert_restarted_daemons(component.daemons, try_restart)
def test_on_certificate_obtained_with_all_domains(copy_certificate,
try_restart, component):
def test_on_certificate_obtained_with_all_domains(copy_certificate, component):
"""Test that certificate obtained event handler works for app with
all domains.
"""
component._domains = '*'
component.on_certificate_obtained(['valid.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_obtained(
['valid.example'], '/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {
'valid.example': 'valid',
})
_assert_restarted_daemons(component.daemons, try_restart)
def test_on_certificate_obtained_irrelevant(copy_certificate, try_restart,
component):
def test_on_certificate_obtained_irrelevant(copy_certificate, component):
"""Test that certificate obtained event handler works with
irrelevant domain.
"""
component.on_certificate_obtained(
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
with _assert_restarted_daemons(component, []):
component.on_certificate_obtained(
['irrelevant.example'],
'/etc/letsencrypt/live/irrelevant.example/')
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons([], try_restart)
def test_on_certificate_obtained_without_copy(copy_certificate, try_restart,
component):
def test_on_certificate_obtained_without_copy(copy_certificate, component):
"""Test that certificate obtained event handler works without copying."""
component.should_copy_certificates = False
component.on_certificate_obtained(['valid.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_obtained(
['valid.example'], '/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons(component.daemons, try_restart)
def test_on_certificate_renewed(copy_certificate, try_restart, component):
def test_on_certificate_renewed(copy_certificate, component):
"""Test that certificate renewed event handler works."""
component.on_certificate_renewed(['valid.example', 'irrelevant.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_renewed(
['valid.example', 'irrelevant.example'],
'/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {
'valid.example': 'valid',
})
_assert_restarted_daemons(component.daemons, try_restart)
def test_on_certificate_renewed_irrelevant(copy_certificate, try_restart,
component):
"""Test that certificate renewed event handler works for
irrelevant domains.
"""
component.on_certificate_renewed(
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
def test_on_certificate_renewed_irrelevant(copy_certificate, component):
"""Test that cert renewed event handler works for irrelevant domains."""
with _assert_restarted_daemons(component, []):
component.on_certificate_renewed(
['irrelevant.example'],
'/etc/letsencrypt/live/irrelevant.example/')
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons([], try_restart)
def test_on_certificate_renewed_without_copy(copy_certificate, try_restart,
component):
def test_on_certificate_renewed_without_copy(copy_certificate, component):
"""Test that certificate renewed event handler works without copying."""
component.should_copy_certificates = False
component.on_certificate_renewed(['valid.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_renewed(
['valid.example'], '/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons(component.daemons, try_restart)
def test_on_certificate_revoked(copy_certificate, try_restart, component):
def test_on_certificate_revoked(copy_certificate, component):
"""Test that certificate revoked event handler works."""
component.on_certificate_revoked(['valid.example', 'irrelevant.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_revoked(
['valid.example', 'irrelevant.example'],
'/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {
'valid.example': 'invalid',
})
_assert_restarted_daemons(component.daemons, try_restart)
def test_on_certificate_revoked_irrelevant(copy_certificate, try_restart,
component):
def test_on_certificate_revoked_irrelevant(copy_certificate, component):
"""Test that certificate revoked event handler works for
irrelevant domains.
"""
component.on_certificate_revoked(
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
with _assert_restarted_daemons(component, []):
component.on_certificate_revoked(
['irrelevant.example'],
'/etc/letsencrypt/live/irrelevant.example/')
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons([], try_restart)
def test_on_certificate_revoked_without_copy(copy_certificate, try_restart,
component):
def test_on_certificate_revoked_without_copy(copy_certificate, component):
"""Test that certificate revoked event handler works without copying."""
component.should_copy_certificates = False
component.on_certificate_revoked(['valid.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_revoked(
['valid.example'], '/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons(component.daemons, try_restart)
def test_on_certificate_deleted(copy_certificate, try_restart, component):
def test_on_certificate_deleted(copy_certificate, component):
"""Test that certificate deleted event handler works."""
component.on_certificate_deleted(['valid.example', 'irrelevant.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_deleted(
['valid.example', 'irrelevant.example'],
'/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {
'valid.example': 'invalid',
})
_assert_restarted_daemons(component.daemons, try_restart)
def test_on_certificate_deleted_irrelevant(copy_certificate, try_restart,
component):
def test_on_certificate_deleted_irrelevant(copy_certificate, component):
"""Test that certificate deleted event handler works for
irrelevant domains.
"""
component.on_certificate_deleted(
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
with _assert_restarted_daemons(component, []):
component.on_certificate_deleted(
['irrelevant.example'],
'/etc/letsencrypt/live/irrelevant.example/')
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons([], try_restart)
def test_on_certificate_deleted_without_copy(copy_certificate, try_restart,
component):
def test_on_certificate_deleted_without_copy(copy_certificate, component):
"""Test that certificate deleted event handler works without copying."""
component.should_copy_certificates = False
component.on_certificate_deleted(['valid.example'],
'/etc/letsencrypt/live/valid.example/')
with _assert_restarted_daemons(component):
component.on_certificate_deleted(
['valid.example'], '/etc/letsencrypt/live/valid.example/')
_assert_copy_certificate_called(component, copy_certificate, {})
_assert_restarted_daemons(component.daemons, try_restart)