samba: Use privileged decorator for actions

Tests:

- Functional tests work (uninstall fails)
- Initial setup works
  - File /etc/default/samba is updated
- Dump and restore share during backup/restore works
  - Setup run successfully during restore
  - /var/lib/plinth/backups-data/samba-shares-dump.conf
- Adding/deleting a share works
  - Not tested: Add a share on ntfs or vfat file system works
- Showing list of shares in app view works
- Getting list of samba users in app view works
- Handling errors during add/delete share works

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-09-02 08:58:54 -07:00 committed by James Valleroy
parent 6072b1cea6
commit 49e4ebf8f9
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
5 changed files with 62 additions and 132 deletions

View File

@ -1,17 +1,13 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
""" """FreedomBox app to configure samba."""
FreedomBox app to configure samba.
"""
import grp import grp
import json
import pwd import pwd
import socket import socket
from django.urls import reverse_lazy from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth import app as app_module from plinth import app as app_module
from plinth import frontpage, menu from plinth import frontpage, menu
from plinth.daemon import Daemon from plinth.daemon import Daemon
@ -21,7 +17,7 @@ from plinth.modules.users.components import UsersAndGroups
from plinth.package import Packages from plinth.package import Packages
from plinth.utils import format_lazy from plinth.utils import format_lazy
from . import manifest from . import manifest, privileged
_description = [ _description = [
_('Samba allows to share files and folders between FreedomBox and ' _('Samba allows to share files and folders between FreedomBox and '
@ -102,7 +98,7 @@ class SambaApp(app_module.App):
def setup(self, old_version): def setup(self, old_version):
"""Install and configure the app.""" """Install and configure the app."""
super().setup(old_version) super().setup(old_version)
actions.superuser_run('samba', ['setup']) privileged.setup()
self.enable() self.enable()
@ -112,37 +108,24 @@ class SambaBackupRestore(BackupRestore):
def backup_pre(self, packet): def backup_pre(self, packet):
"""Save registry share configuration.""" """Save registry share configuration."""
super().backup_pre(packet) super().backup_pre(packet)
actions.superuser_run('samba', ['dump-shares']) privileged.dump_shares()
def restore_post(self, packet): def restore_post(self, packet):
"""Restore configuration.""" """Restore configuration."""
super().restore_post(packet) super().restore_post(packet)
actions.superuser_run('samba', ['setup']) privileged.setup()
actions.superuser_run('samba', ['restore-shares']) privileged.restore_shares()
def add_share(mount_point, share_type, filesystem): def add_share(mount_point, share_type, filesystem):
"""Add a share.""" """Add a share."""
command = [ windows_filesystem = (filesystem in ['ntfs', 'vfat'])
'add-share', '--mount-point', mount_point, '--share-type', share_type privileged.add_share(mount_point, share_type, windows_filesystem)
]
if filesystem in ['ntfs', 'vfat']:
command = command + ['--windows-filesystem']
actions.superuser_run('samba', command)
def delete_share(mount_point, share_type):
"""Delete a share."""
actions.superuser_run('samba', [
'delete-share', '--mount-point', mount_point, '--share-type',
share_type
])
def get_users(): def get_users():
"""Get non-system users who are in the freedombox-share or admin group.""" """Get non-system users who are in the freedombox-share or admin group."""
output = actions.superuser_run('samba', ['get-users']) samba_users = privileged.get_users()
samba_users = json.loads(output)['users']
group_users = grp.getgrnam('freedombox-share').gr_mem + grp.getgrnam( group_users = grp.getgrnam('freedombox-share').gr_mem + grp.getgrnam(
'admin').gr_mem 'admin').gr_mem
@ -158,10 +141,3 @@ def get_users():
'password_re_enter_needed': 'password_re_enter_needed':
sorted(set(allowed_users) - set(samba_users)) sorted(set(allowed_users) - set(samba_users))
} }
def get_shares():
"""Get defined shares."""
output = actions.superuser_run('samba', ['get-shares'])
return json.loads(output)

99
actions/samba → plinth/modules/samba/privileged.py Executable file → Normal file
View File

@ -1,16 +1,13 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
""" """Configuration helper for samba."""
Configuration helper for samba.
"""
import argparse
import configparser import configparser
import json
import os import os
import shutil import shutil
import subprocess import subprocess
from plinth.actions import privileged
SHARES_CONF_BACKUP_FILE = '/var/lib/plinth/backups-data/samba-shares-dump.conf' SHARES_CONF_BACKUP_FILE = '/var/lib/plinth/backups-data/samba-shares-dump.conf'
DEFAULT_FILE = '/etc/default/samba' DEFAULT_FILE = '/etc/default/samba'
@ -45,42 +42,6 @@ CONF = r'''
''' # noqa: E501 ''' # noqa: E501
def parse_arguments():
"""Return parsed command line arguments as dictionary."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser('setup', help='Configure samba after install')
subparsers.add_parser('get-shares', help='Get configured samba shares')
subparsers.add_parser('get-users', help='Get users from Samba database')
subparser = subparsers.add_parser('add-share', help='Add new samba share')
subparser.add_argument('--mount-point', help='Path of the mount point',
required=True)
subparser.add_argument('--share-type', help='Type of the share',
required=True, choices=['open', 'group', 'home'])
subparser.add_argument('--windows-filesystem', required=False,
default=False, action='store_true',
help='Path is Windows filesystem')
subparser = subparsers.add_parser(
'delete-share', help='Delete a samba share configuration')
subparser.add_argument('--mount-point', help='Path of the mount point',
required=True)
subparser.add_argument('--share-type', help='Type of the share',
required=True, choices=['open', 'group', 'home'])
subparsers.add_parser('dump-shares',
help='Dump share configuration to file')
subparsers.add_parser('restore-shares',
help='Restore share configuration from file')
subparsers.required = True
return parser.parse_args()
def _close_share(share_name): def _close_share(share_name):
"""Disconnect all samba users who are connected to the share.""" """Disconnect all samba users who are connected to the share."""
subprocess.check_call(['smbcontrol', 'smbd', 'close-share', share_name]) subprocess.check_call(['smbcontrol', 'smbd', 'close-share', share_name])
@ -182,7 +143,7 @@ def _get_mount_point(path):
return path.split(subpath)[0] return path.split(subpath)[0]
def _get_shares(): def _get_shares() -> list[dict[str, str]]:
"""Get shares.""" """Get shares."""
shares = [] shares = []
output = subprocess.check_output(['net', 'conf', 'list']) output = subprocess.check_output(['net', 'conf', 'list'])
@ -265,40 +226,50 @@ def _set_share_permissions(directory):
subprocess.check_call(['setfacl', '-Rdm', 'g::rwX', directory]) subprocess.check_call(['setfacl', '-Rdm', 'g::rwX', directory])
def subcommand_add_share(arguments): @privileged
def add_share(mount_point: str, share_type: str, windows_filesystem: bool):
"""Create a samba share.""" """Create a samba share."""
mount_point = os.path.normpath(arguments.mount_point) if share_type not in ('open', 'group', 'home'):
raise ValueError('Invalid share type')
mount_point = os.path.normpath(mount_point)
if not os.path.ismount(mount_point): if not os.path.ismount(mount_point):
raise RuntimeError( raise RuntimeError(
'Path "{0}" is not a mount point.'.format(mount_point)) 'Path "{0}" is not a mount point.'.format(mount_point))
_create_share(mount_point, arguments.share_type, _create_share(mount_point, share_type, windows_filesystem)
arguments.windows_filesystem)
def subcommand_delete_share(arguments): @privileged
def delete_share(mount_point: str, share_type: str):
"""Delete a samba share configuration.""" """Delete a samba share configuration."""
mount_point = os.path.normpath(arguments.mount_point) if share_type not in ('open', 'group', 'home'):
raise ValueError('Invalid share type')
mount_point = os.path.normpath(mount_point)
shares = _get_shares() shares = _get_shares()
for share in shares: for share in shares:
if share['mount_point'] == mount_point and share[ if share['mount_point'] == mount_point and share[
'share_type'] == arguments.share_type: 'share_type'] == share_type:
_close_share(share['name']) _close_share(share['name'])
_conf_command(['delshare', share['name']]) _conf_command(['delshare', share['name']])
def subcommand_get_shares(_): @privileged
def get_shares() -> list[dict[str, str]]:
"""Get samba shares.""" """Get samba shares."""
print(json.dumps(_get_shares())) return _get_shares()
def subcommand_get_users(_): @privileged
def get_users() -> list[str]:
"""Get users from Samba database.""" """Get users from Samba database."""
output = subprocess.check_output(['pdbedit', '-L']).decode() output = subprocess.check_output(['pdbedit', '-L']).decode()
samba_users = [line.split(':')[0] for line in output.split()] samba_users = [line.split(':')[0] for line in output.split()]
print(json.dumps({'users': samba_users})) return samba_users
def subcommand_setup(_): @privileged
def setup():
"""Configure samba, use custom samba config file.""" """Configure samba, use custom samba config file."""
from plinth import action_utils from plinth import action_utils
with open(CONF_PATH, 'w', encoding='utf-8') as file_handle: with open(CONF_PATH, 'w', encoding='utf-8') as file_handle:
@ -310,7 +281,8 @@ def subcommand_setup(_):
action_utils.service_restart('smbd') action_utils.service_restart('smbd')
def subcommand_dump_shares(_): @privileged
def dump_shares():
"""Dump registy share configuration.""" """Dump registy share configuration."""
os.makedirs(os.path.dirname(SHARES_CONF_BACKUP_FILE), exist_ok=True) os.makedirs(os.path.dirname(SHARES_CONF_BACKUP_FILE), exist_ok=True)
with open(SHARES_CONF_BACKUP_FILE, 'w', encoding='utf-8') as backup_file: with open(SHARES_CONF_BACKUP_FILE, 'w', encoding='utf-8') as backup_file:
@ -318,22 +290,11 @@ def subcommand_dump_shares(_):
subprocess.run(command, stdout=backup_file, check=True) subprocess.run(command, stdout=backup_file, check=True)
def subcommand_restore_shares(_): @privileged
def restore_shares():
"""Restore registy share configuration.""" """Restore registy share configuration."""
if not os.path.exists(SHARES_CONF_BACKUP_FILE): if not os.path.exists(SHARES_CONF_BACKUP_FILE):
raise RuntimeError( raise RuntimeError(
'Backup file {0} does not exist.'.format(SHARES_CONF_BACKUP_FILE)) 'Backup file {0} does not exist.'.format(SHARES_CONF_BACKUP_FILE))
_conf_command(['drop']) _conf_command(['drop'])
_conf_command(['import', SHARES_CONF_BACKUP_FILE]) _conf_command(['import', SHARES_CONF_BACKUP_FILE])
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()

View File

@ -3,7 +3,7 @@
Tests for samba views. Tests for samba views.
""" """
import json import pathlib
import urllib import urllib
from unittest.mock import patch from unittest.mock import patch
@ -12,11 +12,11 @@ from django import urls
from django.contrib.messages.storage.fallback import FallbackStorage from django.contrib.messages.storage.fallback import FallbackStorage
from plinth import module_loader from plinth import module_loader
from plinth.errors import ActionError
from plinth.modules.samba import views from plinth.modules.samba import views
# For all tests, use plinth.urls instead of urls configured for testing # For all tests, use plinth.urls instead of urls configured for testing
pytestmark = pytest.mark.urls('plinth.urls') pytestmark = pytest.mark.urls('plinth.urls')
setfacl_path = pathlib.Path('/usr/bin/setfacl')
USERS = {"access_ok": ["testuser"], 'password_re_enter_needed': []} USERS = {"access_ok": ["testuser"], 'password_re_enter_needed': []}
@ -70,18 +70,12 @@ def fixture_samba_urls():
yield yield
def action_run(action, options, **kwargs):
"""Action return values."""
if action == 'samba' and options == ['get-shares']:
return json.dumps(SHARES)
return None
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def samba_patch_actions(): def samba_patch_privileged():
"""Patch actions scripts runner.""" """Patch privileged scripts runner."""
with patch('plinth.actions.superuser_run', side_effect=action_run): with patch('plinth.modules.samba.privileged.get_shares') as get_shares, \
patch('plinth.modules.samba.privileged.delete_share'):
get_shares.return_value = SHARES
yield yield
@ -120,6 +114,7 @@ def test_samba_shares_view(rf):
assert response.status_code == 200 assert response.status_code == 200
@pytest.mark.skipif(not setfacl_path.exists(), reason='setfacl not installed')
def test_enable_samba_share_view(rf): def test_enable_samba_share_view(rf):
"""Test that enabling share sends correct success message.""" """Test that enabling share sends correct success message."""
form_data = {'filesystem_type': 'ext4', 'open_share': 'enable'} form_data = {'filesystem_type': 'ext4', 'open_share': 'enable'}
@ -138,7 +133,7 @@ def test_enable_samba_share_failed_view(rf):
mount_point = urllib.parse.quote('/') mount_point = urllib.parse.quote('/')
error_message = 'Sharing failed' error_message = 'Sharing failed'
with patch('plinth.modules.samba.add_share', with patch('plinth.modules.samba.add_share',
side_effect=ActionError(error_message)): side_effect=RuntimeError(error_message)):
response, messages = make_request(rf.post('', data=form_data), response, messages = make_request(rf.post('', data=form_data),
views.share, mount_point=mount_point) views.share, mount_point=mount_point)
@ -165,13 +160,12 @@ def test_disable_samba_share_failed_view(rf):
form_data = {'filesystem_type': 'ext4', 'open_share': 'disable'} form_data = {'filesystem_type': 'ext4', 'open_share': 'disable'}
mount_point = urllib.parse.quote('/') mount_point = urllib.parse.quote('/')
error_message = 'Unsharing failed' error_message = 'Unsharing failed'
with patch('plinth.modules.samba.delete_share', with patch('plinth.modules.samba.privileged.delete_share',
side_effect=ActionError(error_message)): side_effect=RuntimeError(error_message)):
response, messages = make_request(rf.post('', data=form_data), response, messages = make_request(rf.post('', data=form_data),
views.share, mount_point=mount_point) views.share, mount_point=mount_point)
assert list( assert list(messages)[
messages)[0].message == 'Error disabling share: {0}'.format( 0].message == 'Error disabling share: {0}'.format(error_message)
error_message)
assert response.status_code == 302 assert response.status_code == 302
assert response.url == urls.reverse('samba:index') assert response.url == urls.reverse('samba:index')

View File

@ -1,7 +1,5 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
""" """Views for samba module."""
Views for samba module.
"""
import logging import logging
import os import os
@ -15,9 +13,10 @@ from django.utils.translation import gettext as _
from django.views.decorators.http import require_POST from django.views.decorators.http import require_POST
from plinth import views from plinth import views
from plinth.errors import ActionError
from plinth.modules import samba, storage from plinth.modules import samba, storage
from . import privileged
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -40,6 +39,7 @@ def get_share_mounts():
class SambaAppView(views.AppView): class SambaAppView(views.AppView):
"""Samba sharing basic configuration.""" """Samba sharing basic configuration."""
app_id = 'samba' app_id = 'samba'
template_name = 'samba.html' template_name = 'samba.html'
@ -49,7 +49,7 @@ class SambaAppView(views.AppView):
disks = get_share_mounts() disks = get_share_mounts()
context['disks'] = disks context['disks'] = disks
shares = samba.get_shares() shares = privileged.get_shares()
shared_mounts = defaultdict(list) shared_mounts = defaultdict(list)
for share in shares: for share in shares:
shared_mounts[share['mount_point']].append(share['share_type']) shared_mounts[share['mount_point']].append(share['share_type'])
@ -101,7 +101,7 @@ def share(request, mount_point):
try: try:
samba.add_share(mount_point, share_type, filesystem) samba.add_share(mount_point, share_type, filesystem)
messages.success(request, _('Share enabled.')) messages.success(request, _('Share enabled.'))
except ActionError as exception: except Exception as exception:
logger.exception('Error enabling share') logger.exception('Error enabling share')
messages.error( messages.error(
request, request,
@ -109,9 +109,9 @@ def share(request, mount_point):
error_message=exception)) error_message=exception))
elif action == 'disable': elif action == 'disable':
try: try:
samba.delete_share(mount_point, share_type) privileged.delete_share(mount_point, share_type)
messages.success(request, _('Share disabled.')) messages.success(request, _('Share disabled.'))
except ActionError as exception: except Exception as exception:
logger.exception('Error disabling share') logger.exception('Error disabling share')
messages.error( messages.error(
request, request,

View File

@ -3,7 +3,6 @@
Forms for directory selection. Forms for directory selection.
""" """
import json
import os import os
from django import forms from django import forms
@ -13,14 +12,14 @@ from django.utils.translation import gettext_lazy as _
from plinth import actions from plinth import actions
from plinth import app as app_module from plinth import app as app_module
from plinth.modules import storage from plinth.modules import storage
from plinth.modules.samba import privileged as samba_privileged
def get_available_samba_shares(): def get_available_samba_shares():
"""Get available samba shares.""" """Get available samba shares."""
available_shares = [] available_shares = []
if _is_app_enabled('samba'): if _is_app_enabled('samba'):
samba_shares = json.loads( samba_shares = samba_privileged.get_shares()
actions.superuser_run('samba', ['get-shares']))
if samba_shares: if samba_shares:
disks = storage.get_mounts() disks = storage.get_mounts()
for share in samba_shares: for share in samba_shares: