#!/usr/bin/python3
# -*- mode: python -*-
#
# This file is part of Plinth.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
Configuration helper for the Tor service
"""

import argparse
import codecs
import os
import re
import socket
import sys
import time

from plinth import action_utils
from plinth.modules.tor import is_enabled, is_running, get_augeas, \
    get_real_apt_uri_path, iter_apt_uris, APT_TOR_PREFIX

SERVICE_FILE = '/etc/firewalld/services/tor-{0}.xml'
TOR_CONFIG = '/etc/tor/torrc'
TOR_STATE_FILE = '/var/lib/tor/state'
TOR_AUTH_COOKIE = '/var/run/tor/control.authcookie'

# Polling parameters used while waiting for Tor to publish information
# after a (re)start: number of attempts and seconds between attempts.
_MAX_TRIES = 12
_RETRY_INTERVAL = 10


def parse_arguments():
    """Return the parsed command line arguments as an argparse Namespace.

    A subcommand is mandatory; invoking the script without one ends with
    the usual argparse usage error.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', help='Sub command')
    # Set as an attribute (rather than the ``required=`` keyword, which
    # needs Python 3.7+) so that a missing subcommand is reported by
    # argparse instead of surfacing later as an AttributeError in main().
    subparsers.required = True

    subparsers.add_parser('setup', help='Setup Tor configuration')
    subparsers.add_parser('get-ports', help='Get list of Tor ports')
    subparsers.add_parser('get-hs', help='Get hidden service')

    configure = subparsers.add_parser('configure', help='Configure Tor')
    configure.add_argument('--service', choices=['enable', 'disable'],
                           help='Configure Tor service')
    configure.add_argument('--hidden-service', choices=['enable', 'disable'],
                           help='Configure hidden service')
    configure.add_argument('--apt-transport-tor',
                           choices=['enable', 'disable'],
                           help='Configure package download over Tor')

    return parser.parse_args()


def subcommand_setup(_):
    """Setup Tor configuration after installing it.

    Overwrites TOR_CONFIG with the default configuration, restarts Tor
    and refreshes the firewall service definitions.
    """
    # XXX: Performing this as a post-install step instead of
    # pre-install setup for now.  Creating a configuration before hand
    # leads dpkg to ask question about configuration overwrite which
    # makes aptcc backend of packagekit to wait forever even with
    # interactive=False.
    lines = """
# Run as non-exit bridge relay
SocksPort [::]:9050
SocksPort 0.0.0.0:9050
ORPort auto
ControlPort 9051
BridgeRelay 1
Exitpolicy reject *:*
Exitpolicy reject6 *:*

# Enable obfsproxy
ServerTransportPlugin obfs3,obfs4 exec /usr/bin/obfs4proxy
ExtORPort auto

# Enable transparent proxy
VirtualAddrNetworkIPv4 10.192.0.0/10
AutomapHostsOnResolve 1
TransPort 127.0.0.1:9040
TransPort [::1]:9040
DNSPort 127.0.0.1:9053
DNSPort [::1]:9053
"""
    with open(TOR_CONFIG, 'w') as conffile:
        conffile.write(lines)

    action_utils.service_restart('tor')
    _update_ports()


def subcommand_get_ports(_):
    """Print each known Tor port as a "name number" line."""
    for name, number in get_ports().items():
        print(name, number)


def subcommand_get_hs(_):
    """Print currently configured Tor hidden service information."""
    print(get_hidden_service())


def subcommand_configure(arguments):
    """Configure Tor according to the parsed 'configure' arguments.

    The order matters: the service is disabled first, then the hidden
    service configuration is edited, and only then is the service
    enabled, so that a single start picks up the final configuration.
    """
    if arguments.service == 'disable':
        _disable()

    # Only restart Tor for a hidden service change when the service state
    # itself is not being changed by this same invocation.
    restart = arguments.service is None
    if arguments.hidden_service == 'enable':
        _enable_hs(restart=restart)
    elif arguments.hidden_service == 'disable':
        _disable_hs(restart=restart)

    if arguments.service == 'enable':
        _enable()

    if arguments.apt_transport_tor == 'enable':
        _enable_apt_transport_tor()
    elif arguments.apt_transport_tor == 'disable':
        _disable_apt_transport_tor()


