Module lora_multihop.ipc
Expand source code
import logging
import socket
import threading
from lora_multihop import protocol, variables, module_config
class IPC:
    """
    connects a local TCP application with the LoRa routing protocol

    Two TCP servers are provided: the server on ``message_port`` exchanges message payloads between the connected
    application and the protocol, the server on ``ipc_port`` receives control commands for the routing protocol
    and forwards the content of the protocol's sending queue to the connected application.
    """

    def __init__(self, ipc_port, message_port, module_address=None):
        """
        stores the module address in variables.MY_ADDRESS, creates the protocol instance and starts its thread
        :param ipc_port: TCP port of the server which processes control commands
        :param message_port: TCP port of the server which transfers messages
        :param module_address: address of the module; if None the address is loaded via
        module_config.get_current_address()
        """
        self.listen_for_data = True
        if module_address is None:
            variables.MY_ADDRESS = module_config.get_current_address()
            logging.info('loaded address of module: {}'.format(variables.MY_ADDRESS))
        else:
            variables.MY_ADDRESS = module_address
        self.protocol = protocol.Protocol()
        self.protocol.start_protocol_thread()
        self.connection = None
        self.ipc_port = ipc_port
        self.message_port = message_port
        self.tcp_server_active = False
        self.message_transfer_thread = None
        self.ipc_tcp_server_thread = None

    def start_tcp_server_for_message_transfer(self):
        """
        starts a loop for sending and receiving data; messages received from the tcp socket are sent over the LoRa
        network; messages received from the LoRa network are sent to the TCP client which is connected to the
        message socket

        A single client is served. The method returns when the client disconnects, when a socket error occurs or
        when listen_for_data is set to False; both sockets are closed on return.
        """
        accept_timeout = 2  # seconds between two checks of listen_for_data while waiting for a client
        poll_interval = 0.05  # seconds to wait for client data before the receive queue is polled again
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.bind(("", self.message_port))
            server.listen(1)
            # a blocking accept() without timeout could never be interrupted by stop_ipc_instance()
            server.settimeout(accept_timeout)
            conn = None
            while conn is None:
                if not self.listen_for_data:
                    return
                try:
                    conn, _ = server.accept()
                except socket.timeout:
                    pass
            print('client for message transfer connected')
            with conn:
                # short timeout instead of non-blocking mode: avoids a busy loop while both directions are idle
                conn.settimeout(poll_interval)
                while self.listen_for_data:
                    try:
                        data = conn.recv(220)
                    except (socket.timeout, BlockingIOError, InterruptedError):
                        data = None  # nothing received within the poll interval
                    except OSError as error:
                        # any other socket error means the connection is unusable; do not retry forever
                        logging.debug(f'message socket failed: {error}')
                        break
                    if data is not None:
                        logging.debug(f'data: {data}')
                        if not data:
                            # empty read: client closed the connection
                            print('closed message socket')
                            break
                        self.protocol.send_message(data)
                    if not self.protocol.received_messages_queue.empty():
                        message = self.protocol.received_messages_queue.get()
                        logging.debug(f'send message via rpc to java side: {message}')
                        try:
                            # sendall: send() may transmit only a part of the message
                            conn.sendall(message)
                        except OSError as error:
                            logging.debug(f'could not send message to client: {error}')
                            break

    def start_tcp_server(self):
        """
        start loop which processes received commands to control routing protocol;
        requests (e.g. connect request) received from LoRa network are forwarded to connected application

        Clients are served one after another until listen_for_data is set to False; the listening socket is
        closed when the method returns.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.bind(("", self.ipc_port))
            server.listen(1)
            # the timeout lets the loop notice that listen_for_data was cleared
            server.settimeout(2)
            while self.listen_for_data:
                try:
                    conn, addr = server.accept()
                except socket.timeout:
                    continue
                self.connection = conn
                logging.debug(f'connected to java ipc with address {addr}')
                with conn:  # closes the client socket on every exit path
                    self._serve_ipc_client(conn)
        finally:
            logging.debug('java ipc: tcp server socket closed')
            server.close()

    def _serve_ipc_client(self, conn):
        """
        processes control commands of one connected client until it disconnects, a socket error occurs or
        tcp_server_active is set to False
        :param conn: connected client socket
        """
        conn.settimeout(1)
        while self.tcp_server_active:
            try:
                try:
                    data = conn.recv(1024)
                except socket.timeout:
                    # no command within the timeout: use the idle time to forward queued requests; send errors
                    # raised here are handled by the enclosing OSError handler
                    self._flush_sending_queue(conn)
                    continue
                print(f'data: {data}')
                if not data:
                    print('closed')
                    return
                self._process_ipc_data(conn, data)
            except OSError as error:
                # covers BrokenPipeError and ConnectionResetError; the caller waits for a new connection
                logging.debug(f'Connection reset by client. Wait for new connection ({error})')
                return

    def _flush_sending_queue(self, conn):
        """
        sends every payload of the protocol's sending queue to the connected client
        :param conn: connected client socket
        """
        while not self.protocol.sending_queue.empty():
            payload = self.protocol.sending_queue.get()
            logging.debug(f'sending: {payload}')
            conn.sendall((payload + '|').encode(variables.ENCODING))

    def _process_ipc_data(self, conn, data):
        """
        splits received data into single requests and processes each of them
        :param conn: connected client socket (used to answer 'registeredPeers?')
        :param data: bytes received from the client
        """
        # NOTE(review): the TCP stream is not buffered between reads; a request split over two recv() calls is
        # processed as two fragments - confirm that clients send each request in a single segment
        try:
            text = data.decode()
        except UnicodeDecodeError:
            logging.warning(f'ignoring undecodable data from java side: {data}')
            return
        for message in text.split(variables.HEADER_DELIMITER):
            if message == 'registeredPeers?':
                registered_peers = self.protocol.routing_table.get_peers()
                logging.debug(f'send registered peer to java: {registered_peers}')
                reply = self._registered_peers_reply(registered_peers)
                conn.sendall((reply + '|').encode(variables.ENCODING))
            elif message:
                logging.debug(f'request from java side: {message}')
                message_values = message.split(variables.JAVA_IPC_MESSAGE_VALUES_DELIMITER)
                if not self._forward_request(message_values):
                    logging.warning(f'ignoring malformed request from java side: {message}')

    def _forward_request(self, message_values):
        """
        hands a parsed request over to the protocol; requests of unknown type are ignored
        :param message_values: values of the request; the first value is the request type
        :return: False if the request contains too few values for its type, else True
        """
        message_type = message_values[0]
        if message_type == 'Registration':
            if len(message_values) < 3:
                return False
            subscribe = message_values[2].lower() == 'true'
            self.protocol.send_registration_message(subscribe, message_values[1])
        elif message_type == 'ConnectRequest':
            if len(message_values) < 4:
                return False
            self.protocol.send_connect_request_header(message_values[1], message_values[2], message_values[3])
        elif message_type == 'DisconnectRequest':
            if len(message_values) < 3:
                return False
            self.protocol.send_disconnect_request_header(message_values[1], message_values[2])
        return True

    @staticmethod
    def _registered_peers_reply(registered_peers):
        """
        creates the answer to a 'registeredPeers?' request
        :param registered_peers: sequence of mappings which contain the key 'peer_id'
        :return: 'RegisteredPeers' followed by the comma separated peer ids
        """
        return ','.join(['RegisteredPeers'] + [peer['peer_id'] for peer in registered_peers])

    def start_ipc(self):
        """
        starts a thread to exchange messages between connected application and LoRa network; starts a second thread
        which receives control commands for routing protocol and forwards requests to connected application
        """
        self.message_transfer_thread = threading.Thread(target=self.start_tcp_server_for_message_transfer)
        self.message_transfer_thread.start()
        self.ipc_tcp_server_thread = threading.Thread(target=self.start_tcp_server)
        self.tcp_server_active = True
        self.ipc_tcp_server_thread.start()

    def stop_ipc_instance(self):
        """
        stops threads for message exchange and command processing; the server loops notice the cleared flags
        after their current socket timeout has expired
        """
        self.listen_for_data = False
        self.tcp_server_active = False
        self.protocol.stop()
def create_connect_request_message(source_peer_id, target_peer_id, timeout):
    """
    builds the textual connect request which is understood by the ipc command server
    :param source_peer_id: peer id of the peer which requests the connection
    :param target_peer_id: peer id of the peer which should be connected
    :param timeout: timeout in seconds
    :return: string 'ConnectRequest' followed by the three values, separated by commas
    """
    template = 'ConnectRequest,{},{},{}'
    return template.format(source_peer_id, target_peer_id, timeout)
def create_disconnect_request_message(source_peer_id, target_peer_id):
    """
    builds the textual disconnect request which is understood by the ipc command server
    :param source_peer_id: peer id of the peer which requests the disconnect
    :param target_peer_id: peer id of the peer which should be disconnected
    :return: string 'DisconnectRequest' followed by both peer ids, separated by commas
    """
    template = 'DisconnectRequest,{},{}'
    return template.format(source_peer_id, target_peer_id)
Functions
def create_connect_request_message(source_peer_id, target_peer_id, timeout)
-
Creates a connect request message. Parameters: source_peer_id – peer id of the source peer; target_peer_id – peer id of the target peer; timeout – timeout in seconds. Returns the connect request message as a string (formatted as defined in the protocol).
Expand source code
def create_connect_request_message(source_peer_id, target_peer_id, timeout):
    """
    builds the textual connect request which is understood by the ipc command server
    :param source_peer_id: peer id of the peer which requests the connection
    :param target_peer_id: peer id of the peer which should be connected
    :param timeout: timeout in seconds
    :return: string 'ConnectRequest' followed by the three values, separated by commas
    """
    template = 'ConnectRequest,{},{},{}'
    return template.format(source_peer_id, target_peer_id, timeout)
def create_disconnect_request_message(source_peer_id, target_peer_id)
-
Creates a disconnect request message. Parameters: source_peer_id – peer id of the source peer; target_peer_id – peer id of the target peer. Returns the disconnect request message as a string (formatted as defined in the protocol).
Expand source code
def create_disconnect_request_message(source_peer_id, target_peer_id):
    """
    builds the textual disconnect request which is understood by the ipc command server
    :param source_peer_id: peer id of the peer which requests the disconnect
    :param target_peer_id: peer id of the peer which should be disconnected
    :return: string 'DisconnectRequest' followed by both peer ids, separated by commas
    """
    template = 'DisconnectRequest,{},{}'
    return template.format(source_peer_id, target_peer_id)
Classes
class IPC (ipc_port, message_port, module_address=None)
-
Expand source code
class IPC:
    """
    connects a local TCP application with the LoRa routing protocol

    Two TCP servers are provided: the server on ``message_port`` exchanges message payloads between the connected
    application and the protocol, the server on ``ipc_port`` receives control commands for the routing protocol
    and forwards the content of the protocol's sending queue to the connected application.
    """

    def __init__(self, ipc_port, message_port, module_address=None):
        """
        stores the module address in variables.MY_ADDRESS, creates the protocol instance and starts its thread
        :param ipc_port: TCP port of the server which processes control commands
        :param message_port: TCP port of the server which transfers messages
        :param module_address: address of the module; if None the address is loaded via
        module_config.get_current_address()
        """
        self.listen_for_data = True
        if module_address is None:
            variables.MY_ADDRESS = module_config.get_current_address()
            logging.info('loaded address of module: {}'.format(variables.MY_ADDRESS))
        else:
            variables.MY_ADDRESS = module_address
        self.protocol = protocol.Protocol()
        self.protocol.start_protocol_thread()
        self.connection = None
        self.ipc_port = ipc_port
        self.message_port = message_port
        self.tcp_server_active = False
        self.message_transfer_thread = None
        self.ipc_tcp_server_thread = None

    def start_tcp_server_for_message_transfer(self):
        """
        starts a loop for sending and receiving data; messages received from the tcp socket are sent over the LoRa
        network; messages received from the LoRa network are sent to the TCP client which is connected to the
        message socket

        A single client is served. The method returns when the client disconnects, when a socket error occurs or
        when listen_for_data is set to False; both sockets are closed on return.
        """
        accept_timeout = 2  # seconds between two checks of listen_for_data while waiting for a client
        poll_interval = 0.05  # seconds to wait for client data before the receive queue is polled again
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.bind(("", self.message_port))
            server.listen(1)
            # a blocking accept() without timeout could never be interrupted by stop_ipc_instance()
            server.settimeout(accept_timeout)
            conn = None
            while conn is None:
                if not self.listen_for_data:
                    return
                try:
                    conn, _ = server.accept()
                except socket.timeout:
                    pass
            print('client for message transfer connected')
            with conn:
                # short timeout instead of non-blocking mode: avoids a busy loop while both directions are idle
                conn.settimeout(poll_interval)
                while self.listen_for_data:
                    try:
                        data = conn.recv(220)
                    except (socket.timeout, BlockingIOError, InterruptedError):
                        data = None  # nothing received within the poll interval
                    except OSError as error:
                        # any other socket error means the connection is unusable; do not retry forever
                        logging.debug(f'message socket failed: {error}')
                        break
                    if data is not None:
                        logging.debug(f'data: {data}')
                        if not data:
                            # empty read: client closed the connection
                            print('closed message socket')
                            break
                        self.protocol.send_message(data)
                    if not self.protocol.received_messages_queue.empty():
                        message = self.protocol.received_messages_queue.get()
                        logging.debug(f'send message via rpc to java side: {message}')
                        try:
                            # sendall: send() may transmit only a part of the message
                            conn.sendall(message)
                        except OSError as error:
                            logging.debug(f'could not send message to client: {error}')
                            break

    def start_tcp_server(self):
        """
        start loop which processes received commands to control routing protocol;
        requests (e.g. connect request) received from LoRa network are forwarded to connected application

        Clients are served one after another until listen_for_data is set to False; the listening socket is
        closed when the method returns.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.bind(("", self.ipc_port))
            server.listen(1)
            # the timeout lets the loop notice that listen_for_data was cleared
            server.settimeout(2)
            while self.listen_for_data:
                try:
                    conn, addr = server.accept()
                except socket.timeout:
                    continue
                self.connection = conn
                logging.debug(f'connected to java ipc with address {addr}')
                with conn:  # closes the client socket on every exit path
                    self._serve_ipc_client(conn)
        finally:
            logging.debug('java ipc: tcp server socket closed')
            server.close()

    def _serve_ipc_client(self, conn):
        """
        processes control commands of one connected client until it disconnects, a socket error occurs or
        tcp_server_active is set to False
        :param conn: connected client socket
        """
        conn.settimeout(1)
        while self.tcp_server_active:
            try:
                try:
                    data = conn.recv(1024)
                except socket.timeout:
                    # no command within the timeout: use the idle time to forward queued requests; send errors
                    # raised here are handled by the enclosing OSError handler
                    self._flush_sending_queue(conn)
                    continue
                print(f'data: {data}')
                if not data:
                    print('closed')
                    return
                self._process_ipc_data(conn, data)
            except OSError as error:
                # covers BrokenPipeError and ConnectionResetError; the caller waits for a new connection
                logging.debug(f'Connection reset by client. Wait for new connection ({error})')
                return

    def _flush_sending_queue(self, conn):
        """
        sends every payload of the protocol's sending queue to the connected client
        :param conn: connected client socket
        """
        while not self.protocol.sending_queue.empty():
            payload = self.protocol.sending_queue.get()
            logging.debug(f'sending: {payload}')
            conn.sendall((payload + '|').encode(variables.ENCODING))

    def _process_ipc_data(self, conn, data):
        """
        splits received data into single requests and processes each of them
        :param conn: connected client socket (used to answer 'registeredPeers?')
        :param data: bytes received from the client
        """
        # NOTE(review): the TCP stream is not buffered between reads; a request split over two recv() calls is
        # processed as two fragments - confirm that clients send each request in a single segment
        try:
            text = data.decode()
        except UnicodeDecodeError:
            logging.warning(f'ignoring undecodable data from java side: {data}')
            return
        for message in text.split(variables.HEADER_DELIMITER):
            if message == 'registeredPeers?':
                registered_peers = self.protocol.routing_table.get_peers()
                logging.debug(f'send registered peer to java: {registered_peers}')
                reply = self._registered_peers_reply(registered_peers)
                conn.sendall((reply + '|').encode(variables.ENCODING))
            elif message:
                logging.debug(f'request from java side: {message}')
                message_values = message.split(variables.JAVA_IPC_MESSAGE_VALUES_DELIMITER)
                if not self._forward_request(message_values):
                    logging.warning(f'ignoring malformed request from java side: {message}')

    def _forward_request(self, message_values):
        """
        hands a parsed request over to the protocol; requests of unknown type are ignored
        :param message_values: values of the request; the first value is the request type
        :return: False if the request contains too few values for its type, else True
        """
        message_type = message_values[0]
        if message_type == 'Registration':
            if len(message_values) < 3:
                return False
            subscribe = message_values[2].lower() == 'true'
            self.protocol.send_registration_message(subscribe, message_values[1])
        elif message_type == 'ConnectRequest':
            if len(message_values) < 4:
                return False
            self.protocol.send_connect_request_header(message_values[1], message_values[2], message_values[3])
        elif message_type == 'DisconnectRequest':
            if len(message_values) < 3:
                return False
            self.protocol.send_disconnect_request_header(message_values[1], message_values[2])
        return True

    @staticmethod
    def _registered_peers_reply(registered_peers):
        """
        creates the answer to a 'registeredPeers?' request
        :param registered_peers: sequence of mappings which contain the key 'peer_id'
        :return: 'RegisteredPeers' followed by the comma separated peer ids
        """
        return ','.join(['RegisteredPeers'] + [peer['peer_id'] for peer in registered_peers])

    def start_ipc(self):
        """
        starts a thread to exchange messages between connected application and LoRa network; starts a second thread
        which receives control commands for routing protocol and forwards requests to connected application
        """
        self.message_transfer_thread = threading.Thread(target=self.start_tcp_server_for_message_transfer)
        self.message_transfer_thread.start()
        self.ipc_tcp_server_thread = threading.Thread(target=self.start_tcp_server)
        self.tcp_server_active = True
        self.ipc_tcp_server_thread.start()

    def stop_ipc_instance(self):
        """
        stops threads for message exchange and command processing; the server loops notice the cleared flags
        after their current socket timeout has expired
        """
        self.listen_for_data = False
        self.tcp_server_active = False
        self.protocol.stop()
