container: Refactor nspawn specific operations into a separate class

- In anticipation of adding support to launch VMs using the same script.

- Assume image operations will be common other backends, even when they use
systemd-nspawn.

- Drop support for systemd-nspawn (<247). Bullseye ships with systemd-container
package 247.3. Remove version specific code that is no longer needed.

- Fix issue with checking if an image has been provisioned or not.

- Attempt to setup network manager connection every time container is launched
instead only once when image is setup. This ensures that if the connection is
removed after image setup, it will re-created when container is launched.

Tests:

- Run all the basic commands of the container and ensure they are working.

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
[vexch: Removed unused argument in _setup_image() and fixed one typo]
Signed-off-by: Veiko Aasa <veiko17@disroot.org>
Reviewed-by: Veiko Aasa <veiko17@disroot.org>
This commit is contained in:
Sunil Mohan Adapa 2024-12-18 13:52:52 -08:00 committed by Veiko Aasa
parent 83acf5cb64
commit 4f9e5e9e14
No known key found for this signature in database
GPG Key ID: 478539CAE680674E

665
container
View File

@ -128,6 +128,7 @@ import sys
import tempfile
import time
import urllib.parse
from typing import Callable
from urllib.request import urlopen
URLS_AMD64 = {
@ -252,11 +253,8 @@ exit 0
logger = logging.getLogger(__name__)
work_directory = None
systemd_version = None
def parse_arguments():
def parse_arguments() -> argparse.Namespace:
"""Return parsed command line arguments as dictionary."""
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@ -271,12 +269,17 @@ def parse_arguments():
if distribution and distribution in distributions:
default_distribution = distribution
def _add_common_args(subparser):
subparser.add_argument('--distribution', choices=distributions,
default=default_distribution,
help='Distribution to work with')
subparser.add_argument('--machine-type', choices=('container', ),
default='container')
# Up
subparser = subparsers.add_parser('up', help='Bring up the container',
formatter_class=formatter_class)
subparser.add_argument(
'--distribution', choices=distributions, default=default_distribution,
help='Distribution of the image to download and setup')
_add_common_args(subparser)
subparser.add_argument('--image-size', default='16G',
help='Disk image size to resize to after download')
subparser.add_argument('--hkp-client', choices=('gpg', 'wget'),
@ -286,24 +289,18 @@ def parse_arguments():
subparser = subparsers.add_parser(
'ip', help='Print the IP address of the container.',
formatter_class=formatter_class)
subparser.add_argument(
'--distribution', choices=distributions, default=default_distribution,
help='Distribution of the container to print IP address')
_add_common_args(subparser)
# ssh
subparser = subparsers.add_parser('ssh', help='SSH into the container',
formatter_class=formatter_class)
subparser.add_argument('--distribution', choices=distributions,
default=default_distribution,
help='Distribution of the container to SSH into')
_add_common_args(subparser)
# Run tests
subparser = subparsers.add_parser('run-tests',
help='Run tests in the container',
formatter_class=formatter_class)
subparser.add_argument('--distribution', choices=distributions,
default=default_distribution,
help='Distribution of the container to run tests')
_add_common_args(subparser)
subparser.add_argument(
'--pytest-args', nargs='...',
help='Additional arguments to pass to the pytest command')
@ -311,25 +308,19 @@ def parse_arguments():
# Stop
subparser = subparsers.add_parser('stop', help='Stop the container',
formatter_class=formatter_class)
subparser.add_argument('--distribution', choices=distributions,
default=default_distribution,
help='Distribution of the container to stop')
_add_common_args(subparser)
# Destroy
subparser = subparsers.add_parser('destroy',
help='Destroy the container image',
formatter_class=formatter_class)
subparser.add_argument('--distribution', choices=distributions,
default=default_distribution,
help='Distribution of the image to delete')
_add_common_args(subparser)
# Update
subparser = subparsers.add_parser(
'update', help='Update the container image to the latest version',
formatter_class=formatter_class)
subparser.add_argument('--distribution', choices=distributions,
default=default_distribution,
help='Distribution of the image to update')
_add_common_args(subparser)
subparser.add_argument('--hkp-client', choices=('gpg', 'wget'),
default='gpg', help='Client for key retrieval')
@ -341,7 +332,12 @@ def parse_arguments():
return parser.parse_args()
def _check_command(command):
def _get_work_directory() -> pathlib.Path:
"""Return script's working directory."""
return pathlib.Path(__file__).parent / '.container'
def _check_command(command) -> bool:
"""Check that a command is present in PATH."""
if command == 'sudo':
which = ['which', command]
@ -352,7 +348,7 @@ def _check_command(command):
return process.returncode == 0
def _verify_dependencies():
def _verify_dependencies() -> None:
"""Ensure that all the dependencies are present."""
if not _check_command('sudo'):
logger.info('sudo command is needed. Run `apt install sudo`.')
@ -403,19 +399,7 @@ def _verify_dependencies():
subprocess.run(['sudo', 'apt', 'install'] + missing_packages, check=True)
def _get_systemd_nspawn_version():
"""Retrieve and store systemd-nspawn version (same as systemd)."""
try:
process = subprocess.run(['systemd', '--version'],
stdout=subprocess.PIPE, check=True)
except FileNotFoundError:
process = subprocess.run(['/usr/lib/systemd/systemd', '--version'],
stdout=subprocess.PIPE, check=True)
global systemd_version
systemd_version = float(process.stdout.decode().split()[1])
def _download_file(url, target_file, force=False):
def _download_file(url: str, target_file: pathlib.Path, force: bool = False):
"""Download a file from remote URL."""
if target_file.exists():
if force:
@ -434,7 +418,7 @@ def _download_file(url, target_file, force=False):
partial_file.rename(target_file)
def _receive_keys_with_gpg(gpg_home, key_ids):
def _receive_keys_with_gpg(gpg_home: pathlib.Path, key_ids: list[str]):
"""Use gpg to retrieve and import a list of keys."""
subprocess.run([
'gpg', '--quiet', '--homedir',
@ -442,7 +426,7 @@ def _receive_keys_with_gpg(gpg_home, key_ids):
] + key_ids, check=True)
def _receive_keys_with_wget(gpg_home, key_ids):
def _receive_keys_with_wget(gpg_home: pathlib.Path, key_ids: list[str]):
"""Use wget to retrieve a list of keys and import them."""
for key_id in key_ids:
# Download public key
@ -457,14 +441,15 @@ def _receive_keys_with_wget(gpg_home, key_ids):
str(gpg_home), '--import=-'], input=public_key, check=True)
def _verify_signature(hkp_client, data_file, signature_file):
def _verify_signature(hkp_client: str, data_file: pathlib.Path,
signature_file: pathlib.Path):
"""Verify the detached signature on a file using GPG."""
verified_file = signature_file.with_suffix(signature_file.suffix +
'.verified')
if verified_file.exists():
return
gpg_home = work_directory / 'gpg'
gpg_home = _get_work_directory() / 'gpg'
gpg_home.mkdir(exist_ok=True)
gpg_home.chmod(0o700)
@ -497,7 +482,7 @@ def _verify_signature(hkp_client, data_file, signature_file):
verified_file.touch()
def _extract_image(compressed_file):
def _extract_image(compressed_file: pathlib.Path):
"""Extract the image file."""
decompressed_file = compressed_file.with_suffix('')
if decompressed_file.exists():
@ -514,20 +499,20 @@ def _extract_image(compressed_file):
return decompressed_file
def _get_compressed_image_path(distribution):
def _get_compressed_image_path(distribution: str) -> pathlib.Path:
"""Return the path of the compressed image."""
url = URLS[distribution]
result = urllib.parse.urlparse(url)
return work_directory / pathlib.Path(result.path).name
return _get_work_directory() / pathlib.Path(result.path).name
def _get_project_folder():
def _get_project_folder() -> pathlib.Path:
"""Return the read-only folder that should be exposed into container."""
return work_directory.parent.resolve()
return _get_work_directory().parent.resolve()
def _get_overlay_folder(distribution):
def _get_overlay_folder(distribution: str) -> pathlib.Path:
"""Return the writable folder that should be exposed into container."""
default_data_dir = pathlib.Path.home() / '.local' / 'share'
data_folder = os.environ.get('XDG_DATA_HOME', default_data_dir)
@ -536,9 +521,10 @@ def _get_overlay_folder(distribution):
return folder.resolve()
def _download_disk_image(distribution, hkp_client, force=False):
def _download_disk_image(distribution: str, hkp_client: str,
force: bool = False) -> pathlib.Path:
"""Download and unpack FreedomBox disk image."""
work_directory.mkdir(exist_ok=True)
_get_work_directory().mkdir(exist_ok=True)
url = URLS[distribution]
@ -553,7 +539,7 @@ def _download_disk_image(distribution, hkp_client, force=False):
return _extract_image(target_file)
def _get_partition_info(image_file):
def _get_partition_info(image_file: pathlib.Path) -> tuple[str, str]:
"""Return the number of the final partition in the image file."""
process = subprocess.run(
['sudo', 'parted', '--script',
@ -569,7 +555,7 @@ def _get_partition_info(image_file):
return partition_table_type, last_partition_number
def _resize_disk_image(image_file, new_size):
def _resize_disk_image(image_file: pathlib.Path, new_size: str):
"""Resize the disk image if has not already been."""
if new_size[-1] != 'G':
raise ValueError(f'Invalid size: {new_size}')
@ -581,7 +567,7 @@ def _resize_disk_image(image_file, new_size):
logger.info('Resizing disk image to %s', new_size)
disk_free = shutil.disk_usage(work_directory).free
disk_free = shutil.disk_usage(_get_work_directory()).free
if disk_free < new_size_bytes - image_size:
raise ValueError(f'Not enough free space on disk: {disk_free} bytes')
@ -608,7 +594,7 @@ def _resize_disk_image(image_file, new_size):
subprocess.run(['sudo', 'btrfstune', '-uf', partition], check=True)
with tempfile.TemporaryDirectory(
dir=work_directory.resolve()) as mount_point:
dir=_get_work_directory().resolve()) as mount_point:
subprocess.run(['sudo', 'mount', partition, mount_point], check=True)
subprocess.run(
['sudo', 'btrfs', 'filesystem', 'resize', 'max', mount_point],
@ -620,68 +606,15 @@ def _resize_disk_image(image_file, new_size):
stderr=subprocess.DEVNULL)
def _get_nspawn_command(image_file):
"""Return the base nspwan command."""
pipe_argument = []
if systemd_version >= 247:
pipe_argument = ['--console=autopipe']
elif systemd_version > 241:
pipe_argument = ['--console=pipe']
return [
'sudo',
'systemd-nspawn',
'--image',
str(image_file),
] + pipe_argument
def _runc(image_file, command, **kwargs):
def _runc(image_file: pathlib.Path, command: list[str], **kwargs):
"""Run a command inside the container."""
subprocess.run(
_get_nspawn_command(image_file) + ['--quiet'] + command, check=True,
**kwargs)
subprocess.run([
'sudo', 'systemd-nspawn', '--image',
str(image_file), '--console=autopipe', '--quiet'
] + command, check=True, **kwargs)
def _get_interface_name(distribution):
"""Return the name of the interface."""
interface = f've-fbx-{distribution}'
process = subprocess.run(['systemd-nspawn', '--version'],
stdout=subprocess.PIPE, check=True)
version = process.stdout.decode().splitlines()[0].split()[1]
if int(float(version)) < 245:
return interface[:14]
return interface
def _setup_nm_connection(distribution):
"""Create a network manager conn. on host for DHCP/DNS with container."""
connection_name = f'fbx-{distribution}-shared'
process = subprocess.run(
['sudo', 'nmcli', 'connection', 'show', connection_name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
if not process.returncode:
return
logger.info('Creating Network Manager connection in host: %s',
connection_name)
properties = {
'connection.id': connection_name,
'connection.type': '802-3-ethernet',
'connection.interface-name': _get_interface_name(distribution),
'connection.autoconnect': 'yes',
'connection.zone': 'trusted',
'ipv4.method': 'shared',
}
subprocess.run(['sudo', 'nmcli', 'connection', 'add'] +
list(itertools.chain(*properties.items())), check=True)
subprocess.run(['sudo', 'nmcli', 'connection', 'up', connection_name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=False)
def _setup_users(image_file):
def _setup_users(image_file: pathlib.Path):
"""Change UID, GID and password of fbx user in container."""
folder = _get_project_folder()
@ -714,13 +647,13 @@ def _setup_users(image_file):
input=sudo_config.encode(), stdout=subprocess.DEVNULL)
def _setup_ssh(image_file):
def _setup_ssh(image_file: pathlib.Path):
"""Setup SSH server and client keys."""
logger.info('In container: Generating SSH server keys')
_runc(image_file, ['dpkg-reconfigure', 'openssh-server'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
ssh_directory = work_directory / 'ssh'
ssh_directory = _get_work_directory() / 'ssh'
ssh_directory.mkdir(exist_ok=True)
ssh_directory.chmod(0o700)
@ -746,7 +679,7 @@ def _setup_ssh(image_file):
_runc(image_file, ['chown', 'fbx:fbx', '/home/fbx/.ssh/authorized_keys'])
def _setup(image_file, distribution):
def _setup_image(image_file: pathlib.Path):
"""Prepare the image for execution."""
setup_file = image_file.with_suffix(image_file.suffix + '.setup')
if setup_file.exists():
@ -766,124 +699,12 @@ def _setup(image_file, distribution):
_setup_users(image_file)
_setup_nm_connection(distribution)
setup_file.touch()
logger.info('Setup completed')
def _create_nspawn_machine(image_file, distribution):
"""Create systemd-nspawn options/image used by machinectl."""
machine_name = f'fbx-{distribution}'
overlay_folder = _get_overlay_folder(distribution)
logger.info('Creating overlay folder %s', overlay_folder)
overlay_folder.mkdir(parents=True, exist_ok=True)
nspawn_dir = pathlib.Path('/run/systemd/nspawn/')
if not nspawn_dir.exists():
subprocess.run(['sudo', 'mkdir', '--mode=700',
str(nspawn_dir)], check=True)
nspawn_options = f'''[Exec]
Boot=yes
PrivateUsers=no
# Allow all system calls to enable podman containers inside the nspawn
# container.
SystemCallFilter=@known
[Files]
Overlay={_get_project_folder()}:{overlay_folder}:/freedombox
[Network]
VirtualEthernet=yes
'''
nspawn_file = f'/run/systemd/nspawn/{machine_name}.nspawn'
logger.info('Creating systemd-nspawn configuration: %s', nspawn_file)
subprocess.run(['sudo', 'rm', '--force', nspawn_file], check=False)
subprocess.run(['sudo', 'tee', nspawn_file], input=nspawn_options.encode(),
stdout=subprocess.DEVNULL, check=True)
image_link = pathlib.Path(f'/var/lib/machines/{machine_name}.raw')
logger.info('Linking systemd-nspawn image %s -> %s', image_link,
image_file)
result = subprocess.run(
['sudo', 'test', '-e', str(image_link)], check=False)
if not result.returncode:
result = subprocess.run(
['sudo', 'test', '-L', str(image_link)], check=False)
if result.returncode:
raise Exception(f'Image file {image_link} is not a symlink.')
subprocess.run(['sudo', 'rm', '--force', str(image_link)], check=False)
subprocess.run([
'sudo', 'ln', '--symbolic',
str(image_file.resolve()),
str(image_link)
], check=False)
def _get_machine_status(machine_name):
"""Return the running status of a container."""
process = subprocess.run(['sudo', 'machinectl', 'status', machine_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, check=False)
return process.returncode == 0
def _launch(image_file, distribution):
"""Launch and boot the container."""
machine_name = f'fbx-{distribution}'
_create_nspawn_machine(image_file, distribution)
logger.info('Running `machinectl start %s`', machine_name)
subprocess.run(['sudo', 'machinectl', 'start', machine_name], check=True)
logger.info('Bringing up host network connection: %s',
f'fbx-{distribution}-shared')
# This command requires dnsmasq
subprocess.run(
['sudo', 'nmcli', 'connection', 'up', f'fbx-{distribution}-shared'],
stdout=subprocess.DEVNULL, check=True)
def _stop(distribution):
"""Stop the container."""
machine_name = f'fbx-{distribution}'
logger.info('Running `machinectl stop %s`', machine_name)
subprocess.run(['sudo', 'machinectl', 'stop', machine_name], check=False)
_wait_for(lambda: not _get_machine_status(machine_name))
def _terminate(distribution):
"""Terminal the container."""
machine_name = f'fbx-{distribution}'
logger.info('Running `machinectl terminate %s`', machine_name)
subprocess.run(['sudo', 'machinectl', 'terminate', machine_name],
check=False)
_wait_for(lambda: not _get_machine_status(machine_name))
def _destroy(distribution):
def _destroy_image(distribution: str):
"""Remove all traces of the machine and its image."""
machine_name = f'fbx-{distribution}'
image_link = pathlib.Path(f'/var/lib/machines/{machine_name}.raw')
logger.info('Removing link to systemd-nspawn image %s', image_link)
subprocess.run(['sudo', 'rm', '--force', str(image_link)], check=False)
nspawn_file = f'/run/systemd/nspawn/{machine_name}.nspawn'
logger.info('Removing systemd-nspawn configuration: %s', nspawn_file)
subprocess.run(['sudo', 'rm', '--force', nspawn_file], check=False)
overlay_folder = _get_overlay_folder(distribution)
logger.info('Removing overlay folder with container written data: %s',
overlay_folder)
subprocess.run(['sudo', 'rm', '-r', '--force', overlay_folder],
check=False)
compressed_image = _get_compressed_image_path(distribution)
image_file = compressed_image.with_suffix('')
logger.info('Removing image file %s', image_file)
@ -902,50 +723,39 @@ def _destroy(distribution):
except FileNotFoundError:
pass
connection_name = f'fbx-{distribution}-shared'
logger.info('Removing Network Manager connection %s', connection_name)
result = subprocess.run(
['sudo', 'nmcli', 'connection', 'delete', connection_name],
capture_output=True, check=False)
if result.returncode not in (0, 10):
# nmcli failed and not due to 'Connection, device, or access point does
# not exist.' See
# https://developer-old.gnome.org/NetworkManager/stable/nmcli.html
logger.error('nmcli returned code %d', result.returncode)
logger.error('Error message:\n%s', result.stderr.decode())
logger.error('Output:\n%s', result.stdout.decode())
logger.info('Keeping downloaded image: %s',
_get_compressed_image_path(distribution))
def _is_privisioned(distribution):
def _is_provisioned(distribution: str) -> bool:
"""Return the container has been provisioned fully."""
compressed_image = _get_compressed_image_path(distribution)
provision_file = compressed_image.with_suffix('.provisioned')
if provision_file.exists():
return
def _provision(image_file, distribution):
"""Run app setup inside the container."""
image_file = compressed_image.with_suffix('')
provision_file = image_file.with_suffix(image_file.suffix + '.provisioned')
if provision_file.exists():
return provision_file.exists()
def _provision(image_file: pathlib.Path, machine_type: str, distribution: str):
"""Run app setup inside the container."""
if _is_provisioned(distribution):
return
ip_address = _wait_for(lambda: _get_ip_address(distribution))
ssh_command = _get_ssh_command(ip_address, distribution)
machine = Machine.get_instance(machine_type, distribution)
ssh_command = machine.get_ssh_command()
subprocess.run(ssh_command + ['bash'], check=True,
input=PROVISION_SCRIPT.encode())
provision_file = image_file.with_suffix(image_file.suffix + '.provisioned')
provision_file.touch()
logger.info('Provision completed')
def _print_banner(distribution):
def _print_banner(machine_type: str, distribution: str):
"""Print a friendly message on how to use."""
ip_address = _wait_for(lambda: _get_ip_address(distribution))
work_directory.owner()
machine = Machine.get_instance(machine_type, distribution)
ip_address = _wait_for(lambda: machine.get_ip_address())
_get_work_directory().owner()
script = sys.argv[0]
options = ''
if distribution != 'testing':
@ -976,41 +786,7 @@ Reset : {script} destroy {options}; {script} up {options}'''
logger.info(message)
def _get_ip_address(distribution):
"""Return the first IPv4 address of container or None."""
process = subprocess.run(
['machinectl', 'list', '--output=json', '--max-addresses=all'],
stdout=subprocess.PIPE, check=True)
output = process.stdout.decode()
if not output:
return None
machines = json.loads(output)
for machine in machines:
if (machine['machine'] == f'fbx-{distribution}'
and machine['addresses'] not in (None, '-')):
for address in machine['addresses'].split():
if ipaddress.ip_address(address).version == 4:
return address
return None
def _get_ssh_command(ip_address, distribution):
"""Exec an SSH command."""
public_key = work_directory / 'ssh' / 'id_ed25519'
if ipaddress.ip_address(ip_address).is_link_local:
ip_address = f'{ip_address}%' + _get_interface_name(distribution)
return [
'ssh', '-Y', '-C', '-t', '-i',
str(public_key), '-o', 'LogLevel=error', '-o',
'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null', '-o',
'IdentitiesOnly=yes', f'fbx@{ip_address}'
]
def _wait_for(method):
def _wait_for(method: Callable[[], None | object]):
"""Wait until a condition is satisfied or finally give up."""
for _ in range(60):
return_value = method()
@ -1024,7 +800,7 @@ def _wait_for(method):
sys.exit(1)
def _get_latest_image_timestamp(distribution):
def _get_latest_image_timestamp(distribution: str) -> float:
"""Get the timestamp of the latest available image."""
url = URLS[distribution]
response = urlopen(url[0:url.rindex('/')])
@ -1033,9 +809,9 @@ def _get_latest_image_timestamp(distribution):
return datetime.datetime.strptime(str_time, '%Y-%m-%d %H:%M').timestamp()
def _is_update_required(distribution):
def _is_update_required(distribution: str) -> bool:
"""Compare local image timestamp against the latest image timestamp."""
file_path = work_directory / URLS[distribution].split('/')[-1]
file_path = _get_work_directory() / URLS[distribution].split('/')[-1]
if not file_path.exists():
return True
@ -1046,47 +822,283 @@ def _is_update_required(distribution):
return latest_image_timestamp - local_image_timestamp > one_day
def subcommand_up(arguments):
class Machine:
"""Base class for different types of machines."""
def __init__(self, distribution: str):
"""Initialize the machine object."""
machine_name = f'fbx-{distribution}'
self.machine_name = machine_name
self.distribution = distribution
@staticmethod
def get_instance(machine_type: str, distribution: str):
"""Return instance of a concrete machine class based on type."""
if machine_type == 'container':
return Container(distribution)
raise ValueError('Unknown machine type')
def get_status(self) -> bool:
"""Return whether the machine is currently running."""
return False
def setup(self) -> None:
"""Setup the infrastructure needed for the machine."""
def launch(self) -> None:
"""Start the machine."""
def stop(self) -> None:
"""Stop the machine."""
def terminate(self) -> None:
"""Terminate, i.e., force stop the machine."""
def destory(self) -> None:
"""Remove all traces of the machine from the host."""
def get_ip_address(self) -> str | None:
"""Return the IP address assigned to the machine."""
def get_ssh_command(self) -> list[str]:
"""Return the SSH command to execute for the machine."""
return []
class Container(Machine):
"""Handle container specific operations."""
def get_status(self) -> bool:
"""Return the running status of a container."""
process = subprocess.run(
['sudo', 'machinectl', 'status', self.machine_name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
return process.returncode == 0
def setup(self) -> None:
"""Setup the infrastructure needed for the container."""
self._setup_nm_connection()
def launch(self) -> None:
"""Start the container."""
self._create_nspawn_machine()
logger.info('Running `machinectl start %s`', self.machine_name)
subprocess.run(['sudo', 'machinectl', 'start', self.machine_name],
check=True)
logger.info('Bringing up host network connection: %s',
f'fbx-{self.distribution}-shared')
# This command requires dnsmasq
subprocess.run([
'sudo', 'nmcli', 'connection', 'up',
f'fbx-{self.distribution}-shared'
], stdout=subprocess.DEVNULL, check=True)
def stop(self) -> None:
"""Stop the container."""
logger.info('Running `machinectl stop %s`', self.machine_name)
subprocess.run(['sudo', 'machinectl', 'stop', self.machine_name],
check=False)
def terminate(self) -> None:
"""Terminate, i.e., force stop the container."""
logger.info('Running `machinectl terminate %s`', self.machine_name)
subprocess.run(['sudo', 'machinectl', 'terminate', self.machine_name],
check=False)
def destory(self) -> None:
"""Remove all traces of the machine from the host."""
image_link = pathlib.Path(f'/var/lib/machines/{self.machine_name}.raw')
logger.info('Removing link to systemd-nspawn image %s', image_link)
subprocess.run(['sudo', 'rm', '--force', str(image_link)], check=False)
nspawn_file = f'/run/systemd/nspawn/{self.machine_name}.nspawn'
logger.info('Removing systemd-nspawn configuration: %s', nspawn_file)
subprocess.run(['sudo', 'rm', '--force', nspawn_file], check=False)
overlay_folder = _get_overlay_folder(self.distribution)
logger.info('Removing overlay folder with container written data: %s',
overlay_folder)
subprocess.run(['sudo', 'rm', '-r', '--force', overlay_folder],
check=False)
connection_name = f'fbx-{self.distribution}-shared'
logger.info('Removing Network Manager connection %s', connection_name)
result = subprocess.run(
['sudo', 'nmcli', 'connection', 'delete', connection_name],
capture_output=True, check=False)
if result.returncode not in (0, 10):
# nmcli failed and not due to 'Connection, device, or access point
# does not exist.' See
# https://developer-old.gnome.org/NetworkManager/stable/nmcli.html
logger.error('nmcli returned code %d', result.returncode)
logger.error('Error message:\n%s', result.stderr.decode())
logger.error('Output:\n%s', result.stdout.decode())
def get_ip_address(self) -> str | None:
"""Return the IPv4 address assigned to the container or None."""
process = subprocess.run(
['machinectl', 'list', '--output=json', '--max-addresses=all'],
stdout=subprocess.PIPE, check=True)
output = process.stdout.decode()
if not output:
return None
machines = json.loads(output)
for machine in machines:
if (machine['machine'] == f'fbx-{self.distribution}'
and machine['addresses'] not in (None, '-')):
for address in machine['addresses'].split():
if ipaddress.ip_address(address).version == 4:
return address
return None
def get_ssh_command(self) -> list[str]:
"""Return the SSH command to execute for the container."""
ip_address = _wait_for(lambda: self.get_ip_address())
public_key = _get_work_directory() / 'ssh' / 'id_ed25519'
if ipaddress.ip_address(ip_address).is_link_local:
ip_address = f'{ip_address}%' + self._get_interface_name()
return [
'ssh', '-Y', '-C', '-t', '-i',
str(public_key), '-o', 'LogLevel=error', '-o',
'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null',
'-o', 'IdentitiesOnly=yes', f'fbx@{ip_address}'
]
def _get_interface_name(self) -> str:
"""Return the name of the interface."""
return f've-fbx-{self.distribution}'
def _setup_nm_connection(self) -> None:
"""Create a NM conn. on host for DHCP/DNS with container."""
connection_name = f'fbx-{self.distribution}-shared'
process = subprocess.run(
['sudo', 'nmcli', 'connection', 'show', connection_name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
if not process.returncode:
return
logger.info('Creating Network Manager connection in host: %s',
connection_name)
properties = {
'connection.id': connection_name,
'connection.type': '802-3-ethernet',
'connection.interface-name': self._get_interface_name(),
'connection.autoconnect': 'yes',
'connection.zone': 'trusted',
'ipv4.method': 'shared',
}
subprocess.run(['sudo', 'nmcli', 'connection', 'add'] +
list(itertools.chain(*properties.items())), check=True)
subprocess.run(['sudo', 'nmcli', 'connection', 'up', connection_name],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=False)
def _create_nspawn_machine(self) -> None:
"""Create systemd-nspawn options/image used by machinectl."""
compressed_image = _get_compressed_image_path(self.distribution)
image_file = compressed_image.with_suffix('')
overlay_folder = _get_overlay_folder(self.distribution)
logger.info('Creating overlay folder %s', overlay_folder)
overlay_folder.mkdir(parents=True, exist_ok=True)
nspawn_dir = pathlib.Path('/run/systemd/nspawn/')
if not nspawn_dir.exists():
subprocess.run(['sudo', 'mkdir', '--mode=700',
str(nspawn_dir)], check=True)
nspawn_options = f'''[Exec]
Boot=yes
PrivateUsers=no
# Allow all system calls to enable podman containers inside the nspawn
# container.
SystemCallFilter=@known
[Files]
Overlay={_get_project_folder()}:{overlay_folder}:/freedombox
[Network]
VirtualEthernet=yes
'''
nspawn_file = f'/run/systemd/nspawn/{self.machine_name}.nspawn'
logger.info('Creating systemd-nspawn configuration: %s', nspawn_file)
subprocess.run(['sudo', 'rm', '--force', nspawn_file], check=False)
subprocess.run(['sudo', 'tee', nspawn_file],
input=nspawn_options.encode(),
stdout=subprocess.DEVNULL, check=True)
image_link = pathlib.Path(f'/var/lib/machines/{self.machine_name}.raw')
logger.info('Linking systemd-nspawn image %s -> %s', image_link,
image_file)
result = subprocess.run(
['sudo', 'test', '-e', str(image_link)], check=False)
if not result.returncode:
result = subprocess.run(
['sudo', 'test', '-L', str(image_link)], check=False)
if result.returncode:
raise Exception(f'Image file {image_link} is not a symlink.')
subprocess.run(['sudo', 'rm', '--force',
str(image_link)], check=False)
subprocess.run([
'sudo', 'ln', '--symbolic',
str(image_file.resolve()),
str(image_link)
], check=False)
def subcommand_up(arguments: argparse.Namespace):
"""Download, setup and bring up the container."""
machine_name = f'fbx-{arguments.distribution}'
if _get_machine_status(machine_name) and _is_privisioned(
arguments.distribution):
machine = Machine.get_instance(arguments.machine_type,
arguments.distribution)
if machine.get_status() and _is_provisioned(arguments.distribution):
logger.info('Container is already running')
_print_banner(arguments.distribution)
_print_banner(arguments.machine_type, arguments.distribution)
return
_verify_dependencies()
_get_systemd_nspawn_version()
image_file = _download_disk_image(arguments.distribution,
arguments.hkp_client)
_resize_disk_image(image_file, arguments.image_size)
_setup(image_file, arguments.distribution)
_launch(image_file, arguments.distribution)
_provision(image_file, arguments.distribution)
_print_banner(arguments.distribution)
_setup_image(image_file)
machine.setup()
machine.launch()
_provision(image_file, arguments.machine_type, arguments.distribution)
_print_banner(arguments.machine_type, arguments.distribution)
def subcommand_ip(arguments):
def subcommand_ip(arguments: argparse.Namespace):
"""Print the IP address of the container."""
print(_get_ip_address(arguments.distribution) or '')
machine = Machine.get_instance(arguments.machine_type,
arguments.distribution)
print(machine.get_ip_address() or '')
def subcommand_ssh(arguments):
def subcommand_ssh(arguments: argparse.Namespace):
"""Open an SSH shell into the container."""
ip_address = _wait_for(lambda: _get_ip_address(arguments.distribution))
command = _get_ssh_command(ip_address, arguments.distribution)
machine = Machine.get_instance(arguments.machine_type,
arguments.distribution)
command = machine.get_ssh_command()
logger.info('Running command: %s', ' '.join(command))
os.execlp('ssh', *command)
def subcommand_run_tests(arguments):
def subcommand_run_tests(arguments: argparse.Namespace):
"""Run tests in the container."""
distribution = arguments.distribution
machine = Machine.get_instance(arguments.machine_type, distribution)
pytest_args_list = arguments.pytest_args or []
pytest_args = ' '.join((shlex.quote(arg) for arg in pytest_args_list))
ip_address = _wait_for(lambda: _get_ip_address(distribution))
ssh_command = _get_ssh_command(ip_address, distribution)
ssh_command = machine.get_ssh_command()
pytest_command = f'py.test-3 --color=yes {pytest_args}'
logger.info('Pytest command: %s', pytest_command)
@ -1098,18 +1110,26 @@ def subcommand_run_tests(arguments):
sys.exit(process.returncode)
def subcommand_stop(arguments):
def subcommand_stop(arguments: argparse.Namespace):
"""Stop the container."""
_stop(arguments.distribution)
machine = Machine.get_instance(arguments.machine_type,
arguments.distribution)
machine.stop()
_wait_for(lambda: not machine.get_status())
def subcommand_destroy(arguments):
def subcommand_destroy(arguments: argparse.Namespace):
"""Destroy the disk image."""
_terminate(arguments.distribution)
_destroy(arguments.distribution)
machine = Machine.get_instance(arguments.machine_type,
arguments.distribution)
machine.terminate()
_wait_for(lambda: not machine.get_status())
machine.destory()
_destroy_image(arguments.distribution)
def subcommand_update(arguments):
def subcommand_update(arguments: argparse.Namespace):
"""Update the disk image."""
if _is_update_required(arguments.distribution):
logger.info('Updating...')
@ -1119,7 +1139,7 @@ def subcommand_update(arguments):
logger.info('Already using the latest image')
def set_URLs():
def set_URLs() -> None:
global URLS
arch = platform.machine()
if arch == 'x86_64' or arch == 'amd64':
@ -1138,9 +1158,6 @@ def main():
logging.basicConfig(level='INFO', format='> %(message)s')
arguments = parse_arguments()
global work_directory
work_directory = pathlib.Path(__file__).parent / '.container'
subcommand = arguments.subcommand.replace('-', '_')
subcommand_method = globals()['subcommand_' + subcommand]
subcommand_method(arguments)