mirror of
https://github.com/freedombox/FreedomBox.git
synced 2026-03-11 09:04:54 +00:00
And add one bugfix to reset cfg.actions_dir after changing it Reviewed-by: James Valleroy <jvalleroy@mailbox.org>
153 lines
4.8 KiB
Python
Executable File
153 lines
4.8 KiB
Python
Executable File
#!/usr/bin/python3
|
|
# -*- mode: python -*-
|
|
#
|
|
# This file is part of FreedomBox.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
"""
|
|
Actions for sshfs.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
# Timeout in seconds for commands that could stall on a password prompt.
TIMEOUT = 5
|
|
|
|
|
|
class AlreadyMountedError(Exception):
    """Raised when the requested mountpoint is already mounted."""
|
|
|
|
|
|
def parse_arguments(argv=None):
    """Return the parsed command line arguments as an argparse.Namespace.

    Exactly one of the subcommands ``mount``, ``umount`` or ``is-mounted``
    must be given; its name is stored in the ``subcommand`` attribute.

    argv: optional list of argument strings to parse. When None (the
    default), argparse falls back to sys.argv[1:].
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')

    mount = subparsers.add_parser('mount', help='mount an ssh filesystem')
    mount.add_argument('--mountpoint', help='Local mountpoint', required=True)
    mount.add_argument('--path', help='Remote ssh path to mount',
                       required=True)
    umount = subparsers.add_parser('umount',
                                   help='unmount an ssh filesystem')
    umount.add_argument('--mountpoint', help='Mountpoint to unmount',
                        required=True)
    is_mounted = subparsers.add_parser(
        'is-mounted', help='Check whether an sshfs is mounted')
    is_mounted.add_argument('--mountpoint', help='Mountpoint to check',
                            required=True)

    # Reject an invocation that names no subcommand at all.
    subparsers.required = True
    return parser.parse_args(argv)
|
|
|
|
|
|
def get_env(arguments, read_input=True):
    """Build the environment dictionary handed to child processes.

    Returns a copy of os.environ. When read_input is true and a non-empty
    password is read from stdin, SSHPASS (the password) and BORG_RSH are
    added to the copy. The arguments parameter is accepted but not used.
    """
    environment = os.environ.copy()
    if not read_input:
        return environment

    password = read_password()
    if password:
        environment.update({
            'SSHPASS': password,
            'BORG_RSH': 'sshpass -e ssh -o StrictHostKeyChecking=no',
        })
    return environment
|
|
|
|
|
|
def subcommand_mount(arguments):
    """Mount a remote ssh filesystem on a local mountpoint using sshfs.

    Returns silently when the mountpoint is already mounted. Otherwise
    authenticates with the password found in SSHPASS (set by get_env from
    stdin) or, failing that, with the key file named by the SSHKEY
    environment variable.

    Raises ValueError when neither SSHPASS nor SSHKEY is available, and
    subprocess.CalledProcessError when sshfs exits with a non-zero status.
    Errors from validate_mountpoint other than AlreadyMountedError
    propagate unchanged.
    """
    try:
        validate_mountpoint(arguments.mountpoint)
    except AlreadyMountedError:
        return

    env = get_env(arguments)
    # Strip '~/' and '~' from the remote path (original note: the shell
    # would expand ~/ to the local home directory).
    remote_path = arguments.path.replace('~/', '').replace('~', '')
    # NOTE(review): these options disable host key verification.
    cmd = ['sshfs', remote_path, arguments.mountpoint, '-o',
           'UserKnownHostsFile=/dev/null', '-o', 'StrictHostKeyChecking=no']
    kwargs = {}
    timeout = None
    if 'SSHPASS' in env:
        # sshfs reads the password from its standard input.
        cmd += ['-o', 'password_stdin']
        kwargs['input'] = env['SSHPASS'].encode()
    elif 'SSHKEY' in env:
        # The command is run as an argument list without a shell, so a
        # literal '$SSHKEY' would never be expanded; pass the value itself.
        cmd += ['-o', 'IdentityFile=' + env['SSHKEY']]
        # Avoid hanging if the server ends up asking for a password.
        timeout = TIMEOUT
    else:
        raise ValueError('mount requires either SSHPASS or SSHKEY in env')
    subprocess.run(cmd, check=True, env=env, timeout=timeout, **kwargs)
|
|
|
|
|
|
def subcommand_umount(arguments):
    """Unmount the filesystem mounted at the given mountpoint."""
    umount_command = ['umount', arguments.mountpoint]
    run(umount_command)
|
|
|
|
|
|
def validate_mountpoint(mountpoint):
    """Ensure mountpoint is an empty, unmounted directory.

    The directory (including missing parents) is created when the path
    does not exist yet.

    Raises AlreadyMountedError when the path is already a mountpoint and
    ValueError when it exists but is not an empty directory.
    """
    if not os.path.exists(mountpoint):
        os.makedirs(mountpoint)
        return

    if _is_mounted(mountpoint):
        raise AlreadyMountedError('Mountpoint %s already mounted' %
                                  mountpoint)
    # Check isdir() before listdir(): listing a regular file raises
    # NotADirectoryError, which would replace the intended ValueError.
    if not os.path.isdir(mountpoint) or os.listdir(mountpoint):
        raise ValueError('Mountpoint %s is not an empty directory' %
                         mountpoint)
|
|
|
|
|
|
def _is_mounted(mountpoint):
    """Tell whether the given local path is a mountpoint.

    Delegates to the ``mountpoint`` utility, which exits with a non-zero
    status when it does not find a mountpoint at the path.
    """
    result = subprocess.run(['mountpoint', '-q', mountpoint], check=False)
    return result.returncode == 0
|
|
|
|
|
|
def subcommand_is_mounted(arguments):
    """Print whether the given mountpoint is mounted, as a JSON boolean."""
    mounted = _is_mounted(arguments.mountpoint)
    print(json.dumps(mounted))
|
|
|
|
|
|
def read_password():
    """Return the password supplied on stdin.

    Yields an empty string when stdin is a terminal; otherwise the whole
    of stdin is returned unmodified.
    """
    if not sys.stdin.isatty():
        return sys.stdin.read()
    return ''
|
|
|
|
|
|
def run(cmd, env=None, check=True):
    """Run cmd as a subprocess, giving up after TIMEOUT seconds.

    cmd is the argument list to execute, env the environment for the
    child process, and check is forwarded to subprocess.run(). Nothing
    is returned.
    """
    # The timeout keeps us from getting stuck if the remote server asks
    # for a password.
    options = {'check': check, 'env': env, 'timeout': TIMEOUT}
    subprocess.run(cmd, **options)
|
|
|
|
|
|
def main():
    """Parse the command line and dispatch to the chosen subcommand."""
    arguments = parse_arguments()

    # A dashed name such as 'is-mounted' maps to subcommand_is_mounted().
    handler_name = 'subcommand_' + arguments.subcommand.replace('-', '_')
    handler = globals()[handler_name]
    handler(arguments)
|
|
|
|
|
|
# Run main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|