mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-03-11 09:04:54 +00:00
package: Implement identifying packages that need conffile prompts
Given a list of packages, check with among those will result in showing a configuration file prompt. Irrespective of whether apt shows a conffile prompt, this logic mimics what unattended-upgrades perceives as package needing a conffile prompt. This is because when unattended-upgrades gives up, that is when this logic need to take over. Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
This commit is contained in:
parent
f03336253e
commit
5b4aa1cda0
253
actions/packages
253
actions/packages
@ -20,15 +20,24 @@ Wrapper to handle package installation with apt-get.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from importlib import import_module
|
||||
|
||||
import apt.cache
|
||||
|
||||
import apt_inst
|
||||
import apt_pkg
|
||||
from plinth import cfg
|
||||
|
||||
LOCK_FILE = '/var/lib/dpkg/lock'
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_arguments():
|
||||
"""Return parsed command line arguments as dictionary."""
|
||||
@ -50,6 +59,11 @@ def parse_arguments():
|
||||
help='list of packages to install')
|
||||
subparsers.add_parser('is-package-manager-busy',
|
||||
help='Return whether package manager is busy')
|
||||
subparser = subparsers.add_parser(
|
||||
'filter-conffile-packages',
|
||||
help='Filter out packages that do not have pending conffile prompts')
|
||||
subparser.add_argument('--packages', required=True,
|
||||
help='List of packages to filter', nargs='+')
|
||||
|
||||
subparsers.required = True
|
||||
return parser.parse_args()
|
||||
@ -126,6 +140,245 @@ def subcommand_is_package_manager_busy(_):
|
||||
sys.exit(-1)
|
||||
|
||||
|
||||
def subcommand_filter_conffile_packages(arguments):
|
||||
"""Return filtered list of packages which have pending conffile prompts.
|
||||
|
||||
When considering which file needs a configuration file prompt, mimic the
|
||||
behavior of unattended-upgrades package. This is because when
|
||||
unattended-upgrades gives up on the package due to conffile prompts, that
|
||||
is when this logic needs to step in and perform the upgrades.
|
||||
|
||||
The logic is (roughly):
|
||||
|
||||
- Read /var/lib/dpkg/status file to read hashes as provided by currently
|
||||
installed version of a package.
|
||||
|
||||
- Read each configuration file for the package from disk and compute hashes.
|
||||
|
||||
- If the hashes match, package has no configuration file that got
|
||||
modified. There will be no conffile prompt.
|
||||
|
||||
- If hashes don't match then check if the version of the package being
|
||||
upgraded to has the same hash as the old version of the package or in
|
||||
the new version or the package that configuration file has been
|
||||
removed. If the conditions satisfy, then there will be no conffile
|
||||
prompt. Otherwise, package will have conffile prompt.
|
||||
|
||||
- A further condition for showing conffile prompt is when new package
|
||||
brings in additional configuration files not known before and some of
|
||||
which are already present on the disk and mismatch with incoming files.
|
||||
|
||||
"""
|
||||
apt_pkg.init() # Read configuration that will be used later.
|
||||
packages = set(arguments.packages)
|
||||
|
||||
status_hashes = _get_conffile_hashes_from_status_file(packages)
|
||||
|
||||
mismatched_hashes = _filter_matching_package_hashes(status_hashes)
|
||||
|
||||
downloaded_files = _download_packages(packages)
|
||||
|
||||
new_package_hashes = _get_conffile_hashes_from_downloaded_files(
|
||||
packages, downloaded_files, status_hashes, mismatched_hashes)
|
||||
|
||||
conffile_packages = [
|
||||
package for package in packages if _is_conffile_prompt_pending(
|
||||
status_hashes[package], mismatched_hashes[package],
|
||||
new_package_hashes[package])
|
||||
]
|
||||
print(json.dumps(conffile_packages))
|
||||
|
||||
|
||||
def _is_conffile_prompt_pending(status_hashes, mismatched_hashes,
|
||||
new_package_hashes):
|
||||
"""Return whether upgrading the package requires a conffile prompt."""
|
||||
for conffile, hash_value in mismatched_hashes.items():
|
||||
if conffile not in new_package_hashes:
|
||||
# Modified configuration file not present new package
|
||||
continue
|
||||
|
||||
if status_hashes[conffile] != new_package_hashes[conffile]:
|
||||
# Configuration file is same in both versions of package. Conffile
|
||||
# prompt is not triggered even if the file is modified on disk.
|
||||
return True
|
||||
|
||||
for conffile, hash_value in new_package_hashes.items():
|
||||
if conffile in status_hashes:
|
||||
# File is not new, it was read and matched against new/old hashes
|
||||
continue
|
||||
|
||||
if not os.path.exists(conffile):
|
||||
# New configuration file brought by new package doesn't not already
|
||||
# exist on disk.
|
||||
continue
|
||||
|
||||
if _get_conffile_hash(conffile) != hash_value:
|
||||
# New configuration file brought by new package doesn't match file
|
||||
# on the disk.
|
||||
#
|
||||
# If existing file is a directory, unattended-upgrades allows it,
|
||||
# we still treat it as a conffile prompt. This should be okay.
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _get_conffile_hashes_from_status_file(packages):
|
||||
"""For each of the packages, return a dict of conffile hashes.
|
||||
|
||||
Work on all packages at the same time to avoid parsing the status file
|
||||
multiple times.
|
||||
|
||||
"""
|
||||
package_hashes = defaultdict(dict)
|
||||
|
||||
status_file = apt_pkg.config.find('Dir::State::status')
|
||||
with apt_pkg.TagFile(status_file) as tag_file:
|
||||
for section in tag_file:
|
||||
package = section.get('Package')
|
||||
if package not in packages:
|
||||
continue
|
||||
|
||||
hashes = _parse_conffiles_value(section.get('Conffiles', ''))
|
||||
package_hashes[package] = hashes
|
||||
|
||||
return package_hashes
|
||||
|
||||
|
||||
def _parse_conffiles_value(value):
|
||||
"""Parse and return the list of conffiles as found in dpkg status file."""
|
||||
conffiles = {}
|
||||
for line in value.splitlines():
|
||||
parts = line.strip().split()
|
||||
if len(parts) > 2 and parts[2] == 'obsolete':
|
||||
continue
|
||||
|
||||
md5sum = parts[1]
|
||||
if md5sum == 'newconffile': # (LP: #936870)
|
||||
continue
|
||||
|
||||
file_path = parts[0]
|
||||
conffiles[file_path] = md5sum
|
||||
|
||||
return conffiles
|
||||
|
||||
|
||||
def _filter_matching_package_hashes(package_hashes):
|
||||
"""Return hashes of only conffiles that don't match for each package."""
|
||||
mismatched_hashes = defaultdict(dict)
|
||||
for package, hashes in package_hashes.items():
|
||||
system_hashes = {}
|
||||
for conffile, md5sum in hashes.items():
|
||||
system_md5sum = _get_conffile_hash(conffile)
|
||||
if md5sum != system_md5sum:
|
||||
system_hashes[conffile] = system_md5sum
|
||||
|
||||
if system_hashes:
|
||||
mismatched_hashes[package] = system_hashes
|
||||
|
||||
return mismatched_hashes
|
||||
|
||||
|
||||
def _get_conffile_hash(conffile):
|
||||
"""Return hash of a conffile in the system."""
|
||||
try:
|
||||
with open(conffile, 'rb') as file_handle:
|
||||
return apt_pkg.md5sum(file_handle)
|
||||
except (FileNotFoundError, OSError):
|
||||
return None
|
||||
|
||||
|
||||
def _download_packages(packages):
|
||||
"""Download the package for upgrade."""
|
||||
sources_list = apt_pkg.SourceList()
|
||||
sources_list.read_main_list()
|
||||
|
||||
apt_pkg_cache = apt_pkg.Cache(None) # None prevents progress messages
|
||||
apt_cache = apt.cache.Cache()
|
||||
dep_cache = apt_pkg.DepCache(apt_pkg_cache)
|
||||
for package_name in packages:
|
||||
package = apt_cache[package_name]
|
||||
if package.is_upgradable:
|
||||
dep_cache.mark_install(apt_pkg_cache[package_name], True,
|
||||
not package.is_auto_installed)
|
||||
|
||||
package_manager = apt_pkg.PackageManager(dep_cache)
|
||||
records = apt_pkg.PackageRecords(apt_pkg_cache)
|
||||
fetcher = apt_pkg.Acquire()
|
||||
package_manager.get_archives(fetcher, sources_list, records)
|
||||
run_result = fetcher.run()
|
||||
if run_result != apt_pkg.Acquire.RESULT_CONTINUE:
|
||||
logger.error('Downloading packages failed.')
|
||||
sys.exit(1)
|
||||
|
||||
downloaded_files = []
|
||||
for item in fetcher.items:
|
||||
if not item.complete or item.status == item.STAT_ERROR or \
|
||||
item.status == item.STAT_AUTH_ERROR:
|
||||
continue
|
||||
|
||||
if not item.is_trusted:
|
||||
continue
|
||||
|
||||
if not os.path.exists(item.destfile):
|
||||
continue
|
||||
|
||||
if not item.destfile.endswith('.deb'):
|
||||
continue
|
||||
|
||||
downloaded_files.append(item.destfile)
|
||||
|
||||
return downloaded_files
|
||||
|
||||
|
||||
def _get_conffile_hashes_from_downloaded_files(
|
||||
packages, downloaded_files, status_hashes, mismatched_hashes):
|
||||
"""Retrieve the conffile hashes from downloaded .deb files."""
|
||||
new_hashes = defaultdict(dict)
|
||||
for downloaded_file in downloaded_files:
|
||||
try:
|
||||
package_name, hashes = _get_conffile_hashes_from_downloaded_file(
|
||||
packages, downloaded_file, status_hashes, mismatched_hashes)
|
||||
except (LookupError, apt_pkg.Error, ValueError):
|
||||
continue
|
||||
|
||||
new_hashes[package_name] = hashes
|
||||
|
||||
return new_hashes
|
||||
|
||||
|
||||
def _get_conffile_hashes_from_downloaded_file(
|
||||
packages, downloaded_file, status_hashes, mismatched_hashes):
|
||||
"""Retrieve the conffile hashes from a single downloaded .deb file."""
|
||||
deb_file = apt_inst.DebFile(downloaded_file)
|
||||
|
||||
control = deb_file.control.extractdata('control')
|
||||
|
||||
section = apt_pkg.TagSection(control)
|
||||
package_name = section['Package']
|
||||
if package_name not in packages:
|
||||
raise ValueError
|
||||
|
||||
conffiles = deb_file.control.extractdata('conffiles')
|
||||
conffiles = conffiles.decode().strip().split()
|
||||
|
||||
status_hashes = status_hashes.get(package_name, {})
|
||||
mismatched_hashes = mismatched_hashes.get(package_name, {})
|
||||
|
||||
hashes = {}
|
||||
for conffile in conffiles:
|
||||
if conffile in status_hashes and conffile not in mismatched_hashes:
|
||||
# File already in old package and there is no change on disk.
|
||||
# Optimization to make sure we read as fewer files as possible.
|
||||
continue
|
||||
|
||||
conffile_data = deb_file.data.extractdata(conffile.lstrip('/'))
|
||||
md5sum = apt_pkg.md5sum(conffile_data)
|
||||
hashes[conffile] = md5sum
|
||||
|
||||
return package_name, hashes
|
||||
|
||||
|
||||
def main():
|
||||
"""Parse arguments and perform all duties."""
|
||||
arguments = parse_arguments()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user