gitweb: Use privileged decorator for actions

Tests:

- Functions tests work
- Initial setup works
  - Global default branch is set to 'main'
- Creating an repository works
- Cloning a repository works
  - Progress is shown on the app page
- List of repositories is shown properly in the app page
- Deleting a repo works
- Editing a repository works
  - Repository information is shown properly in the form
  - Renaming a repository
  - Setting description
  - Setting owner
  - Setting a repository private/public
  - Setting default branch (list of branches is shown properly)
  - Error is thrown properly when a remote repository does not exist
- Errors are handled properly when creating/editing/deleting repo

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
Sunil Mohan Adapa 2022-09-01 23:13:31 -07:00 committed by James Valleroy
parent a62b7c7522
commit b91f1cf922
No known key found for this signature in database
GPG Key ID: 77C0C75E7B650808
8 changed files with 271 additions and 449 deletions

View File

@ -1,24 +1,19 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
FreedomBox app to configure Gitweb.
"""
"""FreedomBox app to configure Gitweb."""
import json
import os
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth import app as app_module
from plinth import frontpage, menu
from plinth.errors import ActionError
from plinth.modules.apache.components import Webserver
from plinth.modules.backups.components import BackupRestore
from plinth.modules.firewall.components import Firewall
from plinth.modules.users.components import UsersAndGroups
from plinth.package import Packages
from . import manifest
from . import manifest, privileged
from .forms import is_repo_url
from .manifest import GIT_REPO_PATH
@ -129,7 +124,7 @@ class GitwebApp(app_module.App):
def setup(self, old_version):
"""Install and configure the app."""
super().setup(old_version)
actions.superuser_run('gitweb', ['setup'])
privileged.setup()
self.enable()
@ -161,36 +156,31 @@ class GitwebBackupRestore(BackupRestore):
self.app.update_service_access()
def repo_exists(name):
"""Check whether a remote repository exists."""
try:
actions.run('gitweb', ['check-repo-exists', '--url', name])
except ActionError:
return False
return True
def have_public_repos(repos):
"""Check for public repositories"""
"""Check for public repositories."""
return any((repo['access'] == 'public' for repo in repos))
def create_repo(repo, repo_description, owner, is_private):
"""Create a new repository or clone a remote repository."""
args = ['--description', repo_description, '--owner', owner]
if is_private:
args.append('--is-private')
kwargs = {
'url': None,
'name': None,
'description': repo_description,
'owner': owner,
'is_private': is_private
}
if is_repo_url(repo):
args = ['create-repo', '--url', repo] + args
kwargs['url'] = repo
# create a repo directory and set correct access rights
actions.superuser_run('gitweb', args + ['--prepare-only'])
privileged.create_repo(prepare_only=True, **kwargs)
# start cloning in background
actions.superuser_run('gitweb', args + ['--skip-prepare'],
run_in_background=True)
privileged.create_repo(skip_prepare=True, _run_in_background=True,
**kwargs)
else:
args = ['create-repo', '--name', repo] + args
actions.superuser_run('gitweb', args)
kwargs['name'] = repo
privileged.create_repo(**kwargs)
def get_repo_list():
@ -223,8 +213,7 @@ def get_repo_list():
def repo_info(repo):
"""Get information about repository."""
output = actions.run('gitweb', ['repo-info', '--name', repo])
info = json.loads(output)
info = privileged.repo_info(repo)
if info['access'] == 'private':
info['is_private'] = True
else:
@ -234,72 +223,25 @@ def repo_info(repo):
return info
def _rename_repo(oldname, newname):
"""Rename a repository."""
args = ['rename-repo', '--oldname', oldname, '--newname', newname]
actions.superuser_run('gitweb', args)
def _set_default_branch(repo, branch):
"""Set default branch of the repository."""
args = [
'set-default-branch',
'--name',
repo,
'--branch',
branch,
]
actions.superuser_run('gitweb', args)
def _set_repo_description(repo, repo_description):
"""Set description of the repository."""
args = [
'set-repo-description',
'--name',
repo,
'--description',
repo_description,
]
actions.superuser_run('gitweb', args)
def _set_repo_owner(repo, owner):
"""Set repository's owner name."""
args = ['set-repo-owner', '--name', repo, '--owner', owner]
actions.superuser_run('gitweb', args)
def _set_repo_access(repo, access):
"""Set repository's owner name."""
args = ['set-repo-access', '--name', repo, '--access', access]
actions.superuser_run('gitweb', args)
def edit_repo(form_initial, form_cleaned):
"""Edit repository data."""
repo = form_initial['name']
if form_cleaned['name'] != repo:
_rename_repo(repo, form_cleaned['name'])
privileged.rename_repo(repo, form_cleaned['name'])
repo = form_cleaned['name']
if form_cleaned['description'] != form_initial['description']:
_set_repo_description(repo, form_cleaned['description'])
privileged.set_repo_description(repo, form_cleaned['description'])
if form_cleaned['owner'] != form_initial['owner']:
_set_repo_owner(repo, form_cleaned['owner'])
privileged.set_repo_owner(repo, form_cleaned['owner'])
if form_cleaned['is_private'] != form_initial['is_private']:
if form_cleaned['is_private']:
_set_repo_access(repo, 'private')
privileged.set_repo_access(repo, 'private')
else:
_set_repo_access(repo, 'public')
privileged.set_repo_access(repo, 'public')
if form_cleaned['default_branch'] != form_initial['default_branch']:
_set_default_branch(repo, form_cleaned['default_branch'])
def delete_repo(repo):
"""Delete a repository."""
actions.superuser_run('gitweb', ['delete-repo', '--name', repo])
privileged.set_default_branch(repo, form_cleaned['default_branch'])

