mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
Now it is possible to change the default branch when editing a repository. The Gitweb site shows the default branch as the main branch, and the 'git clone' command checks out the default branch. Added unit and functional tests. Split one large 'test_actions' into multiple tests. Tests performed: - All gitweb unit and functional tests pass. - Created a repository from a remote repository which has a default branch other than master. Confirmed that the 'Edit repository' page shows the correct branch and the gitweb site shows this branch as the default branch. Closes #1925 Signed-off-by: Veiko Aasa <veiko17@disroot.org> Reviewed-by: Sunil Mohan Adapa <sunil@medhas.org>
460 lines
16 KiB
Python
Executable File
460 lines
16 KiB
Python
Executable File
#!/usr/bin/python3
|
||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||
"""
|
||
Configuration helper for Gitweb.
|
||
"""
|
||
|
||
import argparse
|
||
import configparser
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
|
||
from plinth import action_utils
|
||
from plinth.modules.gitweb.forms import RepositoryValidator, get_name_from_url
|
||
from plinth.modules.gitweb.manifest import GIT_REPO_PATH
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class ValidateRepoName(argparse.Action):
    """Argparse action that checks a repository name and normalizes it.

    The supplied value is run through RepositoryValidator, and the value
    stored on the namespace always ends with '.git'.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Validate the name and store it with a '.git' suffix."""
        RepositoryValidator()(values)
        repo_name = values if values.endswith('.git') else values + '.git'
        setattr(namespace, self.dest, repo_name)
|
||
|
||
|
||
class ValidateRepoUrl(argparse.Action):
    """Argparse action that checks a repository URL before storing it."""

    def __call__(self, parser, namespace, values, option_string=None):
        """Run the URL through RepositoryValidator and store it unchanged."""
        check_url = RepositoryValidator(input_should_be='url')
        check_url(values)
        setattr(namespace, self.dest, values)
