daemon: Move diagnosing port listening into daemon module

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2019-12-17 14:39:03 -08:00 committed by James Valleroy
parent a1fd8f45be
commit 97d8166f31
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
6 changed files with 158 additions and 98 deletions

View File

@ -22,11 +22,9 @@ import logging
import os
import re
import shutil
import socket
import subprocess
import tempfile
import psutil
from django.utils.translation import ugettext as _
logger = logging.getLogger(__name__)
@ -285,69 +283,6 @@ def uwsgi_disable(config_name):
service_start('uwsgi')
def diagnose_port_listening(port, kind='tcp', listen_address=None):
"""Run a diagnostic on whether a port is being listened on.
Kind must be one of inet, inet4, inet6, tcp, tcp4, tcp6, udp,
udp4, udp6, unix, all. See psutil.net_connection() for more
information.
"""
result = _check_port(port, kind, listen_address)
if listen_address:
test = _('Listening on {kind} port {listen_address}:{port}') \
.format(kind=kind, listen_address=listen_address, port=port)
else:
test = _('Listening on {kind} port {port}') \
.format(kind=kind, port=port)
return [test, 'passed' if result else 'failed']
def _check_port(port, kind='tcp', listen_address=None):
"""Return whether a port is being listened on."""
run_kind = kind
if kind == 'tcp4':
run_kind = 'tcp'
if kind == 'udp4':
run_kind = 'udp'
for connection in psutil.net_connections(run_kind):
# TCP connections must have status='listen'
if kind in ('tcp', 'tcp4', 'tcp6') and \
connection.status != psutil.CONN_LISTEN:
continue
# UDP connections must have empty remote address
if kind in ('udp', 'udp4', 'udp6') and \
connection.raddr != ():
continue
# Port should match
if connection.laddr[1] != port:
continue
# Listen address if requested should match
if listen_address and connection.laddr[0] != listen_address:
continue
# Special additional checks only for IPv4
if kind != 'tcp4' and kind != 'udp4':
return True
# Found socket is IPv4
if connection.family == socket.AF_INET:
return True
# Full IPv6 address range includes mapped IPv4 address also
if connection.laddr[0] == '::':
return True
return False
def check_url(url, kind=None, env=None, check_certificate=True,
extra_options=None, wrapper=None, expected_output=None):
"""Check whether a URL is accessible."""

View File

