-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflexiforce.py
executable file
·123 lines (102 loc) · 3.16 KB
/
flexiforce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
import sys
import subprocess
import pathlib
import numpy as np
import threading
import rospy
from sensor_msgs.msg import Joy
# Serial devices to try
DEVICES = ['/dev/ttyACM0']
BAUD_RATE = 9600
# ROS rate in Hertz
ROS_rate = 100
# Value range of joints
joint_range = {
0: (-0.5, 0.5), # shoulder_yaw
1: (-0.1, 0.5), # shoulder_pitch
3: (-0.5, 0.5), # upper_arm_roll
4: (-0.3, 0.5), # elbow_pitch
}
joint_min = { k: v[0] for k, v in joint_range.items() }
joint_max = { k: v[1] for k, v in joint_range.items() }
joints = np.zeros((max(joint_range.keys()),))
# Increment maximum this much (radians) per ROS poll
joint_increment = 0.002
class Flexiforce(threading.Thread):
def __init__(self):
self.find_device(DEVICES)
self.configure_device(BAUD_RATE)
self.maximum = 1024
self.value_ = 0
super().__init__(daemon=True)
def run(self):
self.f = open(self.device, 'rt', encoding='ascii')
while True:
try:
value_str = self.f.readline().rstrip('\n')
value = int(value_str)
self.value_ = value
except (ValueError, UnicodeDecodeError):
continue
self.f.close()
@property
def value(self):
return self.value_/self.maximum
def find_device(self, candidates):
# Find device
self.device = None
for device in candidates:
path = pathlib.Path(device)
if path.exists() and path.is_char_device():
self.device = device
break
if self.device is None:
fatal("Cannot find device, tried:", *devices, sep='\n')
else:
status("Found device on", device)
def configure_device(self, baud):
# Configure serial
def run_stty(*args):
result = subprocess.run(['stty', '-F', self.device] + list(args), check=True)
status("Configuring", self.device, "with", baud, "baud")
try:
run_stty('raw')
run_stty('-echo', '-echoe', '-echok')
run_stty(str(baud))
except subprocess.CalledProcessError:
fatal("Error configuring", self.device)
def status(*args):
print(*args, file=sys.stderr)
def fatal(*args):
print(*args, file=sys.stderr)
sys.exit(1)
def increment_joint(joint, amount):
"""
Increment the joint in positive direction, stopping at maximum
"""
global joints
joints[joint] = min(joints[joint] + amount, joint_max[joint])
def decrement_joint(joint, amount):
"""
Decrement the joint in positive direction, stopping at minimum
"""
global joints
joints[joint] = max(joints[joint] - amount, joint_min[joint])
def main():
print(sys.version_info)
flexi = Flexiforce()
# Set up ROS node
rospy.init_node('flexiforce')
rate = rospy.Rate(ROS_rate)
pub = rospy.Publisher('/joy', Joy, queue_size=1)
joy = Joy()
flexi.start()
while not rospy.is_shutdown():
increment_joint(1, flexi.value*joint_increment)
joy.axes = joints.tolist()
pub.publish(joy)
rate.sleep()
if __name__ == '__main__':
main()
#EOF