1
- import {
2
- ApiGatewayManagementApi ,
3
- PostToConnectionCommand ,
4
- } from '@aws-sdk/client-apigatewaymanagementapi' ;
5
- import {
6
- AttributeValue ,
7
- DeleteItemCommand ,
8
- DynamoDBClient ,
9
- ScanCommand ,
10
- } from '@aws-sdk/client-dynamodb' ;
11
-
12
1
import { unmarshall } from '@aws-sdk/util-dynamodb' ;
2
+ import iot from 'aws-iot-device-sdk' ;
13
3
import {
14
4
DynamoDBStreamEvent ,
15
5
S3Event ,
16
6
SNSEvent ,
17
7
EventBridgeEvent ,
18
8
SQSEvent ,
19
9
} from 'aws-lambda' ;
10
+ import { v4 } from 'uuid' ;
11
+ import {
12
+ fragment ,
13
+ getConnection ,
14
+ SSPY_TOPIC ,
15
+ } from '../listener/iot-connection' ;
20
16
import { envVariableNames } from '../src/common/envVariableNames' ;
21
17
import { DynamoDBSpyEvent } from './spyEvents/DynamoDBSpyEvent' ;
22
18
import { EventBridgeRuleSpyEvent } from './spyEvents/EventBridgeRuleSpyEvent' ;
@@ -28,27 +24,32 @@ import { SpyMessage } from './spyEvents/SpyMessage';
28
24
import { SqsSpyEvent } from './spyEvents/SqsSpyEvent' ;
29
25
30
26
export class SpyEventSender {
31
- ddb = new DynamoDBClient ( {
32
- region : process . env . AWS_REGION ,
33
- } ) ;
34
27
debugMode = process . env [ envVariableNames . SSPY_DEBUG ] === 'true' ;
35
- apigwManagementApi = new ApiGatewayManagementApi ( {
36
- apiVersion : '2018-11-29' ,
37
- endpoint : process . env [ envVariableNames . SSPY_WS_ENDPOINT ] ! ,
38
- } ) ;
39
- connections : Record < string , AttributeValue > [ ] | undefined ;
40
-
41
- constructor ( params ?: {
42
- log : ( message : string , ...optionalParams : any [ ] ) => void ;
43
- logError : ( message : string , ...optionalParams : any [ ] ) => void ;
28
+ connection : iot . device | undefined ;
29
+ scope : string ;
30
+
31
+ constructor ( params : {
32
+ log ?: ( message : string , ...optionalParams : any [ ] ) => void ;
33
+ logError ?: ( message : string , ...optionalParams : any [ ] ) => void ;
34
+ scope : string ;
44
35
} ) {
45
- if ( params ? .log ) {
36
+ if ( params . log ) {
46
37
this . log = params . log ;
47
38
}
48
39
49
- if ( params ? .logError ) {
40
+ if ( params . logError ) {
50
41
this . logError = params . logError ;
51
42
}
43
+
44
+ this . scope = params . scope ;
45
+ }
46
+
47
+ public async close ( ) {
48
+ this . connection ?. end ( ) ;
49
+ }
50
+
51
+ public async connect ( ) {
52
+ this . connection = await getConnection ( this . debugMode ) ;
52
53
}
53
54
54
55
public async publishSpyEvent ( event : any ) {
@@ -59,17 +60,6 @@ export class SpyEventSender {
59
60
) ;
60
61
this . log ( 'ARN to names mapping' , JSON . stringify ( mapping ) ) ;
61
62
62
- let connectionData ;
63
-
64
- const scanParams = new ScanCommand ( {
65
- TableName : process . env [ envVariableNames . SSPY_WS_TABLE_NAME ] as string ,
66
- ProjectionExpression : 'connectionId' ,
67
- } ) ;
68
-
69
- connectionData = await this . ddb . send ( scanParams ) ;
70
-
71
- this . connections = connectionData . Items ;
72
-
73
63
const postDataPromises : Promise < any > [ ] = [ ] ;
74
64
75
65
if ( event ?. Records && event . Records [ 0 ] ?. Sns ) {
@@ -227,45 +217,55 @@ export class SpyEventSender {
227
217
await Promise . all ( postDataPromises ) ;
228
218
}
229
219
230
- private async postData ( spyMessage : Omit < SpyMessage , 'timestamp' > ) {
231
- this . log ( 'Post spy message' , JSON . stringify ( spyMessage ) ) ;
220
+ private encode ( input : any ) : fragment [ ] {
221
+ const payload = JSON . stringify ( input ) ;
222
+ const parts = payload . match ( / .{ 1 , 50000 } / g) ;
223
+ if ( ! parts ) return [ ] ;
224
+ this . log ( `Encoded iot message, ${ parts . length } ` ) ;
225
+ const id = v4 ( ) ;
226
+ return parts . map ( ( part , index ) => ( {
227
+ id,
228
+ index,
229
+ count : parts . length ,
230
+ data : part ,
231
+ } ) ) ;
232
+ }
232
233
233
- if ( ! this . connections ) {
234
- return ;
234
+ private async postData ( spyMessage : Omit < SpyMessage , 'timestamp' > ) {
235
+ if ( this . connection === undefined ) {
236
+ throw new Error (
237
+ 'No IoT connection created yet, did you forget to call connect()?'
238
+ ) ;
235
239
}
236
240
237
- const postCalls = this . connections . map ( async ( { connectionId } ) => {
238
- this . log ( `Sending message to client: ${ connectionId . S } ` ) ;
239
-
240
- try {
241
- const postToConnectionCommand = new PostToConnectionCommand ( {
242
- ConnectionId : connectionId . S ,
243
- Data : JSON . stringify ( {
244
- timestamp : new Date ( ) . toISOString ( ) ,
245
- serviceKey : spyMessage . serviceKey ,
246
- data : spyMessage . data ,
247
- } ) as any ,
248
- } ) ;
249
-
250
- await this . apigwManagementApi . send ( postToConnectionCommand ) ;
251
- } catch ( e ) {
252
- this . logError ( `Faild sending spy message to: ${ connectionId . S } ` , e ) ;
253
- if ( ( e as any ) . $metadata . httpStatusCode === 410 ) {
254
- this . log ( `Found stale connection, deleting ${ connectionId } ` ) ;
255
-
256
- const deleteParams = new DeleteItemCommand ( {
257
- TableName : process . env [ envVariableNames . SSPY_WS_TABLE_NAME ] ,
258
- Key : { connectionId } ,
259
- } ) ;
241
+ this . log ( 'Post spy message' , JSON . stringify ( spyMessage ) ) ;
260
242
261
- await this . ddb . send ( deleteParams ) ;
262
- } else {
263
- throw e ;
264
- }
243
+ const connection = this . connection ;
244
+ const topic = `${ SSPY_TOPIC } /${ this . scope } ` ;
245
+
246
+ try {
247
+ for ( const fragment of this . encode ( spyMessage ) ) {
248
+ await new Promise < void > ( ( resolve ) => {
249
+ connection . publish (
250
+ topic ,
251
+ JSON . stringify ( fragment ) ,
252
+ {
253
+ qos : 1 ,
254
+ } ,
255
+ ( ) => {
256
+ console . error ( 'Publishing finished' ) ;
257
+ resolve ( ) ;
258
+ }
259
+ ) ;
260
+ } ) ;
261
+ this . log (
262
+ `Published fragment ${ fragment . index } out of ${ fragment . count } to topic ${ topic } `
263
+ ) ;
265
264
}
266
- } ) ;
265
+ } catch ( e ) {
266
+ this . logError ( `Failed to send payload to iot: ${ e } ` ) ;
267
+ }
267
268
268
- await Promise . all ( postCalls ) ;
269
269
this . log ( 'Send spy message finish' ) ;
270
270
}
271
271
0 commit comments