#!/usr/bin/python3 # # This file is part of FreedomBox. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # """ Configuration helper for Gitweb. """ import argparse import configparser import json import logging import os import re import shutil import subprocess import time from plinth import action_utils from plinth.modules.gitweb.forms import RepositoryValidator, get_name_from_url from plinth.modules.gitweb.manifest import GIT_REPO_PATH logger = logging.getLogger(__name__) class ValidateRepoName(argparse.Action): """Validate a repository name and add .git extension if necessary.""" def __call__(self, parser, namespace, values, option_string=None): RepositoryValidator()(values) if not values.endswith('.git'): values = values + '.git' setattr(namespace, self.dest, values) class ValidateRepoUrl(argparse.Action): """Validate a repository URL.""" def __call__(self, parser, namespace, values, option_string=None): RepositoryValidator(input_should_be='url')(values) setattr(namespace, self.dest, values) def parse_arguments(): """Return parsed command line arguments as dictionary.""" parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest='subcommand', help='Sub command') subparsers.add_parser( 'setup', help='Perform post-installation operations for Gitweb') subparser = subparsers.add_parser('create-repo', help='Create a new repository') group = subparser.add_mutually_exclusive_group(required=True) group.add_argument('--name', action=ValidateRepoName, help='Name of the repository') group.add_argument('--url', action=ValidateRepoUrl, help='URL of the remote repository') subparser.add_argument('--description', required=True, help='Description of the repository') subparser.add_argument('--owner', required=True, help='Repository’s owner name') subparser.add_argument( '--is-private', required=False, default=False, action='store_true', help='Allow only authorized users to access this repository') subparser.add_argument( '--keep-ownership', required=False, default=False, action="store_true", help='Do not chanege ownership of the repository directory') subparser.add_argument('--prepare-only', required=False, default=False, action='store_true', help='Run preparation tasks for cloning.') subparser.add_argument('--skip-prepare', required=False, default=False, action='store_true', help='Skip preparation tasks for cloning.') subparser = subparsers.add_parser( 'repo-info', help='Get information about the repository') subparser.add_argument('--name', required=True, action=ValidateRepoName, help='Name of the repository') subparser = subparsers.add_parser( 'check-repo-exists', help='Check whether the remote repository exists') subparser.add_argument('--url', required=True, action=ValidateRepoUrl, help='URL of the remote repository') subparser = subparsers.add_parser('rename-repo', help='Rename an repository') subparser.add_argument('--oldname', required=True, action=ValidateRepoName, help='Old name of the repository') subparser.add_argument('--newname', required=True, action=ValidateRepoName, help='New name of the repository') subparser = subparsers.add_parser('set-repo-description', help='Set description of the repository') subparser.add_argument('--name', required=True, action=ValidateRepoName, help='Name of the repository') subparser.add_argument('--description', required=True, help='Description of the repository') subparser = subparsers.add_parser('set-repo-owner', help='Set repository\'s owner name') subparser.add_argument('--name', required=True, action=ValidateRepoName, help='Name of the repository') subparser.add_argument('--owner', required=True, help='Repository’s owner name') subparser = subparsers.add_parser( 'set-repo-access', help='Set repository as private or public') subparser.add_argument('--name', required=True, action=ValidateRepoName, help='Name of the repository') subparser.add_argument('--access', required=True, choices=['public', 'private'], help='Access status') subparser = subparsers.add_parser('delete-repo', help='Delete an existing repository') subparser.add_argument('--name', required=True, action=ValidateRepoName, help='Name of the repository to remove') subparsers.required = True args = parser.parse_args() if args.subcommand == 'create-repo' and args.name: if args.prepare_only: parser.error('--prepare-only can be set when using --url') if args.skip_prepare: parser.error('--skip-prepare can be set when using --url') return args def subcommand_setup(_): """Disable default Apache2 Gitweb configuration.""" action_utils.webserver_disable('gitweb') def _clone_with_progress_report(url, repo_dir): """Clone a repository and write progress info to the file.""" starttime = time.time() status_file = os.path.join(repo_dir, 'clone_progress') repo_temp_dir = os.path.join(repo_dir, '.temp') # do not ask for credidentials and set low speed timeout env = dict(os.environ, GIT_TERMINAL_PROMPT='0', LC_ALL='C', GIT_HTTP_LOW_SPEED_LIMIT='100', GIT_HTTP_LOW_SPEED_TIME='60') proc = subprocess.Popen( ['git', 'clone', '--bare', '--progress', url, repo_temp_dir], stderr=subprocess.PIPE, text=True, env=env) # write clone progress to the file errors = [] while True: line = proc.stderr.readline() if not line: break if 'error:' in line or 'fatal:' in line: errors.append(line.strip()) currenttime = time.time() if currenttime - starttime > 1: elapsed = _clone_status_line_to_percent(line) if elapsed is not None: with open(status_file, 'w') as file_handle: file_handle.write(elapsed) starttime = currenttime # make sure process is ended try: proc.communicate(timeout=10) except subprocess.TimeoutExpired: proc.kill() os.remove(status_file) if proc.returncode != 0: shutil.rmtree(repo_dir) logger.error('Git repository cloning failed: %s', '\n'.join(errors)) raise RuntimeError('Git repository cloning failed.', errors) def _prepare_clone_repo(arguments): """Prepare cloning a repository.""" repo_name = get_name_from_url(arguments.url) if not repo_name.endswith('.git'): repo_name = repo_name + '.git' repo_dir = os.path.join(GIT_REPO_PATH, repo_name) os.mkdir(repo_dir) if arguments.is_private: _set_access_status(repo_name, 'private') status_file = os.path.join(repo_dir, 'clone_progress') with open(status_file, 'w') as file_handle: file_handle.write('0') def _clone_status_line_to_percent(line): """Parse Git clone command output.""" result = re.match(r'.* ([0-9]+)% ', line) if result is not None: text = result.group(0) progress = int(result.group(1)) if 'Counting objects' in text: total_progress = 0.05 * progress elif 'Compressing objects' in text: total_progress = 5 + 0.05 * progress elif 'Receiving objects' in text: total_progress = 10 + 0.6 * progress elif 'Resolving deltas' in text: total_progress = 70 + 0.3 * progress return str(int(total_progress)) return None def _clone_repo(arguments): """Clone a repository.""" url = arguments.url repo = get_name_from_url(url) if not repo.endswith('.git'): repo = repo + '.git' repo_path = os.path.join(GIT_REPO_PATH, repo) repo_temp_path = os.path.join(repo_path, '.temp') _clone_with_progress_report(url, repo_path) for item in os.listdir(repo_temp_path): shutil.move(os.path.join(repo_temp_path, item), repo_path) shutil.rmtree(repo_temp_path) if not arguments.keep_ownership: subprocess.check_call(['chown', '-R', 'www-data:www-data', repo], cwd=GIT_REPO_PATH) _set_repo_description(repo, arguments.description) _set_repo_owner(repo, arguments.owner) def _create_repo(arguments): """Create an empty repository.""" repo = arguments.name subprocess.check_call(['git', 'init', '--bare', repo], cwd=GIT_REPO_PATH) if not arguments.keep_ownership: subprocess.check_call(['chown', '-R', 'www-data:www-data', repo], cwd=GIT_REPO_PATH) _set_repo_description(repo, arguments.description) _set_repo_owner(repo, arguments.owner) if arguments.is_private: _set_access_status(repo, 'private') def _get_repo_description(repo): """Set description of the repository.""" description_file = os.path.join(GIT_REPO_PATH, repo, 'description') if os.path.exists(description_file): with open(description_file, 'r') as file_handle: description = file_handle.read() else: description = '' return description def _set_repo_description(repo, description): """Set description of the repository.""" description_file = os.path.join(GIT_REPO_PATH, repo, 'description') with open(description_file, 'w') as file_handle: file_handle.write(description) def _get_repo_owner(repo): """Set repository's owner name.""" repo_config = os.path.join(GIT_REPO_PATH, repo, 'config') config = configparser.ConfigParser() config.read(repo_config) try: owner = config['gitweb']['owner'] except KeyError: owner = '' return owner def _set_repo_owner(repo, owner): """Set repository's owner name.""" repo_config = os.path.join(GIT_REPO_PATH, repo, 'config') config = configparser.ConfigParser() config.read(repo_config) if not config.has_section('gitweb'): config.add_section('gitweb') config['gitweb']['owner'] = owner with open(repo_config, 'w') as file_handle: config.write(file_handle) def _get_access_status(repo): """Get repository's access status.""" private_file = os.path.join(GIT_REPO_PATH, repo, 'private') if os.path.exists(private_file): return 'private' return 'public' def _set_access_status(repo, status): """Set repository as private or public""" private_file = os.path.join(GIT_REPO_PATH, repo, 'private') if status == 'private': open(private_file, 'a') elif status == 'public': if os.path.exists(private_file): os.remove(private_file) def subcommand_rename_repo(arguments): """Rename a repository.""" oldpath = os.path.join(GIT_REPO_PATH, arguments.oldname) newpath = os.path.join(GIT_REPO_PATH, arguments.newname) os.rename(oldpath, newpath) def subcommand_set_repo_description(arguments): """Set description of the repository.""" _set_repo_description(arguments.name, arguments.description) def subcommand_set_repo_owner(arguments): """Set repository's owner name.""" _set_repo_owner(arguments.name, arguments.owner) def subcommand_set_repo_access(arguments): """Set repository's access status.""" _set_access_status(arguments.name, arguments.access) def subcommand_repo_info(arguments): """Get information about repository.""" repo_path = os.path.join(GIT_REPO_PATH, arguments.name) if not os.path.exists(repo_path): raise RuntimeError('Repository not found') print( json.dumps( dict(name=arguments.name[:-4], description=_get_repo_description( arguments.name), owner=_get_repo_owner(arguments.name), access=_get_access_status(arguments.name)))) def subcommand_create_repo(arguments): """Create a new or clone a remote repository.""" if arguments.url: if not arguments.skip_prepare: _prepare_clone_repo(arguments) if not arguments.prepare_only: _clone_repo(arguments) else: _create_repo(arguments) def subcommand_check_repo_exists(arguments): """Check whether remote repository exists.""" env = dict(os.environ, GIT_TERMINAL_PROMPT='0') subprocess.check_call(['git', 'ls-remote', arguments.url, 'HEAD'], timeout=10, env=env) def subcommand_delete_repo(arguments): """Delete a git repository.""" repo_path = os.path.join(GIT_REPO_PATH, arguments.name) shutil.rmtree(repo_path) def main(): """Parse arguments and perform all duties.""" arguments = parse_arguments() subcommand = arguments.subcommand.replace('-', '_') subcommand_method = globals()['subcommand_' + subcommand] subcommand_method(arguments) if __name__ == '__main__': main()