*: Use action_utils.run instead of subprocess.check_call

- This is to capture stdout and stderr and transmit that from privileged daemon
back to the service to be displayed in HTML.

Tests:

- Unit tests and code checks pass.

- Some of the modified actions work as expected.

- systemd daemon-reload is performed during infinoted setup.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: Veiko Aasa <veiko17@disroot.org>
This commit is contained in:
Sunil Mohan Adapa 2025-08-14 21:43:50 -07:00 committed by Veiko Aasa
parent 61ff15a04f
commit 80e6d940a4
No known key found for this signature in database
GPG Key ID: 478539CAE680674E
9 changed files with 44 additions and 44 deletions

View File

@ -511,7 +511,7 @@ def apt_hold(packages):
yield held_packages
finally:
for package in held_packages:
subprocess.check_call(['apt-mark', 'unhold', package])
run(['apt-mark', 'unhold', package], check=True)
@contextmanager
@ -527,7 +527,7 @@ def apt_hold_freedombox():
# Set the flag.
apt_hold_flag.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
apt_hold_flag.touch(mode=0o660)
yield subprocess.check_call(['apt-mark', 'hold', 'freedombox'])
yield run(['apt-mark', 'hold', 'freedombox'], check=True)
finally:
# Was the package held, either in this process or a previous one?
if not current_hold or apt_hold_flag.exists():

View File

@ -57,8 +57,8 @@ def _get_global_default_branch():
def _set_global_default_branch(name):
"""Configure default branch name globally."""
subprocess.check_call(
['git', 'config', '--global', 'init.defaultBranch', name])
action_utils.run(['git', 'config', '--global', 'init.defaultBranch', name],
check=True)
def _clone_with_progress_report(url, repo_dir):
@ -166,9 +166,9 @@ def _clone_repo(url: str, description: str, owner: str, keep_ownership: bool):
shutil.rmtree(repo_temp_path)
if not keep_ownership:
subprocess.check_call(
action_utils.run(
['chown', '-R', f'{REPO_DIR_OWNER}:{REPO_DIR_OWNER}', repo],
cwd=GIT_REPO_PATH)
cwd=GIT_REPO_PATH, check=True)
_set_repo_description(repo, description)
_set_repo_owner(repo, owner)
@ -178,12 +178,12 @@ def _create_repo(repo: str, description: str, owner: str, is_private: bool,
keep_ownership: bool):
"""Create an empty repository."""
try:
subprocess.check_call(['git', 'init', '-q', '--bare', repo],
cwd=GIT_REPO_PATH)
action_utils.run(['git', 'init', '-q', '--bare', repo],
cwd=GIT_REPO_PATH, check=True)
if not keep_ownership:
subprocess.check_call(
action_utils.run(
['chown', '-R', f'{REPO_DIR_OWNER}:{REPO_DIR_OWNER}', repo],
cwd=GIT_REPO_PATH)
cwd=GIT_REPO_PATH, check=True)
_set_repo_description(repo, description)
_set_repo_owner(repo, owner)
if is_private:
@ -372,8 +372,8 @@ def repo_exists(url: str) -> bool:
url = validate_repo_url(url)
env = dict(os.environ, GIT_TERMINAL_PROMPT='0')
try:
subprocess.check_call(['git', 'ls-remote', url, 'HEAD'], timeout=10,
env=env)
action_utils.run(['git', 'ls-remote', url, 'HEAD'], timeout=10,
env=env, check=True)
return True
except subprocess.CalledProcessError:
return False

View File

@ -124,7 +124,7 @@ def setup():
with open(SYSTEMD_SERVICE_PATH, 'w', encoding='utf-8') as file_handle:
file_handle.write(SYSTEMD_SERVICE)
subprocess.check_call(['systemctl', 'daemon-reload'])
action_utils.service_daemon_reload()
# Create infinoted group if needed.
try:

View File

@ -53,7 +53,8 @@ def add_package(file_name: str, temporary_file_path: str):
def _kiwix_manage_add(zim_file: str):
subprocess.check_call(['kiwix-manage', LIBRARY_FILE, 'add', zim_file])
action_utils.run(['kiwix-manage', LIBRARY_FILE, 'add', zim_file],
check=True)
# kiwix-serve doesn't read the library file unless it is restarted.
action_utils.service_try_restart('kiwix-server-freedombox')
@ -97,8 +98,8 @@ def delete_package(zim_id: str):
if book.attrib['id'] != zim_id:
continue
subprocess.check_call(
['kiwix-manage', LIBRARY_FILE, 'remove', zim_id])
action_utils.run(['kiwix-manage', LIBRARY_FILE, 'remove', zim_id],
check=True)
(KIWIX_HOME / book.attrib['path']).unlink()
action_utils.service_try_restart('kiwix-server-freedombox')
return

View File

@ -52,12 +52,12 @@ def setup():
with tempfile.NamedTemporaryFile() as password_file_handle:
password_file_handle.write(password.encode())
password_file_handle.flush()
subprocess.check_call([
action_utils.run([
get_php_command(), install_script, '--confpath=/etc/mediawiki',
'--dbtype=sqlite', '--dbpath=' + data_dir,
'--scriptpath=/mediawiki', '--passfile',
password_file_handle.name, 'Wiki', 'admin'
])
], check=True)
action_utils.run(['chmod', '-R', 'o-rwx', data_dir], check=True)
action_utils.run(['chown', '-R', 'www-data:www-data', data_dir],
check=True)
@ -102,17 +102,17 @@ def change_password(username: str, password: secret_str):
change_password_script = os.path.join(MAINTENANCE_SCRIPTS_DIR,
'changePassword.php')
subprocess.check_call([
action_utils.run([
get_php_command(), change_password_script, '--user', username,
'--password', password
])
], check=True)
@privileged
def update():
"""Run update.php maintenance script when version upgrades happen."""
update_script = os.path.join(MAINTENANCE_SCRIPTS_DIR, 'update.php')
subprocess.check_call([get_php_command(), update_script, '--quick'])
action_utils.run([get_php_command(), update_script, '--quick'], check=True)
def _update_setting(setting_name, setting_line):
@ -180,7 +180,7 @@ def set_default_language(language: str):
# languages.
rebuild_messages_script = os.path.join(MAINTENANCE_SCRIPTS_DIR,
'rebuildmessages.php')
subprocess.check_call([get_php_command(), rebuild_messages_script])
action_utils.run([get_php_command(), rebuild_messages_script], check=True)
@privileged

View File

