Cotask

This file is adapted from Dr. John Ridgely, Cal Poly SLO Mechanical Engineering

# @file cotask.py

# This file contains classes to run cooperatively scheduled tasks in a

# multitasking system.

# 

# Tasks are created as generators, functions which have infinite loops and call

# @c yield at least once in the loop. References to all the tasks to be run

# in the system are kept in a list maintained by class @c CoTaskList; the

# system scheduler then runs the tasks' @c run() methods according to a

# chosen scheduling algorithm such as round-robin or highest-priority-first.

# 

# @author JR Ridgely

# @date 2017-Jan-01 JRR Approximate date of creation of file

# @date 2021-Dec-18 JRR Docstrings modified to work without DoxyPyPy

# @copyright This program is copyright (c) 2017-2023 by JR Ridgely and

# released under the GNU Public License, version 3.0.

# 

# It is intended for educational use only, but its use is not limited thereto.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"

# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE

# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE

# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR

# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF

# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS

# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN

# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)

# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE

# POSSIBILITY OF SUCH DAMAGE.

import gc \# Memory allocation garbage collector import utime \#
Micropython version of time library import micropython \# This shuts up
incorrect warnings

## Implements multitasking with scheduling and some performance logging.

# 

# This class implements behavior common to tasks in a cooperative

# multitasking system which runs in MicroPython. The ability to be scheduled

# on the basis of time or an external software trigger or interrupt is

# implemented, state transitions can be recorded, and run times can be

# profiled. The user's task code must be implemented in a generator which

# yields the state (and the CPU) after it has run for a short and bounded

# period of time.

# 

# @b Example:

# @code

# def task1_fun ():

# '''! This function switches states repeatedly for no reason'''

# state = 0

# while True:

# if state == 0:

# state = 1

# elif state == 1:

# state = 0

# yield (state)

# 

# \# In main routine, create this task and set it to run twice per second

# task1 = cotask.Task (task1_fun, name = 'Task 1', priority = 1,

# period = 500, profile = True, trace = True)

# 

# \# Add the task to the list (so it will be run) and run scheduler

# cotask.task_list.append (task1)

# while True:

# cotask.task_list.pri_sched ()

# @endcode

