# Main # main file # Kai De La Cruz and Kenzie Goldman 3/13/25 import gc import pyb from pyb import USB_VCP, Pin, UART import cotask import task_share from motor import Motor from encoder import Encoder from collector import Collector from IR_sensor import QTR_HD_15A from controller import PIDController import time class SystemInitializer: def __init__(self, R_encoder, L_encoder, R_motor, L_motor, R_collector, L_collector, IR_sens, R_effort_share, L_effort_share): """Initialization task which is run once at startup to set up necessary components.""" print("[Init] Initializing system...") self.R_encoder = R_encoder self.L_encoder = L_encoder self.R_motor = R_motor self.L_motor = L_motor self.R_collector = R_collector self.L_collector = L_collector self.IR_sens = IR_sens self.R_effort_share = R_effort_share self.L_effort_share = L_effort_share # Zero the encoders self.zero_encoders() # Initialize motors self.initialize_motors() # Initialize collectors self.initialize_collectors() self.initialize_IR_sensor() time.sleep_ms(3000) global effort_value_initial, L_effort_value, R_effort_value # Set initial effort values effort_value_initial = 20 L_effort_value = effort_value_initial R_effort_value = effort_value_initial R_effort_share.put(R_effort_value) L_effort_share.put(L_effort_value) def zero_encoders(self): """Zero the encoders.""" self.R_encoder.zero() self.L_encoder.zero() print("[Init] Encoders zeroed.") def initialize_motors(self): """Initialize the motors.""" self.R_motor.disable() self.L_motor.disable() print("[Init] Motors initialized and disabled.") def initialize_collectors(self): """Initialize the collectors.""" print("[Init] Collectors initialized.") def initialize_IR_sensor(self): """Initialize the IR sensor.""" file_path = "IR_calibration_data.txt" try: self.IR_sens.enable() self.IR_sens.set_pwm_duty_cycle(60) # Try to load calibration data calibration_data = IR_sens.load_calibration(file_path) # Ensure the file contains valid data if calibration_data: print("[IR Sensor] Calibration data loaded successfully.") self.IR_sens.disable() else: self.IR_sens.disable() raise ValueError("[IMU Task] Calibration file is empty or invalid.") except: self.IR_sens.enable() self.IR_sens.set_pwm_duty_cycle(60) self.IR_sens.calibrate_light() self.IR_sens.calibrate_dark() self.IR_sens.disable() self.IR_sens.save_calibration(file_path) self.IR_sens.load_calibration(file_path) def Right_motor(shares): """Set mode, direction, and PWM within motor class and pulls encoder data at a set frequency.""" global R_motor, R_encoder state = "IDLE" # Get references to the share and queue which have been passed to this task R_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares while True: if state == "IDLE": if R_effort_share.get() is not None: m_effort = R_effort_share.get() if m_effort != 0: state = "RUNNING" elif state == "RUNNING": # Set motor effort m_effort = -1*R_effort_share.get() R_motor.set_effort(m_effort) R_motor.enable() R_encoder.update() R_position = R_encoder.get_position() R_velocity = R_encoder.get_velocity() R_pos_queue.put(R_position) R_vel_queue.put(R_velocity) yield state def Left_motor(shares): """Set mode, direction, and PWM within motor class and pulls encoder data at a set frequency.""" global L_motor, L_encoder state = "IDLE" # Get references to the share and queue which have been passed to this task L_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares while True: if state == "IDLE": if L_effort_share.get() is not None: m_effort = L_effort_share.get() if m_effort != 0: state = "RUNNING" elif state == "RUNNING": # Set motor effort m_effort = -1*L_effort_share.get() L_motor.set_effort(m_effort) L_motor.enable() L_encoder.update() L_position = L_encoder.get_position() L_velocity = L_encoder.get_velocity() L_pos_queue.put(L_position) L_vel_queue.put(L_velocity) yield state def collector(shares): """Pull data from collector class and format it as CSV for further analysis. And maybe plot csv data?""" state = "COLLECTING" L_pos_queue, L_vel_queue, R_pos_queue, R_vel_queue, data_collection_complete = shares while True: if state == "COLLECTING": while not L_pos_queue.empty() and not L_vel_queue.empty(): #print("[COLLECTOR] Collecting data...") L_position = L_pos_queue.get() L_velocity = L_vel_queue.get() L_collector.collect(L_position, L_velocity) if L_collector.is_full(): L_collector.reset() while not R_pos_queue.empty() and not R_vel_queue.empty(): #print("[COLLECTOR] Collecting data...") R_position = R_pos_queue.get() R_velocity = R_vel_queue.get() R_collector.collect(R_position, R_velocity) if R_collector.is_full(): R_collector.reset() yield state def comms(shares): """USB communication task for user interaction.""" state = "WAIT" R_effort_share, L_effort_share = shares while True: if state == "WAIT": try: R_effort_input = int(R_effort_value) L_effort_input = int(L_effort_value) if -100 <= R_effort_input <= 100: R_effort_share.put(R_effort_input) if -100 <= L_effort_input <= 100: L_effort_share.put(L_effort_input) except ValueError: print(b"Invalid input. Must be an integer.\r\n") yield state def IR_sensor(shares): '''Task to read the IR sensor and determine the position of the line''' state = "READING" line_position_queue = shares # Define the saturation limits based on your sensor's expected range MIN_SENSOR_VALUE = 0 # Example: minimum line position value MAX_SENSOR_VALUE = 100 # Example: maximum line position value recent_values = [] # To store recent sensor values window_size = 5 # Number of values to average for smoothing while True: if state == "READING": IR_sens.enable() IR_sens.set_pwm_duty_cycle(60) Centroid = IR_sens.read_line_position() # Get the raw line position # print(f"Centoid: {Centroid}") raw = IR_sens.read_raw() # print(f"Raw: {raw}") # Apply sensor saturation to the Centroid value if Centroid < MIN_SENSOR_VALUE: Centroid = MIN_SENSOR_VALUE elif Centroid > MAX_SENSOR_VALUE: Centroid = MAX_SENSOR_VALUE # Filtering: Use a moving average to smooth the values recent_values.append(Centroid) # Keep only the last `window_size` values if len(recent_values) > window_size: recent_values.pop(0) # Calculate the average position to smooth out the sensor readings filtered_position = sum(recent_values) / len(recent_values) IR_sens.disable() # Put the filtered sensor value into the queue for the next task line_position_queue.put(filtered_position) yield state def mot_controller(shares): """PID controller task for motor control based on IR sensor and encoder ticks.""" state = "CALCULATING" line_position_queue, R_effort_share, L_effort_share = shares while True: if state == "CALCULATING": if not line_position_queue.empty(): line_position = line_position_queue.get() # print(f"Line position: {line_position}") # Compute correction using the new line sensor controller line_correction = line_sensor_controller.compute(line_position) # Compute motor-specific corrections L_correction = L_controller.compute(line_correction) R_correction = R_controller.compute(line_correction) # print(f"Correction: {line_correction}") # Adjust motor efforts R_effort = max(min(R_effort_share.get() - R_correction, 25), 11) L_effort = max(min(L_effort_share.get() + L_correction, 25), 11) # Update motor effort shares R_effort_share.put(int(R_effort)) L_effort_share.put(int(L_effort)) if -31 < L_encoder.get_position() <= -28.5: state = "DIAMOND" elif state == "DIAMOND": print("Diamond state") R_effort_share.put(18) L_effort_share.put(20) print(f"Pos: {L_encoder.get_position()}") if L_encoder.get_position() < -32: state = "CALCULATING" print("Diamond Left") elif not line_position_queue.empty(): line_position = line_position_queue.get() yield state if **name** == "**main**": # Create instances of the encoders, motors, and collectors R_encoder = Encoder(3, 'A6', 'A7') L_encoder = Encoder(2,'A0', 'A1') R_motor = Motor(4, 1,'B6', 'C8', 'C7') L_motor = Motor(4, 2,'B7', 'H0', 'H1') R_collector = Collector() L_collector = Collector() IR_sens = QTR_HD_15A(\['C0','C1','C2','C3','A4','B0','B1'\],7,'A9','A15', 1, 2, 50) R_controller = PIDController(10, 0, 0.0002) L_controller = PIDController(10, 0, 0.0002) line_sensor_controller = PIDController(10, 0.0003, 0) line_sensor_controller.set_setpoint(3) # 3 is the centroid position # Create a share and a queue to test function and diagnostic printouts R_effort_share = task_share.Share('h', thread_protect=False, name="R_effort_share") L_effort_share = task_share.Share('h', thread_protect=False, name="L_effort_share") R_pos_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="R_pos_queue") R_vel_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="R_vel_queue") L_pos_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="L_pos_queue") L_vel_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="L_vel_queue") data_collection_complete = task_share.Share('b', thread_protect=False, name="data_collection_complete") # Shared flag line_position_queue = task_share.Queue('f', 100, thread_protect=False, overwrite=False, name="line_position_queue") # Initialize the system initializer = SystemInitializer(R_encoder, L_encoder, R_motor, L_motor, R_collector, L_collector, IR_sens, R_effort_share, L_effort_share) # Create the tasks. R_motor_task = cotask.Task(Right_motor, name="Right_motor", priority=3, period=10, profile=True, trace=False, shares=(R_effort_share,R_pos_queue, R_vel_queue, data_collection_complete,)) L_motor_task = cotask.Task(Left_motor, name="Left_motor", priority=3, period=10, profile=True, trace=False, shares=(L_effort_share,L_pos_queue, L_vel_queue, data_collection_complete,)) collector_task = cotask.Task(collector, name="collector", priority=2, period=250, profile=True, trace=False, shares=(R_pos_queue, R_vel_queue,data_collection_complete,)) comms_task = cotask.Task(comms, name="comms", priority=2, period=750, profile=True, trace=False, shares=(R_effort_share, L_effort_share,)) IR_sensor_task = cotask.Task(IR_sensor, name="IR_sensor", priority=2, period=10, profile=True, trace=False, shares = (line_position_queue)) mot_controller_task = cotask.Task(mot_controller, name="mot_controller", priority=2, period=10, profile=True, trace=False, shares = (line_position_queue,R_effort_share, L_effort_share)) cotask.task_list.append(comms_task) cotask.task_list.append(R_motor_task) cotask.task_list.append(L_motor_task) cotask.task_list.append(collector_task) cotask.task_list.append(IR_sensor_task) cotask.task_list.append(mot_controller_task) # Run the memory garbage collector to ensure memory is as defragmented as # possible before the real-time scheduler is started gc.collect() # Run the scheduler with the chosen scheduling algorithm. Quit if ^C pressed while True: try: cotask.task_list.pri_sched() except KeyboardInterrupt: L_motor.disable() R_motor.disable() IR_sens.disable() print("[Main] IR sensor disabled.") print("[Main] Motors disabled.") print("[Main] Exiting...") break # Print a table of task data and a table of shared information data print('\n' + str (cotask.task_list)) print(task_share.show_all()) print(comms_task.get_trace()) print('')