p2p_connector.wifi_direct_connector

This module provides functions for connecting to a target P2P device using the 'wpa_cli' application.

  1"""
  2This module provides functions to be able to connect to a target p2p device using the application 'wpa_cli'
  3"""
  4import argparse
  5import re
  6import subprocess
  7import sys
  8import time
  9from typing import Optional
 10
 11
 12class P2PPeer:
 13    """
 14    This class represents a p2p peer.
 15    """
 16
 17    def __init__(self, device_name: str, mac_address: str, status: int):
 18        self.device_name = device_name
 19        self.mac_address = mac_address
 20        self.status = status
 21
 22    def __str__(self):
 23        return f"{self.device_name} - {self.mac_address}"
 24
 25    def __repr__(self):
 26        return self.__str__()
 27
 28
 29class WifiDirectConnector:
 30    """
 31    This class provides methods for establishing a WiFi-Direct connection with an Android device.
 32    """
 33    WPA_CLI_LOCATION = "/usr/sbin/wpa_cli"
 34    WPA_CLI_LISTEN = "p2p_listen"
 35    WPA_CLI_P2P_PEERS = "p2p_peers"
 36    WPA_CLI_P2P_PEER_DETAILS = "p2p_peer {mac_address}"
 37    WPA_CLI_HELP = "-h"
 38    P2P_FIND_COMMAND = f"{WPA_CLI_LOCATION} -i p2p-dev-wlan0 p2p_find"
 39    P2P_GROUP_REMOVE_COMMAND = "-ip2p-dev-wlan0 p2p_group_remove {interface_name}"
 40    GET_WIFI_INTERFACE = "ip -br link | grep -Po 'p2p-wlan0-\\d+'"
 41    P2P_CONNECT = "p2p_connect {target_device_mac_addr} pbc"
 42
 43    def __init__(self):
 44        self.check_wpa_cli_available()
 45
 46    def execute_wpa_cli_command(self, command: str, ignore_status_code: Optional[int] = None) -> str:
 47        """
 48        Executes wpa_cli command
 49        @param command: argument(s) as str
 50        @param ignore_status_code: optional argument - if set no error will be raised if status code is not equal to
 51        zero but is equal to 'ignore_status_code'
 52        """
 53        command = f"{self.WPA_CLI_LOCATION} {command}"
 54        result = subprocess.run(command, shell=True, capture_output=True)
 55        if result.returncode != 0 and (ignore_status_code is None or result.returncode != ignore_status_code):
 56            raise ValueError(f'Error occurred while executing command "{command}": {result.stderr}')
 57        return result.stdout.decode()
 58
 59    def check_wpa_cli_available(self):
 60        """
 61        check whether the application 'wpa_cli`is installed
 62        @raise ValueError: raises if not installed or status-code is not equal to 0
 63        """
 64        self.execute_wpa_cli_command(self.WPA_CLI_HELP)
 65
 66    def remove_p2p_group(self):
 67        """
 68        removes all p2p groups
 69        """
 70        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
 71        if len(result.stdout) > 0:
 72            self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=result.stdout.decode()))
 73            print('removing p2p group...')
 74            time.sleep(10)
 75
 76    def remove_orphan_p2p_groups(self):
 77        """
 78        removes orphan p2p groups
 79        """
 80        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
 81        interface_name = result.stdout.decode().strip()
 82        if len(interface_name) > 0:
 83            devices = self.get_available_devices(interface_name=interface_name)
 84            if len(devices) == 0:
 85                print(f'group with interface "{interface_name}" has no members.')
 86                self.remove_p2p_group()
 87            else:
 88                all_device_inactive = True
 89                for device in devices:
 90                    if self.check_already_connected(device.device_name):
 91                        all_device_inactive = False
 92                if all_device_inactive:
 93                    print(f'group with interface "{interface_name}" has only inactive members: {devices}.')
 94                    self.remove_p2p_group()
 95
 96    def make_device_visible(self):
 97        """
 98        executes 'wpa_cli listen' to make device visible
 99        """
