diff --git a/package-lock.json b/package-lock.json index 1a10acb..1513527 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.11", + "version": "1.1.12", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.11", + "version": "1.1.12", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index 3e334c6..016be26 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.11", + "version": "1.1.12", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/connectors/firehose.js b/src/connectors/firehose.js index 6a5c5ce..c63c406 100644 --- a/src/connectors/firehose.js +++ b/src/connectors/firehose.js @@ -1,8 +1,13 @@ /* eslint import/no-extraneous-dependencies: ["error", {"devDependencies": true}] */ import { FirehoseClient, PutRecordBatchCommand } from '@aws-sdk/client-firehose'; import { NodeHttpHandler } from '@smithy/node-http-handler'; +import { ConfiguredRetryStrategy } from '@smithy/util-retry'; import Promise from 'bluebird'; + import { omit, pick } from 'lodash'; +import { + defaultRetryConfig, wait, getDelay, assertMaxRetries, defaultBackoffDelay, +} from '../utils/retry'; import { defaultDebugLogger } from '../utils/log'; class Connector { @@ -11,12 +16,14 @@ class Connector { pipelineId, deliveryStreamName = process.env.DELIVERY_STREAM_NAME, timeout = Number(process.env.FIREHOSE_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, + retryConfig = defaultRetryConfig, additionalClientOpts = {}, ...opt }) { this.debug = (msg) => debug('%j', msg); this.deliveryStreamName = deliveryStreamName || 'undefined'; this.client = Connector.getClient(pipelineId, debug, timeout, additionalClientOpts); + this.retryConfig = retryConfig; this.opt = opt; } @@ -33,6 +40,7 @@ class Connector { connectionTimeout: timeout, ...addlRequestHandlerOpts, }), + retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), logger: defaultDebugLogger(debug), ...addlClientOpts, }); @@ -46,7 +54,23 @@ class Connector { ...inputParams, }; - return this._sendCommand(new PutRecordBatchCommand(params), ctx); + return this._putRecordBatch(params, [], ctx); + } + + _putRecordBatch(params, attempts, ctx) { + assertMaxRetries(attempts, this.retryConfig.maxRetries); + + return wait(getDelay(this.retryConfig.retryWait, attempts.length)) + .then(() => this._sendCommand(new PutRecordBatchCommand(params)) + .tap(this.debug) + .tapCatch(this.debug) + .then((resp) => { + if (resp.FailedPutCount > 0) { + return this._putRecordBatch(unprocessed(params, resp), [...attempts, resp]); + } else { + return accumlate(attempts, resp); + } + })); } _sendCommand(command, ctx) { @@ -58,3 +82,17 @@ class Connector { } export default Connector; + +const unprocessed = (params, resp) => ({ + ...params, + Records: params.Records.filter((e, i) => resp.RequestResponses[i].ErrorCode), +}); + +const accumlate = (attempts, resp) => attempts.reduceRight((a, c) => ({ + ...a, + RequestResponses: [ + ...c.RequestResponses.filter((r) => !r.ErrorCode), + ...a.RequestResponses.filter((r) => !r.ErrorCode), + ], + attempts: [...attempts, resp], +}), resp); diff --git a/test/unit/connectors/firehose.test.js b/test/unit/connectors/firehose.test.js index 082d0bb..e3bd92a 100644 --- a/test/unit/connectors/firehose.test.js +++ b/test/unit/connectors/firehose.test.js @@ -51,4 +51,117 @@ describe('connectors/firehose.js', () => { }); expect(data).to.deep.equal({}); }); + + it('should retry', async () => { + const responses = [ + { RequestResponses: [{ SequenceNumber: '1' }, { ErrorCode: 'X' }, { ErrorCode: 'X' }], FailedPutCount: 2 }, + { RequestResponses: [{ SequenceNumber: '2' }, { ErrorCode: 'X' }], FailedPutCount: 1 }, + { RequestResponses: [{ SequenceNumber: '3' }], FailedPutCount: 0 }, + ]; + + const spy = sinon.spy((_) => responses.shift()); + mockFirehose.on(PutRecordBatchCommand).callsFake(spy); + + const inputParams = { + Records: [ + { + Data: Buffer.from(JSON.stringify({ type: 't1' })), + }, + { + Data: Buffer.from(JSON.stringify({ type: 't2' })), + }, + { + Data: Buffer.from(JSON.stringify({ type: 't3' })), + }, + ], + }; + + const data = await new Connector({ + debug: debug('firehose'), + deliveryStreamName: 'ds1', + }).putRecordBatch(inputParams); + + expect(spy).to.have.been.calledWith({ + DeliveryStreamName: 'ds1', + Records: [inputParams.Records[0], inputParams.Records[1], inputParams.Records[2]], + }); + expect(spy).to.have.been.calledWith({ + DeliveryStreamName: 'ds1', + Records: [inputParams.Records[1], inputParams.Records[2]], + }); + expect(spy).to.have.been.calledWith({ + DeliveryStreamName: 'ds1', + Records: [inputParams.Records[2]], + }); + + expect(data).to.deep.equal({ + RequestResponses: [{ SequenceNumber: '1' }, { SequenceNumber: '2' }, { SequenceNumber: '3' }], + FailedPutCount: 0, + attempts: [ + { + RequestResponses: [{ SequenceNumber: '1' }, { ErrorCode: 'X' }, { ErrorCode: 'X' }], + FailedPutCount: 2, + }, + { + RequestResponses: [{ SequenceNumber: '2' }, { ErrorCode: 'X' }], + FailedPutCount: 1, + }, + { + RequestResponses: [{ SequenceNumber: '3' }], + FailedPutCount: 0, + }, + ], + }); + }); + + it('should throw on max retry', async () => { + const responses = [ + { RequestResponses: [{ SequenceNumber: '1' }, { ErrorCode: 'X' }, { ErrorCode: 'X' }], FailedPutCount: 2 }, + { RequestResponses: [{ SequenceNumber: '2' }, { ErrorCode: 'X' }], FailedPutCount: 1 }, + ]; + + const spy = sinon.spy((_) => responses.shift()); + mockFirehose.on(PutRecordBatchCommand).callsFake(spy); + + const inputParams = { + Records: [ + { + Data: Buffer.from(JSON.stringify({ type: 't1' })), + }, + { + Data: Buffer.from(JSON.stringify({ type: 't2' })), + }, + { + Data: Buffer.from(JSON.stringify({ type: 't3' })), + }, + ], + }; + + await new Connector({ + debug: debug('firehose'), + deliveryStreamName: 'ds1', + retryConfig: { + maxRetries: 1, + retryWait: 100, + }, + }).putRecordBatch(inputParams) + .then(() => { + expect.fail('should have thrown'); + }).catch((err) => { + expect(spy).to.have.been.calledWith({ + DeliveryStreamName: 'ds1', + Records: [inputParams.Records[0], inputParams.Records[1], inputParams.Records[2]], + }); + expect(spy).to.have.been.calledWith({ + DeliveryStreamName: 'ds1', + Records: [inputParams.Records[1], inputParams.Records[2]], + }); + expect(spy).to.not.have.been.calledWith({ + DeliveryStreamName: 'ds1', + Records: [inputParams.Records[2]], + }); + + expect(err.message).to.contain('Failed batch requests'); + }); + }); });