diff --git a/package-lock.json b/package-lock.json index 241d6895..c6506c8b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.0.32", + "version": "1.0.33", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.0.32", + "version": "1.0.33", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index a33fb84f..e964cab3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.0.32", + "version": "1.0.33", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/sinks/eventbridge.js b/src/sinks/eventbridge.js index b24e7f9f..41ea82be 100644 --- a/src/sinks/eventbridge.js +++ b/src/sinks/eventbridge.js @@ -21,6 +21,7 @@ export const publishToEventBridge = ({ // eslint-disable-line import/prefer-defa maxPublishRequestSize = Number(process.env.PUBLISH_MAX_REQ_SIZE) || Number(process.env.MAX_REQ_SIZE) || 256 * 1024, batchSize = Number(process.env.PUBLISH_BATCH_SIZE) || Number(process.env.BATCH_SIZE) || 10, parallel = Number(process.env.PUBLISH_PARALLEL) || Number(process.env.PARALLEL) || 8, + endpointId = process.env.BUS_ENDPOINT_ID, handleErrors = true, retryConfig, step = 'publish', @@ -47,6 +48,9 @@ export const publishToEventBridge = ({ // eslint-disable-line import/prefer-defa Entries: batchUow.batch .filter((uow) => uow[publishRequestEntryField]) .map((uow) => uow[publishRequestEntryField]), + ...(endpointId && { + EndpointId: endpointId, + }), }, }); diff --git a/test/unit/sinks/eventbridge.test.js b/test/unit/sinks/eventbridge.test.js index de62c2cb..b826ded9 100644 --- a/test/unit/sinks/eventbridge.test.js +++ b/test/unit/sinks/eventbridge.test.js @@ -270,4 +270,48 @@ describe('sinks/eventbridge.js', () => { }) .done(done); }); + + it('should publish to global endpoint', (done) => { + sinon.stub(Connector.prototype, 'putEvents').resolves({ FailedEntryCount: 0 }); + + const uows = [{ + event: { + id: '79a0d8f0-0eef-11ea-8d71-362b9e155667', + type: 'p1', + partitionKey: '79a0d8f0-0eef-11ea-8d71-362b9e155667', + }, + }]; + + _(uows) + .through(publish({ busName: 'b1', debug: (msg, v) => console.log(msg, v), endpointId: 'ep1' })) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected.length).to.equal(1); + expect(collected[0].publishRequest).to.deep.equal({ + EndpointId: 'ep1', + Entries: [{ + EventBusName: 'b1', + Source: 'custom', + DetailType: 'p1', + Detail: JSON.stringify({ + id: '79a0d8f0-0eef-11ea-8d71-362b9e155667', + type: 'p1', + partitionKey: '79a0d8f0-0eef-11ea-8d71-362b9e155667', + tags: { + account: 'undefined', + region: 'us-west-2', + stage: 'undefined', + source: 'undefined', + functionname: 'undefined', + pipeline: 'undefined', + skip: true, + }, + }), + }], + }); + }) + .done(done); + }); });