#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Configuration helper for Gitweb.
"""

import argparse
import configparser
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time

from plinth import action_utils
from plinth.modules.gitweb.forms import RepositoryValidator, get_name_from_url
from plinth.modules.gitweb.manifest import GIT_REPO_PATH

logger = logging.getLogger(__name__)


class ValidateRepoName(argparse.Action):
    """Validate a repository name and add .git extension if necessary."""

    def __call__(self, parser, namespace, values, option_string=None):
        # The validator is called for its side effect; presumably it raises
        # on invalid input (defined in plinth.modules.gitweb.forms).
        RepositoryValidator()(values)
        if not values.endswith('.git'):
            values = values + '.git'

        setattr(namespace, self.dest, values)


class ValidateRepoUrl(argparse.Action):
    """Validate a repository URL."""

    def __call__(self, parser, namespace, values, option_string=None):
        # Same validator as for names, switched to URL mode.
        RepositoryValidator(input_should_be='url')(values)
        setattr(namespace, self.dest, values)


def parse_arguments():
    """Return parsed command line arguments as an argparse namespace.

    Exits through ``parser.error()`` when ``--prepare-only`` or
    ``--skip-prepare`` is combined with ``--name`` in ``create-repo``.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    subparsers.add_parser(
        'setup', help='Perform post-installation operations for Gitweb')

    subparser = subparsers.add_parser('create-repo',
                                      help='Create a new repository')
    group = subparser.add_mutually_exclusive_group(required=True)
    group.add_argument('--name', action=ValidateRepoName,
                       help='Name of the repository')
    group.add_argument('--url', action=ValidateRepoUrl,
                       help='URL of the remote repository')
    subparser.add_argument('--description', required=True,
                           help='Description of the repository')
    subparser.add_argument('--owner', required=True,
                           help='Repository’s owner name')
    subparser.add_argument(
        '--is-private', required=False, default=False, action='store_true',
        help='Allow only authorized users to access this repository')
    subparser.add_argument(
        '--keep-ownership', required=False, default=False,
        action="store_true",
        help='Do not change ownership of the repository directory')
    subparser.add_argument('--prepare-only', required=False, default=False,
                           action='store_true',
                           help='Run preparation tasks for cloning.')
    subparser.add_argument('--skip-prepare', required=False, default=False,
                           action='store_true',
                           help='Skip preparation tasks for cloning.')

    subparser = subparsers.add_parser(
        'repo-info', help='Get information about the repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')

    subparser = subparsers.add_parser(
        'check-repo-exists', help='Check whether the remote repository exists')
    subparser.add_argument('--url', required=True, action=ValidateRepoUrl,
                           help='URL of the remote repository')

    subparser = subparsers.add_parser('rename-repo',
                                      help='Rename a repository')
    subparser.add_argument('--oldname', required=True,
                           action=ValidateRepoName,
                           help='Old name of the repository')
    subparser.add_argument('--newname', required=True,
                           action=ValidateRepoName,
                           help='New name of the repository')

    subparser = subparsers.add_parser(
        'set-default-branch', help='Set default branch of the repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')
    subparser.add_argument('--branch', required=True,
                           help='Name of the branch')

    subparser = subparsers.add_parser(
        'get-branches', help='Get all the branches of the repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')

    subparser = subparsers.add_parser('set-repo-description',
                                      help='Set description of the repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')
    subparser.add_argument('--description', required=True,
                           help='Description of the repository')

    subparser = subparsers.add_parser('set-repo-owner',
                                      help='Set repository\'s owner name')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')
    subparser.add_argument('--owner', required=True,
                           help='Repository’s owner name')

    subparser = subparsers.add_parser(
        'set-repo-access', help='Set repository as private or public')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository')
    subparser.add_argument('--access', required=True,
                           choices=['public', 'private'],
                           help='Access status')

    subparser = subparsers.add_parser('delete-repo',
                                      help='Delete an existing repository')
    subparser.add_argument('--name', required=True, action=ValidateRepoName,
                           help='Name of the repository to remove')

    subparsers.required = True
    args = parser.parse_args()

    # The two-phase clone flags make no sense for a locally created
    # repository, so reject them unless --url was given.
    if args.subcommand == 'create-repo' and args.name:
        if args.prepare_only:
            parser.error('--prepare-only can be set when using --url')

        if args.skip_prepare:
            parser.error('--skip-prepare can be set when using --url')

    return args


