FreedomBox/plinth/network.py
2015-05-21 23:42:11 +05:30

328 lines
11 KiB
Python

#
# This file is part of Plinth.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Helper functions for working with network manager.
"""
from gi.repository import GLib as glib
from gi.repository import NM as nm
import logging
import socket
import struct
import uuid
logger = logging.getLogger(__name__)
CONNECTION_TYPE_NAMES = {
'802-3-ethernet': 'Ethernet',
'802-11-wireless': 'Wi-Fi',
}
class ConnectionNotFound(Exception):
"""Network connection with a given name could not be found."""
pass
class DeviceNotFound(Exception):
"""Network device for specified operation could not be found."""
pass
def ipv4_string_to_int(address):
"""Return an integer equivalent of a string contain IPv4 address."""
return struct.unpack("=I", socket.inet_aton(address))[0]
def _callback(source_object, result, user_data):
"""Called when an operation is completed."""
del source_object # Unused
del result # Unused
del user_data # Unused
def _commit_callback(connection, error, data=None):
"""Called when the connection changes are committed."""
del connection
del error
del data
def get_connection_list():
"""Get a list of active and available connections."""
active_uuids = []
client = nm.Client.new(None)
for connection in client.get_active_connections():
active_uuids.append(connection.get_uuid())
connections = []
for connection in client.get_connections():
# Display a friendly type name if known.
connection_type = connection.get_connection_type()
connection_type = CONNECTION_TYPE_NAMES.get(connection_type,
connection_type)
connections.append({
'name': connection.get_id(),
'uuid': connection.get_uuid(),
'type': connection_type,
'is_active': connection.get_uuid() in active_uuids,
})
connections.sort(key=lambda connection: connection['is_active'],
reverse=True)
return connections
def get_connection(connection_uuid):
"""Return connection with matching uuid.
Raise ConnectionNotFound if a connection with that uuid is not
found.
"""
client = nm.Client.new(None)
connection = client.get_connection_by_uuid(connection_uuid)
if not connection:
raise ConnectionNotFound(connection_uuid)
return connection
def get_active_connection(connection_uuid):
"""Return active connection with matching UUID.
Raise ConnectionNotFound if a connection with that uuid is not
found.
"""
connections = nm.Client.new(None).get_active_connections()
connections = {connection.get_uuid(): connection
for connection in connections}
try:
return connections[connection_uuid]
except KeyError:
raise ConnectionNotFound(connection_uuid)
def _update_common_settings(connection, connection_uuid, name, type_, zone,
ipv4_method, ipv4_address):
"""Create/edit basic settings for network manager connections."""
if not connection:
connection = nm.SimpleConnection.new()
# Connection
settings = connection.get_setting_connection()
if not settings:
settings = nm.SettingConnection.new()
connection.add_setting(settings)
settings.set_property(nm.SETTING_CONNECTION_ID, name)
settings.set_property(nm.SETTING_CONNECTION_TYPE, type_)
settings.set_property(nm.SETTING_CONNECTION_ZONE, zone)
settings.set_property(nm.SETTING_CONNECTION_UUID, connection_uuid)
# IPv4
settings = connection.get_setting_ip4_config()
if not settings:
settings = nm.SettingIP4Config.new()
connection.add_setting(settings)
settings.set_property(nm.SETTING_IP_CONFIG_METHOD, ipv4_method)
if ipv4_method == nm.SETTING_IP4_CONFIG_METHOD_MANUAL and ipv4_address:
ipv4_address_int = ipv4_string_to_int(ipv4_address)
ipv4_prefix = nm.utils_ip4_get_default_prefix(ipv4_address_int)
address = nm.IPAddress.new(socket.AF_INET, ipv4_address, ipv4_prefix)
settings.add_address(address)
settings.set_property(nm.SETTING_IP_CONFIG_GATEWAY, '0.0.0.0')
else:
settings.clear_addresses()
return connection
def _update_ethernet_settings(connection, connection_uuid, name, zone,
ipv4_method, ipv4_address):
"""Create/edit ethernet settings for network manager connections."""
type_ = '802-3-ethernet'
connection = _update_common_settings(connection, connection_uuid, name,
type_, zone, ipv4_method, ipv4_address)
# Ethernet
settings = connection.get_setting_wired()
if not settings:
settings = nm.SettingWired.new()
connection.add_setting(settings)
return connection
def add_ethernet_connection(name, zone, ipv4_method, ipv4_address):
"""Add an automatic ethernet connection in network manager.
Return the UUID for the connection.
"""
connection_uuid = str(uuid.uuid4())
connection = _update_ethernet_settings(
None, connection_uuid, name, zone, ipv4_method, ipv4_address)
client = nm.Client.new(None)
client.add_connection_async(connection, True, None, _callback, None)
return connection_uuid
def edit_ethernet_connection(connection, name, zone, ipv4_method,
ipv4_address):
"""Edit an existing ethernet connection in network manager."""
_update_ethernet_settings(
connection, connection.get_uuid(), name, zone, ipv4_method,
ipv4_address)
connection.commit_changes(True)
def _update_wifi_settings(connection, connection_uuid, name, zone, ssid, mode,
auth_mode, passphrase, ipv4_method, ipv4_address):
"""Create/edit wifi settings for network manager connections."""
type_ = '802-11-wireless'
key_mgmt = 'wpa-psk'
connection = _update_common_settings(connection, connection_uuid, name,
type_, zone, ipv4_method, ipv4_address)
# Wireless
settings = connection.get_setting_wireless()
if not settings:
settings = nm.SettingWireless.new()
connection.add_setting(settings)
ssid_gbytes = glib.Bytes.new(ssid.encode())
settings.set_property(nm.SETTING_WIRELESS_SSID, ssid_gbytes)
settings.set_property(nm.SETTING_WIRELESS_MODE, mode)
# Wireless Security
if auth_mode == 'wpa' and passphrase:
settings = connection.get_setting_wireless_security()
if not settings:
settings = nm.SettingWirelessSecurity.new()
connection.add_setting(settings)
settings.set_property(nm.SETTING_WIRELESS_SECURITY_KEY_MGMT, key_mgmt)
settings.set_property(nm.SETTING_WIRELESS_SECURITY_PSK, passphrase)
else:
connection.remove_setting(nm.SettingWirelessSecurity)
return connection
def add_wifi_connection(name, zone, ssid, mode, auth_mode, passphrase,
ipv4_method, ipv4_address):
"""Add an automatic Wi-Fi connection in network manager.
Return the UUID for the connection.
"""
connection_uuid = str(uuid.uuid4())
connection = _update_wifi_settings(
None, connection_uuid, name, zone, ssid, mode, auth_mode, passphrase,
ipv4_method, ipv4_address)
client = nm.Client.new(None)
client.add_connection_async(connection, True, None, _callback, None)
return connection_uuid
def edit_wifi_connection(connection, name, zone,
ssid, mode, auth_mode, passphrase,
ipv4_method, ipv4_address):
"""Edit an existing wifi connection in network manager."""
_update_wifi_settings(
connection, connection.get_uuid(), name, zone, ssid, mode, auth_mode,
passphrase, ipv4_method, ipv4_address)
connection.commit_changes(True)
def activate_connection(connection_uuid):
"""Find and activate a network connection."""
# Find the connection
connection = get_connection(connection_uuid)
# Find a suitable device
client = nm.Client.new(None)
connection_type = connection.get_connection_type()
if connection_type == 'vpn':
for device in client.get_devices():
if device.get_state() == nm.DeviceState.ACTIVATED and \
device.get_managed():
break
else:
raise DeviceNotFound(connection)
else:
device_type = {
'802-11-wireless': nm.DeviceType.WIFI,
'802-3-ethernet': nm.DeviceType.ETHERNET,
'gsm': nm.DeviceType.MODEM,
}.get(connection_type, connection_type)
for device in client.get_devices():
logger.warn('Device - %s', device.get_hw_address())
if device.get_device_type() == device_type and \
device.get_state() == nm.DeviceState.DISCONNECTED:
break
else:
raise DeviceNotFound(connection)
client.activate_connection_async(connection, device, '/', None, _callback,
None)
return connection
def deactivate_connection(connection_uuid):
"""Find and de-activate a network connection."""
active_connection = get_active_connection(connection_uuid)
nm.Client.new(None).deactivate_connection(active_connection)
return active_connection
def delete_connection(connection_uuid):
"""Delete an exiting connection from network manager.
Raise ConnectionNotFound if connection does not exist.
"""
connection = get_connection(connection_uuid)
name = connection.get_id()
connection.delete()
return name
def wifi_scan():
"""Scan for available access points across all Wi-Fi devices."""
access_points = []
for device in nm.Client.new(None).get_devices():
if device.get_device_type() != nm.DeviceType.WIFI:
continue
for access_point in device.get_access_points():
# Retrieve the bytes in SSID. Don't convert to utf-8 or
# escape it in any way as it may contain null bytes. When
# this is used in the URL it will be escaped properly and
# unescaped when taken as view function's argument.
ssid = access_point.get_ssid()
ssid_string = ssid.get_data() if ssid else ''
access_points.append({
'ssid': ssid_string,
'strength': access_point.get_strength()})
return access_points