Alternate Main

This alternate main file incoporates IMU control outlined in attachments.

# main file
# by Kai De La Cruz and Kenzie Goldman

import gc 
import pyb 
from pyb import USB_VCP, Pin, UART 
import cotask
import task_share 
import time
from motor import Motor 
from encoder import Encoder 
from collector import Collector 
from IR_sensor import QTR_HD_15A 
from imu import BNO055
from controller import PIDController

class SystemInitializer: 
    def __init__(self, R_encoder, L_encoder, R_motor, L_motor, R_collector, 
                L_collector, IR_sens, imu, R_effort_share, L_effort_share): 
        """Initialization task which is run once at startup to set up 
        necessary components.""" 
        print("[Init] Initializing system...") 
        self.R_encoder = R_encoder 
        self.L_encoder = L_encoder 
        self.R_motor = R_motor 
        self.L_motor = L_motor 
        self.R_collector = R_collector 
        self.L_collector = L_collector 
        self.IR_sens = IR_sens
        self.R_effort_share = R_effort_share 
        self.L_effort_share = L_effort_share

        # Zero the encoders
        self.zero_encoders()

        # Initialize motors
        self.initialize_motors()

        # Initialize collectors
        self.initialize_collectors()

        self.initialize_IR_sensor()
        time.sleep_ms(3000)

        # Initialize the IMU 
        self.initialize_IMU()

        global effort_value_initial, L_effort_value, R_effort_value
        # Set initial effort values
        effort_value_initial = 35
        L_effort_value = effort_value_initial
        R_effort_value = effort_value_initial
        R_effort_share.put(R_effort_value)
        L_effort_share.put(L_effort_value)

    def zero_encoders(self):
        """Zero the encoders."""
        self.R_encoder.zero()
        self.L_encoder.zero()
        print("[Init] Encoders zeroed.")

    def initialize_motors(self):
        """Initialize the motors."""
        self.R_motor.disable()
        self.L_motor.disable()
        print("[Init] Motors initialized and disabled.")

    def initialize_collectors(self):
        """Initialize the collectors."""
        # Add any necessary initialization for collectors here
        print("[Init] Collectors initialized.")

    def initialize_IR_sensor(self):
        """Initialize the IR sensor."""
        file_path = "IR_calibration_data.txt"
        try:
            self.IR_sens.enable()
            # Try to load calibration data
            calibration_data = IR_sens.load_calibration(file_path)

            # Ensure the file contains valid data
            if calibration_data:
                print("[IR Sensor] Calibration data loaded successfully.")
                self.IR_sens.disable()
            else:
                self.IR_sens.disable()
                raise ValueError("[IMU Task] Calibration file is invalid.")
        except:
            self.IR_sens.enable()
            self.IR_sens.calibrate_light()
            self.IR_sens.calibrate_dark()
            self.IR_sens.disable()
            self.IR_sens.save_calibration(file_path)
            self.IR_sens.load_calibration(file_path)

    def initialize_IMU(self):
        """Initialize the IMU."""
        print("[Init] Initializing IMU...")
        imu.scan()
        # self.imu.set_mode(0x0C)  # NDOF mode
        print("[Init] IMU initialized.")

def Right_motor(shares): 
    """Set mode, direction, and PWM within motor class and pulls encoder 
    data at a set frequency.""" 
    print("[Right motor task] Starting...") 
    global R_motor, R_encoder 
    state = "IDLE" # Get references to the share and queue which have been 
    # passed to this task
    R_effort_share, R_pos_queue, R_vel_queue,  data_collection_complete = shares

    while True:
        if state == "IDLE":
            if R_effort_share.get() is not None:
                m_effort = R_effort_share.get()
                if m_effort != 0:
                    state = "RUNNING"

        elif state == "RUNNING":
            # Set motor effort
            m_effort = -1*R_effort_share.get()
            R_motor.set_effort(m_effort)
            R_motor.enable()

            R_encoder.update() 
            R_position = R_encoder.get_position()
            R_velocity = R_encoder.get_velocity()
            R_pos_queue.put(R_position)
            R_vel_queue.put(R_velocity)
            
        yield state

def Left_motor(shares): 
    """Set mode, direction, and PWM within motor class and pulls encoder 
    data at a set frequency.""" 
    print("[Left motor task] Starting...") 
    global L_motor, L_encoder 
    state = "IDLE" # Get references to the share and queue which have been 
    # passed to this task
    L_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares

    while True:
        if state == "IDLE":
            if L_effort_share.get() is not None:
                m_effort = L_effort_share.get()
                if m_effort != 0:
                    state = "RUNNING"

        elif state == "RUNNING":
            # Set motor effort
            m_effort = -1*L_effort_share.get()
            L_motor.set_effort(m_effort)
            L_motor.enable()

            L_encoder.update() 
            L_position = L_encoder.get_position()
            L_velocity = L_encoder.get_velocity()
            L_pos_queue.put(L_position)
            L_vel_queue.put(L_velocity)
            
        yield state
        

