#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for the LDAP user directory
"""

import argparse
import logging
import os
import re
import shutil
import subprocess
import sys

import augeas

from plinth import action_utils, utils

# Cache of the lines read from stdin; filled lazily by get_input_lines().
INPUT_LINES = None

ACCESS_CONF = '/etc/security/access.conf'
LDAPSCRIPTS_CONF = '/etc/ldapscripts/freedombox-ldapscripts.conf'


def parse_arguments():
    """Return the parsed command line arguments as an argparse namespace.

    A subcommand is mandatory. Its name is stored in the ``subcommand``
    attribute and is later mapped to a ``subcommand_*`` function by main().
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    subparsers.add_parser('first-setup', help='Perform initial setup of LDAP')
    subparsers.add_parser('setup', help='Setup LDAP')

    subparser = subparsers.add_parser('create-user',
                                      help='Create an LDAP user')
    subparser.add_argument('username', help='Name of the LDAP user to create')
    subparser.add_argument('--auth-user', required=True)

    subparser = subparsers.add_parser('remove-user',
                                      help='Delete an LDAP user')
    subparser.add_argument(
        'username', help='Name of the LDAP user to delete. If the username is '
        'the last admin user, a password should be provided through STDIN.')

    subparser = subparsers.add_parser('rename-user',
                                      help='Rename an LDAP user')
    subparser.add_argument('oldusername', help='Old name of the LDAP user')
    subparser.add_argument('newusername', help='New name of the LDAP user')

    subparser = subparsers.add_parser('set-user-password',
                                      help='Set the password of an LDAP user')
    subparser.add_argument(
        'username', help='Name of the LDAP user to set the password for')
    subparser.add_argument('--auth-user', required=True)

    subparser = subparsers.add_parser('create-group',
                                      help='Create an LDAP group')
    subparser.add_argument('groupname',
                           help='Name of the LDAP group to create')

    subparser = subparsers.add_parser('rename-group',
                                      help='Rename an LDAP group')
    subparser.add_argument('old_groupname',
                           help='Name of the LDAP group to rename')
    subparser.add_argument('new_groupname', help='Name of the new LDAP group')

    subparser = subparsers.add_parser('remove-group',
                                      help='Delete an LDAP group')
    subparser.add_argument('groupname',
                           help='Name of the LDAP group to delete')

    subparser = subparsers.add_parser(
        'get-user-groups', help='Get all the LDAP groups for an LDAP user')
    subparser.add_argument('username',
                           help='LDAP user to retrieve the groups for')

    subparser = subparsers.add_parser('add-user-to-group',
                                      help='Add an LDAP user to an LDAP group')
    subparser.add_argument('username', help='LDAP user to add to group')
    subparser.add_argument('groupname', help='LDAP group to add the user to')
    subparser.add_argument('--auth-user', required=False)

    subparser = subparsers.add_parser('set-user-status',
                                      help='Set user as active or inactive')
    subparser.add_argument('username', help='User to change status')
    subparser.add_argument('status', choices=['active', 'inactive'],
                           help='New status of the user')
    subparser.add_argument('--auth-user', required=True)

    subparser = subparsers.add_parser(
        'remove-user-from-group',
        help='Remove an LDAP user from an LDAP group')
    subparser.add_argument('username', help='LDAP user to remove from group')
    subparser.add_argument('groupname',
                           help='LDAP group to remove the user from')
    subparser.add_argument('--auth-user', required=False)

    help_get_group_users = 'Get the list of all users in an LDAP group'
    subparser = subparsers.add_parser('get-group-users',
                                      help=help_get_group_users)
    subparser.add_argument(
        'groupname', help='name of the LDAP group to get the '
        'list of users')

    subparsers.required = True
    return parser.parse_args()


def validate_user(username, must_be_admin=True):
    """Validate the user on whose behalf an action is performed.

    With must_be_admin set, the user has to be a member of the admin group.
    When the admin group has no members at all, any user is accepted and no
    password check is done. In every other case the password read from the
    last line of stdin is verified for the user.

    Raises argparse.ArgumentTypeError if the user name is missing, the user
    is not an admin when one is required, or the credentials are invalid.
    """
    admins = None
    if must_be_admin:
        admins = get_admin_users()
        if not admins:
            # any user is valid
            return

    if not username:
        msg = 'Argument --auth-user is required'
        raise argparse.ArgumentTypeError(msg)

    if must_be_admin and username not in admins:
        msg = '"{}" is not authorized to perform this action'.format(
            username)
        raise argparse.ArgumentTypeError(msg)

    validate_password(username)


