Module lora_multihop.protocol

Expand source code
import base64
import logging
import random
import signal
import threading
import time
import traceback
from queue import Queue
from contextlib import contextmanager

from lora_multihop import ipc, serial_connection, header, variables
from lora_multihop.header import RegistrationHeader, ConnectRequestHeader, DisconnectRequestHeader
from lora_multihop.routing_table import RoutingTable

__author__ = "Marvin Rausch"


class Protocol:
    PROCESS_INCOMING_MESSAGES = True
    VERIFICATION_TIMEOUT = 25
    PAUSE_PROCESSING_INCOMING_MESSAGES = False
    MESSAGES_ACKNOWLEDGMENT = []

    def __init__(self):
        logging.info('created protocol obj: {}'.format(str(self)))
        self.routing_table = RoutingTable()
        self.received_messages_queue = Queue()
        self.sending_messages_queue = Queue()
        self.sending_queue = Queue()

        self.connected_node = None
        self.message_counter = 0
        self.received_own_registration_message = False

    def start_protocol_thread(self):
        """
        starts new thread which processes incoming messages in background
        """
        receiving_thread = threading.Thread(target=self.process_incoming_message)
        receiving_thread.start()

    def send_header(self, header_str):
        """
        sends a string to LoRa network
        @param header_str: message to send
        """
        wait_random_time()
        serial_connection.writing_q.put(('AT+SEND={}'.format(str(len(header_str))), ['AT,OK']))
        if serial_connection.status_q.get(timeout=self.VERIFICATION_TIMEOUT):
            serial_connection.writing_q.put((header_str, ['AT,SENDING', 'AT,SENDED']))
            if serial_connection.status_q.get(timeout=self.VERIFICATION_TIMEOUT):
                logging.debug("sent header '{}'.".format(header_str))
                return
        logging.debug("could not send header '{}', because got invalid status from lora module".format(header_str))

    def process_incoming_message(self):
        """
        get messages from LoRa module, create header object and call appropriate method to process the received message
        """
        while self.PROCESS_INCOMING_MESSAGES:
            if not serial_connection.response_q.empty() and not self.PAUSE_PROCESSING_INCOMING_MESSAGES:
                raw_message = serial_connection.response_q.get()
                logging.debug(f'process: {raw_message}')
                try:
                    header_obj = header.create_header_obj_from_raw_message(raw_message)
                    if header_obj.ttl > 1:
                        self.routing_table.add_neighbor_to_routing_table(header_obj)
                        if header_obj.flag == header.RouteRequestHeader.HEADER_TYPE:
                            self.process_route_request(header_obj)
                        elif header_obj.flag == header.MessageHeader.HEADER_TYPE:
                            self.process_message_header(header_obj)
                        elif header_obj.flag == header.RouteReplyHeader.HEADER_TYPE:
                            self.process_route_reply_header(header_obj)
                        elif header_obj.flag == header.RouteErrorHeader.HEADER_TYPE:
                            self.process_route_error_header(header_obj)
                        elif header_obj.flag == header.MessageAcknowledgeHeader.HEADER_TYPE:
                            self.process_ack_header(header_obj)
                        elif header_obj.flag == header.RegistrationHeader.HEADER_TYPE:
                            self.process_registration_header(header_obj)
                        elif header_obj.flag == header.ConnectRequestHeader.HEADER_TYPE:
                            self.process_connect_request_header(header_obj)
                        elif header_obj.flag == header.DisconnectRequestHeader.HEADER_TYPE:
                            self.process_disconnect_request_header(header_obj)
                except ValueError as e:
                    logging.warning(str(e))
                    traceback.print_exc()
                    try:
                        logging.debug('try to add received signal to unsupported devices list...')
                        addr = header.get_received_from_value(raw_message)
                        self.routing_table.add_neighbor_with_unsupported_protocol(addr)
                    except ValueError as e:
                        logging.warning(str(e))

    def send_message(self, payload):
        """
        send message to currently connected peer
        @param payload: message to send as bytes
        """
        if self.connected_node is not None:
            destination = self.connected_node
            best_route = self.routing_table.get_best_route_for_destination(destination)
            if len(best_route) == 0:
                logging.info('could not find a route to {}. Sending route request...'.format(destination))
                if self.send_route_request_message(destination):
                    best_route = self.routing_table.get_best_route_for_destination(destination)
                else:
                    logging.info('Got no answer on route requested.'.format(destination))
                    return
            self.message_counter += 1
            header_obj = header.MessageHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, destination,
                                              best_route['next_node'], self.message_counter,
                                              base64.b64encode(payload).decode(variables.ENCODING))
            attempt = 0
            self.add_message_to_waiting_acknowledgement_list(header_obj)
            message_confirmed = False
            while attempt < 3 and not message_confirmed:
                logging.debug(f'attempt: {attempt}')
                self.send_header(header_obj.get_header_str())
                attempt_count_received_ack = 0
                while attempt_count_received_ack < 10:
                    if header_obj.message_id not in self.MESSAGES_ACKNOWLEDGMENT:
                        message_confirmed = True
                        break
                    else:
                        time.sleep(0.5)
                        attempt_count_received_ack += 1
                if message_confirmed:
                    break
                else:
                    attempt += 1
            if message_confirmed:
                print('*******************message was acknowledged by receiver*******************')
            else:
                logging.debug(
                    f'message was not acknowledged by receiver. Current ack_list: {self.MESSAGES_ACKNOWLEDGMENT}'
                    f'\nSending route error message')
                self.routing_table.delete_all_entries_of_destination(destination)
                self.delete_from_ack_list(header_obj.message_id)
                self.send_header(header.RouteErrorHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL,
                                                         header_obj.destination).get_header_str())

    def send_route_request_message(self, end_node):
        """
        sends route request
        @param end_node: node for which a route is required
        @return: True, if route request was confirmed, else False
        """
        route_request_header_obj = header.RouteRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, 0,
                                                             end_node)
        attempt = 0
        message_confirmed = False
        while attempt < 3 and not message_confirmed:
            logging.debug('attempt: {}'.format(attempt))
            self.send_header(route_request_header_obj.get_header_str())
            check_request_attempt_count = 0
            while check_request_attempt_count < 10:
                if len(self.routing_table.get_best_route_for_destination(end_node)) != 0:
                    logging.debug('new route for {} found'.format(end_node))
                    message_confirmed = True
                    break
                else:
                    time.sleep(0.5)
                    check_request_attempt_count += 1
            attempt += 1
            if message_confirmed:
                return message_confirmed
            else:
                attempt += 1
        return message_confirmed

    def process_route_request(self, header_obj):
        """
        processes received route request header
        @param header_obj: route request header object
        """
        # first of all check whether source of route request is myself (to prevent cycle)
        if header_obj.source != variables.MY_ADDRESS:
            # look whether requested node is myself
            if header_obj.end_node == variables.MY_ADDRESS:
                logging.debug('add new routing table entry before sending route reply')
                self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                           header_obj.hops + 1)
                logging.info('sending route reply message...')
                self.send_route_reply(next_node=header_obj.received_from, end_node=header_obj.source)
            else:
                if len(self.routing_table.get_best_route_for_destination(header_obj.source)) == 0:
                    # if there is no entry for source of route request, you can add routing table entry
                    self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                               header_obj.hops)

                header_obj.ttl = header_obj.ttl - 1
                header_obj.hops = header_obj.hops + 1
                if not self.routing_table.check_route_request_already_processed(header_obj.end_node):
                    logging.debug('forward route request message')
                    self.routing_table.add_address_to_processed_requests_list(header_obj.end_node)
                    self.send_header(header_obj.get_header_str())
                else:
                    logging.debug('route request was already processed')

    def send_route_reply(self, next_node, end_node):
        """
        sends route reply message
        @param next_node: next receiver of the message, which should forward the message to the destination node
        @param end_node: node which sent the route request
        """
        route_reply_header_obj = header.RouteReplyHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, 0, end_node,
                                                         next_node)
        self.send_header(route_reply_header_obj.get_header_str())

    def process_message_header(self, header_obj):
        """
        processed received message header; if the end node of the message is this node the message will be put
        into the received_messages queue to forward the message via IPC to the Java side; else the message will be
        forwarded to the next_node
        @param header_obj: message header object
        """
        if header_obj.destination == variables.MY_ADDRESS and header_obj.source == self.connected_node:
            ack_header_str = header.MessageAcknowledgeHeader(None, variables.MY_ADDRESS, variables.TTL_START_VALUE,
                                                             header_obj.source, header_obj.message_id).get_header_str()
            if self.routing_table.check_message_already_received(header_obj.source, header_obj.message_id):
                self.send_header(ack_header_str)
            else:
                logging.debug(f'payload: {str(header_obj.payload)}')
                self.received_messages_queue.put(base64.b64decode(header_obj.payload))
                # send acknowledge message
                logging.debug('sending acknowledgement')
                self.send_header(ack_header_str)

        elif header_obj.next_node == variables.MY_ADDRESS and header_obj.destination != variables.MY_ADDRESS:
            best_route = self.routing_table.get_best_route_for_destination(header_obj.destination)
            if len(best_route) == 0:
                logging.info('no routing table entry for {} to forward message found'.format(header_obj.next_node))
            else:
                header_obj.next_node = best_route['next_node']
                logging.info('forwarding message from {source} to {destination} over hop {next_node}'.format(
                    source=header_obj.source, destination=header_obj.destination, next_node=header_obj.next_node))
                header_obj.ttl = header_obj.ttl - 1
                self.send_header(header_obj.get_header_str())
        else:
            logging.debug('ignoring message: {}'.format(str(header_obj)))

    def process_route_reply_header(self, header_obj):
        """
        processes route reply header; if the source address is equal to the own address the message will be rejected;
        if the destination address is equal to the own address a new route will be added to the routing table, else
        the message will be forwarded to the address mentioned in the next_node field
        @param header_obj: route reply header object
        """
        if header_obj.source == variables.MY_ADDRESS:
            return
        if header_obj.end_node == variables.MY_ADDRESS:
            # add entry to routing table
            self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from, header_obj.hops + 1)
        elif header_obj.next_node == variables.MY_ADDRESS:
            if len(self.routing_table.get_best_route_for_destination(header_obj.source)) != 0:
                # forward route reply message
                # add routing table entry
                logging.debug("add routing table entry before forwarding route reply message")
                self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                           header_obj.hops + 1)
                # forward message
                header_obj.next_node = self.routing_table.get_best_route_for_destination(header_obj.end_node)[
                    'next_node']
                header_obj.hops = header_obj.hops + 1
                header_obj.ttl = header_obj.ttl - 1
                self.send_header(header_obj.get_header_str())

    def process_route_error_header(self, header_obj):
        """
        processes route error header; node will be deleted from routing table
        @param header_obj: route error header object
        """
        if header_obj.broken_node in self.routing_table.get_list_of_all_available_destinations():
            logging.debug(f'received route error. Remove {header_obj.broken_node} from routing table')
            self.routing_table.delete_all_entries_of_destination(header_obj.broken_node)
        else:
            logging.debug(
                f'broken node is not in available nodes: {self.routing_table.get_list_of_all_available_destinations()}')
        header_obj.ttl -= 1
        self.send_header(header_obj.get_header_str())

    def process_ack_header(self, header_obj):
        """
        processes message acknowledgement header; if the destination address is equal to the own address the header
        object will be added to the message_acknowledgement_list, else the message will be forwarded
        @param header_obj: message acknowledgement header object
        """
        if header_obj.destination == variables.MY_ADDRESS:
            self.delete_from_ack_list(header_obj.message_id)
        header_obj.ttl -= 1
        logging.debug('forward ack message')
        if header_obj.destination != variables.MY_ADDRESS:
            self.send_header(header_obj.get_header_str())
        else:
            logging.debug(f'do not forward ack message, because end node was my address')

    def process_registration_header(self, header_obj):
        """
        processes registration message header
        :param header_obj: object of class RegistrationMessageHeader
        """
        if header_obj.source != variables.MY_ADDRESS:
            header_obj.ttl -= 1
            self.routing_table.add_address_to_processed_registration_messages_list(header_obj.source)
            if header_obj.subscribe:
                logging.debug('registered new peer')
                self.routing_table.add_peer(header_obj.peer_id, header_obj.source)
            else:
                logging.debug('unregistered peer')
                self.routing_table.delete_peer(header_obj.peer_id, header_obj.source)
            logging.debug('forward registration message')
            self.send_header(header_obj.get_header_str())
        else:
            self.received_own_registration_message = True

    def process_connect_request_header(self, header_obj):
        """
        processes connect request header
        :param header_obj: object of class ConnectRequestHeader
        """
        if header_obj.received_from != variables.MY_ADDRESS:
            if header_obj.end_node == variables.MY_ADDRESS:
                self.connected_node = header_obj.source
                # send connect request to java side
                logging.debug("send connect request to java side")
                self.sending_queue.put(
                    ipc.create_connect_request_message(header_obj.source_peer_id, header_obj.target_peer_id,
                                                       header_obj.timeout))
            elif header_obj.next_node == variables.MY_ADDRESS:
                logging.debug('forward connect request header')
                route = self.routing_table.get_best_route_for_destination(header_obj.end_node)
                if len(route) > 0:
                    header_obj.next_node = route['next_node']
                    header_obj.ttl -= 1
                    self.send_header(header_obj.get_header_str())
                else:
                    logging.debug(f'could not forward connect request header, because there is no routing table entry '
                                  f'for destination address {header_obj.end_node}')

    def process_disconnect_request_header(self, header_obj):
        """
        processes disconnect request header
        :param header_obj: object of class DisconnectRequestHeader
        """
        if header_obj.received_from != variables.MY_ADDRESS:
            if header_obj.end_node == variables.MY_ADDRESS:
                self.connected_node = header_obj.source
                # send connect request to java side
                logging.debug("send disconnect request to java side")
                self.sending_queue.put(
                    ipc.create_disconnect_request_message(header_obj.source_peer_id, header_obj.target_peer_id))
            elif header_obj.next_node == variables.MY_ADDRESS:
                logging.debug('forward disconnect request header')
                route = self.routing_table.get_best_route_for_destination(header_obj.end_node)
                if len(route) > 0:
                    header_obj.next_node = route['next_node']
                    header_obj.ttl -= 1
                    self.send_header(header_obj.get_header_str())
                else:
                    logging.debug(f'could not forward connect request header, because there is no routing table entry '
                                  f'for destination address {header_obj.end_node}')

    def send_connect_request_header(self, source_peer_id, target_peer_id, timeout_in_sec):
        """
        sends connect request
        :param source_peer_id: peer id of source peer
        :param target_peer_id: peer id of target peer
        :param timeout_in_sec: timeout in seconds
        """
        # look for address of source peer id and check whether source peer is already registered
        # wait until timeout for ConnectRequestHeader of other HubConnector
        self.check_peers(source_peer_id, target_peer_id)
        if not self.routing_table.check_connect_request_entry_already_exists(source_peer_id, target_peer_id):
            self.routing_table.add_connect_request(source_peer_id, target_peer_id)
            end_node = self.routing_table.get_address_of_peer(target_peer_id)
            route = self.routing_table.get_best_route_for_destination(end_node)
            if len(route) == 0:
                logging.info(
                    'could not find a route to {}. Sending route request...'.format(end_node))
                if self.send_route_request_message(end_node):
                    route = self.routing_table.get_best_route_for_destination(end_node)
                else:
                    logging.info('Got no answer on route requested.'.format(end_node))
                    return
            self.send_header(ConnectRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, end_node,
                                                  route['next_node'], source_peer_id, target_peer_id,
                                                  timeout_in_sec).get_header_str())

    def send_disconnect_request_header(self, source_peer_id, target_peer_id):
        """
        sends disconnect request
        :param source_peer_id: peer id of source peer
        :param target_peer_id: peer id of target peer
        """
        self.check_peers(source_peer_id, target_peer_id)
        end_node = self.routing_table.get_address_of_peer(target_peer_id)
        route = self.routing_table.get_best_route_for_destination(end_node)
        if len(route) == 0:
            logging.info(f'could not find a route to {end_node}. Sending route request...')
            if self.send_route_request_message(end_node):
                route = self.routing_table.get_best_route_for_destination(end_node)
            else:
                logging.info(f'Got no answer on route requested for end node: {end_node}')
                return
        self.send_header(DisconnectRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, end_node,
                                                 route['next_node'], source_peer_id, target_peer_id).get_header_str())

    def check_peers(self, source_peer_id, target_peer_id):
        """
        helper function to check whether peers are already registered; raises ValueError if peer is not registered
        :param source_peer_id: peer id of source peer
        :param target_peer_id: peer id of target peer
        """
        if not self.routing_table.check_peer_is_already_registered(source_peer_id):
            raise ValueError(f"source peer '{source_peer_id}' is not registered")
        elif not self.routing_table.check_peer_is_already_registered(target_peer_id):
            raise ValueError(f"target peer '{target_peer_id}' is not registered")
        elif self.routing_table.get_address_of_peer(source_peer_id) != variables.MY_ADDRESS:
            raise ValueError('source peer is not registered on this node')

    def send_registration_message(self, subscribe, peer_id):
        """
        function to registers/unregister a peer
        :param subscribe: if True peer will be registered on network; else the peer will be unregistered
        :param peer_id: id of peer
        """
        if subscribe:
            self.routing_table.add_peer(peer_id, variables.MY_ADDRESS)
        else:
            self.routing_table.delete_peer(peer_id, variables.MY_ADDRESS)
        attempts = 0
        received_own_request = False
        self.received_own_registration_message = False

        while attempts < 3:
            self.send_header(RegistrationHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, subscribe,
                                                peer_id).get_header_str())
            check_attempt_count = 0
            while check_attempt_count < 5:
                if self.received_own_registration_message:
                    received_own_request = True
                    break
                else:
                    check_attempt_count += 1
                    time.sleep(0.5)
            attempts += 1
            if received_own_request:
                return

    def stop(self):
        """
        function to shutdown background threads:

        thread for reading from serial port
        thread for writing to serial port
        thread for processing received header messages
        """
        self.PROCESS_INCOMING_MESSAGES = False
        serial_connection.WRITING_THREAD_ACTIVE = False
        serial_connection.READING_THREAD_ACTIVE = False

    def add_message_to_waiting_acknowledgement_list(self, message_header_obj):
        """
        adds the message id of a message to the list of pending acknowledgements
        :param message_header_obj: message as object of class MessageHeader
        """
        message_id = message_header_obj.message_id
        logging.debug(f"adding '{message_id}' to ack list")
        self.MESSAGES_ACKNOWLEDGMENT.append(message_id)

    def delete_from_ack_list(self, ack_id):
        """
        remove message id from list of pending acknowledgements
        :param ack_id: message id which should be deleted
        """
        logging.debug(f'remove {ack_id} from ack list')
        try:
            self.MESSAGES_ACKNOWLEDGMENT.remove(int(ack_id))
        except ValueError:
            logging.debug(f'ack is not in list. Current ack list: {self.MESSAGES_ACKNOWLEDGMENT}')


