From 60e257c40c162388552c794ebbdde03a0ec75e4e Mon Sep 17 00:00:00 2001 From: Markus Tacker Date: Mon, 30 Oct 2023 16:22:36 +0100 Subject: [PATCH] feat(nrplus): implement NR+ control messages --- cdk/backend.ts | 2 +- cdk/resources/WebsocketAPI.ts | 66 ++++++++++++++++---- lambda/onMessage.ts | 111 +++++++++++++++++++++++++++------- 3 files changed, 146 insertions(+), 33 deletions(-) diff --git a/cdk/backend.ts b/cdk/backend.ts index b11de4c..947d45e 100644 --- a/cdk/backend.ts +++ b/cdk/backend.ts @@ -38,7 +38,7 @@ new BackendApp({ lambdaSources: { publishToWebsocketClients: await pack('publishToWebsocketClients'), onConnect: await pack('onConnect'), - onMessage: await pack('onMessage', 'lambda/onMessage.handler'), + onMessage: await pack('onMessage'), onDisconnect: await pack('onDisconnect'), onCellGeoLocationResolved: await pack('onCellGeoLocationResolved'), resolveCellLocation: await pack('resolveCellLocation'), diff --git a/cdk/resources/WebsocketAPI.ts b/cdk/resources/WebsocketAPI.ts index 9aee710..54ad739 100644 --- a/cdk/resources/WebsocketAPI.ts +++ b/cdk/resources/WebsocketAPI.ts @@ -18,7 +18,7 @@ export class WebsocketAPI extends Construct { public readonly websocketAPIArn: string public readonly websocketManagementAPIURL: string public constructor( - parent: Stack, + parent: Construct, { lambdaSources, baseLayer, @@ -63,9 +63,15 @@ export class WebsocketAPI extends Construct { apiId: api.ref, }) - this.websocketURI = `wss://${api.ref}.execute-api.${parent.region}.amazonaws.com/${stage.ref}` - this.websocketAPIArn = `arn:aws:execute-api:${parent.region}:${parent.account}:${api.ref}/${stage.stageName}/POST/@connections/*` - this.websocketManagementAPIURL = `https://${api.ref}.execute-api.${parent.region}.amazonaws.com/${stage.stageName}` + this.websocketURI = `wss://${api.ref}.execute-api.${ + Stack.of(parent).region + }.amazonaws.com/${stage.ref}` + this.websocketAPIArn = `arn:aws:execute-api:${Stack.of(parent).region}:${ + Stack.of(parent).account + }:${api.ref}/${stage.stageName}/POST/@connections/*` + this.websocketManagementAPIURL = `https://${api.ref}.execute-api.${ + Stack.of(parent).region + }.amazonaws.com/${stage.stageName}` // Connect const onConnect = new Lambda.Function(this, 'onConnect', { @@ -93,7 +99,11 @@ export class WebsocketAPI extends Construct { apiId: api.ref, description: 'Connect integration', integrationType: 'AWS_PROXY', - integrationUri: `arn:aws:apigateway:${parent.region}:lambda:path/2015-03-31/functions/${onConnect.functionArn}/invocations`, + integrationUri: `arn:aws:apigateway:${ + Stack.of(parent).region + }:lambda:path/2015-03-31/functions/${ + onConnect.functionArn + }/invocations`, }, ) const connectRoute = new ApiGateway.CfnRoute(this, 'connectRoute', { @@ -120,6 +130,24 @@ export class WebsocketAPI extends Construct { }, layers: [baseLayer], logRetention: Logs.RetentionDays.ONE_WEEK, + initialPolicy: [ + new IAM.PolicyStatement({ + actions: ['iot:Publish'], + resources: [ + `arn:aws:iot:${Stack.of(parent).region}:${ + Stack.of(parent).account + }:topic/*/nrplus-ctrl`, + ], + }), + new IAM.PolicyStatement({ + actions: ['iot:DescribeThing'], + resources: [ + `arn:aws:iot:${Stack.of(parent).region}:${ + Stack.of(parent).account + }:thing/nrplus-gw-*`, + ], + }), + ], }) this.connectionsTable.grantReadWriteData(onMessage) @@ -130,7 +158,11 @@ export class WebsocketAPI extends Construct { apiId: api.ref, description: 'Send messages integration', integrationType: 'AWS_PROXY', - integrationUri: `arn:aws:apigateway:${parent.region}:lambda:path/2015-03-31/functions/${onMessage.functionArn}/invocations`, + integrationUri: `arn:aws:apigateway:${ + Stack.of(parent).region + }:lambda:path/2015-03-31/functions/${ + onMessage.functionArn + }/invocations`, }, ) const sendMessageRoute = new ApiGateway.CfnRoute(this, 'sendMessageRoute', { @@ -168,7 +200,11 @@ export class WebsocketAPI extends Construct { apiId: api.ref, description: 'Disconnect integration', integrationType: 'AWS_PROXY', - integrationUri: `arn:aws:apigateway:${parent.region}:lambda:path/2015-03-31/functions/${onDisconnect.functionArn}/invocations`, + integrationUri: `arn:aws:apigateway:${ + Stack.of(parent).region + }:lambda:path/2015-03-31/functions/${ + onDisconnect.functionArn + }/invocations`, }, ) const disconnectRoute = new ApiGateway.CfnRoute(this, 'disconnectRoute', { @@ -184,19 +220,25 @@ export class WebsocketAPI extends Construct { principal: new IAM.ServicePrincipal( 'apigateway.amazonaws.com', ) as IAM.IPrincipal, - sourceArn: `arn:aws:execute-api:${parent.region}:${parent.account}:${api.ref}/${stage.stageName}/sendmessage`, + sourceArn: `arn:aws:execute-api:${Stack.of(parent).region}:${ + Stack.of(parent).account + }:${api.ref}/${stage.stageName}/sendmessage`, }) onConnect.addPermission('invokeByAPI', { principal: new IAM.ServicePrincipal( 'apigateway.amazonaws.com', ) as IAM.IPrincipal, - sourceArn: `arn:aws:execute-api:${parent.region}:${parent.account}:${api.ref}/${stage.stageName}/$connect`, + sourceArn: `arn:aws:execute-api:${Stack.of(parent).region}:${ + Stack.of(parent).account + }:${api.ref}/${stage.stageName}/$connect`, }) onDisconnect.addPermission('invokeByAPI', { principal: new IAM.ServicePrincipal( 'apigateway.amazonaws.com', ) as IAM.IPrincipal, - sourceArn: `arn:aws:execute-api:${parent.region}:${parent.account}:${api.ref}/${stage.stageName}/$disconnect`, + sourceArn: `arn:aws:execute-api:${Stack.of(parent).region}:${ + Stack.of(parent).account + }:${api.ref}/${stage.stageName}/$disconnect`, }) // Publish events @@ -248,7 +290,9 @@ export class WebsocketAPI extends Construct { new IAM.PolicyStatement({ actions: ['iot:Publish'], resources: [ - `arn:aws:iot:${parent.region}:${parent.account}:topic/errors`, + `arn:aws:iot:${Stack.of(parent).region}:${ + Stack.of(parent).account + }:topic/errors`, ], }), ], diff --git a/lambda/onMessage.ts b/lambda/onMessage.ts index 4d7b362..e1393ae 100644 --- a/lambda/onMessage.ts +++ b/lambda/onMessage.ts @@ -1,17 +1,36 @@ import { DynamoDBClient, UpdateItemCommand } from '@aws-sdk/client-dynamodb' import { fromEnv } from '@nordicsemiconductor/from-env' +import { Type } from '@sinclair/typebox' import type { APIGatewayProxyStructuredResultV2, APIGatewayProxyWebsocketEventV2, } from 'aws-lambda' +import { validateWithTypeBox } from './validateWithTypeBox.js' +import { + IoTDataPlaneClient, + PublishCommand, +} from '@aws-sdk/client-iot-data-plane' +import { DescribeThingCommand, IoTClient } from '@aws-sdk/client-iot' const db = new DynamoDBClient({}) const { TableName } = fromEnv({ TableName: 'CONNECTIONS_TABLE_NAME', - websocketManagementAPIURL: 'WEBSOCKET_MANAGEMENT_API_URL', })(process.env) +const message = Type.Object({ + message: Type.Literal('sendmessage'), + data: Type.Object({ + deviceId: Type.String({ minLength: 1 }), + code: Type.String({ minLength: 1 }), + nrplusCtrl: Type.String({ minLength: 1 }), + }), +}) +const validateMessage = validateWithTypeBox(message) + +const iotData = new IoTDataPlaneClient({}) +const iot = new IoTClient({}) + export const handler = async ( event: APIGatewayProxyWebsocketEventV2, ): Promise => { @@ -21,28 +40,78 @@ export const handler = async ( }), ) - await db.send( - new UpdateItemCommand({ - TableName, - Key: { - connectionId: { - S: event.requestContext.connectionId, + let message: Record | undefined = undefined + try { + message = JSON.parse(event.body ?? '{}') + } catch (err) { + console.error(`Failed to parse message as JSON.`) + } + + if (message === undefined) { + console.error(`No message provided.`) + return { + statusCode: 400, + } + } + if (message.data === 'PING') { + await db.send( + new UpdateItemCommand({ + TableName, + Key: { + connectionId: { + S: event.requestContext.connectionId, + }, }, - }, - UpdateExpression: 'SET #lastSeen = :lastSeen', - ExpressionAttributeNames: { - '#lastSeen': 'lastSeen', - }, - ExpressionAttributeValues: { - ':lastSeen': { - S: new Date().toISOString(), + UpdateExpression: 'SET #lastSeen = :lastSeen', + ExpressionAttributeNames: { + '#lastSeen': 'lastSeen', }, - }, - }), - ) + ExpressionAttributeValues: { + ':lastSeen': { + S: new Date().toISOString(), + }, + }, + }), + ) + + return { + statusCode: 200, + } + } - return { - statusCode: 200, - body: `Got your message, ${event.requestContext.connectionId}!`, + const maybeValidMessage = validateMessage(message) + if ('errors' in maybeValidMessage) { + console.error( + `Failed to validate message: ${JSON.stringify(maybeValidMessage.errors)}`, + ) + return { + statusCode: 400, + } + } else { + const { deviceId, code, nrplusCtrl } = maybeValidMessage.value.data + const attributes = ( + await iot.send(new DescribeThingCommand({ thingName: deviceId })) + ).attributes + if ( + attributes === undefined || + !('code' in attributes) || + attributes.code !== code + ) { + return { + statusCode: 403, + body: `Code ${code} not valid for device ${deviceId}!`, + } + } + await iotData.send( + new PublishCommand({ + topic: `${deviceId}/nrplus-ctrl`, + payload: Buffer.from(nrplusCtrl, 'hex'), + qos: 1, + }), + ) + console.log(`>`, `${deviceId}/nrplus-ctrl`, nrplusCtrl) + return { + statusCode: 202, + } } }