View File

@ -3,7 +3,6 @@
Django form for configuring Gitweb.
"""
import json
import re
from urllib.parse import urlparse
@ -12,14 +11,14 @@ from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.utils.translation import gettext_lazy as _
from plinth import actions
from plinth.modules import gitweb
from . import privileged
def _get_branches(repo):
"""Get all the branches in the repository."""
branch_data = json.loads(
actions.run('gitweb', ['get-branches', '--name', repo]))
branch_data = privileged.get_branches(repo)
default_branch = branch_data['default_branch']
branches = branch_data['branches']
@ -113,7 +112,7 @@ class CreateRepoForm(forms.Form):
_('A repository with this name already exists.'))
if is_repo_url(name):
if not gitweb.repo_exists(name):
if not privileged.repo_exists(name):
raise ValidationError('Remote repository is not available.')
return name

288
actions/gitweb → plinth/modules/gitweb/privileged.py Executable file → Normal file
View File

@ -1,145 +1,40 @@
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for Gitweb.
"""
"""Configuration helper for Gitweb."""
import argparse
import configparser
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time
from typing import Any, Optional
from plinth import action_utils
from plinth.actions import privileged
from plinth.modules.gitweb.forms import RepositoryValidator, get_name_from_url
from plinth.modules.gitweb.manifest import GIT_REPO_PATH
logger = logging.getLogger(__name__)
class ValidateRepoName(argparse.Action):
def validate_repo_name(name: str) -> str:
"""Validate a repository name and add .git extension if necessary."""
RepositoryValidator()(name)
if not name.endswith('.git'):
name = name + '.git'
def __call__(self, parser, namespace, values, option_string=None):
RepositoryValidator()(values)
if not values.endswith('.git'):
values = values + '.git'
setattr(namespace, self.dest, values)
return name
class ValidateRepoUrl(argparse.Action):
def validate_repo_url(url: str) -> str:
"""Validate a repository URL."""
def __call__(self, parser, namespace, values, option_string=None):
RepositoryValidator(input_should_be='url')(values)
setattr(namespace, self.dest, values)
RepositoryValidator(input_should_be='url')(url)
return url
def parse_arguments():
"""Return parsed command line arguments as dictionary."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
subparsers.add_parser(
'setup', help='Perform post-installation operations for Gitweb')
subparser = subparsers.add_parser('create-repo',
help='Create a new repository')
group = subparser.add_mutually_exclusive_group(required=True)
group.add_argument('--name', action=ValidateRepoName,
help='Name of the repository')
group.add_argument('--url', action=ValidateRepoUrl,
help='URL of the remote repository')
subparser.add_argument('--description', required=True,
help='Description of the repository')
subparser.add_argument('--owner', required=True,
help='Repositorys owner name')
subparser.add_argument(
'--is-private', required=False, default=False, action='store_true',
help='Allow only authorized users to access this repository')
subparser.add_argument(
'--keep-ownership', required=False, default=False, action="store_true",
help='Do not chanege ownership of the repository directory')
subparser.add_argument('--prepare-only', required=False, default=False,
action='store_true',
help='Run preparation tasks for cloning.')
subparser.add_argument('--skip-prepare', required=False, default=False,
action='store_true',
help='Skip preparation tasks for cloning.')
subparser = subparsers.add_parser(
'repo-info', help='Get information about the repository')
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser = subparsers.add_parser(
'check-repo-exists', help='Check whether the remote repository exists')
subparser.add_argument('--url', required=True, action=ValidateRepoUrl,
help='URL of the remote repository')
subparser = subparsers.add_parser('rename-repo',
help='Rename an repository')
subparser.add_argument('--oldname', required=True, action=ValidateRepoName,
help='Old name of the repository')
subparser.add_argument('--newname', required=True, action=ValidateRepoName,
help='New name of the repository')
subparser = subparsers.add_parser(
'set-default-branch', help='Set default branch of the repository')
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser.add_argument('--branch', required=True,
help='Name of the branch')
subparser = subparsers.add_parser(
'get-branches', help='Get all the branches of the repository')
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser = subparsers.add_parser('set-repo-description',
help='Set description of the repository')
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser.add_argument('--description', required=True,
help='Description of the repository')
subparser = subparsers.add_parser('set-repo-owner',
help='Set repository\'s owner name')
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser.add_argument('--owner', required=True,
help='Repositorys owner name')
subparser = subparsers.add_parser(
'set-repo-access', help='Set repository as private or public')
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser.add_argument('--access', required=True,
choices=['public', 'private'], help='Access status')
subparser = subparsers.add_parser('delete-repo',
help='Delete an existing repository')
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository to remove')
subparsers.required = True
args = parser.parse_args()
if args.subcommand == 'create-repo' and args.name:
if args.prepare_only:
parser.error('--prepare-only can be set when using --url')
if args.skip_prepare:
parser.error('--skip-prepare can be set when using --url')
return args
def subcommand_setup(_):
@privileged
def setup():
"""Disable default Apache2 Gitweb configuration."""
action_utils.webserver_disable('gitweb')
if not _get_global_default_branch():
@ -215,9 +110,9 @@ def _clone_with_progress_report(url, repo_dir):
raise RuntimeError('Git repository cloning failed.', errors)
def _prepare_clone_repo(arguments):
def _prepare_clone_repo(url: str, is_private: bool):
"""Prepare cloning a repository."""
repo_name = get_name_from_url(arguments.url)
repo_name = get_name_from_url(url)
if not repo_name.endswith('.git'):
repo_name = repo_name + '.git'
@ -226,7 +121,7 @@ def _prepare_clone_repo(arguments):
status_file = os.path.join(repo_dir, 'clone_progress')
try:
if arguments.is_private:
if is_private:
_set_access_status(repo_name, 'private')
with open(status_file, 'w', encoding='utf-8') as file_handle:
file_handle.write('0')
@ -255,9 +150,8 @@ def _clone_status_line_to_percent(line):
return None
def _clone_repo(arguments):
def _clone_repo(url: str, description: str, owner: str, keep_ownership: bool):
"""Clone a repository."""
url = arguments.url
repo = get_name_from_url(url)
if not repo.endswith('.git'):
repo = repo + '.git'
@ -271,26 +165,26 @@ def _clone_repo(arguments):
shutil.move(os.path.join(repo_temp_path, item), repo_path)
shutil.rmtree(repo_temp_path)
if not arguments.keep_ownership:
if not keep_ownership:
subprocess.check_call(['chown', '-R', 'www-data:www-data', repo],
cwd=GIT_REPO_PATH)
_set_repo_description(repo, arguments.description)
_set_repo_owner(repo, arguments.owner)
_set_repo_description(repo, description)
_set_repo_owner(repo, owner)
def _create_repo(arguments):
def _create_repo(repo: str, description: str, owner: str, is_private: bool,
keep_ownership: bool):
"""Create an empty repository."""
repo = arguments.name
try:
subprocess.check_call(['git', 'init', '-q', '--bare', repo],
cwd=GIT_REPO_PATH)
if not arguments.keep_ownership:
if not keep_ownership:
subprocess.check_call(['chown', '-R', 'www-data:www-data', repo],
cwd=GIT_REPO_PATH)
_set_repo_description(repo, arguments.description)
_set_repo_owner(repo, arguments.owner)
if arguments.is_private:
_set_repo_description(repo, description)
_set_repo_owner(repo, owner)
if is_private:
_set_access_status(repo, 'private')
except (subprocess.CalledProcessError, OSError):
repo_path = os.path.join(GIT_REPO_PATH, repo)
@ -363,7 +257,7 @@ def _get_access_status(repo):
def _set_access_status(repo, status):
"""Set repository as private or public"""
"""Set repository as private or public."""
private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
if status == 'private':
open(private_file, 'a', encoding='utf-8')
@ -381,30 +275,30 @@ def _get_branches(repo):
return output.decode().strip().split()
def subcommand_get_branches(arguments):
@privileged
def get_branches(name: str) -> dict[str, Any]:
"""Check whether a branch exists in the repository."""
repo = arguments.name
print(
json.dumps(
dict(default_branch=_get_default_branch(repo),
branches=_get_branches(repo))))
repo = validate_repo_name(name)
return dict(default_branch=_get_default_branch(repo),
branches=_get_branches(repo))
def subcommand_rename_repo(arguments):
@privileged
def rename_repo(old_name: str, new_name: str):
"""Rename a repository."""
oldpath = os.path.join(GIT_REPO_PATH, arguments.oldname)
newpath = os.path.join(GIT_REPO_PATH, arguments.newname)
old_name = validate_repo_name(old_name)
new_name = validate_repo_name(new_name)
oldpath = os.path.join(GIT_REPO_PATH, old_name)
newpath = os.path.join(GIT_REPO_PATH, new_name)
os.rename(oldpath, newpath)
def subcommand_set_default_branch(arguments):
@privileged
def set_default_branch(name: str, branch: str):
"""Set description of the repository."""
repo = arguments.name
branch = arguments.branch
repo = validate_repo_name(name)
if branch not in _get_branches(repo):
sys.exit('No such branch.')
raise ValueError('No such branch')
subprocess.check_call([
'git', '-C', repo, 'symbolic-ref', 'HEAD',
@ -412,71 +306,81 @@ def subcommand_set_default_branch(arguments):
], cwd=GIT_REPO_PATH)
def subcommand_set_repo_description(arguments):
@privileged
def set_repo_description(name: str, description: str):
"""Set description of the repository."""
_set_repo_description(arguments.name, arguments.description)
repo = validate_repo_name(name)
_set_repo_description(repo, description)
def subcommand_set_repo_owner(arguments):
@privileged
def set_repo_owner(name: str, owner: str):
"""Set repository's owner name."""
_set_repo_owner(arguments.name, arguments.owner)
repo = validate_repo_name(name)
_set_repo_owner(repo, owner)
def subcommand_set_repo_access(arguments):
@privileged
def set_repo_access(name: str, access: str):
"""Set repository's access status."""
_set_access_status(arguments.name, arguments.access)
repo = validate_repo_name(name)
if access not in ('public', 'private'):
raise ValueError('Invalid access parameter')
_set_access_status(repo, access)
def subcommand_repo_info(arguments):
@privileged
def repo_info(name: str) -> dict[str, str]:
"""Get information about repository."""
repo_path = os.path.join(GIT_REPO_PATH, arguments.name)
repo = validate_repo_name(name)
repo_path = os.path.join(GIT_REPO_PATH, repo)
if not os.path.exists(repo_path):
raise RuntimeError('Repository not found')
print(
json.dumps(
dict(
name=arguments.name[:-4],
description=_get_repo_description(arguments.name),
owner=_get_repo_owner(arguments.name),
access=_get_access_status(arguments.name),
default_branch=_get_default_branch(arguments.name),
)))
return dict(name=repo[:-4], description=_get_repo_description(repo),
owner=_get_repo_owner(repo), access=_get_access_status(repo),
default_branch=_get_default_branch(repo))
def subcommand_create_repo(arguments):
@privileged
def create_repo(url: Optional[str] = None, name: Optional[str] = None,
description: str = '', owner: str = '',
keep_ownership: bool = False, is_private: bool = False,
skip_prepare: bool = False, prepare_only: bool = False):
"""Create a new or clone a remote repository."""
if arguments.url:
if not arguments.skip_prepare:
_prepare_clone_repo(arguments)
if url:
url = validate_repo_url(url)
if not arguments.prepare_only:
_clone_repo(arguments)
else:
_create_repo(arguments)
if name:
repo = validate_repo_name(name)
if url:
if not skip_prepare:
_prepare_clone_repo(url, is_private)
if not prepare_only:
_clone_repo(url, description, owner, keep_ownership)
elif repo is not None:
_create_repo(repo, description, owner, is_private, keep_ownership)
def subcommand_check_repo_exists(arguments):
"""Check whether remote repository exists."""
@privileged
def repo_exists(url: str) -> bool:
"""Return whether remote repository exists."""
url = validate_repo_url(url)
env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
subprocess.check_call(['git', 'ls-remote', arguments.url, 'HEAD'],
timeout=10, env=env)
try:
subprocess.check_call(['git', 'ls-remote', url, 'HEAD'], timeout=10,
env=env)
return True
except subprocess.CalledProcessError:
return False
def subcommand_delete_repo(arguments):
@privileged
def delete_repo(name: str):
"""Delete a git repository."""
repo_path = os.path.join(GIT_REPO_PATH, arguments.name)
repo = validate_repo_name(name)
repo_path = os.path.join(GIT_REPO_PATH, repo)
shutil.rmtree(repo_path)
def main():
"""Parse arguments and perform all duties."""
arguments = parse_arguments()
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)
if __name__ == '__main__':
main()

