mumble: Use privileged decorator for superuser actions

- Don't use command line argument for sending the join password for security.

- murmurd can switch to mumble-server UID before performing database operations.
Remove code to switch to mumble-server user.

- murmurd seems to return correct response code of 0 upon successfully setting
the password. Simplify code accordingly.

- Use subprocess.run() instead of subprocess.Popen for convenience.

Tests:

- Run functional and unit tests on Debian stable.

- Perform a fresh installation.

- Verify that setting super user password works.

- Verify that setting root channel names works.

- Verify that setting join password works.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-06-21 11:21:31 -07:00 committed by James Valleroy
parent a55e63b7f4
commit 7470821dc7
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
4 changed files with 104 additions and 173 deletions

View File

@ -1,143 +0,0 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configure Mumble server.
"""
import argparse
import pathlib
import sys
from subprocess import PIPE, Popen
import augeas
from plinth import action_utils
CONFIG_FILE = '/etc/mumble-server.ini'
DATA_DIR = '/var/lib/mumble-server'
def parse_arguments():
"""Return parsed command line arguments as dictionary."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser('setup', help='Setup Mumble server')
subparsers.add_parser('create-password',
help='Setup mumble superuser password')
subparsers.add_parser('get-domain', help='Print Mumble domain')
subparser = subparsers.add_parser('set-domain', help='Setup Mumble domain')
subparser.add_argument('domain_name', help='Domain name to be allowed')
subparser = subparsers.add_parser('change-root-channel-name',
help='Set the root channel name')
subparser.add_argument('root_channel_name', help='New root channel name')
subparsers.add_parser('get-root-channel-name',
help='Print the root channel name')
subparser = subparsers.add_parser(
'change-join-password', help='Set the password to join the server')
subparser.add_argument('join_password', help='New join password')
subparsers.required = True
return parser.parse_args()
def subcommand_setup(_):
"""Setup Mumble server."""
aug = load_augeas()
aug.set('.anon/sslCert', DATA_DIR + '/fullchain.pem')
aug.set('.anon/sslKey', DATA_DIR + '/privkey.pem')
aug.save()
def read_from_stdin():
"""Read password from stdin"""
return (''.join(sys.stdin)).strip()
def subcommand_create_password(_):
"""Save superuser password with murmurd command"""
password = read_from_stdin()
cmd = ['murmurd', '-ini', CONFIG_FILE, '-readsupw']
proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=False)
# The exit code of the command above seems to be 1 when successful!
# checking if the 'phrase' is included in the error message which
# shows that the password is successfully set.
out, err = proc.communicate(input=password.encode())
out, err = out.decode(), err.decode()
phrase = "Superuser password set on server"
if phrase not in err:
print("Error occured while saving password: %s" % err)
sys.exit(1)
def subcommand_get_domain(_):
"""Print the file containing domain name or empty string."""
domain_file = pathlib.Path('/var/lib/mumble-server/domain-freedombox')
try:
print(domain_file.read_text())
except FileNotFoundError:
pass
def subcommand_set_domain(arguments):
"""Write a file containing domain name."""
domain_file = pathlib.Path('/var/lib/mumble-server/domain-freedombox')
domain_file.write_text(arguments.domain_name)
def subcommand_change_join_password(arguments):
"""Change to password that is required to join the server"""
aug = load_augeas()
aug.set('.anon/serverpassword', arguments.join_password)
aug.save()
action_utils.service_try_restart('mumble-server')
def subcommand_change_root_channel_name(arguments):
"""Change the name of the Root channel."""
aug = load_augeas()
aug.set('.anon/registerName', arguments.root_channel_name)
aug.save()
action_utils.service_try_restart('mumble-server')
def subcommand_get_root_channel_name(_):
aug = load_augeas()
name = aug.get('.anon/registerName')
if name:
print(name)
def load_augeas():
"""Initialize Augeas."""
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
augeas.Augeas.NO_MODL_AUTOLOAD)
aug.transform('Php', CONFIG_FILE)
aug.set('/augeas/context', '/files' + CONFIG_FILE)
aug.load()
return aug
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()

View File

