From 49c4c1dce6d943a9840cc151714e688e4df98e20 Mon Sep 17 00:00:00 2001 From: Sunil Mohan Adapa Date: Fri, 21 Aug 2015 19:44:48 +0530 Subject: [PATCH] diagnostics: Action helpers for modules - Helper for checking whether a port is being listened on. - Helper for checking whether a URL is accessible. - Helper for checking whether a URL is accessible with all possible IPs and machine names. - Helper for checking whether it is possible to connect to a port. --- plinth/action_utils.py | 168 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/plinth/action_utils.py b/plinth/action_utils.py index afff6c85b..78108dbcf 100644 --- a/plinth/action_utils.py +++ b/plinth/action_utils.py @@ -19,6 +19,9 @@ Python action utility functions. """ +from gettext import gettext as _ +import psutil +import socket import subprocess @@ -173,3 +176,168 @@ class WebserverChange(object): """ restart_required = webserver_disable(name, kind, apply_changes=False) self.restart_required = self.restart_required or restart_required + + +def diagnose_port_listening(port, kind='tcp', listen_address=None): + """Run a diagnostic on whether a port is being listened on. + + Kind must be one of inet, inet4, inet6, tcp, tcp4, tcp6, udp, + udp4, udp6, unix, all. See psutil.net_connection() for more + information. + """ + result = _check_port(port, kind, listen_address) + + if listen_address: + test = _('Listening on {kind} port {listen_address}:{port}') \ + .format(kind=kind, listen_address=listen_address, port=port) + else: + test = _('Listening on {kind} port {port}') \ + .format(kind=kind, port=port) + + return [test, 'passed' if result else 'failed'] + + +def _check_port(port, kind='tcp', listen_address=None): + """Return whether a port is being listened on.""" + run_kind = kind + + if kind == 'tcp4': + run_kind = 'tcp' + + if kind == 'udp4': + run_kind = 'udp' + + for connection in psutil.net_connections(run_kind): + # TCP connections must have status='listen' + if kind in ('tcp', 'tcp4', 'tcp6') and \ + connection.status != psutil.CONN_LISTEN: + continue + + # UDP connections must have empty remote address + if kind in ('udp', 'udp4', 'udp6') and \ + connection.raddr != (): + continue + + # Port should match + if connection.laddr[1] != port: + continue + + # Listen address if requested should match + if listen_address and connection.laddr[0] != listen_address: + continue + + # Special additional checks only for IPv4 + if kind != 'tcp4' and kind != 'udp4': + return True + + # Found socket is IPv4 + if connection.family == socket.AF_INET: + return True + + # Full IPv6 address range includes mapped IPv4 address also + if connection.laddr[0] == '::': + return True + + return False + + +def diagnose_url(url, kind=None, env=None, extra_options=None, wrapper=None, + expected_output=None): + """Run a diagnostic on whether a URL is accessible. + + Kind can be '4' for IPv4 or '6' for IPv6. + """ + command = ['wget', '-q', '-O', '-', url] + + if wrapper: + command.insert(0, wrapper) + + if extra_options: + command.extend(extra_options) + + if kind: + command.append({'4': '-4', '6': '-6'}[kind]) + + try: + output = subprocess.check_output(command, env=env) + result = 'passed' + if expected_output and expected_output not in output.decode(): + result = 'failed' + except subprocess.CalledProcessError as exception: + result = 'failed' + # Authorization failed is a success + if exception.returncode == 6: + result = 'passed' + + if kind: + return [_('Access URL {url} on tcp{kind}') + .format(url=url, kind=kind), result] + else: + return [_('Access URL {url}').format(url=url), result] + + +def diagnose_url_on_all(url, extra_options=None): + """Run a diagnostic on whether a URL is accessible.""" + results = [] + for address in get_addresses(): + if address['kind'] == '6' and ':' in address['address']: + address['address'] = '[{0}]'.format(address['address']) + + current_url = url.format(host=address['address']) + results.append(diagnose_url(current_url, kind=address['kind'], + extra_options=extra_options)) + + return results + + +def diagnose_netcat(host, port, input='', negate=False): + """Run a diagnostic using netcat.""" + try: + process = subprocess.Popen( + ['nc', host, str(port)], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process.communicate(input=input.encode()) + if process.returncode != 0: + result = 'failed' + else: + result = 'passed' + except Exception: + result = 'failed' + + test = _('Connect to {host}:{port}') + if negate: + result = 'failed' if result == 'passed' else 'passed' + test = _('Cannot connect to {host}:{port}') + + return [test.format(host=host, port=port), result] + + +def get_addresses(): + """Return a list of IP addreses and hostnames.""" + addresses = get_ip_addresses() + + hostname = get_hostname() + addresses.append({'kind': '4', 'address': 'localhost'}) + addresses.append({'kind': '6', 'address': 'localhost'}) + addresses.append({'kind': '4', 'address': hostname}) + addresses.append({'kind': '6', 'address': hostname}) + + return addresses + + +def get_ip_addresses(): + """Return a list of IP addresses assigned to the system.""" + addresses = [] + + output = subprocess.check_output(['ip', '-o', 'addr']) + for line in output.decode().splitlines(): + parts = line.split() + addresses.append({'kind': '4' if parts[2] == 'inet' else '6', + 'address': parts[3].split('/')[0]}) + + return addresses + + +def get_hostname(): + """Return the current hostname.""" + return subprocess.check_output(['hostname']).decode().strip()