diagnostics: Action helpers for modules

- Helper for checking whether a port is being listened on.

- Helper for checking whether a URL is accessible.

- Helper for checking whether a URL is accessible with all possible IPs
  and machine names.

- Helper for checking whether it is possible to connect to a port.
This commit is contained in:
Sunil Mohan Adapa 2015-08-21 19:44:48 +05:30 committed by James Valleroy
parent 991f848c23
commit 49c4c1dce6

View File

@ -19,6 +19,9 @@
Python action utility functions.
"""
from gettext import gettext as _
import psutil
import socket
import subprocess
@ -173,3 +176,168 @@ class WebserverChange(object):
"""
restart_required = webserver_disable(name, kind, apply_changes=False)
self.restart_required = self.restart_required or restart_required
def diagnose_port_listening(port, kind='tcp', listen_address=None):
"""Run a diagnostic on whether a port is being listened on.
Kind must be one of inet, inet4, inet6, tcp, tcp4, tcp6, udp,
udp4, udp6, unix, all. See psutil.net_connection() for more
information.
"""
result = _check_port(port, kind, listen_address)
if listen_address:
test = _('Listening on {kind} port {listen_address}:{port}') \
.format(kind=kind, listen_address=listen_address, port=port)
else:
test = _('Listening on {kind} port {port}') \
.format(kind=kind, port=port)
return [test, 'passed' if result else 'failed']
def _check_port(port, kind='tcp', listen_address=None):
"""Return whether a port is being listened on."""
run_kind = kind
if kind == 'tcp4':
run_kind = 'tcp'
if kind == 'udp4':
run_kind = 'udp'
for connection in psutil.net_connections(run_kind):
# TCP connections must have status='listen'
if kind in ('tcp', 'tcp4', 'tcp6') and \
connection.status != psutil.CONN_LISTEN:
continue
# UDP connections must have empty remote address
if kind in ('udp', 'udp4', 'udp6') and \
connection.raddr != ():
continue
# Port should match
if connection.laddr[1] != port:
continue
# Listen address if requested should match
if listen_address and connection.laddr[0] != listen_address:
continue
# Special additional checks only for IPv4
if kind != 'tcp4' and kind != 'udp4':
return True
# Found socket is IPv4
if connection.family == socket.AF_INET:
return True
# Full IPv6 address range includes mapped IPv4 address also
if connection.laddr[0] == '::':
return True
return False
def diagnose_url(url, kind=None, env=None, extra_options=None, wrapper=None,
expected_output=None):
"""Run a diagnostic on whether a URL is accessible.
Kind can be '4' for IPv4 or '6' for IPv6.
"""
command = ['wget', '-q', '-O', '-', url]
if wrapper:
command.insert(0, wrapper)
if extra_options:
command.extend(extra_options)
if kind:
command.append({'4': '-4', '6': '-6'}[kind])
try:
output = subprocess.check_output(command, env=env)
result = 'passed'
if expected_output and expected_output not in output.decode():
result = 'failed'
except subprocess.CalledProcessError as exception:
result = 'failed'
# Authorization failed is a success
if exception.returncode == 6:
result = 'passed'
if kind:
return [_('Access URL {url} on tcp{kind}')
.format(url=url, kind=kind), result]
else:
return [_('Access URL {url}').format(url=url), result]
def diagnose_url_on_all(url, extra_options=None):
"""Run a diagnostic on whether a URL is accessible."""
results = []
for address in get_addresses():
if address['kind'] == '6' and ':' in address['address']:
address['address'] = '[{0}]'.format(address['address'])
current_url = url.format(host=address['address'])
results.append(diagnose_url(current_url, kind=address['kind'],
extra_options=extra_options))
return results
def diagnose_netcat(host, port, input='', negate=False):
"""Run a diagnostic using netcat."""
try:
process = subprocess.Popen(
['nc', host, str(port)], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.communicate(input=input.encode())
if process.returncode != 0:
result = 'failed'
else:
result = 'passed'
except Exception:
result = 'failed'
test = _('Connect to {host}:{port}')
if negate:
result = 'failed' if result == 'passed' else 'passed'
test = _('Cannot connect to {host}:{port}')
return [test.format(host=host, port=port), result]
def get_addresses():
"""Return a list of IP addreses and hostnames."""
addresses = get_ip_addresses()
hostname = get_hostname()
addresses.append({'kind': '4', 'address': 'localhost'})
addresses.append({'kind': '6', 'address': 'localhost'})
addresses.append({'kind': '4', 'address': hostname})
addresses.append({'kind': '6', 'address': hostname})
return addresses
def get_ip_addresses():
"""Return a list of IP addresses assigned to the system."""
addresses = []
output = subprocess.check_output(['ip', '-o', 'addr'])
for line in output.decode().splitlines():
parts = line.split()
addresses.append({'kind': '4' if parts[2] == 'inet' else '6',
'address': parts[3].split('/')[0]})
return addresses
def get_hostname():
"""Return the current hostname."""
return subprocess.check_output(['hostname']).decode().strip()