p2p_connector.wifi_direct_connector
This module provides functions for connecting to a target p2p device using the application 'wpa_cli'.
"""
This module provides functions for connecting to a target p2p device using the application 'wpa_cli'.
"""
import argparse
import re
import shlex
import subprocess
import sys
import time
from typing import Optional


class P2PPeer:
    """
    This class represents a p2p peer.
    """

    def __init__(self, device_name: str, mac_address: str, status: int):
        self.device_name = device_name
        self.mac_address = mac_address
        self.status = status

    def __str__(self):
        return f"{self.device_name} - {self.mac_address}"

    def __repr__(self):
        return self.__str__()


class WifiDirectConnector:
    """
    This class provides methods for establishing a WiFi-Direct connection with an Android device.
    Every wpa_cli invocation goes through execute_wpa_cli_command, so failures surface as ValueError.
    """
    WPA_CLI_LOCATION = "/usr/sbin/wpa_cli"
    WPA_CLI_LISTEN = "p2p_listen"
    WPA_CLI_P2P_PEERS = "p2p_peers"
    WPA_CLI_P2P_PEER_DETAILS = "p2p_peer {mac_address}"
    WPA_CLI_HELP = "-h"
    P2P_FIND_COMMAND = f"{WPA_CLI_LOCATION} -i p2p-dev-wlan0 p2p_find"
    P2P_GROUP_REMOVE_COMMAND = "-ip2p-dev-wlan0 p2p_group_remove {interface_name}"
    GET_WIFI_INTERFACE = "ip -br link | grep -Po 'p2p-wlan0-\\d+'"
    P2P_CONNECT = "p2p_connect {target_device_mac_addr} pbc"

    def __init__(self):
        self.check_wpa_cli_available()

    def execute_wpa_cli_command(self, command: str, ignore_status_code: Optional[int] = None) -> str:
        """
        Executes a wpa_cli command and returns its standard output.
        @param command: argument(s) as str; split into an argument vector using shell-like quoting rules
        @param ignore_status_code: optional argument - if set, no error is raised when the status code is
                                   non-zero but equal to 'ignore_status_code'
        @return: decoded standard output of the command
        @raise ValueError: if wpa_cli cannot be started or exits with an unexpected status code
        """
        full_command = f"{self.WPA_CLI_LOCATION} {command}"
        # An argument vector avoids a shell round trip; no shell features are needed for wpa_cli arguments.
        arguments = [self.WPA_CLI_LOCATION, *shlex.split(command)]
        try:
            result = subprocess.run(arguments, capture_output=True)
        except OSError as error:
            # Keep the documented contract: a missing/unstartable binary is reported as ValueError.
            raise ValueError(f'Error occurred while executing command "{full_command}": {error}') from error
        if result.returncode not in (0, ignore_status_code):
            error_output = result.stderr.decode(errors="replace").strip()
            raise ValueError(f'Error occurred while executing command "{full_command}": {error_output}')
        # Output may contain peer-supplied text, so undecodable bytes are replaced instead of raising.
        return result.stdout.decode(errors="replace")

    def check_wpa_cli_available(self):
        """
        Checks whether the application 'wpa_cli' is installed by running it with the help flag.
        @raise ValueError: raised if it is not installed or the status code is not equal to 0
        """
        self.execute_wpa_cli_command(self.WPA_CLI_HELP)

    def remove_p2p_group(self, wait_seconds: float = 10):
        """
        Removes all p2p groups, i.e. one removal per interface reported by GET_WIFI_INTERFACE.
        @param wait_seconds: time in seconds to wait after at least one group was removed
        @raise ValueError: if a wpa_cli removal command fails
        """
        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
        # grep prints one interface per line; split() also drops the trailing newline.
        interface_names = result.stdout.decode().split()
        for interface_name in interface_names:
            self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=interface_name))
        if interface_names:
            print('removing p2p group...')
            time.sleep(wait_seconds)

    def remove_orphan_p2p_groups(self, wait_seconds: float = 10):
        """
        Removes orphan p2p groups: groups without members or whose members are all not connected.
        @param wait_seconds: time in seconds to wait after at least one group was removed
        @raise ValueError: if a wpa_cli command fails
        """
        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
        removed_any = False
        for interface_name in result.stdout.decode().split():
            devices = self.get_available_devices(interface_name=interface_name)
            if not devices:
                print(f'group with interface "{interface_name}" has no members.')
            elif not any(self.check_already_connected(device.device_name) for device in devices):
                print(f'group with interface "{interface_name}" has only inactive members: {devices}.')
            else:
                # At least one member is connected: keep this group.
                continue
            # Only the orphaned group is removed; other groups stay untouched.
            self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=interface_name))
            removed_any = True
        if removed_any:
            print('removing p2p group...')
            time.sleep(wait_seconds)

    def make_device_visible(self):
        """
        Executes 'wpa_cli p2p_listen' to make the device visible.
        """
        self.execute_wpa_cli_command(self.WPA_CLI_LISTEN)

    def get_available_devices(self, interface_name: Optional[str] = None) -> list[P2PPeer]:
        """
        Gets all available target devices.
        @param interface_name: optional interface handed to wpa_cli via '-i' for the peer listing
        @return: list containing all target devices; peers whose details cannot be parsed are skipped
        @raise ValueError: if a wpa_cli call fails (status code 255 of the peer listing is tolerated)
        """
        interface_command = '' if interface_name is None else f'-i {interface_name} '
        # NOTE(review): 255 is presumably what wpa_cli returns when no peers are known - confirm.
        listing = self.execute_wpa_cli_command(f'{interface_command}{self.WPA_CLI_P2P_PEERS}',
                                               ignore_status_code=255)
        mac_pattern = re.compile(r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}")
        mac_addresses = [line for line in listing.splitlines() if mac_pattern.fullmatch(line)]

        available_peers: list[P2PPeer] = []
        for mac_address in mac_addresses:
            details = self.execute_wpa_cli_command(self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=mac_address))
            name_match = re.search(r'^device_name=(.*)$', details, re.MULTILINE)
            status_match = re.search(r'^status=(-?\d+)\s*$', details, re.MULTILINE)
            if name_match is None or status_match is None:
                # The peer may have vanished between the listing and the detail query.
                continue
            available_peers.append(P2PPeer(name_match.group(1), mac_address, int(status_match.group(1))))
        return available_peers

    def connect_to_device_if_available(self, device_name: str, attempts: int = 30, time_to_sleep: int = 2) -> bool:
        """
        Waits for an incoming connection request from the target device and connects to it.
        @param device_name: target device name
        @param attempts: how many attempts should be used to wait for the connection request
        @param time_to_sleep: time in seconds to wait between attempts
        @return: True if a connection was initiated, False if all attempts were used up
        """
        for attempt in range(attempts):
            status_text = f'\rwaiting for connection request from {device_name}. '
            for target_device in self.get_available_devices():
                if target_device.device_name != device_name:
                    continue
                # found device; status 1 is treated as "got connection request"
                if target_device.status == 1:
                    self.connect_to_target_device(target_device)
                    return True
                status_text = status_text + 'Found device, but waiting for connection_request'
            # No newline is written, so flush explicitly to make the progress line show up.
            print(status_text, end='', flush=True)
            if attempt < attempts - 1:
                time.sleep(time_to_sleep)
        return False

    def connect_to_target_device(self, device: P2PPeer):
        """
        Connects to a target device.
        @param device: target device as P2PPeer object
        @raise ValueError: if the wpa_cli command fails
        """
        self.execute_wpa_cli_command(self.P2P_CONNECT.format(target_device_mac_addr=device.mac_address))

    def check_already_connected(self, device_name: str) -> bool:
        """
        Checks whether a target device is already connected to this device.
        @param device_name: device name of target device as str
        @return: True if the first peer with that name reports a non-zero interface address, else False
        """
        for target_device in self.get_available_devices():
            if target_device.device_name != device_name:
                continue
            details = self.execute_wpa_cli_command(
                self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=target_device.mac_address))
            address_match = re.search(r'^interface_addr=(.*)$', details, re.MULTILINE)
            # A missing field is treated as "not connected" instead of crashing.
            return address_match is not None and address_match.group(1).strip() != '00:00:00:00:00:00'
        return False