View File

@ -1,129 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Test module for gitweb module operations.
"""
import json
import pytest
from django.forms import ValidationError
REPO_NAME = 'Test-repo'
REPO_DATA = {
'name': REPO_NAME,
'description': '',
'owner': '',
'access': 'private',
}
actions_name = 'gitweb'
@pytest.fixture(autouse=True)
def fixture_set_repo_path(actions_module, tmpdir):
"""Set a repository path in the actions module."""
actions_module.GIT_REPO_PATH = str(tmpdir)
@pytest.fixture(name='existing_repo')
def fixture_existing_repo(call_action):
"""A fixture to create a repository."""
try:
call_action(['delete-repo', '--name', REPO_NAME])
except FileNotFoundError:
pass
call_action([
'create-repo', '--name', REPO_NAME, '--description', '', '--owner', '',
'--is-private', '--keep-ownership'
])
def test_create_repo(call_action):
"""Test creating a repository."""
call_action([
'create-repo', '--name', REPO_NAME, '--description', '', '--owner', '',
'--is-private', '--keep-ownership'
])
repo = json.loads(call_action(['repo-info', '--name', REPO_NAME]))
default_branch = repo.pop('default_branch')
assert repo == REPO_DATA
assert len(default_branch) > 0
def test_change_repo_medatada(call_action, existing_repo):
"""Test change a metadata of the repository."""
new_data = {
'name': REPO_NAME,
'description': 'description2',
'owner': 'owner2',
'access': 'public',
}
call_action([
'set-repo-description', '--name', REPO_NAME, '--description',
new_data['description']
])
call_action(
['set-repo-owner', '--name', REPO_NAME, '--owner', new_data['owner']])
call_action([
'set-repo-access', '--name', REPO_NAME, '--access', new_data['access']
])
repo = json.loads(call_action(['repo-info', '--name', REPO_NAME]))
del repo['default_branch']
assert repo == new_data
def test_rename_repository(call_action, existing_repo):
"""Test renaming a repository."""
new_name = 'Test-repo_2'
call_action(['rename-repo', '--oldname', REPO_NAME, '--newname', new_name])
with pytest.raises(RuntimeError, match='Repository not found'):
call_action(['repo-info', '--name', REPO_NAME])
repo = json.loads(call_action(['repo-info', '--name', new_name]))
assert repo['name'] == new_name
def test_get_branches(call_action, existing_repo):
"""Test getting all the branches of the repository."""
result = json.loads(call_action(['get-branches', '--name', REPO_NAME]))
assert 'default_branch' in result
assert result['branches'] == []
def test_delete_repository(call_action, existing_repo):
"""Test deleting a repository."""
call_action(['delete-repo', '--name', REPO_NAME])
with pytest.raises(RuntimeError, match='Repository not found'):
call_action(['repo-info', '--name', REPO_NAME])
@pytest.mark.parametrize(
'name',
['.Test-repo', 'Test-repo.git.git', '/root/Test-repo', 'Test-repö'])
def test_action_create_repo_with_invalid_names(call_action, name):
"""Test that creating repository with invalid names fails."""
with pytest.raises(ValidationError):
call_action([
'create-repo', '--name', name, '--description', '', '--owner', '',
'--keep-ownership'
])
@pytest.mark.parametrize('url', [
'Test-repo', 'file://root/Test-repo', 'localhost/Test-repo',
'ssh://localhost/Test-repo', 'https://localhost/.Test-repo'
])
def test_action_create_repo_with_invalid_urls(call_action, url):
"""Test that cloning repository with invalid URL fails."""
with pytest.raises(ValidationError):
call_action([
'create-repo', '--url', url, '--description', '', '--owner', '',
'--keep-ownership'
])

View File

@ -237,7 +237,7 @@ def _gitweb_git_command_is_successful(command, cwd):
if process.returncode != 0:
if 'Authentication failed' in process.stderr.decode():
return False
print(process.stdout.decode())
process.check_returncode() # Raise exception
return True

View File

@ -0,0 +1,114 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Test module for gitweb module operations."""
import pytest
from django.forms import ValidationError
from plinth.modules.gitweb import privileged
REPO_NAME = 'Test-repo'
REPO_DATA = {
'name': REPO_NAME,
'description': '',
'owner': '',
'access': 'private',
}
pytestmark = pytest.mark.usefixtures('mock_privileged')
privileged_modules_to_mock = ['plinth.modules.gitweb.privileged']
@pytest.fixture(autouse=True)
def fixture_set_repo_path(tmpdir):
"""Set a repository path in the actions module."""
privileged.GIT_REPO_PATH = str(tmpdir)
@pytest.fixture(name='existing_repo')
def fixture_existing_repo():
"""A fixture to create a repository."""
try:
privileged.delete_repo(REPO_NAME)
except FileNotFoundError:
pass
privileged.create_repo(name=REPO_NAME, description='', owner='',
keep_ownership=True, is_private=True)
def test_create_repo():
"""Test creating a repository."""
privileged.create_repo(name=REPO_NAME, description='', owner='',
is_private=True, keep_ownership=True)
repo = privileged.repo_info(REPO_NAME)
default_branch = repo.pop('default_branch')
assert repo == REPO_DATA
assert default_branch
def test_change_repo_medatada(existing_repo):
"""Test change a metadata of the repository."""
new_data = {
'name': REPO_NAME,
'description': 'description2',
'owner': 'owner2',
'access': 'public',
}
privileged.set_repo_description(REPO_NAME, new_data['description'])
privileged.set_repo_owner(REPO_NAME, new_data['owner'])
privileged.set_repo_access(REPO_NAME, new_data['access'])
repo = privileged.repo_info(REPO_NAME)
del repo['default_branch']
assert repo == new_data
def test_rename_repository(existing_repo):
"""Test renaming a repository."""
new_name = 'Test-repo_2'
privileged.rename_repo(REPO_NAME, new_name)
with pytest.raises(RuntimeError, match='Repository not found'):
privileged.repo_info(REPO_NAME)
repo = privileged.repo_info(new_name)
assert repo['name'] == new_name
def test_get_branches(existing_repo):
"""Test getting all the branches of the repository."""
result = privileged.get_branches(REPO_NAME)
assert 'default_branch' in result
assert result['branches'] == []
def test_delete_repository(existing_repo):
"""Test deleting a repository."""
privileged.delete_repo(REPO_NAME)
with pytest.raises(RuntimeError, match='Repository not found'):
privileged.repo_info(REPO_NAME)
@pytest.mark.parametrize(
'name',
['.Test-repo', 'Test-repo.git.git', '/root/Test-repo', 'Test-repö'])
def test_action_create_repo_with_invalid_names(name):
"""Test that creating repository with invalid names fails."""
with pytest.raises(ValidationError):
privileged.create_repo(name=name, description='', owner='',
keep_ownership=True)
@pytest.mark.parametrize('url', [
'Test-repo', 'file://root/Test-repo', 'localhost/Test-repo',
'ssh://localhost/Test-repo', 'https://localhost/.Test-repo'
])
def test_action_create_repo_with_invalid_urls(url):
"""Test that cloning repository with invalid URL fails."""
with pytest.raises(ValidationError):
privileged.create_repo(url=url, description='', owner='',
keep_ownership=True)

