diff --git a/plinth/action_utils.py b/plinth/action_utils.py index 3e483c14d..d38a393ce 100644 --- a/plinth/action_utils.py +++ b/plinth/action_utils.py @@ -356,31 +356,6 @@ def diagnose_url_on_all(url, **kwargs): return results -def diagnose_netcat(host, port, input='', negate=False): - """Run a diagnostic using netcat.""" - try: - process = subprocess.Popen(['nc', host, str(port)], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - process.communicate(input=input.encode()) - if process.returncode != 0: - result = 'failed' - else: - result = 'passed' - - if negate: - result = 'failed' if result == 'passed' else 'passed' - except Exception: - result = 'failed' - - test = _('Connect to {host}:{port}') - if negate: - test = _('Cannot connect to {host}:{port}') - - return [test.format(host=host, port=port), result] - - def get_addresses(): """Return a list of IP addresses and hostnames.""" addresses = get_ip_addresses() diff --git a/plinth/daemon.py b/plinth/daemon.py index c867e8171..d48166460 100644 --- a/plinth/daemon.py +++ b/plinth/daemon.py @@ -19,6 +19,7 @@ Component for managing a background daemon or any systemd unit. """ import socket +import subprocess import psutil from django.utils.translation import ugettext as _ @@ -158,3 +159,28 @@ def _check_port(port, kind='tcp', listen_address=None): return True return False + + +def diagnose_netcat(host, port, input='', negate=False): + """Run a diagnostic using netcat.""" + try: + process = subprocess.Popen(['nc', host, str(port)], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + process.communicate(input=input.encode()) + if process.returncode != 0: + result = 'failed' + else: + result = 'passed' + + if negate: + result = 'failed' if result == 'passed' else 'passed' + except Exception: + result = 'failed' + + test = _('Connect to {host}:{port}') + if negate: + test = _('Cannot connect to {host}:{port}') + + return [test.format(host=host, port=port), result] diff --git a/plinth/modules/tor/__init__.py b/plinth/modules/tor/__init__.py index f6ccefb73..bd97e8d2b 100644 --- a/plinth/modules/tor/__init__.py +++ b/plinth/modules/tor/__init__.py @@ -25,7 +25,7 @@ from django.utils.translation import ugettext_lazy as _ from plinth import action_utils, actions from plinth import app as app_module from plinth import menu -from plinth.daemon import Daemon, diagnose_port_listening +from plinth.daemon import Daemon, diagnose_netcat, diagnose_port_listening from plinth.modules.firewall.components import Firewall from plinth.modules.names.components import DomainType from plinth.signals import domain_added, domain_removed @@ -212,8 +212,8 @@ def _diagnose_control_port(): negate = False results.append( - action_utils.diagnose_netcat(address['address'], 9051, - input='QUIT\n', negate=negate)) + diagnose_netcat(address['address'], 9051, input='QUIT\n', + negate=negate)) return results diff --git a/plinth/tests/test_daemon.py b/plinth/tests/test_daemon.py index 68c39999b..6a969fa6f 100644 --- a/plinth/tests/test_daemon.py +++ b/plinth/tests/test_daemon.py @@ -24,7 +24,8 @@ from unittest.mock import Mock, call, patch import pytest from plinth.app import App, FollowerComponent -from plinth.daemon import Daemon, app_is_running, diagnose_port_listening +from plinth.daemon import (Daemon, app_is_running, diagnose_netcat, + diagnose_port_listening) @pytest.fixture(name='daemon') @@ -210,3 +211,25 @@ def test_diagnose_port_listening(connections): assert diagnose_port_listening(3456, 'udp4')[1] == 'passed' assert diagnose_port_listening(6789, 'udp4')[1] == 'passed' assert diagnose_port_listening(5678, 'udp4')[1] == 'failed' + + +@patch('subprocess.Popen') +def test_diagnose_netcat(popen): + """Test running diagnostic test using netcat.""" + popen().returncode = 0 + result = diagnose_netcat('test-host', 3300, input='test-input') + assert result == ['Connect to test-host:3300', 'passed'] + assert popen.mock_calls[1][1] == (['nc', 'test-host', '3300'], ) + assert popen.mock_calls[2] == call().communicate(input=b'test-input') + + result = diagnose_netcat('test-host', 3300, input='test-input', + negate=True) + assert result == ['Cannot connect to test-host:3300', 'failed'] + + popen().returncode = 1 + result = diagnose_netcat('test-host', 3300, input='test-input') + assert result == ['Connect to test-host:3300', 'failed'] + + result = diagnose_netcat('test-host', 3300, input='test-input', + negate=True) + assert result == ['Cannot connect to test-host:3300', 'passed']