actions: test case for is-package-manager-busy command

comments, import optimization and YAPF formatting.

Signed-off-by: Joseph Nuthalapati <njoseph@thoughtworks.com>
This commit is contained in:
Joseph Nuthalapati 2018-01-30 10:49:15 +05:30
parent 5c83dea442
commit 9dae13ada5
No known key found for this signature in database
GPG Key ID: 5398F00A2FA43C35
3 changed files with 54 additions and 52 deletions

View File

@ -15,16 +15,15 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
""" """
Wrapper to handle package installation with apt-get. Wrapper to handle package installation with apt-get.
""" """
import argparse import argparse
from importlib import import_module
import os import os
import subprocess import subprocess
import sys import sys
from importlib import import_module
from plinth import cfg from plinth import cfg
@ -41,8 +40,8 @@ def parse_arguments():
subparser = subparsers.add_parser('install', help='install packages') subparser = subparsers.add_parser('install', help='install packages')
subparser.add_argument( subparser.add_argument(
'module', help='name of module for which package is being installed') 'module', help='name of module for which package is being installed')
subparser.add_argument( subparser.add_argument('packages', nargs='+',
'packages', nargs='+', help='list of packages to install') help='list of packages to install')
subparsers.add_parser('is-package-manager-busy', subparsers.add_parser('is-package-manager-busy',
help='Return whether package manager is busy') help='Return whether package manager is busy')
@ -53,8 +52,9 @@ def parse_arguments():
def _run_apt_command(arguments): def _run_apt_command(arguments):
"""Run apt-get with provided arguments.""" """Run apt-get with provided arguments."""
# Ask apt-get to output its progress to file descriptor 3. # Ask apt-get to output its progress to file descriptor 3.
command = ['apt-get', '--assume-yes', '--quiet=2', '--option', command = [
'APT::Status-Fd=3'] + arguments 'apt-get', '--assume-yes', '--quiet=2', '--option', 'APT::Status-Fd=3'
] + arguments
# Duplicate stdout to file descriptor 3 for this process. # Duplicate stdout to file descriptor 3 for this process.
os.dup2(1, 3) os.dup2(1, 3)
@ -63,9 +63,9 @@ def _run_apt_command(arguments):
# so that regular output is ignored. # so that regular output is ignored.
env = os.environ.copy() env = os.environ.copy()
env['DEBIAN_FRONTEND'] = 'noninteractive' env['DEBIAN_FRONTEND'] = 'noninteractive'
process = subprocess.run( process = subprocess.run(command, stdin=subprocess.DEVNULL,
command, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL, close_fds=False,
close_fds=False, env=env) env=env)
sys.exit(process.returncode) sys.exit(process.returncode)
@ -99,7 +99,9 @@ def _assert_managed_packages(module, packages):
def subcommand_is_package_manager_busy(_): def subcommand_is_package_manager_busy(_):
"""Return whether package manager is busy.""" """Return whether package manager is busy.
This command uses the `lsof` command to check whether the dpkg lock file
is open which indicates that the package manager is busy"""
try: try:
subprocess.check_output(['lsof', LOCK_FILE]) subprocess.check_output(['lsof', LOCK_FILE])
except subprocess.CalledProcessError: except subprocess.CalledProcessError:

View File

@ -14,7 +14,6 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
"""Run specified actions. """Run specified actions.
Actions run commands with this contract (version 1.1): Actions run commands with this contract (version 1.1):
@ -109,7 +108,8 @@ def run(action, options=None, input=None, async=False):
return _run(action, options, input, async, False) return _run(action, options, input, async, False)
def superuser_run(action, options=None, input=None, async=False, log_error=True): def superuser_run(action, options=None, input=None, async=False,
log_error=True):
"""Safely run a specific action as root. """Safely run a specific action as root.
See actions._run for more information. See actions._run for more information.
@ -175,20 +175,16 @@ def _run(action, options=None, input=None, async=False, run_as_root=False,
# Contract 3C: don't interpret shell escape sequences. # Contract 3C: don't interpret shell escape sequences.
# Contract 5 (and 6-ish). # Contract 5 (and 6-ish).
proc = subprocess.Popen( proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
cmd, stderr=subprocess.PIPE, shell=False)
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False)
if not async: if not async:
output, error = proc.communicate(input=input) output, error = proc.communicate(input=input)
output, error = output.decode(), error.decode() output, error = output.decode(), error.decode()
if proc.returncode != 0: if proc.returncode != 0:
if log_error: if log_error:
LOGGER.error('Error executing command - %s, %s, %s', cmd, output, LOGGER.error('Error executing command - %s, %s, %s', cmd,
error) output, error)
raise ActionError(action, output, error) raise ActionError(action, output, error)
return output return output

View File

