diff --git a/package-lock.json b/package-lock.json index 1513527..62882ba 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.12", + "version": "1.1.13", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.12", + "version": "1.1.13", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index 016be26..98782f1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.12", + "version": "1.1.13", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/from/dynamodb.js b/src/from/dynamodb.js index 57aea6d..9191166 100644 --- a/src/from/dynamodb.js +++ b/src/from/dynamodb.js @@ -165,22 +165,20 @@ export const outGlobalTableExtraModify = (record) => { //-------------------------------------------- export const outTtlExpiredEvents = (ignoreTtlExpiredEvents) => (record) => { - // this is not a REMOVE event - if (record.eventName !== 'REMOVE') return true; - - const { OldImage } = record.dynamodb; - - // this record does not have ttl - if (!OldImage.ttl || !OldImage.timestamp) return true; - - // ttl has not expired - if (Number(OldImage.ttl.N) * 1000 > Number(OldImage.timestamp.N)) return true; - - // this is a ttl expired event - // should we ignore it - /* istanbul ignore else */ - if (ignoreTtlExpiredEvents) { - return false; + const { eventName, userIdentity, dynamodb: { OldImage, ApproximateCreationDateTime } } = record; + // this is not a REMOVE event or we're not ignoring the ttl expired events anyway. + if (eventName !== 'REMOVE' || !ignoreTtlExpiredEvents) return true; + + if (userIdentity) { + // See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html + // We trust dynamodb that the ttl expired if its a remove and has the ttl expiry indicating + // identity attributes. + return !(userIdentity?.type === 'Service' && userIdentity?.principalId === 'dynamodb.amazonaws.com'); + } else if (OldImage.ttl?.N) { + // If no user identity attribute is present, this may be a replicated TTL delete, but we still + // want to honor it because filtering out replica region events may be disabled. + const ttlSec = Number(OldImage.ttl.N); + return !(ttlSec <= ApproximateCreationDateTime); } else { return true; } @@ -206,6 +204,12 @@ export const toDynamodbRecords = (events, { removeUndefinedValues = true } = {}) StreamViewType: 'NEW_AND_OLD_IMAGES', }, // eventSourceARN: 'arn:aws:dynamodb:us-west-2:123456789012:table/myservice-entities/stream/2016-11-16T20:42:48.104', + ...(e.ttlDelete && { + userIdentity: { + principalId: 'dynamodb.amazonaws.com', + type: 'Service', + }, + }), })), }); diff --git a/test/unit/from/dynamodb.test.js b/test/unit/from/dynamodb.test.js index 239202c..c97875d 100644 --- a/test/unit/from/dynamodb.test.js +++ b/test/unit/from/dynamodb.test.js @@ -907,7 +907,49 @@ describe('from/dynamodb.js', () => { it('should ignore expired ttl', (done) => { const events = toDynamodbRecords([ { - timestamp: 1573005490000, + timestamp: 1573005490, + keys: { + pk: '1', + sk: 'thing', + }, + oldImage: { + pk: '1', + sk: 'thing', + name: 'N1', + ttl: 1573005490, + timestamp: 1573005490000, + }, + ttlDelete: true, + }, + { + timestamp: 1573005490, + keys: { + pk: '1', + sk: 'thing', + }, + oldImage: { + pk: '1', + sk: 'thing', + name: 'N1', + ttl: 1573015491, + timestamp: 1573005490000, + }, + }, + ]); + + fromDynamodb(events, { ignoreTtlExpiredEvents: true }) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + expect(collected.length).to.equal(1); + }) + .done(done); + }); + + it('should ignore replicated ttl', (done) => { + const events = toDynamodbRecords([ + { + timestamp: 1573005491, keys: { pk: '1', sk: 'thing', @@ -930,7 +972,7 @@ describe('from/dynamodb.js', () => { pk: '1', sk: 'thing', name: 'N1', - ttl: 1573015490, // hasn't expired yet + ttl: 1573015490, // expired, has no identity attributes to indicate ttl delete timestamp: 1573005490000, }, }, @@ -945,6 +987,45 @@ describe('from/dynamodb.js', () => { .done(done); }); + it('should passes through record with no ttl if ignore ttl events is true', (done) => { + const events = toDynamodbRecords([ + { + timestamp: 1573005491, + keys: { + pk: '1', + sk: 'thing', + }, + oldImage: { + pk: '1', + sk: 'thing', + name: 'N1', + timestamp: 1573005490000, + }, + }, + { + timestamp: 1573005490, + keys: { + pk: '1', + sk: 'thing', + }, + oldImage: { + pk: '1', + sk: 'thing', + name: 'N1', + timestamp: 1573005490000, + }, + }, + ]); + + fromDynamodb(events, { ignoreTtlExpiredEvents: true }) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + expect(collected.length).to.equal(2); + }) + .done(done); + }); + it('should keep replica records if ignoreReplicas is false', (done) => { const events = toDynamodbRecords([ {