def collector(shares): 
    """Pull data from collector class and format it as CSV for further analysis."""
    print("[COLLECTOR] Starting...") 
    state = "COLLECTING" # Get references to the share and queue which have 
    # been passed to this task
    L_pos_queue, L_vel_queue, R_pos_queue, R_vel_queue,
    data_collection_complete = shares

    while True:
        if state == "COLLECTING":
            while not L_pos_queue.empty() and not L_vel_queue.empty():
                #print("[COLLECTOR] Collecting data...")
                L_position = L_pos_queue.get()
                L_velocity = L_vel_queue.get()
                L_collector.collect(L_position, L_velocity)
                if L_collector.is_full():
                    L_collector.reset()
            while not R_pos_queue.empty() and not R_vel_queue.empty():
                #print("[COLLECTOR] Collecting data...")
                R_position = R_pos_queue.get()
                R_velocity = R_vel_queue.get()
                R_collector.collect(R_position, R_velocity)
                if R_collector.is_full():
                    R_collector.reset()

        yield state

def comms(shares): 
    """USB communication task for user interaction."""
    state = "WAIT" 
    print("[Comms Task] Starting...") 
    R_effort_share, L_effort_share = shares

    while True:
        if state == "WAIT":
            try:
                R_effort_input = int(R_effort_value)
                L_effort_input = int(L_effort_value)
                if -100 <= R_effort_input <= 100:
                    R_effort_share.put(R_effort_input)
                if -100 <= L_effort_input <= 100:
                    L_effort_share.put(L_effort_input)
            except ValueError:
                    print(b"Invalid input. Must be an integer.\r\n")

        yield state

def IR_sensor(shares): 
    '''Task to read the IR sensor and determine the position of the line''' 
    state = "READING"
    line_position_queue, dark_threshold = shares

    MIN_SENSOR_VALUE = 0   # Example: minimum line position value
    MAX_SENSOR_VALUE = 100  # Example: maximum line position value

    recent_values = []  # To store recent sensor values
    window_size = 5     # Number of values to average for smoothing

    while True:
        if state == "READING":
            IR_sens.enable()
            Centroid = IR_sens.read_line_position()  # Get the raw line position
            Cal = IR_sens.read_calibrated()  
            # print(f"[IR Sensor Task] Calibrated values: {Cal}")
            ave_cal = sum(Cal) / len(Cal)
            dark_threshold.put(ave_cal)
            # Apply sensor saturation to the Centroid value
            if Centroid < MIN_SENSOR_VALUE:
                Centroid = MIN_SENSOR_VALUE
            elif Centroid > MAX_SENSOR_VALUE:
                Centroid = MAX_SENSOR_VALUE
            
            # Filtering: Use a moving average to smooth the values
            recent_values.append(Centroid)

            # Keep only the last `window_size` values
            if len(recent_values) > window_size:
                recent_values.pop(0)

            # Calculate the average position to smooth out the sensor readings
            filtered_position = sum(recent_values) / len(recent_values)
            
            IR_sens.disable()

            # Put the filtered sensor value into the queue for the next task
            line_position_queue.put(filtered_position)
        
        yield state

def IMU_task(shares): 
    """Task to read IMU data and output Euler angles.""" 
    # print("[IMU Task] Starting...") 
    state = "CALIBRATING"
    imu_data_queue, imu_control_flg, imu_straight_heading = shares 
    
    file_path = "calibration_data.txt"

    while True:
        if state == "CALIBRATING":
            try:
                # Try to load calibration data
                calibration_data = imu.load_calibration_data(file_path)

                # Ensure the file contains valid data
                if calibration_data:
                    print("[IMU Task] Calibration data loaded successfully.")
                    imu.write_calibration_data_to_imu(calibration_data)
                    print("[IMU Task] IMU initialized with calibration data.")
                    state = "READING"
                else:
                    raise ValueError("[IMU Task] Calibration file is invalid.")

            except (ValueError, IndexError):
                # If file is missing or corrupted, generate new calibration data
                print("[IMU Task] No calibration data found. Creating new file.")
                imu.set_mode(0x0C)  # NDOF mode
                calibration_data = imu.get_calibration_coefficients()
                
                if imu.get_calibration_status() == 0x3F:
                    imu.set_mode(0x0F)  # Config mode
                    imu.write_calibration_data(file_path, calibration_data)
                    imu.write_calibration_data_to_imu(calibration_data)
                    state = "READING"  
                elif imu.get_calibration_status() == 0xFF:
                    imu.set_mode(0x0F)  # Config mode
                    imu.write_calibration_data(file_path, calibration_data)
                    imu.write_calibration_data_to_imu(calibration_data)
                    state = "READING"  
                else:
                    print("[IMU Task] IMU not calibrated. Please calibrate.")
                    state = "CALIBRATING"

        elif state == "READING":
            imu.set_mode(0x0C)  # NDOF mode
            flag = imu_control_flg.get()
            heading, roll, pitch = imu.get_euler_angles()
            # print(f"[IMU Task] Heading: {heading}")
            
            if flag == False:
                imu_straight_heading.put(heading)
                imu_data_queue.put(heading)
            else:
                imu_data_queue.put(heading)

        yield state

