Veiko Aasa 545b35c0aa
gitweb: Switch default branch name to main for new repositories
I tested additionally that if the root user has already configured
default branch other than main, it is not changed by the gitweb app
setup process.

Signed-off-by: Veiko Aasa <veiko17@disroot.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2022-07-15 20:54:32 -04:00

483 lines
16 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for Gitweb.
"""
import argparse
import configparser
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time
from plinth import action_utils
from plinth.modules.gitweb.forms import RepositoryValidator, get_name_from_url
from plinth.modules.gitweb.manifest import GIT_REPO_PATH
logger = logging.getLogger(__name__)
class ValidateRepoName(argparse.Action):
    """Argparse action that validates and normalizes a repository name.

    The raw value is passed to ``RepositoryValidator``; the value stored on
    the namespace always ends with ``.git`` (the suffix is appended when the
    user did not supply it).
    """

    def __call__(self, parser, namespace, values, option_string=None):
        RepositoryValidator()(values)
        repo_name = values if values.endswith('.git') else values + '.git'
        setattr(namespace, self.dest, repo_name)
class ValidateRepoUrl(argparse.Action):
    """Argparse action that validates a repository URL before storing it."""

    def __call__(self, parser, namespace, values, option_string=None):
        validate_url = RepositoryValidator(input_should_be='url')
        validate_url(values)
        setattr(namespace, self.dest, values)
def parse_arguments():
    """Build the command line parser and return the parsed arguments.

    One sub command is registered per supported operation. Repository names
    and URLs are validated while parsing through ``ValidateRepoName`` and
    ``ValidateRepoUrl``.

    Returns the ``argparse.Namespace`` produced by the parser. The parser
    exits with an error when 'create-repo' is used with --name together
    with --prepare-only or --skip-prepare.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    def add_name_argument(command, help_text='Name of the repository'):
        """Register the required, validated --name option on a command."""
        command.add_argument('--name', required=True,
                             action=ValidateRepoName, help=help_text)

    # setup
    subparsers.add_parser(
        'setup', help='Perform post-installation operations for Gitweb')

    # create-repo: either a new empty repository (--name) or a clone (--url)
    create = subparsers.add_parser('create-repo',
                                   help='Create a new repository')
    source = create.add_mutually_exclusive_group(required=True)
    source.add_argument('--name', action=ValidateRepoName,
                        help='Name of the repository')
    source.add_argument('--url', action=ValidateRepoUrl,
                        help='URL of the remote repository')
    create.add_argument('--description', required=True,
                        help='Description of the repository')
    create.add_argument('--owner', required=True,
                        help='Repositorys owner name')
    create.add_argument(
        '--is-private', required=False, default=False, action='store_true',
        help='Allow only authorized users to access this repository')
    create.add_argument(
        '--keep-ownership', required=False, default=False,
        action="store_true",
        help='Do not chanege ownership of the repository directory')
    create.add_argument('--prepare-only', required=False, default=False,
                        action='store_true',
                        help='Run preparation tasks for cloning.')
    create.add_argument('--skip-prepare', required=False, default=False,
                        action='store_true',
                        help='Skip preparation tasks for cloning.')

    # repo-info
    info = subparsers.add_parser(
        'repo-info', help='Get information about the repository')
    add_name_argument(info)

    # check-repo-exists
    check = subparsers.add_parser(
        'check-repo-exists', help='Check whether the remote repository exists')
    check.add_argument('--url', required=True, action=ValidateRepoUrl,
                       help='URL of the remote repository')

    # rename-repo
    rename = subparsers.add_parser('rename-repo', help='Rename an repository')
    rename.add_argument('--oldname', required=True, action=ValidateRepoName,
                        help='Old name of the repository')
    rename.add_argument('--newname', required=True, action=ValidateRepoName,
                        help='New name of the repository')

    # set-default-branch
    default_branch = subparsers.add_parser(
        'set-default-branch', help='Set default branch of the repository')
    add_name_argument(default_branch)
    default_branch.add_argument('--branch', required=True,
                                help='Name of the branch')

    # get-branches
    branches = subparsers.add_parser(
        'get-branches', help='Get all the branches of the repository')
    add_name_argument(branches)

    # set-repo-description
    description = subparsers.add_parser(
        'set-repo-description', help='Set description of the repository')
    add_name_argument(description)
    description.add_argument('--description', required=True,
                             help='Description of the repository')

    # set-repo-owner
    owner = subparsers.add_parser('set-repo-owner',
                                  help='Set repository\'s owner name')
    add_name_argument(owner)
    owner.add_argument('--owner', required=True,
                       help='Repositorys owner name')

    # set-repo-access
    access = subparsers.add_parser(
        'set-repo-access', help='Set repository as private or public')
    add_name_argument(access)
    access.add_argument('--access', required=True,
                        choices=['public', 'private'], help='Access status')

    # delete-repo
    delete = subparsers.add_parser('delete-repo',
                                   help='Delete an existing repository')
    add_name_argument(delete, 'Name of the repository to remove')

    subparsers.required = True
    args = parser.parse_args()

    # The preparation flags only make sense for cloning; parser.error()
    # terminates the program on the first offending flag.
    if args.subcommand == 'create-repo' and args.name:
        url_only_flags = (
            (args.prepare_only, '--prepare-only can be set when using --url'),
            (args.skip_prepare, '--skip-prepare can be set when using --url'),
        )
        for is_set, message in url_only_flags:
            if is_set:
                parser.error(message)

    return args
