users: Use privileged decorator for actions

Tests:

- Functional tests work (failing already)
- DONE: Showing front page shortcuts according to user groups works
  - DONE: Only user who is party of syncthing group is shown syncthing
  - DONE: Admin users are always shown all the apps
- DONE: Syncthing:
  - Not tested: When upgrading from version 2 or below, renaming group works
  - DONE: Syncthing is added to freedombox-share group
- DONE: Initial setup of users app works
  - DONE: freedombox-share group is created
- DONE: Retriving last admin user works
  - DONE: Last admin is not allowed to delete account
- DONE: Creating a new user works
  - DONE: Password is set properly (user can login with 'su - user' after)
  - DONE: Incorrect confirmation password leads to error
  - DONE: Adding the user to groups works (edit page shows correct list of groups)
- DONE: Editing a user works
  - DONE: User is renamed properly
  - DONE: Removing user from groups works
  - DONE: Adding user to new groups works
  - DONE: Providing incorrect auth password results in error message
  - DONE: Enabling/disabling account work (confirm with 'su - user'). See #2277.
- DONE: Updating user password works
  - DONE: New password is set (confirm with 'su - user')
  - DONE: Providing incorrect auth password results in error message
- DONE: Initial user account creation works
  - DONE: User account can be used (confirm with 'su - user')
  - DONE: User is added to admin group
- DONE: Exception while getting SSH keys results in showing empty field
- DONE: Removing a user works
  - DONE: Command provided in a message in users_firstboot.html works for
    deleting users.
- DONE: If an admin users exists when running first wizard, list of admin users
  is shown.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-09-03 10:49:02 -07:00 committed by James Valleroy
parent e3d0be2885
commit 1dcbfce713
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
10 changed files with 264 additions and 483 deletions

View File