@ -7,6 +7,7 @@ import pathlib
import shutil
import subprocess
from plinth import action_utils
from plinth.actions import privileged
DEFAULT_FILE = '/etc/default/samba'
@ -51,12 +52,13 @@ CONF = r'''
def _close_share(share_name):
"""Disconnect all samba users who are connected to the share."""
subprocess.check_call(['smbcontrol', 'smbd', 'close-share', share_name])
action_utils.run(['smbcontrol', 'smbd', 'close-share', share_name],
check=True)
def _conf_command(parameters, **kwargs):
"""Run samba configuration registry command."""
subprocess.check_call(['net', 'conf'] + parameters, **kwargs)
action_utils.run(['net', 'conf'] + parameters, check=True, **kwargs)
def _create_share(mount_point, share_type, windows_filesystem=False):
@ -198,8 +200,8 @@ def _set_open_share_permissions(directory):
file_path = os.path.join(root, file)
shutil.chown(file_path, group='freedombox-share')
os.chmod(file_path, 0o0664)
subprocess.check_call(['setfacl', '-Rm', 'g::rwX', directory])
subprocess.check_call(['setfacl', '-Rdm', 'g::rwX', directory])
action_utils.run(['setfacl', '-Rm', 'g::rwX', directory], check=True)
action_utils.run(['setfacl', '-Rdm', 'g::rwX', directory], check=True)
def _use_config_file(conf_file):
@ -229,8 +231,8 @@ def _set_share_permissions(directory):
file_path = os.path.join(root, file)
shutil.chown(file_path, group='freedombox-share')
os.chmod(file_path, 0o0664)
subprocess.check_call(['setfacl', '-Rm', 'g::rwX', directory])
subprocess.check_call(['setfacl', '-Rdm', 'g::rwX', directory])
action_utils.run(['setfacl', '-Rm', 'g::rwX', directory], check=True)
action_utils.run(['setfacl', '-Rdm', 'g::rwX', directory], check=True)
@privileged

View File

@ -7,7 +7,6 @@ import pathlib
import pwd
import shutil
import stat
import subprocess
import augeas
@ -101,7 +100,7 @@ def set_keys(user: str, keys: str, auth_user: str, auth_password: secret_str):
ssh_folder = os.path.join(get_user_homedir(user), '.ssh')
key_file_path = os.path.join(ssh_folder, 'authorized_keys')
subprocess.check_call(['mkhomedir_helper', user])
action_utils.run(['mkhomedir_helper', user], check=True)
if not os.path.exists(ssh_folder):
os.makedirs(ssh_folder)

View File

@ -264,9 +264,8 @@ def test_services_disable(service_is_running, service_disable, service_enable):
@patch('subprocess.run')
@patch('subprocess.check_call')
@patch('subprocess.check_output')
def test_apt_hold_packages(check_output, check_call, run, tmp_path):
def test_apt_hold_packages(check_output, run, tmp_path):
"""Test that holding apt packages works."""
hold_flag = tmp_path / 'flag'
run.return_value.returncode = 0
@ -277,29 +276,28 @@ def test_apt_hold_packages(check_output, check_call, run, tmp_path):
with distupgrade._apt_hold_packages():
assert hold_flag.exists()
assert hold_flag.stat().st_mode & 0o117 == 0
expected_call = [call(['apt-mark', 'hold', 'freedombox'])]
assert check_call.call_args_list == expected_call
expected_calls = [
call(['apt-mark', 'hold', 'freedombox'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
check=True),
call(['apt-mark', 'hold', 'package1'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, check=False),
call(['apt-mark', 'hold', 'package2'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, check=False)
]
assert run.call_args_list == expected_calls
check_call.reset_mock()
run.reset_mock()
expected_call = [
call(['apt-mark', 'unhold', 'package1'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, check=True),
call(['apt-mark', 'unhold', 'package2'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, check=True),
call(['apt-mark', 'unhold', 'freedombox'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=False)
check=False),
]
assert run.call_args_list == expected_call
expected_calls = [
call(['apt-mark', 'unhold', 'package1']),
call(['apt-mark', 'unhold', 'package2'])
]
assert check_call.call_args_list == expected_calls
@patch('plinth.action_utils.debconf_set_selections')

View File

@ -339,14 +339,14 @@ def _get_samba_users():
def _delete_samba_user(username):
"""Delete a Samba user."""
if username in _get_samba_users():
subprocess.check_call(['smbpasswd', '-x', username])
action_utils.run(['smbpasswd', '-x', username], check=True)
_disconnect_samba_user(username)
def _disconnect_samba_user(username):
"""Disconnect a Samba user."""
try:
subprocess.check_call(['pkill', '-U', username, 'smbd'])
action_utils.run(['pkill', '-U', username, 'smbd'], check=True)
except subprocess.CalledProcessError as error:
if error.returncode != 1:
raise
@ -679,7 +679,7 @@ def set_user_status(username: str, status: str, auth_user: str,
# Set user status in Samba password database
if username in _get_samba_users():
subprocess.check_call(['smbpasswd', smbpasswd_flag, username])
action_utils.run(['smbpasswd', smbpasswd_flag, username], check=True)
_flush_cache()