# Task Share *This file is adapted from Dr. John Ridgely, Cal Poly SLO Mechanical Engineering* # @file task_share.py # This file contains classes which allow tasks to share data without the risk # of data corruption by interrupts. # # @author JR Ridgely # @date 2017-Jan-01 JRR Approximate date of creation of file # @date 2021-Dec-18 JRR Docstrings changed to work without DoxyPyPy # @copyright This program is copyright (c) 2017-2023 by JR Ridgely and released # under the GNU Public License, version 3.0. # # It is intended for educational use only, but its use is not limited thereto. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. import array import gc import pyb import micropython ## This is a system-wide list of all the queues and shared variables. It is # used to create diagnostic printouts. share_list = \[\] ## This dictionary allows readable printouts of queue and share data types. type_code_strings = {'b' : "int8", 'B' : "uint8", 'h' : "int16", 'H' : "uint16", 'i' : "int(?)", 'I' : "uint(?)", 'l' : "int32", 'L' : "uint32", 'q' : "int64", 'Q' : "uint64", 'f' : "float", 'd' : "double"} ## Create a string holding a diagnostic printout showing the status of # each queue and share in the system. # @return A string containing information about each queue and share def show_all (): gen = (str (item) for item in share_list) return '`\n`{=tex}'.join (gen) ## Base class for queues and shares which exchange data between tasks. # # One should never create an object from this class; it doesn't do anything # useful. It exists to implement things which are common between its child # classes @c Queue and @c Share. class BaseShare: ## Create a base queue object when called by a child class initializer. # # This method creates the things which queues and shares have in common. def __init__ (self, type_code, thread_protect = True, name = None): self._type_code = type_code self._thread_protect = thread_protect # Add this queue to the global share and queue list share_list.append (self) ## A queue which is used to transfer data from one task to another. # # If parameter 'thread_protect' is @c True when a queue is created, transfers # of data will be protected from corruption in the case that one task might # interrupt another due to use in a pre-emptive multithreading environment or # due to one task being run as an interrupt service routine. # # An example of the creation and use of a queue is as follows: # # @code # import task_share # # \# This queue holds unsigned short (16-bit) integers # my_queue = task_share.Queue ('H', 100, name="My Queue") # # \# Somewhere in one task, put data into the queue # my_queue.put (some_data) # # \# In another task, read data from the queue # something = my_queue.get () # @endcode class Queue (BaseShare): ## A counter used to give serial numbers to queues for diagnostic use. ser_num = 0 ## Initialize a queue object to carry and buffer data between tasks. # # This method sets up a queue by allocating memory for the contents and # setting up the components in an empty configuration. # # Each queue can only carry data of one particular type which must be # chosen from the following list. The data type is specified by a # one-letter type code which is given as for the Python @c array.array # type, which can be any of the following: # | | | | # |:-----|:-----|:-----| # | **b** (signed char) | **B** (unsigned char) | 8 bit integers | # | **h** (signed short) | **H** (unsigned short) | 16 bit integers | # | **i** (signed int) | **I** (unsigned int) | 32 bit integers (probably) | # | **l** (signed long) | **L** (unsigned long) | 32 bit integers | # | **q** (signed long long) | **Q** (unsigned long long) | 64 bit integers | # | **f** (float) | **d** (double-precision float) | | # # @param type_code The type of data items which the queue can hold # @param size The maximum number of items which the queue can hold # @param thread_protect @c True if mutual exclusion protection is used # @param overwrite If @c True, oldest data will be overwritten with new # data if the queue becomes full # @param name A short name for the queue, default @c QueueN where @c N # is a serial number for the queue def __init__ (self, type_code, size, thread_protect = False, overwrite = False, name = None): # First call the parent class initializer super ().__init__ (type_code, thread_protect, name) self._size = size self._overwrite = overwrite self._name = str (name) if name != None \ else 'Queue' + str (Queue.ser_num) Queue.ser_num += 1 # Allocate memory in which the queue's data will be stored try: self._buffer = array.array (type_code, range (size)) except MemoryError: self._buffer = None raise except ValueError: self._buffer = None raise # Initialize pointers to be used for reading and writing data self.clear () # Since we may have allocated a bunch of memory, call the garbage # collector to neaten up what memory is left for future use gc.collect () ## Put an item into the queue. # # If there isn't room for the item, wait (blocking the calling process) # until room becomes available, unless the @c overwrite constructor # parameter was set to @c True to allow old data to be clobbered. If # non-blocking behavior without overwriting is needed, one should call # @c full() to ensure that the queue is not full before putting data # into it: # @code # def some_task (): # # Setup # while True: # if not my_queue.full (): # my_queue.put (create_something_to_put ()) # yield 0 # @endcode # @param item The item to be placed into the queue # @param in_ISR Set this to @c True if calling from within an ISR @micropython.native def put (self, item, in_ISR = False): # If we're in an ISR and the queue is full and we're not allowed to # overwrite data, we have to give up and exit if self.full (): if in_ISR: return # Wait (if needed) until there's room in the buffer for the data if not self._overwrite: while self.full (): pass # Prevent data corruption by blocking interrupts during data transfer if self._thread_protect and not in_ISR: _irq_state = pyb.disable_irq () # Write the data and advance the counts and pointers self._buffer[self._wr_idx] = item self._wr_idx += 1 if self._wr_idx >= self._size: self._wr_idx = 0 self._num_items += 1 if self._num_items >= self._size: # Can't be fuller than full self._num_items = self._size if self._num_items > self._max_full: # Record maximum fillage self._max_full = self._num_items # Re-enable interrupts if self._thread_protect and not in_ISR: pyb.enable_irq (_irq_state) ## Read an item from the queue. # # If there isn't anything in there, wait (blocking the calling process) # until something becomes available. If non-blocking reads are needed, # one should call @c any() to check for items before attempting to read # from the queue. This is usually done in a low priority task: # @code # def some_task (): # # Setup # while True: # if my_queue.any (): # something = my_queue.get () # do_something_with (something) # # More loop stuff # yield 0 # @endcode # @param in_ISR Set this to @c True if calling from within an ISR @micropython.native def get (self, in_ISR = False): # Wait until there's something in the queue to be returned while self.empty (): pass # Prevent data corruption by blocking interrupts during data transfer if self._thread_protect and not in_ISR: irq_state = pyb.disable_irq () # Get the item to be returned from the queue to_return = self._buffer[self._rd_idx] # Move the read pointer and adjust the number of items in the queue self._rd_idx += 1 if self._rd_idx >= self._size: self._rd_idx = 0 self._num_items -= 1 if self._num_items < 0: self._num_items = 0 # Re-enable interrupts if self._thread_protect and not in_ISR: pyb.enable_irq (irq_state) return (to_return) ## Check if there are any items in the queue. # # Returns @c True if there are any items in the queue and @c False # if the queue is empty. # @return @c True if items are in the queue, @c False if not @micropython.native def any (self): return (self._num_items > 0) ## Check if the queue is empty. # # Returns @c True if there are no items in the queue and @c False if # there are any items therein. # @return @c True if queue is empty, @c False if it's not empty @micropython.native def empty (self): return (self._num_items <= 0) ## Check if the queue is full. # # This method returns @c True if the queue is already full and there # is no room for more data without overwriting existing data. # @return @c True if the queue is full @micropython.native def full (self): return (self._num_items >= self._size) ## Check how many items are in the queue. # # This method returns the number of items which are currently in the # queue. # @return The number of items in the queue @micropython.native def num_in (self): return (self._num_items) ## Remove all contents from the queue. def clear (self): self._rd_idx = 0 self._wr_idx = 0 self._num_items = 0 self._max_full = 0 ## This method puts diagnostic information about the queue into a string. # # It shows the queue's name and type as well as the maximum number of # items and queue size. def __repr__ (self): return ('{:<12s} Queue<{:s}> Max Full {:d}/{:d}'.format (self._name, type_code_strings[self._type_code], self._max_full, self._size)) # ============================================================================ ## An item which holds data to be shared between tasks. # This class implements a shared data item which can be protected against # data corruption by pre-emptive multithreading. Multithreading which can # corrupt shared data includes the use of ordinary interrupts as well as the # use of pre-emptive multithreading such as by a Real-Time Operating System # (RTOS). # # An example of the creation and use of a share is as follows: # @code # import task_share # # \# This share holds a signed short (16-bit) integer # my_share = task_share.Queue ('h', name="My Share") # # \# Somewhere in one task, put data into the share # my_share.put (some_data) # # \# In another task, read data from the share # something = my_share.get () # @endcode class Share (BaseShare): ## A counter used to give serial numbers to shares for diagnostic use. ser_num = 0 ## Create a shared data item used to transfer data between tasks. # # This method allocates memory in which the shared data will be buffered. # # Each share can only carry data of one particular type which must be # chosen from the following list. The data type is specified by a # one-letter type code which is given as for the Python @c array.array # type, which can be any of the following: # | | | | # |:-----|:-----|:-----| # | **b** (signed char) | **B** (unsigned char) | 8 bit integers | # | **h** (signed short) | **H** (unsigned short) | 16 bit integers | # | **i** (signed int) | **I** (unsigned int) | 32 bit integers (probably) | # | **l** (signed long) | **L** (unsigned long) | 32 bit integers | # | **q** (signed long long) | **Q** (unsigned long long) | 64 bit integers | # | **f** (float) | **d** (double-precision float) | | # # @param type_code The type of data items which the share can hold # @param thread_protect True if mutual exclusion protection is used # @param name A short name for the share, default @c ShareN where @c N # is a serial number for the share def __init__ (self, type_code, thread_protect = True, name = None): # First call the parent class initializer super ().__init__ (type_code, thread_protect, name) self._buffer = array.array (type_code, [0]) self._name = str (name) if name != None \ else 'Share' + str (Share.ser_num) Share.ser_num += 1 ## Write an item of data into the share. # # This method puts data into the share; any old data is overwritten. # This code disables interrupts during the writing so as to prevent # data corrupting by an interrupt service routine which might access # the same data. # @param data The data to be put into this share # @param in_ISR Set this to True if calling from within an ISR @micropython.native def put (self, data, in_ISR = False): # Disable interrupts before writing the data if self._thread_protect and not in_ISR: irq_state = pyb.disable_irq () self._buffer[0] = data # Re-enable interrupts if self._thread_protect and not in_ISR: pyb.enable_irq (irq_state) ## Read an item of data from the share. # # If thread protection is enabled, interrupts are disabled during the time # that the data is being read so as to prevent data corruption by changes # in the data as it is being read. # @param in_ISR Set this to True if calling from within an ISR @micropython.native def get (self, in_ISR = False): # Disable interrupts before reading the data if self._thread_protect and not in_ISR: irq_state = pyb.disable_irq () to_return = self._buffer[0] # Re-enable interrupts if self._thread_protect and not in_ISR: pyb.enable_irq (irq_state) return (to_return) ## Puts diagnostic information about the share into a string. # # Shares are pretty simple, so we just put the name and type. def __repr__ (self): return ("{:<12s} Share<{:s}>".format (self._name, type_code_strings[self._type_code]))