View File

@ -3,7 +3,6 @@
Tests for gitweb views.
"""
import json
from unittest.mock import Mock, patch
import pytest
@ -12,7 +11,6 @@ from django.contrib.messages.storage.fallback import FallbackStorage
from django.http.response import Http404
from plinth import module_loader
from plinth.errors import ActionError
from plinth.modules.gitweb import views
# For all tests, use plinth.urls instead of urls configured for testing
@ -48,31 +46,28 @@ def fixture_gitweb_urls():
yield
def action_run(*args, **kwargs):
"""Action return values."""
subcommand = args[1][0]
if subcommand == 'repo-info':
return json.dumps(EXISTING_REPOS[0])
elif subcommand == 'check-repo-exists':
return True
elif subcommand == 'get-branches':
return json.dumps({
"default_branch": "main",
"branches": ["main", "branch1"]
})
return None
@pytest.fixture(autouse=True)
def gitweb_patch():
"""Patch gitweb."""
privileged = 'plinth.modules.gitweb.privileged'
with patch('plinth.modules.gitweb.get_repo_list') as get_repo_list, \
patch('plinth.app.App.get') as app_get, \
patch('plinth.actions.superuser_run', side_effect=action_run), \
patch('plinth.actions.run', side_effect=action_run):
patch('plinth.app.App.get') as app_get, \
patch(f'{privileged}.create_repo'), \
patch(f'{privileged}.repo_exists') as repo_exists,\
patch(f'{privileged}.repo_info') as repo_info, \
patch(f'{privileged}.rename_repo'), \
patch(f'{privileged}.set_repo_description'), \
patch(f'{privileged}.set_repo_owner'), \
patch(f'{privileged}.set_repo_access'), \
patch(f'{privileged}.set_default_branch'), \
patch(f'{privileged}.delete_repo'), \
patch(f'{privileged}.get_branches') as get_branches:
repo_exists.return_value = True
repo_info.return_value = dict(EXISTING_REPOS[0])
get_branches.return_value = {
'default_branch': 'main',
'branches': ['main', 'branch1']
}
get_repo_list.return_value = [{
'name': EXISTING_REPOS[0]['name']
}, {
@ -163,7 +158,7 @@ def test_create_repo_failed_view(rf):
general_error_message = "An error occurred while creating the repository."
error_description = 'some error'
with patch('plinth.modules.gitweb.create_repo',
side_effect=ActionError('gitweb', '', error_description)):
side_effect=PermissionError(error_description)):
form_data = {
'gitweb-name': 'something_other',
'gitweb-description': '',
@ -198,7 +193,8 @@ def test_clone_repo_view(rf):
def test_clone_repo_missing_remote_view(rf):
"""Test that cloning non-existing repo shows correct error message."""
with patch('plinth.modules.gitweb.repo_exists', return_value=False):
with patch('plinth.modules.gitweb.privileged.repo_exists',
return_value=False):
form_data = {
'gitweb-name': 'https://example.com/test.git',
'gitweb-description': '',
@ -306,7 +302,7 @@ def test_edit_repository_no_change_view(rf):
def test_edit_repository_failed_view(rf):
"""Test that failed repo editing sends correct error message."""
with patch('plinth.modules.gitweb.edit_repo',
side_effect=ActionError('Error')):
side_effect=PermissionError('Error')):
form_data = {
'gitweb-name': 'something_other',
'gitweb-description': 'test-description',
@ -347,8 +343,8 @@ def test_delete_repository_view(rf):
def test_delete_repository_fail_view(rf):
"""Test that failed repository deletion sends correct error message."""
with patch('plinth.modules.gitweb.delete_repo',
side_effect=ActionError('Error')):
with patch('plinth.modules.gitweb.privileged.delete_repo',
side_effect=FileNotFoundError('Error')):
response, messages = make_request(rf.post(''), views.delete,
name=EXISTING_REPOS[0]['name'])

View File

@ -1,7 +1,5 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Django views for Gitweb.
"""
"""Django views for Gitweb."""
from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
@ -12,12 +10,11 @@ from django.urls import reverse_lazy
from django.utils.translation import gettext as _
from django.views.generic import FormView
from plinth import actions
from plinth import app as app_module
from plinth import views
from plinth.errors import ActionError
from plinth.modules import gitweb
from . import privileged
from .forms import CreateRepoForm, EditRepoForm
@ -65,13 +62,12 @@ class CreateRepoView(SuccessMessageMixin, FormView):
try:
gitweb.create_repo(form_data['name'], form_data['description'],
form_data['owner'], form_data['is_private'])
except ActionError as error:
except Exception as error:
self.success_message = ''
error_text = error.args[2].split('\n')[0]
messages.error(
self.request, "{0} {1}".format(
_('An error occurred while creating the repository.'),
error_text))
error))
else:
app_module.App.get('gitweb').update_service_access()
@ -116,7 +112,7 @@ class EditRepoView(SuccessMessageMixin, FormView):
try:
gitweb.edit_repo(form.initial, form_data)
except ActionError:
except Exception:
messages.error(self.request,
_('An error occurred during configuration.'))
app_module.App.get('gitweb').update_service_access()
@ -139,9 +135,9 @@ def delete(request, name):
app = app_module.App.get('gitweb')
if request.method == 'POST':
try:
gitweb.delete_repo(name)
privileged.delete_repo(name)
messages.success(request, _('{name} deleted.').format(name=name))
except actions.ActionError as error:
except Exception as error:
messages.error(
request,
_('Could not delete {name}: {error}').format(