Commands can be executed asynchronously and as non-root.

If commands are executed synchronously, they'll return output and
error strings.  If commands are executed asynchronously, nothing is
returned.  We assume you can communicate with asynchronous processes
out-of-band.

Not every command needs to be executed as root, so there's a new
entry-point, *actions.run*, which executes actions as the current
user.
This commit is contained in:
Nick Daly 2013-11-24 18:42:15 -06:00
parent 6567dc1758
commit 0349113e97
2 changed files with 61 additions and 48 deletions

View File

@ -1,17 +1,17 @@
#! /usr/bin/env python
# -*- mode: python; mode: auto-fill; fill-column: 80 -*-
"""Run specified privileged actions as root.
"""Run specified actions.
Privileged actions run commands with this contract (version 1.0):
Actions run commands with this contract (version 1.1):
1. (promise) Privileged actions run as root.
1. (promise) Super-user actions run as root. Normal actions do not.
2. (promise) The actions directory can't be changed at run time.
This guarantees that we can only select from the correct set of actions.
3. (restriction) Only whitelisted privileged actions can run.
3. (restriction) Only whitelisted actions can run.
A. Scripts in a directory above the actions directory can't be run.
@ -58,6 +58,8 @@ Privileged actions run commands with this contract (version 1.0):
4. (promise) Options are appended to the action.
Options can be provided as a list or strings.
5. (promise) Output and error strings are returned from the command.
6. (limitation) Providing the process with input is not possible.
@ -66,26 +68,36 @@ Privileged actions run commands with this contract (version 1.0):
interaction with the spawned process must be carried out through some other
method (maybe the process opens a socket, or something).
7. Option
"""
import contract
import os
import pipes
import shlex
import subprocess
import pipes, shlex, subprocess
contract.checkmod(__name__)
def privilegedaction_run(action, options = None):
def run(action, options = None, async = False):
return _run(action, options, async, False)
def superuser_run(action, options = None, async = False):
return _run(action, options, async, True)
def _run(action, options = None, async = False, run_as_root = False):
"""Safely run a specific action as root.
pre:
os.sep not in actions
os.sep not in action
inv:
True # Actions directory hasn't changed. It's hardcoded :)
"""
DIRECTORY = "actions"
if options == None:
options = []
# contract 3A and 3B: don't call anything outside of the actions directory.
if os.sep in action:
@ -98,18 +110,27 @@ def privilegedaction_run(action, options = None):
if not os.access(cmd, os.F_OK):
raise ValueError("Action must exist in action directory.")
if hasattr(options, "__iter__"):
options = " ".join(options)
cmd = [cmd]
# contract: 3C, 3D: don't allow users to insert escape characters
cmd = ["sudo", "-n", cmd, pipes.quote(options)]
# contract: 3C, 3D: don't allow users to insert escape characters in options
if options:
if not hasattr(options, "__iter__"):
options = [options]
cmd += [pipes.quote(option) for option in options]
# contract 1: commands can run via sudo.
if run_as_root:
cmd = ["sudo", "-n"] + cmd
# contract 3C: don't interpret shell escape sequences.
# contract 5 (and 6-ish).
output, error = \
subprocess.Popen(cmd,
stdout = subprocess.PIPE,
stderr= subprocess.PIPE,
shell=False).communicate()
proc = subprocess.Popen(
cmd,
stdout = subprocess.PIPE,
stderr= subprocess.PIPE,
shell=False)
return output, error
if not async:
output, error = proc.communicate()
return output, error

View File

@ -1,32 +1,16 @@
#! /usr/bin/env python
# -*- mode: python; mode: auto-fill; fill-column: 80 -*-
from actions import superuser_run, run
import shlex
import subprocess
import sys
from privilegedactions import privilegedaction_run
import unittest
class TestPrivileged(unittest.TestCase):
"""Verify that privileged actions perform as expected:
"""Verify that privileged actions perform as expected.
1. Privileged actions run as root.
2. Only whitelisted privileged actions can run.
A. Actions can't be used to run other actions:
$ action="echo 'hi'; rm -rf /"
$ $action
B. Options can't be used to run other actions:
$ options="hi'; rm -rf /;'"
$ "echo " + "'$options'"
C. Scripts in a directory above the actions directory can't be run.
D. Scripts in a directory beneath the actions directory can't be run.
3. The actions directory can't be changed at run time.
See actions.py for a full description of the expectations.
"""
def test_run_as_root(self):
@ -35,7 +19,7 @@ class TestPrivileged(unittest.TestCase):
"""
self.assertEqual(
"0", # user 0 is root
privilegedaction_run("id", "-ur")[0].strip())
superuser_run("id", "-ur")[0].strip())
def test_breakout_actions_dir(self):
"""2. The actions directory can't be changed at run time.
@ -55,13 +39,13 @@ class TestPrivileged(unittest.TestCase):
for arg in ("../echo", "/bin/echo"):
with self.assertRaises(ValueError):
privilegedaction_run(arg, options)
run(arg, options)
def test_breakout_down(self):
"""3B. Users can't call actions beneath the actions directory."""
action="directory/echo"
self.assertRaises(ValueError, privilegedaction_run, action)
self.assertRaises(ValueError, superuser_run, action)
def test_breakout_actions(self):
"""3C. Actions can't be used to run other actions.
@ -78,14 +62,14 @@ class TestPrivileged(unittest.TestCase):
for action in actions:
for option in options:
with self.assertRaises(ValueError):
output = privilegedaction_run(action, option)
output = run(action, option)
# if it somewhow doesn't error, we'd better not evaluate the
# data.
self.assertFalse("2" in output[0])
def test_breakout_option_string(self):
"""3D. Options can't be used to run other actions.
"""3D. Option strings can't be used to run other actions.
Verify that shell control characters aren't interpreted.
@ -94,12 +78,12 @@ class TestPrivileged(unittest.TestCase):
# counting is safer than actual badness.
options = "good; echo $((1+1))"
output, error = privilegedaction_run(action, options)
output, error = run(action, options)
self.assertFalse("2" in output)
def test_breakout_option_list(self):
"""3D. Options can't be used to run other actions.
"""3D. Option lists can't be used to run other actions.
Verify that only a string of options is accepted and that we can't just
tack additional shell control characters onto the list.
@ -109,10 +93,18 @@ class TestPrivileged(unittest.TestCase):
# counting is safer than actual badness.
options = ["good", ";", "echo $((1+1))"]
output, error = privilegedaction_run(action, options)
output, error = run(action, options)
# we'd better not evaluate the data.
self.assertFalse("2" in output)
def test_multiple_options(self):
"""4. Multiple options can be provided as a list.
"""
self.assertEqual(
subprocess.check_output(shlex.split("id -ur")).strip(),
run("id", ["-u" ,"-r"])[0].strip())
if __name__ == "__main__":
unittest.main()