def subcommand_setup(_):
    """Disable default Apache2 Gitweb configuration."""
    action_utils.webserver_disable('gitweb')


def _get_repo_name_from_url(url):
    """Return the repository directory name (with .git suffix) for a URL."""
    repo_name = get_name_from_url(url)
    if not repo_name.endswith('.git'):
        repo_name = repo_name + '.git'

    return repo_name


def _clone_with_progress_report(url, repo_dir):
    """Clone a repository and write progress info to the file.

    The bare clone goes into ``<repo_dir>/.temp`` while an integer percentage
    is written (at most about once per second) to
    ``<repo_dir>/clone_progress``. The progress file is removed when the
    clone process has finished.

    Raises RuntimeError if git exits with a non-zero status (or had to be
    killed); in that case ``repo_dir`` is removed.
    """
    last_write = time.time()
    status_file = os.path.join(repo_dir, 'clone_progress')
    repo_temp_dir = os.path.join(repo_dir, '.temp')
    # do not ask for credentials and set low speed timeout
    env = dict(os.environ, GIT_TERMINAL_PROMPT='0', LC_ALL='C',
               GIT_HTTP_LOW_SPEED_LIMIT='100', GIT_HTTP_LOW_SPEED_TIME='60')

    proc = subprocess.Popen(
        ['git', 'clone', '--bare', '--progress', url, repo_temp_dir],
        stderr=subprocess.PIPE, text=True, env=env)

    # write clone progress to the file
    errors = []
    for line in iter(proc.stderr.readline, ''):
        if 'error:' in line or 'fatal:' in line:
            errors.append(line.strip())

        now = time.time()
        if now - last_write > 1:
            percent = _clone_status_line_to_percent(line)
            if percent is not None:
                try:
                    with open(status_file, 'w') as file_handle:
                        file_handle.write(percent)
                except OSError as error:
                    errors.append(str(error))

            last_write = now

    # make sure process is ended
    try:
        proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed child and drain its pipe so that no zombie is left
        # behind and returncode gets populated.
        proc.communicate()

    # The progress file may never have been created (or may already be
    # gone); its absence must not hide the actual clone result.
    try:
        os.remove(status_file)
    except FileNotFoundError:
        pass

    if proc.returncode != 0:
        # Best-effort cleanup: a failure here must not mask the clone error.
        shutil.rmtree(repo_dir, ignore_errors=True)
        logger.error('Git repository cloning failed: %s', '\n'.join(errors))
        raise RuntimeError('Git repository cloning failed.', errors)


def _prepare_clone_repo(arguments):
    """Prepare cloning a repository.

    Creates the repository directory, marks it private when requested and
    initialises the progress file with '0'. The directory is removed again
    if any of the follow-up steps fails with OSError.
    """
    repo_name = _get_repo_name_from_url(arguments.url)

    repo_dir = os.path.join(GIT_REPO_PATH, repo_name)
    os.mkdir(repo_dir)

    status_file = os.path.join(repo_dir, 'clone_progress')
    try:
        if arguments.is_private:
            _set_access_status(repo_name, 'private')

        with open(status_file, 'w') as file_handle:
            file_handle.write('0')
    except OSError:
        shutil.rmtree(repo_dir)
        raise


