Module lora_multihop.serial_connection

Expand source code
import threading
import queue
import time
import logging

from lora_multihop import variables

__author__ = "Marvin Rausch"

ser = None

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s', )

BUF_SIZE = 100
writing_q = queue.Queue(BUF_SIZE)
BUF_SIZE = 1000
response_q = queue.Queue(BUF_SIZE)
BUF_SIZE = 1000
status_q = queue.Queue(BUF_SIZE)
WRITE_DATA = False
READING_THREAD_ACTIVE = True
WRITING_THREAD_ACTIVE = True


def bytes_to_str(message_in_bytes):
    """
    converts bytes to string
    :param message_in_bytes: bytes which should be convert
    :return: decoded bytes as string using decoding defined under variables.ENCODING
    """
    return message_in_bytes.decode(variables.ENCODING)


def str_to_bytes(string_to_convert):
    """
    encodes string to bytes
    :param string_to_convert: string which should be encoded
    :return: encoded string as bytes using encoding defined under variables.ENCODING
    """
    return bytes(string_to_convert, variables.ENCODING)


class ReadingThread(threading.Thread):
    def __init__(self, name):
        super(ReadingThread, self).__init__()
        self.name = name

    def run(self):
        """
        starts a thread for reading messages from serial port
        """
        global READING_THREAD_ACTIVE
        while READING_THREAD_ACTIVE:
            global WRITE_DATA
            if writing_q.empty() and not WRITE_DATA:
                if ser.in_waiting:
                    received_raw_message = ser.readline()
                    logging.debug('received: {}'.format(received_raw_message))
                    try:
                        received_raw_message = bytes_to_str(received_raw_message)
                        response_q.put(received_raw_message)
                    except UnicodeDecodeError:
                        logging.debug(f"message '{received_raw_message}' dumped. because it is not encoded in UTF-8")


class WritingThread(threading.Thread):
    def __init__(self, name):
        super(WritingThread, self).__init__()
        self.name = name
        return

    def run(self):
        """
        starts a thread for writing messages to a serial port
        """
        global WRITING_THREAD_ACTIVE
        while WRITING_THREAD_ACTIVE:
            if not writing_q.empty():
                global WRITE_DATA
                WRITE_DATA = True
                command_tuple = writing_q.get()
                command = command_tuple[0]
                command = command + '\r\n'
                command = str_to_bytes(command)
                verify_list = command_tuple[1]
                logging.debug("sending command '{}'".format(command))
                ser.write(command)
                successful = True
                if len(verify_list) > 0:
                    for entry in verify_list:
                        status = bytes_to_str(ser.readline())
                        status = status.strip()
                        if 'LR' in status:
                            logging.warning('got message while verifying command: {}. Message dumped.'.format(status))
                            #  dump message, if receiving message while verifying status of command
                            status = bytes_to_str(ser.readline())
                            status = status.strip()
                        if entry != status:
                            logging.warning(
                                'could not verify {expected} != {status}'.format(expected=entry, status=status))
                            successful = False
                        else:
                            logging.debug('verified {status}'.format(status=status))
                    status_q.put(successful)

                time.sleep(0.2)
                WRITE_DATA = False


def start_send_receive_threads(serial_conn):
    """
    starts threads for communication with serial port
    :param serial_conn: object for serial connection from pyserial library
    """
    global ser
    ser = serial_conn
    t1 = ReadingThread(name='producer')
    t2 = WritingThread(name='consumer')

    t1.start()
    time.sleep(0.5)
    t2.start()
    time.sleep(0.5)


def execute_command(command_as_str, verification_list=None):
    """
    helper function to send AT-command to serial port
    :param command_as_str: command which should be sent
    :param verification_list: list of expected results; can also be empty if result of command should not be verified
    :return: True if expected results equal to results received from serial port, else False
    """
    if verification_list is None:
        verification_list = []
    writing_q.put((command_as_str, verification_list))
    if len(verification_list) != 0:
        return status_q.get(timeout=variables.COMMAND_VERIFICATION_TIMEOUT)


if __name__ == '__main__':
    p = ReadingThread(name='producer')
    c = WritingThread(name='consumer')

    p.start()
    time.sleep(0.5)
    c.start()
    time.sleep(0.5)
    writing_q.put(('AT', ['AT,OK']))
    writing_q.put(('AT', []))

    time.sleep(2)
    print(response_q.get())

Functions

def bytes_to_str(message_in_bytes)

converts bytes to string :param message_in_bytes: bytes which should be convert :return: decoded bytes as string using decoding defined under variables.ENCODING

Expand source code
def bytes_to_str(message_in_bytes):
    """
    converts bytes to string
    :param message_in_bytes: bytes which should be convert
    :return: decoded bytes as string using decoding defined under variables.ENCODING
    """
    return message_in_bytes.decode(variables.ENCODING)
def execute_command(command_as_str, verification_list=None)

helper function to send AT-command to serial port :param command_as_str: command which should be sent :param verification_list: list of expected results; can also be empty if result of command should not be verified :return: True if expected results equal to results received from serial port, else False

Expand source code
def execute_command(command_as_str, verification_list=None):
    """
    helper function to send AT-command to serial port
    :param command_as_str: command which should be sent
    :param verification_list: list of expected results; can also be empty if result of command should not be verified
    :return: True if expected results equal to results received from serial port, else False
    """
    if verification_list is None:
        verification_list = []
    writing_q.put((command_as_str, verification_list))
    if len(verification_list) != 0:
        return status_q.get(timeout=variables.COMMAND_VERIFICATION_TIMEOUT)
