Softwares
ROS2
Robot Operating System (ROS) is a set of software libraries and tools that help you build robot applications. From drivers to state-of-the-art algorithms, and with powerful developer tools, ROS has what you need for your next robotics project. And it's all open source. In this project, we use ROS2 Humble. ros2 documentation
Nav2
Nav2 is the go-to industry-standard mobile robot navigation system, deploying Autonomous Vehicle technologies brought down, reworked, and optimized for mobile and surface robotics. As the successor to the ROS Navigation Stack, Nav2 builds on 15 years of heritage and is accelerating the robotics industry. nav2 documentation
Slam Toolbox
The Slam Toolbox package incorporates information from laser scanners in the form of a LaserScan message and TF transforms from odom->base link, and creates a map 2D map of a space. This package will allow you to fully serialize the data and pose-graph of the SLAM map to be reloaded to continue mapping, localize, merge, or otherwise manipulate. We allow for SLAM Toolbox to be run in synchronous (process all valid sensor measurements, regardless of lag) and asynchronous (process valid sensors measurements on an as-possible basis) modes.
RPLidar ROS
The rplidar_ros_package that support rplidar A1/A2/A3/S1/S2/S3/T1 rplidar_ros2_github. It is used in ROS2 for SLAM with an additional support for odometry rf2o_laser_odometry_github
Ultralytics (Yolov8) - Optional
Ultralytics YOLOv8, the latest version of the acclaimed real-time object detection and image segmentation model. YOLOv8 is built on cutting-edge advancements in deep learning and computer vision, offering unparalleled performance in terms of speed and accuracy. Its streamlined design makes it suitable for various applications and easily adaptable to different hardware platforms, from edge devices to cloud APIs.
Object Detection
Object detection is a task that involves identifying the location and class of objects in an image or video stream. The documentation is available on the official website
Programs
Raspberry Pi
In this project we are using Raspberry Pi 4 with 8GB RAM. Use Raspberry Pi Imager to flash suitable OS on the micro sd card. In this case, we are using Ubuntu 20.04.
GPIO Zero
Interface to GPIO devices with Raspberry Pi. Read documentation
It needs to be installed in order to make the pins to function. Installation Guide
To get started with the pins, check out this demo development kit: SunFounder DaVinci Kit for Raspberry Pi
If you are using Raspberry Pi OS, follow this tutorial to set up GPIO with wiringPi based on your preferred programming language:
Tutorial
Camera
In order for the camera to work properly after it is connected to the Raspberry Pi,
we need to install necessary packages and configure the configuration file. Camera is purchased from Freenove.
- If you are using Ubuntu 20.04, you can download installation guide here
, or you can find it on GitHub
- If you are using Raspberry Pi OS, the specific tutorial can be downloaded here
YOLOv8
Object Detection Code Sample
# Linyun Liu (2024)
import cv2
from picamera2 import Picamera2
from ultralytics import YOLO
import sys
import os
import time
# Initialize the Picamera2
picam2 = Picamera2()
picam2.preview_configuration.main.size = (1280, 720)
picam2.preview_configuration.main.format = "RGB888"
picam2.preview_configuration.align()
picam2.configure("preview")
picam2.start()
# Load the YOLOv8 model
model_custom = YOLO("/home/airobot/Rover/camera/detection/v3.pt")
model_default = YOLO("/home/airobot/Rover/camera/detection/yolov8n.pt")
def extract(results, frame):
boxes = results.boxes.xywh.cpu()
classes = results.boxes.cls.cpu().tolist() # return detected objects ID -> List of int
names = results.names # return all possible names for dtection -> Dictionary (int: name)
confs = results.boxes.conf.float().cpu().tolist() # return detected objects confidence score -> List of Float
output = []
for i in range(len(classes)):
result = []
result.append(names[classes[i]])
result.append(confs[i])
x1, y1, x2, y2 = (results[i].boxes.xyxy)[0]
x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
center_x = (x1+x2)//2
center_y = (y1+y2)//2
result.append([center_x, center_y])
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
cv2.circle(frame, (center_x, center_y), 5, (0, 0, 255), -1)
x, y, w, h = (results[i].boxes.xywh)[0]
result.append(int(w)*int(h))
output.append(result)
return output # 2D array, i.e. [[person, 0.92, coordinates, area], [cat, 0.67, coordinates, area]], [detcted object, confidence score, object bounding box coordinates, box area]
def detect(controller):
try:
while True:
frame = picam2.capture_array()
results_c = model_custom(frame)
results_d = model_default(frame)
os.system("clear")
annotated_frame = results_d[0].plot()
try:
result_c = extract(results_c[0])
result_d = extract(results_d[0], annotated_frame)
for r in result_c:
print(f"{r[0]}: {r[1]}")
for r in result_d:
print(f"{r[0]}: {r[1]}")
except:
pass
cv2.imshow("Camera", annotated_frame)
if cv2.waitKey(1) == ord("q"):
cv2.destroyAllWindows()
break
except KeyboardInterrupt:
os.system("clear")
cv2.destroyAllWindows()
Model Training
To learn how to train a model, please go to ultralytics websites, there are plenty instructions, tutorials, and videos as reference. Read Documentation
Motor Controllers
We have written some python codes that works with the DAC and Motor Controller in order to control the motors as well as code that measure the tacheometry and position of the wheels from motor hall sensors
motors.py
# contributors:
# Jacobus Burger (Jun 2024)
# Jacobus Burger (Aug 2024)
# description:
# This is a low level wrapper for the motors (the motor control board, DAC, and associated GPIO pins). It includes only basic functionality.
import gpiozero
import board
import busio
from adafruit_mcp4725 import MCP4725
class Motor():
def __init__(self, DIR_PIN, SPEED_ADDR, I2C_INTERFACE, dir_reversed=True):
# attach digital pins for controlling features
self.dir_reversed = dir_reversed
self.DIR = gpiozero.DigitalOutputDevice(DIR_PIN, initial_value=dir_reversed)
# self.STOP = gpiozero.DigitalOutputDevice(STOP_PIN, active_high=False) # 1 = off, 0 = on
# connect I2C DAC for controlling speed
while not I2C_INTERFACE.try_lock():
pass
try:
devices = I2C_INTERFACE.scan()
print("I2C devices found:", [hex(device) for device in devices])
finally:
I2C_INTERFACE.unlock()
self.SPEED = MCP4725(I2C_INTERFACE, address=SPEED_ADDR)
def enable(self):
"""enable motors with pin 13"""
MOTOR_ENABLE.on()
def disable(self):
MOTOR_ENABLE.off()
def move(self, velocity, forward=True):
# print("move: ")
# y = mx + b where y = velocity, m = ratio, and x is normalized voltage value
# so to find x, x = (y-b)/m
# reselt from linear regression
# y = 5.2667x + 0.22
# print(" min((max({}-0.22) / 5.2667, 1.0), 0.0)".format(velocity))
normalized_voltage = max(0.0, min((velocity-0.22) / 5.2667, 1.0))
# print("norm_volt = {}".format(normalized_voltage))
# where 0.0 is 0v and 1.0 is 5v
self.SPEED.normalized_value = normalized_voltage
# self.SPEED.normalized_value = velocity # it is normalized voltage
# set direction value (when dir_reversed then forward = 1 otherwise forward = 0)
self.DIR.value = int(forward) if self.dir_reversed else int(not forward)
controller.py
# Linyun Liu, Jacobus Burger
# this file is an abstraction to control the movement system on a high level
from motors import *
from time import sleep
from threading import Thread
# from digitalio import DigitalInOut
# from adafruit_mcp4725 import MCP4725
# create I2C interface
i2c = busio.I2C(board.SCL, board.SDA)
# DIR, I2C_ADDR, I2C_INTERFACE, [dir_reversed]
left = Motor(19, 0x61, i2c, dir_reversed=False)
right = Motor(20, 0x60, i2c, dir_reversed=True)
# start watcher thread to disable motors when button is pressed
EMERGENCY_DISABLE = gpiozero.DigitalInputDevice(6)
def emergency():
while True:
if EMERGENCY_DISABLE.value == 1:
disable()
t = Thread(target=emergency)
t.start()
# enable/disable motor power
MOTOR_ENABLE = gpiozero.DigitalOutputDevice(13)
def enable():
MOTOR_ENABLE.on()
def disable():
MOTOR_ENABLE.off()
# WARNING: speed is in m/s
DEFAULT_SPEED=0.70
enable()
def stop():
left.move(0)
right.move(0)
stop()
def forward(speed=DEFAULT_SPEED):
stop()
left.move(speed)
right.move(speed)
def test(speed, sec):
forward(speed)
sleep(sec)
stop()
def reverse(speed=DEFAULT_SPEED):
stop()
left.move(speed, forward=False)
right.move(speed, forward=False)
def turn_right(speed=DEFAULT_SPEED, rate=0.3):
left.move(speed+rate)
right.move(speed)
def turn_left(speed=DEFAULT_SPEED, rate=0.3):
left.move(speed)
right.move(speed+rate)
def spin_right(speed=DEFAULT_SPEED):
left.move(speed)
right.move(speed, forward=False)
def spin_left(speed=DEFAULT_SPEED):
left.move(speed, forward=False)
right.move(speed)
tacheometry.py
# Jacobus Burger (2024-07-04)
import math
import time
import gpiozero
# CONSTANTS
CW = 1
CCW = -1
RADIUS = 0.085 # meters
PULSES_PER_REV = 42 # should be 90 in theory since 30 poles * 3 sensors = 90
# Tachometry class
class WheelEncoder:
# public result values
radius: float # radius of the motor
rpm: float # speed of rotation (Revolutions Per Minute)
direction: int # direction of rotation (1 for CW, -1 for CCW)
theta: float # absolute angle of current revolution (in radians)
length: float # absolute arc length of current revolution
# private variables
# constant pin devices
__U: gpiozero.DigitalInputDevice
__V: gpiozero.DigitalInputDevice
__W: gpiozero.DigitalInputDevice
# last interrupt caller pin (for direction)
__prev_pin: gpiozero.DigitalInputDevice
# public last interupt time
__prev_time: float
# pulse period T (for rpm)
__pulse_time_u: float
__pulse_time_v: float
__pulse_time_w: float
__pulse_time_avg: float
# number of pulses elapsed (for theta and length)
pulse_count: int
# smoothing value of Linear intERPolation (for rpm)
__alpha: float # between 0 and 1 inclusive
# helpers
__reversed: bool # reverse returned RPM direction
__debug: bool # print state of variables and functions
def __init__(self, u_pin, v_pin, w_pin, radius, reversed=False, debug=False, alpha=0.5):
# define pins and info
self.__U = gpiozero.DigitalInputDevice(u_pin)
self.__V = gpiozero.DigitalInputDevice(v_pin)
self.__W = gpiozero.DigitalInputDevice(w_pin)
self.__reversed = reversed
self.__debug = debug
# setup interrupts
self.__U.when_activated = self.__u_int
self.__V.when_activated = self.__v_int
self.__W.when_activated = self.__w_int
# setup default values
self.__prev_pin = None # may cause false direction for first pulse if not CW
self.__prev_time = 0
self.pulse_count = 0
self.__pulse_time_u = 0
self.__pulse_time_v = 0
self.__pulse_time_w = 0
self.__pulse_time_avg = 0
self.__alpha = alpha
# setup state values
self.radius = radius
self.rpm = 0
self.direction = CW if not reversed else CCW
self.theta = 0
self.length = 0
def lerp(self, previous, current, alpha=None):
if alpha is None:
alpha = self.__alpha
result = (1 - alpha) * previous + alpha * current
if self.__debug: print("lerp()")
if self.__debug: print(" prev {}".format(previous))
if self.__debug: print(" curr {}".format(current))
if self.__debug: print(" alpha {}".format(alpha))
if self.__debug: print(" lerp {}".format(result))
return result
def get_rpm(self, pulse_time_avg):
f = 1 / pulse_time_avg # f = 1 / T
# was: result = 2 * (f / PULSES_PER_REV) * 60
result = (f / PULSES_PER_REV) * 60
if self.__debug: print("get_rpm()")
if self.__debug: print(" pulse_time_avg {} seconds".format(pulse_time_avg))
if self.__debug: print(" frequency {} Hertz".format(f))
if self.__debug: print(" rpm {} RPM".format(result))
return result
def get_theta(self, pulse_count):
R = pulse_count / PULSES_PER_REV # 14 pulses from each of the 3 sensors in each revolution based on testing, 42 in total
result = (2*math.pi) * R
if self.__debug: print("get_theta()")
if self.__debug: print(" pulse_count {} pulses".format(pulse_count))
if self.__debug: print(" R {} revolutions".format(R))
if self.__debug: print(" theta {} radians".format(result))
return result
def get_length(self, pulse_count):
theta = self.get_theta(pulse_count)
result = self.radius * theta
if self.__debug: print("get_length()")
if self.__debug: print(" pulse_count {} pulses".format(pulse_count))
if self.__debug: print(" r {} units".format(self.radius))
if self.__debug: print(" theta {} radians".format(theta))
if self.__debug: print(" length {} units".format(result))
return result
def __u_int(self):
# debug info before
if self.__debug:
print("__u_int() before")
print(" __prev_pin {}".format(self.__prev_pin))
print(" __prev_time {} seconds".format(self.__prev_time))
print(" __pulse_time_u {} seconds".format(self.__pulse_time_u))
print(" __pulse_time_v {} seconds".format(self.__pulse_time_v))
print(" __pulse_time_w {} seconds".format(self.__pulse_time_w))
print(" __pulse_time_avg {} seconds".format(self.__pulse_time_avg))
print(" rpm {} RPM".format(self.rpm))
print(" direction {}".format(self.direction))
print(" theta {} radians".format(self.theta))
print(" length {} units".format(self.length))
print(" U V W {} {} {}".format(self.__U.value, self.__V.value, self.__W.value))
# update pulse time
elapsed_time = time.time()
self.__pulse_time_u = elapsed_time - self.__prev_time
self.__pulse_time_avg = (self.__pulse_time_u + self.__pulse_time_v + self.__pulse_time_w) / 3
# update rpm
prev_rpm = self.rpm
self.rpm = self.lerp(prev_rpm, self.get_rpm(self.__pulse_time_avg), self.__alpha)
self.__prev_time = elapsed_time
# update direction
if self.__prev_pin == self.__V:
self.direction = CW if not self.__reversed else CCW
if self.__prev_pin == self.__W:
self.direction = CCW if not self.__reversed else CW
self.__prev_pin = self.__U
# if not self.__reversed:
# self.direction = CCW if self.__U.value == self.__W.value else CW
# else:
# self.direction = CW if self.__U.value == self.__W.value else CCW
# update pulse count
self.pulse_count = (self.pulse_count + self.direction) % PULSES_PER_REV
# update angle position theta
self.theta = self.get_theta(self.pulse_count)
# update traveled arc length
self.length = self.get_length(self.pulse_count)
# debug info after
if self.__debug:
print("__u_int() after")
print(" __prev_pin {}".format(self.__prev_pin))
print(" __prev_time {} seconds".format(self.__prev_time))
print(" __pulse_time_u {} seconds".format(self.__pulse_time_u))
print(" __pulse_time_v {} seconds".format(self.__pulse_time_v))
print(" __pulse_time_w {} seconds".format(self.__pulse_time_w))
print(" __pulse_time_avg {} seconds".format(self.__pulse_time_avg))
print(" rpm {} RPM".format(self.rpm))
print(" direction {}".format(self.direction))
print(" theta {} radians".format(self.theta))
print(" length {} units".format(self.length))
print(" U V W {} {} {}".format(self.__U.value, self.__V.value, self.__W.value))
def __v_int(self):
# debug info before
if self.__debug:
print("__v_int() before")
print(" __prev_pin {}".format(self.__prev_pin))
print(" __prev_time {} seconds".format(self.__prev_time))
print(" __pulse_time_u {} seconds".format(self.__pulse_time_u))
print(" __pulse_time_v {} seconds".format(self.__pulse_time_v))
print(" __pulse_time_w {} seconds".format(self.__pulse_time_w))
print(" __pulse_time_avg {} seconds".format(self.__pulse_time_avg))
print(" rpm {} RPM".format(self.rpm))
print(" direction {}".format(self.direction))
print(" theta {} radians".format(self.theta))
print(" length {} units".format(self.length))
print(" U V W {} {} {}".format(self.__U.value, self.__V.value, self.__W.value))
# update pulse time
elapsed_time = time.time()
self.__pulse_time_v = elapsed_time - self.__prev_time
self.__pulse_time_avg = (self.__pulse_time_u + self.__pulse_time_v + self.__pulse_time_w) / 3
# update rpm
prev_rpm = self.rpm
self.rpm = self.lerp(prev_rpm, self.get_rpm(self.__pulse_time_avg), self.__alpha)
self.__prev_time = elapsed_time
# update direction
if self.__prev_pin == self.__W:
self.direction = CW if not self.__reversed else CCW
if self.__prev_pin == self.__U:
self.direction = CCW if not self.__reversed else CW
self.__prev_pin = self.__V
# if not self.__reversed:
# self.direction = CCW if self.__V.value == self.__U.value else CW
# else:
# self.direction = CW if self.__V.value == self.__U.value else CCW
# update pulse count
self.pulse_count = (self.pulse_count + self.direction) % PULSES_PER_REV
# update angle position theta
self.theta = self.get_theta(self.pulse_count)
# update traveled arc length
self.length = self.get_length(self.pulse_count)
# debug info after
if self.__debug:
print("__v_int() after")
print(" __prev_pin {}".format(self.__prev_pin))
print(" __prev_time {} seconds".format(self.__prev_time))
print(" __pulse_time_u {} seconds".format(self.__pulse_time_u))
print(" __pulse_time_v {} seconds".format(self.__pulse_time_v))
print(" __pulse_time_w {} seconds".format(self.__pulse_time_w))
print(" __pulse_time_avg {} seconds".format(self.__pulse_time_avg))
print(" rpm {} RPM".format(self.rpm))
print(" direction {}".format(self.direction))
print(" theta {} radians".format(self.theta))
print(" length {} units".format(self.length))
print(" U V W {} {} {}".format(self.__U.value, self.__V.value, self.__W.value))
def __w_int(self):
# debug info before
if self.__debug:
print("__w_int() before")
print(" __prev_time {} seconds".format(self.__prev_time))
print(" __pulse_time_u {} seconds".format(self.__pulse_time_u))
print(" __pulse_time_v {} seconds".format(self.__pulse_time_v))
print(" __pulse_time_w {} seconds".format(self.__pulse_time_w))
print(" __pulse_time_avg {} seconds".format(self.__pulse_time_avg))
print(" rpm {} RPM".format(self.rpm))
print(" direction {}".format(self.direction))
print(" theta {} radians".format(self.theta))
print(" length {} units".format(self.length))
print(" U V W {} {} {}".format(self.__U.value, self.__V.value, self.__W.value))
# update pulse time
elapsed_time = time.time()
self.__pulse_time_w = elapsed_time - self.__prev_time
self.__pulse_time_avg = (self.__pulse_time_u + self.__pulse_time_v + self.__pulse_time_w) / 3
# update rpm
prev_rpm = self.rpm
self.rpm = self.lerp(prev_rpm, self.get_rpm(self.__pulse_time_avg), self.__alpha)
self.__prev_time = elapsed_time
# update direction
if self.__prev_pin == self.__U:
self.direction = CW if not self.__reversed else CCW
if self.__prev_pin == self.__V:
self.direction = CCW if not self.__reversed else CW
self.__prev_pin = self.__W
# if not self.__reversed:
# self.direction = CCW if self.__W.value == self.__V.value else CW
# else:
# self.direction = CW if self.__W.value == self.__V.value else CCW
# update pulse count
self.pulse_count = (self.pulse_count + self.direction) % PULSES_PER_REV
# update angle position theta
self.theta = self.get_theta(self.pulse_count)
# update traveled arc length
self.length = self.get_length(self.pulse_count)
# debug info after
if self.__debug:
print("__v_int() after")
print(" __prev_pin {}".format(self.__prev_pin))
print(" __prev_time {} seconds".format(self.__prev_time))
print(" __pulse_time_u {} seconds".format(self.__pulse_time_u))
print(" __pulse_time_v {} seconds".format(self.__pulse_time_v))
print(" __pulse_time_w {} seconds".format(self.__pulse_time_w))
print(" __pulse_time_avg {} seconds".format(self.__pulse_time_avg))
print(" rpm {} RPM".format(self.rpm))
print(" direction {}".format(self.direction))
print(" theta {} radians".format(self.theta))
print(" length {} units".format(self.length))
print(" U V W {} {} {}".format(self.__U.value, self.__V.value, self.__W.value))
def timed_out(self, timeout):
elapsed_time = time.time()
if self.__debug: print("timed_out({})".format(timeout))
if self.__debug: print(" elapsed_time: {}".format(elapsed_time))
if self.__debug: print(" self.__prev_time: {}".format(self.__prev_time))
if elapsed_time - self.__prev_time >= timeout:
return True
return False
def rpm_to_rads(self, rpm):
return (rpm / 60) * 2*math.pi
def rpm_to_vel(self, rpm):
return self.rpm_to_rads(rpm) * self.radius
More Code?
For more information of the programs and codes, please refer to the GitHub