-
Notifications
You must be signed in to change notification settings - Fork 15
/
migration-script.py
340 lines (264 loc) · 12.3 KB
/
migration-script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
'''
@Author Alessandro Bortoletto - LINKS Foundation
@Version 0.0.6
@Date 01-07-2019
@Brief This is a collection of useful functions to perform operations with ThingsBoard DB via RESTful API or MQTT
'''
# *** Import section ***
import requests
from datetime import datetime
import time
import sys
import paho.mqtt.client as mqtt
import json
import argparse
import yaml
import logging
from tqdm import tqdm
'''
@Name getData(ip,port,user,password,deviceId,key,startTs,endTs)
@Return Returns a timeseries with the response to the HTTP RESTful request
@Parameters
ip: ip address of target ThingsBoard
port: port used by the target ThingsBoard
user: username to obtain API authorization
password: password to obtain API authorization
deviceId: id of the device to read
key: key of the timeseries to read
startTs: first unix timestamp to fetch
endTs: last unix timestamp to fetch
@Notes The function reads all the database entries associated with the device and key, with a LIMIT of 200000 and no aggregation
'''
def getData(ip,port,user,password,deviceId,key,startTs,endTs):
    """Fetch a timeseries from the SOURCE ThingsBoard through its REST API.

    The function first logs in with ``user``/``password`` to obtain a token,
    then requests the values of ``key`` for ``deviceId`` between ``startTs``
    and ``endTs`` with no aggregation and a limit of 200000 entries.

    Parameters:
        ip: host name or ip address of the source ThingsBoard
        port: port of the source ThingsBoard (string or integer, may be empty)
        user: username used to obtain the API authorization
        password: password used to obtain the API authorization
        deviceId: id of the device to read
        key: key of the timeseries to read
        startTs: first unix timestamp to fetch
        endTs: last unix timestamp to fetch

    Returns:
        The decoded JSON body of the telemetry response. The other functions
        of this script read it as ``{key: [{'ts': ..., 'value': ...}, ...]}``.

    Exits:
        Terminates the script with status 1 if the login or the telemetry
        request fails, answers with an HTTP error status, or returns a body
        that is not valid JSON.
    """
    # Query constants: no aggregation interval, at most LIMIT raw entries
    INTERVAL = '0'
    LIMIT = '200000'
    AGG = 'NONE'

    # The port may come from YAML as an integer: convert it once so that the
    # login URL and the telemetry URL are always built the same way
    baseUrl = 'https://' + ip + ':' + str(port)

    # Define the headers of the authorization request
    headersToken = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    # Serialize the credentials with json.dumps so that quotes or backslashes
    # inside the username/password cannot produce an invalid JSON body
    data = json.dumps({"username": user, "password": password})

    # Perform the POST request to obtain X-Token Authorization
    try:
        response = requests.post(baseUrl + '/api/auth/login', headers=headersToken, data=data)
        X_AUTH_TOKEN = response.json()['token']
    except Exception as e:
        print("\nAn exception occurred while trying to obtain the authorization from SOURCE Thigsboard: ", e)
        logging.error('Authorization request to SOURCE ThingsBoard failed.')
        sys.exit(1)

    # Define the headers of the request
    headers = {'Accept': 'application/json', 'X-Authorization': 'Bearer ' + X_AUTH_TOKEN}
    # Let requests build (and URL-encode) the query string
    params = {
        'interval': INTERVAL,
        'limit': LIMIT,
        'agg': AGG,
        'keys': key,
        'startTs': str(startTs),
        'endTs': str(endTs),
    }

    # Perform the GET request to obtain timeseries
    try:
        r = requests.get(baseUrl + "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries",
                         params=params, headers=headers)
        print("Request to SOURCE ThingsBoard - response code: ", r.status_code)
        # A 4xx/5xx answer is a failure, not a timeseries: do not report success
        r.raise_for_status()
        # Decoding stays inside the try so a non-JSON body is handled as well
        TIMESERIES = r.json()
    except Exception as e:
        print("\nAn exception occurred while trying to obtain and print the timeseries from SOURCE Thigsboard: ", e)
        logging.error('Timeseries request to device '+deviceId+' with key '+key+' failed.')
        sys.exit(1)

    logging.info('Timeseries request to device '+deviceId+' with key '+key+' was successful.')
    # Return the result of the GET request
    return TIMESERIES