def subcommand_setup(_):
    """Perform post-installation setup for Gitweb.

    Disables the default Apache2 'gitweb' configuration and sets the global
    Git default branch to 'main', unless a default branch is already
    configured, in which case it is left unchanged.
    """
    action_utils.webserver_disable('gitweb')
    if _get_global_default_branch():
        return

    _set_global_default_branch('main')
def _get_global_default_branch():
    """Return the global init.defaultBranch value, or None if it is unset.

    Any failure of ``git config`` other than exit status 1 is re-raised as
    ``subprocess.CalledProcessError``.
    """
    command = ['git', 'config', '--global', '--get', 'init.defaultBranch']
    try:
        output = subprocess.check_output(command)
    except subprocess.CalledProcessError as error:
        # Exit status 1 means the default branch is not configured.
        if error.returncode != 1:
            raise

        return None

    return output.decode().strip()
def _set_global_default_branch(name):
    """Store *name* as the global init.defaultBranch Git setting."""
    command = ['git', 'config', '--global', 'init.defaultBranch', name]
    subprocess.check_call(command)
def _clone_with_progress_report(url, repo_dir):
    """Clone a repository and write progress info to a status file.

    Runs ``git clone --bare --progress`` with ``<repo_dir>/.temp`` as the
    target and parses Git's progress output from stderr. At most about once
    per second the overall progress percentage is written to
    ``<repo_dir>/clone_progress``. The status file is removed once the
    clone process has ended.

    Raises RuntimeError when Git does not exit successfully; in that case
    repo_dir is removed and the collected error lines are logged and
    attached to the exception.
    """
    last_update = time.time()
    status_file = os.path.join(repo_dir, 'clone_progress')
    repo_temp_dir = os.path.join(repo_dir, '.temp')

    # do not ask for credentials and set low speed timeout
    env = dict(os.environ, GIT_TERMINAL_PROMPT='0', LC_ALL='C',
               GIT_HTTP_LOW_SPEED_LIMIT='100', GIT_HTTP_LOW_SPEED_TIME='60')

    proc = subprocess.Popen(
        ['git', 'clone', '--bare', '--progress', url, repo_temp_dir],
        stderr=subprocess.PIPE, text=True, env=env)

    # write clone progress to the file
    errors = []
    while True:
        line = proc.stderr.readline()
        if not line:
            break

        if 'error:' in line or 'fatal:' in line:
            errors.append(line.strip())

        now = time.time()
        if now - last_update > 1:
            percent = _clone_status_line_to_percent(line)
            if percent is not None:
                try:
                    with open(status_file, 'w',
                              encoding='utf-8') as file_handle:
                        file_handle.write(percent)
                except OSError as error:
                    errors.append(str(error))

            last_update = now

    # make sure process is ended
    try:
        proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed child so that it does not linger as a zombie and
        # so that returncode is populated.
        proc.communicate()

    # The status file may be absent (for example when it could never be
    # written); that must not hide the actual clone result below.
    try:
        os.remove(status_file)
    except FileNotFoundError:
        pass

    if proc.returncode != 0:
        shutil.rmtree(repo_dir)
        logger.error('Git repository cloning failed: %s', '\n'.join(errors))
        raise RuntimeError('Git repository cloning failed.', errors)