@contextmanager
def timeout(time_in_sec):
    """
    could be used as context manager to run code snippet until a custom timeout; calls raise_timeout function after
    timeout
    :param time_in_sec: specifies timeout in seconds
    """
    # Register a function to raise a TimeoutError on the signal.
    signal.signal(signal.SIGALRM, raise_timeout)
    # Schedule the signal to be sent after ``time``.
    signal.alarm(time_in_sec)
    try:
        yield
    except TimeoutError:
        pass
    finally:
        # Unregister the signal so it won't be triggered
        # if the timeout is not reached.
        signal.signal(signal.SIGALRM, signal.SIG_IGN)


def raise_timeout(signum, frame):
    """
    raises TimeoutError; is called by timeout context manager; to change timeout behavior edit code in this function
    """
    raise TimeoutError


def wait_random_time():
    """
    sleep for a random time; timespan is between 0 and variables.MAX_SLEEP_TIME seconds
    """
    sleep_time = random.uniform(0, variables.MAX_SLEEP_TIME)
    logging.debug('waiting {} seconds before sending'.format(sleep_time))
    time.sleep(sleep_time)

Functions

def raise_timeout(signum, frame)

raises TimeoutError; is called by timeout context manager; to change timeout behavior edit code in this function

Expand source code
def raise_timeout(signum, frame):
    """
    raises TimeoutError; is called by timeout context manager; to change timeout behavior edit code in this function
    """
    raise TimeoutError
