#!/usr/bin/python3
# -*- mode: python -*-
#
# This file is part of FreedomBox.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Actions for sshfs.
"""

import argparse
import json
import os
import subprocess
import sys

# Maximum number of seconds to wait for the sshfs command to complete.
TIMEOUT = 30


class AlreadyMountedError(Exception):
    """Exception raised when mount point is already mounted."""


def parse_arguments():
    """Return the parsed command line arguments.

    The result is an argparse.Namespace whose ``subcommand`` attribute is one
    of 'mount', 'umount' or 'is-mounted'; the remaining attributes are the
    options of that sub command.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    mount = subparsers.add_parser('mount', help='mount an ssh filesystem')
    mount.add_argument('--mountpoint', help='Local mountpoint', required=True)
    mount.add_argument('--path', help='Remote ssh path to mount',
                       required=True)
    mount.add_argument('--ssh-keyfile', help='Path of private ssh key',
                       default=None, required=False)

    umount = subparsers.add_parser('umount', help='unmount an ssh filesystem')
    umount.add_argument('--mountpoint', help='Mountpoint to unmount',
                        required=True)

    is_mounted = subparsers.add_parser(
        'is-mounted', help='Check whether a mountpoint is mounted')
    is_mounted.add_argument('--mountpoint', help='Mountpoint to check',
                            required=True)

    # Without this, running the script with no sub command would be accepted.
    subparsers.required = True
    return parser.parse_args()


def subcommand_mount(arguments):
    """Mount a remote ssh path via sshfs.

    Return silently without doing anything when the mountpoint is already
    mounted. Authentication uses the ssh key file when one is given,
    otherwise a password read from stdin.

    Raises:
        ValueError: the mountpoint is not an empty directory, or neither a
            key file nor a password was supplied.
        subprocess.CalledProcessError: sshfs exited with non-zero status.
        subprocess.TimeoutExpired: sshfs did not finish within TIMEOUT
            seconds.
    """
    try:
        validate_mountpoint(arguments.mountpoint)
    except AlreadyMountedError:
        # Already mounted: treat the request as fulfilled.
        return

    remote_path = arguments.path
    kwargs = {}
    # the shell would expand ~/ to the local home directory
    # NOTE(review): this strips every '~' from the remote path, not only a
    # leading one -- confirm that is intended.
    remote_path = remote_path.replace('~/', '').replace('~', '')
    cmd = [
        'sshfs', remote_path, arguments.mountpoint, '-o',
        'UserKnownHostsFile=/dev/null', '-o', 'StrictHostKeyChecking=no'
    ]
    if arguments.ssh_keyfile:
        cmd += ['-o', 'IdentityFile=' + arguments.ssh_keyfile]
    else:
        password = read_password()
        if not password:
            raise ValueError('mount requires either a password or ssh_keyfile')

        # Hand the password to sshfs on its stdin instead of the command
        # line.
        cmd += ['-o', 'password_stdin']
        kwargs['input'] = password.encode()

    subprocess.run(cmd, check=True, timeout=TIMEOUT, **kwargs)


def subcommand_umount(arguments):
    """Unmount a mountpoint.

    Raises subprocess.CalledProcessError when umount exits with non-zero
    status.
    """
    subprocess.run(['umount', arguments.mountpoint], check=True)


def validate_mountpoint(mountpoint):
    """Check that the folder is empty, and create it if it doesn't exist.

    Raises:
        AlreadyMountedError: something is already mounted on the path.
        ValueError: the path exists but is not an empty directory.
    """
    if not os.path.exists(mountpoint):
        os.makedirs(mountpoint)
        return

    if _is_mounted(mountpoint):
        raise AlreadyMountedError(
            'Mountpoint %s already mounted' % mountpoint)

    # Test isdir() first: os.listdir() raises NotADirectoryError on a regular
    # file, which would bypass the ValueError intended for that case.
    if not os.path.isdir(mountpoint) or os.listdir(mountpoint):
        raise ValueError(
            'Mountpoint %s is not an empty directory' % mountpoint)


def _is_mounted(mountpoint):
    """Return boolean whether a local directory is a mountpoint."""
    cmd = ['mountpoint', '-q', mountpoint]
    # mountpoint exits with status non-zero if it didn't find a mountpoint
    return subprocess.run(cmd, check=False).returncode == 0


def subcommand_is_mounted(arguments):
    """Print whether a path is already mounted, as a JSON boolean."""
    print(json.dumps(_is_mounted(arguments.mountpoint)))


def read_password():
    """Read the password from stdin.

    Return an empty string when stdin is a terminal, so that the command
    never blocks waiting for interactive input.
    """
    if sys.stdin.isatty():
        return ''

    return sys.stdin.read()


def main():
    """Parse arguments and perform all duties."""
    arguments = parse_arguments()

    # Map e.g. 'is-mounted' to the function subcommand_is_mounted().
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()