def _prepare_clone_repo(arguments):
    """Create the repository directory and initial state for cloning.

    The repository name is derived from ``arguments.url`` (with ``.git``
    appended when missing). The directory is created under GIT_REPO_PATH,
    marked private when requested, and a 'clone_progress' file containing
    '0' is written. On OSError during these steps the directory is removed
    again and the error is re-raised.
    """
    name = get_name_from_url(arguments.url)
    repo_name = name if name.endswith('.git') else name + '.git'

    repo_dir = os.path.join(GIT_REPO_PATH, repo_name)
    os.mkdir(repo_dir)

    progress_path = os.path.join(repo_dir, 'clone_progress')
    try:
        if arguments.is_private:
            _set_access_status(repo_name, 'private')

        with open(progress_path, 'w', encoding='utf-8') as progress_file:
            progress_file.write('0')
    except OSError:
        shutil.rmtree(repo_dir)
        raise
def _clone_status_line_to_percent(line):
"""Parse Git clone command output."""
result = re.match(r'.* ([0-9]+)% ', line)
if result is not None:
text = result.group(0)
progress = int(result.group(1))
if 'Counting objects' in text:
total_progress = 0.05 * progress
elif 'Compressing objects' in text:
total_progress = 5 + 0.05 * progress
elif 'Receiving objects' in text:
total_progress = 10 + 0.6 * progress
elif 'Resolving deltas' in text:
total_progress = 70 + 0.3 * progress
return str(int(total_progress))
return None
def _clone_repo(arguments):
    """Clone the remote repository given by ``arguments.url``.

    The clone is made into a '.temp' sub directory of the repository
    directory, its content is then moved one level up and '.temp' is
    removed. Unless ``arguments.keep_ownership`` is set, ownership is
    changed recursively to www-data. Finally the description and owner
    name are stored.
    """
    remote_url = arguments.url
    repo = get_name_from_url(remote_url)
    if not repo.endswith('.git'):
        repo += '.git'

    target_dir = os.path.join(GIT_REPO_PATH, repo)
    staging_dir = os.path.join(target_dir, '.temp')

    _clone_with_progress_report(remote_url, target_dir)

    # Move the cloned content out of the staging directory.
    for entry in os.listdir(staging_dir):
        shutil.move(os.path.join(staging_dir, entry), target_dir)

    shutil.rmtree(staging_dir)

    if not arguments.keep_ownership:
        chown_command = ['chown', '-R', 'www-data:www-data', repo]
        subprocess.check_call(chown_command, cwd=GIT_REPO_PATH)

    _set_repo_description(repo, arguments.description)
    _set_repo_owner(repo, arguments.owner)
