# Task Share
#
# This file is adapted from work by Dr. John Ridgely, Cal Poly SLO Mechanical Engineering.

# @file task_share.py

# This file contains classes which allow tasks to share data without the risk

# of data corruption by interrupts.

# 

# @author JR Ridgely

# @date 2017-Jan-01 JRR Approximate date of creation of file

# @date 2021-Dec-18 JRR Docstrings changed to work without DoxyPyPy

# @copyright This program is copyright (c) 2017-2023 by JR Ridgely and released

# under the GNU Public License, version 3.0.

# 

# It is intended for educational use only, but its use is not limited thereto.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"

# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE

# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE

# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR

# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF

# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS

# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN

# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)

# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE

# POSSIBILITY OF SUCH DAMAGE.

import array
import gc

import micropython
import pyb

## System-wide list of all the queues and shared variables.
#  Every object built on @c BaseShare appends itself to this list when it is
#  created, and the list is read to create diagnostic printouts.
share_list = []

## Lookup table which translates one-letter @c array.array type codes into
#  readable type names for the printouts of queue and share data types.
type_code_strings = {
    'b': "int8",
    'B': "uint8",
    'h': "int16",
    'H': "uint16",
    'i': "int(?)",
    'I': "uint(?)",
    'l': "int32",
    'L': "uint32",
    'q': "int64",
    'Q': "uint64",
    'f': "float",
    'd': "double",
}

## Create a string holding a diagnostic printout showing the status of
#  each queue and share in the system.
#
#  Every item in @c share_list is converted to text with @c str() and the
#  results are joined with newline characters, giving one line per item.
#  @return A string containing information about each queue and share, or an
#          empty string if @c share_list is empty
def show_all():
    return '\n'.join(str(item) for item in share_list)

## Common base class for the queues and shares which pass data between tasks.
#
#  Objects should never be created directly from this class, as it does
#  nothing useful by itself. It only implements the things which its child
#  classes @c Queue and @c Share have in common.
class BaseShare:

    ## Set up the parts which queues and shares have in common.
    #
    #  The type code and the thread protection setting are saved in the new
    #  object, and the object is added to the system-wide @c share_list.
    #  @param type_code The type code of the data which will be carried
    #  @param thread_protect @c True if mutual exclusion protection is used
    #  @param name Accepted so that child classes can pass it along; it is
    #         not used or stored by this initializer
    def __init__(self, type_code, thread_protect=True, name=None):
        # Put this object into the global list of queues and shares
        share_list.append(self)

        # Save the settings which the child classes' methods will look up
        self._thread_protect = thread_protect
        self._type_code = type_code

