Handle arguments to actions properly

- Don't allow strings to be sent as arguments.  The mixup with
  lists/tuples could be dangerous.

- Don't escape arguments.  subprocess.Popen takes care that arguments
  are passed on nicely to the actions.

- Update tests.
This commit is contained in:
Sunil Mohan Adapa 2015-04-10 14:47:23 +05:30
parent 4e42486822
commit b62f6746f4
6 changed files with 100 additions and 82 deletions

View File

@ -40,14 +40,16 @@ Actions run commands with this contract (version 1.1):
sub-directories of the actions directory. sub-directories of the actions directory.
(An important side-effect of this is that the system will not try to (An important side-effect of this is that the system will not try to
follow symlinks to other action directories.) follow symlinks to other action directories.)
Arguments that fail this validation will raise a ValueError. Arguments that fail this validation will raise a ValueError.
C. Only one action can be called at a time. C. Only one action can be called at a time.
This prevents us from appending multiple (unexpected) actions to the This prevents us from appending multiple (unexpected) actions to
call. the call. Any special shell characters in the action name will
simply be treated as the action itself when trying to search for
an action. The action will then not be found.
$ action="echo '$options'; echo 'oops'" $ action="echo '$options'; echo 'oops'"
$ options="hi" $ options="hi"
@ -61,21 +63,23 @@ Actions run commands with this contract (version 1.1):
$ options="hi'; rm -rf /;'" $ options="hi'; rm -rf /;'"
$ $action # oops, the file system is gone! $ $action # oops, the file system is gone!
Arguments that fail this validation won't, but probably should, raise a Any option that tries to include special shell characters will
ValueError. They don't because sanitizing this case is significantly simply be treated as an option with special characters and will
easier than detecting if it occurs. not be interpreted by the shell.
The options list is coerced into a space-separated string before being Any call wishing to include special shell characters in options
shell-escaped. Option lists including shell escape characters may need list does not need to escape them. They are taken care of. The
to be unescaped on the receiving end. option string is passed to the action exactly as it is passed in.
E. Actions must exist in the actions directory. E. Actions must exist in the actions directory.
4. (promise) Options are appended to the action. 4. (promise) Options are passed as arguments to the action.
Options can be provided as a list or strings. Options can be provided as a list or as a tuple.
5. (promise) Output and error strings are returned from the command. 5. (promise) Output is returned from the command. In case of an
error, ActionError is raised with action, output and error strings
as arguments.
6. (limitation) Providing the process with input is not possible. 6. (limitation) Providing the process with input is not possible.
@ -89,7 +93,6 @@ Actions run commands with this contract (version 1.1):
import logging import logging
import os import os
import pipes
import subprocess import subprocess
from plinth import cfg from plinth import cfg
@ -103,7 +106,6 @@ def run(action, options=None, async=False):
"""Safely run a specific action as the current user. """Safely run a specific action as the current user.
See actions._run for more information. See actions._run for more information.
""" """
return _run(action, options, async, False) return _run(action, options, async, False)
@ -112,7 +114,6 @@ def superuser_run(action, options=None, async=False):
"""Safely run a specific action as root. """Safely run a specific action as root.
See actions._run for more information. See actions._run for more information.
""" """
return _run(action, options, async, True) return _run(action, options, async, True)
@ -124,42 +125,44 @@ def _run(action, options=None, async=False, run_as_root=False):
- options are added to the action command. - options are added to the action command.
- async: run asynchronously or wait for the command to complete. - async: run asynchronously or wait for the command to complete.
- run_as_root: execute the command through sudo. - run_as_root: execute the command through sudo.
""" """
if options is None: if options is None:
options = [] options = []
# contract 3A and 3B: don't call anything outside of the actions directory. # Contract 3A and 3B: don't call anything outside of the actions directory.
if os.sep in action: if os.sep in action:
raise ValueError("Action can't contain:" + os.sep) raise ValueError('Action cannot contain: ' + os.sep)
cmd = os.path.join(cfg.actions_dir, action) cmd = os.path.join(cfg.actions_dir, action)
if not os.path.realpath(cmd).startswith(cfg.actions_dir): if not os.path.realpath(cmd).startswith(cfg.actions_dir):
raise ValueError("Action has to be in directory %s" % cfg.actions_dir) raise ValueError('Action has to be in directory %s' % cfg.actions_dir)
# contract 3C: interpret shell escape sequences as literal file names. # Contract 3C: interpret shell escape sequences as literal file names.
# contract 3E: fail if the action doesn't exist or exists elsewhere. # Contract 3E: fail if the action doesn't exist or exists elsewhere.
if not os.access(cmd, os.F_OK): if not os.access(cmd, os.F_OK):
raise ValueError("Action must exist in action directory.") raise ValueError('Action must exist in action directory.')
cmd = [cmd] cmd = [cmd]
# contract: 3C, 3D: don't allow users to insert escape characters in # Contract: 3C, 3D: don't allow shell special characters in
# options # options be interpreted by the shell. When using
# subprocess.Popen with list invocation and not shell invocation,
# escaping is unnecessary as each argument is passed directly to
# the command and not parsed by a shell.
if options: if options:
if not isinstance(options, (list, tuple)): if not isinstance(options, (list, tuple)):
options = [options] raise ValueError('Options must be list or tuple.')
cmd += [pipes.quote(option) for option in options] cmd += list(options) # No escaping necessary
# contract 1: commands can run via sudo. # Contract 1: commands can run via sudo.
if run_as_root: if run_as_root:
cmd = ["sudo", "-n"] + cmd cmd = ['sudo', '-n'] + cmd
LOGGER.info('Executing command - %s', cmd) LOGGER.info('Executing command - %s', cmd)
# contract 3C: don't interpret shell escape sequences. # Contract 3C: don't interpret shell escape sequences.
# contract 5 (and 6-ish). # Contract 5 (and 6-ish).
proc = subprocess.Popen( proc = subprocess.Popen(
cmd, cmd,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,

View File

@ -195,7 +195,7 @@ def set_hostname(hostname):
new_hostname=hostname) new_hostname=hostname)
LOGGER.info('Changing hostname to - %s', hostname) LOGGER.info('Changing hostname to - %s', hostname)
actions.superuser_run('hostname-change', hostname) actions.superuser_run('hostname-change', [hostname])
post_hostname_change.send_robust(sender='config', post_hostname_change.send_robust(sender='config',
old_hostname=old_hostname, old_hostname=old_hostname,
@ -210,7 +210,7 @@ def set_domainname(domainname):
domainname = str(domainname) domainname = str(domainname)
LOGGER.info('Changing domain name to - %s', domainname) LOGGER.info('Changing domain name to - %s', domainname)
actions.superuser_run('domainname-change', domainname) actions.superuser_run('domainname-change', [domainname])
domainname_change.send_robust(sender='config', domainname_change.send_robust(sender='config',
old_domainname=old_domainname, old_domainname=old_domainname,

View File

@ -257,7 +257,7 @@ def get_status():
"""Return the current status""" """Return the current status"""
"""ToDo: use key/value instead of hard coded value list""" """ToDo: use key/value instead of hard coded value list"""
status = {} status = {}
output = actions.run('dynamicdns', 'status') output = actions.run('dynamicdns', ['status'])
details = output.split() details = output.split()
status['enabled'] = (output.split()[0] == 'enabled') status['enabled'] = (output.split()[0] == 'enabled')

View File

@ -72,7 +72,7 @@ def index(request):
def get_status(): def get_status():
"""Return the current status""" """Return the current status"""
output = actions.run('owncloud-setup', 'status') output = actions.run('owncloud-setup', ['status'])
return {'enabled': 'enable' in output.split()} return {'enabled': 'enable' in output.split()}

View File

@ -103,7 +103,7 @@ def configure(request):
def get_status(): def get_status():
"""Return the current status""" """Return the current status"""
output = actions.run('xmpp-setup', 'status') output = actions.run('xmpp-setup', ['status'])
return {'inband_enabled': 'inband_enable' in output.split()} return {'inband_enabled': 'inband_enable' in output.split()}

View File

@ -18,16 +18,17 @@
# #
import os import os
import shlex import shutil
import subprocess
import unittest import unittest
from plinth.actions import superuser_run, run from plinth.actions import superuser_run, run
from plinth import cfg from plinth import cfg
ROOT_DIR = os.path.split(os.path.abspath(os.path.split(__file__)[0]))[0] test_dir = os.path.split(__file__)[0]
cfg.actions_dir = os.path.join(ROOT_DIR, 'actions') root_dir = os.path.abspath(os.path.join(test_dir, os.path.pardir + os.path.sep +
os.path.pardir))
cfg.actions_dir = os.path.join(root_dir, 'actions')
class TestPrivileged(unittest.TestCase): class TestPrivileged(unittest.TestCase):
@ -36,30 +37,27 @@ class TestPrivileged(unittest.TestCase):
See actions.py for a full description of the expectations. See actions.py for a full description of the expectations.
Symlink to ``echo`` and ``id`` during testing. Symlink to ``echo`` and ``id`` during testing.
""" """
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
os.symlink("/bin/echo", "actions/echo") shutil.copy('/bin/echo', cfg.actions_dir)
os.symlink("/usr/bin/id", "actions/id") shutil.copy('/usr/bin/id', cfg.actions_dir)
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
os.remove("actions/echo") os.remove('actions/echo')
os.remove("actions/id") os.remove('actions/id')
def notest_run_as_root(self): def notest_run_as_root(self):
"""1. Privileged actions run as root. """ """1. Privileged actions run as root. """
# TODO: it's not allowed to call a symlink in the actions dir anymore
self.assertEqual( self.assertEqual(
"0", # user 0 is root '0', # user 0 is root
superuser_run("id", "-ur")[0].strip()) superuser_run('id', ['-ur'])[0].strip())
def test_breakout_actions_dir(self): def test_breakout_actions_dir(self):
"""2. The actions directory can't be changed at run time. """2. The actions directory can't be changed at run time.
Can't currently be tested, as the actions directory is hardcoded. Can't currently be tested, as the actions directory is hardcoded.
""" """
pass pass
@ -67,17 +65,14 @@ class TestPrivileged(unittest.TestCase):
"""3A. Users can't call actions above the actions directory. """3A. Users can't call actions above the actions directory.
Tests both a relative and a literal path. Tests both a relative and a literal path.
""" """
options="hi" for action in ('../echo', '/bin/echo'):
for arg in ("../echo", "/bin/echo"):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
run(arg, options) run(action, ['hi'])
def test_breakout_down(self): def test_breakout_down(self):
"""3B. Users can't call actions beneath the actions directory.""" """3B. Users can't call actions beneath the actions directory."""
action="directory/echo" action = 'directory/echo'
self.assertRaises(ValueError, superuser_run, action) self.assertRaises(ValueError, superuser_run, action)
@ -85,48 +80,68 @@ class TestPrivileged(unittest.TestCase):
"""3C. Actions can't be used to run other actions. """3C. Actions can't be used to run other actions.
If multiple actions are specified, bail out. If multiple actions are specified, bail out.
""" """
# counting is safer than actual badness. # Counting is safer than actual badness.
actions = ("echo ''; echo $((1+1))", actions = ('echo ""; echo $((1+1))',
"echo '' && echo $((1+1))", 'echo "" && echo $((1+1))',
"echo '' || echo $((1+1))") 'echo "" || echo $((1+1))')
options = ("good", "") options = ('good', '')
for action in actions: for action in actions:
for option in options: for option in options:
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
output = run(action, option) run(action, [option])
# if it somewhow doesn't error, we'd better not evaluate
# the data.
self.assertFalse("2" in output[0])
def test_breakout_option_string(self): def test_breakout_option_string(self):
"""3D. Option strings can't be used to run other actions. """3D. Option strings can't be used to run other actions.
Verify that shell control characters aren't interpreted. Verify that shell control characters aren't interpreted.
""" """
action = "echo" options = ('; echo hello',
# counting is safer than actual badness. '&& echo hello',
options = "good; echo $((1+1))" '|| echo hello',
self.assertRaises(ValueError, run, action, options) '& echo hello',
r'\; echo hello',
'| echo hello',
r':;!&\/$%@`"~#*(){}[]|+=')
for option in options:
output = run('echo', [option])
output = output.rstrip('\n')
self.assertEqual(option, output)
def test_breakout_option_list(self): def test_breakout_option_list(self):
"""3D. Option lists can't be used to run other actions. """3D. Option lists can't be used to run other actions.
Verify that only a string of options is accepted and that we can't just
tack additional shell control characters onto the list.
"""
action = "echo"
# counting is safer than actual badness.
options = ["good", ";", "echo $((1+1))"]
# we'd better not evaluate the data.
self.assertRaises(ValueError, run, action, options)
def notest_multiple_options(self): Verify that shell control characters aren't interpreted in option lists.
""" 4. Multiple options can be provided as a list. """ """
# TODO: it's not allowed to call a symlink in the actions dir anymore option_lists = ((';', 'echo', 'hello'),
self.assertEqual( ('&&', 'echo', 'hello'),
subprocess.check_output(shlex.split("id -ur")).strip(), ('||', 'echo', 'hello'),
run("id", ["-u" ,"-r"])[0].strip()) ('&', 'echo', 'hello'),
(r'\;', 'echo' 'hello'),
('|', 'echo', 'hello'),
tuple(r':;!&\/$%@`"~#*(){}[]|+='))
for options in option_lists:
output = run('echo', options)
output = output.rstrip('\n')
expected_output = ' '.join(options)
self.assertEqual(output, expected_output)
def test_multiple_options_and_output(self):
"""4. Multiple options can be provided as a list or as a tuple.
5. Output is returned from the command.
"""
options = '1 2 3 4 5 6 7 8 9'
output = run('echo', options.split())
output = output.rstrip('\n')
self.assertEqual(options, output)
output = run('echo', tuple(options.split()))
output = output.rstrip('\n')
self.assertEqual(options, output)
if __name__ == "__main__": if __name__ == "__main__":