def validate_password(username):
    """Raise an error if the user password is invalid.

    The password is taken from the last line of the input on stdin.
    """
    password = read_password(last=True)
    if not utils.is_authenticated_user(username, password):
        raise argparse.ArgumentTypeError("Invalid credentials")


def subcommand_first_setup(_):
    """Perform initial setup of LDAP."""
    # Avoid reconfiguration of slapd during module upgrades, because
    # this will move the existing database.
    # XXX: Instead of a separate action that is conditionally called for a
    # version number, we can check if the domain currently configured is what
    # we want and then based on the value do a reconfiguration. This approach
    # will work better when FreedomBox state is reset etc.
    action_utils.dpkg_reconfigure('slapd', {'domain': 'thisbox'})


def subcommand_setup(_):
    """Setup LDAP."""
    # Update pam configs for access and mkhomedir.
    subprocess.run(['pam-auth-update', '--package'], check=True)

    configure_ldapscripts()

    configure_ldap_authentication()

    configure_ldap_structure()


def configure_ldap_authentication():
    """Configure LDAP authentication."""
    action_utils.dpkg_reconfigure(
        'nslcd', {
            'ldap-uris': 'ldapi:///',
            'ldap-base': 'dc=thisbox',
            'ldap-auth-type': 'SASL',
            'ldap-sasl-mech': 'EXTERNAL'
        })
    action_utils.dpkg_reconfigure('libnss-ldapd',
                                  {'nsswitch': 'group, passwd, shadow'})
    action_utils.service_restart('nscd')

    # XXX: Workaround for login issue
    action_utils.service_enable('slapd')
    action_utils.service_start('slapd')
    action_utils.service_enable('nslcd')
    action_utils.service_start('nslcd')


def configure_ldap_structure():
    """Configure LDAP basic structure.

    Start slapd if it is not running, then set up the admin access and
    create the 'users' and 'groups' organizational units.
    """
    if not action_utils.service_is_running('slapd'):
        action_utils.service_start('slapd')

    setup_admin()
    create_organizational_unit('users')
    create_organizational_unit('groups')


