#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for the LDAP user directory
"""

import argparse
import os
import re
import shutil
import subprocess
import sys

import augeas

from plinth import action_utils

ACCESS_CONF = '/etc/security/access.conf'
LDAPSCRIPTS_CONF = '/etc/ldapscripts/freedombox-ldapscripts.conf'


def parse_arguments():
    """Return parsed command line arguments as an argparse.Namespace.

    The selected sub command is stored in the ``subcommand`` attribute; the
    remaining attributes depend on the sub command chosen.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    subparsers.add_parser('first-setup', help='Perform initial setup of LDAP')
    subparsers.add_parser('setup', help='Setup LDAP')

    subparser = subparsers.add_parser('create-user',
                                      help='Create an LDAP user')
    subparser.add_argument('username', help='Name of the LDAP user to create')

    subparser = subparsers.add_parser('remove-user',
                                      help='Delete an LDAP user')
    subparser.add_argument('username', help='Name of the LDAP user to delete')

    subparser = subparsers.add_parser('rename-user',
                                      help='Rename an LDAP user')
    subparser.add_argument('oldusername', help='Old name of the LDAP user')
    subparser.add_argument('newusername', help='New name of the LDAP user')

    subparser = subparsers.add_parser('set-user-password',
                                      help='Set the password of an LDAP user')
    subparser.add_argument(
        'username', help='Name of the LDAP user to set the password for')

    subparser = subparsers.add_parser('create-group',
                                      help='Create an LDAP group')
    subparser.add_argument('groupname',
                           help='Name of the LDAP group to create')

    subparser = subparsers.add_parser('remove-group',
                                      help='Delete an LDAP group')
    subparser.add_argument('groupname',
                           help='Name of the LDAP group to delete')

    subparser = subparsers.add_parser(
        'get-user-groups', help='Get all the LDAP groups for an LDAP user')
    subparser.add_argument('username',
                           help='LDAP user to retrieve the groups for')

    subparser = subparsers.add_parser('add-user-to-group',
                                      help='Add an LDAP user to an LDAP group')
    subparser.add_argument('username', help='LDAP user to add to group')
    subparser.add_argument('groupname', help='LDAP group to add the user to')

    subparser = subparsers.add_parser('set-user-status',
                                      help='Set user as active or inactive')
    subparser.add_argument('username', help='User to change status')
    subparser.add_argument('status', choices=['active', 'inactive'],
                           help='New status of the user')

    subparser = subparsers.add_parser(
        'remove-user-from-group',
        help='Remove an LDAP user from an LDAP group')
    subparser.add_argument('username', help='LDAP user to remove from group')
    subparser.add_argument('groupname',
                           help='LDAP group to remove the user from')

    help_get_group_users = 'Get the list of all users in an LDAP group'
    subparser = subparsers.add_parser('get-group-users',
                                      help=help_get_group_users)
    subparser.add_argument(
        'groupname', help='name of the LDAP group to get the '
        'list of users')

    # Fail with a usage error when no sub command is given.
    subparsers.required = True
    return parser.parse_args()


def subcommand_first_setup(_):
    """Perform initial setup of LDAP."""
    # Avoid reconfiguration of slapd during module upgrades, because
    # this will move the existing database.
    # XXX: Instead of a separate action that is conditionally called for a
    # version number, we can check if the domain currently configured is what
    # we want and then based on the value do a reconfiguration. This approach
    # will work better when FreedomBox state is reset etc.
    action_utils.dpkg_reconfigure('slapd', {'domain': 'thisbox'})


def subcommand_setup(_):
    """Setup LDAP."""
    # Update pam configs for access and mkhomedir.
    subprocess.run(['pam-auth-update', '--package'], check=True)

    configure_ldapscripts()
    configure_ldap_authentication()
    configure_ldap_structure()


def configure_ldap_authentication():
    """Configure LDAP authentication.

    Reconfigure nslcd and libnss-ldapd for the local 'dc=thisbox' directory,
    then make sure the slapd and nslcd services are enabled and started.
    """
    action_utils.dpkg_reconfigure(
        'nslcd', {
            'ldap-uris': 'ldapi:///',
            'ldap-base': 'dc=thisbox',
            'ldap-auth-type': 'SASL',
            'ldap-sasl-mech': 'EXTERNAL'
        })
    action_utils.dpkg_reconfigure('libnss-ldapd',
                                  {'nsswitch': 'group, passwd, shadow'})
    action_utils.service_restart('nscd')

    # XXX: Workaround for login issue
    action_utils.service_enable('slapd')
    action_utils.service_start('slapd')
    action_utils.service_enable('nslcd')
    action_utils.service_start('nslcd')


def configure_ldap_structure():
    """Configure LDAP basic structure.

    Start slapd if it is not running, fix up the admin settings and make sure
    the 'users' and 'groups' organizational units exist.
    """
    if not action_utils.service_is_running('slapd'):
        action_utils.service_start('slapd')

    setup_admin()
    create_organizational_unit('users')
    create_organizational_unit('groups')


def create_organizational_unit(unit):
    """Create an organizational unit in LDAP unless it already exists.

    unit is the name of the organizational unit; it is created directly
    under 'dc=thisbox'. Raises subprocess.CalledProcessError if ldapadd fails.
    """
    distinguished_name = 'ou={unit},dc=thisbox'.format(unit=unit)
    search = subprocess.run([
        'ldapsearch', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///', '-s', 'base',
        '-b', distinguished_name, '(objectclass=*)'
    ], stdout=subprocess.DEVNULL, check=False)
    if search.returncode == 0:
        return  # Already exists

    ldif = '''
dn: ou={unit},dc=thisbox
objectClass: top
objectClass: organizationalUnit
ou: {unit}'''.format(unit=unit)
    subprocess.run(['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
                   input=ldif.encode(), stdout=subprocess.DEVNULL, check=True)


def setup_admin():
    """Remove LDAP admin password and Allow root to modify the users.

    Read olcRootDN and olcRootPW of the '{1}mdb' database from cn=config.
    Delete olcRootPW if it is set, and point olcRootDN to the identity that
    root gets when connecting over ldapi:/// with SASL EXTERNAL, unless it
    already has that value.
    """
    process = subprocess.run([
        'ldapsearch', '-Q', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H',
        'ldapi:///', '-s', 'base', '-b', 'olcDatabase={1}mdb,cn=config',
        '(objectclass=*)', 'olcRootDN', 'olcRootPW'
    ], check=True, stdout=subprocess.PIPE)

    ldap_object = {}
    for line in process.stdout.decode().splitlines():
        key, separator, value = line.partition(':')
        if not separator:
            continue  # Blank line or line without a 'key: value' pair

        # Only split on the first colon so that values containing ':' are
        # kept whole. Base64 values are written as 'key:: value', so drop the
        # second colon as well, then the padding space after the separator.
        ldap_object[key] = value.lstrip(':').strip()

    if 'olcRootPW' in ldap_object:
        subprocess.run(
            ['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
            check=True, stdout=subprocess.DEVNULL, input=b'''
dn: olcDatabase={1}mdb,cn=config
changetype: modify
delete: olcRootPW''')

    root_dn = 'gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth'
    # A missing olcRootDN is treated like a wrong one: set it.
    if ldap_object.get('olcRootDN') != root_dn:
        subprocess.run(
            ['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
            check=True, stdout=subprocess.DEVNULL, input=b'''
dn: olcDatabase={1}mdb,cn=config
changetype: modify
replace: olcRootDN
olcRootDN: gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth
''')


def configure_ldapscripts():
    """Set the configuration used by ldapscripts for later user management."""
    # modify a copy of the config file
    shutil.copy('/etc/ldapscripts/ldapscripts.conf', LDAPSCRIPTS_CONF)

    aug = augeas.Augeas(
        flags=augeas.Augeas.NO_LOAD + augeas.Augeas.NO_MODL_AUTOLOAD)
    aug.set('/augeas/load/Shellvars/lens', 'Shellvars.lns')
    aug.set('/augeas/load/Shellvars/incl[last() + 1]', LDAPSCRIPTS_CONF)
    aug.load()

    # XXX: Password setting on users is disabled as changing passwords
    # using SASL Auth is not supported.
    settings = (
        ('SERVER', '"ldapi://"'),
        ('SASLAUTH', '"EXTERNAL"'),
        ('SUFFIX', '"dc=thisbox"'),
        ('USUFFIX', '"ou=Users"'),
        ('GSUFFIX', '"ou=Groups"'),
        ('PASSWORDGEN', '"true"'),
    )
    for key, value in settings:
        aug.set('/files' + LDAPSCRIPTS_CONF + '/' + key, value)

    aug.save()


def get_samba_users():
    """Get users from the Samba user database.

    Returns the list of names found in 'USER_<name>' keys of passdb.tdb.
    Raises subprocess.CalledProcessError if tdbdump fails.
    """
    # 'pdbedit -L' is better for listing users but is installed only with samba
    stdout = subprocess.check_output(
        ['tdbdump', '/var/lib/samba/private/passdb.tdb']).decode()
    return re.findall(r'USER_(.*)\\0', stdout)


def delete_samba_user(username):
    """Delete a Samba user and disconnect them, if the user exists."""
    if username in get_samba_users():
        subprocess.check_call(['smbpasswd', '-x', username])
        disconnect_samba_user(username)


def disconnect_samba_user(username):
    """Disconnect a Samba user by killing their smbd processes."""
    try:
        subprocess.check_call(['pkill', '-U', username, 'smbd'])
    except subprocess.CalledProcessError as error:
        # pkill exits with 1 when no process matched; that is not an error
        # here because the user may simply not be connected.
        if error.returncode != 1:
            raise


def read_password():
    """Read the password from stdin.

    Everything available on stdin is returned unmodified, including any
    newline characters.
    """
    return sys.stdin.read()


def subcommand_create_user(arguments):
    """Create an LDAP user, set password and flush cache.

    The password is read from stdin. The user is also added to the Samba
    user database with the same password.
    """
    _run(['ldapadduser', arguments.username, 'users'])
    password = read_password()
    set_user_password(arguments.username, password)
    flush_cache()
    set_samba_user(arguments.username, password)


def subcommand_remove_user(arguments):
    """Remove an LDAP user.

    The user is removed from Samba and from all supplementary groups before
    the LDAP entry itself is deleted.
    """
    username = arguments.username
    groups = get_user_groups(username)

    delete_samba_user(username)

    for group in groups:
        remove_user_from_group(username, group)

    _run(['ldapdeleteuser', username])

    flush_cache()


def subcommand_rename_user(arguments):
    """Rename an LDAP user.

    Group memberships are carried over by removing the old name from each
    group and adding the new name after the rename. The Samba user for the
    old name is deleted and not re-created here.
    """
    old_username = arguments.oldusername
    new_username = arguments.newusername
    groups = get_user_groups(old_username)

    delete_samba_user(old_username)

    for group in groups:
        remove_user_from_group(old_username, group)

    _run(['ldaprenameuser', old_username, new_username])

    for group in groups:
        add_user_to_group(new_username, group)

    flush_cache()


def set_user_password(username, password):
    """Set a user's password.

    The clear text password is hashed with slappasswd and the resulting hash
    is stored using ldapsetpasswd.
    """
    # NOTE(review): the clear text password is passed as a command line
    # argument to slappasswd and may be visible to other local processes
    # while the command runs. Consider feeding it through a file descriptor.
    process = _run(['slappasswd', '-s', password], stdout=subprocess.PIPE)
    hashed_password = process.stdout.decode().strip()
    _run(['ldapsetpasswd', username, hashed_password])


def set_samba_user(username, password):
    """Insert a user to the Samba database.

    If a user already exists, update password.

    Raises RuntimeError if smbpasswd exits with a non-zero status and
    subprocess.TimeoutExpired if it does not finish within 10 seconds.
    """
    # subprocess.run() kills and reaps the child when the timeout expires, so
    # a hanging smbpasswd is not left behind.
    process = subprocess.run(
        ['smbpasswd', '-a', '-s', username],
        input='{0}\n{0}\n'.format(password).encode(), stdout=subprocess.PIPE,
        stderr=subprocess.PIPE, timeout=10, check=False)
    if process.returncode != 0:
        raise RuntimeError('Unable to add Samba user: {}'.format(
            process.stderr.decode(errors='replace').strip()))


def subcommand_set_user_password(arguments):
    """Set a user's password in both LDAP and Samba.

    The password is read from stdin.
    """
    password = read_password()
    set_user_password(arguments.username, password)
    set_samba_user(arguments.username, password)


def get_user_groups(username):
    """Returns only the supplementary groups of the given user.

    Exclude the 'users' primary group from the returned list. An empty list
    is returned when ldapid prints nothing for the user.
    """
    process = _run(['ldapid', username], stdout=subprocess.PIPE, check=False)
    output = process.stdout.decode().strip()
    if not output:
        return []

    # The third space separated field is expected to look like
    # 'groups=<gid>(<name>),<gid>(<name>),...'.
    groups_part = output.split(' ')[2]
    groups = groups_part.split('=')[1]
    group_names = [
        group.strip('()') for group in re.findall(r'\(.*?\)', groups)
    ]
    # Filter instead of list.remove() so that a user who is not a member of
    # 'users' does not cause a ValueError.
    return [name for name in group_names if name != 'users']


def subcommand_get_user_groups(arguments):
    """Print the list of a given user's groups, one per line."""
    groups = get_user_groups(arguments.username)
    if groups:
        print(*groups, sep='\n')


def group_exists(groupname):
    """Return whether a group already exists."""
    process = _run(['ldapgid', groupname], check=False)
    return process.returncode == 0


def create_group(groupname):
    """Add an LDAP group unless it already exists."""
    if not group_exists(groupname):
        _run(['ldapaddgroup', groupname])


def subcommand_create_group(arguments):
    """Add an LDAP group and flush cache."""
    create_group(arguments.groupname)
    flush_cache()


def subcommand_remove_group(arguments):
    """Remove an LDAP group if it exists and flush cache."""
    if group_exists(arguments.groupname):
        _run(['ldapdeletegroup', arguments.groupname])

    flush_cache()


def add_user_to_group(username, groupname):
    """Add an LDAP user to an LDAP group, creating the group if needed."""
    create_group(groupname)
    _run(['ldapaddusertogroup', username, groupname])


def subcommand_add_user_to_group(arguments):
    """Add an LDAP user to an LDAP group and flush cache."""
    add_user_to_group(arguments.username, arguments.groupname)
    flush_cache()


def remove_user_from_group(username, groupname):
    """Remove an LDAP user from an LDAP group."""
    _run(['ldapdeleteuserfromgroup', username, groupname])


def subcommand_remove_user_from_group(arguments):
    """Remove an LDAP user from an LDAP group and flush cache.

    When the group is 'freedombox-share', the user's Samba connections are
    also terminated.
    """
    remove_user_from_group(arguments.username, arguments.groupname)
    flush_cache()
    if arguments.groupname == 'freedombox-share':
        disconnect_samba_user(arguments.username)


def subcommand_get_group_users(arguments):
    """Print the users of an LDAP group, one per line.

    Nothing is printed when the group does not exist or has no members.
    """
    try:
        process = _run(['ldapgid', '-P', arguments.groupname],
                       stdout=subprocess.PIPE)
    except subprocess.CalledProcessError:
        return  # Group does not exist, return empty list

    # The member list is whatever follows the last ':' in the output. Strip
    # before testing so that a bare trailing newline (group without members)
    # does not get printed as an empty user name.
    users = process.stdout.decode().rpartition(':')[2].strip()
    if not users:
        return

    for user in users.split(','):
        print(user)


def subcommand_set_user_status(arguments):
    """Set the status of the user.

    Enable or disable the user in Samba, if a Samba user exists. A user set
    to inactive is also disconnected from Samba.
    """
    username = arguments.username
    status = arguments.status

    flag = '-e' if status == 'active' else '-d'

    if username in get_samba_users():
        subprocess.check_call(['smbpasswd', flag, username])
        if status == 'inactive':
            disconnect_samba_user(username)


def flush_cache():
    """Flush nscd and apache2 cache."""
    _run(['nscd', '--invalidate=passwd'])
    _run(['nscd', '--invalidate=group'])
    action_utils.service_reload('apache2')


def _run(arguments, **kwargs):
    """Run a command. Check return code and suppress output by default.

    The command runs with LDAPSCRIPTS_CONF set in its environment, pointing
    to the ldapscripts configuration file written by this script. 'stdout',
    'stderr' and 'check' may be overridden through keyword arguments; all
    keyword arguments are passed on to subprocess.run().
    """
    env = dict(os.environ, LDAPSCRIPTS_CONF=LDAPSCRIPTS_CONF)
    kwargs.setdefault('stdout', subprocess.DEVNULL)
    kwargs.setdefault('stderr', subprocess.DEVNULL)
    kwargs.setdefault('check', True)
    return subprocess.run(arguments, env=env, **kwargs)


def main():
    """Parse arguments and perform all duties"""
    arguments = parse_arguments()

    # Dispatch 'some-command' to the function named subcommand_some_command.
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()