def _create_repo(arguments):
    """Create an empty bare repository named ``arguments.name``.

    After ``git init`` the ownership is changed to www-data (unless
    ``arguments.keep_ownership`` is set), the description and owner are
    stored and the repository is marked private when requested. If any of
    these steps fails with CalledProcessError or OSError, the repository
    directory is removed when present and the error is re-raised.
    """
    repo = arguments.name
    try:
        init_command = ['git', 'init', '-q', '--bare', repo]
        subprocess.check_call(init_command, cwd=GIT_REPO_PATH)

        if not arguments.keep_ownership:
            chown_command = ['chown', '-R', 'www-data:www-data', repo]
            subprocess.check_call(chown_command, cwd=GIT_REPO_PATH)

        _set_repo_description(repo, arguments.description)
        _set_repo_owner(repo, arguments.owner)

        if arguments.is_private:
            _set_access_status(repo, 'private')
    except (subprocess.CalledProcessError, OSError):
        # Do not leave a half-created repository behind.
        leftover = os.path.join(GIT_REPO_PATH, repo)
        if os.path.isdir(leftover):
            shutil.rmtree(leftover)

        raise
def _get_default_branch(repo):
    """Return the short name of the branch that HEAD of *repo* points to."""
    command = [
        'git', '-C',
        os.path.join(GIT_REPO_PATH, repo), 'symbolic-ref', '--short', 'HEAD'
    ]
    raw_output = subprocess.check_output(command)
    return raw_output.decode().strip()
def _get_repo_description(repo):
    """Return the description of the repository.

    The text is read from the repository's 'description' file; an empty
    string is returned when that file does not exist.
    """
    description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
    if not os.path.exists(description_file):
        return ''

    with open(description_file, 'r', encoding='utf-8') as file_handle:
        return file_handle.read()
def _set_repo_description(repo, description):
    """Overwrite the repository's 'description' file with *description*."""
    target = os.path.join(GIT_REPO_PATH, repo, 'description')
    with open(target, mode='w', encoding='utf-8') as description_handle:
        description_handle.write(description)
def _get_repo_owner(repo):
    """Return the repository's owner name.

    The name is read from the 'owner' key of the 'gitweb' section in the
    repository's 'config' file. An empty string is returned when the
    section or the key is missing.
    """
    repo_config = os.path.join(GIT_REPO_PATH, repo, 'config')
    # Interpolation is disabled so that a literal '%' in a value is read
    # as-is instead of raising an interpolation error.
    config = configparser.ConfigParser(interpolation=None)
    config.read(repo_config)
    return config.get('gitweb', 'owner', fallback='')


def _set_repo_owner(repo, owner):
    """Set the repository's owner name.

    Stores *owner* under the 'owner' key of the 'gitweb' section in the
    repository's 'config' file, creating the section when needed. The whole
    file is rewritten by configparser.
    """
    repo_config = os.path.join(GIT_REPO_PATH, repo, 'config')
    # Interpolation is disabled so that owner names containing '%' can be
    # stored; with the default interpolation such values are rejected.
    config = configparser.ConfigParser(interpolation=None)
    config.read(repo_config)
    if not config.has_section('gitweb'):
        config.add_section('gitweb')

    config['gitweb']['owner'] = owner
    with open(repo_config, 'w', encoding='utf-8') as file_handle:
        config.write(file_handle)
def _get_access_status(repo):
    """Return 'private' if the repository has a 'private' file, else 'public'."""
    marker = os.path.join(GIT_REPO_PATH, repo, 'private')
    return 'private' if os.path.exists(marker) else 'public'
def _set_access_status(repo, status):
    """Set repository as private or public.

    A repository is private when a file named 'private' exists in its
    directory. For 'private' the file is created if missing; for 'public'
    it is removed if present. Any other status value is ignored.
    """
    private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
    if status == 'private':
        # Append mode creates the marker without touching existing content;
        # the context manager makes sure the handle is closed right away.
        with open(private_file, 'a', encoding='utf-8'):
            pass
    elif status == 'public':
        if os.path.exists(private_file):
            os.remove(private_file)
def _get_branches(repo):
    """Return the short names of the branches in the repository as a list."""
    command = ['git', '-C', repo, 'branch', '--format=%(refname:short)']
    raw_output = subprocess.check_output(command, cwd=GIT_REPO_PATH)
    listing = raw_output.decode().strip()
    return listing.split()