## A queue which is used to transfer data from one task to another.
#
#  If parameter 'thread_protect' is @c True when a queue is created, transfers
#  of data will be protected from corruption in the case that one task might
#  interrupt another due to use in a pre-emptive multithreading environment or
#  due to one task being run as an interrupt service routine.
#
#  An example of the creation and use of a queue is as follows:
#
#  @code
#      import task_share
#
#      # This queue holds unsigned short (16-bit) integers
#      my_queue = task_share.Queue ('H', 100, name="My Queue")
#
#      # Somewhere in one task, put data into the queue
#      my_queue.put (some_data)
#
#      # In another task, read data from the queue
#      something = my_queue.get ()
#  @endcode
class Queue(BaseShare):

    ## A counter used to give serial numbers to queues for diagnostic use.
    ser_num = 0

    ## Initialize a queue object to carry and buffer data between tasks.
    #
    #  This method sets up a queue by allocating memory for the contents and
    #  setting up the components in an empty configuration.
    #
    #  Each queue can only carry data of one particular type which must be
    #  chosen from the following list. The data type is specified by a
    #  one-letter type code which is given as for the Python @c array.array
    #  type, which can be any of the following:
    #  |      |      |      |
    #  |:-----|:-----|:-----|
    #  | **b** (signed char) | **B** (unsigned char) | 8 bit integers |
    #  | **h** (signed short) | **H** (unsigned short) | 16 bit integers |
    #  | **i** (signed int) | **I** (unsigned int) | 32 bit integers (probably) |
    #  | **l** (signed long) | **L** (unsigned long) | 32 bit integers |
    #  | **q** (signed long long) | **Q** (unsigned long long) | 64 bit integers |
    #  | **f** (float) | **d** (double-precision float) | |
    #
    #  If the buffer cannot be allocated, the @c MemoryError or @c ValueError
    #  from @c array.array is re-raised after the buffer has been set to
    #  @c None.
    #
    #  @param type_code The type of data items which the queue can hold
    #  @param size The maximum number of items which the queue can hold
    #  @param thread_protect @c True if mutual exclusion protection is used
    #  @param overwrite If @c True, oldest data will be overwritten with new
    #         data if the queue becomes full
    #  @param name A short name for the queue, default @c QueueN where @c N
    #         is a serial number for the queue
    def __init__(self, type_code, size, thread_protect=False,
                 overwrite=False, name=None):
        # First call the parent class initializer, which also puts this
        # queue into the global list of queues and shares
        super().__init__(type_code, thread_protect, name)

        self._size = size
        self._overwrite = overwrite
        if name is not None:
            self._name = str(name)
        else:
            self._name = 'Queue' + str(Queue.ser_num)
        Queue.ser_num += 1

        # Initialize the indices and counters before allocating memory. The
        # queue is already in the global list, so this keeps its diagnostic
        # printout working even if the allocation below fails
        self.clear()

        # Allocate memory in which the queue's data will be stored
        try:
            self._buffer = array.array(type_code, range(size))
        except (MemoryError, ValueError):
            self._buffer = None
            raise

        # Since we may have allocated a bunch of memory, call the garbage
        # collector to neaten up what memory is left for future use
        gc.collect()


    ## Put an item into the queue.
    #
    #  If the queue is full and the @c overwrite constructor parameter was
    #  @c True, the oldest item in the queue is replaced by the new one and
    #  the remaining items keep their first-in, first-out order. If the queue
    #  is full and overwriting is not allowed, this method waits (blocking
    #  the calling process) until room becomes available, except that when
    #  @c in_ISR is @c True it returns at once and the item is discarded. If
    #  non-blocking behavior without overwriting is needed, one should call
    #  @c full() to ensure that the queue is not full before putting data
    #  into it:
    #  @code
    #     def some_task ():
    #         # Setup
    #         while True:
    #             if not my_queue.full ():
    #                 my_queue.put (create_something_to_put ())
    #             yield 0
    #  @endcode
    #  @param item The item to be placed into the queue
    #  @param in_ISR Set this to @c True if calling from within an ISR
    @micropython.native
    def put(self, item, in_ISR=False):
        if self.full() and not self._overwrite:
            # If we're in an ISR and the queue is full and we're not allowed
            # to overwrite data, we have to give up and exit
            if in_ISR:
                return

            # Wait until there's room in the buffer for the data
            while self.full():
                pass

        # Prevent data corruption by blocking interrupts during data transfer.
        # The decision is made once so that the re-enabling below always
        # matches the disabling here
        protect = self._thread_protect and not in_ISR
        if protect:
            irq_state = pyb.disable_irq()

        # Write the data and advance the write index, wrapping at the end
        self._buffer[self._wr_idx] = item
        self._wr_idx += 1
        if self._wr_idx >= self._size:
            self._wr_idx = 0

        if self._num_items >= self._size:
            # The queue was already full, so the oldest item has just been
            # overwritten. The oldest item remaining is at the new write
            # index; reading must continue from there to keep the order
            self._rd_idx = self._wr_idx
        else:
            self._num_items += 1
            if self._num_items > self._max_full:     # Record maximum fillage
                self._max_full = self._num_items

        # Re-enable interrupts
        if protect:
            pyb.enable_irq(irq_state)


    ## Read an item from the queue.
    #
    #  If there isn't anything in there, wait (blocking the calling process)
    #  until something becomes available. The wait is done even when
    #  @c in_ISR is @c True, so code in an ISR should make sure that the
    #  queue holds data before calling this method. If non-blocking reads are
    #  needed, one should call @c any() to check for items before attempting
    #  to read from the queue. This is usually done in a low priority task:
    #  @code
    #     def some_task ():
    #         # Setup
    #         while True:
    #             if my_queue.any ():
    #                 something = my_queue.get ()
    #                 do_something_with (something)
    #             # More loop stuff
    #             yield 0
    #  @endcode
    #  @param in_ISR Set this to @c True if calling from within an ISR
    #  @return The oldest item which was in the queue
    @micropython.native
    def get(self, in_ISR=False):
        # Wait until there's something in the queue to be returned
        while self.empty():
            pass

        # Prevent data corruption by blocking interrupts during data transfer
        protect = self._thread_protect and not in_ISR
        if protect:
            irq_state = pyb.disable_irq()

        # Get the item to be returned from the queue
        to_return = self._buffer[self._rd_idx]

        # Move the read index, wrapping at the end of the buffer, and adjust
        # the number of items in the queue
        self._rd_idx += 1
        if self._rd_idx >= self._size:
            self._rd_idx = 0
        self._num_items -= 1
        if self._num_items < 0:                  # Can't be emptier than empty
            self._num_items = 0

        # Re-enable interrupts
        if protect:
            pyb.enable_irq(irq_state)

        return to_return


    ## Check if there are any items in the queue.
    #
    #  Returns @c True if there are any items in the queue and @c False
    #  if the queue is empty.
    #  @return @c True if items are in the queue, @c False if not
    @micropython.native
    def any(self):
        return self._num_items > 0


    ## Check if the queue is empty.
    #
    #  Returns @c True if there are no items in the queue and @c False if
    #  there are any items therein.
    #  @return @c True if queue is empty, @c False if it's not empty
    @micropython.native
    def empty(self):
        return self._num_items <= 0


    ## Check if the queue is full.
    #
    #  This method returns @c True if the queue is already full and there
    #  is no room for more data without overwriting existing data.
    #  @return @c True if the queue is full
    @micropython.native
    def full(self):
        return self._num_items >= self._size


    ## Check how many items are in the queue.
    #
    #  This method returns the number of items which are currently in the
    #  queue.
    #  @return The number of items in the queue
    @micropython.native
    def num_in(self):
        return self._num_items


    ## Remove all contents from the queue.
    #
    #  The read and write indices, the item count, and the record of the
    #  maximum number of items held are all set to zero. The buffer contents
    #  are not erased. Interrupts are not disabled by this method.
    def clear(self):
        self._rd_idx = 0
        self._wr_idx = 0
        self._num_items = 0
        self._max_full = 0


    ## This method puts diagnostic information about the queue into a string.
    #
    #  It shows the queue's name and type as well as the maximum number of
    #  items and queue size.
    #  @return A string describing the queue
    def __repr__(self):
        return '{:<12s} Queue<{:s}> Max Full {:d}/{:d}'.format(
            self._name, type_code_strings[self._type_code],
            self._max_full, self._size)

