Sunil Mohan Adapa 2e112d751c
backups: Minor styling fixes
- Ran yapf

Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org>
Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
2019-02-07 19:12:12 -05:00

139 lines
4.4 KiB
Python
Executable File

#!/usr/bin/python3
# -*- mode: python -*-
#
# This file is part of FreedomBox.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Actions for sshfs.
"""
import argparse
import json
import os
import subprocess
import sys
# Maximum number of seconds the sshfs mount command is allowed to run.
TIMEOUT = 30
class AlreadyMountedError(Exception):
    """Signal that a filesystem is already mounted on the mount point."""
def parse_arguments():
    """Build the command line parser and return the parsed arguments.

    Exactly one of the sub commands 'mount', 'umount' or 'is-mounted' has to
    be given. Its name is stored in the 'subcommand' attribute of the
    returned namespace, next to the options of that sub command.
    """
    root = argparse.ArgumentParser()
    commands = root.add_subparsers(dest='subcommand', help='Sub command')
    # Refuse invocations that do not name a sub command.
    commands.required = True

    mount_parser = commands.add_parser('mount',
                                       help='mount an ssh filesystem')
    mount_parser.add_argument('--mountpoint', required=True,
                              help='Local mountpoint')
    mount_parser.add_argument('--path', required=True,
                              help='Remote ssh path to mount')
    mount_parser.add_argument('--ssh-keyfile', required=False, default=None,
                              help='Path of private ssh key')

    # The remaining sub commands only take a mandatory --mountpoint option.
    mountpoint_only_commands = (
        ('umount', 'unmount an ssh filesystem', 'Mountpoint to unmount'),
        ('is-mounted', 'Check whether a mountpoint is mounted',
         'Mountpoint to check'),
    )
    for name, summary, option_help in mountpoint_only_commands:
        command_parser = commands.add_parser(name, help=summary)
        command_parser.add_argument('--mountpoint', required=True,
                                    help=option_help)

    return root.parse_args()
def subcommand_mount(arguments):
    """Mount the remote ssh path on the local mountpoint using sshfs.

    Returns without doing anything when the mountpoint is already mounted.
    Authentication uses the private key given with --ssh-keyfile if present,
    otherwise a password read from stdin.

    Raises ValueError when neither a key file nor a password is available,
    or when the mountpoint is not usable.
    """
    try:
        validate_mountpoint(arguments.mountpoint)
    except AlreadyMountedError:
        # Already mounted: treat the request as fulfilled.
        return

    # the shell would expand ~/ to the local home directory, so every '~/'
    # and any remaining '~' is dropped from the remote path
    source = arguments.path.replace('~/', '').replace('~', '')

    command = ['sshfs', source, arguments.mountpoint]
    # NOTE(review): these two options switch off host key checking for the
    # connection.
    for ssh_option in ('UserKnownHostsFile=/dev/null',
                       'StrictHostKeyChecking=no'):
        command.extend(['-o', ssh_option])

    run_options = {'check': True, 'timeout': TIMEOUT}
    if arguments.ssh_keyfile:
        command.extend(['-o', 'IdentityFile=' + arguments.ssh_keyfile])
    else:
        password = read_password()
        if not password:
            raise ValueError('mount requires either a password or ssh_keyfile')

        # Hand the password to sshfs through its standard input.
        command.extend(['-o', 'password_stdin'])
        run_options['input'] = password.encode()

    subprocess.run(command, **run_options)
def subcommand_umount(arguments):
    """Unmount whatever is mounted on the given mountpoint.

    Raises subprocess.CalledProcessError when umount exits with a non-zero
    status.
    """
    command = ['umount', arguments.mountpoint]
    subprocess.run(command, check=True)
def validate_mountpoint(mountpoint):
    """Ensure that mountpoint is an empty, unmounted directory.

    The directory, including missing parent directories, is created when the
    path does not exist yet.

    Raises AlreadyMountedError when something is already mounted on the path
    and ValueError when the path exists but is not an empty directory.
    """
    if not os.path.exists(mountpoint):
        os.makedirs(mountpoint)
        return

    if _is_mounted(mountpoint):
        raise AlreadyMountedError(
            'Mountpoint %s already mounted' % mountpoint)

    # Test for a directory first: os.listdir() raises NotADirectoryError on
    # anything else, which would hide the intended ValueError.
    if not os.path.isdir(mountpoint) or os.listdir(mountpoint):
        raise ValueError(
            'Mountpoint %s is not an empty directory' % mountpoint)
def _is_mounted(mountpoint):
"""Return boolean whether a local directory is a mountpoint."""
cmd = ['mountpoint', '-q', mountpoint]
# mountpoint exits with status non-zero if it didn't find a mountpoint
try:
subprocess.run(cmd, check=True)
return True
except subprocess.CalledProcessError:
return False
def subcommand_is_mounted(arguments):
    """Print, as a JSON boolean, whether the mountpoint is mounted."""
    mounted = _is_mounted(arguments.mountpoint)
    print(json.dumps(mounted))
def read_password():
    """Return everything available on stdin as the password.

    An empty string is returned when stdin is attached to a terminal, so
    that the call never waits for interactive input.
    """
    stream = sys.stdin
    return '' if stream.isatty() else stream.read()
def main():
    """Parse the command line and run the selected sub command."""
    arguments = parse_arguments()
    # Sub command names may contain dashes; their handlers are the
    # module-level functions named 'subcommand_<name>' with underscores.
    handler_name = 'subcommand_' + arguments.subcommand.replace('-', '_')
    handler = globals()[handler_name]
    handler(arguments)
# Run the action only when executed as a script, not when imported.
if __name__ == '__main__':
    main()