sharing: Use privileged decorator for actions

Tests:

- Initial setup works.
  - Empty Apache configuration file is created
- Adding a share works all the information added is shown during editing.
  Configuration file is updated as expected.
- List of shares is shown as expected.
- When editing a share, information about share is shown correctly. Editing
  works are expected.
- Removing a share works.
- Trying to add share with a name that already exists throws a proper error
  message.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-08-26 11:02:10 -07:00 committed by James Valleroy
parent 6e7b31a3cf
commit 637e6b1198
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
5 changed files with 175 additions and 248 deletions

View File

@ -1,208 +0,0 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for the sharing app.
"""
import argparse
import json
import os
import pathlib
import re
import augeas
from plinth import action_utils
APACHE_CONFIGURATION = '/etc/apache2/conf-available/sharing-freedombox.conf'
def parse_arguments():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser('list', help='List all existing shares')
add_parser = subparsers.add_parser('add', help='Add a new share')
add_parser.add_argument('--name', required=True, help='Name of the share')
add_parser.add_argument('--path', required=True, help='Disk path to share')
add_parser.add_argument('--groups', nargs='*',
help='List of groups that can access the share')
add_parser.add_argument('--is-public', required=False, default=False,
action="store_true",
help='Allow public access to this share')
remove_parser = subparsers.add_parser('remove',
help='Remove an existing share')
remove_parser.add_argument('--name', required=True,
help='Name of the share to remove')
subparsers.required = True
return parser.parse_args()
def load_augeas():
"""Initialize augeas for this app's configuration file."""
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
augeas.Augeas.NO_MODL_AUTOLOAD)
aug.set('/augeas/load/Httpd/lens', 'Httpd.lns')
aug.set('/augeas/load/Httpd/incl[last() + 1]', APACHE_CONFIGURATION)
aug.load()
aug.defvar('conf', '/files' + APACHE_CONFIGURATION)
return aug
def subcommand_add(arguments):
"""Add a share to Apache configuration."""
name = arguments.name
path = '"' + arguments.path.replace('"', r'\"') + '"'
groups = arguments.groups
is_public = arguments.is_public
url = '/share/' + name
if not os.path.exists(APACHE_CONFIGURATION):
pathlib.Path(APACHE_CONFIGURATION).touch()
aug = load_augeas()
shares = _list(aug)
if any([share for share in shares if share['name'] == name]):
raise Exception('Share already present')
aug.set('$conf/directive[last() + 1]', 'Alias')
aug.set('$conf/directive[last()]/arg[1]', url)
aug.set('$conf/directive[last()]/arg[2]', path)
aug.set('$conf/Location[last() + 1]/arg', url)
aug.set('$conf/Location[last()]/directive[last() + 1]', 'Include')
aug.set('$conf/Location[last()]/directive[last()]/arg',
'includes/freedombox-sharing.conf')
if not is_public:
aug.set('$conf/Location[last()]/directive[last() + 1]', 'Include')
aug.set('$conf/Location[last()]/directive[last()]/arg',
'includes/freedombox-single-sign-on.conf')
aug.set('$conf/Location[last()]/IfModule/arg', 'mod_auth_pubtkt.c')
aug.set('$conf/Location[last()]/IfModule/directive[1]', 'TKTAuthToken')
for group_name in groups:
aug.set(
'$conf/Location[last()]/IfModule/directive[1]/arg[last() + 1]',
group_name)
else:
aug.set('$conf/Location[last()]/directive[last() + 1]', 'Require')
aug.set('$conf/Location[last()]/directive[last()]/arg[1]', 'all')
aug.set('$conf/Location[last()]/directive[last()]/arg[2]', 'granted')
aug.save()
with action_utils.WebserverChange() as webserver_change:
webserver_change.enable('sharing-freedombox')
def subcommand_remove(arguments):
"""Remove a share from Apache configuration."""
url_to_remove = '/share/' + arguments.name
aug = load_augeas()
for directive in aug.match('$conf/directive'):
if aug.get(directive) != 'Alias':
continue
url = aug.get(directive + '/arg[1]')
if url == url_to_remove:
aug.remove(directive)
for location in aug.match('$conf/Location'):
url = aug.get(location + '/arg')
if url == url_to_remove:
aug.remove(location)
aug.save()
with action_utils.WebserverChange() as webserver_change:
webserver_change.enable('sharing-freedombox')
def _get_name_from_url(url):
"""Return the name of the share given the URL for it."""
matches = re.match(r'/share/([a-z0-9\-]*)', url)
if not matches:
raise ValueError
return matches[1]
def _list(aug=None):
"""List all Apache configuration shares."""
if not aug:
aug = load_augeas()
shares = []
for match in aug.match('$conf/directive'):
if aug.get(match) != 'Alias':
continue
url = aug.get(match + '/arg[1]')
path = aug.get(match + '/arg[2]')
path = path.removesuffix('"').removeprefix('"')
path = path.replace(r'\"', '"')
try:
name = _get_name_from_url(url)
shares.append({
'name': name,
'path': path,
'url': '/share/' + name
})
except ValueError:
continue
for location in aug.match('$conf/Location'):
url = aug.get(location + '/arg')
try:
name = _get_name_from_url(url)
except ValueError:
continue
groups = []
for group in aug.match(location + '//directive["TKTAuthToken"]/arg'):
groups.append(aug.get(group))
def _is_public():
"""Must contain the line 'Require all granted'."""
require = location + '//directive["Require"]'
return bool(aug.match(require)) and aug.get(
require +
'/arg[1]') == 'all' and aug.get(require +
'/arg[2]') == 'granted'
for share in shares:
if share['name'] == name:
share['groups'] = groups
share['is_public'] = _is_public()
return shares
def subcommand_list(_):
"""List all Apache configuration shares and print as JSON."""
print(json.dumps({'shares': _list()}))
def main():
"""Parse arguments and perform all duties"""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()

