
Commit

Modified LaneDetection.py to publish images (in a very clean manner), and added subscriber functionality to the UI
Vaibhav-Hariani committed Mar 29, 2024
1 parent d37836f commit 95b7453
Showing 2 changed files with 82 additions and 54 deletions.
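
In broad terms, the commit has the detection node publish every intermediate image (raw, perspective-transformed, and sliding-window result, for each camera) on its own sensor_msgs/Image topic, and has the Streamlit page subscribe to those topics and render the frames. A minimal sketch of that publish/subscribe pattern, with hypothetical node names and a reduced label set rather than the committed classes, could look like this:

# Hedged sketch of the pattern this commit introduces; names here are illustrative.
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from sensor_msgs.msg import Image
from cv_bridge import CvBridge


class FramePublisher(Node):
    """Detection side: publish a BGR OpenCV frame on a labeled topic."""

    def __init__(self, labels=("raw_left", "raw_right")):
        super().__init__("frame_publisher_sketch")
        self._bridge = CvBridge()
        self._pubs = {label: self.create_publisher(Image, "/" + label, 10)
                      for label in labels}

    def publish_frame(self, label, frame):
        # Convert the OpenCV image to a ROS Image message before publishing.
        self._pubs[label].publish(self._bridge.cv2_to_imgmsg(frame, encoding="bgr8"))


class FrameSubscriber(Node):
    """UI side: convert each received frame back to an OpenCV image and hand it to a callback."""

    def __init__(self, on_frame, labels=("raw_left", "raw_right")):
        super().__init__("frame_subscriber_sketch")
        self._bridge = CvBridge()
        self._subs = [
            self.create_subscription(
                Image, "/" + label,
                # Default argument binds the current label for each callback.
                lambda msg, label=label: on_frame(label, self._bridge.imgmsg_to_cv2(msg)),
                qos_profile_sensor_data)
            for label in labels]

The actual change keeps both halves inside the existing Lane_Follower and Image_Handler nodes, as the diffs below show.
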
57 changes: 34 additions & 23 deletions UI/pages/🛣 Lane_Detection.py
@@ -7,36 +7,42 @@
from cv_bridge import CvBridge
import typing

IMG_CATEGORIES = ["raw","tf","sliding"]

class Image_Handler(Node):

def __init__(self):
self._tabs = None
self._img = None
self._tab_dict = None
self._frame_containers = None
super().__init__('Streamlit_Image_Handler')
self._bridge = CvBridge()
self.camData_subscriber = self.create_subscription(
Image,
'/camera/image',
self.camData_callback,
qos_profile_sensor_data)
image_labels = ("raw_left", "raw_right", "tf_left", "tf_right", "sliding_left", "sliding_right")
self._subscribers = (self.create_subscription(Image, "/" + label, lambda msg: self.camData_callback(msg,label),qos_profile_sensor_data) for label in image_labels)


def tabs(self,tabs):
self._tabs = tabs
# SRC is cv2.VideoCapture FOR NOW: Will need to be tweaked in the future to rossify
self._frame_containers = [tab.empty() for tab in self._tabs]
self._tab_dict = tabs


def camData_callback(self, msg_in,label):
print("Put image from %s" % label)
img = self._bridge.imgmsg_to_cv2(msg_in)
self.render_imgs(img,label)

def render_imgs(self,img,label):
#Specifies which camera the image is from
which_cam = 0
if img is not None:
type, cam_label = label.split("_")
match cam_label:
case "left":
which_cam = 0
case "right":
which_cam = 1
tab_dict[type][cam_label].image(cv2.cvtColor(self._img,cv2.COLOR_BGR2RGB),channels="RGB")


def camData_callback(self, msg_in):
raw = msg_in
self._img = self._bridge.imgmsg_to_cv2(raw)
self.render_imgs()

def render_imgs(self):
# SRC is cv2.VideoCapture FOR NOW: Will need to be tweaked in the future to rossify
if self._img is not None:
for frame_container in self._frame_containers:
frame_container.image(cv2.cvtColor(self._img,cv2.COLOR_BGR2RGB),channels="RGB")

@st.cache_resource
def image_instantiator():
@@ -53,16 +59,21 @@ def image_instantiator():
page_icon="🛣")
st.write(
"This should render all of the images related to Lane Detection, and relevant parameters.")
render = st.checkbox("Display Parameters")
tabs = st.tabs(["Original", "HSV", "Sliding Windows"])
render = st.checkbox("Display Camera Output")
tabs = st.tabs(IMG_CATEGORIES)
imgs_w_places = tuple(zip(tabs, [tab.columns(2) for tab in tabs]))
#Dictionary
tab_dict = dict(zip(IMG_CATEGORIES,imgs_w_places))

if not render:
if render:
ros_node = image_instantiator()
ros_node.tabs(tabs)
ros_node.tabs(tab_dict)
rclpy.spin(ros_node)
ros_node.destroy_node()
rclpy.shutdown()


# I Don't even know if we'll want this, but it's a nice to have anyway
Restart = st.button("ESTOP", type="primary")
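
One note on the new Image_Handler wiring, based on standard Python semantics rather than anything shown elsewhere in the commit: the _subscribers attribute is built from a generator expression, so the create_subscription calls do not run until the generator is iterated, and the lambda closes over the loop variable, so every callback would see the final label once they exist. A sketch of the same line that creates the subscriptions eagerly and freezes each label with a default argument:

# Possible alternative to the _subscribers line in Image_Handler.__init__ (a sketch, not the committed code).
image_labels = ("raw_left", "raw_right", "tf_left", "tf_right", "sliding_left", "sliding_right")
self._subscribers = [
    self.create_subscription(
        Image,
        "/" + label,
        lambda msg, label=label: self.camData_callback(msg, label),  # default arg binds the current label
        qos_profile_sensor_data)
    for label in image_labels]
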


79 changes: 48 additions & 31 deletions ros2/src/lanefollowingtest/LaneDetection.py
@@ -6,18 +6,19 @@
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data

from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np


# Inputs from both cameras
vidcap_left = cv2.VideoCapture("/dev/video0")
vidcap_left.set(3,640)
vidcap_left.set(4,480)
vidcap_right = cv2.VideoCapture("/dev/video2")
vidcap_right.set(3,640)
vidcap_right.set(4,480)
vidcap_left.set(3, 640)
vidcap_left.set(4, 480)
vidcap_right = cv2.VideoCapture("/dev/video4")
vidcap_right.set(3, 640)
vidcap_right.set(4, 480)

# These are constants
nwindows = 9
@@ -32,8 +33,8 @@
def nothing(x):
pass

class Individual_Follower():

class Individual_Follower():

def __init__(self):
self._fit = None
@@ -124,7 +125,7 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6):
return result


#These are constants for the timer callback: Should be modified by UI
# These are constants for the timer callback: Should be modified by UI
l_h = 0
l_s = 0
l_v = 200
@@ -136,7 +137,7 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6):
lower = np.array([l_h, l_s, l_v])
upper = np.array([u_h, u_s, u_v])

#Coordinates for the 4 alignment points: again, should be handled by the UI
# Coordinates for the 4 alignment points: again, should be handled by the UI
bl = (12, 355)
tl = (66, 304)
br = (635, 344)
@@ -148,9 +149,13 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6):
matrix = cv2.getPerspectiveTransform(pts1, pts2)




class Lane_Follower(Node):
GUI = True

def __init__(self):

super().__init__('lane_detection_node')
self._tolerance = 0
self._left_follower = Individual_Follower()
@@ -162,29 +167,41 @@ def __init__(self):
# Publisher for error from the lane lines relative to the camera FOV
self.camData_publisher = self.create_publisher(
Float64, '/cam_data', 10)

#GUI Controller Initializer
if Lane_Follower.GUI:
self._bridge = CvBridge()
image_labels = ("raw_left", "raw_right", "tf_left", "tf_right", "sliding_left", "sliding_right")
self._publishers = {label: self.create_publisher(Image, "/" + label, 10) for label in image_labels}

def img_publish(self, label, img_raw):
self._publishers[label].publish(self._bridge.cv2_to_imgmsg(img_raw,encoding="bgr8"))


def measure_position_meters(self,left,right):
def measure_position_meters(self, left, right):
left_x_pos = 0
right_x_pos = 0


# Will be the same for left side & right side
y_max = self._left_follower._binary_warped.shape[0]

# Calculate left and right line positions at the bottom of the image
if left is not None:
if left is not None:
left_fit = self._left_follower._fit
left_x_pos = left_fit[0]*y_max**2 + \
left_fit[1]*y_max + left_fit[2]
cv2.imshow("Result Left", left)

if(Lane_Follower.GUI):
self.img_publish("sliding_left", left)
cv2.imshow("Result Left", left)

if right is not None:
right_fit = self._right_follower._fit
right_x_pos = right_fit[0]*y_max**2 + \
right_fit[1]*y_max + right_fit[2]
cv2.imshow("Result Right", right)

if(Lane_Follower.GUI):
self.img_publish("sliding_right", right)
cv2.imshow("Result Right", right)

center_lanes_x_pos = (left_x_pos + right_x_pos)//2
# Calculate the deviation between the center of the lane and the center of the picture
@@ -193,47 +210,48 @@ def measure_position_meters(self,left,right):
veh_pos = (
(self._left_follower._binary_warped.shape[1]//2) - center_lanes_x_pos) * xm_per_pix

if(left is None):
if (left is None):
veh_pos += 91
elif (right is None):
veh_pos -= 91
return veh_pos / 100


def timer_callback(self):
success_l, image_l = vidcap_left.read()
success_r, image_r = vidcap_right.read()
images = [(image_l, "Left"), (image_r, "Right")]
if not(success_l and success_r):
images = [(image_l, "left"), (image_r, "right")]
if not (success_l and success_r):
return

for image in images:
frame = image[0]
# frame = cv2.resize(image[0], (640, 480))
# I might've cooked with this list comprehension
for point in (bl,tl,br,tr):
frame = cv2.circle(frame,point,5,(0,0,255),-1)

for point in (bl, tl, br, tr):
frame = cv2.circle(frame, point, 5, (0, 0, 255), -1)
transformed_frame = cv2.warpPerspective(
frame, matrix, (640, 480))
# Object Detection
# Image Thresholding
hsv_transformed_frame = cv2.cvtColor(
transformed_frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv_transformed_frame, lower, upper)
if image[1] == "Left":
if image[1] == "left":
self._left_follower.set_binwarp(binwarp=mask)
else:
self._right_follower.set_binwarp(binwarp=mask)

cv2.imshow("Original " + image[1], frame)
cv2.imshow("Bird's Eye View " + image[1], transformed_frame)

if(Lane_Follower.GUI):
self.img_publish("raw_" +image[1],frame)
self.img_publish("tf_" + image[1],transformed_frame)
cv2.imshow("Original " + image[1], frame)
cv2.imshow("Bird's Eye View " + image[1], transformed_frame)

result_left = self._left_follower.Plot_Line()
result_right = self._right_follower.Plot_Line()

msg_out = Float64()
#TODO: Is this the behavior we want? Or do we need it to do something else if one of the lines is invalid?
# TODO: Is this the behavior we want? Or do we need it to do something else if one of the lines is invalid?
if (result_left is not None or result_right is not None):
pos = self.measure_position_meters(result_left, result_right)
print(pos)
@@ -242,14 +260,13 @@ def timer_callback(self):
else:
TOLERANCE = 100
self._tolerance += 1
if(self._tolerance > TOLERANCE):
if (self._tolerance > TOLERANCE):
msg_out.data = 1000.0
self.camData_publisher.publish(msg_out)


if cv2.waitKey(10) == 27:
return


def main(args=None):
rclpy.init(args=args) # Initialize ROS2 program
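
For reference, the offset published on /cam_data comes from evaluating each lane's quadratic fit x = a*y**2 + b*y + c at the bottom row of the warped image, averaging the two x positions, and scaling the distance from the image center by xm_per_pix; the diff then adds or subtracts 91 when only one line is found and divides by 100 before publishing. A small standalone illustration, with made-up fit coefficients and an assumed xm_per_pix (its real value is defined outside this diff):

# Illustrative numbers only; the fits and xm_per_pix here are assumptions, not values from the repo.
import numpy as np

left_fit = np.array([1.0e-4, -0.05, 180.0])   # hypothetical left-lane fit: x = a*y**2 + b*y + c
right_fit = np.array([1.2e-4, -0.02, 460.0])  # hypothetical right-lane fit
y_max, img_width = 480, 640                   # warped-image size used in the diff
xm_per_pix = 0.6                              # assumed per-pixel scale; defined elsewhere in the file

left_x = left_fit[0] * y_max**2 + left_fit[1] * y_max + left_fit[2]
right_x = right_fit[0] * y_max**2 + right_fit[1] * y_max + right_fit[2]
center_lanes_x = (left_x + right_x) // 2
veh_pos = ((img_width // 2) - center_lanes_x) * xm_per_pix
print(veh_pos / 100)  # about -0.048 with these made-up numbers
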