def get_ports():
    """Return dict mapping port names to port numbers (as strings).

    'orport' is obtained from the running Tor instance; the remaining
    entries come from the TransportProxy lines of TOR_STATE_FILE.  Any
    piece of information that is unavailable is simply left out.
    """
    ports = {}

    try:
        ports['orport'] = _get_orport()
    except Exception:
        # Deliberately best-effort: Tor may be stopped, still starting,
        # or have answered with something unparsable.
        pass

    try:
        with open(TOR_STATE_FILE, 'r') as state_file:
            for line in state_file:
                matches = re.match(
                    r'^\s*TransportProxy\s+(\S*)\s+\S+:(\d+)\s*$', line)
                if matches:
                    ports[matches.group(1)] = matches.group(2)
    except FileNotFoundError:
        pass

    return ports


def _get_orport():
    """Return the ORPort (as a string) by querying the running instance.

    Authenticates against the control port on localhost:9051 using the
    cookie stored in TOR_AUTH_COOKIE.

    Raises OSError when the cookie cannot be read or the control port is
    unreachable, and IndexError/AttributeError when the reply does not
    have the expected form.
    """
    with open(TOR_AUTH_COOKIE, 'rb') as cookie_file:
        cookie = cookie_file.read()

    cookie = codecs.encode(cookie, 'hex').decode()

    commands = '''AUTHENTICATE {cookie}
GETINFO net/listeners/or
QUIT
'''.format(cookie=cookie)

    # The context manager closes the socket even if connect/send/recv fail.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tor_socket:
        tor_socket.connect(('localhost', 9051))
        tor_socket.sendall(commands.encode())
        response = tor_socket.recv(1024)

    # The second reply line is the one parsed for the listener address;
    # the first is presumably the answer to AUTHENTICATE.
    line = response.split(b'\r\n')[1].decode()
    matches = re.match(r'.*="[^:]+:(\d+)"', line)
    return matches.group(1)


def get_hidden_service():
    """Return a string with configured Tor hidden service information.

    The result is one of:
      - '' when no hidden service is configured in TOR_CONFIG,
      - 'error' when TOR_CONFIG or the hidden service hostname file
        cannot be read,
      - '<hostname> <port>,<port>,...' otherwise.
    """
    hs_dir = None
    hs_ports = []

    try:
        with open(TOR_CONFIG, 'r') as conf_file:
            for line in conf_file:
                if line.startswith('HiddenServiceDir'):
                    hs_dir = line.split()[1]
                elif line.startswith('HiddenServicePort'):
                    hs_ports.append(line.split()[1])
    except FileNotFoundError:
        return 'error'

    if not hs_dir:
        return ''

    try:
        with open(os.path.join(hs_dir, 'hostname'), 'r') as conf_file:
            hs_hostname = conf_file.read().strip()
    except Exception:
        # The hostname file is absent until Tor has set the service up.
        return 'error'

    return hs_hostname + ' ' + ','.join(hs_ports)


def _enable():
    """Enable and start the service."""
    action_utils.service_enable('tor')
    _update_ports()


def _disable():
    """Disable and stop the service."""
    _disable_apt_transport_tor()
    action_utils.service_disable('tor')


def _enable_hs(restart=True):
    """Enable Tor hidden service.

    Appends a marked hidden service block to TOR_CONFIG unless a hidden
    service is already configured (or its state cannot be determined).
    When restart is true and Tor is enabled and running, Tor is restarted
    and the function waits, with a bounded number of polls, for the
    hidden service information to become available.
    """
    if get_hidden_service():
        return

    with open(TOR_CONFIG, 'r') as conffile:
        lines = conffile.readlines()

    # Make sure the appended block starts on a line of its own even when
    # the existing file does not end with a newline.
    if lines and not lines[-1].endswith('\n'):
        lines[-1] += '\n'

    lines.append('# Hidden Service configured by Plinth\n')
    lines.append('HiddenServiceDir /var/lib/tor/hidden_service/\n')
    lines.append('HiddenServicePort 80 127.0.0.1:80\n')
    lines.append('HiddenServicePort 443 127.0.0.1:443\n')
    lines.append('# end of Plinth Hidden Service config\n')

    with open(TOR_CONFIG, 'w') as conffile:
        conffile.writelines(lines)

    if restart and is_enabled() and is_running():
        action_utils.service_restart('tor')

        # wait until hidden service information is available
        for attempt in range(_MAX_TRIES):
            if attempt:
                time.sleep(_RETRY_INTERVAL)

            if get_hidden_service() not in ('', 'error'):
                break