def timeout(time_in_sec)

could be used as context manager to run code snippet until a custom timeout; calls raise_timeout function after timeout :param time_in_sec: specifies timeout in seconds

Expand source code
@contextmanager
def timeout(time_in_sec):
    """
    could be used as context manager to run code snippet until a custom timeout; calls raise_timeout function after
    timeout
    :param time_in_sec: specifies timeout in seconds
    """
    # Register a function to raise a TimeoutError on the signal.
    signal.signal(signal.SIGALRM, raise_timeout)
    # Schedule the signal to be sent after ``time``.
    signal.alarm(time_in_sec)
    try:
        yield
    except TimeoutError:
        pass
    finally:
        # Unregister the signal so it won't be triggered
        # if the timeout is not reached.
        signal.signal(signal.SIGALRM, signal.SIG_IGN)
def wait_random_time()

sleep for a random time; timespan is between 0 and variables.MAX_SLEEP_TIME seconds

Expand source code
def wait_random_time():
    """
    sleep for a random time; timespan is between 0 and variables.MAX_SLEEP_TIME seconds
    """
    sleep_time = random.uniform(0, variables.MAX_SLEEP_TIME)
    logging.debug('waiting {} seconds before sending'.format(sleep_time))
    time.sleep(sleep_time)

Classes

