From c1cf5699c20f0f3405ba6db609133465e7f76e36 Mon Sep 17 00:00:00 2001 From: Sunil Mohan Adapa Date: Fri, 2 Sep 2022 16:53:17 -0700 Subject: [PATCH] letsencrypt: Use privileged decorator for actions Tests: - DONE: Initial setup works - DONE: Certificate events on FreedomBox startup work - DONE: Basic operations work: obtain, revoke, delete - DONE: Status of certificates is shown properly - DONE: Domain add/remove hooks work, errors are handled - Not tested: Removing old hooks - DONE: Errors are shown properly on failure: revoke, obtain, reobtain, delete Signed-off-by: Sunil Mohan Adapa Reviewed-by: James Valleroy --- actions/letsencrypt | 496 +----------------- plinth/modules/letsencrypt/__init__.py | 35 +- plinth/modules/letsencrypt/components.py | 33 +- plinth/modules/letsencrypt/privileged.py | 359 +++++++++++++ .../letsencrypt/tests/test_components.py | 217 ++++---- plinth/modules/letsencrypt/views.py | 17 +- 6 files changed, 510 insertions(+), 647 deletions(-) create mode 100644 plinth/modules/letsencrypt/privileged.py diff --git a/actions/letsencrypt b/actions/letsencrypt index ed9fb04fc..cf0190e04 100755 --- a/actions/letsencrypt +++ b/actions/letsencrypt @@ -1,493 +1,9 @@ #!/usr/bin/python3 # SPDX-License-Identifier: AGPL-3.0-or-later +"""Legacy configuration helper for Let's Encrypt, kept for compatibility. + +LE configuration in the earlier implementation used to call into this script +with the sub-commands 'run-pre-hooks', 'run-renew-hooks' and 'run-post-hooks'. +This action script now allows for any arbitrary sub-command to be called and +does nothing. It can be removed after the release of Debian 12 (bookworm). """ -Configuration helper for Let's Encrypt. -""" - -import argparse -import filecmp -import glob -import importlib -import inspect -import json -import os -import pathlib -import re -import shutil -import subprocess -import sys - -import configobj - -from plinth import action_utils -from plinth import app as app_module -from plinth import cfg -from plinth.modules import letsencrypt as le -from plinth.modules.letsencrypt.components import LetsEncrypt - -TEST_MODE = False -LE_DIRECTORY = '/etc/letsencrypt/' -ETC_SSL_DIRECTORY = '/etc/ssl/' -RENEWAL_DIRECTORY = '/etc/letsencrypt/renewal/' -AUTHENTICATOR = 'webroot' -WEB_ROOT_PATH = '/var/www/html' -APACHE_PREFIX = '/etc/apache2/sites-available/' -APACHE_CONFIGURATION = ''' -Use FreedomBoxTLSSiteMacro {domain} -''' - - -def parse_arguments(): - """Return parsed command line arguments as dictionary.""" - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') - - setup_parser = subparsers.add_parser( - 'setup', help='Run any setup/upgrade activities.') - setup_parser.add_argument( - '--old-version', type=int, required=True, - help='Version number being upgraded from or None if setting up first ' - 'time.') - - subparsers.add_parser('get-status', - help='Return the status of configured domains.') - subparser = subparsers.add_parser( - 'get-modified-time', - help='Return the modified time for a certificate.') - subparser.add_argument('--domain', required=True, - help='Domain name to get modified time for') - revoke_parser = subparsers.add_parser( - 'revoke', help='Revoke certificate of a domain and disable website.') - revoke_parser.add_argument('--domain', required=True, - help='Domain name to revoke certificate for') - obtain_parser = subparsers.add_parser( - 'obtain', help='Obtain certificate for a domain and setup website.') - obtain_parser.add_argument('--domain', required=True, - help='Domain name to obtain certificate for') - delete_parser = subparsers.add_parser( - 'delete', help='Delete certificate for a domain and disable website.') - delete_parser.add_argument('--domain', required=True, - help='Domain name to delete certificate of') - - subparser = subparsers.add_parser( - 'copy-certificate', - help='Copy LE certificate to a daemon\'s directory') - subparser.add_argument('--managing-app', required=True, - help='App needing the certificate') - subparser.add_argument('--user-owner', required=True, - help='User who should own the certificate') - subparser.add_argument('--group-owner', required=True, - help='Group that should own the certificate') - subparser.add_argument('--source-private-key-path', required=True, - help='Path to the source private key') - subparser.add_argument( - '--source-certificate-path', required=True, - help='Path to the source certificate with public key') - subparser.add_argument('--private-key-path', required=True, - help='Path to the private key') - subparser.add_argument('--certificate-path', required=True, - help='Path to the certificate with public key') - - subparser = subparsers.add_parser( - 'compare-certificate', - help='Compare LE certificate to one in daemon\'s directory') - subparser.add_argument('--managing-app', required=True, - help='App needing the certificate') - subparser.add_argument('--source-private-key-path', required=True, - help='Path to the source private key') - subparser.add_argument( - '--source-certificate-path', required=True, - help='Path to the source certificate with public key') - subparser.add_argument('--private-key-path', required=True, - help='Path to the private key') - subparser.add_argument('--certificate-path', required=True, - help='Path to the certificate with public key') - - help_hooks = 'Does nothing, kept for compatibility.' - subparser = subparsers.add_parser('run_pre_hooks', help=help_hooks) - subparser.add_argument('--domain') - subparser.add_argument('--modules', nargs='+', default=[]) - - subparser = subparsers.add_parser('run_renew_hooks', help=help_hooks) - subparser.add_argument('--domain') - subparser.add_argument('--modules', nargs='+', default=[]) - - subparser = subparsers.add_parser('run_post_hooks', help=help_hooks) - subparser.add_argument('--domain') - subparser.add_argument('--modules', nargs='+', default=[]) - - subparsers.required = True - return parser.parse_args() - - -def get_certificate_expiry(domain): - """Return the expiry date of a certificate.""" - certificate_file = os.path.join(le.LIVE_DIRECTORY, domain, 'cert.pem') - output = subprocess.check_output( - ['openssl', 'x509', '-enddate', '-noout', '-in', certificate_file]) - return output.decode().strip().split('=')[1] - - -def get_modified_time(domain): - """Return the last modified time of a certificate.""" - certificate_file = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem' - return int(certificate_file.stat().st_mtime) - - -def get_validity_status(domain): - """Return validity status of a certificate, e.g. valid, revoked, expired""" - output = subprocess.check_output(['certbot', 'certificates', '-d', domain]) - output = output.decode(sys.stdout.encoding) - - match = re.search(r'INVALID: (.*)\)', output) - if match is not None: - validity = match.group(1).lower() - elif re.search('VALID', output) is not None: - validity = 'valid' - else: - validity = 'unknown' - - return validity - - -def get_status(): - """ - Return Python dictionary of currently configured domains. - Should be run as root, otherwise might yield a wrong, empty answer. - """ - try: - domains = os.listdir(le.LIVE_DIRECTORY) - except OSError: - domains = [] - - domains = [ - domain for domain in domains - if os.path.isdir(os.path.join(le.LIVE_DIRECTORY, domain)) - ] - - domain_status = {} - for domain in domains: - domain_status[domain] = { - 'certificate_available': - True, - 'expiry_date': - get_certificate_expiry(domain), - 'web_enabled': - action_utils.webserver_is_enabled(domain, kind='site'), - 'validity': - get_validity_status(domain), - 'lineage': - str(pathlib.Path(le.LIVE_DIRECTORY) / domain), - 'modified_time': - get_modified_time(domain) - } - return domain_status - - -def subcommand_setup(arguments): - """Upgrade old site configuration to new macro based style. - - Nothing to do for first time setup and for newer versions. - """ - if arguments.old_version == 2: - _remove_old_hooks() - return - - if arguments.old_version != 1: - return - - domain_status = get_status() - with action_utils.WebserverChange() as webserver_change: - for domain in domain_status: - setup_webserver_config(domain, webserver_change) - - -def subcommand_get_status(_): - """Print a JSON dictionary of currently configured domains.""" - domain_status = get_status() - print(json.dumps({'domains': domain_status})) - - -def subcommand_get_modified_time(arguments): - """Print the modified time of a certificate as integer.""" - print(get_modified_time(arguments.domain)) - - -def subcommand_revoke(arguments): - """Disable a domain and revoke the certificate.""" - domain = arguments.domain - - cert_path = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem' - if cert_path.exists(): - command = [ - 'certbot', 'revoke', '--non-interactive', '--domain', domain, - '--cert-path', cert_path - ] - if TEST_MODE: - command.append('--staging') - - process = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - _, stderr = process.communicate() - if process.returncode: - print(stderr.decode(), file=sys.stderr) - sys.exit(1) - - action_utils.webserver_disable(domain, kind='site') - - -def subcommand_obtain(arguments): - """Obtain a certificate for a domain and setup website.""" - domain = arguments.domain - - command = [ - 'certbot', 'certonly', '--non-interactive', '--text', '--agree-tos', - '--register-unsafely-without-email', '--domain', arguments.domain, - '--authenticator', AUTHENTICATOR, '--webroot-path', WEB_ROOT_PATH, - '--renew-by-default' - ] - if TEST_MODE: - command.append('--staging') - - process = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - _, stderr = process.communicate() - if process.returncode: - print(stderr.decode(), file=sys.stderr) - sys.exit(1) - - with action_utils.WebserverChange() as webserver_change: - setup_webserver_config(domain, webserver_change) - - -def _remove_old_hooks(): - """Remove old style renewal hooks from individual configuration files. - - This has been replaced with global hooks by adding script files in - directory /etc/letsencrypt/renewal-hooks/{pre,post,deploy}/. - - """ - for file_path in glob.glob(RENEWAL_DIRECTORY + '*.conf'): - try: - _remove_old_hooks_from_file(file_path) - except Exception as exception: - print('Error removing hooks from file:', file_path, exception) - - -def _remove_old_hooks_from_file(file_path): - """Remove old style hooks from a single configuration file.""" - config = configobj.ConfigObj(file_path) - edited = False - for line in config.initial_comment: - if 'edited by plinth' in line.lower(): - edited = True - - if not edited: - return - - config.initial_comment = [ - line for line in config.initial_comment - if 'edited by plinth' not in line.lower() - ] - - if 'pre_hook' in config['renewalparams']: - del config['renewalparams']['pre_hook'] - - if 'renew_hook' in config['renewalparams']: - del config['renewalparams']['renew_hook'] - - if 'post_hook' in config['renewalparams']: - del config['renewalparams']['post_hook'] - - config.write() - - -def subcommand_copy_certificate(arguments): - """Copy certificate from LE directory to daemon's directory. - - Set ownership and permissions as requested needed by the daemon. - - """ - source_private_key_path = pathlib.Path( - arguments.source_private_key_path).resolve() - _assert_source_directory(source_private_key_path) - source_certificate_path = pathlib.Path( - arguments.source_certificate_path).resolve() - _assert_source_directory(source_certificate_path) - - private_key_path = pathlib.Path(arguments.private_key_path).resolve() - _assert_managed_path(arguments.managing_app, private_key_path) - certificate_path = pathlib.Path(arguments.certificate_path).resolve() - _assert_managed_path(arguments.managing_app, certificate_path) - - # Create directories, owned by root - private_key_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True) - certificate_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True) - - # Private key is only accessible to the user owner - old_mask = os.umask(0o177) - shutil.copyfile(source_private_key_path, private_key_path) - - if certificate_path != private_key_path: - # Certificate is only writable by the user owner - os.umask(0o133) - shutil.copyfile(source_certificate_path, certificate_path) - else: - # If private key and certificate are the same file, append one after - # the other. - source_certificate = source_certificate_path.read_bytes() - with private_key_path.open(mode='a+b') as file_handle: - file_handle.write(source_certificate) - - os.umask(old_mask) - - shutil.chown(certificate_path, user=arguments.user_owner, - group=arguments.group_owner) - shutil.chown(private_key_path, user=arguments.user_owner, - group=arguments.group_owner) - - -def subcommand_compare_certificate(arguments): - """Compare LE certificate with an app certificate.""" - source_private_key_path = pathlib.Path(arguments.source_private_key_path) - source_certificate_path = pathlib.Path(arguments.source_certificate_path) - _assert_source_directory(source_private_key_path) - _assert_source_directory(source_certificate_path) - - private_key_path = pathlib.Path(arguments.private_key_path) - certificate_path = pathlib.Path(arguments.certificate_path) - _assert_managed_path(arguments.managing_app, private_key_path) - _assert_managed_path(arguments.managing_app, certificate_path) - - result = False - try: - if filecmp.cmp(source_certificate_path, certificate_path) and \ - filecmp.cmp(source_private_key_path, private_key_path): - result = True - except FileNotFoundError: - result = False - - print(json.dumps({'result': result})) - - -def _assert_source_directory(path): - """Assert that a path is a valid source of a certificates.""" - assert (str(path).startswith(LE_DIRECTORY) - or str(path).startswith(ETC_SSL_DIRECTORY)) - - -def _get_managed_path(path): - """Return the managed path given a certificate path.""" - if '{domain}' in path: - return pathlib.Path(path.partition('{domain}')[0]) - - return pathlib.Path(path).parent - - -def _assert_managed_path(module, path): - """Check that path is in fact managed by module.""" - cfg.read() - module_file = pathlib.Path(cfg.config_dir) / 'modules-enabled' / module - module_path = module_file.read_text().strip() - - module = importlib.import_module(module_path) - module_classes = inspect.getmembers(module, inspect.isclass) - app_classes = [ - cls[1] for cls in module_classes if issubclass(cls[1], app_module.App) - ] - - managed_paths = [] - for cls in app_classes: - app = cls() - components = app.get_components_of_type(LetsEncrypt) - for component in components: - if component.private_key_path: - managed_paths.append( - _get_managed_path(component.private_key_path)) - if component.certificate_path: - managed_paths.append( - _get_managed_path(component.certificate_path)) - - assert set(path.parents).intersection(set(managed_paths)) - - -def subcommand_run_pre_hooks(_): - """Do nothing, kept for legacy LE configuration. - - If new version of Plinth is deployed and before it can update the Let's - Encrypt configuration and remove these old hooks, if a renew operation is - run, then we don't want it to exit with non-zero error code because this - hook could not be run. - - Remove at some point in the future. - - """ - - -def subcommand_run_renew_hooks(_): - """Do nothing, kept for legacy LE configuration. - - If new version of Plinth is deployed and before it can update the Let's - Encrypt configuration and remove these old hooks, if a renew operation is - run, then we don't want it to exit with non-zero error code because this - hook could not be run. - - Remove at some point in the future. - - """ - - -def subcommand_run_post_hooks(_): - """Do nothing, kept for legacy LE configuration. - - If new version of Plinth is deployed and before it can update the Let's - Encrypt configuration and remove these old hooks, if a renew operation is - run, then we don't want it to exit with non-zero error code because this - hook could not be run. - - Remove at some point in the future. - - """ - - -def subcommand_delete(arguments): - """Disable a domain and delete the certificate.""" - domain = arguments.domain - command = ['certbot', 'delete', '--non-interactive', '--cert-name', domain] - process = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - _, stderr = process.communicate() - if process.returncode: - print(stderr.decode(), file=sys.stderr) - sys.exit(1) - - action_utils.webserver_disable(domain, kind='site') - - -def setup_webserver_config(domain, webserver_change): - """Create SSL web server configuration for a domain. - - Do so only if there is no configuration existing. - """ - file_name = os.path.join(APACHE_PREFIX, domain + '.conf') - if os.path.isfile(file_name): - os.rename(file_name, file_name + '.fbx-bak') - - with open(file_name, 'w', encoding='utf-8') as file_handle: - file_handle.write(APACHE_CONFIGURATION.format(domain=domain)) - - webserver_change.enable('freedombox-tls-site-macro', kind='config') - webserver_change.enable(domain, kind='site') - - -def main(): - """Parse arguments and perform all duties.""" - arguments = parse_arguments() - - subcommand = arguments.subcommand.replace('-', '_') - subcommand_method = globals()['subcommand_' + subcommand] - subcommand_method(arguments) - - -if __name__ == '__main__': - main() diff --git a/plinth/modules/letsencrypt/__init__.py b/plinth/modules/letsencrypt/__init__.py index 76b067e10..3718e67c9 100644 --- a/plinth/modules/letsencrypt/__init__.py +++ b/plinth/modules/letsencrypt/__init__.py @@ -1,7 +1,5 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -""" -FreedomBox app for using Let's Encrypt. -""" +"""FreedomBox app for using Let's Encrypt.""" import json import logging @@ -9,10 +7,8 @@ import pathlib from django.utils.translation import gettext_lazy as _ -from plinth import actions from plinth import app as app_module from plinth import cfg, menu -from plinth.errors import ActionError from plinth.modules import names from plinth.modules.apache.components import diagnose_url from plinth.modules.backups.components import BackupRestore @@ -21,7 +17,7 @@ from plinth.package import Packages from plinth.signals import domain_added, domain_removed, post_app_loading from plinth.utils import format_lazy -from . import components, manifest +from . import components, manifest, privileged _description = [ format_lazy( @@ -102,14 +98,12 @@ class LetsEncryptApp(app_module.App): def setup(self, old_version): """Install and configure the app.""" super().setup(old_version) - actions.superuser_run('letsencrypt', - ['setup', '--old-version', - str(old_version)]) + privileged.setup(old_version) def certificate_obtain(domain): """Obtain a certificate for a domain and notify handlers.""" - actions.superuser_run('letsencrypt', ['obtain', '--domain', domain]) + privileged.obtain(domain) components.on_certificate_event('obtained', [domain], None) @@ -124,7 +118,7 @@ def certificate_reobtain(domain): trigger obtain event (LE will trigger a renewal event). """ - actions.superuser_run('letsencrypt', ['obtain', '--domain', domain]) + privileged.obtain(domain) def certificate_revoke(domain, really_revoke=True): @@ -138,20 +132,20 @@ def certificate_revoke(domain, really_revoke=True): obtaining certificates on the Let's Encrypt servers). """ if really_revoke: - actions.superuser_run('letsencrypt', ['revoke', '--domain', domain]) + privileged.revoke(domain) components.on_certificate_event('revoked', [domain], None) def certificate_delete(domain): """Delete a certificate for a domain and notify handlers.""" - actions.superuser_run('letsencrypt', ['delete', '--domain', domain]) + privileged.delete(domain) components.on_certificate_event('deleted', [domain], None) def on_domain_added(sender, domain_type='', name='', description='', services=None, **kwargs): - """Obtain a certificate for the new domain""" + """Obtain a certificate for the new domain.""" if not DomainType.get(domain_type).can_have_certificate: return False @@ -167,12 +161,12 @@ def on_domain_added(sender, domain_type='', name='', description='', logger.info('Obtaining certificate for %s', name) certificate_obtain(name) return True - except ActionError: + except Exception: return False def on_domain_removed(sender, domain_type, name='', **kwargs): - """Revoke Let's Encrypt certificate for the removed domain""" + """Revoke Let's Encrypt certificate for the removed domain.""" if not DomainType.get(domain_type).can_have_certificate: return False @@ -181,7 +175,7 @@ def on_domain_removed(sender, domain_type, name='', **kwargs): logger.info('Revoking certificate for %s', name) certificate_revoke(name, really_revoke=False) return True - except ActionError as exception: + except Exception as exception: logger.warning('Failed to revoke certificate for %s: %s', name, exception.args[2]) return False @@ -189,8 +183,7 @@ def on_domain_removed(sender, domain_type, name='', **kwargs): def get_status(): """Get the current settings.""" - status = actions.superuser_run('letsencrypt', ['get-status']) - status = json.loads(status) + status = privileged.get_status() for domain in names.components.DomainName.list(): if domain.domain_type.can_have_certificate: @@ -247,9 +240,7 @@ def certificate_get_last_seen_modified_time(lineage): def certificate_set_last_seen_modified_time(lineage): """Write to store a certificate's last seen expiry date.""" lineage = pathlib.Path(lineage) - output = actions.superuser_run( - 'letsencrypt', ['get-modified-time', '--domain', lineage.name]) - modified_time = int(output) + modified_time = privileged.get_modified_time(lineage.name) from plinth import kvstore info = kvstore.get_default('letsencrypt_certificate_info', '{}') diff --git a/plinth/modules/letsencrypt/components.py b/plinth/modules/letsencrypt/components.py index e6f1a29ac..5fa760598 100644 --- a/plinth/modules/letsencrypt/components.py +++ b/plinth/modules/letsencrypt/components.py @@ -1,9 +1,6 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -""" -App component for other apps to use handle Let's Encrypt certificates. -""" +"""App component for other apps to use handle Let's Encrypt certificates.""" -import json import logging import pathlib import threading @@ -11,6 +8,8 @@ import threading from plinth import actions, app from plinth.modules.names.components import DomainName +from . import privileged + logger = logging.getLogger(__name__) @@ -327,14 +326,11 @@ class LetsEncrypt(app.FollowerComponent): source_certificate_path, private_key_path, certificate_path): """Copy certificate for a single domain.""" - actions.superuser_run('letsencrypt', [ - 'copy-certificate', '--managing-app', self.managing_app, - '--user-owner', self.user_owner, '--group-owner', self.group_owner, - '--source-private-key-path', - str(source_private_key_path), '--source-certificate-path', - str(source_certificate_path), '--private-key-path', - private_key_path, '--certificate-path', certificate_path - ]) + privileged.copy_certificate(self.managing_app, + str(source_private_key_path), + str(source_certificate_path), + private_key_path, certificate_path, + self.user_owner, self.group_owner) def _compare_certificate(self, domain, lineage): """Compare LE certificate with app certificate.""" @@ -342,14 +338,11 @@ class LetsEncrypt(app.FollowerComponent): source_certificate_path = pathlib.Path(lineage) / 'fullchain.pem' private_key_path = self.private_key_path.format(domain=domain) certificate_path = self.certificate_path.format(domain=domain) - output = actions.superuser_run('letsencrypt', [ - 'compare-certificate', '--managing-app', self.managing_app, - '--source-private-key-path', - str(source_private_key_path), '--source-certificate-path', - str(source_certificate_path), '--private-key-path', - private_key_path, '--certificate-path', certificate_path - ]) - return json.loads(output)['result'] + return privileged.compare_certificate(self.managing_app, + str(source_private_key_path), + str(source_certificate_path), + private_key_path, + certificate_path) def on_certificate_event(event, domains, lineage): diff --git a/plinth/modules/letsencrypt/privileged.py b/plinth/modules/letsencrypt/privileged.py new file mode 100644 index 000000000..d77e9e29a --- /dev/null +++ b/plinth/modules/letsencrypt/privileged.py @@ -0,0 +1,359 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Configure Let's Encrypt.""" + +import filecmp +import glob +import importlib +import inspect +import os +import pathlib +import re +import shutil +import subprocess +import sys +from typing import Any + +import configobj + +from plinth import action_utils +from plinth import app as app_module +from plinth import cfg +from plinth.actions import privileged +from plinth.modules import letsencrypt as le + +TEST_MODE = False +LE_DIRECTORY = '/etc/letsencrypt/' +ETC_SSL_DIRECTORY = '/etc/ssl/' +RENEWAL_DIRECTORY = '/etc/letsencrypt/renewal/' +AUTHENTICATOR = 'webroot' +WEB_ROOT_PATH = '/var/www/html' +APACHE_PREFIX = '/etc/apache2/sites-available/' +APACHE_CONFIGURATION = ''' +Use FreedomBoxTLSSiteMacro {domain} +''' + + +def _get_certificate_expiry(domain: str) -> str: + """Return the expiry date of a certificate.""" + certificate_file = os.path.join(le.LIVE_DIRECTORY, domain, 'cert.pem') + output = subprocess.check_output( + ['openssl', 'x509', '-enddate', '-noout', '-in', certificate_file]) + return output.decode().strip().split('=')[1] + + +def _get_modified_time(domain: str) -> int: + """Return the last modified time of a certificate.""" + certificate_file = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem' + return int(certificate_file.stat().st_mtime) + + +def _get_validity_status(domain: str) -> str: + """Return validity status of a certificate; valid, revoked, expired.""" + output = subprocess.check_output(['certbot', 'certificates', '-d', domain]) + line = output.decode(sys.stdout.encoding) + + match = re.search(r'INVALID: (.*)\)', line) + if match is not None: + validity = match.group(1).lower() + elif re.search('VALID', line) is not None: + validity = 'valid' + else: + validity = 'unknown' + + return validity + + +def _get_status() -> dict[str, Any]: + """Return Python dictionary of currently configured domains. + + Should be run as root, otherwise might yield a wrong, empty answer. + """ + try: + domains = os.listdir(le.LIVE_DIRECTORY) + except OSError: + domains = [] + + domains = [ + domain for domain in domains + if os.path.isdir(os.path.join(le.LIVE_DIRECTORY, domain)) + ] + + domain_status = {} + for domain in domains: + domain_status[domain] = { + 'certificate_available': + True, + 'expiry_date': + _get_certificate_expiry(domain), + 'web_enabled': + action_utils.webserver_is_enabled(domain, kind='site'), + 'validity': + _get_validity_status(domain), + 'lineage': + str(pathlib.Path(le.LIVE_DIRECTORY) / domain), + 'modified_time': + _get_modified_time(domain) + } + return domain_status + + +@privileged +def setup(old_version: int): + """Upgrade old site configuration to new macro based style. + + Nothing to do for first time setup and for newer versions. + """ + if old_version == 2: + _remove_old_hooks() + return + + if old_version != 1: + return + + domain_status = _get_status() + with action_utils.WebserverChange() as webserver_change: + for domain in domain_status: + _setup_webserver_config(domain, webserver_change) + + +@privileged +def get_status() -> dict[str, Any]: + """Return a dictionary of currently configured domains.""" + domain_status = _get_status() + return {'domains': domain_status} + + +@privileged +def get_modified_time(domain: str) -> int: + """Return the modified time of a certificate as integer.""" + return _get_modified_time(domain) + + +@privileged +def revoke(domain: str): + """Disable a domain and revoke the certificate.""" + cert_path = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem' + if cert_path.exists(): + command = [ + 'certbot', 'revoke', '--non-interactive', '--domain', domain, + '--cert-path', + str(cert_path) + ] + if TEST_MODE: + command.append('--staging') + + process = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + _, stderr = process.communicate() + if process.returncode: + raise RuntimeError('Error revoking certificate: {error}'.format( + error=stderr.decode())) + + action_utils.webserver_disable(domain, kind='site') + + +@privileged +def obtain(domain: str): + """Obtain a certificate for a domain and setup website.""" + command = [ + 'certbot', 'certonly', '--non-interactive', '--text', '--agree-tos', + '--register-unsafely-without-email', '--domain', domain, + '--authenticator', AUTHENTICATOR, '--webroot-path', WEB_ROOT_PATH, + '--renew-by-default' + ] + if TEST_MODE: + command.append('--staging') + + process = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + _, stderr = process.communicate() + if process.returncode: + raise RuntimeError('Error obtaining certificate: {error}'.format( + error=stderr.decode())) + + with action_utils.WebserverChange() as webserver_change: + _setup_webserver_config(domain, webserver_change) + + +def _remove_old_hooks(): + """Remove old style renewal hooks from individual configuration files. + + This has been replaced with global hooks by adding script files in + directory /etc/letsencrypt/renewal-hooks/{pre,post,deploy}/. + """ + for file_path in glob.glob(RENEWAL_DIRECTORY + '*.conf'): + try: + _remove_old_hooks_from_file(file_path) + except Exception as exception: + print('Error removing hooks from file:', file_path, exception) + + +def _remove_old_hooks_from_file(file_path: str): + """Remove old style hooks from a single configuration file.""" + config = configobj.ConfigObj(file_path) + edited = False + for line in config.initial_comment: + if 'edited by plinth' in line.lower(): + edited = True + + if not edited: + return + + config.initial_comment = [ + line for line in config.initial_comment + if 'edited by plinth' not in line.lower() + ] + + if 'pre_hook' in config['renewalparams']: + del config['renewalparams']['pre_hook'] + + if 'renew_hook' in config['renewalparams']: + del config['renewalparams']['renew_hook'] + + if 'post_hook' in config['renewalparams']: + del config['renewalparams']['post_hook'] + + config.write() + + +@privileged +def copy_certificate(managing_app: str, source_private_key: str, + source_certificate: str, private_key: str, + certificate: str, user_owner: str, group_owner: str): + """Copy certificate from LE directory to daemon's directory. + + Set ownership and permissions as requested needed by the daemon. + + """ + source_private_key_path = pathlib.Path(source_private_key).resolve() + _assert_source_directory(source_private_key_path) + source_certificate_path = pathlib.Path(source_certificate).resolve() + _assert_source_directory(source_certificate_path) + + private_key_path = pathlib.Path(private_key).resolve() + _assert_managed_path(managing_app, private_key_path) + certificate_path = pathlib.Path(certificate).resolve() + _assert_managed_path(managing_app, certificate_path) + + # Create directories, owned by root + private_key_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True) + certificate_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True) + + # Private key is only accessible to the user owner + old_mask = os.umask(0o177) + shutil.copyfile(source_private_key_path, private_key_path) + + if certificate_path != private_key_path: + # Certificate is only writable by the user owner + os.umask(0o133) + shutil.copyfile(source_certificate_path, certificate_path) + else: + # If private key and certificate are the same file, append one after + # the other. + source_certificate_bytes = source_certificate_path.read_bytes() + with private_key_path.open(mode='a+b') as file_handle: + file_handle.write(source_certificate_bytes) + + os.umask(old_mask) + + shutil.chown(certificate_path, user=user_owner, group=group_owner) + shutil.chown(private_key_path, user=user_owner, group=group_owner) + + +@privileged +def compare_certificate(managing_app: str, source_private_key: str, + source_certificate: str, private_key: str, + certificate: str) -> bool: + """Compare LE certificate with an app certificate.""" + source_private_key_path = pathlib.Path(source_private_key) + source_certificate_path = pathlib.Path(source_certificate) + _assert_source_directory(source_private_key_path) + _assert_source_directory(source_certificate_path) + + private_key_path = pathlib.Path(private_key) + certificate_path = pathlib.Path(certificate) + _assert_managed_path(managing_app, private_key) + _assert_managed_path(managing_app, certificate) + + result = False + try: + if filecmp.cmp(source_certificate_path, certificate_path) and \ + filecmp.cmp(source_private_key_path, private_key_path): + result = True + except FileNotFoundError: + result = False + + return result + + +def _assert_source_directory(path): + """Assert that a path is a valid source of a certificates.""" + assert (str(path).startswith(LE_DIRECTORY) + or str(path).startswith(ETC_SSL_DIRECTORY)) + + +def _get_managed_path(path): + """Return the managed path given a certificate path.""" + if '{domain}' in path: + return pathlib.Path(path.partition('{domain}')[0]) + + return pathlib.Path(path).parent + + +def _assert_managed_path(module, path): + """Check that path is in fact managed by module.""" + cfg.read() + module_file = pathlib.Path(cfg.config_dir) / 'modules-enabled' / module + module_path = module_file.read_text().strip() + + module = importlib.import_module(module_path) + module_classes = inspect.getmembers(module, inspect.isclass) + app_classes = [ + cls[1] for cls in module_classes if issubclass(cls[1], app_module.App) + ] + + managed_paths = [] + for cls in app_classes: + app = cls() + from plinth.modules.letsencrypt.components import LetsEncrypt + components = app.get_components_of_type(LetsEncrypt) + for component in components: + if component.private_key_path: + managed_paths.append( + _get_managed_path(component.private_key_path)) + if component.certificate_path: + managed_paths.append( + _get_managed_path(component.certificate_path)) + + if not set(path.parents).intersection(set(managed_paths)): + raise AssertionError('Not a managed path') + + +@privileged +def delete(domain: str): + """Disable a domain and delete the certificate.""" + command = ['certbot', 'delete', '--non-interactive', '--cert-name', domain] + process = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + _, stderr = process.communicate() + if process.returncode: + raise RuntimeError('Error deleting certificate: {error}'.format( + error=stderr.decode())) + + action_utils.webserver_disable(domain, kind='site') + + +def _setup_webserver_config(domain, webserver_change): + """Create SSL web server configuration for a domain. + + Do so only if there is no configuration existing. + """ + file_name = os.path.join(APACHE_PREFIX, domain + '.conf') + if os.path.isfile(file_name): + os.rename(file_name, file_name + '.fbx-bak') + + with open(file_name, 'w', encoding='utf-8') as file_handle: + file_handle.write(APACHE_CONFIGURATION.format(domain=domain)) + + webserver_change.enable('freedombox-tls-site-macro', kind='config') + webserver_change.enable(domain, kind='site') diff --git a/plinth/modules/letsencrypt/tests/test_components.py b/plinth/modules/letsencrypt/tests/test_components.py index 942db5f0a..320c235ad 100644 --- a/plinth/modules/letsencrypt/tests/test_components.py +++ b/plinth/modules/letsencrypt/tests/test_components.py @@ -3,7 +3,6 @@ Test the Let's Encrypt component for managing certificates. """ -import json from unittest.mock import call, patch import pytest @@ -30,6 +29,29 @@ def fixture_component(): managing_app='test-app') +@pytest.fixture(name='try_restart') +def fixture_try_restart(): + """Patch and return service.try_restart privileged call.""" + with patch('plinth.privileged.service.try_restart') as try_restart: + yield try_restart + + +@pytest.fixture(name='copy_certificate') +def fixture_copy_certificate(): + """Patch and return privileged.copy_certificate call.""" + with patch('plinth.modules.letsencrypt.privileged.copy_certificate' + ) as copy_certificate: + yield copy_certificate + + +@pytest.fixture(name='compare_certificate') +def fixture_compare_certificate(): + """Patch and return privileged.compare_certificate call.""" + with patch('plinth.modules.letsencrypt.privileged.compare_certificate' + ) as compare_certificate: + yield compare_certificate + + @pytest.fixture(name='get_status') def fixture_get_status(): """Return patched letsencrypt.get_status() method.""" @@ -65,13 +87,6 @@ def fixture_domain_list(): yield domain_list -@pytest.fixture(name='superuser_run') -def fixture_superuser_run(): - """Return patched plinth.actions.superuser_run() method.""" - with patch('plinth.actions.superuser_run') as superuser_run: - yield superuser_run - - def test_init_without_arguments(): """Test that component is initialized with defaults properly.""" component = LetsEncrypt('test-component') @@ -133,13 +148,8 @@ def test_list(): assert set(LetsEncrypt.list()) == {component1, component2} -def _assert_copy_certificate_called(component, superuser_run, domains): +def _assert_copy_certificate_called(component, copy_certificate, domains): """Check that copy certificate calls have been made properly.""" - copy_calls = [ - mock_call for mock_call in superuser_run.mock_calls - if mock_call[1][0] == 'letsencrypt' - and mock_call[1][1][0] == 'copy-certificate' - ] expected_calls = [] for domain, domain_status in domains.items(): if domain_status == 'valid': @@ -153,83 +163,72 @@ def _assert_copy_certificate_called(component, superuser_run, domains): private_key_path = '/etc/test-app/{}/private.path'.format(domain) certificate_path = '/etc/test-app/{}/certificate.path'.format(domain) - expected_call = call('letsencrypt', [ - 'copy-certificate', '--managing-app', component.managing_app, - '--user-owner', component.user_owner, '--group-owner', - component.group_owner, '--source-private-key-path', - str(source_private_key_path), '--source-certificate-path', - str(source_certificate_path), '--private-key-path', - private_key_path, '--certificate-path', certificate_path - ]) + expected_call = call(component.managing_app, + str(source_private_key_path), + str(source_certificate_path), private_key_path, + certificate_path, component.user_owner, + component.group_owner) expected_calls.append(expected_call) - assert len(expected_calls) == len(copy_calls) - for expected_call in expected_calls: - print(expected_call) - print(copy_calls) - assert expected_call in copy_calls + copy_certificate.assert_has_calls(expected_calls, any_order=True) -def _assert_restarted_daemons(daemons, superuser_run): +def _assert_restarted_daemons(daemons, try_restart): """Check that a call has restarted the daemons of a component.""" - run_calls = [ - mock_call for mock_call in superuser_run.mock_calls - if mock_call[1][0] == 'service' - ] - expected_calls = [ - call('service', ['try-restart', daemon]) for daemon in daemons - ] - assert len(expected_calls) == len(run_calls) - for expected_call in expected_calls: - assert expected_call in run_calls + expected_calls = [call(daemon) for daemon in daemons] + try_restart.assert_has_calls(expected_calls, any_order=True) -def test_setup_certificates(superuser_run, get_status, component): +def test_setup_certificates(copy_certificate, try_restart, get_status, + component): """Test that initial copying of certs for an app works.""" component.setup_certificates() - _assert_copy_certificate_called(component, superuser_run, { + _assert_copy_certificate_called(component, copy_certificate, { 'valid.example': 'valid', 'invalid.example': 'invalid' }) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_restarted_daemons(component.daemons, try_restart) -def test_setup_certificates_without_copy(superuser_run, get_status, component): +def test_setup_certificates_without_copy(copy_certificate, try_restart, + get_status, component): """Test that initial copying of certs for an app works.""" component.should_copy_certificates = False component.setup_certificates() - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons(component.daemons, try_restart) -def test_setup_certificates_with_app_domains(superuser_run, get_status, - component): +def test_setup_certificates_with_app_domains(copy_certificate, try_restart, + get_status, component): """Test that initial copying of certs for an app works.""" component._domains = ['irrelevant1.example', 'irrelevant2.example'] component.setup_certificates( app_domains=['valid.example', 'invalid.example']) - _assert_copy_certificate_called(component, superuser_run, { + _assert_copy_certificate_called(component, copy_certificate, { 'valid.example': 'valid', 'invalid.example': 'invalid' }) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_restarted_daemons(component.daemons, try_restart) -def test_setup_certificates_with_all_domains(domain_list, superuser_run, - get_status, component): +def test_setup_certificates_with_all_domains(domain_list, copy_certificate, + try_restart, get_status, + component): """Test that initial copying for certs works when app domains is '*'.""" component._domains = '*' component.setup_certificates() _assert_copy_certificate_called( - component, superuser_run, { + component, copy_certificate, { 'valid.example': 'valid', 'invalid1.example': 'invalid', 'invalid2.example': 'invalid' }) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_restarted_daemons(component.daemons, try_restart) -def _assert_compare_certificate_called(component, superuser_run, domains): +def _assert_compare_certificate_called(component, compare_certificate, + domains): """Check that compare certificate was called properly.""" expected_calls = [] for domain in domains: @@ -239,37 +238,34 @@ def _assert_compare_certificate_called(component, superuser_run, domains): '/etc/letsencrypt/live/{}/fullchain.pem'.format(domain) private_key_path = '/etc/test-app/{}/private.path'.format(domain) certificate_path = '/etc/test-app/{}/certificate.path'.format(domain) - expected_call = call('letsencrypt', [ - 'compare-certificate', '--managing-app', component.managing_app, - '--source-private-key-path', - str(source_private_key_path), '--source-certificate-path', - str(source_certificate_path), '--private-key-path', - private_key_path, '--certificate-path', certificate_path - ]) + expected_call = call(component.managing_app, + str(source_private_key_path), + str(source_certificate_path), private_key_path, + certificate_path) expected_calls.append(expected_call) - superuser_run.assert_has_calls(expected_calls) + compare_certificate.assert_has_calls(expected_calls, any_order=True) -def test_get_status(component, superuser_run, get_status): +def test_get_status(component, compare_certificate, get_status): """Test that getting domain status works.""" - superuser_run.return_value = json.dumps({'result': True}) + compare_certificate.return_value = True assert component.get_status() == { 'valid.example': 'valid', 'invalid.example': 'self-signed' } - _assert_compare_certificate_called(component, superuser_run, + _assert_compare_certificate_called(component, compare_certificate, ['valid.example']) -def test_get_status_outdate_copy(component, superuser_run, get_status): +def test_get_status_outdate_copy(component, compare_certificate, get_status): """Test that getting domain status works with outdated copy.""" - superuser_run.return_value = json.dumps({'result': False}) + compare_certificate.return_value = False assert component.get_status() == { 'valid.example': 'outdated-copy', 'invalid.example': 'self-signed' } - _assert_compare_certificate_called(component, superuser_run, + _assert_compare_certificate_called(component, compare_certificate, ['valid.example']) @@ -282,130 +278,139 @@ def test_get_status_without_copy(component, get_status): } -def test_on_certificate_obtained(superuser_run, component): +def test_on_certificate_obtained(copy_certificate, try_restart, component): """Test that certificate obtained event handler works.""" component.on_certificate_obtained(['valid.example', 'irrelevant.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, { + _assert_copy_certificate_called(component, copy_certificate, { 'valid.example': 'valid', }) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_restarted_daemons(component.daemons, try_restart) -def test_on_certificate_obtained_with_all_domains(superuser_run, component): +def test_on_certificate_obtained_with_all_domains(copy_certificate, + try_restart, component): """Test that certificate obtained event handler works for app with all domains. """ component._domains = '*' component.on_certificate_obtained(['valid.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, { + _assert_copy_certificate_called(component, copy_certificate, { 'valid.example': 'valid', }) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_restarted_daemons(component.daemons, try_restart) -def test_on_certificate_obtained_irrelevant(superuser_run, component): +def test_on_certificate_obtained_irrelevant(copy_certificate, try_restart, + component): """Test that certificate obtained event handler works with irrelevant domain. """ component.on_certificate_obtained( ['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/') - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons([], superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons([], try_restart) -def test_on_certificate_obtained_without_copy(superuser_run, component): +def test_on_certificate_obtained_without_copy(copy_certificate, try_restart, + component): """Test that certificate obtained event handler works without copying.""" component.should_copy_certificates = False component.on_certificate_obtained(['valid.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons(component.daemons, try_restart) -def test_on_certificate_renewed(superuser_run, component): +def test_on_certificate_renewed(copy_certificate, try_restart, component): """Test that certificate renewed event handler works.""" component.on_certificate_renewed(['valid.example', 'irrelevant.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, { + _assert_copy_certificate_called(component, copy_certificate, { 'valid.example': 'valid', }) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_restarted_daemons(component.daemons, try_restart) -def test_on_certificate_renewed_irrelevant(superuser_run, component): +def test_on_certificate_renewed_irrelevant(copy_certificate, try_restart, + component): """Test that certificate renewed event handler works for irrelevant domains. """ component.on_certificate_renewed( ['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/') - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons([], superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons([], try_restart) -def test_on_certificate_renewed_without_copy(superuser_run, component): +def test_on_certificate_renewed_without_copy(copy_certificate, try_restart, + component): """Test that certificate renewed event handler works without copying.""" component.should_copy_certificates = False component.on_certificate_renewed(['valid.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons(component.daemons, try_restart) -def test_on_certificate_revoked(superuser_run, component): +def test_on_certificate_revoked(copy_certificate, try_restart, component): """Test that certificate revoked event handler works.""" component.on_certificate_revoked(['valid.example', 'irrelevant.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, { + _assert_copy_certificate_called(component, copy_certificate, { 'valid.example': 'invalid', }) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_restarted_daemons(component.daemons, try_restart) -def test_on_certificate_revoked_irrelevant(superuser_run, component): +def test_on_certificate_revoked_irrelevant(copy_certificate, try_restart, + component): """Test that certificate revoked event handler works for irrelevant domains. """ component.on_certificate_revoked( ['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/') - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons([], superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons([], try_restart) -def test_on_certificate_revoked_without_copy(superuser_run, component): +def test_on_certificate_revoked_without_copy(copy_certificate, try_restart, + component): """Test that certificate revoked event handler works without copying.""" component.should_copy_certificates = False component.on_certificate_revoked(['valid.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons(component.daemons, try_restart) -def test_on_certificate_deleted(superuser_run, component): +def test_on_certificate_deleted(copy_certificate, try_restart, component): """Test that certificate deleted event handler works.""" component.on_certificate_deleted(['valid.example', 'irrelevant.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, { + _assert_copy_certificate_called(component, copy_certificate, { 'valid.example': 'invalid', }) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_restarted_daemons(component.daemons, try_restart) -def test_on_certificate_deleted_irrelevant(superuser_run, component): +def test_on_certificate_deleted_irrelevant(copy_certificate, try_restart, + component): """Test that certificate deleted event handler works for irrelevant domains. """ component.on_certificate_deleted( ['irrelevant.example'], '/etc/letsencrypt/live/irrelevant.example/') - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons([], superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons([], try_restart) -def test_on_certificate_deleted_without_copy(superuser_run, component): +def test_on_certificate_deleted_without_copy(copy_certificate, try_restart, + component): """Test that certificate deleted event handler works without copying.""" component.should_copy_certificates = False component.on_certificate_deleted(['valid.example'], '/etc/letsencrypt/live/valid.example/') - _assert_copy_certificate_called(component, superuser_run, {}) - _assert_restarted_daemons(component.daemons, superuser_run) + _assert_copy_certificate_called(component, copy_certificate, {}) + _assert_restarted_daemons(component.daemons, try_restart) diff --git a/plinth/modules/letsencrypt/views.py b/plinth/modules/letsencrypt/views.py index f6b9906f1..5a946f680 100644 --- a/plinth/modules/letsencrypt/views.py +++ b/plinth/modules/letsencrypt/views.py @@ -11,7 +11,6 @@ from django.urls import reverse_lazy from django.utils.translation import gettext as _ from django.views.decorators.http import require_POST -from plinth.errors import ActionError from plinth.modules import letsencrypt from plinth.views import AppView @@ -41,11 +40,11 @@ def revoke(request, domain): _('Certificate successfully revoked for domain {domain}.' 'This may take a few moments to take effect.').format( domain=domain)) - except ActionError as exception: + except Exception as exception: messages.error( request, _('Failed to revoke certificate for domain {domain}: {error}'). - format(domain=domain, error=exception.args[2])) + format(domain=domain, error=exception.args)) return redirect(reverse_lazy('letsencrypt:index')) @@ -59,11 +58,11 @@ def obtain(request, domain): request, _('Certificate successfully obtained for domain {domain}').format( domain=domain)) - except ActionError as exception: + except Exception as exception: messages.error( request, _('Failed to obtain certificate for domain {domain}: {error}'). - format(domain=domain, error=exception.args[2])) + format(domain=domain, error=exception.args)) return redirect(reverse_lazy('letsencrypt:index')) @@ -76,11 +75,11 @@ def reobtain(request, domain): request, _('Certificate successfully obtained for domain {domain}').format( domain=domain)) - except ActionError as exception: + except Exception as exception: messages.error( request, _('Failed to obtain certificate for domain {domain}: {error}'). - format(domain=domain, error=exception.args[2])) + format(domain=domain, error=exception.args)) return redirect(reverse_lazy('letsencrypt:index')) @@ -93,10 +92,10 @@ def delete(request, domain): request, _('Certificate successfully deleted for domain {domain}').format( domain=domain)) - except ActionError as exception: + except Exception as exception: messages.error( request, _('Failed to delete certificate for domain {domain}: {error}'). - format(domain=domain, error=exception.args[2])) + format(domain=domain, error=exception.args)) return redirect(reverse_lazy('letsencrypt:index'))