|
||
|
||
|
||
def parse_arguments():
    """Return parsed command line arguments.

    One sub-command parser is registered per supported operation and a
    sub-command is mandatory. The result is the argparse.Namespace produced
    by ``parse_args()``; the chosen sub-command is available as
    ``subcommand``.

    For 'create-repo', --name and --url are mutually exclusive and one of
    them is required. --prepare-only and --skip-prepare are rejected (via
    ``parser.error``) when --name is used, because they only apply to
    cloning from a URL.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    subparsers.add_parser(
        'setup', help='Perform post-installation operations for Gitweb')

    subparser = subparsers.add_parser('create-repo',
                                      help='Create a new repository')
    group = subparser.add_mutually_exclusive_group(required=True)
    group.add_argument('--name', action=ValidateRepoName,
                       help='Name of the repository')
    group.add_argument('--url', action=ValidateRepoUrl,
                       help='URL of the remote repository')
    subparser.add_argument('--description', required=True,
                           help='Description of the repository')
    subparser.add_argument('--owner', required=True,
                           help='Repository’s owner name')
    subparser.add_argument(
        '--is-private', required=False, default=False, action='store_true',
        help='Allow only authorized users to access this repository')
    subparser.add_argument(
        '--keep-ownership', required=False, default=False, action='store_true',
        help='Do not change ownership of the repository directory')
    subparser.add_argument('--prepare-only', required=False, default=False,
                           action='store_true',
                           help='Run preparation tasks for cloning.')
    subparser.add_argument('--skip-prepare', required=False, default=False,
                           action='store_true',
                           help='Skip preparation tasks for cloning.')

    subparser = subparsers.add_parser(
        'repo-info', help='Get information about the repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')

    subparser = subparsers.add_parser(
        'check-repo-exists', help='Check whether the remote repository exists')
    subparser.add_argument('--url', required=True, action=ValidateRepoUrl,
                           help='URL of the remote repository')

    subparser = subparsers.add_parser('rename-repo',
                                      help='Rename a repository')
    subparser.add_argument('--oldname', required=True, action=ValidateRepoName,
                           help='Old name of the repository')
    subparser.add_argument('--newname', required=True, action=ValidateRepoName,
                           help='New name of the repository')

    subparser = subparsers.add_parser(
        'set-default-branch', help='Set default branch of the repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')
    subparser.add_argument('--branch', required=True,
                           help='Name of the branch')

    subparser = subparsers.add_parser(
        'get-branches', help='Get all the branches of the repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')

    subparser = subparsers.add_parser('set-repo-description',
                                      help='Set description of the repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')
    subparser.add_argument('--description', required=True,
                           help='Description of the repository')

    subparser = subparsers.add_parser('set-repo-owner',
                                      help='Set repository\'s owner name')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')
    subparser.add_argument('--owner', required=True,
                           help='Repository’s owner name')

    subparser = subparsers.add_parser(
        'set-repo-access', help='Set repository as private or public')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')
    subparser.add_argument('--access', required=True,
                           choices=['public', 'private'], help='Access status')

    subparser = subparsers.add_parser('delete-repo',
                                      help='Delete an existing repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository to remove')

    subparsers.required = True
    args = parser.parse_args()

    # The prepare flags only make sense for the clone-from-URL path.
    if args.subcommand == 'create-repo' and args.name:
        if args.prepare_only:
            parser.error('--prepare-only can be set when using --url')

        if args.skip_prepare:
            parser.error('--skip-prepare can be set when using --url')

    return args
|
||
|
||
|
||
def subcommand_setup(_):
    """Disable default Apache2 Gitweb configuration.

    The parsed-arguments parameter is accepted for uniformity with the other
    sub-command handlers but is not used.
    """
    action_utils.webserver_disable('gitweb')
|
||
|
||
|
||
def _clone_with_progress_report(url, repo_dir):
    """Clone a repository and write progress info to the file.

    Runs ``git clone --bare --progress`` with ``<repo_dir>/.temp`` as the
    destination and reads git's stderr line by line. At most about once per
    second, the current line is converted to an overall percentage and
    written to ``<repo_dir>/clone_progress``. That file is removed once git
    has finished.

    Raises RuntimeError if git does not exit with status 0 (including when it
    had to be killed); in that case repo_dir is deleted first and the
    collected 'error:'/'fatal:' lines are logged and attached to the
    exception.
    """
    starttime = time.time()
    status_file = os.path.join(repo_dir, 'clone_progress')
    repo_temp_dir = os.path.join(repo_dir, '.temp')
    # do not ask for credentials and set low speed timeout
    env = dict(os.environ, GIT_TERMINAL_PROMPT='0', LC_ALL='C',
               GIT_HTTP_LOW_SPEED_LIMIT='100', GIT_HTTP_LOW_SPEED_TIME='60')

    proc = subprocess.Popen(
        ['git', 'clone', '--bare', '--progress', url, repo_temp_dir],
        stderr=subprocess.PIPE, text=True, env=env)

    # write clone progress to the file
    errors = []
    while True:
        line = proc.stderr.readline()
        if not line:
            break

        if 'error:' in line or 'fatal:' in line:
            errors.append(line.strip())

        # Throttle status file updates to roughly one per second.
        currenttime = time.time()
        if currenttime - starttime > 1:
            percent = _clone_status_line_to_percent(line)
            if percent is not None:
                try:
                    with open(status_file, 'w') as file_handle:
                        file_handle.write(percent)
                except OSError as error:
                    errors.append(str(error))

            starttime = currenttime

    # make sure process is ended
    try:
        proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed child and close its pipe so that no zombie process
        # or open file descriptor is left behind.
        proc.communicate()

    os.remove(status_file)
    if proc.returncode != 0:
        shutil.rmtree(repo_dir)
        logger.error('Git repository cloning failed: %s', '\n'.join(errors))
        raise RuntimeError('Git repository cloning failed.', errors)
|
||
|
||
|
||
def _prepare_clone_repo(arguments):
    """Create the target directory and initial progress file for a clone.

    The repository name is derived from ``arguments.url`` and given a '.git'
    suffix if it lacks one. The directory is created under GIT_REPO_PATH,
    marked private when requested, and a 'clone_progress' file containing
    '0' is written into it. On OSError during the latter steps, the directory
    is removed again and the error is re-raised.
    """
    name = get_name_from_url(arguments.url)
    name = name if name.endswith('.git') else name + '.git'

    target_dir = os.path.join(GIT_REPO_PATH, name)
    os.mkdir(target_dir)

    progress_path = os.path.join(target_dir, 'clone_progress')
    try:
        if arguments.is_private:
            _set_access_status(name, 'private')

        with open(progress_path, 'w') as progress_file:
            progress_file.write('0')
    except OSError:
        shutil.rmtree(target_dir)
        raise
|
||
|
||
|
||
def _clone_status_line_to_percent(line):
|
||
"""Parse Git clone command output."""
|
||
result = re.match(r'.* ([0-9]+)% ', line)
|
||
if result is not None:
|
||
text = result.group(0)
|
||
progress = int(result.group(1))
|
||
if 'Counting objects' in text:
|
||
total_progress = 0.05 * progress
|
||
elif 'Compressing objects' in text:
|
||
total_progress = 5 + 0.05 * progress
|
||
elif 'Receiving objects' in text:
|
||
total_progress = 10 + 0.6 * progress
|
||
elif 'Resolving deltas' in text:
|
||
total_progress = 70 + 0.3 * progress
|
||
|
||
return str(int(total_progress))
|
||
|
||
return None
|
||
|
||
|
||
def _clone_repo(arguments):
    """Clone a remote repository into GIT_REPO_PATH.

    The clone is made into a '.temp' sub-directory of the target repository
    directory; afterwards its entries are moved up one level and '.temp' is
    removed. Unless ``arguments.keep_ownership`` is set, ownership is changed
    recursively to www-data. Finally the description and owner are recorded.
    """
    remote_url = arguments.url
    repo_name = get_name_from_url(remote_url)
    if not repo_name.endswith('.git'):
        repo_name += '.git'

    target_path = os.path.join(GIT_REPO_PATH, repo_name)
    staging_path = os.path.join(target_path, '.temp')

    _clone_with_progress_report(remote_url, target_path)

    # Promote the cloned content from the staging directory.
    for entry in os.listdir(staging_path):
        shutil.move(os.path.join(staging_path, entry), target_path)

    shutil.rmtree(staging_path)

    if not arguments.keep_ownership:
        chown_command = ['chown', '-R', 'www-data:www-data', repo_name]
        subprocess.check_call(chown_command, cwd=GIT_REPO_PATH)

    _set_repo_description(repo_name, arguments.description)
    _set_repo_owner(repo_name, arguments.owner)
|
||
|
||
|
||
def _create_repo(arguments):
    """Initialize a new, empty bare repository under GIT_REPO_PATH.

    After ``git init``, ownership is changed to www-data unless
    ``arguments.keep_ownership`` is set, then description, owner and (if
    requested) the private marker are applied. If any step fails with
    CalledProcessError or OSError, a repository directory that exists is
    removed and the error is re-raised.
    """
    name = arguments.name
    try:
        init_command = ['git', 'init', '-q', '--bare', name]
        subprocess.check_call(init_command, cwd=GIT_REPO_PATH)

        if not arguments.keep_ownership:
            chown_command = ['chown', '-R', 'www-data:www-data', name]
            subprocess.check_call(chown_command, cwd=GIT_REPO_PATH)

        _set_repo_description(name, arguments.description)
        _set_repo_owner(name, arguments.owner)

        if arguments.is_private:
            _set_access_status(name, 'private')
    except (subprocess.CalledProcessError, OSError):
        created_path = os.path.join(GIT_REPO_PATH, name)
        if os.path.isdir(created_path):
            shutil.rmtree(created_path)

        raise
|
||
|
||
|
||
def _get_default_branch(repo):
    """Return the short name of the branch that HEAD points to.

    Queried with ``git symbolic-ref --short HEAD`` in the repository.
    """
    command = [
        'git', '-C',
        os.path.join(GIT_REPO_PATH, repo), 'symbolic-ref', '--short', 'HEAD'
    ]
    raw_output = subprocess.check_output(command)
    return raw_output.decode().strip()
|
||
|
||
|
||
def _get_repo_description(repo):
    """Get description of the repository.

    Return the content of the repository's 'description' file, or an empty
    string when that file does not exist.
    """
    path = os.path.join(GIT_REPO_PATH, repo, 'description')
    if not os.path.exists(path):
        return ''

    with open(path, 'r') as description_file:
        return description_file.read()
|
||
|
||
|
||
def _set_repo_description(repo, description):
    """Write the given text to the repository's 'description' file.

    Any previous content of the file is replaced.
    """
    path = os.path.join(GIT_REPO_PATH, repo, 'description')
    with open(path, 'w') as description_file:
        description_file.write(description)
|
||
|
||
|
||
def _get_repo_owner(repo):
    """Get repository's owner name.

    Read the 'owner' option of the 'gitweb' section from the repository's
    'config' file. Return an empty string when the section or option is
    missing.
    """
    parser = configparser.ConfigParser()
    parser.read(os.path.join(GIT_REPO_PATH, repo, 'config'))
    try:
        return parser['gitweb']['owner']
    except KeyError:
        return ''
|
||
|
||
|
||
def _set_repo_owner(repo, owner):
    """Store the owner name in the repository's 'config' file.

    The value is written to the 'owner' option of the 'gitweb' section, which
    is created if it does not exist yet. The whole file is rewritten.
    """
    config_path = os.path.join(GIT_REPO_PATH, repo, 'config')
    parser = configparser.ConfigParser()
    parser.read(config_path)

    if 'gitweb' not in parser:
        parser.add_section('gitweb')

    parser.set('gitweb', 'owner', owner)
    with open(config_path, 'w') as config_file:
        parser.write(config_file)
|
||
|
||
|
||
def _get_access_status(repo):
    """Return 'private' if the repository has a 'private' marker file.

    Otherwise return 'public'.
    """
    marker = os.path.join(GIT_REPO_PATH, repo, 'private')
    return 'private' if os.path.exists(marker) else 'public'
|
||
|
||
|
||
def _set_access_status(repo, status):
    """Set repository as private or public.

    A repository is marked private by the presence of a file named 'private'
    in its directory. For 'private' the marker file is created if missing
    (existing content is left untouched); for 'public' it is removed if
    present. Any other status value is ignored.
    """
    private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
    if status == 'private':
        # Open in append mode only to create the file; close it right away
        # instead of leaking the handle.
        with open(private_file, 'a'):
            pass
    elif status == 'public':
        if os.path.exists(private_file):
            os.remove(private_file)
|
||
|
||
|
||
def _get_branches(repo):
    """Return the short names of all branches in the repository as a list.

    The names come from ``git branch --format=%(refname:short)`` run with
    GIT_REPO_PATH as the working directory.
    """
    command = ['git', '-C', repo, 'branch', '--format=%(refname:short)']
    raw_output = subprocess.check_output(command, cwd=GIT_REPO_PATH)
    listing = raw_output.decode().strip()
    return listing.split()
|
||
|
||
|
||
def subcommand_get_branches(arguments):
    """Print the repository's default branch and all its branches as JSON.

    The output is an object with the keys 'default_branch' and 'branches'.
    """
    repo = arguments.name
    default_branch = _get_default_branch(repo)
    branches = _get_branches(repo)
    result = {'default_branch': default_branch, 'branches': branches}
    print(json.dumps(result))
|
||
|
||
|
||
def subcommand_rename_repo(arguments):
    """Rename a repository directory from oldname to newname.

    Both names are resolved relative to GIT_REPO_PATH.
    """
    source, target = (os.path.join(GIT_REPO_PATH, name)
                      for name in (arguments.oldname, arguments.newname))
    os.rename(source, target)
|
||
|
||
|
||
def subcommand_set_default_branch(arguments):
    """Set default branch of the repository.

    Exit with the message 'No such branch.' if the requested branch is not
    among the repository's branches. Otherwise point HEAD at the branch using
    ``git symbolic-ref``.
    """
    repo = arguments.name
    branch = arguments.branch

    known_branches = _get_branches(repo)
    if branch not in known_branches:
        sys.exit('No such branch.')

    target_ref = 'refs/heads/' + branch
    command = ['git', '-C', repo, 'symbolic-ref', 'HEAD', target_ref]
    subprocess.check_call(command, cwd=GIT_REPO_PATH)
|
||
|
||
|
||
def subcommand_set_repo_description(arguments):
    """Set description of the repository.

    Uses the 'name' and 'description' attributes of the parsed arguments.
    """
    _set_repo_description(arguments.name, arguments.description)
|
||
|
||
|
||
def subcommand_set_repo_owner(arguments):
    """Set repository's owner name.

    Uses the 'name' and 'owner' attributes of the parsed arguments.
    """
    _set_repo_owner(arguments.name, arguments.owner)
|
||
|
||
|
||
def subcommand_set_repo_access(arguments):
    """Set repository's access status.

    Uses the 'name' and 'access' attributes of the parsed arguments; the
    argument parser restricts 'access' to 'public' or 'private'.
    """
    _set_access_status(arguments.name, arguments.access)
|
||
|
||
|
||
def subcommand_repo_info(arguments):
    """Print information about a repository as JSON.

    The output object holds the name (with the last four characters, the
    '.git' suffix, removed), description, owner, access status and default
    branch. Raises RuntimeError if the repository path does not exist.
    """
    repo = arguments.name
    if not os.path.exists(os.path.join(GIT_REPO_PATH, repo)):
        raise RuntimeError('Repository not found')

    info = {
        'name': repo[:-4],
        'description': _get_repo_description(repo),
        'owner': _get_repo_owner(repo),
        'access': _get_access_status(repo),
        'default_branch': _get_default_branch(repo),
    }
    print(json.dumps(info))
|
||
|
||
|
||
def subcommand_create_repo(arguments):
    """Create an empty repository, or clone one when a URL is given.

    For a URL, the preparation step runs unless --skip-prepare is set and the
    actual clone runs unless --prepare-only is set.
    """
    if not arguments.url:
        _create_repo(arguments)
        return

    if not arguments.skip_prepare:
        _prepare_clone_repo(arguments)

    if not arguments.prepare_only:
        _clone_repo(arguments)
|
||
|
||
|
||
def subcommand_check_repo_exists(arguments):
    """Check that the remote repository can be reached.

    Runs ``git ls-remote <url> HEAD`` with terminal prompts disabled and a
    10 second timeout; a failing command or a timeout raises the
    corresponding subprocess exception.
    """
    no_prompt_env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
    command = ['git', 'ls-remote', arguments.url, 'HEAD']
    subprocess.check_call(command, timeout=10, env=no_prompt_env)
|
||
|
||
|
||
def subcommand_delete_repo(arguments):
    """Remove the repository directory and everything inside it."""
    shutil.rmtree(os.path.join(GIT_REPO_PATH, arguments.name))
|
||
|
||
|
||
def main():
    """Parse the command line and dispatch to the matching handler.

    The handler is the module-level function named 'subcommand_' followed by
    the sub-command with dashes replaced by underscores.
    """
    arguments = parse_arguments()
    handler_name = 'subcommand_' + arguments.subcommand.replace('-', '_')
    handler = globals()[handler_name]
    handler(arguments)
|
||
|
||
|
||
# Run the command line interface only when executed as a script.
if __name__ == '__main__':
    main()
|