100        self.execute_wpa_cli_command(self.WPA_CLI_LISTEN)
101
102    def get_available_devices(self, interface_name: str = None) -> list[P2PPeer]:
103        """
104        Gets all available target devices.
105        @return: list containing all target devices
106        """
107        interface_command = f'-i {interface_name} '
108        if interface_name is None:
109            interface_command = ''
110        mac_addresses: list[str] = []
111        stdout = self.execute_wpa_cli_command(f'{interface_command}{self.WPA_CLI_P2P_PEERS}', ignore_status_code=255)
112        for line in stdout.splitlines():
113            if re.search(r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$", line):
114                mac_addresses.append(line)
115
116        available_peers: list[P2PPeer] = []
117        for mac_address in mac_addresses:
118            result = subprocess.run(f"/usr/sbin/wpa_cli p2p_peer {mac_address}", shell=True, capture_output=True,
119                                    check=True)
120            stdout = result.stdout.decode()
121            device_name = re.search(r'(?<=device_name=).*', stdout).group()
122            device_status = re.search(r'(?<=status=).*', stdout).group()
123            available_peers.append(P2PPeer(device_name, mac_address, int(device_status)))
124        return available_peers
125
126    def connect_to_device_if_available(self, device_name, attempts: int = 30, time_to_sleep: int = 2):
127        """
128        Connects to a target device and waits for an incoming connection request.
129        @param device_name: target device name
130        @param attempts: how many attempts should be used to wait for the connection request
131        @param time_to_sleep: time in seconds how long should be waited between each attempt
132        """
133        for _ in range(attempts):
134            status_text = f'\rwaiting for connection request from {device_name}. '
135            target_device: P2PPeer
136            for target_device in self.get_available_devices():
137                if target_device.device_name == device_name:
138                    # found device
139                    if target_device.status == 1:
140                        # got connection request
141                        self.connect_to_target_device(target_device)
142                        return True
143                    status_text = status_text + 'Found device, but waiting for connection_request'
144            print(status_text, end='')
145            time.sleep(time_to_sleep)
146        return False
147
148    def connect_to_target_device(self, device: P2PPeer):
149        """
150        Connects to a target device.
151        @param device: target device as P2PPeer object
152        """
153        self.execute_wpa_cli_command(self.P2P_CONNECT.format(target_device_mac_addr=device.mac_address))
154
155    def check_already_connected(self, device_name: str) -> bool:
156        """
157        Checks whether a target device is already connected to this device.
158        @param device_name: device name of target device as str
159        @return: True if device is already connected, else False
160        """
161        for target_device in self.get_available_devices():
162            if target_device.device_name == device_name:
163                stdout = self.execute_wpa_cli_command(
164                    self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=target_device.mac_address))
165                return re.search(r'(?<=interface_addr=).*', stdout).group() != '00:00:00:00:00:00'
166        return False
167
168
if __name__ == "__main__":
    # Command line entry point: either connect to a named target device or remove the p2p group.
    parser = argparse.ArgumentParser(description='Provides an Interface to set up a P2P Server.')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--target', help="Set name of target device")
    group.add_argument('--remove-group', action='store_true', help="remove p2p group of this device")
    args = parser.parse_args()

    wifi_direct_conn = WifiDirectConnector()
    # orphan groups are cleaned up once, independent of the selected mode
    wifi_direct_conn.remove_orphan_p2p_groups()

    if args.remove_group:
        print('removing p2p group')
        wifi_direct_conn.remove_p2p_group()
        sys.exit(0)

    target_device_name = args.target
    if wifi_direct_conn.check_already_connected(target_device_name):
        print(f'device "{target_device_name}" is already connected')
        sys.exit(0)

    wifi_direct_conn.make_device_visible()
    connected = wifi_direct_conn.connect_to_device_if_available(target_device_name)
    # terminate the progress line, which is printed without a trailing newline
    print()
    if not connected:
        print(f'no connection request received from device "{target_device_name}"')
        sys.exit(1)
class P2PPeer:
class P2PPeer:
    """
    Plain value holder describing a single p2p peer.

    Stores the peer's device name, its MAC address and its numeric status exactly as given.
    """

    def __init__(self, device_name: str, mac_address: str, status: int):
        self.device_name, self.mac_address, self.status = device_name, mac_address, status

    def __str__(self):
        # human readable form: "<device name> - <mac address>"
        return "{} - {}".format(self.device_name, self.mac_address)

    def __repr__(self):
        # the repr intentionally mirrors the string form
        return str(self)

This class represents a p2p peer.

