mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
- Done automatically by running isort . in top level directory. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
400 lines
14 KiB
Python
Executable File
400 lines
14 KiB
Python
Executable File
#!/usr/bin/python3
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""
|
|
Wrapper to handle package installation with apt-get.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from collections import defaultdict
|
|
from importlib import import_module
|
|
|
|
import apt.cache
|
|
import apt_inst
|
|
import apt_pkg
|
|
|
|
from plinth import cfg
|
|
from plinth.action_utils import (apt_hold_freedombox, is_package_manager_busy,
|
|
run_apt_command)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    A sub command is mandatory and its name is stored in the ``subcommand``
    attribute of the returned ``argparse.Namespace``. The remaining
    attributes depend on the sub command that was selected.
    """
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest='subcommand', help='Sub command')

    commands.add_parser('update', help='update the package lists')

    install = commands.add_parser('install', help='install packages')
    # Arguments of 'install' in declaration order: optional flags first, then
    # the two positional arguments.
    install_arguments = [
        (('--skip-recommends', ), {
            'action': 'store_true',
            'help': 'whether to skip installing recommended packages',
        }),
        (('--force-configuration', ), {
            'choices': ['new', 'old'],
            'help': 'force old/new configuration files during install',
        }),
        (('--reinstall', ), {
            'action': 'store_true',
            'help': 'force re-installation of package even if it is current',
        }),
        (('--force-missing-configuration', ), {
            'action': 'store_true',
            'help': 'force installation of missing configuration files',
        }),
        (('module', ), {
            'help': 'name of module for which package is being installed',
        }),
        (('packages', ), {
            'nargs': '+',
            'help': 'list of packages to install',
        }),
    ]
    for names, options in install_arguments:
        install.add_argument(*names, **options)

    commands.add_parser('is-package-manager-busy',
                        help='Return whether package manager is busy')

    conffile_filter = commands.add_parser(
        'filter-conffile-packages',
        help='Filter out packages that do not have pending conffile prompts')
    conffile_filter.add_argument('--packages', nargs='+', required=True,
                                 help='List of packages to filter')

    # Invoking the script without any sub command is an error.
    commands.required = True
    return parser.parse_args()
|
|
|
|
|
|
def subcommand_update(arguments):
    """Update apt package lists.

    The process exits with whatever value ``run_apt_command`` returns for the
    'update' command. ``arguments`` is accepted for a uniform sub command
    signature and is not used.
    """
    returncode = run_apt_command(['update'])
    sys.exit(returncode)
|
|
|
|
|
|
def subcommand_install(arguments):
    """Install the requested packages using apt-get.

    The packages must be managed by the module named on the command line;
    otherwise a message is printed on stderr and the process exits with code
    99. On success of the check, the process exits with the value returned by
    ``run_apt_command`` for the 'install' command.
    """
    try:
        _assert_managed_packages(arguments.module, arguments.packages)
    except Exception as exception:
        print('Access check failed:', exception, file=sys.stderr)
        sys.exit(99)

    # dpkg options for each accepted value of --force-configuration.
    force_configuration_options = {
        'old': [
            '-o', 'Dpkg::Options::=--force-confdef', '-o',
            'Dpkg::Options::=--force-confold'
        ],
        'new': ['-o', 'Dpkg::Options::=--force-confnew'],
    }

    apt_options = []
    if arguments.skip_recommends:
        apt_options.append('--no-install-recommends')

    # No value (None) for --force-configuration adds no options.
    apt_options.extend(
        force_configuration_options.get(arguments.force_configuration, []))

    if arguments.reinstall:
        apt_options.append('--reinstall')

    if arguments.force_missing_configuration:
        apt_options.extend(['-o', 'Dpkg::Options::=--force-confmiss'])

    # Best-effort repair of a half-configured system before installing; the
    # results of these two steps are intentionally not checked.
    subprocess.run(['dpkg', '--configure', '-a'])
    with apt_hold_freedombox():
        run_apt_command(['--fix-broken', 'install'])
        returncode = run_apt_command(['install'] + apt_options +
                                     arguments.packages)

    sys.exit(returncode)
|
|
|
|
|
|
def _assert_managed_packages(module, packages):
    """Check that list of packages are in fact managed by module.

    The file ``<config_dir>/modules-enabled/<module>`` is read to obtain the
    import path of the module. That module is imported and every requested
    package must be listed in its ``managed_packages`` attribute.

    Raises AssertionError if a package is not managed by the module. Errors
    from reading the module file or importing the module propagate unchanged.
    """
    cfg.read()
    module_file = os.path.join(cfg.config_dir, 'modules-enabled', module)

    with open(module_file, 'r') as file_handle:
        module_path = file_handle.read().strip()

    imported_module = import_module(module_path)
    managed_packages = imported_module.managed_packages
    for package in packages:
        # Raise explicitly instead of using an assert statement: assert
        # statements are removed when Python runs with -O, which would
        # silently turn this access check into a no-op.
        if package not in managed_packages:
            raise AssertionError(
                'Package {!r} is not managed by module {!r}'.format(
                    package, module_path))
|
|
|
|
|
|
def subcommand_is_package_manager_busy(_):
    """Report through the exit code whether the package manager is busy.

    Returns normally (exit code zero) when the package manager is busy and
    calls ``sys.exit(-1)`` when it is not.
    """
    if is_package_manager_busy():
        return

    sys.exit(-1)
|
|
|
|
|
|
def subcommand_filter_conffile_packages(arguments):
    """Print the packages which have pending conffile prompts as JSON.

    The output is a JSON object keyed by package name. Each value holds the
    'current_version', the 'new_version' and the list of
    'modified_conffiles' of that package. Packages without any conffile that
    would prompt are left out.

    When considering which file needs a configuration file prompt, mimic the
    behavior of unattended-upgrades package. This is because when
    unattended-upgrades gives up on the package due to conffile prompts, that
    is when this logic needs to step in and perform the upgrades.

    The logic is (roughly):

    - Read the dpkg status file to get the hashes as provided by the
      currently installed version of a package.

    - Read each configuration file for the package from disk and compute
      hashes.

    - If the hashes match, the package has no configuration file that got
      modified. There will be no conffile prompt.

    - If the hashes don't match, check whether the version of the package
      being upgraded to has the same hash as the old version of the package,
      or whether the configuration file has been removed in the new version
      of the package. If either condition holds, there will be no conffile
      prompt. Otherwise, the package will have a conffile prompt.

    - A further condition for showing a conffile prompt is when the new
      package brings in additional configuration files not known before, some
      of which are already present on the disk and differ from the incoming
      files.

    """
    apt_pkg.init()  # Read configuration that will be used later.
    requested = set(arguments.packages)

    status_hashes, current_versions = _get_conffile_hashes_from_status_file(
        requested)
    mismatched_hashes = _filter_matching_package_hashes(status_hashes)

    downloaded_files = _download_packages(requested)
    new_package_hashes, new_versions = \
        _get_conffile_hashes_from_downloaded_files(
            requested, downloaded_files, status_hashes, mismatched_hashes)

    packages_info = {}
    for package in requested:
        modified_conffiles = _get_modified_conffiles(
            status_hashes[package], mismatched_hashes[package],
            new_package_hashes[package])
        if modified_conffiles:
            packages_info[package] = {
                'current_version': current_versions[package],
                'new_version': new_versions[package],
                'modified_conffiles': modified_conffiles
            }

    print(json.dumps(packages_info))
|
|
|
|
|
|
def _get_modified_conffiles(status_hashes, mismatched_hashes,
|
|
new_package_hashes):
|
|
"""Return list of conffiles that will cause prompts for a package."""
|
|
modified_conffiles = []
|
|
for conffile, hash_value in mismatched_hashes.items():
|
|
if conffile not in new_package_hashes:
|
|
# Modified configuration file not present new package
|
|
continue
|
|
|
|
if status_hashes[conffile] == new_package_hashes[conffile]:
|
|
# Configuration file is same in both versions of package. Conffile
|
|
# prompt is not triggered even if the file is modified on disk.
|
|
continue
|
|
|
|
modified_conffiles.append(conffile)
|
|
|
|
for conffile, hash_value in new_package_hashes.items():
|
|
if conffile in status_hashes:
|
|
# File is not new, it was read and matched against new/old hashes
|
|
continue
|
|
|
|
if not os.path.exists(conffile):
|
|
# New configuration file brought by new package doesn't not already
|
|
# exist on disk.
|
|
continue
|
|
|
|
if _get_conffile_hash(conffile) != hash_value:
|
|
# New configuration file brought by new package doesn't match file
|
|
# on the disk.
|
|
#
|
|
# If existing file is a directory, unattended-upgrades allows it,
|
|
# we still treat it as a conffile prompt. This should be okay.
|
|
modified_conffiles.append(conffile)
|
|
|
|
return modified_conffiles
|
|
|
|
|
|
def _get_conffile_hashes_from_status_file(packages):
    """Return conffile hashes and versions recorded in the dpkg status file.

    All packages are handled in a single pass so that the status file is
    parsed only once.

    Returns a tuple of two mappings keyed by package name. The first maps to
    the dictionary of conffile hashes and defaults to an empty dictionary,
    the second maps to the 'Version' field and defaults to None.

    """
    hashes_by_package = defaultdict(dict)
    versions_by_package = defaultdict(lambda: None)

    status_path = apt_pkg.config.find('Dir::State::status')
    with apt_pkg.TagFile(status_path) as sections:
        for section in sections:
            name = section.get('Package')
            if name in packages:
                hashes_by_package[name] = _parse_conffiles_value(
                    section.get('Conffiles', ''))
                versions_by_package[name] = section.get('Version')

    return hashes_by_package, versions_by_package
|
|
|
|
|
|
def _parse_conffiles_value(value):
|
|
"""Parse and return the list of conffiles as found in dpkg status file."""
|
|
conffiles = {}
|
|
for line in value.splitlines():
|
|
parts = line.strip().split()
|
|
if len(parts) > 2 and parts[2] == 'obsolete':
|
|
continue
|
|
|
|
md5sum = parts[1]
|
|
if md5sum == 'newconffile': # (LP: #936870)
|
|
continue
|
|
|
|
file_path = parts[0]
|
|
conffiles[file_path] = md5sum
|
|
|
|
return conffiles
|
|
|
|
|
|
def _filter_matching_package_hashes(package_hashes):
|
|
"""Return hashes of only conffiles that don't match for each package."""
|
|
mismatched_hashes = defaultdict(dict)
|
|
for package, hashes in package_hashes.items():
|
|
system_hashes = {}
|
|
for conffile, md5sum in hashes.items():
|
|
system_md5sum = _get_conffile_hash(conffile)
|
|
if md5sum != system_md5sum:
|
|
system_hashes[conffile] = system_md5sum
|
|
|
|
if system_hashes:
|
|
mismatched_hashes[package] = system_hashes
|
|
|
|
return mismatched_hashes
|
|
|
|
|
|
def _get_conffile_hash(conffile):
|
|
"""Return hash of a conffile in the system."""
|
|
try:
|
|
with open(conffile, 'rb') as file_handle:
|
|
return apt_pkg.md5sum(file_handle)
|
|
except (FileNotFoundError, OSError):
|
|
return None
|
|
|
|
|
|
def _download_packages(packages):
    """Download the packages for upgrade and return paths of .deb files.

    Only the requested packages that are upgradable get marked for
    installation. If the download does not complete successfully, an error is
    logged and the process exits with code 1.
    """
    sources_list = apt_pkg.SourceList()
    sources_list.read_main_list()

    apt_pkg_cache = apt_pkg.Cache(None)  # None prevents progress messages
    apt_cache = apt.cache.Cache()
    dep_cache = apt_pkg.DepCache(apt_pkg_cache)
    for name in packages:
        candidate = apt_cache[name]
        if not candidate.is_upgradable:
            continue

        dep_cache.mark_install(apt_pkg_cache[name], True,
                               not candidate.is_auto_installed)

    package_manager = apt_pkg.PackageManager(dep_cache)
    records = apt_pkg.PackageRecords(apt_pkg_cache)
    fetcher = apt_pkg.Acquire()
    package_manager.get_archives(fetcher, sources_list, records)
    if fetcher.run() != apt_pkg.Acquire.RESULT_CONTINUE:
        logger.error('Downloading packages failed.')
        sys.exit(1)

    downloaded_files = []
    for item in fetcher.items:
        # Completely fetched without a download or authentication error.
        fetched = (item.complete and item.status != item.STAT_ERROR
                   and item.status != item.STAT_AUTH_ERROR)

        # Keep only trusted .deb files that are actually present on disk.
        if (fetched and item.is_trusted and os.path.exists(item.destfile)
                and item.destfile.endswith('.deb')):
            downloaded_files.append(item.destfile)

    return downloaded_files
