apache: Move diagnostics for checking URLs into apache module

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2019-12-17 15:14:41 -08:00 committed by James Valleroy
parent c617cecc02
commit e0dba2cc17
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
10 changed files with 217 additions and 113 deletions

View File

@ -26,7 +26,7 @@ import re
import subprocess
import sys
from plinth import action_utils
from plinth.modules.apache.components import check_url
AUTO_CONF_FILE = '/etc/apt/apt.conf.d/20auto-upgrades'
LOG_FILE = '/var/log/unattended-upgrades/unattended-upgrades.log'
@ -129,8 +129,7 @@ def _is_release_file_available(protocol):
if protocol == 'tor+http':
wrapper = 'torsocks'
result = action_utils.check_url(BUSTER_BACKPORTS_RELEASE_FILE_URL,
wrapper=wrapper)
result = check_url(BUSTER_BACKPORTS_RELEASE_FILE_URL, wrapper=wrapper)
return result == 'passed'

View File

@ -20,13 +20,10 @@ Python action utility functions.
import logging
import os
import re
import shutil
import subprocess
import tempfile
from django.utils.translation import ugettext as _
logger = logging.getLogger(__name__)
UWSGI_ENABLED_PATH = '/etc/uwsgi/apps-enabled/{config_name}.ini'
@ -283,79 +280,6 @@ def uwsgi_disable(config_name):
service_start('uwsgi')
def check_url(url, kind=None, env=None, check_certificate=True,
extra_options=None, wrapper=None, expected_output=None):
"""Check whether a URL is accessible."""
command = ['curl', '--location', '-f', '-w', '%{response_code}']
if kind == '6':
# extract zone index
match = re.match(r'(.*://)\[(.*)%(?P<zone>.*)\](.*)', url)
if match:
command = command + ['--interface', match.group('zone')]
url = '{0}[{1}]{2}'.format(*match.group(1, 2, 4))
command.append(url)
if wrapper:
command.insert(0, wrapper)
if not check_certificate:
command.append('-k')
if extra_options:
command.extend(extra_options)
if kind:
command.append({'4': '-4', '6': '-6'}[kind])
try:
process = subprocess.run(command, env=env, check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result = 'passed'
if expected_output and expected_output not in process.stdout.decode():
result = 'failed'
except subprocess.CalledProcessError as exception:
result = 'failed'
# Authorization failed is a success
if exception.stdout.decode().strip() in ('401', '405'):
result = 'passed'
except FileNotFoundError:
result = 'error'
return result
def diagnose_url(url, kind=None, env=None, check_certificate=True,
extra_options=None, wrapper=None, expected_output=None):
"""Run a diagnostic on whether a URL is accessible.
Kind can be '4' for IPv4 or '6' for IPv6.
"""
result = check_url(url, kind, env, check_certificate, extra_options,
wrapper, expected_output)
if kind:
return [
_('Access URL {url} on tcp{kind}').format(url=url, kind=kind),
result
]
return [_('Access URL {url}').format(url=url), result]
def diagnose_url_on_all(url, **kwargs):
"""Run a diagnostic on whether a URL is accessible."""
results = []
for address in get_addresses():
current_url = url.format(host=address['url_address'])
results.append(
diagnose_url(current_url, kind=address['kind'], **kwargs))
return results
def get_addresses():
"""Return a list of IP addresses and hostnames."""
addresses = get_ip_addresses()

View File

@ -18,6 +18,11 @@
App component for other apps to use Apache configuration functionality.
"""
import re
import subprocess
from django.utils.translation import ugettext as _
from plinth import action_utils, actions, app
@ -72,11 +77,9 @@ class Webserver(app.LeaderComponent):
for url in self.urls:
if '{host}' in url:
results.extend(
action_utils.diagnose_url_on_all(url,
check_certificate=False))
diagnose_url_on_all(url, check_certificate=False))
else:
results.append(
action_utils.diagnose_url(url, check_certificate=False))
results.append(diagnose_url(url, check_certificate=False))
return results
@ -116,3 +119,76 @@ class Uwsgi(app.LeaderComponent):
"""Return whether the uWSGI daemon is running with configuration."""
return action_utils.uwsgi_is_enabled(self.uwsgi_name) \
and action_utils.service_is_running('uwsgi')
def diagnose_url(url, kind=None, env=None, check_certificate=True,
extra_options=None, wrapper=None, expected_output=None):
"""Run a diagnostic on whether a URL is accessible.
Kind can be '4' for IPv4 or '6' for IPv6.
"""
result = check_url(url, kind, env, check_certificate, extra_options,
wrapper, expected_output)
if kind:
return [
_('Access URL {url} on tcp{kind}').format(url=url, kind=kind),
result
]
return [_('Access URL {url}').format(url=url), result]
def diagnose_url_on_all(url, **kwargs):
"""Run a diagnostic on whether a URL is accessible."""
results = []
for address in action_utils.get_addresses():
current_url = url.format(host=address['url_address'])
results.append(
diagnose_url(current_url, kind=address['kind'], **kwargs))
return results
def check_url(url, kind=None, env=None, check_certificate=True,
extra_options=None, wrapper=None, expected_output=None):
"""Check whether a URL is accessible."""
command = ['curl', '--location', '-f', '-w', '%{response_code}']
if kind == '6':
# extract zone index
match = re.match(r'(.*://)\[(.*)%(?P<zone>.*)\](.*)', url)
if match:
command = command + ['--interface', match.group('zone')]
url = '{0}[{1}]{2}'.format(*match.group(1, 2, 4))
command.append(url)
if wrapper:
command.insert(0, wrapper)
if not check_certificate:
command.append('-k')
if extra_options:
command.extend(extra_options)
if kind:
command.append({'4': '-4', '6': '-6'}[kind])
try:
process = subprocess.run(command, env=env, check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result = 'passed'
if expected_output and expected_output not in process.stdout.decode():
result = 'failed'
except subprocess.CalledProcessError as exception:
result = 'failed'
# Authorization failed is a success
if exception.stdout.decode().strip() in ('401', '405'):
result = 'passed'
except FileNotFoundError:
result = 'error'
return result

View File

@ -18,11 +18,14 @@
Test module for webserver components.
"""
import subprocess
from unittest.mock import call, patch
import pytest
from plinth.modules.apache.components import Uwsgi, Webserver
from plinth.modules.apache.components import (Uwsgi, Webserver, check_url,
diagnose_url,
diagnose_url_on_all)
def test_webserver_init():
@ -80,8 +83,8 @@ def test_webserver_disable(superuser_run):
])
@patch('plinth.action_utils.diagnose_url')
@patch('plinth.action_utils.diagnose_url_on_all')
@patch('plinth.modules.apache.components.diagnose_url')
@patch('plinth.modules.apache.components.diagnose_url_on_all')
def test_webserver_diagnose(diagnose_url_on_all, diagnose_url):
"""Test running diagnostics."""
def on_all_side_effect(url, check_certificate):
@ -177,3 +180,104 @@ def test_uwsgi_is_running(uwsgi_is_enabled, service_is_running):
uwsgi_is_enabled.return_value = False
service_is_running.return_value = False
assert not uwsgi.is_running()
@patch('plinth.modules.apache.components.check_url')
@patch('plinth.action_utils.get_addresses')
def test_diagnose_url(get_addresses, check):
"""Test diagnosing a URL."""
args = {
'url': 'https://localhost/test',
'kind': '4',
'env': {
'test': 'value'
},
'check_certificate': False,
'extra_options': {
'test-1': 'value-1'
},
'wrapper': 'test-wrapper',
'expected_output': 'test-expected'
}
check.return_value = 'passed'
result = diagnose_url(**args)
assert result == ['Access URL https://localhost/test on tcp4', 'passed']
check.return_value = 'failed'
result = diagnose_url(**args)
assert result == ['Access URL https://localhost/test on tcp4', 'failed']
del args['kind']
args['url'] = 'https://{host}/test'
check.return_value = 'passed'
get_addresses.return_value = [{
'kind': '4',
'address': 'test-host-1',
'numeric': False,
'url_address': 'test-host-1'
}, {
'kind': '6',
'address': 'test-host-2',
'numeric': False,
'url_address': 'test-host-2'
}]
result = diagnose_url_on_all(**args)
assert result == [
['Access URL https://test-host-1/test on tcp4', 'passed'],
['Access URL https://test-host-2/test on tcp6', 'passed'],
]
@patch('subprocess.run')
def test_check_url(run):
"""Test checking whether a URL is accessible."""
url = 'http://localhost/test'
basic_command = ['curl', '--location', '-f', '-w', '%{response_code}']
extra_args = {'env': None, 'check': True, 'stdout': -1, 'stderr': -1}
# Basic
assert check_url(url) == 'passed'
run.assert_called_with(basic_command + [url], **extra_args)
# Wrapper
check_url(url, wrapper='test-wrapper')
run.assert_called_with(['test-wrapper'] + basic_command + [url],
**extra_args)
# No certificate check
check_url(url, check_certificate=False)
run.assert_called_with(basic_command + [url, '-k'], **extra_args)
# Extra options
check_url(url, extra_options=['test-opt1', 'test-opt2'])
run.assert_called_with(basic_command + [url, 'test-opt1', 'test-opt2'],
**extra_args)
# TCP4/TCP6
check_url(url, kind='4')
run.assert_called_with(basic_command + [url, '-4'], **extra_args)
check_url(url, kind='6')
run.assert_called_with(basic_command + [url, '-6'], **extra_args)
# IPv6 Link Local URLs
check_url('https://[::2%eth0]/test', kind='6')
run.assert_called_with(
basic_command + ['--interface', 'eth0', 'https://[::2]/test', '-6'],
**extra_args)
# Failure
exception = subprocess.CalledProcessError(returncode=1, cmd=['curl'])
run.side_effect = exception
run.side_effect.stdout = b'500'
assert check_url(url) == 'failed'
# Return code 401, 405
run.side_effect = exception
run.side_effect.stdout = b' 401 '
assert check_url(url) == 'passed'
run.side_effect.stdout = b'405\n'
assert check_url(url) == 'passed'
# Error
run.side_effect = FileNotFoundError()
assert check_url(url) == 'error'