class Protocol
Expand source code
class Protocol:
    PROCESS_INCOMING_MESSAGES = True
    VERIFICATION_TIMEOUT = 25
    PAUSE_PROCESSING_INCOMING_MESSAGES = False
    MESSAGES_ACKNOWLEDGMENT = []

    def __init__(self):
        logging.info('created protocol obj: {}'.format(str(self)))
        self.routing_table = RoutingTable()
        self.received_messages_queue = Queue()
        self.sending_messages_queue = Queue()
        self.sending_queue = Queue()

        self.connected_node = None
        self.message_counter = 0
        self.received_own_registration_message = False

    def start_protocol_thread(self):
        """
        starts new thread which processes incoming messages in background
        """
        receiving_thread = threading.Thread(target=self.process_incoming_message)
        receiving_thread.start()

    def send_header(self, header_str):
        """
        sends a string to LoRa network
        @param header_str: message to send
        """
        wait_random_time()
        serial_connection.writing_q.put(('AT+SEND={}'.format(str(len(header_str))), ['AT,OK']))
        if serial_connection.status_q.get(timeout=self.VERIFICATION_TIMEOUT):
            serial_connection.writing_q.put((header_str, ['AT,SENDING', 'AT,SENDED']))
            if serial_connection.status_q.get(timeout=self.VERIFICATION_TIMEOUT):
                logging.debug("sent header '{}'.".format(header_str))
                return
        logging.debug("could not send header '{}', because got invalid status from lora module".format(header_str))

    def process_incoming_message(self):
        """
        get messages from LoRa module, create header object and call appropriate method to process the received message
        """
        while self.PROCESS_INCOMING_MESSAGES:
            if not serial_connection.response_q.empty() and not self.PAUSE_PROCESSING_INCOMING_MESSAGES:
                raw_message = serial_connection.response_q.get()
                logging.debug(f'process: {raw_message}')
                try:
                    header_obj = header.create_header_obj_from_raw_message(raw_message)
                    if header_obj.ttl > 1:
                        self.routing_table.add_neighbor_to_routing_table(header_obj)
                        if header_obj.flag == header.RouteRequestHeader.HEADER_TYPE:
                            self.process_route_request(header_obj)
                        elif header_obj.flag == header.MessageHeader.HEADER_TYPE:
                            self.process_message_header(header_obj)
                        elif header_obj.flag == header.RouteReplyHeader.HEADER_TYPE:
                            self.process_route_reply_header(header_obj)
                        elif header_obj.flag == header.RouteErrorHeader.HEADER_TYPE:
                            self.process_route_error_header(header_obj)
                        elif header_obj.flag == header.MessageAcknowledgeHeader.HEADER_TYPE:
                            self.process_ack_header(header_obj)
                        elif header_obj.flag == header.RegistrationHeader.HEADER_TYPE:
                            self.process_registration_header(header_obj)
                        elif header_obj.flag == header.ConnectRequestHeader.HEADER_TYPE:
                            self.process_connect_request_header(header_obj)
                        elif header_obj.flag == header.DisconnectRequestHeader.HEADER_TYPE:
                            self.process_disconnect_request_header(header_obj)
                except ValueError as e:
                    logging.warning(str(e))
                    traceback.print_exc()
                    try:
                        logging.debug('try to add received signal to unsupported devices list...')
                        addr = header.get_received_from_value(raw_message)
                        self.routing_table.add_neighbor_with_unsupported_protocol(addr)
                    except ValueError as e:
                        logging.warning(str(e))

    def send_message(self, payload):
        """
        send message to currently connected peer
        @param payload: message to send as bytes
        """
        if self.connected_node is not None:
            destination = self.connected_node
            best_route = self.routing_table.get_best_route_for_destination(destination)
            if len(best_route) == 0:
                logging.info('could not find a route to {}. Sending route request...'.format(destination))
                if self.send_route_request_message(destination):
                    best_route = self.routing_table.get_best_route_for_destination(destination)
                else:
                    logging.info('Got no answer on route requested.'.format(destination))
                    return
            self.message_counter += 1
            header_obj = header.MessageHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, destination,
                                              best_route['next_node'], self.message_counter,
                                              base64.b64encode(payload).decode(variables.ENCODING))
            attempt = 0
            self.add_message_to_waiting_acknowledgement_list(header_obj)
            message_confirmed = False
            while attempt < 3 and not message_confirmed:
                logging.debug(f'attempt: {attempt}')
                self.send_header(header_obj.get_header_str())
                attempt_count_received_ack = 0
                while attempt_count_received_ack < 10:
                    if header_obj.message_id not in self.MESSAGES_ACKNOWLEDGMENT:
                        message_confirmed = True
                        break
                    else:
                        time.sleep(0.5)
                        attempt_count_received_ack += 1
                if message_confirmed:
                    break
                else:
                    attempt += 1
            if message_confirmed:
                print('*******************message was acknowledged by receiver*******************')
            else:
                logging.debug(
                    f'message was not acknowledged by receiver. Current ack_list: {self.MESSAGES_ACKNOWLEDGMENT}'
                    f'\nSending route error message')
                self.routing_table.delete_all_entries_of_destination(destination)
                self.delete_from_ack_list(header_obj.message_id)
                self.send_header(header.RouteErrorHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL,
                                                         header_obj.destination).get_header_str())

    def send_route_request_message(self, end_node):
        """
        sends route request
        @param end_node: node for which a route is required
        @return: True, if route request was confirmed, else False
        """
        route_request_header_obj = header.RouteRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, 0,
                                                             end_node)
        attempt = 0
        message_confirmed = False
        while attempt < 3 and not message_confirmed:
            logging.debug('attempt: {}'.format(attempt))
            self.send_header(route_request_header_obj.get_header_str())
            check_request_attempt_count = 0
            while check_request_attempt_count < 10:
                if len(self.routing_table.get_best_route_for_destination(end_node)) != 0:
                    logging.debug('new route for {} found'.format(end_node))
                    message_confirmed = True
                    break
                else:
                    time.sleep(0.5)
                    check_request_attempt_count += 1
            attempt += 1
            if message_confirmed:
                return message_confirmed
            else:
                attempt += 1
        return message_confirmed

    def process_route_request(self, header_obj):
        """
        processes received route request header
        @param header_obj: route request header object
        """
        # first of all check whether source of route request is myself (to prevent cycle)
        if header_obj.source != variables.MY_ADDRESS:
            # look whether requested node is myself
            if header_obj.end_node == variables.MY_ADDRESS:
                logging.debug('add new routing table entry before sending route reply')
                self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                           header_obj.hops + 1)
                logging.info('sending route reply message...')
                self.send_route_reply(next_node=header_obj.received_from, end_node=header_obj.source)
            else:
                if len(self.routing_table.get_best_route_for_destination(header_obj.source)) == 0:
                    # if there is no entry for source of route request, you can add routing table entry
                    self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                               header_obj.hops)

                header_obj.ttl = header_obj.ttl - 1
                header_obj.hops = header_obj.hops + 1
                if not self.routing_table.check_route_request_already_processed(header_obj.end_node):
                    logging.debug('forward route request message')
                    self.routing_table.add_address_to_processed_requests_list(header_obj.end_node)
                    self.send_header(header_obj.get_header_str())
                else:
                    logging.debug('route request was already processed')

    def send_route_reply(self, next_node, end_node):
        """
        sends route reply message
        @param next_node: next receiver of the message, which should forward the message to the destination node
        @param end_node: node which sent the route request
        """
        route_reply_header_obj = header.RouteReplyHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, 0, end_node,
                                                         next_node)
        self.send_header(route_reply_header_obj.get_header_str())

    def process_message_header(self, header_obj):
        """
        processed received message header; if the end node of the message is this node the message will be put
        into the received_messages queue to forward the message via IPC to the Java side; else the message will be
        forwarded to the next_node
        @param header_obj: message header object
        """
        if header_obj.destination == variables.MY_ADDRESS and header_obj.source == self.connected_node:
            ack_header_str = header.MessageAcknowledgeHeader(None, variables.MY_ADDRESS, variables.TTL_START_VALUE,
                                                             header_obj.source, header_obj.message_id).get_header_str()
            if self.routing_table.check_message_already_received(header_obj.source, header_obj.message_id):
                self.send_header(ack_header_str)
            else:
                logging.debug(f'payload: {str(header_obj.payload)}')
                self.received_messages_queue.put(base64.b64decode(header_obj.payload))
                # send acknowledge message
                logging.debug('sending acknowledgement')
                self.send_header(ack_header_str)

        elif header_obj.next_node == variables.MY_ADDRESS and header_obj.destination != variables.MY_ADDRESS:
            best_route = self.routing_table.get_best_route_for_destination(header_obj.destination)
            if len(best_route) == 0:
                logging.info('no routing table entry for {} to forward message found'.format(header_obj.next_node))
            else:
                header_obj.next_node = best_route['next_node']
                logging.info('forwarding message from {source} to {destination} over hop {next_node}'.format(
                    source=header_obj.source, destination=header_obj.destination, next_node=header_obj.next_node))
                header_obj.ttl = header_obj.ttl - 1
                self.send_header(header_obj.get_header_str())
        else:
            logging.debug('ignoring message: {}'.format(str(header_obj)))

    def process_route_reply_header(self, header_obj):
        """
        processes route reply header; if the source address is equal to the own address the message will be rejected;
        if the destination address is equal to the own address a new route will be added to the routing table, else
        the message will be forwarded to the address mentioned in the next_node field
        @param header_obj: route reply header object
        """
        if header_obj.source == variables.MY_ADDRESS:
            return
        if header_obj.end_node == variables.MY_ADDRESS:
            # add entry to routing table
            self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from, header_obj.hops + 1)
        elif header_obj.next_node == variables.MY_ADDRESS:
            if len(self.routing_table.get_best_route_for_destination(header_obj.source)) != 0:
                # forward route reply message
                # add routing table entry
                logging.debug("add routing table entry before forwarding route reply message")
                self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                           header_obj.hops + 1)
                # forward message
                header_obj.next_node = self.routing_table.get_best_route_for_destination(header_obj.end_node)[
                    'next_node']
                header_obj.hops = header_obj.hops + 1
                header_obj.ttl = header_obj.ttl - 1
                self.send_header(header_obj.get_header_str())

    def process_route_error_header(self, header_obj):
        """
        processes route error header; node will be deleted from routing table
        @param header_obj: route error header object
        """
        if header_obj.broken_node in self.routing_table.get_list_of_all_available_destinations():
            logging.debug(f'received route error. Remove {header_obj.broken_node} from routing table')
            self.routing_table.delete_all_entries_of_destination(header_obj.broken_node)
        else:
            logging.debug(
                f'broken node is not in available nodes: {self.routing_table.get_list_of_all_available_destinations()}')
        header_obj.ttl -= 1
        self.send_header(header_obj.get_header_str())

    def process_ack_header(self, header_obj):
        """
        processes message acknowledgement header; if the destination address is equal to the own address the header
        object will be added to the message_acknowledgement_list, else the message will be forwarded
        @param header_obj: message acknowledgement header object
        """
        if header_obj.destination == variables.MY_ADDRESS:
            self.delete_from_ack_list(header_obj.message_id)
        header_obj.ttl -= 1
        logging.debug('forward ack message')
        if header_obj.destination != variables.MY_ADDRESS:
            self.send_header(header_obj.get_header_str())
        else:
            logging.debug(f'do not forward ack message, because end node was my address')

    def process_registration_header(self, header_obj):
        """
        processes registration message header
        :param header_obj: object of class RegistrationMessageHeader
        """
        if header_obj.source != variables.MY_ADDRESS:
            header_obj.ttl -= 1
            self.routing_table.add_address_to_processed_registration_messages_list(header_obj.source)
            if header_obj.subscribe:
                logging.debug('registered new peer')
                self.routing_table.add_peer(header_obj.peer_id, header_obj.source)
            else:
                logging.debug('unregistered peer')
                self.routing_table.delete_peer(header_obj.peer_id, header_obj.source)
            logging.debug('forward registration message')
            self.send_header(header_obj.get_header_str())
        else:
            self.received_own_registration_message = True

    def process_connect_request_header(self, header_obj):
        """
        processes connect request header
        :param header_obj: object of class ConnectRequestHeader
        """
        if header_obj.received_from != variables.MY_ADDRESS:
            if header_obj.end_node == variables.MY_ADDRESS:
                self.connected_node = header_obj.source
                # send connect request to java side
                logging.debug("send connect request to java side")
                self.sending_queue.put(
                    ipc.create_connect_request_message(header_obj.source_peer_id, header_obj.target_peer_id,
                                                       header_obj.timeout))
            elif header_obj.next_node == variables.MY_ADDRESS:
                logging.debug('forward connect request header')
                route = self.routing_table.get_best_route_for_destination(header_obj.end_node)
                if len(route) > 0:
                    header_obj.next_node = route['next_node']
                    header_obj.ttl -= 1
                    self.send_header(header_obj.get_header_str())
                else:
                    logging.debug(f'could not forward connect request header, because there is no routing table entry '
                                  f'for destination address {header_obj.end_node}')

    def process_disconnect_request_header(self, header_obj):
        """
        processes disconnect request header
        :param header_obj: object of class DisconnectRequestHeader
        """
        if header_obj.received_from != variables.MY_ADDRESS:
            if header_obj.end_node == variables.MY_ADDRESS:
                self.connected_node = header_obj.source
                # send connect request to java side
                logging.debug("send disconnect request to java side")
                self.sending_queue.put(
                    ipc.create_disconnect_request_message(header_obj.source_peer_id, header_obj.target_peer_id))
            elif header_obj.next_node == variables.MY_ADDRESS:
                logging.debug('forward disconnect request header')
                route = self.routing_table.get_best_route_for_destination(header_obj.end_node)
                if len(route) > 0:
                    header_obj.next_node = route['next_node']
                    header_obj.ttl -= 1
                    self.send_header(header_obj.get_header_str())
                else:
                    logging.debug(f'could not forward connect request header, because there is no routing table entry '
                                  f'for destination address {header_obj.end_node}')

    def send_connect_request_header(self, source_peer_id, target_peer_id, timeout_in_sec):
        """
        sends connect request
        :param source_peer_id: peer id of source peer
        :param target_peer_id: peer id of target peer
        :param timeout_in_sec: timeout in seconds
        """
        # look for address of source peer id and check whether source peer is already registered
        # wait until timeout for ConnectRequestHeader of other HubConnector
        self.check_peers(source_peer_id, target_peer_id)
        if not self.routing_table.check_connect_request_entry_already_exists(source_peer_id, target_peer_id):
            self.routing_table.add_connect_request(source_peer_id, target_peer_id)
            end_node = self.routing_table.get_address_of_peer(target_peer_id)
            route = self.routing_table.get_best_route_for_destination(end_node)
            if len(route) == 0:
                logging.info(
                    'could not find a route to {}. Sending route request...'.format(end_node))
                if self.send_route_request_message(end_node):
                    route = self.routing_table.get_best_route_for_destination(end_node)
                else:
                    logging.info('Got no answer on route requested.'.format(end_node))
                    return
            self.send_header(ConnectRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, end_node,
                                                  route['next_node'], source_peer_id, target_peer_id,
                                                  timeout_in_sec).get_header_str())

    def send_disconnect_request_header(self, source_peer_id, target_peer_id):
        """
        sends disconnect request
        :param source_peer_id: peer id of source peer
        :param target_peer_id: peer id of target peer
        """
        self.check_peers(source_peer_id, target_peer_id)
        end_node = self.routing_table.get_address_of_peer(target_peer_id)
        route = self.routing_table.get_best_route_for_destination(end_node)
        if len(route) == 0:
            logging.info(f'could not find a route to {end_node}. Sending route request...')
            if self.send_route_request_message(end_node):
                route = self.routing_table.get_best_route_for_destination(end_node)
            else:
                logging.info(f'Got no answer on route requested for end node: {end_node}')
                return
        self.send_header(DisconnectRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, end_node,
                                                 route['next_node'], source_peer_id, target_peer_id).get_header_str())

    def check_peers(self, source_peer_id, target_peer_id):
        """
        helper function to check whether peers are already registered; raises ValueError if peer is not registered
        :param source_peer_id: peer id of source peer
        :param target_peer_id: peer id of target peer
        """
        if not self.routing_table.check_peer_is_already_registered(source_peer_id):
            raise ValueError(f"source peer '{source_peer_id}' is not registered")
        elif not self.routing_table.check_peer_is_already_registered(target_peer_id):
            raise ValueError(f"target peer '{target_peer_id}' is not registered")
        elif self.routing_table.get_address_of_peer(source_peer_id) != variables.MY_ADDRESS:
            raise ValueError('source peer is not registered on this node')

    def send_registration_message(self, subscribe, peer_id):
        """
        function to registers/unregister a peer
        :param subscribe: if True peer will be registered on network; else the peer will be unregistered
        :param peer_id: id of peer
        """
        if subscribe:
            self.routing_table.add_peer(peer_id, variables.MY_ADDRESS)
        else:
            self.routing_table.delete_peer(peer_id, variables.MY_ADDRESS)
        attempts = 0
        received_own_request = False
        self.received_own_registration_message = False

        while attempts < 3:
            self.send_header(RegistrationHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, subscribe,
                                                peer_id).get_header_str())
            check_attempt_count = 0
            while check_attempt_count < 5:
                if self.received_own_registration_message:
                    received_own_request = True
                    break
                else:
                    check_attempt_count += 1
                    time.sleep(0.5)
            attempts += 1
            if received_own_request:
                return

    def stop(self):
        """
        function to shutdown background threads:

        thread for reading from serial port
        thread for writing to serial port
        thread for processing received header messages
        """
        self.PROCESS_INCOMING_MESSAGES = False
        serial_connection.WRITING_THREAD_ACTIVE = False
        serial_connection.READING_THREAD_ACTIVE = False

    def add_message_to_waiting_acknowledgement_list(self, message_header_obj):
        """
        adds the message id of a message to the list of pending acknowledgements
        :param message_header_obj: message as object of class MessageHeader
        """
        message_id = message_header_obj.message_id
        logging.debug(f"adding '{message_id}' to ack list")
        self.MESSAGES_ACKNOWLEDGMENT.append(message_id)

    def delete_from_ack_list(self, ack_id):
        """
        remove message id from list of pending acknowledgements
        :param ack_id: message id which should be deleted
        """
        logging.debug(f'remove {ack_id} from ack list')
        try:
            self.MESSAGES_ACKNOWLEDGMENT.remove(int(ack_id))
        except ValueError:
            logging.debug(f'ack is not in list. Current ack list: {self.MESSAGES_ACKNOWLEDGMENT}')