@ -18,6 +18,11 @@
Component for managing a background daemon or any systemd unit.
"""
import socket
import psutil
from django.utils.translation import ugettext as _
from plinth import action_utils, actions, app
@ -70,8 +75,7 @@ class Daemon(app.LeaderComponent):
results = []
results.append(self._diagnose_unit_is_running())
for port in self.listen_ports:
results.append(
action_utils.diagnose_port_listening(port[0], port[1]))
results.append(diagnose_port_listening(port[0], port[1]))
return results
@ -90,3 +94,67 @@ def app_is_running(app_):
return False
return True
def diagnose_port_listening(port, kind='tcp', listen_address=None):
"""Run a diagnostic on whether a port is being listened on.
Kind must be one of inet, inet4, inet6, tcp, tcp4, tcp6, udp,
udp4, udp6, unix, all. See psutil.net_connection() for more
information.
"""
result = _check_port(port, kind, listen_address)
if listen_address:
test = _('Listening on {kind} port {listen_address}:{port}') \
.format(kind=kind, listen_address=listen_address, port=port)
else:
test = _('Listening on {kind} port {port}') \
.format(kind=kind, port=port)
return [test, 'passed' if result else 'failed']
def _check_port(port, kind='tcp', listen_address=None):
"""Return whether a port is being listened on."""
run_kind = kind
if kind == 'tcp4':
run_kind = 'tcp'
if kind == 'udp4':
run_kind = 'udp'
for connection in psutil.net_connections(run_kind):
# TCP connections must have status='listen'
if kind in ('tcp', 'tcp4', 'tcp6') and \
connection.status != psutil.CONN_LISTEN:
continue
# UDP connections must have empty remote address
if kind in ('udp', 'udp4', 'udp6') and \
connection.raddr != ():
continue
# Port should match
if connection.laddr[1] != port:
continue
# Listen address if requested should match
if listen_address and connection.laddr[0] != listen_address:
continue
# Special additional checks only for IPv4
if kind not in ('tcp4', 'udp4'):
return True
# Found socket is IPv4
if connection.family == socket.AF_INET:
return True
# Full IPv6 address range includes mapped IPv4 address also
if connection.laddr[0] == '::':
return True
return False

View File

@ -22,7 +22,7 @@ from django.utils.translation import ugettext_lazy as _
from plinth import action_utils
from plinth import app as app_module
from plinth import menu
from plinth import daemon, menu
from .manifest import backup # noqa, pylint: disable=unused-import
@ -58,7 +58,7 @@ class DiagnosticsApp(app_module.App):
def diagnose(self):
"""Run diagnostics and return the results."""
results = super().diagnose()
results.append(action_utils.diagnose_port_listening(8000, 'tcp4'))
results.append(daemon.diagnose_port_listening(8000, 'tcp4'))
results.extend(
action_utils.diagnose_url_on_all('http://{host}/plinth/',
check_certificate=False))

View File

@ -23,9 +23,9 @@ from logging import Logger
from django.utils.translation import ugettext_lazy as _
from plinth import action_utils, actions
from plinth import actions
from plinth import app as app_module
from plinth import menu, network
from plinth import daemon, menu, network
version = 1
@ -69,10 +69,8 @@ class NetworksApp(app_module.App):
addresses = _get_interface_addresses(interfaces)
for address in addresses:
results.append(
action_utils.diagnose_port_listening(53, 'tcp', address))
results.append(
action_utils.diagnose_port_listening(53, 'udp', address))
results.append(daemon.diagnose_port_listening(53, 'tcp', address))
results.append(daemon.diagnose_port_listening(53, 'udp', address))
results.append(_diagnose_dnssec('4'))
results.append(_diagnose_dnssec('6'))

View File

@ -25,7 +25,7 @@ from django.utils.translation import ugettext_lazy as _
from plinth import action_utils, actions
from plinth import app as app_module
from plinth import menu
from plinth.daemon import Daemon
from plinth.daemon import Daemon, diagnose_port_listening
from plinth.modules.firewall.components import Firewall
from plinth.modules.names.components import DomainType
from plinth.signals import domain_added, domain_removed
@ -113,35 +113,29 @@ class TorApp(app_module.App):
])
if 'orport' in ports:
results.append(
action_utils.diagnose_port_listening(int(ports['orport']),
'tcp4'))
diagnose_port_listening(int(ports['orport']), 'tcp4'))
results.append(
action_utils.diagnose_port_listening(int(ports['orport']),
'tcp6'))
diagnose_port_listening(int(ports['orport']), 'tcp6'))
results.append([
_('Obfs3 transport registered'),
'passed' if 'obfs3' in ports else 'failed'
])
if 'obfs3' in ports:
results.append(
action_utils.diagnose_port_listening(int(ports['obfs3']),
'tcp4'))
results.append(
action_utils.diagnose_port_listening(int(ports['obfs3']),
'tcp6'))
results.append(diagnose_port_listening(int(ports['obfs3']),
'tcp4'))
results.append(diagnose_port_listening(int(ports['obfs3']),
'tcp6'))
results.append([
_('Obfs4 transport registered'),
'passed' if 'obfs4' in ports else 'failed'
])
if 'obfs4' in ports:
results.append(
action_utils.diagnose_port_listening(int(ports['obfs4']),
'tcp4'))
results.append(
action_utils.diagnose_port_listening(int(ports['obfs4']),
'tcp6'))
results.append(diagnose_port_listening(int(ports['obfs4']),
'tcp4'))
results.append(diagnose_port_listening(int(ports['obfs4']),
'tcp6'))
results.append(_diagnose_url_via_tor('http://www.debian.org', '4'))
results.append(_diagnose_url_via_tor('http://www.debian.org', '6'))

View File

@ -18,12 +18,13 @@
Test module for component managing system daemons and other systemd units.
"""
import socket
from unittest.mock import Mock, call, patch
import pytest
from plinth.app import App, FollowerComponent
from plinth.daemon import Daemon, app_is_running
from plinth.daemon import Daemon, app_is_running, diagnose_port_listening
@pytest.fixture(name='daemon')
@ -93,22 +94,21 @@ def test_is_running(service_is_running, daemon):
@patch('plinth.action_utils.service_is_running')
@patch('plinth.action_utils.diagnose_port_listening')
def test_diagnose(diagnose_port_listening, service_is_running, daemon):
@patch('plinth.daemon.diagnose_port_listening')
def test_diagnose(port_listening, service_is_running, daemon):
"""Test running diagnostics."""
def side_effect(port, kind):
return [f'test-result-{port}-{kind}', 'passed']
daemon = Daemon('test-daemon', 'test-unit', listen_ports=[(8273, 'tcp4'),
(345, 'udp')])
diagnose_port_listening.side_effect = side_effect
port_listening.side_effect = side_effect
service_is_running.return_value = True
results = daemon.diagnose()
assert results == [['Service test-unit is running', 'passed'],
['test-result-8273-tcp4', 'passed'],
['test-result-345-udp', 'passed']]
diagnose_port_listening.assert_has_calls(
[call(8273, 'tcp4'), call(345, 'udp')])
port_listening.assert_has_calls([call(8273, 'tcp4'), call(345, 'udp')])
service_is_running.assert_has_calls([call('test-unit')])
service_is_running.return_value = False
@ -145,3 +145,68 @@ def test_app_is_running(service_is_running):
service_is_running.return_value = True
daemon2.is_running.return_value = True
assert app_is_running(app)
@patch('psutil.net_connections')
def test_diagnose_port_listening(connections):
"""Test running port listening diagnostics test."""
connections.return_value = [
Mock(status='LISTEN', laddr=('0.0.0.0', 1234), family=socket.AF_INET),
Mock(status='ESTABLISHED', laddr=('0.0.0.0', 2345),
family=socket.AF_INET),
Mock(raddr=(), laddr=('0.0.0.0', 3456), family=socket.AF_INET),
Mock(raddr=('1.1.1.1', 53), laddr=('0.0.0.0', 4567),
family=socket.AF_INET),
Mock(status='LISTEN', laddr=('::1', 5678), familiy=socket.AF_INET6),
Mock(status='LISTEN', laddr=('::', 6789), familiy=socket.AF_INET6),
Mock(raddr=(), laddr=('::1', 5678), familiy=socket.AF_INET6),
Mock(raddr=(), laddr=('::', 6789), familiy=socket.AF_INET6),
]
# Check that message is correct
results = diagnose_port_listening(1234)
assert results == ['Listening on tcp port 1234', 'passed']
results = diagnose_port_listening(1234, 'tcp', '0.0.0.0')
assert results == ['Listening on tcp port 0.0.0.0:1234', 'passed']
# Failed results
results = diagnose_port_listening(4321)
assert results == ['Listening on tcp port 4321', 'failed']
results = diagnose_port_listening(4321, 'tcp', '0.0.0.0')
assert results == ['Listening on tcp port 0.0.0.0:4321', 'failed']
# Check if psutil call is being made with right argument
results = diagnose_port_listening(1234, 'tcp')
connections.assert_called_with('tcp')
results = diagnose_port_listening(1234, 'tcp4')
connections.assert_called_with('tcp')
results = diagnose_port_listening(1234, 'tcp6')
connections.assert_called_with('tcp6')
results = diagnose_port_listening(3456, 'udp')
connections.assert_called_with('udp')
results = diagnose_port_listening(3456, 'udp4')
connections.assert_called_with('udp')
results = diagnose_port_listening(3456, 'udp6')
connections.assert_called_with('udp6')
# TCP
assert diagnose_port_listening(1234)[1] == 'passed'
assert diagnose_port_listening(1000)[1] == 'failed'
assert diagnose_port_listening(2345)[1] == 'failed'
assert diagnose_port_listening(1234, 'tcp', '0.0.0.0')[1] == 'passed'
assert diagnose_port_listening(1234, 'tcp', '1.1.1.1')[1] == 'failed'
assert diagnose_port_listening(1234, 'tcp6')[1] == 'passed'
assert diagnose_port_listening(1234, 'tcp4')[1] == 'passed'
assert diagnose_port_listening(6789, 'tcp4')[1] == 'passed'
assert diagnose_port_listening(5678, 'tcp4')[1] == 'failed'
# UDP
assert diagnose_port_listening(3456, 'udp')[1] == 'passed'
assert diagnose_port_listening(3000, 'udp')[1] == 'failed'
assert diagnose_port_listening(4567, 'udp')[1] == 'failed'
assert diagnose_port_listening(3456, 'udp', '0.0.0.0')[1] == 'passed'
assert diagnose_port_listening(3456, 'udp', '1.1.1.1')[1] == 'failed'
assert diagnose_port_listening(3456, 'udp6')[1] == 'passed'
assert diagnose_port_listening(3456, 'udp4')[1] == 'passed'
assert diagnose_port_listening(6789, 'udp4')[1] == 'passed'
assert diagnose_port_listening(5678, 'udp4')[1] == 'failed'