diff --git a/plinth/modules/upgrades/distupgrade.py b/plinth/modules/upgrades/distupgrade.py new file mode 100644 index 000000000..4e47829da --- /dev/null +++ b/plinth/modules/upgrades/distupgrade.py @@ -0,0 +1,291 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Perform distribution upgrade.""" + +import logging +import pathlib +import subprocess +import time + +from plinth.action_utils import (apt_hold, apt_hold_freedombox, + debconf_set_selections, run_apt_command, + service_daemon_reload, service_is_running, + service_restart, service_start, service_stop) +from plinth.modules import snapshot as snapshot_module + +from . import utils + +SOURCES_LIST = '/etc/apt/sources.list' + +DIST_UPGRADE_OBSOLETE_PACKAGES: list[str] = [] + +DIST_UPGRADE_PACKAGES_WITH_PROMPTS = [ + 'bind9', 'firewalld', 'janus', 'minetest-server', 'minidlna', + 'mumble-server', 'radicale', 'roundcube-core', 'tt-rss' +] + +DIST_UPGRADE_PRE_DEBCONF_SELECTIONS: list[str] = [ + # Tell grub-pc to continue without installing grub again. + 'grub-pc grub-pc/install_devices_empty boolean true' +] + +DIST_UPGRADE_SERVICE = ''' +[Unit] +Description=Upgrade to new stable Debian release + +[Service] +Type=oneshot +ExecStart=systemd-inhibit /usr/share/plinth/actions/actions \ + upgrades dist_upgrade --no-args +KillMode=process +TimeoutSec=12hr +''' + +DIST_UPGRADE_SERVICE_PATH = \ + '/run/systemd/system/freedombox-dist-upgrade.service' + +dist_upgrade_flag = pathlib.Path( + '/var/lib/freedombox/dist-upgrade-in-progress') + + +def check(test_upgrade=False) -> tuple[bool, str]: + """Check if a distribution upgrade be performed. + + Check for new stable release, if updates are enabled, and if there is + enough free space for the dist upgrade. + + If test_upgrade is True, also check for upgrade to testing. + + Return (boolean, string) indicating if the upgrade is ready, and a reason + if not. + """ + if dist_upgrade_flag.exists(): + return (True, 'found-previous') + + from plinth.modules.upgrades import get_current_release + release, dist = get_current_release() + if release in ['unstable', 'testing', 'n/a']: + return (False, f'already-{release}') + + check_dists = ['stable'] + if test_upgrade: + check_dists.append('testing') + + codename = None + for check_dist in check_dists: + url = utils.RELEASE_FILE_URL.format(check_dist) + command = ['curl', '--silent', '--location', '--fail', url] + protocol = utils.get_http_protocol() + if protocol == 'tor+http': + command.insert(0, 'torsocks') + logging.info('Package download over Tor is enabled.') + + try: + output = subprocess.check_output(command).decode() + except (subprocess.CalledProcessError, FileNotFoundError): + logging.warning('Error while checking for new %s release', + check_dist) + else: + for line in output.split('\n'): + if line.startswith('Codename:'): + codename = line.split()[1] + + if not codename: + return (False, 'codename-not-found') + + if codename == dist: + return (False, f'already-{dist}') + + if not utils.check_auto(): + return (False, 'upgrades-not-enabled') + + if check_dist == 'testing' and not test_upgrade: + return (False, 'test-not-set') + + if not utils.is_sufficient_free_space(): + return (False, 'not-enough-free-space') + + logging.info('Upgrading from %s to %s...', dist, codename) + with open(SOURCES_LIST, 'r', encoding='utf-8') as sources_list: + lines = sources_list.readlines() + + with open(SOURCES_LIST, 'w', encoding='utf-8') as sources_list: + for line in lines: + # E.g. replace 'bullseye' with 'bookworm'. + new_line = line.replace(dist, codename) + if check_dist == 'testing': + # E.g. replace 'stable' with 'bookworm'. + new_line = new_line.replace('stable', codename) + + sources_list.write(new_line) + + logging.info('Dist upgrade in progress. Setting flag.') + dist_upgrade_flag.touch(mode=0o660) + return (True, 'started-dist-upgrade') + + +def _take_snapshot_and_disable() -> bool: + """Take a snapshot if supported and enabled, then disable snapshots. + + Return whether snapshots shall be re-enabled at the end. + """ + if snapshot_module.is_supported(): + print('Taking a snapshot before dist upgrade...', flush=True) + subprocess.run([ + '/usr/share/plinth/actions/actions', 'snapshot', 'create', + '--no-args' + ], check=True) + aug = snapshot_module.load_augeas() + if snapshot_module.is_apt_snapshots_enabled(aug): + print('Disable apt snapshots during dist upgrade...', flush=True) + subprocess.run([ + '/usr/share/plinth/actions/actions', + 'snapshot', + 'disable_apt_snapshot', + ], input='{"args": ["yes"], "kwargs": {}}'.encode(), check=True) + return True + else: + print('Apt snapshots already disabled.', flush=True) + else: + print('Snapshots are not supported, skip taking a snapshot.', + flush=True) + + return False + + +def _restore_snapshots_config(reenable=False): + """Restore original snapshots configuration.""" + if reenable: + print('Re-enable apt snapshots...', flush=True) + subprocess.run([ + '/usr/share/plinth/actions/actions', 'snapshot', + 'disable_apt_snapshot' + ], input='{"args": ["no"], "kwargs": {}}'.encode(), check=True) + + +def _disable_searx() -> bool: + """If searx is enabled, disable it until we can upgrade it properly. + + Return whether searx was originally enabled. + """ + searx_is_enabled = pathlib.Path( + '/etc/uwsgi/apps-enabled/searx.ini').exists() + if searx_is_enabled: + print('Disabling searx...', flush=True) + subprocess.run( + ['/usr/share/plinth/actions/actions', 'apache', 'uwsgi_disable'], + input='{"args": ["searx"], "kwargs": {}}'.encode(), check=True) + + return searx_is_enabled + + +def _update_searx(reenable=False): + """If searx is installed, update search engines list. + + Re-enable if previously enabled. + """ + if pathlib.Path('/etc/searx/settings.yml').exists(): + print('Updating searx search engines list...', flush=True) + subprocess.run([ + '/usr/share/plinth/actions/actions', 'searx', 'setup', '--no-args' + ], check=True) + if reenable: + print('Re-enabling searx after upgrade...', flush=True) + subprocess.run([ + '/usr/share/plinth/actions/actions', 'apache', 'uwsgi_enable' + ], input='{"args": ["searx"], "kwargs": {}}'.encode(), check=True) + + +def perform(): + """Perform upgrade to next release of Debian.""" + reenable_snapshots = _take_snapshot_and_disable() + reenable_searx = _disable_searx() + + # If quassel is running during dist upgrade, it may be restarted + # several times. This causes IRC users to rapidly leave/join + # channels. Stop quassel for the duration of the dist upgrade. + quassel_service = 'quasselcore' + quassel_was_running = service_is_running(quassel_service) + if quassel_was_running: + print('Stopping quassel service before dist upgrade...', flush=True) + service_stop(quassel_service) + + # Hold freedombox package during entire dist upgrade. + print('Holding freedombox package...', flush=True) + with apt_hold_freedombox(): + print('Updating Apt cache...', flush=True) + run_apt_command(['update']) + + # Pre-set debconf selections if they are required during the + # dist upgrade. + if DIST_UPGRADE_PRE_DEBCONF_SELECTIONS: + print( + f'Setting debconf selections: ' + f'{DIST_UPGRADE_PRE_DEBCONF_SELECTIONS}', flush=True) + debconf_set_selections(DIST_UPGRADE_PRE_DEBCONF_SELECTIONS) + + # Remove obsolete packages that may prevent other packages from + # upgrading. + if DIST_UPGRADE_OBSOLETE_PACKAGES: + print(f'Removing packages: {DIST_UPGRADE_OBSOLETE_PACKAGES}...', + flush=True) + run_apt_command(['remove'] + DIST_UPGRADE_OBSOLETE_PACKAGES) + + # Hold packages known to have conffile prompts. FreedomBox service + # will handle their upgrade later. + print( + 'Holding packages with conffile prompts: ' + + ', '.join(DIST_UPGRADE_PACKAGES_WITH_PROMPTS) + '...', flush=True) + with apt_hold(DIST_UPGRADE_PACKAGES_WITH_PROMPTS): + print('Running apt full-upgrade...', flush=True) + returncode = run_apt_command(['full-upgrade']) + + # Check if apt upgrade was successful. + if returncode: + raise RuntimeError( + 'Apt full-upgrade was not successful. Distribution upgrade ' + 'will be retried at a later time.') + + _update_searx(reenable_searx) + + if quassel_was_running: + print('Re-starting quassel service after dist upgrade...', + flush=True) + service_start(quassel_service) + + print('Running apt autoremove...', flush=True) + run_apt_command(['autoremove']) + + # Run unattended-upgrade once more to handle upgrading the + # freedombox package. + print('Running unattended-upgrade...', flush=True) + subprocess.run(['unattended-upgrade', '--verbose'], check=False) + + _restore_snapshots_config(reenable_snapshots) + + # Restart FreedomBox service to ensure it is using the latest + # dependencies. + print('Restarting FreedomBox service...', flush=True) + service_restart('plinth') + + # After 10 minutes, update apt cache again to trigger force_upgrades. + print('Waiting for 10 minutes...', flush=True) + time.sleep(10 * 60) + print('Updating Apt cache...', flush=True) + run_apt_command(['update']) + + print('Dist upgrade complete. Removing flag.', flush=True) + if dist_upgrade_flag.exists(): + dist_upgrade_flag.unlink() + + +def start_service(): + """Create dist upgrade service and start it.""" + with open(DIST_UPGRADE_SERVICE_PATH, 'w', + encoding='utf-8') as service_file: + service_file.write(DIST_UPGRADE_SERVICE) + + service_daemon_reload() + subprocess.Popen(['systemctl', 'start', 'freedombox-dist-upgrade'], + stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, close_fds=True, + start_new_session=True) diff --git a/plinth/modules/upgrades/privileged.py b/plinth/modules/upgrades/privileged.py index 46bb37c2f..18afe6509 100644 --- a/plinth/modules/upgrades/privileged.py +++ b/plinth/modules/upgrades/privileged.py @@ -6,29 +6,21 @@ import os import pathlib import re import subprocess -import time -from plinth.action_utils import (apt_hold, apt_hold_flag, apt_hold_freedombox, - apt_unhold_freedombox, debconf_set_selections, +from plinth.action_utils import (apt_hold_flag, apt_unhold_freedombox, is_package_manager_busy, run_apt_command, - service_daemon_reload, service_is_running, - service_restart, service_start, service_stop) + service_is_running) from plinth.actions import privileged -from plinth.modules.apache.components import check_url -from plinth.modules.snapshot import is_apt_snapshots_enabled -from plinth.modules.snapshot import is_supported as snapshot_is_supported -from plinth.modules.snapshot import load_augeas as snapshot_load_augeas + +from . import distupgrade, utils logger = logging.getLogger(__name__) -SOURCES_LIST = '/etc/apt/sources.list' BACKPORTS_SOURCES_LIST = '/etc/apt/sources.list.d/freedombox2.list' AUTO_CONF_FILE = '/etc/apt/apt.conf.d/20auto-upgrades' LOG_FILE = '/var/log/unattended-upgrades/unattended-upgrades.log' DPKG_LOG_FILE = '/var/log/unattended-upgrades/unattended-upgrades-dpkg.log' -RELEASE_FILE_URL = \ - 'https://deb.debian.org/debian/dists/{}/Release' APT_PREFERENCES_FREEDOMBOX = \ '''Explanation: This file is managed by FreedomBox, do not edit. @@ -53,38 +45,6 @@ Pin: release n=bookworm-backports Pin-Priority: 500 ''' -DIST_UPGRADE_OBSOLETE_PACKAGES: list[str] = [] - -DIST_UPGRADE_PACKAGES_WITH_PROMPTS = [ - 'bind9', 'firewalld', 'janus', 'minetest-server', 'minidlna', - 'mumble-server', 'radicale', 'roundcube-core', 'tt-rss' -] - -DIST_UPGRADE_PRE_DEBCONF_SELECTIONS: list[str] = [ - # Tell grub-pc to continue without installing grub again. - 'grub-pc grub-pc/install_devices_empty boolean true' -] - -DIST_UPGRADE_REQUIRED_FREE_SPACE = 5000000 - -DIST_UPGRADE_SERVICE = ''' -[Unit] -Description=Upgrade to new stable Debian release - -[Service] -Type=oneshot -ExecStart=systemd-inhibit /usr/share/plinth/actions/actions \ - upgrades dist_upgrade --no-args -KillMode=process -TimeoutSec=12hr -''' - -DIST_UPGRADE_SERVICE_PATH = \ - '/run/systemd/system/freedombox-dist-upgrade.service' - -dist_upgrade_flag = pathlib.Path( - '/var/lib/freedombox/dist-upgrade-in-progress') - def _release_held_freedombox(): """If freedombox package was left in held state, release it. @@ -127,25 +87,10 @@ def run(): start_new_session=True) -def _check_auto() -> bool: - """Check if automatic upgrades are enabled.""" - arguments = [ - 'apt-config', 'shell', 'UpdateInterval', - 'APT::Periodic::Update-Package-Lists' - ] - output = subprocess.check_output(arguments).decode() - update_interval = 0 - match = re.match(r"UpdateInterval='(.*)'", output) - if match: - update_interval = int(match.group(1)) - - return bool(update_interval) - - @privileged def check_auto() -> bool: """Check if automatic upgrades are enabled.""" - return _check_auto() + return utils.check_auto() @privileged @@ -185,34 +130,6 @@ def get_log() -> str: return '\n'.join(log_lines) -def _get_protocol() -> str: - """Return the protocol to use for newly added repository sources.""" - try: - from plinth.modules.torproxy import utils - if utils.is_apt_transport_tor_enabled(): - return 'tor+http' - except Exception: - pass - - return 'http' - - -def _is_release_file_available(protocol: str, dist: str, - backports=False) -> bool: - """Return whether the release for dist[-backports] is available.""" - wrapper = None - if protocol == 'tor+http': - wrapper = 'torsocks' - - if backports: - dist += '-backports' - - try: - return check_url(RELEASE_FILE_URL.format(dist), wrapper=wrapper) - except FileNotFoundError: - return False - - def _add_backports_sources(sources_list: str, protocol: str, dist: str): """Add backports sources to freedombox repositories list.""" sources = '''# This file is managed by FreedomBox, do not edit. @@ -260,11 +177,11 @@ def _check_and_backports_sources(develop=False): logging.info(f'System release is {release}. Skip enabling backports.') return - protocol = _get_protocol() + protocol = utils.get_http_protocol() if protocol == 'tor+http': logging.info('Package download over Tor is enabled.') - if not _is_release_file_available(protocol, dist, backports=True): + if not utils.is_release_file_available(protocol, dist, backports=True): logging.info( f'Release file for {dist}-backports is not available yet.') return @@ -302,244 +219,6 @@ def _add_apt_preferences(): file_handle.write(APT_PREFERENCES_APPS) -def _is_sufficient_free_space() -> bool: - """Return whether there is sufficient free space for dist upgrade.""" - output = subprocess.check_output(['df', '--output=avail', '/']) - free_space = int(output.decode().split('\n')[1]) - return free_space >= DIST_UPGRADE_REQUIRED_FREE_SPACE - - -def _check_dist_upgrade(test_upgrade=False) -> tuple[bool, str]: - """Check if a distribution upgrade be performed. - - Check for new stable release, if updates are enabled, and if there is - enough free space for the dist upgrade. - - If test_upgrade is True, also check for upgrade to testing. - - Return (boolean, string) indicating if the upgrade is ready, and a reason - if not. - """ - if dist_upgrade_flag.exists(): - return (True, 'found-previous') - - from plinth.modules.upgrades import get_current_release - release, dist = get_current_release() - if release in ['unstable', 'testing', 'n/a']: - return (False, f'already-{release}') - - check_dists = ['stable'] - if test_upgrade: - check_dists.append('testing') - - codename = None - for check_dist in check_dists: - url = RELEASE_FILE_URL.format(check_dist) - command = ['curl', '--silent', '--location', '--fail', url] - protocol = _get_protocol() - if protocol == 'tor+http': - command.insert(0, 'torsocks') - logging.info('Package download over Tor is enabled.') - - try: - output = subprocess.check_output(command).decode() - except (subprocess.CalledProcessError, FileNotFoundError): - logging.warning('Error while checking for new %s release', - check_dist) - else: - for line in output.split('\n'): - if line.startswith('Codename:'): - codename = line.split()[1] - - if not codename: - return (False, 'codename-not-found') - - if codename == dist: - return (False, f'already-{dist}') - - if not _check_auto(): - return (False, 'upgrades-not-enabled') - - if check_dist == 'testing' and not test_upgrade: - return (False, 'test-not-set') - - if not _is_sufficient_free_space(): - return (False, 'not-enough-free-space') - - logging.info('Upgrading from %s to %s...', dist, codename) - with open(SOURCES_LIST, 'r', encoding='utf-8') as sources_list: - lines = sources_list.readlines() - - with open(SOURCES_LIST, 'w', encoding='utf-8') as sources_list: - for line in lines: - # E.g. replace 'bullseye' with 'bookworm'. - new_line = line.replace(dist, codename) - if check_dist == 'testing': - # E.g. replace 'stable' with 'bookworm'. - new_line = new_line.replace('stable', codename) - - sources_list.write(new_line) - - logging.info('Dist upgrade in progress. Setting flag.') - dist_upgrade_flag.touch(mode=0o660) - return (True, 'started-dist-upgrade') - - -def _take_snapshot_and_disable() -> bool: - """Take a snapshot if supported and enabled, then disable snapshots. - - Return whether snapshots shall be re-enabled at the end. - """ - if snapshot_is_supported(): - print('Taking a snapshot before dist upgrade...', flush=True) - subprocess.run([ - '/usr/share/plinth/actions/actions', 'snapshot', 'create', - '--no-args' - ], check=True) - aug = snapshot_load_augeas() - if is_apt_snapshots_enabled(aug): - print('Disable apt snapshots during dist upgrade...', flush=True) - subprocess.run([ - '/usr/share/plinth/actions/actions', - 'snapshot', - 'disable_apt_snapshot', - ], input='{"args": ["yes"], "kwargs": {}}'.encode(), check=True) - return True - else: - print('Apt snapshots already disabled.', flush=True) - else: - print('Snapshots are not supported, skip taking a snapshot.', - flush=True) - - return False - - -def _restore_snapshots_config(reenable=False): - """Restore original snapshots configuration.""" - if reenable: - print('Re-enable apt snapshots...', flush=True) - subprocess.run([ - '/usr/share/plinth/actions/actions', 'snapshot', - 'disable_apt_snapshot' - ], input='{"args": ["no"], "kwargs": {}}'.encode(), check=True) - - -def _disable_searx() -> bool: - """If searx is enabled, disable it until we can upgrade it properly. - - Return whether searx was originally enabled. - """ - searx_is_enabled = pathlib.Path( - '/etc/uwsgi/apps-enabled/searx.ini').exists() - if searx_is_enabled: - print('Disabling searx...', flush=True) - subprocess.run( - ['/usr/share/plinth/actions/actions', 'apache', 'uwsgi_disable'], - input='{"args": ["searx"], "kwargs": {}}'.encode(), check=True) - - return searx_is_enabled - - -def _update_searx(reenable=False): - """If searx is installed, update search engines list. - - Re-enable if previously enabled. - """ - if pathlib.Path('/etc/searx/settings.yml').exists(): - print('Updating searx search engines list...', flush=True) - subprocess.run([ - '/usr/share/plinth/actions/actions', 'searx', 'setup', '--no-args' - ], check=True) - if reenable: - print('Re-enabling searx after upgrade...', flush=True) - subprocess.run([ - '/usr/share/plinth/actions/actions', 'apache', 'uwsgi_enable' - ], input='{"args": ["searx"], "kwargs": {}}'.encode(), check=True) - - -def _perform_dist_upgrade(): - """Perform upgrade to next release of Debian.""" - reenable_snapshots = _take_snapshot_and_disable() - reenable_searx = _disable_searx() - - # If quassel is running during dist upgrade, it may be restarted - # several times. This causes IRC users to rapidly leave/join - # channels. Stop quassel for the duration of the dist upgrade. - quassel_service = 'quasselcore' - quassel_was_running = service_is_running(quassel_service) - if quassel_was_running: - print('Stopping quassel service before dist upgrade...', flush=True) - service_stop(quassel_service) - - # Hold freedombox package during entire dist upgrade. - print('Holding freedombox package...', flush=True) - with apt_hold_freedombox(): - print('Updating Apt cache...', flush=True) - run_apt_command(['update']) - - # Pre-set debconf selections if they are required during the - # dist upgrade. - if DIST_UPGRADE_PRE_DEBCONF_SELECTIONS: - print( - f'Setting debconf selections: ' - f'{DIST_UPGRADE_PRE_DEBCONF_SELECTIONS}', flush=True) - debconf_set_selections(DIST_UPGRADE_PRE_DEBCONF_SELECTIONS) - - # Remove obsolete packages that may prevent other packages from - # upgrading. - if DIST_UPGRADE_OBSOLETE_PACKAGES: - print(f'Removing packages: {DIST_UPGRADE_OBSOLETE_PACKAGES}...', - flush=True) - run_apt_command(['remove'] + DIST_UPGRADE_OBSOLETE_PACKAGES) - - # Hold packages known to have conffile prompts. FreedomBox service - # will handle their upgrade later. - print( - 'Holding packages with conffile prompts: ' + - ', '.join(DIST_UPGRADE_PACKAGES_WITH_PROMPTS) + '...', flush=True) - with apt_hold(DIST_UPGRADE_PACKAGES_WITH_PROMPTS): - print('Running apt full-upgrade...', flush=True) - returncode = run_apt_command(['full-upgrade']) - - # Check if apt upgrade was successful. - if returncode: - raise RuntimeError( - 'Apt full-upgrade was not successful. Distribution upgrade ' - 'will be retried at a later time.') - - _update_searx(reenable_searx) - - if quassel_was_running: - print('Re-starting quassel service after dist upgrade...', - flush=True) - service_start(quassel_service) - - print('Running apt autoremove...', flush=True) - run_apt_command(['autoremove']) - - # Run unattended-upgrade once more to handle upgrading the - # freedombox package. - print('Running unattended-upgrade...', flush=True) - subprocess.run(['unattended-upgrade', '--verbose'], check=False) - - _restore_snapshots_config(reenable_snapshots) - - # Restart FreedomBox service to ensure it is using the latest - # dependencies. - print('Restarting FreedomBox service...', flush=True) - service_restart('plinth') - - # After 10 minutes, update apt cache again to trigger force_upgrades. - print('Waiting for 10 minutes...', flush=True) - time.sleep(10 * 60) - print('Updating Apt cache...', flush=True) - run_apt_command(['update']) - - print('Dist upgrade complete. Removing flag.', flush=True) - if dist_upgrade_flag.exists(): - dist_upgrade_flag.unlink() - - @privileged def setup(): """Setup apt preferences.""" @@ -556,19 +235,6 @@ def activate_backports(develop: bool = False): _check_and_backports_sources(develop) -def _start_dist_upgrade_service(): - """Create dist upgrade service and start it.""" - with open(DIST_UPGRADE_SERVICE_PATH, 'w', - encoding='utf-8') as service_file: - service_file.write(DIST_UPGRADE_SERVICE) - - service_daemon_reload() - subprocess.Popen(['systemctl', 'start', 'freedombox-dist-upgrade'], - stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, close_fds=True, - start_new_session=True) - - @privileged def start_dist_upgrade(test: bool = False) -> dict[str, str | bool]: """Start dist upgrade process. @@ -578,9 +244,9 @@ def start_dist_upgrade(test: bool = False) -> dict[str, str | bool]: """ _release_held_freedombox() - upgrade_ready, reason = _check_dist_upgrade(test) + upgrade_ready, reason = distupgrade.check(test) if upgrade_ready: - _start_dist_upgrade_service() + distupgrade.start_service() return {'dist_upgrade_started': upgrade_ready, 'reason': reason} @@ -588,4 +254,4 @@ def start_dist_upgrade(test: bool = False) -> dict[str, str | bool]: @privileged def dist_upgrade(): """Perform major distribution upgrade.""" - _perform_dist_upgrade() + distupgrade.perform() diff --git a/plinth/modules/upgrades/utils.py b/plinth/modules/upgrades/utils.py new file mode 100644 index 000000000..0c1780749 --- /dev/null +++ b/plinth/modules/upgrades/utils.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Utilities for regular updates and dist-upgrades.""" + +import re +import subprocess + +from plinth.modules.apache.components import check_url + +RELEASE_FILE_URL = \ + 'https://deb.debian.org/debian/dists/{}/Release' + +DIST_UPGRADE_REQUIRED_FREE_SPACE = 5000000 + + +def check_auto() -> bool: + """Return whether automatic updates are enabled.""" + arguments = [ + 'apt-config', 'shell', 'UpdateInterval', + 'APT::Periodic::Update-Package-Lists' + ] + output = subprocess.check_output(arguments).decode() + update_interval = 0 + match = re.match(r"UpdateInterval='(.*)'", output) + if match: + update_interval = int(match.group(1)) + + return bool(update_interval) + + +def get_http_protocol() -> str: + """Return the protocol to use for newly added repository sources.""" + try: + from plinth.modules.torproxy import utils + if utils.is_apt_transport_tor_enabled(): + return 'tor+http' + except Exception: + pass + + return 'http' + + +def is_release_file_available(protocol: str, dist: str, + backports=False) -> bool: + """Return whether the release for dist[-backports] is available.""" + wrapper = None + if protocol == 'tor+http': + wrapper = 'torsocks' + + if backports: + dist += '-backports' + + try: + return check_url(RELEASE_FILE_URL.format(dist), wrapper=wrapper) + except FileNotFoundError: + return False + + +def is_sufficient_free_space() -> bool: + """Return whether there is sufficient free space for dist upgrade.""" + output = subprocess.check_output(['df', '--output=avail', '/']) + free_space = int(output.decode().split('\n')[1]) + return free_space >= DIST_UPGRADE_REQUIRED_FREE_SPACE