@ -8,7 +8,6 @@ import pathlib
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth import app as app_module
from plinth import frontpage, menu
from plinth.daemon import Daemon
@ -20,7 +19,7 @@ from plinth.modules.users.components import UsersAndGroups
from plinth.package import Packages
from plinth.utils import Version
from . import manifest
from . import manifest, privileged
_description = [
_('Mumble is an open source, low-latency, encrypted, high quality '
@ -98,7 +97,7 @@ class MumbleApp(app_module.App):
def setup(helper, old_version=None):
"""Install and configure the module."""
app.setup(old_version)
helper.call('post', actions.superuser_run, 'mumble', ['setup'])
helper.call('post', privileged.setup)
if not old_version:
helper.call('post', app.enable)
@ -116,7 +115,7 @@ def force_upgrade(helper, packages):
return False
helper.install(['mumble-server'], force_configuration='new')
helper.call('post', actions.superuser_run, 'mumble', ['setup'])
helper.call('post', privileged.setup)
return True
@ -126,18 +125,12 @@ def get_available_domains():
if domain.domain_type.can_have_certificate)
def set_domain(domain):
"""Set the TLS domain by writing a file to data directory."""
if domain:
actions.superuser_run('mumble', ['set-domain', domain])
def get_domain():
"""Read TLS domain from config file select first available if none."""
domain = actions.superuser_run('mumble', ['get-domain']).strip()
domain = privileged.get_domain()
if not domain:
domain = next(get_available_domains(), None)
set_domain(domain)
privileged.set_domain(domain)
return domain
@ -152,8 +145,3 @@ def get_domains():
return [domain]
return []
def get_root_channel_name():
"""Return the root channel name."""
return actions.superuser_run('mumble', ['get-root-channel-name'])

View File

@ -0,0 +1,87 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configure Mumble server.
"""
import pathlib
import subprocess
from typing import Optional
import augeas
from plinth import action_utils
from plinth.actions import privileged
CONFIG_FILE = '/etc/mumble-server.ini'
DATA_DIR = '/var/lib/mumble-server'
@privileged
def setup():
"""Setup Mumble server."""
aug = _load_augeas()
aug.set('.anon/sslCert', DATA_DIR + '/fullchain.pem')
aug.set('.anon/sslKey', DATA_DIR + '/privkey.pem')
aug.save()
@privileged
def set_super_user_password(password: str):
"""Set the superuser password with murmurd command."""
subprocess.run(['murmurd', '-readsupw'], input=password.encode(),
stdout=subprocess.DEVNULL, check=False)
@privileged
def get_domain() -> Optional[str]:
"""Return domain name set in mumble or empty string."""
domain_file = pathlib.Path('/var/lib/mumble-server/domain-freedombox')
try:
return domain_file.read_text(encoding='utf-8')
except FileNotFoundError:
return None
@privileged
def set_domain(domain_name: Optional[str]):
"""Write a file containing domain name."""
if domain_name:
domain_file = pathlib.Path('/var/lib/mumble-server/domain-freedombox')
domain_file.write_text(domain_name, encoding='utf-8')
@privileged
def change_join_password(join_password: str):
"""Change to password that is required to join the server"""
aug = _load_augeas()
aug.set('.anon/serverpassword', join_password)
aug.save()
action_utils.service_try_restart('mumble-server')
@privileged
def change_root_channel_name(root_channel_name: str):
"""Change the name of the Root channel."""
aug = _load_augeas()
aug.set('.anon/registerName', root_channel_name)
aug.save()
action_utils.service_try_restart('mumble-server')
@privileged
def get_root_channel_name() -> Optional[str]:
"""Return the currently configured Root channel name."""
aug = _load_augeas()
name = aug.get('.anon/registerName')
return name or None
def _load_augeas():
"""Initialize Augeas."""
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
augeas.Augeas.NO_MODL_AUTOLOAD)
aug.transform('Php', CONFIG_FILE)
aug.set('/augeas/context', '/files' + CONFIG_FILE)
aug.load()
return aug

View File

@ -1,12 +1,17 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Views for mumble app.
"""
from django.contrib import messages
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth.modules import mumble
from plinth.modules.mumble.forms import MumbleForm
from plinth.views import AppView
from . import privileged
class MumbleAppView(AppView):
app_id = 'mumble'
@ -16,38 +21,32 @@ class MumbleAppView(AppView):
"""Return the values to fill in the form."""
initial = super().get_initial()
initial['domain'] = mumble.get_domain()
initial['root_channel_name'] = mumble.get_root_channel_name()
initial['root_channel_name'] = privileged.get_root_channel_name()
return initial
def form_valid(self, form):
"""Apply new superuser password if it exists"""
"""Apply form changes."""
new_config = form.cleaned_data
if mumble.get_domain() != new_config['domain']:
mumble.set_domain(new_config['domain'])
privileged.set_domain(new_config['domain'])
mumble.app.get_component('letsencrypt-mumble').setup_certificates()
messages.success(self.request, _('Configuration updated'))
password = new_config.get('super_user_password')
if password:
actions.run_as_user(
'mumble',
['create-password'],
input=password.encode(),
become_user="mumble-server",
)
privileged.set_super_user_password(password)
messages.success(self.request,
_('SuperUser password successfully updated.'))
join_password = new_config.get('join_password')
if join_password:
actions.superuser_run('mumble',
['change-join-password', join_password])
privileged.change_join_password(join_password)
messages.success(self.request, _('Join password changed'))
name = new_config.get('root_channel_name')
if name:
actions.superuser_run('mumble', ['change-root-channel-name', name])
privileged.change_root_channel_name(name)
messages.success(self.request, _('Root channel name changed.'))
return super().form_valid(form)