matrixsynapse: Add token based registration verification

- Allow setting registration verification to token based registration
verification.

- Configure the server with registration secret. Use the registration secret to
register an admin account for FreedomBox's use. Store the access token provided
during registration for future use.

- Use Admin API and the access token to create a registration verification
token. Show list of all registration tokens on app page.

Tests:

- On a fresh installation, setup succeeds, public registration is disabled.
Enabling public registration sets verification to be disabled by default.
Registration tokens are not shown in status.

- Without the patch, install the app and enable public registration. Apply the
patches. After update registration verification will show as disabled.

- Setting verification method to registration token works.
freedombox-registration-secret.yaml file is created. This file has 0o600
permissions and is owned by matrix-synapse:nogroup.
freedombox-admin-access-token.txt file is created. This file has 0o600
permissions and is owned by root:root. List of registration tokens are shown in
status section. Registration with Element app works with the token listed.

- Disabling registration verification works. Registration tokens are not shown
in status section. Registration with Element app works without verification.

- Disable app. Try to update the verification configuration to use tokens. An
error should be thrown that configuration can't be updated when app is disabled.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2023-02-13 16:21:17 -08:00 committed by James Valleroy
parent 485107604f
commit 92aff3e63c
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
4 changed files with 279 additions and 10 deletions

View File

