Alternate Main
This alternate main file incoporates IMU control outlined in attachments.
# main file
# by Kai De La Cruz and Kenzie Goldman
import gc
import pyb
from pyb import USB_VCP, Pin, UART
import cotask
import task_share
import time
from motor import Motor
from encoder import Encoder
from collector import Collector
from IR_sensor import QTR_HD_15A
from imu import BNO055
from controller import PIDController
class SystemInitializer:
def __init__(self, R_encoder, L_encoder, R_motor, L_motor, R_collector,
L_collector, IR_sens, imu, R_effort_share, L_effort_share):
"""Initialization task which is run once at startup to set up
necessary components."""
print("[Init] Initializing system...")
self.R_encoder = R_encoder
self.L_encoder = L_encoder
self.R_motor = R_motor
self.L_motor = L_motor
self.R_collector = R_collector
self.L_collector = L_collector
self.IR_sens = IR_sens
self.R_effort_share = R_effort_share
self.L_effort_share = L_effort_share
# Zero the encoders
self.zero_encoders()
# Initialize motors
self.initialize_motors()
# Initialize collectors
self.initialize_collectors()
self.initialize_IR_sensor()
time.sleep_ms(3000)
# Initialize the IMU
self.initialize_IMU()
global effort_value_initial, L_effort_value, R_effort_value
# Set initial effort values
effort_value_initial = 35
L_effort_value = effort_value_initial
R_effort_value = effort_value_initial
R_effort_share.put(R_effort_value)
L_effort_share.put(L_effort_value)
def zero_encoders(self):
"""Zero the encoders."""
self.R_encoder.zero()
self.L_encoder.zero()
print("[Init] Encoders zeroed.")
def initialize_motors(self):
"""Initialize the motors."""
self.R_motor.disable()
self.L_motor.disable()
print("[Init] Motors initialized and disabled.")
def initialize_collectors(self):
"""Initialize the collectors."""
# Add any necessary initialization for collectors here
print("[Init] Collectors initialized.")
def initialize_IR_sensor(self):
"""Initialize the IR sensor."""
file_path = "IR_calibration_data.txt"
try:
self.IR_sens.enable()
# Try to load calibration data
calibration_data = IR_sens.load_calibration(file_path)
# Ensure the file contains valid data
if calibration_data:
print("[IR Sensor] Calibration data loaded successfully.")
self.IR_sens.disable()
else:
self.IR_sens.disable()
raise ValueError("[IMU Task] Calibration file is invalid.")
except:
self.IR_sens.enable()
self.IR_sens.calibrate_light()
self.IR_sens.calibrate_dark()
self.IR_sens.disable()
self.IR_sens.save_calibration(file_path)
self.IR_sens.load_calibration(file_path)
def initialize_IMU(self):
"""Initialize the IMU."""
print("[Init] Initializing IMU...")
imu.scan()
# self.imu.set_mode(0x0C) # NDOF mode
print("[Init] IMU initialized.")
def Right_motor(shares):
"""Set mode, direction, and PWM within motor class and pulls encoder
data at a set frequency."""
print("[Right motor task] Starting...")
global R_motor, R_encoder
state = "IDLE" # Get references to the share and queue which have been
# passed to this task
R_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares
while True:
if state == "IDLE":
if R_effort_share.get() is not None:
m_effort = R_effort_share.get()
if m_effort != 0:
state = "RUNNING"
elif state == "RUNNING":
# Set motor effort
m_effort = -1*R_effort_share.get()
R_motor.set_effort(m_effort)
R_motor.enable()
R_encoder.update()
R_position = R_encoder.get_position()
R_velocity = R_encoder.get_velocity()
R_pos_queue.put(R_position)
R_vel_queue.put(R_velocity)
yield state
def Left_motor(shares):
"""Set mode, direction, and PWM within motor class and pulls encoder
data at a set frequency."""
print("[Left motor task] Starting...")
global L_motor, L_encoder
state = "IDLE" # Get references to the share and queue which have been
# passed to this task
L_effort_share, R_pos_queue, R_vel_queue, data_collection_complete = shares
while True:
if state == "IDLE":
if L_effort_share.get() is not None:
m_effort = L_effort_share.get()
if m_effort != 0:
state = "RUNNING"
elif state == "RUNNING":
# Set motor effort
m_effort = -1*L_effort_share.get()
L_motor.set_effort(m_effort)
L_motor.enable()
L_encoder.update()
L_position = L_encoder.get_position()
L_velocity = L_encoder.get_velocity()
L_pos_queue.put(L_position)
L_vel_queue.put(L_velocity)
yield state
def collector(shares):
"""Pull data from collector class and format it as CSV for further analysis."""
print("[COLLECTOR] Starting...")
state = "COLLECTING" # Get references to the share and queue which have
# been passed to this task
L_pos_queue, L_vel_queue, R_pos_queue, R_vel_queue,
data_collection_complete = shares
while True:
if state == "COLLECTING":
while not L_pos_queue.empty() and not L_vel_queue.empty():
#print("[COLLECTOR] Collecting data...")
L_position = L_pos_queue.get()
L_velocity = L_vel_queue.get()
L_collector.collect(L_position, L_velocity)
if L_collector.is_full():
L_collector.reset()
while not R_pos_queue.empty() and not R_vel_queue.empty():
#print("[COLLECTOR] Collecting data...")
R_position = R_pos_queue.get()
R_velocity = R_vel_queue.get()
R_collector.collect(R_position, R_velocity)
if R_collector.is_full():
R_collector.reset()
yield state
def comms(shares):
"""USB communication task for user interaction."""
state = "WAIT"
print("[Comms Task] Starting...")
R_effort_share, L_effort_share = shares
while True:
if state == "WAIT":
try:
R_effort_input = int(R_effort_value)
L_effort_input = int(L_effort_value)
if -100 <= R_effort_input <= 100:
R_effort_share.put(R_effort_input)
if -100 <= L_effort_input <= 100:
L_effort_share.put(L_effort_input)
except ValueError:
print(b"Invalid input. Must be an integer.\r\n")
yield state
def IR_sensor(shares):
'''Task to read the IR sensor and determine the position of the line'''
state = "READING"
line_position_queue, dark_threshold = shares
MIN_SENSOR_VALUE = 0 # Example: minimum line position value
MAX_SENSOR_VALUE = 100 # Example: maximum line position value
recent_values = [] # To store recent sensor values
window_size = 5 # Number of values to average for smoothing
while True:
if state == "READING":
IR_sens.enable()
Centroid = IR_sens.read_line_position() # Get the raw line position
Cal = IR_sens.read_calibrated()
# print(f"[IR Sensor Task] Calibrated values: {Cal}")
ave_cal = sum(Cal) / len(Cal)
dark_threshold.put(ave_cal)
# Apply sensor saturation to the Centroid value
if Centroid < MIN_SENSOR_VALUE:
Centroid = MIN_SENSOR_VALUE
elif Centroid > MAX_SENSOR_VALUE:
Centroid = MAX_SENSOR_VALUE
# Filtering: Use a moving average to smooth the values
recent_values.append(Centroid)
# Keep only the last `window_size` values
if len(recent_values) > window_size:
recent_values.pop(0)
# Calculate the average position to smooth out the sensor readings
filtered_position = sum(recent_values) / len(recent_values)
IR_sens.disable()
# Put the filtered sensor value into the queue for the next task
line_position_queue.put(filtered_position)
yield state
def IMU_task(shares):
"""Task to read IMU data and output Euler angles."""
# print("[IMU Task] Starting...")
state = "CALIBRATING"
imu_data_queue, imu_control_flg, imu_straight_heading = shares
file_path = "calibration_data.txt"
while True:
if state == "CALIBRATING":
try:
# Try to load calibration data
calibration_data = imu.load_calibration_data(file_path)
# Ensure the file contains valid data
if calibration_data:
print("[IMU Task] Calibration data loaded successfully.")
imu.write_calibration_data_to_imu(calibration_data)
print("[IMU Task] IMU initialized with calibration data.")
state = "READING"
else:
raise ValueError("[IMU Task] Calibration file is invalid.")
except (ValueError, IndexError):
# If file is missing or corrupted, generate new calibration data
print("[IMU Task] No calibration data found. Creating new file.")
imu.set_mode(0x0C) # NDOF mode
calibration_data = imu.get_calibration_coefficients()
if imu.get_calibration_status() == 0x3F:
imu.set_mode(0x0F) # Config mode
imu.write_calibration_data(file_path, calibration_data)
imu.write_calibration_data_to_imu(calibration_data)
state = "READING"
elif imu.get_calibration_status() == 0xFF:
imu.set_mode(0x0F) # Config mode
imu.write_calibration_data(file_path, calibration_data)
imu.write_calibration_data_to_imu(calibration_data)
state = "READING"
else:
print("[IMU Task] IMU not calibrated. Please calibrate.")
state = "CALIBRATING"
elif state == "READING":
imu.set_mode(0x0C) # NDOF mode
flag = imu_control_flg.get()
heading, roll, pitch = imu.get_euler_angles()
# print(f"[IMU Task] Heading: {heading}")
if flag == False:
imu_straight_heading.put(heading)
imu_data_queue.put(heading)
else:
imu_data_queue.put(heading)
yield state
def IMU_vs_IR(shares):
"""Task to determine whether to use IMU or IRvsensor for line following."""
state = "IR_SENSOR"
imu_data_queue, line_position_queue, R_effort_share, L_effort_share,
imu_straight_heading, imu_control_flg, dark_threshold = shares
while True:
if state == "IR_SENSOR":
# Get the threshold value from the dark_threshold share
threshold = dark_threshold.get()
print(f"[IMU vs IR Task] IR value: {threshold}")
# Check if the IR sensor is not detecting the line
if threshold < 8: # Adjust this threshold based on your environment
imu_control_flg.put(True)
state = "IMU_CONTROL"
heading_setpoint = imu_straight_heading.get()
print("[IMU vs IR Task] Switching to IMU control...")
if not line_position_queue.empty():
print("[IR control] Using IR sensor for line following...")
imu_control_flg.put(False) # Turn off flag
IR_sensor_center = 3
L_controller.set_setpoint(IR_sensor_center)
R_controller.set_setpoint(IR_sensor_center)
# Get the line position from the queue
line_position = line_position_queue.get()
# Compute correction from PID controller
L_correction = L_controller.compute(line_position)
R_correction = R_controller.compute(line_position)
print(f"[IR Sensor] Correction: {R_correction}, {L_correction}")
# Adjust motor efforts based on the correction
# Slow down one side and speed up the other
R_effort = R_effort_share.get()
L_effort = L_effort_share.get()
# Saturation limits
R_effort = max(min(R_effort + R_correction, 20), 8)
L_effort = max(min(L_effort - L_correction, 20), 8)
print(f"[IR control] R effort: {R_effort} L effort: {L_effort}")
R_effort = int(R_effort)
L_effort = int(L_effort)
# Update motor effort shares
R_effort_share.put(R_effort)
L_effort_share.put(L_effort)
elif state == "IMU_CONTROL":
print("[IMU control] Using IMU for line following...")
if not imu_data_queue.empty():
heading = imu_data_queue.get()
current_effort_R = R_effort_share.get()
current_effort_L = L_effort_share.get()
heading_error = heading_setpoint - heading
if heading_error < -180:
heading_error += 360
elif heading_error > 180:
heading_error -= 360
print(f"[IMU control] Heading error: {heading_error}")
print(f"[IMU control] Heading setpoint: {heading_setpoint}")
correction = imu_controller.compute(heading_error)
print(f"[IMU control] PID correction: {correction}")
# Apply correction to motor efforts
R_effort = int(max(min(current_effort_R + correction, 20), 8))
L_effort = int(max(min(current_effort_L - correction, 20), 8))
R_effort_share.put(R_effort)
L_effort_share.put(L_effort)
# Check if the IR sensor has detected the line again
if not line_position_queue.empty():
state = "IR_SENSOR"
yield state
if __name__ == "__main__":
# Create instances of the encoders, motors, and collectors
R_encoder = Encoder(3, 'A6', 'A7')
L_encoder = Encoder(2, 'A0', 'A1')
R_motor = Motor(4, 1,'B6', 'C8', 'C7')
L_motor = Motor(4, 2,'B7', 'H0', 'H1')
R_collector = Collector()
L_collector = Collector()
IR_sens = QTR_HD_15A(['C0','C1','C2','C3','A4','B0','B1'],7,'A9','A15')
imu = BNO055(pyb.I2C(2, pyb.I2C.CONTROLLER)) # preconfiged as CONTROLLER
R_controller = PIDController(25, 0, 0.01)
L_controller = PIDController(20, 0, 0.01)
imu_controller = PIDController(.2, 0, 0)
# serial_stream = USB_VCP()
# Create a share and a queue to test function and diagnostic printouts
R_effort_share = task_share.Share('h', thread_protect=False,
name="R_effort_share")
L_effort_share = task_share.Share('h', thread_protect=False,
name="L_effort_share")
R_pos_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="R_pos_queue")
R_vel_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="R_vel_queue")
L_pos_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="L_pos_queue")
L_vel_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="L_vel_queue")
data_collection_complete = task_share.Share('b', thread_protect=False,
name="data_collection_complete") # Shared flag
line_position_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="line_position_queue")
imu_data_queue = task_share.Queue('f', 100, thread_protect=False,
overwrite=False, name="imu_data_queue")
imu_control_flg = task_share.Share('b', thread_protect=False,
name="imu_control_flg") # Shared flag
imu_straight_heading = task_share.Share('f', thread_protect=False,
name="imu_straight_heading") # Shared flag
dark_threshold = task_share.Share('f', thread_protect=False,
name="dark_threshold") # Shared flag
# Initialize the system
initializer = SystemInitializer(R_encoder, L_encoder, R_motor, L_motor,
R_collector, L_collector, IR_sens, imu,
R_effort_share, L_effort_share)
R_motor_task = cotask.Task(Right_motor, name="Right_motor", priority=3,
period=20, profile=True, trace=False,
shares=(R_effort_share,R_pos_queue, R_vel_queue,
data_collection_complete,))
L_motor_task = cotask.Task(Left_motor, name="Left_motor", priority=3,
period=20, profile=True, trace=False,
shares=(L_effort_share,L_pos_queue, L_vel_queue,
data_collection_complete,))
collector_task = cotask.Task(collector, name="collector", priority=2,
period=250,
profile=True, trace=False, shares=(R_pos_queue,
R_vel_queue,data_collection_complete,))
comms_task = cotask.Task(comms, name="comms", priority=2, period=750,
profile=True, trace=False, shares=(R_effort_share,
L_effort_share,))
IR_sensor_task = cotask.Task(IR_sensor, name="IR_sensor",
priority=2, period=40, profile=True, trace=False,
shares = (line_position_queue,dark_threshold))
imu_task = cotask.Task(IMU_task, name="IMU_task", priority=2, period=40,
profile=True, trace=False,
shares=(imu_data_queue,imu_control_flg,
imu_straight_heading))
imu_vs_IR_task = cotask.Task(IMU_vs_IR, name="IMU_vs_IR",
priority=2, period=40, profile=True, trace=False,
shares=(imu_data_queue, line_position_queue,
R_effort_share, L_effort_share,imu_control_flg,
imu_straight_heading, dark_threshold))
cotask.task_list.append(comms_task)
cotask.task_list.append(R_motor_task)
cotask.task_list.append(L_motor_task)
cotask.task_list.append(collector_task)
cotask.task_list.append(IR_sensor_task)
cotask.task_list.append(imu_task)
cotask.task_list.append(imu_vs_IR_task)
# Run the memory garbage collector to ensure memory is as defragmented as
# possible before the real-time scheduler is started
gc.collect()
# Run the scheduler with the chosen scheduling algorithm. Quit if ^C pressed
while True:
try:
cotask.task_list.pri_sched()
except KeyboardInterrupt:
L_motor.disable()
R_motor.disable()
IR_sens.disable()
print("[Main] IR sensor disabled.")
print("[Main] Motors disabled.")
print(f"R M Ticks: {R_encoder.get_position()}")
print("[Main] Exiting...")
break
# Print a table of task data and a table of shared information data
print('\n' + str (cotask.task_list))
print(task_share.show_all())
print(comms_task.get_trace())
print('')