# edge.py
# Edge node: buffers device messages received over ZeroMQ and relays them to the cloud.
import time
import random
import sys
import zmq
import json
import datetime
import threading
import os
# TCP port this node binds to receive device messages (SUB socket in Edge.get_device_data).
DEVICES_PORT = 5555
# Cloud port the buffered device data is published to (PUB socket in Edge.send_data_to_cloud).
CLOUD_PORT = 5556
# Cloud port queried by Edge.check_heartbeat to find out whether the cloud answers.
CLOUD_HEARTBEAT_PORT = 5557
# Cloud port queried by Edge.get_data_from_cloud for data sent back by the cloud.
DATA_FROM_CLOUD_PORT = 5558
# Host name or IP address used for every connection to the cloud.
CLOUD_IP = "localhost"
class Edge:
    """Edge node that buffers device messages and relays them to the cloud.

    Three worker threads cooperate (see ``run``):

    * ``get_device_data`` receives ``[topic, json]`` messages from devices and
      groups them per topic in ``device_data``. Once per
      ``send_to_cloud_interval`` it either hands the buffer to the sender
      (cloud reachable) or spills it to ``cached_data_file`` (cloud down).
    * ``send_data_to_cloud`` publishes the staged batches to the cloud.
    * ``get_data_from_cloud`` periodically requests data back from the cloud.

    Attributes:
        device_data: dict mapping topic -> list of decoded messages that have
            not yet been staged or cached.
        send_to_cloud_interval: seconds to buffer before flushing.
        data_to_cloud: list of batches (topic -> list dicts) awaiting upload.
        sending_to_cloud: True while the sender thread owns ``data_to_cloud``.
        cached_data_file: path of the JSON file holding data collected while
            the cloud was unreachable.
    """

    def __init__(self):
        self.device_data = {}
        self.send_to_cloud_interval = 5  # buffer time in sec
        self.data_to_cloud = []
        self.sending_to_cloud = False
        self.cached_data_file = "cached_data.txt"

    def save_data_to_file(self, data):
        """Overwrite the cache file with ``data`` serialized as JSON."""
        with open(self.cached_data_file, 'w', encoding="utf-8") as file:
            file.write(json.dumps(data))

    def _load_cached_data(self):
        """Return the cached topic -> list mapping, or {} if nothing is cached."""
        if self.cached_data_size() == 0:
            return {}
        with open(self.cached_data_file, 'r', encoding="utf-8") as file:
            return json.load(file)

    def append_cached_data(self, data):
        """Merge the cached entries into ``data`` and empty the cache file.

        For topics present in both, the cached entries are appended after the
        entries already in ``data``. Topics that exist only in the cache are
        added as well, so no cached message is lost. A missing or empty cache
        file leaves ``data`` unchanged.

        Args:
            data: dict mapping topic -> list of messages; modified in place.

        Returns:
            The same ``data`` dict, including the cached entries.
        """
        if self.cached_data_size() == 0:
            return data
        for key, value in self._load_cached_data().items():
            if key in data:
                data[key] += value
            else:
                data[key] = value
        # Truncate the cache so the same entries are never merged twice.
        with open(self.cached_data_file, 'w', encoding="utf-8"):
            pass
        return data

    def cached_data_size(self):
        """Return the cache file size in bytes; 0 if the file does not exist."""
        try:
            return os.stat(self.cached_data_file).st_size
        except FileNotFoundError:
            # Nothing has been cached yet.
            return 0

    def _stage_for_cloud(self):
        """Hand the buffer (plus any cached data) to the sender thread."""
        if self.cached_data_size() > 0:
            self.device_data = self.append_cached_data(self.device_data)
            print("Appended Cached Data")
        self.data_to_cloud.append(self.device_data)
        self.device_data = {}
        # Set the flag last so the sender only wakes up once the batch is queued.
        self.sending_to_cloud = True

    def _spill_to_cache(self):
        """Move the in-memory buffer into the cache file.

        Entries already cached are kept and the buffer is appended after
        them. The buffer is then cleared; otherwise the same messages would
        live both in memory and on disk and be uploaded twice on recovery.
        """
        cached = self._load_cached_data()
        for topic, entries in self.device_data.items():
            cached.setdefault(topic, []).extend(entries)
        self.save_data_to_file(cached)
        self.device_data = {}

    def get_device_data(self):
        """Receive device messages forever and flush the buffer periodically.

        Binds a SUB socket on ``DEVICES_PORT`` subscribed to every topic.
        Each message is a two-part ``[topic, json_payload]`` frame. The flush
        check runs before each blocking receive, so it only happens when
        messages keep arriving.
        """
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        socket.bind("tcp://*:%s" % DEVICES_PORT)
        socket.subscribe("")  # subscribe to all topics
        start = datetime.datetime.now()
        while True:
            elapsed = (datetime.datetime.now() - start).total_seconds()
            if elapsed > self.send_to_cloud_interval and not self.sending_to_cloud:
                cloud_available = self.check_heartbeat()
                start = datetime.datetime.now()
                if cloud_available:
                    self._stage_for_cloud()
                else:
                    print("save data locally")
                    self._spill_to_cache()
                    # TODO save data to local files -> max 10 MB
            topic, data = socket.recv_multipart()
            topic = topic.decode()
            data = json.loads(data)
            self.device_data.setdefault(topic, []).append(data)
            # print("TOPIC: {}; DATA: {}".format(topic, data))
            time.sleep(0.001)

    def check_heartbeat(self):
        """Return True if the cloud answers a heartbeat request within 1 s.

        A fresh REQ socket is used for every probe because a REQ socket that
        timed out waiting for a reply cannot send again.
        """
        time_out = 1000  # ms
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        try:
            socket.setsockopt(zmq.LINGER, 0)
            socket.setsockopt(zmq.RCVTIMEO, time_out)
            socket.connect("tcp://" + CLOUD_IP + ":%s" % CLOUD_HEARTBEAT_PORT)
            socket.send("hi there".encode())
            socket.recv()
            return True
        except zmq.error.Again:
            print('cloud not available')
            return False
        finally:
            # Release the socket and its context on every path; LINGER=0
            # means term() does not wait for the unanswered request.
            socket.close()
            context.term()

    def send_data_to_cloud(self):
        """Publish staged batches to the cloud forever.

        Waits for ``sending_to_cloud`` to become True, then sends every
        message of every batch as ``[topic, json_payload]`` on a PUB socket
        connected to ``CLOUD_PORT``, clears ``data_to_cloud`` and resets the
        flag so the receiver can stage the next batch.
        """
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.connect("tcp://" + CLOUD_IP + ":%s" % CLOUD_PORT)
        while True:
            while self.sending_to_cloud:
                print("SENDING TO CLOUD")
                for batch in self.data_to_cloud:
                    for key, values in batch.items():
                        for value in values:
                            multipart_msg = [key.encode(), json.dumps(value).encode()]
                            socket.send_multipart(multipart_msg)
                self.data_to_cloud = []
                self.sending_to_cloud = False
            time.sleep(0.001)

    def get_data_from_cloud(self):
        """Request data for "laser_cutter" from the cloud every 10 seconds.

        Sends the request on a REQ socket connected to
        ``DATA_FROM_CLOUD_PORT`` and prints the reply. The receive blocks
        until the cloud answers.
        """
        context = zmq.Context()  # one context for the lifetime of the thread
        while True:
            socket = context.socket(zmq.REQ)
            try:
                socket.setsockopt(zmq.LINGER, 0)
                socket.connect("tcp://" + CLOUD_IP + ":%s" % DATA_FROM_CLOUD_PORT)
                socket.send("laser_cutter".encode())
                data = socket.recv()
                # recv() returns bytes; decode before concatenating with str.
                print("Got avg data from cloud: " + data.decode())
            finally:
                socket.close()
            time.sleep(10)

    def run(self):
        """Start the device-receiver, cloud-sender and cloud-receiver threads."""
        th_from_devices = threading.Thread(target=self.get_device_data)
        th_to_cloud = threading.Thread(target=self.send_data_to_cloud)
        th_from_cloud = threading.Thread(target=self.get_data_from_cloud)
        th_from_devices.start()
        th_to_cloud.start()
        th_from_cloud.start()
# Script entry: create the edge node and start its three worker threads.
edge = Edge()
edge.run()
# edge.get_device_data()  # (disabled) would run only the device receiver loop, blocking this thread