-
Notifications
You must be signed in to change notification settings - Fork 0
/
antelope-server.js
700 lines (585 loc) · 21.6 KB
/
antelope-server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
/*
Copyright (c) 2023 Firmware Modules Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/* This rudementary server is designed to receive data from a Helium Console
* instance. The payload, coming from a suitably configured Measurement{Earth}
* Blockchain sensor module, is passed through the Antelope decoder to reconstruct
* the transaction in full. Once reconstructed, the transaction can be
* submitted to an Antelope blockchain API endpoint.
*/
require('log-timestamp')
const antelope = require('./decoder-antelope')
const elpp = require('./decoder')
const elpp_encoder = require('./encoder')
const log = console.log
const log_obj = function (obj) { console.dir(obj, { depth: null }) }
/* Use 'request' instead of 'http' to handle redirects on endpoints such as Helium downlink URLs */
const { http, https } = require('follow-redirects')
//const http = require('http')
/* Setup server based on passed-in arguments:
* arg1: network interface name to search for ipv4 address, or if not found, ip-address to listen on
* arg2: server's port
*/
const os = require('os')
const networkInterfaces = os.networkInterfaces()
let host = 'localhost'
let port = 2000
const args = process.argv.slice(2);
if (args && args.length >= 2) {
/* First arg is:
* (a) : the network interface name to look up for first ipv4 family address, or
* (b) : the host
*/
let iface = args[0]
if (iface in networkInterfaces) {
let addrs = networkInterfaces[iface]
for (i in addrs) {
if (addrs[i].family && addrs[i].family.toLowerCase() === 'ipv4') {
host = addrs[i].address
log('found ' + host + ' in ' + iface)
break
}
}
} else {
let ifaces = ''
for (i in networkInterfaces) {
ifaces += i + ', '
}
log('could not find ' + iface + ' in [' + ifaces + ']')
host = iface
}
/* port is second arg */
port = parseInt(args[1])
} else {
log('need arguments: <iface-name|ip-addr> <port>')
process.exit(1)
}
log('Starting ELPP Antelope server on ' + host + ':' + port)
/* A dispatcher list handles in-flight transactions
* per chain.
*/
let dispatch_queue = {
0: [], /* TELOS TESTNET */
1: [] /* TELOS MAINNET */
}
/*
* {
"payload_raw": "SGVsbG8sIHdvcmxkIQ==",
"port": 1,
"confirmed": false
}
Dispatch the tapos response downlink to the device
*/
function dispatch_tapos(tapos_req, tapos, time_ms, path, res) {
function tapos_provider() {
return [
/* pass back the chain and req ids */
tapos_req.chain_id,
tapos_req.req_id,
/* pass back the gateway time stamp of the request.
* the device will have stored the timestamp of the transmission
* along with the request id for lookup later to compute the actual time.
*/
time_ms / 1000 >> 0,
time_ms % 1000,
/* Actual blockchain TAPOS data. */
tapos.ref_block_num,
tapos.ref_block_prefix
]
}
const encoder_map = {
4: { encoder: elpp_encoder.antelope_message_tapos_resp_encoder, provider: tapos_provider }
}
var tapos_encoded = elpp_encoder.encoder([4], encoder_map)
log('== tapos encoded ==')
let tbuf = Buffer.from(tapos_encoded)
log(tbuf.toString('hex'))
let data = {
"payload_raw": tbuf.toString('base64'),
"port": ELPP_PORT_LORAWAN,
"confirmed": false
}
let json = JSON.stringify(data)
log(json)
// An object of options to indicate where to post to
var post_options = {
host: 'console.helium.com',
port: '80', /* http - this works */
path: path,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(json)
}
};
/* Set up and execute the request */
var post_req = http.request(post_options, function (hres) {
hres.setEncoding('utf8')
let data = ''
hres.on('data', function (chunk) {
data += chunk
})
hres.on('end', () => {
log(data)
if (res) {
log('downlink response ' + hres.statusCode)
res.statusCode = hres.statusCode
res.end(data)
}
})
})
/* Ideally if timeouts can be differentiated from other errors - this is when we would want to try again */
post_req.on('error', (err) => {
log('downlink POST error: ' + err.message)
if (res) {
res.statusCode = 500
res.end('downlinkn POST error: ' + err.message)
}
})
log('write downlink request: ')
log_obj(post_options)
// post the data
post_req.write(json)
post_req.end()
}
/* This can be a result of an API request from the remote platform, in which case
* 'res' will be available to possibly return the result of the blockchain API call.
* This can also be called as a retry in which case 'res' will be null.
*/
function dispatch_trx(json, host, res, chain_q, chain_q_index) {
// An object of options to indicate where to post to
var post_options = {
host: host,
port: '80',
path: '/v1/chain/send_transaction',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(json)
}
};
/* Set up and execute the request */
var post_req = http.request(post_options, function (hres) {
hres.setEncoding('utf8')
let data = ''
hres.on('data', function (chunk) {
data += chunk
})
hres.on('end', () => {
log(data)
if (res) {
log('response ' + hres.statusCode)
res.statusCode = hres.statusCode
res.end(data)
}
/* remove the trx whether it succeeded or not (assuming that sending it again will not change the situation) */
if (1 /* res.statusCode == 200 */) {
chain_q.splice(chain_q_index, 1)
log('trx removed from queue at ' + chain_q_index)
}
})
hres.on('error', (err) => {
log('POST xfer error: ' + err.message)
if (res) {
res.statusCode = 500
res.end('POST xfer error: ' + err.message)
}
if (1 /* res.statusCode == 200 */) {
chain_q.splice(chain_q_index, 1)
log('trx removed from queue at ' + chain_q_index)
}
})
})
/* Ideally if timeouts can be differentiated from other errors - this is when we would want to try again */
post_req.on('error', (err) => {
log('POST error: ' + err.message)
if (res) {
res.statusCode = 500
res.end('POST error: ' + err.message)
}
if (1 /* res.statusCode == 200 */) {
chain_q.splice(chain_q_index, 1)
log('trx removed from queue at ' + chain_q_index)
}
})
log('write request: ')
log_obj(post_options)
// post the data
post_req.write(json)
post_req.end()
}
function hostFromChain(c) {
switch (c) {
case '0':
return 'testnet.telos.net'
case '1':
return 'mainnet.telos.net'
default:
throw Error('unsupported chain: ' + c)
}
}
function manage_dispatch_queue(res) {
log('manage dispatch queue')
for (c in dispatch_queue) {
log('checking chain ' + c)
let chain_q = dispatch_queue[c]
/* for each chain, see what is not started and start them. */
for (t in chain_q) {
log('checking trx ' + t)
let trx = chain_q[t]
if (!trx.started) {
log('dispatching pending trx at index ' + t)
dispatch_trx(trx.json, hostFromChain(c), res, chain_q, t)
trx.started = true
}
}
}
}
function push_trx(trx, state, res) {
log('push trx')
if (trx) {
let epoch = Date.now() / 1000 >> 0
/* Move it to the dispatcher queue for the chain */
if (trx.chain in dispatch_queue) {
log('posting trx to chain ' + trx.chain + ' dispatch queue at ' + dispatch_queue[trx.chain].length)
dispatch_queue[trx.chain].push({
epoch: epoch,
started: false,
json: trx.json
})
} else {
res.writeHead(500)
res.end('decoder: unknown chain ' + trx.chain)
}
}
else {
/* Get the status */
log(' - no trx')
let status = antelope.get_status(state.trx_map)
log(status)
res.writeHead(200)
res.end('decoder: need more data\n' + status)
}
}
let device_states = {}
function manage_device_state(state, key) {
log('Managing device ['+ key +'] state:')
if (state.trx_map) {
for (i in state.trx_map) {
let trx = state.trx_map[i]
if (trx.last_epoch) {
let trx_epoch = trx.last_epoch
let now_epoch = Date.now() / 1000 >> 0
let age = now_epoch - trx_epoch
/* Period before purging is adjustable and depends on
* the frequency of device measurements and uplinks
* and the number of possible trx_ids.
*/
if ((age) > 300) {
delete state.trx_map[i]
log(' - Purging device state trx ' + i + ', age too old: ' + age)
} else {
log(' - Device state trx ' + i + ' age ok: ' + age)
}
}
}
}
}
function get_device_state(key) {
let state
if (key in device_states) {
log('existing state for ' + key)
state = device_states[key]
/* perform state maintenance like purging old/incomplete trx
* before it is handed to the decoder
* */
manage_device_state(state, key)
} else {
log('new state for ' + key)
state = device_states[key] = antelope.new_state()
}
/* Add a 'last used' epoch for possible garbage collection */
state.last_epoch = Date.now() / 1000 >> 0
return state
}
/* This arbitrarily assigned port is used for ELPP protocol on LORAWAN. */
const ELPP_PORT_LORAWAN = 8
function decodeHelium(data, res) {
log('Decode Helium')
/* Expect key: 'port' */
if (1 && data.port != ELPP_PORT_LORAWAN) {
res.writeHead(500)
let msg = 'Unsupported port: ' + data.port + '. Expect data on port ' + ELPP_PORT_LORAWAN + ' for ELPP.'
res.end(msg)
log(msg)
return
}
/* Expect key: 'payload' */
let payload = Buffer.from(data.payload, 'base64')
log('Payload: ' + payload.toString('hex'))
/* Expect key: "dev_eui": "ED2126B2424BF383" */
let key = data.dev_eui
log('DevEUI: ' + key)
let state = get_device_state(key)
let dresult = antelope.decoder(payload, state)
/* Decoder result may have:
* trx : {} transaction
* tapos_req : request for TAPOS for specified chain
* or nothing if waiting for more data
*/
if (dresult && dresult.trx) {
push_trx(dresult.trx, state, res)
} else if (dresult && dresult.tapos_req) {
/* get_tapos
* expect key : "downlink_url" :"https://console.helium.com/api/v1/down/..."
*/
if (data.downlink_url) {
/* Expect key "reported_at": 1681833081694 */
const gw_rx_time_ms = data.reported_at
const url = new URL(data.downlink_url)
/* tapos acquisition is a multi-step process
Ideally we have it ready to go and can be sent back, perhaps available for the 2nd downlink slot
*/
let tapos = tapos_get(dresult.tapos_req.chain_id)
if (tapos) {
dispatch_tapos(dresult.tapos_req, tapos, gw_rx_time_ms, url.pathname, res)
} else {
res.writeHead(500)
let msg = 'decoder error: no tapos'
res.end(msg)
}
} else {
res.writeHead(500)
let msg = 'decoder error: no downlink url'
res.end(msg)
}
} else if (dresult) {
/* Get the status */
log(' - no trx')
let status = antelope.get_status(state.trx_map)
log(status)
res.writeHead(200)
res.end('decoder: need more data\n' + status)
}
else {
res.writeHead(500)
let msg = 'decoder error: no result'
res.end(msg)
}
manage_dispatch_queue(res)
}
const requestListener = function (req, res) {
let data = ''
if (req.method === 'POST') {
/* Collect POST data */
req.on('data', (chunk) => {
data += chunk
})
/* When there is no more data, deal with it */
req.on('end', () => {
log(req.url)
log(req.method)
log(data)
try {
/* The decoder is responsible for sending the response in success or error. */
decodeHelium(JSON.parse(data), res)
} catch (e) {
/* Get a SyntaxError if JSON can't be parsed, for example */
res.writeHead(500)
let msg = 'Error: ' + ((e && ('message' in e)) ? e.message : 'unknown')
log(msg)
res.end(msg)
}
})
} else {
res.writeHead(500)
res.end('Unsuported method ' + req.method)
}
};
if (1) {
const server = http.createServer(requestListener);
server.listen(port, host, () => {
console.log(`Server is running on http://${host}:${port}`);
});
} else {
http.get('http://bit.ly/900913', response => {
response.on('data', chunk => {
console.log(chunk);
});
}).on('error', err => {
console.error(err);
});
}
/* Helium JSON example:
* {"app_eui":"3454323456432543",
* "dc":{"balance":111111,"nonce":7},
* "dev_eui":"2646524545245245",
* "devaddr":"01231231",
* "downlink_url":"https://console.helium.com/api/v1/down/ab164223542094820948230482304820427a/a35425452354332221412414114141Oq/13fa3141412413434134134134134134159a","fcnt":209,"hotspots":[{"channel":0,"frequency":903.9,"hold_time":0,"id":"112c4ZLR1G721414141241231231231231235434342342323423","lat":55.462452452406855,"long":-115.24524545234556,"name":"fgsgggsdfggsdgsdfgg","reported_at":1681833081694,"rssi":-42.0,"snr":14.0,"spreading":"SF8BW125","status":"success"}],"id":"24543524524545243543534534534534523a","metadata":{"adr_allowed":false,"cf_list_enabled":false,"multi_buy":1,"organization_id":"45345454534524543545345345345345435f","preferred_hotspots":["452452452454545345345234534534543534523452345345245q"],"rx_delay":1,"rx_delay_actual":1,"rx_delay_state":"rx_delay_established"},"name":"254352543523523452455","payload":"BAA=","payload_size":2,"port":8,"raw_packet":"24545245454543523455","replay":false,"reported_at":1681833081694,"type":"uplink","uuid":"4253454354545345345435345345345345b2"}
*/
/*---- TAPOS manager -----*/
/* errors beyond which it is removed from service for CHECK_MAX
* number of checks.
*/
const TAPOS_MANAGER_API_ERRORS_MAX = 5
const TAPOS_MANAGER_API_CHECK_MAX = 10
/* Run a manager for each chain */
const KEY_TELOS_TESTNET = 'TELOS_TESTNET'
const KEY_TELOS_MAINNET = 'TELOS_MAINNET'
const CHAIN_KEY_FROM_ID = {
0: KEY_TELOS_TESTNET,
1: KEY_TELOS_MAINNET
}
var tapos_manager_state = {
[KEY_TELOS_TESTNET]: {
name : KEY_TELOS_TESTNET,
tapos: {
acq_time_epoch : 0, /* time of acquisition (not the expiry) */
ref_block_num: 0,
ref_block_prefix : 0
},
api_pool : [
{ host: 'http://testnet.telos.net', errors: 0, check_count: 0, use_count : 0 }
]
},
[KEY_TELOS_MAINNET]: {
name: KEY_TELOS_MAINNET,
tapos: {
acq_time_epoch: 0, /* time of acquisition (not the expiry) */
ref_block_num: 0,
ref_block_prefix: 0
},
api_pool: [
{ host: 'http://mainnet.telos.net', errors: 0, check_count: 0, use_count: 0 }
]
}
}
function tapos_get(chain_id) {
//log('lookup chain_id ' + chain_id)
if (chain_id in CHAIN_KEY_FROM_ID) {
let key = CHAIN_KEY_FROM_ID[chain_id]
//log('lookup chain_key ' + key)
if (key in tapos_manager_state) {
let state = tapos_manager_state[key]
//log('tapos for ' + state.name)
return state.tapos
}
}
return null
}
/* Check for tapos within a random window interval */
function tapos_manager_next_run(name, error) {
let max = 10 * 60 * 1000
let min = 5 * 60 * 1000
if (error) {
/* check again quickly for error */
max = 30 * 1000
min = 10 * 1000
}
let period = Math.floor(Math.random() * (max - min) + min)
log('TAPOS manager for ' + name + ' next run in ' + ((period / 1000) >> 0))
return period
}
/* Kick off */
for (key in tapos_manager_state) {
let state = tapos_manager_state[key]
tapos_manager_run(state)
}
function tapos_manager_select_pool(state) {
/* Do some housekeeping first:
* slowly decrease errors to allow trying again */
for (i in state.api_pool) {
let api = state.api_pool[i]
if (api.errors >= TAPOS_MANAGER_API_ERRORS_MAX) {
if (++api.check_count == TAPOS_MANAGER_API_CHECK_MAX) {
api.errors--
api.check_count = 0
}
}
}
let attempts = 10
let api = null
do {
let index = Math.floor(Math.random() * state.api_pool.length)
api = state.api_pool[index]
} while (--attempts && api.errors >= TAPOS_MANAGER_API_ERRORS_MAX)
if (api) {
api.use_count++;
log('TAPOS Manager: selected api ' + api.host + ' uses ' + api.use_count + ' errors ' + api.errors + ' check_count ' + api.check_count)
}
return api
}
function tapos_manager_error(msg, state, api, timeout) {
log('TAPOS Manager Error for ' + state.name + ' : ' + msg)
if (api) {
api.errors++
}
clearTimeout(timeout)
/* want to try again quickly */
setTimeout(tapos_manager_run, tapos_manager_next_run(state.name, true), state)
}
function tapos_manager_finish(msg, state, api, timeout) {
if (api) {
/* decrement the error counter on a succesful poll */
if (api.errors > 0) {
api.errors--
}
}
}
function tapos_manager_run(state) {
let timeout = setTimeout(tapos_manager_run, tapos_manager_next_run(state.name), state)
let api = tapos_manager_select_pool(state)
if (api) {
let url = api.host + '/v1/chain/get_info'
http.get(url, response => {
let info = ''
response.on('data', chunk => {
info += chunk
})
response.on('end', () => {
try {
let json = JSON.parse(info)
/* Interestingly, the ref_block_prefix is, converted to hex, embedded in the same
* block ID at the 8th byte position in reversed byte order. Which means we can get the
* prefix right from the block id in the get_info request without making an additional request to get_block.
*/
let ref_block_num = json.last_irreversible_block_num & 0xFFFF;
let last_irr_block_id = Buffer.from(json.last_irreversible_block_id, 'hex');
let ref_block_prefix = last_irr_block_id.readUInt32LE(8);
state.tapos.acq_time_epoch = Date().now / 1000 >> 0
state.tapos.ref_block_num = ref_block_num
state.tapos.ref_block_prefix = ref_block_prefix
log('Aquired TAPOS at ' + Date(state.tapos.acq_time_epoch).toString() + ' for ' + state.name + 'ref_block_num: ' + state.tapos.ref_block_num.toString(16) + ' prefix: ' + state.tapos.ref_block_prefix.toString(16))
tapos_manager_finish('success', state, api, timeout)
} catch (e) {
let msg = ((e && ('message' in e)) ? e.message : 'unknown')
tapos_manager_error(msg, state, api, timeout)
}
})
response.on('error', err => {
let msg = err
tapos_manager_error(msg, state, api, timeout)
})
}).on('error', err => {
let msg = err
tapos_manager_error(msg, state, api, timeout)
})
} else {
tapos_manager_error(msg, state, api, timeout)
}
}