mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
- Try to mark class variables in component classes. - Leave typing hints generic, such as 'list' and 'dict' where content is usually not filled, too complex, or context is unimportant. - backups: Handle failure for tarfile extraction so that methods are not called on potentially None valued variables. - backups: Prevent potentially passing a keyword argument twice. - dynamicdns: Deal properly with outcome of urlparsing. - ejabberd: Deal with failed regex match - email: Fix a mypy complaint when iterating a filtered list. - tor: Don't reuse variables for different typed values. - tor: Don't reuse variables for different typed values. - operation: Return None explicitly. - operation: Ensure that keyword argument is not repeated. Tests: - Where only typing hints were modified and no syntax error came up, additional testing was not done. - `mypy --ignore-missing-imports .` run successfully. - Generate developer documentation. - Service runs without errors upon start up. - backups: Listing and restoring specific apps from a backup works. - backups: Mounting a remote backup repository works. - NOT TESTED: dynamicdns: Migrating from old style configuration works. - ejabberd: Verify that setting coturn configuration works. - email: Test that showing configuration from postfix works. - tor: Orport value is properly shown. - transmission: Configuration values are properly set. - users: Running unit tests as root works. - operation: Operation status messages are shown properly during app install. - ./setup.py install runs Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
290 lines
10 KiB
Python
290 lines
10 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""FreedomBox app to configure a firewall."""
|
|
|
|
import contextlib
|
|
import logging
|
|
|
|
from django.utils.translation import gettext
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from plinth import app as app_module
|
|
from plinth import cfg, menu
|
|
from plinth.daemon import Daemon
|
|
from plinth.modules.backups.components import BackupRestore
|
|
from plinth.package import Packages, install
|
|
from plinth.utils import Version, format_lazy, import_from_gi
|
|
|
|
from . import manifest, privileged
|
|
|
|
gio = import_from_gi('Gio', '2.0')
|
|
glib = import_from_gi('GLib', '2.0')
|
|
|
|
# Paragraphs describing the app; passed as ``description`` to the app's Info
# component. ``format_lazy`` fills in ``{box_name}`` from ``cfg.box_name``.
_description = [
    format_lazy(
        _('Firewall is a security system that controls the incoming and '
          'outgoing network traffic on your {box_name}. Keeping a '
          'firewall enabled and properly configured reduces risk of '
          'security threat from the Internet.'),
        box_name=cfg.box_name,
    ),
]

# Cache filled by get_port_details(): service name -> result of getPorts().
_port_details: dict[str, list[str]] = {}

logger = logging.getLogger(__name__)
|
|
|
|
# Bus name handed to every proxy created by _get_dbus_proxy().
_DBUS_NAME = 'org.fedoraproject.FirewallD1'

# Main firewalld object and the interfaces used on it.
_FIREWALLD_OBJECT = '/org/fedoraproject/FirewallD1'
_FIREWALLD_INTERFACE = 'org.fedoraproject.FirewallD1'
_ZONE_INTERFACE = 'org.fedoraproject.FirewallD1.zone'
_DIRECT_INTERFACE = 'org.fedoraproject.FirewallD1.direct'

# Configuration object and the interfaces used on it (or on objects whose
# paths it hands out, such as services and zones).
_CONFIG_OBJECT = '/org/fedoraproject/FirewallD1/config'
_CONFIG_INTERFACE = 'org.fedoraproject.FirewallD1.config'
_CONFIG_SERVICE_INTERFACE = 'org.fedoraproject.FirewallD1.config.service'
_CONFIG_ZONE_INTERFACE = 'org.fedoraproject.FirewallD1.config.zone'
_CONFIG_DIRECT_INTERFACE = 'org.fedoraproject.FirewallD1.config.direct'
|
|
|
|
|
|
class FirewallApp(app_module.App):
    """FreedomBox app for Firewall."""

    app_id = 'firewall'

    _version = 3

    can_be_disabled = False

    def __init__(self):
        """Create and register the components that make up the app."""
        super().__init__()

        app_info = app_module.Info(
            app_id=self.app_id,
            version=self._version,
            is_essential=True,
            name=_('Firewall'),
            icon='fa-shield',
            description=_description,
            manual_page='Firewall',
        )
        self.add(app_info)

        # The menu entry reuses the name and icon declared on the info
        # component.
        self.add(
            menu.Menu('menu-firewall', app_info.name, None, app_info.icon,
                      'firewall:index', parent_url_name='system'))

        self.add(Packages('packages-firewall', ['firewalld', 'nftables']))

        self.add(Daemon('daemon-firewall', 'firewalld'))

        self.add(
            BackupRestore('backup-restore-firewall', **manifest.backup))

    def setup(self, old_version):
        """Install and configure the app.

        Runs the base class setup and then the firewalld setup steps.
        """
        super().setup(old_version)
        _run_setup()

    def force_upgrade(self, packages):
        """Force upgrade firewalld to resolve conffile prompts.

        ``packages`` is indexed by package name; the 'firewalld' entry is
        read for its 'new_version' key.

        Returns True when the upgrade was performed, False when firewalld is
        not among ``packages`` or the new version is not accepted.
        """
        if 'firewalld' not in packages:
            return False

        # Allow upgrade from any version to any version below 2.0
        # NOTE(review): '2~' presumably sorts just before '2' in the version
        # ordering implemented by Version - confirm.
        new_version = Version(packages['firewalld']['new_version'])
        if new_version > Version('2~'):
            return False

        install(['firewalld'], force_configuration='new')
        _run_setup()
        return True

    def diagnose(self):
        """Run diagnostics and return the results.

        Extends the base class results with one entry per firewall check,
        all evaluated against a single configuration snapshot.
        """
        results = super().diagnose()
        config = privileged.get_config()
        checks = (
            _diagnose_default_zone,
            _diagnose_firewall_backend,
            _diagnose_direct_passthroughs,
        )
        for check in checks:
            results.append(check(config))

        return results
|
|
|
|
|
|
def _run_setup():
    """Run firewalld setup and enable the default set of services."""
    privileged.setup()

    # (service, zone) pairs, enabled in this exact order.
    default_services = (
        ('http', 'external'),
        ('http', 'internal'),
        ('https', 'external'),
        ('https', 'internal'),
        ('dns', 'internal'),
        ('dhcp', 'internal'),
    )
    for service, zone in default_services:
        add_service(service, zone)
|
|
|
|
|
|
def _get_dbus_proxy(object, interface):
    """Return a DBusProxy for a given firewalld object and interface.

    ``object`` is the D-Bus object path and ``interface`` the interface name;
    the proxy is created synchronously on the system bus for ``_DBUS_NAME``.
    """
    system_bus = gio.bus_get_sync(gio.BusType.SYSTEM)
    proxy = gio.DBusProxy.new_sync(
        system_bus,
        gio.DBusProxyFlags.NONE,
        None,
        _DBUS_NAME,
        object,
        interface,
    )
    return proxy
|
|
|
|
|
|
@contextlib.contextmanager
def ignore_dbus_error(dbus_error=None, service_error=None):
    """Suppress selected firewalld D-Bus errors raised in the ``with`` body.

    The message of a caught ``glib.Error`` is split on ':'. Errors whose
    first field is not 'GDBus.Error' are always re-raised.

    dbus_error -- name appended to 'org.freedesktop.DBus.Error.'; when the
        second field of the message equals the result, an error is logged
        and the exception is suppressed.
    service_error -- value compared with the third field of the message;
        on a match a warning is logged and the exception is suppressed.

    Any other ``glib.Error`` is re-raised unchanged. Exceptions of other
    types are not intercepted.
    """
    try:
        yield
    except glib.Error as exception:
        parts = exception.message.split(':')
        if parts[0] != 'GDBus.Error':
            raise

        # A message with fewer fields than expected must surface as the
        # original glib.Error, not as an IndexError raised from here.
        remote_error = parts[1].strip() if len(parts) > 1 else None
        detail = parts[2].strip() if len(parts) > 2 else None

        if (dbus_error and remote_error
                == 'org.freedesktop.DBus.Error.' + dbus_error):
            logger.error('Firewalld is not running.')
        elif service_error and detail == service_error:
            logger.warning('Ignoring firewall exception: %s', service_error)
        else:
            raise
|
|
|
|
|
|
def parse_dbus_error(exception):
    """Parse a GDBus error raised by firewalld.

    ``exception.message`` is split on ':'. The third field, stripped of
    surrounding whitespace, is returned when the first field is
    'GDBus.Error' and the second is 'org.fedoraproject.FirewallD1.Exception'.

    Returns None for any other message, including messages that have fewer
    than three fields.
    """
    parts = exception.message.split(':')

    # Guard the length first so a short message yields None instead of an
    # IndexError that would hide the exception being inspected.
    if len(parts) < 3:
        return None

    if (parts[0] != 'GDBus.Error'
            or parts[1] != 'org.fedoraproject.FirewallD1.Exception'):
        return None

    return parts[2].strip()
|
|
|
|
|
|
def reload():
    """Ask firewalld to reload.

    A 'ServiceUnknown' D-Bus error is logged and swallowed by
    ignore_dbus_error() instead of being raised.
    """
    logger.info('Reloading firewalld')
    with ignore_dbus_error(dbus_error='ServiceUnknown'):
        _get_dbus_proxy(_FIREWALLD_OBJECT, _FIREWALLD_INTERFACE).reload()
|
|
|
|
|
|
def try_with_reload(operation):
    """Try an operation and retry it once after a firewalld reload.

    When a service file is newly installed into /usr/lib/firewalld/services,
    its information can be immediately queried but the service can't be
    added/removed from a zone. A firewalld reload is necessary. So, call
    ``operation`` (a callable taking no arguments) and, if it fails with an
    INVALID_SERVICE error, reload firewalld and call it again.

    Any other ``glib.Error`` is re-raised, as is any error from the retry.
    """
    try:
        operation()
    except glib.Error as exception:
        if parse_dbus_error(exception) == 'INVALID_SERVICE':
            reload()
            operation()
        else:
            raise
|
|
|
|
|
|
def get_enabled_services(zone):
    """Return the services currently enabled in a zone.

    Returns an empty list when the lookup fails with a 'ServiceUnknown'
    D-Bus error, i.e. when firewalld is not running.
    """
    with ignore_dbus_error(dbus_error='ServiceUnknown'):
        proxy = _get_dbus_proxy(_FIREWALLD_OBJECT, _ZONE_INTERFACE)
        services = proxy.getServices('(s)', zone)
        return services

    # Reached only when the error above was swallowed.
    return []
|
|
|
|
|
|
def get_port_details(service_port):
    """Return the port types and numbers for a service port.

    Results are memoized in ``_port_details``. When firewalld cannot resolve
    the service by name, an empty list is returned and nothing is cached, so
    a later call queries firewalld again.
    """
    try:
        return _port_details[service_port]
    except KeyError:
        config_proxy = _get_dbus_proxy(_CONFIG_OBJECT, _CONFIG_INTERFACE)
        try:
            service_path = config_proxy.getServiceByName(
                '(s)', service_port)
        except glib.Error:
            # Don't cache the error result
            return []

        service_proxy = _get_dbus_proxy(service_path,
                                        _CONFIG_SERVICE_INTERFACE)
        ports = service_proxy.getPorts()
        _port_details[service_port] = ports
        return ports
|
|
|
|
|
|
def get_interfaces(zone):
    """Return the list of interfaces in a zone.

    Returns an empty list when the lookup fails with a 'ServiceUnknown'
    D-Bus error, i.e. when firewalld is not running.
    """
    with ignore_dbus_error(dbus_error='ServiceUnknown'):
        proxy = _get_dbus_proxy(_FIREWALLD_OBJECT, _ZONE_INTERFACE)
        interfaces = proxy.getInterfaces('(s)', zone)
        return interfaces

    # Reached only when the error above was swallowed.
    return []
|
|
|
|
|
|
def add_service(port, zone):
    """Enable a service in a firewall zone.

    The service named ``port`` is added through the zone interface of the
    main firewalld object and then through the zone's configuration object.
    ALREADY_ENABLED errors from either step are ignored, and a
    'ServiceUnknown' D-Bus error aborts the whole operation quietly.
    """
    with ignore_dbus_error(dbus_error='ServiceUnknown'):
        # Step 1: zone interface on the main firewalld object.
        runtime_zone = _get_dbus_proxy(_FIREWALLD_OBJECT, _ZONE_INTERFACE)
        with ignore_dbus_error(service_error='ALREADY_ENABLED'):
            runtime_zone.addService('(ssi)', zone, port, 0)

        # Step 2: the zone's own object, looked up via the config interface.
        config_proxy = _get_dbus_proxy(_CONFIG_OBJECT, _CONFIG_INTERFACE)
        zone_path = config_proxy.getZoneByName('(s)', zone)
        stored_zone = _get_dbus_proxy(zone_path, _CONFIG_ZONE_INTERFACE)
        with ignore_dbus_error(service_error='ALREADY_ENABLED'):
            stored_zone.addService('(s)', port)
|
|
|
|
|
|
def remove_service(port, zone):
    """Remove a service from a firewall zone.

    The service named ``port`` is removed through the zone interface of the
    main firewalld object and then through the zone's configuration object.
    NOT_ENABLED errors from either step are ignored, and a 'ServiceUnknown'
    D-Bus error aborts the whole operation quietly.
    """
    with ignore_dbus_error(dbus_error='ServiceUnknown'):
        # Step 1: zone interface on the main firewalld object.
        runtime_zone = _get_dbus_proxy(_FIREWALLD_OBJECT, _ZONE_INTERFACE)
        with ignore_dbus_error(service_error='NOT_ENABLED'):
            runtime_zone.removeService('(ss)', zone, port)

        # Step 2: the zone's own object, looked up via the config interface.
        config_proxy = _get_dbus_proxy(_CONFIG_OBJECT, _CONFIG_INTERFACE)
        zone_path = config_proxy.getZoneByName('(s)', zone)
        stored_zone = _get_dbus_proxy(zone_path, _CONFIG_ZONE_INTERFACE)
        with ignore_dbus_error(service_error='NOT_ENABLED'):
            stored_zone.removeService('(s)', port)
|
|
|
|
|
|
def add_passthrough(ipv, *args):
    """Add a direct rule with passthrough to ip(6)tables.

    The rule ``args`` for IP version ``ipv`` is added through the direct
    interface of the main firewalld object and then through the direct
    interface of the config object, in each case only when a query reports
    that it is not already present.
    """
    targets = (
        (_FIREWALLD_OBJECT, _DIRECT_INTERFACE),
        (_CONFIG_OBJECT, _CONFIG_DIRECT_INTERFACE),
    )
    for object_path, interface in targets:
        proxy = _get_dbus_proxy(object_path, interface)
        if not proxy.queryPassthrough('(sas)', ipv, args):
            proxy.addPassthrough('(sas)', ipv, args)
|
|
|
|
|
|
def remove_passthrough(ipv, *args):
    """Remove a direct rule with passthrough to ip(6)tables.

    The rule ``args`` for IP version ``ipv`` is removed through the direct
    interface of the main firewalld object and then through the direct
    interface of the config object, in each case only when a query reports
    that it is present.
    """
    targets = (
        (_FIREWALLD_OBJECT, _DIRECT_INTERFACE),
        (_CONFIG_OBJECT, _CONFIG_DIRECT_INTERFACE),
    )
    for object_path, interface in targets:
        proxy = _get_dbus_proxy(object_path, interface)
        if proxy.queryPassthrough('(sas)', ipv, args):
            proxy.removePassthrough('(sas)', ipv, args)
|
|
|
|
|
|
def _diagnose_default_zone(config):
    """Diagnose whether the default zone is external.

    Reads ``config['default_zone']`` and returns ``[test name, result]``
    where result is 'passed' or 'failed'.
    """
    testname = gettext('Default zone is external')
    if config['default_zone'] == 'external':
        return [testname, 'passed']

    return [testname, 'failed']
|
|
|
|
|
|
def _diagnose_firewall_backend(config):
    """Diagnose whether the firewall backend is nftables.

    Reads ``config['backend']`` and returns ``[test name, result]`` where
    result is 'passed' or 'failed'.
    """
    testname = gettext('Firewall backend is nftables')
    if config['backend'] == 'nftables':
        return [testname, 'passed']

    return [testname, 'failed']
|
|
|
|
|
|
def _diagnose_direct_passthroughs(config):
    """Diagnose direct passthroughs for local service protection.

    Currently, this only checks that ``config['passthroughs']`` holds at
    least 12 entries, which is the number added by firewall's setup.

    Returns ``[test name, result]`` where result is 'passed' or 'failed'.
    """
    testname = gettext('Direct passthrough rules exist')
    if len(config['passthroughs']) >= 12:
        return [testname, 'passed']

    return [testname, 'failed']
|