def IMU_vs_IR(shares): 
    """Task to determine whether to use IMU or IRvsensor for line following.""" 
    state = "IR_SENSOR" 
    imu_data_queue, line_position_queue, R_effort_share, L_effort_share,
    imu_straight_heading, imu_control_flg, dark_threshold = shares

    while True:
        if state == "IR_SENSOR":
            # Get the threshold value from the dark_threshold share
            threshold = dark_threshold.get()
            print(f"[IMU vs IR Task] IR value: {threshold}")

            # Check if the IR sensor is not detecting the line
            if threshold < 8:  # Adjust this threshold based on your environment
                imu_control_flg.put(True)
                state = "IMU_CONTROL"
                heading_setpoint = imu_straight_heading.get()  
                print("[IMU vs IR Task] Switching to IMU control...")

            if not line_position_queue.empty():
                print("[IR control] Using IR sensor for line following...")
                imu_control_flg.put(False)  # Turn off flag
                IR_sensor_center = 3  
                L_controller.set_setpoint(IR_sensor_center)
                R_controller.set_setpoint(IR_sensor_center)

                # Get the line position from the queue
                line_position = line_position_queue.get()

                # Compute correction from PID controller
                L_correction = L_controller.compute(line_position)
                R_correction = R_controller.compute(line_position)
                print(f"[IR Sensor] Correction: {R_correction}, {L_correction}")

                # Adjust motor efforts based on the correction
                # Slow down one side and speed up the other
                R_effort = R_effort_share.get()
                L_effort = L_effort_share.get()

                # Saturation limits
                R_effort = max(min(R_effort + R_correction, 20), 8)
                L_effort = max(min(L_effort - L_correction, 20), 8)
                print(f"[IR control] R effort: {R_effort} L effort: {L_effort}")
                R_effort = int(R_effort)
                L_effort = int(L_effort)

                # Update motor effort shares
                R_effort_share.put(R_effort)
                L_effort_share.put(L_effort)

        elif state == "IMU_CONTROL":
            print("[IMU control] Using IMU for line following...")
            if not imu_data_queue.empty():
                heading = imu_data_queue.get()
                current_effort_R = R_effort_share.get()
                current_effort_L = L_effort_share.get()
                heading_error = heading_setpoint - heading

                if heading_error < -180:
                    heading_error += 360
                elif heading_error > 180:
                    heading_error -= 360
                print(f"[IMU control] Heading error: {heading_error}")
                print(f"[IMU control] Heading setpoint: {heading_setpoint}")

                correction = imu_controller.compute(heading_error)
                print(f"[IMU control] PID correction: {correction}")

                # Apply correction to motor efforts
                R_effort = int(max(min(current_effort_R + correction, 20), 8))
                L_effort = int(max(min(current_effort_L - correction, 20), 8))

                R_effort_share.put(R_effort)
                L_effort_share.put(L_effort)

            # Check if the IR sensor has detected the line again
            if not line_position_queue.empty():
                state = "IR_SENSOR"

        yield state