@ -1,15 +1,12 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Manage application shortcuts on front page.
"""
"""Manage application shortcuts on front page."""
import json
import logging
import pathlib
from plinth import app, cfg
from . import actions
from plinth.modules.users import privileged as users_privileged
logger = logging.getLogger(__name__)
@ -116,8 +113,7 @@ class Shortcut(app.FollowerComponent):
return cls._all_shortcuts
# XXX: Turn this into an API call in users module and cache
output = actions.superuser_run('users', ['get-user-groups', username])
user_groups = set(output.strip().split('\n'))
user_groups = set(users_privileged.get_user_groups(username))
if 'admin' in user_groups: # Admin has access to all services
return cls._all_shortcuts

View File

@ -10,6 +10,7 @@ from plinth.modules.apache.components import Webserver
from plinth.modules.backups.components import BackupRestore
from plinth.modules.firewall.components import Firewall
from plinth.modules.users import add_user_to_share_group
from plinth.modules.users import privileged as users_privileged
from plinth.modules.users.components import UsersAndGroups
from plinth.package import Packages
from plinth.utils import format_lazy
@ -119,9 +120,7 @@ class SyncthingApp(app_module.App):
old_groupname = 'syncthing'
new_groupname = 'syncthing-access'
actions.superuser_run(
'users',
options=['rename-group', old_groupname, new_groupname])
users_privileged.rename_group(old_groupname, new_groupname)
from django.contrib.auth.models import Group
Group.objects.filter(name=old_groupname).update(name=new_groupname)

View File

@ -1,7 +1,5 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
FreedomBox app to manage users.
"""
"""FreedomBox app to manage users."""
import grp
import subprocess
@ -15,6 +13,7 @@ from plinth import cfg, menu
from plinth.daemon import Daemon
from plinth.package import Packages
from . import privileged
from .components import UsersAndGroups
first_boot_steps = [
@ -91,10 +90,10 @@ class UsersApp(app_module.App):
"""Install and configure the app."""
super().setup(old_version)
if not old_version:
actions.superuser_run('users', ['first-setup'])
privileged.first_setup()
actions.superuser_run('users', ['setup'])
create_group('freedombox-share')
privileged.setup()
privileged.create_group('freedombox-share')
def _diagnose_ldap_entry(search_item):
@ -114,21 +113,9 @@ def _diagnose_ldap_entry(search_item):
return [testname, result]
def create_group(group):
"""Add an LDAP group."""
actions.superuser_run('users', options=['create-group', group])
def remove_group(group):
"""Remove an LDAP group."""
actions.superuser_run('users', options=['remove-group', group])
def get_last_admin_user():
"""If there is only one admin user return its name else return None."""
output = actions.superuser_run('users', ['get-group-users', 'admin'])
admin_users = output.strip().split('\n')
admin_users = privileged.get_group_users('admin')
if len(admin_users) == 1 and admin_users[0]:
return admin_users[0]
@ -142,7 +129,6 @@ def add_user_to_share_group(username, service=None):
except KeyError:
group_members = []
if username not in group_members:
actions.superuser_run(
'users', ['add-user-to-group', username, 'freedombox-share'])
privileged.add_user_to_group(username, 'freedombox-share')
if service:
actions.superuser_run('service', ['try-restart', service])

View File

@ -1,4 +1,5 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Django forms for user management."""
import pwd
import re
@ -16,13 +17,11 @@ from django.utils.translation import gettext_lazy
import plinth.forms
import plinth.modules.ssh.privileged as ssh_privileged
from plinth import actions
from plinth.errors import ActionError
from plinth.modules import first_boot
from plinth.modules.security import set_restricted_access
from plinth.utils import is_user_admin
from . import get_last_admin_user
from . import get_last_admin_user, privileged
from .components import UsersAndGroups
@ -74,11 +73,13 @@ USERNAME_FIELD = forms.CharField(
class PasswordConfirmForm(forms.Form):
"""Password confirmation form."""
confirm_password = forms.CharField(
widget=forms.PasswordInput,
label=gettext_lazy('Authorization Password'))
def __init__(self, *args, **kwargs):
"""Initialize form."""
super().__init__(*args, **kwargs)
self.fields['confirm_password'].help_text = _(
@ -103,6 +104,7 @@ class CreateUserForm(ValidNewUsernameCheckMixin,
Include options to add user to groups.
"""
username = USERNAME_FIELD
groups = forms.MultipleChoiceField(
choices=UsersAndGroups.get_group_choices,
@ -120,6 +122,7 @@ class CreateUserForm(ValidNewUsernameCheckMixin,
class Meta(UserCreationForm.Meta):
"""Metadata to control automatic form building."""
fields = ('username', 'password1', 'password2', 'groups', 'language',
'confirm_password')
@ -143,14 +146,11 @@ class CreateUserForm(ValidNewUsernameCheckMixin,
auth_username = self.request.user.username
confirm_password = self.cleaned_data['confirm_password']
process_input = '{0}\n{1}'.format(self.cleaned_data['password1'],
confirm_password).encode()
try:
actions.superuser_run('users', [
'create-user',
user.get_username(), '--auth-user', auth_username
], input=process_input)
except ActionError as error:
privileged.create_user(user.get_username(),
self.cleaned_data['password1'],
auth_username, confirm_password)
except Exception as error:
messages.error(
self.request,
_('Creating LDAP user failed: {error}'.format(
@ -158,12 +158,10 @@ class CreateUserForm(ValidNewUsernameCheckMixin,
for group in self.cleaned_data['groups']:
try:
actions.superuser_run('users', [
'add-user-to-group',
user.get_username(), group, '--auth-user',
auth_username
], input=confirm_password.encode())
except ActionError as error:
privileged.add_user_to_group(user.get_username(), group,
auth_username,
confirm_password)
except Exception as error:
messages.error(
self.request,
_('Failed to add new user to {group} group: {error}').
@ -178,6 +176,7 @@ class CreateUserForm(ValidNewUsernameCheckMixin,
class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm,
plinth.forms.LanguageSelectionFormMixin, forms.ModelForm):
"""When user info is changed, also updates LDAP user."""
username = USERNAME_FIELD
ssh_keys = forms.CharField(
label=gettext_lazy('Authorized SSH Keys'), required=False,
@ -192,6 +191,7 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm,
class Meta:
"""Metadata to control automatic form building."""
fields = ('username', 'groups', 'ssh_keys', 'language', 'is_active',
'confirm_password')
model = User
@ -254,18 +254,13 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm,
user.save()
self.save_m2m()
output = actions.superuser_run('users',
['get-user-groups', self.username])
old_groups = output.strip().split('\n')
old_groups = privileged.get_user_groups(self.username)
old_groups = [group for group in old_groups if group]
if self.username != user.get_username():
try:
actions.superuser_run(
'users',
['rename-user', self.username,
user.get_username()])
except ActionError:
privileged.rename_user(self.username, user.get_username())
except Exception:
messages.error(self.request,
_('Renaming LDAP user failed.'))
@ -273,24 +268,20 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm,
for old_group in old_groups:
if old_group not in new_groups:
try:
actions.superuser_run('users', [
'remove-user-from-group',
user.get_username(), old_group, '--auth-user',
auth_username
], input=confirm_password.encode())
except ActionError:
privileged.remove_user_from_group(
user.get_username(), old_group, auth_username,
confirm_password)
except Exception:
messages.error(self.request,
_('Failed to remove user from group.'))
for new_group in new_groups:
if new_group not in old_groups:
try:
actions.superuser_run('users', [
'add-user-to-group',
user.get_username(), new_group, '--auth-user',
auth_username
], input=confirm_password.encode())
except ActionError:
privileged.add_user_to_group(user.get_username(),
new_group, auth_username,
confirm_password)
except Exception:
messages.error(self.request,
_('Failed to add user to group.'))
@ -308,14 +299,9 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, PasswordConfirmForm,
else:
status = 'inactive'
try:
actions.superuser_run('users', [
'set-user-status',
user.get_username(),
status,
'--auth-user',
auth_username,
], input=confirm_password.encode())
except ActionError:
privileged.set_user_status(user.get_username(), status,
auth_username, confirm_password)
except Exception:
messages.error(self.request,
_('Failed to change user status.'))
@ -352,15 +338,11 @@ class UserChangePasswordForm(PasswordConfirmForm, SetPasswordForm):
user = super().save(commit)
auth_username = self.request.user.username
if commit:
process_input = '{0}\n{1}'.format(
self.cleaned_data['new_password1'],
self.cleaned_data['confirm_password']).encode()
try:
actions.superuser_run('users', [
'set-user-password',
user.get_username(), '--auth-user', auth_username
], input=process_input)
except ActionError:
privileged.set_user_password(
user.get_username(), self.cleaned_data['new_password1'],
auth_username, self.cleaned_data['confirm_password'])
except Exception:
messages.error(self.request,
_('Changing LDAP user password failed.'))
@ -369,6 +351,7 @@ class UserChangePasswordForm(PasswordConfirmForm, SetPasswordForm):
class FirstBootForm(ValidNewUsernameCheckMixin, auth.forms.UserCreationForm):
"""User module first boot step: create a new admin user."""
username = USERNAME_FIELD
def __init__(self, *args, **kwargs):
@ -383,23 +366,17 @@ class FirstBootForm(ValidNewUsernameCheckMixin, auth.forms.UserCreationForm):
first_boot.mark_step_done('users_firstboot')
try:
actions.superuser_run(
'users',
['create-user',
user.get_username(), '--auth-user', ''],
input=self.cleaned_data['password1'].encode())
except ActionError as error:
privileged.create_user(user.get_username(),
self.cleaned_data['password1'])
except Exception as error:
messages.error(
self.request,
_('Creating LDAP user failed: {error}'.format(
error=error)))
try:
actions.superuser_run(
'users',
['add-user-to-group',
user.get_username(), 'admin'])
except ActionError as error:
privileged.add_user_to_group(user.get_username(), 'admin')
except Exception as error:
messages.error(
self.request,
_('Failed to add new user to admin group: {error}'.format(

367
actions/users → plinth/modules/users/privileged.py Executable file → Normal file
View File

@ -1,142 +1,53 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for the LDAP user directory
"""
"""Configuration helper for the LDAP user directory."""
import argparse
import logging
import os
import re
import shutil
import subprocess
import sys
from typing import Optional
import augeas
from plinth import action_utils, utils
from plinth.actions import privileged
INPUT_LINES = None
ACCESS_CONF = '/etc/security/access.conf'
LDAPSCRIPTS_CONF = '/etc/ldapscripts/freedombox-ldapscripts.conf'
def parse_arguments():
"""Return parsed command line arguments as dictionary"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser('first-setup', help='Perform initial setup of LDAP')
subparsers.add_parser('setup', help='Setup LDAP')
subparser = subparsers.add_parser('create-user',
help='Create an LDAP user')
subparser.add_argument('username', help='Name of the LDAP user to create')
subparser.add_argument('--auth-user', required=True)
subparser = subparsers.add_parser('remove-user',
help='Delete an LDAP user')
subparser.add_argument(
'username', help='Name of the LDAP user to delete. If the username is '
'the last admin user, a password should be provided through STDIN.')
subparser = subparsers.add_parser('rename-user',
help='Rename an LDAP user')
subparser.add_argument('oldusername', help='Old name of the LDAP user')
subparser.add_argument('newusername', help='New name of the LDAP user')
subparser = subparsers.add_parser('set-user-password',
help='Set the password of an LDAP user')
subparser.add_argument(
'username', help='Name of the LDAP user to set the password for')
subparser.add_argument('--auth-user', required=True)
subparser = subparsers.add_parser('create-group',
help='Create an LDAP group')
subparser.add_argument('groupname',
help='Name of the LDAP group to create')
subparser = subparsers.add_parser('rename-group',
help='Rename an LDAP group')
subparser.add_argument('old_groupname',
help='Name of the LDAP group to rename')
subparser.add_argument('new_groupname', help='Name of the new LDAP group')
subparser = subparsers.add_parser('remove-group',
help='Delete an LDAP group')
subparser.add_argument('groupname',
help='Name of the LDAP group to delete')
subparser = subparsers.add_parser(
'get-user-groups', help='Get all the LDAP groups for an LDAP user')
subparser.add_argument('username',
help='LDAP user to retrieve the groups for')
subparser = subparsers.add_parser('add-user-to-group',
help='Add an LDAP user to an LDAP group')
subparser.add_argument('username', help='LDAP user to add to group')
subparser.add_argument('groupname', help='LDAP group to add the user to')
subparser.add_argument('--auth-user', required=False)
subparser = subparsers.add_parser('set-user-status',
help='Set user as active or inactive')
subparser.add_argument('username', help='User to change status')
subparser.add_argument('status', choices=['active', 'inactive'],
help='New status of the user')
subparser.add_argument('--auth-user', required=True)
subparser = subparsers.add_parser(
'remove-user-from-group',
help='Remove an LDAP user from an LDAP group')
subparser.add_argument('username', help='LDAP user to remove from group')
subparser.add_argument('groupname',
help='LDAP group to remove the user from')
subparser.add_argument('--auth-user', required=False)
help_get_group_users = 'Get the list of all users in an LDAP group'
subparser = subparsers.add_parser('get-group-users',
help=help_get_group_users)
subparser.add_argument(
'groupname', help='name of the LDAP group to get the '
'list of users')
subparsers.required = True
return parser.parse_args()
def validate_user(username, must_be_admin=True):
def _validate_user(username, password, must_be_admin=True):
"""Validate a user."""
if must_be_admin:
admins = get_admin_users()
admins = _get_admin_users()
if not admins:
# any user is valid
return
if not username:
msg = 'Argument --auth-user is required'
raise argparse.ArgumentTypeError(msg)
raise PermissionError('Authentication user is required')
if username not in admins:
msg = '"{}" is not authorized to perform this action'.format(
username)
raise argparse.ArgumentTypeError(msg)
msg = f'"{username}" is not authorized to perform this action'
raise PermissionError(msg)
if not username:
msg = 'Argument --auth-user is required'
raise argparse.ArgumentTypeError(msg)
raise PermissionError('Authentication user is required')
validate_password(username)
_validate_password(username, password)
def validate_password(username):
def _validate_password(username, password):
"""Raise an error if the user password is invalid."""
password = read_password(last=True)
if not utils.is_authenticated_user(username, password):
raise argparse.ArgumentTypeError("Invalid credentials")
raise PermissionError('Invalid credentials')
def subcommand_first_setup(_):
@privileged
def first_setup():
"""Perform initial setup of LDAP."""
# Avoid reconfiguration of slapd during module upgrades, because
# this will move the existing database.
@ -147,19 +58,20 @@ def subcommand_first_setup(_):
action_utils.dpkg_reconfigure('slapd', {'domain': 'thisbox'})
def subcommand_setup(_):
@privileged
def setup():
"""Setup LDAP."""
# Update pam configs for access and mkhomedir.
subprocess.run(['pam-auth-update', '--package'], check=True)
configure_ldapscripts()
_configure_ldapscripts()
configure_ldap_authentication()
_configure_ldap_authentication()
configure_ldap_structure()
_configure_ldap_structure()
def configure_ldap_authentication():
def _configure_ldap_authentication():
"""Configure LDAP authentication."""
action_utils.dpkg_reconfigure(
'nslcd', {
@ -179,18 +91,18 @@ def configure_ldap_authentication():
action_utils.service_start('nslcd')
def configure_ldap_structure():
def _configure_ldap_structure():
"""Configure LDAP basic structure."""
was_running = action_utils.service_is_running('slapd')
if not was_running:
action_utils.service_start('slapd')
setup_admin()
create_organizational_unit('users')
create_organizational_unit('groups')
_setup_admin()
_create_organizational_unit('users')
_create_organizational_unit('groups')
def create_organizational_unit(unit):
def _create_organizational_unit(unit):
"""Create an organizational unit in LDAP."""
distinguished_name = 'ou={unit},dc=thisbox'.format(unit=unit)
try:
@ -210,7 +122,7 @@ ou: {unit}'''.format(unit=unit)
check=True)
def setup_admin():
def _setup_admin():
"""Remove LDAP admin password and Allow root to modify the users."""
process = subprocess.run([
'ldapsearch', '-Q', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H',
@ -243,7 +155,7 @@ olcRootDN: gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth
''')
def configure_ldapscripts():
def _configure_ldapscripts():
"""Set the configuration used by ldapscripts for later user management."""
# modify a copy of the config file
shutil.copy('/etc/ldapscripts/ldapscripts.conf', LDAPSCRIPTS_CONF)
@ -266,7 +178,7 @@ def configure_ldapscripts():
aug.save()
def get_samba_users():
def _get_samba_users():
"""Get users from the Samba user database."""
# 'pdbedit -L' is better for listing users but is installed only with samba
stdout = subprocess.check_output(
@ -274,14 +186,14 @@ def get_samba_users():
return re.findall(r'USER_(.*)\\0', stdout)
def delete_samba_user(username):
def _delete_samba_user(username):
"""Delete a Samba user."""
if username in get_samba_users():
if username in _get_samba_users():
subprocess.check_call(['smbpasswd', '-x', username])
disconnect_samba_user(username)
_disconnect_samba_user(username)
def disconnect_samba_user(username):
def _disconnect_samba_user(username):
"""Disconnect a Samba user."""
try:
subprocess.check_call(['pkill', '-U', username, 'smbd'])
@ -290,84 +202,63 @@ def disconnect_samba_user(username):
raise
def get_input_lines():
"""Return list of input lines from stdin."""
global INPUT_LINES
if INPUT_LINES is None:
INPUT_LINES = [line.strip() for line in sys.stdin]
return INPUT_LINES
def read_password(last=False):
"""Return the password.
Set last=True to read password from last line of the input.
"""
line = -1 if last else 0
return get_input_lines()[line]
def subcommand_create_user(arguments):
@privileged
def create_user(username: str, password: str, auth_user: Optional[str] = None,
auth_password: Optional[str] = None):
"""Create an LDAP user, set password and flush cache."""
username = arguments.username
auth_user = arguments.auth_user
validate_user(auth_user)
_validate_user(auth_user, auth_password)
_run(['ldapadduser', username, 'users'])
password = read_password()
set_user_password(username, password)
flush_cache()
set_samba_user(username, password)
_set_user_password(username, password)
_flush_cache()
_set_samba_user(username, password)
def subcommand_remove_user(arguments):
@privileged
def remove_user(username: str, password: Optional[str] = None):
"""Remove an LDAP user."""
username = arguments.username
groups = get_user_groups(username)
groups = _get_user_groups(username)
# require authentication if the user is last admin user
if get_group_users('admin') == [username]:
validate_password(username)
if _get_group_users('admin') == [username]:
_validate_password(username, password)
delete_samba_user(username)
_delete_samba_user(username)
for group in groups:
remove_user_from_group(username, group)
_remove_user_from_group(username, group)
_run(['ldapdeleteuser', username])
flush_cache()
_flush_cache()
def subcommand_rename_user(arguments):
@privileged
def rename_user(old_username: str, new_username: str):
"""Rename an LDAP user."""
old_username = arguments.oldusername
new_username = arguments.newusername
groups = get_user_groups(old_username)
groups = _get_user_groups(old_username)
delete_samba_user(old_username)
_delete_samba_user(old_username)
for group in groups:
remove_user_from_group(old_username, group)
_remove_user_from_group(old_username, group)
_run(['ldaprenameuser', old_username, new_username])
for group in groups:
add_user_to_group(new_username, group)
_add_user_to_group(new_username, group)
flush_cache()
_flush_cache()
def set_user_password(username, password):
def _set_user_password(username, password):
"""Set a user's password."""
process = _run(['slappasswd', '-s', password], stdout=subprocess.PIPE)
password = process.stdout.decode().strip()
_run(['ldapsetpasswd', username, password])
def set_samba_user(username, password):
def _set_samba_user(username, password):
"""Insert a user to the Samba database.
If a user already exists, update password.
@ -379,24 +270,21 @@ def set_samba_user(username, password):
raise RuntimeError('Unable to add Samba user: ', proc.stderr)
def subcommand_set_user_password(arguments):
@privileged
def set_user_password(username: str, password: str, auth_user: str,
auth_password: str):
"""Set a user's password."""
username = arguments.username
auth_user = arguments.auth_user
must_be_admin = username != auth_user
validate_user(auth_user, must_be_admin=must_be_admin)
_validate_user(auth_user, auth_password, must_be_admin=must_be_admin)
password = read_password()
set_user_password(username, password)
set_samba_user(username, password)
_set_user_password(username, password)
_set_samba_user(username, password)
def get_admin_users():
"""Returns list of members in the admin group.
Raises an error if the slapd service is not running.
def _get_admin_users():
"""Return list of members in the admin group.
Raise an error if the slapd service is not running.
"""
admin_users = []
@ -420,8 +308,8 @@ def get_admin_users():
return admin_users
def get_group_users(groupname):
"""Returns list of members in the group."""
def _get_group_users(groupname):
"""Return list of members in the group."""
try:
process = _run(['ldapgid', '-P', groupname], stdout=subprocess.PIPE)
except subprocess.CalledProcessError:
@ -435,10 +323,11 @@ def get_group_users(groupname):
return users
def get_user_groups(username):
"""Returns only the supplementary groups of the given user.
def _get_user_groups(username):
"""Return only the supplementary groups of the given user.
Exclude the 'users' primary group from the returned list."""
Exclude the 'users' primary group from the returned list.
"""
process = _run(['ldapid', username], stdout=subprocess.PIPE, check=False)
output = process.stdout.decode().strip()
if output:
@ -460,121 +349,119 @@ def get_user_groups(username):
return []
def subcommand_get_user_groups(arguments):
@privileged
def get_user_groups(username: str) -> list[str]:
"""Return list of a given user's groups."""
groups = get_user_groups(arguments.username)
if groups:
print(*groups, sep='\n')
return _get_user_groups(username)
def group_exists(groupname):
def _group_exists(groupname):
"""Return whether a group already exits."""
process = _run(['ldapgid', groupname], check=False)
return process.returncode == 0
def create_group(groupname):
def _create_group(groupname):
"""Add an LDAP group."""
if not group_exists(groupname):
if not _group_exists(groupname):
_run(['ldapaddgroup', groupname])
def subcommand_create_group(arguments):
@privileged
def create_group(groupname: str):
"""Add an LDAP group."""
create_group(arguments.groupname)
flush_cache()
_create_group(groupname)
_flush_cache()
def subcommand_rename_group(arguments):
@privileged
def rename_group(old_groupname: str, new_groupname: str):
"""Rename an LDAP group.
Skip if the group to rename from doesn't exist.
"""
old_groupname = arguments.old_groupname
new_groupname = arguments.new_groupname
if old_groupname == 'admin' or new_groupname == 'admin':
raise argparse.ArgumentTypeError('Can\'t rename the group "admin"')
raise ValueError('Can\'t rename the group "admin"')
if group_exists(old_groupname):
if _group_exists(old_groupname):
_run(['ldaprenamegroup', old_groupname, new_groupname])
flush_cache()
_flush_cache()
def subcommand_remove_group(arguments):
@privileged
def remove_group(groupname: str):
"""Remove an LDAP group."""
groupname = arguments.groupname
if groupname == 'admin':
raise argparse.ArgumentTypeError("Can't remove the group 'admin'")
raise ValueError("Can't remove the group 'admin'")
if group_exists(groupname):
if _group_exists(groupname):
_run(['ldapdeletegroup', groupname])
flush_cache()
_flush_cache()
def add_user_to_group(username, groupname):
def _add_user_to_group(username, groupname):
"""Add an LDAP user to an LDAP group."""
create_group(groupname)
_create_group(groupname)
_run(['ldapaddusertogroup', username, groupname])
def subcommand_add_user_to_group(arguments):
@privileged
def add_user_to_group(username: str, groupname: str,
auth_user: Optional[str] = None,
auth_password: Optional[str] = None):
"""Add an LDAP user to an LDAP group."""
groupname = arguments.groupname
if groupname == 'admin':
validate_user(arguments.auth_user)
_validate_user(auth_user, auth_password)
add_user_to_group(arguments.username, groupname)
flush_cache()
_add_user_to_group(username, groupname)
_flush_cache()
def remove_user_from_group(username, groupname):
def _remove_user_from_group(username, groupname):
"""Remove an LDAP user from an LDAP group."""
_run(['ldapdeleteuserfromgroup', username, groupname])
def subcommand_remove_user_from_group(arguments):
@privileged
def remove_user_from_group(username: str, groupname: str, auth_user: str,
auth_password: str):
"""Remove an LDAP user from an LDAP group."""
username = arguments.username
groupname = arguments.groupname
if groupname == 'admin':
validate_user(arguments.auth_user)
_validate_user(auth_user, auth_password)
remove_user_from_group(username, groupname)
flush_cache()
_remove_user_from_group(username, groupname)
_flush_cache()
if groupname == 'freedombox-share':
disconnect_samba_user(username)
_disconnect_samba_user(username)
def subcommand_get_group_users(arguments):
@privileged
def get_group_users(group_name: str) -> list[str]:
"""Get the list of users of an LDAP group."""
for user in get_group_users(arguments.groupname):
print(user)
return _get_group_users(group_name)
def subcommand_set_user_status(arguments):
@privileged
def set_user_status(username: str, status: str, auth_user: str,
auth_password: str):
"""Set the status of the user."""
username = arguments.username
status = arguments.status
auth_user = arguments.auth_user
if status not in ('active', 'inactive'):
raise ValueError('Invalid status')
validate_user(auth_user)
_validate_user(auth_user, auth_password)
if status == 'active':
flag = '-e'
else:
flag = '-d'
if username in get_samba_users():
if username in _get_samba_users():
subprocess.check_call(['smbpasswd', flag, username])
if status == 'inactive':
disconnect_samba_user(username)
_disconnect_samba_user(username)
def flush_cache():
def _flush_cache():
"""Flush nscd and apache2 cache."""
_run(['nscd', '--invalidate=passwd'])
_run(['nscd', '--invalidate=group'])
@ -587,17 +474,3 @@ def _run(arguments, check=True, **kwargs):
kwargs['stdout'] = kwargs.get('stdout', subprocess.DEVNULL)
kwargs['stderr'] = kwargs.get('stderr', subprocess.DEVNULL)
return subprocess.run(arguments, env=env, check=check, **kwargs)
def main():
"""Parse arguments and perform all duties"""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()

View File

@ -50,9 +50,9 @@
{% blocktrans trimmed %}
Delete these accounts from command line and refresh the page to create
an account that is usable with {{ box_name }}. On the command line run
the command 'echo "{password}" | /usr/share/plinth/actions/users
remove-user {username}'. If an account is already usable with
{{ box_name }}, skip this step.
the command "echo '{"args": ["USERNAME", "PASSWORD"], "kwargs": {}}' |
sudo /usr/share/plinth/actions/actions users remove_user". If an
account is already usable with {{ box_name }}, skip this step.
{% endblocktrans %}
</p>

View File

@ -1,4 +1,3 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Test module to exercise user actions.
@ -6,7 +5,6 @@ Test module to exercise user actions.
it is recommended to run this module with root privileges in a virtual machine.
"""
import pathlib
import random
import re
import string
@ -15,9 +13,15 @@ import subprocess
import pytest
from plinth import action_utils
from plinth.modules import security
from plinth.modules.security import privileged as security_privileged
from plinth.modules.users import privileged
from plinth.tests import config as test_config
pytestmark = pytest.mark.usefixtures('mock_privileged')
privileged_modules_to_mock = [
'plinth.modules.users.privileged', 'plinth.modules.security.privileged'
]
_cleanup_users = None
_cleanup_groups = None
@ -48,13 +52,6 @@ def _random_string(length=8):
[random.choice(string.ascii_lowercase) for _ in range(length)])
def _is_exit_zero(args):
"""Return whether a command gave exit code zero"""
process = subprocess.run(args, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, check=False)
return process.returncode == 0
def _get_password_hash(username):
"""Query and return the password hash of the given LDAP username"""
query = [
@ -90,21 +87,14 @@ def _try_login_to_ssh(username, password, returncode=0):
return process.returncode == returncode
def _action_file():
"""Return the path to the 'users' actions file."""
current_directory = pathlib.Path(__file__).parent
return str(current_directory / '..' / '..' / '..' / '..' / 'actions' /
'users')
@pytest.fixture(name='disable_restricted_access', autouse=True)
def fixture_disable_restricted_access(needs_root, load_cfg):
"""Disable console login restrictions."""
restricted_access = security.get_restricted_access_enabled()
restricted_access = security_privileged.get_restricted_access_enabled()
if restricted_access:
security.set_restricted_access(False)
security_privileged.disable_restricted_access()
yield
security.set_restricted_access(True)
security_privileged.enable_restricted_access()
else:
yield
@ -135,14 +125,7 @@ def fixture_auto_cleanup_users_groups(needs_root, load_cfg):
pass
for group in _cleanup_groups:
_delete_group(group)
def _call_action(arguments, check=True, **kwargs):
"""Call the action script."""
kwargs['stdout'] = kwargs.get('stdout', subprocess.PIPE)
kwargs['stderr'] = kwargs.get('stderr', subprocess.PIPE)
return subprocess.run([_action_file()] + arguments, check=check, **kwargs)
privileged.remove_group(group)
def _create_user(username=None, groups=None):
@ -151,16 +134,13 @@ def _create_user(username=None, groups=None):
password = username + '_passwd'
admin_user, admin_password = _get_admin_user_password()
process_input = "{0}\n{1}".format(password, admin_password).encode()
_call_action(['create-user', '--auth-user', admin_user, username],
input=process_input)
privileged.create_user(username, password, admin_user, admin_password)
if groups:
for group in groups:
admin_user, admin_password = _get_admin_user_password()
_call_action([
'add-user-to-group', '--auth-user', admin_user, username, group
], input=admin_password.encode())
privileged.add_user_to_group(username, group, admin_user,
admin_password)
if group != 'admin':
_cleanup_groups.add(group)
@ -170,11 +150,11 @@ def _create_user(username=None, groups=None):
def _delete_user(username):
"""Utility to delete an LDAP and Samba user"""
process_input = None
if _get_group_users('admin') == [username]:
admin_password = None
if privileged.get_group_users('admin') == [username]:
_, admin_password = _get_admin_user_password()
process_input = admin_password.encode()
_call_action(['remove-user', username], input=process_input)
privileged.remove_user(username, admin_password)
def _create_admin_if_does_not_exist():
@ -186,8 +166,7 @@ def _create_admin_if_does_not_exist():
def _get_admin_user_password():
"""Return an admin username and password."""
admin_users = _get_group_users('admin')
admin_users = privileged.get_group_users('admin')
if not admin_users:
return ('', '')
@ -205,36 +184,20 @@ def _rename_user(old_username, new_username=None):
"""Rename a user."""
new_username = new_username or _random_string()
_call_action(['rename-user', old_username, new_username])
privileged.rename_user(old_username, new_username)
_cleanup_users.remove(old_username)
_cleanup_users.add(new_username)
return new_username
def _get_group_users(group):
"""Return the list of members in a group."""
process = _call_action(['get-group-users', group])
return process.stdout.decode().split()
def _get_user_groups(username):
"""Return the list of groups for a user."""
process = _call_action(['get-user-groups', username])
return process.stdout.decode().split()
def _create_group(groupname=None):
groupname = groupname or _random_string()
_call_action(['create-group', groupname])
privileged.create_group(groupname)
if groupname != 'admin':
_cleanup_groups.add(groupname)
return groupname
def _delete_group(groupname):
_call_action(['remove-group', groupname])
def test_create_user():
"""Test whether creating a new user works."""
_create_admin_if_does_not_exist()
@ -256,9 +219,8 @@ def test_change_user_password():
old_password_hash = _get_password_hash(username)
new_password = 'pass $123'
process_input = "{0}\n{1}".format(new_password, admin_password).encode()
_call_action(['set-user-password', username, '--auth-user', admin_user],
input=process_input)
privileged.set_user_password(username, new_password, admin_user,
admin_password)
new_password_hash = _get_password_hash(username)
assert old_password_hash != new_password_hash
@ -277,9 +239,8 @@ def test_change_password_as_non_admin_user():
old_password_hash = _get_password_hash(username)
new_password = 'pass $123'
process_input = "{0}\n{1}".format(new_password, old_password).encode()
_call_action(['set-user-password', username, '--auth-user', username],
input=process_input)
privileged.set_user_password(username, new_password, username,
old_password)
new_password_hash = _get_password_hash(username)
assert old_password_hash != new_password_hash
@ -298,11 +259,9 @@ def test_change_other_users_password_as_non_admin():
username2, _ = _create_user()
new_password = 'pass $123'
process_input = "{0}\n{1}".format(new_password, password1).encode()
with pytest.raises(subprocess.CalledProcessError):
_call_action(
['set-user-password', username2, '--auth-user', username1],
input=process_input)
privileged.set_user_password(username2, new_password, username1,
password1)
def test_set_password_for_non_existent_user():
@ -313,11 +272,9 @@ def test_set_password_for_non_existent_user():
non_existent_user = _random_string()
fake_password = _random_string()
process_input = "{0}\n{1}".format(fake_password, admin_password).encode()
with pytest.raises(subprocess.CalledProcessError):
_call_action([
'set-user-password', non_existent_user, '--auth-user', admin_user
], input=process_input)
privileged.set_user_password(non_existent_user, fake_password,
admin_user, admin_password)
def test_rename_user():
@ -325,15 +282,15 @@ def test_rename_user():
_create_admin_if_does_not_exist()
old_username, password = _create_user(groups=['admin', _random_string()])
old_groups = _get_user_groups(old_username)
old_groups = privileged.get_user_groups(old_username)
new_username = _rename_user(old_username)
assert _try_login_to_ssh(new_username, password)
assert _try_login_to_ssh(old_username, password, returncode=5)
assert old_username not in _get_samba_users()
new_groups = _get_user_groups(new_username)
old_users_groups = _get_user_groups(old_username)
new_groups = privileged.get_user_groups(new_username)
old_users_groups = privileged.get_user_groups(old_username)
assert not old_users_groups # empty
assert old_groups == new_groups
@ -357,11 +314,12 @@ def test_delete_user():
username, password = _create_user(groups=[_random_string()])
_delete_user(username)
groups_after = _get_user_groups(username)
groups_after = privileged.get_user_groups(username)
assert not groups_after # User gets removed from all groups
# User account cannot be found after deletion
assert not _is_exit_zero(['ldapid', username])
with pytest.raises(subprocess.CalledProcessError):
subprocess.run(['ldapid', username], check=True)
# Deleted user cannot login to ssh
assert _try_login_to_ssh(username, password, returncode=5)
@ -373,7 +331,7 @@ def test_delete_non_existent_user():
"""Deleting a non-existent user should fail."""
non_existent_user = _random_string()
with pytest.raises(subprocess.CalledProcessError):
_call_action(['delete-user', non_existent_user])
privileged.remove_user(non_existent_user)
def test_groups():
@ -381,16 +339,18 @@ def test_groups():
groupname = _random_string()
_create_group(groupname)
assert _is_exit_zero(['ldapgid', groupname])
with pytest.raises(subprocess.CalledProcessError):
subprocess.run(['ldapgid', groupname], check=True)
# create-group is idempotent
assert _is_exit_zero([_action_file(), 'create-group', groupname])
privileged.create_group(groupname)
_delete_group(groupname)
assert not _is_exit_zero(['ldapgid', groupname])
privileged.remove_group(groupname)
with pytest.raises(subprocess.CalledProcessError):
subprocess.run(['ldapgid', groupname], check=True)
# delete-group is idempotent
assert _is_exit_zero([_action_file(), 'remove-group', groupname])
privileged.remove_group(groupname)
def test_delete_admin_group_fails():
@ -398,7 +358,8 @@ def test_delete_admin_group_fails():
groupname = 'admin'
_create_group('admin')
assert not _is_exit_zero([_action_file(), 'remove-group', groupname])
with pytest.raises(ValueError):
privileged.remove_group(groupname)
def test_user_group_interactions():
@ -408,48 +369,38 @@ def test_user_group_interactions():
group1 = _random_string()
user1, _ = _create_user(groups=[group1])
assert [group1] == _get_user_groups(user1)
assert [group1] == privileged.get_user_groups(user1)
# add-user-to-group is not idempotent
with pytest.raises(subprocess.CalledProcessError):
_call_action(
['add-user-to-group', '--auth-user', admin_user, user1, group1],
input=admin_password.encode())
privileged.add_user_to_group(user1, group1, admin_user, admin_password)
# The same user can be added to other new groups
group2 = _random_string()
_create_group(group2)
_call_action(
['add-user-to-group', '--auth-user', admin_user, user1, group2],
input=admin_password.encode())
privileged.add_user_to_group(user1, group2, admin_user, admin_password)
# Adding a user to a non-existent group creates the group
group3 = _random_string()
_call_action(
['add-user-to-group', '--auth-user', admin_user, user1, group3],
input=admin_password.encode())
privileged.add_user_to_group(user1, group3, admin_user, admin_password)
_cleanup_groups.add(group3)
# The expected groups got created and the user is part of them.
expected_groups = [group1, group2, group3]
assert expected_groups == _get_user_groups(user1)
assert expected_groups == privileged.get_user_groups(user1)
# Remove user from group
group_to_remove_from = random.choice(expected_groups)
_call_action([
'remove-user-from-group', '--auth-user', admin_user, user1,
group_to_remove_from
], input=admin_password.encode())
privileged.remove_user_from_group(user1, group_to_remove_from, admin_user,
admin_password)
# User is no longer in the group that they're removed from
expected_groups.remove(group_to_remove_from)
assert expected_groups == _get_user_groups(user1)
assert expected_groups == privileged.get_user_groups(user1)
# User cannot be removed from a group that they're not part of
random_group = _random_string()
_create_group(random_group)
with pytest.raises(subprocess.CalledProcessError):
_call_action([
'remove-user-from-group', '--auth-user', admin_user, user1,
random_group
], input=admin_password.encode())
privileged.remove_user_from_group(user1, random_group, admin_user,
admin_password)

View File

@ -31,18 +31,6 @@ def fixture_users_urls():
yield
def action_run(action, options, **kwargs):
"""Action return values."""
if action == 'users' and options == ['get-group-users', 'admin']:
return 'admin'
if action == 'ssh' and options[:2] == ['get-keys', '--username']:
return ''
if action == 'users' and options == ['get-user-groups', 'tester']:
return ''
return None
@pytest.fixture(autouse=True)
def module_patch():
"""Patch users module."""
@ -54,8 +42,20 @@ def module_patch():
UsersAndGroups('users-and-groups-minetest',
reserved_usernames=['debian-minetest'])
with patch('pwd.getpwall', return_value=pwd_users),\
patch('plinth.actions.superuser_run', side_effect=action_run):
privileged = 'plinth.modules.users.privileged'
with patch('pwd.getpwall', return_value=pwd_users), \
patch(f'{privileged}.create_user'), \
patch(f'{privileged}.add_user_to_group'), \
patch(f'{privileged}.set_user_password'), \
patch(f'{privileged}.set_user_status'), \
patch(f'{privileged}.rename_user'), \
patch(f'{privileged}.get_group_users') as get_group_users, \
patch('plinth.modules.ssh.privileged.set_keys'), \
patch('plinth.modules.ssh.privileged.get_keys') as get_keys, \
patch(f'{privileged}.get_user_groups') as get_user_groups:
get_group_users.return_value = ['admin']
get_keys.return_value = []
get_user_groups.return_value = []
yield

View File

@ -15,13 +15,12 @@ from django.views.generic.edit import (CreateView, DeleteView, FormView,
UpdateView)
import plinth.modules.ssh.privileged as ssh_privileged
from plinth import actions, translation
from plinth.errors import ActionError
from plinth import translation
from plinth.modules import first_boot
from plinth.utils import is_user_admin
from plinth.views import AppView
from . import get_last_admin_user
from . import get_last_admin_user, privileged
from .forms import (CreateUserForm, FirstBootForm, UserChangePasswordForm,
UserUpdateForm)
@ -59,6 +58,7 @@ class UserCreate(ContextMixin, SuccessMessageMixin, CreateView):
class UserList(AppView, ContextMixin, django.views.generic.ListView):
"""View to list users."""
model = User
template_name = 'users_list.html'
title = gettext_lazy('Users')
@ -72,6 +72,7 @@ class UserList(AppView, ContextMixin, django.views.generic.ListView):
class UserUpdate(ContextMixin, SuccessMessageMixin, UpdateView):
"""View to update a user's details."""
template_name = 'users_update.html'
model = User
form_class = UserUpdateForm
@ -101,7 +102,7 @@ class UserUpdate(ContextMixin, SuccessMessageMixin, UpdateView):
ssh_keys = ssh_privileged.get_keys(self.object.username)
initial['ssh_keys'] = ssh_keys.strip()
initial['language'] = self.object.userprofile.language
except ActionError:
except Exception:
pass
return initial
@ -129,6 +130,7 @@ class UserDelete(ContextMixin, DeleteView):
On GET, display a confirmation page.
On POST, delete the user.
"""
template_name = 'users_delete.html'
model = User
slug_field = 'username'
@ -136,6 +138,7 @@ class UserDelete(ContextMixin, DeleteView):
title = gettext_lazy('Delete User')
def __init__(self, *args, **kwargs):
"""Initialize view, override delete method."""
super().__init__(*args, **kwargs)
# Avoid a warning with Django 4.0 that delete member should not be
@ -149,9 +152,8 @@ class UserDelete(ContextMixin, DeleteView):
messages.success(self.request, message)
try:
actions.superuser_run('users',
['remove-user', self.kwargs['slug']])
except ActionError:
privileged.remove_user(self.kwargs['slug'])
except Exception:
messages.error(self.request, _('Deleting LDAP user failed.'))
def _delete(self, *args, **kwargs):
@ -177,6 +179,7 @@ class UserDelete(ContextMixin, DeleteView):
class UserChangePassword(ContextMixin, SuccessMessageMixin, FormView):
"""View to change user password."""
template_name = 'users_change_password.html'
form_class = UserChangePasswordForm
title = gettext_lazy('Change Password')
@ -216,8 +219,8 @@ class UserChangePassword(ContextMixin, SuccessMessageMixin, FormView):
class FirstBootView(django.views.generic.CreateView):
"""Create user account and log the user in."""
template_name = 'users_firstboot.html'
template_name = 'users_firstboot.html'
form_class = FirstBootForm
def dispatch(self, request, *args, **kwargs):
@ -231,15 +234,11 @@ class FirstBootView(django.views.generic.CreateView):
def get_context_data(self, *args, **kwargs):
"""Add admin users to context data."""
context = super().get_context_data(*args, **kwargs)
output = actions.superuser_run('users', ['get-group-users', 'admin'])
admin_users = output.strip().split('\n') if output.strip() else []
context['admin_users'] = admin_users
context['admin_users'] = privileged.get_group_users('admin')
return context
def get_form_kwargs(self):
"""Make request available to the form (to insert messages)"""
"""Make request available to the form (to insert messages)."""
kwargs = super().get_form_kwargs()
kwargs['request'] = self.request
return kwargs

View File

@ -109,33 +109,33 @@ def test_shortcut_list_web_apps_only(common_shortcuts):
assert return_list == [cuts[0], cuts[1], cuts[2]]
@patch('plinth.actions.superuser_run')
def test_shortcut_list_with_username(superuser_run, common_shortcuts):
@patch('plinth.modules.users.privileged.get_user_groups')
def test_shortcut_list_with_username(get_user_groups, common_shortcuts):
"""Test listing for particular users."""
cuts = common_shortcuts
return_list = Shortcut.list()
assert return_list == [cuts[0], cuts[1], cuts[2], cuts[3]]
superuser_run.return_value = 'admin'
get_user_groups.return_value = ['admin']
return_list = Shortcut.list(username='admin')
assert return_list == [cuts[0], cuts[1], cuts[2], cuts[3]]
superuser_run.return_value = 'group1'
get_user_groups.return_value = ['group1']
return_list = Shortcut.list(username='user1')
assert return_list == [cuts[0], cuts[1], cuts[3]]
superuser_run.return_value = 'group1\ngroup2'
get_user_groups.return_value = ['group1', 'group2']
return_list = Shortcut.list(username='user2')
assert return_list == [cuts[0], cuts[1], cuts[2], cuts[3]]
cut = Shortcut('group2-web-app-component-1', 'name5', 'short2', url='url4',
login_required=False, allowed_groups=['group3'])
superuser_run.return_value = 'group3'
get_user_groups.return_value = ['group3']
return_list = Shortcut.list(username='user3')
assert return_list == [cuts[0], cuts[3], cut]
superuser_run.return_value = 'group4'
get_user_groups.return_value = ['group4']
return_list = Shortcut.list(username='user4')
assert return_list == [cuts[0], cuts[3], cut]