class Task:

    ## Initialize a task object so it may be run by the scheduler.
    # 
    #  This method initializes a task object, saving copies of constructor
    #  parameters and preparing an empty dictionary for states.
    # 
    #  @param run_fun The function which implements the task's code. It must
    #         be a generator which yields the current state.
    #  @param name The name of the task, by default @c NoName. This should
    #         be overridden with a more descriptive name by the programmer.
    #  @param priority The priority of the task, a positive integer with
    #         higher numbers meaning higher priority (default 0)
    #  @param period The time in milliseconds between runs of the task if it's
    #         run by a timer or @c None if the task is not run by a timer.
    #         The time can be given in a @c float or @c int; it will be 
    #         converted to microseconds for internal use by the scheduler.
    #  @param profile Set to @c True to enable run-time profiling 
    #  @param trace Set to @c True to generate a list of transitions between
    #         states. @b Note: This slows things down and allocates memory.
    #  @param shares A list or tuple of shares and queues used by this task.
    #         If no list is given, no shares are passed to the task
    def __init__(self, run_fun, name="NoName", priority=0, period=None,
                profile=False, trace=False, shares=()):
        # The function which is run to implement this task's code. Since it 
        # is a generator, we "run" it here, which doesn't actually run it but
        # gets it going as a generator which is ready to yield values
        if shares:
            self._run_gen = run_fun(shares)
        else:
            self._run_gen = run_fun()

        ## The name of the task, hopefully a short and descriptive string.
        self.name = name

        ## The task's priority, an integer with higher numbers meaning higher 
        #  priority. 
        self.priority = int(priority)

        ## The period, in milliseconds, between runs of the task's @c run()
        #  method. If the period is @c None, the @c run() method won't be run
        #  on a time basis but will instead be run by the scheduler as soon
        #  as feasible after code such as an interrupt handler calls the 
        #  @c go() method. 
        if period != None:
            self.period = int(period * 1000)
            self._next_run = utime.ticks_us() + self.period
        else:
            self.period = period
            self._next_run = None

        # Flag which causes the task to be profiled, in which the execution
        #  time of the @c run() method is measured and basic statistics kept. 
        self._prof = profile
        self.reset_profile()

        # The previous state in which the task last ran. It is used to watch
        # for and track state transitions.
        self._prev_state = 0

        # If transition tracing has been enabled, create an empty list in 
        # which to store transition (time, to-state) stamps
        self._trace = trace
        self._tr_data = []
        self._prev_time = utime.ticks_us()

        ## Flag which is set true when the task is ready to be run by the
        #  scheduler
        self.go_flag = False


    ## This method is called by the scheduler; it attempts to run this task.
    #  If the task is not yet ready to run, this method returns @c False
    #  immediately; if this task is ready to run, it runs the task's generator
    #  up to the next @c yield() and then returns @c True.
    # 
    #  @return @c True if the task ran or @c False if it did not
    def schedule(self) -> bool:
        if self.ready():

            # Reset the go flag for the next run
            self.go_flag = False

            # If profiling, save the start time
            if self._prof:
                stime = utime.ticks_us()

            # Run the method belonging to the state which should be run next
            curr_state = next(self._run_gen)

            # If profiling or tracing, save timing data
            if self._prof or self._trace:
                etime = utime.ticks_us()

            # If profiling, save timing data
            if self._prof:
                self._runs += 1
                runt = utime.ticks_diff(etime, stime)
                if self._runs > 2:
                    self._run_sum += runt
                    if runt > self._slowest:
                        self._slowest = runt

            # If transition logic tracing is on, record a transition; if not,
            # ignore the state. If out of memory, switch tracing off and 
            # run the memory allocation garbage collector
            if self._trace:
                try:
                    if curr_state != self._prev_state:
                        self._tr_data.append(
                            (utime.ticks_diff(etime, self._prev_time),
                            curr_state))
                except MemoryError:
                    self._trace = False
                    gc.collect()

                self._prev_state = curr_state
                self._prev_time = etime

            return True

        else:
            return False


    ## This method checks if the task is ready to run.
    #  If the task runs on a timer, this method checks what time it is; if not,
    #  this method checks the flag which indicates that the task is ready to
    #  go. This method may be overridden in descendent classes to implement
    #  some other behavior.
    @micropython.native
    def ready(self) -> bool:
        # If this task uses a timer, check if it's time to run run() again. If
        # so, set go flag and set the timer to go off at the next run time
        if self.period != None:
            late = utime.ticks_diff(utime.ticks_us(), self._next_run)
            if late > 0:
                self.go_flag = True
                self._next_run = utime.ticks_diff(self.period, 
                                                -self._next_run)

                # If keeping a latency profile, record the data
                if self._prof:
                    self._late_sum += late
                    if late > self._latest:
                        self._latest = late

        # If the task doesn't use a timer, we rely on go_flag to signal ready
        return self.go_flag


    ## This method sets the period between runs of the task to the given
    #  number of milliseconds, or @c None if the task is triggered by calls
    #  to @c go() rather than time.
    #  @param new_period The new period in milliseconds between task runs
    def set_period(self, new_period):
        if new_period is None:
            self.period = None
        else:
            self.period = int(new_period) * 1000


    ## This method resets the variables used for execution time profiling.
    #  This method is also used by @c __init__() to create the variables.
    def reset_profile(self):
        self._runs = 0
        self._run_sum = 0
        self._slowest = 0
        self._late_sum = 0
        self._latest = 0


    ## This method returns a string containing the task's transition trace.
    #  The trace is a set of tuples, each of which contains a time and the
    #  states from and to which the system transitioned. 
    #  @return A possibly quite large string showing state transitions
    def get_trace(self):
        tr_str = 'Task ' + self.name + ':'
        if self._trace:
            tr_str += '\n'
            last_state = 0
            total_time = 0.0
            for item in self._tr_data:
                total_time += item[0] / 1000000.0
                tr_str += '{: 12.6f}: {: 2d} -> {:d}\n'.format (total_time, 
                    last_state, item[1])
                last_state = item[1]
        else:
            tr_str += ' not traced'
        return tr_str


    ## Method to set a flag so that this task indicates that it's ready to run.
    #  This method may be called from an interrupt service routine or from
    #  another task which has data that this task needs to process soon.
    def go(self):
        self.go_flag = True


    ## This method converts the task to a string for diagnostic use.
    #  It shows information about the task, including execution time
    #  profiling results if profiling has been done.
    #  @returns The string which represents the task
    def __repr__(self):
        rst = f"{self.name:<16s}{self.priority: 4d}"
        try:
            rst += f"{(self.period / 1000.0): 10.1f}"
        except TypeError:
            rst += '         -'
        rst += f"{self._runs: 8d}"

        if self._prof and self._runs > 0:
            avg_dur = (self._run_sum / self._runs) / 1000.0
            avg_late = (self._late_sum / self._runs) / 1000.0
            rst += f"{avg_dur: 10.3f}{(self._slowest / 1000.0): 10.3f}"
            if self.period != None:
                rst += f"{avg_late: 10.3f}{(self._latest / 1000.0): 10.3f}"
        return rst

