firewall: Introduce component for local service protection

- Automatically handle a setup of the component getting added to an existing
app.

Tests:

- Run unit tests

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-11-11 11:08:41 -08:00 committed by James Valleroy
parent 2240f7a151
commit 4bf347dbe3
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
6 changed files with 161 additions and 3 deletions

View File

@ -5,3 +5,6 @@ Firewall
.. autoclass:: plinth.modules.firewall.components.Firewall
:members:
.. autoclass:: plinth.modules.firewall.components.FirewallLocalProtection
:members:

View File

@ -34,10 +34,12 @@ _DBUS_NAME = 'org.fedoraproject.FirewallD1'
_FIREWALLD_OBJECT = '/org/fedoraproject/FirewallD1'
_FIREWALLD_INTERFACE = 'org.fedoraproject.FirewallD1'
_ZONE_INTERFACE = 'org.fedoraproject.FirewallD1.zone'
_DIRECT_INTERFACE = 'org.fedoraproject.FirewallD1.direct'
_CONFIG_OBJECT = '/org/fedoraproject/FirewallD1/config'
_CONFIG_INTERFACE = 'org.fedoraproject.FirewallD1.config'
_CONFIG_SERVICE_INTERFACE = 'org.fedoraproject.FirewallD1.config.service'
_CONFIG_ZONE_INTERFACE = 'org.fedoraproject.FirewallD1.config.zone'
_CONFIG_DIRECT_INTERFACE = 'org.fedoraproject.FirewallD1.config.direct'
class FirewallApp(app_module.App):
@ -228,3 +230,25 @@ def remove_service(port, zone):
config_zone = _get_dbus_proxy(zone_path, _CONFIG_ZONE_INTERFACE)
with ignore_dbus_error(service_error='NOT_ENABLED'):
config_zone.removeService('(s)', port)
def add_passthrough(ipv, *args):
"""Add a direct rule with passthrough to ip(6)tables."""
direct_proxy = _get_dbus_proxy(_FIREWALLD_OBJECT, _DIRECT_INTERFACE)
if not direct_proxy.queryPassthrough('(sas)', ipv, args):
direct_proxy.addPassthrough('(sas)', ipv, args)
config_direct = _get_dbus_proxy(_CONFIG_OBJECT, _CONFIG_DIRECT_INTERFACE)
if not config_direct.queryPassthrough('(sas)', ipv, args):
config_direct.addPassthrough('(sas)', ipv, args)
def remove_passthrough(ipv, *args):
"""Add a direct rule with passthrough to ip(6)tables."""
direct_proxy = _get_dbus_proxy(_FIREWALLD_OBJECT, _DIRECT_INTERFACE)
if direct_proxy.queryPassthrough('(sas)', ipv, args):
direct_proxy.removePassthrough('(sas)', ipv, args)
config_direct = _get_dbus_proxy(_CONFIG_OBJECT, _CONFIG_DIRECT_INTERFACE)
if config_direct.queryPassthrough('(sas)', ipv, args):
config_direct.removePassthrough('(sas)', ipv, args)

View File

@ -152,6 +152,59 @@ class Firewall(app.FollowerComponent):
return results
class FirewallLocalProtection(app.FollowerComponent):
"""Component to protect local services from access by local users.
Local service protection means that only administrators and Apache web
server should be able to access certain services and not other users who
have logged into the system. This is needed because some of the services
are protected with authentication and authorization provided by Apache web
server. If services are contacted directly then auth can be bypassed by all
local users.
`component_id` should be a unique ID across all components of an app and
across all components.
`tcp_ports` is list of all local TCP ports on which daemons of this app are
listening. Administrators and Apache web server will be allowed to connect
and all other connections to these ports will be rejected.
"""
def __init__(self, component_id: str, tcp_ports: list[str]):
"""Initialize the firewall component."""
super().__init__(component_id)
self.tcp_ports = tcp_ports
def enable(self):
"""Block traffic to local service from local users."""
super().enable()
for port in self.tcp_ports:
firewall.add_passthrough('ipv6', '-A', 'INPUT', '-p', 'tcp',
'--dport', port, '-j', 'REJECT')
firewall.add_passthrough('ipv4', '-A', 'INPUT', '-p', 'tcp',
'--dport', port, '-j', 'REJECT')
def disable(self):
"""Unblock traffic to local service from local users."""
super().disable()
for port in self.tcp_ports:
firewall.remove_passthrough('ipv6', '-A', 'INPUT', '-p', 'tcp',
'--dport', port, '-j', 'REJECT')
firewall.remove_passthrough('ipv4', '-A', 'INPUT', '-p', 'tcp',
'--dport', port, '-j', 'REJECT')
def setup(self, old_version):
"""Protect services of an app that newly introduced the feature."""
if not old_version:
# Fresh installation of an app. app.enable() will run at the end.
return
if self.app.is_enabled():
# Don't enable if the app is being updated but is disabled.
self.enable()
def get_port_forwarding_info(app_):
"""Return a list of ports to be forwarded for this app to work."""
from plinth.modules import networks