def main() -> None:
    """
    Parses the command line and either removes the p2p group or waits for the target device.
    """
    parser = argparse.ArgumentParser(description='Provides an Interface to set up a P2P Server.')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--target', help="Set name of target device")
    group.add_argument('--remove-group', action='store_true', help="remove p2p group of this device")
    args = parser.parse_args()

    wifi_direct_conn = WifiDirectConnector()
    # Orphan groups are cleaned up once, before either action.
    wifi_direct_conn.remove_orphan_p2p_groups()

    if args.remove_group:
        print('removing p2p group')
        wifi_direct_conn.remove_p2p_group()
        return

    target_device_name = args.target
    if wifi_direct_conn.check_already_connected(target_device_name):
        print(f'device "{target_device_name}" is already connected')
        sys.exit(0)

    wifi_direct_conn.make_device_visible()
    wifi_direct_conn.connect_to_device_if_available(target_device_name)


if __name__ == "__main__":
    main()
class P2PPeer:
    """
    Represents one p2p peer by its device name, MAC address and status value.
    """

    def __init__(self, device_name: str, mac_address: str, status: int):
        self.device_name, self.mac_address, self.status = device_name, mac_address, status

    def __str__(self):
        # Rendered as "<device name> - <MAC address>"; the status is not part of the text.
        return "{} - {}".format(self.device_name, self.mac_address)

    def __repr__(self):
        # The debug representation is the same text as the string form.
        return str(self)
