mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-05-27 10:44:33 +00:00
letsencrypt: Use privileged decorator for actions
Tests: - DONE: Initial setup works - DONE: Certificate events on FreedomBox startup work - DONE: Basic operations work: obtain, revoke, delete - DONE: Status of certificates is shown properly - DONE: Domain add/remove hooks work, errors are handled - Not tested: Removing old hooks - DONE: Errors are shown properly on failure: revoke, obtain, reobtain, delete Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
02ef750442
commit
c1cf5699c2
@ -1,493 +1,9 @@
|
||||
#!/usr/bin/python3
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""Legacy configuration helper for Let's Encrypt, kept for compatibility.
|
||||
|
||||
LE configuration in the earlier implementation used to call into this script
|
||||
with the sub-commands 'run-pre-hooks', 'run-renew-hooks' and 'run-post-hooks'.
|
||||
This action script now allows for any arbitrary sub-command to be called and
|
||||
does nothing. It can be removed after the release of Debian 12 (bookworm).
|
||||
"""
|
||||
Configuration helper for Let's Encrypt.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import filecmp
|
||||
import glob
|
||||
import importlib
|
||||
import inspect
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
import configobj
|
||||
|
||||
from plinth import action_utils
|
||||
from plinth import app as app_module
|
||||
from plinth import cfg
|
||||
from plinth.modules import letsencrypt as le
|
||||
from plinth.modules.letsencrypt.components import LetsEncrypt
|
||||
|
||||
TEST_MODE = False
|
||||
LE_DIRECTORY = '/etc/letsencrypt/'
|
||||
ETC_SSL_DIRECTORY = '/etc/ssl/'
|
||||
RENEWAL_DIRECTORY = '/etc/letsencrypt/renewal/'
|
||||
AUTHENTICATOR = 'webroot'
|
||||
WEB_ROOT_PATH = '/var/www/html'
|
||||
APACHE_PREFIX = '/etc/apache2/sites-available/'
|
||||
APACHE_CONFIGURATION = '''
|
||||
Use FreedomBoxTLSSiteMacro {domain}
|
||||
'''
|
||||
|
||||
|
||||
def parse_arguments():
|
||||
"""Return parsed command line arguments as dictionary."""
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
|
||||
|
||||
setup_parser = subparsers.add_parser(
|
||||
'setup', help='Run any setup/upgrade activities.')
|
||||
setup_parser.add_argument(
|
||||
'--old-version', type=int, required=True,
|
||||
help='Version number being upgraded from or None if setting up first '
|
||||
'time.')
|
||||
|
||||
subparsers.add_parser('get-status',
|
||||
help='Return the status of configured domains.')
|
||||
subparser = subparsers.add_parser(
|
||||
'get-modified-time',
|
||||
help='Return the modified time for a certificate.')
|
||||
subparser.add_argument('--domain', required=True,
|
||||
help='Domain name to get modified time for')
|
||||
revoke_parser = subparsers.add_parser(
|
||||
'revoke', help='Revoke certificate of a domain and disable website.')
|
||||
revoke_parser.add_argument('--domain', required=True,
|
||||
help='Domain name to revoke certificate for')
|
||||
obtain_parser = subparsers.add_parser(
|
||||
'obtain', help='Obtain certificate for a domain and setup website.')
|
||||
obtain_parser.add_argument('--domain', required=True,
|
||||
help='Domain name to obtain certificate for')
|
||||
delete_parser = subparsers.add_parser(
|
||||
'delete', help='Delete certificate for a domain and disable website.')
|
||||
delete_parser.add_argument('--domain', required=True,
|
||||
help='Domain name to delete certificate of')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'copy-certificate',
|
||||
help='Copy LE certificate to a daemon\'s directory')
|
||||
subparser.add_argument('--managing-app', required=True,
|
||||
help='App needing the certificate')
|
||||
subparser.add_argument('--user-owner', required=True,
|
||||
help='User who should own the certificate')
|
||||
subparser.add_argument('--group-owner', required=True,
|
||||
help='Group that should own the certificate')
|
||||
subparser.add_argument('--source-private-key-path', required=True,
|
||||
help='Path to the source private key')
|
||||
subparser.add_argument(
|
||||
'--source-certificate-path', required=True,
|
||||
help='Path to the source certificate with public key')
|
||||
subparser.add_argument('--private-key-path', required=True,
|
||||
help='Path to the private key')
|
||||
subparser.add_argument('--certificate-path', required=True,
|
||||
help='Path to the certificate with public key')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'compare-certificate',
|
||||
help='Compare LE certificate to one in daemon\'s directory')
|
||||
subparser.add_argument('--managing-app', required=True,
|
||||
help='App needing the certificate')
|
||||
subparser.add_argument('--source-private-key-path', required=True,
|
||||
help='Path to the source private key')
|
||||
subparser.add_argument(
|
||||
'--source-certificate-path', required=True,
|
||||
help='Path to the source certificate with public key')
|
||||
subparser.add_argument('--private-key-path', required=True,
|
||||
help='Path to the private key')
|
||||
subparser.add_argument('--certificate-path', required=True,
|
||||
help='Path to the certificate with public key')
|
||||
|
||||
help_hooks = 'Does nothing, kept for compatibility.'
|
||||
subparser = subparsers.add_parser('run_pre_hooks', help=help_hooks)
|
||||
subparser.add_argument('--domain')
|
||||
subparser.add_argument('--modules', nargs='+', default=[])
|
||||
|
||||
subparser = subparsers.add_parser('run_renew_hooks', help=help_hooks)
|
||||
subparser.add_argument('--domain')
|
||||
subparser.add_argument('--modules', nargs='+', default=[])
|
||||
|
||||
subparser = subparsers.add_parser('run_post_hooks', help=help_hooks)
|
||||
subparser.add_argument('--domain')
|
||||
subparser.add_argument('--modules', nargs='+', default=[])
|
||||
|
||||
subparsers.required = True
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def get_certificate_expiry(domain):
|
||||
"""Return the expiry date of a certificate."""
|
||||
certificate_file = os.path.join(le.LIVE_DIRECTORY, domain, 'cert.pem')
|
||||
output = subprocess.check_output(
|
||||
['openssl', 'x509', '-enddate', '-noout', '-in', certificate_file])
|
||||
return output.decode().strip().split('=')[1]
|
||||
|
||||
|
||||
def get_modified_time(domain):
|
||||
"""Return the last modified time of a certificate."""
|
||||
certificate_file = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem'
|
||||
return int(certificate_file.stat().st_mtime)
|
||||
|
||||
|
||||
def get_validity_status(domain):
|
||||
"""Return validity status of a certificate, e.g. valid, revoked, expired"""
|
||||
output = subprocess.check_output(['certbot', 'certificates', '-d', domain])
|
||||
output = output.decode(sys.stdout.encoding)
|
||||
|
||||
match = re.search(r'INVALID: (.*)\)', output)
|
||||
if match is not None:
|
||||
validity = match.group(1).lower()
|
||||
elif re.search('VALID', output) is not None:
|
||||
validity = 'valid'
|
||||
else:
|
||||
validity = 'unknown'
|
||||
|
||||
return validity
|
||||
|
||||
|
||||
def get_status():
|
||||
"""
|
||||
Return Python dictionary of currently configured domains.
|
||||
Should be run as root, otherwise might yield a wrong, empty answer.
|
||||
"""
|
||||
try:
|
||||
domains = os.listdir(le.LIVE_DIRECTORY)
|
||||
except OSError:
|
||||
domains = []
|
||||
|
||||
domains = [
|
||||
domain for domain in domains
|
||||
if os.path.isdir(os.path.join(le.LIVE_DIRECTORY, domain))
|
||||
]
|
||||
|
||||
domain_status = {}
|
||||
for domain in domains:
|
||||
domain_status[domain] = {
|
||||
'certificate_available':
|
||||
True,
|
||||
'expiry_date':
|
||||
get_certificate_expiry(domain),
|
||||
'web_enabled':
|
||||
action_utils.webserver_is_enabled(domain, kind='site'),
|
||||
'validity':
|
||||
get_validity_status(domain),
|
||||
'lineage':
|
||||
str(pathlib.Path(le.LIVE_DIRECTORY) / domain),
|
||||
'modified_time':
|
||||
get_modified_time(domain)
|
||||
}
|
||||
return domain_status
|
||||
|
||||
|
||||
def subcommand_setup(arguments):
|
||||
"""Upgrade old site configuration to new macro based style.
|
||||
|
||||
Nothing to do for first time setup and for newer versions.
|
||||
"""
|
||||
if arguments.old_version == 2:
|
||||
_remove_old_hooks()
|
||||
return
|
||||
|
||||
if arguments.old_version != 1:
|
||||
return
|
||||
|
||||
domain_status = get_status()
|
||||
with action_utils.WebserverChange() as webserver_change:
|
||||
for domain in domain_status:
|
||||
setup_webserver_config(domain, webserver_change)
|
||||
|
||||
|
||||
def subcommand_get_status(_):
|
||||
"""Print a JSON dictionary of currently configured domains."""
|
||||
domain_status = get_status()
|
||||
print(json.dumps({'domains': domain_status}))
|
||||
|
||||
|
||||
def subcommand_get_modified_time(arguments):
|
||||
"""Print the modified time of a certificate as integer."""
|
||||
print(get_modified_time(arguments.domain))
|
||||
|
||||
|
||||
def subcommand_revoke(arguments):
|
||||
"""Disable a domain and revoke the certificate."""
|
||||
domain = arguments.domain
|
||||
|
||||
cert_path = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem'
|
||||
if cert_path.exists():
|
||||
command = [
|
||||
'certbot', 'revoke', '--non-interactive', '--domain', domain,
|
||||
'--cert-path', cert_path
|
||||
]
|
||||
if TEST_MODE:
|
||||
command.append('--staging')
|
||||
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
_, stderr = process.communicate()
|
||||
if process.returncode:
|
||||
print(stderr.decode(), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
action_utils.webserver_disable(domain, kind='site')
|
||||
|
||||
|
||||
def subcommand_obtain(arguments):
|
||||
"""Obtain a certificate for a domain and setup website."""
|
||||
domain = arguments.domain
|
||||
|
||||
command = [
|
||||
'certbot', 'certonly', '--non-interactive', '--text', '--agree-tos',
|
||||
'--register-unsafely-without-email', '--domain', arguments.domain,
|
||||
'--authenticator', AUTHENTICATOR, '--webroot-path', WEB_ROOT_PATH,
|
||||
'--renew-by-default'
|
||||
]
|
||||
if TEST_MODE:
|
||||
command.append('--staging')
|
||||
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
_, stderr = process.communicate()
|
||||
if process.returncode:
|
||||
print(stderr.decode(), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
with action_utils.WebserverChange() as webserver_change:
|
||||
setup_webserver_config(domain, webserver_change)
|
||||
|
||||
|
||||
def _remove_old_hooks():
|
||||
"""Remove old style renewal hooks from individual configuration files.
|
||||
|
||||
This has been replaced with global hooks by adding script files in
|
||||
directory /etc/letsencrypt/renewal-hooks/{pre,post,deploy}/.
|
||||
|
||||
"""
|
||||
for file_path in glob.glob(RENEWAL_DIRECTORY + '*.conf'):
|
||||
try:
|
||||
_remove_old_hooks_from_file(file_path)
|
||||
except Exception as exception:
|
||||
print('Error removing hooks from file:', file_path, exception)
|
||||
|
||||
|
||||
def _remove_old_hooks_from_file(file_path):
|
||||
"""Remove old style hooks from a single configuration file."""
|
||||
config = configobj.ConfigObj(file_path)
|
||||
edited = False
|
||||
for line in config.initial_comment:
|
||||
if 'edited by plinth' in line.lower():
|
||||
edited = True
|
||||
|
||||
if not edited:
|
||||
return
|
||||
|
||||
config.initial_comment = [
|
||||
line for line in config.initial_comment
|
||||
if 'edited by plinth' not in line.lower()
|
||||
]
|
||||
|
||||
if 'pre_hook' in config['renewalparams']:
|
||||
del config['renewalparams']['pre_hook']
|
||||
|
||||
if 'renew_hook' in config['renewalparams']:
|
||||
del config['renewalparams']['renew_hook']
|
||||
|
||||
if 'post_hook' in config['renewalparams']:
|
||||
del config['renewalparams']['post_hook']
|
||||
|
||||
config.write()
|
||||
|
||||
|
||||
def subcommand_copy_certificate(arguments):
|
||||
"""Copy certificate from LE directory to daemon's directory.
|
||||
|
||||
Set ownership and permissions as requested needed by the daemon.
|
||||
|
||||
"""
|
||||
source_private_key_path = pathlib.Path(
|
||||
arguments.source_private_key_path).resolve()
|
||||
_assert_source_directory(source_private_key_path)
|
||||
source_certificate_path = pathlib.Path(
|
||||
arguments.source_certificate_path).resolve()
|
||||
_assert_source_directory(source_certificate_path)
|
||||
|
||||
private_key_path = pathlib.Path(arguments.private_key_path).resolve()
|
||||
_assert_managed_path(arguments.managing_app, private_key_path)
|
||||
certificate_path = pathlib.Path(arguments.certificate_path).resolve()
|
||||
_assert_managed_path(arguments.managing_app, certificate_path)
|
||||
|
||||
# Create directories, owned by root
|
||||
private_key_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
|
||||
certificate_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
|
||||
|
||||
# Private key is only accessible to the user owner
|
||||
old_mask = os.umask(0o177)
|
||||
shutil.copyfile(source_private_key_path, private_key_path)
|
||||
|
||||
if certificate_path != private_key_path:
|
||||
# Certificate is only writable by the user owner
|
||||
os.umask(0o133)
|
||||
shutil.copyfile(source_certificate_path, certificate_path)
|
||||
else:
|
||||
# If private key and certificate are the same file, append one after
|
||||
# the other.
|
||||
source_certificate = source_certificate_path.read_bytes()
|
||||
with private_key_path.open(mode='a+b') as file_handle:
|
||||
file_handle.write(source_certificate)
|
||||
|
||||
os.umask(old_mask)
|
||||
|
||||
shutil.chown(certificate_path, user=arguments.user_owner,
|
||||
group=arguments.group_owner)
|
||||
shutil.chown(private_key_path, user=arguments.user_owner,
|
||||
group=arguments.group_owner)
|
||||
|
||||
|
||||
def subcommand_compare_certificate(arguments):
|
||||
"""Compare LE certificate with an app certificate."""
|
||||
source_private_key_path = pathlib.Path(arguments.source_private_key_path)
|
||||
source_certificate_path = pathlib.Path(arguments.source_certificate_path)
|
||||
_assert_source_directory(source_private_key_path)
|
||||
_assert_source_directory(source_certificate_path)
|
||||
|
||||
private_key_path = pathlib.Path(arguments.private_key_path)
|
||||
certificate_path = pathlib.Path(arguments.certificate_path)
|
||||
_assert_managed_path(arguments.managing_app, private_key_path)
|
||||
_assert_managed_path(arguments.managing_app, certificate_path)
|
||||
|
||||
result = False
|
||||
try:
|
||||
if filecmp.cmp(source_certificate_path, certificate_path) and \
|
||||
filecmp.cmp(source_private_key_path, private_key_path):
|
||||
result = True
|
||||
except FileNotFoundError:
|
||||
result = False
|
||||
|
||||
print(json.dumps({'result': result}))
|
||||
|
||||
|
||||
def _assert_source_directory(path):
|
||||
"""Assert that a path is a valid source of a certificates."""
|
||||
assert (str(path).startswith(LE_DIRECTORY)
|
||||
or str(path).startswith(ETC_SSL_DIRECTORY))
|
||||
|
||||
|
||||
def _get_managed_path(path):
|
||||
"""Return the managed path given a certificate path."""
|
||||
if '{domain}' in path:
|
||||
return pathlib.Path(path.partition('{domain}')[0])
|
||||
|
||||
return pathlib.Path(path).parent
|
||||
|
||||
|
||||
def _assert_managed_path(module, path):
|
||||
"""Check that path is in fact managed by module."""
|
||||
cfg.read()
|
||||
module_file = pathlib.Path(cfg.config_dir) / 'modules-enabled' / module
|
||||
module_path = module_file.read_text().strip()
|
||||
|
||||
module = importlib.import_module(module_path)
|
||||
module_classes = inspect.getmembers(module, inspect.isclass)
|
||||
app_classes = [
|
||||
cls[1] for cls in module_classes if issubclass(cls[1], app_module.App)
|
||||
]
|
||||
|
||||
managed_paths = []
|
||||
for cls in app_classes:
|
||||
app = cls()
|
||||
components = app.get_components_of_type(LetsEncrypt)
|
||||
for component in components:
|
||||
if component.private_key_path:
|
||||
managed_paths.append(
|
||||
_get_managed_path(component.private_key_path))
|
||||
if component.certificate_path:
|
||||
managed_paths.append(
|
||||
_get_managed_path(component.certificate_path))
|
||||
|
||||
assert set(path.parents).intersection(set(managed_paths))
|
||||
|
||||
|
||||
def subcommand_run_pre_hooks(_):
|
||||
"""Do nothing, kept for legacy LE configuration.
|
||||
|
||||
If new version of Plinth is deployed and before it can update the Let's
|
||||
Encrypt configuration and remove these old hooks, if a renew operation is
|
||||
run, then we don't want it to exit with non-zero error code because this
|
||||
hook could not be run.
|
||||
|
||||
Remove at some point in the future.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
def subcommand_run_renew_hooks(_):
|
||||
"""Do nothing, kept for legacy LE configuration.
|
||||
|
||||
If new version of Plinth is deployed and before it can update the Let's
|
||||
Encrypt configuration and remove these old hooks, if a renew operation is
|
||||
run, then we don't want it to exit with non-zero error code because this
|
||||
hook could not be run.
|
||||
|
||||
Remove at some point in the future.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
def subcommand_run_post_hooks(_):
|
||||
"""Do nothing, kept for legacy LE configuration.
|
||||
|
||||
If new version of Plinth is deployed and before it can update the Let's
|
||||
Encrypt configuration and remove these old hooks, if a renew operation is
|
||||
run, then we don't want it to exit with non-zero error code because this
|
||||
hook could not be run.
|
||||
|
||||
Remove at some point in the future.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
def subcommand_delete(arguments):
|
||||
"""Disable a domain and delete the certificate."""
|
||||
domain = arguments.domain
|
||||
command = ['certbot', 'delete', '--non-interactive', '--cert-name', domain]
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
_, stderr = process.communicate()
|
||||
if process.returncode:
|
||||
print(stderr.decode(), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
action_utils.webserver_disable(domain, kind='site')
|
||||
|
||||
|
||||
def setup_webserver_config(domain, webserver_change):
|
||||
"""Create SSL web server configuration for a domain.
|
||||
|
||||
Do so only if there is no configuration existing.
|
||||
"""
|
||||
file_name = os.path.join(APACHE_PREFIX, domain + '.conf')
|
||||
if os.path.isfile(file_name):
|
||||
os.rename(file_name, file_name + '.fbx-bak')
|
||||
|
||||
with open(file_name, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(APACHE_CONFIGURATION.format(domain=domain))
|
||||
|
||||
webserver_change.enable('freedombox-tls-site-macro', kind='config')
|
||||
webserver_change.enable(domain, kind='site')
|
||||
|
||||
|
||||
def main():
|
||||
"""Parse arguments and perform all duties."""
|
||||
arguments = parse_arguments()
|
||||
|
||||
subcommand = arguments.subcommand.replace('-', '_')
|
||||
subcommand_method = globals()['subcommand_' + subcommand]
|
||||
subcommand_method(arguments)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
@ -1,7 +1,5 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
FreedomBox app for using Let's Encrypt.
|
||||
"""
|
||||
"""FreedomBox app for using Let's Encrypt."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
@ -9,10 +7,8 @@ import pathlib
|
||||
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from plinth import actions
|
||||
from plinth import app as app_module
|
||||
from plinth import cfg, menu
|
||||
from plinth.errors import ActionError
|
||||
from plinth.modules import names
|
||||
from plinth.modules.apache.components import diagnose_url
|
||||
from plinth.modules.backups.components import BackupRestore
|
||||
@ -21,7 +17,7 @@ from plinth.package import Packages
|
||||
from plinth.signals import domain_added, domain_removed, post_app_loading
|
||||
from plinth.utils import format_lazy
|
||||
|
||||
from . import components, manifest
|
||||
from . import components, manifest, privileged
|
||||
|
||||
_description = [
|
||||
format_lazy(
|
||||
@ -102,14 +98,12 @@ class LetsEncryptApp(app_module.App):
|
||||
def setup(self, old_version):
|
||||
"""Install and configure the app."""
|
||||
super().setup(old_version)
|
||||
actions.superuser_run('letsencrypt',
|
||||
['setup', '--old-version',
|
||||
str(old_version)])
|
||||
privileged.setup(old_version)
|
||||
|
||||
|
||||
def certificate_obtain(domain):
|
||||
"""Obtain a certificate for a domain and notify handlers."""
|
||||
actions.superuser_run('letsencrypt', ['obtain', '--domain', domain])
|
||||
privileged.obtain(domain)
|
||||
components.on_certificate_event('obtained', [domain], None)
|
||||
|
||||
|
||||
@ -124,7 +118,7 @@ def certificate_reobtain(domain):
|
||||
trigger obtain event (LE will trigger a renewal event).
|
||||
|
||||
"""
|
||||
actions.superuser_run('letsencrypt', ['obtain', '--domain', domain])
|
||||
privileged.obtain(domain)
|
||||
|
||||
|
||||
def certificate_revoke(domain, really_revoke=True):
|
||||
@ -138,20 +132,20 @@ def certificate_revoke(domain, really_revoke=True):
|
||||
obtaining certificates on the Let's Encrypt servers).
|
||||
"""
|
||||
if really_revoke:
|
||||
actions.superuser_run('letsencrypt', ['revoke', '--domain', domain])
|
||||
privileged.revoke(domain)
|
||||
|
||||
components.on_certificate_event('revoked', [domain], None)
|
||||
|
||||
|
||||
def certificate_delete(domain):
|
||||
"""Delete a certificate for a domain and notify handlers."""
|
||||
actions.superuser_run('letsencrypt', ['delete', '--domain', domain])
|
||||
privileged.delete(domain)
|
||||
components.on_certificate_event('deleted', [domain], None)
|
||||
|
||||
|
||||
def on_domain_added(sender, domain_type='', name='', description='',
|
||||
services=None, **kwargs):
|
||||
"""Obtain a certificate for the new domain"""
|
||||
"""Obtain a certificate for the new domain."""
|
||||
if not DomainType.get(domain_type).can_have_certificate:
|
||||
return False
|
||||
|
||||
@ -167,12 +161,12 @@ def on_domain_added(sender, domain_type='', name='', description='',
|
||||
logger.info('Obtaining certificate for %s', name)
|
||||
certificate_obtain(name)
|
||||
return True
|
||||
except ActionError:
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def on_domain_removed(sender, domain_type, name='', **kwargs):
|
||||
"""Revoke Let's Encrypt certificate for the removed domain"""
|
||||
"""Revoke Let's Encrypt certificate for the removed domain."""
|
||||
if not DomainType.get(domain_type).can_have_certificate:
|
||||
return False
|
||||
|
||||
@ -181,7 +175,7 @@ def on_domain_removed(sender, domain_type, name='', **kwargs):
|
||||
logger.info('Revoking certificate for %s', name)
|
||||
certificate_revoke(name, really_revoke=False)
|
||||
return True
|
||||
except ActionError as exception:
|
||||
except Exception as exception:
|
||||
logger.warning('Failed to revoke certificate for %s: %s', name,
|
||||
exception.args[2])
|
||||
return False
|
||||
@ -189,8 +183,7 @@ def on_domain_removed(sender, domain_type, name='', **kwargs):
|
||||
|
||||
def get_status():
|
||||
"""Get the current settings."""
|
||||
status = actions.superuser_run('letsencrypt', ['get-status'])
|
||||
status = json.loads(status)
|
||||
status = privileged.get_status()
|
||||
|
||||
for domain in names.components.DomainName.list():
|
||||
if domain.domain_type.can_have_certificate:
|
||||
@ -247,9 +240,7 @@ def certificate_get_last_seen_modified_time(lineage):
|
||||
def certificate_set_last_seen_modified_time(lineage):
|
||||
"""Write to store a certificate's last seen expiry date."""
|
||||
lineage = pathlib.Path(lineage)
|
||||
output = actions.superuser_run(
|
||||
'letsencrypt', ['get-modified-time', '--domain', lineage.name])
|
||||
modified_time = int(output)
|
||||
modified_time = privileged.get_modified_time(lineage.name)
|
||||
|
||||
from plinth import kvstore
|
||||
info = kvstore.get_default('letsencrypt_certificate_info', '{}')
|
||||
|
||||
@ -1,9 +1,6 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""
|
||||
App component for other apps to use handle Let's Encrypt certificates.
|
||||
"""
|
||||
"""App component for other apps to use handle Let's Encrypt certificates."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import pathlib
|
||||
import threading
|
||||
@ -11,6 +8,8 @@ import threading
|
||||
from plinth import actions, app
|
||||
from plinth.modules.names.components import DomainName
|
||||
|
||||
from . import privileged
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -327,14 +326,11 @@ class LetsEncrypt(app.FollowerComponent):
|
||||
source_certificate_path, private_key_path,
|
||||
certificate_path):
|
||||
"""Copy certificate for a single domain."""
|
||||
actions.superuser_run('letsencrypt', [
|
||||
'copy-certificate', '--managing-app', self.managing_app,
|
||||
'--user-owner', self.user_owner, '--group-owner', self.group_owner,
|
||||
'--source-private-key-path',
|
||||
str(source_private_key_path), '--source-certificate-path',
|
||||
str(source_certificate_path), '--private-key-path',
|
||||
private_key_path, '--certificate-path', certificate_path
|
||||
])
|
||||
privileged.copy_certificate(self.managing_app,
|
||||
str(source_private_key_path),
|
||||
str(source_certificate_path),
|
||||
private_key_path, certificate_path,
|
||||
self.user_owner, self.group_owner)
|
||||
|
||||
def _compare_certificate(self, domain, lineage):
|
||||
"""Compare LE certificate with app certificate."""
|
||||
@ -342,14 +338,11 @@ class LetsEncrypt(app.FollowerComponent):
|
||||
source_certificate_path = pathlib.Path(lineage) / 'fullchain.pem'
|
||||
private_key_path = self.private_key_path.format(domain=domain)
|
||||
certificate_path = self.certificate_path.format(domain=domain)
|
||||
output = actions.superuser_run('letsencrypt', [
|
||||
'compare-certificate', '--managing-app', self.managing_app,
|
||||
'--source-private-key-path',
|
||||
str(source_private_key_path), '--source-certificate-path',
|
||||
str(source_certificate_path), '--private-key-path',
|
||||
private_key_path, '--certificate-path', certificate_path
|
||||
])
|
||||
return json.loads(output)['result']
|
||||
return privileged.compare_certificate(self.managing_app,
|
||||
str(source_private_key_path),
|
||||
str(source_certificate_path),
|
||||
private_key_path,
|
||||
certificate_path)
|
||||
|
||||
|
||||
def on_certificate_event(event, domains, lineage):
|
||||
|
||||
359
plinth/modules/letsencrypt/privileged.py
Normal file
359
plinth/modules/letsencrypt/privileged.py
Normal file
@ -0,0 +1,359 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""Configure Let's Encrypt."""
|
||||
|
||||
import filecmp
|
||||
import glob
|
||||
import importlib
|
||||
import inspect
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
import configobj
|
||||
|
||||
from plinth import action_utils
|
||||
from plinth import app as app_module
|
||||
from plinth import cfg
|
||||
from plinth.actions import privileged
|
||||
from plinth.modules import letsencrypt as le
|
||||
|
||||
TEST_MODE = False
|
||||
LE_DIRECTORY = '/etc/letsencrypt/'
|
||||
ETC_SSL_DIRECTORY = '/etc/ssl/'
|
||||
RENEWAL_DIRECTORY = '/etc/letsencrypt/renewal/'
|
||||
AUTHENTICATOR = 'webroot'
|
||||
WEB_ROOT_PATH = '/var/www/html'
|
||||
APACHE_PREFIX = '/etc/apache2/sites-available/'
|
||||
APACHE_CONFIGURATION = '''
|
||||
Use FreedomBoxTLSSiteMacro {domain}
|
||||
'''
|
||||
|
||||
|
||||
def _get_certificate_expiry(domain: str) -> str:
|
||||
"""Return the expiry date of a certificate."""
|
||||
certificate_file = os.path.join(le.LIVE_DIRECTORY, domain, 'cert.pem')
|
||||
output = subprocess.check_output(
|
||||
['openssl', 'x509', '-enddate', '-noout', '-in', certificate_file])
|
||||
return output.decode().strip().split('=')[1]
|
||||
|
||||
|
||||
def _get_modified_time(domain: str) -> int:
|
||||
"""Return the last modified time of a certificate."""
|
||||
certificate_file = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem'
|
||||
return int(certificate_file.stat().st_mtime)
|
||||
|
||||
|
||||
def _get_validity_status(domain: str) -> str:
|
||||
"""Return validity status of a certificate; valid, revoked, expired."""
|
||||
output = subprocess.check_output(['certbot', 'certificates', '-d', domain])
|
||||
line = output.decode(sys.stdout.encoding)
|
||||
|
||||
match = re.search(r'INVALID: (.*)\)', line)
|
||||
if match is not None:
|
||||
validity = match.group(1).lower()
|
||||
elif re.search('VALID', line) is not None:
|
||||
validity = 'valid'
|
||||
else:
|
||||
validity = 'unknown'
|
||||
|
||||
return validity
|
||||
|
||||
|
||||
def _get_status() -> dict[str, Any]:
|
||||
"""Return Python dictionary of currently configured domains.
|
||||
|
||||
Should be run as root, otherwise might yield a wrong, empty answer.
|
||||
"""
|
||||
try:
|
||||
domains = os.listdir(le.LIVE_DIRECTORY)
|
||||
except OSError:
|
||||
domains = []
|
||||
|
||||
domains = [
|
||||
domain for domain in domains
|
||||
if os.path.isdir(os.path.join(le.LIVE_DIRECTORY, domain))
|
||||
]
|
||||
|
||||
domain_status = {}
|
||||
for domain in domains:
|
||||
domain_status[domain] = {
|
||||
'certificate_available':
|
||||
True,
|
||||
'expiry_date':
|
||||
_get_certificate_expiry(domain),
|
||||
'web_enabled':
|
||||
action_utils.webserver_is_enabled(domain, kind='site'),
|
||||
'validity':
|
||||
_get_validity_status(domain),
|
||||
'lineage':
|
||||
str(pathlib.Path(le.LIVE_DIRECTORY) / domain),
|
||||
'modified_time':
|
||||
_get_modified_time(domain)
|
||||
}
|
||||
return domain_status
|
||||
|
||||
|
||||
@privileged
|
||||
def setup(old_version: int):
|
||||
"""Upgrade old site configuration to new macro based style.
|
||||
|
||||
Nothing to do for first time setup and for newer versions.
|
||||
"""
|
||||
if old_version == 2:
|
||||
_remove_old_hooks()
|
||||
return
|
||||
|
||||
if old_version != 1:
|
||||
return
|
||||
|
||||
domain_status = _get_status()
|
||||
with action_utils.WebserverChange() as webserver_change:
|
||||
for domain in domain_status:
|
||||
_setup_webserver_config(domain, webserver_change)
|
||||
|
||||
|
||||
@privileged
|
||||
def get_status() -> dict[str, Any]:
|
||||
"""Return a dictionary of currently configured domains."""
|
||||
domain_status = _get_status()
|
||||
return {'domains': domain_status}
|
||||
|
||||
|
||||
@privileged
|
||||
def get_modified_time(domain: str) -> int:
|
||||
"""Return the modified time of a certificate as integer."""
|
||||
return _get_modified_time(domain)
|
||||
|
||||
|
||||
@privileged
|
||||
def revoke(domain: str):
|
||||
"""Disable a domain and revoke the certificate."""
|
||||
cert_path = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem'
|
||||
if cert_path.exists():
|
||||
command = [
|
||||
'certbot', 'revoke', '--non-interactive', '--domain', domain,
|
||||
'--cert-path',
|
||||
str(cert_path)
|
||||
]
|
||||
if TEST_MODE:
|
||||
command.append('--staging')
|
||||
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
_, stderr = process.communicate()
|
||||
if process.returncode:
|
||||
raise RuntimeError('Error revoking certificate: {error}'.format(
|
||||
error=stderr.decode()))
|
||||
|
||||
action_utils.webserver_disable(domain, kind='site')
|
||||
|
||||
|
||||
@privileged
|
||||
def obtain(domain: str):
|
||||
"""Obtain a certificate for a domain and setup website."""
|
||||
command = [
|
||||
'certbot', 'certonly', '--non-interactive', '--text', '--agree-tos',
|
||||
'--register-unsafely-without-email', '--domain', domain,
|
||||
'--authenticator', AUTHENTICATOR, '--webroot-path', WEB_ROOT_PATH,
|
||||
'--renew-by-default'
|
||||
]
|
||||
if TEST_MODE:
|
||||
command.append('--staging')
|
||||
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
_, stderr = process.communicate()
|
||||
if process.returncode:
|
||||
raise RuntimeError('Error obtaining certificate: {error}'.format(
|
||||
error=stderr.decode()))
|
||||
|
||||
with action_utils.WebserverChange() as webserver_change:
|
||||
_setup_webserver_config(domain, webserver_change)
|
||||
|
||||
|
||||
def _remove_old_hooks():
|
||||
"""Remove old style renewal hooks from individual configuration files.
|
||||
|
||||
This has been replaced with global hooks by adding script files in
|
||||
directory /etc/letsencrypt/renewal-hooks/{pre,post,deploy}/.
|
||||
"""
|
||||
for file_path in glob.glob(RENEWAL_DIRECTORY + '*.conf'):
|
||||
try:
|
||||
_remove_old_hooks_from_file(file_path)
|
||||
except Exception as exception:
|
||||
print('Error removing hooks from file:', file_path, exception)
|
||||
|
||||
|
||||
def _remove_old_hooks_from_file(file_path: str):
|
||||
"""Remove old style hooks from a single configuration file."""
|
||||
config = configobj.ConfigObj(file_path)
|
||||
edited = False
|
||||
for line in config.initial_comment:
|
||||
if 'edited by plinth' in line.lower():
|
||||
edited = True
|
||||
|
||||
if not edited:
|
||||
return
|
||||
|
||||
config.initial_comment = [
|
||||
line for line in config.initial_comment
|
||||
if 'edited by plinth' not in line.lower()
|
||||
]
|
||||
|
||||
if 'pre_hook' in config['renewalparams']:
|
||||
del config['renewalparams']['pre_hook']
|
||||
|
||||
if 'renew_hook' in config['renewalparams']:
|
||||
del config['renewalparams']['renew_hook']
|
||||
|
||||
if 'post_hook' in config['renewalparams']:
|
||||
del config['renewalparams']['post_hook']
|
||||
|
||||
config.write()
|
||||
|
||||
|
||||
@privileged
|
||||
def copy_certificate(managing_app: str, source_private_key: str,
|
||||
source_certificate: str, private_key: str,
|
||||
certificate: str, user_owner: str, group_owner: str):
|
||||
"""Copy certificate from LE directory to daemon's directory.
|
||||
|
||||
Set ownership and permissions as requested needed by the daemon.
|
||||
|
||||
"""
|
||||
source_private_key_path = pathlib.Path(source_private_key).resolve()
|
||||
_assert_source_directory(source_private_key_path)
|
||||
source_certificate_path = pathlib.Path(source_certificate).resolve()
|
||||
_assert_source_directory(source_certificate_path)
|
||||
|
||||
private_key_path = pathlib.Path(private_key).resolve()
|
||||
_assert_managed_path(managing_app, private_key_path)
|
||||
certificate_path = pathlib.Path(certificate).resolve()
|
||||
_assert_managed_path(managing_app, certificate_path)
|
||||
|
||||
# Create directories, owned by root
|
||||
private_key_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
|
||||
certificate_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
|
||||
|
||||
# Private key is only accessible to the user owner
|
||||
old_mask = os.umask(0o177)
|
||||
shutil.copyfile(source_private_key_path, private_key_path)
|
||||
|
||||
if certificate_path != private_key_path:
|
||||
# Certificate is only writable by the user owner
|
||||
os.umask(0o133)
|
||||
shutil.copyfile(source_certificate_path, certificate_path)
|
||||
else:
|
||||
# If private key and certificate are the same file, append one after
|
||||
# the other.
|
||||
source_certificate_bytes = source_certificate_path.read_bytes()
|
||||
with private_key_path.open(mode='a+b') as file_handle:
|
||||
file_handle.write(source_certificate_bytes)
|
||||
|
||||
os.umask(old_mask)
|
||||
|
||||
shutil.chown(certificate_path, user=user_owner, group=group_owner)
|
||||
shutil.chown(private_key_path, user=user_owner, group=group_owner)
|
||||
|
||||
|
||||
@privileged
|
||||
def compare_certificate(managing_app: str, source_private_key: str,
|
||||
source_certificate: str, private_key: str,
|
||||
certificate: str) -> bool:
|
||||
"""Compare LE certificate with an app certificate."""
|
||||
source_private_key_path = pathlib.Path(source_private_key)
|
||||
source_certificate_path = pathlib.Path(source_certificate)
|
||||
_assert_source_directory(source_private_key_path)
|
||||
_assert_source_directory(source_certificate_path)
|
||||
|
||||
private_key_path = pathlib.Path(private_key)
|
||||
certificate_path = pathlib.Path(certificate)
|
||||
_assert_managed_path(managing_app, private_key)
|
||||
_assert_managed_path(managing_app, certificate)
|
||||
|
||||
result = False
|
||||
try:
|
||||
if filecmp.cmp(source_certificate_path, certificate_path) and \
|
||||
filecmp.cmp(source_private_key_path, private_key_path):
|
||||
result = True
|
||||
except FileNotFoundError:
|
||||
result = False
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _assert_source_directory(path):
|
||||
"""Assert that a path is a valid source of a certificates."""
|
||||
assert (str(path).startswith(LE_DIRECTORY)
|
||||
or str(path).startswith(ETC_SSL_DIRECTORY))
|
||||
|
||||
|
||||
def _get_managed_path(path):
|
||||
"""Return the managed path given a certificate path."""
|
||||
if '{domain}' in path:
|
||||
return pathlib.Path(path.partition('{domain}')[0])
|
||||
|
||||
return pathlib.Path(path).parent
|
||||
|
||||
|
||||
def _assert_managed_path(module, path):
|
||||
"""Check that path is in fact managed by module."""
|
||||
cfg.read()
|
||||
module_file = pathlib.Path(cfg.config_dir) / 'modules-enabled' / module
|
||||
module_path = module_file.read_text().strip()
|
||||
|
||||
module = importlib.import_module(module_path)
|
||||
module_classes = inspect.getmembers(module, inspect.isclass)
|
||||
app_classes = [
|
||||
cls[1] for cls in module_classes if issubclass(cls[1], app_module.App)
|
||||
]
|
||||
|
||||
managed_paths = []
|
||||
for cls in app_classes:
|
||||
app = cls()
|
||||
from plinth.modules.letsencrypt.components import LetsEncrypt
|
||||
components = app.get_components_of_type(LetsEncrypt)
|
||||
for component in components:
|
||||
if component.private_key_path:
|
||||
managed_paths.append(
|
||||
_get_managed_path(component.private_key_path))
|
||||
if component.certificate_path:
|
||||
managed_paths.append(
|
||||
_get_managed_path(component.certificate_path))
|
||||
|
||||
if not set(path.parents).intersection(set(managed_paths)):
|
||||
raise AssertionError('Not a managed path')
|
||||
|
||||
|
||||
@privileged
|
||||
def delete(domain: str):
|
||||
"""Disable a domain and delete the certificate."""
|
||||
command = ['certbot', 'delete', '--non-interactive', '--cert-name', domain]
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
_, stderr = process.communicate()
|
||||
if process.returncode:
|
||||
raise RuntimeError('Error deleting certificate: {error}'.format(
|
||||
error=stderr.decode()))
|
||||
|
||||
action_utils.webserver_disable(domain, kind='site')
|
||||
|
||||
|
||||
def _setup_webserver_config(domain, webserver_change):
|
||||
"""Create SSL web server configuration for a domain.
|
||||
|
||||
Do so only if there is no configuration existing.
|
||||
"""
|
||||
file_name = os.path.join(APACHE_PREFIX, domain + '.conf')
|
||||
if os.path.isfile(file_name):
|
||||
os.rename(file_name, file_name + '.fbx-bak')
|
||||
|
||||
with open(file_name, 'w', encoding='utf-8') as file_handle:
|
||||
file_handle.write(APACHE_CONFIGURATION.format(domain=domain))
|
||||
|
||||
webserver_change.enable('freedombox-tls-site-macro', kind='config')
|
||||
webserver_change.enable(domain, kind='site')
|
||||
@ -3,7 +3,6 @@
|
||||
Test the Let's Encrypt component for managing certificates.
|
||||
"""
|
||||
|
||||
import json
|
||||
from unittest.mock import call, patch
|
||||
|
||||
import pytest
|
||||
@ -30,6 +29,29 @@ def fixture_component():
|
||||
managing_app='test-app')
|
||||
|
||||
|
||||
@pytest.fixture(name='try_restart')
|
||||
def fixture_try_restart():
|
||||
"""Patch and return service.try_restart privileged call."""
|
||||
with patch('plinth.privileged.service.try_restart') as try_restart:
|
||||
yield try_restart
|
||||
|
||||
|
||||
@pytest.fixture(name='copy_certificate')
|
||||
def fixture_copy_certificate():
|
||||
"""Patch and return privileged.copy_certificate call."""
|
||||
with patch('plinth.modules.letsencrypt.privileged.copy_certificate'
|
||||
) as copy_certificate:
|
||||
yield copy_certificate
|
||||
|
||||
|
||||
@pytest.fixture(name='compare_certificate')
|
||||
def fixture_compare_certificate():
|
||||
"""Patch and return privileged.compare_certificate call."""
|
||||
with patch('plinth.modules.letsencrypt.privileged.compare_certificate'
|
||||
) as compare_certificate:
|
||||
yield compare_certificate
|
||||
|
||||
|
||||
@pytest.fixture(name='get_status')
|
||||
def fixture_get_status():
|
||||
"""Return patched letsencrypt.get_status() method."""
|
||||
@ -65,13 +87,6 @@ def fixture_domain_list():
|
||||
yield domain_list
|
||||
|
||||
|
||||
@pytest.fixture(name='superuser_run')
|
||||
def fixture_superuser_run():
|
||||
"""Return patched plinth.actions.superuser_run() method."""
|
||||
with patch('plinth.actions.superuser_run') as superuser_run:
|
||||
yield superuser_run
|
||||
|
||||
|
||||
def test_init_without_arguments():
|
||||
"""Test that component is initialized with defaults properly."""
|
||||
component = LetsEncrypt('test-component')
|
||||
@ -133,13 +148,8 @@ def test_list():
|
||||
assert set(LetsEncrypt.list()) == {component1, component2}
|
||||
|
||||
|
||||
def _assert_copy_certificate_called(component, superuser_run, domains):
|
||||
def _assert_copy_certificate_called(component, copy_certificate, domains):
|
||||
"""Check that copy certificate calls have been made properly."""
|
||||
copy_calls = [
|
||||
mock_call for mock_call in superuser_run.mock_calls
|
||||
if mock_call[1][0] == 'letsencrypt'
|
||||
and mock_call[1][1][0] == 'copy-certificate'
|
||||
]
|
||||
expected_calls = []
|
||||
for domain, domain_status in domains.items():
|
||||
if domain_status == 'valid':
|
||||
@ -153,83 +163,72 @@ def _assert_copy_certificate_called(component, superuser_run, domains):
|
||||
|
||||
private_key_path = '/etc/test-app/{}/private.path'.format(domain)
|
||||
certificate_path = '/etc/test-app/{}/certificate.path'.format(domain)
|
||||
expected_call = call('letsencrypt', [
|
||||
'copy-certificate', '--managing-app', component.managing_app,
|
||||
'--user-owner', component.user_owner, '--group-owner',
|
||||
component.group_owner, '--source-private-key-path',
|
||||
str(source_private_key_path), '--source-certificate-path',
|
||||
str(source_certificate_path), '--private-key-path',
|
||||
private_key_path, '--certificate-path', certificate_path
|
||||
])
|
||||
expected_call = call(component.managing_app,
|
||||
str(source_private_key_path),
|
||||
str(source_certificate_path), private_key_path,
|
||||
certificate_path, component.user_owner,
|
||||
component.group_owner)
|
||||
expected_calls.append(expected_call)
|
||||
|
||||
assert len(expected_calls) == len(copy_calls)
|
||||
for expected_call in expected_calls:
|
||||
print(expected_call)
|
||||
print(copy_calls)
|
||||
assert expected_call in copy_calls
|
||||
copy_certificate.assert_has_calls(expected_calls, any_order=True)
|
||||
|
||||
|
||||
def _assert_restarted_daemons(daemons, superuser_run):
|
||||
def _assert_restarted_daemons(daemons, try_restart):
|
||||
"""Check that a call has restarted the daemons of a component."""
|
||||
run_calls = [
|
||||
mock_call for mock_call in superuser_run.mock_calls
|
||||
if mock_call[1][0] == 'service'
|
||||
]
|
||||
expected_calls = [
|
||||
call('service', ['try-restart', daemon]) for daemon in daemons
|
||||
]
|
||||
assert len(expected_calls) == len(run_calls)
|
||||
for expected_call in expected_calls:
|
||||
assert expected_call in run_calls
|
||||
expected_calls = [call(daemon) for daemon in daemons]
|
||||
try_restart.assert_has_calls(expected_calls, any_order=True)
|
||||
|
||||
|
||||
def test_setup_certificates(superuser_run, get_status, component):
|
||||
def test_setup_certificates(copy_certificate, try_restart, get_status,
|
||||
component):
|
||||
"""Test that initial copying of certs for an app works."""
|
||||
component.setup_certificates()
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
_assert_copy_certificate_called(component, copy_certificate, {
|
||||
'valid.example': 'valid',
|
||||
'invalid.example': 'invalid'
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_setup_certificates_without_copy(superuser_run, get_status, component):
|
||||
def test_setup_certificates_without_copy(copy_certificate, try_restart,
|
||||
get_status, component):
|
||||
"""Test that initial copying of certs for an app works."""
|
||||
component.should_copy_certificates = False
|
||||
component.setup_certificates()
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_setup_certificates_with_app_domains(superuser_run, get_status,
|
||||
component):
|
||||
def test_setup_certificates_with_app_domains(copy_certificate, try_restart,
|
||||
get_status, component):
|
||||
"""Test that initial copying of certs for an app works."""
|
||||
component._domains = ['irrelevant1.example', 'irrelevant2.example']
|
||||
component.setup_certificates(
|
||||
app_domains=['valid.example', 'invalid.example'])
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
_assert_copy_certificate_called(component, copy_certificate, {
|
||||
'valid.example': 'valid',
|
||||
'invalid.example': 'invalid'
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_setup_certificates_with_all_domains(domain_list, superuser_run,
|
||||
get_status, component):
|
||||
def test_setup_certificates_with_all_domains(domain_list, copy_certificate,
|
||||
try_restart, get_status,
|
||||
component):
|
||||
"""Test that initial copying for certs works when app domains is '*'."""
|
||||
component._domains = '*'
|
||||
component.setup_certificates()
|
||||
_assert_copy_certificate_called(
|
||||
component, superuser_run, {
|
||||
component, copy_certificate, {
|
||||
'valid.example': 'valid',
|
||||
'invalid1.example': 'invalid',
|
||||
'invalid2.example': 'invalid'
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def _assert_compare_certificate_called(component, superuser_run, domains):
|
||||
def _assert_compare_certificate_called(component, compare_certificate,
|
||||
domains):
|
||||
"""Check that compare certificate was called properly."""
|
||||
expected_calls = []
|
||||
for domain in domains:
|
||||
@ -239,37 +238,34 @@ def _assert_compare_certificate_called(component, superuser_run, domains):
|
||||
'/etc/letsencrypt/live/{}/fullchain.pem'.format(domain)
|
||||
private_key_path = '/etc/test-app/{}/private.path'.format(domain)
|
||||
certificate_path = '/etc/test-app/{}/certificate.path'.format(domain)
|
||||
expected_call = call('letsencrypt', [
|
||||
'compare-certificate', '--managing-app', component.managing_app,
|
||||
'--source-private-key-path',
|
||||
str(source_private_key_path), '--source-certificate-path',
|
||||
str(source_certificate_path), '--private-key-path',
|
||||
private_key_path, '--certificate-path', certificate_path
|
||||
])
|
||||
expected_call = call(component.managing_app,
|
||||
str(source_private_key_path),
|
||||
str(source_certificate_path), private_key_path,
|
||||
certificate_path)
|
||||
expected_calls.append(expected_call)
|
||||
|
||||
superuser_run.assert_has_calls(expected_calls)
|
||||
compare_certificate.assert_has_calls(expected_calls, any_order=True)
|
||||
|
||||
|
||||
def test_get_status(component, superuser_run, get_status):
|
||||
def test_get_status(component, compare_certificate, get_status):
|
||||
"""Test that getting domain status works."""
|
||||
superuser_run.return_value = json.dumps({'result': True})
|
||||
compare_certificate.return_value = True
|
||||
assert component.get_status() == {
|
||||
'valid.example': 'valid',
|
||||
'invalid.example': 'self-signed'
|
||||
}
|
||||
_assert_compare_certificate_called(component, superuser_run,
|
||||
_assert_compare_certificate_called(component, compare_certificate,
|
||||
['valid.example'])
|
||||
|
||||
|
||||
def test_get_status_outdate_copy(component, superuser_run, get_status):
|
||||
def test_get_status_outdate_copy(component, compare_certificate, get_status):
|
||||
"""Test that getting domain status works with outdated copy."""
|
||||
superuser_run.return_value = json.dumps({'result': False})
|
||||
compare_certificate.return_value = False
|
||||
assert component.get_status() == {
|
||||
'valid.example': 'outdated-copy',
|
||||
'invalid.example': 'self-signed'
|
||||
}
|
||||
_assert_compare_certificate_called(component, superuser_run,
|
||||
_assert_compare_certificate_called(component, compare_certificate,
|
||||
['valid.example'])
|
||||
|
||||
|
||||
@ -282,130 +278,139 @@ def test_get_status_without_copy(component, get_status):
|
||||
}
|
||||
|
||||
|
||||
def test_on_certificate_obtained(superuser_run, component):
|
||||
def test_on_certificate_obtained(copy_certificate, try_restart, component):
|
||||
"""Test that certificate obtained event handler works."""
|
||||
component.on_certificate_obtained(['valid.example', 'irrelevant.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
_assert_copy_certificate_called(component, copy_certificate, {
|
||||
'valid.example': 'valid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_obtained_with_all_domains(superuser_run, component):
|
||||
def test_on_certificate_obtained_with_all_domains(copy_certificate,
|
||||
try_restart, component):
|
||||
"""Test that certificate obtained event handler works for app with
|
||||
all domains.
|
||||
"""
|
||||
component._domains = '*'
|
||||
component.on_certificate_obtained(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
_assert_copy_certificate_called(component, copy_certificate, {
|
||||
'valid.example': 'valid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_obtained_irrelevant(superuser_run, component):
|
||||
def test_on_certificate_obtained_irrelevant(copy_certificate, try_restart,
|
||||
component):
|
||||
"""Test that certificate obtained event handler works with
|
||||
irrelevant domain.
|
||||
"""
|
||||
component.on_certificate_obtained(
|
||||
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons([], superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons([], try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_obtained_without_copy(superuser_run, component):
|
||||
def test_on_certificate_obtained_without_copy(copy_certificate, try_restart,
|
||||
component):
|
||||
"""Test that certificate obtained event handler works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
component.on_certificate_obtained(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_renewed(superuser_run, component):
|
||||
def test_on_certificate_renewed(copy_certificate, try_restart, component):
|
||||
"""Test that certificate renewed event handler works."""
|
||||
component.on_certificate_renewed(['valid.example', 'irrelevant.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
_assert_copy_certificate_called(component, copy_certificate, {
|
||||
'valid.example': 'valid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_renewed_irrelevant(superuser_run, component):
|
||||
def test_on_certificate_renewed_irrelevant(copy_certificate, try_restart,
|
||||
component):
|
||||
"""Test that certificate renewed event handler works for
|
||||
irrelevant domains.
|
||||
"""
|
||||
component.on_certificate_renewed(
|
||||
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons([], superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons([], try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_renewed_without_copy(superuser_run, component):
|
||||
def test_on_certificate_renewed_without_copy(copy_certificate, try_restart,
|
||||
component):
|
||||
"""Test that certificate renewed event handler works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
component.on_certificate_renewed(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_revoked(superuser_run, component):
|
||||
def test_on_certificate_revoked(copy_certificate, try_restart, component):
|
||||
"""Test that certificate revoked event handler works."""
|
||||
component.on_certificate_revoked(['valid.example', 'irrelevant.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
_assert_copy_certificate_called(component, copy_certificate, {
|
||||
'valid.example': 'invalid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_revoked_irrelevant(superuser_run, component):
|
||||
def test_on_certificate_revoked_irrelevant(copy_certificate, try_restart,
|
||||
component):
|
||||
"""Test that certificate revoked event handler works for
|
||||
irrelevant domains.
|
||||
"""
|
||||
component.on_certificate_revoked(
|
||||
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons([], superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons([], try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_revoked_without_copy(superuser_run, component):
|
||||
def test_on_certificate_revoked_without_copy(copy_certificate, try_restart,
|
||||
component):
|
||||
"""Test that certificate revoked event handler works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
component.on_certificate_revoked(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_deleted(superuser_run, component):
|
||||
def test_on_certificate_deleted(copy_certificate, try_restart, component):
|
||||
"""Test that certificate deleted event handler works."""
|
||||
component.on_certificate_deleted(['valid.example', 'irrelevant.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {
|
||||
_assert_copy_certificate_called(component, copy_certificate, {
|
||||
'valid.example': 'invalid',
|
||||
})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_deleted_irrelevant(superuser_run, component):
|
||||
def test_on_certificate_deleted_irrelevant(copy_certificate, try_restart,
|
||||
component):
|
||||
"""Test that certificate deleted event handler works for
|
||||
irrelevant domains.
|
||||
"""
|
||||
component.on_certificate_deleted(
|
||||
['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons([], superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons([], try_restart)
|
||||
|
||||
|
||||
def test_on_certificate_deleted_without_copy(superuser_run, component):
|
||||
def test_on_certificate_deleted_without_copy(copy_certificate, try_restart,
|
||||
component):
|
||||
"""Test that certificate deleted event handler works without copying."""
|
||||
component.should_copy_certificates = False
|
||||
component.on_certificate_deleted(['valid.example'],
|
||||
'/etc/letsencrypt/live/valid.example/')
|
||||
_assert_copy_certificate_called(component, superuser_run, {})
|
||||
_assert_restarted_daemons(component.daemons, superuser_run)
|
||||
_assert_copy_certificate_called(component, copy_certificate, {})
|
||||
_assert_restarted_daemons(component.daemons, try_restart)
|
||||
|
||||
@ -11,7 +11,6 @@ from django.urls import reverse_lazy
|
||||
from django.utils.translation import gettext as _
|
||||
from django.views.decorators.http import require_POST
|
||||
|
||||
from plinth.errors import ActionError
|
||||
from plinth.modules import letsencrypt
|
||||
from plinth.views import AppView
|
||||
|
||||
@ -41,11 +40,11 @@ def revoke(request, domain):
|
||||
_('Certificate successfully revoked for domain {domain}.'
|
||||
'This may take a few moments to take effect.').format(
|
||||
domain=domain))
|
||||
except ActionError as exception:
|
||||
except Exception as exception:
|
||||
messages.error(
|
||||
request,
|
||||
_('Failed to revoke certificate for domain {domain}: {error}').
|
||||
format(domain=domain, error=exception.args[2]))
|
||||
format(domain=domain, error=exception.args))
|
||||
|
||||
return redirect(reverse_lazy('letsencrypt:index'))
|
||||
|
||||
@ -59,11 +58,11 @@ def obtain(request, domain):
|
||||
request,
|
||||
_('Certificate successfully obtained for domain {domain}').format(
|
||||
domain=domain))
|
||||
except ActionError as exception:
|
||||
except Exception as exception:
|
||||
messages.error(
|
||||
request,
|
||||
_('Failed to obtain certificate for domain {domain}: {error}').
|
||||
format(domain=domain, error=exception.args[2]))
|
||||
format(domain=domain, error=exception.args))
|
||||
return redirect(reverse_lazy('letsencrypt:index'))
|
||||
|
||||
|
||||
@ -76,11 +75,11 @@ def reobtain(request, domain):
|
||||
request,
|
||||
_('Certificate successfully obtained for domain {domain}').format(
|
||||
domain=domain))
|
||||
except ActionError as exception:
|
||||
except Exception as exception:
|
||||
messages.error(
|
||||
request,
|
||||
_('Failed to obtain certificate for domain {domain}: {error}').
|
||||
format(domain=domain, error=exception.args[2]))
|
||||
format(domain=domain, error=exception.args))
|
||||
return redirect(reverse_lazy('letsencrypt:index'))
|
||||
|
||||
|
||||
@ -93,10 +92,10 @@ def delete(request, domain):
|
||||
request,
|
||||
_('Certificate successfully deleted for domain {domain}').format(
|
||||
domain=domain))
|
||||
except ActionError as exception:
|
||||
except Exception as exception:
|
||||
messages.error(
|
||||
request,
|
||||
_('Failed to delete certificate for domain {domain}: {error}').
|
||||
format(domain=domain, error=exception.args[2]))
|
||||
format(domain=domain, error=exception.args))
|
||||
|
||||
return redirect(reverse_lazy('letsencrypt:index'))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user