gitweb: Allow to import from a remote repository

- Allow to enter either name or URL when creating repository

- Validate a repository URL, allow only http and https schemes

- Set autocomplete off on the name/URL field because URL may contain
  username:password

- Check whether the repository actually exists before cloning

- Show progress info while cloning

- Actions script: new subcommand check-repo-exists and new arguments
  for the create-repo: --url, --prepare-only and --skip-prepare

- Add test for invalid URLs

Closes #1670

Signed-off-by: Veiko Aasa <veiko17@disroot.org>
[sunil: Fix validating repo name in edit form]
[sunil: Don't pipe stdin of clone process, it may lead to a hang]
[sunil: Always run clone process with 'C' locale since we are parsing output]
[sunil: Cosmetic changes]
Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: Sunil Mohan Adapa <sunil@medhas.org>
This commit is contained in:
Veiko Aasa 2019-11-01 15:13:08 +03:00 committed by Sunil Mohan Adapa
parent ca0c991562
commit c99b33b40e
No known key found for this signature in database
GPG Key ID: 43EA1CFF0AA7C5F2
6 changed files with 378 additions and 88 deletions

View File

@ -22,14 +22,37 @@ Configuration helper for Gitweb.
import argparse
import configparser
import json
import logging
import os
import re
import shutil
import subprocess
import time
from plinth import action_utils
from plinth.modules.gitweb.forms import validate_repository
from plinth.modules.gitweb.forms import RepositoryValidator, get_name_from_url
from plinth.modules.gitweb.manifest import GIT_REPO_PATH
logger = logging.getLogger(__name__)
class ValidateRepoName(argparse.Action):
"""Validate a repository name and add .git extension if necessary."""
def __call__(self, parser, namespace, values, option_string=None):
RepositoryValidator()(values)
if not values.endswith('.git'):
values = values + '.git'
setattr(namespace, self.dest, values)
class ValidateRepoUrl(argparse.Action):
"""Validate a repository URL."""
def __call__(self, parser, namespace, values, option_string=None):
RepositoryValidator(input_should_be='url')(values)
setattr(namespace, self.dest, values)
def parse_arguments():
"""Return parsed command line arguments as dictionary."""
@ -41,8 +64,11 @@ def parse_arguments():
subparser = subparsers.add_parser('create-repo',
help='Create a new repository')
subparser.add_argument('--name', required=True,
help='Name of the repository')
group = subparser.add_mutually_exclusive_group(required=True)
group.add_argument('--name', action=ValidateRepoName,
help='Name of the repository')
group.add_argument('--url', action=ValidateRepoUrl,
help='URL of the remote repository')
subparser.add_argument('--description', required=True,
help='Description of the repository')
subparser.add_argument('--owner', required=True,
@ -53,58 +79,195 @@ def parse_arguments():
subparser.add_argument(
'--keep-ownership', required=False, default=False, action="store_true",
help='Do not chanege ownership of the repository directory')
subparser.add_argument('--prepare-only', required=False, default=False,
action='store_true',
help='Run preparation tasks for cloning.')
subparser.add_argument('--skip-prepare', required=False, default=False,
action='store_true',
help='Skip preparation tasks for cloning.')
subparser = subparsers.add_parser(
'repo-info', help='Get information about the repository')
subparser.add_argument('--name', required=True,
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser = subparsers.add_parser(
'check-repo-exists', help='Check whether the remote repository exists')
subparser.add_argument('--url', required=True, action=ValidateRepoUrl,
help='URL of the remote repository')
subparser = subparsers.add_parser('rename-repo',
help='Rename an repository')
subparser.add_argument('--oldname', required=True,
subparser.add_argument('--oldname', required=True, action=ValidateRepoName,
help='Old name of the repository')
subparser.add_argument('--newname', required=True,
subparser.add_argument('--newname', required=True, action=ValidateRepoName,
help='New name of the repository')
subparser = subparsers.add_parser('set-repo-description',
help='Set description of the repository')
subparser.add_argument('--name', required=True,
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser.add_argument('--description', required=True,
help='Description of the repository')
subparser = subparsers.add_parser('set-repo-owner',
help='Set repository\'s owner name')
subparser.add_argument('--name', required=True,
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser.add_argument('--owner', required=True,
help='Repositorys owner name')
subparser = subparsers.add_parser(
'set-repo-access', help='Set repository as private or public')
subparser.add_argument('--name', required=True,
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository')
subparser.add_argument('--access', required=True,
choices=['public', 'private'], help='Access status')
subparser = subparsers.add_parser('delete-repo',
help='Delete an existing repository')
subparser.add_argument('--name', required=True,
subparser.add_argument('--name', required=True, action=ValidateRepoName,
help='Name of the repository to remove')
subparsers.required = True
return parser.parse_args()
args = parser.parse_args()
if args.subcommand == 'create-repo' and args.name:
if args.prepare_only:
parser.error('--prepare-only can be set when using --url')
if args.skip_prepare:
parser.error('--skip-prepare can be set when using --url')
return args
def subcommand_setup(_):
"""Disable default Apache2 Gitweb configuration"""
"""Disable default Apache2 Gitweb configuration."""
action_utils.webserver_disable('gitweb')
def _clone_with_progress_report(url, repo_dir):
"""Clone a repository and write progress info to the file."""
starttime = time.time()
status_file = os.path.join(repo_dir, 'clone_progress')
repo_temp_dir = os.path.join(repo_dir, '.temp')
# do not ask for credidentials and set low speed timeout
env = dict(os.environ, GIT_TERMINAL_PROMPT='0', LC_ALL='C',
GIT_HTTP_LOW_SPEED_LIMIT='100', GIT_HTTP_LOW_SPEED_TIME='60')
proc = subprocess.Popen(
['git', 'clone', '--bare', '--progress', url, repo_temp_dir],
stderr=subprocess.PIPE, text=True, env=env)
# write clone progress to the file
errors = []
while True:
line = proc.stderr.readline()
if not line:
break
if 'error:' in line or 'fatal:' in line:
errors.append(line.strip())
currenttime = time.time()
if currenttime - starttime > 1:
elapsed = _clone_status_line_to_percent(line)
if elapsed is not None:
with open(status_file, 'w') as file_handle:
file_handle.write(elapsed)
starttime = currenttime
# make sure process is ended
try:
proc.communicate(timeout=10)
except subprocess.TimeoutExpired:
proc.kill()
os.remove(status_file)
if proc.returncode != 0:
shutil.rmtree(repo_dir)
logger.error('Git repository cloning failed: %s', '\n'.join(errors))
raise RuntimeError('Git repository cloning failed.', errors)
def _prepare_clone_repo(arguments):
"""Prepare cloning a repository."""
repo_name = get_name_from_url(arguments.url)
if not repo_name.endswith('.git'):
repo_name = repo_name + '.git'
repo_dir = os.path.join(GIT_REPO_PATH, repo_name)
os.mkdir(repo_dir)
if arguments.is_private:
_set_access_status(repo_name, 'private')
status_file = os.path.join(repo_dir, 'clone_progress')
with open(status_file, 'w') as file_handle:
file_handle.write('0')
def _clone_status_line_to_percent(line):
"""Parse Git clone command output."""
result = re.match(r'.* ([0-9]+)% ', line)
if result is not None:
text = result.group(0)
progress = int(result.group(1))
if 'Counting objects' in text:
total_progress = 0.05 * progress
elif 'Compressing objects' in text:
total_progress = 5 + 0.05 * progress
elif 'Receiving objects' in text:
total_progress = 10 + 0.6 * progress
elif 'Resolving deltas' in text:
total_progress = 70 + 0.3 * progress
return str(int(total_progress))
return None
def _clone_repo(arguments):
"""Clone a repository."""
url = arguments.url
repo = get_name_from_url(url)
if not repo.endswith('.git'):
repo = repo + '.git'
repo_path = os.path.join(GIT_REPO_PATH, repo)
repo_temp_path = os.path.join(repo_path, '.temp')
_clone_with_progress_report(url, repo_path)
for item in os.listdir(repo_temp_path):
shutil.move(os.path.join(repo_temp_path, item), repo_path)
shutil.rmtree(repo_temp_path)
if not arguments.keep_ownership:
subprocess.check_call(['chown', '-R', 'www-data:www-data', repo],
cwd=GIT_REPO_PATH)
_set_repo_description(repo, arguments.description)
_set_repo_owner(repo, arguments.owner)
def _create_repo(arguments):
"""Create an empty repository."""
repo = arguments.name
subprocess.check_call(['git', 'init', '--bare', repo], cwd=GIT_REPO_PATH)
if not arguments.keep_ownership:
subprocess.check_call(['chown', '-R', 'www-data:www-data', repo],
cwd=GIT_REPO_PATH)
_set_repo_description(repo, arguments.description)
_set_repo_owner(repo, arguments.owner)
if arguments.is_private:
_set_access_status(repo, 'private')
def _get_repo_description(repo):
"""Set description of the repository."""
description_file = os.path.join(GIT_REPO_PATH, repo + '.git',
'description')
description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
if os.path.exists(description_file):
with open(description_file, 'r') as file_handle:
description = file_handle.read()
@ -116,15 +279,14 @@ def _get_repo_description(repo):
def _set_repo_description(repo, description):
"""Set description of the repository."""
description_file = os.path.join(GIT_REPO_PATH, repo + '.git',
'description')
description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
with open(description_file, 'w') as file_handle:
file_handle.write(description)
def _get_repo_owner(repo):
"""Set repository's owner name."""
repo_config = os.path.join(GIT_REPO_PATH, repo + '.git', 'config')
repo_config = os.path.join(GIT_REPO_PATH, repo, 'config')
config = configparser.ConfigParser()
config.read(repo_config)
try:
@ -137,7 +299,7 @@ def _get_repo_owner(repo):
def _set_repo_owner(repo, owner):
"""Set repository's owner name."""
repo_config = os.path.join(GIT_REPO_PATH, repo + '.git', 'config')
repo_config = os.path.join(GIT_REPO_PATH, repo, 'config')
config = configparser.ConfigParser()
config.read(repo_config)
if not config.has_section('gitweb'):
@ -149,8 +311,8 @@ def _set_repo_owner(repo, owner):
def _get_access_status(repo):
"""Get repository's access status"""
private_file = os.path.join(GIT_REPO_PATH, repo + '.git', 'private')
"""Get repository's access status."""
private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
if os.path.exists(private_file):
return 'private'
@ -159,7 +321,7 @@ def _get_access_status(repo):
def _set_access_status(repo, status):
"""Set repository as private or public"""
private_file = os.path.join(GIT_REPO_PATH, repo + '.git', 'private')
private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
if status == 'private':
open(private_file, 'a')
elif status == 'public':
@ -169,64 +331,61 @@ def _set_access_status(repo, status):
def subcommand_rename_repo(arguments):
"""Rename a repository."""
validate_repository(arguments.oldname)
validate_repository(arguments.newname)
oldpath = os.path.join(GIT_REPO_PATH, arguments.oldname + '.git')
newpath = os.path.join(GIT_REPO_PATH, arguments.newname + '.git')
oldpath = os.path.join(GIT_REPO_PATH, arguments.oldname)
newpath = os.path.join(GIT_REPO_PATH, arguments.newname)
os.rename(oldpath, newpath)
def subcommand_set_repo_description(arguments):
"""Set description of the repository."""
validate_repository(arguments.name)
_set_repo_description(arguments.name, arguments.description)
def subcommand_set_repo_owner(arguments):
"""Set repository's owner name."""
validate_repository(arguments.name)
_set_repo_owner(arguments.name, arguments.owner)
def subcommand_set_repo_access(arguments):
"""Set repository's access status."""
validate_repository(arguments.name)
_set_access_status(arguments.name, arguments.access)
def subcommand_repo_info(arguments):
"""Get information about repository."""
validate_repository(arguments.name)
repo_path = os.path.join(GIT_REPO_PATH, arguments.name + '.git')
repo_path = os.path.join(GIT_REPO_PATH, arguments.name)
if not os.path.exists(repo_path):
raise RuntimeError('Repository not found')
print(
json.dumps(
dict(name=arguments.name, description=_get_repo_description(
dict(name=arguments.name[:-4], description=_get_repo_description(
arguments.name), owner=_get_repo_owner(arguments.name),
access=_get_access_status(arguments.name))))
def subcommand_create_repo(arguments):
"""Create a new git repository."""
validate_repository(arguments.name)
repo_name = arguments.name + '.git'
subprocess.check_call(['git', 'init', '--bare', repo_name],
cwd=GIT_REPO_PATH)
if not arguments.keep_ownership:
subprocess.check_call(['chown', '-R', 'www-data:www-data', repo_name],
cwd=GIT_REPO_PATH)
_set_repo_description(arguments.name, arguments.description)
_set_repo_owner(arguments.name, arguments.owner)
if arguments.is_private:
_set_access_status(arguments.name, 'private')
"""Create a new or clone a remote repository."""
if arguments.url:
if not arguments.skip_prepare:
_prepare_clone_repo(arguments)
if not arguments.prepare_only:
_clone_repo(arguments)
else:
_create_repo(arguments)
def subcommand_check_repo_exists(arguments):
"""Check whether remote repository exists."""
env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
subprocess.check_call(['git', 'ls-remote', arguments.url, 'HEAD'],
timeout=10, env=env)
def subcommand_delete_repo(arguments):
"""Delete a git repository."""
validate_repository(arguments.name)
repo_path = os.path.join(GIT_REPO_PATH, arguments.name + '.git')
repo_path = os.path.join(GIT_REPO_PATH, arguments.name)
shutil.rmtree(repo_path)

View File

@ -26,10 +26,12 @@ from django.utils.translation import ugettext_lazy as _
from plinth import action_utils, actions
from plinth import app as app_module
from plinth import frontpage, menu
from plinth.errors import ActionError
from plinth.modules.apache.components import Webserver
from plinth.modules.firewall.components import Firewall
from plinth.modules.users import register_group
from .forms import is_repo_url
from .manifest import GIT_REPO_PATH, backup, clients # noqa, pylint: disable=unused-import
clients = clients
@ -102,13 +104,26 @@ class GitwebApp(app_module.App):
repos = []
if os.path.exists(GIT_REPO_PATH):
for repo in os.listdir(GIT_REPO_PATH):
if not repo.endswith('.git'):
if not repo.endswith('.git') or repo.startswith('.'):
continue
repo_info = {}
repo_info['name'] = repo[:-4]
private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
access = 'public'
if os.path.exists(private_file):
access = 'private'
repos.append({'name': repo[:-4], 'access': access})
repo_info['access'] = 'private'
else:
repo_info['access'] = 'public'
progress_file = os.path.join(GIT_REPO_PATH, repo,
'clone_progress')
if os.path.exists(progress_file):
with open(progress_file) as file_handle:
clone_progress = file_handle.read()
repo_info['clone_progress'] = clone_progress
repos.append(repo_info)
return sorted(repos, key=lambda repo: repo['name'])
@ -189,21 +204,36 @@ def restore_post(packet):
app.update_service_access()
def repo_exists(name):
"""Check whether a remote repository exists."""
try:
actions.run('gitweb', ['check-repo-exists', '--url', name])
except ActionError:
return False
return True
def have_public_repos(repos):
"""Check for public repositories"""
return any((repo['access'] == 'public' for repo in repos))
def create_repo(repo, repo_description, owner, is_private):
"""Create a new repository by calling the action script."""
args = [
'create-repo', '--name', repo, '--description', repo_description,
'--owner', owner
]
"""Create a new repository or clone a remote repository."""
args = ['--description', repo_description, '--owner', owner]
if is_private:
args.append('--is-private')
actions.superuser_run('gitweb', args)
if is_repo_url(repo):
args = ['create-repo', '--url', repo] + args
# create a repo directory and set correct access rights
actions.superuser_run('gitweb', args + ['--prepare-only'])
# start cloning in background
actions.superuser_run('gitweb', args + ['--skip-prepare'],
run_in_background=True)
else:
args = ['create-repo', '--name', repo] + args
actions.superuser_run('gitweb', args)
def repo_info(repo):

View File

@ -19,37 +19,65 @@ Django form for configuring Gitweb.
"""
import re
from urllib.parse import urlparse
from django import forms
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.utils.translation import ugettext_lazy as _
from plinth.modules import gitweb
def validate_repository(name):
"""Validate a Git repository name."""
if not re.match(r'^[a-zA-Z0-9-._]+$', name):
raise ValidationError(_('Invalid repository name.'))
if name.startswith(('-', '.')):
raise ValidationError(_('Invalid repository name.'))
if name.endswith('.git.git'):
raise ValidationError(_('Invalid repository name.'))
def get_name_from_url(url):
"""Get a repository name from URL"""
return urlparse(url).path.split('/')[-1]
class EditRepoForm(forms.Form):
def is_repo_url(url):
"""Check if URL is valid."""
try:
RepositoryValidator(input_should_be='url')(url)
except ValidationError:
return False
return True
class RepositoryValidator:
input_should_be = 'name'
def __init__(self, input_should_be=None):
if input_should_be is not None:
self.input_should_be = input_should_be
def __call__(self, value):
"""Validate that the input is a correct repository name or URL"""
if self.input_should_be in ('url', 'url_or_name'):
try:
URLValidator(schemes=['http', 'https'],
message=_('Invalid repository URL.'))(value)
except ValidationError:
if self.input_should_be == 'url':
raise
else:
value = get_name_from_url(value)
if (not re.match(r'^[a-zA-Z0-9-._]+$', value)) \
or value.startswith(('-', '.')) \
or value.endswith('.git.git'):
raise ValidationError(_('Invalid repository name.'), 'invalid')
class CreateRepoForm(forms.Form):
"""Form to create and edit a new repository."""
name = forms.CharField(
label=_('Name of the repository'),
strip=True,
validators=[validate_repository],
help_text=_(
'An alpha-numeric string that uniquely identifies a repository.'),
)
label=_(
'Name of a new repository or URL to import an existing repository.'
), strip=True,
validators=[RepositoryValidator(input_should_be='url_or_name')],
widget=forms.TextInput(attrs={'autocomplete': 'off'}))
description = forms.CharField(
label=_('Description of the repository'), strip=True, required=False,
@ -68,6 +96,40 @@ class EditRepoForm(forms.Form):
super().__init__(*args, **kwargs)
self.fields['name'].widget.attrs.update({'autofocus': 'autofocus'})
def clean_name(self):
"""Check if the name is valid."""
name = self.cleaned_data['name']
repo_name = name
if is_repo_url(name):
repo_name = get_name_from_url(name)
if repo_name.endswith('.git'):
repo_name = repo_name[:-4]
for repo in gitweb.app.get_repo_list():
if repo_name == repo['name']:
raise ValidationError(
_('A repository with this name already exists.'))
if is_repo_url(name):
if not gitweb.repo_exists(name):
raise ValidationError('Remote repository is not available.')
return name
class EditRepoForm(CreateRepoForm):
"""Form to create and edit a new repository."""
name = forms.CharField(
label=_('Name of the repository'),
strip=True,
validators=[RepositoryValidator()],
help_text=_(
'An alpha-numeric string that uniquely identifies a repository.'),
)
def clean_name(self):
"""Check if the name is valid."""
name = self.cleaned_data['name']

View File

@ -20,6 +20,7 @@
{% load bootstrap %}
{% load i18n %}
{% load static %}
{% block page_head %}
<style type="text/css">
@ -33,6 +34,9 @@
.list-group-item .btn {
margin: -5px 2px;
}
.repo-cloning {
margin: 0px 10px;
}
</style>
{% endblock %}
@ -58,13 +62,13 @@
{% for repo in repos %}
<div class="list-group-item clearfix">
<a href="{% url 'gitweb:delete' repo.name %}"
class="btn btn-default btn-sm pull-right"
class="btn btn-default btn-sm pull-right {% if 'clone_progress' in repo %} disabled {% endif %}"
role="button"
title="{% blocktrans %}Delete repository {{ repo.name }}{% endblocktrans %}">
<span class="fa fa-trash-o" aria-hidden="true"></span>
</a>
<a class="repo-edit btn btn-sm btn-default pull-right"
<a class="repo-edit btn btn-sm btn-default pull-right {% if 'clone_progress' in repo %} disabled {% endif %}"
href="{% url 'gitweb:edit' repo.name %}">
<span class="fa fa-pencil-square-o" aria-hidden="true"></span>
</a>
@ -74,10 +78,17 @@
aria-label="private"></span>
{% endif %}
<a class="repo-label" href="/gitweb/{{ repo.name }}.git"
title="{% blocktrans %}Go to repository {{ repo.name }}{% endblocktrans %}">
{{ repo.name }}
</a>
{% if 'clone_progress' in repo %}
<span class="repo-cloning pull-right">
{% trans 'Cloning...' %} {{ repo.clone_progress }}%
</span>
<span class="repo-label">{{ repo.name }}<span>
{% else %}
<a class="repo-label" href="/gitweb/{{ repo.name }}.git"
title="{% blocktrans %}Go to repository {{ repo.name }}{% endblocktrans %}">
{{ repo.name }}
</a>
{% endif %}
</div>
{% endfor %}
</div>
@ -86,3 +97,12 @@
</div>
{% endblock %}
{% block page_js %}
{% if cloning %}
<script type="text/javascript" src="{% static 'theme/js/refresh.js' %}"></script>
{% endif %}
{% endblock %}

View File

@ -104,3 +104,16 @@ def test_action_create_repo_with_invalid_names(call_action, name):
'create-repo', '--name', name, '--description', '', '--owner', '',
'--keep-ownership'
])
@pytest.mark.parametrize('url', [
'Test-repo', 'file://root/Test-repo', 'localhost/Test-repo',
'ssh://localhost/Test-repo', 'https://localhost/.Test-repo'
])
def test_action_create_repo_with_invalid_urls(call_action, url):
"""Test that cloning repository with invalid URL fails."""
with pytest.raises(ValidationError):
call_action([
'create-repo', '--url', url, '--description', '', '--owner', '',
'--keep-ownership'
])

View File

@ -31,7 +31,7 @@ from plinth import actions, views
from plinth.errors import ActionError
from plinth.modules import gitweb
from .forms import EditRepoForm
from .forms import CreateRepoForm, EditRepoForm
class GitwebAppView(views.AppView):
@ -48,14 +48,16 @@ class GitwebAppView(views.AppView):
def get_context_data(self, *args, **kwargs):
"""Add repositories to the context data."""
context = super().get_context_data(*args, **kwargs)
context['repos'] = gitweb.app.get_repo_list()
repos = gitweb.app.get_repo_list()
context['repos'] = repos
context['cloning'] = any('clone_progress' in repo for repo in repos)
return context
class CreateRepoView(SuccessMessageMixin, FormView):
"""View to create a new repository."""
form_class = EditRepoForm
form_class = CreateRepoForm
prefix = 'gitweb'
template_name = 'gitweb_create_edit.html'
success_url = reverse_lazy('gitweb:index')
@ -75,9 +77,13 @@ class CreateRepoView(SuccessMessageMixin, FormView):
form_data[key] = ''
else:
form_data[key] = value
gitweb.create_repo(form_data['name'], form_data['description'],
form_data['owner'], form_data['is_private'])
try:
gitweb.create_repo(form_data['name'], form_data['description'],
form_data['owner'], form_data['is_private'])
except ActionError:
messages.error(
self.request,
_('An error occurred while creating the repository.'))
gitweb.app.update_service_access()
return super().form_valid(form)
@ -102,7 +108,7 @@ class EditRepoView(SuccessMessageMixin, FormView):
"""Load information about repository being edited."""
name = self.kwargs['name']
for repo in gitweb.app.get_repo_list():
if repo['name'] == name:
if repo['name'] == name and 'clone_progress' not in repo:
break
else:
raise Http404
@ -136,7 +142,7 @@ def delete(request, name):
On POST, delete the repository.
"""
for repo in gitweb.app.get_repo_list():
if repo['name'] == name:
if repo['name'] == name and 'clone_progress' not in repo:
break
else:
raise Http404