This class represents a p2p peer.
class WifiDirectConnector:
    """
    This class provides methods for establishing a WiFi-Direct connection with an Android device.
    Every wpa_cli invocation goes through execute_wpa_cli_command, so failures surface as ValueError.
    """
    WPA_CLI_LOCATION = "/usr/sbin/wpa_cli"
    WPA_CLI_LISTEN = "p2p_listen"
    WPA_CLI_P2P_PEERS = "p2p_peers"
    WPA_CLI_P2P_PEER_DETAILS = "p2p_peer {mac_address}"
    WPA_CLI_HELP = "-h"
    P2P_FIND_COMMAND = f"{WPA_CLI_LOCATION} -i p2p-dev-wlan0 p2p_find"
    P2P_GROUP_REMOVE_COMMAND = "-ip2p-dev-wlan0 p2p_group_remove {interface_name}"
    GET_WIFI_INTERFACE = "ip -br link | grep -Po 'p2p-wlan0-\\d+'"
    P2P_CONNECT = "p2p_connect {target_device_mac_addr} pbc"

    def __init__(self):
        self.check_wpa_cli_available()

    def execute_wpa_cli_command(self, command: str, ignore_status_code: Optional[int] = None) -> str:
        """
        Executes a wpa_cli command and returns its standard output.
        @param command: argument(s) as str; split into an argument vector using shell-like quoting rules
        @param ignore_status_code: optional argument - if set, no error is raised when the status code is
                                   non-zero but equal to 'ignore_status_code'
        @return: decoded standard output of the command
        @raise ValueError: if wpa_cli cannot be started or exits with an unexpected status code
        """
        import shlex  # local import: keeps this class usable without touching the module import block

        full_command = f"{self.WPA_CLI_LOCATION} {command}"
        # An argument vector avoids a shell round trip; no shell features are needed for wpa_cli arguments.
        arguments = [self.WPA_CLI_LOCATION, *shlex.split(command)]
        try:
            result = subprocess.run(arguments, capture_output=True)
        except OSError as error:
            # Keep the documented contract: a missing/unstartable binary is reported as ValueError.
            raise ValueError(f'Error occurred while executing command "{full_command}": {error}') from error
        if result.returncode not in (0, ignore_status_code):
            error_output = result.stderr.decode(errors="replace").strip()
            raise ValueError(f'Error occurred while executing command "{full_command}": {error_output}')
        # Output may contain peer-supplied text, so undecodable bytes are replaced instead of raising.
        return result.stdout.decode(errors="replace")

    def check_wpa_cli_available(self):
        """
        Checks whether the application 'wpa_cli' is installed by running it with the help flag.
        @raise ValueError: raised if it is not installed or the status code is not equal to 0
        """
        self.execute_wpa_cli_command(self.WPA_CLI_HELP)

    def remove_p2p_group(self, wait_seconds: float = 10):
        """
        Removes all p2p groups, i.e. one removal per interface reported by GET_WIFI_INTERFACE.
        @param wait_seconds: time in seconds to wait after at least one group was removed
        @raise ValueError: if a wpa_cli removal command fails
        """
        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
        # grep prints one interface per line; split() also drops the trailing newline.
        interface_names = result.stdout.decode().split()
        for interface_name in interface_names:
            self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=interface_name))
        if interface_names:
            print('removing p2p group...')
            time.sleep(wait_seconds)

    def remove_orphan_p2p_groups(self, wait_seconds: float = 10):
        """
        Removes orphan p2p groups: groups without members or whose members are all not connected.
        @param wait_seconds: time in seconds to wait after at least one group was removed
        @raise ValueError: if a wpa_cli command fails
        """
        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
        removed_any = False
        for interface_name in result.stdout.decode().split():
            devices = self.get_available_devices(interface_name=interface_name)
            if not devices:
                print(f'group with interface "{interface_name}" has no members.')
            elif not any(self.check_already_connected(device.device_name) for device in devices):
                print(f'group with interface "{interface_name}" has only inactive members: {devices}.')
            else:
                # At least one member is connected: keep this group.
                continue
            # Only the orphaned group is removed; other groups stay untouched.
            self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=interface_name))
            removed_any = True
        if removed_any:
            print('removing p2p group...')
            time.sleep(wait_seconds)

    def make_device_visible(self):
        """
        Executes 'wpa_cli p2p_listen' to make the device visible.
        """
        self.execute_wpa_cli_command(self.WPA_CLI_LISTEN)

    def get_available_devices(self, interface_name: Optional[str] = None) -> "list[P2PPeer]":
        """
        Gets all available target devices.
        @param interface_name: optional interface handed to wpa_cli via '-i' for the peer listing
        @return: list containing all target devices; peers whose details cannot be parsed are skipped
        @raise ValueError: if a wpa_cli call fails (status code 255 of the peer listing is tolerated)
        """
        interface_command = '' if interface_name is None else f'-i {interface_name} '
        # NOTE(review): 255 is presumably what wpa_cli returns when no peers are known - confirm.
        listing = self.execute_wpa_cli_command(f'{interface_command}{self.WPA_CLI_P2P_PEERS}',
                                               ignore_status_code=255)
        mac_pattern = re.compile(r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}")
        mac_addresses = [line for line in listing.splitlines() if mac_pattern.fullmatch(line)]

        available_peers = []
        for mac_address in mac_addresses:
            details = self.execute_wpa_cli_command(self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=mac_address))
            name_match = re.search(r'^device_name=(.*)$', details, re.MULTILINE)
            status_match = re.search(r'^status=(-?\d+)\s*$', details, re.MULTILINE)
            if name_match is None or status_match is None:
                # The peer may have vanished between the listing and the detail query.
                continue
            available_peers.append(P2PPeer(name_match.group(1), mac_address, int(status_match.group(1))))
        return available_peers

    def connect_to_device_if_available(self, device_name: str, attempts: int = 30, time_to_sleep: int = 2) -> bool:
        """
        Waits for an incoming connection request from the target device and connects to it.
        @param device_name: target device name
        @param attempts: how many attempts should be used to wait for the connection request
        @param time_to_sleep: time in seconds to wait between attempts
        @return: True if a connection was initiated, False if all attempts were used up
        """
        for attempt in range(attempts):
            status_text = f'\rwaiting for connection request from {device_name}. '
            for target_device in self.get_available_devices():
                if target_device.device_name != device_name:
                    continue
                # found device; status 1 is treated as "got connection request"
                if target_device.status == 1:
                    self.connect_to_target_device(target_device)
                    return True
                status_text = status_text + 'Found device, but waiting for connection_request'
            # No newline is written, so flush explicitly to make the progress line show up.
            print(status_text, end='', flush=True)
            if attempt < attempts - 1:
                time.sleep(time_to_sleep)
        return False

    def connect_to_target_device(self, device: "P2PPeer"):
        """
        Connects to a target device.
        @param device: target device as P2PPeer object
        @raise ValueError: if the wpa_cli command fails
        """
        self.execute_wpa_cli_command(self.P2P_CONNECT.format(target_device_mac_addr=device.mac_address))

    def check_already_connected(self, device_name: str) -> bool:
        """
        Checks whether a target device is already connected to this device.
        @param device_name: device name of target device as str
        @return: True if the first peer with that name reports a non-zero interface address, else False
        """
        for target_device in self.get_available_devices():
            if target_device.device_name != device_name:
                continue
            details = self.execute_wpa_cli_command(
                self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=target_device.mac_address))
            address_match = re.search(r'^interface_addr=(.*)$', details, re.MULTILINE)
            # A missing field is treated as "not connected" instead of crashing.
            return address_match is not None and address_match.group(1).strip() != '00:00:00:00:00:00'
        return False
