From 7a4fb9d9f62a2a0715204210c203808548c6da6c Mon Sep 17 00:00:00 2001 From: Sunil Mohan Adapa Date: Sun, 10 Aug 2025 19:57:54 -0700 Subject: [PATCH] gitweb: Use pathlib API more Tests: - Existing gitweb repos are listed properly. Newly created repos are reported properly. - Directories without .git extension or starting with . are not shown. - Private repos are shown as private and public ones as public. - Cloning progress is shown properly in the list of repos. - Cloning starts with 0%. - Cloning file is removed after completion of cloning process. - Cloning is done into .temp directory. - After cloning repo can be checked out as expected. - Getting/setting of default branch/description/owner/private works. - Getting the list of branches work when selecting the default branch. - Creating new blank repo works. - Deleting a repo works - Uninstalling the app works. All repos are removed. - Retrieving non-existent repo shows error as expected - Backup/restore of repos works as expected. Signed-off-by: Sunil Mohan Adapa Reviewed-by: Joseph Nuthalapati --- plinth/modules/gitweb/__init__.py | 22 +++-- plinth/modules/gitweb/manifest.py | 6 +- plinth/modules/gitweb/privileged.py | 80 +++++++++---------- .../modules/gitweb/tests/test_privileged.py | 2 +- 4 files changed, 51 insertions(+), 59 deletions(-) diff --git a/plinth/modules/gitweb/__init__.py b/plinth/modules/gitweb/__init__.py index 59e350318..56933c3e6 100644 --- a/plinth/modules/gitweb/__init__.py +++ b/plinth/modules/gitweb/__init__.py @@ -1,8 +1,6 @@ # SPDX-License-Identifier: AGPL-3.0-or-later """FreedomBox app to configure Gitweb.""" -import os - from django.utils.translation import gettext_lazy as _ from plinth import app as app_module @@ -194,25 +192,23 @@ def create_repo(repo, repo_description, owner, is_private): def get_repo_list(): """List all git repositories.""" repos = [] - if os.path.exists(GIT_REPO_PATH): - for repo in os.listdir(GIT_REPO_PATH): - if not repo.endswith('.git') or repo.startswith('.'): + if GIT_REPO_PATH.exists(): + for repo in GIT_REPO_PATH.iterdir(): + if not repo.name.endswith('.git') or repo.name.startswith('.'): continue repo_info = {} - repo_info['name'] = repo[:-4] + repo_info['name'] = repo.name[:-4] - private_file = os.path.join(GIT_REPO_PATH, repo, 'private') - if os.path.exists(private_file): + private_file = repo / 'private' + if private_file.exists(): repo_info['access'] = 'private' else: repo_info['access'] = 'public' - progress_file = os.path.join(GIT_REPO_PATH, repo, 'clone_progress') - if os.path.exists(progress_file): - with open(progress_file, encoding='utf-8') as file_handle: - clone_progress = file_handle.read() - repo_info['clone_progress'] = clone_progress + progress_file = repo / 'clone_progress' + if progress_file.exists(): + repo_info['clone_progress'] = progress_file.read_text() repos.append(repo_info) diff --git a/plinth/modules/gitweb/manifest.py b/plinth/modules/gitweb/manifest.py index c5102c31e..8813a9f15 100644 --- a/plinth/modules/gitweb/manifest.py +++ b/plinth/modules/gitweb/manifest.py @@ -1,8 +1,10 @@ # SPDX-License-Identifier: AGPL-3.0-or-later +import pathlib + from django.utils.translation import gettext_lazy as _ -GIT_REPO_PATH = '/var/lib/git' +GIT_REPO_PATH = pathlib.Path('/var/lib/git') REPO_DIR_OWNER = 'www-data' clients = [ @@ -32,6 +34,6 @@ clients = [ }, ] -backup = {'data': {'directories': [GIT_REPO_PATH]}} +backup = {'data': {'directories': [str(GIT_REPO_PATH)]}} tags = [_('Git hosting'), _('Version control'), _('Developer tool')] diff --git a/plinth/modules/gitweb/privileged.py b/plinth/modules/gitweb/privileged.py index 5d81d568f..2dc79b008 100644 --- a/plinth/modules/gitweb/privileged.py +++ b/plinth/modules/gitweb/privileged.py @@ -4,7 +4,6 @@ import configparser import logging import os -import pathlib import re import shutil import subprocess @@ -65,15 +64,15 @@ def _set_global_default_branch(name): def _clone_with_progress_report(url, repo_dir): """Clone a repository and write progress info to the file.""" starttime = time.time() - status_file = os.path.join(repo_dir, 'clone_progress') - repo_temp_dir = os.path.join(repo_dir, '.temp') + status_file = repo_dir / 'clone_progress' + repo_temp_dir = repo_dir / '.temp' # do not ask for credidentials and set low speed timeout env = dict(os.environ, GIT_TERMINAL_PROMPT='0', LC_ALL='C', GIT_HTTP_LOW_SPEED_LIMIT='100', GIT_HTTP_LOW_SPEED_TIME='60') proc = subprocess.Popen( - ['git', 'clone', '--bare', '--progress', url, repo_temp_dir], - stderr=subprocess.PIPE, text=True, env=env) + ['git', 'clone', '--bare', '--progress', url, + str(repo_temp_dir)], stderr=subprocess.PIPE, text=True, env=env) # write clone progress to the file errors = [] @@ -90,9 +89,7 @@ def _clone_with_progress_report(url, repo_dir): elapsed = _clone_status_line_to_percent(line) if elapsed is not None: try: - with open(status_file, 'w', - encoding='utf-8') as file_handle: - file_handle.write(elapsed) + status_file.write_text(elapsed) except OSError as error: errors.append(str(error)) @@ -104,7 +101,7 @@ def _clone_with_progress_report(url, repo_dir): except subprocess.TimeoutExpired: proc.kill() - os.remove(status_file) + status_file.unlink() if proc.returncode != 0: shutil.rmtree(repo_dir) logger.error('Git repository cloning failed: %s', '\n'.join(errors)) @@ -117,15 +114,15 @@ def _prepare_clone_repo(url: str, is_private: bool): if not repo_name.endswith('.git'): repo_name = repo_name + '.git' - repo_dir = os.path.join(GIT_REPO_PATH, repo_name) + repo_dir = GIT_REPO_PATH / repo_name os.mkdir(repo_dir) - status_file = os.path.join(repo_dir, 'clone_progress') + status_file = repo_dir / 'clone_progress' try: if is_private: _set_access_status(repo_name, 'private') - with open(status_file, 'w', encoding='utf-8') as file_handle: - file_handle.write('0') + + status_file.write_text('0') except OSError: shutil.rmtree(repo_dir) raise @@ -159,8 +156,8 @@ def _clone_repo(url: str, description: str, owner: str, keep_ownership: bool): if not repo.endswith('.git'): repo = repo + '.git' - repo_path = os.path.join(GIT_REPO_PATH, repo) - repo_temp_path = os.path.join(repo_path, '.temp') + repo_path = GIT_REPO_PATH / repo + repo_temp_path = repo_path / '.temp' _clone_with_progress_report(url, repo_path) @@ -192,27 +189,26 @@ def _create_repo(repo: str, description: str, owner: str, is_private: bool, if is_private: _set_access_status(repo, 'private') except (subprocess.CalledProcessError, OSError): - repo_path = os.path.join(GIT_REPO_PATH, repo) - if os.path.isdir(repo_path): + repo_path = GIT_REPO_PATH / repo + if repo_path.is_dir(): shutil.rmtree(repo_path) raise def _get_default_branch(repo): """Get default branch of the repository.""" - repo_path = os.path.join(GIT_REPO_PATH, repo) + repo_path = GIT_REPO_PATH / repo return subprocess.check_output( - ['git', '-C', repo_path, 'symbolic-ref', '--short', - 'HEAD']).decode().strip() + ['git', '-C', + str(repo_path), 'symbolic-ref', '--short', 'HEAD']).decode().strip() def _get_repo_description(repo): """Set description of the repository.""" - description_file = os.path.join(GIT_REPO_PATH, repo, 'description') - if os.path.exists(description_file): - with open(description_file, 'r', encoding='utf-8') as file_handle: - description = file_handle.read() + description_file = GIT_REPO_PATH / repo / 'description' + if description_file.exists(): + description = description_file.read_text() else: description = '' @@ -221,14 +217,13 @@ def _get_repo_description(repo): def _set_repo_description(repo, description): """Set description of the repository.""" - description_file = os.path.join(GIT_REPO_PATH, repo, 'description') - with open(description_file, 'w', encoding='utf-8') as file_handle: - file_handle.write(description) + description_file = GIT_REPO_PATH / repo / 'description' + description_file.write_text(description) def _get_repo_owner(repo): """Set repository's owner name.""" - repo_config = os.path.join(GIT_REPO_PATH, repo, 'config') + repo_config = GIT_REPO_PATH / repo / 'config' config = configparser.ConfigParser() config.read(repo_config) try: @@ -241,21 +236,21 @@ def _get_repo_owner(repo): def _set_repo_owner(repo, owner): """Set repository's owner name.""" - repo_config = os.path.join(GIT_REPO_PATH, repo, 'config') + repo_config = GIT_REPO_PATH / repo / 'config' config = configparser.ConfigParser() config.read(repo_config) if not config.has_section('gitweb'): config.add_section('gitweb') config['gitweb']['owner'] = owner - with open(repo_config, 'w', encoding='utf-8') as file_handle: + with repo_config.open('w', encoding='utf-8') as file_handle: config.write(file_handle) def _get_access_status(repo): """Get repository's access status.""" - private_file = os.path.join(GIT_REPO_PATH, repo, 'private') - if os.path.exists(private_file): + private_file = GIT_REPO_PATH / repo / 'private' + if private_file.exists(): return 'private' return 'public' @@ -263,12 +258,11 @@ def _get_access_status(repo): def _set_access_status(repo, status): """Set repository as private or public.""" - private_file = os.path.join(GIT_REPO_PATH, repo, 'private') + private_file = GIT_REPO_PATH / repo / 'private' if status == 'private': - open(private_file, 'a', encoding='utf-8') + private_file.touch() elif status == 'public': - if os.path.exists(private_file): - os.remove(private_file) + private_file.unlink(missing_ok=True) def _get_branches(repo): @@ -293,9 +287,9 @@ def rename_repo(old_name: str, new_name: str): """Rename a repository.""" old_name = validate_repo_name(old_name) new_name = validate_repo_name(new_name) - oldpath = os.path.join(GIT_REPO_PATH, old_name) - newpath = os.path.join(GIT_REPO_PATH, new_name) - os.rename(oldpath, newpath) + oldpath = GIT_REPO_PATH / old_name + newpath = GIT_REPO_PATH / new_name + oldpath.rename(newpath) @privileged @@ -339,8 +333,8 @@ def set_repo_access(name: str, access: str): def repo_info(name: str) -> dict[str, str]: """Get information about repository.""" repo = validate_repo_name(name) - repo_path = os.path.join(GIT_REPO_PATH, repo) - if not os.path.exists(repo_path): + repo_path = GIT_REPO_PATH / repo + if not repo_path.exists(): raise RuntimeError('Repository not found') return dict(name=repo[:-4], description=_get_repo_description(repo), @@ -387,12 +381,12 @@ def repo_exists(url: str) -> bool: def delete_repo(name: str): """Delete a git repository.""" repo = validate_repo_name(name) - repo_path = os.path.join(GIT_REPO_PATH, repo) + repo_path = GIT_REPO_PATH / repo shutil.rmtree(repo_path) @privileged def uninstall(): """Remove git repositories.""" - for item in pathlib.Path(GIT_REPO_PATH).glob('*'): + for item in GIT_REPO_PATH.iterdir(): shutil.rmtree(item, ignore_errors=True) diff --git a/plinth/modules/gitweb/tests/test_privileged.py b/plinth/modules/gitweb/tests/test_privileged.py index dc8996f5a..36b6c0f6e 100644 --- a/plinth/modules/gitweb/tests/test_privileged.py +++ b/plinth/modules/gitweb/tests/test_privileged.py @@ -25,7 +25,7 @@ git_installed = pytest.mark.skipif(not pathlib.Path('/usr/bin/git').exists(), @pytest.fixture(autouse=True) def fixture_set_repo_path(tmpdir): """Set a repository path in the actions module.""" - privileged.GIT_REPO_PATH = str(tmpdir) + privileged.GIT_REPO_PATH = tmpdir @pytest.fixture(name='existing_repo')