Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adaptor for dynamodb streams #347

Merged
merged 3 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const { dynamodb } = require("@laconia/event");

module.exports = class DynamoDbStreamJsonInputConvertor {
convert(event) {
return dynamodb(event).records.map(r => r.jsonNewImage);
}
};
1 change: 1 addition & 0 deletions packages/laconia-adapter/src/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ declare type S3AdapterFactoryOptions = {
declare namespace adapter {
function s3(options?: S3AdapterFactoryOptions): AdapterFactory<any>;
function kinesis(): AdapterFactory<any[]>;
function dynamodb(): AdapterFactory<any[]>;
function sns(): AdapterFactory<any>;
function sqs(): AdapterFactory<any[]>;
}
Expand Down
2 changes: 2 additions & 0 deletions packages/laconia-adapter/src/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const KinesisJsonInputConverter = require("./KinesisJsonInputConverter");
const DynamoDbStreamJsonInputConvertor = require("./DynamoDbStreamJsonInputConvertor");
const SnsJsonInputConverter = require("./SnsJsonInputConverter");
const SqsJsonInputConverter = require("./SqsJsonInputConverter");
const createS3EventAdapter = require("./createS3EventAdapter");
Expand All @@ -7,5 +8,6 @@ const createEventAdapter = require("./createEventAdapter");
exports.sns = createEventAdapter(new SnsJsonInputConverter());
exports.sqs = createEventAdapter(new SqsJsonInputConverter());
exports.kinesis = createEventAdapter(new KinesisJsonInputConverter());
exports.dynamodb = createEventAdapter(new DynamoDbStreamJsonInputConvertor());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: Converter instead of Convertor


exports.s3 = createS3EventAdapter;
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
const merge = require("lodash/merge");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great test!

const DynamoDbStreamJsonInputConvertor = require("../src/DynamoDbStreamJsonInputConvertor");

const dynamodbTemplate = {
eventID: "1",
eventVersion: "1.0",
dynamodb: {
Keys: {
Id: {
N: "101"
}
},
NewImage: {
Message: {
S: "New item!"
},
Id: {
N: "101"
}
},
StreamViewType: "NEW_AND_OLD_IMAGES",
SequenceNumber: "111",
SizeBytes: 26
},
awsRegion: "us-west-2",
eventName: "INSERT",
eventSourceARN: "arn:aws:dynamodb:us-east-1:123456789012:table/images",
eventSource: "aws:dynamodb"
};

const createDynamoDbStreamEvent = records => {
return merge({}, dynamodbTemplate, {
Records: records.map(r => ({
dynamodb: {
NewImage: r
}
}))
});
};

describe("DynamoDbStreamJsonInputConvertor", () => {
it("Should convert one stream record event", async () => {
const inputConverter = new DynamoDbStreamJsonInputConvertor();
const newImage = {
Number: { N: "123" },
Null: { NULL: true },
Boolean: { BOOL: true }
};
const input = await inputConverter.convert(
createDynamoDbStreamEvent([newImage])
);

expect(input).toEqual([
{
Number: 123,
Null: null,
Boolean: true
}
]);
});

it("Should convert multiple stream record event", async () => {
const inputConverter = new DynamoDbStreamJsonInputConvertor();
const images = [
{
Number: { N: "123" },
Null: { NULL: true },
Boolean: { BOOL: true }
},
{
Message: {
S: "Stream sample record!"
},
Id: {
N: "110"
},
List: { L: [{ S: "fizz" }, { S: "buzz" }, { S: "pop" }] }
}
];
const input = await inputConverter.convert(
createDynamoDbStreamEvent(images)
);

expect(input).toEqual([
{
Number: 123,
Null: null,
Boolean: true
},
{
Message: "Stream sample record!",
Id: 110,
List: ["fizz", "buzz", "pop"]
}
]);
});
});
2 changes: 1 addition & 1 deletion packages/laconia-adapter/test/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const event = require("../src/index");

describe("@laconia/adapter", () => {
const eventAdapters = ["s3", "kinesis", "sns", "sqs"];
const eventAdapters = ["s3", "kinesis", "sns", "sqs", "dynamodb"];

eventAdapters.forEach(eventAdapter => {
describe(`#${eventAdapter}`, () => {
Expand Down
13 changes: 13 additions & 0 deletions packages/laconia-event/src/DynamoDbStreamEvent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const DynamoDbStreamRecord = require("./DynamoDbStreamRecord");

module.exports = class DynamoDbStreamEvent {
constructor(records) {
this.records = records;
}

static fromRaw(event) {
return new DynamoDbStreamEvent(
event.Records.map(r => DynamoDbStreamRecord.fromRaw(r))
);
}
};
19 changes: 19 additions & 0 deletions packages/laconia-event/src/DynamoDbStreamRecord.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const AWS = require("aws-sdk");

module.exports = class DynamoDbStreamRecord {
constructor(data) {
this.data = data;
}

get jsonNewImage() {
return AWS.DynamoDB.Converter.unmarshall(this.newImage);
}

get newImage() {
return this.data.NewImage;
}

static fromRaw(record) {
return new DynamoDbStreamRecord(record.dynamodb);
}
};
2 changes: 2 additions & 0 deletions packages/laconia-event/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ const S3Event = require("./S3Event");
const SqsEvent = require("./SqsEvent");
const KinesisEvent = require("./KinesisEvent");
const SnsEvent = require("./SnsEvent");
const DynamoDbStreamEvent = require("./DynamoDbStreamEvent");
const ApiGatewayEvent = require("./apigateway/ApiGatewayEvent");
const ApiGatewayResponse = require("./apigateway/ApiGatewayResponse");
const ApiGatewayWebSocketEvent = require("./apigateway/ApiGatewayWebSocketEvent");

exports.s3 = S3Event.fromRaw;
exports.sqs = SqsEvent.fromRaw;
exports.kinesis = KinesisEvent.fromRaw;
exports.dynamodb = DynamoDbStreamEvent.fromRaw;
exports.sns = SnsEvent.fromRaw;
exports.apigateway = {
req: ApiGatewayEvent.fromRaw,
Expand Down
94 changes: 94 additions & 0 deletions packages/laconia-event/test/DynamoDbStreamEvent.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
const merge = require("lodash/merge");
const DynamoDbStreamEvent = require("../src/DynamoDbStreamEvent");
const DynamoDbStreamRecord = require("../src/DynamoDbStreamRecord");

const dynamodbTemplate = {
eventID: "1",
eventVersion: "1.0",
dynamodb: {
Keys: {
Id: {
N: "101"
}
},
NewImage: {
Message: {
S: "New item!"
},
Id: {
N: "101"
}
},
StreamViewType: "NEW_AND_OLD_IMAGES",
SequenceNumber: "111",
SizeBytes: 26
},
awsRegion: "us-west-2",
eventName: "INSERT",
eventSourceARN: "arn:aws:dynamodb:us-east-1:123456789012:table/images",
eventSource: "aws:dynamodb"
};

const createDynamoDbStreamEvent = records => {
return merge({}, dynamodbTemplate, {
Records: records.map(r => ({
dynamodb: {
NewImage: r
}
}))
});
};

describe("DynamoDbStreamEvent", () => {
it("Should parse single record", () => {
const newImage = {
Message: {
S: "Stream sample record!"
},
Id: {
N: "110"
},
List: { L: [{ S: "fizz" }, { S: "buzz" }, { S: "pop" }] }
};

const jsonNewImage = {
Message: "Stream sample record!",
Id: 110,
List: ["fizz", "buzz", "pop"]
};
const dynamoDbStreamEvent = DynamoDbStreamEvent.fromRaw(
createDynamoDbStreamEvent([Object.assign({}, newImage)])
);

expect(dynamoDbStreamEvent.records).toHaveLength(1);
expect(dynamoDbStreamEvent.records[0]).toBeInstanceOf(DynamoDbStreamRecord);
expect(dynamoDbStreamEvent.records[0].newImage).toEqual(newImage);
expect(dynamoDbStreamEvent.records[0].jsonNewImage).toEqual(jsonNewImage);
});

it("Should parse multiple record", () => {
const dynamoDbStreamEvent = DynamoDbStreamEvent.fromRaw(
createDynamoDbStreamEvent([
{
Message: {
S: "New item!"
},
Id: {
N: "105"
}
},
{
Message: {
S: "Newest item!"
},
Id: {
N: "106"
}
}
])
);

expect(dynamoDbStreamEvent).toBeInstanceOf(DynamoDbStreamEvent);
expect(dynamoDbStreamEvent.records).toHaveLength(2);
});
});
59 changes: 59 additions & 0 deletions packages/laconia-event/test/DynamoDbStreamRecord.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const DynamoDbStreamRecord = require("../src/DynamoDbStreamRecord");

const rawRecord = {
eventID: "1",
eventVersion: "1.0",
dynamodb: {
Keys: {
Id: {
N: "101"
}
},
NewImage: {
Message: {
S: "New item!"
},
Id: {
N: "101"
}
},
StreamViewType: "NEW_AND_OLD_IMAGES",
SequenceNumber: "111",
SizeBytes: 26
},
awsRegion: "us-west-2",
eventName: "INSERT",
eventSourceARN: "arn:aws:dynamodb:us-east-1:123456789012:table/images",
eventSource: "aws:dynamodb"
};

const createRawRecord = stream => {
const raw = Object.assign({}, rawRecord);
raw.dynamodb = Object.assign({}, rawRecord.dynamodb, stream);
return raw;
};

describe("DynamoDbStreamRecord", () => {
it("Should be able to parse from raw event", () => {
const newImage = {
Message: {
S: "Test New item!"
},
Id: {
N: "105"
}
};
const jsonNewImage = { Message: "Test New item!", Id: 105 };

const dynamoDbStreamRecord = DynamoDbStreamRecord.fromRaw(
createRawRecord({
NewImage: newImage
})
);

expect(dynamoDbStreamRecord).toHaveProperty("newImage");
expect(dynamoDbStreamRecord).toHaveProperty("jsonNewImage");
expect(dynamoDbStreamRecord.newImage).toEqual(newImage);
expect(dynamoDbStreamRecord.jsonNewImage).toEqual(jsonNewImage);
});
});
2 changes: 1 addition & 1 deletion packages/laconia-event/test/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const event = require("../src/index");

describe("index", () => {
const eventParsers = ["s3", "kinesis", "sns", "sqs"];
const eventParsers = ["s3", "kinesis", "sns", "sqs", "dynamodb"];

eventParsers.forEach(eventParser => {
describe(`#${eventParser}`, () => {
Expand Down