diff --git a/plinth/action_utils.py b/plinth/action_utils.py index dfe012748..4ff7c1654 100644 --- a/plinth/action_utils.py +++ b/plinth/action_utils.py @@ -209,8 +209,7 @@ def webserver_is_enabled(name, kind='config'): option_map = {'config': '-c', 'site': '-s', 'module': '-m'} try: # Don't print anything on the terminal - subprocess.check_output(['a2query', option_map[kind], name], - stderr=subprocess.STDOUT) + run(['a2query', option_map[kind], name], check=True) return True except subprocess.CalledProcessError: return False @@ -232,7 +231,7 @@ def webserver_enable(name, kind='config', apply_changes=True): 'site': 'a2ensite', 'module': 'a2enmod' } - subprocess.check_output([command_map[kind], name]) + run([command_map[kind], name], check=True) action_required = 'restart' if kind == 'module' else 'reload' @@ -261,7 +260,7 @@ def webserver_disable(name, kind='config', apply_changes=True): 'site': 'a2dissite', 'module': 'a2dismod' } - subprocess.check_output([command_map[kind], name]) + run([command_map[kind], name], check=True) action_required = 'restart' if kind == 'module' else 'reload' @@ -389,7 +388,7 @@ def get_ip_addresses() -> list[dict[str, str | bool]]: """Return a list of IP addresses assigned to the system.""" addresses = [] - output = subprocess.check_output(['ip', '-o', 'addr']) + output = run(['ip', '-o', 'addr'], check=True).stdout for line in output.decode().splitlines(): parts = line.split() address: dict[str, str | bool] = { @@ -415,7 +414,7 @@ def get_ip_addresses() -> list[dict[str, str | bool]]: def get_hostname(): """Return the current hostname.""" - return subprocess.check_output(['hostname']).decode().strip() + return run(['hostname'], check=True).stdout.decode().strip() def dpkg_reconfigure(package, config): @@ -457,7 +456,7 @@ def debconf_set_selections(presets): pass presets = '\n'.join(presets) - subprocess.check_output(['debconf-set-selections'], input=presets.encode()) + run(['debconf-set-selections'], input=presets.encode(), check=True) def is_disk_image(): @@ -501,8 +500,8 @@ def apt_hold(packages): held_packages = [] try: for package in packages: - current_hold = subprocess.check_output( - ['apt-mark', 'showhold', package]) + current_hold = run(['apt-mark', 'showhold', package], + check=True).stdout if not current_hold: process = run(['apt-mark', 'hold', package], check=False) if process.returncode == 0: # success @@ -517,8 +516,8 @@ def apt_hold(packages): @contextmanager def apt_hold_freedombox(): """Prevent freedombox package from being removed during apt operations.""" - current_hold = subprocess.check_output( - ['apt-mark', 'showhold', 'freedombox']) + current_hold = run(['apt-mark', 'showhold', 'freedombox'], + check=True).stdout try: if current_hold: # Package is already held, possibly by administrator. @@ -548,7 +547,7 @@ def is_package_manager_busy(): is open which indicates that the package manager is busy""" LOCK_FILE = '/var/lib/dpkg/lock' try: - subprocess.check_output(['lsof', LOCK_FILE]) + run(['lsof', LOCK_FILE], check=True) return True except subprocess.CalledProcessError: return False diff --git a/plinth/modules/ejabberd/privileged.py b/plinth/modules/ejabberd/privileged.py index 49d7a68a8..85810d4cf 100644 --- a/plinth/modules/ejabberd/privileged.py +++ b/plinth/modules/ejabberd/privileged.py @@ -89,7 +89,7 @@ def setup(domain_name: str): _upgrade_config(domain_name) try: - subprocess.check_output(['ejabberdctl', 'restart']) + action_utils.run(['ejabberdctl', 'restart'], check=True) except subprocess.CalledProcessError as err: logger.warn('Failed to restart ejabberd with new configuration: %s', err) @@ -146,10 +146,10 @@ def pre_change_hostname(old_hostname: str, new_hostname: str): return action_utils.run(['ejabberdctl', 'backup', EJABBERD_BACKUP], check=False) - subprocess.check_output([ + action_utils.run([ 'ejabberdctl', 'mnesia-change-nodename', 'ejabberd@' + old_hostname, 'ejabberd@' + new_hostname, EJABBERD_BACKUP, EJABBERD_BACKUP_NEW - ]) + ], check=True) os.remove(EJABBERD_BACKUP) @@ -172,8 +172,8 @@ def change_hostname(): # restore backup database if os.path.exists(EJABBERD_BACKUP_NEW): try: - subprocess.check_output( - ['ejabberdctl', 'restore', EJABBERD_BACKUP_NEW]) + action_utils.run(['ejabberdctl', 'restore', EJABBERD_BACKUP_NEW], + check=True) os.remove(EJABBERD_BACKUP_NEW) except subprocess.CalledProcessError as err: logger.error('Failed to restore ejabberd backup database: %s', err) @@ -365,8 +365,8 @@ def configure_turn(turn_server_config: dict[str, Any], managed: bool): def _get_version(): """Get the current ejabberd version.""" try: - output = subprocess.check_output(['ejabberdctl', - 'status']).decode('utf-8') + output = action_utils.run(['ejabberdctl', 'status'], + check=True).stdout.decode('utf-8') except subprocess.CalledProcessError: return None diff --git a/plinth/modules/email/privileged/dkim.py b/plinth/modules/email/privileged/dkim.py index 62195a5d9..4fa401efc 100644 --- a/plinth/modules/email/privileged/dkim.py +++ b/plinth/modules/email/privileged/dkim.py @@ -31,9 +31,10 @@ def get_dkim_public_key(domain: str) -> str: """Privileged action to get the public key from DKIM key.""" _validate_domain_name(domain) key_file = _keys_dir / f'{domain}.dkim.key' - output = subprocess.check_output( + output = action_utils.run( ['openssl', 'rsa', '-in', - str(key_file), '-pubout'], stderr=subprocess.DEVNULL) + str(key_file), '-pubout'], stderr=subprocess.DEVNULL, + check=True).stdout return ''.join(output.decode().splitlines()[1:-1]) diff --git a/plinth/modules/firewall/privileged.py b/plinth/modules/firewall/privileged.py index 33f0dd7bf..9c326d1ba 100644 --- a/plinth/modules/firewall/privileged.py +++ b/plinth/modules/firewall/privileged.py @@ -176,7 +176,8 @@ def get_config() -> FirewallConfig: config: FirewallConfig = {} # Get the default zone. - output = subprocess.check_output(['firewall-cmd', '--get-default-zone']) + output = action_utils.run(['firewall-cmd', '--get-default-zone'], + check=True).stdout config['default_zone'] = output.decode().strip() # Load Augeas lens. @@ -191,8 +192,9 @@ def get_config() -> FirewallConfig: config['backend'] = aug.get('FirewallBackend') # Get the list of direct passthroughs. - output = subprocess.check_output( - ['firewall-cmd', '--direct', '--get-all-passthroughs']) + output = action_utils.run( + ['firewall-cmd', '--direct', '--get-all-passthroughs'], + check=True).stdout config['passthroughs'] = output.decode().strip().split('\n') return config diff --git a/plinth/modules/gitweb/privileged.py b/plinth/modules/gitweb/privileged.py index f64af5c65..70e4b765f 100644 --- a/plinth/modules/gitweb/privileged.py +++ b/plinth/modules/gitweb/privileged.py @@ -44,9 +44,9 @@ def setup(): def _get_global_default_branch(): """Get globally configured default branch name.""" try: - default_branch = subprocess.check_output( - ['git', 'config', '--global', '--get', - 'init.defaultBranch']).decode().strip() + default_branch = action_utils.run( + ['git', 'config', '--global', '--get', 'init.defaultBranch'], + check=True).stdout.decode().strip() except subprocess.CalledProcessError as exception: if exception.returncode == 1: # Default branch not configured return None diff --git a/plinth/modules/letsencrypt/privileged.py b/plinth/modules/letsencrypt/privileged.py index 3002a0ef8..f96184cf2 100644 --- a/plinth/modules/letsencrypt/privileged.py +++ b/plinth/modules/letsencrypt/privileged.py @@ -8,7 +8,6 @@ import os import pathlib import re import shutil -import subprocess import sys from typing import Any @@ -28,8 +27,9 @@ WEB_ROOT_PATH = '/var/www/html' def _get_certificate_expiry(domain: str) -> str: """Return the expiry date of a certificate.""" certificate_file = os.path.join(le.LIVE_DIRECTORY, domain, 'cert.pem') - output = subprocess.check_output( - ['openssl', 'x509', '-enddate', '-noout', '-in', certificate_file]) + output = action_utils.run( + ['openssl', 'x509', '-enddate', '-noout', '-in', certificate_file], + check=True).stdout return output.decode().strip().split('=')[1] @@ -41,7 +41,8 @@ def _get_modified_time(domain: str) -> int: def _get_validity_status(domain: str) -> str: """Return validity status of a certificate; valid, revoked, expired.""" - output = subprocess.check_output(['certbot', 'certificates', '-d', domain]) + output = action_utils.run(['certbot', 'certificates', '-d', domain], + check=True).stdout line = output.decode(sys.stdout.encoding) match = re.search(r'INVALID: (.*)\)', line) diff --git a/plinth/modules/networks/privileged.py b/plinth/modules/networks/privileged.py index 4f671a97b..05eaf2be6 100644 --- a/plinth/modules/networks/privileged.py +++ b/plinth/modules/networks/privileged.py @@ -29,8 +29,9 @@ def _sort_interfaces(interfaces: list[str]) -> list[str]: def _get_interfaces() -> dict[str, list[str]]: """Return all network interfaces by their type.""" - output = subprocess.check_output( - ['nmcli', '--terse', '--fields', 'type,device', 'device']) + output = action_utils.run( + ['nmcli', '--terse', '--fields', 'type,device', 'device'], + check=True).stdout interfaces = collections.defaultdict(list) for line in output.decode().splitlines(): type_, _, interface = line.partition(':') @@ -45,8 +46,9 @@ def _get_interfaces() -> dict[str, list[str]]: def _add_connection(connection_name: str, interface: str, remaining_arguments: list[str]): """Add an Ethernet/Wi-Fi connection of type regular or shared.""" - output = subprocess.check_output( - ['nmcli', '--terse', '--fields', 'name,device', 'con', 'show']) + output = action_utils.run( + ['nmcli', '--terse', '--fields', 'name,device', 'con', 'show'], + check=True).stdout lines = output.decode().splitlines() if f'{connection_name}:{interface}' in lines: logging.info('Connection %s already exists for device %s, not adding.', diff --git a/plinth/modules/networks/tests/test_privileged.py b/plinth/modules/networks/tests/test_privileged.py index b18de501e..9bcefa2bf 100644 --- a/plinth/modules/networks/tests/test_privileged.py +++ b/plinth/modules/networks/tests/test_privileged.py @@ -6,10 +6,10 @@ from unittest.mock import patch from .. import privileged -@patch('subprocess.check_output') -def test_get_interfaces(check_output): +@patch('subprocess.run') +def test_get_interfaces(run): """Test returning list of network interfaces in sorted order.""" - check_output.return_value = '\n'.join([ + run.return_value.stdout = '\n'.join([ 'ethernet:ve-fbx-testing', 'ethernet:enp39s0', 'ethernet:enp32s1', diff --git a/plinth/modules/samba/privileged.py b/plinth/modules/samba/privileged.py index d03b39f67..db407a596 100644 --- a/plinth/modules/samba/privileged.py +++ b/plinth/modules/samba/privileged.py @@ -155,7 +155,7 @@ def _get_mount_point(path): def _get_shares() -> list[dict[str, str]]: """Get shares.""" shares = [] - output = subprocess.check_output(['net', 'conf', 'list']) + output = action_utils.run(['net', 'conf', 'list'], check=True).stdout config = configparser.RawConfigParser() config.read_string(output.decode()) for name in config.sections(): @@ -272,7 +272,7 @@ def get_shares() -> list[dict[str, str]]: @privileged def get_users() -> list[str]: """Get users from Samba database.""" - output = subprocess.check_output(['pdbedit', '-L']).decode() + output = action_utils.run(['pdbedit', '-L'], check=True).stdout.decode() samba_users = [line.split(':')[0] for line in output.split()] return samba_users diff --git a/plinth/modules/upgrades/__init__.py b/plinth/modules/upgrades/__init__.py index f573ecdcd..a412e022e 100644 --- a/plinth/modules/upgrades/__init__.py +++ b/plinth/modules/upgrades/__init__.py @@ -351,6 +351,15 @@ def is_backports_enabled(): return os.path.exists(privileged.BACKPORTS_SOURCES_LIST) +def get_current_release(): + """Return current release and codename as a tuple.""" + output = action_utils.run( + ['lsb_release', '--release', '--codename', '--short'], + check=True).stdout.decode().strip() + lines = output.split('\n') + return lines[0], lines[1] + + def is_backports_current(): """Return whether backports are enabled for the current release.""" if not is_backports_enabled(): diff --git a/plinth/modules/upgrades/privileged.py b/plinth/modules/upgrades/privileged.py index 8629f5345..8b82976ae 100644 --- a/plinth/modules/upgrades/privileged.py +++ b/plinth/modules/upgrades/privileged.py @@ -128,7 +128,8 @@ def release_held_packages(): 'holds.') return - output = subprocess.check_output(['apt-mark', 'showhold']).decode().strip() + output = action_utils.run(['apt-mark', 'showhold'], + check=True).stdout.decode().strip() holds = output.split('\n') logger.info('Releasing package holds: %s', holds) action_utils.run(['apt-mark', 'unhold', *holds], stdout=subprocess.DEVNULL, diff --git a/plinth/modules/upgrades/tests/test_distupgrade.py b/plinth/modules/upgrades/tests/test_distupgrade.py index e1cd07e39..89bc0ca1b 100644 --- a/plinth/modules/upgrades/tests/test_distupgrade.py +++ b/plinth/modules/upgrades/tests/test_distupgrade.py @@ -7,7 +7,7 @@ import re import subprocess from datetime import datetime as datetime_original from datetime import timezone -from unittest.mock import call, patch +from unittest.mock import Mock, call, patch import pytest @@ -264,28 +264,40 @@ def test_services_disable(service_is_running, service_disable, service_enable): @patch('subprocess.run') -@patch('subprocess.check_output') -def test_apt_hold_packages(check_output, run, tmp_path): +def test_apt_hold_packages(run, tmp_path): """Test that holding apt packages works.""" + + def _run(command, **kwargs): + if 'showhold' in command: + return Mock(stdout=False) + + return Mock(returncode=0) + hold_flag = tmp_path / 'flag' - run.return_value.returncode = 0 + run.side_effect = _run with patch('plinth.action_utils.apt_hold_flag', hold_flag), \ patch('plinth.modules.upgrades.distupgrade.PACKAGES_WITH_PROMPTS', ['package1', 'package2']): - check_output.return_value = False with distupgrade._apt_hold_packages(): assert hold_flag.exists() assert hold_flag.stat().st_mode & 0o117 == 0 expected_calls = [ - call(['apt-mark', 'hold', 'freedombox'], + call(['apt-mark', 'showhold', 'freedombox'], check=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE), + call(['apt-mark', 'hold', 'freedombox'], check=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE), + call(['apt-mark', 'showhold', 'package1'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True), - call(['apt-mark', 'hold', 'package1'], stdout=subprocess.PIPE, - stderr=subprocess.PIPE, check=False), - call(['apt-mark', 'hold', 'package2'], stdout=subprocess.PIPE, - stderr=subprocess.PIPE, check=False) + call(['apt-mark', 'hold', 'package1'], check=False, + stdout=subprocess.PIPE, stderr=subprocess.PIPE), + call(['apt-mark', 'showhold', 'package2'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + check=True), + call(['apt-mark', 'hold', 'package2'], check=False, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) ] - assert run.call_args_list == expected_calls + assert run.mock_calls == expected_calls run.reset_mock() expected_call = [ diff --git a/plinth/modules/upgrades/tests/test_utils.py b/plinth/modules/upgrades/tests/test_utils.py index 8336c9731..abd98bc84 100644 --- a/plinth/modules/upgrades/tests/test_utils.py +++ b/plinth/modules/upgrades/tests/test_utils.py @@ -64,10 +64,10 @@ deb http://deb.debian.org/debian trixie main assert utils.get_sources_list_codename() == 'testing' -@patch('subprocess.check_output') -def test_get_current_release(check_output): +@patch('subprocess.run') +def test_get_current_release(run): """Test that getting current release works.""" - check_output.return_value = b'test-release\ntest-codename\n\n' + run.return_value.stdout = b'test-release\ntest-codename\n\n' assert utils.get_current_release() == ('test-release', 'test-codename') diff --git a/plinth/modules/upgrades/utils.py b/plinth/modules/upgrades/utils.py index 5904554fc..bc48cfeab 100644 --- a/plinth/modules/upgrades/utils.py +++ b/plinth/modules/upgrades/utils.py @@ -3,10 +3,10 @@ import pathlib import re -import subprocess import augeas +from plinth import action_utils from plinth.modules.apache.components import check_url RELEASE_FILE_URL = \ @@ -23,7 +23,7 @@ def check_auto() -> bool: 'apt-config', 'shell', 'UpdateInterval', 'APT::Periodic::Update-Package-Lists' ] - output = subprocess.check_output(arguments).decode() + output = action_utils.run(arguments, check=True).stdout.decode() update_interval = 0 match = re.match(r"UpdateInterval='(.*)'", output) if match: @@ -62,7 +62,7 @@ def is_release_file_available(protocol: str, dist: str, def is_sufficient_free_space() -> bool: """Return whether there is sufficient free space for dist upgrade.""" - output = subprocess.check_output(['df', '--output=avail', '/']) + output = action_utils.run(['df', '--output=avail', '/'], check=True).stdout free_space = int(output.decode().split('\n')[1]) return free_space >= DIST_UPGRADE_REQUIRED_FREE_SPACE @@ -100,9 +100,9 @@ def get_sources_list_codename() -> str | None: def get_current_release(): """Return current release and codename as a tuple.""" - output = subprocess.check_output( - ['lsb_release', '--release', '--codename', - '--short']).decode().strip() + output = action_utils.run( + ['lsb_release', '--release', '--codename', '--short'], + check=True).stdout.decode().strip() lines = output.split('\n') return lines[0], lines[1] diff --git a/plinth/modules/users/__init__.py b/plinth/modules/users/__init__.py index 71e6418cb..67543f3cd 100644 --- a/plinth/modules/users/__init__.py +++ b/plinth/modules/users/__init__.py @@ -9,6 +9,7 @@ from django.utils.text import format_lazy from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_noop +from plinth import action_utils from plinth import app as app_module from plinth import cfg, menu from plinth.config import DropinConfigs @@ -127,8 +128,9 @@ def _diagnose_ldap_entry(search_item: str) -> DiagnosticCheck: result = Result.FAILED try: - output = subprocess.check_output( - ['ldapsearch', '-LLL', '-x', '-b', 'dc=thisbox', search_item]) + output = action_utils.run( + ['ldapsearch', '-LLL', '-x', '-b', 'dc=thisbox', search_item], + check=True).stdout if search_item in output.decode(): result = Result.PASSED except subprocess.CalledProcessError: diff --git a/plinth/modules/users/privileged.py b/plinth/modules/users/privileged.py index 0ca7c291e..0d7ad0bd3 100644 --- a/plinth/modules/users/privileged.py +++ b/plinth/modules/users/privileged.py @@ -331,8 +331,8 @@ def get_nslcd_config() -> dict[str, str]: def _get_samba_users(): """Get users from the Samba user database.""" # 'pdbedit -L' is better for listing users but is installed only with samba - stdout = subprocess.check_output( - ['tdbdump', '/var/lib/samba/private/passdb.tdb']).decode() + stdout = action_utils.run(['tdbdump', '/var/lib/samba/private/passdb.tdb'], + check=True).stdout.decode() return re.findall(r'USER_(.*)\\0', stdout) @@ -354,7 +354,8 @@ def _disconnect_samba_user(username): def _get_user_home(username): """Return the user home directory.""" - output = subprocess.check_output(['getent', 'passwd', username], text=True) + output = action_utils.run(['getent', 'passwd', username], + check=True).stdout.decode() return pathlib.Path(output.split(':')[5]) @@ -491,11 +492,11 @@ def _get_admin_users(): admin_users = [] try: - output = subprocess.check_output([ + output = action_utils.run([ 'ldapsearch', '-LLL', '-Q', '-Y', 'EXTERNAL', '-H', 'ldapi:///', '-o', 'ldif-wrap=no', '-s', 'base', '-b', 'cn=admin,ou=groups,dc=thisbox', 'memberUid' - ]).decode() + ], check=True).stdout.decode() except subprocess.CalledProcessError as error: if error.returncode == 32: # no entries found diff --git a/plinth/modules/wireguard/privileged.py b/plinth/modules/wireguard/privileged.py index 50dd0cb26..fc37ca833 100644 --- a/plinth/modules/wireguard/privileged.py +++ b/plinth/modules/wireguard/privileged.py @@ -1,8 +1,7 @@ # SPDX-License-Identifier: AGPL-3.0-or-later """Configuration helper for WireGuard.""" -import subprocess - +from plinth import action_utils from plinth.actions import privileged SERVER_INTERFACE = 'wg0' @@ -11,8 +10,8 @@ SERVER_INTERFACE = 'wg0' @privileged def get_info() -> dict[str, dict]: """Return info for each configured interface.""" - output = subprocess.check_output(['wg', 'show', 'all', - 'dump']).decode().strip() + output = action_utils.run(['wg', 'show', 'all', 'dump'], + check=True).stdout.decode().strip() lines = output.split('\n') interfaces: dict[str, dict] = {} for line in lines: