# Cotask *This file is adapted from Dr. John Ridgely, Cal Poly SLO Mechanical Engineering* # @file cotask.py # This file contains classes to run cooperatively scheduled tasks in a # multitasking system. # # Tasks are created as generators, functions which have infinite loops and call # @c yield at least once in the loop. References to all the tasks to be run # in the system are kept in a list maintained by class @c CoTaskList; the # system scheduler then runs the tasks' @c run() methods according to a # chosen scheduling algorithm such as round-robin or highest-priority-first. # # @author JR Ridgely # @date 2017-Jan-01 JRR Approximate date of creation of file # @date 2021-Dec-18 JRR Docstrings modified to work without DoxyPyPy # @copyright This program is copyright (c) 2017-2023 by JR Ridgely and # released under the GNU Public License, version 3.0. # # It is intended for educational use only, but its use is not limited thereto. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. import gc \# Memory allocation garbage collector import utime \# Micropython version of time library import micropython \# This shuts up incorrect warnings ## Implements multitasking with scheduling and some performance logging. # # This class implements behavior common to tasks in a cooperative # multitasking system which runs in MicroPython. The ability to be scheduled # on the basis of time or an external software trigger or interrupt is # implemented, state transitions can be recorded, and run times can be # profiled. The user's task code must be implemented in a generator which # yields the state (and the CPU) after it has run for a short and bounded # period of time. # # @b Example: # @code # def task1_fun (): # '''! This function switches states repeatedly for no reason''' # state = 0 # while True: # if state == 0: # state = 1 # elif state == 1: # state = 0 # yield (state) # # \# In main routine, create this task and set it to run twice per second # task1 = cotask.Task (task1_fun, name = 'Task 1', priority = 1, # period = 500, profile = True, trace = True) # # \# Add the task to the list (so it will be run) and run scheduler # cotask.task_list.append (task1) # while True: # cotask.task_list.pri_sched () # @endcode class Task: ## Initialize a task object so it may be run by the scheduler. # # This method initializes a task object, saving copies of constructor # parameters and preparing an empty dictionary for states. # # @param run_fun The function which implements the task's code. It must # be a generator which yields the current state. # @param name The name of the task, by default @c NoName. This should # be overridden with a more descriptive name by the programmer. # @param priority The priority of the task, a positive integer with # higher numbers meaning higher priority (default 0) # @param period The time in milliseconds between runs of the task if it's # run by a timer or @c None if the task is not run by a timer. # The time can be given in a @c float or @c int; it will be # converted to microseconds for internal use by the scheduler. # @param profile Set to @c True to enable run-time profiling # @param trace Set to @c True to generate a list of transitions between # states. @b Note: This slows things down and allocates memory. # @param shares A list or tuple of shares and queues used by this task. # If no list is given, no shares are passed to the task def __init__(self, run_fun, name="NoName", priority=0, period=None, profile=False, trace=False, shares=()): # The function which is run to implement this task's code. Since it # is a generator, we "run" it here, which doesn't actually run it but # gets it going as a generator which is ready to yield values if shares: self._run_gen = run_fun(shares) else: self._run_gen = run_fun() ## The name of the task, hopefully a short and descriptive string. self.name = name ## The task's priority, an integer with higher numbers meaning higher # priority. self.priority = int(priority) ## The period, in milliseconds, between runs of the task's @c run() # method. If the period is @c None, the @c run() method won't be run # on a time basis but will instead be run by the scheduler as soon # as feasible after code such as an interrupt handler calls the # @c go() method. if period != None: self.period = int(period * 1000) self._next_run = utime.ticks_us() + self.period else: self.period = period self._next_run = None # Flag which causes the task to be profiled, in which the execution # time of the @c run() method is measured and basic statistics kept. self._prof = profile self.reset_profile() # The previous state in which the task last ran. It is used to watch # for and track state transitions. self._prev_state = 0 # If transition tracing has been enabled, create an empty list in # which to store transition (time, to-state) stamps self._trace = trace self._tr_data = [] self._prev_time = utime.ticks_us() ## Flag which is set true when the task is ready to be run by the # scheduler self.go_flag = False ## This method is called by the scheduler; it attempts to run this task. # If the task is not yet ready to run, this method returns @c False # immediately; if this task is ready to run, it runs the task's generator # up to the next @c yield() and then returns @c True. # # @return @c True if the task ran or @c False if it did not def schedule(self) -> bool: if self.ready(): # Reset the go flag for the next run self.go_flag = False # If profiling, save the start time if self._prof: stime = utime.ticks_us() # Run the method belonging to the state which should be run next curr_state = next(self._run_gen) # If profiling or tracing, save timing data if self._prof or self._trace: etime = utime.ticks_us() # If profiling, save timing data if self._prof: self._runs += 1 runt = utime.ticks_diff(etime, stime) if self._runs > 2: self._run_sum += runt if runt > self._slowest: self._slowest = runt # If transition logic tracing is on, record a transition; if not, # ignore the state. If out of memory, switch tracing off and # run the memory allocation garbage collector if self._trace: try: if curr_state != self._prev_state: self._tr_data.append( (utime.ticks_diff(etime, self._prev_time), curr_state)) except MemoryError: self._trace = False gc.collect() self._prev_state = curr_state self._prev_time = etime return True else: return False ## This method checks if the task is ready to run. # If the task runs on a timer, this method checks what time it is; if not, # this method checks the flag which indicates that the task is ready to # go. This method may be overridden in descendent classes to implement # some other behavior. @micropython.native def ready(self) -> bool: # If this task uses a timer, check if it's time to run run() again. If # so, set go flag and set the timer to go off at the next run time if self.period != None: late = utime.ticks_diff(utime.ticks_us(), self._next_run) if late > 0: self.go_flag = True self._next_run = utime.ticks_diff(self.period, -self._next_run) # If keeping a latency profile, record the data if self._prof: self._late_sum += late if late > self._latest: self._latest = late # If the task doesn't use a timer, we rely on go_flag to signal ready return self.go_flag ## This method sets the period between runs of the task to the given # number of milliseconds, or @c None if the task is triggered by calls # to @c go() rather than time. # @param new_period The new period in milliseconds between task runs def set_period(self, new_period): if new_period is None: self.period = None else: self.period = int(new_period) * 1000 ## This method resets the variables used for execution time profiling. # This method is also used by @c __init__() to create the variables. def reset_profile(self): self._runs = 0 self._run_sum = 0 self._slowest = 0 self._late_sum = 0 self._latest = 0 ## This method returns a string containing the task's transition trace. # The trace is a set of tuples, each of which contains a time and the # states from and to which the system transitioned. # @return A possibly quite large string showing state transitions def get_trace(self): tr_str = 'Task ' + self.name + ':' if self._trace: tr_str += '\n' last_state = 0 total_time = 0.0 for item in self._tr_data: total_time += item[0] / 1000000.0 tr_str += '{: 12.6f}: {: 2d} -> {:d}\n'.format (total_time, last_state, item[1]) last_state = item[1] else: tr_str += ' not traced' return tr_str ## Method to set a flag so that this task indicates that it's ready to run. # This method may be called from an interrupt service routine or from # another task which has data that this task needs to process soon. def go(self): self.go_flag = True ## This method converts the task to a string for diagnostic use. # It shows information about the task, including execution time # profiling results if profiling has been done. # @returns The string which represents the task def __repr__(self): rst = f"{self.name:<16s}{self.priority: 4d}" try: rst += f"{(self.period / 1000.0): 10.1f}" except TypeError: rst += ' -' rst += f"{self._runs: 8d}" if self._prof and self._runs > 0: avg_dur = (self._run_sum / self._runs) / 1000.0 avg_late = (self._late_sum / self._runs) / 1000.0 rst += f"{avg_dur: 10.3f}{(self._slowest / 1000.0): 10.3f}" if self.period != None: rst += f"{avg_late: 10.3f}{(self._latest / 1000.0): 10.3f}" return rst # ============================================================================= ## A list of tasks used internally by the task scheduler. # This class holds the list of tasks which will be run by the task scheduler. # The task list is usually not directly used by the programmer except when # tasks are added to it and the scheduler is called. An example showing the # use of the task list is given in the last few lines of the documentation # for class @c Task. # # The task list is sorted by priority so that the scheduler can efficiently # look through the list to find the highest priority task which is ready to # run at any given time. Tasks can also be scheduled in a simpler # "round-robin" fashion. class TaskList: ## Initialize the task list. This creates the list of priorities in # which tasks will be organized by priority. def __init__(self): ## The list of priority lists. Each priority for which at least one # task has been created has a list whose first element is a task # priority and whose other elements are references to task objects at # that priority. self.pri_list = [] ## Append a task to the task list. The list will be sorted by task # priorities so that the scheduler can quickly find the highest priority # task which is ready to run at any given time. # @param task The task to be appended to the list def append(self, task): # See if there's a tasklist with the given priority in the main list new_pri = task.priority for pri in self.pri_list: # If a tasklist with this priority exists, add this task to it. if pri[0] == new_pri: pri.append(task) break # If the priority isn't in the list, this else clause starts a new # priority list with this task as first one. A priority list has the # priority as element 0, an index into the list of tasks (used for # round-robin scheduling those tasks) as the second item, and tasks # after those else: self.pri_list.append([new_pri, 2, task]) # Make sure the main list (of lists at each priority) is sorted self.pri_list.sort(key=lambda pri: pri[0], reverse=True) ## Run tasks in order, ignoring the tasks' priorities. # # This scheduling method runs tasks in a round-robin fashion. Each # time it is called, it goes through the list of tasks and gives each of # them a chance to run. Although this scheduler first runs higher priority # tasks first, that has no significant effect in the long run, as all the # tasks are given a chance to run each time through the list, and it takes # about the same amount of time before each is given a chance to run # again. @micropython.native def rr_sched(self): # For each priority level, run all tasks at that level for pri in self.pri_list: for task in pri[2:]: task.schedule() ## Run tasks according to their priorities. # # This scheduler runs tasks in a priority based fashion. Each time it is # called, it finds the highest priority task which is ready to run and # calls that task's @c run() method. @micropython.native def pri_sched(self): # Go down the list of priorities, beginning with the highest for pri in self.pri_list: # Within each priority list, run tasks in round-robin order # Each priority list is [priority, index, task, task, ...] where # index is the index of the next task in the list to be run tries = 2 length = len(pri) while tries < length: ran = pri[pri[1]].schedule() tries += 1 pri[1] += 1 if pri[1] >= length: pri[1] = 2 if ran: return ## Create some diagnostic text showing the tasks in the task list. def __repr__(self): ret_str = 'TASK PRI PERIOD RUNS AVG DUR MAX ' \ 'DUR AVG LATE MAX LATE\n' for pri in self.pri_list: for task in pri[2:]: ret_str += str(task) + '\n' return ret_str ## This is @b the main task list which is created for scheduling when # @c cotask.py is imported into a program. task_list = TaskList()