From b4293c59859de824461ec3c01c2f8ca047adb351 Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Tue, 4 Nov 2025 10:57:54 -0500 Subject: [PATCH 1/4] Fix issue with job flavor swallowing cursors if multiple jobs triggered in a single lambda invocation. --- src/flavors/job.js | 46 ++--- test/unit/flavors/job.test.js | 314 ++++++++++++++++++++++++++++++++++ 2 files changed, 333 insertions(+), 27 deletions(-) diff --git a/src/flavors/job.js b/src/flavors/job.js index f8daa873..b05ca711 100644 --- a/src/flavors/job.js +++ b/src/flavors/job.js @@ -1,8 +1,8 @@ -import _ from 'highland'; import { printStartPipeline, printEndPipeline, faulty, faultyAsyncStream, faultify, splitObject, encryptEvent, + compact, } from '../utils'; import { scanSplitDynamoDB, querySplitDynamoDB, queryAllDynamoDB, batchGetDynamoDB, @@ -132,35 +132,27 @@ export const toCursorUpdateRequest = (rule) => faulty((uow) => ({ })); export const flushCursor = (rule) => (s) => { - let lastUow; - - const cursorStream = () => _([lastUow]) - .map(toCursorUpdateRequest(rule)) - .through(updateDynamoDB({ - ...rule, - updateRequestField: 'cursorUpdateRequest', - updateResponseField: 'cursorUpdateResponse', - })); - /* istanbul ignore else */ if (rule.toCursorUpdateRequest) { return s - .consume((err, x, push, next) => { - /* istanbul ignore if */ - if (err) { - push(err); - next(); - } else if (x === _.nil) { - if (lastUow) { - next(cursorStream()); - } else { - push(null, x); - } - } else { - lastUow = x; - push(null, x); - next(); - } + // Compact explicitly on PK here since we want to capture just the last event per PK in this + // invocation after the query split. This handles the case where multiple cursor events + // ended up in a single lambda invocation. + .through(compact({ ...rule, compact: true })) + .map(toCursorUpdateRequest(rule)) + .through(updateDynamoDB({ + ...rule, + updateRequestField: 'cursorUpdateRequest', + updateResponseField: 'cursorUpdateResponse', + })) + // Maintains backwards compatibility with how this used to manipulate the UOWs, + // duping the last uow. + .flatMap((uow) => { + const { batch, ...lastUow } = uow; + return [ + ...batch, + lastUow, + ]; }); } else { return s; diff --git a/test/unit/flavors/job.test.js b/test/unit/flavors/job.test.js index 073e2218..66e5cc38 100644 --- a/test/unit/flavors/job.test.js +++ b/test/unit/flavors/job.test.js @@ -255,6 +255,320 @@ describe('flavors/job.js', () => { .done(done); }); + it('should propagate cursors across multiple job triggers in a single invocation', (done) => { + sinon.stub(DynamoDBConnector.prototype, 'queryPage') + .onCall(0) + .resolves({ + LastEvaluatedKey: { + pk: '4', + sk: 'thing', + }, + Items: [ + { + pk: '3', + sk: 'thing', + name: 'thing 3', + }, + { + pk: '4', + sk: 'thing', + name: 'thing 4', + }, + ], + }) + .onCall(1) + .resolves({ + LastEvaluatedKey: { + pk: '6', + sk: 'thing', + }, + Items: [ + { + pk: '5', + sk: 'thing', + name: 'thing 5', + }, + { + pk: '6', + sk: 'thing', + name: 'thing 6', + }, + ], + }); + + const events = toDynamodbRecords([ + { + timestamp: 1572832694, + keys: { + pk: 'job-1', + sk: 'job', + }, + newImage: { + pk: 'job-1', + sk: 'job', + discriminator: 'job', + cursor: { + pk: '2', + sk: 'thing', + }, + }, + oldImage: { + pk: 'job-1', + sk: 'job', + discriminator: 'job', + }, + }, { + timestamp: 1572832694, + keys: { + pk: 'job-2', + sk: 'job', + }, + newImage: { + pk: 'job-2', + sk: 'job', + discriminator: 'job', + cursor: { + pk: '4', + sk: 'thing', + }, + }, + oldImage: { + pk: 'job-2', + sk: 'job', + discriminator: 'job', + }, + }, + ]); + + initialize({ + ...initializeFrom(rules), + }, { ...defaultOptions, AES: false }) + .assemble(fromDynamodb(events), false) + .collect() + // .tap((collected) => console.log(JSON.stringify(collected, null, 2))) + .tap((collected) => { + // 1 per query split result and 1 per cursor. + expect(collected.length).to.equal(6); + + // First pk cursor processing + expect(collected[0].pipeline).to.equal('job1-continued'); + expect(collected[0].querySplitRequest).to.be.deep.equal({ + ExclusiveStartKey: { + pk: '2', + sk: 'thing', + }, + ExpressionAttributeNames: { + '#discriminator': 'discriminator', + }, + ExpressionAttributeValues: { + ':discriminator': 'thing', + }, + Limit: 2, + }); + expect(collected[0].querySplitResponse).to.be.deep.equal({ + LastEvaluatedKey: { + pk: '4', + sk: 'thing', + }, + Item: { + pk: '3', + sk: 'thing', + name: 'thing 3', + }, + }); + expect(collected[0].emit).to.deep.equal({ + type: 'xyz', + raw: { + pk: '3', + sk: 'thing', + name: 'thing 3', + }, + tags: { + account: 'undefined', + region: 'us-west-2', + stage: 'undefined', + source: 'undefined', + functionname: 'undefined', + pipeline: 'job1-continued', + skip: true, + }, + }); + expect(collected[0].cursorUpdateRequest).to.be.undefined; + expect(collected[1].querySplitRequest).to.be.deep.equal({ + ExclusiveStartKey: { + pk: '2', + sk: 'thing', + }, + ExpressionAttributeNames: { + '#discriminator': 'discriminator', + }, + ExpressionAttributeValues: { + ':discriminator': 'thing', + }, + Limit: 2, + }); + expect(collected[1].querySplitResponse).to.be.deep.equal({ + LastEvaluatedKey: { + pk: '4', + sk: 'thing', + }, + Item: { + pk: '4', + sk: 'thing', + name: 'thing 4', + }, + }); + expect(collected[1].emit).to.deep.equal({ + type: 'xyz', + raw: { + pk: '4', + sk: 'thing', + name: 'thing 4', + }, + tags: { + account: 'undefined', + region: 'us-west-2', + stage: 'undefined', + source: 'undefined', + functionname: 'undefined', + pipeline: 'job1-continued', + skip: true, + }, + }); + expect(collected[1].cursorUpdateRequest).to.be.undefined; + expect(collected[2].cursorUpdateRequest).to.deep.equal({ + Key: { + pk: 'job-1', + sk: 'job', + }, + ExpressionAttributeNames: { + '#cursor': 'cursor', + '#timestamp': 'timestamp', + }, + ExpressionAttributeValues: { + ':timestamp': 1572832694000, + ':cursor': { + pk: '4', + sk: 'thing', + }, + }, + UpdateExpression: 'SET #cursor = :cursor, #timestamp = :timestamp', + ReturnValues: 'ALL_NEW', + ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp', + }); + // End first uow cursor processing + + // Second pk cursor processing + expect(collected[3].pipeline).to.equal('job1-continued'); + expect(collected[3].querySplitRequest).to.be.deep.equal({ + ExclusiveStartKey: { + pk: '4', + sk: 'thing', + }, + ExpressionAttributeNames: { + '#discriminator': 'discriminator', + }, + ExpressionAttributeValues: { + ':discriminator': 'thing', + }, + Limit: 2, + }); + expect(collected[3].querySplitResponse).to.be.deep.equal({ + LastEvaluatedKey: { + pk: '6', + sk: 'thing', + }, + Item: { + pk: '5', + sk: 'thing', + name: 'thing 5', + }, + }); + expect(collected[3].emit).to.deep.equal({ + type: 'xyz', + raw: { + pk: '5', + sk: 'thing', + name: 'thing 5', + }, + tags: { + account: 'undefined', + region: 'us-west-2', + stage: 'undefined', + source: 'undefined', + functionname: 'undefined', + pipeline: 'job1-continued', + skip: true, + }, + }); + expect(collected[3].cursorUpdateRequest).to.be.undefined; + expect(collected[4].querySplitRequest).to.be.deep.equal({ + ExclusiveStartKey: { + pk: '4', + sk: 'thing', + }, + ExpressionAttributeNames: { + '#discriminator': 'discriminator', + }, + ExpressionAttributeValues: { + ':discriminator': 'thing', + }, + Limit: 2, + }); + expect(collected[4].querySplitResponse).to.be.deep.equal({ + LastEvaluatedKey: { + pk: '6', + sk: 'thing', + }, + Item: { + pk: '6', + sk: 'thing', + name: 'thing 6', + }, + }); + expect(collected[4].emit).to.deep.equal({ + type: 'xyz', + raw: { + pk: '6', + sk: 'thing', + name: 'thing 6', + }, + tags: { + account: 'undefined', + region: 'us-west-2', + stage: 'undefined', + source: 'undefined', + functionname: 'undefined', + pipeline: 'job1-continued', + skip: true, + }, + }); + expect(collected[4].cursorUpdateRequest).to.be.undefined; + expect(collected[5].cursorUpdateRequest).to.deep.equal({ + Key: { + pk: 'job-2', + sk: 'job', + }, + ExpressionAttributeNames: { + '#cursor': 'cursor', + '#timestamp': 'timestamp', + }, + ExpressionAttributeValues: { + ':timestamp': 1572832694000, + ':cursor': { + pk: '6', + sk: 'thing', + }, + }, + UpdateExpression: 'SET #cursor = :cursor, #timestamp = :timestamp', + ReturnValues: 'ALL_NEW', + ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp', + }); + // End second uow cursor processing + }) + .done(done); + }); + it('should stop', (done) => { const events = toDynamodbRecords([ { From d3ab5fddaf7ab2fbcedeb286ad785dd9ee1634af Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Tue, 4 Nov 2025 10:58:30 -0500 Subject: [PATCH 2/4] Bump version. --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index fcd194f6..64a19091 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.14", + "version": "1.1.15", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.14", + "version": "1.1.15", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index ebd915b2..57b8ef06 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.14", + "version": "1.1.15", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", From cdf9149419ab3b3f41011869e76bf0372466cb0d Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Tue, 4 Nov 2025 14:18:59 -0500 Subject: [PATCH 3/4] Group by full key with configurable override. --- src/flavors/job.js | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/flavors/job.js b/src/flavors/job.js index b05ca711..de11017f 100644 --- a/src/flavors/job.js +++ b/src/flavors/job.js @@ -132,13 +132,25 @@ export const toCursorUpdateRequest = (rule) => faulty((uow) => ({ })); export const flushCursor = (rule) => (s) => { + const { + // By default group on a stringified version of the full key. If the key structure + // differs in a users particular implementation or they want to group by something + // else they can simply override this fn in their rule. + cursorKeyFn = (uow) => `pk:${uow.event.raw.new.pk}|sk:${uow.event.raw.new.sk}`, + } = rule; + /* istanbul ignore else */ if (rule.toCursorUpdateRequest) { return s // Compact explicitly on PK here since we want to capture just the last event per PK in this // invocation after the query split. This handles the case where multiple cursor events // ended up in a single lambda invocation. - .through(compact({ ...rule, compact: true })) + .through(compact({ + ...rule, + compact: { + group: (uow) => cursorKeyFn(uow), + }, + })) .map(toCursorUpdateRequest(rule)) .through(updateDynamoDB({ ...rule, From 503bee97d6880c801f2079e81235f4939d7ca543 Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Tue, 4 Nov 2025 14:22:11 -0500 Subject: [PATCH 4/4] Cleanup comment. --- src/flavors/job.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/flavors/job.js b/src/flavors/job.js index de11017f..d5a883ef 100644 --- a/src/flavors/job.js +++ b/src/flavors/job.js @@ -142,9 +142,6 @@ export const flushCursor = (rule) => (s) => { /* istanbul ignore else */ if (rule.toCursorUpdateRequest) { return s - // Compact explicitly on PK here since we want to capture just the last event per PK in this - // invocation after the query split. This handles the case where multiple cursor events - // ended up in a single lambda invocation. .through(compact({ ...rule, compact: {