View File

@ -1,13 +1,8 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
FreedomBox app to configure sharing.
"""
import json
"""FreedomBox app to configure sharing."""
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth import app as app_module
from plinth import cfg, menu
from plinth.modules.apache.components import Webserver
@ -60,22 +55,3 @@ class SharingApp(app_module.App):
super().setup(old_version)
privileged.setup()
self.enable()
def list_shares():
"""Return a list of shares."""
output = actions.superuser_run('sharing', ['list'])
return json.loads(output)['shares']
def add_share(name, path, groups, is_public):
"""Add a new share by called the action script."""
args = ['add', '--name', name, '--path', path, '--groups'] + groups
if is_public:
args.append('--is-public')
actions.superuser_run('sharing', args)
def remove_share(name):
"""Remove a share by calling the action script."""
actions.superuser_run('sharing', ['remove', '--name', name])

View File

@ -1,15 +1,14 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Django forms for sharing app.
"""
"""Django forms for sharing app."""
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from plinth.modules import sharing
from plinth.modules.users.components import UsersAndGroups
from . import privileged
class AddShareForm(forms.Form):
"""Form to add a new share."""
@ -47,7 +46,7 @@ class AddShareForm(forms.Form):
if 'name' in self.initial and name == self.initial['name']:
return name
if any((share for share in sharing.list_shares()
if any((share for share in privileged.list_shares()
if name == share['name'])):
raise ValidationError(_('A share with this name already exists.'))

View File

@ -1,8 +1,13 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Configure sharing."""
import os
import pathlib
import re
import augeas
from plinth import action_utils
from plinth.actions import privileged
APACHE_CONFIGURATION = '/etc/apache2/conf-available/sharing-freedombox.conf'
@ -14,3 +19,157 @@ def setup():
path = pathlib.Path(APACHE_CONFIGURATION)
if not path.exists():
path.touch()
def load_augeas():
"""Initialize augeas for this app's configuration file."""
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
augeas.Augeas.NO_MODL_AUTOLOAD)
aug.set('/augeas/load/Httpd/lens', 'Httpd.lns')
aug.set('/augeas/load/Httpd/incl[last() + 1]', APACHE_CONFIGURATION)
aug.load()
aug.defvar('conf', '/files' + APACHE_CONFIGURATION)
return aug
@privileged
def add(name: str, path: str, groups: list[str], is_public: bool):
"""Add a share to Apache configuration."""
path = '"' + path.replace('"', r'\"') + '"'
url = '/share/' + name
if not os.path.exists(APACHE_CONFIGURATION):
pathlib.Path(APACHE_CONFIGURATION).touch()
aug = load_augeas()
shares = _list(aug)
if any([share for share in shares if share['name'] == name]):
raise Exception('Share already present')
aug.set('$conf/directive[last() + 1]', 'Alias')
aug.set('$conf/directive[last()]/arg[1]', url)
aug.set('$conf/directive[last()]/arg[2]', path)
aug.set('$conf/Location[last() + 1]/arg', url)
aug.set('$conf/Location[last()]/directive[last() + 1]', 'Include')
aug.set('$conf/Location[last()]/directive[last()]/arg',
'includes/freedombox-sharing.conf')
if not is_public:
aug.set('$conf/Location[last()]/directive[last() + 1]', 'Include')
aug.set('$conf/Location[last()]/directive[last()]/arg',
'includes/freedombox-single-sign-on.conf')
aug.set('$conf/Location[last()]/IfModule/arg', 'mod_auth_pubtkt.c')
aug.set('$conf/Location[last()]/IfModule/directive[1]', 'TKTAuthToken')
for group_name in groups:
aug.set(
'$conf/Location[last()]/IfModule/directive[1]/arg[last() + 1]',
group_name)
else:
aug.set('$conf/Location[last()]/directive[last() + 1]', 'Require')
aug.set('$conf/Location[last()]/directive[last()]/arg[1]', 'all')
aug.set('$conf/Location[last()]/directive[last()]/arg[2]', 'granted')
aug.save()
with action_utils.WebserverChange() as webserver_change:
webserver_change.enable('sharing-freedombox')
@privileged
def remove(name: str):
"""Remove a share from Apache configuration."""
url_to_remove = '/share/' + name
aug = load_augeas()
for directive in aug.match('$conf/directive'):
if aug.get(directive) != 'Alias':
continue
url = aug.get(directive + '/arg[1]')
if url == url_to_remove:
aug.remove(directive)
for location in aug.match('$conf/Location'):
url = aug.get(location + '/arg')
if url == url_to_remove:
aug.remove(location)
aug.save()
with action_utils.WebserverChange() as webserver_change:
webserver_change.enable('sharing-freedombox')
def _get_name_from_url(url):
"""Return the name of the share given the URL for it."""
matches = re.match(r'/share/([a-z0-9\-]*)', url)
if not matches:
raise ValueError
return matches[1]
def _list(aug=None):
"""List all Apache configuration shares."""
if not aug:
aug = load_augeas()
shares = []
for match in aug.match('$conf/directive'):
if aug.get(match) != 'Alias':
continue
url = aug.get(match + '/arg[1]')
path = aug.get(match + '/arg[2]')
path = path.removesuffix('"').removeprefix('"')
path = path.replace(r'\"', '"')
try:
name = _get_name_from_url(url)
shares.append({
'name': name,
'path': path,
'url': '/share/' + name
})
except ValueError:
continue
for location in aug.match('$conf/Location'):
url = aug.get(location + '/arg')
try:
name = _get_name_from_url(url)
except ValueError:
continue
groups = []
for group in aug.match(location + '//directive["TKTAuthToken"]/arg'):
groups.append(aug.get(group))
def _is_public():
"""Must contain the line 'Require all granted'."""
require = location + '//directive["Require"]'
return bool(aug.match(require)) and aug.get(
require +
'/arg[1]') == 'all' and aug.get(require +
'/arg[2]') == 'granted'
for share in shares:
if share['name'] == name:
share['groups'] = groups
share['is_public'] = _is_public()
return shares
@privileged
def list_shares() -> list[dict[str, object]]:
"""List all Apache configuration shares and print as JSON."""
return _list()

View File

@ -1,7 +1,5 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Views for the sharing app.
"""
"""Views for the sharing app."""
from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
@ -12,26 +10,28 @@ from django.utils.translation import gettext_lazy as _
from django.views.decorators.http import require_POST
from django.views.generic import FormView
from plinth.modules import sharing
from plinth.views import AppView
from . import privileged
from .forms import AddShareForm
class SharingAppView(AppView):
"""Sharing configuration page."""
app_id = 'sharing'
template_name = 'sharing.html'
def get_context_data(self, **kwargs):
"""Return additional context for rendering the template."""
context = super().get_context_data(**kwargs)
context['shares'] = sharing.list_shares()
context['shares'] = privileged.list_shares()
return context
class AddShareView(SuccessMessageMixin, FormView):
"""View to add a new share."""
form_class = AddShareForm
prefix = 'sharing'
template_name = 'sharing_add_edit.html'
@ -52,6 +52,7 @@ class AddShareView(SuccessMessageMixin, FormView):
class EditShareView(SuccessMessageMixin, FormView):
"""View to edit an existing share."""
form_class = AddShareForm
prefix = 'sharing'
template_name = 'sharing_add_edit.html'
@ -68,7 +69,7 @@ class EditShareView(SuccessMessageMixin, FormView):
"""Load information about share being edited."""
try:
return [
share for share in sharing.list_shares()
share for share in privileged.list_shares()
if share['name'] == self.kwargs['name']
][0]
except IndexError:
@ -77,20 +78,20 @@ class EditShareView(SuccessMessageMixin, FormView):
def form_valid(self, form):
"""Add the share on valid form submission."""
if form.initial != form.cleaned_data:
sharing.remove_share(form.initial['name'])
privileged.remove(form.initial['name'])
_add_share(form.cleaned_data)
return super().form_valid(form)
def _add_share(form_data):
sharing.add_share(form_data['name'], form_data['path'],
form_data['groups'], form_data['is_public'])
privileged.add(form_data['name'], form_data['path'], form_data['groups'],
form_data['is_public'])
@require_POST
def remove(request, name):
"""View to remove a share."""
sharing.remove_share(name)
privileged.remove(name)
messages.success(request, _('Share deleted.'))
return redirect(reverse_lazy('sharing:index'))