P2PPeer(device_name: str, mac_address: str, status: int)
18    def __init__(self, device_name: str, mac_address: str, status: int):
19        self.device_name = device_name
20        self.mac_address = mac_address
21        self.status = status
class WifiDirectConnector:
 30class WifiDirectConnector:
 31    """
 32    This class provides methods for establishing a WiFi-Direct connection with an Android device.
 33    """
 34    WPA_CLI_LOCATION = "/usr/sbin/wpa_cli"
 35    WPA_CLI_LISTEN = "p2p_listen"
 36    WPA_CLI_P2P_PEERS = "p2p_peers"
 37    WPA_CLI_P2P_PEER_DETAILS = "p2p_peer {mac_address}"
 38    WPA_CLI_HELP = "-h"
 39    P2P_FIND_COMMAND = f"{WPA_CLI_LOCATION} -i p2p-dev-wlan0 p2p_find"
 40    P2P_GROUP_REMOVE_COMMAND = "-ip2p-dev-wlan0 p2p_group_remove {interface_name}"
 41    GET_WIFI_INTERFACE = "ip -br link | grep -Po 'p2p-wlan0-\\d+'"
 42    P2P_CONNECT = "p2p_connect {target_device_mac_addr} pbc"
 43
 44    def __init__(self):
 45        self.check_wpa_cli_available()
 46
 47    def execute_wpa_cli_command(self, command: str, ignore_status_code: Optional[int] = None) -> str:
 48        """
 49        Executes wpa_cli command
 50        @param command: argument(s) as str
 51        @param ignore_status_code: optional argument - if set no error will be raised if status code is not equal to
 52        zero but is equal to 'ignore_status_code'
 53        """
 54        command = f"{self.WPA_CLI_LOCATION} {command}"
 55        result = subprocess.run(command, shell=True, capture_output=True)
 56        if result.returncode != 0 and (ignore_status_code is None or result.returncode != ignore_status_code):
 57            raise ValueError(f'Error occurred while executing command "{command}": {result.stderr}')
 58        return result.stdout.decode()
 59
 60    def check_wpa_cli_available(self):
 61        """
 62        check whether the application 'wpa_cli`is installed
 63        @raise ValueError: raises if not installed or status-code is not equal to 0
 64        """
 65        self.execute_wpa_cli_command(self.WPA_CLI_HELP)
 66
 67    def remove_p2p_group(self):
 68        """
 69        removes all p2p groups
 70        """
 71        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
 72        if len(result.stdout) > 0:
 73            self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=result.stdout.decode()))
 74            print('removing p2p group...')
 75            time.sleep(10)
 76
 77    def remove_orphan_p2p_groups(self):
 78        """
 79        removes orphan p2p groups
 80        """
 81        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
 82        interface_name = result.stdout.decode().strip()
 83        if len(interface_name) > 0:
 84            devices = self.get_available_devices(interface_name=interface_name)
 85            if len(devices) == 0:
 86                print(f'group with interface "{interface_name}" has no members.')
 87                self.remove_p2p_group()
 88            else:
 89                all_device_inactive = True
 90                for device in devices:
 91                    if self.check_already_connected(device.device_name):
 92                        all_device_inactive = False
 93                if all_device_inactive:
 94                    print(f'group with interface "{interface_name}" has only inactive members: {devices}.')
 95                    self.remove_p2p_group()
 96
 97    def make_device_visible(self):
 98        """
 99        executes 'wpa_cli listen' to make device visible
100        """
101        self.execute_wpa_cli_command(self.WPA_CLI_LISTEN)
102
103    def get_available_devices(self, interface_name: str = None) -> list[P2PPeer]:
104        """
105        Gets all available target devices.
106        @return: list containing all target devices
107        """
108        interface_command = f'-i {interface_name} '
109        if interface_name is None:
110            interface_command = ''
111        mac_addresses: list[str] = []
112        stdout = self.execute_wpa_cli_command(f'{interface_command}{self.WPA_CLI_P2P_PEERS}', ignore_status_code=255)
113        for line in stdout.splitlines():
114            if re.search(r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$", line):
115                mac_addresses.append(line)
116
117        available_peers: list[P2PPeer] = []
118        for mac_address in mac_addresses:
119            result = subprocess.run(f"/usr/sbin/wpa_cli p2p_peer {mac_address}", shell=True, capture_output=True,
120                                    check=True)
121            stdout = result.stdout.decode()
122            device_name = re.search(r'(?<=device_name=).*', stdout).group()
123            device_status = re.search(r'(?<=status=).*', stdout).group()
124            available_peers.append(P2PPeer(device_name, mac_address, int(device_status)))
125        return available_peers
126
127    def connect_to_device_if_available(self, device_name, attempts: int = 30, time_to_sleep: int = 2):
128        """
129        Connects to a target device and waits for an incoming connection request.
130        @param device_name: target device name
131        @param attempts: how many attempts should be used to wait for the connection request
132        @param time_to_sleep: time in seconds how long should be waited between each attempt
133        """
134        for _ in range(attempts):
135            status_text = f'\rwaiting for connection request from {device_name}. '
136            target_device: P2PPeer
137            for target_device in self.get_available_devices():
138                if target_device.device_name == device_name:
139                    # found device
140                    if target_device.status == 1:
141                        # got connection request
142                        self.connect_to_target_device(target_device)
143                        return True
144                    status_text = status_text + 'Found device, but waiting for connection_request'
145            print(status_text, end='')
146            time.sleep(time_to_sleep)
147        return False
148
149    def connect_to_target_device(self, device: P2PPeer):
150        """
151        Connects to a target device.
152        @param device: target device as P2PPeer object
153        """
154        self.execute_wpa_cli_command(self.P2P_CONNECT.format(target_device_mac_addr=device.mac_address))
155
156    def check_already_connected(self, device_name: str) -> bool:
157        """
158        Checks whether a target device is already connected to this device.
159        @param device_name: device name of target device as str
160        @return: True if device is already connected, else False
161        """
162        for target_device in self.get_available_devices():
163            if target_device.device_name == device_name:
164                stdout = self.execute_wpa_cli_command(
165                    self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=target_device.mac_address))
166                return re.search(r'(?<=interface_addr=).*', stdout).group() != '00:00:00:00:00:00'
167        return False

This class provides methods for establishing a WiFi-Direct connection with an Android device.

WifiDirectConnector()
44    def __init__(self):
45        self.check_wpa_cli_available()
def execute_wpa_cli_command(self, command: str, ignore_status_code: Optional[int] = None) -> str:
47    def execute_wpa_cli_command(self, command: str, ignore_status_code: Optional[int] = None) -> str:
48        """
49        Executes wpa_cli command
50        @param command: argument(s) as str
51        @param ignore_status_code: optional argument - if set no error will be raised if status code is not equal to
52        zero but is equal to 'ignore_status_code'
53        """
54        command = f"{self.WPA_CLI_LOCATION} {command}"
55        result = subprocess.run(command, shell=True, capture_output=True)
56        if result.returncode != 0 and (ignore_status_code is None or result.returncode != ignore_status_code):
57            raise ValueError(f'Error occurred while executing command "{command}": {result.stderr}')
58        return result.stdout.decode()

Executes wpa_cli command @param command: argument(s) as str @param ignore_status_code: optional argument - if set no error will be raised if status code is not equal to zero but is equal to 'ignore_status_code'

def check_wpa_cli_available(self):
60    def check_wpa_cli_available(self):
61        """
62        check whether the application 'wpa_cli`is installed
63        @raise ValueError: raises if not installed or status-code is not equal to 0
64        """
65        self.execute_wpa_cli_command(self.WPA_CLI_HELP)

Checks whether the application 'wpa_cli' is installed. @raise ValueError: raised if it is not installed or the status code is not equal to 0

def remove_p2p_group(self):
67    def remove_p2p_group(self):
68        """
69        removes all p2p groups
70        """
71        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
72        if len(result.stdout) > 0:
73            self.execute_wpa_cli_command(self.P2P_GROUP_REMOVE_COMMAND.format(interface_name=result.stdout.decode()))
74            print('removing p2p group...')
75            time.sleep(10)

removes all p2p groups

def remove_orphan_p2p_groups(self):
77    def remove_orphan_p2p_groups(self):
78        """
79        removes orphan p2p groups
80        """
81        result = subprocess.run(self.GET_WIFI_INTERFACE, shell=True, capture_output=True)
82        interface_name = result.stdout.decode().strip()
83        if len(interface_name) > 0:
84            devices = self.get_available_devices(interface_name=interface_name)
85            if len(devices) == 0:
86                print(f'group with interface "{interface_name}" has no members.')
87                self.remove_p2p_group()
88            else:
89                all_device_inactive = True
90                for device in devices:
91                    if self.check_already_connected(device.device_name):
92                        all_device_inactive = False
93                if all_device_inactive:
94                    print(f'group with interface "{interface_name}" has only inactive members: {devices}.')
95                    self.remove_p2p_group()