Class variables

var MESSAGES_ACKNOWLEDGMENT
var PAUSE_PROCESSING_INCOMING_MESSAGES
var PROCESS_INCOMING_MESSAGES
var VERIFICATION_TIMEOUT

Methods

def add_message_to_waiting_acknowledgement_list(self, message_header_obj)

adds the message id of a message to the list of pending acknowledgements :param message_header_obj: message as object of class MessageHeader

Expand source code
def add_message_to_waiting_acknowledgement_list(self, message_header_obj):
    """
    adds the message id of a message to the list of pending acknowledgements
    :param message_header_obj: message as object of class MessageHeader
    """
    message_id = message_header_obj.message_id
    logging.debug(f"adding '{message_id}' to ack list")
    self.MESSAGES_ACKNOWLEDGMENT.append(message_id)
def check_peers(self, source_peer_id, target_peer_id)

helper function to check whether peers are already registered; raises ValueError if peer is not registered :param source_peer_id: peer id of source peer :param target_peer_id: peer id of target peer

Expand source code
def check_peers(self, source_peer_id, target_peer_id):
    """
    helper function to check whether peers are already registered; raises ValueError if peer is not registered
    :param source_peer_id: peer id of source peer
    :param target_peer_id: peer id of target peer
    """
    if not self.routing_table.check_peer_is_already_registered(source_peer_id):
        raise ValueError(f"source peer '{source_peer_id}' is not registered")
    elif not self.routing_table.check_peer_is_already_registered(target_peer_id):
        raise ValueError(f"target peer '{target_peer_id}' is not registered")
    elif self.routing_table.get_address_of_peer(source_peer_id) != variables.MY_ADDRESS:
        raise ValueError('source peer is not registered on this node')
