FreedomBox/plinth/actions.py
Sunil Mohan Adapa ef23ebedbc
privileged: Don't log exceptions that are expected
Fixes: #2551.

Tests:

- In a VM visit the storage page. Without the patch, an exception is logged when
there is no space to expand the partition. With the patch, the exception is not
logged.

- Raise an exception in the storage.usage_info() method and notice that the
exception is logged when visiting the Storage app page.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2025-12-03 14:23:03 -05:00

626 lines
22 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Framework to run specified actions with elevated privileges."""
import functools
import importlib
import inspect
import io
import json
import logging
import pathlib
import socket
import threading
import traceback
import types
import typing
from plinth import cfg, module_loader
logger = logging.getLogger(__name__)
socket_path = '/run/freedombox/privileged.socket'
thread_storage = threading.local()
# An alias for 'str' to mark some strings as sensitive. Sensitive strings are
# not logged. Use 'type secret_str = str' when Python 3.11 support is no longer
# needed.
class secret_str(str):
    """A 'str' subclass used in annotations to mark a value as sensitive.

    It adds no behaviour of its own. Arguments annotated with this type are
    masked instead of being written to the log.
    """
def privileged(func):
    """Mark a method as one that must be run with superuser privileges.

    The primary FreedomBox service daemon runs as a regular user and has no
    special privileges. For system operations it either communicates with
    privileged daemons such as NetworkManager and systemd, or hands the work
    over to a separate process with higher privileges. Handing work over
    requires all the action parameters to be serialized, communicated to the
    other process and de-serialized there; the return value has to make the
    same trip in the opposite direction. This decorator makes that task
    simpler.

    A call to a decorated method is forwarded to run_privileged_method()
    together with the name of the module containing it and the method name.
    Arguments are turned into JSON and are verified for type after
    de-serialization, before the actual call as superuser. Return values and
    exceptions are serialized and de-serialized as well, so the wrapper
    either returns the value or raises the exception.

    For a method to be decorated, it must have type annotations for all of
    its parameters and must not use keyword-only arguments. It must also live
    in a module named privileged.py directly under the application, similar
    to models.py, views.py and urls.py. Currently supported types are bool,
    int, float, str, dict/Dict, list/List, Optional and Union.

    Privileged methods may not write to stdout as that interferes with the
    serialization and de-serialization process.

    If the attribute '_skip_privileged_call' is set to a true value on the
    decorated function, the wrapper calls the function directly instead of
    forwarding the call.
    """
    func._privileged = True
    _check_privileged_action_arguments(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        run_directly = getattr(func, '_skip_privileged_call', False)
        if not run_directly:
            return run_privileged_method(
                func, _get_privileged_action_module_name(func),
                func.__name__, args, kwargs)

        return func(*args, **kwargs)

    return wrapper
def _read_from_server(client_socket: socket.socket) -> bytes:
"""Read everything from a socket and return the data."""
response = b''
while True:
chunk = client_socket.recv(4096)
if not chunk:
break
response += chunk
if not response:
raise ConnectionError('Server returned empty response')
return json.loads(response)
def _request_to_server(request: dict) -> socket.socket:
    """Send a request to the server and return the connected socket.

    The request is encoded as JSON and written to the UNIX socket at
    'socket_path'. The returned socket has its write side shut down and is
    ready for the response to be read from it. If connecting or sending
    fails, the socket is closed and the error is raised again.
    """
    payload = json.dumps(request).encode('utf-8')
    connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        connection.connect(socket_path)
        connection.sendall(payload)
        # Shutting down the write side signals EOF to the server: no more
        # request data will follow.
        connection.shutdown(socket.SHUT_WR)
    except Exception:
        connection.close()
        raise

    return connection
def run_privileged_method(func, module_name, action_name, args, kwargs):
    """Execute a privileged method using a server.

    The following special keys are removed from kwargs before the remaining
    arguments are sent to the server:

    - '_run_in_background' (default False): wait for the response in a
      separate thread and return immediately.
    - '_raw_output' (default False): return the server's output as a stream
      of raw byte chunks instead of a decoded return value.
    - '_log_error' (default True): when False, ask the server not to log the
      error and don't log it locally either.

    If func is given, the call is logged before the request is made.

    Returns a generator yielding byte chunks for raw output, None when run in
    background and otherwise whatever _wait_for_server_response() returns.
    Exceptions raised by _wait_for_server_response() propagate to the caller
    when not running in background.
    """
    run_in_background = kwargs.pop('_run_in_background', False)
    raw_output = kwargs.pop('_raw_output', False)
    log_error = kwargs.pop('_log_error', True)

    if func:
        _log_action(func, module_name, action_name, args, kwargs,
                    run_in_background)

    request = {
        'module': module_name,
        'action': action_name,
        'args': args,
        'kwargs': kwargs,
    }
    if raw_output:
        request['raw_output'] = raw_output

    if not log_error:
        request['log_error'] = False

    client_socket = _request_to_server(request)
    if raw_output:

        def _reader_func():
            # The finally clause also runs when the consumer stops early
            # (generator closed or garbage collected) or when recv() fails,
            # so the socket is not leaked in those cases.
            try:
                while chunk := client_socket.recv(4096):
                    yield chunk
            finally:
                client_socket.close()

        return _reader_func()

    response_args = (func, module_name, action_name, args, kwargs, log_error,
                     client_socket)
    if not run_in_background:
        return _wait_for_server_response(*response_args)

    read_thread = threading.Thread(target=_wait_for_server_response,
                                   args=response_args)
    read_thread.start()
    return None
def _wait_for_server_response(func, module_name, action_name, args, kwargs,
                              log_error, client_socket):
    """Wait for the server to respond and process the response.

    The socket is closed once the response has been read, or reading it has
    failed.

    Returns the value stored under 'return' when the response's 'result' is
    'success'.

    For any other result, the exception described in the response is
    re-created from its module, class name and args. The attributes 'stdout'
    and 'stderr' (bytes) are set on it and, when func is given, also
    'get_html_message', a callable returning an HTML formatted error message.
    The error is logged if both log_error and func are set and the exception
    is then raised.

    Raises json.JSONDecodeError, after logging, if the response is not valid
    JSON.
    """
    try:
        return_value = _read_from_server(client_socket)
    except json.JSONDecodeError:
        logger.error('Error decoding action return value %s..%s(*%s, **%s)',
                     module_name, action_name, args, kwargs)
        raise
    finally:
        client_socket.close()

    if return_value['result'] == 'success':
        return return_value['return']

    exception_info = return_value['exception']
    module = importlib.import_module(exception_info['module'])
    exception_class = getattr(module, exception_info['name'])
    exception = exception_class(*exception_info['args'])
    # The response carries the output as text. The fallback for a missing key
    # must therefore be a str too: bytes has no encode() method.
    exception.stdout = exception_info.get('stdout', '').encode()
    exception.stderr = exception_info.get('stderr', '').encode()

    def _get_html_message():
        """Return an HTML format error that can be shown in messages."""
        from django.utils.html import format_html

        formatted_args = _format_args(func, args, kwargs)
        exception_args, stdout, stderr, action_traceback = _format_error(
            exception, return_value)
        return format_html('Error running action: {}..{}({}): {}({})\n{}{}{}',
                           module_name, action_name, formatted_args,
                           exception_info['name'], exception_args, stdout,
                           stderr, action_traceback)

    if func:
        exception.get_html_message = _get_html_message

    if log_error and func:
        formatted_args = _format_args(func, args, kwargs)
        exception_args, stdout, stderr, action_traceback = _format_error(
            exception, return_value)
        logger.error(
            'Error running action %s..%s(%s): %s(%s)\n'
            '%s%s%s', module_name, action_name, formatted_args,
            exception_info['name'], exception_args, stdout, stderr,
            action_traceback)

    raise exception
class ProcessBufferedReader(io.BufferedReader):
    """Buffered reader over a process's stdout tuned for binary streaming.

    Reads from the stdout of a process and cleans up the process once there
    is nothing more to read.

    Iterating over a BufferedReader calls __next__ which appears to use
    readline(). That looks for newline bytes in binary data and produces
    short, unpredictably sized chunks which in turn lead to severe
    performance degradation. So, __next__ is overridden to call read() which
    is better geared for handling binary data.
    """

    def __init__(self, process, extra_cleanup_func=None, *args, **kwargs):
        """Wrap process.stdout and remember the process for cleanup.

        extra_cleanup_func, if given, is called without arguments at the end
        of the cleanup. Remaining arguments are passed to BufferedReader.
        """
        super().__init__(process.stdout, *args, **kwargs)
        self.process = process
        self.extra_cleanup_func = extra_cleanup_func

    def __next__(self):
        """Return the next chunk using read() instead of readline().

        When no more data is available, clean up and stop the iteration.
        """
        if chunk := self.read(io.DEFAULT_BUFFER_SIZE):
            return chunk

        self._cleanup_func()
        raise StopIteration

    def _cleanup_func(self):
        """Close the process's streams and wait for it after reading is done.

        Any failure during the cleanup is logged and not raised.
        """
        try:
            for stream_name in ('stdout', 'stderr'):
                stream = getattr(self.process, stream_name)
                if stream:
                    stream.close()

            self.process.wait(30)
            if self.extra_cleanup_func:
                self.extra_cleanup_func()
        except Exception:
            logger.exception('Closing process failed after raw output')
def _format_args(func, args, kwargs):
"""Return a loggable representation of arguments."""
argspec = inspect.getfullargspec(func)
if len(args) > len(argspec.args):
raise SyntaxError('Too many arguments')
args_str_list = []
for arg_index, arg_value in enumerate(args):
arg_name = argspec.args[arg_index]
if argspec.annotations[arg_name] in [secret_str, secret_str | None]:
value = '****'
else:
value = json.dumps(arg_value)
args_str_list.append(value)
kwargs_str_list = []
for arg_name, arg_value in kwargs.items():
if argspec.annotations[arg_name] in [secret_str, secret_str | None]:
value = "****"
else:
value = json.dumps(arg_value)
kwargs_str_list.append(f'{arg_name}=' + value)
return ', '.join(args_str_list + kwargs_str_list)
def _format_error(exception, return_value):
"""Log the exception in a readable manner."""
exception_args = ', '.join([json.dumps(arg) for arg in exception.args])
stdout = exception.stdout.decode()
if stdout:
lines = stdout.split('\n')
lines = lines[:-1] if not lines[-1] else lines
stdout = '\n'.join(('' + line for line in lines))
stdout = 'Stdout:\n' + stdout + '\n'
stderr = exception.stderr.decode()
if stderr:
lines = stderr.split('\n')
lines = lines[:-1] if not lines[-1] else lines
stderr = '\n'.join(('' + line for line in lines))
stderr = 'Stderr:\n' + stderr + '\n'
traceback = return_value['exception']['traceback']
if traceback:
all_lines = []
for entry in traceback:
lines = entry.split('\n')
all_lines += lines[:-1] if not lines[-1] else lines
traceback = '\n'.join(('' + line for line in all_lines))
traceback = 'Action traceback:\n' + traceback + '\n'
return (exception_args, stdout, stderr, traceback)
def _check_privileged_action_arguments(func):
"""Check that a privileged action has well defined types."""
argspec = inspect.getfullargspec(func)
if (argspec.varargs or argspec.varkw or argspec.kwonlyargs
or argspec.kwonlydefaults):
raise SyntaxError('Actions must not have variable args')
for arg in argspec.args:
if arg not in argspec.annotations:
raise SyntaxError('All arguments must be annotated')
for arg_name, arg_value in argspec.annotations.items():
for keyword in ('password', 'passphrase', 'secret'):
if keyword in arg_name:
if arg_value not in [secret_str, secret_str | None]:
raise SyntaxError(
f'Argument {arg_name} should likely be a "secret_str"')
def _get_privileged_action_module_name(func):
"""Figure out the module name of a privileged action."""
module_name = func.__module__
while module_name:
module_name, _, last = module_name.rpartition('.')
if last == 'privileged':
break
if not module_name:
raise ValueError('Privileged actions must be placed under a '
'package/module named privileged')
return module_name.rpartition('.')[2]
def _log_action(func, module_name, action_name, args, kwargs,
                run_in_background):
    """Write a compact, single line record of an action call to the log.

    The arguments are formatted with _format_args(). A trailing '&' marks
    actions that are run in the background.
    """
    background_marker = '&' if run_in_background else ''
    loggable_args = _format_args(func, args, kwargs)
    logger.info('%s %s..%s(%s) %s', '»', module_name, action_name,
                loggable_args, background_marker)
class JSONEncoder(json.JSONEncoder):
    """Handle special types that the default JSON encoder does not."""

    def default(self, obj):
        """Return a serializable form for special object types.

        pathlib.Path objects are encoded as strings. Everything else is left
        to the default encoder.
        """
        # When subprocess.call() fails and one of the arguments is a
        # Path-like object, the exception also contains a Path-like object.
        is_path = isinstance(obj, pathlib.Path)
        return str(obj) if is_path else super().default(obj)
def _setup_thread_storage():
    """Prepare this thread's storage for collecting stdout/stderr.

    Both buffers in the thread local storage start out as empty bytes.
    """
    for stream_name in ('stdout', 'stderr'):
        setattr(thread_storage, stream_name, b'')
def _clear_thread_storage():
    """Release the stdout/stderr collected in this thread's storage.

    Both buffers are set to None explicitly because the Python documentation
    is silent on whether thread local storage will be cleaned up after a
    thread terminates.
    """
    for stream_name in ('stdout', 'stderr'):
        setattr(thread_storage, stream_name, None)
def get_return_value_from_exception(exception):
    """Return the value to return from server when an exception is raised.

    The result is a dict with 'result' set to 'exception' and, under
    'exception', the module and name of the exception class, its args, its
    formatted traceback and the stdout/stderr collected in this thread's
    storage, decoded to text.
    """
    # The storage attributes may be missing (never set up in this thread) or
    # None (already cleared by _clear_thread_storage()). Treat both as no
    # output instead of failing on None.decode().
    stdout = getattr(thread_storage, 'stdout', None) or b''
    stderr = getattr(thread_storage, 'stderr', None) or b''
    return {
        'result': 'exception',
        'exception': {
            'module': type(exception).__module__,
            'name': type(exception).__name__,
            'args': exception.args,
            'traceback': traceback.format_tb(exception.__traceback__),
            'stdout': stdout.decode(),
            'stderr': stderr.decode(),
        }
    }
def privileged_handle_json_request(
        request_string: str) -> str | io.BufferedReader:
    """Handle a JSON encoded request to run a privileged action.

    The request must be a JSON object with the keys 'module' (str), 'action'
    (str), 'args' (list) and 'kwargs' (dict). The optional keys 'raw_output'
    and 'log_error' must be booleans when present.

    Returns the io.BufferedReader returned by the action when the request
    asked for raw output. Otherwise returns a JSON encoded string holding
    either the action's result or, if anything failed, a description of the
    exception. Failures are logged and reported in the return value rather
    than raised.
    """

    def _parse_request() -> dict:
        """Return a JSON parsed and validated request."""
        try:
            request = json.loads(request_string)
        except json.JSONDecodeError as exception:
            raise SyntaxError('Invalid JSON in request') from exception

        required_parameters = [('module', str), ('action', str),
                               ('args', list), ('kwargs', dict)]
        for parameter, expected_type in required_parameters:
            if parameter not in request:
                raise TypeError(f'Missing required parameter "{parameter}"')

            if not isinstance(request[parameter], expected_type):
                raise TypeError(f'Parameter "{parameter}" must be of type '
                                f'{expected_type.__name__}')

        if 'raw_output' in request and not isinstance(request['raw_output'],
                                                      bool):
            raise TypeError('Incorrect "raw_output" parameter')

        if 'log_error' in request and not isinstance(request['log_error'],
                                                     bool):
            raise TypeError('Incorrect "log_error" parameter')

        return request

    try:
        request = _parse_request()
        log_error = request.get('log_error', True)
        arguments = {'args': request['args'], 'kwargs': request['kwargs']}
        _setup_thread_storage()
        return_value = _privileged_call(request['module'], request['action'],
                                        arguments, log_error)
        if isinstance(return_value, io.BufferedReader):
            raw_output = request.get('raw_output', False)
            if not raw_output:
                raise TypeError('Invalid call to raw output API.')

            return return_value
    except Exception as exception:
        if isinstance(exception, (PermissionError, SyntaxError, TypeError)):
            # Errors of these types are logged as a single line without a
            # traceback. Formatting the exception itself, rather than
            # args[0], also works for exceptions created without arguments.
            logger.error('%s', exception)
        else:
            logger.exception(exception)

        return_value = get_return_value_from_exception(exception)

    _clear_thread_storage()
    return json.dumps(return_value, cls=JSONEncoder)
def _privileged_call(module_name, action_name, arguments, log_error=True):
    """Import the module's privileged actions and run the requested one.

    Raises SyntaxError if the module name contains a '.', if the module or
    the action can't be found, if the action is not marked as privileged or
    if the arguments are not valid for it. TypeError raised by the argument
    type checks is not caught here.

    Returns the action's io.BufferedReader as is if it returns one. Otherwise
    returns a dict with 'result' set to 'success' and the action's return
    value under 'return' or, if the action raised an exception, the dict
    created by get_return_value_from_exception(). The exception is also
    logged unless log_error is False.
    """
    if '.' in module_name:
        raise SyntaxError('Invalid module name')

    cfg.read()
    import_path = 'plinth'
    if module_name != 'plinth':
        try:
            import_path = module_loader.get_module_import_path(module_name)
        except FileNotFoundError as exception:
            raise SyntaxError('Specified module not found') from exception

    try:
        module = importlib.import_module(import_path + '.privileged')
    except ModuleNotFoundError as exception:
        raise SyntaxError('Specified module not found') from exception

    try:
        action = getattr(module, action_name)
    except AttributeError as exception:
        raise SyntaxError('Specified action not found') from exception

    if not getattr(action, '_privileged', None):
        raise SyntaxError('Specified action is not privileged action')

    # Peel off all layers of wrapping/decoration to reach the original
    # function.
    func = action
    while hasattr(func, '__wrapped__'):
        func = func.__wrapped__

    _privileged_assert_valid_arguments(func, arguments)

    positional, keyword = arguments['args'], arguments['kwargs']
    _log_action(func, module_name, action_name, positional, keyword,
                run_in_background=False)

    try:
        result = func(*positional, **keyword)
    except Exception as exception:
        failure = get_return_value_from_exception(exception)
        if log_error:
            logger.exception(
                'Error running action: %s..%s(..): %s\nstdout:\n%s\n'
                'stderr:\n%s\n', module_name, action_name, exception,
                failure['exception']['stdout'],
                failure['exception']['stderr'])

        return failure

    if isinstance(result, io.BufferedReader):
        return result

    return {'result': 'success', 'return': result}
def _privileged_assert_valid_arguments(func, arguments):
"""Check the names, types and completeness of the arguments passed."""
# Check if arguments match types
if not isinstance(arguments, dict):
raise SyntaxError('Invalid arguments format')
if 'args' not in arguments or 'kwargs' not in arguments:
raise SyntaxError('Invalid arguments format')
args = arguments['args']
kwargs = arguments['kwargs']
if not isinstance(args, list) or not isinstance(kwargs, dict):
raise SyntaxError('Invalid arguments format')
argspec = inspect.getfullargspec(func)
if len(args) + len(kwargs) > len(argspec.args):
raise SyntaxError('Too many arguments')
no_defaults = len(argspec.args)
if argspec.defaults:
no_defaults -= len(argspec.defaults)
for key in argspec.args[len(args):no_defaults]:
if key not in kwargs:
raise SyntaxError(f'Argument not provided: {key}')
for key, value in kwargs.items():
if key not in argspec.args:
raise SyntaxError(f'Unknown argument: {key}')
if argspec.args.index(key) < len(args):
raise SyntaxError(f'Duplicate argument: {key}')
_privileged_assert_valid_type(f'arg {key}', value,
argspec.annotations[key])
for index, arg in enumerate(args):
annotation = argspec.annotations[argspec.args[index]]
_privileged_assert_valid_type(f'arg #{index}', arg, annotation)
def _privileged_assert_valid_type(arg_name, value, annotation):
"""Assert that the type of argument value matches the annotation."""
if annotation == typing.Any:
return
NoneType = type(None)
if annotation == NoneType:
if value is not None:
raise TypeError('Expected None for {arg_name}')
return
basic_types = {bool, int, str, float}
if annotation in basic_types:
if not isinstance(value, annotation):
raise TypeError(
f'Expected type {annotation.__name__} for {arg_name}')
return
# secret_str should be a regular string
if annotation == secret_str:
if not isinstance(value, str):
raise TypeError(f'Expected type str for {arg_name}')
return
# 'int | str' or 'typing.Union[int, str]'
if (isinstance(annotation, types.UnionType)
or getattr(annotation, '__origin__', None) == typing.Union):
for arg in annotation.__args__:
try:
_privileged_assert_valid_type(arg_name, value, arg)
return
except TypeError:
pass
raise TypeError(f'Expected one of unioned types for {arg_name}')
# 'list[int]' or 'typing.List[int]'
if getattr(annotation, '__origin__', None) == list:
if not isinstance(value, list):
raise TypeError(f'Expected type list for {arg_name}')
for index, inner_item in enumerate(value):
_privileged_assert_valid_type(f'{arg_name}[{index}]', inner_item,
annotation.__args__[0])
return
# 'list[dict]' or 'typing.List[dict]'
if getattr(annotation, '__origin__', None) == dict:
if not isinstance(value, dict):
raise TypeError(f'Expected type dict for {arg_name}')
for inner_key, inner_value in value.items():
_privileged_assert_valid_type(f'{arg_name}[{inner_key}]',
inner_key, annotation.__args__[0])
_privileged_assert_valid_type(f'{arg_name}[{inner_value}]',
inner_value, annotation.__args__[1])
return
raise TypeError('Unsupported annotation type')