'''
@Name sendData(ip,port,deviceToken,key,timeseries)
@Return None
@Parameters
ip: ip address of target ThingsBoard
port: port used by the target ThingsBoard
deviceToken: token of the device to upload
key: key of the timeseries to upload
timeseries: actual array containing the timeseries to upload: the last element shall be the one with the OLDEST unix timestamp
@Notes This function uploads a timeseries (passed as argument) via MQTT
'''
def sendData(ip,port,deviceToken,key,timeseries):
    """Upload a timeseries to the TARGET ThingsBoard via MQTT.

    Every entry is published on ``v1/devices/me/telemetry`` with QoS 1 as
    ``{"ts": <ts>, "values": {key: <value>}}``. Entries are sent starting
    from the LAST element of the list, which is expected to be the oldest.

    Parameters:
        ip: host name or ip address of the target ThingsBoard
        port: MQTT port of the target ThingsBoard; when empty or None the
            standard MQTT port 1883 is used
        deviceToken: access token of the device to upload to
        key: key of the timeseries to upload
        timeseries: dictionary ``{key: [{'ts': ..., 'value': ...}, ...]}``;
            a missing key is treated as an empty timeseries

    Returns:
        None

    Exits:
        Terminates the script with status 1 if the MQTT connection cannot
        be established.
    """
    # The configuration allows an empty port: fall back to the MQTT default
    # instead of failing on int("")
    mqttPort = 1883 if port in (None, "") else int(port)
    # No data for this key in the requested range --> nothing to publish
    entries = timeseries.get(key, [])

    # Create MQTT client
    client = mqtt.Client()
    # Set access token
    client.username_pw_set(deviceToken)
    # Connect to ThingsBoard with a 60 seconds keepalive interval
    try:
        client.connect(ip, mqttPort, 60)
    except OSError as e:
        print("\nAn exception occurred while trying to connect to TARGET ThingsBoard: ", e)
        logging.error('Timeseries upload to device '+deviceToken+' with key '+key+' failed.')
        sys.exit(1)
    client.loop_start()

    # Upload the timeseries with the proper timestamps and values
    try:
        # START FROM THE LAST ELEMENT SINCE IT IS THE OLDEST ONE
        for entry in tqdm(reversed(entries), total=len(entries), desc='Uploading data to TARGET ThingsBoard'):
            data = {"ts": entry['ts'], "values": {key: entry['value']}}
            # Send data to ThingsBoard via MQTT
            client.publish('v1/devices/me/telemetry', json.dumps(data), 1)
            # THE DELAY IS NECESSARY TO AVOID THE ThingsBoard "WEB_SOCKET: TOO MANY REQUESTS" ERROR
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("The user manually interrputed the MQTT upload using keyboard.")
        logging.error('Timeseries upload to device '+deviceToken+' with key '+key+' failed.')
    else:
        print("Data successfully published via MQTT.")
        logging.info('Timeseries upload to device '+deviceToken+' with key '+key+' was successful.')
    finally:
        # Always release the network loop and the connection, even when the
        # upload is interrupted or an entry is malformed
        client.loop_stop()
        client.disconnect()
'''
@Name saveToFile(key,timeseries,file)
@Return None
@Parameters
key: key of the timeseries to save on file
timeseries: actual array containing the timeseries to save
file: name of the file to use for saving
@Notes This function saves into a file a timeseries
'''
def saveToFile(key,timeseries,file):
    """Append a timeseries to a local text file, one JSON object per line.

    Each line has the form ``{"ts": <ts>, "value": <value>}``. Entries are
    written starting from the LAST element of the list, which is expected
    to be the oldest one.

    Parameters:
        key: key of the timeseries to save on file
        timeseries: dictionary ``{key: [{'ts': ..., 'value': ...}, ...]}``;
            a missing key is logged as a warning and treated as empty
        file: name of the file to use for saving (opened in append mode)

    Returns:
        None

    Exits:
        Terminates the script with status 1 if the data cannot be written.
    """
    try:
        entries = timeseries.get(key)
        if entries is None:
            # Nothing was fetched for this key: keep going with an empty list
            logging.warning('No data found for key ' + key + ': nothing to save.')
            entries = []
        # The context manager closes the file even if a write fails; the
        # handle gets its own name so that 'file' still holds the file name
        with open(file, 'a+') as dbFile:
            # FROM THE LAST ELEMENT SINCE IT IS THE OLDEST ONE
            for entry in tqdm(reversed(entries), total=len(entries), desc='Saving data to local file'):
                data = {"ts": entry['ts'], "value": entry['value']}
                dbFile.write(json.dumps(data) + "\n")
    except Exception as exception:
        print("An error occoured while saving the data into local text file: ", exception)
        logging.error('An error occoured while saving the data into local text file: ' + str(file))
        # Report the failure to the calling shell with a non-zero status
        sys.exit(1)