# ============================================================================

## An item which holds data to be shared between tasks.
#
#  This class implements a shared data item which can be protected against
#  data corruption by pre-emptive multithreading. Multithreading which can
#  corrupt shared data includes the use of ordinary interrupts as well as the
#  use of pre-emptive multithreading such as by a Real-Time Operating System
#  (RTOS).
#
#  An example of the creation and use of a share is as follows:
#  @code
#      import task_share
#
#      # This share holds a signed short (16-bit) integer
#      my_share = task_share.Share ('h', name="My Share")
#
#      # Somewhere in one task, put data into the share
#      my_share.put (some_data)
#
#      # In another task, read data from the share
#      something = my_share.get ()
#  @endcode
class Share(BaseShare):

    ## A counter used to give serial numbers to shares for diagnostic use.
    ser_num = 0


    ## Create a shared data item used to transfer data between tasks.
    #
    #  This method allocates memory in which the shared data will be buffered.
    #  The buffer holds one item, which is initially zero.
    #
    #  Each share can only carry data of one particular type which must be
    #  chosen from the following list. The data type is specified by a
    #  one-letter type code which is given as for the Python @c array.array
    #  type, which can be any of the following:
    #  |      |      |      |
    #  |:-----|:-----|:-----|
    #  | **b** (signed char) | **B** (unsigned char) | 8 bit integers |
    #  | **h** (signed short) | **H** (unsigned short) | 16 bit integers |
    #  | **i** (signed int) | **I** (unsigned int) | 32 bit integers (probably) |
    #  | **l** (signed long) | **L** (unsigned long) | 32 bit integers |
    #  | **q** (signed long long) | **Q** (unsigned long long) | 64 bit integers |
    #  | **f** (float) | **d** (double-precision float) | |
    #
    #  @param type_code The type of data items which the share can hold
    #  @param thread_protect True if mutual exclusion protection is used
    #  @param name A short name for the share, default @c ShareN where @c N
    #         is a serial number for the share
    def __init__(self, type_code, thread_protect=True, name=None):
        # First call the parent class initializer, which also puts this
        # share into the global list of queues and shares
        super().__init__(type_code, thread_protect, name)

        # A one-element array holds the shared item
        self._buffer = array.array(type_code, [0])

        if name is not None:
            self._name = str(name)
        else:
            self._name = 'Share' + str(Share.ser_num)
        Share.ser_num += 1


    ## Write an item of data into the share.
    #
    #  This method puts data into the share; any old data is overwritten.
    #  If thread protection is enabled and @c in_ISR is not set, interrupts
    #  are disabled during the writing so as to prevent data corruption by
    #  an interrupt service routine which might access the same data.
    #  @param data The data to be put into this share
    #  @param in_ISR Set this to True if calling from within an ISR
    @micropython.native
    def put(self, data, in_ISR=False):
        # Decide once whether to block interrupts, so that the re-enabling
        # below always matches the disabling here
        protect = self._thread_protect and not in_ISR

        # Disable interrupts before writing the data
        if protect:
            irq_state = pyb.disable_irq()

        self._buffer[0] = data

        # Re-enable interrupts
        if protect:
            pyb.enable_irq(irq_state)


    ## Read an item of data from the share.
    #
    #  If thread protection is enabled and @c in_ISR is not set, interrupts
    #  are disabled during the time that the data is being read so as to
    #  prevent data corruption by changes in the data as it is being read.
    #  @param in_ISR Set this to True if calling from within an ISR
    #  @return The data item currently held in the share
    @micropython.native
    def get(self, in_ISR=False):
        protect = self._thread_protect and not in_ISR

        # Disable interrupts before reading the data
        if protect:
            irq_state = pyb.disable_irq()

        to_return = self._buffer[0]

        # Re-enable interrupts
        if protect:
            pyb.enable_irq(irq_state)

        return to_return


    ## Puts diagnostic information about the share into a string.
    #
    #  Shares are pretty simple, so we just put the name and type.
    #  @return A string describing the share
    def __repr__(self):
        return "{:<12s} Share<{:s}>".format(
            self._name, type_code_strings[self._type_code])