gitweb: Use pathlib API more

Tests:

- Existing gitweb repos are listed properly. Newly created repos are reported
  properly.

  - Directories without .git extension or starting with . are not shown.

  - Private repos are shown as private and public ones as public.

- Cloning progress is shown properly in the list of repos.

  - Cloning starts with 0%.

  - Cloning file is removed after completion of cloning process.

  - Cloning is done into .temp directory.

  - After cloning repo can be checked out as expected.

- Getting/setting of default branch/description/owner/private works.

- Getting the list of branches work when selecting the default branch.

- Creating new blank repo works.

- Deleting a repo works

- Uninstalling the app works. All repos are removed.

- Retrieving non-existent repo shows error as expected

- Backup/restore of repos works as expected.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: Joseph Nuthalapati <njoseph@riseup.net>
This commit is contained in:
Sunil Mohan Adapa 2025-08-10 19:57:54 -07:00 committed by Joseph Nuthalapati
parent de1070df35
commit 7a4fb9d9f6
No known key found for this signature in database
GPG Key ID: 5398F00A2FA43C35
4 changed files with 51 additions and 59 deletions

View File

@ -1,8 +1,6 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""FreedomBox app to configure Gitweb."""
import os
from django.utils.translation import gettext_lazy as _
from plinth import app as app_module
@ -194,25 +192,23 @@ def create_repo(repo, repo_description, owner, is_private):
def get_repo_list():
"""List all git repositories."""
repos = []
if os.path.exists(GIT_REPO_PATH):
for repo in os.listdir(GIT_REPO_PATH):
if not repo.endswith('.git') or repo.startswith('.'):
if GIT_REPO_PATH.exists():
for repo in GIT_REPO_PATH.iterdir():
if not repo.name.endswith('.git') or repo.name.startswith('.'):
continue
repo_info = {}
repo_info['name'] = repo[:-4]
repo_info['name'] = repo.name[:-4]
private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
if os.path.exists(private_file):
private_file = repo / 'private'
if private_file.exists():
repo_info['access'] = 'private'
else:
repo_info['access'] = 'public'
progress_file = os.path.join(GIT_REPO_PATH, repo, 'clone_progress')
if os.path.exists(progress_file):
with open(progress_file, encoding='utf-8') as file_handle:
clone_progress = file_handle.read()
repo_info['clone_progress'] = clone_progress
progress_file = repo / 'clone_progress'
if progress_file.exists():
repo_info['clone_progress'] = progress_file.read_text()
repos.append(repo_info)

View File

@ -1,8 +1,10 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import pathlib
from django.utils.translation import gettext_lazy as _
GIT_REPO_PATH = '/var/lib/git'
GIT_REPO_PATH = pathlib.Path('/var/lib/git')
REPO_DIR_OWNER = 'www-data'
clients = [
@ -32,6 +34,6 @@ clients = [
},
]
backup = {'data': {'directories': [GIT_REPO_PATH]}}
backup = {'data': {'directories': [str(GIT_REPO_PATH)]}}
tags = [_('Git hosting'), _('Version control'), _('Developer tool')]

View File

@ -4,7 +4,6 @@
import configparser
import logging
import os
import pathlib
import re
import shutil
import subprocess
@ -65,15 +64,15 @@ def _set_global_default_branch(name):
def _clone_with_progress_report(url, repo_dir):
"""Clone a repository and write progress info to the file."""
starttime = time.time()
status_file = os.path.join(repo_dir, 'clone_progress')
repo_temp_dir = os.path.join(repo_dir, '.temp')
status_file = repo_dir / 'clone_progress'
repo_temp_dir = repo_dir / '.temp'
# do not ask for credidentials and set low speed timeout
env = dict(os.environ, GIT_TERMINAL_PROMPT='0', LC_ALL='C',
GIT_HTTP_LOW_SPEED_LIMIT='100', GIT_HTTP_LOW_SPEED_TIME='60')
proc = subprocess.Popen(
['git', 'clone', '--bare', '--progress', url, repo_temp_dir],
stderr=subprocess.PIPE, text=True, env=env)
['git', 'clone', '--bare', '--progress', url,
str(repo_temp_dir)], stderr=subprocess.PIPE, text=True, env=env)
# write clone progress to the file
errors = []
@ -90,9 +89,7 @@ def _clone_with_progress_report(url, repo_dir):
elapsed = _clone_status_line_to_percent(line)
if elapsed is not None:
try:
with open(status_file, 'w',
encoding='utf-8') as file_handle:
file_handle.write(elapsed)
status_file.write_text(elapsed)
except OSError as error:
errors.append(str(error))
@ -104,7 +101,7 @@ def _clone_with_progress_report(url, repo_dir):
except subprocess.TimeoutExpired:
proc.kill()
os.remove(status_file)
status_file.unlink()
if proc.returncode != 0:
shutil.rmtree(repo_dir)
logger.error('Git repository cloning failed: %s', '\n'.join(errors))
@ -117,15 +114,15 @@ def _prepare_clone_repo(url: str, is_private: bool):
if not repo_name.endswith('.git'):
repo_name = repo_name + '.git'
repo_dir = os.path.join(GIT_REPO_PATH, repo_name)
repo_dir = GIT_REPO_PATH / repo_name
os.mkdir(repo_dir)
status_file = os.path.join(repo_dir, 'clone_progress')
status_file = repo_dir / 'clone_progress'
try:
if is_private:
_set_access_status(repo_name, 'private')
with open(status_file, 'w', encoding='utf-8') as file_handle:
file_handle.write('0')
status_file.write_text('0')
except OSError:
shutil.rmtree(repo_dir)
raise
@ -159,8 +156,8 @@ def _clone_repo(url: str, description: str, owner: str, keep_ownership: bool):
if not repo.endswith('.git'):
repo = repo + '.git'
repo_path = os.path.join(GIT_REPO_PATH, repo)
repo_temp_path = os.path.join(repo_path, '.temp')
repo_path = GIT_REPO_PATH / repo
repo_temp_path = repo_path / '.temp'
_clone_with_progress_report(url, repo_path)
@ -192,27 +189,26 @@ def _create_repo(repo: str, description: str, owner: str, is_private: bool,
if is_private:
_set_access_status(repo, 'private')
except (subprocess.CalledProcessError, OSError):
repo_path = os.path.join(GIT_REPO_PATH, repo)
if os.path.isdir(repo_path):
repo_path = GIT_REPO_PATH / repo
if repo_path.is_dir():
shutil.rmtree(repo_path)
raise
def _get_default_branch(repo):
"""Get default branch of the repository."""
repo_path = os.path.join(GIT_REPO_PATH, repo)
repo_path = GIT_REPO_PATH / repo
return subprocess.check_output(
['git', '-C', repo_path, 'symbolic-ref', '--short',
'HEAD']).decode().strip()
['git', '-C',
str(repo_path), 'symbolic-ref', '--short', 'HEAD']).decode().strip()
def _get_repo_description(repo):
"""Set description of the repository."""
description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
if os.path.exists(description_file):
with open(description_file, 'r', encoding='utf-8') as file_handle:
description = file_handle.read()
description_file = GIT_REPO_PATH / repo / 'description'
if description_file.exists():
description = description_file.read_text()
else:
description = ''
@ -221,14 +217,13 @@ def _get_repo_description(repo):
def _set_repo_description(repo, description):
"""Set description of the repository."""
description_file = os.path.join(GIT_REPO_PATH, repo, 'description')
with open(description_file, 'w', encoding='utf-8') as file_handle:
file_handle.write(description)
description_file = GIT_REPO_PATH / repo / 'description'
description_file.write_text(description)
def _get_repo_owner(repo):
"""Set repository's owner name."""
repo_config = os.path.join(GIT_REPO_PATH, repo, 'config')
repo_config = GIT_REPO_PATH / repo / 'config'
config = configparser.ConfigParser()
config.read(repo_config)
try:
@ -241,21 +236,21 @@ def _get_repo_owner(repo):
def _set_repo_owner(repo, owner):
"""Set repository's owner name."""
repo_config = os.path.join(GIT_REPO_PATH, repo, 'config')
repo_config = GIT_REPO_PATH / repo / 'config'
config = configparser.ConfigParser()
config.read(repo_config)
if not config.has_section('gitweb'):
config.add_section('gitweb')
config['gitweb']['owner'] = owner
with open(repo_config, 'w', encoding='utf-8') as file_handle:
with repo_config.open('w', encoding='utf-8') as file_handle:
config.write(file_handle)
def _get_access_status(repo):
"""Get repository's access status."""
private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
if os.path.exists(private_file):
private_file = GIT_REPO_PATH / repo / 'private'
if private_file.exists():
return 'private'
return 'public'
@ -263,12 +258,11 @@ def _get_access_status(repo):
def _set_access_status(repo, status):
"""Set repository as private or public."""
private_file = os.path.join(GIT_REPO_PATH, repo, 'private')
private_file = GIT_REPO_PATH / repo / 'private'
if status == 'private':
open(private_file, 'a', encoding='utf-8')
private_file.touch()
elif status == 'public':
if os.path.exists(private_file):
os.remove(private_file)
private_file.unlink(missing_ok=True)
def _get_branches(repo):
@ -293,9 +287,9 @@ def rename_repo(old_name: str, new_name: str):
"""Rename a repository."""
old_name = validate_repo_name(old_name)
new_name = validate_repo_name(new_name)
oldpath = os.path.join(GIT_REPO_PATH, old_name)
newpath = os.path.join(GIT_REPO_PATH, new_name)
os.rename(oldpath, newpath)
oldpath = GIT_REPO_PATH / old_name
newpath = GIT_REPO_PATH / new_name
oldpath.rename(newpath)
@privileged
@ -339,8 +333,8 @@ def set_repo_access(name: str, access: str):
def repo_info(name: str) -> dict[str, str]:
"""Get information about repository."""
repo = validate_repo_name(name)
repo_path = os.path.join(GIT_REPO_PATH, repo)
if not os.path.exists(repo_path):
repo_path = GIT_REPO_PATH / repo
if not repo_path.exists():
raise RuntimeError('Repository not found')
return dict(name=repo[:-4], description=_get_repo_description(repo),
@ -387,12 +381,12 @@ def repo_exists(url: str) -> bool:
def delete_repo(name: str):
"""Delete a git repository."""
repo = validate_repo_name(name)
repo_path = os.path.join(GIT_REPO_PATH, repo)
repo_path = GIT_REPO_PATH / repo
shutil.rmtree(repo_path)
@privileged
def uninstall():
"""Remove git repositories."""
for item in pathlib.Path(GIT_REPO_PATH).glob('*'):
for item in GIT_REPO_PATH.iterdir():
shutil.rmtree(item, ignore_errors=True)

View File

@ -25,7 +25,7 @@ git_installed = pytest.mark.skipif(not pathlib.Path('/usr/bin/git').exists(),
@pytest.fixture(autouse=True)
def fixture_set_repo_path(tmpdir):
"""Set a repository path in the actions module."""
privileged.GIT_REPO_PATH = str(tmpdir)
privileged.GIT_REPO_PATH = tmpdir
@pytest.fixture(name='existing_repo')