removes orphan p2p groups

def make_device_visible(self):
 97    def make_device_visible(self):
 98        """
 99        executes 'wpa_cli listen' to make device visible
100        """
101        self.execute_wpa_cli_command(self.WPA_CLI_LISTEN)

Executes 'wpa_cli p2p_listen' to make this device visible.

def get_available_devices( self, interface_name: str = None) -> list[p2p_connector.wifi_direct_connector.P2PPeer]:
103    def get_available_devices(self, interface_name: str = None) -> list[P2PPeer]:
104        """
105        Gets all available target devices.
106        @return: list containing all target devices
107        """
108        interface_command = f'-i {interface_name} '
109        if interface_name is None:
110            interface_command = ''
111        mac_addresses: list[str] = []
112        stdout = self.execute_wpa_cli_command(f'{interface_command}{self.WPA_CLI_P2P_PEERS}', ignore_status_code=255)
113        for line in stdout.splitlines():
114            if re.search(r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$", line):
115                mac_addresses.append(line)
116
117        available_peers: list[P2PPeer] = []
118        for mac_address in mac_addresses:
119            result = subprocess.run(f"/usr/sbin/wpa_cli p2p_peer {mac_address}", shell=True, capture_output=True,
120                                    check=True)
121            stdout = result.stdout.decode()
122            device_name = re.search(r'(?<=device_name=).*', stdout).group()
123            device_status = re.search(r'(?<=status=).*', stdout).group()
124            available_peers.append(P2PPeer(device_name, mac_address, int(device_status)))
125        return available_peers

Gets all available target devices. @return: list containing all target devices

def connect_to_device_if_available(self, device_name, attempts: int = 30, time_to_sleep: int = 2):
127    def connect_to_device_if_available(self, device_name, attempts: int = 30, time_to_sleep: int = 2):
128        """
129        Connects to a target device and waits for an incoming connection request.
130        @param device_name: target device name
131        @param attempts: how many attempts should be used to wait for the connection request
132        @param time_to_sleep: time in seconds how long should be waited between each attempt
133        """
134        for _ in range(attempts):
135            status_text = f'\rwaiting for connection request from {device_name}. '
136            target_device: P2PPeer
137            for target_device in self.get_available_devices():
138                if target_device.device_name == device_name:
139                    # found device
140                    if target_device.status == 1:
141                        # got connection request
142                        self.connect_to_target_device(target_device)
143                        return True
144                    status_text = status_text + 'Found device, but waiting for connection_request'
145            print(status_text, end='')
146            time.sleep(time_to_sleep)
147        return False

Connects to a target device and waits for an incoming connection request. @param device_name: target device name @param attempts: how many attempts should be used to wait for the connection request @param time_to_sleep: time in seconds how long should be waited between each attempt

def connect_to_target_device(self, device: p2p_connector.wifi_direct_connector.P2PPeer):
149    def connect_to_target_device(self, device: P2PPeer):
150        """
151        Connects to a target device.
152        @param device: target device as P2PPeer object
153        """
154        self.execute_wpa_cli_command(self.P2P_CONNECT.format(target_device_mac_addr=device.mac_address))

Connects to a target device. @param device: target device as P2PPeer object

def check_already_connected(self, device_name: str) -> bool:
156    def check_already_connected(self, device_name: str) -> bool:
157        """
158        Checks whether a target device is already connected to this device.
159        @param device_name: device name of target device as str
160        @return: True if device is already connected, else False
161        """
162        for target_device in self.get_available_devices():
163            if target_device.device_name == device_name:
164                stdout = self.execute_wpa_cli_command(
165                    self.WPA_CLI_P2P_PEER_DETAILS.format(mac_address=target_device.mac_address))
166                return re.search(r'(?<=interface_addr=).*', stdout).group() != '00:00:00:00:00:00'
167        return False

Checks whether a target device is already connected to this device. @param device_name: device name of target device as str @return: True if device is already connected, else False