def subcommand_get_branches(arguments):
    """Print the default branch and all branches of a repository as JSON."""
    repo = arguments.name
    # The default branch is queried first, the branch list second.
    default_branch = _get_default_branch(repo)
    branches = _get_branches(repo)
    result = {'default_branch': default_branch, 'branches': branches}
    print(json.dumps(result))
def subcommand_rename_repo(arguments):
    """Rename a repository directory from the old name to the new name."""
    source, destination = (os.path.join(GIT_REPO_PATH, name)
                           for name in (arguments.oldname,
                                        arguments.newname))
    os.rename(source, destination)
def subcommand_set_default_branch(arguments):
    """Set the default branch of the repository.

    Exits with the message 'No such branch.' when the requested branch is
    not among the repository's branches; otherwise HEAD is pointed at it.
    """
    repo = arguments.name
    branch = arguments.branch

    existing_branches = _get_branches(repo)
    if branch not in existing_branches:
        sys.exit('No such branch.')

    target_ref = 'refs/heads/' + branch
    command = ['git', '-C', repo, 'symbolic-ref', 'HEAD', target_ref]
    subprocess.check_call(command, cwd=GIT_REPO_PATH)
def subcommand_set_repo_description(arguments):
    """Store the given description for the named repository."""
    repo, description = arguments.name, arguments.description
    _set_repo_description(repo, description)
def subcommand_set_repo_owner(arguments):
    """Store the given owner name for the named repository."""
    repo, owner = arguments.name, arguments.owner
    _set_repo_owner(repo, owner)
def subcommand_set_repo_access(arguments):
    """Mark the named repository as private or public."""
    repo, access = arguments.name, arguments.access
    _set_access_status(repo, access)
def subcommand_repo_info(arguments):
    """Print information about a repository as JSON.

    The output contains the name (without the trailing four characters of
    the stored name, i.e. the '.git' suffix), description, owner, access
    status and default branch. Raises RuntimeError when the repository
    path does not exist.
    """
    repo = arguments.name
    if not os.path.exists(os.path.join(GIT_REPO_PATH, repo)):
        raise RuntimeError('Repository not found')

    info = {
        'name': repo[:-4],
        'description': _get_repo_description(repo),
        'owner': _get_repo_owner(repo),
        'access': _get_access_status(repo),
        'default_branch': _get_default_branch(repo),
    }
    print(json.dumps(info))
def subcommand_create_repo(arguments):
    """Create a new empty repository or clone a remote one.

    Without a URL an empty repository is created. With a URL the
    preparation step runs unless --skip-prepare was given, and the clone
    step runs unless --prepare-only was given.
    """
    if not arguments.url:
        _create_repo(arguments)
        return

    if not arguments.skip_prepare:
        _prepare_clone_repo(arguments)

    if not arguments.prepare_only:
        _clone_repo(arguments)
def subcommand_check_repo_exists(arguments):
    """Check that the remote repository can be reached.

    Runs ``git ls-remote`` against the URL with terminal prompts disabled
    and a 10 second timeout; failures surface as subprocess exceptions.
    """
    no_prompt_env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
    command = ['git', 'ls-remote', arguments.url, 'HEAD']
    subprocess.check_call(command, timeout=10, env=no_prompt_env)
def subcommand_delete_repo(arguments):
    """Remove the repository directory and everything inside it."""
    shutil.rmtree(os.path.join(GIT_REPO_PATH, arguments.name))
def main():
    """Parse the command line and dispatch to the matching sub command.

    The handler is the module-level function named 'subcommand_' followed
    by the sub command name with dashes replaced by underscores.
    """
    arguments = parse_arguments()
    handler_name = 'subcommand_' + arguments.subcommand.replace('-', '_')
    handler = globals()[handler_name]
    handler(arguments)
# Run the command line interface only when executed as a script.
if __name__ == '__main__':
    main()