# V3_fake_module.py
#
# MQTT wireless module test script that sends fake data.
import argparse
import json
import time
import atexit
import paho.mqtt.client as mqtt
from mhp import topics
from das.utils import MockSensor
# Command-line interface of the fake-data publisher.
# ArgumentDefaultsHelpFormatter appends each option's default to its help text.
parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    description="MQTT wireless module test script that sends fake data",
)

# Duration: whole seconds when given; the infinite default never expires.
parser.add_argument(
    "-t",
    "--time",
    default=float("Inf"),
    type=int,
    help="Length of time to send data in seconds (duration)",
)

# Publishing frequency in rounds per second.
parser.add_argument(
    "-r",
    "--rate",
    default=1,
    type=float,
    help="Rate of data sent per second",
)

# Where the MQTT broker lives.
parser.add_argument(
    "--host",
    default="localhost",
    type=str,
    help="Address of the MQTT broker",
)

# One or more module ids to simulate.
parser.add_argument(
    "-i",
    "--id",
    default=[1, 2, 3, 4, 5],
    type=int,
    nargs="+",
    help=(
        "Specify the modules to produce fake data. eg. --id 1 2 25 specifies\n"
        "that module 1, 2 and 25 will produce data."
    ),
)
# Every fake sensor, keyed by the sensor type name that is published.
# The numbers passed to MockSensor are the average values to generate around.
# Key order matters: it is the order used for modules that publish all sensors.
sensors = {
    # Sensors built from a single value
    "steeringAngle": MockSensor(10),
    "co2": MockSensor(325),
    "temperature": MockSensor(25),
    "humidity": MockSensor(85),
    "reedVelocity": MockSensor(50),
    "reedDistance": MockSensor(1000, increment=True, percent_range=0.05),
    "battery": MockSensor(80),
    # Sensors built from several named (field, value) pairs
    "accelerometer": MockSensor(("x", 90), ("y", 90), ("z", 90)),
    "gyroscope": MockSensor(("x", 90), ("y", 90), ("z", 90)),
    "gps": MockSensor(
        ("speed", 50),
        ("satellites", 10),
        ("pdop", 10),
        ("latitude", -37),
        ("longitude", 145),
        ("altitude", 50),
        ("course", 0),
        ("datetime", "2017-11-28 23:55:59.342380"),
    ),
    "power": MockSensor(200, percent_range=0.8),
    "cadence": MockSensor(90, percent_range=0.2),
    "heartRate": MockSensor(120),
    "windDirection": MockSensor(
        ("minDirection", 180),
        ("avgDirection", 180),
        ("maxDirection", 180),
        percent_range=1,
    ),
    "windSpeed": MockSensor(
        ("minSpeed", 10),
        ("avgSpeed", 10),
        ("maxSpeed", 10),
    ),
}
# HARDCODED MODULE ONBOARD SENSORS
# Each list names keys of the `sensors` dict (which holds the MockSensor
# objects); one list per known wireless module.
M1_sensors = ["temperature", "humidity", "steeringAngle"]
M2_sensors = ["co2", "temperature", "humidity", "accelerometer", "gyroscope"]
M3_sensors = ["co2", "reedVelocity", "reedDistance", "gps"]
M4_sensors = ["power", "cadence", "heartRate"]
M5_sensors = ["windDirection", "windSpeed"]
# Any other fake module publishes every sensor in the dict
Mn_sensors = list(sensors)
def generate_module_data(module_id_num, sensor_list):
    """
    Build the payload dict for one module, ready to be turned into JSON.

    module_id_num: Unique module number (int)
    sensor_list: list of sensor names as strings such as
                 ["co2", "reedVelocity"]; each name is looked up in the
                 module-level `sensors` dict

    Returns a dict shaped like
        {"module-id": <id>, "sensors": [{"type": <name>, "value": <reading>}]}
    with one entry per name, in the order the names were given.
    """
    # One fresh reading per requested sensor
    readings = [
        {"type": name, "value": sensors[name].get_value()}
        for name in sensor_list
    ]
    return {"module-id": module_id_num, "sensors": readings}
