mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-03-18 09:10:49 +00:00
users: Migrate ldap bash script into actions/users
- Also wrote unit tests in users/tests/test_actions Signed-off-by: Joseph Nuthalpati <njoseph@thoughtworks.com> Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: Sunil Mohan Adapa <sunil@medhas.org>
This commit is contained in:
parent
21494f5615
commit
396e086727
178
actions/ldap
178
actions/ldap
@ -1,178 +0,0 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
# This file is part of Plinth.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
# Store anything available from stdin.
|
||||
# This is used to receive passwords from Plinth.
|
||||
if read -t 0; then
|
||||
IFS= read -r input
|
||||
fi
|
||||
|
||||
set -e # Exit on failure
|
||||
|
||||
|
||||
create_user()
|
||||
{
|
||||
username="$1"
|
||||
password="$2"
|
||||
|
||||
# All users shall have 'users' (a group in /etc/group) as primary group.
|
||||
ldapadduser $username users > /dev/null
|
||||
|
||||
set_user_password $username $password
|
||||
|
||||
flush_cache
|
||||
}
|
||||
|
||||
|
||||
delete_user()
|
||||
{
|
||||
username="$1"
|
||||
|
||||
groups=$(get_user_groups $username)
|
||||
|
||||
ldapdeleteuser $username
|
||||
|
||||
while read -r group; do
|
||||
ldapdeleteuserfromgroup $username $group > /dev/null || true
|
||||
done <<< "$groups"
|
||||
|
||||
flush_cache
|
||||
}
|
||||
|
||||
|
||||
rename_user()
|
||||
{
|
||||
old_username="$1"
|
||||
new_username="$2"
|
||||
|
||||
groups=$(get_user_groups $old_username)
|
||||
|
||||
ldaprenameuser $old_username $new_username
|
||||
|
||||
while read -r group; do
|
||||
ldapdeleteuserfromgroup $old_username $group > /dev/null || true
|
||||
ldapaddusertogroup $new_username $group > /dev/null || true
|
||||
done <<< "$groups"
|
||||
|
||||
flush_cache
|
||||
}
|
||||
|
||||
|
||||
set_user_password()
|
||||
{
|
||||
username="$1"
|
||||
password=$(slappasswd -s "$2")
|
||||
|
||||
ldapsetpasswd "$username" "$password"
|
||||
}
|
||||
|
||||
|
||||
get_user_groups()
|
||||
{
|
||||
# Return only supplimentary groups and don't include the 'users'
|
||||
# primary group.
|
||||
username="$1"
|
||||
|
||||
ldapid $username | cut -f 3 -d ' ' | cut -d = -f 2 | sed 's+,+\n+g' | sed "s+.*(\(.*\))+\1+" | grep -v users || true
|
||||
}
|
||||
|
||||
|
||||
add_group()
|
||||
{
|
||||
groupname="$1"
|
||||
|
||||
ldapsearch -Q -L -L -L -Y EXTERNAL -H ldapi:/// -s base -b "cn=${groupname},dc=thisbox" || ldapaddgroup "${groupname}" > /dev/null 2>&1
|
||||
}
|
||||
|
||||
|
||||
remove_group()
|
||||
{
|
||||
groupname="$1"
|
||||
|
||||
ldapsearch -Q -L -L -L -Y EXTERNAL -H ldapi:/// -s base -b "cn=${groupname},dc=thisbox" && ldapdeletegroup "${groupname}" > /dev/null 2>&1
|
||||
}
|
||||
|
||||
|
||||
add_user_to_group()
|
||||
{
|
||||
username="$1"
|
||||
groupname="$2"
|
||||
|
||||
# Try to create group and ignore failure if group already exists
|
||||
add_group "${groupname}"
|
||||
|
||||
ldapaddusertogroup $username $groupname > /dev/null
|
||||
|
||||
flush_cache
|
||||
}
|
||||
|
||||
|
||||
remove_user_from_group()
|
||||
{
|
||||
username="$1"
|
||||
groupname="$2"
|
||||
|
||||
ldapdeleteuserfromgroup $username $groupname > /dev/null
|
||||
|
||||
flush_cache
|
||||
}
|
||||
|
||||
|
||||
flush_cache()
|
||||
{
|
||||
# Flush nscd cache
|
||||
nscd --invalidate=passwd
|
||||
nscd --invalidate=group
|
||||
}
|
||||
|
||||
|
||||
command=$1
|
||||
shift
|
||||
case $command in
|
||||
create-user)
|
||||
create_user "$1" "$input"
|
||||
;;
|
||||
delete-user)
|
||||
delete_user "$@"
|
||||
;;
|
||||
rename-user)
|
||||
rename_user "$@"
|
||||
;;
|
||||
set-user-password)
|
||||
set_user_password "$1" "$input"
|
||||
;;
|
||||
get-user-groups)
|
||||
get_user_groups "$@"
|
||||
;;
|
||||
add-user-to-group)
|
||||
add_user_to_group "$@"
|
||||
;;
|
||||
remove-user-from-group)
|
||||
remove_user_from_group "$@"
|
||||
;;
|
||||
add-group)
|
||||
add_group "$@"
|
||||
;;
|
||||
remove-group)
|
||||
remove_group "$@"
|
||||
;;
|
||||
*)
|
||||
echo "Invalid sub-command"
|
||||
exit -1
|
||||
;;
|
||||
esac
|
||||
245
actions/users
245
actions/users
@ -16,15 +16,16 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
"""
|
||||
Configuration helper for the LDAP user directory
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import subprocess
|
||||
|
||||
import augeas
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from plinth import action_utils
|
||||
|
||||
ACCESS_CONF = '/etc/security/access.conf'
|
||||
@ -38,6 +39,51 @@ def parse_arguments():
|
||||
|
||||
subparsers.add_parser('setup', help='Setup LDAP')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'create-user', help='Create an LDAP user')
|
||||
subparser.add_argument('username', help='Name of the LDAP user to create')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'remove-user', help='Delete an LDAP user')
|
||||
subparser.add_argument('username', help='Name of the LDAP user to delete')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'rename-user', help='Rename an LDAP user')
|
||||
subparser.add_argument('oldusername', help='Old name of the LDAP user')
|
||||
subparser.add_argument('newusername', help='New name of the LDAP user')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'set-user-password', help='Set the password of an LDAP user')
|
||||
subparser.add_argument(
|
||||
'username', help='Name of the LDAP user to set the password for')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'create-group', help='Create an LDAP group')
|
||||
subparser.add_argument(
|
||||
'groupname', help='Name of the LDAP group to create')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'remove-group', help='Delete an LDAP group')
|
||||
subparser.add_argument(
|
||||
'groupname', help='Name of the LDAP group to delete')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'get-user-groups', help='Get all the LDAP groups for an LDAP user')
|
||||
subparser.add_argument(
|
||||
'username', help='LDAP user to retrieve the groups for')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'add-user-to-group', help='Add an LDAP user to an LDAP group')
|
||||
subparser.add_argument('username', help='LDAP user to add to group')
|
||||
subparser.add_argument('groupname', help='LDAP group to add the user to')
|
||||
|
||||
subparser = subparsers.add_parser(
|
||||
'remove-user-from-group',
|
||||
help='Remove an LDAP user from an LDAP group')
|
||||
subparser.add_argument('username', help='LDAP user to remove from group')
|
||||
subparser.add_argument(
|
||||
'groupname', help='LDAP group to remove the user from')
|
||||
|
||||
subparsers.required = True
|
||||
return parser.parse_args()
|
||||
|
||||
@ -57,10 +103,12 @@ def subcommand_setup(_):
|
||||
def configure_slapd():
|
||||
"""Configure LDAP authentication."""
|
||||
action_utils.dpkg_reconfigure('slapd', {'domain': 'thisbox'})
|
||||
action_utils.dpkg_reconfigure('nslcd', {'ldap-uris': 'ldapi:///',
|
||||
'ldap-base': 'dc=thisbox',
|
||||
'ldap-auth-type': 'SASL',
|
||||
'ldap-sasl-mech': 'EXTERNAL'})
|
||||
action_utils.dpkg_reconfigure('nslcd', {
|
||||
'ldap-uris': 'ldapi:///',
|
||||
'ldap-base': 'dc=thisbox',
|
||||
'ldap-auth-type': 'SASL',
|
||||
'ldap-sasl-mech': 'EXTERNAL'
|
||||
})
|
||||
action_utils.dpkg_reconfigure('libnss-ldapd',
|
||||
{'nsswitch': 'group, passwd, shadow'})
|
||||
action_utils.service_restart('nscd')
|
||||
@ -82,9 +130,12 @@ def create_organizational_unit(unit):
|
||||
distinguished_name = 'ou={unit},dc=thisbox'.format(unit=unit)
|
||||
try:
|
||||
subprocess.run(
|
||||
['ldapsearch', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///', '-s',
|
||||
'base', '-b', distinguished_name, '(objectclass=*)'],
|
||||
stdout=subprocess.DEVNULL, check=True)
|
||||
[
|
||||
'ldapsearch', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///', '-s',
|
||||
'base', '-b', distinguished_name, '(objectclass=*)'
|
||||
],
|
||||
stdout=subprocess.DEVNULL,
|
||||
check=True)
|
||||
return # Already exists
|
||||
except subprocess.CalledProcessError:
|
||||
input = '''
|
||||
@ -92,18 +143,23 @@ dn: ou={unit},dc=thisbox
|
||||
objectClass: top
|
||||
objectClass: organizationalUnit
|
||||
ou: {unit}'''.format(unit=unit)
|
||||
subprocess.run(['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||
input=input.encode(), stdout=subprocess.DEVNULL,
|
||||
check=True)
|
||||
subprocess.run(
|
||||
['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||
input=input.encode(),
|
||||
stdout=subprocess.DEVNULL,
|
||||
check=True)
|
||||
|
||||
|
||||
def setup_admin():
|
||||
"""Remove LDAP admin password and Allow root to modify the users."""
|
||||
process = subprocess.run(
|
||||
['ldapsearch', '-Q', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H',
|
||||
'ldapi:///', '-s', 'base', '-b', 'olcDatabase={1}mdb,cn=config',
|
||||
'(objectclass=*)', 'olcRootDN', 'olcRootPW'],
|
||||
check=True, stdout=subprocess.PIPE)
|
||||
[
|
||||
'ldapsearch', '-Q', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H',
|
||||
'ldapi:///', '-s', 'base', '-b', 'olcDatabase={1}mdb,cn=config',
|
||||
'(objectclass=*)', 'olcRootDN', 'olcRootPW'
|
||||
],
|
||||
check=True,
|
||||
stdout=subprocess.PIPE)
|
||||
ldap_object = {}
|
||||
for line in process.stdout.decode().splitlines():
|
||||
if line:
|
||||
@ -113,7 +169,9 @@ def setup_admin():
|
||||
if 'olcRootPW' in ldap_object:
|
||||
subprocess.run(
|
||||
['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||
check=True, stdout=subprocess.DEVNULL, input=b'''
|
||||
check=True,
|
||||
stdout=subprocess.DEVNULL,
|
||||
input=b'''
|
||||
dn: olcDatabase={1}mdb,cn=config
|
||||
changetype: modify
|
||||
delete: olcRootPW''')
|
||||
@ -122,7 +180,9 @@ delete: olcRootPW''')
|
||||
if ldap_object['olcRootDN'] != root_dn:
|
||||
subprocess.run(
|
||||
['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||
check=True, stdout=subprocess.DEVNULL, input=b'''
|
||||
check=True,
|
||||
stdout=subprocess.DEVNULL,
|
||||
input=b'''
|
||||
dn: olcDatabase={1}mdb,cn=config
|
||||
changetype: modify
|
||||
replace: olcRootDN
|
||||
@ -132,8 +192,8 @@ olcRootDN: gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth
|
||||
|
||||
def configure_ldapscripts():
|
||||
"""Set the configuration used by ldapscripts for later user management."""
|
||||
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
|
||||
augeas.Augeas.NO_MODL_AUTOLOAD)
|
||||
aug = augeas.Augeas(
|
||||
flags=augeas.Augeas.NO_LOAD + augeas.Augeas.NO_MODL_AUTOLOAD)
|
||||
aug.set('/augeas/load/Shellvars/lens', 'Shellvars.lns')
|
||||
aug.set('/augeas/load/Shellvars/incl[last() + 1]', LDAPSCRIPTS_CONF)
|
||||
aug.load()
|
||||
@ -149,12 +209,155 @@ def configure_ldapscripts():
|
||||
aug.save()
|
||||
|
||||
|
||||
def read_password():
|
||||
"""Read the password from stdin."""
|
||||
return ''.join(sys.stdin)
|
||||
|
||||
|
||||
def subcommand_create_user(arguments):
|
||||
"""Create an LDAP user, set password and flush cache."""
|
||||
_run(['ldapadduser', arguments.username, 'users'])
|
||||
set_user_password(arguments.username, read_password())
|
||||
flush_cache()
|
||||
|
||||
|
||||
def subcommand_remove_user(arguments):
|
||||
"""Remove an LDAP user."""
|
||||
username = arguments.username
|
||||
groups = get_user_groups(username)
|
||||
|
||||
for group in groups:
|
||||
remove_user_from_group(username, group)
|
||||
|
||||
_run(['ldapdeleteuser', username])
|
||||
flush_cache()
|
||||
|
||||
|
||||
def subcommand_rename_user(arguments):
|
||||
"""Rename an LDAP user."""
|
||||
old_username = arguments.oldusername
|
||||
new_username = arguments.newusername
|
||||
groups = get_user_groups(old_username)
|
||||
|
||||
for group in groups:
|
||||
remove_user_from_group(old_username, group)
|
||||
|
||||
_run(['ldaprenameuser', old_username, new_username])
|
||||
|
||||
for group in groups:
|
||||
add_user_to_group(new_username, group)
|
||||
|
||||
flush_cache()
|
||||
|
||||
|
||||
def set_user_password(username, password):
|
||||
"""Set a user's password."""
|
||||
process = _run(
|
||||
['slappasswd', '-s', password], stdout=subprocess.PIPE)
|
||||
password = process.stdout.decode().strip()
|
||||
_run(['ldapsetpasswd', username, password])
|
||||
|
||||
|
||||
def subcommand_set_user_password(arguments):
|
||||
"""Set a user's password."""
|
||||
set_user_password(arguments.username, read_password())
|
||||
|
||||
|
||||
def get_user_groups(username):
|
||||
"""Returns only the supplementary groups of the given user and doesn't include
|
||||
the 'users' primary group"""
|
||||
process = _run(
|
||||
['ldapid', username], stdout=subprocess.PIPE, check=False)
|
||||
output = process.stdout.decode('utf-8').strip()
|
||||
if output:
|
||||
groups_part = output.split(' ')[2]
|
||||
groups = groups_part.split('=')[1]
|
||||
group_names = [user.strip('()')
|
||||
for user in re.findall('\(.*?\)', groups)]
|
||||
group_names.remove('users')
|
||||
return group_names
|
||||
return []
|
||||
|
||||
|
||||
def subcommand_get_user_groups(arguments):
|
||||
"""Return list of a given user's groups."""
|
||||
groups = [group
|
||||
for group in get_user_groups(arguments.username)]
|
||||
if groups:
|
||||
print(*groups, sep='\n')
|
||||
|
||||
|
||||
def group_exists(groupname):
|
||||
"""Return whether a group already exits."""
|
||||
process = _run(['ldapgid', groupname], check=False)
|
||||
return process.returncode == 0
|
||||
|
||||
|
||||
def create_group(groupname):
|
||||
"""Add an LDAP group."""
|
||||
if not group_exists(groupname):
|
||||
_run(['ldapaddgroup', groupname])
|
||||
|
||||
|
||||
def subcommand_create_group(arguments):
|
||||
"""Add an LDAP group."""
|
||||
create_group(arguments.groupname)
|
||||
flush_cache()
|
||||
|
||||
|
||||
def subcommand_remove_group(arguments):
|
||||
"""Remove an LDAP group."""
|
||||
if group_exists(arguments.groupname):
|
||||
_run(['ldapdeletegroup', arguments.groupname])
|
||||
|
||||
flush_cache()
|
||||
|
||||
|
||||
def add_user_to_group(username, groupname):
|
||||
"""Add an LDAP user to an LDAP group."""
|
||||
create_group(groupname)
|
||||
_run(['ldapaddusertogroup', username, groupname])
|
||||
|
||||
|
||||
def subcommand_add_user_to_group(arguments):
|
||||
"""Add an LDAP user to an LDAP group."""
|
||||
add_user_to_group(arguments.username, arguments.groupname)
|
||||
flush_cache()
|
||||
|
||||
|
||||
def remove_user_from_group(username, groupname):
|
||||
"""Remove an LDAP user from an LDAP group."""
|
||||
_run(
|
||||
['ldapdeleteuserfromgroup', username, groupname])
|
||||
|
||||
|
||||
def subcommand_remove_user_from_group(arguments):
|
||||
"""Remove an LDAP user from an LDAP group."""
|
||||
remove_user_from_group(arguments.username, arguments.groupname)
|
||||
flush_cache()
|
||||
|
||||
|
||||
def flush_cache():
|
||||
"""Flush nscd cache."""
|
||||
_run(['nscd', '--invalidate=passwd'])
|
||||
_run(['nscd', '--invalidate=group'])
|
||||
|
||||
|
||||
def _run(arguments, **kwargs):
|
||||
"""Run a command. Check return code and suppress output by default."""
|
||||
kwargs['stdout'] = kwargs.get('stdout', subprocess.DEVNULL)
|
||||
kwargs['stderr'] = kwargs.get('stderr', subprocess.DEVNULL)
|
||||
kwargs['check'] = kwargs.get('check', True)
|
||||
return subprocess.run(arguments, **kwargs)
|
||||
|
||||
|
||||
def main():
|
||||
"""Parse arguments and perform all duties"""
|
||||
arguments = parse_arguments()
|
||||
|
||||
subcommand = arguments.subcommand.replace('-', '_')
|
||||
subcommand_method = globals()['subcommand_' + subcommand]
|
||||
|
||||
subcommand_method(arguments)
|
||||
|
||||
|
||||
|
||||
@ -87,8 +87,8 @@ def _diagnose_ldap_entry(search_item):
|
||||
|
||||
|
||||
def add_group(group):
|
||||
actions.superuser_run("ldap", options=["add-group", group])
|
||||
actions.superuser_run("users", options=["add-group", group])
|
||||
|
||||
|
||||
def remove_group(group):
|
||||
actions.superuser_run("ldap", options=["remove-group", group])
|
||||
actions.superuser_run("users", options=["remove-group", group])
|
||||
|
||||
@ -102,7 +102,7 @@ class CreateUserForm(ValidNewUsernameCheckMixin, UserCreationForm):
|
||||
if commit:
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'ldap',
|
||||
'users',
|
||||
['create-user', user.get_username()],
|
||||
input=self.cleaned_data['password1'].encode())
|
||||
except ActionError:
|
||||
@ -112,7 +112,7 @@ class CreateUserForm(ValidNewUsernameCheckMixin, UserCreationForm):
|
||||
for group in self.cleaned_data['groups']:
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'ldap',
|
||||
'users',
|
||||
['add-user-to-group', user.get_username(), group])
|
||||
except ActionError:
|
||||
messages.error(
|
||||
@ -167,14 +167,14 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, forms.ModelForm):
|
||||
|
||||
if commit:
|
||||
output = actions.superuser_run(
|
||||
'ldap', ['get-user-groups', self.username])
|
||||
'users', ['get-user-groups', self.username])
|
||||
old_groups = output.strip().split('\n')
|
||||
old_groups = [group for group in old_groups if group]
|
||||
|
||||
if self.username != user.get_username():
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'ldap',
|
||||
'users',
|
||||
['rename-user', self.username, user.get_username()])
|
||||
except ActionError:
|
||||
messages.error(self.request,
|
||||
@ -185,7 +185,7 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, forms.ModelForm):
|
||||
if old_group not in new_groups:
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'ldap',
|
||||
'users',
|
||||
['remove-user-from-group', user.get_username(),
|
||||
old_group])
|
||||
except ActionError:
|
||||
@ -196,7 +196,7 @@ class UserUpdateForm(ValidNewUsernameCheckMixin, forms.ModelForm):
|
||||
if new_group not in old_groups:
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'ldap',
|
||||
'users',
|
||||
['add-user-to-group', user.get_username(),
|
||||
new_group])
|
||||
except ActionError:
|
||||
@ -227,7 +227,7 @@ class UserChangePasswordForm(SetPasswordForm):
|
||||
if commit:
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'ldap',
|
||||
'users',
|
||||
['set-user-password', user.get_username()],
|
||||
input=self.cleaned_data['new_password1'].encode())
|
||||
except ActionError:
|
||||
@ -253,7 +253,7 @@ class FirstBootForm(ValidNewUsernameCheckMixin, auth.forms.UserCreationForm):
|
||||
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'ldap',
|
||||
'users',
|
||||
['create-user', user.get_username()],
|
||||
input=self.cleaned_data['password1'].encode())
|
||||
except ActionError:
|
||||
@ -262,7 +262,7 @@ class FirstBootForm(ValidNewUsernameCheckMixin, auth.forms.UserCreationForm):
|
||||
|
||||
try:
|
||||
actions.superuser_run(
|
||||
'ldap',
|
||||
'users',
|
||||
['add-user-to-group', user.get_username(), 'admin'])
|
||||
except ActionError:
|
||||
messages.error(self.request,
|
||||
|
||||
293
plinth/modules/users/tests/test_actions.py
Normal file
293
plinth/modules/users/tests/test_actions.py
Normal file
@ -0,0 +1,293 @@
|
||||
#!/usr/bin/python3
|
||||
#
|
||||
# This file is part of Plinth.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
"""
|
||||
Test module to exercise user actions.
|
||||
|
||||
it is recommended to run this module with root privileges in a virtual machine.
|
||||
"""
|
||||
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import subprocess
|
||||
import unittest
|
||||
|
||||
from plinth import action_utils
|
||||
|
||||
euid = os.geteuid()
|
||||
|
||||
|
||||
def random_string(length=8):
|
||||
"""Return a random string created from lower case ascii."""
|
||||
return ''.join(
|
||||
[random.choice(string.ascii_lowercase) for _ in range(length)])
|
||||
|
||||
|
||||
def is_exit_zero(args):
|
||||
"""Return whether a command gave exit code zero"""
|
||||
process = subprocess.run(
|
||||
args,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
check=False)
|
||||
return process.returncode == 0
|
||||
|
||||
|
||||
def get_password_hash(username):
|
||||
"""Query and return the password hash of the given LDAP username"""
|
||||
query = [
|
||||
'ldapsearch', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H', 'ldapi:///',
|
||||
'-b', 'ou=users,dc=thisbox', '-Q', '(cn={})'.format(username),
|
||||
'userPassword'
|
||||
]
|
||||
process = subprocess.run(
|
||||
query, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=True)
|
||||
return process.stdout.decode().strip().split()[-1]
|
||||
|
||||
|
||||
def try_login_to_ssh(username, password, returncode=0):
|
||||
"""Return whether the sshpass returncode matches when trying to
|
||||
login to ssh using the given username and password"""
|
||||
if not action_utils.service_is_running('ssh'):
|
||||
return True
|
||||
|
||||
command = [
|
||||
'sshpass', '-p', password, 'ssh', '-o', 'UserKnownHostsFile=/dev/null',
|
||||
'-o', 'StrictHostKeyChecking=no', '-o', 'VerifyHostKeyDNS=no',
|
||||
username + '@127.0.0.1', '/bin/true'
|
||||
]
|
||||
process = subprocess.run(
|
||||
command,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
check=False)
|
||||
return process.returncode == returncode
|
||||
|
||||
|
||||
class TestActions(unittest.TestCase):
|
||||
"""Test user related actions."""
|
||||
|
||||
def setUp(self):
|
||||
"""Setup each ."""
|
||||
current_directory = os.path.dirname(__file__)
|
||||
self.action_file = os.path.join(current_directory, '..', '..', '..',
|
||||
'..', 'actions', 'users')
|
||||
self.users = set()
|
||||
self.groups = set()
|
||||
|
||||
def tearDown(self):
|
||||
for user in self.users:
|
||||
try:
|
||||
self.delete_user(user)
|
||||
except Exception:
|
||||
pass
|
||||
for group in self.groups:
|
||||
self.delete_group(group)
|
||||
|
||||
def call_action(self, arguments, **kwargs):
|
||||
"""Call the action script."""
|
||||
kwargs['stdout'] = kwargs.get('stdout', subprocess.DEVNULL)
|
||||
kwargs['stderr'] = kwargs.get('stderr', subprocess.DEVNULL)
|
||||
kwargs['check'] = kwargs.get('check', True)
|
||||
return subprocess.run([self.action_file] + arguments, **kwargs)
|
||||
|
||||
def create_user(self, username=None, groups=None):
|
||||
"""Call the action script for creating a new user."""
|
||||
username = username or random_string()
|
||||
password = random_string()
|
||||
|
||||
self.call_action(['create-user', username], input=password.encode())
|
||||
|
||||
if groups:
|
||||
for group in groups:
|
||||
self.call_action(['add-user-to-group', username, group])
|
||||
self.groups.add(group)
|
||||
|
||||
self.users.add(username)
|
||||
return username, password
|
||||
|
||||
def delete_user(self, username):
|
||||
"""Utility to delete an LDAP user"""
|
||||
self.call_action(['remove-user', username])
|
||||
|
||||
def rename_user(self, old_username, new_username=None):
|
||||
"""Rename a user."""
|
||||
new_username = new_username or random_string()
|
||||
self.call_action(['rename-user', old_username, new_username])
|
||||
self.users.remove(old_username)
|
||||
self.users.add(new_username)
|
||||
return new_username
|
||||
|
||||
def get_user_groups(self, username):
|
||||
"""Return the list of groups for a user."""
|
||||
process = self.call_action(
|
||||
['get-user-groups', username], stdout=subprocess.PIPE)
|
||||
return process.stdout.split()
|
||||
|
||||
def create_group(self, groupname=None):
|
||||
groupname = groupname or random_string()
|
||||
self.call_action(['create-group', groupname])
|
||||
self.groups.add(groupname)
|
||||
return groupname
|
||||
|
||||
def delete_group(self, groupname):
|
||||
self.call_action(['remove-group', groupname])
|
||||
|
||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||
def test_create_user(self):
|
||||
"""Test whether creating a new user works."""
|
||||
username, password = self.create_user(groups=[random_string()])
|
||||
# assert_can_login_to_console(username, password)
|
||||
self.assertTrue(try_login_to_ssh(username, password))
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.create_user(username)
|
||||
|
||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||
def test_change_user_password(self):
|
||||
username, old_password = self.create_user()
|
||||
old_password_hash = get_password_hash(username)
|
||||
new_password = 'pass $123'
|
||||
self.call_action(
|
||||
['set-user-password', username], input=new_password.encode())
|
||||
new_password_hash = get_password_hash(username)
|
||||
self.assertNotEqual(old_password_hash, new_password_hash)
|
||||
|
||||
# User can login to ssh using new password but not the old password.
|
||||
# sshpass gives a return code of 5 if the password is incorrect.
|
||||
self.assertTrue(try_login_to_ssh(username, old_password, returncode=5))
|
||||
self.assertTrue(try_login_to_ssh(username, new_password))
|
||||
|
||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||
def test_set_password_for_non_existent_user(self):
|
||||
non_existent_user = random_string()
|
||||
fake_password = random_string().encode()
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.call_action(
|
||||
['set-user-password', non_existent_user], input=fake_password)
|
||||
|
||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||
def test_rename_user(self):
|
||||
"""Test whether renaming a user works."""
|
||||
old_username, password = self.create_user(groups=[random_string()])
|
||||
old_groups = self.get_user_groups(old_username)
|
||||
|
||||
new_username = self.rename_user(old_username)
|
||||
self.assertTrue(try_login_to_ssh(new_username, password))
|
||||
self.assertTrue(try_login_to_ssh(old_username, password, returncode=5))
|
||||
|
||||
new_groups = self.get_user_groups(new_username)
|
||||
old_users_groups = self.get_user_groups(old_username)
|
||||
self.assertFalse(len(old_users_groups))
|
||||
self.assertEqual(old_groups, new_groups)
|
||||
|
||||
self.assertFalse(self.get_user_groups(old_username)) # is empty
|
||||
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.rename_user(old_username)
|
||||
|
||||
# Renaming a non-existent user fails
|
||||
random_username = random_string()
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.rename_user(random_username, new_username=random_string())
|
||||
|
||||
# Renaming to an existing user fails
|
||||
existing_user, _ = self.create_user()
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.rename_user(existing_user, new_username=new_username)
|
||||
|
||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||
def test_delete_user(self):
|
||||
"""Test to check whether LDAP users can be deleted"""
|
||||
username, password = self.create_user(groups=[random_string()])
|
||||
groups_before = self.get_user_groups(username)
|
||||
self.assertTrue(groups_before)
|
||||
self.delete_user(username)
|
||||
groups_after = self.get_user_groups(username)
|
||||
self.assertFalse(groups_after) # User gets removed from all groups
|
||||
|
||||
# User account cannot be found after deletion
|
||||
self.assertFalse(is_exit_zero(['ldapid', username]))
|
||||
|
||||
# Deleted user cannot login to ssh
|
||||
self.assertTrue(try_login_to_ssh(username, password, returncode=5))
|
||||
|
||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||
def test_delete_non_existent_user(self):
|
||||
"""Deleting a non-existent user should fail."""
|
||||
non_existent_user = random_string()
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.call_action(['delete-user', non_existent_user])
|
||||
|
||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||
def test_groups(self):
|
||||
"""Test to check that LDAP groups can be deleted"""
|
||||
groupname = random_string()
|
||||
|
||||
self.create_group(groupname)
|
||||
self.assertTrue(is_exit_zero(['ldapgid', groupname]))
|
||||
|
||||
# create-group is idempotent
|
||||
self.assertTrue(
|
||||
is_exit_zero([self.action_file, 'create-group', groupname]))
|
||||
|
||||
self.delete_group(groupname)
|
||||
self.assertFalse(is_exit_zero(['ldapgid', groupname]))
|
||||
|
||||
# delete-group is idempotent
|
||||
self.assertTrue(
|
||||
is_exit_zero([self.action_file, 'remove-group', groupname]))
|
||||
|
||||
@unittest.skipUnless(euid == 0, 'Needs to be root')
|
||||
def test_user_group_interactions(self):
|
||||
group1 = random_string()
|
||||
user1, _ = self.create_user(groups=[group1])
|
||||
|
||||
# add-user-to-group is not idempotent
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.call_action(['add-user-to-group', user1, group1])
|
||||
|
||||
# The same user can be added to other new groups
|
||||
group2 = random_string()
|
||||
self.create_group(group2)
|
||||
self.call_action(['add-user-to-group', user1, group2])
|
||||
|
||||
# Adding a user to a non-existent group creates the group
|
||||
group3 = random_string()
|
||||
self.call_action(['add-user-to-group', user1, group3])
|
||||
self.groups.add(group3)
|
||||
|
||||
# The expected groups got created and the user is part of them.
|
||||
expected_groups = [group1, group2, group3]
|
||||
actual_groups = [g.decode() for g in self.get_user_groups(user1)]
|
||||
self.assertEqual(expected_groups, actual_groups)
|
||||
|
||||
# Remove user from group
|
||||
group_to_remove_from = random.choice(expected_groups)
|
||||
self.call_action(
|
||||
['remove-user-from-group', user1, group_to_remove_from])
|
||||
|
||||
# User is no longer in the group that they're removed from
|
||||
expected_groups.remove(group_to_remove_from)
|
||||
actual_groups = [g.decode() for g in self.get_user_groups(user1)]
|
||||
self.assertEqual(expected_groups, actual_groups)
|
||||
|
||||
# User cannot be removed from a group that they're not part of
|
||||
random_group = random_string()
|
||||
self.create_group(random_group)
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.call_action(['remove-user-from-group', user1, random_group])
|
||||
@ -138,7 +138,7 @@ class UserDelete(ContextMixin, DeleteView):
|
||||
messages.success(self.request, message)
|
||||
|
||||
try:
|
||||
actions.superuser_run('ldap', ['delete-user', self.kwargs['slug']])
|
||||
actions.superuser_run('users', ['remove-user', self.kwargs['slug']])
|
||||
except ActionError:
|
||||
messages.error(self.request,
|
||||
_('Deleting LDAP user failed.'))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user