'''
@Name readFromFile(key,file)
@Return timeseries
@Parameters
key: key of the timeseries to read from file
file: name of the file to use for reading
@Notes This function reads a timeseries from a file
'''
def readFromFile(key,file):
    """Read a timeseries from a local text file with one JSON object per line.

    Parameters:
        key: key of the timeseries to read from file
        file: name of the file to use for reading

    Returns:
        A dictionary ``{key: [entry, ...]}`` with the entries in the same
        order as the lines of the file. Blank lines are skipped.

    Exits:
        Terminates the script with status 1 if the file cannot be opened or
        a line is not valid JSON.
    """
    timeseries = []
    try:
        # Open the file in read mode; the context manager always closes it
        with open(file, 'r') as dbFile:
            for line in dbFile:
                line = line.strip()
                # Tolerate empty lines (e.g. a trailing newline added by hand)
                if not line:
                    continue
                timeseries.append(json.loads(line))
    except (OSError, ValueError) as exception:
        # OSError: the file is missing or unreadable; ValueError: invalid JSON
        print("An error occoured while reading the data from local text file: ", exception)
        logging.error('An error occoured while reading the data from local text file.')
        # Report the failure to the calling shell with a non-zero status
        sys.exit(1)
    # Format and return the correct timeseries
    return {key: timeseries}
# ******* THE SCRIPT'S OPERATIONS BEGIN HERE *******
'''
@Notes This is the argument parser of the script
'''
# This section parses the input arguments and saves them into constant variables
# Build the command line interface of the script
parser = argparse.ArgumentParser(description="This script performs a data migration between two different instances of ThingsBoard server.")
parser.add_argument("-c", dest="configuration", type=str, default="./migrationConf.yml",
                    help="specify path of the YAML configuration file (default './migrationConf.yml')")
# Restrict the mode to the supported values so that a typo is rejected by
# argparse instead of being accepted and silently ignored later on
parser.add_argument("-m", dest="mode", type=str, default="both", choices=["fetch", "send", "both"],
                    help="specify operating mode of the script. 'fetch' --> save data in local text file | 'send' --> send data from local text file | 'both' --> fetch and send data (default 'both')")
# The following options are mandatory, therefore they take no default value
parser.add_argument("-i", dest="initialTs", type=str, required=True,
                    help="specify initial UNIX timestamp")
parser.add_argument("-f", dest="finalTs", type=str, required=True,
                    help="specify final UNIX timestamp")
parser.add_argument("-s", dest="sourceDeviceId", type=str, required=True,
                    help="specify the ID of the source device")
parser.add_argument("-t", dest="targetDeviceToken", type=str, required=True,
                    help="specify the token of the target device")
parser.add_argument("-k", dest="timeseriesKey", type=str, required=True,
                    help="specify the key (name) of the timeseries to migrate")
args = parser.parse_args()

# Save the parsed arguments into constant variables
CONFIG_FILE = args.configuration
MODE = args.mode
STARTTS = args.initialTs
ENDTS = args.finalTs
SOURCE_TB_DEVICE_ID = args.sourceDeviceId
TARGET_TB_DEVICE_TOKEN = args.targetDeviceToken
TIMESERIES_KEY = args.timeseriesKey
'''
@Notes This is the config file reader of the script
'''
# Read the YAML configuration file and extract every setting used below
try:
    # The context manager closes the file, no explicit close() is needed
    with open(CONFIG_FILE, 'r') as ymlfile:
        # NOTE(review): FullLoader can build more than plain YAML types;
        # consider yaml.safe_load if the file may come from an untrusted source
        cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
    # Save here all the configuration variables
    SOURCE_TB_ADDRESS = cfg['source']['host']
    SOURCE_TB_PORT = cfg['source']['port']
    SOURCE_TB_USER = cfg['source']['user']
    SOURCE_TB_PASSWORD = cfg['source']['password']
    TARGET_TB_ADDRESS = cfg['target']['host']
    TARGET_TB_PORT = cfg['target']['port']
    LOG_FILE = cfg['log']['file']
    DB_FILE = cfg['db']['file']