def _clone_status_line_to_percent(line):
    """Parse Git clone command output.

    Return the overall progress as a string holding an integer percentage,
    or None if the line carries no percentage or belongs to a stage that is
    not tracked.
    """
    result = re.match(r'.* ([0-9]+)% ', line)
    if result is None:
        return None

    text = result.group(0)
    progress = int(result.group(1))
    # Each known stage of the clone is mapped onto a slice of the overall
    # 0-100 range.
    if 'Counting objects' in text:
        total_progress = 0.05 * progress
    elif 'Compressing objects' in text:
        total_progress = 5 + 0.05 * progress
    elif 'Receiving objects' in text:
        total_progress = 10 + 0.6 * progress
    elif 'Resolving deltas' in text:
        total_progress = 70 + 0.3 * progress
    else:
        # A percentage from a stage that is not tracked: nothing to report.
        return None

    return str(int(total_progress))


def _clone_repo(arguments):
    """Clone a repository.

    The clone is made into a '.temp' sub-directory and its contents are then
    moved up into the repository directory itself. Ownership, description
    and owner are set afterwards.
    """
    url = arguments.url
    repo = _get_repo_name_from_url(url)

    repo_path = os.path.join(GIT_REPO_PATH, repo)
    repo_temp_path = os.path.join(repo_path, '.temp')

    _clone_with_progress_report(url, repo_path)

    for item in os.listdir(repo_temp_path):
        shutil.move(os.path.join(repo_temp_path, item), repo_path)

    shutil.rmtree(repo_temp_path)
    if not arguments.keep_ownership:
        subprocess.check_call(['chown', '-R', 'www-data:www-data', repo],
                              cwd=GIT_REPO_PATH)

    _set_repo_description(repo, arguments.description)
    _set_repo_owner(repo, arguments.owner)


def _create_repo(arguments):
    """Create an empty repository.

    If any step fails with CalledProcessError or OSError, a partially
    created repository directory is removed and the error is re-raised.
    """
    repo = arguments.name
    try:
        subprocess.check_call(['git', 'init', '-q', '--bare', repo],
                              cwd=GIT_REPO_PATH)
        if not arguments.keep_ownership:
            subprocess.check_call(['chown', '-R', 'www-data:www-data', repo],
                                  cwd=GIT_REPO_PATH)

        _set_repo_description(repo, arguments.description)
        _set_repo_owner(repo, arguments.owner)
        if arguments.is_private:
            _set_access_status(repo, 'private')
    except (subprocess.CalledProcessError, OSError):
        repo_path = os.path.join(GIT_REPO_PATH, repo)
        if os.path.isdir(repo_path):
            shutil.rmtree(repo_path)

        raise


def _get_default_branch(repo):
    """Get default branch of the repository."""
    repo_path = os.path.join(GIT_REPO_PATH, repo)

    return subprocess.check_output(
        ['git', '-C', repo_path, 'symbolic-ref', '--short',
         'HEAD']).decode().strip()


def _get_repo_description(repo):
    """Get description of the repository.

    Return an empty string if the repository has no description file.
    """
    description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
    if os.path.exists(description_file):
        with open(description_file, 'r') as file_handle:
            description = file_handle.read()
    else:
        description = ''

    return description


def _set_repo_description(repo, description):
    """Set description of the repository."""
    description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
    with open(description_file, 'w') as file_handle:
        file_handle.write(description)


def _get_repo_owner(repo):
    """Get repository's owner name.

    Return an empty string if the 'gitweb' section or its 'owner' key is
    missing from the repository's config file.
    """
    repo_config = os.path.join(GIT_REPO_PATH, repo, 'config')
    config = configparser.ConfigParser()
    config.read(repo_config)
    try:
        owner = config['gitweb']['owner']
    except KeyError:
        owner = ''

    return owner


def _set_repo_owner(repo, owner):
    """Set repository's owner name."""
    repo_config = os.path.join(GIT_REPO_PATH, repo, 'config')
    config = configparser.ConfigParser()
    config.read(repo_config)
    if not config.has_section('gitweb'):
        config.add_section('gitweb')

    config['gitweb']['owner'] = owner
    with open(repo_config, 'w') as file_handle:
        config.write(file_handle)


