diff --git a/plinth/frontpage.py b/plinth/frontpage.py index 0fd7e8864..012ba3c75 100644 --- a/plinth/frontpage.py +++ b/plinth/frontpage.py @@ -1,15 +1,12 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -""" -Manage application shortcuts on front page. -""" +"""Manage application shortcuts on front page.""" import json import logging import pathlib from plinth import app, cfg - -from . import actions +from plinth.modules.users import privileged as users_privileged logger = logging.getLogger(__name__) @@ -116,8 +113,7 @@ class Shortcut(app.FollowerComponent): return cls._all_shortcuts # XXX: Turn this into an API call in users module and cache - output = actions.superuser_run('users', ['get-user-groups', username]) - user_groups = set(output.strip().split('\n')) + user_groups = set(users_privileged.get_user_groups(username)) if 'admin' in user_groups: # Admin has access to all services return cls._all_shortcuts diff --git a/plinth/modules/syncthing/__init__.py b/plinth/modules/syncthing/__init__.py index 770883447..fb1d53a64 100644 --- a/plinth/modules/syncthing/__init__.py +++ b/plinth/modules/syncthing/__init__.py @@ -10,6 +10,7 @@ from plinth.modules.apache.components import Webserver from plinth.modules.backups.components import BackupRestore from plinth.modules.firewall.components import Firewall from plinth.modules.users import add_user_to_share_group +from plinth.modules.users import privileged as users_privileged from plinth.modules.users.components import UsersAndGroups from plinth.package import Packages from plinth.utils import format_lazy @@ -119,9 +120,7 @@ class SyncthingApp(app_module.App): old_groupname = 'syncthing' new_groupname = 'syncthing-access' - actions.superuser_run( - 'users', - options=['rename-group', old_groupname, new_groupname]) + users_privileged.rename_group(old_groupname, new_groupname) from django.contrib.auth.models import Group Group.objects.filter(name=old_groupname).update(name=new_groupname) diff --git a/plinth/modules/users/__init__.py b/plinth/modules/users/__init__.py index 4f4d18784..7176ac9a8 100644 --- a/plinth/modules/users/__init__.py +++ b/plinth/modules/users/__init__.py @@ -1,7 +1,5 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -""" -FreedomBox app to manage users. -""" +"""FreedomBox app to manage users.""" import grp import subprocess @@ -15,6 +13,7 @@ from plinth import cfg, menu from plinth.daemon import Daemon from plinth.package import Packages +from . import privileged from .components import UsersAndGroups first_boot_steps = [ @@ -91,10 +90,10 @@ class UsersApp(app_module.App): """Install and configure the app.""" super().setup(old_version) if not old_version: - actions.superuser_run('users', ['first-setup']) + privileged.first_setup() - actions.superuser_run('users', ['setup']) - create_group('freedombox-share') + privileged.setup() + privileged.create_group('freedombox-share') def _diagnose_ldap_entry(search_item): @@ -114,21 +113,9 @@ def _diagnose_ldap_entry(search_item): return [testname, result] -def create_group(group): - """Add an LDAP group.""" - actions.superuser_run('users', options=['create-group', group]) - - -def remove_group(group): - """Remove an LDAP group.""" - actions.superuser_run('users', options=['remove-group', group]) - - def get_last_admin_user(): """If there is only one admin user return its name else return None.""" - output = actions.superuser_run('users', ['get-group-users', 'admin']) - admin_users = output.strip().split('\n') - + admin_users = privileged.get_group_users('admin') if len(admin_users) == 1 and admin_users[0]: return admin_users[0] @@ -142,7 +129,6 @@ def add_user_to_share_group(username, service=None): except KeyError: group_members = [] if username not in group_members: - actions.superuser_run( - 'users', ['add-user-to-group', username, 'freedombox-share']) + privileged.add_user_to_group(username, 'freedombox-share') if service: actions.superuser_run('service', ['try-restart', service]) diff --git a/plinth/modules/users/forms.py b/plinth/modules/users/forms.py index 76747a9c2..64d379c45 100644 --- a/plinth/modules/users/forms.py +++ b/plinth/modules/users/forms.py @@ -1,4 +1,5 @@ # SPDX-License-Identifier: AGPL-3.0-or-later +"""Django forms for user management.""" import pwd import re @@ -16,13 +17,11 @@ from django.utils.translation import gettext_lazy import plinth.forms import plinth.modules.ssh.privileged as ssh_privileged -from plinth import actions -from plinth.errors import ActionError from plinth.modules import first_boot from plinth.modules.security import set_restricted_access from plinth.utils import is_user_admin -from . import get_last_admin_user +from . import get_last_admin_user, privileged from .components import UsersAndGroups @@ -74,11 +73,13 @@ USERNAME_FIELD = forms.CharField( class PasswordConfirmForm(forms.Form): """Password confirmation form.""" + confirm_password = forms.CharField( widget=forms.PasswordInput, label=gettext_lazy('Authorization Password')) def __init__(self, *args, **kwargs): + """Initialize form.""" super().__init__(*args, **kwargs) self.fields['confirm_password'].help_text = _( @@ -103,6 +104,7 @@ class CreateUserForm(ValidNewUsernameCheckMixin, Include options to add user to groups. """ + username = USERNAME_FIELD groups = forms.MultipleChoiceField( choices=UsersAndGroups.get_group_choices, @@ -120,6 +122,7 @@ class CreateUserForm(ValidNewUsernameCheckMixin, class Meta(UserCreationForm.Meta): """Metadata to control automatic form building.""" + fields = ('username', 'password1', 'password2', 'groups', 'language', 'confirm_password') @@ -143,14 +146,11 @@ class CreateUserForm(ValidNewUsernameCheckMixin, auth_username = self.request.user.username confirm_password = self.cleaned_data['confirm_password'] - process_input = '{0}\n{1}'.format(self.cleaned_data['password1'], - confirm_password).encode() try: - actions.superuser_run('users', [ - 'create-user', - user.get_username(), '--auth-user', auth_username - ], input=process_input) - except ActionError as error: + privileged.create_user(user.get_username(), + self.cleaned_data['password1'], + auth_username, confirm_password) + except Exception as error: messages.error( self.request, _('Creating LDAP user failed: {error}'.format( @@ -158,12 +158,10 @@ class CreateUserForm(ValidNewUsernameCheckMixin, for group in self.cleaned_data['groups']: try: - actions.superuser_run('users', [ - 'add-user-to-group', - user.get_username(), group, '--auth-user', - auth_username - ], input=confirm_password.encode()) - except ActionError as error: + privileged.add_user_to_group(user.get_username(), group, + auth_username, + confirm_password) + except Exception as error: messages.error( self.request, _('Failed to add new user to {group} group: {error}'). @@ -178,6 +176,7 @@ class CreateUserForm(ValidNewUsernameCheckMixin, class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm, plinth.forms.LanguageSelectionFormMixin, forms.ModelForm): """When user info is changed, also updates LDAP user.""" + username = USERNAME_FIELD ssh_keys = forms.CharField( label=gettext_lazy('Authorized SSH Keys'), required=False, @@ -192,6 +191,7 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm, class Meta: """Metadata to control automatic form building.""" + fields = ('username', 'groups', 'ssh_keys', 'language', 'is_active', 'confirm_password') model = User @@ -254,18 +254,13 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm, user.save() self.save_m2m() - output = actions.superuser_run('users', - ['get-user-groups', self.username]) - old_groups = output.strip().split('\n') + old_groups = privileged.get_user_groups(self.username) old_groups = [group for group in old_groups if group] if self.username != user.get_username(): try: - actions.superuser_run( - 'users', - ['rename-user', self.username, - user.get_username()]) - except ActionError: + privileged.rename_user(self.username, user.get_username()) + except Exception: messages.error(self.request, _('Renaming LDAP user failed.')) @@ -273,24 +268,20 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm, for old_group in old_groups: if old_group not in new_groups: try: - actions.superuser_run('users', [ - 'remove-user-from-group', - user.get_username(), old_group, '--auth-user', - auth_username - ], input=confirm_password.encode()) - except ActionError: + privileged.remove_user_from_group( + user.get_username(), old_group, auth_username, + confirm_password) + except Exception: messages.error(self.request, _('Failed to remove user from group.')) for new_group in new_groups: if new_group not in old_groups: try: - actions.superuser_run('users', [ - 'add-user-to-group', - user.get_username(), new_group, '--auth-user', - auth_username - ], input=confirm_password.encode()) - except ActionError: + privileged.add_user_to_group(user.get_username(), + new_group, auth_username, + confirm_password) + except Exception: messages.error(self.request, _('Failed to add user to group.')) @@ -308,14 +299,9 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm, else: status = 'inactive' try: - actions.superuser_run('users', [ - 'set-user-status', - user.get_username(), - status, - '--auth-user', - auth_username, - ], input=confirm_password.encode()) - except ActionError: + privileged.set_user_status(user.get_username(), status, + auth_username, confirm_password) + except Exception: messages.error(self.request, _('Failed to change user status.')) @@ -352,15 +338,11 @@ class UserChangePasswordForm(PasswordConfirmForm, SetPasswordForm): user = super().save(commit) auth_username = self.request.user.username if commit: - process_input = '{0}\n{1}'.format( - self.cleaned_data['new_password1'], - self.cleaned_data['confirm_password']).encode() try: - actions.superuser_run('users', [ - 'set-user-password', - user.get_username(), '--auth-user', auth_username - ], input=process_input) - except ActionError: + privileged.set_user_password( + user.get_username(), self.cleaned_data['new_password1'], + auth_username, self.cleaned_data['confirm_password']) + except Exception: messages.error(self.request, _('Changing LDAP user password failed.')) @@ -369,6 +351,7 @@ class UserChangePasswordForm(PasswordConfirmForm, SetPasswordForm): class FirstBootForm(ValidNewUsernameCheckMixin, auth.forms.UserCreationForm): """User module first boot step: create a new admin user.""" + username = USERNAME_FIELD def __init__(self, *args, **kwargs): @@ -383,23 +366,17 @@ class FirstBootForm(ValidNewUsernameCheckMixin, auth.forms.UserCreationForm): first_boot.mark_step_done('users_firstboot') try: - actions.superuser_run( - 'users', - ['create-user', - user.get_username(), '--auth-user', ''], - input=self.cleaned_data['password1'].encode()) - except ActionError as error: + privileged.create_user(user.get_username(), + self.cleaned_data['password1']) + except Exception as error: messages.error( self.request, _('Creating LDAP user failed: {error}'.format( error=error))) try: - actions.superuser_run( - 'users', - ['add-user-to-group', - user.get_username(), 'admin']) - except ActionError as error: + privileged.add_user_to_group(user.get_username(), 'admin') + except Exception as error: messages.error( self.request, _('Failed to add new user to admin group: {error}'.format( diff --git a/actions/users b/plinth/modules/users/privileged.py old mode 100755 new mode 100644 similarity index 52% rename from actions/users rename to plinth/modules/users/privileged.py index 759b847eb..099e24c44 --- a/actions/users +++ b/plinth/modules/users/privileged.py @@ -1,142 +1,53 @@ -#!/usr/bin/python3 # SPDX-License-Identifier: AGPL-3.0-or-later -""" -Configuration helper for the LDAP user directory -""" +"""Configuration helper for the LDAP user directory.""" -import argparse import logging import os import re import shutil import subprocess -import sys +from typing import Optional import augeas from plinth import action_utils, utils +from plinth.actions import privileged INPUT_LINES = None ACCESS_CONF = '/etc/security/access.conf' LDAPSCRIPTS_CONF = '/etc/ldapscripts/freedombox-ldapscripts.conf' -def parse_arguments(): - """Return parsed command line arguments as dictionary""" - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') - - subparsers.add_parser('first-setup', help='Perform initial setup of LDAP') - subparsers.add_parser('setup', help='Setup LDAP') - - subparser = subparsers.add_parser('create-user', - help='Create an LDAP user') - subparser.add_argument('username', help='Name of the LDAP user to create') - subparser.add_argument('--auth-user', required=True) - - subparser = subparsers.add_parser('remove-user', - help='Delete an LDAP user') - subparser.add_argument( - 'username', help='Name of the LDAP user to delete. If the username is ' - 'the last admin user, a password should be provided through STDIN.') - - subparser = subparsers.add_parser('rename-user', - help='Rename an LDAP user') - subparser.add_argument('oldusername', help='Old name of the LDAP user') - subparser.add_argument('newusername', help='New name of the LDAP user') - - subparser = subparsers.add_parser('set-user-password', - help='Set the password of an LDAP user') - subparser.add_argument( - 'username', help='Name of the LDAP user to set the password for') - subparser.add_argument('--auth-user', required=True) - - subparser = subparsers.add_parser('create-group', - help='Create an LDAP group') - subparser.add_argument('groupname', - help='Name of the LDAP group to create') - - subparser = subparsers.add_parser('rename-group', - help='Rename an LDAP group') - subparser.add_argument('old_groupname', - help='Name of the LDAP group to rename') - subparser.add_argument('new_groupname', help='Name of the new LDAP group') - - subparser = subparsers.add_parser('remove-group', - help='Delete an LDAP group') - subparser.add_argument('groupname', - help='Name of the LDAP group to delete') - - subparser = subparsers.add_parser( - 'get-user-groups', help='Get all the LDAP groups for an LDAP user') - subparser.add_argument('username', - help='LDAP user to retrieve the groups for') - - subparser = subparsers.add_parser('add-user-to-group', - help='Add an LDAP user to an LDAP group') - subparser.add_argument('username', help='LDAP user to add to group') - subparser.add_argument('groupname', help='LDAP group to add the user to') - subparser.add_argument('--auth-user', required=False) - - subparser = subparsers.add_parser('set-user-status', - help='Set user as active or inactive') - subparser.add_argument('username', help='User to change status') - subparser.add_argument('status', choices=['active', 'inactive'], - help='New status of the user') - subparser.add_argument('--auth-user', required=True) - - subparser = subparsers.add_parser( - 'remove-user-from-group', - help='Remove an LDAP user from an LDAP group') - subparser.add_argument('username', help='LDAP user to remove from group') - subparser.add_argument('groupname', - help='LDAP group to remove the user from') - subparser.add_argument('--auth-user', required=False) - - help_get_group_users = 'Get the list of all users in an LDAP group' - subparser = subparsers.add_parser('get-group-users', - help=help_get_group_users) - subparser.add_argument( - 'groupname', help='name of the LDAP group to get the ' - 'list of users') - - subparsers.required = True - return parser.parse_args() - - -def validate_user(username, must_be_admin=True): +def _validate_user(username, password, must_be_admin=True): """Validate a user.""" if must_be_admin: - admins = get_admin_users() + admins = _get_admin_users() if not admins: # any user is valid return if not username: - msg = 'Argument --auth-user is required' - raise argparse.ArgumentTypeError(msg) + raise PermissionError('Authentication user is required') if username not in admins: - msg = '"{}" is not authorized to perform this action'.format( - username) - raise argparse.ArgumentTypeError(msg) + msg = f'"{username}" is not authorized to perform this action' + raise PermissionError(msg) if not username: - msg = 'Argument --auth-user is required' - raise argparse.ArgumentTypeError(msg) + raise PermissionError('Authentication user is required') - validate_password(username) + _validate_password(username, password) -def validate_password(username): +def _validate_password(username, password): """Raise an error if the user password is invalid.""" - password = read_password(last=True) if not utils.is_authenticated_user(username, password): - raise argparse.ArgumentTypeError("Invalid credentials") + raise PermissionError('Invalid credentials') -def subcommand_first_setup(_): +@privileged +def first_setup(): """Perform initial setup of LDAP.""" # Avoid reconfiguration of slapd during module upgrades, because # this will move the existing database. @@ -147,19 +58,20 @@ def subcommand_first_setup(_): action_utils.dpkg_reconfigure('slapd', {'domain': 'thisbox'}) -def subcommand_setup(_): +@privileged +def setup(): """Setup LDAP.""" # Update pam configs for access and mkhomedir. subprocess.run(['pam-auth-update', '--package'], check=True) - configure_ldapscripts() + _configure_ldapscripts() - configure_ldap_authentication() + _configure_ldap_authentication() - configure_ldap_structure() + _configure_ldap_structure() -def configure_ldap_authentication(): +def _configure_ldap_authentication(): """Configure LDAP authentication.""" action_utils.dpkg_reconfigure( 'nslcd', { @@ -179,18 +91,18 @@ def configure_ldap_authentication(): action_utils.service_start('nslcd') -def configure_ldap_structure(): +def _configure_ldap_structure(): """Configure LDAP basic structure.""" was_running = action_utils.service_is_running('slapd') if not was_running: action_utils.service_start('slapd') - setup_admin() - create_organizational_unit('users') - create_organizational_unit('groups') + _setup_admin() + _create_organizational_unit('users') + _create_organizational_unit('groups') -def create_organizational_unit(unit): +def _create_organizational_unit(unit): """Create an organizational unit in LDAP.""" distinguished_name = 'ou={unit},dc=thisbox'.format(unit=unit) try: @@ -210,7 +122,7 @@ ou: {unit}'''.format(unit=unit) check=True) -def setup_admin(): +def _setup_admin(): """Remove LDAP admin password and Allow root to modify the users.""" process = subprocess.run([ 'ldapsearch', '-Q', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H', @@ -243,7 +155,7 @@ olcRootDN: gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth ''') -def configure_ldapscripts(): +def _configure_ldapscripts(): """Set the configuration used by ldapscripts for later user management.""" # modify a copy of the config file shutil.copy('/etc/ldapscripts/ldapscripts.conf', LDAPSCRIPTS_CONF) @@ -266,7 +178,7 @@ def configure_ldapscripts(): aug.save() -def get_samba_users(): +def _get_samba_users(): """Get users from the Samba user database.""" # 'pdbedit -L' is better for listing users but is installed only with samba stdout = subprocess.check_output( @@ -274,14 +186,14 @@ def get_samba_users(): return re.findall(r'USER_(.*)\\0', stdout) -def delete_samba_user(username): +def _delete_samba_user(username): """Delete a Samba user.""" - if username in get_samba_users(): + if username in _get_samba_users(): subprocess.check_call(['smbpasswd', '-x', username]) - disconnect_samba_user(username) + _disconnect_samba_user(username) -def disconnect_samba_user(username): +def _disconnect_samba_user(username): """Disconnect a Samba user.""" try: subprocess.check_call(['pkill', '-U', username, 'smbd']) @@ -290,84 +202,63 @@ def disconnect_samba_user(username): raise -def get_input_lines(): - """Return list of input lines from stdin.""" - global INPUT_LINES - if INPUT_LINES is None: - INPUT_LINES = [line.strip() for line in sys.stdin] - return INPUT_LINES - - -def read_password(last=False): - """Return the password. - - Set last=True to read password from last line of the input. - - """ - line = -1 if last else 0 - return get_input_lines()[line] - - -def subcommand_create_user(arguments): +@privileged +def create_user(username: str, password: str, auth_user: Optional[str] = None, + auth_password: Optional[str] = None): """Create an LDAP user, set password and flush cache.""" - username = arguments.username - auth_user = arguments.auth_user - - validate_user(auth_user) + _validate_user(auth_user, auth_password) _run(['ldapadduser', username, 'users']) - password = read_password() - set_user_password(username, password) - flush_cache() - set_samba_user(username, password) + _set_user_password(username, password) + _flush_cache() + _set_samba_user(username, password) -def subcommand_remove_user(arguments): +@privileged +def remove_user(username: str, password: Optional[str] = None): """Remove an LDAP user.""" - username = arguments.username - groups = get_user_groups(username) + groups = _get_user_groups(username) # require authentication if the user is last admin user - if get_group_users('admin') == [username]: - validate_password(username) + if _get_group_users('admin') == [username]: + _validate_password(username, password) - delete_samba_user(username) + _delete_samba_user(username) for group in groups: - remove_user_from_group(username, group) + _remove_user_from_group(username, group) _run(['ldapdeleteuser', username]) - flush_cache() + _flush_cache() -def subcommand_rename_user(arguments): +@privileged +def rename_user(old_username: str, new_username: str): """Rename an LDAP user.""" - old_username = arguments.oldusername - new_username = arguments.newusername - groups = get_user_groups(old_username) + groups = _get_user_groups(old_username) - delete_samba_user(old_username) + _delete_samba_user(old_username) for group in groups: - remove_user_from_group(old_username, group) + _remove_user_from_group(old_username, group) _run(['ldaprenameuser', old_username, new_username]) for group in groups: - add_user_to_group(new_username, group) + _add_user_to_group(new_username, group) - flush_cache() + _flush_cache() -def set_user_password(username, password): +def _set_user_password(username, password): """Set a user's password.""" process = _run(['slappasswd', '-s', password], stdout=subprocess.PIPE) password = process.stdout.decode().strip() _run(['ldapsetpasswd', username, password]) -def set_samba_user(username, password): +def _set_samba_user(username, password): """Insert a user to the Samba database. If a user already exists, update password. @@ -379,24 +270,21 @@ def set_samba_user(username, password): raise RuntimeError('Unable to add Samba user: ', proc.stderr) -def subcommand_set_user_password(arguments): +@privileged +def set_user_password(username: str, password: str, auth_user: str, + auth_password: str): """Set a user's password.""" - username = arguments.username - auth_user = arguments.auth_user - must_be_admin = username != auth_user - validate_user(auth_user, must_be_admin=must_be_admin) + _validate_user(auth_user, auth_password, must_be_admin=must_be_admin) - password = read_password() - set_user_password(username, password) - set_samba_user(username, password) + _set_user_password(username, password) + _set_samba_user(username, password) -def get_admin_users(): - """Returns list of members in the admin group. - - Raises an error if the slapd service is not running. +def _get_admin_users(): + """Return list of members in the admin group. + Raise an error if the slapd service is not running. """ admin_users = [] @@ -420,8 +308,8 @@ def get_admin_users(): return admin_users -def get_group_users(groupname): - """Returns list of members in the group.""" +def _get_group_users(groupname): + """Return list of members in the group.""" try: process = _run(['ldapgid', '-P', groupname], stdout=subprocess.PIPE) except subprocess.CalledProcessError: @@ -435,10 +323,11 @@ def get_group_users(groupname): return users -def get_user_groups(username): - """Returns only the supplementary groups of the given user. +def _get_user_groups(username): + """Return only the supplementary groups of the given user. - Exclude the 'users' primary group from the returned list.""" + Exclude the 'users' primary group from the returned list. + """ process = _run(['ldapid', username], stdout=subprocess.PIPE, check=False) output = process.stdout.decode().strip() if output: @@ -460,121 +349,119 @@ def get_user_groups(username): return [] -def subcommand_get_user_groups(arguments): +@privileged +def get_user_groups(username: str) -> list[str]: """Return list of a given user's groups.""" - groups = get_user_groups(arguments.username) - if groups: - print(*groups, sep='\n') + return _get_user_groups(username) -def group_exists(groupname): +def _group_exists(groupname): """Return whether a group already exits.""" process = _run(['ldapgid', groupname], check=False) return process.returncode == 0 -def create_group(groupname): +def _create_group(groupname): """Add an LDAP group.""" - if not group_exists(groupname): + if not _group_exists(groupname): _run(['ldapaddgroup', groupname]) -def subcommand_create_group(arguments): +@privileged +def create_group(groupname: str): """Add an LDAP group.""" - create_group(arguments.groupname) - flush_cache() + _create_group(groupname) + _flush_cache() -def subcommand_rename_group(arguments): +@privileged +def rename_group(old_groupname: str, new_groupname: str): """Rename an LDAP group. Skip if the group to rename from doesn't exist. """ - old_groupname = arguments.old_groupname - new_groupname = arguments.new_groupname - if old_groupname == 'admin' or new_groupname == 'admin': - raise argparse.ArgumentTypeError('Can\'t rename the group "admin"') + raise ValueError('Can\'t rename the group "admin"') - if group_exists(old_groupname): + if _group_exists(old_groupname): _run(['ldaprenamegroup', old_groupname, new_groupname]) - flush_cache() + _flush_cache() -def subcommand_remove_group(arguments): +@privileged +def remove_group(groupname: str): """Remove an LDAP group.""" - groupname = arguments.groupname - if groupname == 'admin': - raise argparse.ArgumentTypeError("Can't remove the group 'admin'") + raise ValueError("Can't remove the group 'admin'") - if group_exists(groupname): + if _group_exists(groupname): _run(['ldapdeletegroup', groupname]) - flush_cache() + _flush_cache() -def add_user_to_group(username, groupname): +def _add_user_to_group(username, groupname): """Add an LDAP user to an LDAP group.""" - create_group(groupname) + _create_group(groupname) _run(['ldapaddusertogroup', username, groupname]) -def subcommand_add_user_to_group(arguments): +@privileged +def add_user_to_group(username: str, groupname: str, + auth_user: Optional[str] = None, + auth_password: Optional[str] = None): """Add an LDAP user to an LDAP group.""" - groupname = arguments.groupname - if groupname == 'admin': - validate_user(arguments.auth_user) + _validate_user(auth_user, auth_password) - add_user_to_group(arguments.username, groupname) - flush_cache() + _add_user_to_group(username, groupname) + _flush_cache() -def remove_user_from_group(username, groupname): +def _remove_user_from_group(username, groupname): """Remove an LDAP user from an LDAP group.""" _run(['ldapdeleteuserfromgroup', username, groupname]) -def subcommand_remove_user_from_group(arguments): +@privileged +def remove_user_from_group(username: str, groupname: str, auth_user: str, + auth_password: str): """Remove an LDAP user from an LDAP group.""" - username = arguments.username - groupname = arguments.groupname - if groupname == 'admin': - validate_user(arguments.auth_user) + _validate_user(auth_user, auth_password) - remove_user_from_group(username, groupname) - flush_cache() + _remove_user_from_group(username, groupname) + _flush_cache() if groupname == 'freedombox-share': - disconnect_samba_user(username) + _disconnect_samba_user(username) -def subcommand_get_group_users(arguments): +@privileged +def get_group_users(group_name: str) -> list[str]: """Get the list of users of an LDAP group.""" - for user in get_group_users(arguments.groupname): - print(user) + return _get_group_users(group_name) -def subcommand_set_user_status(arguments): +@privileged +def set_user_status(username: str, status: str, auth_user: str, + auth_password: str): """Set the status of the user.""" - username = arguments.username - status = arguments.status - auth_user = arguments.auth_user + if status not in ('active', 'inactive'): + raise ValueError('Invalid status') - validate_user(auth_user) + _validate_user(auth_user, auth_password) if status == 'active': flag = '-e' else: flag = '-d' - if username in get_samba_users(): + if username in _get_samba_users(): subprocess.check_call(['smbpasswd', flag, username]) if status == 'inactive': - disconnect_samba_user(username) + _disconnect_samba_user(username) -def flush_cache(): +def _flush_cache(): """Flush nscd and apache2 cache.""" _run(['nscd', '--invalidate=passwd']) _run(['nscd', '--invalidate=group']) @@ -587,17 +474,3 @@ def _run(arguments, check=True, **kwargs): kwargs['stdout'] = kwargs.get('stdout', subprocess.DEVNULL) kwargs['stderr'] = kwargs.get('stderr', subprocess.DEVNULL) return subprocess.run(arguments, env=env, check=check, **kwargs) - - -def main(): - """Parse arguments and perform all duties""" - arguments = parse_arguments() - - subcommand = arguments.subcommand.replace('-', '_') - subcommand_method = globals()['subcommand_' + subcommand] - - subcommand_method(arguments) - - -if __name__ == '__main__': - main() diff --git a/plinth/modules/users/templates/users_firstboot.html b/plinth/modules/users/templates/users_firstboot.html index 0802a12d9..0e3537f93 100644 --- a/plinth/modules/users/templates/users_firstboot.html +++ b/plinth/modules/users/templates/users_firstboot.html @@ -50,9 +50,9 @@ {% blocktrans trimmed %} Delete these accounts from command line and refresh the page to create an account that is usable with {{ box_name }}. On the command line run - the command 'echo "{password}" | /usr/share/plinth/actions/users - remove-user {username}'. If an account is already usable with - {{ box_name }}, skip this step. + the command "echo '{"args": ["USERNAME", "PASSWORD"], "kwargs": {}}' | + sudo /usr/share/plinth/actions/actions users remove_user". If an + account is already usable with {{ box_name }}, skip this step. {% endblocktrans %}