This class provides methods for establishing a WiFi-Direct connection with an Android device.
def execute_wpa_cli_command(self, command: str, ignore_status_code: Optional[int] = None) -> str:
    """
    Executes a wpa_cli command and returns its standard output.
    @param command: argument(s) as str; split into an argument vector using shell-like quoting rules
    @param ignore_status_code: optional argument - if set, no error is raised when the status code is
                               non-zero but equal to 'ignore_status_code'
    @return: decoded standard output of the command
    @raise ValueError: if wpa_cli cannot be started or exits with an unexpected status code
    """
    import shlex  # local import: keeps this snippet usable without touching the module import block

    full_command = f"{self.WPA_CLI_LOCATION} {command}"
    # An argument vector avoids a shell round trip; no shell features are needed for wpa_cli arguments.
    arguments = [self.WPA_CLI_LOCATION, *shlex.split(command)]
    try:
        result = subprocess.run(arguments, capture_output=True)
    except OSError as error:
        # Keep the documented contract: a missing/unstartable binary is reported as ValueError.
        raise ValueError(f'Error occurred while executing command "{full_command}": {error}') from error
    if result.returncode not in (0, ignore_status_code):
        # Decode stderr so the message shows readable text instead of a bytes repr.
        error_output = result.stderr.decode(errors="replace").strip()
        raise ValueError(f'Error occurred while executing command "{full_command}": {error_output}')
    # Output may contain peer-supplied text, so undecodable bytes are replaced instead of raising.
    return result.stdout.decode(errors="replace")
