From aa5a6b8a62e7d40cf82b37787e65e5098991b1c1 Mon Sep 17 00:00:00 2001 From: Cody Date: Sat, 25 Jan 2025 10:04:27 -0800 Subject: [PATCH] allow overriding the cdc event field for cdc short circuiting --- src/flavors/cdc.js | 20 +++++++++++------- test/unit/flavors/cdc.test.js | 40 ++++++++++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/src/flavors/cdc.js b/src/flavors/cdc.js index cbb8d87d..3d0ec487 100644 --- a/src/flavors/cdc.js +++ b/src/flavors/cdc.js @@ -29,7 +29,11 @@ export const cdc = (rule) => (s) => s // eslint-disable-line import/prefer-defau .map(toEvent(rule)) .parallel(rule.parallel || Number(process.env.PARALLEL) || 4) - .through(encryptEvent(rule)) + .through(encryptEvent({ + sourceField: rule.eventField || 'event', + targetField: rule.eventField || 'event', + ...rule, + })) .through(rule.publish(rule)) .tap(printEndPipeline); @@ -55,12 +59,12 @@ const toGetRequest = (rule) => faulty((uow) => ({ : undefined, })); -const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent - ? uow - : ({ +const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent // eslint-disable-line no-nested-ternary + ? uow : ({ ...uow, - event: { - ...uow.event, - ...await faultify(rule.toEvent)(uow, rule), - }, + [rule.eventField || 'event']: rule.eventField + ? await faultify(rule.toEvent)(uow, rule) : { + ...uow.event, + ...await faultify(rule.toEvent)(uow, rule), + }, }))); diff --git a/test/unit/flavors/cdc.test.js b/test/unit/flavors/cdc.test.js index dbc90e76..d7583cd0 100644 --- a/test/unit/flavors/cdc.test.js +++ b/test/unit/flavors/cdc.test.js @@ -61,6 +61,22 @@ describe('flavors/cdc.js', () => { timestamp: 1548967022000, }, }, + { + timestamp: 1572832690, + keys: { + pk: '1', + sk: 'override', + }, + newImage: { + pk: '1', + sk: 'override', + discriminator: 'override', + name: 'Override One', + description: 'This is override one', + ttl: 1549053422, + timestamp: 1548967022000, + }, + }, ]); initialize({ @@ -70,7 +86,7 @@ describe('flavors/cdc.js', () => { .collect() // .tap((collected) => console.log(JSON.stringify(collected, null, 2))) .tap((collected) => { - expect(collected.length).to.equal(2); + expect(collected.length).to.equal(3); expect(collected[1].pipeline).to.equal('cdc1'); expect(collected[1].event.type).to.equal('thing-created'); expect(collected[1].event.thing).to.deep.equal({ @@ -86,10 +102,17 @@ describe('flavors/cdc.js', () => { }); expect(collected[1].queryRequest).to.be.undefined; - // this pipeline speeds ahead since it does less async - expect(collected[0].pipeline).to.equal('cdc2'); - expect(collected[0].queryRequest).to.not.be.undefined; - expect(collected[0].queryResponse).to.not.be.undefined; + expect(collected[2].pipeline).to.equal('cdc2'); + expect(collected[2].queryRequest).to.not.be.undefined; + expect(collected[2].queryResponse).to.not.be.undefined; + + expect(collected[0].pipeline).to.equal('cdc3'); + expect(collected[0].event.type).to.equal('override-created'); + expect(collected[0].event.thing).to.be.undefined; + expect(collected[0].emit).to.be.null; + expect(collected[0].publishRequest).to.deep.equal({ + Entries: [], + }); }) .done(done); }); @@ -129,4 +152,11 @@ const rules = [ flavor: cdc, eventType: 'x9', }, + { + id: 'cdc3', + flavor: cdc, + toEvent: () => null, + eventField: 'emit', + eventType: /override-*/, + }, ];