diff --git a/packages/laconia-adapter/src/DynamoDbStreamJsonInputConverter.js b/packages/laconia-adapter/src/DynamoDbStreamJsonInputConverter.js new file mode 100644 index 00000000..d907aa25 --- /dev/null +++ b/packages/laconia-adapter/src/DynamoDbStreamJsonInputConverter.js @@ -0,0 +1,7 @@ +const { dynamodb } = require("@laconia/event"); + +module.exports = class DynamoDbStreamJsonInputConverter { + convert(event) { + return dynamodb(event).records.map(r => r.jsonNewImage); + } +}; diff --git a/packages/laconia-adapter/src/index.d.ts b/packages/laconia-adapter/src/index.d.ts index c528d908..b4dd5b7b 100644 --- a/packages/laconia-adapter/src/index.d.ts +++ b/packages/laconia-adapter/src/index.d.ts @@ -7,6 +7,7 @@ declare type S3AdapterFactoryOptions = { declare namespace adapter { function s3(options?: S3AdapterFactoryOptions): AdapterFactory; function kinesis(): AdapterFactory; + function dynamodb(): AdapterFactory; function sns(): AdapterFactory; function sqs(): AdapterFactory; } diff --git a/packages/laconia-adapter/src/index.js b/packages/laconia-adapter/src/index.js index 0422f922..71cc5eed 100644 --- a/packages/laconia-adapter/src/index.js +++ b/packages/laconia-adapter/src/index.js @@ -1,4 +1,5 @@ const KinesisJsonInputConverter = require("./KinesisJsonInputConverter"); +const DynamoDbStreamJsonInputConverter = require("./DynamoDbStreamJsonInputConverter"); const SnsJsonInputConverter = require("./SnsJsonInputConverter"); const SqsJsonInputConverter = require("./SqsJsonInputConverter"); const createS3EventAdapter = require("./createS3EventAdapter"); @@ -7,5 +8,6 @@ const createEventAdapter = require("./createEventAdapter"); exports.sns = createEventAdapter(new SnsJsonInputConverter()); exports.sqs = createEventAdapter(new SqsJsonInputConverter()); exports.kinesis = createEventAdapter(new KinesisJsonInputConverter()); +exports.dynamodb = createEventAdapter(new DynamoDbStreamJsonInputConverter()); exports.s3 = createS3EventAdapter; diff --git a/packages/laconia-adapter/test/DynamoDbStreamJsonInputConverter.spec.js b/packages/laconia-adapter/test/DynamoDbStreamJsonInputConverter.spec.js new file mode 100644 index 00000000..dbfdebb7 --- /dev/null +++ b/packages/laconia-adapter/test/DynamoDbStreamJsonInputConverter.spec.js @@ -0,0 +1,97 @@ +const merge = require("lodash/merge"); +const DynamoDbStreamJsonInputConverter = require("../src/DynamoDbStreamJsonInputConverter"); + +const dynamodbTemplate = { + eventID: "1", + eventVersion: "1.0", + dynamodb: { + Keys: { + Id: { + N: "101" + } + }, + NewImage: { + Message: { + S: "New item!" + }, + Id: { + N: "101" + } + }, + StreamViewType: "NEW_AND_OLD_IMAGES", + SequenceNumber: "111", + SizeBytes: 26 + }, + awsRegion: "us-west-2", + eventName: "INSERT", + eventSourceARN: "arn:aws:dynamodb:us-east-1:123456789012:table/images", + eventSource: "aws:dynamodb" +}; + +const createDynamoDbStreamEvent = records => { + return merge({}, dynamodbTemplate, { + Records: records.map(r => ({ + dynamodb: { + NewImage: r + } + })) + }); +}; + +describe("DynamoDbStreamJsonInputConverter", () => { + it("Should convert one stream record event", async () => { + const inputConverter = new DynamoDbStreamJsonInputConverter(); + const newImage = { + Number: { N: "123" }, + Null: { NULL: true }, + Boolean: { BOOL: true } + }; + const input = await inputConverter.convert( + createDynamoDbStreamEvent([newImage]) + ); + + expect(input).toEqual([ + { + Number: 123, + Null: null, + Boolean: true + } + ]); + }); + + it("Should convert multiple stream record event", async () => { + const inputConverter = new DynamoDbStreamJsonInputConverter(); + const images = [ + { + Number: { N: "123" }, + Null: { NULL: true }, + Boolean: { BOOL: true } + }, + { + Message: { + S: "Stream sample record!" + }, + Id: { + N: "110" + }, + List: { L: [{ S: "fizz" }, { S: "buzz" }, { S: "pop" }] } + } + ]; + const input = await inputConverter.convert( + createDynamoDbStreamEvent(images) + ); + + expect(input).toEqual([ + { + Number: 123, + Null: null, + Boolean: true + }, + { + Message: "Stream sample record!", + Id: 110, + List: ["fizz", "buzz", "pop"] + } + ]); + }); +}); diff --git a/packages/laconia-adapter/test/index.spec.js b/packages/laconia-adapter/test/index.spec.js index 8a3f0c7a..ce8cddf2 100644 --- a/packages/laconia-adapter/test/index.spec.js +++ b/packages/laconia-adapter/test/index.spec.js @@ -1,7 +1,7 @@ const event = require("../src/index"); describe("@laconia/adapter", () => { - const eventAdapters = ["s3", "kinesis", "sns", "sqs"]; + const eventAdapters = ["s3", "kinesis", "sns", "sqs", "dynamodb"]; eventAdapters.forEach(eventAdapter => { describe(`#${eventAdapter}`, () => { diff --git a/packages/laconia-event/src/DynamoDbStreamEvent.js b/packages/laconia-event/src/DynamoDbStreamEvent.js new file mode 100644 index 00000000..bcad38ed --- /dev/null +++ b/packages/laconia-event/src/DynamoDbStreamEvent.js @@ -0,0 +1,13 @@ +const DynamoDbStreamRecord = require("./DynamoDbStreamRecord"); + +module.exports = class DynamoDbStreamEvent { + constructor(records) { + this.records = records; + } + + static fromRaw(event) { + return new DynamoDbStreamEvent( + event.Records.map(r => DynamoDbStreamRecord.fromRaw(r)) + ); + } +}; diff --git a/packages/laconia-event/src/DynamoDbStreamRecord.js b/packages/laconia-event/src/DynamoDbStreamRecord.js new file mode 100644 index 00000000..5ec0a284 --- /dev/null +++ b/packages/laconia-event/src/DynamoDbStreamRecord.js @@ -0,0 +1,19 @@ +const AWS = require("aws-sdk"); + +module.exports = class DynamoDbStreamRecord { + constructor(data) { + this.data = data; + } + + get jsonNewImage() { + return AWS.DynamoDB.Converter.unmarshall(this.newImage); + } + + get newImage() { + return this.data.NewImage; + } + + static fromRaw(record) { + return new DynamoDbStreamRecord(record.dynamodb); + } +}; diff --git a/packages/laconia-event/src/index.js b/packages/laconia-event/src/index.js index 0df64ad0..d45332ab 100644 --- a/packages/laconia-event/src/index.js +++ b/packages/laconia-event/src/index.js @@ -2,6 +2,7 @@ const S3Event = require("./S3Event"); const SqsEvent = require("./SqsEvent"); const KinesisEvent = require("./KinesisEvent"); const SnsEvent = require("./SnsEvent"); +const DynamoDbStreamEvent = require("./DynamoDbStreamEvent"); const ApiGatewayEvent = require("./apigateway/ApiGatewayEvent"); const ApiGatewayResponse = require("./apigateway/ApiGatewayResponse"); const ApiGatewayWebSocketEvent = require("./apigateway/ApiGatewayWebSocketEvent"); @@ -9,6 +10,7 @@ const ApiGatewayWebSocketEvent = require("./apigateway/ApiGatewayWebSocketEvent" exports.s3 = S3Event.fromRaw; exports.sqs = SqsEvent.fromRaw; exports.kinesis = KinesisEvent.fromRaw; +exports.dynamodb = DynamoDbStreamEvent.fromRaw; exports.sns = SnsEvent.fromRaw; exports.apigateway = { req: ApiGatewayEvent.fromRaw, diff --git a/packages/laconia-event/test/DynamoDbStreamEvent.spec.js b/packages/laconia-event/test/DynamoDbStreamEvent.spec.js new file mode 100644 index 00000000..1e549c32 --- /dev/null +++ b/packages/laconia-event/test/DynamoDbStreamEvent.spec.js @@ -0,0 +1,94 @@ +const merge = require("lodash/merge"); +const DynamoDbStreamEvent = require("../src/DynamoDbStreamEvent"); +const DynamoDbStreamRecord = require("../src/DynamoDbStreamRecord"); + +const dynamodbTemplate = { + eventID: "1", + eventVersion: "1.0", + dynamodb: { + Keys: { + Id: { + N: "101" + } + }, + NewImage: { + Message: { + S: "New item!" + }, + Id: { + N: "101" + } + }, + StreamViewType: "NEW_AND_OLD_IMAGES", + SequenceNumber: "111", + SizeBytes: 26 + }, + awsRegion: "us-west-2", + eventName: "INSERT", + eventSourceARN: "arn:aws:dynamodb:us-east-1:123456789012:table/images", + eventSource: "aws:dynamodb" +}; + +const createDynamoDbStreamEvent = records => { + return merge({}, dynamodbTemplate, { + Records: records.map(r => ({ + dynamodb: { + NewImage: r + } + })) + }); +}; + +describe("DynamoDbStreamEvent", () => { + it("Should parse single record", () => { + const newImage = { + Message: { + S: "Stream sample record!" + }, + Id: { + N: "110" + }, + List: { L: [{ S: "fizz" }, { S: "buzz" }, { S: "pop" }] } + }; + + const jsonNewImage = { + Message: "Stream sample record!", + Id: 110, + List: ["fizz", "buzz", "pop"] + }; + const dynamoDbStreamEvent = DynamoDbStreamEvent.fromRaw( + createDynamoDbStreamEvent([Object.assign({}, newImage)]) + ); + + expect(dynamoDbStreamEvent.records).toHaveLength(1); + expect(dynamoDbStreamEvent.records[0]).toBeInstanceOf(DynamoDbStreamRecord); + expect(dynamoDbStreamEvent.records[0].newImage).toEqual(newImage); + expect(dynamoDbStreamEvent.records[0].jsonNewImage).toEqual(jsonNewImage); + }); + + it("Should parse multiple record", () => { + const dynamoDbStreamEvent = DynamoDbStreamEvent.fromRaw( + createDynamoDbStreamEvent([ + { + Message: { + S: "New item!" + }, + Id: { + N: "105" + } + }, + { + Message: { + S: "Newest item!" + }, + Id: { + N: "106" + } + } + ]) + ); + + expect(dynamoDbStreamEvent).toBeInstanceOf(DynamoDbStreamEvent); + expect(dynamoDbStreamEvent.records).toHaveLength(2); + }); +}); diff --git a/packages/laconia-event/test/DynamoDbStreamRecord.spec.js b/packages/laconia-event/test/DynamoDbStreamRecord.spec.js new file mode 100644 index 00000000..ba48485a --- /dev/null +++ b/packages/laconia-event/test/DynamoDbStreamRecord.spec.js @@ -0,0 +1,59 @@ +const DynamoDbStreamRecord = require("../src/DynamoDbStreamRecord"); + +const rawRecord = { + eventID: "1", + eventVersion: "1.0", + dynamodb: { + Keys: { + Id: { + N: "101" + } + }, + NewImage: { + Message: { + S: "New item!" + }, + Id: { + N: "101" + } + }, + StreamViewType: "NEW_AND_OLD_IMAGES", + SequenceNumber: "111", + SizeBytes: 26 + }, + awsRegion: "us-west-2", + eventName: "INSERT", + eventSourceARN: "arn:aws:dynamodb:us-east-1:123456789012:table/images", + eventSource: "aws:dynamodb" +}; + +const createRawRecord = stream => { + const raw = Object.assign({}, rawRecord); + raw.dynamodb = Object.assign({}, rawRecord.dynamodb, stream); + return raw; +}; + +describe("DynamoDbStreamRecord", () => { + it("Should be able to parse from raw event", () => { + const newImage = { + Message: { + S: "Test New item!" + }, + Id: { + N: "105" + } + }; + const jsonNewImage = { Message: "Test New item!", Id: 105 }; + + const dynamoDbStreamRecord = DynamoDbStreamRecord.fromRaw( + createRawRecord({ + NewImage: newImage + }) + ); + + expect(dynamoDbStreamRecord).toHaveProperty("newImage"); + expect(dynamoDbStreamRecord).toHaveProperty("jsonNewImage"); + expect(dynamoDbStreamRecord.newImage).toEqual(newImage); + expect(dynamoDbStreamRecord.jsonNewImage).toEqual(jsonNewImage); + }); +}); diff --git a/packages/laconia-event/test/index.spec.js b/packages/laconia-event/test/index.spec.js index 5c1ac7a8..ad1bab63 100644 --- a/packages/laconia-event/test/index.spec.js +++ b/packages/laconia-event/test/index.spec.js @@ -1,7 +1,7 @@ const event = require("../src/index"); describe("index", () => { - const eventParsers = ["s3", "kinesis", "sns", "sqs"]; + const eventParsers = ["s3", "kinesis", "sns", "sqs", "dynamodb"]; eventParsers.forEach(eventParser => { describe(`#${eventParser}`, () => {