def _get_access_status(repo):
    """Get repository's access status."""
    private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
    if os.path.exists(private_file):
        return 'private'

    return 'public'


def _set_access_status(repo, status):
    """Set repository as private or public.

    A repository is private when a file named 'private' exists in it. Any
    status other than 'private' or 'public' is ignored.
    """
    private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
    if status == 'private':
        # Create the marker file if missing; close the handle right away.
        with open(private_file, 'a'):
            pass
    elif status == 'public':
        if os.path.exists(private_file):
            os.remove(private_file)


def _get_branches(repo):
    """Return list of the branches in the repository."""
    output = subprocess.check_output(
        ['git', '-C', repo, 'branch', '--format=%(refname:short)'],
        cwd=GIT_REPO_PATH)

    return output.decode().strip().split()


def subcommand_get_branches(arguments):
    """Print the default branch and all branches of a repository as JSON."""
    repo = arguments.name

    print(
        json.dumps(
            dict(default_branch=_get_default_branch(repo),
                 branches=_get_branches(repo))))


def subcommand_rename_repo(arguments):
    """Rename a repository."""
    oldpath = os.path.join(GIT_REPO_PATH, arguments.oldname)
    newpath = os.path.join(GIT_REPO_PATH, arguments.newname)
    os.rename(oldpath, newpath)


def subcommand_set_default_branch(arguments):
    """Set default branch of the repository.

    Exits with an error message if the branch does not exist.
    """
    repo = arguments.name
    branch = arguments.branch

    if branch not in _get_branches(repo):
        sys.exit('No such branch.')

    subprocess.check_call([
        'git', '-C', repo, 'symbolic-ref', 'HEAD',
        "refs/heads/{}".format(branch)
    ], cwd=GIT_REPO_PATH)


def subcommand_set_repo_description(arguments):
    """Set description of the repository."""
    _set_repo_description(arguments.name, arguments.description)


def subcommand_set_repo_owner(arguments):
    """Set repository's owner name."""
    _set_repo_owner(arguments.name, arguments.owner)


def subcommand_set_repo_access(arguments):
    """Set repository's access status."""
    _set_access_status(arguments.name, arguments.access)


def subcommand_repo_info(arguments):
    """Print information about a repository as JSON.

    Raises RuntimeError if the repository directory does not exist.
    """
    repo_path = os.path.join(GIT_REPO_PATH, arguments.name)
    if not os.path.exists(repo_path):
        raise RuntimeError('Repository not found')

    print(
        json.dumps(
            dict(
                # The argument validator guarantees a '.git' suffix; drop it.
                name=arguments.name[:-4],
                description=_get_repo_description(arguments.name),
                owner=_get_repo_owner(arguments.name),
                access=_get_access_status(arguments.name),
                default_branch=_get_default_branch(arguments.name),
            )))


def subcommand_create_repo(arguments):
    """Create a new or clone a remote repository."""
    if arguments.url:
        if not arguments.skip_prepare:
            _prepare_clone_repo(arguments)

        if not arguments.prepare_only:
            _clone_repo(arguments)
    else:
        _create_repo(arguments)


def subcommand_check_repo_exists(arguments):
    """Check whether remote repository exists.

    Raises subprocess.CalledProcessError if ``git ls-remote`` fails and
    subprocess.TimeoutExpired if it takes longer than 10 seconds.
    """
    env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
    subprocess.check_call(['git', 'ls-remote', arguments.url, 'HEAD'],
                          timeout=10, env=env)


def subcommand_delete_repo(arguments):
    """Delete a git repository."""
    repo_path = os.path.join(GIT_REPO_PATH, arguments.name)
    shutil.rmtree(repo_path)


def main():
    """Parse arguments and perform all duties."""
    arguments = parse_arguments()

    # 'create-repo' -> subcommand_create_repo, looked up by name.
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()