#!/usr/bin/python3
#
# This file is part of FreedomBox.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Configuration helper for Let's Encrypt.
"""

import argparse
import json
import os
import re
import shutil
import subprocess
import sys

import configobj

from plinth import action_utils
from plinth.errors import ActionError
from plinth.modules import config
from plinth.modules import letsencrypt as le

TEST_MODE = False
RENEWAL_DIRECTORY = '/etc/letsencrypt/renewal/'
AUTHENTICATOR = 'webroot'
WEB_ROOT_PATH = '/var/www/html'
APACHE_PREFIX = '/etc/apache2/sites-available/'
APACHE_CONFIGURATION = '''
Use FreedomBoxTLSSiteMacro {domain}
'''


def parse_arguments():
    """Return the parsed command line arguments (an argparse Namespace).

    A sub command is mandatory; its name ends up in ``subcommand``.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    setup_parser = subparsers.add_parser(
        'setup', help='Run any setup/upgrade activities.')
    setup_parser.add_argument(
        '--old-version', type=int, required=True, help=
        'Version number being upgraded from or None if setting up first time.')

    subparsers.add_parser('get-status',
                          help='Return the status of configured domains.')

    revoke_parser = subparsers.add_parser(
        'revoke', help='Revoke certificate of a domain and disable website.')
    revoke_parser.add_argument('--domain', required=True,
                               help='Domain name to revoke certificate for')

    obtain_parser = subparsers.add_parser(
        'obtain', help='Obtain certificate for a domain and setup website.')
    obtain_parser.add_argument('--domain', required=True,
                               help='Domain name to obtain certificate for')

    delete_parser = subparsers.add_parser(
        'delete', help='Delete certificate for a domain and disable website.')
    delete_parser.add_argument('--domain', required=True,
                               help='Domain name to delete certificate of')

    help_hooks = 'FreedomBox management of certificates (for current domain).'
    manage_hook_parser = subparsers.add_parser('manage_hooks', help=help_hooks)
    manage_hook_parser.add_argument('command', help=help_hooks,
                                    choices=('enable', 'disable', 'status'))
    manage_hook_parser.add_argument(
        '--domain', help='Domain for hook management command.')
    help_module_arg = 'For enable: Also use LE cert with other FreedomBox ' \
                      'apps.'
    manage_hook_parser.add_argument('--modules', help=help_module_arg,
                                    nargs='+', default=[],
                                    choices=le.MODULES_WITH_HOOKS)

    # The three hook runners accept exactly the same options and differ only
    # in their name and help text.
    help_domain_arg = 'Domain name to run the hook scripts with.'
    help_module_arg = 'Include hooks from the provided module names.'
    hook_runners = (
        ('run_pre_hooks',
         'Maintenance tasks before a cert is obtained or renewed.'),
        ('run_renew_hooks',
         'Maintenance tasks after a cert is actually renewed.'),
        ('run_post_hooks',
         'Maintenance tasks after a cert is obtained or renewed.'),
    )
    for hook_name, hook_help in hook_runners:
        hook_parser = subparsers.add_parser(hook_name, help=hook_help)
        hook_parser.add_argument('--domain', help=help_domain_arg)
        hook_parser.add_argument('--modules', help=help_module_arg,
                                 nargs='+', default=[],
                                 choices=le.MODULES_WITH_HOOKS)

    subparsers.required = True
    return parser.parse_args()


def get_certificate_expiry(domain):
    """Return the expiry date of a domain's certificate as printed by openssl.

    Runs ``openssl x509 -enddate`` on the domain's ``cert.pem`` in the live
    directory and returns the text after the first ``=`` of its output.
    """
    certificate_file = os.path.join(le.LIVE_DIRECTORY, domain, 'cert.pem')
    output = subprocess.check_output(
        ['openssl', 'x509', '-enddate', '-noout', '-in', certificate_file])
    # Split only once so that an '=' inside the value cannot truncate it.
    return output.decode().strip().split('=', 1)[1]


def get_validity_status(domain):
    """Return validity status of a certificate, e.g. valid, revoked, expired.

    The status is scraped from the output of ``certbot certificates``:
    the lower-cased reason following ``INVALID:`` if present, ``'valid'`` if
    the output contains ``VALID``, and ``'unknown'`` otherwise.
    """
    output = subprocess.check_output(['certbot', 'certificates', '-d', domain])
    output = output.decode(sys.stdout.encoding)

    # 'INVALID' has to be tested first because it contains 'VALID'.
    match = re.search(r'INVALID: (.*)\)', output)
    if match is not None:
        return match.group(1).lower()

    if re.search('VALID', output) is not None:
        return 'valid'

    return 'unknown'