@ -14,36 +14,39 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
""" """
Test module for actions utilities that modify configuration. Test module for actions utilities that modify configuration.
""" """
import apt_pkg
import os import os
import shutil import shutil
import tempfile import tempfile
import unittest import unittest
from plinth.errors import ActionError import apt_pkg
from plinth.actions import superuser_run, run
from plinth import cfg from plinth import cfg
from plinth.actions import run, superuser_run
from plinth.errors import ActionError
class TestPrivileged(unittest.TestCase): class TestActions(unittest.TestCase):
"""Verify that privileged actions perform as expected. """Verify that privileged actions perform as expected.
See actions.py for a full description of the expectations. See actions.py for a full description of the expectations.
Symlink to ``echo`` and ``id`` during testing. Symlink to ``echo`` and ``id`` during testing.
""" """
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
"""Initial setup for all the classes.""" """Initial setup for all the classes."""
cls.action_directory = tempfile.TemporaryDirectory() cls.action_directory = tempfile.TemporaryDirectory()
cfg.actions_dir = cls.action_directory.name cfg.actions_dir = cls.action_directory.name
shutil.copy(os.path.join(os.path.dirname(__file__), '..', '..', 'actions', 'packages'), cfg.actions_dir) shutil.copy(
os.path.join(
os.path.dirname(__file__), '..', '..', 'actions', 'packages'),
cfg.actions_dir)
shutil.copy('/bin/echo', cfg.actions_dir) shutil.copy('/bin/echo', cfg.actions_dir)
shutil.copy('/usr/bin/id', cfg.actions_dir) shutil.copy('/usr/bin/id', cfg.actions_dir)
@ -86,8 +89,7 @@ class TestPrivileged(unittest.TestCase):
If multiple actions are specified, bail out. If multiple actions are specified, bail out.
""" """
# Counting is safer than actual badness. # Counting is safer than actual badness.
actions = ('echo ""; echo $((1+1))', actions = ('echo ""; echo $((1+1))', 'echo "" && echo $((1+1))',
'echo "" && echo $((1+1))',
'echo "" || echo $((1+1))') 'echo "" || echo $((1+1))')
options = ('good', '') options = ('good', '')
@ -101,12 +103,8 @@ class TestPrivileged(unittest.TestCase):
Verify that shell control characters aren't interpreted. Verify that shell control characters aren't interpreted.
""" """
options = ('; echo hello', options = ('; echo hello', '&& echo hello', '|| echo hello',
'&& echo hello', '& echo hello', r'\; echo hello', '| echo hello',
'|| echo hello',
'& echo hello',
r'\; echo hello',
'| echo hello',
r':;!&\/$%@`"~#*(){}[]|+=') r':;!&\/$%@`"~#*(){}[]|+=')
for option in options: for option in options:
output = run('echo', [option]) output = run('echo', [option])
@ -119,11 +117,13 @@ class TestPrivileged(unittest.TestCase):
Verify that shell control characters aren't interpreted in Verify that shell control characters aren't interpreted in
option lists. option lists.
""" """
option_lists = ((';', 'echo', 'hello'), option_lists = (
(';', 'echo', 'hello'),
('&&', 'echo', 'hello'), ('&&', 'echo', 'hello'),
('||', 'echo', 'hello'), ('||', 'echo', 'hello'),
('&', 'echo', 'hello'), ('&', 'echo', 'hello'),
(r'\;', 'echo' 'hello'), (r'\;', 'echo'
'hello'),
('|', 'echo', 'hello'), ('|', 'echo', 'hello'),
('', 'echo', '', 'hello'), # Empty option argument ('', 'echo', '', 'hello'), # Empty option argument
tuple(r':;!&\/$%@`"~#*(){}[]|+=')) tuple(r':;!&\/$%@`"~#*(){}[]|+='))
@ -148,14 +148,18 @@ class TestPrivileged(unittest.TestCase):
output = output.rstrip('\n') output = output.rstrip('\n')
self.assertEqual(options, output) self.assertEqual(options, output)
def test_error_handling_for_superuser(self): def test_is_package_manager_busy(self):
"""Test that errors are raised only when expected.""" """Test the behavior of `is-package-manager-busy` in both locked and
unlocked states of the dpkg lock file."""
apt_pkg.init() # initialize apt_pkg module
# In the locked state, the lsof command returns 0.
# Hence no error is thrown.
with apt_pkg.SystemLock(): with apt_pkg.SystemLock():
with self.assertRaises(ActionError): superuser_run('packages', ['is-package-manager-busy'])
superuser_run('packages', ['is-package-manager-busy'],
log_error=True) # In the unlocked state, the lsof command returns 1.
with self.assertRaises(ActionError): # An ActionError is raised in this case.
superuser_run('packages', ['is-package-manager-busy'],
log_error=False)
with self.assertRaises(ActionError): with self.assertRaises(ActionError):
superuser_run('packages', ['is-package-manager-busy']) superuser_run('packages', ['is-package-manager-busy'])