From 13daa67109d6fc4c2d894cf34166c41969c7432e Mon Sep 17 00:00:00 2001 From: Cody Date: Sat, 25 Jan 2025 09:34:44 -0800 Subject: [PATCH] give rules control over throwing conditional check failures --- package-lock.json | 2 +- package.json | 2 +- src/connectors/dynamodb.js | 4 ++- src/sinks/dynamodb.js | 3 +- test/unit/flavors/update.test.js | 60 ++++++++++++++++++++++++++++++-- 5 files changed, 65 insertions(+), 6 deletions(-) diff --git a/package-lock.json b/package-lock.json index 019b9e92..b14ee77d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.0.29", + "version": "1.0.30", "lockfileVersion": 3, "requires": true, "packages": { diff --git a/package.json b/package.json index a0262dec..58187c62 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.0.29", + "version": "1.0.30", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/connectors/dynamodb.js b/src/connectors/dynamodb.js index 6f6b9a8a..038ded4c 100644 --- a/src/connectors/dynamodb.js +++ b/src/connectors/dynamodb.js @@ -27,6 +27,7 @@ class Connector { removeUndefinedValues = true, timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, retryConfig = defaultRetryConfig, + throwConditionFailure = false, additionalClientOpts = {}, ...opt }) { @@ -34,6 +35,7 @@ class Connector { this.tableName = tableName || /* istanbul ignore next */ 'undefined'; this.client = Connector.getClient(pipelineId, debug, convertEmptyValues, removeUndefinedValues, timeout, additionalClientOpts); this.retryConfig = retryConfig; + this.throwConditionFailure = throwConditionFailure; this.opt = opt; } @@ -73,7 +75,7 @@ class Connector { return this._sendCommand(new UpdateCommand(params), ctx) .catch((err) => { /* istanbul ignore else */ - if (err.name === 'ConditionalCheckFailedException') { + if (err.name === 'ConditionalCheckFailedException' && !this.throwConditionFailure) { return {}; } /* istanbul ignore next */ diff --git a/src/sinks/dynamodb.js b/src/sinks/dynamodb.js index a365dd0b..2e9a5415 100644 --- a/src/sinks/dynamodb.js +++ b/src/sinks/dynamodb.js @@ -59,11 +59,12 @@ export const updateDynamoDB = ({ parallel = Number(process.env.UPDATE_PARALLEL) || Number(process.env.PARALLEL) || 4, timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, removeUndefinedValues = true, + throwConditionFailure = false, step = 'save', ...opt } = {}) => { const connector = new Connector({ - pipelineId, debug, tableName, timeout, removeUndefinedValues, ...opt, + pipelineId, debug, tableName, timeout, removeUndefinedValues, throwConditionFailure, ...opt, }); const invoke = (uow) => { diff --git a/test/unit/flavors/update.test.js b/test/unit/flavors/update.test.js index fa71b299..b1ba689c 100644 --- a/test/unit/flavors/update.test.js +++ b/test/unit/flavors/update.test.js @@ -3,6 +3,9 @@ import { expect } from 'chai'; import sinon from 'sinon'; import { KmsConnector, MOCK_GEN_DK_RESPONSE } from 'aws-kms-ee'; +import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb'; +import { ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb'; +import { mockClient } from 'aws-sdk-client-mock'; import { initialize, initializeFrom, @@ -17,16 +20,21 @@ import { import { updateExpression, timestampCondition, } from '../../../src/sinks/dynamodb'; -import { DynamoDBConnector } from '../../../src/connectors'; +import { DynamoDBConnector, EventBridgeConnector } from '../../../src/connectors'; import { update } from '../../../src/flavors/update'; describe('flavors/update.js', () => { + let mockDdb; + beforeEach(() => { sinon.stub(DynamoDBConnector.prototype, 'update').resolves({}); }); - afterEach(sinon.restore); + afterEach(() => { + sinon.restore(); + mockDdb?.restore(); + }); it('should execute', (done) => { sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]); @@ -157,6 +165,54 @@ describe('flavors/update.js', () => { }) .done(done); }); + + it('should optionally throw conditional check', (done) => { + sinon.restore(); + sinon.stub(EventBridgeConnector.prototype, 'putEvents').resolves({}); + mockDdb = mockClient(DynamoDBDocumentClient); + mockDdb.on(UpdateCommand).rejects(new ConditionalCheckFailedException({})); + + const events = toDynamodbRecords([ + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'thing', + }, + newImage: { + pk: '1', + sk: 'thing', + discriminator: 'thing', + ttl: 1549053422, + timestamp: 1548967022000, + }, + }, + ]); + + const rule = { + id: 'updateThrow', + flavor: update, + eventType: /thing-*/, + toUpdateRequest: () => ({}), + throwConditionFailure: true, + }; + + initialize({ + ...initializeFrom([ + rule, + ]), + }, { ...defaultOptions, AES: false }) + .assemble(fromDynamodb(events), true) + .collect() + // .tap((collected) => console.log(JSON.stringify(collected, null, 2))) + .tap((collected) => { + expect(collected.length).to.equal(1); + expect(collected[0].event.tags.pipeline).to.equal('updateThrow'); + expect(collected[0].event.type).to.equal('fault'); + expect(collected[0].event.err.name).to.equal('ConditionalCheckFailedException'); + }) + .done(done); + }); }); const toUpdateRequest = (uow) => ({