except Exception as exception:
    print("An error occoured while reading the configuration file:", exception)
    print("Is it configured correctly?")
    # A broken configuration is a failure: exit with a non-zero status
    sys.exit(1)

# The port of source/target can be empty
if SOURCE_TB_PORT is None:
    SOURCE_TB_PORT = ""
else:
    # YAML parses an unquoted port as an integer, but the source URL is
    # built by string concatenation: always hand over a string
    SOURCE_TB_PORT = str(SOURCE_TB_PORT)
if TARGET_TB_PORT is None:
    TARGET_TB_PORT = ""

# Modify the local database text file name --> This makes it unique for any source device and key
DB_FILE += "-" + SOURCE_TB_DEVICE_ID + "-" + TIMESERIES_KEY + ".db"
'''
@Notes Here the log file is opened and configured. Levels of logging: DEBUG INFO WARNING ERROR CRITICAL
'''
# Open and configure logger
# Route log records to LOG_FILE (append mode), one timestamped line per record
logging.basicConfig(
    filename=LOG_FILE,
    filemode='a+',
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%d-%b-%y %H:%M:%S',
)
# Records below INFO are discarded by the root logger
logging.root.setLevel(logging.INFO)
# First entry of this run
logging.info('The script has been correctly initialized.')
'''
@Notes This is the core of the algorithm, which uses a different operating mode depending on what is passed to the script as an argument
'''
logging.info('Migrating from device '+SOURCE_TB_DEVICE_ID+' to device '+TARGET_TB_DEVICE_TOKEN)
print('Migrating from device '+SOURCE_TB_DEVICE_ID+' to device '+TARGET_TB_DEVICE_TOKEN)

# The three modes are mutually exclusive, so exactly one branch runs
if MODE == "both":
    # Both mode: fetch from the source and upload straight to the target
    logging.info('Operating mode: both')
    logging.info('Started to fetch data')
    data = getData(SOURCE_TB_ADDRESS,SOURCE_TB_PORT,SOURCE_TB_USER,SOURCE_TB_PASSWORD,SOURCE_TB_DEVICE_ID,TIMESERIES_KEY,STARTTS,ENDTS)
    logging.info('Finished to fetch data')
    logging.info('Started to send data')
    sendData(TARGET_TB_ADDRESS,TARGET_TB_PORT,TARGET_TB_DEVICE_TOKEN,TIMESERIES_KEY,data)
    logging.info('Finished to send data')
elif MODE == "fetch":
    # Fetch mode: fetch from the source and store into the local text file
    logging.info('Operating mode: fetch')
    logging.info('Started to fetch data')
    data = getData(SOURCE_TB_ADDRESS,SOURCE_TB_PORT,SOURCE_TB_USER,SOURCE_TB_PASSWORD,SOURCE_TB_DEVICE_ID,TIMESERIES_KEY,STARTTS,ENDTS)
    logging.info('Finished to fetch data')
    logging.info('Started to save data in file ' + DB_FILE)
    saveToFile(TIMESERIES_KEY,data,DB_FILE)
    logging.info('Finished to save data')
elif MODE == "send":
    # Send mode: read the local text file and upload to the target
    logging.info('Operating mode: send')
    logging.info('Started reading data from file ' + DB_FILE)
    data = readFromFile(TIMESERIES_KEY,DB_FILE)
    logging.info('Finished reading data from file')
    logging.info('Started to send data')
    sendData(TARGET_TB_ADDRESS,TARGET_TB_PORT,TARGET_TB_DEVICE_TOKEN,TIMESERIES_KEY,data)
    logging.info('Finished to send data')
else:
    # An unknown mode must not end with "Execution finished." and status 0
    print("Unknown operating mode '" + str(MODE) + "'. Allowed values: 'fetch', 'send', 'both'.")
    logging.error("Unknown operating mode '" + str(MODE) + "'.")
    sys.exit(1)
'''
@Notes Close and exit
'''
logging.info('Execution finished.')
print('Execution finished.')