def get_status():
    """
    Return Python dictionary of currently configured domains.

    Every sub-directory of the live directory is treated as a domain with an
    available certificate.

    Should be run as root, otherwise might yield a wrong, empty answer.
    """
    try:
        entries = os.listdir(le.LIVE_DIRECTORY)
    except OSError:
        entries = []

    domains = [
        entry for entry in entries
        if os.path.isdir(os.path.join(le.LIVE_DIRECTORY, entry))
    ]

    return {
        domain: {
            'certificate_available': True,
            'expiry_date': get_certificate_expiry(domain),
            'web_enabled':
                action_utils.webserver_is_enabled(domain, kind='site'),
            'validity': get_validity_status(domain)
        }
        for domain in domains
    }


def subcommand_setup(arguments):
    """Upgrade old site configuration to new macro based style.

    Nothing to do for first time setup and for newer versions.
    """
    if arguments.old_version != 1:
        return

    domain_status = get_status()
    with action_utils.WebserverChange() as webserver_change:
        for domain in domain_status:
            setup_webserver_config(domain, webserver_change)


def subcommand_get_status(_):
    """Print a JSON dictionary of currently configured domains."""
    domain_status = get_status()
    print(json.dumps({'domains': domain_status}))


def _run_or_exit(command):
    """Run a command; on a non-zero exit print its stderr and exit with 1."""
    process = subprocess.Popen(command, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    _, stderr = process.communicate()
    if process.returncode:
        print(stderr.decode(), file=sys.stderr)
        sys.exit(1)


def subcommand_revoke(arguments):
    """Disable a domain and revoke the certificate."""
    domain = arguments.domain

    command = [
        'certbot', 'revoke', '--domain', domain, '--cert-path',
        os.path.join(le.LIVE_DIRECTORY, domain, 'cert.pem')
    ]
    if TEST_MODE:
        command.append('--staging')

    _run_or_exit(command)

    action_utils.webserver_disable(domain, kind='site')


def subcommand_obtain(arguments):
    """Obtain a certificate for a domain and setup website."""
    domain = arguments.domain

    command = [
        'certbot', 'certonly', '--text', '--agree-tos',
        '--register-unsafely-without-email', '--domain', domain,
        '--authenticator', AUTHENTICATOR, '--webroot-path', WEB_ROOT_PATH,
        '--renew-by-default'
    ]
    if TEST_MODE:
        command.append('--staging')

    _run_or_exit(command)

    with action_utils.WebserverChange() as webserver_change:
        setup_webserver_config(domain, webserver_change)


def subcommand_manage_hooks(arguments):
    """
    Enable/disable/status of certbot's pre-, renew-, & post-hooks for renewal.

    Enable edits renewal config of current domain (RENEWAL_DIR/DOMAIN.conf),
    and creates a backup beforehand that is restored if disable is called.
    Commands disable and status work without error on any domain string.

    The result is reported on stdout; the process always exits: with 0 on
    success (or when there is nothing to do), with 1-7 when aborted.
    """
    if arguments.command not in ('enable', 'disable', 'status'):
        print('Aborted: Unknown command "%s".' % arguments.command)
        sys.exit(1)

    cmd_is_enable = arguments.command == 'enable'

    current_domain = config.get_domainname()
    if not arguments.domain:
        arguments.domain = current_domain

    # Enabling is only permitted for the current domain, and only if that
    # domain already has a certificate.
    if cmd_is_enable:
        if not current_domain:
            print('Aborted: No current domain set.')
            sys.exit(2)

        if arguments.domain != current_domain:
            print('Aborted: Passed domain (%s) is not current domain (%s).' %
                  (arguments.domain, current_domain))
            sys.exit(3)

        domain_status = get_status()
        if current_domain not in domain_status:
            print('Aborted: Current domain (%s) has no LE certificate.' %
                  current_domain)
            sys.exit(4)

    # A missing config file or section is an error for 'enable' only; for
    # 'disable' and 'status' it simply means hooks are not enabled.
    config_path = RENEWAL_DIRECTORY + arguments.domain + '.conf'
    if not os.path.exists(config_path):
        msg, code = ('Aborted', 5) if cmd_is_enable else ('Disabled', 0)
        print('%s: No certbot config file at %s.' % (msg, config_path))
        sys.exit(code)

    config_certbot = configobj.ConfigObj(config_path)
    if 'renewalparams' not in config_certbot:
        msg, code = ('Aborted', 6) if cmd_is_enable else ('Disabled', 0)
        print('%s: No section [renewalparams] in config file at %s.' %
              (msg, config_path))
        sys.exit(code)

    # The hooks call back into this very script.
    script_path = os.path.realpath(__file__)
    config_backup_path = config_path + '_plinth_backup'
    call_pre = script_path + ' run_pre_hooks --domain ' + arguments.domain
    call_renew = script_path + ' run_renew_hooks --domain ' + arguments.domain
    call_post = script_path + ' run_post_hooks --domain ' + arguments.domain
    config_plinth = {
        'renewalparams': {
            'authenticator': AUTHENTICATOR,
            # 'webroot_path': [WEB_ROOT_PATH],  # removed by renew...
            'webroot_map': {
                arguments.domain: WEB_ROOT_PATH
            },
            'installer': 'None',
            'pre_hook': call_pre,
            'renew_hook': call_renew,
            'post_hook': call_post
        }
    }

    comment_plinth = '# This file was edited by Plinth.'
    config_edited_by_plinth = any(
        'edited by plinth' in line.lower()
        for line in config_certbot.initial_comment)

    if arguments.command == 'status':
        # check for presence of expected minimal configuration
        config_checks = [
            (entry in config_certbot['renewalparams']) and
            (str(config_plinth['renewalparams'][entry]) in str(
                config_certbot['renewalparams'][entry]))
            for entry in config_plinth['renewalparams']
        ]
        if not all(config_checks):
            print('disabled')
            sys.exit(0)

        # is enabled; check for which selected modules (only for renew_hook)
        cmd_str = config_certbot['renewalparams']['renew_hook']
        module_list = []
        for mod_str in le.MODULES_WITH_HOOKS:
            mod_pattern = 'letsencrypt .*--modules .*%s.*' % mod_str
            if re.search(mod_pattern, cmd_str) is not None:
                module_list.append(mod_str)

        if module_list:
            print('enabled, with modules: ' + ', '.join(module_list))
        else:
            print('enabled, without modules')

    elif arguments.command == 'enable':
        # Back up the pristine config only once, before the first edit.
        if not config_edited_by_plinth:
            shutil.copy(config_path, config_backup_path)
            config_certbot.initial_comment.append(comment_plinth)

        if arguments.modules:
            call_renew += ' --modules ' + ' '.join(arguments.modules)
            config_plinth['renewalparams']['renew_hook'] = call_renew

        config_certbot['renewalparams'].update(config_plinth['renewalparams'])
        config_certbot.write()

        if arguments.modules:
            print('enabled, with modules: ' + ', '.join(arguments.modules))
        else:
            print('enabled, without modules')

    elif arguments.command == 'disable':
        # if changed, restore from backup; refuse disabling if no backup exists
        if not config_edited_by_plinth:
            print('Disabled: Nothing to do, hook management was not enabled.')
        elif os.path.exists(config_backup_path):
            shutil.move(config_backup_path, config_path)
            print('disabled successfully')
        else:
            print('Aborted: No backup config file at %s.' % config_backup_path)
            sys.exit(7)

    sys.exit(0)


def _require_current_domain(domain):
    """Exit unless the given domain is set and is the current domain.

    Exits with 1 if no domain was passed and with 2 if the passed domain is
    not the currently configured one.
    """
    # Require current domain, to avoid confusion (e.g. call from old cron job).
    if not domain:
        print('Aborted: You must specify the current domain.')
        sys.exit(1)

    current_domain = config.get_domainname()
    if domain != current_domain:
        print('Aborted: Current domain is %s, but called for %s.' %
              (current_domain, domain))
        sys.exit(2)


def subcommand_run_pre_hooks(arguments):
    """
    Execute all needed maintenance tasks BEFORE a cert is obtained/renewed.

    If registered as certbot's pre-hook, this script gets ALWAYS executed when
    certbot attempts a renewal, irrespective of necessity/success (2x per day).
    """
    _require_current_domain(arguments.domain)

    sys.exit(0)


def subcommand_run_renew_hooks(arguments):
    """
    Execute ALL maintenance tasks when (just after) certbot renews a cert, i.e.
    run all tasks of every app/plinth module that supports usage of LE certs.

    If registered as certbot's renew-hook, this script gets ONLY executed when
    certbot successfully renewed a certificate; with Debian default config,
    this means it would run about once every 60 days (renewals get executed
    if a cert is <30 days before expiry, and current default is 90 days).

    Errors will be logged by certbot to /var/log/letsencrypt/letsencrypt.log.

    Raises ActionError, carrying the messages of all failed modules, if the
    'letsencrypt add' action of at least one module failed.
    """
    _require_current_domain(arguments.domain)

    if action_utils.service_is_running('apache2'):
        action_utils.service_restart('apache2')

    # If >1 modules, collect errors and raise just one in the end,
    # for certbot to log ALL failed attempts, not just 1st fail.
    # The list must live outside the loop: otherwise earlier failures are
    # forgotten and the name is undefined when no modules were passed.
    error_messages = []
    for module in arguments.modules:
        try:
            _run_action(module, ['letsencrypt', 'add'])
        except Exception as err:  # deliberately broad: keep going, report all
            error_messages.append(str(err))

    if error_messages:
        raise ActionError("\n".join(error_messages))

    sys.exit(0)


def _run_action(action, action_options=None):
    """
    Run a specific action from another module, for the run_renew_hooks command.

    This function is a simplified version of plinth/actions.py, to enable
    somewhat safe calls of other actions from outside the FreedomBox Service
    (Plinth) process.
    The comments about the action contracts refer to plinth/actions.py.

    Raises ValueError if the action name or options are not acceptable and
    ActionError(action, output, error) if the action exits non-zero.
    """
    if action_options is None:
        action_options = []

    # Contract 3A and 3B: don't call anything outside of the actions directory.
    # Assume the current path is the actions directory.
    script_path = os.path.realpath(__file__)
    actions_dir, _ = os.path.split(script_path)
    if os.sep in action:
        raise ValueError('Action cannot contain: ' + os.sep)

    cmd = os.path.join(actions_dir, action)
    if not os.path.realpath(cmd).startswith(actions_dir):
        raise ValueError('Action has to be in directory %s' % actions_dir)

    # Contract 3C: interpret shell escape sequences as literal file names.
    # Contract 3E: fail if the action doesn't exist or exists elsewhere.
    if not os.access(cmd, os.F_OK):
        raise ValueError('Action must exist in action directory.')

    cmd = [cmd]

    # Contract: 3C, 3D: don't allow shell special characters in
    # options be interpreted by the shell.
    if action_options:
        if not isinstance(action_options, (list, tuple)):
            raise ValueError('Options must be list or tuple.')

        cmd += list(action_options)  # No escaping necessary

    # Contract 3C: don't interpret shell escape sequences.
    # Contract 5 (and 6-ish).
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, shell=False)

    output, error = proc.communicate()
    output, error = output.decode(), error.decode()
    if proc.returncode != 0:
        raise ActionError(action, output, error)


