From 6dff82c4fad04542fd366fd4c793382001b65467 Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Fri, 26 Sep 2025 10:56:06 -0400 Subject: [PATCH 1/5] Update check for ttl deletes based on AWS docs and add ttlDelete option to toDynamodbRecords helper. --- src/from/dynamodb.js | 33 ++++++++++++++------------------- test/unit/from/dynamodb.test.js | 5 +++-- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/src/from/dynamodb.js b/src/from/dynamodb.js index 57aea6d..db6f255 100644 --- a/src/from/dynamodb.js +++ b/src/from/dynamodb.js @@ -165,25 +165,14 @@ export const outGlobalTableExtraModify = (record) => { //-------------------------------------------- export const outTtlExpiredEvents = (ignoreTtlExpiredEvents) => (record) => { - // this is not a REMOVE event - if (record.eventName !== 'REMOVE') return true; - - const { OldImage } = record.dynamodb; - - // this record does not have ttl - if (!OldImage.ttl || !OldImage.timestamp) return true; - - // ttl has not expired - if (Number(OldImage.ttl.N) * 1000 > Number(OldImage.timestamp.N)) return true; - - // this is a ttl expired event - // should we ignore it - /* istanbul ignore else */ - if (ignoreTtlExpiredEvents) { - return false; - } else { - return true; - } + const { eventName, userIdentity } = record; + // this is not a REMOVE event or we're not ignoring the ttl expired events anyway. + if (eventName !== 'REMOVE' || !ignoreTtlExpiredEvents) return true; + + // See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html + // We trust dynamodb that the ttl expired if its a remove and has the ttl expiry indicating + // identity attributes. + return !(userIdentity?.type === 'Service' && userIdentity?.principalId === 'dynamodb.amazonaws.com'); }; // test helper @@ -206,6 +195,12 @@ export const toDynamodbRecords = (events, { removeUndefinedValues = true } = {}) StreamViewType: 'NEW_AND_OLD_IMAGES', }, // eventSourceARN: 'arn:aws:dynamodb:us-west-2:123456789012:table/myservice-entities/stream/2016-11-16T20:42:48.104', + ...(e.ttlDelete && { + userIdentity: { + principalId: 'dynamodb.amazonaws.com', + type: 'Service', + }, + }), })), }); diff --git a/test/unit/from/dynamodb.test.js b/test/unit/from/dynamodb.test.js index 239202c..ea0c020 100644 --- a/test/unit/from/dynamodb.test.js +++ b/test/unit/from/dynamodb.test.js @@ -919,9 +919,10 @@ describe('from/dynamodb.js', () => { ttl: 1573005490, timestamp: 1573005490000, }, + ttlDelete: true, }, { - timestamp: 1573005490, + timestamp: 1573005490000, keys: { pk: '1', sk: 'thing', @@ -930,7 +931,7 @@ describe('from/dynamodb.js', () => { pk: '1', sk: 'thing', name: 'N1', - ttl: 1573015490, // hasn't expired yet + ttl: 1573015490, // has expired, but event is not a ddb ttl remove timestamp: 1573005490000, }, }, From 4d8132f5dffa8429bcf9c45d4fd63d73011b4ca8 Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Fri, 26 Sep 2025 10:56:35 -0400 Subject: [PATCH 2/5] Bump version. --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 1513527..62882ba 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.12", + "version": "1.1.13", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.12", + "version": "1.1.13", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index 016be26..98782f1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.12", + "version": "1.1.13", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", From 24ab6ebafed2f40a2eabd671c03b1024163f5eb5 Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Wed, 1 Oct 2025 09:37:03 -0400 Subject: [PATCH 3/5] Fallback to approx create time check to catch replicated ttl removes. --- src/from/dynamodb.js | 22 ++++++++++++++---- test/unit/from/dynamodb.test.js | 41 +++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/src/from/dynamodb.js b/src/from/dynamodb.js index db6f255..4c3f5a0 100644 --- a/src/from/dynamodb.js +++ b/src/from/dynamodb.js @@ -165,14 +165,23 @@ export const outGlobalTableExtraModify = (record) => { //-------------------------------------------- export const outTtlExpiredEvents = (ignoreTtlExpiredEvents) => (record) => { - const { eventName, userIdentity } = record; + const { eventName, userIdentity, dynamodb: { OldImage, ApproximateCreationDateTime } } = record; // this is not a REMOVE event or we're not ignoring the ttl expired events anyway. if (eventName !== 'REMOVE' || !ignoreTtlExpiredEvents) return true; - // See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html - // We trust dynamodb that the ttl expired if its a remove and has the ttl expiry indicating - // identity attributes. - return !(userIdentity?.type === 'Service' && userIdentity?.principalId === 'dynamodb.amazonaws.com'); + if (userIdentity) { + // See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html + // We trust dynamodb that the ttl expired if its a remove and has the ttl expiry indicating + // identity attributes. + return !(userIdentity?.type === 'Service' && userIdentity?.principalId === 'dynamodb.amazonaws.com'); + } else if (OldImage.ttl?.N) { + // If no user identity attribute is present, this may be a replicated TTL delete, but we still + // want to honor it because filtering out replica region events may be disabled. + const ttlSec = Number(OldImage.ttl.N); + return !(ttlSec <= ApproximateCreationDateTime); + } else { + return true; + } }; // test helper @@ -185,6 +194,9 @@ export const toDynamodbRecords = (events, { removeUndefinedValues = true } = {}) eventSource: 'aws:dynamodb', awsRegion: e.newImage?.awsregion || process.env.AWS_REGION || /* istanbul ignore next */ 'us-west-2', dynamodb: { + // TODO - Fix this in next major version bump. ApproximateCreationDateTime is meant to be in seconds, not millis. + // Didn't want to fix until a major version bump to avoid compatibility issues for consumers + // upgrading the lib. This should be e.timestamp / 1000 ApproximateCreationDateTime: e.timestamp, Keys: e.keys ? marshall(e.keys, { removeUndefinedValues }) : /* istanbul ignore next */ undefined, NewImage: e.newImage ? marshall(e.newImage, { removeUndefinedValues }) : undefined, diff --git a/test/unit/from/dynamodb.test.js b/test/unit/from/dynamodb.test.js index ea0c020..46ab065 100644 --- a/test/unit/from/dynamodb.test.js +++ b/test/unit/from/dynamodb.test.js @@ -946,6 +946,47 @@ describe('from/dynamodb.js', () => { .done(done); }); + it('should ignore replicated ttl', (done) => { + const events = toDynamodbRecords([ + { + timestamp: 1573005491, + keys: { + pk: '1', + sk: 'thing', + }, + oldImage: { + pk: '1', + sk: 'thing', + name: 'N1', + ttl: 1573005490, + timestamp: 1573005490000, + }, + }, + { + timestamp: 1573005490, + keys: { + pk: '1', + sk: 'thing', + }, + oldImage: { + pk: '1', + sk: 'thing', + name: 'N1', + ttl: 1573015490, // expired, has no identity attributes to indicate ttl delete + timestamp: 1573005490000, + }, + }, + ]); + + fromDynamodb(events, { ignoreTtlExpiredEvents: true }) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + expect(collected.length).to.equal(1); + }) + .done(done); + }); + it('should keep replica records if ignoreReplicas is false', (done) => { const events = toDynamodbRecords([ { From 55ce844d2bccf1c6a133aca80a617ddc80ae45ca Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Wed, 1 Oct 2025 09:40:25 -0400 Subject: [PATCH 4/5] Fix unit test. --- test/unit/from/dynamodb.test.js | 45 ++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/test/unit/from/dynamodb.test.js b/test/unit/from/dynamodb.test.js index 46ab065..c97875d 100644 --- a/test/unit/from/dynamodb.test.js +++ b/test/unit/from/dynamodb.test.js @@ -907,7 +907,7 @@ describe('from/dynamodb.js', () => { it('should ignore expired ttl', (done) => { const events = toDynamodbRecords([ { - timestamp: 1573005490000, + timestamp: 1573005490, keys: { pk: '1', sk: 'thing', @@ -922,7 +922,7 @@ describe('from/dynamodb.js', () => { ttlDelete: true, }, { - timestamp: 1573005490000, + timestamp: 1573005490, keys: { pk: '1', sk: 'thing', @@ -931,7 +931,7 @@ describe('from/dynamodb.js', () => { pk: '1', sk: 'thing', name: 'N1', - ttl: 1573015490, // has expired, but event is not a ddb ttl remove + ttl: 1573015491, timestamp: 1573005490000, }, }, @@ -987,6 +987,45 @@ describe('from/dynamodb.js', () => { .done(done); }); + it('should passes through record with no ttl if ignore ttl events is true', (done) => { + const events = toDynamodbRecords([ + { + timestamp: 1573005491, + keys: { + pk: '1', + sk: 'thing', + }, + oldImage: { + pk: '1', + sk: 'thing', + name: 'N1', + timestamp: 1573005490000, + }, + }, + { + timestamp: 1573005490, + keys: { + pk: '1', + sk: 'thing', + }, + oldImage: { + pk: '1', + sk: 'thing', + name: 'N1', + timestamp: 1573005490000, + }, + }, + ]); + + fromDynamodb(events, { ignoreTtlExpiredEvents: true }) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + expect(collected.length).to.equal(2); + }) + .done(done); + }); + it('should keep replica records if ignoreReplicas is false', (done) => { const events = toDynamodbRecords([ { From cbfbca8e2cc80279ab0407aeb7d918b1499e5e1d Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Wed, 1 Oct 2025 09:43:30 -0400 Subject: [PATCH 5/5] Remove erroneous comment. --- src/from/dynamodb.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/from/dynamodb.js b/src/from/dynamodb.js index 4c3f5a0..9191166 100644 --- a/src/from/dynamodb.js +++ b/src/from/dynamodb.js @@ -194,9 +194,6 @@ export const toDynamodbRecords = (events, { removeUndefinedValues = true } = {}) eventSource: 'aws:dynamodb', awsRegion: e.newImage?.awsregion || process.env.AWS_REGION || /* istanbul ignore next */ 'us-west-2', dynamodb: { - // TODO - Fix this in next major version bump. ApproximateCreationDateTime is meant to be in seconds, not millis. - // Didn't want to fix until a major version bump to avoid compatibility issues for consumers - // upgrading the lib. This should be e.timestamp / 1000 ApproximateCreationDateTime: e.timestamp, Keys: e.keys ? marshall(e.keys, { removeUndefinedValues }) : /* istanbul ignore next */ undefined, NewImage: e.newImage ? marshall(e.newImage, { removeUndefinedValues }) : undefined,