actions: Add method to handle privileged JSON request to a server

Tests:

- When privileged daemon receives a non-JSON request, a proper error structure
  is returned with SyntaxError.

- When privileged daemon receives a request without 'module', 'action', 'args'
  or 'kwargs' parameters, a proper error structure is returned with TypeError.

- When privileged daemon receives a request for invalid 'module' or 'action', a
  proper error structure is returned with SyntaxError.

- When an exception is thrown in a privileged method, the error is properly
  returned in error structure and caller is shown all the proper details.

- Valid return values are sent when a privileged call is made.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>

- Refactor validation of fields in the JSON object.
- Throw distinct errors for missing field and wrong type.
Signed-off-by: Joseph Nuthalapati <njoseph@riseup.net>

Reviewed-by: Joseph Nuthalapati <njoseph@riseup.net>
This commit is contained in:
Sunil Mohan Adapa 2025-06-25 11:44:08 -07:00 committed by Joseph Nuthalapati
parent f2edc6ab2b
commit 72bcb93f56
No known key found for this signature in database
GPG Key ID: 5398F00A2FA43C35

View File

@ -359,6 +359,55 @@ def privileged_main():
sys.exit(1)
def privileged_handle_json_request(request_string: str) -> str:
"""Parse arguments for the program spawned as a privileged action."""
def _parse_request() -> dict:
"""Return a JSON parsed and validated request."""
try:
request = json.loads(request_string)
except json.JSONDecodeError:
raise SyntaxError('Invalid JSON in request')
required_parameters = [('module', str), ('action', str),
('args', list), ('kwargs', dict)]
for parameter, expected_type in required_parameters:
if parameter not in request:
raise TypeError(f'Missing required parameter "{parameter}"')
if not isinstance(request[parameter], expected_type):
raise TypeError(
f'Parameter "{parameter}" must be of type {expected_type.__name__}'
)
return request
try:
request = _parse_request()
logger.info('Received request for %s..%s(..)', request['module'],
request['action'])
arguments = {'args': request['args'], 'kwargs': request['kwargs']}
return_value = _privileged_call(request['module'], request['action'],
arguments)
except (PermissionError, SyntaxError, TypeError, Exception) as exception:
if isinstance(exception, (PermissionError, SyntaxError, TypeError)):
logger.error(exception.args[0])
else:
logger.exception(exception)
return_value = {
'result': 'exception',
'exception': {
'module': type(exception).__module__,
'name': type(exception).__name__,
'args': exception.args,
'traceback': traceback.format_tb(exception.__traceback__)
}
}
return json.dumps(return_value, cls=JSONEncoder)
def _privileged_call(module_name, action_name, arguments):
"""Import the module and run action as superuser"""
if '.' in module_name: