mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
Also fix flake8 warnings. Closes: #1613 Signed-off-by: Veiko Aasa <veiko17@disroot.org> [sunil@medhas.org split multi strings differently to avoid spaces] Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: Sunil Mohan Adapa <sunil@medhas.org>
480 lines
17 KiB
Python
Executable File
480 lines
17 KiB
Python
Executable File
#!/usr/bin/python3
|
|
#
|
|
# This file is part of FreedomBox.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
"""
|
|
Configuration helper for Let's Encrypt.
|
|
"""
|
|
|
|
import argparse
|
|
import filecmp
|
|
import glob
|
|
import importlib
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
|
|
import configobj
|
|
|
|
from plinth import action_utils, cfg
|
|
from plinth.modules import letsencrypt as le
|
|
|
|
TEST_MODE = False
|
|
LE_DIRECTORY = '/etc/letsencrypt/'
|
|
ETC_SSL_DIRECTORY = '/etc/ssl/'
|
|
RENEWAL_DIRECTORY = '/etc/letsencrypt/renewal/'
|
|
AUTHENTICATOR = 'webroot'
|
|
WEB_ROOT_PATH = '/var/www/html'
|
|
APACHE_PREFIX = '/etc/apache2/sites-available/'
|
|
APACHE_CONFIGURATION = '''
|
|
Use FreedomBoxTLSSiteMacro {domain}
|
|
'''
|
|
|
|
|
|
def parse_arguments():
|
|
"""Return parsed command line arguments as dictionary."""
|
|
parser = argparse.ArgumentParser()
|
|
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
|
|
|
|
setup_parser = subparsers.add_parser(
|
|
'setup', help='Run any setup/upgrade activities.')
|
|
setup_parser.add_argument(
|
|
'--old-version', type=int, required=True,
|
|
help='Version number being upgraded from or None if setting up first '
|
|
'time.')
|
|
|
|
subparsers.add_parser('get-status',
|
|
help='Return the status of configured domains.')
|
|
subparser = subparsers.add_parser(
|
|
'get-modified-time',
|
|
help='Return the modified time for a certificate.')
|
|
subparser.add_argument('--domain', required=True,
|
|
help='Domain name to get modified time for')
|
|
revoke_parser = subparsers.add_parser(
|
|
'revoke', help='Revoke certificate of a domain and disable website.')
|
|
revoke_parser.add_argument('--domain', required=True,
|
|
help='Domain name to revoke certificate for')
|
|
obtain_parser = subparsers.add_parser(
|
|
'obtain', help='Obtain certificate for a domain and setup website.')
|
|
obtain_parser.add_argument('--domain', required=True,
|
|
help='Domain name to obtain certificate for')
|
|
delete_parser = subparsers.add_parser(
|
|
'delete', help='Delete certificate for a domain and disable website.')
|
|
delete_parser.add_argument('--domain', required=True,
|
|
help='Domain name to delete certificate of')
|
|
|
|
subparser = subparsers.add_parser(
|
|
'copy-certificate',
|
|
help='Copy LE certificate to a daemon\'s directory')
|
|
subparser.add_argument('--managing-app', required=True,
|
|
help='App needing the certificate')
|
|
subparser.add_argument('--user-owner', required=True,
|
|
help='User who should own the certificate')
|
|
subparser.add_argument('--group-owner', required=True,
|
|
help='Group that should own the certificate')
|
|
subparser.add_argument('--source-private-key-path', required=True,
|
|
help='Path to the source private key')
|
|
subparser.add_argument(
|
|
'--source-certificate-path', required=True,
|
|
help='Path to the source certificate with public key')
|
|
subparser.add_argument('--private-key-path', required=True,
|
|
help='Path to the private key')
|
|
subparser.add_argument('--certificate-path', required=True,
|
|
help='Path to the certificate with public key')
|
|
|
|
subparser = subparsers.add_parser(
|
|
'compare-certificate',
|
|
help='Compare LE certificate to one in daemon\'s directory')
|
|
subparser.add_argument('--managing-app', required=True,
|
|
help='App needing the certificate')
|
|
subparser.add_argument('--source-private-key-path', required=True,
|
|
help='Path to the source private key')
|
|
subparser.add_argument(
|
|
'--source-certificate-path', required=True,
|
|
help='Path to the source certificate with public key')
|
|
subparser.add_argument('--private-key-path', required=True,
|
|
help='Path to the private key')
|
|
subparser.add_argument('--certificate-path', required=True,
|
|
help='Path to the certificate with public key')
|
|
|
|
help_hooks = 'Does nothing, kept for compatibility.'
|
|
subparser = subparsers.add_parser('run_pre_hooks', help=help_hooks)
|
|
subparser.add_argument('--domain')
|
|
subparser.add_argument('--modules', nargs='+', default=[])
|
|
|
|
subparser = subparsers.add_parser('run_renew_hooks', help=help_hooks)
|
|
subparser.add_argument('--domain')
|
|
subparser.add_argument('--modules', nargs='+', default=[])
|
|
|
|
subparser = subparsers.add_parser('run_post_hooks', help=help_hooks)
|
|
subparser.add_argument('--domain')
|
|
subparser.add_argument('--modules', nargs='+', default=[])
|
|
|
|
subparsers.required = True
|
|
return parser.parse_args()
|
|
|
|
|
|
def get_certificate_expiry(domain):
|
|
"""Return the expiry date of a certificate."""
|
|
certificate_file = os.path.join(le.LIVE_DIRECTORY, domain, 'cert.pem')
|
|
output = subprocess.check_output(
|
|
['openssl', 'x509', '-enddate', '-noout', '-in', certificate_file])
|
|
return output.decode().strip().split('=')[1]
|
|
|
|
|
|
def get_modified_time(domain):
|
|
"""Return the last modified time of a certificate."""
|
|
certificate_file = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem'
|
|
return int(certificate_file.stat().st_mtime)
|
|
|
|
|
|
def get_validity_status(domain):
|
|
"""Return validity status of a certificate, e.g. valid, revoked, expired"""
|
|
output = subprocess.check_output(['certbot', 'certificates', '-d', domain])
|
|
output = output.decode(sys.stdout.encoding)
|
|
|
|
match = re.search(r'INVALID: (.*)\)', output)
|
|
if match is not None:
|
|
validity = match.group(1).lower()
|
|
elif re.search('VALID', output) is not None:
|
|
validity = 'valid'
|
|
else:
|
|
validity = 'unknown'
|
|
|
|
return validity
|
|
|
|
|
|
def get_status():
|
|
"""
|
|
Return Python dictionary of currently configured domains.
|
|
Should be run as root, otherwise might yield a wrong, empty answer.
|
|
"""
|
|
try:
|
|
domains = os.listdir(le.LIVE_DIRECTORY)
|
|
except OSError:
|
|
domains = []
|
|
|
|
domains = [
|
|
domain for domain in domains
|
|
if os.path.isdir(os.path.join(le.LIVE_DIRECTORY, domain))
|
|
]
|
|
|
|
domain_status = {}
|
|
for domain in domains:
|
|
domain_status[domain] = {
|
|
'certificate_available':
|
|
True,
|
|
'expiry_date':
|
|
get_certificate_expiry(domain),
|
|
'web_enabled':
|
|
action_utils.webserver_is_enabled(domain, kind='site'),
|
|
'validity':
|
|
get_validity_status(domain),
|
|
'lineage':
|
|
str(pathlib.Path(le.LIVE_DIRECTORY) / domain),
|
|
'modified_time':
|
|
get_modified_time(domain)
|
|
}
|
|
return domain_status
|
|
|
|
|
|
def subcommand_setup(arguments):
|
|
"""Upgrade old site configuration to new macro based style.
|
|
|
|
Nothing to do for first time setup and for newer versions.
|
|
"""
|
|
if arguments.old_version == 2:
|
|
_remove_old_hooks()
|
|
return
|
|
|
|
if arguments.old_version != 1:
|
|
return
|
|
|
|
domain_status = get_status()
|
|
with action_utils.WebserverChange() as webserver_change:
|
|
for domain in domain_status:
|
|
setup_webserver_config(domain, webserver_change)
|
|
|
|
|
|
def subcommand_get_status(_):
|
|
"""Print a JSON dictionary of currently configured domains."""
|
|
domain_status = get_status()
|
|
print(json.dumps({'domains': domain_status}))
|
|
|
|
|
|
def subcommand_get_modified_time(arguments):
|
|
"""Print the modified time of a certificate as integer."""
|
|
print(get_modified_time(arguments.domain))
|
|
|
|
|
|
def subcommand_revoke(arguments):
|
|
"""Disable a domain and revoke the certificate."""
|
|
domain = arguments.domain
|
|
|
|
cert_path = pathlib.Path(le.LIVE_DIRECTORY) / domain / 'cert.pem'
|
|
if cert_path.exists():
|
|
command = [
|
|
'certbot', 'revoke', '--non-interactive', '--domain', domain,
|
|
'--cert-path', cert_path
|
|
]
|
|
if TEST_MODE:
|
|
command.append('--staging')
|
|
|
|
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
_, stderr = process.communicate()
|
|
if process.returncode:
|
|
print(stderr.decode(), file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
action_utils.webserver_disable(domain, kind='site')
|
|
|
|
|
|
def subcommand_obtain(arguments):
|
|
"""Obtain a certificate for a domain and setup website."""
|
|
domain = arguments.domain
|
|
|
|
command = [
|
|
'certbot', 'certonly', '--non-interactive', '--text', '--agree-tos',
|
|
'--register-unsafely-without-email', '--domain', arguments.domain,
|
|
'--authenticator', AUTHENTICATOR, '--webroot-path', WEB_ROOT_PATH,
|
|
'--renew-by-default'
|
|
]
|
|
if TEST_MODE:
|
|
command.append('--staging')
|
|
|
|
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
_, stderr = process.communicate()
|
|
if process.returncode:
|
|
print(stderr.decode(), file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
with action_utils.WebserverChange() as webserver_change:
|
|
setup_webserver_config(domain, webserver_change)
|
|
|
|
|
|
def _remove_old_hooks():
|
|
"""Remove old style renewal hooks from individual configuration files.
|
|
|
|
This has been replaced with global hooks by adding script files in
|
|
directory /etc/letsencrypt/renewal-hooks/{pre,post,deploy}/.
|
|
|
|
"""
|
|
for file_path in glob.glob(RENEWAL_DIRECTORY + '*.conf'):
|
|
try:
|
|
_remove_old_hooks_from_file(file_path)
|
|
except Exception as exception:
|
|
print('Error removing hooks from file:', file_path, exception)
|
|
|
|
|
|
def _remove_old_hooks_from_file(file_path):
|
|
"""Remove old style hooks from a single configuration file."""
|
|
config = configobj.ConfigObj(file_path)
|
|
edited = False
|
|
for line in config.initial_comment:
|
|
if 'edited by plinth' in line.lower():
|
|
edited = True
|
|
|
|
if not edited:
|
|
return
|
|
|
|
config.initial_comment = [
|
|
line for line in config.initial_comment
|
|
if 'edited by plinth' not in line.lower()
|
|
]
|
|
|
|
if 'pre_hook' in config['renewalparams']:
|
|
del config['renewalparams']['pre_hook']
|
|
|
|
if 'renew_hook' in config['renewalparams']:
|
|
del config['renewalparams']['renew_hook']
|
|
|
|
if 'post_hook' in config['renewalparams']:
|
|
del config['renewalparams']['post_hook']
|
|
|
|
config.write()
|
|
|
|
|
|
def subcommand_copy_certificate(arguments):
|
|
"""Copy certificate from LE directory to daemon's directory.
|
|
|
|
Set ownership and permissions as requested needed by the daemon.
|
|
|
|
"""
|
|
source_private_key_path = pathlib.Path(
|
|
arguments.source_private_key_path).resolve()
|
|
_assert_source_directory(source_private_key_path)
|
|
source_certificate_path = pathlib.Path(
|
|
arguments.source_certificate_path).resolve()
|
|
_assert_source_directory(source_certificate_path)
|
|
|
|
private_key_path = pathlib.Path(arguments.private_key_path).resolve()
|
|
_assert_managed_path(arguments.managing_app, private_key_path)
|
|
certificate_path = pathlib.Path(arguments.certificate_path).resolve()
|
|
_assert_managed_path(arguments.managing_app, certificate_path)
|
|
|
|
# Create directories, owned by root
|
|
private_key_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
|
|
certificate_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
|
|
|
|
# Private key is only accessible to the user owner
|
|
old_mask = os.umask(0o177)
|
|
shutil.copyfile(source_private_key_path, private_key_path)
|
|
|
|
if certificate_path != private_key_path:
|
|
# Certificate is only writable by the user owner
|
|
os.umask(0o133)
|
|
shutil.copyfile(source_certificate_path, certificate_path)
|
|
else:
|
|
# If private key and certificate are the same file, append one after
|
|
# the other.
|
|
source_certificate = source_certificate_path.read_bytes()
|
|
with private_key_path.open(mode='a+b') as file_handle:
|
|
file_handle.write(source_certificate)
|
|
|
|
os.umask(old_mask)
|
|
|
|
shutil.chown(certificate_path, user=arguments.user_owner,
|
|
group=arguments.group_owner)
|
|
shutil.chown(private_key_path, user=arguments.user_owner,
|
|
group=arguments.group_owner)
|
|
|
|
|
|
def subcommand_compare_certificate(arguments):
|
|
"""Compare LE certificate with an app certificate."""
|
|
source_private_key_path = pathlib.Path(arguments.source_private_key_path)
|
|
source_certificate_path = pathlib.Path(arguments.source_certificate_path)
|
|
_assert_source_directory(source_private_key_path)
|
|
_assert_source_directory(source_certificate_path)
|
|
|
|
private_key_path = pathlib.Path(arguments.private_key_path)
|
|
certificate_path = pathlib.Path(arguments.certificate_path)
|
|
_assert_managed_path(arguments.managing_app, private_key_path)
|
|
_assert_managed_path(arguments.managing_app, certificate_path)
|
|
|
|
result = False
|
|
try:
|
|
if filecmp.cmp(source_certificate_path, certificate_path) and \
|
|
filecmp.cmp(source_private_key_path, private_key_path):
|
|
result = True
|
|
except FileNotFoundError:
|
|
result = False
|
|
|
|
print(json.dumps({'result': result}))
|
|
|
|
|
|
def _assert_source_directory(path):
|
|
"""Assert that a path is a valid source of a certificates."""
|
|
assert (str(path).startswith(LE_DIRECTORY)
|
|
or str(path).startswith(ETC_SSL_DIRECTORY))
|
|
|
|
|
|
def _assert_managed_path(module, path):
|
|
"""Check that path is in fact managed by module."""
|
|
cfg.read()
|
|
module_file = pathlib.Path(cfg.config_dir) / 'modules-enabled' / module
|
|
module_path = module_file.read_text().strip()
|
|
|
|
module = importlib.import_module(module_path)
|
|
assert set(path.parents).intersection(set(module.managed_paths))
|
|
|
|
|
|
def subcommand_run_pre_hooks(_):
|
|
"""Do nothing, kept for legacy LE configuration.
|
|
|
|
If new version of Plinth is deployed and before it can update the Let's
|
|
Encrypt configuration and remove these old hooks, if a renew operation is
|
|
run, then we don't want it to exit with non-zero error code because this
|
|
hook could not be run.
|
|
|
|
Remove at some point in the future.
|
|
|
|
"""
|
|
|
|
|
|
def subcommand_run_renew_hooks(_):
|
|
"""Do nothing, kept for legacy LE configuration.
|
|
|
|
If new version of Plinth is deployed and before it can update the Let's
|
|
Encrypt configuration and remove these old hooks, if a renew operation is
|
|
run, then we don't want it to exit with non-zero error code because this
|
|
hook could not be run.
|
|
|
|
Remove at some point in the future.
|
|
|
|
"""
|
|
|
|
|
|
def subcommand_run_post_hooks(_):
|
|
"""Do nothing, kept for legacy LE configuration.
|
|
|
|
If new version of Plinth is deployed and before it can update the Let's
|
|
Encrypt configuration and remove these old hooks, if a renew operation is
|
|
run, then we don't want it to exit with non-zero error code because this
|
|
hook could not be run.
|
|
|
|
Remove at some point in the future.
|
|
|
|
"""
|
|
|
|
|
|
def subcommand_delete(arguments):
|
|
"""Disable a domain and delete the certificate."""
|
|
domain = arguments.domain
|
|
command = ['certbot', 'delete', '--non-interactive', '--cert-name', domain]
|
|
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
_, stderr = process.communicate()
|
|
if process.returncode:
|
|
print(stderr.decode(), file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
action_utils.webserver_disable(domain, kind='site')
|
|
|
|
|
|
def setup_webserver_config(domain, webserver_change):
|
|
"""Create SSL web server configuration for a domain.
|
|
|
|
Do so only if there is no configuration existing.
|
|
"""
|
|
file_name = os.path.join(APACHE_PREFIX, domain + '.conf')
|
|
if os.path.isfile(file_name):
|
|
os.rename(file_name, file_name + '.fbx-bak')
|
|
|
|
with open(file_name, 'w') as file_handle:
|
|
file_handle.write(APACHE_CONFIGURATION.format(domain=domain))
|
|
|
|
webserver_change.enable('freedombox-tls-site-macro', kind='config')
|
|
webserver_change.enable(domain, kind='site')
|
|
|
|
|
|
def main():
|
|
"""Parse arguments and perform all duties."""
|
|
arguments = parse_arguments()
|
|
|
|
subcommand = arguments.subcommand.replace('-', '_')
|
|
subcommand_method = globals()['subcommand_' + subcommand]
|
|
subcommand_method(arguments)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|