FreedomBox/actions/monkeysphere
Sunil Mohan Adapa eada506b23
actions/*: Use SPDX license identifier
Reviewed-by: Veiko Aasa <veiko17@disroot.org>
2020-02-19 14:39:36 +02:00

318 lines
11 KiB
Python
Executable File

#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for monkeysphere.
"""
import argparse
import contextlib
import copy
import json
import os
import re
import shutil
import signal
import subprocess
import tempfile
import augeas
import psutil
def parse_arguments():
"""Return parsed command line arguments as dictionary."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
host_show_keys = subparsers.add_parser(
'host-show-keys', help='Show imported/importable keys')
host_show_keys.add_argument('key_id', nargs='?',
help='Optional KEYID to retrieve details for')
host_import_key = subparsers.add_parser(
'host-import-key', help='Import a key into monkeysphere')
host_import_key.add_argument('ssh_fingerprint',
help='SSH fingerprint of the key to import')
host_import_key.add_argument('domains', nargs='*',
help='List of available domains')
host_publish_key = subparsers.add_parser('host-publish-key',
help='Push host key to keyserver')
host_publish_key.add_argument('key_ids', nargs='*',
help='Optional list of KEYIDs')
host_cancel_publish = subparsers.add_parser(
'host-cancel-publish', help='Cancel a running publish operation')
host_cancel_publish.add_argument('pid', help='PID of the publish process')
subparsers.required = True
return parser.parse_args()
def get_ssh_keys(fingerprint_hash):
"""Return all SSH keys."""
keys = {}
key_files = ['/etc/ssh/ssh_host_rsa_key']
for key_file in key_files:
output = subprocess.check_output(
['ssh-keygen', '-l', '-E', fingerprint_hash, '-f', key_file])
fingerprint = output.decode().split()[1]
keys[fingerprint] = {
'ssh_fingerprint': fingerprint,
'service': 'ssh',
'key_file': key_file,
'available_domains': ['*']
}
return keys
def get_pem_ssh_fingerprint(pem_file, fingerprint_hash):
"""Return the SSH fingerprint of a PEM file."""
public_key = subprocess.check_output(
['openssl', 'rsa', '-in', pem_file, '-pubout'],
stderr=subprocess.DEVNULL)
ssh_public_key = subprocess.check_output(
['ssh-keygen', '-i', '-m', 'PKCS8', '-f', '/dev/stdin'],
input=public_key)
fingerprint = subprocess.check_output(
['ssh-keygen', '-l', '-E', fingerprint_hash, '-f', '/dev/stdin'],
input=ssh_public_key)
return fingerprint.decode().split()[1]
def get_https_keys(fingerprint_hash):
"""Return all HTTPS keys."""
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
augeas.Augeas.NO_MODL_AUTOLOAD)
aug.set('/augeas/load/Httpd/lens', 'Httpd.lns')
aug.set('/augeas/load/Httpd/incl[last() + 1]',
'/etc/apache2/sites-available/*')
aug.set('/augeas/load/Httpd/incl[last() + 1]',
'/etc/apache2/conf-available/*')
aug.load()
# Read from default-tls.conf and default-ssl.conf
keys = {}
path = '/files/etc/apache2/sites-available//VirtualHost'
for match in aug.match(path):
host = {'available_domains': ['*'], 'service': 'https'}
for directive in aug.match(match + '/directive'):
name = aug.get(directive)
if name == 'ServerName':
host['available_domains'] = [aug.get(directive + '/arg')]
elif name in ('GnuTLSKeyFile', 'SSLCertificateKeyFile'):
host['key_file'] = aug.get(directive + '/arg')
if 'key_file' in host:
host['ssh_fingerprint'] = get_pem_ssh_fingerprint(
host['key_file'], fingerprint_hash)
keys[host['ssh_fingerprint']] = host
# Read from FreedomBox configured domains with proper SSL certs.
path = "/files/etc/apache2/sites-available//" \
"directive[. = 'Use'][arg[1] = 'FreedomBoxTLSSiteMacro']"
key_file = ("/files/etc/apache2//Macro[arg[1] = 'FreedomBoxTLSSiteMacro']"
"//VirtualHost/directive[. = 'GnuTLSKeyFile']/arg")
key_file = aug.get(key_file)
for match in aug.match(path):
domain = aug.get(match + '/arg[2]')
host = {
'available_domains': [domain],
'service': 'https',
'key_file': key_file.replace('$domain', domain)
}
host['ssh_fingerprint'] = get_pem_ssh_fingerprint(
host['key_file'], fingerprint_hash)
keys[host['ssh_fingerprint']] = host
return keys
def get_monkeysphere_keys(key_id=None):
"""Return the list of keys imported into monkeysphere."""
try:
key_ids = [] if not key_id else [key_id]
output = subprocess.check_output(['monkeysphere-host', 'show-keys'] +
key_ids, stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError:
# no keys available
return {}
# parse output
default_dict = {'imported_domains': [], 'available_domains': []}
keys = [copy.deepcopy(default_dict)]
lines = output.decode().strip().split('\n')
for line in lines:
if line.startswith('pub'):
data = line.lstrip('pub').split()
keys[-1]['pub'] = data[0]
keys[-1]['date'] = data[1]
elif line.startswith('uid'):
uid = line.lstrip('uid').strip()
if uid.startswith('['):
uid = uid[uid.index(']') + 1:].strip()
keys[-1].setdefault('uids', []).append(uid)
matches = re.match(r'([a-zA-Z]*)://(.*)(:\d*)?', uid)
keys[-1]['service'] = matches.group(1)
keys[-1]['imported_domains'].append(matches.group(2))
elif line.startswith('OpenPGP fingerprint:'):
keys[-1]['openpgp_fingerprint'] = \
line.lstrip('OpenPGP fingerprint:')
elif line.startswith('ssh fingerprint:'):
data = line.lstrip('ssh fingerprint:').split()
keys[-1]['ssh_key_size'] = data[0]
keys[-1]['ssh_fingerprint'] = data[1]
keys[-1]['ssh_key_type'] = data[2].strip('()')
elif line == '':
keys.append(copy.deepcopy(default_dict))
return {key['ssh_fingerprint']: key for key in keys}
def get_merged_keys(key_id=None):
"""Return merged list of system and monkeysphere keys."""
keys = get_monkeysphere_keys(key_id)
# Monkeysphere used use MD5 for fingerprint hash and recently
# changed to SHA256. In case of SHA256 the string 'SHA256:' is
# being prepended to the fingerprint. Hoping that such a prefix
# will be available in all future changes, extract it from one key
# (assuming all the others will be the same) and use it.
fingerprint_hash = 'SHA256'
if keys:
fingerprint_hash = list(keys.keys())[0].split(':')[0]
system_keys = list(get_ssh_keys(fingerprint_hash).items()) + \
list(get_https_keys(fingerprint_hash).items())
for ssh_fingerprint, key in system_keys:
if key_id and ssh_fingerprint not in keys:
continue
if ssh_fingerprint in keys:
keys[ssh_fingerprint].update({
'available_domains': key['available_domains'],
'key_file': key['key_file']
})
else:
keys[ssh_fingerprint] = key
return keys
@contextlib.contextmanager
def _get_ssh_key_file_for_import(original_key_file, service):
"""Return an SSH key file that can be imported into monkeysphere.
If the key file is in PEM format, the key file can be used as it is.
Otherwise, if the file is in the OpenSSH key format, which is default since
7.8, then convert it to PEM format.
"""
if service != 'ssh':
yield original_key_file
return
first_line = open(original_key_file, 'r').readline()
if '--BEGIN OPENSSH PRIVATE KEY--' not in first_line:
yield original_key_file
return
with tempfile.TemporaryDirectory() as temp_directory:
key_file = os.path.join(temp_directory, 'ssh_key_file')
shutil.copy2(original_key_file, key_file)
# Convert OpenSSH format to PEM
subprocess.run(
['ssh-keygen', '-p', '-N', '', '-m', 'PEM', '-f', key_file])
yield key_file
def subcommand_host_show_keys(arguments):
"""Show host key fingerprints."""
print(json.dumps({'keys': get_merged_keys(arguments.key_id)}))
def subcommand_host_import_key(arguments, second_run=False):
"""Import host SSH key."""
keys = get_merged_keys()
if arguments.ssh_fingerprint not in keys:
raise Exception('Unknown SSH fingerprint')
key = keys[arguments.ssh_fingerprint]
if '*' in key['available_domains']:
key['available_domains'] = arguments.domains
if 'openpgp_fingerprint' not in key and not second_run:
env = dict(os.environ, MONKEYSPHERE_PROMPT='false')
with _get_ssh_key_file_for_import(key['key_file'],
key['service']) as key_file:
subprocess.check_call([
'monkeysphere-host', 'import-key', key_file,
key['service'] + '://' + key['available_domains'][0]
], env=env)
subcommand_host_import_key(arguments, second_run=True)
else:
for domain in key['available_domains']:
if domain in key['imported_domains']:
continue
env = dict(os.environ, MONKEYSPHERE_PROMPT='false')
subprocess.check_call([
'monkeysphere-host', 'add-servicename',
key['service'] + '://' + domain, key['openpgp_fingerprint']
], env=env)
def subcommand_host_publish_key(arguments):
"""Push host key to keyserver."""
# setting TMPDIR as workaround for Debian bug #656750
proc = subprocess.Popen(
['monkeysphere-host', 'publish-keys'] + arguments.key_ids,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=dict(os.environ,
TMPDIR='/var/lib/monkeysphere/authentication/tmp/',
MONKEYSPHERE_PROMPT='false'))
output, error = proc.communicate()
output, error = output.decode(), error.decode()
if proc.returncode != 0:
raise Exception(output, error)
print(output)
def subcommand_host_cancel_publish(arguments):
"""Kill a running publish process."""
process = psutil.Process(int(arguments.pid))
# Perform tight checks on the process before killing for security.
arguments = process.cmdline()
if not arguments:
# Process already completed
return
# Remove the sudo prefix if present
while arguments[0] == 'sudo' or arguments[0].startswith('-'):
arguments = arguments[1:]
if len(arguments) >= 2 and \
arguments[0].split('/')[-1] == 'monkeysphere' and \
arguments[1] == 'host-publish-key':
process.send_signal(signal.SIGTERM)
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()