-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstate_machine_node
executable file
·244 lines (194 loc) · 8.89 KB
/
state_machine_node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#!/usr/bin/env python
"""
The state machine node.
Looks at the state of the robot and calls necessary services.
PUBLISHERS:
state_pub (String): publishes the current state of the robot
display_pub (Image): to display an image on the screen depending on the state of the robot
SUBSCRIBERS:
face_pose_sub (Pose): calculates and outputs the location of the face from the face detection algorithm
found_face_sub (Bool): outputs a boolean depending on wether face detection finds a face or not
SERVICES:
go_to_pose (GoToPose): commands the right arm to go to the correct location based on the face detection
go_to_neutral (Empty): commands the right arm to go to home/ ready position
blink (Trigger): blinks the lights on the right arm to trigger the thermometer
go_to_pose_left (GoToPose): commands the left arm to go to the specified position depending on the temperature reading from the thermometer
"""
import os
from pathlib import Path
import numpy as np
import cv2
import cv_bridge
import baxter_interface.digital_io as DIO
import rospy
from std_msgs.msg import String, Bool
from geometry_msgs.msg import Pose
from state_machine.srv import SetState, SetStateResponse
from sensor_msgs.msg import Image
from std_srvs.srv import Empty, EmptyRequest, EmptyResponse, Trigger
from motion.srv import GoToPose, GoToPoseResponse, GetPose, GetPoseResponse
from state_machine.displays import get_image, create_temp_image
class StateMachine:
""" Decides what state robot is in. """
def __init__(self):
# Service proxies
## Wait for services to be started
rospy.wait_for_service('/reset')
rospy.wait_for_service('/get_pose')
rospy.wait_for_service('/go_to_pose')
rospy.wait_for_service('/blink_arm_light')
rospy.wait_for_service('/go_to_joint_angle')
rospy.wait_for_service('/go_to_pose_left')
self.go_to_neutral = rospy.ServiceProxy('/go_to_joint_angle', Empty)
# self.get_pose = rospy.ServiceProxy('/get_pose', GetPose)
self.go_to_pose = rospy.ServiceProxy('/go_to_pose', GoToPose)
self.blink = rospy.ServiceProxy('/blink_arm_light', Trigger)
self.go_to_pose_left = rospy.ServiceProxy('/go_to_pose_left', GoToPose)
# State publisher
self.state_pub = rospy.Publisher('/state', String, queue_size=10)
self.state = 'search_for_face'
self.prev_state = ''
self.set_state = rospy.Service('/set_state', SetState,
self.set_state_callback)
self.rate = 100
# Face pose subscriber
self.face_pose_sub = rospy.Subscriber('/face_pose', Pose, self.face_pose_callback)
self.face_pose = None
# self.pause_face_pose = False
# Found face subscriber
self.found_face_sub = rospy.Subscriber('/found_face', Bool, self.found_face_callback)
self.found_face = False
# Display publisher
self.display_pub = rospy.Publisher('/robot/xdisplay', Image, latch=True, queue_size=1)
# Digital IO for controlling arm light
self.light_control = DIO.DigitalIO('left_outer_light')
# Initialization
## Display initial image
self.images_dir = str(Path(os.path.realpath(__file__)).parents[1])+'/images/'
self.update_screen()
## Robot should be in search_for_face pose
# self.reset()
# Timer callback
self.timer = rospy.Timer(rospy.Duration(1/self.rate), self.timer_callback)
self.go_to_neutral()
def set_state_callback(self, req):
allowable_states = ['search_for_face', 'move_to_face',
'take_temperature', 'check_temperature',
'display_result']
if req.state not in allowable_states:
raise ValueError(f'Requested state must be in {allowable_states}') # is raising an error the best approach
self.state = req.state
return SetStateResponse()
def update_screen(self):
img = get_image(self.images_dir, self.state)
msg = cv_bridge.CvBridge().cv2_to_imgmsg(img, encoding='bgr8')
self.display_pub.publish(msg)
# rospy.sleep(1)
def update_screen_temp(self, temp, temp_level):
print("Temp screen update")
img = create_temp_image(self.images_dir,temp,temp_level)
msg = cv_bridge.CvBridge().cv2_to_imgmsg(img, encoding='bgr8')
self.display_pub.publish(msg)
# rospy.sleep(1)
def set_lights(self):
# Turns on light for 10s
self.light_control.set_output(True)
rospy.sleep(10)
self.light_control.set_output(False)
def face_pose_callback(self, data):
# if not self.pause_face_pose:
self.face_pose = data
def found_face_callback(self, data):
# if not self.pause_face_pose:
self.found_face = data.data
def timer_callback(self,event):
"""
Timer callback to publish state
"""
self.state_pub.publish(self.state)
if self.state == 'search_for_face':
# Go to search_for_face pose and wait for face to be found
"""
Vision node will be publishing on 'face_pose'
If the distance is less than 3m, do not go to 'move_to_face'
"""
if self.found_face:
path = self.images_dir + 'face_found.png'
img = cv2.imread(path)
msg = cv_bridge.CvBridge().cv2_to_imgmsg(img, encoding='bgr8')
self.display_pub.publish(msg)
else:
self.update_screen()
if self.face_pose:
self.state = 'move_to_face'
if self.state != self.prev_state:
if self.state == 'move_to_face':
self.update_screen()
# self.pause_face_pose = True
# When a face is found, vision node will set state to move_to_face
# Call motion Step service to move to face pose
"""
Update topic and service call when nodes are done
"""
# face_pose = rospy.wait_for_message('face_pose',Pose,timeout=1.5)
res = self.go_to_pose(x=self.face_pose.position.x - 0.1,
y=self.face_pose.position.y,
z=self.face_pose.position.z,
r=0.26,
p=-1.5,
yaw=2.91)
# self.state = 'search_for_face'
# self.pause_face_pose = False
# self.face_pose = None
self.prev_state = self.state
if res.success:
self.state = 'take_temperature'
else:
self.go_to_neutral()
self.state = 'take_temperature'
elif self.state == 'take_temperature':
self.update_screen()
self.blink()
self.prev_state = self.state
self.state = 'display_result'
pass
# elif self.state == 'check_temperature':
# # self.update_screen()
# # Waits for vision node to complete checking temperature
# # Vision node sets state to display_result
# pass
elif self.state == 'display_result':
# Update screen as either high or low temp
# Vision node sets state to display_result
"""
Update /digit_reading to latch publisher??
"""
print('came in display_result')
temp_level = rospy.wait_for_message('/temperature_reading', String)
temp = rospy.wait_for_message('/digit_reading', String)
print(temp_level.data, temp.data)
self.update_screen_temp(temp, temp_level)
# self.update_screen_temp()
# Wait for 10s or some user action, then return to search_for_face
rospy.sleep(2)
if temp_level.data == "GREEN":
self.go_to_pose_left(x= 0.22979708015918732, y= 1.0588802099227905, z= -0.08750268816947937, r=3.096444845199585,p=0.015665998682379723, yaw=-1.6834826469421387)
self.go_to_neutral()
elif temp_level.data == "RED":
self.go_to_neutral()
self.prev_state = self.state
print("State machine end")
self.state = 'search_for_face'
self.face_pose = None
else:
pass
def main():
""" The main() function. """
rospy.init_node('state_machine')
s = StateMachine()
rospy.spin()
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
pass