View File

@ -9,7 +9,9 @@ https://davidz25.blogspot.com/2012/06/authorization-rules-in-polkit.html
polkit.addRule(function(action, subject) {
if ((action.id == "org.fedoraproject.FirewallD1.config.info" ||
action.id == "org.fedoraproject.FirewallD1.config") &&
action.id == "org.fedoraproject.FirewallD1.config" ||
action.id == "org.fedoraproject.FirewallD1.direct.info" ||
action.id == "org.fedoraproject.FirewallD1.direct") &&
subject.user == "plinth") {
return polkit.Result.YES;
}

View File

@ -1,4 +1,4 @@
[Allow FreedomBox to manage firewalld]
Identity=unix-user:plinth
Action=org.fedoraproject.FirewallD1.config.info;org.fedoraproject.FirewallD1.config
Action=org.fedoraproject.FirewallD1.config.info;org.fedoraproject.FirewallD1.config;org.fedoraproject.FirewallD1.direct.info;org.fedoraproject.FirewallD1.direct;
ResultAny=yes

View File

@ -7,7 +7,9 @@ from unittest.mock import call, patch
import pytest
from plinth.modules.firewall.components import Firewall
from plinth.app import App
from plinth.modules.firewall.components import (Firewall,
FirewallLocalProtection)
@pytest.fixture(name='empty_firewall_list', autouse=True)
@ -65,6 +67,7 @@ def test_port_details(get_port_details):
@patch('plinth.modules.firewall.get_enabled_services')
def test_enable(get_enabled_services, add_service):
"""Test enabling a firewall component."""
def get_enabled_services_side_effect(zone):
return {'internal': ['test-port1'], 'external': ['test-port2']}[zone]
@ -130,6 +133,7 @@ def test_disable(get_enabled_services, add_service, remove_service):
@patch('plinth.modules.firewall.get_enabled_services')
def test_diagnose(get_enabled_services, get_port_details):
"""Test diagnosing open/closed firewall ports."""
def get_port_details_side_effect(port):
return {
'test-port1': [(1234, 'tcp'), (1234, 'udp')],
@ -180,3 +184,75 @@ def test_diagnose(get_enabled_services, get_port_details):
], [
'Port test-port4 (4567/udp) available for external networks', 'failed'
]]
def test_local_protection_init():
"""Test initializing the local protection component."""
component = FirewallLocalProtection('test-component', ['1234', '4567'])
assert component.component_id == 'test-component'
assert component.tcp_ports == ['1234', '4567']
@patch('plinth.modules.firewall.add_passthrough')
def test_local_protection_enable(add_passthrough):
"""Test enabling local protection component."""
component = FirewallLocalProtection('test-component', ['1234', '4567'])
component.enable()
calls = [
call('ipv6', '-A', 'INPUT', '-p', 'tcp', '--dport', '1234', '-j',
'REJECT'),
call('ipv4', '-A', 'INPUT', '-p', 'tcp', '--dport', '1234', '-j',
'REJECT'),
call('ipv6', '-A', 'INPUT', '-p', 'tcp', '--dport', '4567', '-j',
'REJECT'),
call('ipv4', '-A', 'INPUT', '-p', 'tcp', '--dport', '4567', '-j',
'REJECT')
]
add_passthrough.assert_has_calls(calls)
@patch('plinth.modules.firewall.remove_passthrough')
def test_local_protection_disable(remove_passthrough):
"""Test disabling local protection component."""
component = FirewallLocalProtection('test-component', ['1234', '4567'])
component.disable()
calls = [
call('ipv6', '-A', 'INPUT', '-p', 'tcp', '--dport', '1234', '-j',
'REJECT'),
call('ipv4', '-A', 'INPUT', '-p', 'tcp', '--dport', '1234', '-j',
'REJECT'),
call('ipv6', '-A', 'INPUT', '-p', 'tcp', '--dport', '4567', '-j',
'REJECT'),
call('ipv4', '-A', 'INPUT', '-p', 'tcp', '--dport', '4567', '-j',
'REJECT')
]
remove_passthrough.assert_has_calls(calls)
@patch('plinth.modules.firewall.components.FirewallLocalProtection.enable')
def test_local_protection_setup(enable):
"""Test setting up protection when updating the app."""
class TestApp(App):
app_id = 'test-app'
enabled = True
def is_enabled(self):
return self.enabled
app = TestApp()
component = FirewallLocalProtection('test-component', ['1234', '4567'])
app.add(component)
component.setup(old_version=0)
enable.assert_not_called()
app.enabled = False
component.setup(old_version=1)
enable.assert_not_called()
app.enabled = True
component.setup(old_version=1)
enable.assert_has_calls([call()])