From 5708303da7df6de8e6740d4ba9c59adcd34b7b89 Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Wed, 18 Feb 2026 16:09:07 -0500 Subject: [PATCH 1/4] Optionally provide a fallback update request to the updateDynamoDB sink. --- src/flavors/update.js | 8 + src/sinks/dynamodb.js | 16 +- .../flavors/materializeTimestream.test.js | 1 - test/unit/flavors/update.test.js | 158 ++++++- test/unit/sinks/dynamodb.test.js | 408 ++++++++++++------ 5 files changed, 450 insertions(+), 141 deletions(-) diff --git a/src/flavors/update.js b/src/flavors/update.js index 3464d506..9a1700f5 100644 --- a/src/flavors/update.js +++ b/src/flavors/update.js @@ -41,6 +41,9 @@ export const update = (rule) => (s) => s // eslint-disable-line import/prefer-de .map(toUpdateRequest(rule)) .parallel(rule.parallel || Number(process.env.PARALLEL) || 4) + .map(toFallbackUpdateRequest(rule)) + .parallel(rule.parallel || Number(process.env.PARALLEL) || 4) + .through(updateDynamoDB(rule)) .tap(printEndPipeline); @@ -68,3 +71,8 @@ const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({ ...uow, updateRequest: await faultify(rule.toUpdateRequest)(uow, rule), })); + +const toFallbackUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({ + ...uow, + fallbackUpdateRequest: rule.toFallbackUpdateRequest ? await faultify(rule.toFallbackUpdateRequest)(uow, rule) : undefined, +})); diff --git a/src/sinks/dynamodb.js b/src/sinks/dynamodb.js index 452b40ed..261042fe 100644 --- a/src/sinks/dynamodb.js +++ b/src/sinks/dynamodb.js @@ -1,5 +1,6 @@ import _ from 'highland'; +import { isEmpty } from 'lodash'; import Connector from '../connectors/dynamodb'; import { rejectWithFault } from '../utils/faults'; @@ -81,6 +82,7 @@ export const updateDynamoDB = ({ tableName = process.env.ENTITY_TABLE_NAME || process.env.EVENT_TABLE_NAME, updateRequestField = 'updateRequest', updateResponseField = 'updateResponse', + fallbackUpdateRequestField = 'fallbackUpdateRequest', parallel = Number(process.env.UPDATE_PARALLEL) || Number(process.env.PARALLEL) || 4, timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, removeUndefinedValues = true, @@ -95,11 +97,19 @@ export const updateDynamoDB = ({ const invoke = (uow) => { if (!uow[updateRequestField]) return _(Promise.resolve(uow)); - const p = () => connector.update(uow[updateRequestField], uow) - .then((updateResponse) => ({ ...uow, [updateResponseField]: updateResponse })) + const p = (updateRequest, isFallback) => () => connector.update(updateRequest, uow) + .then((updateResponse) => { + if (isEmpty(updateResponse) && uow[fallbackUpdateRequestField] && !isFallback) { + // If its empty, that indicates a conditional write failure, in that case we want to run the fallback + // update, if present. + return p(uow[fallbackUpdateRequestField], true)(); + } else { + return { ...uow, [updateResponseField]: updateResponse }; + } + }) .catch(rejectWithFault(uow)); - return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream + return _(uow.metrics?.w(p(uow[updateRequestField], false), step) || p(uow[updateRequestField], false)()); // wrap promise in a stream }; return (s) => s diff --git a/test/unit/flavors/materializeTimestream.test.js b/test/unit/flavors/materializeTimestream.test.js index d947ec5b..e8a262ec 100644 --- a/test/unit/flavors/materializeTimestream.test.js +++ b/test/unit/flavors/materializeTimestream.test.js @@ -4,7 +4,6 @@ import sinon from 'sinon'; import { initialize, initializeFrom, - ttl, } from '../../../src'; import { toKinesisRecords, fromKinesis } from '../../../src/from/kinesis'; diff --git a/test/unit/flavors/update.test.js b/test/unit/flavors/update.test.js index b88c31b8..8801cf2d 100644 --- a/test/unit/flavors/update.test.js +++ b/test/unit/flavors/update.test.js @@ -7,6 +7,7 @@ import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb'; import { ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb'; import { mockClient } from 'aws-sdk-client-mock'; +import { cloneDeep } from 'lodash'; import { initialize, initializeFrom, } from '../../../src'; @@ -28,7 +29,10 @@ describe('flavors/update.js', () => { let mockDdb; beforeEach(() => { - sinon.stub(DynamoDBConnector.prototype, 'update').resolves({}); + sinon.stub(DynamoDBConnector.prototype, 'update') + .onFirstCall().resolves({}) + .onSecondCall() + .resolves({}); }); afterEach(() => { @@ -214,6 +218,158 @@ describe('flavors/update.js', () => { .done(done); }); + it('should optionally run fallback update request', (done) => { + sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]); + sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({ + Responses: { + undefined: [{ + pk: '2', + sk: 'thing', + discriminator: 'thing', + name: 'thing2', + }], + }, + UnprocessedKeys: {}, + }); + + sinon.stub(KmsConnector.prototype, 'generateDataKey').resolves(MOCK_GEN_DK_RESPONSE); + + const events = toDynamodbRecords([ + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'thing', + }, + newImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + name: 'Thing One', + description: 'This is thing one', + otherThing: 'thing|2', + ttl: 1549053422, + timestamp: 1548967022000, + }, + }, + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'other', + }, + newImage: { + pk: '1', + sk: 'other', + discriminator: 'other', + name: 'Other One', + description: 'This is other one', + ttl: 1549053422, + timestamp: 1548967022000, + }, + }, + ]); + const rulesWithFallbackUpdateRequest = cloneDeep([rules[0]]); + rulesWithFallbackUpdateRequest[0].toFallbackUpdateRequest = (uow) => ({ + Key: { + pk: uow.event.raw.new.pk, + sk: uow.event.raw.new.sk, + }, + ...updateExpression({ + fallbackUpdate: true, + }), + }); + + initialize({ + ...initializeFrom(rulesWithFallbackUpdateRequest), + }, { ...defaultOptions, AES: false }) + .assemble(fromDynamodb(events), false) + .collect() + // .tap((collected) => console.log(JSON.stringify(collected, null, 2))) + .tap((collected) => { + expect(collected.length).to.equal(1); + expect(collected[0].pipeline).to.equal('update1'); + expect(collected[0].event.type).to.equal('thing-created'); + expect(collected[0].batchGetRequest).to.deep.equal({ + RequestItems: { + undefined: { + Keys: [{ + pk: '2', + sk: 'thing', + }], + }, + }, + }); + expect(collected[0].batchGetResponse).to.deep.equal({ + Responses: { + undefined: [ + { + pk: '2', + sk: 'thing', + discriminator: 'thing', + name: 'thing2', + }, + ], + }, + UnprocessedKeys: {}, + }); + expect(collected[0].updateRequest).to.deep.equal({ + Key: { + pk: '1', + sk: 'thing', + }, + ExpressionAttributeNames: { + '#pk': 'pk', + '#sk': 'sk', + '#discriminator': 'discriminator', + '#name': 'name', + '#description': 'description', + '#otherThing': 'otherThing', + '#ttl': 'ttl', + '#timestamp': 'timestamp', + }, + ExpressionAttributeValues: { + ':pk': '1', + ':sk': 'thing', + ':discriminator': 'thing', + ':name': 'Thing One', + ':description': 'This is thing one', + ':ttl': 1549053422, + ':timestamp': 1548967022000, + ':otherThing': { + pk: '2', + sk: 'thing', + discriminator: 'thing', + name: 'thing2', + }, + }, + UpdateExpression: 'SET #pk = :pk, #sk = :sk, #discriminator = :discriminator, #name = :name, #description = :description, #otherThing = :otherThing, #ttl = :ttl, #timestamp = :timestamp', + ReturnValues: 'ALL_NEW', + ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp', + }); + + expect(collected[0].fallbackUpdateRequest).to.deep.equal({ + Key: { + pk: '1', + sk: 'thing', + }, + ExpressionAttributeNames: { + '#fallbackUpdate': 'fallbackUpdate', + }, + ExpressionAttributeValues: { + ':fallbackUpdate': true, + }, + UpdateExpression: 'SET #fallbackUpdate = :fallbackUpdate', + ReturnValues: 'ALL_NEW', + }); + + expect(collected[0].updateResponse).to.deep.equal({}); + expect(collected[0].queryRequest).to.be.undefined; + expect(collected[0].getRequest).to.be.undefined; + }) + .done(done); + }); + it('should fault on error', (done) => { sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]); sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({ diff --git a/test/unit/sinks/dynamodb.test.js b/test/unit/sinks/dynamodb.test.js index 6d0a00be..7919db95 100644 --- a/test/unit/sinks/dynamodb.test.js +++ b/test/unit/sinks/dynamodb.test.js @@ -19,157 +19,159 @@ describe('sinks/dynamodb.js', () => { expect(ttl(1540454400000, 30)).to.equal(1543046400); }); - it('should calculate updateExpression', () => { - expect(updateExpression({ - 'id': '2f8ac025-d9e3-48f9-ba80-56487ddf0b89', - 'name': 'Thing One', - 'description': 'This is thing one.', - 'status': undefined, - 'status2': null, - 'discriminator': 'thing', - 'latched': true, - 'ttl': ttl(1540454400000, 30), - 'timestamp': 1540454400000, - 'some unsafe att name': true, - 'some unsafe att name to delete': null, - })).to.deep.equal({ - ExpressionAttributeNames: { - '#description': 'description', - '#discriminator': 'discriminator', - '#id': 'id', - '#latched': 'latched', - '#name': 'name', - '#some_x20_unsafe_x20_att_x20_name': 'some unsafe att name', - '#some_x20_unsafe_x20_att_x20_name_x20_to_x20_delete': 'some unsafe att name to delete', - '#status2': 'status2', - '#timestamp': 'timestamp', - '#ttl': 'ttl', - }, - ExpressionAttributeValues: { - ':description': 'This is thing one.', - ':discriminator': 'thing', - ':id': '2f8ac025-d9e3-48f9-ba80-56487ddf0b89', - ':latched': true, - ':name': 'Thing One', - ':some_x20_unsafe_x20_att_x20_name': true, - ':timestamp': 1540454400000, - ':ttl': 1543046400, - }, - UpdateExpression: 'SET #id = :id, #name = :name, #description = :description, #discriminator = :discriminator, #latched = :latched, #ttl = :ttl, #timestamp = :timestamp, #some_x20_unsafe_x20_att_x20_name = :some_x20_unsafe_x20_att_x20_name REMOVE #status2, #some_x20_unsafe_x20_att_x20_name_x20_to_x20_delete', - ReturnValues: 'ALL_NEW', + describe('updateExpression', () => { + it('should calculate updateExpression', () => { + expect(updateExpression({ + 'id': '2f8ac025-d9e3-48f9-ba80-56487ddf0b89', + 'name': 'Thing One', + 'description': 'This is thing one.', + 'status': undefined, + 'status2': null, + 'discriminator': 'thing', + 'latched': true, + 'ttl': ttl(1540454400000, 30), + 'timestamp': 1540454400000, + 'some unsafe att name': true, + 'some unsafe att name to delete': null, + })).to.deep.equal({ + ExpressionAttributeNames: { + '#description': 'description', + '#discriminator': 'discriminator', + '#id': 'id', + '#latched': 'latched', + '#name': 'name', + '#some_x20_unsafe_x20_att_x20_name': 'some unsafe att name', + '#some_x20_unsafe_x20_att_x20_name_x20_to_x20_delete': 'some unsafe att name to delete', + '#status2': 'status2', + '#timestamp': 'timestamp', + '#ttl': 'ttl', + }, + ExpressionAttributeValues: { + ':description': 'This is thing one.', + ':discriminator': 'thing', + ':id': '2f8ac025-d9e3-48f9-ba80-56487ddf0b89', + ':latched': true, + ':name': 'Thing One', + ':some_x20_unsafe_x20_att_x20_name': true, + ':timestamp': 1540454400000, + ':ttl': 1543046400, + }, + UpdateExpression: 'SET #id = :id, #name = :name, #description = :description, #discriminator = :discriminator, #latched = :latched, #ttl = :ttl, #timestamp = :timestamp, #some_x20_unsafe_x20_att_x20_name = :some_x20_unsafe_x20_att_x20_name REMOVE #status2, #some_x20_unsafe_x20_att_x20_name_x20_to_x20_delete', + ReturnValues: 'ALL_NEW', + }); }); - }); - it('should calculate updateExpression adding values to a set', () => { - const result = updateExpression({ - tags: new Set(['a', 'b']), - }); + it('should calculate updateExpression adding values to a set', () => { + const result = updateExpression({ + tags: new Set(['a', 'b']), + }); - expect(normalizeObj(result)).to.deep.equal({ - ExpressionAttributeNames: { - '#tags': 'tags', - }, - ExpressionAttributeValues: { - ':tags': ['a', 'b'], - }, - UpdateExpression: 'ADD #tags :tags', - ReturnValues: 'ALL_NEW', + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#tags': 'tags', + }, + ExpressionAttributeValues: { + ':tags': ['a', 'b'], + }, + UpdateExpression: 'ADD #tags :tags', + ReturnValues: 'ALL_NEW', + }); }); - }); - it('should calculate updateExpression removing values from a set', () => { - const result = updateExpression({ - tags_delete: new Set(['x', 'y']), - }); + it('should calculate updateExpression removing values from a set', () => { + const result = updateExpression({ + tags_delete: new Set(['x', 'y']), + }); - expect(normalizeObj(result)).to.deep.equal({ - ExpressionAttributeNames: { - '#tags': 'tags', - }, - ExpressionAttributeValues: { - ':tags_delete': ['x', 'y'], - }, - UpdateExpression: 'DELETE #tags :tags_delete', - ReturnValues: 'ALL_NEW', + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#tags': 'tags', + }, + ExpressionAttributeValues: { + ':tags_delete': ['x', 'y'], + }, + UpdateExpression: 'DELETE #tags :tags_delete', + ReturnValues: 'ALL_NEW', + }); }); - }); - it('should calculate updateExpression removing values from a set when attribute names have illegal characters if used as an alias', () => { - const result = updateExpression({ - 'some|tags_delete': new Set(['x', 'y']), - 'a-b': true, - 'a--b': false, - 'a|b': 1, - }); + it('should calculate updateExpression removing values from a set when attribute names have illegal characters if used as an alias', () => { + const result = updateExpression({ + 'some|tags_delete': new Set(['x', 'y']), + 'a-b': true, + 'a--b': false, + 'a|b': 1, + }); - expect(normalizeObj(result)).to.deep.equal({ - ExpressionAttributeNames: { - '#some_x7c_tags': 'some|tags', - '#a_x2d_b': 'a-b', - '#a_x2d__x2d_b': 'a--b', - '#a_x7c_b': 'a|b', - }, - ExpressionAttributeValues: { - ':some_x7c_tags_delete': [ - 'x', - 'y', - ], - ':a_x2d_b': true, - ':a_x2d__x2d_b': false, - ':a_x7c_b': 1, - }, - UpdateExpression: 'SET #a_x2d_b = :a_x2d_b, #a_x2d__x2d_b = :a_x2d__x2d_b, #a_x7c_b = :a_x7c_b DELETE #some_x7c_tags :some_x7c_tags_delete', - ReturnValues: 'ALL_NEW', + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#some_x7c_tags': 'some|tags', + '#a_x2d_b': 'a-b', + '#a_x2d__x2d_b': 'a--b', + '#a_x7c_b': 'a|b', + }, + ExpressionAttributeValues: { + ':some_x7c_tags_delete': [ + 'x', + 'y', + ], + ':a_x2d_b': true, + ':a_x2d__x2d_b': false, + ':a_x7c_b': 1, + }, + UpdateExpression: 'SET #a_x2d_b = :a_x2d_b, #a_x2d__x2d_b = :a_x2d__x2d_b, #a_x7c_b = :a_x7c_b DELETE #some_x7c_tags :some_x7c_tags_delete', + ReturnValues: 'ALL_NEW', + }); }); - }); - it('should wrap calculate updateExpression wrapping a delete set value in a set', () => { - const result = updateExpression({ - tags_delete: 'x', - }); + it('should wrap calculate updateExpression wrapping a delete set value in a set', () => { + const result = updateExpression({ + tags_delete: 'x', + }); - expect(normalizeObj(result)).to.deep.equal({ - ExpressionAttributeNames: { - '#tags': 'tags', - }, - ExpressionAttributeValues: { - ':tags_delete': ['x'], - }, - UpdateExpression: 'DELETE #tags :tags_delete', - ReturnValues: 'ALL_NEW', + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#tags': 'tags', + }, + ExpressionAttributeValues: { + ':tags_delete': ['x'], + }, + UpdateExpression: 'DELETE #tags :tags_delete', + ReturnValues: 'ALL_NEW', + }); }); - }); - it('should calculate complex updateExpression using SET, REMOVE, ADD, and DELETE', () => { - const result = updateExpression({ - id: '123', - name: 'Complex Thing', - description: null, - tags: new Set(['blue', 'green']), - categories: new Set(['a', 'b']), - tags_delete: 'red', - categories_delete: new Set(['x', 'y']), - ignoredField: undefined, - }); + it('should calculate complex updateExpression using SET, REMOVE, ADD, and DELETE', () => { + const result = updateExpression({ + id: '123', + name: 'Complex Thing', + description: null, + tags: new Set(['blue', 'green']), + categories: new Set(['a', 'b']), + tags_delete: 'red', + categories_delete: new Set(['x', 'y']), + ignoredField: undefined, + }); - expect(normalizeObj(result)).to.deep.equal({ - ExpressionAttributeNames: { - '#id': 'id', - '#name': 'name', - '#description': 'description', - '#tags': 'tags', - '#categories': 'categories', - }, - ExpressionAttributeValues: { - ':id': '123', - ':name': 'Complex Thing', - ':tags': ['blue', 'green'], - ':categories': ['a', 'b'], - ':tags_delete': ['red'], - ':categories_delete': ['x', 'y'], - }, - UpdateExpression: 'SET #id = :id, #name = :name REMOVE #description ADD #tags :tags, #categories :categories DELETE #tags :tags_delete, #categories :categories_delete', - ReturnValues: 'ALL_NEW', + expect(normalizeObj(result)).to.deep.equal({ + ExpressionAttributeNames: { + '#id': 'id', + '#name': 'name', + '#description': 'description', + '#tags': 'tags', + '#categories': 'categories', + }, + ExpressionAttributeValues: { + ':id': '123', + ':name': 'Complex Thing', + ':tags': ['blue', 'green'], + ':categories': ['a', 'b'], + ':tags_delete': ['red'], + ':categories_delete': ['x', 'y'], + }, + UpdateExpression: 'SET #id = :id, #name = :name REMOVE #description ADD #tags :tags, #categories :categories DELETE #tags :tags_delete, #categories :categories_delete', + ReturnValues: 'ALL_NEW', + }); }); }); @@ -217,6 +219,140 @@ describe('sinks/dynamodb.js', () => { .done(done); }); + it('should call fallback update after conditional update failure if provided', (done) => { + const stub = sinon.stub(Connector.prototype, 'update') + .onFirstCall().resolves({}) + .onSecondCall() + .resolves({ + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing2', + }); + + const uows = [{ + updateRequest: { + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing', + }, + }, + fallbackUpdateRequest: { + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing2', + }, + }, + }, { + updateRequest: undefined, + }]; + + _(uows) + .through(updateDynamoDB()) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected.length).to.equal(2); + expect(stub).to.have.been.calledWith({ + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing', + }, + }); + expect(stub).to.have.been.calledWith({ + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing2', + }, + }); + expect(collected[0].updateResponse).to.deep.equal({ + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing2', + }); + }) + .done(done); + }); + + it('should call not fallback update after conditional update failure if not provided', (done) => { + const stub = sinon.stub(Connector.prototype, 'update') + .onFirstCall().resolves({}); + + const uows = [{ + updateRequest: { + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing', + }, + }, + }, { + updateRequest: undefined, + }]; + + _(uows) + .through(updateDynamoDB()) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected.length).to.equal(2); + expect(stub).to.have.callCount(1); + expect(stub).to.have.been.calledWith({ + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing', + }, + }); + expect(collected[0].updateResponse).to.deep.equal({}); + }) + .done(done); + }); + + it('should return {} if both update request and fallback update request fail due to conditional update failures', (done) => { + const stub = sinon.stub(Connector.prototype, 'update') + .onFirstCall().resolves({}) + .onSecondCall() + .resolves({}); + + const uows = [{ + updateRequest: { + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing', + }, + }, + fallbackUpdateRequest: { + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing2', + }, + }, + }, { + updateRequest: undefined, + }]; + + _(uows) + .through(updateDynamoDB()) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected.length).to.equal(2); + expect(stub).to.have.been.calledWith({ + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing', + }, + }); + expect(stub).to.have.been.calledWith({ + Key: { + pk: '72363701-fd38-4887-94b9-e8f8aecf6208', + sk: 'thing2', + }, + }); + expect(collected[0].updateResponse).to.deep.equal({}); + }) + .done(done); + }); + it('should call put', (done) => { const stub = sinon.stub(Connector.prototype, 'put').resolves({}); From 01a928ec6e55c9ca28f3b98c5d3bee45215a01d1 Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Wed, 18 Feb 2026 16:09:36 -0500 Subject: [PATCH 2/4] Bump version. --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 44be0c30..d4c80dc0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.19", + "version": "1.1.20", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.19", + "version": "1.1.20", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index e1f92e57..567f4563 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.19", + "version": "1.1.20", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", From 9cc83a6e1e52e05f1223567ba1817d60cc7e382b Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Wed, 18 Feb 2026 17:00:17 -0500 Subject: [PATCH 3/4] Update materialize flavor. --- src/flavors/materialize.js | 8 +++ test/unit/flavors/materialize.test.js | 94 +++++++++++++++++++++++++++ test/unit/metrics/index.test.js | 32 ++++----- 3 files changed, 118 insertions(+), 16 deletions(-) diff --git a/src/flavors/materialize.js b/src/flavors/materialize.js index 7a86d033..f15780f3 100644 --- a/src/flavors/materialize.js +++ b/src/flavors/materialize.js @@ -26,6 +26,9 @@ export const materialize = (rule) => (s) => s // eslint-disable-line import/pref .map(toUpdateRequest(rule)) .parallel(rule.parallel || Number(process.env.PARALLEL) || 4) + .map(toFallbackUpdateRequest(rule)) + .parallel(rule.parallel || Number(process.env.PARALLEL) || 4) + .through(updateDynamoDB(rule)) .tap(printEndPipeline); @@ -37,3 +40,8 @@ const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({ ...uow, updateRequest: await faultify(rule.toUpdateRequest)(uow, rule), })); + +const toFallbackUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({ + ...uow, + fallbackUpdateRequest: rule.toFallbackUpdateRequest ? await faultify(rule.toFallbackUpdateRequest)(uow, rule) : undefined, +})); diff --git a/test/unit/flavors/materialize.test.js b/test/unit/flavors/materialize.test.js index 13f5ba42..90f5e5e4 100644 --- a/test/unit/flavors/materialize.test.js +++ b/test/unit/flavors/materialize.test.js @@ -2,6 +2,7 @@ import 'mocha'; import { expect } from 'chai'; import sinon from 'sinon'; +import { cloneDeep } from 'lodash'; import { initialize, initializeFrom, ttl, @@ -92,6 +93,99 @@ describe('flavors/materialize.js', () => { }) .done(done); }); + + it('should optionally call the fallback update request', (done) => { + const events = toKinesisRecords([ + { + type: 'm1', + timestamp: 1548967022000, + thing: { + id: '1', + name: 'Thing One', + description: 'This is thing one', + }, + }, + { + type: 'split', + timestamp: 1548967022000, + root: { + things: [{ + id: '2', + name: 'Thing One', + description: 'This is thing one', + }, { + id: '3', + name: 'Thing One', + description: 'This is thing one', + }], + }, + }, + ]); + + const ruleWithFallbackUpdateRequest = cloneDeep(rules[0]); + ruleWithFallbackUpdateRequest.toFallbackUpdateRequest = (uow) => ({ + Key: { + pk: uow.event.thing.id, + sk: 'thing', + }, + ...updateExpression({ + fallbackUpdate: true, + }), + }); + + initialize({ + ...initializeFrom([ruleWithFallbackUpdateRequest]), + }) + .assemble(fromKinesis(events), false) + .collect() + // .tap((collected) => console.log(JSON.stringify(collected, null, 2))) + .tap((collected) => { + expect(collected.length).to.equal(1); + expect(collected[0].pipeline).to.equal('mv1'); + expect(collected[0].event.type).to.equal('m1'); + expect(collected[0].updateRequest).to.deep.equal({ + Key: { + pk: '1', + sk: 'thing', + }, + ExpressionAttributeNames: { + '#id': 'id', + '#name': 'name', + '#description': 'description', + + '#discriminator': 'discriminator', + '#ttl': 'ttl', + '#timestamp': 'timestamp', + }, + ExpressionAttributeValues: { + ':id': '1', + ':name': 'Thing One', + ':description': 'This is thing one', + ':discriminator': 'thing', + ':ttl': 1549053422, + ':timestamp': 1548967022000, + }, + UpdateExpression: 'SET #id = :id, #name = :name, #description = :description, #discriminator = :discriminator, #ttl = :ttl, #timestamp = :timestamp', + ReturnValues: 'ALL_NEW', + ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp', + }); + expect(collected[0].fallbackUpdateRequest).to.deep.equal({ + Key: { + pk: '1', + sk: 'thing', + }, + ExpressionAttributeNames: { + '#fallbackUpdate': 'fallbackUpdate', + }, + ExpressionAttributeValues: { + ':fallbackUpdate': true, + }, + UpdateExpression: 'SET #fallbackUpdate = :fallbackUpdate', + ReturnValues: 'ALL_NEW', + }); + }) + .done(done); + }); }); const toUpdateRequest = (uow) => ({ diff --git a/test/unit/metrics/index.test.js b/test/unit/metrics/index.test.js index f814b67e..1f293c2e 100644 --- a/test/unit/metrics/index.test.js +++ b/test/unit/metrics/index.test.js @@ -187,17 +187,17 @@ describe('metrics/index.js', () => { count: 3, }, 'p1|save|stream.pipeline.io.wait.time': { - average: 20, - min: 14, - max: 26, - sum: 60, + average: 22, + min: 16, + max: 28, + sum: 66, count: 3, }, 'p1|save|stream.pipeline.io.time': { - average: 8, - min: 8, - max: 8, - sum: 24, + average: 6, + min: 6, + max: 6, + sum: 18, count: 3, }, 'p1|stream.pipeline.time': { @@ -229,17 +229,17 @@ describe('metrics/index.js', () => { count: 1, }, 'p2|get|stream.pipeline.io.wait.time': { - average: 8, - min: 8, - max: 8, - sum: 8, + average: 2, + min: 2, + max: 2, + sum: 2, count: 1, }, 'p2|get|stream.pipeline.io.time': { - average: 14, - min: 14, - max: 14, - sum: 14, + average: 20, + min: 20, + max: 20, + sum: 20, count: 1, }, 'p2|publish|stream.pipeline.io.wait.time': { From dc961c942e731af689ad399d4d381717720e356d Mon Sep 17 00:00:00 2001 From: Peter Myers Date: Thu, 19 Feb 2026 08:30:10 -0500 Subject: [PATCH 4/4] Wrap fallback promise with metrics. --- src/sinks/dynamodb.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/dynamodb.js b/src/sinks/dynamodb.js index 261042fe..ccfa01d4 100644 --- a/src/sinks/dynamodb.js +++ b/src/sinks/dynamodb.js @@ -102,7 +102,7 @@ export const updateDynamoDB = ({ if (isEmpty(updateResponse) && uow[fallbackUpdateRequestField] && !isFallback) { // If its empty, that indicates a conditional write failure, in that case we want to run the fallback // update, if present. - return p(uow[fallbackUpdateRequestField], true)(); + return uow.metrics?.w(p(uow[fallbackUpdateRequestField], true)) || p(uow[fallbackUpdateRequestField], true)(); } else { return { ...uow, [updateResponseField]: updateResponse }; }