mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
*: Collect output for all privileged sub-processes
- Now that we have a mechanism for properly collecting, transmitting, and display the stdout and stderr. There is no reason not to collect all of the stdin and stderr. - Also, the stdin/stderr=subprocess.PIPE is redundant and prevents the output from getting collected for debugging. So, remove it. Tests: - Ran functional tests on backups, calibre, ejabberd, email, gitweb, ikiwiki, infinoted, kiwix, mediawiki, mumble, nextcloud,, openvpn, samba, wireguard, zoph. 2-3 issues were found but did not seem like new errors. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: Veiko Aasa <veiko17@disroot.org>
This commit is contained in:
parent
b253166f6d
commit
7f608cd570
@ -33,8 +33,7 @@ def is_systemd_running():
|
||||
|
||||
def systemd_get_default() -> str:
|
||||
"""Return the default target that systemd will boot into."""
|
||||
process = run(['systemctl', 'get-default'], stdout=subprocess.PIPE,
|
||||
check=True)
|
||||
process = run(['systemctl', 'get-default'], check=True)
|
||||
return process.stdout.decode().strip()
|
||||
|
||||
|
||||
@ -45,7 +44,7 @@ def systemd_set_default(target: str):
|
||||
|
||||
def service_daemon_reload():
|
||||
"""Reload systemd to ensure that newer unit files are read."""
|
||||
run(['systemctl', 'daemon-reload'], check=True, stdout=subprocess.DEVNULL)
|
||||
run(['systemctl', 'daemon-reload'], check=True)
|
||||
|
||||
|
||||
def service_is_running(servicename):
|
||||
@ -54,8 +53,7 @@ def service_is_running(servicename):
|
||||
Does not need to run as root.
|
||||
"""
|
||||
try:
|
||||
run(['systemctl', 'status', servicename], check=True,
|
||||
stdout=subprocess.DEVNULL)
|
||||
run(['systemctl', 'status', servicename], check=True)
|
||||
return True
|
||||
except subprocess.CalledProcessError:
|
||||
# If a service is not running we get a status code != 0 and
|
||||
@ -101,8 +99,7 @@ def service_is_enabled(service_name, strict_check=False):
|
||||
|
||||
"""
|
||||
try:
|
||||
process = run(['systemctl', 'is-enabled', service_name], check=True,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
|
||||
process = run(['systemctl', 'is-enabled', service_name], check=True)
|
||||
if not strict_check:
|
||||
return True
|
||||
|
||||
@ -179,14 +176,14 @@ def service_get_logs(service_name: str) -> str:
|
||||
command = [
|
||||
'journalctl', '--no-pager', '--lines=200', '--unit', service_name
|
||||
]
|
||||
process = run(command, check=False, stdout=subprocess.PIPE)
|
||||
process = run(command, check=False)
|
||||
return process.stdout.decode()
|
||||
|
||||
|
||||
def service_show(service_name: str) -> dict[str, str]:
|
||||
"""Return the status of the service in dictionary format."""
|
||||
command = ['systemctl', 'show', service_name]
|
||||
process = run(command, check=False, stdout=subprocess.PIPE)
|
||||
process = run(command, check=False)
|
||||
status = {}
|
||||
for line in process.stdout.decode().splitlines():
|
||||
parts = line.partition('=')
|
||||
@ -197,8 +194,7 @@ def service_show(service_name: str) -> dict[str, str]:
|
||||
|
||||
def service_action(service_name: str, action: str, check: bool = False):
|
||||
"""Perform the given action on the service_name."""
|
||||
run(['systemctl', action, service_name], stdout=subprocess.DEVNULL,
|
||||
check=check)
|
||||
run(['systemctl', action, service_name], check=check)
|
||||
|
||||
|
||||
def webserver_is_enabled(name, kind='config'):
|
||||
@ -469,8 +465,7 @@ def is_disk_image():
|
||||
return os.path.exists('/var/lib/freedombox/is-freedombox-disk-image')
|
||||
|
||||
|
||||
def run_apt_command(arguments, stdout=subprocess.DEVNULL,
|
||||
enable_triggers: bool = False):
|
||||
def run_apt_command(arguments, enable_triggers: bool = False):
|
||||
"""Run apt-get with provided arguments."""
|
||||
command = ['apt-get', '--assume-yes', '--quiet=2'] + arguments
|
||||
|
||||
@ -478,8 +473,7 @@ def run_apt_command(arguments, stdout=subprocess.DEVNULL,
|
||||
env['DEBIAN_FRONTEND'] = 'noninteractive'
|
||||
if not enable_triggers:
|
||||
env['FREEDOMBOX_INVOKED'] = 'true'
|
||||
process = run(command, stdin=subprocess.DEVNULL, stdout=stdout, env=env,
|
||||
check=False)
|
||||
process = run(command, stdin=subprocess.DEVNULL, env=env, check=False)
|
||||
return process.returncode
|
||||
|
||||
|
||||
@ -535,8 +529,7 @@ def apt_hold_freedombox():
|
||||
|
||||
def apt_unhold_freedombox():
|
||||
"""Remove any hold on freedombox package, and clear flag."""
|
||||
run(['apt-mark', 'unhold', 'freedombox'], stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL, check=False)
|
||||
run(['apt-mark', 'unhold', 'freedombox'], check=False)
|
||||
if apt_hold_flag.exists():
|
||||
apt_hold_flag.unlink()
|
||||
|
||||
|
||||
@ -6,7 +6,6 @@ pytest configuration for all tests.
|
||||
import importlib
|
||||
import os
|
||||
import pathlib
|
||||
import subprocess
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
@ -188,7 +187,8 @@ def fixture_mock_run_as_user():
|
||||
"""A fixture to override action_utils.run_as_user."""
|
||||
|
||||
def _bypass_runuser(*args, username, **kwargs):
|
||||
return subprocess.run(*args, **kwargs)
|
||||
from plinth import action_utils
|
||||
return action_utils.run(*args, **kwargs)
|
||||
|
||||
with patch('plinth.action_utils.run_as_user') as mock:
|
||||
mock.side_effect = _bypass_runuser
|
||||
|
||||
@ -244,8 +244,7 @@ def init(path: str, encryption: str,
|
||||
@privileged
|
||||
def info(path: str, encryption_passphrase: secret_str | None = None) -> dict:
|
||||
"""Show repository information."""
|
||||
process = _run(['borg', 'info', '--json', path], encryption_passphrase,
|
||||
stdout=subprocess.PIPE)
|
||||
process = _run(['borg', 'info', '--json', path], encryption_passphrase)
|
||||
return json.loads(process.stdout.decode())
|
||||
|
||||
|
||||
@ -255,7 +254,7 @@ def list_repo(path: str,
|
||||
encryption_passphrase: secret_str | None = None) -> dict:
|
||||
"""List repository contents."""
|
||||
process = _run(['borg', 'list', '--json', '--format="{comment}"', path],
|
||||
encryption_passphrase, stdout=subprocess.PIPE)
|
||||
encryption_passphrase)
|
||||
return json.loads(process.stdout.decode())
|
||||
|
||||
|
||||
@ -281,7 +280,7 @@ def remove_uploaded_archive(file_path: str):
|
||||
|
||||
def _get_borg_version():
|
||||
"""Return the version of borgbackup."""
|
||||
process = _run(['borg', '--version'], stdout=subprocess.PIPE)
|
||||
process = _run(['borg', '--version'])
|
||||
return process.stdout.decode().split()[1] # Example: "borg 1.1.9"
|
||||
|
||||
|
||||
@ -322,10 +321,7 @@ def _extract(archive_path, destination, encryption_passphrase, locations=None):
|
||||
|
||||
try:
|
||||
os.chdir(os.path.expanduser(destination))
|
||||
# TODO: with python 3.7 use subprocess.run with the 'capture_output'
|
||||
# argument
|
||||
process = _run(borg_call, encryption_passphrase, check=False,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
process = _run(borg_call, encryption_passphrase, check=False)
|
||||
if process.returncode != 0:
|
||||
error = process.stderr.decode()
|
||||
# Don't fail on the borg error when no files were matched
|
||||
@ -350,8 +346,7 @@ def export_tar(path: str, encryption_passphrase: secret_str | None = None):
|
||||
def _read_archive_file(archive, filepath, encryption_passphrase):
|
||||
"""Read the content of a file inside an archive."""
|
||||
borg_call = ['borg', 'extract', archive, filepath, '--stdout']
|
||||
return _run(borg_call, encryption_passphrase,
|
||||
stdout=subprocess.PIPE).stdout.decode()
|
||||
return _run(borg_call, encryption_passphrase).stdout.decode()
|
||||
|
||||
|
||||
@reraise_known_errors
|
||||
@ -365,8 +360,7 @@ def get_archive_apps(
|
||||
'borg', 'list', path, manifest_folder, '--format', '{path}{NEWLINE}'
|
||||
]
|
||||
try:
|
||||
borg_process = _run(borg_call, encryption_passphrase,
|
||||
stdout=subprocess.PIPE)
|
||||
borg_process = _run(borg_call, encryption_passphrase)
|
||||
manifest_path = borg_process.stdout.decode().strip()
|
||||
except subprocess.CalledProcessError:
|
||||
raise RuntimeError('Borg exited unsuccessfully')
|
||||
|
||||
@ -3,7 +3,6 @@
|
||||
|
||||
import pathlib
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
from plinth import action_utils
|
||||
from plinth.actions import privileged
|
||||
@ -30,7 +29,7 @@ def create_library(name: str):
|
||||
library.mkdir(mode=0o755) # Raise exception if already exists
|
||||
action_utils.run(
|
||||
['calibredb', '--with-library', library, 'list_categories'],
|
||||
stdout=subprocess.DEVNULL, check=False)
|
||||
check=False)
|
||||
|
||||
# Force systemd StateDirectory= logic to assign proper ownership to the
|
||||
# DynamicUser=
|
||||
|
||||
@ -1,8 +1,6 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""Set time zone with timedatectl."""
|
||||
|
||||
import subprocess
|
||||
|
||||
from plinth import action_utils
|
||||
from plinth.actions import privileged
|
||||
|
||||
@ -11,4 +9,4 @@ from plinth.actions import privileged
|
||||
def set_timezone(timezone: str):
|
||||
"""Set time zone with timedatectl."""
|
||||
command = ['timedatectl', 'set-timezone', timezone]
|
||||
action_utils.run(command, stdout=subprocess.DEVNULL, check=True)
|
||||
action_utils.run(command, check=True)
|
||||
|
||||
@ -111,7 +111,7 @@ def _run(args):
|
||||
Raise a RuntimeError on non-zero exit codes.
|
||||
"""
|
||||
try:
|
||||
result = action_utils.run(args, check=True, capture_output=True)
|
||||
result = action_utils.run(args, check=True)
|
||||
return result.stdout.decode()
|
||||
except subprocess.SubprocessError as subprocess_error:
|
||||
raise RuntimeError('Subprocess failed') from subprocess_error
|
||||
|
||||
@ -7,7 +7,6 @@ See: https://rspamd.com/doc/modules/dkim_signing.html
|
||||
import pathlib
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
from plinth import action_utils
|
||||
from plinth.actions import privileged
|
||||
@ -33,8 +32,7 @@ def get_dkim_public_key(domain: str) -> str:
|
||||
key_file = _keys_dir / f'{domain}.dkim.key'
|
||||
output = action_utils.run(
|
||||
['openssl', 'rsa', '-in',
|
||||
str(key_file), '-pubout'], stderr=subprocess.DEVNULL,
|
||||
check=True).stdout
|
||||
str(key_file), '-pubout'], check=True).stdout
|
||||
return ''.join(output.decode().splitlines()[1:-1])
|
||||
|
||||
|
||||
|
||||
@ -67,8 +67,7 @@ def set_firewall_backend(backend):
|
||||
|
||||
def _run_firewall_cmd(args):
|
||||
"""Run firewall-cmd command, discard output and check return value."""
|
||||
action_utils.run(['firewall-cmd'] + args, stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL, check=True)
|
||||
action_utils.run(['firewall-cmd'] + args, check=True)
|
||||
|
||||
|
||||
def _setup_local_service_protection():
|
||||
@ -161,7 +160,6 @@ def setup():
|
||||
"""Perform basic firewalld setup."""
|
||||
action_utils.service_enable('firewalld')
|
||||
action_utils.run(['firewall-cmd', '--set-default-zone=external'],
|
||||
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
||||
check=True)
|
||||
set_firewall_backend('nftables')
|
||||
|
||||
|
||||
@ -202,8 +202,7 @@ def _get_default_branch(repo):
|
||||
return action_utils.run_as_user(
|
||||
['git', '-C',
|
||||
str(repo_path), 'symbolic-ref', '--short', 'HEAD'],
|
||||
username=REPO_DIR_OWNER, check=True,
|
||||
stdout=subprocess.PIPE).stdout.decode().strip()
|
||||
username=REPO_DIR_OWNER, check=True).stdout.decode().strip()
|
||||
|
||||
|
||||
def _get_repo_description(repo):
|
||||
@ -271,8 +270,7 @@ def _get_branches(repo):
|
||||
"""Return list of the branches in the repository."""
|
||||
process = action_utils.run_as_user(
|
||||
['git', '-C', repo, 'branch', '--format=%(refname:short)'],
|
||||
cwd=GIT_REPO_PATH, username=REPO_DIR_OWNER, check=True,
|
||||
stdout=subprocess.PIPE)
|
||||
cwd=GIT_REPO_PATH, username=REPO_DIR_OWNER, check=True)
|
||||
|
||||
return process.stdout.decode().strip().split()
|
||||
|
||||
|
||||
@ -5,7 +5,6 @@ import os
|
||||
import pathlib
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
from plinth import action_utils
|
||||
from plinth.actions import privileged, secret_str
|
||||
@ -63,9 +62,8 @@ def create_wiki(wiki_name: str, admin_name: str, admin_password: secret_str):
|
||||
pw_bytes = admin_password.encode()
|
||||
input_ = pw_bytes + b'\n' + pw_bytes
|
||||
action_utils.run(['ikiwiki', '-setup', SETUP_WIKI, wiki_name, admin_name],
|
||||
stdout=subprocess.PIPE, input=input_,
|
||||
stderr=subprocess.PIPE,
|
||||
env=dict(os.environ, PERL_UNICODE='AS'), check=True)
|
||||
input=input_, env=dict(os.environ,
|
||||
PERL_UNICODE='AS'), check=True)
|
||||
|
||||
|
||||
@privileged
|
||||
@ -74,9 +72,7 @@ def create_blog(blog_name: str, admin_name: str, admin_password: secret_str):
|
||||
pw_bytes = admin_password.encode()
|
||||
input_ = pw_bytes + b'\n' + pw_bytes
|
||||
action_utils.run(['ikiwiki', '-setup', SETUP_BLOG, blog_name, admin_name],
|
||||
stdout=subprocess.PIPE, input=input_,
|
||||
stderr=subprocess.PIPE, env=dict(os.environ,
|
||||
PERL_UNICODE='AS'))
|
||||
input=input_, env=dict(os.environ, PERL_UNICODE='AS'))
|
||||
|
||||
|
||||
@privileged
|
||||
|
||||
@ -27,8 +27,7 @@ def get_php_command():
|
||||
version = ''
|
||||
|
||||
try:
|
||||
process = action_utils.run(['dpkg', '-s', 'php'],
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
process = action_utils.run(['dpkg', '-s', 'php'], check=True)
|
||||
for line in process.stdout.decode().splitlines():
|
||||
if line.startswith('Version:'):
|
||||
version = line.split(':')[-1].split('+')[0].strip()
|
||||
|
||||
@ -4,7 +4,6 @@ Configure Mumble server.
|
||||
"""
|
||||
|
||||
import pathlib
|
||||
import subprocess
|
||||
|
||||
import augeas
|
||||
|
||||
@ -38,7 +37,7 @@ def check_setup() -> bool:
|
||||
def set_super_user_password(password: secret_str):
|
||||
"""Set the superuser password with murmurd command."""
|
||||
action_utils.run(['murmurd', '-readsupw'], input=password.encode(),
|
||||
stdout=subprocess.DEVNULL, check=False)
|
||||
check=False)
|
||||
|
||||
|
||||
@privileged
|
||||
|
||||
@ -73,14 +73,13 @@ def setup():
|
||||
|
||||
|
||||
def _run_in_container(
|
||||
*args, capture_output: bool = False, check: bool = True,
|
||||
*args, check: bool = True,
|
||||
env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
|
||||
"""Run a command inside the container."""
|
||||
env_args = [f'--env={key}={value}' for key, value in (env or {}).items()]
|
||||
command = ['podman', 'exec', '--user', WWW_DATA_UID
|
||||
] + env_args + [CONTAINER_NAME] + list(args)
|
||||
return action_utils.run(command, capture_output=capture_output,
|
||||
check=check)
|
||||
return action_utils.run(command, check=check)
|
||||
|
||||
|
||||
def _run_occ(*args, **kwargs) -> subprocess.CompletedProcess:
|
||||
@ -110,8 +109,7 @@ def disable():
|
||||
def get_override_domain():
|
||||
"""Return the domain name that Nextcloud is configured to override with."""
|
||||
try:
|
||||
domain = _run_occ('config:system:get', 'overwritehost',
|
||||
capture_output=True)
|
||||
domain = _run_occ('config:system:get', 'overwritehost')
|
||||
return domain.stdout.decode().strip()
|
||||
except subprocess.CalledProcessError:
|
||||
return None
|
||||
@ -160,8 +158,7 @@ def get_default_phone_region():
|
||||
""""Get the value of default_phone_region."""
|
||||
try:
|
||||
default_phone_region = _run_occ('config:system:get',
|
||||
'default_phone_region',
|
||||
capture_output=True)
|
||||
'default_phone_region')
|
||||
return default_phone_region.stdout.decode().strip()
|
||||
except subprocess.CalledProcessError:
|
||||
return None
|
||||
@ -247,7 +244,7 @@ def _nextcloud_wait_until_ready():
|
||||
|
||||
def _nextcloud_get_status():
|
||||
"""Return Nextcloud status such installed, in maintenance, etc."""
|
||||
output = _run_occ('status', '--output=json', capture_output=True)
|
||||
output = _run_occ('status', '--output=json')
|
||||
return json.loads(output.stdout)
|
||||
|
||||
|
||||
@ -282,8 +279,7 @@ def _configure_ldap():
|
||||
# Check if LDAP has already been configured. This is necessary because
|
||||
# if the setup proccess is rerun when updating the FredomBox app another
|
||||
# redundant LDAP config would be created.
|
||||
output = _run_occ('ldap:test-config', 's01', capture_output=True,
|
||||
check=False)
|
||||
output = _run_occ('ldap:test-config', 's01', check=False)
|
||||
if 'Invalid configID' in output.stdout.decode():
|
||||
_run_occ('ldap:create-empty-config')
|
||||
|
||||
@ -406,8 +402,7 @@ def _get_database_password():
|
||||
code = 'if (file_exists("/var/www/html/config/config.php")) {' \
|
||||
'include_once("/var/www/html/config/config.php");' \
|
||||
'print($CONFIG["dbpassword"] ?? ""); }'
|
||||
return _run_in_container('php', '-r', code,
|
||||
capture_output=True).stdout.decode().strip()
|
||||
return _run_in_container('php', '-r', code).stdout.decode().strip()
|
||||
|
||||
|
||||
def _create_redis_config():
|
||||
|
||||
@ -112,7 +112,7 @@ def _setup_firewall():
|
||||
try:
|
||||
process = action_utils.run(
|
||||
['firewall-cmd', '--zone', 'internal', '--list-interfaces'],
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
check=True)
|
||||
return 'tun+' in process.stdout.decode().strip().split()
|
||||
except subprocess.CalledProcessError:
|
||||
return True # Safer
|
||||
@ -164,7 +164,7 @@ def _is_renewable(cert_name):
|
||||
|
||||
process = action_utils.run(
|
||||
['openssl', 'x509', '-noout', '-enddate', '-in',
|
||||
str(cert_path)], check=True, stdout=subprocess.PIPE)
|
||||
str(cert_path)], check=True)
|
||||
date_string = process.stdout.decode().strip().partition('=')[2]
|
||||
cert_expiry_time = datetime.datetime.strptime(date_string,
|
||||
'%b %d %H:%M:%S %Y GMT')
|
||||
|
||||
@ -105,7 +105,7 @@ def _create_share_name(mount_point):
|
||||
def _define_open_share(name, path, windows_filesystem=False):
|
||||
"""Define an open samba share."""
|
||||
try:
|
||||
_conf_command(['delshare', name], stderr=subprocess.DEVNULL)
|
||||
_conf_command(['delshare', name])
|
||||
except subprocess.CalledProcessError:
|
||||
pass
|
||||
_conf_command(['addshare', name, path, 'writeable=y', 'guest_ok=y'])
|
||||
@ -117,7 +117,7 @@ def _define_open_share(name, path, windows_filesystem=False):
|
||||
def _define_group_share(name, path, windows_filesystem=False):
|
||||
"""Define a group samba share."""
|
||||
try:
|
||||
_conf_command(['delshare', name], stderr=subprocess.DEVNULL)
|
||||
_conf_command(['delshare', name])
|
||||
except subprocess.CalledProcessError:
|
||||
pass
|
||||
_conf_command(['addshare', name, path, 'writeable=y', 'guest_ok=n'])
|
||||
@ -130,7 +130,7 @@ def _define_group_share(name, path, windows_filesystem=False):
|
||||
def _define_homes_share(name, path):
|
||||
"""Define a samba share for private homes."""
|
||||
try:
|
||||
_conf_command(['delshare', name], stderr=subprocess.DEVNULL)
|
||||
_conf_command(['delshare', name])
|
||||
except subprocess.CalledProcessError:
|
||||
pass
|
||||
userpath = os.path.join(path, '%u')
|
||||
|
||||
@ -4,7 +4,6 @@
|
||||
import os
|
||||
import pathlib
|
||||
import signal
|
||||
import subprocess
|
||||
|
||||
import augeas
|
||||
import dbus
|
||||
@ -21,7 +20,7 @@ def setup(old_version: int):
|
||||
"""Configure snapper."""
|
||||
# Check if root config exists.
|
||||
command = ['snapper', 'list-configs']
|
||||
process = action_utils.run(command, stdout=subprocess.PIPE, check=True)
|
||||
process = action_utils.run(command, check=True)
|
||||
output = process.stdout.decode()
|
||||
|
||||
# Create root config if needed.
|
||||
@ -137,8 +136,7 @@ def _remove_fstab_entry(mount_point):
|
||||
|
||||
def _systemd_path_escape(path):
|
||||
"""Escape a string using systemd path rules."""
|
||||
process = action_utils.run(['systemd-escape', '--path', path],
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
process = action_utils.run(['systemd-escape', '--path', path], check=True)
|
||||
return process.stdout.decode().strip()
|
||||
|
||||
|
||||
@ -146,8 +144,7 @@ def _get_subvolume_path(mount_point):
|
||||
"""Return the subvolume path for .snapshots in a filesystem."""
|
||||
# -o causes the list of subvolumes directly under the given mount point
|
||||
process = action_utils.run(
|
||||
['btrfs', 'subvolume', 'list', '-o', mount_point],
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
['btrfs', 'subvolume', 'list', '-o', mount_point], check=True)
|
||||
for line in process.stdout.decode().splitlines():
|
||||
entry = line.split()
|
||||
|
||||
@ -224,8 +221,7 @@ def _parse_number(number):
|
||||
@privileged
|
||||
def list_() -> list[dict[str, str]]:
|
||||
"""List snapshots."""
|
||||
process = action_utils.run(['snapper', 'list'], stdout=subprocess.PIPE,
|
||||
check=True)
|
||||
process = action_utils.run(['snapper', 'list'], check=True)
|
||||
lines = process.stdout.decode().splitlines()
|
||||
|
||||
keys = ('number', 'is_default', 'is_active', 'type', 'pre_number', 'date',
|
||||
@ -247,7 +243,7 @@ def list_() -> list[dict[str, str]]:
|
||||
def _get_default_snapshot():
|
||||
"""Return the default snapshot by looking at default subvolume."""
|
||||
command = ['btrfs', 'subvolume', 'get-default', '/']
|
||||
process = action_utils.run(command, stdout=subprocess.PIPE, check=True)
|
||||
process = action_utils.run(command, check=True)
|
||||
output = process.stdout.decode()
|
||||
|
||||
output_parts = output.split()
|
||||
@ -297,7 +293,7 @@ def set_config(config: list[str]):
|
||||
|
||||
def _get_config():
|
||||
command = ['snapper', 'get-config']
|
||||
process = action_utils.run(command, stdout=subprocess.PIPE, check=True)
|
||||
process = action_utils.run(command, check=True)
|
||||
lines = process.stdout.decode().splitlines()
|
||||
config = {}
|
||||
for line in lines[2:]:
|
||||
|
||||
@ -4,7 +4,6 @@
|
||||
import pathlib
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
from plinth import action_utils, utils
|
||||
@ -145,7 +144,7 @@ def set_domain(domain: str):
|
||||
def _get_config_value(key: str) -> str:
|
||||
"""Return the value of a property from the configuration file."""
|
||||
process = action_utils.run(['plget', key], input=CONFIG_FILE.read_bytes(),
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
check=True)
|
||||
return process.stdout.decode().strip()
|
||||
|
||||
|
||||
|
||||
@ -90,8 +90,7 @@ def _resize_ext4(device, requested_partition, _free_space, _mount_point):
|
||||
requested_partition['number'])
|
||||
try:
|
||||
command = ['resize2fs', partition_device]
|
||||
action_utils.run(command, stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL, check=True)
|
||||
action_utils.run(command, check=True)
|
||||
except subprocess.CalledProcessError as exception:
|
||||
raise RuntimeError(f'Error expanding filesystem: {exception}')
|
||||
|
||||
@ -100,7 +99,7 @@ def _resize_btrfs(_device, _requested_partition, _free_space, mount_point='/'):
|
||||
"""Resize a btrfs file system inside a partition."""
|
||||
try:
|
||||
command = ['btrfs', 'filesystem', 'resize', 'max', mount_point]
|
||||
action_utils.run(command, stdout=subprocess.DEVNULL, check=True)
|
||||
action_utils.run(command, check=True)
|
||||
except subprocess.CalledProcessError as exception:
|
||||
raise RuntimeError(f'Error expanding filesystem: {exception}')
|
||||
|
||||
@ -167,7 +166,7 @@ def _get_partitions_and_free_spaces(device, partition_number):
|
||||
command = [
|
||||
'parted', '--machine', '--script', device, 'unit', 'B', 'print', 'free'
|
||||
]
|
||||
process = action_utils.run(command, stdout=subprocess.PIPE, check=True)
|
||||
process = action_utils.run(command, check=True)
|
||||
|
||||
requested_partition = None
|
||||
free_spaces = []
|
||||
|
||||
@ -72,7 +72,7 @@ distribution_info: dict = {
|
||||
|
||||
def _apt_run(arguments: list[str]):
|
||||
"""Run an apt command and ensure that output is written to stdout."""
|
||||
returncode = action_utils.run_apt_command(arguments, stdout=None)
|
||||
returncode = action_utils.run_apt_command(arguments)
|
||||
if returncode:
|
||||
raise RuntimeError(
|
||||
f'Apt command failed with return code: {returncode}')
|
||||
|
||||
@ -132,8 +132,7 @@ def release_held_packages():
|
||||
check=True).stdout.decode().strip()
|
||||
holds = output.split('\n')
|
||||
logger.info('Releasing package holds: %s', holds)
|
||||
action_utils.run(['apt-mark', 'unhold', *holds], stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL, check=True)
|
||||
action_utils.run(['apt-mark', 'unhold', *holds], check=True)
|
||||
|
||||
|
||||
@privileged
|
||||
|
||||
@ -24,7 +24,7 @@ def test_apt_run(run):
|
||||
distupgrade._apt_run(args)
|
||||
assert run.call_args.args == \
|
||||
(['apt-get', '--assume-yes', '--quiet=2'] + args,)
|
||||
assert not run.call_args.kwargs['stdout']
|
||||
assert run.call_args.kwargs['stdout'] == subprocess.PIPE
|
||||
|
||||
run.return_value.returncode = 10
|
||||
with pytest.raises(RuntimeError):
|
||||
@ -305,9 +305,8 @@ def test_apt_hold_packages(run, tmp_path):
|
||||
stderr=subprocess.PIPE, check=True),
|
||||
call(['apt-mark', 'unhold', 'package2'], stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, check=True),
|
||||
call(['apt-mark', 'unhold', 'freedombox'],
|
||||
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
||||
check=False),
|
||||
call(['apt-mark', 'unhold', 'freedombox'], stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, check=False),
|
||||
]
|
||||
assert run.call_args_list == expected_call
|
||||
|
||||
|
||||
@ -148,7 +148,7 @@ def _create_organizational_unit(unit):
|
||||
action_utils.run([
|
||||
'ldapsearch', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///', '-s',
|
||||
'base', '-b', distinguished_name, '(objectclass=*)'
|
||||
], stdout=subprocess.DEVNULL, check=True)
|
||||
], check=True)
|
||||
return # Already exists
|
||||
except subprocess.CalledProcessError:
|
||||
input = '''
|
||||
@ -158,7 +158,7 @@ objectClass: organizationalUnit
|
||||
ou: {unit}'''.format(unit=unit)
|
||||
action_utils.run(
|
||||
['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||
input=input.encode(), stdout=subprocess.DEVNULL, check=True)
|
||||
input=input.encode(), check=True)
|
||||
|
||||
|
||||
def _setup_admin():
|
||||
@ -167,7 +167,7 @@ def _setup_admin():
|
||||
'ldapsearch', '-Q', '-L', '-L', '-L', '-Y', 'EXTERNAL', '-H',
|
||||
'ldapi:///', '-s', 'base', '-b', 'olcDatabase={1}mdb,cn=config',
|
||||
'(objectclass=*)', 'olcRootDN', 'olcRootPW'
|
||||
], check=True, stdout=subprocess.PIPE)
|
||||
], check=True)
|
||||
ldap_object = {}
|
||||
for line in process.stdout.decode().splitlines():
|
||||
if line:
|
||||
@ -177,7 +177,7 @@ def _setup_admin():
|
||||
if 'olcRootPW' in ldap_object:
|
||||
action_utils.run(
|
||||
['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||
check=True, stdout=subprocess.DEVNULL, input=b'''
|
||||
check=True, input=b'''
|
||||
dn: olcDatabase={1}mdb,cn=config
|
||||
changetype: modify
|
||||
delete: olcRootPW''')
|
||||
@ -186,7 +186,7 @@ delete: olcRootPW''')
|
||||
if ldap_object['olcRootDN'] != root_dn:
|
||||
action_utils.run(
|
||||
['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||
check=True, stdout=subprocess.DEVNULL, input=b'''
|
||||
check=True, input=b'''
|
||||
dn: olcDatabase={1}mdb,cn=config
|
||||
changetype: modify
|
||||
replace: olcRootDN
|
||||
@ -207,7 +207,7 @@ def _setup_ldap_ppolicy() -> bool:
|
||||
try:
|
||||
action_utils.run(
|
||||
['ldapmodify', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'],
|
||||
check=True, stdout=subprocess.DEVNULL, input=b'''
|
||||
check=True, input=b'''
|
||||
dn: cn=module{0},cn=config
|
||||
changetype: modify
|
||||
add: olcModuleLoad
|
||||
@ -221,7 +221,7 @@ olcModuleLoad: ppolicy''')
|
||||
action_utils.run([
|
||||
'ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///', '-f',
|
||||
'/etc/ldap/schema/namedobject.ldif'
|
||||
], check=True, stdout=subprocess.DEVNULL)
|
||||
], check=True)
|
||||
except subprocess.CalledProcessError as error:
|
||||
if error.returncode != 80: # Schema already added
|
||||
raise
|
||||
@ -230,7 +230,7 @@ olcModuleLoad: ppolicy''')
|
||||
try:
|
||||
action_utils.run(
|
||||
['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'], check=True,
|
||||
stdout=subprocess.DEVNULL, input=b'''
|
||||
input=b'''
|
||||
dn: cn=DefaultPPolicy,ou=policies,dc=thisbox
|
||||
cn: DefaultPPolicy
|
||||
objectClass: pwdPolicy
|
||||
@ -246,7 +246,7 @@ pwdLockout: TRUE''')
|
||||
try:
|
||||
action_utils.run(
|
||||
['ldapadd', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///'], check=True,
|
||||
stdout=subprocess.DEVNULL, input=b'''
|
||||
input=b'''
|
||||
dn: olcOverlay={0}ppolicy,olcDatabase={1}mdb,cn=config
|
||||
objectClass: olcOverlayConfig
|
||||
objectClass: olcPPolicyConfig
|
||||
@ -456,7 +456,7 @@ def rename_user(old_username: str, new_username: str):
|
||||
|
||||
def _set_user_password(username, password):
|
||||
"""Set a user's password."""
|
||||
process = _run(['slappasswd', '-s', password], stdout=subprocess.PIPE)
|
||||
process = _run(['slappasswd', '-s', password])
|
||||
password = process.stdout.decode().strip()
|
||||
_run(['ldapsetpasswd', username, password])
|
||||
|
||||
@ -468,7 +468,7 @@ def _set_samba_user(username, password):
|
||||
"""
|
||||
proc = action_utils.run(['smbpasswd', '-a', '-s', username],
|
||||
input='{0}\n{0}\n'.format(password).encode(),
|
||||
stderr=subprocess.PIPE, check=False)
|
||||
check=False)
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError('Unable to add Samba user: ', proc.stderr)
|
||||
|
||||
@ -514,7 +514,7 @@ def _get_admin_users():
|
||||
def _get_user_ids(username: str) -> str | None:
|
||||
"""Get user information in format like `id` command."""
|
||||
try:
|
||||
process = _run(['ldapid', username], stdout=subprocess.PIPE)
|
||||
process = _run(['ldapid', username])
|
||||
except subprocess.CalledProcessError as error:
|
||||
if error.returncode == 1:
|
||||
# User doesn't exist
|
||||
@ -533,7 +533,7 @@ def _user_exists(username):
|
||||
def _get_group_users(groupname):
|
||||
"""Return list of members in the group."""
|
||||
try:
|
||||
process = _run(['ldapgid', '-P', groupname], stdout=subprocess.PIPE)
|
||||
process = _run(['ldapgid', '-P', groupname])
|
||||
except subprocess.CalledProcessError:
|
||||
return [] # Group does not exist
|
||||
|
||||
@ -709,6 +709,4 @@ def _flush_cache():
|
||||
def _run(arguments, check=True, **kwargs):
|
||||
"""Run a command. Check return code and suppress output by default."""
|
||||
env = dict(os.environ, LDAPSCRIPTS_CONF=LDAPSCRIPTS_CONF)
|
||||
kwargs['stdout'] = kwargs.get('stdout', subprocess.DEVNULL)
|
||||
kwargs['stderr'] = kwargs.get('stderr', subprocess.DEVNULL)
|
||||
return action_utils.run(arguments, env=env, check=check, **kwargs)
|
||||
|
||||
@ -33,15 +33,13 @@ def get_configuration() -> dict[str, str]:
|
||||
"""Return the current configuration."""
|
||||
configuration = {}
|
||||
try:
|
||||
process = action_utils.run(['zoph', '--dump-config'],
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
process = action_utils.run(['zoph', '--dump-config'], check=True)
|
||||
except subprocess.CalledProcessError as exception:
|
||||
if exception.returncode != 96:
|
||||
raise
|
||||
|
||||
_zoph_setup_cli_user()
|
||||
process = action_utils.run(['zoph', '--dump-config'],
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
process = action_utils.run(['zoph', '--dump-config'], check=True)
|
||||
|
||||
for line in process.stdout.decode().splitlines():
|
||||
name, value = line.partition(':')[::2]
|
||||
@ -146,8 +144,7 @@ def is_configured() -> bool | None:
|
||||
"""Return whether zoph app is configured."""
|
||||
try:
|
||||
process = action_utils.run(
|
||||
['zoph', '--get-config', 'interface.user.remote'],
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
['zoph', '--get-config', 'interface.user.remote'], check=True)
|
||||
return process.stdout.decode().strip() == 'true'
|
||||
except (FileNotFoundError, subprocess.CalledProcessError):
|
||||
return None
|
||||
|
||||
@ -81,59 +81,55 @@ def test_is_enabled(service_is_enabled, daemon):
|
||||
@patch('subprocess.run')
|
||||
def test_enable(subprocess_run, apps_init, app_list, mock_privileged, daemon):
|
||||
"""Test that enabling the daemon works."""
|
||||
common_args1 = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
common_args2 = dict(stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
common_args = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
daemon.enable()
|
||||
subprocess_run.assert_has_calls(
|
||||
[call(['systemctl', 'enable', 'test-unit'], **common_args1)])
|
||||
[call(['systemctl', 'enable', 'test-unit'], **common_args)])
|
||||
subprocess_run.assert_any_call(['systemctl', 'start', 'test-unit'],
|
||||
**common_args2)
|
||||
**common_args)
|
||||
|
||||
subprocess_run.reset_mock()
|
||||
daemon.alias = 'test-unit-2'
|
||||
daemon.enable()
|
||||
|
||||
subprocess_run.assert_has_calls([
|
||||
call(['systemctl', 'enable', 'test-unit'], **common_args1),
|
||||
call(['systemctl', 'start', 'test-unit'], **common_args2),
|
||||
call(['systemctl', 'enable', 'test-unit-2'], **common_args1),
|
||||
call(['systemctl', 'start', 'test-unit-2'], **common_args2),
|
||||
call(['systemctl', 'enable', 'test-unit'], **common_args),
|
||||
call(['systemctl', 'start', 'test-unit'], **common_args),
|
||||
call(['systemctl', 'enable', 'test-unit-2'], **common_args),
|
||||
call(['systemctl', 'start', 'test-unit-2'], **common_args),
|
||||
])
|
||||
subprocess_run.assert_any_call(['systemctl', 'start', 'test-unit'],
|
||||
**common_args2)
|
||||
**common_args)
|
||||
subprocess_run.assert_any_call(['systemctl', 'start', 'test-unit-2'],
|
||||
**common_args2)
|
||||
**common_args)
|
||||
|
||||
|
||||
@patch('plinth.app.apps_init')
|
||||
@patch('subprocess.run')
|
||||
def test_disable(subprocess_run, apps_init, app_list, mock_privileged, daemon):
|
||||
"""Test that disabling the daemon works."""
|
||||
common_args1 = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
common_args2 = dict(stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
common_args = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
daemon.disable()
|
||||
subprocess_run.assert_has_calls(
|
||||
[call(['systemctl', 'disable', 'test-unit'], **common_args1)])
|
||||
[call(['systemctl', 'disable', 'test-unit'], **common_args)])
|
||||
subprocess_run.assert_any_call(['systemctl', 'stop', 'test-unit'],
|
||||
**common_args2)
|
||||
**common_args)
|
||||
|
||||
subprocess_run.reset_mock()
|
||||
daemon.alias = 'test-unit-2'
|
||||
daemon.disable()
|
||||
subprocess_run.assert_has_calls([
|
||||
call(['systemctl', 'disable', 'test-unit'], **common_args1),
|
||||
call(['systemctl', 'stop', 'test-unit'], **common_args2),
|
||||
call(['systemctl', 'disable', 'test-unit-2'], **common_args1),
|
||||
call(['systemctl', 'stop', 'test-unit-2'], **common_args2),
|
||||
call(['systemctl', 'disable', 'test-unit'], **common_args),
|
||||
call(['systemctl', 'stop', 'test-unit'], **common_args),
|
||||
call(['systemctl', 'disable', 'test-unit-2'], **common_args),
|
||||
call(['systemctl', 'stop', 'test-unit-2'], **common_args),
|
||||
])
|
||||
subprocess_run.assert_any_call(['systemctl', 'stop', 'test-unit'],
|
||||
**common_args2)
|
||||
**common_args)
|
||||
subprocess_run.assert_any_call(['systemctl', 'stop', 'test-unit-2'],
|
||||
**common_args2)
|
||||
**common_args)
|
||||
|
||||
|
||||
@patch('plinth.action_utils.service_is_running')
|
||||
@ -153,10 +149,8 @@ def test_is_running(service_is_running, daemon):
|
||||
def test_ensure_running(subprocess_run, service_is_running, apps_init,
|
||||
app_list, mock_privileged, daemon):
|
||||
"""Test that checking that the daemon is running works."""
|
||||
common_args1 = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
common_args2 = dict(stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
common_args = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
check=False)
|
||||
service_is_running.return_value = True
|
||||
with daemon.ensure_running() as starting_state:
|
||||
assert starting_state
|
||||
@ -168,14 +162,14 @@ def test_ensure_running(subprocess_run, service_is_running, apps_init,
|
||||
with daemon.ensure_running() as starting_state:
|
||||
assert not starting_state
|
||||
assert subprocess_run.mock_calls == [
|
||||
call(['systemctl', 'enable', 'test-unit'], **common_args1),
|
||||
call(['systemctl', 'start', 'test-unit'], **common_args2),
|
||||
call(['systemctl', 'enable', 'test-unit'], **common_args),
|
||||
call(['systemctl', 'start', 'test-unit'], **common_args),
|
||||
]
|
||||
subprocess_run.reset_mock()
|
||||
|
||||
assert subprocess_run.mock_calls == [
|
||||
call(['systemctl', 'disable', 'test-unit'], **common_args1),
|
||||
call(['systemctl', 'stop', 'test-unit'], **common_args2),
|
||||
call(['systemctl', 'disable', 'test-unit'], **common_args),
|
||||
call(['systemctl', 'stop', 'test-unit'], **common_args),
|
||||
]
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user