Methods
def start_ipc(self)
-
starts a thread to exchange messages between connected application and LoRa network; starts a second thread which receives control commands for routing protocol and forwards requests to connected application
Expand source code
def start_ipc(self):
    """
    launches both worker threads of the ipc instance: the first one exchanges messages between the connected
    application and the LoRa network, the second one receives control commands for the routing protocol and
    forwards requests to the connected application
    """
    message_worker = threading.Thread(target=self.start_tcp_server_for_message_transfer)
    self.message_transfer_thread = message_worker
    message_worker.start()

    command_worker = threading.Thread(target=self.start_tcp_server)
    self.ipc_tcp_server_thread = command_worker
    # the flag has to be set before the command thread runs, it controls the thread's inner loop
    self.tcp_server_active = True
    command_worker.start()
def start_tcp_server(self)
-
start loop which processes received commands to control routing protocol; requests (e.g. connect request) received from LoRa network are forwarded to connected application
Expand source code
def start_tcp_server(self):
    """
    start loop which processes received commands to control routing protocol;
    requests (e.g. connect request) received from LoRa network are forwarded to connected application

    Clients are served one after another until listen_for_data is set to False; the listening socket is
    closed when the method returns.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.bind(("", self.ipc_port))
        server.listen(1)
        # the timeout lets the loop notice that listen_for_data was cleared
        server.settimeout(2)
        while self.listen_for_data:
            try:
                conn, addr = server.accept()
            except socket.timeout:
                continue
            self.connection = conn
            logging.debug(f'connected to java ipc with address {addr}')
            with conn:  # closes the client socket on every exit path
                self._serve_ipc_client(conn)
    finally:
        logging.debug('java ipc: tcp server socket closed')
        server.close()

def _serve_ipc_client(self, conn):
    """
    processes control commands of one connected client until it disconnects, a socket error occurs or
    tcp_server_active is set to False
    :param conn: connected client socket
    """
    conn.settimeout(1)
    while self.tcp_server_active:
        try:
            try:
                data = conn.recv(1024)
            except socket.timeout:
                # no command within the timeout: use the idle time to forward queued requests; send errors
                # raised here are handled by the enclosing OSError handler
                self._flush_sending_queue(conn)
                continue
            print(f'data: {data}')
            if not data:
                print('closed')
                return
            self._process_ipc_data(conn, data)
        except OSError as error:
            # covers BrokenPipeError and ConnectionResetError; the caller waits for a new connection
            logging.debug(f'Connection reset by client. Wait for new connection ({error})')
            return

def _flush_sending_queue(self, conn):
    """
    sends every payload of the protocol's sending queue to the connected client
    :param conn: connected client socket
    """
    while not self.protocol.sending_queue.empty():
        payload = self.protocol.sending_queue.get()
        logging.debug(f'sending: {payload}')
        conn.sendall((payload + '|').encode(variables.ENCODING))

def _process_ipc_data(self, conn, data):
    """
    splits received data into single requests and processes each of them
    :param conn: connected client socket (used to answer 'registeredPeers?')
    :param data: bytes received from the client
    """
    # NOTE(review): the TCP stream is not buffered between reads; a request split over two recv() calls is
    # processed as two fragments - confirm that clients send each request in a single segment
    try:
        text = data.decode()
    except UnicodeDecodeError:
        logging.warning(f'ignoring undecodable data from java side: {data}')
        return
    for message in text.split(variables.HEADER_DELIMITER):
        if message == 'registeredPeers?':
            registered_peers = self.protocol.routing_table.get_peers()
            logging.debug(f'send registered peer to java: {registered_peers}')
            reply = ','.join(['RegisteredPeers'] + [peer['peer_id'] for peer in registered_peers])
            conn.sendall((reply + '|').encode(variables.ENCODING))
        elif message:
            logging.debug(f'request from java side: {message}')
            message_values = message.split(variables.JAVA_IPC_MESSAGE_VALUES_DELIMITER)
            if not self._forward_request(message_values):
                logging.warning(f'ignoring malformed request from java side: {message}')

def _forward_request(self, message_values):
    """
    hands a parsed request over to the protocol; requests of unknown type are ignored
    :param message_values: values of the request; the first value is the request type
    :return: False if the request contains too few values for its type, else True
    """
    message_type = message_values[0]
    if message_type == 'Registration':
        if len(message_values) < 3:
            return False
        subscribe = message_values[2].lower() == 'true'
        self.protocol.send_registration_message(subscribe, message_values[1])
    elif message_type == 'ConnectRequest':
        if len(message_values) < 4:
            return False
        self.protocol.send_connect_request_header(message_values[1], message_values[2], message_values[3])
    elif message_type == 'DisconnectRequest':
        if len(message_values) < 3:
            return False
        self.protocol.send_disconnect_request_header(message_values[1], message_values[2])
    return True
def start_tcp_server_for_message_transfer(self)
-
starts a loop for sending and receiving data; messages received from the TCP socket are sent over the LoRa network; messages received from the LoRa network are sent to the TCP client which is connected to the socket of ipc.py
Expand source code
def start_tcp_server_for_message_transfer(self):
    """
    starts a loop for sending and receiving data; messages received from the tcp socket are sent over the LoRa
    network; messages received from the LoRa network are sent to the TCP client which is connected to the
    message socket

    A single client is served. The method returns when the client disconnects, when a socket error occurs or
    when listen_for_data is set to False; both sockets are closed on return.
    """
    accept_timeout = 2  # seconds between two checks of listen_for_data while waiting for a client
    poll_interval = 0.05  # seconds to wait for client data before the receive queue is polled again
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("", self.message_port))
        server.listen(1)
        # a blocking accept() without timeout could never be interrupted by stop_ipc_instance()
        server.settimeout(accept_timeout)
        conn = None
        while conn is None:
            if not self.listen_for_data:
                return
            try:
                conn, _ = server.accept()
            except socket.timeout:
                pass
        print('client for message transfer connected')
        with conn:
            # short timeout instead of non-blocking mode: avoids a busy loop while both directions are idle
            conn.settimeout(poll_interval)
            while self.listen_for_data:
                try:
                    data = conn.recv(220)
                except (socket.timeout, BlockingIOError, InterruptedError):
                    data = None  # nothing received within the poll interval
                except OSError as error:
                    # any other socket error means the connection is unusable; do not retry forever
                    logging.debug(f'message socket failed: {error}')
                    break
                if data is not None:
                    logging.debug(f'data: {data}')
                    if not data:
                        # empty read: client closed the connection
                        print('closed message socket')
                        break
                    self.protocol.send_message(data)
                if not self.protocol.received_messages_queue.empty():
                    message = self.protocol.received_messages_queue.get()
                    logging.debug(f'send message via rpc to java side: {message}')
                    try:
                        # sendall: send() may transmit only a part of the message
                        conn.sendall(message)
                    except OSError as error:
                        logging.debug(f'could not send message to client: {error}')
                        break
def stop_ipc_instance(self)
-
stops threads for message exchange and command processing
Expand source code
def stop_ipc_instance(self): """ stops threads for message exchange and command processing """ self.listen_for_data = False self.tcp_server_active = False self.protocol.stop()