diff --git a/container b/container index 120b8c616..94158c10c 100755 --- a/container +++ b/container @@ -128,6 +128,7 @@ import sys import tempfile import time import urllib.parse +from typing import Callable from urllib.request import urlopen URLS_AMD64 = { @@ -252,11 +253,8 @@ exit 0 logger = logging.getLogger(__name__) -work_directory = None -systemd_version = None - -def parse_arguments(): +def parse_arguments() -> argparse.Namespace: """Return parsed command line arguments as dictionary.""" parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -271,12 +269,17 @@ def parse_arguments(): if distribution and distribution in distributions: default_distribution = distribution + def _add_common_args(subparser): + subparser.add_argument('--distribution', choices=distributions, + default=default_distribution, + help='Distribution to work with') + subparser.add_argument('--machine-type', choices=('container', ), + default='container') + # Up subparser = subparsers.add_parser('up', help='Bring up the container', formatter_class=formatter_class) - subparser.add_argument( - '--distribution', choices=distributions, default=default_distribution, - help='Distribution of the image to download and setup') + _add_common_args(subparser) subparser.add_argument('--image-size', default='16G', help='Disk image size to resize to after download') subparser.add_argument('--hkp-client', choices=('gpg', 'wget'), @@ -286,24 +289,18 @@ def parse_arguments(): subparser = subparsers.add_parser( 'ip', help='Print the IP address of the container.', formatter_class=formatter_class) - subparser.add_argument( - '--distribution', choices=distributions, default=default_distribution, - help='Distribution of the container to print IP address') + _add_common_args(subparser) # ssh subparser = subparsers.add_parser('ssh', help='SSH into the container', formatter_class=formatter_class) - subparser.add_argument('--distribution', choices=distributions, - default=default_distribution, - help='Distribution of the container to SSH into') + _add_common_args(subparser) # Run tests subparser = subparsers.add_parser('run-tests', help='Run tests in the container', formatter_class=formatter_class) - subparser.add_argument('--distribution', choices=distributions, - default=default_distribution, - help='Distribution of the container to run tests') + _add_common_args(subparser) subparser.add_argument( '--pytest-args', nargs='...', help='Additional arguments to pass to the pytest command') @@ -311,25 +308,19 @@ def parse_arguments(): # Stop subparser = subparsers.add_parser('stop', help='Stop the container', formatter_class=formatter_class) - subparser.add_argument('--distribution', choices=distributions, - default=default_distribution, - help='Distribution of the container to stop') + _add_common_args(subparser) # Destroy subparser = subparsers.add_parser('destroy', help='Destroy the container image', formatter_class=formatter_class) - subparser.add_argument('--distribution', choices=distributions, - default=default_distribution, - help='Distribution of the image to delete') + _add_common_args(subparser) # Update subparser = subparsers.add_parser( 'update', help='Update the container image to the latest version', formatter_class=formatter_class) - subparser.add_argument('--distribution', choices=distributions, - default=default_distribution, - help='Distribution of the image to update') + _add_common_args(subparser) subparser.add_argument('--hkp-client', choices=('gpg', 'wget'), default='gpg', help='Client for key retrieval') @@ -341,7 +332,12 @@ def parse_arguments(): return parser.parse_args() -def _check_command(command): +def _get_work_directory() -> pathlib.Path: + """Return script's working directory.""" + return pathlib.Path(__file__).parent / '.container' + + +def _check_command(command) -> bool: """Check that a command is present in PATH.""" if command == 'sudo': which = ['which', command] @@ -352,7 +348,7 @@ def _check_command(command): return process.returncode == 0 -def _verify_dependencies(): +def _verify_dependencies() -> None: """Ensure that all the dependencies are present.""" if not _check_command('sudo'): logger.info('sudo command is needed. Run `apt install sudo`.') @@ -403,19 +399,7 @@ def _verify_dependencies(): subprocess.run(['sudo', 'apt', 'install'] + missing_packages, check=True) -def _get_systemd_nspawn_version(): - """Retrieve and store systemd-nspawn version (same as systemd).""" - try: - process = subprocess.run(['systemd', '--version'], - stdout=subprocess.PIPE, check=True) - except FileNotFoundError: - process = subprocess.run(['/usr/lib/systemd/systemd', '--version'], - stdout=subprocess.PIPE, check=True) - global systemd_version - systemd_version = float(process.stdout.decode().split()[1]) - - -def _download_file(url, target_file, force=False): +def _download_file(url: str, target_file: pathlib.Path, force: bool = False): """Download a file from remote URL.""" if target_file.exists(): if force: @@ -434,7 +418,7 @@ def _download_file(url, target_file, force=False): partial_file.rename(target_file) -def _receive_keys_with_gpg(gpg_home, key_ids): +def _receive_keys_with_gpg(gpg_home: pathlib.Path, key_ids: list[str]): """Use gpg to retrieve and import a list of keys.""" subprocess.run([ 'gpg', '--quiet', '--homedir', @@ -442,7 +426,7 @@ def _receive_keys_with_gpg(gpg_home, key_ids): ] + key_ids, check=True) -def _receive_keys_with_wget(gpg_home, key_ids): +def _receive_keys_with_wget(gpg_home: pathlib.Path, key_ids: list[str]): """Use wget to retrieve a list of keys and import them.""" for key_id in key_ids: # Download public key @@ -457,14 +441,15 @@ def _receive_keys_with_wget(gpg_home, key_ids): str(gpg_home), '--import=-'], input=public_key, check=True) -def _verify_signature(hkp_client, data_file, signature_file): +def _verify_signature(hkp_client: str, data_file: pathlib.Path, + signature_file: pathlib.Path): """Verify the detached signature on a file using GPG.""" verified_file = signature_file.with_suffix(signature_file.suffix + '.verified') if verified_file.exists(): return - gpg_home = work_directory / 'gpg' + gpg_home = _get_work_directory() / 'gpg' gpg_home.mkdir(exist_ok=True) gpg_home.chmod(0o700) @@ -497,7 +482,7 @@ def _verify_signature(hkp_client, data_file, signature_file): verified_file.touch() -def _extract_image(compressed_file): +def _extract_image(compressed_file: pathlib.Path): """Extract the image file.""" decompressed_file = compressed_file.with_suffix('') if decompressed_file.exists(): @@ -514,20 +499,20 @@ def _extract_image(compressed_file): return decompressed_file -def _get_compressed_image_path(distribution): +def _get_compressed_image_path(distribution: str) -> pathlib.Path: """Return the path of the compressed image.""" url = URLS[distribution] result = urllib.parse.urlparse(url) - return work_directory / pathlib.Path(result.path).name + return _get_work_directory() / pathlib.Path(result.path).name -def _get_project_folder(): +def _get_project_folder() -> pathlib.Path: """Return the read-only folder that should be exposed into container.""" - return work_directory.parent.resolve() + return _get_work_directory().parent.resolve() -def _get_overlay_folder(distribution): +def _get_overlay_folder(distribution: str) -> pathlib.Path: """Return the writable folder that should be exposed into container.""" default_data_dir = pathlib.Path.home() / '.local' / 'share' data_folder = os.environ.get('XDG_DATA_HOME', default_data_dir) @@ -536,9 +521,10 @@ def _get_overlay_folder(distribution): return folder.resolve() -def _download_disk_image(distribution, hkp_client, force=False): +def _download_disk_image(distribution: str, hkp_client: str, + force: bool = False) -> pathlib.Path: """Download and unpack FreedomBox disk image.""" - work_directory.mkdir(exist_ok=True) + _get_work_directory().mkdir(exist_ok=True) url = URLS[distribution] @@ -553,7 +539,7 @@ def _download_disk_image(distribution, hkp_client, force=False): return _extract_image(target_file) -def _get_partition_info(image_file): +def _get_partition_info(image_file: pathlib.Path) -> tuple[str, str]: """Return the number of the final partition in the image file.""" process = subprocess.run( ['sudo', 'parted', '--script', @@ -569,7 +555,7 @@ def _get_partition_info(image_file): return partition_table_type, last_partition_number -def _resize_disk_image(image_file, new_size): +def _resize_disk_image(image_file: pathlib.Path, new_size: str): """Resize the disk image if has not already been.""" if new_size[-1] != 'G': raise ValueError(f'Invalid size: {new_size}') @@ -581,7 +567,7 @@ def _resize_disk_image(image_file, new_size): logger.info('Resizing disk image to %s', new_size) - disk_free = shutil.disk_usage(work_directory).free + disk_free = shutil.disk_usage(_get_work_directory()).free if disk_free < new_size_bytes - image_size: raise ValueError(f'Not enough free space on disk: {disk_free} bytes') @@ -608,7 +594,7 @@ def _resize_disk_image(image_file, new_size): subprocess.run(['sudo', 'btrfstune', '-uf', partition], check=True) with tempfile.TemporaryDirectory( - dir=work_directory.resolve()) as mount_point: + dir=_get_work_directory().resolve()) as mount_point: subprocess.run(['sudo', 'mount', partition, mount_point], check=True) subprocess.run( ['sudo', 'btrfs', 'filesystem', 'resize', 'max', mount_point], @@ -620,68 +606,15 @@ def _resize_disk_image(image_file, new_size): stderr=subprocess.DEVNULL) -def _get_nspawn_command(image_file): - """Return the base nspwan command.""" - pipe_argument = [] - if systemd_version >= 247: - pipe_argument = ['--console=autopipe'] - elif systemd_version > 241: - pipe_argument = ['--console=pipe'] - - return [ - 'sudo', - 'systemd-nspawn', - '--image', - str(image_file), - ] + pipe_argument - - -def _runc(image_file, command, **kwargs): +def _runc(image_file: pathlib.Path, command: list[str], **kwargs): """Run a command inside the container.""" - subprocess.run( - _get_nspawn_command(image_file) + ['--quiet'] + command, check=True, - **kwargs) + subprocess.run([ + 'sudo', 'systemd-nspawn', '--image', + str(image_file), '--console=autopipe', '--quiet' + ] + command, check=True, **kwargs) -def _get_interface_name(distribution): - """Return the name of the interface.""" - interface = f've-fbx-{distribution}' - process = subprocess.run(['systemd-nspawn', '--version'], - stdout=subprocess.PIPE, check=True) - version = process.stdout.decode().splitlines()[0].split()[1] - if int(float(version)) < 245: - return interface[:14] - - return interface - - -def _setup_nm_connection(distribution): - """Create a network manager conn. on host for DHCP/DNS with container.""" - connection_name = f'fbx-{distribution}-shared' - process = subprocess.run( - ['sudo', 'nmcli', 'connection', 'show', connection_name], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False) - if not process.returncode: - return - - logger.info('Creating Network Manager connection in host: %s', - connection_name) - properties = { - 'connection.id': connection_name, - 'connection.type': '802-3-ethernet', - 'connection.interface-name': _get_interface_name(distribution), - 'connection.autoconnect': 'yes', - 'connection.zone': 'trusted', - 'ipv4.method': 'shared', - } - subprocess.run(['sudo', 'nmcli', 'connection', 'add'] + - list(itertools.chain(*properties.items())), check=True) - subprocess.run(['sudo', 'nmcli', 'connection', 'up', connection_name], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, - check=False) - - -def _setup_users(image_file): +def _setup_users(image_file: pathlib.Path): """Change UID, GID and password of fbx user in container.""" folder = _get_project_folder() @@ -714,13 +647,13 @@ def _setup_users(image_file): input=sudo_config.encode(), stdout=subprocess.DEVNULL) -def _setup_ssh(image_file): +def _setup_ssh(image_file: pathlib.Path): """Setup SSH server and client keys.""" logger.info('In container: Generating SSH server keys') _runc(image_file, ['dpkg-reconfigure', 'openssh-server'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - ssh_directory = work_directory / 'ssh' + ssh_directory = _get_work_directory() / 'ssh' ssh_directory.mkdir(exist_ok=True) ssh_directory.chmod(0o700) @@ -746,7 +679,7 @@ def _setup_ssh(image_file): _runc(image_file, ['chown', 'fbx:fbx', '/home/fbx/.ssh/authorized_keys']) -def _setup(image_file, distribution): +def _setup_image(image_file: pathlib.Path): """Prepare the image for execution.""" setup_file = image_file.with_suffix(image_file.suffix + '.setup') if setup_file.exists(): @@ -766,124 +699,12 @@ def _setup(image_file, distribution): _setup_users(image_file) - _setup_nm_connection(distribution) - setup_file.touch() logger.info('Setup completed') -def _create_nspawn_machine(image_file, distribution): - """Create systemd-nspawn options/image used by machinectl.""" - machine_name = f'fbx-{distribution}' - - overlay_folder = _get_overlay_folder(distribution) - logger.info('Creating overlay folder %s', overlay_folder) - overlay_folder.mkdir(parents=True, exist_ok=True) - - nspawn_dir = pathlib.Path('/run/systemd/nspawn/') - if not nspawn_dir.exists(): - subprocess.run(['sudo', 'mkdir', '--mode=700', - str(nspawn_dir)], check=True) - - nspawn_options = f'''[Exec] -Boot=yes -PrivateUsers=no -# Allow all system calls to enable podman containers inside the nspawn -# container. -SystemCallFilter=@known - -[Files] -Overlay={_get_project_folder()}:{overlay_folder}:/freedombox - -[Network] -VirtualEthernet=yes -''' - nspawn_file = f'/run/systemd/nspawn/{machine_name}.nspawn' - logger.info('Creating systemd-nspawn configuration: %s', nspawn_file) - subprocess.run(['sudo', 'rm', '--force', nspawn_file], check=False) - subprocess.run(['sudo', 'tee', nspawn_file], input=nspawn_options.encode(), - stdout=subprocess.DEVNULL, check=True) - - image_link = pathlib.Path(f'/var/lib/machines/{machine_name}.raw') - logger.info('Linking systemd-nspawn image %s -> %s', image_link, - image_file) - result = subprocess.run( - ['sudo', 'test', '-e', str(image_link)], check=False) - if not result.returncode: - result = subprocess.run( - ['sudo', 'test', '-L', str(image_link)], check=False) - if result.returncode: - raise Exception(f'Image file {image_link} is not a symlink.') - - subprocess.run(['sudo', 'rm', '--force', str(image_link)], check=False) - - subprocess.run([ - 'sudo', 'ln', '--symbolic', - str(image_file.resolve()), - str(image_link) - ], check=False) - - -def _get_machine_status(machine_name): - """Return the running status of a container.""" - process = subprocess.run(['sudo', 'machinectl', 'status', machine_name], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, check=False) - return process.returncode == 0 - - -def _launch(image_file, distribution): - """Launch and boot the container.""" - machine_name = f'fbx-{distribution}' - - _create_nspawn_machine(image_file, distribution) - - logger.info('Running `machinectl start %s`', machine_name) - subprocess.run(['sudo', 'machinectl', 'start', machine_name], check=True) - - logger.info('Bringing up host network connection: %s', - f'fbx-{distribution}-shared') - # This command requires dnsmasq - subprocess.run( - ['sudo', 'nmcli', 'connection', 'up', f'fbx-{distribution}-shared'], - stdout=subprocess.DEVNULL, check=True) - - -def _stop(distribution): - """Stop the container.""" - machine_name = f'fbx-{distribution}' - logger.info('Running `machinectl stop %s`', machine_name) - subprocess.run(['sudo', 'machinectl', 'stop', machine_name], check=False) - _wait_for(lambda: not _get_machine_status(machine_name)) - - -def _terminate(distribution): - """Terminal the container.""" - machine_name = f'fbx-{distribution}' - logger.info('Running `machinectl terminate %s`', machine_name) - subprocess.run(['sudo', 'machinectl', 'terminate', machine_name], - check=False) - _wait_for(lambda: not _get_machine_status(machine_name)) - - -def _destroy(distribution): +def _destroy_image(distribution: str): """Remove all traces of the machine and its image.""" - machine_name = f'fbx-{distribution}' - image_link = pathlib.Path(f'/var/lib/machines/{machine_name}.raw') - - logger.info('Removing link to systemd-nspawn image %s', image_link) - subprocess.run(['sudo', 'rm', '--force', str(image_link)], check=False) - nspawn_file = f'/run/systemd/nspawn/{machine_name}.nspawn' - - logger.info('Removing systemd-nspawn configuration: %s', nspawn_file) - subprocess.run(['sudo', 'rm', '--force', nspawn_file], check=False) - - overlay_folder = _get_overlay_folder(distribution) - logger.info('Removing overlay folder with container written data: %s', - overlay_folder) - subprocess.run(['sudo', 'rm', '-r', '--force', overlay_folder], - check=False) - compressed_image = _get_compressed_image_path(distribution) image_file = compressed_image.with_suffix('') logger.info('Removing image file %s', image_file) @@ -902,50 +723,39 @@ def _destroy(distribution): except FileNotFoundError: pass - connection_name = f'fbx-{distribution}-shared' - logger.info('Removing Network Manager connection %s', connection_name) - result = subprocess.run( - ['sudo', 'nmcli', 'connection', 'delete', connection_name], - capture_output=True, check=False) - if result.returncode not in (0, 10): - # nmcli failed and not due to 'Connection, device, or access point does - # not exist.' See - # https://developer-old.gnome.org/NetworkManager/stable/nmcli.html - logger.error('nmcli returned code %d', result.returncode) - logger.error('Error message:\n%s', result.stderr.decode()) - logger.error('Output:\n%s', result.stdout.decode()) - logger.info('Keeping downloaded image: %s', _get_compressed_image_path(distribution)) -def _is_privisioned(distribution): +def _is_provisioned(distribution: str) -> bool: """Return the container has been provisioned fully.""" compressed_image = _get_compressed_image_path(distribution) - provision_file = compressed_image.with_suffix('.provisioned') - if provision_file.exists(): - return - - -def _provision(image_file, distribution): - """Run app setup inside the container.""" + image_file = compressed_image.with_suffix('') provision_file = image_file.with_suffix(image_file.suffix + '.provisioned') - if provision_file.exists(): + return provision_file.exists() + + +def _provision(image_file: pathlib.Path, machine_type: str, distribution: str): + """Run app setup inside the container.""" + if _is_provisioned(distribution): return - ip_address = _wait_for(lambda: _get_ip_address(distribution)) - ssh_command = _get_ssh_command(ip_address, distribution) + machine = Machine.get_instance(machine_type, distribution) + ssh_command = machine.get_ssh_command() subprocess.run(ssh_command + ['bash'], check=True, input=PROVISION_SCRIPT.encode()) + provision_file = image_file.with_suffix(image_file.suffix + '.provisioned') provision_file.touch() logger.info('Provision completed') -def _print_banner(distribution): +def _print_banner(machine_type: str, distribution: str): """Print a friendly message on how to use.""" - ip_address = _wait_for(lambda: _get_ip_address(distribution)) - work_directory.owner() + machine = Machine.get_instance(machine_type, distribution) + ip_address = _wait_for(lambda: machine.get_ip_address()) + + _get_work_directory().owner() script = sys.argv[0] options = '' if distribution != 'testing': @@ -976,41 +786,7 @@ Reset : {script} destroy {options}; {script} up {options}''' logger.info(message) -def _get_ip_address(distribution): - """Return the first IPv4 address of container or None.""" - process = subprocess.run( - ['machinectl', 'list', '--output=json', '--max-addresses=all'], - stdout=subprocess.PIPE, check=True) - output = process.stdout.decode() - if not output: - return None - - machines = json.loads(output) - for machine in machines: - if (machine['machine'] == f'fbx-{distribution}' - and machine['addresses'] not in (None, '-')): - for address in machine['addresses'].split(): - if ipaddress.ip_address(address).version == 4: - return address - - return None - - -def _get_ssh_command(ip_address, distribution): - """Exec an SSH command.""" - public_key = work_directory / 'ssh' / 'id_ed25519' - if ipaddress.ip_address(ip_address).is_link_local: - ip_address = f'{ip_address}%' + _get_interface_name(distribution) - - return [ - 'ssh', '-Y', '-C', '-t', '-i', - str(public_key), '-o', 'LogLevel=error', '-o', - 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null', '-o', - 'IdentitiesOnly=yes', f'fbx@{ip_address}' - ] - - -def _wait_for(method): +def _wait_for(method: Callable[[], None | object]): """Wait until a condition is satisfied or finally give up.""" for _ in range(60): return_value = method() @@ -1024,7 +800,7 @@ def _wait_for(method): sys.exit(1) -def _get_latest_image_timestamp(distribution): +def _get_latest_image_timestamp(distribution: str) -> float: """Get the timestamp of the latest available image.""" url = URLS[distribution] response = urlopen(url[0:url.rindex('/')]) @@ -1033,9 +809,9 @@ def _get_latest_image_timestamp(distribution): return datetime.datetime.strptime(str_time, '%Y-%m-%d %H:%M').timestamp() -def _is_update_required(distribution): +def _is_update_required(distribution: str) -> bool: """Compare local image timestamp against the latest image timestamp.""" - file_path = work_directory / URLS[distribution].split('/')[-1] + file_path = _get_work_directory() / URLS[distribution].split('/')[-1] if not file_path.exists(): return True @@ -1046,47 +822,283 @@ def _is_update_required(distribution): return latest_image_timestamp - local_image_timestamp > one_day -def subcommand_up(arguments): +class Machine: + """Base class for different types of machines.""" + + def __init__(self, distribution: str): + """Initialize the machine object.""" + machine_name = f'fbx-{distribution}' + self.machine_name = machine_name + self.distribution = distribution + + @staticmethod + def get_instance(machine_type: str, distribution: str): + """Return instance of a concrete machine class based on type.""" + if machine_type == 'container': + return Container(distribution) + + raise ValueError('Unknown machine type') + + def get_status(self) -> bool: + """Return whether the machine is currently running.""" + return False + + def setup(self) -> None: + """Setup the infrastructure needed for the machine.""" + + def launch(self) -> None: + """Start the machine.""" + + def stop(self) -> None: + """Stop the machine.""" + + def terminate(self) -> None: + """Terminate, i.e., force stop the machine.""" + + def destory(self) -> None: + """Remove all traces of the machine from the host.""" + + def get_ip_address(self) -> str | None: + """Return the IP address assigned to the machine.""" + + def get_ssh_command(self) -> list[str]: + """Return the SSH command to execute for the machine.""" + return [] + + +class Container(Machine): + """Handle container specific operations.""" + + def get_status(self) -> bool: + """Return the running status of a container.""" + process = subprocess.run( + ['sudo', 'machinectl', 'status', self.machine_name], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False) + return process.returncode == 0 + + def setup(self) -> None: + """Setup the infrastructure needed for the container.""" + self._setup_nm_connection() + + def launch(self) -> None: + """Start the container.""" + self._create_nspawn_machine() + + logger.info('Running `machinectl start %s`', self.machine_name) + subprocess.run(['sudo', 'machinectl', 'start', self.machine_name], + check=True) + + logger.info('Bringing up host network connection: %s', + f'fbx-{self.distribution}-shared') + # This command requires dnsmasq + subprocess.run([ + 'sudo', 'nmcli', 'connection', 'up', + f'fbx-{self.distribution}-shared' + ], stdout=subprocess.DEVNULL, check=True) + + def stop(self) -> None: + """Stop the container.""" + logger.info('Running `machinectl stop %s`', self.machine_name) + subprocess.run(['sudo', 'machinectl', 'stop', self.machine_name], + check=False) + + def terminate(self) -> None: + """Terminate, i.e., force stop the container.""" + logger.info('Running `machinectl terminate %s`', self.machine_name) + subprocess.run(['sudo', 'machinectl', 'terminate', self.machine_name], + check=False) + + def destory(self) -> None: + """Remove all traces of the machine from the host.""" + image_link = pathlib.Path(f'/var/lib/machines/{self.machine_name}.raw') + + logger.info('Removing link to systemd-nspawn image %s', image_link) + subprocess.run(['sudo', 'rm', '--force', str(image_link)], check=False) + nspawn_file = f'/run/systemd/nspawn/{self.machine_name}.nspawn' + + logger.info('Removing systemd-nspawn configuration: %s', nspawn_file) + subprocess.run(['sudo', 'rm', '--force', nspawn_file], check=False) + + overlay_folder = _get_overlay_folder(self.distribution) + logger.info('Removing overlay folder with container written data: %s', + overlay_folder) + subprocess.run(['sudo', 'rm', '-r', '--force', overlay_folder], + check=False) + + connection_name = f'fbx-{self.distribution}-shared' + logger.info('Removing Network Manager connection %s', connection_name) + result = subprocess.run( + ['sudo', 'nmcli', 'connection', 'delete', connection_name], + capture_output=True, check=False) + if result.returncode not in (0, 10): + # nmcli failed and not due to 'Connection, device, or access point + # does not exist.' See + # https://developer-old.gnome.org/NetworkManager/stable/nmcli.html + logger.error('nmcli returned code %d', result.returncode) + logger.error('Error message:\n%s', result.stderr.decode()) + logger.error('Output:\n%s', result.stdout.decode()) + + def get_ip_address(self) -> str | None: + """Return the IPv4 address assigned to the container or None.""" + process = subprocess.run( + ['machinectl', 'list', '--output=json', '--max-addresses=all'], + stdout=subprocess.PIPE, check=True) + output = process.stdout.decode() + if not output: + return None + + machines = json.loads(output) + for machine in machines: + if (machine['machine'] == f'fbx-{self.distribution}' + and machine['addresses'] not in (None, '-')): + for address in machine['addresses'].split(): + if ipaddress.ip_address(address).version == 4: + return address + + return None + + def get_ssh_command(self) -> list[str]: + """Return the SSH command to execute for the container.""" + ip_address = _wait_for(lambda: self.get_ip_address()) + public_key = _get_work_directory() / 'ssh' / 'id_ed25519' + if ipaddress.ip_address(ip_address).is_link_local: + ip_address = f'{ip_address}%' + self._get_interface_name() + + return [ + 'ssh', '-Y', '-C', '-t', '-i', + str(public_key), '-o', 'LogLevel=error', '-o', + 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null', + '-o', 'IdentitiesOnly=yes', f'fbx@{ip_address}' + ] + + def _get_interface_name(self) -> str: + """Return the name of the interface.""" + return f've-fbx-{self.distribution}' + + def _setup_nm_connection(self) -> None: + """Create a NM conn. on host for DHCP/DNS with container.""" + connection_name = f'fbx-{self.distribution}-shared' + process = subprocess.run( + ['sudo', 'nmcli', 'connection', 'show', connection_name], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False) + if not process.returncode: + return + + logger.info('Creating Network Manager connection in host: %s', + connection_name) + properties = { + 'connection.id': connection_name, + 'connection.type': '802-3-ethernet', + 'connection.interface-name': self._get_interface_name(), + 'connection.autoconnect': 'yes', + 'connection.zone': 'trusted', + 'ipv4.method': 'shared', + } + subprocess.run(['sudo', 'nmcli', 'connection', 'add'] + + list(itertools.chain(*properties.items())), check=True) + subprocess.run(['sudo', 'nmcli', 'connection', 'up', connection_name], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + check=False) + + def _create_nspawn_machine(self) -> None: + """Create systemd-nspawn options/image used by machinectl.""" + compressed_image = _get_compressed_image_path(self.distribution) + image_file = compressed_image.with_suffix('') + + overlay_folder = _get_overlay_folder(self.distribution) + logger.info('Creating overlay folder %s', overlay_folder) + overlay_folder.mkdir(parents=True, exist_ok=True) + + nspawn_dir = pathlib.Path('/run/systemd/nspawn/') + if not nspawn_dir.exists(): + subprocess.run(['sudo', 'mkdir', '--mode=700', + str(nspawn_dir)], check=True) + + nspawn_options = f'''[Exec] +Boot=yes +PrivateUsers=no +# Allow all system calls to enable podman containers inside the nspawn +# container. +SystemCallFilter=@known + +[Files] +Overlay={_get_project_folder()}:{overlay_folder}:/freedombox + +[Network] +VirtualEthernet=yes +''' + nspawn_file = f'/run/systemd/nspawn/{self.machine_name}.nspawn' + logger.info('Creating systemd-nspawn configuration: %s', nspawn_file) + subprocess.run(['sudo', 'rm', '--force', nspawn_file], check=False) + subprocess.run(['sudo', 'tee', nspawn_file], + input=nspawn_options.encode(), + stdout=subprocess.DEVNULL, check=True) + + image_link = pathlib.Path(f'/var/lib/machines/{self.machine_name}.raw') + logger.info('Linking systemd-nspawn image %s -> %s', image_link, + image_file) + result = subprocess.run( + ['sudo', 'test', '-e', str(image_link)], check=False) + if not result.returncode: + result = subprocess.run( + ['sudo', 'test', '-L', str(image_link)], check=False) + if result.returncode: + raise Exception(f'Image file {image_link} is not a symlink.') + + subprocess.run(['sudo', 'rm', '--force', + str(image_link)], check=False) + + subprocess.run([ + 'sudo', 'ln', '--symbolic', + str(image_file.resolve()), + str(image_link) + ], check=False) + + +def subcommand_up(arguments: argparse.Namespace): """Download, setup and bring up the container.""" - machine_name = f'fbx-{arguments.distribution}' - if _get_machine_status(machine_name) and _is_privisioned( - arguments.distribution): + machine = Machine.get_instance(arguments.machine_type, + arguments.distribution) + if machine.get_status() and _is_provisioned(arguments.distribution): logger.info('Container is already running') - _print_banner(arguments.distribution) + _print_banner(arguments.machine_type, arguments.distribution) return _verify_dependencies() - _get_systemd_nspawn_version() image_file = _download_disk_image(arguments.distribution, arguments.hkp_client) _resize_disk_image(image_file, arguments.image_size) - _setup(image_file, arguments.distribution) - _launch(image_file, arguments.distribution) - _provision(image_file, arguments.distribution) - _print_banner(arguments.distribution) + _setup_image(image_file) + machine.setup() + machine.launch() + _provision(image_file, arguments.machine_type, arguments.distribution) + _print_banner(arguments.machine_type, arguments.distribution) -def subcommand_ip(arguments): +def subcommand_ip(arguments: argparse.Namespace): """Print the IP address of the container.""" - print(_get_ip_address(arguments.distribution) or '') + machine = Machine.get_instance(arguments.machine_type, + arguments.distribution) + print(machine.get_ip_address() or '') -def subcommand_ssh(arguments): +def subcommand_ssh(arguments: argparse.Namespace): """Open an SSH shell into the container.""" - ip_address = _wait_for(lambda: _get_ip_address(arguments.distribution)) - command = _get_ssh_command(ip_address, arguments.distribution) + machine = Machine.get_instance(arguments.machine_type, + arguments.distribution) + command = machine.get_ssh_command() logger.info('Running command: %s', ' '.join(command)) os.execlp('ssh', *command) -def subcommand_run_tests(arguments): +def subcommand_run_tests(arguments: argparse.Namespace): """Run tests in the container.""" distribution = arguments.distribution + machine = Machine.get_instance(arguments.machine_type, distribution) pytest_args_list = arguments.pytest_args or [] pytest_args = ' '.join((shlex.quote(arg) for arg in pytest_args_list)) - ip_address = _wait_for(lambda: _get_ip_address(distribution)) - ssh_command = _get_ssh_command(ip_address, distribution) + ssh_command = machine.get_ssh_command() pytest_command = f'py.test-3 --color=yes {pytest_args}' logger.info('Pytest command: %s', pytest_command) @@ -1098,18 +1110,26 @@ def subcommand_run_tests(arguments): sys.exit(process.returncode) -def subcommand_stop(arguments): +def subcommand_stop(arguments: argparse.Namespace): """Stop the container.""" - _stop(arguments.distribution) + machine = Machine.get_instance(arguments.machine_type, + arguments.distribution) + machine.stop() + _wait_for(lambda: not machine.get_status()) -def subcommand_destroy(arguments): +def subcommand_destroy(arguments: argparse.Namespace): """Destroy the disk image.""" - _terminate(arguments.distribution) - _destroy(arguments.distribution) + machine = Machine.get_instance(arguments.machine_type, + arguments.distribution) + machine.terminate() + _wait_for(lambda: not machine.get_status()) + + machine.destory() + _destroy_image(arguments.distribution) -def subcommand_update(arguments): +def subcommand_update(arguments: argparse.Namespace): """Update the disk image.""" if _is_update_required(arguments.distribution): logger.info('Updating...') @@ -1119,7 +1139,7 @@ def subcommand_update(arguments): logger.info('Already using the latest image') -def set_URLs(): +def set_URLs() -> None: global URLS arch = platform.machine() if arch == 'x86_64' or arch == 'amd64': @@ -1138,9 +1158,6 @@ def main(): logging.basicConfig(level='INFO', format='> %(message)s') arguments = parse_arguments() - global work_directory - work_directory = pathlib.Path(__file__).parent / '.container' - subcommand = arguments.subcommand.replace('-', '_') subcommand_method = globals()['subcommand_' + subcommand] subcommand_method(arguments)