def delete_from_ack_list(self, ack_id)

remove message id from list of pending acknowledgements :param ack_id: message id which should be deleted

Expand source code
def delete_from_ack_list(self, ack_id):
    """
    remove message id from list of pending acknowledgements
    :param ack_id: message id which should be deleted
    """
    logging.debug(f'remove {ack_id} from ack list')
    try:
        self.MESSAGES_ACKNOWLEDGMENT.remove(int(ack_id))
    except ValueError:
        logging.debug(f'ack is not in list. Current ack list: {self.MESSAGES_ACKNOWLEDGMENT}')
def process_ack_header(self, header_obj)

processes message acknowledgement header; if the destination address is equal to the own address the header object will be added to the message_acknowledgement_list, else the message will be forwarded @param header_obj: message acknowledgement header object

Expand source code
def process_ack_header(self, header_obj):
    """
    processes message acknowledgement header; if the destination address is equal to the own address the header
    object will be added to the message_acknowledgement_list, else the message will be forwarded
    @param header_obj: message acknowledgement header object
    """
    if header_obj.destination == variables.MY_ADDRESS:
        self.delete_from_ack_list(header_obj.message_id)
    header_obj.ttl -= 1
    logging.debug('forward ack message')
    if header_obj.destination != variables.MY_ADDRESS:
        self.send_header(header_obj.get_header_str())
    else:
        logging.debug(f'do not forward ack message, because end node was my address')
def process_connect_request_header(self, header_obj)

processes connect request header :param header_obj: object of class ConnectRequestHeader

Expand source code
def process_connect_request_header(self, header_obj):
    """
    processes connect request header
    :param header_obj: object of class ConnectRequestHeader
    """
    if header_obj.received_from != variables.MY_ADDRESS:
        if header_obj.end_node == variables.MY_ADDRESS:
            self.connected_node = header_obj.source
            # send connect request to java side
            logging.debug("send connect request to java side")
            self.sending_queue.put(
                ipc.create_connect_request_message(header_obj.source_peer_id, header_obj.target_peer_id,
                                                   header_obj.timeout))
        elif header_obj.next_node == variables.MY_ADDRESS:
            logging.debug('forward connect request header')
            route = self.routing_table.get_best_route_for_destination(header_obj.end_node)
            if len(route) > 0:
                header_obj.next_node = route['next_node']
                header_obj.ttl -= 1
                self.send_header(header_obj.get_header_str())
            else:
                logging.debug(f'could not forward connect request header, because there is no routing table entry '
                              f'for destination address {header_obj.end_node}')
def process_disconnect_request_header(self, header_obj)

processes disconnect request header :param header_obj: object of class DisconnectRequestHeader

Expand source code
def process_disconnect_request_header(self, header_obj):
    """
    processes disconnect request header
    :param header_obj: object of class DisconnectRequestHeader
    """
    if header_obj.received_from != variables.MY_ADDRESS:
        if header_obj.end_node == variables.MY_ADDRESS:
            self.connected_node = header_obj.source
            # send connect request to java side
            logging.debug("send disconnect request to java side")
            self.sending_queue.put(
                ipc.create_disconnect_request_message(header_obj.source_peer_id, header_obj.target_peer_id))
        elif header_obj.next_node == variables.MY_ADDRESS:
            logging.debug('forward disconnect request header')
            route = self.routing_table.get_best_route_for_destination(header_obj.end_node)
            if len(route) > 0:
                header_obj.next_node = route['next_node']
                header_obj.ttl -= 1
                self.send_header(header_obj.get_header_str())
            else:
                logging.debug(f'could not forward connect request header, because there is no routing table entry '
                              f'for destination address {header_obj.end_node}')
def process_incoming_message(self)

get messages from LoRa module, create header object and call appropriate method to process the received message

