diff --git "a/UI/pages/\360\237\233\243 Lane_Detection.py" "b/UI/pages/\360\237\233\243 Lane_Detection.py" index 7fa607102..5c49fa3c3 100644 --- "a/UI/pages/\360\237\233\243 Lane_Detection.py" +++ "b/UI/pages/\360\237\233\243 Lane_Detection.py" @@ -7,36 +7,42 @@ from cv_bridge import CvBridge import typing +IMG_CATEGORIES = ["raw","tf","sliding"] + class Image_Handler(Node): def __init__(self): - self._tabs = None - self._img = None + self._tab_dict = None self._frame_containers = None super().__init__('Streamlit_Image_Handler') self._bridge = CvBridge() - self.camData_subscriber = self.create_subscription( - Image, - '/camera/image', - self.camData_callback, - qos_profile_sensor_data) + image_labels = ("raw_left", "raw_right", "tf_left", "tf_right", "sliding_left", "sliding_right") + self._subscribers = (self.create_subscription(Image, "/" + label, lambda msg: self.camData_callback(msg,label),qos_profile_sensor_data) for label in image_labels) + def tabs(self,tabs): - self._tabs = tabs - # SRC is cv2.VideoCapture FOR NOW: Will need to be tweaked in the future to rossify - self._frame_containers = [tab.empty() for tab in self._tabs] + self._tab_dict = tabs + + + def camData_callback(self, msg_in,label): + print("Put image from %s" % label) + img = self._bridge.imgmsg_to_cv2(msg_in) + self.render_imgs(img,label) + + def render_imgs(self,img,label): + #Specifies which camera the image is from + which_cam = 0 + if img is not None: + type, cam_label = label.split("_") + match cam_label: + case "left": + which_cam = 0 + case "right": + which_cam = 1 + tab_dict[type][cam_label].image(cv2.cvtColor(self._img,cv2.COLOR_BGR2RGB),channels="RGB") - def camData_callback(self, msg_in): - raw = msg_in - self._img = self._bridge.imgmsg_to_cv2(raw) - self.render_imgs() - def render_imgs(self): - # SRC is cv2.VideoCapture FOR NOW: Will need to be tweaked in the future to rossify - if self._img is not None: - for frame_container in self._frame_containers: - frame_container.image(cv2.cvtColor(self._img,cv2.COLOR_BGR2RGB),channels="RGB") @st.cache_resource def image_instantiator(): @@ -53,12 +59,15 @@ def image_instantiator(): page_icon="🛣") st.write( "This should render all of the images related to Lane Detection, and relevant parameters.") - render = st.checkbox("Display Parameters") - tabs = st.tabs(["Original", "HSV", "Sliding Windows"]) + render = st.checkbox("Display Camera Output") + tabs = st.tabs(IMG_CATEGORIES) + imgs_w_places = tuple(zip(tabs, [tab.columns(2) for tab in tabs])) + #Dictionary + tab_dict = dict(zip(IMG_CATEGORIES,imgs_w_places)) - if not render: + if render: ros_node = image_instantiator() - ros_node.tabs(tabs) + ros_node.tabs(tab_dict) rclpy.spin(ros_node) ros_node.destroy_node() rclpy.shutdown() @@ -66,3 +75,5 @@ def image_instantiator(): # I Don't even know if we'll want this, but it's a nice to have anyway Restart = st.button("ESTOP", type="primary") + + diff --git a/ros2/src/lanefollowingtest/LaneDetection.py b/ros2/src/lanefollowingtest/LaneDetection.py index 4038d8ecc..e82038171 100644 --- a/ros2/src/lanefollowingtest/LaneDetection.py +++ b/ros2/src/lanefollowingtest/LaneDetection.py @@ -6,18 +6,19 @@ import rclpy from rclpy.node import Node from rclpy.qos import qos_profile_sensor_data - +from sensor_msgs.msg import Image +from cv_bridge import CvBridge import cv2 import numpy as np # Inputs from both cameras vidcap_left = cv2.VideoCapture("/dev/video0") -vidcap_left.set(3,640) -vidcap_left.set(4,480) -vidcap_right = cv2.VideoCapture("/dev/video2") -vidcap_right.set(3,640) -vidcap_right.set(4,480) +vidcap_left.set(3, 640) +vidcap_left.set(4, 480) +vidcap_right = cv2.VideoCapture("/dev/video4") +vidcap_right.set(3, 640) +vidcap_right.set(4, 480) # These are constants nwindows = 9 @@ -32,8 +33,8 @@ def nothing(x): pass -class Individual_Follower(): +class Individual_Follower(): def __init__(self): self._fit = None @@ -124,7 +125,7 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6): return result -#These are constants for the timer callback: Should be modified by UI +# These are constants for the timer callback: Should be modified by UI l_h = 0 l_s = 0 l_v = 200 @@ -136,7 +137,7 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6): lower = np.array([l_h, l_s, l_v]) upper = np.array([u_h, u_s, u_v]) -#Coordinates for the 4 alignment points: again, should be handled by the UI +# Coordinates for the 4 alignment points: again, should be handled by the UI bl = (12, 355) tl = (66, 304) br = (635, 344) @@ -148,9 +149,13 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6): matrix = cv2.getPerspectiveTransform(pts1, pts2) + + class Lane_Follower(Node): + GUI = True def __init__(self): + super().__init__('lane_detection_node') self._tolerance = 0 self._left_follower = Individual_Follower() @@ -162,29 +167,41 @@ def __init__(self): # Publisher for error from the lane lines relative to the camera FOV self.camData_publisher = self.create_publisher( Float64, '/cam_data', 10) + + #GUI Controller Initializer + if Lane_Follower.GUI: + self._bridge = CvBridge() + image_labels = ("raw_left", "raw_right", "tf_left", "tf_right", "sliding_left", "sliding_right") + self._publishers = {label: self.create_publisher(Image, "/" + label, 10) for label in image_labels} + + def img_publish(self, label, img_raw): + self._publishers[label].publish(self._bridge.cv2_to_imgmsg(img_raw,encoding="bgr8")) + - def measure_position_meters(self,left,right): + def measure_position_meters(self, left, right): left_x_pos = 0 right_x_pos = 0 - # Will be the same for left side & right side y_max = self._left_follower._binary_warped.shape[0] - + # Calculate left and right line positions at the bottom of the image - if left is not None: + if left is not None: left_fit = self._left_follower._fit left_x_pos = left_fit[0]*y_max**2 + \ left_fit[1]*y_max + left_fit[2] - cv2.imshow("Result Left", left) + if(Lane_Follower.GUI): + self.img_publish("sliding_left", left) + cv2.imshow("Result Left", left) if right is not None: right_fit = self._right_follower._fit right_x_pos = right_fit[0]*y_max**2 + \ right_fit[1]*y_max + right_fit[2] - cv2.imshow("Result Right", right) - + if(Lane_Follower.GUI): + self.img_publish("sliding_right", right) + cv2.imshow("Result Right", right) center_lanes_x_pos = (left_x_pos + right_x_pos)//2 # Calculate the deviation between the center of the lane and the center of the picture @@ -193,27 +210,25 @@ def measure_position_meters(self,left,right): veh_pos = ( (self._left_follower._binary_warped.shape[1]//2) - center_lanes_x_pos) * xm_per_pix - if(left is None): + if (left is None): veh_pos += 91 elif (right is None): veh_pos -= 91 return veh_pos / 100 - def timer_callback(self): success_l, image_l = vidcap_left.read() success_r, image_r = vidcap_right.read() - images = [(image_l, "Left"), (image_r, "Right")] - if not(success_l and success_r): + images = [(image_l, "left"), (image_r, "right")] + if not (success_l and success_r): return for image in images: frame = image[0] # frame = cv2.resize(image[0], (640, 480)) # I might've cooked with this list comprehension - for point in (bl,tl,br,tr): - frame = cv2.circle(frame,point,5,(0,0,255),-1) - + for point in (bl, tl, br, tr): + frame = cv2.circle(frame, point, 5, (0, 0, 255), -1) transformed_frame = cv2.warpPerspective( frame, matrix, (640, 480)) # Object Detection @@ -221,19 +236,22 @@ def timer_callback(self): hsv_transformed_frame = cv2.cvtColor( transformed_frame, cv2.COLOR_BGR2HSV) mask = cv2.inRange(hsv_transformed_frame, lower, upper) - if image[1] == "Left": + if image[1] == "left": self._left_follower.set_binwarp(binwarp=mask) else: self._right_follower.set_binwarp(binwarp=mask) - - cv2.imshow("Original " + image[1], frame) - cv2.imshow("Bird's Eye View " + image[1], transformed_frame) + + if(Lane_Follower.GUI): + self.img_publish("raw_" +image[1],frame) + self.img_publish("tf_" + image[1],transformed_frame) + cv2.imshow("Original " + image[1], frame) + cv2.imshow("Bird's Eye View " + image[1], transformed_frame) result_left = self._left_follower.Plot_Line() result_right = self._right_follower.Plot_Line() msg_out = Float64() - #TODO: Is this the behavior we want? Or do we need it to do something else if one of the lines is invalid? + # TODO: Is this the behavior we want? Or do we need it to do something else if one of the lines is invalid? if (result_left is not None or result_right is not None): pos = self.measure_position_meters(result_left, result_right) print(pos) @@ -242,14 +260,13 @@ def timer_callback(self): else: TOLERANCE = 100 self._tolerance += 1 - if(self._tolerance > TOLERANCE): + if (self._tolerance > TOLERANCE): msg_out.data = 1000.0 self.camData_publisher.publish(msg_out) - if cv2.waitKey(10) == 27: return - + def main(args=None): rclpy.init(args=args) # Initialize ROS2 program