@ -10,6 +10,17 @@ from django.utils.translation import gettext_lazy as _
from plinth.modules.coturn.forms import turn_uris_validator
from plinth.utils import format_lazy
_registration_verification_choices = [
('disabled',
_('Disabled. This could lead to adversaries registering many spam '
'accounts on your server with automated scripts.')),
('token',
_('Require users creating a new account to use a registration token. A '
'token will be created automatically. Pass this token to your '
'potential new users. They will be asked for the token during '
'registration. (recommended)')),
]
class MatrixSynapseForm(forms.Form):
enable_public_registration = forms.BooleanField(
@ -18,6 +29,11 @@ class MatrixSynapseForm(forms.Form):
'can register a new account on your Matrix server. Disable this '
'if you only want existing users to be able to use it.'))
registration_verification = forms.ChoiceField(
label=_('Verification method for registration'),
choices=_registration_verification_choices, required=True,
widget=forms.RadioSelect)
enable_managed_turn = forms.BooleanField(
label=_('Automatically manage audio/video call setup'), required=False,
help_text=format_lazy(

View File

@ -1,14 +1,18 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Configure Matrix-Synapse server."""
import hashlib
import hmac
import json
import os
import pathlib
from typing import Optional
import shutil
from typing import Dict, List, Optional, Union
import requests
import yaml
from plinth import action_utils
from plinth import action_utils, utils
from plinth.actions import privileged
CONF_DIR = "/etc/matrix-synapse/conf.d/"
@ -18,8 +22,12 @@ SERVER_NAME_PATH = CONF_DIR + 'server_name.yaml'
STATIC_CONF_PATH = CONF_DIR + 'freedombox-static.yaml'
LISTENERS_CONF_PATH = CONF_DIR + 'freedombox-listeners.yaml'
REGISTRATION_CONF_PATH = CONF_DIR + 'freedombox-registration.yaml'
REGISTRATION_SECRET_PATH = CONF_DIR + 'freedombox-registration-secret.yaml'
ADMIN_ACCESS_TOKEN_PATH = CONF_DIR + 'freedombox-admin-access-token.txt'
TURN_CONF_PATH = CONF_DIR + 'freedombox-turn.yaml'
OVERRIDDEN_TURN_CONF_PATH = CONF_DIR + 'turn.yaml'
FREEDOMBOX_ADMIN_USERNAME = 'freedombox-admin'
ADMIN_API_BASE = 'http://localhost:8008/_synapse/admin/v1/'
STATIC_CONFIG = {
'max_upload_size':
@ -81,6 +89,8 @@ def get_config():
if config.get('enable_registration_without_verification'):
registration_verification = 'disabled'
elif config.get('registration_requires_token'):
registration_verification = 'token'
else:
registration_verification = None
@ -94,9 +104,14 @@ def get_config():
def set_config(public_registration: bool,
registration_verification: Optional[str] = None):
"""Enable/disable public user registration."""
if registration_verification == 'token':
_create_registration_token()
config = {'enable_registration': public_registration}
if public_registration and registration_verification in ('disabled', None):
config['enable_registration_without_verification'] = True
elif registration_verification == 'token':
config['registration_requires_token'] = True
with open(REGISTRATION_CONF_PATH, 'w', encoding='utf-8') as reg_conf_file:
yaml.dump(config, reg_conf_file)
@ -155,3 +170,172 @@ def fix_public_registrations():
and config['registration_verification'] is None):
set_config(public_registration=True,
registration_verification='disabled')
def _ensure_server_is_running():
"""Raise an exception if matrix-synapse server is not running."""
if not action_utils.service_is_running('matrix-synapse'):
raise ProcessLookupError
def _get_registration_shared_secret() -> str:
"""Ensure that server is set to use a shared secret for registration."""
secret_file = pathlib.Path(REGISTRATION_SECRET_PATH)
if secret_file.exists():
with secret_file.open() as file_handle:
return yaml.safe_load(file_handle)['registration_shared_secret']
secret = utils.generate_password()
# Ensure that file is readable by matrix-synapse user and root.
previous_umask = os.umask(0o077)
try:
secret_file.write_text(f'registration_shared_secret: "{secret}"\n')
finally:
os.umask(previous_umask)
shutil.chown(secret_file, 'matrix-synapse', 'nogroup')
action_utils.service_try_restart('matrix-synapse')
return secret
def _parse_api_error(response):
"""Raise exception if the response of API call was an error."""
if response.get('errcode'):
raise ConnectionError(response['errcode'], response['error'])
def _get_access_token() -> str:
"""Create a new freedombox-admin account and return its access token.
For user registration Admin API, see:
https://matrix-org.github.io/synapse/latest/admin_api/register_api.html
"""
access_token_path = pathlib.Path(ADMIN_ACCESS_TOKEN_PATH)
if access_token_path.exists():
return access_token_path.read_text().strip()
shared_secret = _get_registration_shared_secret()
nonce = _get_nonce()
username = FREEDOMBOX_ADMIN_USERNAME
# No need to store password, we will use the access token.
password = utils.generate_password()
mac = _generate_mac(shared_secret, nonce, username, password, True)
data = {
'nonce': nonce,
'username': username,
'displayname': 'FreedomBox Admin',
'password': password,
'admin': True,
'mac': mac
}
request = requests.post(ADMIN_API_BASE + 'register', json=data)
response = request.json()
_parse_api_error(response)
# Ensure that file is only readable by root user.
previous_umask = os.umask(0o077)
try:
access_token_path.write_text(response['access_token'])
finally:
os.umask(previous_umask)
return response['access_token']
@privileged
def list_registration_tokens() -> List[Dict[str, Optional[Union[str, int]]]]:
"""Return the current list of registration tokens."""
if not action_utils.service_is_running('matrix-synapse'):
return []
access_token = _get_access_token()
return _list_registration_tokens(access_token)
def _get_headers(access_token: str):
"""Return the common HTTP headers needed for synapse admin API.
For details on authorization to the Admin API, see:
https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/index.html
"""
return {'Authorization': f'Bearer {access_token}'}
def _list_registration_tokens(
access_token: str) -> List[Dict[str, Optional[Union[str, int]]]]:
"""Use Admin API to fetch the list of registration tokens.
For details on registration tokens API, see:
https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/registration_tokens.html
"""
request = requests.get(ADMIN_API_BASE + 'registration_tokens',
headers=_get_headers(access_token))
response = request.json()
_parse_api_error(response)
return response['registration_tokens']
def _create_registration_token():
"""Make sure that at least one registration token is created.
For details on registration tokens API, see:
https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/registration_tokens.html
"""
_ensure_server_is_running()
access_token = _get_access_token()
tokens = _list_registration_tokens(access_token)
for token in tokens:
if token['uses_allowed'] is None and token['expiry_time'] is None:
# A token with unlimited uses and for unlimited time already
# exists.
return
if not tokens:
# API can generate a token for us but it includes special chars which
# is awkward to deal with for users.
token = utils.generate_password(size=12)
request = requests.post(ADMIN_API_BASE + 'registration_tokens/new',
headers=_get_headers(access_token),
json={'token': token})
response = request.json()
_parse_api_error(response)
def _get_nonce() -> str:
"""Make a preliminary registration request to get a nonce.
Nonce must be returned to the server during actual registration request. It
is used in computation of MAC and prevents replay attacks.
https://matrix-org.github.io/synapse/latest/admin_api/register_api.html
"""
request = requests.get(ADMIN_API_BASE + 'register')
response = request.json()
_parse_api_error(response)
return response['nonce']
def _generate_mac(shared_secret: str, nonce: str, user: str, password: str,
admin: bool = False, user_type=None) -> str:
"""Generate MAC using HMAC-SHA1 algorithm.
For information on how to encode the data for MAC computation, see:
https://matrix-org.github.io/synapse/latest/admin_api/register_api.html
"""
mac = hmac.new(
key=shared_secret.encode('utf8'),
digestmod=hashlib.sha1,
)
mac.update(nonce.encode('utf8'))
mac.update(b'\x00')
mac.update(user.encode('utf8'))
mac.update(b'\x00')
mac.update(password.encode('utf8'))
mac.update(b'\x00')
mac.update(b'admin' if admin else b'notadmin')
if user_type:
mac.update(b'\x00')
mac.update(user_type.encode('utf8'))
return mac.hexdigest()

View File

@ -24,6 +24,50 @@
enabled.
{% endblocktrans %}
</p>
{% if config.public_registration and config.registration_verification == 'token' and registration_tokens %}
<p>
{% blocktrans trimmed %}
New users must use one of the following tokens for verification during
account registration:
{% endblocktrans %}
</p>
<div class="table-responsive table-registration-tokens">
<table class="table">
<thead>
<tr>
<th>{% trans "Registration Token" %}</th>
<th>{% trans "Uses Allowed" %}</th>
<th>{% trans "Pending Registrations" %}</th>
<th>{% trans "Completed Registrations" %}</th>
<th>{% trans "Expiry Time" %}</th>
</tr>
</thead>
<tbody>
{% for token in registration_tokens %}
<tr>
<td>{{ token.token }}</td>
<td>
{% if token.uses_allowed is None %}
{% trans "Unlimited" %}
{% else %}
{{ token.uses_allowed }}
{% endif %}
</td>
<td>{{ token.pending }}</td>
<td>{{ token.completed }}</td>
<td>
{% if token.expiry_time %}
{{ token.expiry_time|date:"DATETIME_FORMAT" }}
{% else %}
{% trans "None" %}
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
{% if certificate_status != "valid" %}
<div class="alert alert-warning" role="alert">

View File

@ -1,6 +1,8 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Views for the Matrix Synapse module."""
import datetime
from django.contrib import messages
from django.shortcuts import redirect
from django.urls import reverse_lazy
@ -61,21 +63,33 @@ class MatrixSynapseAppView(AppView):
context = super().get_context_data(*args, **kwargs)
context['domain_name'] = matrixsynapse.get_configured_domain_name()
context['certificate_status'] = matrixsynapse.get_certificate_status()
context['config'] = privileged.get_config()
tokens = privileged.list_registration_tokens()
for token in tokens:
if token['expiry_time']:
date = datetime.datetime.utcfromtimestamp(
token['expiry_time'] / 1000)
token['expiry_time'] = date
context['registration_tokens'] = tokens
return context
def get_initial(self):
"""Return the values to fill in the form."""
initial = super().get_initial()
config, managed = get_turn_configuration()
turn_config, managed = get_turn_configuration()
config = privileged.get_config()
initial.update({
'enable_public_registration':
privileged.get_config()['public_registration'],
config['public_registration'],
'registration_verification':
config['registration_verification'] or 'disabled',
'enable_managed_turn':
managed,
'turn_uris':
'\n'.join(config.uris),
'\n'.join(turn_config.uris),
'shared_secret':
config.shared_secret
turn_config.shared_secret
})
return initial
@ -107,10 +121,21 @@ class MatrixSynapseAppView(AppView):
return old_config[prop] != new_config[prop]
is_changed = False
if changed('enable_public_registration'):
privileged.set_config(
public_registration=new_config['enable_public_registration'])
is_changed = True
if (changed('enable_public_registration')
or changed('registration_verification')):
try:
privileged.set_config(
public_registration=new_config[
'enable_public_registration'],
registration_verification=new_config[
'registration_verification'])
is_changed = True
except ProcessLookupError:
# Matrix Synapse server is not running
messages.error(
self.request,
_('Registration configuration cannot be updated when app '
'is disabled.'))
if changed('enable_managed_turn') or changed('turn_uris') or \
changed('shared_secret'):