diff --git a/package-lock.json b/package-lock.json index 3d1d4868..9b34f465 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.0.34", + "version": "1.1.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.0.34", + "version": "1.1.0", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index 51f0f3f5..dbb144fd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.0.34", + "version": "1.1.0", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/from/dynamodb.js b/src/from/dynamodb.js index f17679c8..be3c2597 100644 --- a/src/from/dynamodb.js +++ b/src/from/dynamodb.js @@ -34,7 +34,7 @@ export const fromDynamodb = (event, { id: record.eventID, type: `${calculateEventTypePrefix(record, { skFn, discriminatorFn, eventTypePrefix })}-${calculateEventTypeSuffix(record)}`, partitionKey: record.dynamodb.Keys[pkFn].S, - timestamp: record.dynamodb.ApproximateCreationDateTime * 1000, + timestamp: deriveTimestamp(record), tags: { region: record.awsRegion, }, @@ -88,6 +88,11 @@ const calculateEventTypeSuffix = (record) => { return suffix; }; +const deriveTimestamp = (record) => + parseInt(record.dynamodb.NewImage?.timestamp?.N, 10) || ddbApproximateCreationTimestamp(record); + +export const ddbApproximateCreationTimestamp = (record) => record.dynamodb.ApproximateCreationDateTime * 1000; + //-------------------------------------------- // global table support - version: 2017.11.29 //-------------------------------------------- diff --git a/test/unit/from/dynamodb.test.js b/test/unit/from/dynamodb.test.js index 28373879..e2c7554d 100644 --- a/test/unit/from/dynamodb.test.js +++ b/test/unit/from/dynamodb.test.js @@ -111,6 +111,116 @@ describe('from/dynamodb.js', () => { .done(done); }); + it('should prefer image timestamp if present', (done) => { + const events = toDynamodbRecords([ + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'thing', + }, + newImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'n1', + timestamp: 1572832690001, + // insert in the current region will not have the awsregion field + }, + }, + // dynamodb stream emits an extra update event as it adorns the 'aws:rep' global table metadata + // so this extra event should be skipped + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'thing', + }, + newImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'n1', + awsregion: 'us-west-2', + }, + oldImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'n1', + // as mentioned above there was no awsregion field on the insert event + }, + }, + ]); + + fromDynamodb(events) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected.length).to.equal(1); + expect(collected[0]).to.deep.equal({ + record: { + eventID: '0', + eventName: 'INSERT', + eventSource: 'aws:dynamodb', + awsRegion: 'us-west-2', + dynamodb: { + ApproximateCreationDateTime: 1572832690, + Keys: { + pk: { + S: '1', + }, + sk: { + S: 'thing', + }, + }, + NewImage: { + pk: { + S: '1', + }, + sk: { + S: 'thing', + }, + discriminator: { + S: 'thing', + }, + name: { + S: 'n1', + }, + timestamp: { + N: '1572832690001', + }, + }, + OldImage: undefined, + SequenceNumber: '0', + StreamViewType: 'NEW_AND_OLD_IMAGES', + }, + }, + event: { + id: '0', + type: 'thing-created', + partitionKey: '1', + timestamp: 1572832690001, + tags: { + region: 'us-west-2', + }, + raw: { + new: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'n1', + timestamp: 1572832690001, + }, + old: undefined, + }, + }, + }); + }) + .done(done); + }); + it('should parse MODIFY record', (done) => { const events = toDynamodbRecords([ {