Viewing file: p2p_connect.py (7.44 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#!/usr/bin/python # Tests p2p_connect # Will try to connect to another peer # and form a group ######### MAY NEED TO RUN AS SUDO #############
import dbus import sys, os import time import gobject import getopt from dbus.mainloop.glib import DBusGMainLoop
def usage(): print "Usage:" print " %s -i <interface_name> -m <wps_method> \ " \ % sys.argv[0] print " -a <addr> [-p <pin>] [-g <go_intent>] \ " print " [-w <wpas_dbus_interface>]" print "Options:" print " -i = interface name" print " -m = wps method" print " -a = peer address" print " -p = pin number (8 digits)" print " -g = group owner intent" print " -w = wpas dbus interface = fi.w1.wpa_supplicant1" print "Example:" print " %s -i wlan0 -a 0015008352c0 -m display -p 12345670" % sys.argv[0]
# Required Signals def GONegotiationSuccess(status): print "Go Negotiation Success"
def GONegotiationFailure(status): print 'Go Negotiation Failed. Status:' print format(status) os._exit(0)
def GroupStarted(properties): if properties.has_key("group_object"): print 'Group Formation Complete %s' \ % properties["group_object"] os._exit(0)
def WpsFailure(status, etc): print "WPS Authentication Failure".format(status) print etc os._exit(0)
class P2P_Connect(): # Needed Variables global bus global wpas_object global interface_object global p2p_interface global ifname global wpas global wpas_dbus_interface global timeout global path global wps_method global go_intent global addr global pin
# Dbus Paths global wpas_dbus_opath global wpas_dbus_interfaces_opath global wpas_dbus_interfaces_interface global wpas_dbus_interfaces_p2pdevice
# Dictionary of Arguements global p2p_connect_arguements
# Constructor def __init__(self,ifname,wpas_dbus_interface,addr, pin,wps_method,go_intent): # Initializes variables and threads self.ifname = ifname self.wpas_dbus_interface = wpas_dbus_interface self.wps_method = wps_method self.go_intent = go_intent self.addr = addr self.pin = pin
# Generating interface/object paths self.wpas_dbus_opath = \ "/" + self.wpas_dbus_interface.replace(".","/") self.wpas_wpas_dbus_interfaces_opath = \ self.wpas_dbus_opath + "/Interfaces" self.wpas_dbus_interfaces_interface = \ self.wpas_dbus_interface + ".Interface" self.wpas_dbus_interfaces_p2pdevice = \ self.wpas_dbus_interfaces_interface + ".P2PDevice"
# Getting interfaces and objects DBusGMainLoop(set_as_default=True) self.bus = dbus.SystemBus() self.wpas_object = self.bus.get_object( self.wpas_dbus_interface, self.wpas_dbus_opath) self.wpas = dbus.Interface( self.wpas_object, self.wpas_dbus_interface)
# See if wpa_supplicant already knows about this interface self.path = None try: self.path = self.wpas.GetInterface(ifname) except: if not str(exc).startswith( self.wpas_dbus_interface + \ ".InterfaceUnknown:"): raise exc try: path = self.wpas.CreateInterface( {'Ifname': ifname, 'Driver': 'test'}) time.sleep(1)
except dbus.DBusException, exc: if not str(exc).startswith( self.wpas_dbus_interface + \ ".InterfaceExists:"): raise exc
# Get Interface and objects self.interface_object = self.bus.get_object( self.wpas_dbus_interface,self.path) self.p2p_interface = dbus.Interface( self.interface_object, self.wpas_dbus_interfaces_p2pdevice)
# Add signals self.bus.add_signal_receiver(GONegotiationSuccess, dbus_interface=self.wpas_dbus_interfaces_p2pdevice, signal_name="GONegotiationSuccess") self.bus.add_signal_receiver(GONegotiationFailure, dbus_interface=self.wpas_dbus_interfaces_p2pdevice, signal_name="GONegotiationFailure") self.bus.add_signal_receiver(GroupStarted, dbus_interface=self.wpas_dbus_interfaces_p2pdevice, signal_name="GroupStarted") self.bus.add_signal_receiver(WpsFailure, dbus_interface=self.wpas_dbus_interfaces_p2pdevice, signal_name="WpsFailed")
#Constructing all the arguements needed to connect def constructArguements(self): # Adding required arguements self.p2p_connect_arguements = {'wps_method':self.wps_method, 'peer':dbus.ObjectPath(self.path+'/Peers/'+self.addr)}
# Display requires a pin, and a go intent of 15 if (self.wps_method == 'display'): if (self.pin != None): self.p2p_connect_arguements.update({'pin':self.pin}) else: print "Error:\n Pin required for wps_method=display" usage() quit()
if (self.go_intent != None and int(self.go_intent) != 15): print "go_intent overwritten to 15"
self.go_intent = '15'
# Keypad requires a pin, and a go intent of less than 15 elif (self.wps_method == 'keypad'): if (self.pin != None): self.p2p_connect_arguements.update({'pin':self.pin}) else: print "Error:\n Pin required for wps_method=keypad" usage() quit()
if (self.go_intent != None and int(self.go_intent) == 15): error = "Error :\n Group Owner intent cannot be" + \ " 15 for wps_method=keypad" print error usage() quit()
# Doesn't require pin # for ./wpa_cli, p2p_connect [mac] [pin#], wps_method=keypad elif (self.wps_method == 'pin'): if (self.pin != None): print "pin ignored"
# No pin is required for pbc so it is ignored elif (self.wps_method == 'pbc'): if (self.pin != None): print "pin ignored"
else: print "Error:\n wps_method not supported or does not exist" usage() quit()
# Go_intent is optional for all arguements if (self.go_intent != None): self.p2p_connect_arguements.update( {'go_intent':dbus.Int32(self.go_intent)})
# Running p2p_connect def run(self): try: result_pin = self.p2p_interface.Connect( self.p2p_connect_arguements)
except dbus.DBusException, exc: raise exc
if (self.wps_method == 'pin' and \ not self.p2p_connect_arguements.has_key('pin') ): print "Connect return with pin value of %d " % int(result_pin) gobject.MainLoop().run()
if __name__ == "__main__":
# Required interface_name = None wps_method = None addr = None
# Conditionally optional pin = None
# Optional wpas_dbus_interface = 'fi.w1.wpa_supplicant1' go_intent = None
# Using getopts to handle options try: options, args = getopt.getopt(sys.argv[1:],"hi:m:a:p:g:w:")
except getopt.GetoptError: usage() quit()
# If theres a switch, override default option for key, value in options: # Help if (key == "-h"): usage() quit() # Interface Name elif (key == "-i"): interface_name = value # WPS Method elif (key == "-m"): wps_method = value # Address elif (key == "-a"): addr = value # Pin elif (key == "-p"): pin = value # Group Owner Intent elif (key == "-g"): go_intent = value # Dbus interface elif (key == "-w"): wpas_dbus_interface = value else: assert False, "unhandled option"
# Required Arguements check if (interface_name == None or wps_method == None or addr == None): print "Error:\n Required arguements not specified" usage() quit()
# Group Owner Intent Check if (go_intent != None and (int(go_intent) > 15 or int(go_intent) < 0) ): print "Error:\n Group Owner Intent must be between 0 and 15 inclusive" usage() quit()
# Pin Check if (pin != None and len(pin) != 8): print "Error:\n Pin is not 8 digits" usage() quit()
try: p2p_connect_test = P2P_Connect(interface_name,wpas_dbus_interface, addr,pin,wps_method,go_intent)
except: print "Error:\n Invalid Arguements" usage() quit()
p2p_connect_test.constructArguements() p2p_connect_test.run()
os._exit(0)
|