mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-01-21 07:55:00 +00:00
- Ran yapf Signed-off-by: Sunil Mohan Adapa <sunil@medhas.org> Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
139 lines
4.4 KiB
Python
Executable File
139 lines
4.4 KiB
Python
Executable File
#!/usr/bin/python3
|
|
# -*- mode: python -*-
|
|
#
|
|
# This file is part of FreedomBox.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
"""
|
|
Actions for sshfs.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
# Timeout, in seconds, passed to subprocess.run() for the sshfs mount command.
TIMEOUT = 30
|
|
|
|
|
|
class AlreadyMountedError(Exception):
    """Raised when the target directory is already a mount point."""
|
|
|
|
|
|
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    Three sub commands are understood, and one of them is mandatory:

    - ``mount``: needs ``--mountpoint`` and ``--path``; ``--ssh-keyfile`` is
      optional and defaults to None.
    - ``umount``: needs ``--mountpoint``.
    - ``is-mounted``: needs ``--mountpoint``.

    The chosen sub command is stored in the ``subcommand`` attribute of the
    returned argparse.Namespace.
    """
    root = argparse.ArgumentParser()
    commands = root.add_subparsers(dest='subcommand', help='Sub command')

    mount_parser = commands.add_parser('mount',
                                       help='mount an ssh filesystem')
    mount_parser.add_argument('--mountpoint', required=True,
                              help='Local mountpoint')
    mount_parser.add_argument('--path', required=True,
                              help='Remote ssh path to mount')
    mount_parser.add_argument('--ssh-keyfile', required=False, default=None,
                              help='Path of private ssh key')

    umount_parser = commands.add_parser('umount',
                                        help='unmount an ssh filesystem')
    umount_parser.add_argument('--mountpoint', required=True,
                               help='Mountpoint to unmount')

    check_parser = commands.add_parser(
        'is-mounted', help='Check whether a mountpoint is mounted')
    check_parser.add_argument('--mountpoint', required=True,
                              help='Mountpoint to check')

    # Refuse invocations that do not name a sub command.
    commands.required = True
    return root.parse_args()
|
|
|
|
|
|
def subcommand_mount(arguments):
    """Mount the remote path ``arguments.path`` on ``arguments.mountpoint``.

    The mountpoint is validated first; when something is already mounted
    there the function returns without doing anything. Authentication uses
    ``arguments.ssh_keyfile`` when given, otherwise a password read from
    stdin.

    Raises ValueError when neither a key file nor a password is available,
    plus whatever validate_mountpoint() and subprocess.run() raise (the
    latter is invoked with ``check=True`` and a timeout of TIMEOUT).
    """
    try:
        validate_mountpoint(arguments.mountpoint)
    except AlreadyMountedError:
        # Already mounted: treat the request as satisfied.
        return

    # Drop tildes from the remote path (original rationale: the shell would
    # expand ~/ to the local home directory).
    source = arguments.path.replace('~/', '').replace('~', '')

    sshfs_options = [
        'UserKnownHostsFile=/dev/null',
        'StrictHostKeyChecking=no',
    ]
    run_options = {}
    if arguments.ssh_keyfile:
        sshfs_options.append('IdentityFile=' + arguments.ssh_keyfile)
    else:
        secret = read_password()
        if not secret:
            raise ValueError('mount requires either a password or ssh_keyfile')

        # Hand the password to sshfs through its standard input.
        sshfs_options.append('password_stdin')
        run_options['input'] = secret.encode()

    command = ['sshfs', source, arguments.mountpoint]
    for option in sshfs_options:
        command += ['-o', option]

    subprocess.run(command, check=True, timeout=TIMEOUT, **run_options)
|
|
|
|
|
|
def subcommand_umount(arguments):
    """Unmount whatever is mounted at ``arguments.mountpoint``.

    Runs the ``umount`` command with ``check=True``, so a non-zero exit
    status raises subprocess.CalledProcessError.
    """
    command = ['umount', arguments.mountpoint]
    subprocess.run(command, check=True)
|
|
|
|
|
|
def validate_mountpoint(mountpoint):
    """Ensure that ``mountpoint`` can be used as a mount target.

    A path that does not exist yet is created, including missing parent
    directories. An existing path must be an empty directory with nothing
    mounted on it.

    Raises:
        AlreadyMountedError: when the path is already a mountpoint.
        ValueError: when the path exists but is not an empty directory.
    """
    if not os.path.exists(mountpoint):
        os.makedirs(mountpoint)
        return

    if _is_mounted(mountpoint):
        raise AlreadyMountedError(
            'Mountpoint %s already mounted' % mountpoint)

    # Test for a directory before listing it: os.listdir() on a regular file
    # raises NotADirectoryError rather than the ValueError reported here.
    if not os.path.isdir(mountpoint) or os.listdir(mountpoint):
        raise ValueError(
            'Mountpoint %s is not an empty directory' % mountpoint)
|
|
|
|
|
|
def _is_mounted(mountpoint):
    """Tell whether the local path ``mountpoint`` is a mountpoint.

    Returns True when the ``mountpoint -q`` command exits with status zero
    and False when it exits with any other status.
    """
    # 'mountpoint' signals "not a mountpoint" through a non-zero exit status,
    # so the return code is inspected instead of raising on failure.
    result = subprocess.run(['mountpoint', '-q', mountpoint], check=False)
    return result.returncode == 0
|
|
|
|
|
|
def subcommand_is_mounted(arguments):
    """Print, as a JSON boolean, whether ``arguments.mountpoint`` is mounted."""
    mounted = _is_mounted(arguments.mountpoint)
    print(json.dumps(mounted))
|
|
|
|
|
|
def read_password():
    """Return everything available on stdin as the password.

    When stdin is a terminal nothing is read and an empty string is
    returned. The text is returned as read, including any line endings.
    """
    stream = sys.stdin
    if not stream.isatty():
        return ''.join(stream)

    return ''
|
|
|
|
|
|
def main():
    """Parse the command line and run the handler of the chosen sub command.

    The handler is the module-level function named ``subcommand_<name>``,
    where ``<name>`` is the sub command with dashes turned into underscores
    (``is-mounted`` maps to ``subcommand_is_mounted``).
    """
    arguments = parse_arguments()

    handler_name = 'subcommand_' + arguments.subcommand.replace('-', '_')
    handler = globals()[handler_name]
    handler(arguments)
|
|
|
|
|
|
# Run the command line interface only when executed as a script.
if __name__ == '__main__':
    main()
|