# Alternate Main This alternate main file incoporates IMU control outlined in [attachments](alt_main.rst). # main file # by Kai De La Cruz and Kenzie Goldman import gc import pyb from pyb import USB_VCP, Pin, UART import cotask import task_share import time from motor import Motor from encoder import Encoder from collector import Collector from IR_sensor import QTR_HD_15A from imu import BNO055 from controller import PIDController class SystemInitializer: def __init__(self, R_encoder, L_encoder, R_motor, L_motor, R_collector, L_collector, IR_sens, imu, R_effort_share, L_effort_share): """Initialization task which is run once at startup to set up necessary components.""" print("[Init] Initializing system...") self.R_encoder = R_encoder self.L_encoder = L_encoder self.R_motor = R_motor self.L_motor = L_motor self.R_collector = R_collector self.L_collector = L_collector self.IR_sens = IR_sens self.R_effort_share = R_effort_share self.L_effort_share = L_effort_share # Zero the encoders self.zero_encoders() # Initialize motors self.initialize_motors() # Initialize collectors self.initialize_collectors() self.initialize_IR_sensor() time.sleep_ms(3000) # Initialize the IMU self.initialize_IMU() global effort_value_initial, L_effort_value, R_effort_value # Set initial effort values effort_value_initial = 35 L_effort_value = effort_value_initial R_effort_value = effort_value_initial R_effort_share.put(R_effort_value) L_effort_share.put(L_effort_value) def zero_encoders(self): """Zero the encoders.""" self.R_encoder.zero() self.L_encoder.zero() print("[Init] Encoders zeroed.") def initialize_motors(self): """Initialize the motors.""" self.R_motor.disable() self.L_motor.disable() print("[Init] Motors initialized and disabled.") def initialize_collectors(self): """Initialize the collectors.""" # Add any necessary initialization for collectors here print("[Init] Collectors initialized.") def initialize_IR_sensor(self): """Initialize the IR sensor.""" file_path = "IR_calibration_data.txt" try: self.IR_sens.enable() # Try to load calibration data calibration_data = IR_sens.load_calibration(file_path) # Ensure the file contains valid data if calibration_data: print("[IR Sensor] Calibration data loaded successfully.") self.IR_sens.disable() else: self.IR_sens.disable() raise ValueError("[IMU Task] Calibration file is invalid.") except: self.IR_sens.enable() self.IR_sens.calibrate_light() self.IR_sens.calibrate_dark() self.IR_sens.disable() self.IR_sens.save_calibration(file_path) self.IR_sens.load_calibration(file_path) def initialize_IMU(self): """Initialize the IMU.""" print("[Init] Initializing IMU...") imu.scan() # self.imu.set_mode(0x0C) # NDOF mode print("[Init] IMU initialized.") def Right_motor(shares): """Set mode, direction, and PWM within motor class and pulls encoder data at a set frequency.""" print("[Right motor task] Starting...") global R_motor, R_encoder state = "IDLE" # Get references to the share and queue which have been # passed to this task R_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares while True: if state == "IDLE": if R_effort_share.get() is not None: m_effort = R_effort_share.get() if m_effort != 0: state = "RUNNING" elif state == "RUNNING": # Set motor effort m_effort = -1*R_effort_share.get() R_motor.set_effort(m_effort) R_motor.enable() R_encoder.update() R_position = R_encoder.get_position() R_velocity = R_encoder.get_velocity() R_pos_queue.put(R_position) R_vel_queue.put(R_velocity) yield state def Left_motor(shares): """Set mode, direction, and PWM within motor class and pulls encoder data at a set frequency.""" print("[Left motor task] Starting...") global L_motor, L_encoder state = "IDLE" # Get references to the share and queue which have been # passed to this task L_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares while True: if state == "IDLE": if L_effort_share.get() is not None: m_effort = L_effort_share.get() if m_effort != 0: state = "RUNNING" elif state == "RUNNING": # Set motor effort m_effort = -1*L_effort_share.get() L_motor.set_effort(m_effort) L_motor.enable() L_encoder.update() L_position = L_encoder.get_position() L_velocity = L_encoder.get_velocity() L_pos_queue.put(L_position) L_vel_queue.put(L_velocity) yield state def collector(shares): """Pull data from collector class and format it as CSV for further analysis.""" print("[COLLECTOR] Starting...") state = "COLLECTING" # Get references to the share and queue which have # been passed to this task L_pos_queue, L_vel_queue, R_pos_queue, R_vel_queue, data_collection_complete = shares while True: if state == "COLLECTING": while not L_pos_queue.empty() and not L_vel_queue.empty(): #print("[COLLECTOR] Collecting data...") L_position = L_pos_queue.get() L_velocity = L_vel_queue.get() L_collector.collect(L_position, L_velocity) if L_collector.is_full(): L_collector.reset() while not R_pos_queue.empty() and not R_vel_queue.empty(): #print("[COLLECTOR] Collecting data...") R_position = R_pos_queue.get() R_velocity = R_vel_queue.get() R_collector.collect(R_position, R_velocity) if R_collector.is_full(): R_collector.reset() yield state def comms(shares): """USB communication task for user interaction.""" state = "WAIT" print("[Comms Task] Starting...") R_effort_share, L_effort_share = shares while True: if state == "WAIT": try: R_effort_input = int(R_effort_value) L_effort_input = int(L_effort_value) if -100 <= R_effort_input <= 100: R_effort_share.put(R_effort_input) if -100 <= L_effort_input <= 100: L_effort_share.put(L_effort_input) except ValueError: print(b"Invalid input. Must be an integer.\r\n") yield state def IR_sensor(shares): '''Task to read the IR sensor and determine the position of the line''' state = "READING" line_position_queue, dark_threshold = shares MIN_SENSOR_VALUE = 0 # Example: minimum line position value MAX_SENSOR_VALUE = 100 # Example: maximum line position value recent_values = [] # To store recent sensor values window_size = 5 # Number of values to average for smoothing while True: if state == "READING": IR_sens.enable() Centroid = IR_sens.read_line_position() # Get the raw line position Cal = IR_sens.read_calibrated() # print(f"[IR Sensor Task] Calibrated values: {Cal}") ave_cal = sum(Cal) / len(Cal) dark_threshold.put(ave_cal) # Apply sensor saturation to the Centroid value if Centroid < MIN_SENSOR_VALUE: Centroid = MIN_SENSOR_VALUE elif Centroid > MAX_SENSOR_VALUE: Centroid = MAX_SENSOR_VALUE # Filtering: Use a moving average to smooth the values recent_values.append(Centroid) # Keep only the last `window_size` values if len(recent_values) > window_size: recent_values.pop(0) # Calculate the average position to smooth out the sensor readings filtered_position = sum(recent_values) / len(recent_values) IR_sens.disable() # Put the filtered sensor value into the queue for the next task line_position_queue.put(filtered_position) yield state def IMU_task(shares): """Task to read IMU data and output Euler angles.""" # print("[IMU Task] Starting...") state = "CALIBRATING" imu_data_queue, imu_control_flg, imu_straight_heading = shares file_path = "calibration_data.txt" while True: if state == "CALIBRATING": try: # Try to load calibration data calibration_data = imu.load_calibration_data(file_path) # Ensure the file contains valid data if calibration_data: print("[IMU Task] Calibration data loaded successfully.") imu.write_calibration_data_to_imu(calibration_data) print("[IMU Task] IMU initialized with calibration data.") state = "READING" else: raise ValueError("[IMU Task] Calibration file is invalid.") except (ValueError, IndexError): # If file is missing or corrupted, generate new calibration data print("[IMU Task] No calibration data found. Creating new file.") imu.set_mode(0x0C) # NDOF mode calibration_data = imu.get_calibration_coefficients() if imu.get_calibration_status() == 0x3F: imu.set_mode(0x0F) # Config mode imu.write_calibration_data(file_path, calibration_data) imu.write_calibration_data_to_imu(calibration_data) state = "READING" elif imu.get_calibration_status() == 0xFF: imu.set_mode(0x0F) # Config mode imu.write_calibration_data(file_path, calibration_data) imu.write_calibration_data_to_imu(calibration_data) state = "READING" else: print("[IMU Task] IMU not calibrated. Please calibrate.") state = "CALIBRATING" elif state == "READING": imu.set_mode(0x0C) # NDOF mode flag = imu_control_flg.get() heading, roll, pitch = imu.get_euler_angles() # print(f"[IMU Task] Heading: {heading}") if flag == False: imu_straight_heading.put(heading) imu_data_queue.put(heading) else: imu_data_queue.put(heading) yield state def IMU_vs_IR(shares): """Task to determine whether to use IMU or IRvsensor for line following.""" state = "IR_SENSOR" imu_data_queue, line_position_queue, R_effort_share, L_effort_share, imu_straight_heading, imu_control_flg, dark_threshold = shares while True: if state == "IR_SENSOR": # Get the threshold value from the dark_threshold share threshold = dark_threshold.get() print(f"[IMU vs IR Task] IR value: {threshold}") # Check if the IR sensor is not detecting the line if threshold < 8: # Adjust this threshold based on your environment imu_control_flg.put(True) state = "IMU_CONTROL" heading_setpoint = imu_straight_heading.get() print("[IMU vs IR Task] Switching to IMU control...") if not line_position_queue.empty(): print("[IR control] Using IR sensor for line following...") imu_control_flg.put(False) # Turn off flag IR_sensor_center = 3 L_controller.set_setpoint(IR_sensor_center) R_controller.set_setpoint(IR_sensor_center) # Get the line position from the queue line_position = line_position_queue.get() # Compute correction from PID controller L_correction = L_controller.compute(line_position) R_correction = R_controller.compute(line_position) print(f"[IR Sensor] Correction: {R_correction}, {L_correction}") # Adjust motor efforts based on the correction # Slow down one side and speed up the other R_effort = R_effort_share.get() L_effort = L_effort_share.get() # Saturation limits R_effort = max(min(R_effort + R_correction, 20), 8) L_effort = max(min(L_effort - L_correction, 20), 8) print(f"[IR control] R effort: {R_effort} L effort: {L_effort}") R_effort = int(R_effort) L_effort = int(L_effort) # Update motor effort shares R_effort_share.put(R_effort) L_effort_share.put(L_effort) elif state == "IMU_CONTROL": print("[IMU control] Using IMU for line following...") if not imu_data_queue.empty(): heading = imu_data_queue.get() current_effort_R = R_effort_share.get() current_effort_L = L_effort_share.get() heading_error = heading_setpoint - heading if heading_error < -180: heading_error += 360 elif heading_error > 180: heading_error -= 360 print(f"[IMU control] Heading error: {heading_error}") print(f"[IMU control] Heading setpoint: {heading_setpoint}") correction = imu_controller.compute(heading_error) print(f"[IMU control] PID correction: {correction}") # Apply correction to motor efforts R_effort = int(max(min(current_effort_R + correction, 20), 8)) L_effort = int(max(min(current_effort_L - correction, 20), 8)) R_effort_share.put(R_effort) L_effort_share.put(L_effort) # Check if the IR sensor has detected the line again if not line_position_queue.empty(): state = "IR_SENSOR" yield state if __name__ == "__main__": # Create instances of the encoders, motors, and collectors R_encoder = Encoder(3, 'A6', 'A7') L_encoder = Encoder(2, 'A0', 'A1') R_motor = Motor(4, 1,'B6', 'C8', 'C7') L_motor = Motor(4, 2,'B7', 'H0', 'H1') R_collector = Collector() L_collector = Collector() IR_sens = QTR_HD_15A(['C0','C1','C2','C3','A4','B0','B1'],7,'A9','A15') imu = BNO055(pyb.I2C(2, pyb.I2C.CONTROLLER)) # preconfiged as CONTROLLER R_controller = PIDController(25, 0, 0.01) L_controller = PIDController(20, 0, 0.01) imu_controller = PIDController(.2, 0, 0) # serial_stream = USB_VCP() # Create a share and a queue to test function and diagnostic printouts R_effort_share = task_share.Share('h', thread_protect=False, name="R_effort_share") L_effort_share = task_share.Share('h', thread_protect=False, name="L_effort_share") R_pos_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="R_pos_queue") R_vel_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="R_vel_queue") L_pos_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="L_pos_queue") L_vel_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="L_vel_queue") data_collection_complete = task_share.Share('b', thread_protect=False, name="data_collection_complete") # Shared flag line_position_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="line_position_queue") imu_data_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="imu_data_queue") imu_control_flg = task_share.Share('b', thread_protect=False, name="imu_control_flg") # Shared flag imu_straight_heading = task_share.Share('f', thread_protect=False, name="imu_straight_heading") # Shared flag dark_threshold = task_share.Share('f', thread_protect=False, name="dark_threshold") # Shared flag # Initialize the system initializer = SystemInitializer(R_encoder, L_encoder, R_motor, L_motor, R_collector, L_collector, IR_sens, imu, R_effort_share, L_effort_share) R_motor_task = cotask.Task(Right_motor, name="Right_motor", priority=3, period=20, profile=True, trace=False, shares=(R_effort_share,R_pos_queue, R_vel_queue, data_collection_complete,)) L_motor_task = cotask.Task(Left_motor, name="Left_motor", priority=3, period=20, profile=True, trace=False, shares=(L_effort_share,L_pos_queue, L_vel_queue, data_collection_complete,)) collector_task = cotask.Task(collector, name="collector", priority=2, period=250, profile=True, trace=False, shares=(R_pos_queue, R_vel_queue,data_collection_complete,)) comms_task = cotask.Task(comms, name="comms", priority=2, period=750, profile=True, trace=False, shares=(R_effort_share, L_effort_share,)) IR_sensor_task = cotask.Task(IR_sensor, name="IR_sensor", priority=2, period=40, profile=True, trace=False, shares = (line_position_queue,dark_threshold)) imu_task = cotask.Task(IMU_task, name="IMU_task", priority=2, period=40, profile=True, trace=False, shares=(imu_data_queue,imu_control_flg, imu_straight_heading)) imu_vs_IR_task = cotask.Task(IMU_vs_IR, name="IMU_vs_IR", priority=2, period=40, profile=True, trace=False, shares=(imu_data_queue, line_position_queue, R_effort_share, L_effort_share,imu_control_flg, imu_straight_heading, dark_threshold)) cotask.task_list.append(comms_task) cotask.task_list.append(R_motor_task) cotask.task_list.append(L_motor_task) cotask.task_list.append(collector_task) cotask.task_list.append(IR_sensor_task) cotask.task_list.append(imu_task) cotask.task_list.append(imu_vs_IR_task) # Run the memory garbage collector to ensure memory is as defragmented as # possible before the real-time scheduler is started gc.collect() # Run the scheduler with the chosen scheduling algorithm. Quit if ^C pressed while True: try: cotask.task_list.pri_sched() except KeyboardInterrupt: L_motor.disable() R_motor.disable() IR_sens.disable() print("[Main] IR sensor disabled.") print("[Main] Motors disabled.") print(f"R M Ticks: {R_encoder.get_position()}") print("[Main] Exiting...") break # Print a table of task data and a table of shared information data print('\n' + str (cotask.task_list)) print(task_share.show_all()) print(comms_task.get_trace()) print('')