From e24a76e1b7ffb9ae27353a3a899c1bd495b97864 Mon Sep 17 00:00:00 2001 From: Sunil Mohan Adapa Date: Fri, 14 Jun 2019 12:07:00 -0700 Subject: [PATCH] letsencrypt: Introduce component for handling certificates Signed-off-by: Sunil Mohan Adapa Reviewed-by: Joseph Nuthalapati --- plinth/modules/letsencrypt/components.py | 400 ++++++++++++++++++ .../letsencrypt/tests/test_components.py | 383 +++++++++++++++++ 2 files changed, 783 insertions(+) create mode 100644 plinth/modules/letsencrypt/components.py create mode 100644 plinth/modules/letsencrypt/tests/test_components.py diff --git a/plinth/modules/letsencrypt/components.py b/plinth/modules/letsencrypt/components.py new file mode 100644 index 000000000..bad5bf902 --- /dev/null +++ b/plinth/modules/letsencrypt/components.py @@ -0,0 +1,400 @@ +# +# This file is part of FreedomBox. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +""" +App component for other apps to use handle Let's Encrypt certificates. +""" + +import json +import logging +import pathlib +import threading + +from plinth import actions, app + +logger = logging.getLogger(__name__) + + +class LetsEncrypt(app.FollowerComponent): + """Component to receive Let's Encrypt renewal hooks. + + Performs the following tasks: + + - Listen to certificate change events from the Let's Encrypt app. This + could be obtain, re-obtain, renewal, revoke and delete events. + + - Filter out events after comparing domains list in certificate events to + the list of domains the app (to which this component belongs) is + interested in. + + - If requested, copy certificates from /etc/letsencrypt directory to app's + directory with the needed permissions and ownership. + + - Query status of copied certificates. Status could be up-to-date, + self-signed, outdated (/etc/letsencrypt has more recent certificate), + expired or a test certificate. + + - Optionally restart a bunch of daemons after interesting certificate event + occurs. + + """ + + _all = {} + + def __init__(self, component_id, domains=None, daemons=None, + should_copy_certificates=False, private_key_path=None, + certificate_path=None, user_owner=None, group_owner=None, + managing_app=None): + """Initialize the Let's Encrypt component. + + component_id should be a unique ID across all components of an app and + across all components. + + domains is a list of domains that the app is configured for. Any + certificate operation that is not happening on these domains is ignored + by this component. + + domains can be the string '*' in which case it means that the app is + relevant for (or is interested in) all domains configured in the + system. + + domains can be a callable instead of a list. In this case, the callable + is called every time an operation needs to be checked against list of + domains for the app. The callable must return a list of domains that + the app is configured for. + + daemons is a list of systemd units that need to be restarted (actually + try-restarted, i.e., only if they are already running) when a relevant + certificate operation happens. + + should_copy_certificates indicates whether the app needs the Let's + Encrypt certificates in the directory /etc/letsencrypt to be copied to + a different location with different permissions to be able to use them. + If true, private_key_path, certificate_path, user_owner, group_owner + and managing_app properties must be specified. Otherwise, ValueError is + raised. + + private_key_path is the string containing the file path to which a + private key must be copied. It may contain the sub-string '{domain}' + which will be replaced with the name of the domain for which this + certificate is being copied. Any non-existing parent directories are + created with 'root:root' ownership and '0o755' file permissions. The + permissions for the file will be '0o600' and ownership will be as + specified in parameters user_owner and group_owner. If both + private_key_path and certificate_path refer to the same path, the + target file is created by appending the private key and certificate. + + certificate_path is the string containing the file path to which a + certificate must be copied. It may contain the sub-string '{domain}' + which will be replaced with the name of the domain for which this + certificate is being copied. Any non-existing parent directories are + created with 'root:root' ownership and '0o755' file permissions. The + permissions for the file will be '0o644' and ownership will be as + specified in parameters user_owner and group_owner. If both + private_key_path and certificate_path refer to the same path, the + target file is created by appending the private key and certificate. + + user_owner specifies the user who should own the copied certificates. + This is typically the unprivileged user account that runs the daemon. + + group_owner specifies the group who should own the copied certificates. + This is typically the unprivileged group account that runs the daemon. + + managing_app is the name of the app that contains this component. This + is used by the superuser action to verify that this app is allowed to + copy certificates to the target location. The app must then define a + property 'managing_paths' which contains a list of pathlib.Path() + objects to which the app is allowed to write certificates and other + files to. + + """ + if should_copy_certificates: + if (not private_key_path or not certificate_path or not user_owner + or not group_owner or not managing_app): + raise ValueError('Not enough values for copying certificates') + + super().__init__(component_id) + self._domains = domains + self.daemons = daemons + self.should_copy_certificates = should_copy_certificates + self.private_key_path = private_key_path + self.certificate_path = certificate_path + self.user_owner = user_owner + self.group_owner = group_owner + self.managing_app = managing_app + + self._all[component_id] = self + + @property + def domains(self): + """Return a list of domains this component's app is interested in.""" + if callable(self._domains): + return self._domains() + + return self._domains + + @classmethod + def list(cls): + """Return a list of all Let's Encrypt components.""" + return cls._all.values() + + def setup_certificates(self, app_domains=None): + """Setup app certificates for all interested domains. + + For every domain, a certificate is copied. If a valid certificate is + not available in Let's Encrypt, a self-signed snakeoil certificate is + used. Each daemon is restarted if it is running. + + app_domains is the list of domains for which certificates must be + copied. If it is not provided, the component's list of domains (which + may be acquired by a callable) is used. + + """ + if not app_domains: + app_domains = self.domains + + domains, status = self._get_letsencrypt_domains() + + if self.should_copy_certificates: + for domain in app_domains: + if domain in domains: + lineage = status['domains'][domain]['lineage'] + self._copy_letsencrypt_certificates([domain], lineage) + else: + self._copy_self_signed_certificates([domain]) + + for daemon in self.daemons: + actions.superuser_run('service', ['try-restart', daemon]) + + def get_status(self): + """Return the status of certificates for all interested domains. + + A dictionary is returned containing a key for each domain and the value + shall be a simple string with state of certificate for this app. + Possible values are 'outdated-copy', 'self-signed', 'unknown', 'valid', + 'revoked', 'expired' and 'test_cert'. + + """ + app_domains = self.domains + domains, le_status = self._get_letsencrypt_domains() + + final_status = {} + for domain in app_domains: + if domain in domains: + status = le_status['domains'][domain]['validity'] + if self.should_copy_certificates: + if not self._compare_certificate( + domain, le_status['domains'][domain]['lineage']): + status = 'outdated-copy' + else: + status = 'self-signed' + + final_status[domain] = status + + return final_status + + def on_certificate_obtained(self, domains, lineage): + """Handle event when a certificate is obtained. + + Filter any domains that are interesting for the app that this component + is part of. For all interesting domains, copy the certificates if + requested and then restart the daemons. + + """ + interested_domains = self._get_interested_domains(domains) + if not interested_domains: + return + + if self.should_copy_certificates: + self._copy_letsencrypt_certificates(interested_domains, lineage) + + for daemon in self.daemons: + actions.superuser_run('service', ['try-restart', daemon]) + + def on_certificate_renewed(self, domains, lineage): + """Handle event when a certificate is renewed. + + This is typically called when a Cron job triggers certbot renewal + process which in turn triggers configured renewal hooks after + successful renewal. It is also called when a valid certificate is + re-obtained. + + To trigger a renewal event for testing purposes, run `certbot renew + --force-renewal`. + + For now, performs the same operations as obtaining a fresh certificate. + + """ + return self.on_certificate_obtained(domains, lineage) + + def on_certificate_revoked(self, domains, lineage): + """Handle event when a certificate is revoked. + + Filter any domains that are interesting for the app that this component + is part of. For all interesting domains, copy self-signed snakeoil + certificates if requested and then restart the daemons. + + """ + interested_domains = self._get_interested_domains(domains) + if not interested_domains: + return + + if self.should_copy_certificates: + self._copy_self_signed_certificates(interested_domains) + + for daemon in self.daemons: + actions.superuser_run('service', ['try-restart', daemon]) + + def on_certificate_deleted(self, domains, lineage): + """Handle event when a certificate is deleted. + + For now, performs the same operations as obtaining a fresh certificate. + + """ + return self.on_certificate_revoked(domains, lineage) + + @staticmethod + def _get_letsencrypt_domains(): + """Return the list of domains with LE certificates.""" + from plinth.modules import letsencrypt + status = letsencrypt.get_status() + domains = { + domain + for domain, domain_status in status['domains'].items() + if domain_status + } + return domains, status + + def _get_interested_domains(self, domains): + """Return set of domains the component is interested in.""" + app_domains = self.domains + if not app_domains: + return set() + + if app_domains == '*': + return set(domains) + + return set(domains).intersection(set(app_domains)) + + def _copy_self_signed_certificates(self, domains): + """Copy a self-signed certificate for all domains. + + Copy Apache's snake-oil certificate into daemon's directory. The + self-signed certificate may not really work. It is merely to prevent + the server from failing to startup because the files are missing. + + """ + source_private_key_path = '/etc/ssl/private/ssl-cert-snakeoil.key' + source_certificate_path = '/etc/ssl/certs/ssl-cert-snakeoil.pem' + self._copy_certificates(domains, source_private_key_path, + source_certificate_path) + + def _copy_letsencrypt_certificates(self, domains, lineage): + """Copy a valid Let's Encrypt certificate for given domains.""" + source_private_key_path = pathlib.Path(lineage) / 'privkey.pem' + source_certificate_path = pathlib.Path(lineage) / 'fullchain.pem' + self._copy_certificates(domains, source_private_key_path, + source_certificate_path) + + def _copy_certificates(self, domains, source_private_key_path, + source_certificate_path): + """Copy certificate for all domains in the certificate. + + The list of domains passed should be subset of domains listed in the + certificate. + + """ + if '{domain}' in self.private_key_path or \ + '{domain}' in self.certificate_path: + for domain in domains: + private_key_path = self.private_key_path.format(domain=domain) + certificate_path = self.certificate_path.format(domain=domain) + self._copy_certificate(source_private_key_path, + source_certificate_path, + private_key_path, certificate_path) + else: + self._copy_certificate( + source_private_key_path, source_certificate_path, + self.private_key_path, self.certificate_path) + + def _copy_certificate(self, source_private_key_path, + source_certificate_path, private_key_path, + certificate_path): + """Copy certificate for a single domain.""" + actions.superuser_run('letsencrypt', [ + 'copy-certificate', '--managing-app', self.managing_app, + '--user-owner', self.user_owner, '--group-owner', self.group_owner, + '--source-private-key-path', + str(source_private_key_path), '--source-certificate-path', + str(source_certificate_path), '--private-key-path', + private_key_path, '--certificate-path', certificate_path + ]) + + def _compare_certificate(self, domain, lineage): + """Compare LE certificate with app certificate.""" + source_private_key_path = pathlib.Path(lineage) / 'privkey.pem' + source_certificate_path = pathlib.Path(lineage) / 'fullchain.pem' + private_key_path = self.private_key_path.format(domain=domain) + certificate_path = self.certificate_path.format(domain=domain) + output = actions.superuser_run('letsencrypt', [ + 'compare-certificate', '--managing-app', self.managing_app, + '--source-private-key-path', + str(source_private_key_path), '--source-certificate-path', + str(source_certificate_path), '--private-key-path', + private_key_path, '--certificate-path', certificate_path + ]) + return json.loads(output)['result'] + + +def on_certificate_event(event, domains, lineage): + """Start a new thread to handle a LE certificate event. + + Run in a new thread because: + + - We don't want to block the thread running Glib main loop when called from + dbus handler in case of renewal. + + - We don't want to delay the a certificate operation with copying + certificates to all apps. + + - We don't want to cause a page load error when restarting Apache due to + certificate changes. + + """ + threading.Thread(target=on_certificate_event_sync, args=(event, domains, + lineage)).start() + + +def on_certificate_event_sync(event, domains, lineage): + """Trigger certificate event hooks across all apps.""" + if isinstance(domains, str): + domains = domains.split() + + if not lineage: + # XXX: A better to find lineage is by reading the renewal/{domain}.conf + lineage = str(pathlib.Path('/etc/letsencrypt/live/') / domains[0]) + + assert event in ('obtained', 'renewed', 'revoked', 'deleted') + + for component in LetsEncrypt.list(): + logger.info('Handling certificate event for %s: %s, %s, %s', + component.component_id, event, domains, lineage) + try: + getattr(component, 'on_certificate_' + event)(domains, lineage) + except Exception as exception: + logger.exception( + 'Error executing certificate hook for %s: %s, %s, %s: %s', + component.component_id, event, domains, lineage, exception) diff --git a/plinth/modules/letsencrypt/tests/test_components.py b/plinth/modules/letsencrypt/tests/test_components.py new file mode 100644 index 000000000..0491d9903 --- /dev/null +++ b/plinth/modules/letsencrypt/tests/test_components.py @@ -0,0 +1,383 @@ +# +# This file is part of FreedomBox. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +""" +Test the Let's Encrypt component for managing certificates. +""" + +import json +from unittest.mock import call, patch + +import pytest + +from plinth.modules.letsencrypt.components import LetsEncrypt + + +@pytest.fixture(name='empty_letsencrypt_list', autouse=True) +def fixture_empty_letsencrypt_list(): + """Remove all entries from Let's Encrypt component list.""" + LetsEncrypt._all = {} + + +@pytest.fixture(name='component') +def fixture_component(): + """Create a new component for testing.""" + return LetsEncrypt( + 'test-component', domains=['valid.example', 'invalid.example'], + daemons=['test-daemon'], should_copy_certificates=True, + private_key_path='/etc/test-app/{domain}/private.path', + certificate_path='/etc/test-app/{domain}/certificate.path', + user_owner='test-user', group_owner='test-group', + managing_app='test-app') + + +@pytest.fixture(name='get_status') +def fixture_get_status(): + """Return patched letsencrypt.get_status() method.""" + domains = ['valid.example'] + with patch('plinth.modules.letsencrypt.get_status') as get_status: + get_status.return_value = { + 'domains': { + domain: { + 'lineage': '/etc/letsencrypt/live/' + domain, + 'validity': 'valid', + } + for domain in domains + } + } + yield get_status + + +@pytest.fixture(name='superuser_run') +def fixture_superuser_run(): + """Return patched plinth.actions.superuser_run() method.""" + with patch('plinth.actions.superuser_run') as superuser_run: + yield superuser_run + + +def test_init_without_arguments(): + """Test that component is initialized with defaults properly.""" + component = LetsEncrypt('test-component') + + assert component.component_id == 'test-component' + assert component.domains is None + assert component.daemons is None + assert not component.should_copy_certificates + assert component.private_key_path is None + assert component.certificate_path is None + assert component.user_owner is None + assert component.group_owner is None + assert component.managing_app is None + assert len(component._all) == 1 + assert component._all['test-component'] == component + + +def test_init(component): + """Test initializing the component.""" + assert component.domains == ['valid.example', 'invalid.example'] + assert component.daemons == ['test-daemon'] + assert component.should_copy_certificates + assert component.private_key_path == '/etc/test-app/{domain}/private.path' + assert component.certificate_path == \ + '/etc/test-app/{domain}/certificate.path' + assert component.user_owner == 'test-user' + assert component.group_owner == 'test-group' + assert component.managing_app == 'test-app' + + +def test_init_values(): + """Test initializing with invalid values.""" + properties = { + 'private_key_path': 'test-private-key-path', + 'certificate_path': 'test-certificate-path', + 'user_owner': 'test-user', + 'group_owner': 'test-group', + 'managing_app': 'test-app' + } + LetsEncrypt('test-component', should_copy_certificates=True, **properties) + for key in properties: + new_properties = dict(properties) + new_properties[key] = None + with pytest.raises(ValueError): + LetsEncrypt('test-component', should_copy_certificates=True, + **new_properties) + + +def test_domains(): + """Test getting domains.""" + component = LetsEncrypt('test-component', domains=lambda: ['test-domains']) + assert component.domains == ['test-domains'] + + +def test_list(): + """Test listing components.""" + component1 = LetsEncrypt('test-component1') + component2 = LetsEncrypt('test-component2') + assert set(LetsEncrypt.list()) == {component1, component2} + + +def _assert_copy_certificate_called(component, superuser_run, domains): + """Check that copy certificate calls have been made properly.""" + copy_calls = [ + mock_call for mock_call in superuser_run.mock_calls + if mock_call[1][0] == 'letsencrypt' + and mock_call[1][1][0] == 'copy-certificate' + ] + expected_calls = [] + for domain, domain_status in domains.items(): + if domain_status == 'valid': + source_private_key_path = \ + '/etc/letsencrypt/live/{}/privkey.pem'.format(domain) + source_certificate_path = \ + '/etc/letsencrypt/live/{}/fullchain.pem'.format(domain) + else: + source_private_key_path = '/etc/ssl/private/ssl-cert-snakeoil.key' + source_certificate_path = '/etc/ssl/certs/ssl-cert-snakeoil.pem' + + private_key_path = '/etc/test-app/{}/private.path'.format(domain) + certificate_path = '/etc/test-app/{}/certificate.path'.format(domain) + expected_call = call('letsencrypt', [ + 'copy-certificate', '--managing-app', component.managing_app, + '--user-owner', component.user_owner, '--group-owner', + component.group_owner, '--source-private-key-path', + str(source_private_key_path), '--source-certificate-path', + str(source_certificate_path), '--private-key-path', + private_key_path, '--certificate-path', certificate_path + ]) + expected_calls.append(expected_call) + + assert len(expected_calls) == len(copy_calls) + for expected_call in expected_calls: + print(expected_call) + print(copy_calls) + assert expected_call in copy_calls + + +def _assert_restarted_daemons(daemons, superuser_run): + """Check that a call has restarted the daemons of a component.""" + run_calls = [ + mock_call for mock_call in superuser_run.mock_calls + if mock_call[1][0] == 'service' + ] + expected_calls = [ + call('service', ['try-restart', daemon]) for daemon in daemons + ] + assert len(expected_calls) == len(run_calls) + for expected_call in expected_calls: + assert expected_call in run_calls + + +def test_setup_certificates(superuser_run, get_status, component): + """Test that initial copying of certs for an app works.""" + component.setup_certificates() + _assert_copy_certificate_called(component, superuser_run, { + 'valid.example': 'valid', + 'invalid.example': 'invalid' + }) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_setup_certificates_without_copy(superuser_run, get_status, component): + """Test that initial copying of certs for an app works.""" + component.should_copy_certificates = False + component.setup_certificates() + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_setup_certificates_with_app_domains(superuser_run, get_status, + component): + """Test that initial copying of certs for an app works.""" + component._domains = ['irrelevant1.example', 'irrelevant2.example'] + component.setup_certificates( + app_domains=['valid.example', 'invalid.example']) + _assert_copy_certificate_called(component, superuser_run, { + 'valid.example': 'valid', + 'invalid.example': 'invalid' + }) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def _assert_compare_certificate_called(component, superuser_run, domains): + """Check that compare certificate was called properly.""" + expected_calls = [] + for domain in domains: + source_private_key_path = \ + '/etc/letsencrypt/live/{}/privkey.pem'.format(domain) + source_certificate_path = \ + '/etc/letsencrypt/live/{}/fullchain.pem'.format(domain) + private_key_path = '/etc/test-app/{}/private.path'.format(domain) + certificate_path = '/etc/test-app/{}/certificate.path'.format(domain) + expected_call = call('letsencrypt', [ + 'compare-certificate', '--managing-app', component.managing_app, + '--source-private-key-path', + str(source_private_key_path), '--source-certificate-path', + str(source_certificate_path), '--private-key-path', + private_key_path, '--certificate-path', certificate_path + ]) + expected_calls.append(expected_call) + + superuser_run.assert_has_calls(expected_calls) + + +def test_get_status(component, superuser_run, get_status): + """Test that getting domain status works.""" + superuser_run.return_value = json.dumps({'result': True}) + assert component.get_status() == { + 'valid.example': 'valid', + 'invalid.example': 'self-signed' + } + _assert_compare_certificate_called(component, superuser_run, + ['valid.example']) + + +def test_get_status_outdate_copy(component, superuser_run, get_status): + """Test that getting domain status works with outdated copy.""" + superuser_run.return_value = json.dumps({'result': False}) + assert component.get_status() == { + 'valid.example': 'outdated-copy', + 'invalid.example': 'self-signed' + } + _assert_compare_certificate_called(component, superuser_run, + ['valid.example']) + + +def test_get_status_without_copy(component, get_status): + """Test that getting domain status works without copying.""" + component.should_copy_certificates = False + assert component.get_status() == { + 'valid.example': 'valid', + 'invalid.example': 'self-signed' + } + + +def test_on_certificate_obtained(superuser_run, component): + """Test that certificate obtained event handler works.""" + component.on_certificate_obtained(['valid.example', 'irrelevant.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, { + 'valid.example': 'valid', + }) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_on_certificate_obtained_with_all_domains(superuser_run, component): + """Test that certificate obtained event handler works for app with all domains.""" + component._domains = '*' + component.on_certificate_obtained(['valid.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, { + 'valid.example': 'valid', + }) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_on_certificate_obtained_irrelevant(superuser_run, component): + """Test that certificate obtained event handler works with irrelevant domain.""" + component.on_certificate_obtained( + ['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/') + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons([], superuser_run) + + +def test_on_certificate_obtained_without_copy(superuser_run, component): + """Test that certificate obtained event handler works without copying.""" + component.should_copy_certificates = False + component.on_certificate_obtained(['valid.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_on_certificate_renewed(superuser_run, component): + """Test that certificate renewed event handler works.""" + component.on_certificate_renewed(['valid.example', 'irrelevant.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, { + 'valid.example': 'valid', + }) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_on_certificate_renewed_irrelevant(superuser_run, component): + """Test that certificate renewed event handler works for irrelevant domains.""" + component.on_certificate_renewed( + ['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/') + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons([], superuser_run) + + +def test_on_certificate_renewed_without_copy(superuser_run, component): + """Test that certificate renewed event handler works without copying.""" + component.should_copy_certificates = False + component.on_certificate_renewed(['valid.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_on_certificate_revoked(superuser_run, component): + """Test that certificate revoked event handler works.""" + component.on_certificate_revoked(['valid.example', 'irrelevant.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, { + 'valid.example': 'invalid', + }) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_on_certificate_revoked_irrelevant(superuser_run, component): + """Test that certificate revoked event handler works for irrelevant domains.""" + component.on_certificate_revoked( + ['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/') + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons([], superuser_run) + + +def test_on_certificate_revoked_without_copy(superuser_run, component): + """Test that certificate revoked event handler works without copying.""" + component.should_copy_certificates = False + component.on_certificate_revoked(['valid.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_on_certificate_deleted(superuser_run, component): + """Test that certificate deleted event handler works.""" + component.on_certificate_deleted(['valid.example', 'irrelevant.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, { + 'valid.example': 'invalid', + }) + _assert_restarted_daemons(component.daemons, superuser_run) + + +def test_on_certificate_deleted_irrelevant(superuser_run, component): + """Test that certificate deleted event handler works for irrelevant domains.""" + component.on_certificate_deleted( + ['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/') + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons([], superuser_run) + + +def test_on_certificate_deleted_without_copy(superuser_run, component): + """Test that certificate deleted event handler works without copying.""" + component.should_copy_certificates = False + component.on_certificate_deleted(['valid.example'], + '/etc/letsencrypt/live/valid.example/') + _assert_copy_certificate_called(component, superuser_run, {}) + _assert_restarted_daemons(component.daemons, superuser_run)