email_server: action: Refactor for simplicity

- Parse arguments in a readable way.

- Convert decorator into simple call.

- Make a simple call instead of looking for subcommand.

- Don't setup logging in global scope.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2021-10-17 22:12:25 -07:00 committed by James Valleroy
parent 220149e6e0
commit a2038e98d6
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
8 changed files with 38 additions and 45 deletions

View File

@ -1,5 +1,8 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for email server.
"""
import argparse
import logging
@ -7,50 +10,39 @@ import os
import sys
import plinth.log
from plinth.modules.email_server import audit
EXIT_SYNTAX = 10
EXIT_PERM = 20
# Set up logging
plinth.log.pipe_to_syslog(to_stderr='tty')
logger = logging.getLogger(os.path.basename(__file__))
def reserved_for_root(fun):
def wrapped(*args, **kwargs):
if os.getuid() != 0:
logger.critical('This action is reserved for root')
sys.exit(EXIT_PERM)
return fun(*args, **kwargs)
return wrapped
def main():
if not sys.stdin.isatty():
print('WARNING: Output will not be shown. Check syslog for logs',
file=sys.stderr)
"""Parse arguments."""
# Set up logging
plinth.log.pipe_to_syslog(to_stderr='tty')
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-i', nargs='+', dest='ipc')
parser.add_argument('module', help='Module to trigger action in')
parser.add_argument('action', help='Action to trigger in module')
parser.add_argument('arguments', help='String arguments for action',
nargs='*')
args = parser.parse_args()
# Select the first non-empty dict item
adict = vars(parser.parse_args())
generator = (kv for kv in adict.items() if kv[1] is not None)
subcommand, arguments = next(generator)
function = globals()['subcommand_' + subcommand]
try:
function(*arguments)
except Exception as e:
logger.exception(e)
_call(args.module, args.action, args.arguments)
except Exception as exception:
logger.exception(exception)
_log_additional_info()
sys.exit(1)
@reserved_for_root
def subcommand_ipc(module_name, action_name, *args):
import plinth.modules.email_server.audit as audit
def _call(module_name, action_name, arguments):
"""Import the module and run action as superuser."""
if os.getuid() != 0:
logger.critical('This action is reserved for root')
sys.exit(EXIT_PERM)
# We only run actions defined in the audit module
if module_name not in audit.__all__:
@ -58,15 +50,17 @@ def subcommand_ipc(module_name, action_name, *args):
sys.exit(EXIT_SYNTAX)
module = getattr(audit, module_name)
function = getattr(module, 'action_' + action_name, None)
if function is None:
try:
action = getattr(module, 'action_' + action_name)
except AttributeError:
logger.critical('Bad action: %s/%r', module_name, action_name)
sys.exit(EXIT_SYNTAX)
function(*args)
action(*arguments)
def _log_additional_info():
"""Log additional debugging information."""
import grp
import pwd
resu = ','.join(pwd.getpwuid(uid).pw_name for uid in os.getresuid())

View File

@ -98,7 +98,7 @@ def _set_status(uid, aliases, status):
def first_setup():
"""Create the database file and schema inside it."""
actions.superuser_run('email_server', ['-i', 'aliases', 'setup'])
actions.superuser_run('email_server', ['aliases', 'setup'])
# Create schema if not exists
query = '''

View File

@ -42,14 +42,14 @@ def get():
def repair():
superuser_run('email_server', ['-i', 'domain', 'set_up'])
superuser_run('email_server', ['domain', 'set_up'])
def repair_component(action_name):
allowed_actions = {'set_up': ['postfix']}
if action_name not in allowed_actions:
return
superuser_run('email_server', ['-i', 'domain', action_name])
superuser_run('email_server', ['domain', action_name])
return allowed_actions[action_name]
@ -199,7 +199,7 @@ def set_keys(raw):
raise ClientError('POST data exceeds max line length')
try:
superuser_run('email_server', ['-i', 'domain', 'set_keys'], input=ipc)
superuser_run('email_server', ['domain', 'set_keys'], input=ipc)
except ActionError as e:
stdout = e.args[1]
if not stdout.startswith('ClientError:'):

View File

@ -49,8 +49,7 @@ def put_uid(uid_number):
def _put(arg_type, user_info):
try:
args = ['-i', 'home', 'mk', arg_type, user_info]
superuser_run('email_server', args)
superuser_run('email_server', ['home', 'mk', arg_type, user_info])
except ActionError as e:
raise RuntimeError('Action script failure') from e

View File

@ -86,7 +86,7 @@ def repair():
POST /audit/ldap/repair
"""
aliases.first_setup()
actions.superuser_run('email_server', ['-i', 'ldap', 'set_up'])
actions.superuser_run('email_server', ['ldap', 'set_up'])
def action_set_up():

View File

@ -32,7 +32,7 @@ def get():
'rc_config_header': _('RoundCube configured for FreedomBox email'),
}
output = actions.superuser_run('email_server', ['-i', 'rcube', 'check'])
output = actions.superuser_run('email_server', ['rcube', 'check'])
results = json.loads(output)
for i in range(0, len(results)):
results[i] = models.Diagnosis.from_json(results[i], translation.get)
@ -41,14 +41,14 @@ def get():
def repair():
actions.superuser_run('email_server', ['-i', 'rcube', 'set_up'])
actions.superuser_run('email_server', ['rcube', 'set_up'])
def repair_component(action):
action_to_services = {'set_up': []}
if action not in action_to_services:
return
actions.superuser_run('email_server', ['-i', 'rcube', action])
actions.superuser_run('email_server', ['rcube', action])
return action_to_services[action]

View File

@ -94,7 +94,7 @@ def get():
def repair():
actions.superuser_run('email_server', ['-i', 'spam', 'set_filter'])
actions.superuser_run('email_server', ['spam', 'set_filter'])
def check_filter(title=''):

View File

@ -90,20 +90,20 @@ def _get_superuser_results(results):
translation = {
'cert_availability': _('Has a TLS certificate'),
}
dump = actions.superuser_run('email_server', ['-i', 'tls', 'check'])
dump = actions.superuser_run('email_server', ['tls', 'check'])
for jmap in json.loads(dump):
results.append(models.Diagnosis.from_json(jmap, translation.get))
def repair():
actions.superuser_run('email_server', ['-i', 'tls', 'set_up'])
actions.superuser_run('email_server', ['tls', 'set_up'])
def repair_component(action):
action_to_services = {'set_cert': ['dovecot', 'postfix']}
if action not in action_to_services: # action not allowed
return
actions.superuser_run('email_server', ['-i', 'tls', action])
actions.superuser_run('email_server', ['tls', action])
return action_to_services[action]