def create_organizational_unit(unit):
    """Create an organizational unit in LDAP unless it already exists."""
    distinguished_name = 'ou={unit},dc=thisbox'.format(unit=unit)
    try:
        subprocess.run([
            'ldapsearch', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///', '-s',
            'base', '-b', distinguished_name, '(objectclass=*)'
        ], stdout=subprocess.DEVNULL, check=True)
        return  # Already exists
    except subprocess.CalledProcessError:
        # The search failed, so add the entry. LDIF needs one attribute per
        # line, hence the multi-line literal.
        ldif = '''
dn: ou={unit},dc=thisbox
objectClass: top
objectClass: organizationalUnit
ou: {unit}'''.format(unit=unit)
        subprocess.run(['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
                       input=ldif.encode(), stdout=subprocess.DEVNULL,
                       check=True)


def _parse_ldif_attributes(ldif_text):
    """Return a dict of attribute name to value parsed from LDIF text.

    Folded continuation lines (starting with a single space) are appended to
    the previous attribute. Values written with '::' are left as printed and
    are not decoded. When an attribute occurs more than once, the last
    occurrence wins.
    """
    attributes = {}
    last_name = None
    for line in ldif_text.splitlines():
        if not line or line.startswith('#'):
            continue

        if line.startswith(' '):
            # Continuation of the previous (folded) line.
            if last_name is not None:
                attributes[last_name] += line[1:]

            continue

        name, separator, value = line.partition(':')
        if not separator:
            last_name = None
            continue

        last_name = name
        # Drop the second colon of the '::' form and the space that follows
        # the separator so that values can be compared directly.
        attributes[name] = value.lstrip(':').lstrip(' ')

    return attributes


def setup_admin():
    """Remove LDAP admin password and allow root to modify the users.

    The root password is deleted when one is set. The root DN is replaced
    only when it differs from the peer credential identity of root, so
    running this again makes no further changes.
    """
    process = subprocess.run([
        'ldapsearch', '-Q', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H',
        'ldapi:///', '-s', 'base', '-b', 'olcDatabase={1}mdb,cn=config',
        '(objectclass=*)', 'olcRootDN', 'olcRootPW'
    ], check=True, stdout=subprocess.PIPE)
    ldap_object = _parse_ldif_attributes(process.stdout.decode())

    if 'olcRootPW' in ldap_object:
        subprocess.run(
            ['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
            check=True, stdout=subprocess.DEVNULL, input=b'''
dn: olcDatabase={1}mdb,cn=config
changetype: modify
delete: olcRootPW''')

    root_dn = 'gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth'
    # .get() so that a database without any olcRootDN also gets one set.
    if ldap_object.get('olcRootDN') != root_dn:
        subprocess.run(
            ['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
            check=True, stdout=subprocess.DEVNULL, input=b'''
dn: olcDatabase={1}mdb,cn=config
changetype: modify
replace: olcRootDN
olcRootDN: gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth
''')


def configure_ldapscripts():
    """Set the configuration used by ldapscripts for later user management."""
    # modify a copy of the config file
    shutil.copy('/etc/ldapscripts/ldapscripts.conf', LDAPSCRIPTS_CONF)

    aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
                        augeas.Augeas.NO_MODL_AUTOLOAD)
    aug.set('/augeas/load/Shellvars/lens', 'Shellvars.lns')
    aug.set('/augeas/load/Shellvars/incl[last() + 1]', LDAPSCRIPTS_CONF)
    aug.load()

    # XXX: Password setting on users is disabled as changing passwords
    # using SASL Auth is not supported.
    aug.set('/files' + LDAPSCRIPTS_CONF + '/SERVER', '"ldapi://"')
    aug.set('/files' + LDAPSCRIPTS_CONF + '/SASLAUTH', '"EXTERNAL"')
    aug.set('/files' + LDAPSCRIPTS_CONF + '/SUFFIX', '"dc=thisbox"')
    aug.set('/files' + LDAPSCRIPTS_CONF + '/USUFFIX', '"ou=Users"')
    aug.set('/files' + LDAPSCRIPTS_CONF + '/GSUFFIX', '"ou=Groups"')
    aug.set('/files' + LDAPSCRIPTS_CONF + '/PASSWORDGEN', '"true"')
    aug.save()


def get_samba_users():
    """Get users from the Samba user database."""
    # 'pdbedit -L' is better for listing users but is installed only with samba
    stdout = subprocess.check_output(
        ['tdbdump', '/var/lib/samba/private/passdb.tdb']).decode()
    return re.findall(r'USER_(.*)\\0', stdout)


def delete_samba_user(username):
    """Delete a Samba user and disconnect them, if the user exists."""
    if username in get_samba_users():
        subprocess.check_call(['smbpasswd', '-x', username])
        disconnect_samba_user(username)


def disconnect_samba_user(username):
    """Disconnect a Samba user by killing the smbd processes they own."""
    try:
        subprocess.check_call(['pkill', '-U', username, 'smbd'])
    except subprocess.CalledProcessError as error:
        # Exit status 1 is tolerated; any other failure is propagated.
        if error.returncode != 1:
            raise


def get_input_lines():
    """Return list of input lines from stdin.

    stdin is read only once; later calls return the cached list. Each line
    is stripped of surrounding whitespace.
    """
    global INPUT_LINES
    if INPUT_LINES is None:
        INPUT_LINES = [line.strip() for line in sys.stdin]

    return INPUT_LINES


def read_password(last=False):
    """Return the password.

    The password is the first line of the input. Set last=True to read the
    password from the last line of the input instead. Raises IndexError if
    the input has no lines.
    """
    line = -1 if last else 0
    return get_input_lines()[line]


def subcommand_create_user(arguments):
    """Create an LDAP user, set password and flush cache."""
    username = arguments.username
    auth_user = arguments.auth_user

    validate_user(auth_user)

    _run(['ldapadduser', username, 'users'])
    password = read_password()
    set_user_password(username, password)
    flush_cache()
    set_samba_user(username, password)


def subcommand_remove_user(arguments):
    """Remove an LDAP user along with their Samba account and groups."""
    username = arguments.username
    groups = get_user_groups(username)

    # require authentication if the user is last admin user
    if get_group_users('admin') == [username]:
        validate_password(username)

    delete_samba_user(username)

    for group in groups:
        remove_user_from_group(username, group)

    _run(['ldapdeleteuser', username])

    flush_cache()


def subcommand_rename_user(arguments):
    """Rename an LDAP user.

    The Samba account of the old name is deleted and the supplementary group
    memberships are moved over to the new name.
    """
    old_username = arguments.oldusername
    new_username = arguments.newusername
    groups = get_user_groups(old_username)

    delete_samba_user(old_username)

    for group in groups:
        remove_user_from_group(old_username, group)

    _run(['ldaprenameuser', old_username, new_username])

    for group in groups:
        add_user_to_group(new_username, group)

    flush_cache()


def set_user_password(username, password):
    """Set a user's password."""
    # NOTE(review): the plain text password is passed as a command line
    # argument here, which is presumably visible to other local processes
    # while slappasswd runs - consider a safer way to hand it over.
    process = _run(['slappasswd', '-s', password], stdout=subprocess.PIPE)
    password = process.stdout.decode().strip()
    _run(['ldapsetpasswd', username, password])


def set_samba_user(username, password):
    """Insert a user to the Samba database.

    If a user already exists, update password. Raises RuntimeError with the
    error output of smbpasswd if the command fails.
    """
    proc = subprocess.run(['smbpasswd', '-a', '-s', username],
                          input='{0}\n{0}\n'.format(password).encode(),
                          stderr=subprocess.PIPE, check=False)
    if proc.returncode != 0:
        raise RuntimeError('Unable to add Samba user: {}'.format(
            proc.stderr.decode(errors='replace').strip()))


def subcommand_set_user_password(arguments):
    """Set a user's password.

    Users may change their own password; changing the password of another
    user requires the authenticating user to be an admin.
    """
    username = arguments.username
    auth_user = arguments.auth_user

    must_be_admin = username != auth_user
    validate_user(auth_user, must_be_admin=must_be_admin)

    password = read_password()
    set_user_password(username, password)
    set_samba_user(username, password)


def get_admin_users():
    """Returns list of members in the admin group.

    Raises an error if the slapd service is not running.
    """
    admin_users = []

    try:
        output = subprocess.check_output([
            'ldapsearch', '-LLL', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///',
            '-o', 'ldif-wrap=no', '-s', 'base', '-b',
            'cn=admin,ou=groups,dc=thisbox', 'memberUid'
        ]).decode()
    except subprocess.CalledProcessError as error:
        if error.returncode == 32:
            # no entries found
            return []

        raise

    for line in output.splitlines():
        if line.startswith('memberUid: '):
            user = line.split('memberUid: ', 1)[1].strip()
            admin_users.append(user)

    return admin_users


def get_group_users(groupname):
    """Returns list of members in the group.

    An empty list is returned if the group lookup fails or the group has no
    members.
    """
    try:
        process = _run(['ldapgid', '-P', groupname], stdout=subprocess.PIPE)
    except subprocess.CalledProcessError:
        return []  # Group does not exist

    output = process.stdout.decode()
    # extract users from output, example: 'admin:*:10001:user1,user2'
    users = output.rsplit(':', 1)[-1].strip().split(',')
    if users == ['']:
        return []

    return users


def get_user_groups(username):
    """Returns only the supplementary groups of the given user.

    Exclude the 'users' primary group from the returned list. An empty list
    is returned, and a warning logged, if the user is not found or the output
    of ldapid cannot be parsed.
    """
    process = _run(['ldapid', username], stdout=subprocess.PIPE, check=False)
    output = process.stdout.decode().strip()
    if not output:
        logging.warning('User %s not found in LDAP', username)
        return []

    try:
        # The third space separated field is expected to be 'groups=...'.
        groups_part = output.split(' ')[2]
        groups = groups_part.split('=')[1]
    except IndexError:
        logging.warning('Could not read groups for user %s: \n%s', username,
                        output)
        return []

    group_names = [
        group.strip('()') for group in re.findall(r'\(.*?\)', groups)
    ]
    # Only drop 'users' when it is actually listed.
    if 'users' in group_names:
        group_names.remove('users')

    return group_names


def subcommand_get_user_groups(arguments):
    """Print the groups of a given user, one per line."""
    groups = get_user_groups(arguments.username)
    if groups:
        print(*groups, sep='\n')


def group_exists(groupname):
    """Return whether a group already exists."""
    process = _run(['ldapgid', groupname], check=False)
    return process.returncode == 0


def create_group(groupname):
    """Add an LDAP group unless it already exists."""
    if not group_exists(groupname):
        _run(['ldapaddgroup', groupname])


def subcommand_create_group(arguments):
    """Add an LDAP group."""
    create_group(arguments.groupname)
    flush_cache()


def subcommand_rename_group(arguments):
    """Rename an LDAP group.

    Skip if the group to rename from doesn't exist. The group 'admin' can
    neither be renamed nor be the target of a rename.
    """
    old_groupname = arguments.old_groupname
    new_groupname = arguments.new_groupname

    if old_groupname == 'admin' or new_groupname == 'admin':
        raise argparse.ArgumentTypeError('Can\'t rename the group "admin"')

    if group_exists(old_groupname):
        _run(['ldaprenamegroup', old_groupname, new_groupname])

    flush_cache()


def subcommand_remove_group(arguments):
    """Remove an LDAP group. The group 'admin' cannot be removed."""
    groupname = arguments.groupname

    if groupname == 'admin':
        raise argparse.ArgumentTypeError("Can't remove the group 'admin'")

    if group_exists(groupname):
        _run(['ldapdeletegroup', groupname])

    flush_cache()


def add_user_to_group(username, groupname):
    """Add an LDAP user to an LDAP group, creating the group if needed."""
    create_group(groupname)
    _run(['ldapaddusertogroup', username, groupname])


def subcommand_add_user_to_group(arguments):
    """Add an LDAP user to an LDAP group.

    Adding to the 'admin' group requires an authenticated admin user.
    """
    groupname = arguments.groupname

    if groupname == 'admin':
        validate_user(arguments.auth_user)

    add_user_to_group(arguments.username, groupname)
    flush_cache()


def remove_user_from_group(username, groupname):
    """Remove an LDAP user from an LDAP group."""
    _run(['ldapdeleteuserfromgroup', username, groupname])


def subcommand_remove_user_from_group(arguments):
    """Remove an LDAP user from an LDAP group.

    Removing from the 'admin' group requires an authenticated admin user.
    Removing from 'freedombox-share' also disconnects the Samba user.
    """
    username = arguments.username
    groupname = arguments.groupname

    if groupname == 'admin':
        validate_user(arguments.auth_user)

    remove_user_from_group(username, groupname)
    flush_cache()
    if groupname == 'freedombox-share':
        disconnect_samba_user(username)


def subcommand_get_group_users(arguments):
    """Print the users of an LDAP group, one per line."""
    for user in get_group_users(arguments.groupname):
        print(user)


def subcommand_set_user_status(arguments):
    """Set the status of the user.

    Enables or disables the Samba account of the user, if there is one, and
    disconnects the user from Samba when setting them inactive.
    """
    username = arguments.username
    status = arguments.status
    auth_user = arguments.auth_user

    validate_user(auth_user)

    flag = '-e' if status == 'active' else '-d'

    if username in get_samba_users():
        subprocess.check_call(['smbpasswd', flag, username])
        if status == 'inactive':
            disconnect_samba_user(username)


def flush_cache():
    """Flush nscd and apache2 cache."""
    _run(['nscd', '--invalidate=passwd'])
    _run(['nscd', '--invalidate=group'])
    action_utils.service_reload('apache2')


def _run(arguments, check=True, **kwargs):
    """Run a command. Check return code and suppress output by default.

    The command is run with LDAPSCRIPTS_CONF set in its environment. Pass
    stdout/stderr keyword arguments to capture instead of discarding output.
    """
    env = dict(os.environ, LDAPSCRIPTS_CONF=LDAPSCRIPTS_CONF)
    kwargs.setdefault('stdout', subprocess.DEVNULL)
    kwargs.setdefault('stderr', subprocess.DEVNULL)
    return subprocess.run(arguments, env=env, check=check, **kwargs)


def main():
    """Parse arguments and perform all duties"""
    arguments = parse_arguments()

    # 'create-user' is handled by subcommand_create_user() and so on.
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()