diff --git a/package-lock.json b/package-lock.json index 9b34f46..923dd20 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.0", + "version": "1.1.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.0", + "version": "1.1.1", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index dbb144f..e38466c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.0", + "version": "1.1.1", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/from/dynamodb.js b/src/from/dynamodb.js index be3c259..57aea6d 100644 --- a/src/from/dynamodb.js +++ b/src/from/dynamodb.js @@ -11,6 +11,7 @@ export const fromDynamodb = (event, { eventTypePrefix = undefined, ignoreTtlExpiredEvents = false, ignoreReplicas = true, + preferApproximateTimestamp = false, } = {}) => // eslint-disable-line import/prefer-default-export // prepare the event stream @@ -34,7 +35,7 @@ export const fromDynamodb = (event, { id: record.eventID, type: `${calculateEventTypePrefix(record, { skFn, discriminatorFn, eventTypePrefix })}-${calculateEventTypeSuffix(record)}`, partitionKey: record.dynamodb.Keys[pkFn].S, - timestamp: deriveTimestamp(record), + timestamp: deriveTimestamp(record, preferApproximateTimestamp), tags: { region: record.awsRegion, }, @@ -88,8 +89,13 @@ const calculateEventTypeSuffix = (record) => { return suffix; }; -const deriveTimestamp = (record) => - parseInt(record.dynamodb.NewImage?.timestamp?.N, 10) || ddbApproximateCreationTimestamp(record); +const deriveTimestamp = (record, preferApproximateTimestamp) => { + if (preferApproximateTimestamp) { + return ddbApproximateCreationTimestamp(record); + } else { + return parseInt(record.dynamodb.NewImage?.timestamp?.N, 10) || ddbApproximateCreationTimestamp(record); + } +}; export const ddbApproximateCreationTimestamp = (record) => record.dynamodb.ApproximateCreationDateTime * 1000; diff --git a/test/unit/from/dynamodb.test.js b/test/unit/from/dynamodb.test.js index e2c7554..239202c 100644 --- a/test/unit/from/dynamodb.test.js +++ b/test/unit/from/dynamodb.test.js @@ -221,6 +221,116 @@ describe('from/dynamodb.js', () => { .done(done); }); + it('should prefer approximate timestamp if flag set', (done) => { + const events = toDynamodbRecords([ + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'thing', + }, + newImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'n1', + timestamp: 1572832690001, + // insert in the current region will not have the awsregion field + }, + }, + // dynamodb stream emits an extra update event as it adorns the 'aws:rep' global table metadata + // so this extra event should be skipped + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'thing', + }, + newImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'n1', + awsregion: 'us-west-2', + }, + oldImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'n1', + // as mentioned above there was no awsregion field on the insert event + }, + }, + ]); + + fromDynamodb(events, { preferApproximateTimestamp: true }) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected.length).to.equal(1); + expect(collected[0]).to.deep.equal({ + record: { + eventID: '0', + eventName: 'INSERT', + eventSource: 'aws:dynamodb', + awsRegion: 'us-west-2', + dynamodb: { + ApproximateCreationDateTime: 1572832690, + Keys: { + pk: { + S: '1', + }, + sk: { + S: 'thing', + }, + }, + NewImage: { + pk: { + S: '1', + }, + sk: { + S: 'thing', + }, + discriminator: { + S: 'thing', + }, + name: { + S: 'n1', + }, + timestamp: { + N: '1572832690001', + }, + }, + OldImage: undefined, + SequenceNumber: '0', + StreamViewType: 'NEW_AND_OLD_IMAGES', + }, + }, + event: { + id: '0', + type: 'thing-created', + partitionKey: '1', + timestamp: 1572832690000, + tags: { + region: 'us-west-2', + }, + raw: { + new: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'n1', + timestamp: 1572832690001, + }, + old: undefined, + }, + }, + }); + }) + .done(done); + }); + it('should parse MODIFY record', (done) => { const events = toDynamodbRecords([ {