View File

@ -20,9 +20,9 @@ FreedomBox app for system diagnostics.
from django.utils.translation import ugettext_lazy as _
from plinth import action_utils
from plinth import app as app_module
from plinth import daemon, menu
from plinth.modules.apache.components import diagnose_url_on_all
from .manifest import backup # noqa, pylint: disable=unused-import
@ -60,8 +60,8 @@ class DiagnosticsApp(app_module.App):
results = super().diagnose()
results.append(daemon.diagnose_port_listening(8000, 'tcp4'))
results.extend(
action_utils.diagnose_url_on_all('http://{host}/plinth/',
check_certificate=False))
diagnose_url_on_all('http://{host}/plinth/',
check_certificate=False))
return results

View File

@ -19,12 +19,12 @@ import os
import augeas
from django.utils.translation import ugettext_lazy as _
from plinth import action_utils, actions
from plinth import actions
from plinth import app as app_module
from plinth import frontpage, menu
from plinth.daemon import Daemon
from plinth.errors import DomainNotRegisteredError
from plinth.modules.apache.components import Webserver
from plinth.modules.apache.components import Webserver, diagnose_url
from plinth.modules.firewall.components import Firewall
from plinth.utils import format_lazy
@ -110,13 +110,13 @@ class DiasporaApp(app_module.App):
results = super().diagnose()
results.append(
action_utils.diagnose_url('http://diaspora.localhost', kind='4',
check_certificate=False))
diagnose_url('http://diaspora.localhost', kind='4',
check_certificate=False))
results.append(
action_utils.diagnose_url('http://diaspora.localhost', kind='6',
check_certificate=False))
diagnose_url('http://diaspora.localhost', kind='6',
check_certificate=False))
results.append(
action_utils.diagnose_url(
diagnose_url(
'http://diaspora.{}'.format(get_configured_domain_name()),
kind='4', check_certificate=False))

View File

@ -24,11 +24,12 @@ import pathlib
from django.utils.translation import ugettext_lazy as _
from plinth import action_utils, actions
from plinth import actions
from plinth import app as app_module
from plinth import cfg, menu
from plinth.errors import ActionError
from plinth.modules import names
from plinth.modules.apache.components import diagnose_url
from plinth.modules.names.components import DomainType
from plinth.signals import domain_added, domain_removed, post_module_loading
from plinth.utils import format_lazy
@ -91,8 +92,7 @@ class LetsEncryptApp(app_module.App):
for domain in names.components.DomainName.list():
if domain.domain_type.can_have_certificate:
results.append(
action_utils.diagnose_url('https://' + domain.name))
results.append(diagnose_url('https://' + domain.name))
return results

View File

@ -25,6 +25,7 @@ from plinth import action_utils, actions
from plinth import app as app_module
from plinth import cfg, frontpage, menu
from plinth.daemon import Daemon
from plinth.modules.apache.components import diagnose_url
from plinth.modules.firewall.components import Firewall
from plinth.utils import format_lazy
from plinth.views import AppView
@ -97,7 +98,7 @@ class PrivoxyApp(app_module.App):
def diagnose(self):
"""Run diagnostics and return the results."""
results = super().diagnose()
results.append(action_utils.diagnose_url('https://www.debian.org'))
results.append(diagnose_url('https://www.debian.org'))
results.extend(diagnose_url_with_proxy())
return results
@ -137,7 +138,7 @@ def diagnose_url_with_proxy():
proxy = 'http://{host}:8118/'.format(host=address['url_address'])
env = {'https_proxy': proxy}
result = action_utils.diagnose_url(url, kind=address['kind'], env=env)
result = diagnose_url(url, kind=address['kind'], env=env)
result[0] = _('Access {url} with proxy {proxy} on tcp{kind}') \
.format(url=url, proxy=proxy, kind=address['kind'])
results.append(result)

View File

@ -23,11 +23,11 @@ import os
from django.utils.translation import ugettext_lazy as _
from plinth import action_utils, actions
from plinth import actions
from plinth import app as app_module
from plinth import cfg, frontpage, menu
from plinth.daemon import Daemon
from plinth.modules.apache.components import Webserver
from plinth.modules.apache.components import Webserver, diagnose_url
from plinth.modules.firewall.components import Firewall
from plinth.utils import format_lazy
@ -96,13 +96,12 @@ class TahoeApp(app_module.App):
"""Run diagnostics and return the results."""
results = super().diagnose()
results.extend([
action_utils.diagnose_url('http://localhost:5678', kind='4',
check_certificate=False),
action_utils.diagnose_url('http://localhost:5678', kind='6',
check_certificate=False),
action_utils.diagnose_url(
'http://{}:5678'.format(get_configured_domain_name()),
kind='4', check_certificate=False)
diagnose_url('http://localhost:5678', kind='4',
check_certificate=False),
diagnose_url('http://localhost:5678', kind='6',
check_certificate=False),
diagnose_url('http://{}:5678'.format(get_configured_domain_name()),
kind='4', check_certificate=False)
])
return results

View File

@ -26,6 +26,7 @@ from plinth import action_utils, actions
from plinth import app as app_module
from plinth import menu
from plinth.daemon import Daemon, diagnose_netcat, diagnose_port_listening
from plinth.modules.apache.components import diagnose_url
from plinth.modules.firewall.components import Firewall
from plinth.modules.names.components import DomainType
from plinth.signals import domain_added, domain_removed
@ -220,7 +221,7 @@ def _diagnose_control_port():
def _diagnose_url_via_tor(url, kind=None):
"""Diagnose whether a URL is reachable via Tor."""
result = action_utils.diagnose_url(url, kind=kind, wrapper='torsocks')
result = diagnose_url(url, kind=kind, wrapper='torsocks')
result[0] = _('Access URL {url} on tcp{kind} via Tor') \
.format(url=url, kind=kind)
@ -230,8 +231,8 @@ def _diagnose_url_via_tor(url, kind=None):
def _diagnose_tor_use(url, kind=None):
"""Diagnose whether webpage at URL reports that we are using Tor."""
expected_output = 'Congratulations. This browser is configured to use Tor.'
result = action_utils.diagnose_url(url, kind=kind, wrapper='torsocks',
expected_output=expected_output)
result = diagnose_url(url, kind=kind, wrapper='torsocks',
expected_output=expected_output)
result[0] = _('Confirm Tor usage at {url} on tcp{kind}') \
.format(url=url, kind=kind)