Expand source code
def process_incoming_message(self):
    """
    get messages from LoRa module, create header object and call appropriate method to process the received message
    """
    while self.PROCESS_INCOMING_MESSAGES:
        if not serial_connection.response_q.empty() and not self.PAUSE_PROCESSING_INCOMING_MESSAGES:
            raw_message = serial_connection.response_q.get()
            logging.debug(f'process: {raw_message}')
            try:
                header_obj = header.create_header_obj_from_raw_message(raw_message)
                if header_obj.ttl > 1:
                    self.routing_table.add_neighbor_to_routing_table(header_obj)
                    if header_obj.flag == header.RouteRequestHeader.HEADER_TYPE:
                        self.process_route_request(header_obj)
                    elif header_obj.flag == header.MessageHeader.HEADER_TYPE:
                        self.process_message_header(header_obj)
                    elif header_obj.flag == header.RouteReplyHeader.HEADER_TYPE:
                        self.process_route_reply_header(header_obj)
                    elif header_obj.flag == header.RouteErrorHeader.HEADER_TYPE:
                        self.process_route_error_header(header_obj)
                    elif header_obj.flag == header.MessageAcknowledgeHeader.HEADER_TYPE:
                        self.process_ack_header(header_obj)
                    elif header_obj.flag == header.RegistrationHeader.HEADER_TYPE:
                        self.process_registration_header(header_obj)
                    elif header_obj.flag == header.ConnectRequestHeader.HEADER_TYPE:
                        self.process_connect_request_header(header_obj)
                    elif header_obj.flag == header.DisconnectRequestHeader.HEADER_TYPE:
                        self.process_disconnect_request_header(header_obj)
            except ValueError as e:
                logging.warning(str(e))
                traceback.print_exc()
                try:
                    logging.debug('try to add received signal to unsupported devices list...')
                    addr = header.get_received_from_value(raw_message)
                    self.routing_table.add_neighbor_with_unsupported_protocol(addr)
                except ValueError as e:
                    logging.warning(str(e))
def process_message_header(self, header_obj)

processed received message header; if the end node of the message is this node the message will be put into the received_messages queue to forward the message via IPC to the Java side; else the message will be forwarded to the next_node @param header_obj: message header object

Expand source code
def process_message_header(self, header_obj):
    """
    processed received message header; if the end node of the message is this node the message will be put
    into the received_messages queue to forward the message via IPC to the Java side; else the message will be
    forwarded to the next_node
    @param header_obj: message header object
    """
    if header_obj.destination == variables.MY_ADDRESS and header_obj.source == self.connected_node:
        ack_header_str = header.MessageAcknowledgeHeader(None, variables.MY_ADDRESS, variables.TTL_START_VALUE,
                                                         header_obj.source, header_obj.message_id).get_header_str()
        if self.routing_table.check_message_already_received(header_obj.source, header_obj.message_id):
            self.send_header(ack_header_str)
        else:
            logging.debug(f'payload: {str(header_obj.payload)}')
            self.received_messages_queue.put(base64.b64decode(header_obj.payload))
            # send acknowledge message
            logging.debug('sending acknowledgement')
            self.send_header(ack_header_str)

    elif header_obj.next_node == variables.MY_ADDRESS and header_obj.destination != variables.MY_ADDRESS:
        best_route = self.routing_table.get_best_route_for_destination(header_obj.destination)
        if len(best_route) == 0:
            logging.info('no routing table entry for {} to forward message found'.format(header_obj.next_node))
        else:
            header_obj.next_node = best_route['next_node']
            logging.info('forwarding message from {source} to {destination} over hop {next_node}'.format(
                source=header_obj.source, destination=header_obj.destination, next_node=header_obj.next_node))
            header_obj.ttl = header_obj.ttl - 1
            self.send_header(header_obj.get_header_str())
    else:
        logging.debug('ignoring message: {}'.format(str(header_obj)))
def process_registration_header(self, header_obj)

processes registration message header :param header_obj: object of class RegistrationMessageHeader

Expand source code
def process_registration_header(self, header_obj):
    """
    processes registration message header
    :param header_obj: object of class RegistrationMessageHeader
    """
    if header_obj.source != variables.MY_ADDRESS:
        header_obj.ttl -= 1
        self.routing_table.add_address_to_processed_registration_messages_list(header_obj.source)
        if header_obj.subscribe:
            logging.debug('registered new peer')
            self.routing_table.add_peer(header_obj.peer_id, header_obj.source)
        else:
            logging.debug('unregistered peer')
            self.routing_table.delete_peer(header_obj.peer_id, header_obj.source)
        logging.debug('forward registration message')
        self.send_header(header_obj.get_header_str())
    else:
        self.received_own_registration_message = True
def process_route_error_header(self, header_obj)

processes route error header; node will be deleted from routing table @param header_obj: route error header object

Expand source code
def process_route_error_header(self, header_obj):
    """
    processes route error header; node will be deleted from routing table
    @param header_obj: route error header object
    """
    if header_obj.broken_node in self.routing_table.get_list_of_all_available_destinations():
        logging.debug(f'received route error. Remove {header_obj.broken_node} from routing table')
        self.routing_table.delete_all_entries_of_destination(header_obj.broken_node)
    else:
        logging.debug(
            f'broken node is not in available nodes: {self.routing_table.get_list_of_all_available_destinations()}')
    header_obj.ttl -= 1
    self.send_header(header_obj.get_header_str())
def process_route_reply_header(self, header_obj)

processes route reply header; if the source address is equal to the own address the message will be rejected; if the destination address is equal to the own address a new route will be added to the routing table, else the message will be forwarded to the address mentioned in the next_node field @param header_obj: route reply header object

Expand source code
def process_route_reply_header(self, header_obj):
    """
    processes route reply header; if the source address is equal to the own address the message will be rejected;
    if the destination address is equal to the own address a new route will be added to the routing table, else
    the message will be forwarded to the address mentioned in the next_node field
    @param header_obj: route reply header object
    """
    if header_obj.source == variables.MY_ADDRESS:
        return
    if header_obj.end_node == variables.MY_ADDRESS:
        # add entry to routing table
        self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from, header_obj.hops + 1)
    elif header_obj.next_node == variables.MY_ADDRESS:
        if len(self.routing_table.get_best_route_for_destination(header_obj.source)) != 0:
            # forward route reply message
            # add routing table entry
            logging.debug("add routing table entry before forwarding route reply message")
            self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                       header_obj.hops + 1)
            # forward message
            header_obj.next_node = self.routing_table.get_best_route_for_destination(header_obj.end_node)[
                'next_node']
            header_obj.hops = header_obj.hops + 1
            header_obj.ttl = header_obj.ttl - 1
            self.send_header(header_obj.get_header_str())
def process_route_request(self, header_obj)

processes received route request header @param header_obj: route request header object

Expand source code
def process_route_request(self, header_obj):
    """
    processes received route request header
    @param header_obj: route request header object
    """
    # first of all check whether source of route request is myself (to prevent cycle)
    if header_obj.source != variables.MY_ADDRESS:
        # look whether requested node is myself
        if header_obj.end_node == variables.MY_ADDRESS:
            logging.debug('add new routing table entry before sending route reply')
            self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                       header_obj.hops + 1)
            logging.info('sending route reply message...')
            self.send_route_reply(next_node=header_obj.received_from, end_node=header_obj.source)
        else:
            if len(self.routing_table.get_best_route_for_destination(header_obj.source)) == 0:
                # if there is no entry for source of route request, you can add routing table entry
                self.routing_table.add_routing_table_entry(header_obj.source, header_obj.received_from,
                                                           header_obj.hops)

            header_obj.ttl = header_obj.ttl - 1
            header_obj.hops = header_obj.hops + 1
            if not self.routing_table.check_route_request_already_processed(header_obj.end_node):
                logging.debug('forward route request message')
                self.routing_table.add_address_to_processed_requests_list(header_obj.end_node)
                self.send_header(header_obj.get_header_str())
            else:
                logging.debug('route request was already processed')
def send_connect_request_header(self, source_peer_id, target_peer_id, timeout_in_sec)

sends connect request :param source_peer_id: peer id of source peer :param target_peer_id: peer id of target peer :param timeout_in_sec: timeout in seconds