Executes a wpa_cli command. @param command: argument(s) as str @param ignore_status_code: optional argument - if set, no error is raised when the status code is non-zero but equal to 'ignore_status_code'
def check_wpa_cli_available(self):
    """
    Verifies that the application 'wpa_cli' can be run by invoking it with the help flag.
    @raise ValueError: raised if it is not installed or the status code is not equal to 0
    """
    help_argument = self.WPA_CLI_HELP
    # The output is irrelevant here; only a failing call (reported as an exception) matters.
    self.execute_wpa_cli_command(help_argument)
Checks whether the application 'wpa_cli' is installed. @raise ValueError: raised if it is not installed or the status code is not equal to 0
def remove_p2p_group(self, wait_seconds: float = 10):
    """
    Removes all p2p groups, i.e. one removal per interface reported by GET_WIFI_INTERFACE.
    @param wait_seconds: time in seconds to wait after at least one group was removed
    @raise ValueError: if a wpa_cli removal command fails
    """
    result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
    # grep prints one interface per line; split() drops the trailing newline and separates
    # multiple interfaces so that each one gets its own, well-formed removal command.
    interface_names = result.stdout.decode().split()
    for interface_name in interface_names:
        self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=interface_name))
    if interface_names:
        print('removing p2p group...')
        time.sleep(wait_seconds)
removes all p2p groups
def remove_orphan_p2p_groups(self, wait_seconds: float = 10):
    """
    Removes orphan p2p groups: groups without members or whose members are all not connected.
    @param wait_seconds: time in seconds to wait after at least one group was removed
    @raise ValueError: if a wpa_cli command fails
    """
    result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
    removed_any = False
    # One interface per output line; each group is judged (and removed) on its own.
    for interface_name in result.stdout.decode().split():
        devices = self.get_available_devices(interface_name=interface_name)
        if not devices:
            print(f'group with interface "{interface_name}" has no members.')
        elif not any(self.check_already_connected(device.device_name) for device in devices):
            print(f'group with interface "{interface_name}" has only inactive members: {devices}.')
        else:
            # At least one member is connected: keep this group.
            continue
        # Only the orphaned group is removed; other groups stay untouched.
        self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=interface_name))
        removed_any = True
    if removed_any:
        print('removing p2p group...')
        time.sleep(wait_seconds)
removes orphan p2p groups
def make_device_visible(self):
    """
    Runs the wpa_cli listen command (WPA_CLI_LISTEN) so that this device becomes visible.
    """
    listen_argument = self.WPA_CLI_LISTEN
    self.execute_wpa_cli_command(listen_argument)
