mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-05-27 10:44:33 +00:00
users: Inactivate users in LDAP user database
Previously, users were inactivated only in plinth users database. This change
adds ability to inactivate users in LDAP database.
Changes:
- Inactive users in plinth users database are also inactivated in LDAP
during app upgrade.
- Inactivated users can't login using LDAP password.
- Apache2 single-sign-on module now requires LDAP connection. SSO
sessions are now invalidated when users are inactivated.
- PAM/nslcd now performs authorization checks against LDAP, which means
inactivated users can't do passwordless ssh logins and running their
crontabs are blocked.
- When inactivating a user, all user's processes are killed.
Also, update LDAP diagnostics:
- Fix LDAP checks returned always passed results.
- Fix `ou=people` entry doesn't exist in LDAP.
- Add diagnostics checks for `ou=policies` and `cn=DefaultPPolicy`.
Tests performed:
- App upgrade works.
- App upgrade with previously disabled user works, user is inactivated
also in LDAP.
- App upgrade with disabled user that doesn't exists in LDAP database works.
- Increment app version again, to 7, app upgrade works second time.
- Inactivate user and test logins:
- can't login using direct LDAP (nextcloud, ejabberd, matrixsynapse)
- can't login using Apache2 LDAP module (gitweb, ikiwiki, rssbridge,
transmission)
- can't login using apache sso module (featherwiki, gitweb, rssbridge,
sharing, syncthing, tiddlywiki, transmission, wordpress).
- can't login using ssh with password or passwordless
- Inactivate user and test exsisting sessions:
- ssh, cockpit and samba sessions are killed.
- Configure crontab, configured crontab is failing to run after user
is inactivated.
- All the users app tests pass.
Notes:
- Only Apache2 SSO sessions are disabled. Apps that create their own
sessions keep working, like nextcloud, ejabberd, matrix-synapse,
ikiwiki. In the future, we could add a feature that apps can implement
their own users locking functions.
- When testing inactivated users, users and IP-s can be banned by the system,
banned IP-s/users can be viewed with commands `fail2ban-client banned` and
`pam_abl`.
- Existing sessions keep working when deleting a user or removing
a user from an access group.
- I didn't test e-mail app.
Signed-off-by: Veiko Aasa <veiko17@disroot.org>
This commit is contained in:
parent
da02b464a9
commit
67b6c0f9e8
@ -1,15 +1,21 @@
|
|||||||
<IfModule mod_auth_pubtkt.c>
|
<RequireALL>
|
||||||
TKTAuthPublicKey /etc/apache2/auth-pubtkt-keys/pubkey.pem
|
<IfModule mod_auth_pubtkt.c>
|
||||||
TKTAuthLoginURL /plinth/accounts/sso/login/
|
TKTAuthPublicKey /etc/apache2/auth-pubtkt-keys/pubkey.pem
|
||||||
TKTAuthBackArgName next
|
TKTAuthLoginURL /plinth/accounts/sso/login/
|
||||||
TKTAuthDigest SHA512
|
TKTAuthBackArgName next
|
||||||
TKTAuthRefreshURL /plinth/accounts/sso/refresh/
|
TKTAuthDigest SHA512
|
||||||
TKTAuthUnauthURL /plinth
|
TKTAuthRefreshURL /plinth/accounts/sso/refresh/
|
||||||
AuthType mod_auth_pubtkt
|
TKTAuthUnauthURL /plinth
|
||||||
AuthName "FreedomBox Single Sign On"
|
AuthType mod_auth_pubtkt
|
||||||
Require valid-user
|
AuthName "FreedomBox Single Sign On"
|
||||||
</IfModule>
|
Require valid-user
|
||||||
|
</IfModule>
|
||||||
|
|
||||||
<IfModule !mod_auth_pubtkt.c>
|
<IfModule !mod_auth_pubtkt.c>
|
||||||
Require all denied
|
Require all denied
|
||||||
</IfModule>
|
</IfModule>
|
||||||
|
|
||||||
|
# Require that LDAP account is not locked
|
||||||
|
AuthLDAPUrl "ldap:///ou=users,dc=thisbox?uid"
|
||||||
|
Require not ldap-attribute pwdAccountLockedTime="000001010000Z"
|
||||||
|
</RequireAll>
|
||||||
|
|||||||
@ -92,14 +92,15 @@ class UsersApp(app_module.App):
|
|||||||
results = super().diagnose()
|
results = super().diagnose()
|
||||||
|
|
||||||
results.append(_diagnose_ldap_entry('dc=thisbox'))
|
results.append(_diagnose_ldap_entry('dc=thisbox'))
|
||||||
results.append(_diagnose_ldap_entry('ou=people'))
|
results.append(_diagnose_ldap_entry('ou=users'))
|
||||||
results.append(_diagnose_ldap_entry('ou=groups'))
|
results.append(_diagnose_ldap_entry('ou=groups'))
|
||||||
|
results.append(_diagnose_ldap_entry('ou=policies'))
|
||||||
|
results.append(_diagnose_ldap_entry('cn=DefaultPPolicy'))
|
||||||
|
|
||||||
config = privileged.get_nslcd_config()
|
config = privileged.get_nslcd_config()
|
||||||
results.append(_diagnose_nslcd_config(config, 'uri', 'ldapi:///'))
|
results.append(_diagnose_nslcd_config(config, 'uri', 'ldapi:///'))
|
||||||
results.append(_diagnose_nslcd_config(config, 'base', 'dc=thisbox'))
|
results.append(_diagnose_nslcd_config(config, 'base', 'dc=thisbox'))
|
||||||
results.append(_diagnose_nslcd_config(config, 'sasl_mech', 'EXTERNAL'))
|
results.append(_diagnose_nslcd_config(config, 'sasl_mech', 'EXTERNAL'))
|
||||||
|
|
||||||
results.extend(_diagnose_nsswitch_config())
|
results.extend(_diagnose_nsswitch_config())
|
||||||
|
|
||||||
return results
|
return results
|
||||||
@ -110,6 +111,12 @@ class UsersApp(app_module.App):
|
|||||||
if not old_version:
|
if not old_version:
|
||||||
privileged.first_setup()
|
privileged.first_setup()
|
||||||
|
|
||||||
|
if old_version and old_version < 6:
|
||||||
|
# Setup password policy and lock LDAP passwords for inactive users.
|
||||||
|
inactivated_users = _get_inactivated_users()
|
||||||
|
if inactivated_users:
|
||||||
|
privileged.setup_and_sync_user_states(inactivated_users)
|
||||||
|
|
||||||
privileged.setup()
|
privileged.setup()
|
||||||
privileged.create_group('freedombox-share')
|
privileged.create_group('freedombox-share')
|
||||||
|
|
||||||
@ -120,9 +127,10 @@ def _diagnose_ldap_entry(search_item: str) -> DiagnosticCheck:
|
|||||||
result = Result.FAILED
|
result = Result.FAILED
|
||||||
|
|
||||||
try:
|
try:
|
||||||
subprocess.check_output(
|
output = subprocess.check_output(
|
||||||
['ldapsearch', '-x', '-b', 'dc=thisbox', search_item])
|
['ldapsearch', '-LLL', '-x', '-b', 'dc=thisbox', search_item])
|
||||||
result = Result.PASSED
|
if search_item in output.decode():
|
||||||
|
result = Result.PASSED
|
||||||
except subprocess.CalledProcessError:
|
except subprocess.CalledProcessError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -189,6 +197,14 @@ def get_last_admin_user():
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_inactivated_users() -> list[str]:
|
||||||
|
"""Get list of inactivated usernames"""
|
||||||
|
from django.contrib.auth.models import User
|
||||||
|
users = User.objects.filter(is_active=False)
|
||||||
|
|
||||||
|
return [user.username for user in users]
|
||||||
|
|
||||||
|
|
||||||
def add_user_to_share_group(username, service=None):
|
def add_user_to_share_group(username, service=None):
|
||||||
"""Add user to the freedombox-share group."""
|
"""Add user to the freedombox-share group."""
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -70,6 +70,16 @@ def setup():
|
|||||||
_configure_ldap_structure()
|
_configure_ldap_structure()
|
||||||
|
|
||||||
|
|
||||||
|
@privileged
|
||||||
|
def setup_and_sync_user_states(inactivated_users: list[str]):
|
||||||
|
"""Setup password policy and inactivate users."""
|
||||||
|
_create_organizational_unit('policies')
|
||||||
|
first_ppolicy_setup = _setup_ldap_ppolicy()
|
||||||
|
|
||||||
|
if first_ppolicy_setup and inactivated_users:
|
||||||
|
_upgrade_inactivate_users(inactivated_users)
|
||||||
|
|
||||||
|
|
||||||
def _configure_ldap_authentication():
|
def _configure_ldap_authentication():
|
||||||
"""Configure LDAP authentication."""
|
"""Configure LDAP authentication."""
|
||||||
action_utils.dpkg_reconfigure(
|
action_utils.dpkg_reconfigure(
|
||||||
@ -79,6 +89,18 @@ def _configure_ldap_authentication():
|
|||||||
'ldap-auth-type': 'SASL',
|
'ldap-auth-type': 'SASL',
|
||||||
'ldap-sasl-mech': 'EXTERNAL'
|
'ldap-sasl-mech': 'EXTERNAL'
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# Set nslcd authorization filter for user locking
|
||||||
|
authorization_filter = ('(&(objectClass=posixAccount)(uid=$username)'
|
||||||
|
'(!(pwdAccountLockedTime=000001010000Z)))')
|
||||||
|
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
|
||||||
|
augeas.Augeas.NO_MODL_AUTOLOAD)
|
||||||
|
aug.set('/augeas/load/Nslcd/lens', 'Nslcd.lns')
|
||||||
|
aug.set('/augeas/load/Nslcd/incl[last() + 1]', '/etc/nslcd.conf')
|
||||||
|
aug.load()
|
||||||
|
aug.set('/files/etc/nslcd.conf/pam_authz_search', authorization_filter)
|
||||||
|
aug.save()
|
||||||
|
|
||||||
action_utils.dpkg_reconfigure('libnss-ldapd',
|
action_utils.dpkg_reconfigure('libnss-ldapd',
|
||||||
{'nsswitch': 'group, passwd, shadow'})
|
{'nsswitch': 'group, passwd, shadow'})
|
||||||
|
|
||||||
@ -107,6 +129,8 @@ def _configure_ldap_structure():
|
|||||||
_setup_admin()
|
_setup_admin()
|
||||||
_create_organizational_unit('users')
|
_create_organizational_unit('users')
|
||||||
_create_organizational_unit('groups')
|
_create_organizational_unit('groups')
|
||||||
|
_create_organizational_unit('policies')
|
||||||
|
_setup_ldap_ppolicy()
|
||||||
|
|
||||||
|
|
||||||
def _create_organizational_unit(unit):
|
def _create_organizational_unit(unit):
|
||||||
@ -162,6 +186,66 @@ olcRootDN: gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth
|
|||||||
''')
|
''')
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_ldap_ppolicy() -> bool:
|
||||||
|
"""Setup default password policy for user accounts.
|
||||||
|
|
||||||
|
The default password policy makes passwords lockable. Users who have
|
||||||
|
the LDAP operational attribute pwdAccountLockedTime=000001010000Z can't
|
||||||
|
login with password.
|
||||||
|
|
||||||
|
Returns whether it was the first run that enables this policy.
|
||||||
|
"""
|
||||||
|
# Load ppolicy module
|
||||||
|
try:
|
||||||
|
subprocess.run(
|
||||||
|
['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||||
|
check=True, stdout=subprocess.DEVNULL, input=b'''
|
||||||
|
dn: cn=module{0},cn=config
|
||||||
|
changetype: modify
|
||||||
|
add: olcModuleLoad
|
||||||
|
olcModuleLoad: ppolicy''')
|
||||||
|
except subprocess.CalledProcessError as error:
|
||||||
|
if error.returncode == 20: # Value already exists
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Set up default password policy
|
||||||
|
try:
|
||||||
|
subprocess.run(['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||||
|
check=True, stdout=subprocess.DEVNULL, input=b'''
|
||||||
|
dn: cn=DefaultPPolicy,ou=policies,dc=thisbox
|
||||||
|
cn: DefaultPPolicy
|
||||||
|
objectClass: pwdPolicy
|
||||||
|
objectClass: device
|
||||||
|
objectClass: top
|
||||||
|
pwdAttribute: userPassword
|
||||||
|
pwdLockout: TRUE''')
|
||||||
|
except subprocess.CalledProcessError as error:
|
||||||
|
if error.returncode == 68: # Value already exists
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Make DefaultPPolicy as a default ppolicy overlay
|
||||||
|
try:
|
||||||
|
subprocess.run(['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||||
|
check=True, stdout=subprocess.DEVNULL, input=b'''
|
||||||
|
dn: olcOverlay={0}ppolicy,olcDatabase={1}mdb,cn=config
|
||||||
|
objectClass: olcOverlayConfig
|
||||||
|
objectClass: olcPPolicyConfig
|
||||||
|
olcOverlay: {0}ppolicy
|
||||||
|
olcPPolicyDefault: cn=DefaultPPolicy,ou=policies,dc=thisbox
|
||||||
|
''')
|
||||||
|
except subprocess.CalledProcessError as error:
|
||||||
|
if error.returncode == 80: # Value already in list
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def _configure_ldapscripts():
|
def _configure_ldapscripts():
|
||||||
"""Set the configuration used by ldapscripts for later user management."""
|
"""Set the configuration used by ldapscripts for later user management."""
|
||||||
# modify a copy of the config file
|
# modify a copy of the config file
|
||||||
@ -185,6 +269,33 @@ def _configure_ldapscripts():
|
|||||||
aug.save()
|
aug.save()
|
||||||
|
|
||||||
|
|
||||||
|
def _lock_ldap_user(username: str):
|
||||||
|
"""Lock user."""
|
||||||
|
if not _get_user_ids(username):
|
||||||
|
# User not found
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Replace command adds the attribute if it doesn't exist.
|
||||||
|
input = '''changetype: modify
|
||||||
|
replace: pwdAccountLockedTime
|
||||||
|
pwdAccountLockedTime: 000001010000Z
|
||||||
|
'''
|
||||||
|
_run(["ldapmodifyuser", username], input=input.encode())
|
||||||
|
|
||||||
|
|
||||||
|
def _unlock_ldap_user(username: str):
|
||||||
|
"""Unlock user."""
|
||||||
|
if not _get_user_ids(username):
|
||||||
|
# User not found
|
||||||
|
return None
|
||||||
|
# Replace command without providing a value will remove the attribute
|
||||||
|
# and ignores when the attribute doesn't exist.
|
||||||
|
input = '''changetype: modify
|
||||||
|
replace: pwdAccountLockedTime
|
||||||
|
'''
|
||||||
|
_run(["ldapmodifyuser", username], input=input.encode())
|
||||||
|
|
||||||
|
|
||||||
@privileged
|
@privileged
|
||||||
def get_nslcd_config() -> dict[str, str]:
|
def get_nslcd_config() -> dict[str, str]:
|
||||||
"""Get nslcd configuration for diagnostics."""
|
"""Get nslcd configuration for diagnostics."""
|
||||||
@ -333,6 +444,19 @@ def _get_admin_users():
|
|||||||
return admin_users
|
return admin_users
|
||||||
|
|
||||||
|
|
||||||
|
def _get_user_ids(username: str) -> str | None:
|
||||||
|
"""Get user information in format like `id` command."""
|
||||||
|
try:
|
||||||
|
process = _run(['ldapid', username], stdout=subprocess.PIPE)
|
||||||
|
except subprocess.CalledProcessError as error:
|
||||||
|
if error.returncode == 1:
|
||||||
|
# User doesn't exist
|
||||||
|
return None
|
||||||
|
raise
|
||||||
|
|
||||||
|
return process.stdout.decode().strip()
|
||||||
|
|
||||||
|
|
||||||
def _get_group_users(groupname):
|
def _get_group_users(groupname):
|
||||||
"""Return list of members in the group."""
|
"""Return list of members in the group."""
|
||||||
try:
|
try:
|
||||||
@ -353,8 +477,7 @@ def _get_user_groups(username):
|
|||||||
|
|
||||||
Exclude the 'users' primary group from the returned list.
|
Exclude the 'users' primary group from the returned list.
|
||||||
"""
|
"""
|
||||||
process = _run(['ldapid', username], stdout=subprocess.PIPE, check=False)
|
output = _get_user_ids(username)
|
||||||
output = process.stdout.decode().strip()
|
|
||||||
if output:
|
if output:
|
||||||
groups_part = output.split(' ')[2]
|
groups_part = output.split(' ')[2]
|
||||||
try:
|
try:
|
||||||
@ -476,14 +599,33 @@ def set_user_status(username: str, status: str, auth_user: str,
|
|||||||
_validate_user(auth_user, auth_password)
|
_validate_user(auth_user, auth_password)
|
||||||
|
|
||||||
if status == 'active':
|
if status == 'active':
|
||||||
flag = '-e'
|
_unlock_ldap_user(username)
|
||||||
|
smbpasswd_flag = '-e'
|
||||||
else:
|
else:
|
||||||
flag = '-d'
|
_lock_ldap_user(username)
|
||||||
|
smbpasswd_flag = '-d'
|
||||||
|
|
||||||
|
# Set user status in Samba password database
|
||||||
if username in _get_samba_users():
|
if username in _get_samba_users():
|
||||||
subprocess.check_call(['smbpasswd', flag, username])
|
subprocess.check_call(['smbpasswd', smbpasswd_flag, username])
|
||||||
if status == 'inactive':
|
|
||||||
_disconnect_samba_user(username)
|
_flush_cache()
|
||||||
|
|
||||||
|
if status == 'inactive':
|
||||||
|
# Kill all user processes. This includes disconnectiong ssh, samba and
|
||||||
|
# cockpit sessions.
|
||||||
|
subprocess.run(['pkill', "--signal", "KILL", '--uid', username])
|
||||||
|
|
||||||
|
|
||||||
|
def _upgrade_inactivate_users(usernames: list[str]):
|
||||||
|
"""Inactivate users in LDAP."""
|
||||||
|
for username in usernames:
|
||||||
|
_lock_ldap_user(username)
|
||||||
|
|
||||||
|
_flush_cache()
|
||||||
|
|
||||||
|
for username in usernames:
|
||||||
|
subprocess.run(['pkill', "--signal", "KILL", '--uid', username])
|
||||||
|
|
||||||
|
|
||||||
def _flush_cache():
|
def _flush_cache():
|
||||||
|
|||||||
@ -117,7 +117,6 @@ def test_users_can_connect_passwordless_over_ssh(session_browser,
|
|||||||
"""Test that users can connect passwordless over ssh if the keys are
|
"""Test that users can connect passwordless over ssh if the keys are
|
||||||
set."""
|
set."""
|
||||||
functional.app_enable(session_browser, 'ssh')
|
functional.app_enable(session_browser, 'ssh')
|
||||||
_generate_ssh_keys(session_browser, tmp_path_factory)
|
|
||||||
_configure_ssh_keys(session_browser, tmp_path_factory)
|
_configure_ssh_keys(session_browser, tmp_path_factory)
|
||||||
_should_connect_passwordless_over_ssh(session_browser, tmp_path_factory)
|
_should_connect_passwordless_over_ssh(session_browser, tmp_path_factory)
|
||||||
|
|
||||||
@ -127,7 +126,6 @@ def test_users_cannot_connect_passwordless_over_ssh(session_browser,
|
|||||||
"""Test that users cannot connect passwordless over ssh if the keys aren't
|
"""Test that users cannot connect passwordless over ssh if the keys aren't
|
||||||
set."""
|
set."""
|
||||||
functional.app_enable(session_browser, 'ssh')
|
functional.app_enable(session_browser, 'ssh')
|
||||||
_generate_ssh_keys(session_browser, tmp_path_factory)
|
|
||||||
_configure_ssh_keys(session_browser, tmp_path_factory)
|
_configure_ssh_keys(session_browser, tmp_path_factory)
|
||||||
_set_ssh_keys(session_browser, '')
|
_set_ssh_keys(session_browser, '')
|
||||||
_should_not_connect_passwordless_over_ssh(session_browser,
|
_should_not_connect_passwordless_over_ssh(session_browser,
|
||||||
@ -152,11 +150,29 @@ def test_change_language(session_browser, language_code):
|
|||||||
assert _check_language(session_browser, language_code)
|
assert _check_language(session_browser, language_code)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_users_can_set_others_as_inactive(session_browser):
|
def test_user_states(session_browser, tmp_path_factory):
|
||||||
"""Test that admin users can set other users as inactive."""
|
"""Test that admin users can set other users as inactive/active."""
|
||||||
_non_admin_user_exists(session_browser, 'alice')
|
username = 'bob2'
|
||||||
_set_user_inactive(session_browser, 'alice')
|
_non_admin_user_exists(session_browser, username,
|
||||||
_cannot_log_in(session_browser, 'alice')
|
groups=["freedombox-ssh"])
|
||||||
|
_configure_ssh_keys(session_browser, tmp_path_factory, username=username)
|
||||||
|
|
||||||
|
# Test set user inactive
|
||||||
|
_set_user_status(session_browser, username, 'inactive')
|
||||||
|
# Test Django login
|
||||||
|
_cannot_log_in(session_browser, username)
|
||||||
|
# Test PAM/nslcd authorization
|
||||||
|
_should_not_connect_passwordless_over_ssh(session_browser,
|
||||||
|
tmp_path_factory,
|
||||||
|
username=username)
|
||||||
|
|
||||||
|
# Test set user active
|
||||||
|
functional.login(session_browser)
|
||||||
|
_set_user_status(session_browser, username, 'active')
|
||||||
|
_can_log_in(session_browser, username)
|
||||||
|
_should_connect_passwordless_over_ssh(session_browser, tmp_path_factory,
|
||||||
|
username=username)
|
||||||
|
|
||||||
functional.login(session_browser)
|
functional.login(session_browser)
|
||||||
|
|
||||||
|
|
||||||
@ -201,14 +217,13 @@ def _admin_user_exists(session_browser, name):
|
|||||||
functional.create_user(session_browser, name, groups=['admin'])
|
functional.create_user(session_browser, name, groups=['admin'])
|
||||||
|
|
||||||
|
|
||||||
def _non_admin_user_exists(session_browser, name):
|
def _non_admin_user_exists(session_browser, name, groups=[]):
|
||||||
if functional.user_exists(session_browser, name):
|
if functional.user_exists(session_browser, name):
|
||||||
functional.delete_user(session_browser, name)
|
functional.delete_user(session_browser, name)
|
||||||
functional.create_user(session_browser, name)
|
functional.create_user(session_browser, name, groups=groups)
|
||||||
|
|
||||||
|
|
||||||
def _generate_ssh_keys(session_browser, tmp_path_factory):
|
def _generate_ssh_keys(session_browser, key_file):
|
||||||
key_file = tmp_path_factory.getbasetemp() / 'users-ssh.key'
|
|
||||||
try:
|
try:
|
||||||
key_file.unlink()
|
key_file.unlink()
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
@ -219,10 +234,12 @@ def _generate_ssh_keys(session_browser, tmp_path_factory):
|
|||||||
str(key_file)])
|
str(key_file)])
|
||||||
|
|
||||||
|
|
||||||
def _configure_ssh_keys(session_browser, tmp_path_factory):
|
def _configure_ssh_keys(session_browser, tmp_path_factory, username=None):
|
||||||
public_key_file = tmp_path_factory.getbasetemp() / 'users-ssh.key.pub'
|
key_file = tmp_path_factory.getbasetemp() / 'users-ssh.key'
|
||||||
|
_generate_ssh_keys(session_browser, key_file)
|
||||||
|
public_key_file = key_file.with_suffix(key_file.suffix + '.pub')
|
||||||
public_key = public_key_file.read_text()
|
public_key = public_key_file.read_text()
|
||||||
_set_ssh_keys(session_browser, public_key)
|
_set_ssh_keys(session_browser, public_key, username=username)
|
||||||
|
|
||||||
|
|
||||||
def _can_log_in(session_browser, username):
|
def _can_log_in(session_browser, username):
|
||||||
@ -244,16 +261,17 @@ def _cannot_log_in(session_browser, username):
|
|||||||
assert len(session_browser.find_by_id('id_user_menu')) == 0
|
assert len(session_browser.find_by_id('id_user_menu')) == 0
|
||||||
|
|
||||||
|
|
||||||
def _should_connect_passwordless_over_ssh(session_browser, tmp_path_factory):
|
def _should_connect_passwordless_over_ssh(session_browser, tmp_path_factory,
|
||||||
|
username=None):
|
||||||
key_file = tmp_path_factory.getbasetemp() / 'users-ssh.key'
|
key_file = tmp_path_factory.getbasetemp() / 'users-ssh.key'
|
||||||
_try_login_to_ssh(key_file=key_file)
|
_try_login_to_ssh(key_file=key_file, username=username)
|
||||||
|
|
||||||
|
|
||||||
def _should_not_connect_passwordless_over_ssh(session_browser,
|
def _should_not_connect_passwordless_over_ssh(session_browser,
|
||||||
tmp_path_factory):
|
tmp_path_factory, username=None):
|
||||||
key_file = tmp_path_factory.getbasetemp() / 'users-ssh.key'
|
key_file = tmp_path_factory.getbasetemp() / 'users-ssh.key'
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(subprocess.CalledProcessError):
|
||||||
_try_login_to_ssh(key_file=key_file)
|
_try_login_to_ssh(key_file=key_file, username=username)
|
||||||
|
|
||||||
|
|
||||||
def _rename_user(browser, old_name, new_name):
|
def _rename_user(browser, old_name, new_name):
|
||||||
@ -323,9 +341,12 @@ def _set_ssh_keys(browser, ssh_keys, username=None):
|
|||||||
functional.submit(browser, form_class='form-update')
|
functional.submit(browser, form_class='form-update')
|
||||||
|
|
||||||
|
|
||||||
def _set_user_inactive(browser, username):
|
def _set_user_status(browser, username, status):
|
||||||
functional.visit(browser, '/plinth/sys/users/{}/edit/'.format(username))
|
functional.visit(browser, '/plinth/sys/users/{}/edit/'.format(username))
|
||||||
browser.find_by_id('id_is_active').uncheck()
|
if status == "inactive":
|
||||||
|
browser.find_by_id('id_is_active').uncheck()
|
||||||
|
elif status == "active":
|
||||||
|
browser.find_by_id('id_is_active').check()
|
||||||
browser.find_by_id('id_confirm_password').fill(_admin_password)
|
browser.find_by_id('id_confirm_password').fill(_admin_password)
|
||||||
functional.submit(browser, form_class='form-update')
|
functional.submit(browser, form_class='form-update')
|
||||||
|
|
||||||
@ -348,8 +369,8 @@ def _change_password(browser, new_password, current_password=None,
|
|||||||
functional.submit(browser, form_class='form-change-password')
|
functional.submit(browser, form_class='form-change-password')
|
||||||
|
|
||||||
|
|
||||||
def _try_login_to_ssh(key_file=None):
|
def _try_login_to_ssh(key_file=None, username=None):
|
||||||
user = functional.config['DEFAULT']['username']
|
user = username if username else functional.config['DEFAULT']['username']
|
||||||
hostname = urllib.parse.urlparse(
|
hostname = urllib.parse.urlparse(
|
||||||
functional.config['DEFAULT']['url']).hostname
|
functional.config['DEFAULT']['url']).hostname
|
||||||
port = functional.config['DEFAULT']['ssh_port']
|
port = functional.config['DEFAULT']['ssh_port']
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user