# =============================================================================

## A list of tasks used internally by the task scheduler.

# This class holds the list of tasks which will be run by the task scheduler.

# The task list is usually not directly used by the programmer except when

# tasks are added to it and the scheduler is called. An example showing the

# use of the task list is given in the last few lines of the documentation

# for class @c Task.

# 

# The task list is sorted by priority so that the scheduler can efficiently

# look through the list to find the highest priority task which is ready to

# run at any given time. Tasks can also be scheduled in a simpler

# "round-robin" fashion.

class TaskList:

    ## Initialize the task list. This creates the list of priorities in
    #  which tasks will be organized by priority.
    def __init__(self):

        ## The list of priority lists. Each priority for which at least one 
        #  task has been created has a list whose first element is a task 
        #  priority and whose other elements are references to task objects at
        #  that priority. 
        self.pri_list = []


    ## Append a task to the task list. The list will be sorted by task 
    #  priorities so that the scheduler can quickly find the highest priority
    #  task which is ready to run at any given time. 
    #  @param task The task to be appended to the list
    def append(self, task):
        # See if there's a tasklist with the given priority in the main list
        new_pri = task.priority
        for pri in self.pri_list:
            # If a tasklist with this priority exists, add this task to it.
            if pri[0] == new_pri:
                pri.append(task)
                break

        # If the priority isn't in the list, this else clause starts a new 
        # priority list with this task as first one. A priority list has the
        # priority as element 0, an index into the list of tasks (used for
        # round-robin scheduling those tasks) as the second item, and tasks
        # after those
        else:
            self.pri_list.append([new_pri, 2, task])

        # Make sure the main list (of lists at each priority) is sorted
        self.pri_list.sort(key=lambda pri: pri[0], reverse=True)


    ## Run tasks in order, ignoring the tasks' priorities.
    #
    #  This scheduling method runs tasks in a round-robin fashion. Each
    #  time it is called, it goes through the list of tasks and gives each of
    #  them a chance to run. Although this scheduler first runs higher priority
    #  tasks first, that has no significant effect in the long run, as all the
    #  tasks are given a chance to run each time through the list, and it takes
    #  about the same amount of time before each is given a chance to run 
    #  again.
    @micropython.native
    def rr_sched(self):
        # For each priority level, run all tasks at that level
        for pri in self.pri_list:
            for task in pri[2:]:
                task.schedule()


    ## Run tasks according to their priorities.
    #
    #  This scheduler runs tasks in a priority based fashion. Each time it is
    #  called, it finds the highest priority task which is ready to run and
    #  calls that task's @c run() method.
    @micropython.native
    def pri_sched(self):
        # Go down the list of priorities, beginning with the highest
        for pri in self.pri_list:
            # Within each priority list, run tasks in round-robin order
            # Each priority list is [priority, index, task, task, ...] where
            # index is the index of the next task in the list to be run
            tries = 2
            length = len(pri)
            while tries < length:
                ran = pri[pri[1]].schedule()
                tries += 1
                pri[1] += 1
                if pri[1] >= length:
                    pri[1] = 2
                if ran:
                    return


    ## Create some diagnostic text showing the tasks in the task list.
    def __repr__(self):
        ret_str = 'TASK             PRI    PERIOD    RUNS   AVG DUR   MAX ' \
            'DUR  AVG LATE  MAX LATE\n'
        for pri in self.pri_list:
            for task in pri[2:]:
                ret_str += str(task) + '\n'

        return ret_str

## This is @b the main task list which is created for scheduling when

# @c cotask.py is imported into a program.

task_list = TaskList()