def subcommand_run_post_hooks(arguments):
    """
    Execute all needed maintenance tasks AFTER a cert is obtained/renewed.

    If registered as certbot's post-hook, this script gets ALWAYS executed when
    certbot attempts a renewal, irrespective of necessity/success (2x per day).
    """
    _require_current_domain(arguments.domain)

    sys.exit(0)


def subcommand_delete(arguments):
    """Disable a domain and delete the certificate."""
    domain = arguments.domain
    _run_or_exit(['certbot', 'delete', '--cert-name', domain])

    action_utils.webserver_disable(domain, kind='site')


def setup_webserver_config(domain, webserver_change):
    """Create SSL web server configuration for a domain.

    An already existing site configuration file for the domain is kept as a
    '.fbx-bak' backup before the new macro based configuration is written.
    Afterwards the macro module, the TLS site macro configuration and the
    domain's site are enabled through webserver_change.
    """
    file_name = os.path.join(APACHE_PREFIX, domain + '.conf')
    if os.path.isfile(file_name):
        os.rename(file_name, file_name + '.fbx-bak')

    with open(file_name, 'w') as file_handle:
        file_handle.write(APACHE_CONFIGURATION.format(domain=domain))

    webserver_change.enable('macro', kind='module')
    webserver_change.enable('freedombox-tls-site-macro', kind='config')
    webserver_change.enable(domain, kind='site')


def main():
    """Parse arguments and perform all duties."""
    arguments = parse_arguments()

    # Sub command names may contain '-' (e.g. get-status); the handler
    # functions use '_' instead.
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()