Veiko Aasa dc837bd6b8
gitweb: Use Git credential helper when cloning URLs with credentials
This prevents logging usernames and passwords to the journal logs and to the
Git repo configuration. Also, avoids usernames and passwords appear in the
process list when cloning a repository.

Tests performed:
- Create a new repository by cloning an existing repository URL with basic
auth credentials. Check that:
  - Cloning succeeds.
  - Journal logs don't contain URLs with credential info.
  - The configuration of the cloned repository doesn't contain credential info.
- Try to clone a non-existing repository URL that contains credential
info. Cloning fails and there are no credential info in the journal logs.
- Cloning a public git repository without credential info succeeds.
- All the gitweb module tests pass.

Signed-off-by: Veiko Aasa <veiko17@disroot.org>
[sunil: Add/fix some more type hints]
[sunil: Add tests for URL parsing]
Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: Sunil Mohan Adapa <sunil@medhas.org>
2025-09-29 16:42:17 -07:00

425 lines
13 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Configuration helper for Gitweb."""
import configparser
import logging
import os
import pathlib
import re
import shutil
import subprocess
import time
from typing import Any
from urllib import parse
from plinth import action_utils
from plinth.actions import privileged, secret_str
from plinth.modules.gitweb.forms import RepositoryValidator, get_name_from_url
from plinth.modules.gitweb.manifest import GIT_REPO_PATH, REPO_DIR_OWNER
logger = logging.getLogger(__name__)
def validate_repo_name(name: str) -> str:
"""Validate a repository name and add .git extension if necessary."""
RepositoryValidator()(name)
if not name.endswith('.git'):
name = name + '.git'
return name
def validate_repo_url(url: secret_str) -> secret_str:
"""Validate a repository URL."""
RepositoryValidator(input_should_be='url')(url)
return url
@privileged
def setup():
"""Configure Gitweb module."""
# Disable default Apache2 Gitweb configuration.
action_utils.webserver_disable('gitweb')
# Configure Git client.
if not _get_git_global_config('init.defaultBranch'):
_set_git_global_config('init.defaultBranch', 'main')
if not _get_git_global_config('credential.helper'):
_set_git_global_config('credential.helper', 'cache')
def _get_git_global_config(key: str) -> str | None:
"""Return a value from Git global configuration."""
try:
value = action_utils.run(['git', 'config', '--global', '--get', key],
check=True).stdout.decode().strip()
except subprocess.CalledProcessError as exception:
if exception.returncode == 1: # Configuration option doesn't exist
return None
raise
return value
def _set_git_global_config(key: str, value: str) -> None:
"""Set a Git global configuration value."""
action_utils.run(['git', 'config', '--global', key, value], check=True)
def _setup_git_credentials(url: secret_str) -> str:
"""Set up git credential helper and return URL without credentials."""
url_parts = parse.urlsplit(url)
safe_netloc = url_parts.netloc.split('@')[-1]
safe_url = url_parts._replace(netloc=safe_netloc).geturl()
username = url_parts.username or ''
password = url_parts.password or ''
if username or password:
# Feed credentials to Git credential helper
input = (f'protocol={url_parts.scheme}\n'
f'host={safe_netloc}\n'
f'username={username}\n'
f'password={password}\n\n')
env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
action_utils.run(['git', 'credential', 'approve'],
input=input.encode(), stdout=subprocess.DEVNULL,
check=True, env=env)
return safe_url
def _clone_with_progress_report(url: secret_str, repo_dir: pathlib.Path):
"""Clone a repository and write progress info to the file."""
starttime = time.time()
status_file = repo_dir / 'clone_progress'
repo_temp_dir = repo_dir / '.temp'
# do not ask for credidentials and set low speed timeout
env = dict(os.environ, GIT_TERMINAL_PROMPT='0', LC_ALL='C',
GIT_HTTP_LOW_SPEED_LIMIT='100', GIT_HTTP_LOW_SPEED_TIME='60')
safe_url = _setup_git_credentials(url)
logger.info(f'Cloning Git repository {safe_url} ...')
proc = subprocess.Popen(
['git', 'clone', '--bare', '--progress', safe_url,
str(repo_temp_dir)], stderr=subprocess.PIPE, text=True, env=env)
assert proc.stderr is not None
# write clone progress to the file
errors = []
while True:
line = proc.stderr.readline()
if not line:
break
if 'error:' in line or 'fatal:' in line:
errors.append(line.strip())
currenttime = time.time()
if currenttime - starttime > 1:
elapsed = _clone_status_line_to_percent(line)
if elapsed is not None:
try:
status_file.write_text(elapsed)
except OSError as error:
errors.append(str(error))
starttime = currenttime
# make sure process is ended
try:
proc.communicate(timeout=10)
except subprocess.TimeoutExpired:
proc.kill()
status_file.unlink()
if proc.returncode != 0:
shutil.rmtree(repo_dir)
logger.error('Git repository cloning failed: %s', '\n'.join(errors))
raise RuntimeError('Git repository cloning failed.', errors)
def _prepare_clone_repo(url: secret_str, is_private: bool):
"""Prepare cloning a repository."""
repo_name = get_name_from_url(url)
if not repo_name.endswith('.git'):
repo_name = repo_name + '.git'
repo_dir = GIT_REPO_PATH / repo_name
os.mkdir(repo_dir)
status_file = repo_dir / 'clone_progress'
try:
if is_private:
_set_access_status(repo_name, 'private')
status_file.write_text('0')
except OSError:
shutil.rmtree(repo_dir)
raise
def _clone_status_line_to_percent(line):
"""Parse Git clone command output."""
result = re.match(r'.* ([0-9]+)% ', line)
if result is not None:
text = result.group(0)
progress = int(result.group(1))
if 'Counting objects' in text:
total_progress = 0.05 * progress
elif 'Compressing objects' in text:
total_progress = 5 + 0.05 * progress
elif 'Receiving objects' in text:
total_progress = 10 + 0.6 * progress
elif 'Resolving deltas' in text:
total_progress = 70 + 0.3 * progress
else:
return None
return str(int(total_progress))
return None
def _clone_repo(url: secret_str, description: str, owner: str,
keep_ownership: bool):
"""Clone a repository."""
repo = get_name_from_url(url)
if not repo.endswith('.git'):
repo = repo + '.git'
repo_path = GIT_REPO_PATH / repo
repo_temp_path = repo_path / '.temp'
_clone_with_progress_report(url, repo_path)
for item in os.listdir(repo_temp_path):
shutil.move(os.path.join(repo_temp_path, item), repo_path)
shutil.rmtree(repo_temp_path)
if not keep_ownership:
action_utils.run(
['chown', '-R', f'{REPO_DIR_OWNER}:{REPO_DIR_OWNER}', repo],
cwd=GIT_REPO_PATH, check=True)
_set_repo_description(repo, description)
_set_repo_owner(repo, owner)
def _create_repo(repo: str, description: str, owner: str, is_private: bool,
keep_ownership: bool):
"""Create an empty repository."""
try:
action_utils.run(['git', 'init', '-q', '--bare', repo],
cwd=GIT_REPO_PATH, check=True)
if not keep_ownership:
action_utils.run(
['chown', '-R', f'{REPO_DIR_OWNER}:{REPO_DIR_OWNER}', repo],
cwd=GIT_REPO_PATH, check=True)
_set_repo_description(repo, description)
_set_repo_owner(repo, owner)
if is_private:
_set_access_status(repo, 'private')
except (subprocess.CalledProcessError, OSError):
repo_path = GIT_REPO_PATH / repo
if repo_path.is_dir():
shutil.rmtree(repo_path)
raise
def _get_default_branch(repo):
"""Get default branch of the repository."""
repo_path = GIT_REPO_PATH / repo
return action_utils.run_as_user(
['git', '-C',
str(repo_path), 'symbolic-ref', '--short', 'HEAD'],
username=REPO_DIR_OWNER, check=True).stdout.decode().strip()
def _get_repo_description(repo):
"""Set description of the repository."""
description_file = GIT_REPO_PATH / repo / 'description'
if description_file.exists():
description = description_file.read_text()
else:
description = ''
return description
def _set_repo_description(repo, description):
"""Set description of the repository."""
description_file = GIT_REPO_PATH / repo / 'description'
description_file.write_text(description)
def _get_repo_owner(repo):
"""Set repository's owner name."""
repo_config = GIT_REPO_PATH / repo / 'config'
config = configparser.ConfigParser()
config.read(repo_config)
try:
owner = config['gitweb']['owner']
except KeyError:
owner = ''
return owner
def _set_repo_owner(repo, owner):
"""Set repository's owner name."""
repo_config = GIT_REPO_PATH / repo / 'config'
config = configparser.ConfigParser()
config.read(repo_config)
if not config.has_section('gitweb'):
config.add_section('gitweb')
config['gitweb']['owner'] = owner
with repo_config.open('w', encoding='utf-8') as file_handle:
config.write(file_handle)
def _get_access_status(repo):
"""Get repository's access status."""
private_file = GIT_REPO_PATH / repo / 'private'
if private_file.exists():
return 'private'
return 'public'
def _set_access_status(repo, status):
"""Set repository as private or public."""
private_file = GIT_REPO_PATH / repo / 'private'
if status == 'private':
private_file.touch()
elif status == 'public':
private_file.unlink(missing_ok=True)
def _get_branches(repo):
"""Return list of the branches in the repository."""
process = action_utils.run_as_user(
['git', '-C', repo, 'branch', '--format=%(refname:short)'],
cwd=GIT_REPO_PATH, username=REPO_DIR_OWNER, check=True)
return process.stdout.decode().strip().split()
@privileged
def get_branches(name: str) -> dict[str, Any]:
"""Check whether a branch exists in the repository."""
repo = validate_repo_name(name)
return dict(default_branch=_get_default_branch(repo),
branches=_get_branches(repo))
@privileged
def rename_repo(old_name: str, new_name: str):
"""Rename a repository."""
old_name = validate_repo_name(old_name)
new_name = validate_repo_name(new_name)
oldpath = GIT_REPO_PATH / old_name
newpath = GIT_REPO_PATH / new_name
oldpath.rename(newpath)
@privileged
def set_default_branch(name: str, branch: str):
"""Set description of the repository."""
repo = validate_repo_name(name)
if branch not in _get_branches(repo):
raise ValueError('No such branch')
action_utils.run_as_user(
['git', '-C', repo, 'symbolic-ref', 'HEAD', f'refs/heads/{branch}'],
cwd=GIT_REPO_PATH, check=True, username=REPO_DIR_OWNER)
@privileged
def set_repo_description(name: str, description: str):
"""Set description of the repository."""
repo = validate_repo_name(name)
_set_repo_description(repo, description)
@privileged
def set_repo_owner(name: str, owner: str):
"""Set repository's owner name."""
repo = validate_repo_name(name)
_set_repo_owner(repo, owner)
@privileged
def set_repo_access(name: str, access: str):
"""Set repository's access status."""
repo = validate_repo_name(name)
if access not in ('public', 'private'):
raise ValueError('Invalid access parameter')
_set_access_status(repo, access)
@privileged
def repo_info(name: str) -> dict[str, str]:
"""Get information about repository."""
repo = validate_repo_name(name)
repo_path = GIT_REPO_PATH / repo
if not repo_path.exists():
raise RuntimeError('Repository not found')
return dict(name=repo[:-4], description=_get_repo_description(repo),
owner=_get_repo_owner(repo), access=_get_access_status(repo),
default_branch=_get_default_branch(repo))
@privileged
def create_repo(url: secret_str | None = None, name: str | None = None,
description: str = '', owner: str = '',
keep_ownership: bool = False, is_private: bool = False,
skip_prepare: bool = False, prepare_only: bool = False):
"""Create a new or clone a remote repository."""
if url:
url = validate_repo_url(url)
if name:
repo = validate_repo_name(name)
if url:
if not skip_prepare:
_prepare_clone_repo(url, is_private)
if not prepare_only:
_clone_repo(url, description, owner, keep_ownership)
elif repo is not None:
_create_repo(repo, description, owner, is_private, keep_ownership)
@privileged
def repo_exists(url: secret_str) -> bool:
"""Return whether remote repository exists."""
url = validate_repo_url(url)
safe_url = _setup_git_credentials(url)
env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
try:
action_utils.run(['git', 'ls-remote', safe_url, 'HEAD'], timeout=10,
env=env, check=True)
return True
except subprocess.CalledProcessError:
return False
@privileged
def delete_repo(name: str):
"""Delete a git repository."""
repo = validate_repo_name(name)
repo_path = GIT_REPO_PATH / repo
shutil.rmtree(repo_path)
@privileged
def uninstall():
"""Remove git repositories."""
for item in GIT_REPO_PATH.iterdir():
shutil.rmtree(item, ignore_errors=True)