|
|
|
|
|
|
def _get_conffile_hashes_from_downloaded_files(packages, downloaded_files,
|
|
status_hashes,
|
|
mismatched_hashes):
|
|
"""Retrieve the conffile hashes from downloaded .deb files."""
|
|
new_hashes = defaultdict(dict)
|
|
new_versions = defaultdict(lambda: None)
|
|
for downloaded_file in downloaded_files:
|
|
try:
|
|
package_name, hashes, new_version = \
|
|
_get_conffile_hashes_from_downloaded_file(
|
|
packages, downloaded_file, status_hashes,
|
|
mismatched_hashes)
|
|
except (LookupError, apt_pkg.Error, ValueError):
|
|
continue
|
|
|
|
new_hashes[package_name] = hashes
|
|
new_versions[package_name] = new_version
|
|
|
|
return new_hashes, new_versions
|
|
|
|
|
|
def _get_conffile_hashes_from_downloaded_file(packages, downloaded_file,
                                              status_hashes,
                                              mismatched_hashes):
    """Retrieve the conffile hashes from a single downloaded .deb file.

    Returns a tuple of package name, dictionary mapping conffile path to the
    md5sum of the file shipped in the package, and the package version.
    Conffiles that are known to the installed package and unmodified on disk
    are not hashed and are absent from the dictionary.

    Raises ValueError if the .deb file is not for one of ``packages``. Errors
    raised while reading the archive propagate to the caller.
    """
    deb_file = apt_inst.DebFile(downloaded_file)

    control = deb_file.control.extractdata('control')
    section = apt_pkg.TagSection(control)
    package_name = section['Package']
    if package_name not in packages:
        raise ValueError(
            'Package {} was not requested'.format(package_name))

    new_version = section['Version']

    conffiles_data = deb_file.control.extractdata('conffiles')

    known_hashes = status_hashes.get(package_name, {})
    modified_hashes = mismatched_hashes.get(package_name, {})

    hashes = {}
    # The conffiles member holds one entry per line. Parse line by line
    # instead of splitting on any whitespace so that an optional leading flag
    # or a space inside a path is not mistaken for a separate conffile.
    for line in conffiles_data.decode().splitlines():
        conffile = line.strip()
        if not conffile:
            continue

        if not conffile.startswith('/'):
            # A line that doesn't begin with an absolute path starts with a
            # flag, see deb-conffiles(5).
            fields = conffile.split(None, 1)
            if len(fields) == 2:
                flag, conffile = fields
                if flag == 'remove-on-upgrade':
                    # The new package drops this conffile and doesn't ship
                    # it, so there is nothing to hash. It is treated like a
                    # conffile that is not present in the new package.
                    continue

        if conffile in known_hashes and conffile not in modified_hashes:
            # File already in old package and there is no change on disk.
            # Optimization to make sure we read as few files as possible.
            continue

        conffile_data = deb_file.data.extractdata(conffile.lstrip('/'))
        hashes[conffile] = apt_pkg.md5sum(conffile_data)

    return package_name, hashes, new_version
|
|
|
|
|
|
def main():
    """Parse arguments and run the function for the chosen sub command.

    The sub command 'some-name' is handled by the module level function
    named ``subcommand_some_name``, which receives the parsed arguments.
    """
    arguments = parse_arguments()

    handler_name = 'subcommand_' + arguments.subcommand.replace('-', '_')
    handler = globals()[handler_name]
    handler(arguments)
|
|
|
|
|
|
# Run the command line interface only when executed as a script.
if __name__ == '__main__':
    main()
|