def send_fake_data(client, duration, rate, module_id_nums):
    """Send artificial data over MQTT for each module channel.

    One round (one data message per enabled module) is published, then the
    function sleeps for 1 / [rate] seconds, and this repeats until [duration]
    seconds have elapsed. Battery voltage is also published for every module
    in the first round and then each time another five minutes has elapsed.

    client: MQTT client
    duration: How long in seconds the script should output data before
              terminating
    rate: Frequency of sending out data in Hz; must be greater than zero
    module_id_nums: List of ints containing module ids that are enabled for the
                    mock test

    Raises ValueError if rate is not greater than zero.
    """
    # Reject a bad rate before anything is published; otherwise the failure
    # would only surface at the sleep, after a first round had gone out.
    if rate <= 0:
        raise ValueError("rate must be greater than zero, got {}".format(rate))

    # Sensors carried by each known module; any other id uses Mn_sensors
    module_sensors = {
        1: M1_sensors,  # Wireless module 1 (Front)
        2: M2_sensors,  # Wireless module 2 (Middle)
        3: M3_sensors,  # Wireless module 3 (Back)
        4: M4_sensors,  # Wireless module 4 (ANT+ sensor/DAS.js)
        5: M5_sensors,  # Wireless module 5 (Anemometer)
    }

    period = 1 / rate
    battery_duration = 5 * 60  # Battery info to be published every 5min
    battery_counter = 0  # Number of battery rounds published so far
    start_time = round(time.time(), 2)
    total_time = 0

    while total_time < duration:
        current_time = round(time.time(), 2)
        total_time = round(current_time - start_time, 2)

        # True once the next 5-minute boundary has been reached
        publish_battery = (battery_counter * battery_duration) <= total_time
        if publish_battery:
            battery_counter += 1

        # TODO: create function that outputs the wireless data output so that
        # it can be compared with the data read by the wireless logging
        # script
        print("TIME:", current_time)
        for module_id_num in module_id_nums:
            sensor_list = module_sensors.get(module_id_num, Mn_sensors)
            module_data = generate_module_data(module_id_num, sensor_list)
            module_topic = topics.WirelessModule.id(module_id_num).data
            publish(client, module_topic, module_data)

            if publish_battery:
                battery_topic = topics.WirelessModule.id(module_id_num).battery
                battery_data = {
                    "module-id": module_id_num,
                    "voltage": sensors["battery"].get_value(),
                }
                publish(client, battery_topic, battery_data)

        print()  # Newline for clarity
        time.sleep(period)
def publish(client, topic, data=None):
    """
    Publishes python dict data to a specific topic in JSON and prints it out

    client: MQTT client object
    topic: MQTT topic eg. '/v3/wireless_module/<id>/start'; it is converted
           with str() before being handed to the client
    data: Python dict containing the data to be published on the topic.
          When omitted (or None) an empty dict is sent, i.e. "{}".
    """
    # Use a fresh dict per call rather than a shared mutable default argument
    if data is None:
        data = {}

    # Generate JSON from the python dict
    json_data = json.dumps(data)

    # Publish the data over MQTT
    client.publish(str(topic), json_data)
    print(topic, "--> ", json_data)
def start_modules(args):
    """Publish a {"start": True} message on the V3 start topic.

    args: parsed command-line arguments (not used by this function)

    The message is sent through the module-level `client`, which is only
    created when this file is run as a script.
    """
    publish(client, topics.V3.start, {"start": True})
def stop_modules(args):
    """Publish a {"start": False} message on the V3 start topic.

    args: parsed command-line arguments (not used by this function)

    The message is sent through the module-level `client`, which is only
    created when this file is run as a script.
    """
    publish(client, topics.V3.start, {"start": False})
    # NOTE(review): the pause presumably gives the message time to leave
    # before the process moves on - confirm
    time.sleep(1)
    print("Stopped all modules.")
def start_publishing(client, args):
    """Run one full mock session: start message, fake data, stop message.

    client: MQTT client used to publish the fake data
    args: parsed command-line arguments; `time`, `rate` and `id` are read
    """
    print("\npublishing started...")
    start_modules(args)
    send_fake_data(
        client,
        duration=args.time,
        rate=args.rate,
        module_id_nums=args.id,
    )
    stop_modules(args)
    print("\npublishing finished")
def on_connect(client, userdata, flags, rc):
    """Connection callback: print whether connecting succeeded.

    Only `rc` is used; a result code of 0 is reported as success and any
    other value as a failure. The code itself is printed in both cases.
    """
    prefix = (
        "Connected Sucessfully! Result code: "
        if rc == 0
        else "Something went wrong! Result code: "
    )
    print(prefix + str(rc))
def on_message(client, userdata, msg):
    """Message callback: print the topic and the UTF-8 decoded payload.

    Only `msg` is used; its `topic` and `payload` attributes are read.
    """
    payload_text = str(msg.payload.decode("utf-8"))
    print(msg.topic + " " + payload_text)
if __name__ == "__main__":
    # `args` and `client` are deliberately module-level names:
    # start_modules/stop_modules read `client` as a global.
    args = parser.parse_args()

    client = mqtt.Client()
    client.on_connect, client.on_message = on_connect, on_message

    # Also send the stop message when the interpreter exits
    atexit.register(stop_modules, args)

    broker_address = args.host
    client.connect(broker_address)

    # Network loop runs in the background while the fake data is published
    client.loop_start()
    start_publishing(client, args)
    client.loop_stop()