From 22092e488fc08ca80d9cc4e5c70319d04fec61b5 Mon Sep 17 00:00:00 2001 From: Vaibhav Hariani Date: Tue, 21 May 2024 14:45:41 -0400 Subject: [PATCH] Modified lane detection for a cleaner, more efficient, and likely more accurate aproach to lane determination. --- ros2/src/lanefollowingtest/LaneDetection.py | 120 +++++++++----------- 1 file changed, 55 insertions(+), 65 deletions(-) diff --git a/ros2/src/lanefollowingtest/LaneDetection.py b/ros2/src/lanefollowingtest/LaneDetection.py index bb67184c6..3f0d9e58f 100644 --- a/ros2/src/lanefollowingtest/LaneDetection.py +++ b/ros2/src/lanefollowingtest/LaneDetection.py @@ -8,6 +8,7 @@ import cv2 import numpy as np from std_msgs.msg import String +from numpy.polynomial import Polynomial def nothing(x): @@ -33,14 +34,12 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6): minpix = 20 histogram = np.sum( - self._binary_warped[self._binary_warped.shape[0] // 2 :, :], axis=0 + self._binary_warped[self._binary_warped.shape[0] // 2:, :], axis=0 ) - # Create an output image to draw on and visualize the result + # Create an output image to draw on and visualize the result out_img = ( - np.dstack( - (self._binary_warped, self._binary_warped, self._binary_warped) - ) - * 255 + np.dstack((self._binary_warped, self._binary_warped, + self._binary_warped)) * 255 ) base = np.argmax(histogram[:]) @@ -54,6 +53,9 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6): nonzerox = np.array(nonzero[1]) current = base + # Used for calculating the number of "empty windows". + #Relevant for our lane determination: the more empty windows, the more likely this is the dashed line + empty_windows = 0 # Step through the windows one by one for window in range(nwindows): # Identify window boundaries in x and y (and right and left) @@ -71,17 +73,20 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6): (0, 255, 0), 2, ) - # Identify the nonzero pixels in x and y within the window + # Identify the nonzero pixels in x and y within the window: inclusive of lower bounds good_inds = ( (nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_x_low) & (nonzerox < win_x_high) ).nonzero()[0] + # Append these indices to the lists lane_inds.append(good_inds) if len(good_inds) > minpix: current = np.int32(np.mean(nonzerox[good_inds])) + else: + empty_windows += 1 # Concatenate the arrays of indices if len(lane_inds) > 0: @@ -93,13 +98,13 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6): # Fit a second order polynomial to each if x_pos.any() and y_pos.any(): - self._fit = np.polyfit(y_pos, x_pos, 2) + self._fit = Polynomial.fit(y_pos, x_pos, 2) else: - return None + return None, None ploty = np.linspace( 0, self._binary_warped.shape[0] - 1, self._binary_warped.shape[0] - ) + ) fitx = self._fit[0] * ploty**2 + self._fit[1] * ploty + self._fit[2] out_img[nonzeroy[lane_inds], nonzerox[lane_inds]] = [255, 0, 0] @@ -129,7 +134,7 @@ def Plot_Line(self, smoothen=False, prevFrameCount=6): ) result = cv2.addWeighted(out_img, 1, window_img, 0.3, 0) - return result + return result, empty_windows class Lane_Follower(Node): @@ -246,41 +251,39 @@ def measure_position_meters(self, left, right): return veh_pos - def determine_lane(self, img, label): - # Taking in both warped images, determine which lane line is the longer one, and ergo the "solid line", - # Based on that line, return the heading. - # This may struggle on turns, but might work depending: Will need to characterize - # If this needs to be modified, can be converted to a contour size detection instead. That code exsits in Yolo_World_Detection already. - - edges = cv2.Canny(img, 50, 150) - lines = cv2.HoughLinesP( - edges, 1, np.pi / 180, 100, minLineLength=50, maxLineGap=5 - ) - m_length = 0 - heading = 0 - maxs = [0, 0, 0, 0] - if lines is not None: - for line in lines: - x1, y1, x2, y2 = line[0] - length = (x1 - x2) ^ 2 + (y1 - y2) ^ 2 - m_length = max(m_length, length) - if (m_length) == length: - maxs[0] = x1 - maxs[1] = y1 - maxs[2] = x2 - maxs[3] = y2 - cos_theta = math.sqrt(length) / ((y1 - y2)) - heading = math.acos(cos_theta) - img_disp = cv2.line( - img, - (maxs[0], maxs[1]), - (maxs[2], maxs[3]), - (0, 0, 255), - thickness=10, - ) - cv2.imshow("LANE DETERMINATION" + label, img_disp) - - return m_length, math.degrees(heading) + # def determine_lane(self, img, label): + # # Taking in both warped images, determine which lane line is the longer one, and ergo the "solid line", + # # Based on that line, return the heading. + + # edges = cv2.Canny(img, 50, 150) + # lines = cv2.HoughLinesP( + # edges, 1, np.pi / 180, 100, minLineLength=50, maxLineGap=5 + # ) + # m_length = 0 + # heading = 0 + # maxs = [0, 0, 0, 0] + # if lines is not None: + # for line in lines: + # x1, y1, x2, y2 = line[0] + # length = (x1 - x2) ^ 2 + (y1 - y2) ^ 2 + # m_length = max(m_length, length) + # if (m_length) == length: + # maxs[0] = x1 + # maxs[1] = y1 + # maxs[2] = x2 + # maxs[3] = y2 + # cos_theta = math.sqrt(length) / ((y1 - y2)) + # heading = math.acos(cos_theta) + # img_disp = cv2.line( + # img, + # (maxs[0], maxs[1]), + # (maxs[2], maxs[3]), + # (0, 0, 255), + # thickness=10, + # ) + # cv2.imshow("LANE DETERMINATION" + label, img_disp) + + # return m_length, math.degrees(heading) def timer_callback(self): success_l, image_l = self.vidcap_left.read() @@ -318,7 +321,6 @@ def timer_callback(self): mask = cv2.inRange( hsv_transformed_frame, Lane_Follower.LOWER, Lane_Follower.UPPER ) - # cv2.imshow("MASKED IMAGE" + image[1],mask) if image[1] == "left": self._left_follower.set_binwarp(mask) @@ -331,9 +333,10 @@ def timer_callback(self): self.img_publish("raw_" + image[1], frame) self.img_publish("tf_" + image[1], transformed_frame) - result_left = self._left_follower.Plot_Line() - result_right = self._right_follower.Plot_Line() + result_left, empty_left = self._left_follower.Plot_Line() + result_right, empty_right = self._right_follower.Plot_Line() crosstrack = Float64() + heading = Float64() # TODO: Is this the behavior we want? Or do we need it to do something else if one of the lines is invalid? if result_left is not None or result_right is not None: @@ -341,28 +344,15 @@ def timer_callback(self): print(pos) crosstrack.data = pos self.crosstrack_pub.publish(crosstrack) - msg = String() - # Checking if the difference is substantial enough to warrant a change - if left_buffer - right_buffer > Lane_Follower.LANE_TOLERANCE: - msg.data = "In Left lane" - self.lane_pubs.publish(msg) - self._Left_Lane = True - - elif right_buffer - left_buffer > Lane_Follower.LANE_TOLERANCE: - msg.data = "In Right lane" - self.lane_pubs.publish(msg) - self._Left_Lane = False + + self._Left_Lane = True if empty_left < empty_right else self._Left_Lane + self._Left_Lane = False if empty_left > empty_right else self._Left_Lane # Heading message heading = Float64() heading.data = left_heading if self._Left_Lane else right_heading self.heading_pub.publish(heading) - # This is our way of handling a loss of data from both cameras - # TODO: Incorporate this with Stanley as our error parameter. - if cv2.waitKey(10) == 27: - return - else: TOLERANCE = 100 self._tolerance += 1