if __name__ == "__main__": 
    # Create instances of the encoders, motors, and collectors 
    R_encoder = Encoder(3, 'A6', 'A7') 
    L_encoder = Encoder(2, 'A0', 'A1') 
    R_motor = Motor(4, 1,'B6', 'C8', 'C7') 
    L_motor = Motor(4, 2,'B7', 'H0', 'H1') 
    R_collector = Collector() 
    L_collector = Collector()
    IR_sens = QTR_HD_15A(['C0','C1','C2','C3','A4','B0','B1'],7,'A9','A15') 
    imu = BNO055(pyb.I2C(2, pyb.I2C.CONTROLLER)) # preconfiged as CONTROLLER
    R_controller = PIDController(25, 0, 0.01) 
    L_controller = PIDController(20, 0, 0.01) 
    imu_controller = PIDController(.2, 0, 0)

    # serial_stream = USB_VCP()

    # Create a share and a queue to test function and diagnostic printouts
    R_effort_share = task_share.Share('h', thread_protect=False, 
                                    name="R_effort_share")
    L_effort_share = task_share.Share('h', thread_protect=False, 
                                    name="L_effort_share")
    R_pos_queue = task_share.Queue('f', 100, thread_protect=False, 
                                        overwrite=False, name="R_pos_queue")
    R_vel_queue = task_share.Queue('f', 100, thread_protect=False, 
                                        overwrite=False, name="R_vel_queue")
    L_pos_queue = task_share.Queue('f', 100, thread_protect=False, 
                                        overwrite=False, name="L_pos_queue")
    L_vel_queue = task_share.Queue('f', 100, thread_protect=False, 
                                        overwrite=False, name="L_vel_queue") 
    data_collection_complete = task_share.Share('b', thread_protect=False,
                            name="data_collection_complete")  # Shared flag                                   
    line_position_queue = task_share.Queue('f', 100, thread_protect=False,
                                    overwrite=False, name="line_position_queue")
    imu_data_queue = task_share.Queue('f', 100, thread_protect=False, 
                                    overwrite=False, name="imu_data_queue")
    imu_control_flg = task_share.Share('b', thread_protect=False,
                            name="imu_control_flg")  # Shared flag
    imu_straight_heading = task_share.Share('f', thread_protect=False,
                            name="imu_straight_heading")  # Shared flag
    dark_threshold = task_share.Share('f', thread_protect=False,
                            name="dark_threshold")  # Shared flag
    # Initialize the system
    initializer = SystemInitializer(R_encoder, L_encoder, R_motor, L_motor, 
                                    R_collector, L_collector, IR_sens, imu, 
                                    R_effort_share, L_effort_share)

    R_motor_task = cotask.Task(Right_motor, name="Right_motor", priority=3, 
                        period=20, profile=True, trace=False, 
                        shares=(R_effort_share,R_pos_queue, R_vel_queue,
                        data_collection_complete,))
    L_motor_task = cotask.Task(Left_motor, name="Left_motor", priority=3, 
                        period=20, profile=True, trace=False, 
                        shares=(L_effort_share,L_pos_queue, L_vel_queue,
                        data_collection_complete,))
    collector_task = cotask.Task(collector, name="collector", priority=2, 
                        period=250,
                        profile=True, trace=False, shares=(R_pos_queue, 
                        R_vel_queue,data_collection_complete,)) 
    comms_task = cotask.Task(comms, name="comms", priority=2, period=750,
                        profile=True, trace=False, shares=(R_effort_share, 
                        L_effort_share,)) 

    IR_sensor_task = cotask.Task(IR_sensor, name="IR_sensor", 
                        priority=2, period=40, profile=True, trace=False,
                        shares = (line_position_queue,dark_threshold))

    imu_task = cotask.Task(IMU_task, name="IMU_task", priority=2, period=40, 
                            profile=True, trace=False, 
                            shares=(imu_data_queue,imu_control_flg, 
                                    imu_straight_heading))

    imu_vs_IR_task = cotask.Task(IMU_vs_IR, name="IMU_vs_IR",
                                priority=2, period=40, profile=True, trace=False,
                                shares=(imu_data_queue, line_position_queue,
                                R_effort_share, L_effort_share,imu_control_flg,
                                imu_straight_heading, dark_threshold))

    cotask.task_list.append(comms_task)
    cotask.task_list.append(R_motor_task)
    cotask.task_list.append(L_motor_task)
    cotask.task_list.append(collector_task)
    cotask.task_list.append(IR_sensor_task)
    cotask.task_list.append(imu_task)
    cotask.task_list.append(imu_vs_IR_task)


    # Run the memory garbage collector to ensure memory is as defragmented as
    # possible before the real-time scheduler is started
    gc.collect()

    # Run the scheduler with the chosen scheduling algorithm. Quit if ^C pressed
    while True:
        try:
            cotask.task_list.pri_sched()
        except KeyboardInterrupt:
            L_motor.disable()
            R_motor.disable()
            IR_sens.disable()
            print("[Main] IR sensor disabled.")
            print("[Main] Motors disabled.")
            print(f"R M Ticks: {R_encoder.get_position()}")
            print("[Main] Exiting...")
            break

    # Print a table of task data and a table of shared information data
    print('\n' + str (cotask.task_list))
    print(task_share.show_all())
    print(comms_task.get_trace())
    print('')