mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-28 08:03:36 +00:00
letsencrypt: Introduce component for handling certificates
Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: Joseph Nuthalapati <njoseph@thoughtworks.com>
This commit is contained in:
parent
da366636d8
commit
e24a76e1b7
400
plinth/modules/letsencrypt/components.py
Normal file
400
plinth/modules/letsencrypt/components.py
Normal file
@ -0,0 +1,400 @@
|
||||
#
|
||||
# This file is part of FreedomBox.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
"""
|
||||
App component for other apps to use handle Let's Encrypt certificates.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import pathlib
|
||||
import threading
|
||||
|
||||
from plinth import actions, app
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LetsEncrypt(app.FollowerComponent):
|
||||
"""Component to receive Let's Encrypt renewal hooks.
|
||||
|
||||
Performs the following tasks:
|
||||
|
||||
- Listen to certificate change events from the Let's Encrypt app. This
|
||||
could be obtain, re-obtain, renewal, revoke and delete events.
|
||||
|
||||
- Filter out events after comparing domains list in certificate events to
|
||||
the list of domains the app (to which this component belongs) is
|
||||
interested in.
|
||||
|
||||
- If requested, copy certificates from /etc/letsencrypt directory to app's
|
||||
directory with the needed permissions and ownership.
|
||||
|
||||
- Query status of copied certificates. Status could be up-to-date,
|
||||
self-signed, outdated (/etc/letsencrypt has more recent certificate),
|
||||
expired or a test certificate.
|
||||
|
||||
- Optionally restart a bunch of daemons after interesting certificate event
|
||||
occurs.
|
||||
|
||||
"""
|
||||
|
||||
_all = {}
|
||||
|
||||
def __init__(self, component_id, domains=None, daemons=None,
|
||||
should_copy_certificates=False, private_key_path=None,
|
||||
certificate_path=None, user_owner=None, group_owner=None,
|
||||
managing_app=None):
|
||||
"""Initialize the Let's Encrypt component.
|
||||
|
||||
component_id should be a unique ID across all components of an app and
|
||||
across all components.
|
||||
|
||||
domains is a list of domains that the app is configured for. Any
|
||||
certificate operation that is not happening on these domains is ignored
|
||||
by this component.
|
||||
|
||||
domains can be the string '*' in which case it means that the app is
|
||||
relevant for (or is interested in) all domains configured in the
|
||||
system.
|
||||
|
||||
domains can be a callable instead of a list. In this case, the callable
|
||||
is called every time an operation needs to be checked against list of
|
||||
domains for the app. The callable must return a list of domains that
|
||||
the app is configured for.
|
||||
|
||||
daemons is a list of systemd units that need to be restarted (actually
|
||||
try-restarted, i.e., only if they are already running) when a relevant
|
||||
certificate operation happens.
|
||||
|
||||
should_copy_certificates indicates whether the app needs the Let's
|
||||
Encrypt certificates in the directory /etc/letsencrypt to be copied to
|
||||
a different location with different permissions to be able to use them.
|
||||
If true, private_key_path, certificate_path, user_owner, group_owner
|
||||
and managing_app properties must be specified. Otherwise, ValueError is
|
||||
raised.
|
||||
|
||||
private_key_path is the string containing the file path to which a
|
||||
private key must be copied. It may contain the sub-string '{domain}'
|
||||
which will be replaced with the name of the domain for which this
|
||||
certificate is being copied. Any non-existing parent directories are
|
||||
created with 'root:root' ownership and '0o755' file permissions. The
|
||||
permissions for the file will be '0o600' and ownership will be as
|
||||
specified in parameters user_owner and group_owner. If both
|
||||
private_key_path and certificate_path refer to the same path, the
|
||||
target file is created by appending the private key and certificate.
|
||||
|
||||
certificate_path is the string containing the file path to which a
|
||||
certificate must be copied. It may contain the sub-string '{domain}'
|
||||
which will be replaced with the name of the domain for which this
|
||||
certificate is being copied. Any non-existing parent directories are
|
||||
created with 'root:root' ownership and '0o755' file permissions. The
|
||||
permissions for the file will be '0o644' and ownership will be as
|
||||
specified in parameters user_owner and group_owner. If both
|
||||
private_key_path and certificate_path refer to the same path, the
|
||||
target file is created by appending the private key and certificate.
|
||||
|
||||
user_owner specifies the user who should own the copied certificates.
|
||||
This is typically the unprivileged user account that runs the daemon.
|
||||
|
||||
group_owner specifies the group who should own the copied certificates.
|
||||
This is typically the unprivileged group account that runs the daemon.
|
||||
|
||||
managing_app is the name of the app that contains this component. This
|
||||
is used by the superuser action to verify that this app is allowed to
|
||||
copy certificates to the target location. The app must then define a
|
||||
property 'managing_paths' which contains a list of pathlib.Path()
|
||||
objects to which the app is allowed to write certificates and other
|
||||
files to.
|
||||
|
||||
"""
|
||||
if should_copy_certificates:
|
||||
if (not private_key_path or not certificate_path or not user_owner
|
||||
or not group_owner or not managing_app):
|
||||
raise ValueError('Not enough values for copying certificates')
|
||||
|
||||
super().__init__(component_id)
|
||||
self._domains = domains
|
||||
self.daemons = daemons
|
||||
self.should_copy_certificates = should_copy_certificates
|
||||
self.private_key_path = private_key_path
|
||||
self.certificate_path = certificate_path
|
||||
self.user_owner = user_owner
|
||||
self.group_owner = group_owner
|
||||
self.managing_app = managing_app
|
||||
|
||||
self._all[component_id] = self
|
||||
|
||||
@property
|
||||
def domains(self):
|
||||
"""Return a list of domains this component's app is interested in."""
|
||||
if callable(self._domains):
|
||||
return self._domains()
|
||||
|
||||
return self._domains
|
||||
|
||||
@classmethod
|
||||
def list(cls):
|
||||
"""Return a list of all Let's Encrypt components."""
|
||||
return cls._all.values()
|
||||
|
||||
def setup_certificates(self, app_domains=None):
|
||||
"""Setup app certificates for all interested domains.
|
||||
|
||||
For every domain, a certificate is copied. If a valid certificate is
|
||||
not available in Let's Encrypt, a self-signed snakeoil certificate is
|
||||
used. Each daemon is restarted if it is running.
|
||||
|
||||
app_domains is the list of domains for which certificates must be
|
||||
copied. If it is not provided, the component's list of domains (which
|
||||
may be acquired by a callable) is used.
|
||||
|
||||
"""
|
||||
if not app_domains:
|
||||
app_domains = self.domains
|
||||
|
||||
domains, status = self._get_letsencrypt_domains()
|
||||
|
||||
if self.should_copy_certificates:
|
||||
for domain in app_domains:
|
||||
if domain in domains:
|
||||
lineage = status['domains'][domain]['lineage']
|
||||
self._copy_letsencrypt_certificates([domain], lineage)
|
||||
else:
|
||||
self._copy_self_signed_certificates([domain])
|
||||
|
||||
for daemon in self.daemons:
|
||||
actions.superuser_run('service', ['try-restart', daemon])
|
||||
|
||||
def get_status(self):
|
||||
"""Return the status of certificates for all interested domains.
|
||||
|
||||
A dictionary is returned containing a key for each domain and the value
|
||||
shall be a simple string with state of certificate for this app.
|
||||
Possible values are 'outdated-copy', 'self-signed', 'unknown', 'valid',
|
||||
'revoked', 'expired' and 'test_cert'.
|
||||
|
||||
"""
|
||||
app_domains = self.domains
|
||||
domains, le_status = self._get_letsencrypt_domains()
|
||||
|
||||
final_status = {}
|
||||
for domain in app_domains:
|
||||
if domain in domains:
|
||||
status = le_status['domains'][domain]['validity']
|
||||
if self.should_copy_certificates:
|
||||
if not self._compare_certificate(
|
||||
domain, le_status['domains'][domain]['lineage']):
|
||||
status = 'outdated-copy'
|
||||
else:
|
||||
status = 'self-signed'
|
||||
|
||||
final_status[domain] = status
|
||||
|
||||
return final_status
|
||||
|
||||
def on_certificate_obtained(self, domains, lineage):
|
||||
"""Handle event when a certificate is obtained.
|
||||
|
||||
Filter any domains that are interesting for the app that this component
|
||||
is part of. For all interesting domains, copy the certificates if
|
||||
requested and then restart the daemons.
|
||||
|
||||
"""
|
||||
interested_domains = self._get_interested_domains(domains)
|
||||
if not interested_domains:
|
||||
return
|
||||
|
||||
if self.should_copy_certificates:
|
||||
self._copy_letsencrypt_certificates(interested_domains, lineage)
|
||||
|
||||
for daemon in self.daemons:
|
||||
actions.superuser_run('service', ['try-restart', daemon])
|
||||
|
||||
def on_certificate_renewed(self, domains, lineage):
|
||||
"""Handle event when a certificate is renewed.
|
||||
|
||||
This is typically called when a Cron job triggers certbot renewal
|
||||
process which in turn triggers configured renewal hooks after
|
||||
successful renewal. It is also called when a valid certificate is
|
||||
re-obtained.
|
||||
|
||||
To trigger a renewal event for testing purposes, run `certbot renew
|
||||
--force-renewal`.
|
||||
|
||||
For now, performs the same operations as obtaining a fresh certificate.
|
||||
|
||||
"""
|
||||
return self.on_certificate_obtained(domains, lineage)
|
||||
|
||||
def on_certificate_revoked(self, domains, lineage):
|
||||
"""Handle event when a certificate is revoked.
|
||||
|
||||
Filter any domains that are interesting for the app that this component
|
||||
is part of. For all interesting domains, copy self-signed snakeoil
|
||||
certificates if requested and then restart the daemons.
|
||||
|
||||
"""
|
||||
interested_domains = self._get_interested_domains(domains)
|
||||
if not interested_domains:
|
||||
return
|
||||
|
||||
if self.should_copy_certificates:
|
||||
self._copy_self_signed_certificates(interested_domains)
|
||||
|
||||
for daemon in self.daemons:
|
||||
actions.superuser_run('service', ['try-restart', daemon])
|
||||
|
||||
def on_certificate_deleted(self, domains, lineage):
|
||||
"""Handle event when a certificate is deleted.
|
||||
|
||||
For now, performs the same operations as obtaining a fresh certificate.
|
||||
|
||||
"""
|
||||
return self.on_certificate_revoked(domains, lineage)
|
||||
|
||||
@staticmethod
|
||||
def _get_letsencrypt_domains():
|
||||
"""Return the list of domains with LE certificates."""
|
||||
from plinth.modules import letsencrypt
|
||||
status = letsencrypt.get_status()
|
||||
domains = {
|
||||
domain
|
||||
for domain, domain_status in status['domains'].items()
|
||||
if domain_status
|
||||
}
|
||||
return domains, status
|
||||
|
||||
def _get_interested_domains(self, domains):
|
||||
"""Return set of domains the component is interested in."""
|
||||
app_domains = self.domains
|
||||
if not app_domains:
|
||||
return set()
|
||||
|
||||
if app_domains == '*':
|
||||
return set(domains)
|
||||
|
||||
return set(domains).intersection(set(app_domains))
|
||||
|
||||
def _copy_self_signed_certificates(self, domains):
|
||||
"""Copy a self-signed certificate for all domains.
|
||||
|
||||
Copy Apache's snake-oil certificate into daemon's directory. The
|
||||
self-signed certificate may not really work. It is merely to prevent
|
||||
the server from failing to startup because the files are missing.
|
||||
|
||||
"""
|
||||
source_private_key_path = '/etc/ssl/private/ssl-cert-snakeoil.key'
|
||||
source_certificate_path = '/etc/ssl/certs/ssl-cert-snakeoil.pem'
|
||||
self._copy_certificates(domains, source_private_key_path,
|
||||
source_certificate_path)
|
||||
|
||||
def _copy_letsencrypt_certificates(self, domains, lineage):
|
||||
"""Copy a valid Let's Encrypt certificate for given domains."""
|
||||
source_private_key_path = pathlib.Path(lineage) / 'privkey.pem'
|
||||
source_certificate_path = pathlib.Path(lineage) / 'fullchain.pem'
|
||||
self._copy_certificates(domains, source_private_key_path,
|
||||
source_certificate_path)
|
||||
|
||||
def _copy_certificates(self, domains, source_private_key_path,
|
||||
source_certificate_path):
|
||||
"""Copy certificate for all domains in the certificate.
|
||||
|
||||
The list of domains passed should be subset of domains listed in the
|
||||
certificate.
|
||||
|
||||
"""
|
||||
if '{domain}' in self.private_key_path or \
|
||||
'{domain}' in self.certificate_path:
|
||||
for domain in domains:
|
||||
private_key_path = self.private_key_path.format(domain=domain)
|
||||
certificate_path = self.certificate_path.format(domain=domain)
|
||||
self._copy_certificate(source_private_key_path,
|
||||
source_certificate_path,
|
||||
private_key_path, certificate_path)
|
||||
else:
|
||||
self._copy_certificate(
|
||||
source_private_key_path, source_certificate_path,
|
||||
self.private_key_path, self.certificate_path)
|
||||
|
||||
def _copy_certificate(self, source_private_key_path,
|
||||
source_certificate_path, private_key_path,
|
||||
certificate_path):
|
||||
"""Copy certificate for a single domain."""
|
||||
actions.superuser_run('letsencrypt', [
|
||||
'copy-certificate', '--managing-app', self.managing_app,
|
||||
'--user-owner', self.user_owner, '--group-owner', self.group_owner,
|
||||
'--source-private-key-path',
|
||||
str(source_private_key_path), '--source-certificate-path',
|
||||
str(source_certificate_path), '--private-key-path',
|
||||
private_key_path, '--certificate-path', certificate_path
|
||||
])
|
||||
|
||||
def _compare_certificate(self, domain, lineage):
|
||||
"""Compare LE certificate with app certificate."""
|
||||
source_private_key_path = pathlib.Path(lineage) / 'privkey.pem'
|
||||
source_certificate_path = pathlib.Path(lineage) / 'fullchain.pem'
|
||||
private_key_path = self.private_key_path.format(domain=domain)
|
||||
certificate_path = self.certificate_path.format(domain=domain)
|
||||
output = actions.superuser_run('letsencrypt', [
|
||||
'compare-certificate', '--managing-app', self.managing_app,
|
||||
'--source-private-key-path',
|
||||
str(source_private_key_path), '--source-certificate-path',
|
||||
str(source_certificate_path), '--private-key-path',
|
||||
private_key_path, '--certificate-path', certificate_path
|
||||
])
|
||||
return json.loads(output)['result']
|
||||
|
||||
|
||||
def on_certificate_event(event, domains, lineage):
|
||||
"""Start a new thread to handle a LE certificate event.
|
||||
|
||||
Run in a new thread because:
|
||||
|
||||
- We don't want to block the thread running Glib main loop when called from
|
||||
dbus handler in case of renewal.
|
||||
|
||||
- We don't want to delay the a certificate operation with copying
|
||||
certificates to all apps.
|
||||
|
||||
- We don't want to cause a page load error when restarting Apache due to
|
||||
certificate changes.
|
||||
|
||||
"""
|
||||
threading.Thread(target=on_certificate_event_sync, args=(event, domains,
|
||||
lineage)).start()
|
||||
|
||||
|
||||
def on_certificate_event_sync(event, domains, lineage):
|
||||
"""Trigger certificate event hooks across all apps."""
|
||||
if isinstance(domains, str):
|
||||
domains = domains.split()
|
||||
|
||||
if not lineage:
|
||||
# XXX: A better to find lineage is by reading the renewal/{domain}.conf
|
||||
lineage = str(pathlib.Path('/etc/letsencrypt/live/') / domains[0])
|
||||
|
||||
assert event in ('obtained', 'renewed', 'revoked', 'deleted')
|
||||
|
||||
for component in LetsEncrypt.list():
|
||||
logger.info('Handling certificate event for %s: %s, %s, %s',
|
||||
component.component_id, event, domains, lineage)
|
||||
try:
|
||||
getattr(component, 'on_certificate_' + event)(domains, lineage)
|
||||
except Exception as exception:
|
||||
logger.exception(
|
||||
'Error executing certificate hook for %s: %s, %s, %s: %s',
|
||||
component.component_id, event, domains, lineage, exception)
|
||||
383
plinth/modules/letsencrypt/tests/test_components.py
Normal file
383
plinth/modules/letsencrypt/tests/test_components.py
Normal file
@ -0,0 +1,383 @@
|
||||
#
|
||||
# This file is part of FreedomBox.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
"""
|
||||
Test the Let's Encrypt component for managing certificates.
|
||||
"""
|
||||
|
||||
import json
|
||||
from unittest.mock import call, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from plinth.modules.letsencrypt.components import LetsEncrypt
|
||||
|
||||
|
||||
@pytest.fixture(name='empty_letsencrypt_list', autouse=True)
|
||||
def fixture_empty_letsencrypt_list():
|
||||
"""Remove all entries from Let's Encrypt component list."""
|
||||
LetsEncrypt._all = {}
|
||||
|
||||
|
||||
@pytest.fixture(name='component')
|
||||
def fixture_component():
|
||||
"""Create a new component for testing."""
|
||||
return LetsEncrypt(
|
||||
'test-component', domains=['valid.example', 'invalid.example'],
|
||||
daemons=['test-daemon'], should_copy_certificates=True,
|
||||
private_key_path='/etc/test-app/{domain}/private.path',
|
||||
certificate_path='/etc/test-app/{domain}/certificate.path',
|
||||
user_owner='test-user', group_owner='test-group',
|
||||
managing_app='test-app')
|
||||
|
||||
|
||||
@pytest.fixture(name='get_status')
|
||||
def fixture_get_status():
|
||||
"""Return patched letsencrypt.get_status() method."""
|
||||
domains = ['valid.example']
|
||||
with patch('plinth.modules.letsencrypt.get_status') as get_status:
|
||||
get_status.return_value = {
|
||||
'domains': {
|
||||
domain: {
|
||||
'lineage': '/etc/letsencrypt/live/' + domain,
|
||||
'validity': 'valid',
|
||||
}
|
||||
for domain in domains
|
||||
}
|
||||
}
|
||||
yield get_status
|
||||
|
||||
|
||||
@pytest.fixture(name='superuser_run')
|
||||
def fixture_superuser_run():
|
||||
"""Return patched plinth.actions.superuser_run() method."""
|
||||
with patch('plinth.actions.superuser_run') as superuser_run:
|
||||
yield superuser_run
|
||||
|
||||
|
||||
def test_init_without_arguments():
|
||||
"""Test that component is initialized with defaults properly."""
|
||||
component = LetsEncrypt('test-component')
|
||||
|
||||
assert component.component_id == 'test-component'
|
||||
assert component.domains is None
|
||||
assert component.daemons is None
|
||||
assert not component.should_copy_certificates
|
||||
assert component.private_key_path is None
|
||||
assert component.certificate_path is None
|
||||
assert component.user_owner is None
|
||||
assert component.group_owner is None
|
||||
assert component.managing_app is None
|
||||
assert len(component._all) == 1
|
||||
assert component._all['test-component'] == component
|
||||
|
||||
|
||||
def test_init(component):
|
||||
"""Test initializing the component."""
|
||||
assert component.domains == ['valid.example', 'invalid.example']
|
||||
assert component.daemons == ['test-daemon']
|
||||
assert component.should_copy_certificates
|
||||
assert component.private_key_path == '/etc/test-app/{domain}/private.path'
|
||||
assert component.certificate_path == \
|
||||
'/etc/test-app/{domain}/certificate.path'
|
||||
assert component.user_owner == 'test-user'
|
||||
assert component.group_owner == 'test-group'
|
||||
assert component.managing_app == 'test-app'
|
||||
|
||||
|
||||
def test_init_values():
|
||||
"""Test initializing with invalid values."""
|
||||
properties = {
|
||||
'private_key_path': 'test-private-key-path',
|
||||
'certificate_path': 'test-certificate-path',
|
||||
'user_owner': 'test-user',
|
||||
'group_owner': 'test-group',
|
||||
'managing_app': 'test-app'
|
||||
}
|
||||
LetsEncrypt('test-component', should_copy_certificates=True, **properties)
|
||||
for key in properties:
|
||||
new_properties = dict(properties)
|
||||
new_properties[key] = None
|
||||
with pytest.raises(ValueError):
|
||||
LetsEncrypt('test-component', should_copy_certificates=True,
|
||||
**new_properties)
|
||||
|
||||
|
||||
def test_domains():
|
||||
"""Test getting domains."""
|
||||
component = LetsEncrypt('test-component', domains=lambda: ['test-domains'])
|
||||
assert component.domains == ['test-domains']
|
||||
|
||||
|
||||
def test_list():
|
||||
"""Test listing components."""
|
||||
component1 = LetsEncrypt('test-component1')
|
||||
component2 = LetsEncrypt('test-component2')
|
||||
assert set(LetsEncrypt.list()) == {component1, component2}
|
||||
|
||||
|
||||
def _assert_copy_certificate_called(component, superuser_run, domains):
|
||||
"""Check that copy certificate calls have been made properly."""
|
||||
copy_calls = [
|
||||
mock_call for mock_call in superuser_run.mock_calls
|
||||
if mock_call[1][0] == 'letsencrypt'
|
||||
and mock_call[1][1][0] == 'copy-certificate'
|
||||
]
|
||||
expected_calls = []
|
||||
for domain, domain_status in domains.items():
|
||||
if domain_status == 'valid':
|
||||
source_private_key_path = \
|
||||
'/etc/letsencrypt/live/{}/privkey.pem'.format(domain)
|
||||
source_certificate_path = \
|
||||
'/etc/letsencrypt/live/{}/fullchain.pem'.format(domain)
|
||||
else:
|
||||
source_private_key_path = '/etc/ssl/private/ssl-cert-snakeoil.key'
|
||||
source_certificate_path = '/etc/ssl/certs/ssl-cert-snakeoil.pem'
|
||||
|
||||
private_key_path = '/etc/test-app/{}/private.path'.format(domain)
|
||||
certificate_path = '/etc/test-app/{}/certificate.path'.format(domain)
|
||||
expected_call = call('letsencrypt', [
|
||||
'copy-certificate', '--managing-app', component.managing_app,
|
||||
'--user-owner', component.user_owner, '--group-owner',
|
||||
component.group_owner, '--source-private-key-path',
|
||||
str(source_private_key_path), '--source-certificate-path',
|
||||
str(source_certificate_path), '--private-key-path',
|
||||
private_key_path, '--certificate-path', certificate_path
|
||||
])
|
||||
expected_calls.append(expected_call)
|
||||
|
||||
assert len(expected_calls) == len(copy_calls)
|
||||
for expected_call in expected_calls:
|
||||
print(expected_call)
|
||||
print(copy_calls)
|
||||
assert expected_call in copy_calls
|
||||
|
||||
|
||||
def _assert_restarted_daemons(daemons, superuser_run):
|
||||
"""Check that a call has restarted the daemons of a component."""
|
||||
run_calls = [
|
||||
mock_call for mock_call in superuser_run.mock_calls
|
||||
if mock_call[1][0] == 'service'
|
||||
]
|
||||
expected_calls = [
|
||||
call('service', ['try-restart', daemon]) for daemon in daemons
|
||||
]
|
||||
assert len(expected_calls) == len(run_calls)
|
||||
for expected_call in expected_calls:
|
||||
assert expected_call in run_calls
|
||||
|
||||
|
||||
def test_setup_certificates(superuser_run, get_status, component):
|
||||
"""Test that initial copying of certs for an app works."""
|
||||
component.setup_certificates()
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
'valid.example': 'valid',
|
||||
'invalid.example': 'invalid'
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_setup_certificates_without_copy(superuser_run, get_status, component):
|
||||
"""Test that initial copying of certs for an app works."""
|
||||
component.should_copy_certificates = False
|
||||
component.setup_certificates()
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_setup_certificates_with_app_domains(superuser_run, get_status,
|
||||
component):
|
||||
"""Test that initial copying of certs for an app works."""
|
||||
component._domains = ['irrelevant1.example', 'irrelevant2.example']
|
||||
component.setup_certificates(
|
||||
app_domains=['valid.example', 'invalid.example'])
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
'valid.example': 'valid',
|
||||
'invalid.example': 'invalid'
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def _assert_compare_certificate_called(component, superuser_run, domains):
|
||||
"""Check that compare certificate was called properly."""
|
||||
expected_calls = []
|
||||
for domain in domains:
|
||||
source_private_key_path = \
|
||||
'/etc/letsencrypt/live/{}/privkey.pem'.format(domain)
|
||||
source_certificate_path = \
|
||||
'/etc/letsencrypt/live/{}/fullchain.pem'.format(domain)
|
||||
private_key_path = '/etc/test-app/{}/private.path'.format(domain)
|
||||
certificate_path = '/etc/test-app/{}/certificate.path'.format(domain)
|
||||
expected_call = call('letsencrypt', [
|
||||
'compare-certificate', '--managing-app', component.managing_app,
|
||||
'--source-private-key-path',
|
||||
str(source_private_key_path), '--source-certificate-path',
|
||||
str(source_certificate_path), '--private-key-path',
|
||||
private_key_path, '--certificate-path', certificate_path
|
||||
])
|
||||
expected_calls.append(expected_call)
|
||||
|
||||
superuser_run.assert_has_calls(expected_calls)
|
||||
|
||||
|
||||
def test_get_status(component, superuser_run, get_status):
|
||||
"""Test that getting domain status works."""
|
||||
superuser_run.return_value = json.dumps({'result': True})
|
||||
assert component.get_status() == {
|
||||
'valid.example': 'valid',
|
||||
'invalid.example': 'self-signed'
|
||||
}
|
||||
_assert_compare_certificate_called(component, superuser_run,
|
||||
['valid.example'])
|
||||
|
||||
|
||||
def test_get_status_outdate_copy(component, superuser_run, get_status):
|
||||
"""Test that getting domain status works with outdated copy."""
|
||||
superuser_run.return_value = json.dumps({'result': False})
|
||||
assert component.get_status() == {
|
||||
'valid.example': 'outdated-copy',
|
||||
'invalid.example': 'self-signed'
|
||||
}
|
||||
_assert_compare_certificate_called(component, superuser_run,
|
||||
['valid.example'])
|
||||
|
||||
|
||||
def test_get_status_without_copy(component, get_status):
|
||||
"""Test that getting domain status works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
assert component.get_status() == {
|
||||
'valid.example': 'valid',
|
||||
'invalid.example': 'self-signed'
|
||||
}
|
||||
|
||||
|
||||
def test_on_certificate_obtained(superuser_run, component):
|
||||
"""Test that certificate obtained event handler works."""
|
||||
component.on_certificate_obtained(['valid.example', 'irrelevant.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
'valid.example': 'valid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_obtained_with_all_domains(superuser_run, component):
|
||||
"""Test that certificate obtained event handler works for app with all domains."""
|
||||
component._domains = '*'
|
||||
component.on_certificate_obtained(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
'valid.example': 'valid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_obtained_irrelevant(superuser_run, component):
|
||||
"""Test that certificate obtained event handler works with irrelevant domain."""
|
||||
component.on_certificate_obtained(
|
||||
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons([], superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_obtained_without_copy(superuser_run, component):
|
||||
"""Test that certificate obtained event handler works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
component.on_certificate_obtained(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_renewed(superuser_run, component):
|
||||
"""Test that certificate renewed event handler works."""
|
||||
component.on_certificate_renewed(['valid.example', 'irrelevant.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
'valid.example': 'valid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_renewed_irrelevant(superuser_run, component):
|
||||
"""Test that certificate renewed event handler works for irrelevant domains."""
|
||||
component.on_certificate_renewed(
|
||||
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons([], superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_renewed_without_copy(superuser_run, component):
|
||||
"""Test that certificate renewed event handler works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
component.on_certificate_renewed(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_revoked(superuser_run, component):
|
||||
"""Test that certificate revoked event handler works."""
|
||||
component.on_certificate_revoked(['valid.example', 'irrelevant.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
'valid.example': 'invalid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_revoked_irrelevant(superuser_run, component):
|
||||
"""Test that certificate revoked event handler works for irrelevant domains."""
|
||||
component.on_certificate_revoked(
|
||||
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons([], superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_revoked_without_copy(superuser_run, component):
|
||||
"""Test that certificate revoked event handler works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
component.on_certificate_revoked(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_deleted(superuser_run, component):
|
||||
"""Test that certificate deleted event handler works."""
|
||||
component.on_certificate_deleted(['valid.example', 'irrelevant.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
'valid.example': 'invalid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_deleted_irrelevant(superuser_run, component):
|
||||
"""Test that certificate deleted event handler works for irrelevant domains."""
|
||||
component.on_certificate_deleted(
|
||||
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons([], superuser_run)
|
||||
|
||||
|
||||
def test_on_certificate_deleted_without_copy(superuser_run, component):
|
||||
"""Test that certificate deleted event handler works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
component.on_certificate_deleted(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
Loading…
x
Reference in New Issue
Block a user