Executes 'wpa_cli p2p_listen' to make the device visible.
def get_available_devices(self, interface_name: Optional[str] = None) -> "list[P2PPeer]":
    """
    Gets all available target devices.
    @param interface_name: optional interface handed to wpa_cli via '-i' for the peer listing
    @return: list containing all target devices; peers whose details cannot be parsed are skipped
    @raise ValueError: if a wpa_cli call fails (status code 255 of the peer listing is tolerated)
    """
    interface_command = '' if interface_name is None else f'-i {interface_name} '
    # NOTE(review): 255 is presumably what wpa_cli returns when no peers are known - confirm.
    listing = self.execute_wpa_cli_command(f'{interface_command}{self.WPA_CLI_P2P_PEERS}',
                                           ignore_status_code=255)
    # Only lines that consist of nothing but a MAC address identify a peer.
    mac_pattern = re.compile(r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}")
    mac_addresses = [line for line in listing.splitlines() if mac_pattern.fullmatch(line)]

    available_peers = []
    for mac_address in mac_addresses:
        # Go through the shared helper (configured binary, uniform ValueError) instead of a
        # hard-coded wpa_cli path.
        details = self.execute_wpa_cli_command(self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=mac_address))
        name_match = re.search(r'^device_name=(.*)$', details, re.MULTILINE)
        status_match = re.search(r'^status=(-?\d+)\s*$', details, re.MULTILINE)
        if name_match is None or status_match is None:
            # The peer may have vanished between the listing and the detail query.
            continue
        available_peers.append(P2PPeer(name_match.group(1), mac_address, int(status_match.group(1))))
    return available_peers
Gets all available target devices. @return: list containing all target devices
def connect_to_device_if_available(self, device_name: str, attempts: int = 30, time_to_sleep: int = 2) -> bool:
    """
    Waits for an incoming connection request from the target device and connects to it.
    @param device_name: target device name
    @param attempts: how many attempts should be used to wait for the connection request
    @param time_to_sleep: time in seconds to wait between attempts
    @return: True if a connection was initiated, False if all attempts were used up
    """
    for attempt in range(attempts):
        status_text = f'\rwaiting for connection request from {device_name}. '
        for target_device in self.get_available_devices():
            if target_device.device_name != device_name:
                continue
            # found device; status 1 is treated as "got connection request"
            if target_device.status == 1:
                self.connect_to_target_device(target_device)
                return True
            status_text = status_text + 'Found device, but waiting for connection_request'
        # No newline is written, so flush explicitly to make the progress line show up.
        print(status_text, end='', flush=True)
        # Sleeping after the final attempt would only delay the negative result.
        if attempt < attempts - 1:
            time.sleep(time_to_sleep)
    return False
Waits for an incoming connection request from the target device and connects to it. @param device_name: target device name @param attempts: how many attempts should be used to wait for the connection request @param time_to_sleep: time in seconds to wait between attempts
def connect_to_target_device(self, device: "P2PPeer"):
    """
    Starts the connection to the given peer by issuing the P2P_CONNECT command for its MAC address.
    @param device: target device as P2PPeer object
    """
    connect_command = self.P2P_CONNECT.format(target_device_mac_addr=device.mac_address)
    self.execute_wpa_cli_command(connect_command)
Connects to a target device. @param device: target device as P2PPeer object
def check_already_connected(self, device_name: str) -> bool:
    """
    Checks whether a target device is already connected to this device.
    @param device_name: device name of target device as str
    @return: True if the first peer with that name reports a non-zero interface address, else False
    @raise ValueError: if a wpa_cli call fails
    """
    for target_device in self.get_available_devices():
        if target_device.device_name != device_name:
            continue
        details = self.execute_wpa_cli_command(
            self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=target_device.mac_address))
        address_match = re.search(r'^interface_addr=(.*)$', details, re.MULTILINE)
        # A missing field (e.g. the peer vanished in the meantime) is treated as "not connected"
        # instead of crashing; an all-zero address means no interface is assigned.
        return address_match is not None and address_match.group(1).strip() != '00:00:00:00:00:00'
    return False
Checks whether a target device is already connected to this device. @param device_name: device name of target device as str @return: True if device is already connected, else False