diff --git a/plinth/modules/users/tests/test_actions.py b/plinth/modules/users/tests/test_privileged.py similarity index 70% rename from plinth/modules/users/tests/test_actions.py rename to plinth/modules/users/tests/test_privileged.py index 832077d4e..54abf15e8 100644 --- a/plinth/modules/users/tests/test_actions.py +++ b/plinth/modules/users/tests/test_privileged.py @@ -1,4 +1,3 @@ -#!/usr/bin/python3 # SPDX-License-Identifier: AGPL-3.0-or-later """ Test module to exercise user actions. @@ -6,7 +5,6 @@ Test module to exercise user actions. it is recommended to run this module with root privileges in a virtual machine. """ -import pathlib import random import re import string @@ -15,9 +13,15 @@ import subprocess import pytest from plinth import action_utils -from plinth.modules import security +from plinth.modules.security import privileged as security_privileged +from plinth.modules.users import privileged from plinth.tests import config as test_config +pytestmark = pytest.mark.usefixtures('mock_privileged') +privileged_modules_to_mock = [ + 'plinth.modules.users.privileged', 'plinth.modules.security.privileged' +] + _cleanup_users = None _cleanup_groups = None @@ -48,13 +52,6 @@ def _random_string(length=8): [random.choice(string.ascii_lowercase) for _ in range(length)]) -def _is_exit_zero(args): - """Return whether a command gave exit code zero""" - process = subprocess.run(args, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, check=False) - return process.returncode == 0 - - def _get_password_hash(username): """Query and return the password hash of the given LDAP username""" query = [ @@ -90,21 +87,14 @@ def _try_login_to_ssh(username, password, returncode=0): return process.returncode == returncode -def _action_file(): - """Return the path to the 'users' actions file.""" - current_directory = pathlib.Path(__file__).parent - return str(current_directory / '..' / '..' / '..' / '..' / 'actions' / - 'users') - - @pytest.fixture(name='disable_restricted_access', autouse=True) def fixture_disable_restricted_access(needs_root, load_cfg): """Disable console login restrictions.""" - restricted_access = security.get_restricted_access_enabled() + restricted_access = security_privileged.get_restricted_access_enabled() if restricted_access: - security.set_restricted_access(False) + security_privileged.disable_restricted_access() yield - security.set_restricted_access(True) + security_privileged.enable_restricted_access() else: yield @@ -135,14 +125,7 @@ def fixture_auto_cleanup_users_groups(needs_root, load_cfg): pass for group in _cleanup_groups: - _delete_group(group) - - -def _call_action(arguments, check=True, **kwargs): - """Call the action script.""" - kwargs['stdout'] = kwargs.get('stdout', subprocess.PIPE) - kwargs['stderr'] = kwargs.get('stderr', subprocess.PIPE) - return subprocess.run([_action_file()] + arguments, check=check, **kwargs) + privileged.remove_group(group) def _create_user(username=None, groups=None): @@ -151,16 +134,13 @@ def _create_user(username=None, groups=None): password = username + '_passwd' admin_user, admin_password = _get_admin_user_password() - process_input = "{0}\n{1}".format(password, admin_password).encode() - _call_action(['create-user', '--auth-user', admin_user, username], - input=process_input) + privileged.create_user(username, password, admin_user, admin_password) if groups: for group in groups: admin_user, admin_password = _get_admin_user_password() - _call_action([ - 'add-user-to-group', '--auth-user', admin_user, username, group - ], input=admin_password.encode()) + privileged.add_user_to_group(username, group, admin_user, + admin_password) if group != 'admin': _cleanup_groups.add(group) @@ -170,11 +150,11 @@ def _create_user(username=None, groups=None): def _delete_user(username): """Utility to delete an LDAP and Samba user""" - process_input = None - if _get_group_users('admin') == [username]: + admin_password = None + if privileged.get_group_users('admin') == [username]: _, admin_password = _get_admin_user_password() - process_input = admin_password.encode() - _call_action(['remove-user', username], input=process_input) + + privileged.remove_user(username, admin_password) def _create_admin_if_does_not_exist(): @@ -186,8 +166,7 @@ def _create_admin_if_does_not_exist(): def _get_admin_user_password(): """Return an admin username and password.""" - - admin_users = _get_group_users('admin') + admin_users = privileged.get_group_users('admin') if not admin_users: return ('', '') @@ -205,36 +184,20 @@ def _rename_user(old_username, new_username=None): """Rename a user.""" new_username = new_username or _random_string() - _call_action(['rename-user', old_username, new_username]) + privileged.rename_user(old_username, new_username) _cleanup_users.remove(old_username) _cleanup_users.add(new_username) return new_username -def _get_group_users(group): - """Return the list of members in a group.""" - process = _call_action(['get-group-users', group]) - return process.stdout.decode().split() - - -def _get_user_groups(username): - """Return the list of groups for a user.""" - process = _call_action(['get-user-groups', username]) - return process.stdout.decode().split() - - def _create_group(groupname=None): groupname = groupname or _random_string() - _call_action(['create-group', groupname]) + privileged.create_group(groupname) if groupname != 'admin': _cleanup_groups.add(groupname) return groupname -def _delete_group(groupname): - _call_action(['remove-group', groupname]) - - def test_create_user(): """Test whether creating a new user works.""" _create_admin_if_does_not_exist() @@ -256,9 +219,8 @@ def test_change_user_password(): old_password_hash = _get_password_hash(username) new_password = 'pass $123' - process_input = "{0}\n{1}".format(new_password, admin_password).encode() - _call_action(['set-user-password', username, '--auth-user', admin_user], - input=process_input) + privileged.set_user_password(username, new_password, admin_user, + admin_password) new_password_hash = _get_password_hash(username) assert old_password_hash != new_password_hash @@ -277,9 +239,8 @@ def test_change_password_as_non_admin_user(): old_password_hash = _get_password_hash(username) new_password = 'pass $123' - process_input = "{0}\n{1}".format(new_password, old_password).encode() - _call_action(['set-user-password', username, '--auth-user', username], - input=process_input) + privileged.set_user_password(username, new_password, username, + old_password) new_password_hash = _get_password_hash(username) assert old_password_hash != new_password_hash @@ -298,11 +259,9 @@ def test_change_other_users_password_as_non_admin(): username2, _ = _create_user() new_password = 'pass $123' - process_input = "{0}\n{1}".format(new_password, password1).encode() with pytest.raises(subprocess.CalledProcessError): - _call_action( - ['set-user-password', username2, '--auth-user', username1], - input=process_input) + privileged.set_user_password(username2, new_password, username1, + password1) def test_set_password_for_non_existent_user(): @@ -313,11 +272,9 @@ def test_set_password_for_non_existent_user(): non_existent_user = _random_string() fake_password = _random_string() - process_input = "{0}\n{1}".format(fake_password, admin_password).encode() with pytest.raises(subprocess.CalledProcessError): - _call_action([ - 'set-user-password', non_existent_user, '--auth-user', admin_user - ], input=process_input) + privileged.set_user_password(non_existent_user, fake_password, + admin_user, admin_password) def test_rename_user(): @@ -325,15 +282,15 @@ def test_rename_user(): _create_admin_if_does_not_exist() old_username, password = _create_user(groups=['admin', _random_string()]) - old_groups = _get_user_groups(old_username) + old_groups = privileged.get_user_groups(old_username) new_username = _rename_user(old_username) assert _try_login_to_ssh(new_username, password) assert _try_login_to_ssh(old_username, password, returncode=5) assert old_username not in _get_samba_users() - new_groups = _get_user_groups(new_username) - old_users_groups = _get_user_groups(old_username) + new_groups = privileged.get_user_groups(new_username) + old_users_groups = privileged.get_user_groups(old_username) assert not old_users_groups # empty assert old_groups == new_groups @@ -357,11 +314,12 @@ def test_delete_user(): username, password = _create_user(groups=[_random_string()]) _delete_user(username) - groups_after = _get_user_groups(username) + groups_after = privileged.get_user_groups(username) assert not groups_after # User gets removed from all groups # User account cannot be found after deletion - assert not _is_exit_zero(['ldapid', username]) + with pytest.raises(subprocess.CalledProcessError): + subprocess.run(['ldapid', username], check=True) # Deleted user cannot login to ssh assert _try_login_to_ssh(username, password, returncode=5) @@ -373,7 +331,7 @@ def test_delete_non_existent_user(): """Deleting a non-existent user should fail.""" non_existent_user = _random_string() with pytest.raises(subprocess.CalledProcessError): - _call_action(['delete-user', non_existent_user]) + privileged.remove_user(non_existent_user) def test_groups(): @@ -381,16 +339,18 @@ def test_groups(): groupname = _random_string() _create_group(groupname) - assert _is_exit_zero(['ldapgid', groupname]) + with pytest.raises(subprocess.CalledProcessError): + subprocess.run(['ldapgid', groupname], check=True) # create-group is idempotent - assert _is_exit_zero([_action_file(), 'create-group', groupname]) + privileged.create_group(groupname) - _delete_group(groupname) - assert not _is_exit_zero(['ldapgid', groupname]) + privileged.remove_group(groupname) + with pytest.raises(subprocess.CalledProcessError): + subprocess.run(['ldapgid', groupname], check=True) # delete-group is idempotent - assert _is_exit_zero([_action_file(), 'remove-group', groupname]) + privileged.remove_group(groupname) def test_delete_admin_group_fails(): @@ -398,7 +358,8 @@ def test_delete_admin_group_fails(): groupname = 'admin' _create_group('admin') - assert not _is_exit_zero([_action_file(), 'remove-group', groupname]) + with pytest.raises(ValueError): + privileged.remove_group(groupname) def test_user_group_interactions(): @@ -408,48 +369,38 @@ def test_user_group_interactions(): group1 = _random_string() user1, _ = _create_user(groups=[group1]) - assert [group1] == _get_user_groups(user1) + assert [group1] == privileged.get_user_groups(user1) # add-user-to-group is not idempotent with pytest.raises(subprocess.CalledProcessError): - _call_action( - ['add-user-to-group', '--auth-user', admin_user, user1, group1], - input=admin_password.encode()) + privileged.add_user_to_group(user1, group1, admin_user, admin_password) # The same user can be added to other new groups group2 = _random_string() _create_group(group2) - _call_action( - ['add-user-to-group', '--auth-user', admin_user, user1, group2], - input=admin_password.encode()) + privileged.add_user_to_group(user1, group2, admin_user, admin_password) # Adding a user to a non-existent group creates the group group3 = _random_string() - _call_action( - ['add-user-to-group', '--auth-user', admin_user, user1, group3], - input=admin_password.encode()) + privileged.add_user_to_group(user1, group3, admin_user, admin_password) _cleanup_groups.add(group3) # The expected groups got created and the user is part of them. expected_groups = [group1, group2, group3] - assert expected_groups == _get_user_groups(user1) + assert expected_groups == privileged.get_user_groups(user1) # Remove user from group group_to_remove_from = random.choice(expected_groups) - _call_action([ - 'remove-user-from-group', '--auth-user', admin_user, user1, - group_to_remove_from - ], input=admin_password.encode()) + privileged.remove_user_from_group(user1, group_to_remove_from, admin_user, + admin_password) # User is no longer in the group that they're removed from expected_groups.remove(group_to_remove_from) - assert expected_groups == _get_user_groups(user1) + assert expected_groups == privileged.get_user_groups(user1) # User cannot be removed from a group that they're not part of random_group = _random_string() _create_group(random_group) with pytest.raises(subprocess.CalledProcessError): - _call_action([ - 'remove-user-from-group', '--auth-user', admin_user, user1, - random_group - ], input=admin_password.encode()) + privileged.remove_user_from_group(user1, random_group, admin_user, + admin_password) diff --git a/plinth/modules/users/tests/test_views.py b/plinth/modules/users/tests/test_views.py index 467604990..a622c7a58 100644 --- a/plinth/modules/users/tests/test_views.py +++ b/plinth/modules/users/tests/test_views.py @@ -31,18 +31,6 @@ def fixture_users_urls(): yield -def action_run(action, options, **kwargs): - """Action return values.""" - if action == 'users' and options == ['get-group-users', 'admin']: - return 'admin' - if action == 'ssh' and options[:2] == ['get-keys', '--username']: - return '' - if action == 'users' and options == ['get-user-groups', 'tester']: - return '' - - return None - - @pytest.fixture(autouse=True) def module_patch(): """Patch users module.""" @@ -54,8 +42,20 @@ def module_patch(): UsersAndGroups('users-and-groups-minetest', reserved_usernames=['debian-minetest']) - with patch('pwd.getpwall', return_value=pwd_users),\ - patch('plinth.actions.superuser_run', side_effect=action_run): + privileged = 'plinth.modules.users.privileged' + with patch('pwd.getpwall', return_value=pwd_users), \ + patch(f'{privileged}.create_user'), \ + patch(f'{privileged}.add_user_to_group'), \ + patch(f'{privileged}.set_user_password'), \ + patch(f'{privileged}.set_user_status'), \ + patch(f'{privileged}.rename_user'), \ + patch(f'{privileged}.get_group_users') as get_group_users, \ + patch('plinth.modules.ssh.privileged.set_keys'), \ + patch('plinth.modules.ssh.privileged.get_keys') as get_keys, \ + patch(f'{privileged}.get_user_groups') as get_user_groups: + get_group_users.return_value = ['admin'] + get_keys.return_value = [] + get_user_groups.return_value = [] yield diff --git a/plinth/modules/users/views.py b/plinth/modules/users/views.py index 005701299..93a849ab3 100644 --- a/plinth/modules/users/views.py +++ b/plinth/modules/users/views.py @@ -15,13 +15,12 @@ from django.views.generic.edit import (CreateView, DeleteView, FormView, UpdateView) import plinth.modules.ssh.privileged as ssh_privileged -from plinth import actions, translation -from plinth.errors import ActionError +from plinth import translation from plinth.modules import first_boot from plinth.utils import is_user_admin from plinth.views import AppView -from . import get_last_admin_user +from . import get_last_admin_user, privileged from .forms import (CreateUserForm, FirstBootForm, UserChangePasswordForm, UserUpdateForm) @@ -59,6 +58,7 @@ class UserCreate(ContextMixin, SuccessMessageMixin, CreateView): class UserList(AppView, ContextMixin, django.views.generic.ListView): """View to list users.""" + model = User template_name = 'users_list.html' title = gettext_lazy('Users') @@ -72,6 +72,7 @@ class UserList(AppView, ContextMixin, django.views.generic.ListView): class UserUpdate(ContextMixin, SuccessMessageMixin, UpdateView): """View to update a user's details.""" + template_name = 'users_update.html' model = User form_class = UserUpdateForm @@ -101,7 +102,7 @@ class UserUpdate(ContextMixin, SuccessMessageMixin, UpdateView): ssh_keys = ssh_privileged.get_keys(self.object.username) initial['ssh_keys'] = ssh_keys.strip() initial['language'] = self.object.userprofile.language - except ActionError: + except Exception: pass return initial @@ -129,6 +130,7 @@ class UserDelete(ContextMixin, DeleteView): On GET, display a confirmation page. On POST, delete the user. """ + template_name = 'users_delete.html' model = User slug_field = 'username' @@ -136,6 +138,7 @@ class UserDelete(ContextMixin, DeleteView): title = gettext_lazy('Delete User') def __init__(self, *args, **kwargs): + """Initialize view, override delete method.""" super().__init__(*args, **kwargs) # Avoid a warning with Django 4.0 that delete member should not be @@ -149,9 +152,8 @@ class UserDelete(ContextMixin, DeleteView): messages.success(self.request, message) try: - actions.superuser_run('users', - ['remove-user', self.kwargs['slug']]) - except ActionError: + privileged.remove_user(self.kwargs['slug']) + except Exception: messages.error(self.request, _('Deleting LDAP user failed.')) def _delete(self, *args, **kwargs): @@ -177,6 +179,7 @@ class UserDelete(ContextMixin, DeleteView): class UserChangePassword(ContextMixin, SuccessMessageMixin, FormView): """View to change user password.""" + template_name = 'users_change_password.html' form_class = UserChangePasswordForm title = gettext_lazy('Change Password') @@ -216,8 +219,8 @@ class UserChangePassword(ContextMixin, SuccessMessageMixin, FormView): class FirstBootView(django.views.generic.CreateView): """Create user account and log the user in.""" - template_name = 'users_firstboot.html' + template_name = 'users_firstboot.html' form_class = FirstBootForm def dispatch(self, request, *args, **kwargs): @@ -231,15 +234,11 @@ class FirstBootView(django.views.generic.CreateView): def get_context_data(self, *args, **kwargs): """Add admin users to context data.""" context = super().get_context_data(*args, **kwargs) - - output = actions.superuser_run('users', ['get-group-users', 'admin']) - admin_users = output.strip().split('\n') if output.strip() else [] - context['admin_users'] = admin_users - + context['admin_users'] = privileged.get_group_users('admin') return context def get_form_kwargs(self): - """Make request available to the form (to insert messages)""" + """Make request available to the form (to insert messages).""" kwargs = super().get_form_kwargs() kwargs['request'] = self.request return kwargs diff --git a/plinth/tests/test_frontpage.py b/plinth/tests/test_frontpage.py index f09d84569..df6005ce1 100644 --- a/plinth/tests/test_frontpage.py +++ b/plinth/tests/test_frontpage.py @@ -109,33 +109,33 @@ def test_shortcut_list_web_apps_only(common_shortcuts): assert return_list == [cuts[0], cuts[1], cuts[2]] -@patch('plinth.actions.superuser_run') -def test_shortcut_list_with_username(superuser_run, common_shortcuts): +@patch('plinth.modules.users.privileged.get_user_groups') +def test_shortcut_list_with_username(get_user_groups, common_shortcuts): """Test listing for particular users.""" cuts = common_shortcuts return_list = Shortcut.list() assert return_list == [cuts[0], cuts[1], cuts[2], cuts[3]] - superuser_run.return_value = 'admin' + get_user_groups.return_value = ['admin'] return_list = Shortcut.list(username='admin') assert return_list == [cuts[0], cuts[1], cuts[2], cuts[3]] - superuser_run.return_value = 'group1' + get_user_groups.return_value = ['group1'] return_list = Shortcut.list(username='user1') assert return_list == [cuts[0], cuts[1], cuts[3]] - superuser_run.return_value = 'group1\ngroup2' + get_user_groups.return_value = ['group1', 'group2'] return_list = Shortcut.list(username='user2') assert return_list == [cuts[0], cuts[1], cuts[2], cuts[3]] cut = Shortcut('group2-web-app-component-1', 'name5', 'short2', url='url4', login_required=False, allowed_groups=['group3']) - superuser_run.return_value = 'group3' + get_user_groups.return_value = ['group3'] return_list = Shortcut.list(username='user3') assert return_list == [cuts[0], cuts[3], cut] - superuser_run.return_value = 'group4' + get_user_groups.return_value = ['group4'] return_list = Shortcut.list(username='user4') assert return_list == [cuts[0], cuts[3], cut]