-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
212 lines (169 loc) · 6.94 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from __future__ import print_function
#Python Headers
import math
import os
# ROS Headers
import rospy
# GEM PACMod Headers
from std_msgs.msg import Header, Bool
from pacmod_msgs.msg import PacmodCmd, PositionWithSpeed
from sensor_msgs.msg import Image
import time
# CV image
from cv_bridge import CvBridge
import cv2
import imutils
import numpy as np
import torch
from utils import detect_streams, detect_static
import cv2
import time
class vars_:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
img_size = 640 # avoid touching this value unless necessary
conf_thres = 0.25
iou_thres = 0.45 # incresing this will make the model detect objects (for object detection) which it is unsure of
threshold = 70 # minimum number of votes (intersections in Hough grid cell)
min_line_length = 4 # minimum number of pixels making up a line
max_line_gap = 70 # maximum gap in pixels between connectable line segments
lane_boundary_top = 450 # top limit of the box in which we are considering the lane boundaries
lane_boundary_top_offset = 450 # its the amount of pixels we are leaving at either side at the top of box inside which we consider lane boundaries.
lane_boundary_bottom_offset = 150 # its the amount of pixels we are leaving at either side at the bottom of box inside which we consider lane boundaries.
text_color = (0,0,255)
implement_half_precision = True # set to false for making model more precise (making it more precise will increase the processing time!)
save_dir = "temp/"
# source = '0' # [path to image/video] OR ['0' for camera on PC]
source = 'test_data/car.jpeg'
weights = "model_weights/End-to-end.pth" # models weights
class Ego :
def __init__(self) :
rospy.init_node('ego', anonymous=True)
# Config sub/pub
self.image_sub = rospy.Subscriber("/zed2/zed_node/rgb_raw/image_raw_color", Image, self.callback)
self.brake_pub = rospy.Publisher("/pacmod/as_rx/brake_cmd", PacmodCmd, queue_size = 1)
self.accel_pub = rospy.Publisher("/pacmod/as_rx/accel_cmd", PacmodCmd, queue_size = 1)
self.angle_pub = rospy.Publisher("/pacmod/as_rx/steer_cmd", PositionWithSpeed, queue_size = 1)
self.angle_msg = PositionWithSpeed()
self.accel_cmd = PacmodCmd()
self.enable_pub = rospy.Publisher("/pacmod/as_rx/enable", Bool)
self.brake_cmd = PacmodCmd()
self.data_buffer = []
self.rate = rospy.Rate(10)
# Ego state
self.should_emergency_brake = False
self.cv_bridge = CvBridge()
# pedestrian detector
self.hog = cv2.HOGDescriptor()
self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
self.threshold = 0.5
# Other config
self.angular_velocity_limit = 0.4
# This one should directly go to flashdisk
self.image_path = None
# Event loop
def run(self) :
# enable control
self.enable_pub.publish(True)
while not rospy.is_shutdown() :
self.rate.sleep()
def record(self, msg) :
self.data_buffer.append(msg)
# self.flush()
def flush(self) :
if len(self.data_buffer) > 0 and len(self.data_buffer)%100 == 0 :
for data in self.data_buffer[-100:] :
image = self.cv_bridge.imgmsg_to_cv2(data)
ts = round(time.time() * 1000) # 1/1000 of a second
fname = str(ts) + ".jpg"
if self.image_path is None :
cv2.imwrite(fname, image)
else :
cv2.imwrite(os.path.join(self.image_path, fname), image)
"""
input: image matrix
return value: True if one of the (pedestrian) bboxes have acc>0.8, else returns False
"""
def pedestrian_exists(self, img) :
img = imutils.resize(img, width=min(400, img.shape[1]))
img = img[:,:,0:3]
(rects, weights) = self.hog.detectMultiScale(
img.astype(np.uint8), winStride=(4, 4), padding=(8, 8), scale=1.05
)
for i in weights:
if i > self.threshold:
return True
return False
def callback(self, msg) :
# print("callback", msg)
self.record(msg)
# Process current knowns state of the system
def process(self) :
if not self.data_buffer:
# print("qqq")
return
cv_image = self.cv_bridge.imgmsg_to_cv2(self.data_buffer[-1])
cv2.imwrite('temp/img.jpeg', cv_image)
opt = vars_
opt.source = 'test_data/img.jpeg'
# you get data
with torch.no_grad():
if opt.source != '0':
# steering angle is in degrees.
steering_angle, obstacle_present = detect_static(opt)
img = cv2.imread('temp/img.jpeg')
cv2.imshow('image', img)
cv2.waitKey(1)
# So that vehicle doesnt turn for small angles
''' Angle '''
self.angle = self.__get_angle(steering_angle)
''' Obstacles '''
if obstacle_present :
print("obstacle detected")
self.should_emergency_brake = True
else :
print("obstacle not detected")
self.should_emergency_brake = False
def __get_angle(self, angle) :
if -0.05 < angle < 0.05 :
return 0
return angle
def control(self) :
''' Angle '''
self.angle_cmd.angular_position = self.angle
self.angle_cmd.angular_velocity_limit = self.angular_velocity_limit
self.angle_pub.publish(self.angle_cmd)
''' Obstacle '''
if self.should_emergency_brake :
self._apply_brake()
self.accel_cmd.f64_cmd = 0.0
self.accel_cmd.enable = True
self.accel_pub.publish(self.accel_cmd)
else :
self._release_brake()
self.accel_cmd.f64_cmd = 0.32
self.accel_cmd.enable = True
self.accel_pub.publish(self.accel_cmd)
def _apply_brake(self) :
self.brake_cmd.f64_cmd = 0.6
self.brake_cmd.enable = True
self.brake_pub.publish(self.brake_cmd)
def _release_brake(self) :
self.brake_cmd.f64_cmd = 0
self.brake_cmd.enable = True
self.brake_pub.publish(self.brake_cmd)
if __name__ == '__main__':
ego = Ego()
ego.run()
# opt = vars_
# with torch.no_grad():
# if opt.source != '0':
# detect_static(opt)
# else:
# for img, steering_angle, obstacle_in_way in detect_streams(opt):
# print(steering_angle)
# cv2.imshow('image', img)
# cv2.waitKey(1) # 1 millisecond
# Todo:
# 1. Check bboxs and return if one is too big or obj too close (use lidar inferences)
# 2. Warning if going out of drivable region
# 3. Write car translation code