Expand source code
def send_connect_request_header(self, source_peer_id, target_peer_id, timeout_in_sec):
    """
    sends connect request
    :param source_peer_id: peer id of source peer
    :param target_peer_id: peer id of target peer
    :param timeout_in_sec: timeout in seconds
    """
    # look for address of source peer id and check whether source peer is already registered
    # wait until timeout for ConnectRequestHeader of other HubConnector
    self.check_peers(source_peer_id, target_peer_id)
    if not self.routing_table.check_connect_request_entry_already_exists(source_peer_id, target_peer_id):
        self.routing_table.add_connect_request(source_peer_id, target_peer_id)
        end_node = self.routing_table.get_address_of_peer(target_peer_id)
        route = self.routing_table.get_best_route_for_destination(end_node)
        if len(route) == 0:
            logging.info(
                'could not find a route to {}. Sending route request...'.format(end_node))
            if self.send_route_request_message(end_node):
                route = self.routing_table.get_best_route_for_destination(end_node)
            else:
                logging.info('Got no answer on route requested.'.format(end_node))
                return
        self.send_header(ConnectRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, end_node,
                                              route['next_node'], source_peer_id, target_peer_id,
                                              timeout_in_sec).get_header_str())
def send_disconnect_request_header(self, source_peer_id, target_peer_id)

sends disconnect request :param source_peer_id: peer id of source peer :param target_peer_id: peer id of target peer

Expand source code
def send_disconnect_request_header(self, source_peer_id, target_peer_id):
    """
    sends disconnect request
    :param source_peer_id: peer id of source peer
    :param target_peer_id: peer id of target peer
    """
    self.check_peers(source_peer_id, target_peer_id)
    end_node = self.routing_table.get_address_of_peer(target_peer_id)
    route = self.routing_table.get_best_route_for_destination(end_node)
    if len(route) == 0:
        logging.info(f'could not find a route to {end_node}. Sending route request...')
        if self.send_route_request_message(end_node):
            route = self.routing_table.get_best_route_for_destination(end_node)
        else:
            logging.info(f'Got no answer on route requested for end node: {end_node}')
            return
    self.send_header(DisconnectRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, end_node,
                                             route['next_node'], source_peer_id, target_peer_id).get_header_str())
def send_header(self, header_str)

sends a string to LoRa network @param header_str: message to send

Expand source code
def send_header(self, header_str):
    """
    sends a string to LoRa network
    @param header_str: message to send
    """
    wait_random_time()
    serial_connection.writing_q.put(('AT+SEND={}'.format(str(len(header_str))), ['AT,OK']))
    if serial_connection.status_q.get(timeout=self.VERIFICATION_TIMEOUT):
        serial_connection.writing_q.put((header_str, ['AT,SENDING', 'AT,SENDED']))
        if serial_connection.status_q.get(timeout=self.VERIFICATION_TIMEOUT):
            logging.debug("sent header '{}'.".format(header_str))
            return
    logging.debug("could not send header '{}', because got invalid status from lora module".format(header_str))
def send_message(self, payload)

send message to currently connected peer @param payload: message to send as bytes

Expand source code
def send_message(self, payload):
    """
    send message to currently connected peer
    @param payload: message to send as bytes
    """
    if self.connected_node is not None:
        destination = self.connected_node
        best_route = self.routing_table.get_best_route_for_destination(destination)
        if len(best_route) == 0:
            logging.info('could not find a route to {}. Sending route request...'.format(destination))
            if self.send_route_request_message(destination):
                best_route = self.routing_table.get_best_route_for_destination(destination)
            else:
                logging.info('Got no answer on route requested.'.format(destination))
                return
        self.message_counter += 1
        header_obj = header.MessageHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, destination,
                                          best_route['next_node'], self.message_counter,
                                          base64.b64encode(payload).decode(variables.ENCODING))
        attempt = 0
        self.add_message_to_waiting_acknowledgement_list(header_obj)
        message_confirmed = False
        while attempt < 3 and not message_confirmed:
            logging.debug(f'attempt: {attempt}')
            self.send_header(header_obj.get_header_str())
            attempt_count_received_ack = 0
            while attempt_count_received_ack < 10:
                if header_obj.message_id not in self.MESSAGES_ACKNOWLEDGMENT:
                    message_confirmed = True
                    break
                else:
                    time.sleep(0.5)
                    attempt_count_received_ack += 1
            if message_confirmed:
                break
            else:
                attempt += 1
        if message_confirmed:
            print('*******************message was acknowledged by receiver*******************')
        else:
            logging.debug(
                f'message was not acknowledged by receiver. Current ack_list: {self.MESSAGES_ACKNOWLEDGMENT}'
                f'\nSending route error message')
            self.routing_table.delete_all_entries_of_destination(destination)
            self.delete_from_ack_list(header_obj.message_id)
            self.send_header(header.RouteErrorHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL,
                                                     header_obj.destination).get_header_str())
def send_registration_message(self, subscribe, peer_id)

function to registers/unregister a peer :param subscribe: if True peer will be registered on network; else the peer will be unregistered :param peer_id: id of peer

Expand source code
def send_registration_message(self, subscribe, peer_id):
    """
    function to registers/unregister a peer
    :param subscribe: if True peer will be registered on network; else the peer will be unregistered
    :param peer_id: id of peer
    """
    if subscribe:
        self.routing_table.add_peer(peer_id, variables.MY_ADDRESS)
    else:
        self.routing_table.delete_peer(peer_id, variables.MY_ADDRESS)
    attempts = 0
    received_own_request = False
    self.received_own_registration_message = False

    while attempts < 3:
        self.send_header(RegistrationHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, subscribe,
                                            peer_id).get_header_str())
        check_attempt_count = 0
        while check_attempt_count < 5:
            if self.received_own_registration_message:
                received_own_request = True
                break
            else:
                check_attempt_count += 1
                time.sleep(0.5)
        attempts += 1
        if received_own_request:
            return
def send_route_reply(self, next_node, end_node)

sends route reply message @param next_node: next receiver of the message, which should forward the message to the destination node @param end_node: node which sent the route request

Expand source code
def send_route_reply(self, next_node, end_node):
    """
    sends route reply message
    @param next_node: next receiver of the message, which should forward the message to the destination node
    @param end_node: node which sent the route request
    """
    route_reply_header_obj = header.RouteReplyHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, 0, end_node,
                                                     next_node)
    self.send_header(route_reply_header_obj.get_header_str())
def send_route_request_message(self, end_node)

sends route request @param end_node: node for which a route is required @return: True, if route request was confirmed, else False

Expand source code
def send_route_request_message(self, end_node):
    """
    sends route request
    @param end_node: node for which a route is required
    @return: True, if route request was confirmed, else False
    """
    route_request_header_obj = header.RouteRequestHeader(None, variables.MY_ADDRESS, variables.DEFAULT_TTL, 0,
                                                         end_node)
    attempt = 0
    message_confirmed = False
    while attempt < 3 and not message_confirmed:
        logging.debug('attempt: {}'.format(attempt))
        self.send_header(route_request_header_obj.get_header_str())
        check_request_attempt_count = 0
        while check_request_attempt_count < 10:
            if len(self.routing_table.get_best_route_for_destination(end_node)) != 0:
                logging.debug('new route for {} found'.format(end_node))
                message_confirmed = True
                break
            else:
                time.sleep(0.5)
                check_request_attempt_count += 1
        attempt += 1
        if message_confirmed:
            return message_confirmed
        else:
            attempt += 1
    return message_confirmed
def start_protocol_thread(self)

starts new thread which processes incoming messages in background

Expand source code
def start_protocol_thread(self):
    """
    starts new thread which processes incoming messages in background
    """
    receiving_thread = threading.Thread(target=self.process_incoming_message)
    receiving_thread.start()
def stop(self)

function to shutdown background threads:

thread for reading from serial port thread for writing to serial port thread for processing received header messages

Expand source code
def stop(self):
    """
    function to shutdown background threads:

    thread for reading from serial port
    thread for writing to serial port
    thread for processing received header messages
    """
    self.PROCESS_INCOMING_MESSAGES = False
    serial_connection.WRITING_THREAD_ACTIVE = False
    serial_connection.READING_THREAD_ACTIVE = False