def _disable_hs(restart=True):
    """Disable Tor hidden service.

    Removes the hidden service block previously written by _enable_hs()
    from TOR_CONFIG.  When restart is true and Tor is enabled and
    running, Tor is restarted afterwards.
    """
    if not get_hidden_service():
        return

    with open(TOR_CONFIG, 'r') as conffile:
        lines = conffile.readlines()

    filtered_lines = []
    removing = False
    for line in lines:
        if removing:
            if line.startswith('# end of Plinth Hidden Service config'):
                # last line of Plinth hidden service block
                # stop removing after this line
                removing = False
            elif not line.startswith('HiddenService'):
                # end of Plinth hidden service block
                # stop removing lines
                removing = False
                filtered_lines.append(line)
        else:
            if line.startswith('# Hidden Service configured by Plinth'):
                # start of Plinth hidden service block
                # remove following HiddenService lines
                removing = True
            else:
                filtered_lines.append(line)

    with open(TOR_CONFIG, 'w') as conffile:
        conffile.writelines(filtered_lines)

    if restart and is_enabled() and is_running():
        action_utils.service_restart('tor')


def _enable_apt_transport_tor():
    """Enable package download over Tor.

    Prefixes every http:// and https:// APT source URI with
    APT_TOR_PREFIX.  Exits with status 1 if the APT sources cannot be
    loaded.
    """
    try:
        aug = get_augeas()
    except Exception:
        # If there was an error, don't proceed
        print('Error: Unable to understand sources format.')
        sys.exit(1)

    for uri_path in iter_apt_uris(aug):
        uri_path = get_real_apt_uri_path(aug, uri_path)
        uri = aug.get(uri_path)
        if uri.startswith(('http://', 'https://')):
            aug.set(uri_path, APT_TOR_PREFIX + uri)

    aug.save()


def _disable_apt_transport_tor():
    """Disable package download over Tor.

    Strips APT_TOR_PREFIX from every APT source URI that carries it.  If
    the APT sources cannot be loaded, a warning is printed and nothing is
    changed, so that callers such as _disable() can still carry on.
    """
    try:
        aug = get_augeas()
    except Exception:
        # Without an Augeas instance there is nothing that can be edited.
        # Return instead of failing on an unbound variable below, which
        # would also stop the Tor service from being disabled.
        print('Warning: Unable to understand sources format.',
              file=sys.stderr)
        return

    for uri_path in iter_apt_uris(aug):
        uri_path = get_real_apt_uri_path(aug, uri_path)
        uri = aug.get(uri_path)
        if uri.startswith(APT_TOR_PREFIX):
            aug.set(uri_path, uri[len(APT_TOR_PREFIX):])

    aug.save()


def _update_ports():
    """Update firewall service information.

    Waits, with a bounded number of polls, until the 'orport', 'obfs3'
    and 'obfs4' ports are all known, then writes one firewalld service
    file per port and restarts firewalld.  Gives up silently if the
    ports never become available or the service directory is missing.
    """
    required = {'orport', 'obfs3', 'obfs4'}

    # port information may not be available immediately after Tor started
    ports = {}
    for attempt in range(_MAX_TRIES):
        if attempt:
            time.sleep(_RETRY_INTERVAL)

        ports = get_ports()
        if required <= ports.keys():
            break
    else:
        return

    # NOTE(review): template written as a firewalld service definition
    # (short name {0}, TCP port {1}); confirm against the upstream file.
    lines = """<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>Tor - {0}</short>
  <port protocol="tcp" port="{1}"/>
</service>
"""
    for name, number in ports.items():
        try:
            with open(SERVICE_FILE.format(name), 'w') as service_file:
                service_file.write(lines.format(name, number))
        except FileNotFoundError:
            # The firewalld services directory does not exist.
            return

    # XXX: We should ideally do firewalld reload instead.  However,
    # firewalld seems to fail to successfully reload sometimes.
    action_utils.service_restart('firewalld')


def main():
    """Parse arguments and perform all duties."""
    arguments = parse_arguments()

    # Map e.g. 'get-ports' to the subcommand_get_ports() handler.
    subcommand = arguments.subcommand.replace('-', '_')
    subcommand_method = globals()['subcommand_' + subcommand]
    subcommand_method(arguments)


if __name__ == '__main__':
    main()