# main file
# Kai De La Cruz and Kenzie Goldman 3/13/25
import gc
import pyb
from pyb import USB_VCP, Pin, UART
import cotask
import task_share
from motor import Motor
from encoder import Encoder
from collector import Collector
from IR_sensor import QTR_HD_15A
from controller import PIDController
import time
class SystemInitializer:
def __init__(self, R_encoder, L_encoder,
R_motor, L_motor, R_collector, L_collector, IR_sens,
R_effort_share, L_effort_share):
"""Initialization task which is run once at startup to
set up necessary components."""
print("[Init] Initializing system...")
self.R_encoder = R_encoder
self.L_encoder = L_encoder
self.R_motor = R_motor
self.L_motor = L_motor
self.R_collector = R_collector
self.L_collector = L_collector
self.IR_sens = IR_sens
self.R_effort_share = R_effort_share
self.L_effort_share = L_effort_share
# Zero the encoders
self.zero_encoders()
# Initialize motors
self.initialize_motors()
# Initialize collectors
self.initialize_collectors()
self.initialize_IR_sensor()
time.sleep_ms(3000)
global effort_value_initial, L_effort_value, R_effort_value
# Set initial effort values
effort_value_initial = 20
L_effort_value = effort_value_initial
R_effort_value = effort_value_initial
R_effort_share.put(R_effort_value)
L_effort_share.put(L_effort_value)
def zero_encoders(self):
"""Zero the encoders."""
self.R_encoder.zero()
self.L_encoder.zero()
print("[Init] Encoders zeroed.")
def initialize_motors(self):
"""Initialize the motors."""
self.R_motor.disable()
self.L_motor.disable()
print("[Init] Motors initialized and disabled.")
def initialize_collectors(self):
"""Initialize the collectors."""
print("[Init] Collectors initialized.")
def initialize_IR_sensor(self):
"""Initialize the IR sensor."""
file_path = "IR_calibration_data.txt"
try:
self.IR_sens.enable()
self.IR_sens.set_pwm_duty_cycle(60)
# Try to load calibration data
calibration_data = IR_sens.load_calibration(file_path)
# Ensure the file contains valid data
if calibration_data:
print("[IR Sensor] Calibration data loaded successfully.")
self.IR_sens.disable()
else:
self.IR_sens.disable()
raise ValueError("[IMU Task] Calibration file is empty or invalid.")
except:
self.IR_sens.enable()
self.IR_sens.set_pwm_duty_cycle(60)
self.IR_sens.calibrate_light()
self.IR_sens.calibrate_dark()
self.IR_sens.disable()
self.IR_sens.save_calibration(file_path)
self.IR_sens.load_calibration(file_path)
def Right_motor(shares):
"""Set mode, direction, and PWM within motor class and pulls encoder
data at a set frequency."""
global R_motor, R_encoder
state = "IDLE"
# Get references to the share and queue which have been passed to this task
R_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares
while True:
if state == "IDLE":
if R_effort_share.get() is not None:
m_effort = R_effort_share.get()
if m_effort != 0:
state = "RUNNING"
elif state == "RUNNING":
# Set motor effort
m_effort = -1*R_effort_share.get()
R_motor.set_effort(m_effort)
R_motor.enable()
R_encoder.update()
R_position = R_encoder.get_position()
R_velocity = R_encoder.get_velocity()
R_pos_queue.put(R_position)
R_vel_queue.put(R_velocity)
yield state
def Left_motor(shares):
"""Set mode, direction, and PWM within motor class and pulls encoder data at
a set frequency."""
global L_motor, L_encoder
state = "IDLE"
# Get references to the share and queue which have been passed to this task
L_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares
while True:
if state == "IDLE":
if L_effort_share.get() is not None:
m_effort = L_effort_share.get()
if m_effort != 0:
state = "RUNNING"
elif state == "RUNNING":
# Set motor effort
m_effort = -1*L_effort_share.get()
L_motor.set_effort(m_effort)
L_motor.enable()
L_encoder.update()
L_position = L_encoder.get_position()
L_velocity = L_encoder.get_velocity()
L_pos_queue.put(L_position)
L_vel_queue.put(L_velocity)
yield state
def collector(shares):
"""Pull data from collector class and format it
as CSV for further analysis. And maybe plot csv data?"""
state = "COLLECTING"
L_pos_queue, L_vel_queue, R_pos_queue, R_vel_queue, data_collection_complete = shares
while True:
if state == "COLLECTING":
while not L_pos_queue.empty() and not L_vel_queue.empty():
#print("[COLLECTOR] Collecting data...")
L_position = L_pos_queue.get()
L_velocity = L_vel_queue.get()
L_collector.collect(L_position, L_velocity)
if L_collector.is_full():
L_collector.reset()
while not R_pos_queue.empty() and not R_vel_queue.empty():
#print("[COLLECTOR] Collecting data...")
R_position = R_pos_queue.get()
R_velocity = R_vel_queue.get()
R_collector.collect(R_position, R_velocity)
if R_collector.is_full():
R_collector.reset()
yield state
def comms(shares):
"""USB communication task for user interaction."""
state = "WAIT"
R_effort_share, L_effort_share = shares
while True:
if state == "WAIT":
try:
R_effort_input = int(R_effort_value)
L_effort_input = int(L_effort_value)
if -100 <= R_effort_input <= 100:
R_effort_share.put(R_effort_input)
if -100 <= L_effort_input <= 100:
L_effort_share.put(L_effort_input)
except ValueError:
print(b"Invalid input. Must be an integer.\r\n")
yield state
def IR_sensor(shares):
'''Task to read the IR sensor and determine the position of the line'''
state = "READING"
line_position_queue = shares
# Define the saturation limits based on your sensor's expected range
MIN_SENSOR_VALUE = 0 # Example: minimum line position value
MAX_SENSOR_VALUE = 100 # Example: maximum line position value
recent_values = [] # To store recent sensor values
window_size = 5 # Number of values to average for smoothing
while True:
if state == "READING":
IR_sens.enable()
IR_sens.set_pwm_duty_cycle(60)
Centroid = IR_sens.read_line_position() # Get the raw line position
# print(f"Centoid: {Centroid}")
raw = IR_sens.read_raw()
# print(f"Raw: {raw}")
# Apply sensor saturation to the Centroid value
if Centroid < MIN_SENSOR_VALUE:
Centroid = MIN_SENSOR_VALUE
elif Centroid > MAX_SENSOR_VALUE:
Centroid = MAX_SENSOR_VALUE
# Filtering: Use a moving average to smooth the values
recent_values.append(Centroid)
# Keep only the last `window_size` values
if len(recent_values) > window_size:
recent_values.pop(0)
# Calculate the average position to smooth out the sensor readings
filtered_position = sum(recent_values) / len(recent_values)
IR_sens.disable()
# Put the filtered sensor value into the queue for the next task
line_position_queue.put(filtered_position)
yield state
def mot_controller(shares):
"""PID controller task for motor control based on IR sensor and encoder ticks."""
state = "CALCULATING"
line_position_queue, R_effort_share, L_effort_share = shares
while True:
if state == "CALCULATING":
if not line_position_queue.empty():
line_position = line_position_queue.get()
# print(f"Line position: {line_position}")
# Compute correction using the new line sensor controller
line_correction = line_sensor_controller.compute(line_position)
# Compute motor-specific corrections
L_correction = L_controller.compute(line_correction)
R_correction = R_controller.compute(line_correction)
# print(f"Correction: {line_correction}")
# Adjust motor efforts
R_effort = max(min(R_effort_share.get() - R_correction, 25), 11)
L_effort = max(min(L_effort_share.get() + L_correction, 25), 11)
# Update motor effort shares
R_effort_share.put(int(R_effort))
L_effort_share.put(int(L_effort))
if -31 < L_encoder.get_position() <= -28.5:
state = "DIAMOND"
elif state == "DIAMOND":
print("Diamond state")
R_effort_share.put(18)
L_effort_share.put(20)
print(f"Pos: {L_encoder.get_position()}")
if L_encoder.get_position() < -32:
state = "CALCULATING"
print("Diamond Left")
elif not line_position_queue.empty():
line_position = line_position_queue.get()
yield state
if **name** == "**main**":
# Create instances of the encoders, motors, and collectors
R_encoder = Encoder(3, 'A6', 'A7')
L_encoder = Encoder(2,'A0', 'A1')
R_motor = Motor(4, 1,'B6', 'C8', 'C7')
L_motor = Motor(4, 2,'B7', 'H0', 'H1')
R_collector = Collector() L_collector = Collector()
IR_sens = QTR_HD_15A(\['C0','C1','C2','C3','A4','B0','B1'\],7,'A9','A15', 1, 2, 50)
R_controller = PIDController(10, 0, 0.0002)
L_controller = PIDController(10, 0, 0.0002)
line_sensor_controller = PIDController(10, 0.0003, 0)
line_sensor_controller.set_setpoint(3) # 3 is the centroid position
# Create a share and a queue to test function and diagnostic printouts
R_effort_share = task_share.Share('h', thread_protect=False,
name="R_effort_share")
L_effort_share = task_share.Share('h', thread_protect=False,
name="L_effort_share")
R_pos_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="R_pos_queue")
R_vel_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="R_vel_queue")
L_pos_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="L_pos_queue")
L_vel_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="L_vel_queue")
data_collection_complete = task_share.Share('b', thread_protect=False,
name="data_collection_complete") # Shared flag
line_position_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="line_position_queue")
# Initialize the system
initializer = SystemInitializer(R_encoder, L_encoder, R_motor, L_motor,
R_collector, L_collector, IR_sens,
R_effort_share, L_effort_share)
# Create the tasks.
R_motor_task = cotask.Task(Right_motor, name="Right_motor", priority=3,
period=10, profile=True, trace=False,
shares=(R_effort_share,R_pos_queue, R_vel_queue,
data_collection_complete,))
L_motor_task = cotask.Task(Left_motor, name="Left_motor", priority=3,
period=10, profile=True, trace=False,
shares=(L_effort_share,L_pos_queue, L_vel_queue,
data_collection_complete,))
collector_task = cotask.Task(collector, name="collector", priority=2,
period=250,
profile=True, trace=False, shares=(R_pos_queue,
R_vel_queue,data_collection_complete,))
comms_task = cotask.Task(comms, name="comms", priority=2, period=750,
profile=True, trace=False, shares=(R_effort_share,
L_effort_share,))
IR_sensor_task = cotask.Task(IR_sensor, name="IR_sensor",
priority=2, period=10, profile=True, trace=False,
shares = (line_position_queue))
mot_controller_task = cotask.Task(mot_controller, name="mot_controller",
priority=2, period=10, profile=True, trace=False,
shares = (line_position_queue,R_effort_share,
L_effort_share))
cotask.task_list.append(comms_task)
cotask.task_list.append(R_motor_task)
cotask.task_list.append(L_motor_task)
cotask.task_list.append(collector_task)
cotask.task_list.append(IR_sensor_task)
cotask.task_list.append(mot_controller_task)
# Run the memory garbage collector to ensure memory is as defragmented as
# possible before the real-time scheduler is started
gc.collect()
# Run the scheduler with the chosen scheduling algorithm. Quit if ^C pressed
while True:
try:
cotask.task_list.pri_sched()
except KeyboardInterrupt:
L_motor.disable()
R_motor.disable()
IR_sens.disable()
print("[Main] IR sensor disabled.")
print("[Main] Motors disabled.")
print("[Main] Exiting...")
break
# Print a table of task data and a table of shared information data
print('\n' + str (cotask.task_list))
print(task_share.show_all())
print(comms_task.get_trace())
print('')