FreedomBox/plinth/utils.py
Veiko Aasa 6b61ca2f18
ssh: action script: Require user credentials when editing ssh keys
This change prevents the plinth user to set the ssh-keys without
knowing the user password.

- Debian: added new dependency python3-pampy to authenticate users.
- Added additional required parameter --auth-user to the
  'actions/ssh set-keys' command. A password should be
  provided through STDIN.

Tests performed:
- running 'actions/ssh set-keys' with empty or wrong admin credentials
  fails.
- running 'actions/ssh set-keys' with correct admin credentials
  succeeds.
- running 'actions/ssh set-keys' with correct non-admin credentials
  succeeds if the --username is the same user.
- running 'actions/ssh set-keys' with correct non-admin credentials
  fails if the --username is a different user.

Signed-off-by: Veiko Aasa <veiko17@disroot.org>
Reviewed-by: Sunil Mohan Adapa <sunil@medhas.org>
2020-10-05 00:05:41 -07:00

166 lines
4.5 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Miscellaneous utility methods.
"""
import gzip
import importlib
import os
import random
import re
import string
from distutils.version import LooseVersion
import markupsafe
import pam
import ruamel.yaml
from django.utils.functional import lazy
Version = LooseVersion # Abstraction over distutils.version.LooseVersion
def import_from_gi(library, version):
"""Import and return a GObject introspection library."""
try:
import gi as package
package_name = 'gi'
except ImportError:
import pgi as package
package_name = 'pgi'
package.require_version(library, version)
return importlib.import_module(package_name + '.repository.' + library)
def _format_lazy(string, *args, **kwargs):
"""Lazily format a lazy string."""
allow_markup = kwargs.pop('allow_markup', False)
string = str(string)
string = string.format(*args, **kwargs)
if allow_markup:
string = markupsafe.Markup(string)
return string
format_lazy = lazy(_format_lazy, str)
def non_admin_view(func):
"""Decorator to mark a view as accessible by non-admin users."""
setattr(func, 'IS_NON_ADMIN', True)
return func
def is_user_admin(request, cached=False):
"""Return whether user is an administrator."""
if not request.user.is_authenticated:
return False
if 'cache_user_is_admin' in request.session and cached:
return request.session['cache_user_is_admin']
user_is_admin = request.user.groups.filter(name='admin').exists()
request.session['cache_user_is_admin'] = user_is_admin
return user_is_admin
class YAMLFile(object):
"""A context management class for updating YAML files"""
def __init__(self, yaml_file):
"""Return a context object for the YAML file.
Parameters:
yaml_file - the YAML file to update.
updating the YAML file.
"""
self.yaml_file = yaml_file
self.conf = None
def __enter__(self):
with open(self.yaml_file, 'r') as intro_conf:
if not self.is_file_empty():
self.conf = ruamel.yaml.round_trip_load(intro_conf)
else:
self.conf = {}
return self.conf
def __exit__(self, type_, value, traceback):
if not traceback:
with open(self.yaml_file, 'w') as intro_conf:
ruamel.yaml.round_trip_dump(self.conf, intro_conf)
def is_file_empty(self):
return os.stat(self.yaml_file).st_size == 0
def random_string(size=8):
"""Generate a random alphanumeric string."""
chars = (random.SystemRandom().choice(string.ascii_letters)
for _ in range(size))
return ''.join(chars)
def generate_password(size=32):
"""Generate a random password using ascii alphabet and digits."""
chars = (random.SystemRandom().choice(string.ascii_letters + string.digits)
for _ in range(size))
return ''.join(chars)
def grep(pattern, file_name):
"""Return lines of a file matching a pattern."""
return [
line.rstrip() for line in open(file_name) if re.search(pattern, line)
]
def gunzip(gzip_file, output_file):
"""Utility to unzip a given gzip file and write it to an output file
gzip_file: string path to a gzip file
output_file: string path to the output file
mode: an octal number to specify file permissions
"""
output_dir = os.path.dirname(output_file)
if not os.path.exists(output_dir):
os.makedirs(output_dir, mode=0o755)
with gzip.open(gzip_file, 'rb') as file_handle:
contents = file_handle.read()
def opener(path, flags):
return os.open(path, flags, mode=0o644)
with open(output_file, 'wb', opener=opener) as file_handle:
file_handle.write(contents)
def is_non_empty_file(file_path):
return os.path.isfile(file_path) and os.path.getsize(file_path) > 0
def is_axes_old():
"""Return true if using django-axes version strictly less than 5.0.0.
XXX: Remove this method and allow code that uses it after django-axes >=
5.0.0 becomes available in Debian stable.
"""
import axes
try:
version = axes.get_version()
except AttributeError:
# axes.get_version() was removed in 5.0.13
return False
return LooseVersion(version) < LooseVersion('5.0')
def is_authenticated_user(username, password):
"""Return true if the user authentication succeeds."""
pam_authenticator = pam.pam()
return bool(pam_authenticator.authenticate(username, password))