FreedomBox/actions/packages
Alice Kile eb83e00011
fix formatting issues
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2019-12-07 13:08:35 -05:00

419 lines
14 KiB
Python
Executable File

#!/usr/bin/python3
#
# This file is part of FreedomBox.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Wrapper to handle package installation with apt-get.
"""
import argparse
import json
import logging
import os
import subprocess
import sys
from collections import defaultdict
from importlib import import_module
import apt.cache
import apt_inst
import apt_pkg
from plinth import cfg
# Path of the dpkg lock file. The 'is-package-manager-busy' sub command checks
# with lsof whether this file is currently open.
LOCK_FILE = '/var/lib/dpkg/lock'
# Logger named after this module; used to report package download failures.
logger = logging.getLogger(__name__)
def parse_arguments():
    """Build the command line interface and return the parsed arguments.

    Exactly one sub command must be given: ``update``, ``install``,
    ``is-package-manager-busy`` or ``filter-conffile-packages``. The result
    is the namespace object produced by argparse; the chosen sub command is
    stored in its ``subcommand`` attribute.
    """
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest='subcommand', help='Sub command')
    # Refuse to run when no sub command is given.
    commands.required = True

    commands.add_parser('update', help='update the package lists')

    install_parser = commands.add_parser('install', help='install packages')
    install_parser.add_argument(
        '--skip-recommends', action='store_true',
        help='whether to skip installing recommended packages')
    install_parser.add_argument(
        '--force-configuration', choices=['new', 'old'],
        help='force old/new configuration files during install')
    install_parser.add_argument(
        'module', help='name of module for which package is being installed')
    install_parser.add_argument('packages', nargs='+',
                                help='list of packages to install')

    commands.add_parser('is-package-manager-busy',
                        help='Return whether package manager is busy')

    filter_parser = commands.add_parser(
        'filter-conffile-packages',
        help='Filter out packages that do not have pending conffile prompts')
    filter_parser.add_argument('--packages', required=True,
                               help='List of packages to filter', nargs='+')

    return parser.parse_args()
def _run_apt_command(arguments):
    """Invoke apt-get non-interactively and exit with its return code.

    apt-get is asked to write its status messages to file descriptor 3,
    which is made a duplicate of this process's stdout. The regular output
    of apt-get is discarded. This function does not return: the process
    exits with the return code of apt-get.
    """
    # Make file descriptor 3 a duplicate of stdout for this process.
    os.dup2(1, 3)

    # Have apt-get send its progress to file descriptor 3.
    apt_get = [
        'apt-get', '--assume-yes', '--quiet=2', '--option', 'APT::Status-Fd=3'
    ]
    command = apt_get + arguments

    apt_environment = os.environ.copy()
    apt_environment['DEBIAN_FRONTEND'] = 'noninteractive'

    # close_fds=False lets the child inherit file descriptor 3. stdout is
    # redirected to /dev/null so that regular output is ignored.
    completed = subprocess.run(command, stdin=subprocess.DEVNULL,
                               stdout=subprocess.DEVNULL, close_fds=False,
                               env=apt_environment)
    sys.exit(completed.returncode)
def subcommand_update(arguments):
    """Refresh the apt package lists.

    The parsed command line arguments are accepted for uniformity with the
    other sub commands but are not used.
    """
    apt_arguments = ['update']
    _run_apt_command(apt_arguments)
def subcommand_install(arguments):
    """Install the requested packages with apt-get.

    The packages must pass the managed packages check for the given module.
    If that check raises, the error is printed to stderr and the process
    exits with status 99 without running apt-get.
    """
    try:
        _assert_managed_packages(arguments.module, arguments.packages)
    except Exception as exception:
        print('Access check failed:', exception, file=sys.stderr)
        sys.exit(99)

    apt_arguments = ['install']
    if arguments.skip_recommends:
        apt_arguments.append('--no-install-recommends')

    # dpkg options to pass for each value of --force-configuration. When the
    # option was not given, nothing is added.
    conffile_options = {
        'old': [
            '-o', 'Dpkg::Options::=--force-confdef', '-o',
            'Dpkg::Options::=--force-confold'
        ],
        'new': ['-o', 'Dpkg::Options::=--force-confnew'],
    }
    apt_arguments.extend(
        conffile_options.get(arguments.force_configuration, []))

    _run_apt_command(apt_arguments + arguments.packages)
def _assert_managed_packages(module, packages):
    """Check that list of packages are in fact managed by module.

    The file named after the module in the 'modules-enabled' directory of the
    configuration directory is read to obtain the import path of the module.
    The module is then imported and each package is looked up in its
    ``managed_packages`` attribute.

    Raises AssertionError when a package is not managed by the module. Errors
    from reading the file or importing the module propagate unchanged.
    """
    cfg.read()
    module_file = os.path.join(cfg.config_dir, 'modules-enabled', module)
    with open(module_file, 'r') as file_handle:
        module_path = file_handle.read().strip()

    imported_module = import_module(module_path)
    for package in packages:
        if package not in imported_module.managed_packages:
            # Raise explicitly instead of using an ``assert`` statement:
            # assert statements are removed when Python runs with
            # optimizations enabled, which would silently disable this check.
            raise AssertionError(
                'Package {} is not managed by module {}'.format(
                    package, module))
def subcommand_is_package_manager_busy(_):
    """Report through the exit status whether the package manager is busy.

    The `lsof` command is run on the dpkg lock file; the lock file being open
    indicates that the package manager is busy. When lsof exits with a
    non-zero status, this process exits with status -1. Otherwise the
    function returns normally.
    """
    lsof_command = ['lsof', LOCK_FILE]
    try:
        # The captured output is not needed; only the exit status matters.
        subprocess.run(lsof_command, stdout=subprocess.PIPE, check=True)
    except subprocess.CalledProcessError:
        sys.exit(-1)
def subcommand_filter_conffile_packages(arguments):
    """Print, as JSON, the packages that have pending conffile prompts.

    The decision about whether a package will show a configuration file
    prompt mimics the unattended-upgrades package: this logic has to step in
    and perform the upgrade exactly when unattended-upgrades gives up on a
    package because of conffile prompts.

    Roughly, the steps are:

    - Read the hashes recorded for the currently installed version of each
      package from the /var/lib/dpkg/status file.

    - Compute the hash of each of those configuration files as found on
      disk.

    - When the hashes match, no configuration file of the package was
      modified and there will be no conffile prompt.

    - When they do not match, there is still no prompt if the new version of
      the package ships the file with the same hash as the old version, or
      no longer ships the file at all. Otherwise the package will have a
      conffile prompt.

    - A prompt is also shown when the new package brings configuration files
      that were not known before, some of which already exist on disk with
      contents different from the incoming files.

    The printed object maps each affected package name to its current
    version, its new version and the list of conffiles causing prompts.
    """
    apt_pkg.init()  # Read configuration that will be used later.

    requested = set(arguments.packages)
    installed_hashes, installed_versions = \
        _get_conffile_hashes_from_status_file(requested)
    changed_on_disk = _filter_matching_package_hashes(installed_hashes)

    deb_files = _download_packages(requested)
    incoming_hashes, incoming_versions = \
        _get_conffile_hashes_from_downloaded_files(
            requested, deb_files, installed_hashes, changed_on_disk)

    report = {}
    for name in requested:
        prompting_conffiles = _get_modified_conffiles(
            installed_hashes[name], changed_on_disk[name],
            incoming_hashes[name])
        if prompting_conffiles:
            report[name] = {
                'current_version': installed_versions[name],
                'new_version': incoming_versions[name],
                'modified_conffiles': prompting_conffiles
            }

    print(json.dumps(report))
def _get_modified_conffiles(status_hashes, mismatched_hashes,
                            new_package_hashes):
    """Return list of conffiles that will cause prompts for a package.

    status_hashes maps each conffile of the installed package version to the
    hash recorded for it. mismatched_hashes maps the conffiles whose on-disk
    hash differs from the recorded one to that on-disk hash.
    new_package_hashes maps conffiles of the new package version to their
    hashes.
    """
    modified_conffiles = []

    # Only the names of the mismatched files are needed here. Their on-disk
    # hashes have already been compared when the mapping was built.
    for conffile in mismatched_hashes:
        if conffile not in new_package_hashes:
            # Modified configuration file is not present in the new package.
            continue

        if status_hashes[conffile] == new_package_hashes[conffile]:
            # Configuration file is same in both versions of package. Conffile
            # prompt is not triggered even if the file is modified on disk.
            continue

        modified_conffiles.append(conffile)

    for conffile, hash_value in new_package_hashes.items():
        if conffile in status_hashes:
            # File is not new, it was read and matched against new/old hashes.
            continue

        if not os.path.exists(conffile):
            # New configuration file brought by new package does not already
            # exist on disk.
            continue

        if _get_conffile_hash(conffile) != hash_value:
            # New configuration file brought by new package doesn't match file
            # on the disk.
            #
            # If existing file is a directory, unattended-upgrades allows it,
            # we still treat it as a conffile prompt. This should be okay.
            modified_conffiles.append(conffile)

    return modified_conffiles
def _get_conffile_hashes_from_status_file(packages):
    """Return conffile hashes and versions of packages from the status file.

    All packages are handled in a single pass so that the status file is
    parsed only once.

    Two mappings keyed by package name are returned: the first holds a dict
    of conffile hashes (empty for packages not found), the second holds the
    version string (None for packages not found).
    """
    hashes_by_package = defaultdict(dict)
    version_by_package = defaultdict(lambda: None)

    status_path = apt_pkg.config.find('Dir::State::status')
    with apt_pkg.TagFile(status_path) as stanzas:
        for stanza in stanzas:
            name = stanza.get('Package')
            if name in packages:
                hashes_by_package[name] = _parse_conffiles_value(
                    stanza.get('Conffiles', ''))
                version_by_package[name] = stanza.get('Version')

    return hashes_by_package, version_by_package
def _parse_conffiles_value(value):
    """Parse and return the list of conffiles as found in dpkg status file.

    Each line of value is expected to hold a file path followed by its
    md5sum and optionally a flag. Returns a dict mapping file path to md5sum.
    Entries flagged 'obsolete' and entries whose md5sum is 'newconffile' are
    left out, as are blank lines and lines with fewer than two fields.
    """
    conffiles = {}
    for line in value.splitlines():
        parts = line.split()
        if len(parts) < 2:
            # Blank or incomplete line: there is no hash to record. Skip it
            # instead of failing with an IndexError.
            continue

        file_path, md5sum = parts[0], parts[1]
        if len(parts) > 2 and parts[2] == 'obsolete':
            continue

        if md5sum == 'newconffile':  # (LP: #936870)
            continue

        conffiles[file_path] = md5sum

    return conffiles
def _filter_matching_package_hashes(package_hashes):
    """Keep, for each package, only the conffiles whose hash differs on disk.

    package_hashes maps package name to a dict of conffile path to recorded
    md5sum. The result maps package name to a dict of conffile path to the
    hash computed from the file on the system, limited to conffiles whose
    computed hash is different from the recorded one. Packages without any
    such conffile are not stored in the result.
    """
    mismatched_hashes = defaultdict(dict)
    for package, recorded in package_hashes.items():
        on_disk = {
            conffile: _get_conffile_hash(conffile)
            for conffile in recorded
        }
        differing = {
            conffile: disk_md5sum
            for conffile, disk_md5sum in on_disk.items()
            if disk_md5sum != recorded[conffile]
        }
        if differing:
            mismatched_hashes[package] = differing

    return mismatched_hashes
def _get_conffile_hash(conffile):
    """Return the md5sum of a conffile on the system.

    None is returned when the file cannot be opened or read, for example
    because it does not exist.
    """
    md5sum = None
    try:
        with open(conffile, 'rb') as stream:
            md5sum = apt_pkg.md5sum(stream)
    except OSError:
        # FileNotFoundError is a subclass of OSError, so a missing file is
        # covered here too. The hash stays None in that case.
        pass

    return md5sum
def _download_packages(packages):
    """Download the upgradable packages and return the fetched .deb paths.

    Each of the given packages that is upgradable is marked for installation
    and the resulting archives are fetched. If the fetch run does not finish
    successfully, an error is logged and the process exits with status 1.
    Only complete, error free, trusted items whose destination file exists
    and ends with '.deb' are returned.
    """
    sources = apt_pkg.SourceList()
    sources.read_main_list()

    low_level_cache = apt_pkg.Cache(None)  # None prevents progress messages
    high_level_cache = apt.cache.Cache()
    dep_cache = apt_pkg.DepCache(low_level_cache)
    for name in packages:
        candidate = high_level_cache[name]
        if not candidate.is_upgradable:
            continue

        dep_cache.mark_install(low_level_cache[name], True,
                               not candidate.is_auto_installed)

    package_manager = apt_pkg.PackageManager(dep_cache)
    records = apt_pkg.PackageRecords(low_level_cache)
    fetcher = apt_pkg.Acquire()
    package_manager.get_archives(fetcher, sources, records)

    if fetcher.run() != apt_pkg.Acquire.RESULT_CONTINUE:
        logger.error('Downloading packages failed.')
        sys.exit(1)

    def _is_usable(item):
        """Return whether a fetched item is a usable package archive."""
        if not item.complete:
            return False

        if item.status in (item.STAT_ERROR, item.STAT_AUTH_ERROR):
            return False

        return (item.is_trusted and os.path.exists(item.destfile)
                and item.destfile.endswith('.deb'))

    return [item.destfile for item in fetcher.items if _is_usable(item)]
def _get_conffile_hashes_from_downloaded_files(packages, downloaded_files,
                                               status_hashes,
                                               mismatched_hashes):
    """Collect conffile hashes and versions from the downloaded .deb files.

    Two mappings keyed by package name are returned: the conffile hashes
    found in the new package (an empty dict by default) and the new version
    (None by default). A file for which the lookup raises LookupError,
    apt_pkg.Error or ValueError is skipped.
    """
    hashes_by_package = defaultdict(dict)
    version_by_package = defaultdict(lambda: None)

    for deb_path in downloaded_files:
        try:
            name, hashes, version = \
                _get_conffile_hashes_from_downloaded_file(
                    packages, deb_path, status_hashes, mismatched_hashes)
        except (LookupError, apt_pkg.Error, ValueError):
            # Nothing to record for this file; move on to the next one.
            continue
        else:
            hashes_by_package[name] = hashes
            version_by_package[name] = version

    return hashes_by_package, version_by_package
def _get_conffile_hashes_from_downloaded_file(packages, downloaded_file,
                                              status_hashes,
                                              mismatched_hashes):
    """Return package name, conffile hashes and version for one .deb file.

    ValueError is raised when the package named in the control data of the
    file is not one of the requested packages. Hashes are computed only for
    conffiles that are either unknown to the installed version or whose
    on-disk copy does not match the recorded hash.
    """
    archive = apt_inst.DebFile(downloaded_file)
    control_fields = apt_pkg.TagSection(archive.control.extractdata('control'))

    package_name = control_fields['Package']
    if package_name not in packages:
        raise ValueError

    new_version = control_fields['Version']

    conffiles_data = archive.control.extractdata('conffiles')
    conffile_paths = conffiles_data.decode().strip().split()

    installed = status_hashes.get(package_name, {})
    changed_on_disk = mismatched_hashes.get(package_name, {})

    # A file that is already in the old package and unchanged on disk is left
    # out. This optimization keeps the number of files read from the archive
    # as low as possible.
    hashes = {
        path: apt_pkg.md5sum(archive.data.extractdata(path.lstrip('/')))
        for path in conffile_paths
        if path not in installed or path in changed_on_disk
    }

    return package_name, hashes, new_version
def main():
    """Parse the command line and run the selected sub command.

    The sub command name, with dashes turned into underscores, selects the
    module-level function called ``subcommand_<name>``. That function is
    called with the parsed arguments.
    """
    arguments = parse_arguments()
    handler_name = 'subcommand_' + arguments.subcommand.replace('-', '_')
    handler = globals()[handler_name]
    handler(arguments)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()