def start_send_receive_threads(serial_conn)

starts threads for communication with serial port :param serial_conn: object for serial connection from pyserial library

Expand source code
def start_send_receive_threads(serial_conn):
    """
    starts threads for communication with serial port
    :param serial_conn: object for serial connection from pyserial library
    """
    global ser
    ser = serial_conn
    t1 = ReadingThread(name='producer')
    t2 = WritingThread(name='consumer')

    t1.start()
    time.sleep(0.5)
    t2.start()
    time.sleep(0.5)
def str_to_bytes(string_to_convert)

encodes string to bytes :param string_to_convert: string which should be encoded :return: encoded string as bytes using encoding defined under variables.ENCODING

Expand source code
def str_to_bytes(string_to_convert):
    """
    encodes string to bytes
    :param string_to_convert: string which should be encoded
    :return: encoded string as bytes using encoding defined under variables.ENCODING
    """
    return bytes(string_to_convert, variables.ENCODING)

Classes

class ReadingThread (name)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.

Expand source code
class ReadingThread(threading.Thread):
    def __init__(self, name):
        super(ReadingThread, self).__init__()
        self.name = name

    def run(self):
        """
        starts a thread for reading messages from serial port
        """
        global READING_THREAD_ACTIVE
        while READING_THREAD_ACTIVE:
            global WRITE_DATA
            if writing_q.empty() and not WRITE_DATA:
                if ser.in_waiting:
                    received_raw_message = ser.readline()
                    logging.debug('received: {}'.format(received_raw_message))
                    try:
                        received_raw_message = bytes_to_str(received_raw_message)
                        response_q.put(received_raw_message)
                    except UnicodeDecodeError:
                        logging.debug(f"message '{received_raw_message}' dumped. because it is not encoded in UTF-8")

Ancestors

  • threading.Thread

Methods

def run(self)

starts a thread for reading messages from serial port

Expand source code
def run(self):
    """
    starts a thread for reading messages from serial port
    """
    global READING_THREAD_ACTIVE
    while READING_THREAD_ACTIVE:
        global WRITE_DATA
        if writing_q.empty() and not WRITE_DATA:
            if ser.in_waiting:
                received_raw_message = ser.readline()
                logging.debug('received: {}'.format(received_raw_message))
                try:
                    received_raw_message = bytes_to_str(received_raw_message)
                    response_q.put(received_raw_message)
                except UnicodeDecodeError:
                    logging.debug(f"message '{received_raw_message}' dumped. because it is not encoded in UTF-8")
class WritingThread (name)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.

Expand source code
class WritingThread(threading.Thread):
    def __init__(self, name):
        super(WritingThread, self).__init__()
        self.name = name
        return

    def run(self):
        """
        starts a thread for writing messages to a serial port
        """
        global WRITING_THREAD_ACTIVE
        while WRITING_THREAD_ACTIVE:
            if not writing_q.empty():
                global WRITE_DATA
                WRITE_DATA = True
                command_tuple = writing_q.get()
                command = command_tuple[0]
                command = command + '\r\n'
                command = str_to_bytes(command)
                verify_list = command_tuple[1]
                logging.debug("sending command '{}'".format(command))
                ser.write(command)
                successful = True
                if len(verify_list) > 0:
                    for entry in verify_list:
                        status = bytes_to_str(ser.readline())
                        status = status.strip()
                        if 'LR' in status:
                            logging.warning('got message while verifying command: {}. Message dumped.'.format(status))
                            #  dump message, if receiving message while verifying status of command
                            status = bytes_to_str(ser.readline())
                            status = status.strip()
                        if entry != status:
                            logging.warning(
                                'could not verify {expected} != {status}'.format(expected=entry, status=status))
                            successful = False
                        else:
                            logging.debug('verified {status}'.format(status=status))
                    status_q.put(successful)

                time.sleep(0.2)
                WRITE_DATA = False

Ancestors

  • threading.Thread

Methods

def run(self)

starts a thread for writing messages to a serial port

Expand source code
def run(self):
    """
    starts a thread for writing messages to a serial port
    """
    global WRITING_THREAD_ACTIVE
    while WRITING_THREAD_ACTIVE:
        if not writing_q.empty():
            global WRITE_DATA
            WRITE_DATA = True
            command_tuple = writing_q.get()
            command = command_tuple[0]
            command = command + '\r\n'
            command = str_to_bytes(command)
            verify_list = command_tuple[1]
            logging.debug("sending command '{}'".format(command))
            ser.write(command)
            successful = True
            if len(verify_list) > 0:
                for entry in verify_list:
                    status = bytes_to_str(ser.readline())
                    status = status.strip()
                    if 'LR' in status:
                        logging.warning('got message while verifying command: {}. Message dumped.'.format(status))
                        #  dump message, if receiving message while verifying status of command
                        status = bytes_to_str(ser.readline())
                        status = status.strip()
                    if entry != status:
                        logging.warning(
                            'could not verify {expected} != {status}'.format(expected=entry, status=status))
                        successful = False
                    else:
                        logging.debug('verified {status}'.format(status=status))
                status_q.put(successful)

            time.sleep(0.2)
            WRITE_DATA = False