From 5c72a5529193d85597cc81ff7caaf2e00c67dc6b Mon Sep 17 00:00:00 2001 From: John Gilbert Date: Sat, 21 Dec 2024 14:59:53 -0500 Subject: [PATCH 1/4] add-firehose-support-2 --- src/filters/event.js | 6 +- src/flavors/firehose.js | 64 +++++++++++++++++ src/flavors/index.js | 1 + src/from/firehose.js | 42 ++++++++++++ src/utils/handler.js | 18 +++++ test/unit/flavors/firehose.test.js | 106 +++++++++++++++++++++++++++++ test/unit/from/firehose.test.js | 48 +++++++++++++ test/unit/utils/handler.test.js | 16 ++++- 8 files changed, 299 insertions(+), 2 deletions(-) create mode 100644 src/flavors/firehose.js create mode 100644 src/from/firehose.js create mode 100644 test/unit/flavors/firehose.test.js create mode 100644 test/unit/from/firehose.test.js diff --git a/src/filters/event.js b/src/filters/event.js index 50890d6e..4e002bc6 100644 --- a/src/filters/event.js +++ b/src/filters/event.js @@ -19,7 +19,7 @@ export const filterOnEventType = (rule, uow) => { export const prefilterOnEventTypes = (rules) => (uow) => - rules.reduce((a, c) => a || filterOnEventType(c, uow), false); + rules.reduce((a, r) => a || filterOnEventType(r, uow), false); export const filterOnContent = (rule, uow) => { /* istanbul ignore else */ @@ -29,3 +29,7 @@ export const filterOnContent = (rule, uow) => { return true; } }; + +export const prefilterOnContent = (rules) => + (uow) => + rules.reduce((a, r) => a || filterOnContent(r, uow), false); diff --git a/src/flavors/firehose.js b/src/flavors/firehose.js new file mode 100644 index 00000000..363a3e25 --- /dev/null +++ b/src/flavors/firehose.js @@ -0,0 +1,64 @@ +import { + printStartPipeline, printEndPipeline, + faulty, +} from '../utils'; +import { + filterOnEventType, filterOnContent, + prefilterOnEventTypes, prefilterOnContent, +} from '../filters'; + +export const firehoseTransform = (rule) => (s) => s + .filter(onEventType(rule)) + .tap(printStartPipeline) + + .filter(onContent(rule)) + + .map(transform(rule)) + + .tap(printEndPipeline); + +export const firehoseDrop = (rules) => (opt) => { + console.log('%j', { opt }); + console.log('%j', { rules }); + return (s) => s + .filter((uow) => !(prefilterOnEventTypes(rules)(uow) && prefilterOnContent(rules)(uow))) + .tap(printEndPipeline); +}; + +const onEventType = (rule) => faulty((uow) => filterOnEventType(rule, uow)); +const onContent = (rule) => faulty((uow) => filterOnContent(rule, uow)); + +const spreadDateTime = (dt) => { + const date = new Date(dt); + + return { + year: `${date.getFullYear()}`, + month: `${date.getMonth() + 1}`, // JavaScript months are 0-indexed + day: `${date.getDate()}`, + hour: `${date.getHours()}`, + minute: `${date.getMinutes()}`, + }; +}; + +const metadata = (rule, uow) => (rule.metadata + ? /* istanbul ignore next */ rule.metadata(uow, rule) + : { + partitionKeys: { + table: rule.tableName, + ...spreadDateTime(uow.event.timestamp), + }, + }); + +const transform = (rule) => faulty((uow) => { + const transformed = rule.transform + ? rule.transform(uow, rule) + : /* istanbul ignore next */ {}; + + return { + ...uow, + transformed, + result: 'Ok', + data: Buffer.from(JSON.stringify(transformed), 'utf-8').toString('base64'), + metadata: metadata(rule, uow), + }; +}); diff --git a/src/flavors/index.js b/src/flavors/index.js index 47c27198..b885174d 100644 --- a/src/flavors/index.js +++ b/src/flavors/index.js @@ -3,6 +3,7 @@ export * from './correlate'; export * from './cdc'; export * from './evaluate'; export * from './expired'; +export * from './firehose'; export * from './job'; export * from './materialize'; export * from './materializeS3'; diff --git a/src/from/firehose.js b/src/from/firehose.js new file mode 100644 index 00000000..d3bd8d0d --- /dev/null +++ b/src/from/firehose.js @@ -0,0 +1,42 @@ +import _ from 'highland'; + +import { + faulty, decompress, compress, +} from '../utils'; +import { outSkip } from '../filters'; +import { redeemClaimcheck } from '../queries'; + +export const fromFirehose = (event) => + + _(event.records) + .map((record) => + // create a unit-of-work for each event + // so we can correlate related work for error handling + ({ + record, + event: Buffer.from(record.data, 'base64').toString('utf8'), + recordId: record.recordId, + result: 'Dropped', // by default, set to 'Ok' in transform, 'ProcessingFailed' in fault processing ??? + })) + + .map(faulty((uow) => ({ + ...uow, + event: JSON.parse(uow.event, decompress), + }))) + .filter(outSkip) + .through(redeemClaimcheck()); + +// test helper +export const toFirehoseRecords = (events, approximateArrivalTimestamp) => ({ + invocationId: 'invocationIdExample', + deliveryStreamArn: 'arn:aws:kinesis:TEST', + region: process.env.AWS_REGION || /* istanbul ignore next */ 'us-west-2', + records: events.map((e, i) => + ({ + recordId: `${i}`, // "49546986683135544286507457936321625675700192471156785154", + data: Buffer.from(JSON.stringify(e, compress())).toString('base64'), + approximateArrivalTimestamp, // format: 1495072949453 + })), +}); + +export const UNKNOWN_FIREHOSE_EVENT_TYPE = toFirehoseRecords([{ type: 'unknown-type' }]); diff --git a/src/utils/handler.js b/src/utils/handler.js index 97ac20ad..6f971ec2 100644 --- a/src/utils/handler.js +++ b/src/utils/handler.js @@ -39,3 +39,21 @@ export const toPromise = (s) => { }); } }; + +export const toFirehose = (s) => new Promise((resolve, reject) => { + const records = []; + s.consume((err, x, push, next) => { + /* istanbul ignore if */ + if (err) { + reject(err); + } else if (x === _.nil) { + resolve({ + records, + }); + } else { + records.push(x); + next(); + } + }) + .resume(); +}); diff --git a/test/unit/flavors/firehose.test.js b/test/unit/flavors/firehose.test.js new file mode 100644 index 00000000..e3430449 --- /dev/null +++ b/test/unit/flavors/firehose.test.js @@ -0,0 +1,106 @@ +import 'mocha'; +import { expect } from 'chai'; +import sinon from 'sinon'; + +import { + initialize, initializeFrom, +} from '../../../src'; + +import { toFirehoseRecords, fromFirehose } from '../../../src/from/firehose'; +import { firehoseTransform, firehoseDrop } from '../../../src/flavors/firehose'; + +describe('flavors/firehose.js', () => { + beforeEach(() => { + }); + + afterEach(sinon.restore); + + it('should execute', (done) => { + const events = toFirehoseRecords([ + { + type: 't1', + timestamp: 1734754684001, + thing: { + id: '1', + name: 'Thing One', + description: 'This is thing one', + }, + }, + { + type: 't2', + timestamp: 1734754684001, + thing: { + id: '2', + name: 'Thing Two', + description: 'This is thing two', + }, + }, + { + type: 't3', // should be dropped + timestamp: 1734754684001, + }, + ]); + + initialize({ + ...initializeFrom(rules), + drop: firehoseDrop(rules), + }) + .assemble(fromFirehose(events), false) + .collect() + // .tap((records) => console.log(JSON.stringify(records, null, 2))) + .tap((records) => { + expect(records.length).to.equal(3); + expect(records[0].pipeline).to.equal('ft1'); + expect(records[0].event.type).to.equal('t1'); + expect(records[0].transformed).to.deep.equal({ + ID: '1', + NM: 'Thing One', + DESC: 'This is thing one', + }); + expect(records[0].metadata).to.deep.equal({ + partitionKeys: { + table: 'T1', + year: '2024', + month: '12', + day: '20', + hour: '23', + minute: '18', + }, + }); + expect(records[0].data).to.equal('eyJJRCI6IjEiLCJOTSI6IlRoaW5nIE9uZSIsIkRFU0MiOiJUaGlzIGlzIHRoaW5nIG9uZSJ9'); + expect(records[0].result).to.equal('Ok'); + expect(records[1].result).to.equal('Ok'); + expect(records[2].result).to.equal('Dropped'); + }) + .done(done); + }); +}); + +const transform = (uow) => ({ + ID: uow.event.thing.id, + NM: uow.event.thing.name, + DESC: uow.event.thing.description, +}); + +const rules = [ + { + id: 'ft1', + flavor: firehoseTransform, + eventType: 't1', + tableName: 'T1', + transform, + }, + { + id: 'ft2', + flavor: firehoseTransform, + eventType: 't2', + filters: [() => true], + tableName: 'T2', + transform, + }, + { + id: 'other1', + eventType: 'x9', + flavor: firehoseTransform, + }, +]; diff --git a/test/unit/from/firehose.test.js b/test/unit/from/firehose.test.js new file mode 100644 index 00000000..6047af74 --- /dev/null +++ b/test/unit/from/firehose.test.js @@ -0,0 +1,48 @@ +import 'mocha'; +import { expect } from 'chai'; + +import { fromFirehose, toFirehoseRecords } from '../../../src/from/firehose'; + +describe('from/firehose.js', () => { + it('should parse records', (done) => { + const event = toFirehoseRecords([ + { + id: '1', + type: 't1', + partitionKey: '1', + }, + { + id: 'x', + type: 't1', + partitionKey: '1', + tags: { + skip: true, + }, + }, + ]); + + // console.log(JSON.stringify({ event }, null, 2)); + + fromFirehose(event) + .collect() + // .tap((collected) => console.log(JSON.stringify(collected, null, 2))) + .tap((collected) => { + expect(collected.length).to.equal(1); + expect(collected[0]).to.deep.equal({ + record: { + recordId: '0', + approximateArrivalTimestamp: undefined, + data: 'eyJpZCI6IjEiLCJ0eXBlIjoidDEiLCJwYXJ0aXRpb25LZXkiOiIxIn0=', + }, + event: { + id: '1', + type: 't1', + partitionKey: '1', + }, + recordId: '0', + result: 'Dropped', + }); + }) + .done(done); + }); +}); diff --git a/test/unit/utils/handler.test.js b/test/unit/utils/handler.test.js index 23720510..35a747b0 100644 --- a/test/unit/utils/handler.test.js +++ b/test/unit/utils/handler.test.js @@ -5,7 +5,7 @@ import sinon from 'sinon'; import _ from 'highland'; import Promise from 'bluebird'; -import { mw, toPromise } from '../../../src/utils/handler'; +import { mw, toPromise, toFirehose } from '../../../src/utils/handler'; describe('utils/handler.js', () => { afterEach(sinon.restore); @@ -78,4 +78,18 @@ describe('utils/handler.js', () => { .then(() => Promise.reject(new Error('failed'))) .catch((err) => ('Caught')); }); + + it('should return firehose payload', async () => { + const handlerWithPromise = async (event, context) => + _(event.records) + .through(toFirehose); + + const result = await handlerWithPromise({ + records: [{ data: 'r11' }, { data: 'r12' }], + }, {}); + + expect(result).to.deep.equal({ + records: [{ data: 'r11' }, { data: 'r12' }], + }); + }); }); From 640f4113dbf8c879d52f2923654225f863ac4742 Mon Sep 17 00:00:00 2001 From: John Gilbert Date: Sat, 21 Dec 2024 15:10:59 -0500 Subject: [PATCH 2/4] add-firehose-support-2 --- src/flavors/firehose.js | 20 ++++++++------------ test/unit/flavors/firehose.test.js | 4 ++-- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/src/flavors/firehose.js b/src/flavors/firehose.js index 363a3e25..b4bb61df 100644 --- a/src/flavors/firehose.js +++ b/src/flavors/firehose.js @@ -17,13 +17,9 @@ export const firehoseTransform = (rule) => (s) => s .tap(printEndPipeline); -export const firehoseDrop = (rules) => (opt) => { - console.log('%j', { opt }); - console.log('%j', { rules }); - return (s) => s - .filter((uow) => !(prefilterOnEventTypes(rules)(uow) && prefilterOnContent(rules)(uow))) - .tap(printEndPipeline); -}; +export const firehoseDrop = (rules) => (opt) => (s) => s + .filter((uow) => !(prefilterOnEventTypes(rules)(uow) && prefilterOnContent(rules)(uow))) + .tap(printEndPipeline); const onEventType = (rule) => faulty((uow) => filterOnEventType(rule, uow)); const onContent = (rule) => faulty((uow) => filterOnContent(rule, uow)); @@ -32,11 +28,11 @@ const spreadDateTime = (dt) => { const date = new Date(dt); return { - year: `${date.getFullYear()}`, - month: `${date.getMonth() + 1}`, // JavaScript months are 0-indexed - day: `${date.getDate()}`, - hour: `${date.getHours()}`, - minute: `${date.getMinutes()}`, + year: `${date.getUTCFullYear()}`, + month: `${date.getUTCMonth() + 1}`.padStart(2, '0'), // JavaScript months are 0-indexed + day: `${date.getUTCDate()}`.padStart(2, '0'), + hour: `${date.getUTCHours()}`.padStart(2, '0'), + minute: `${date.getUTCMinutes()}`.padStart(2, '0'), }; }; diff --git a/test/unit/flavors/firehose.test.js b/test/unit/flavors/firehose.test.js index e3430449..6adc4f2a 100644 --- a/test/unit/flavors/firehose.test.js +++ b/test/unit/flavors/firehose.test.js @@ -62,8 +62,8 @@ describe('flavors/firehose.js', () => { table: 'T1', year: '2024', month: '12', - day: '20', - hour: '23', + day: '21', + hour: '04', minute: '18', }, }); From c27d4334eaf032942e0e43e6b23f6310200c79a4 Mon Sep 17 00:00:00 2001 From: John Gilbert Date: Mon, 23 Dec 2024 22:41:19 -0500 Subject: [PATCH 3/4] add-firehose-support-2 --- src/flavors/firehose.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flavors/firehose.js b/src/flavors/firehose.js index b4bb61df..2edcc963 100644 --- a/src/flavors/firehose.js +++ b/src/flavors/firehose.js @@ -24,7 +24,7 @@ export const firehoseDrop = (rules) => (opt) => (s) => s const onEventType = (rule) => faulty((uow) => filterOnEventType(rule, uow)); const onContent = (rule) => faulty((uow) => filterOnContent(rule, uow)); -const spreadDateTime = (dt) => { +export const spreadDateTime = (dt) => { const date = new Date(dt); return { From 4e31acfe4e4e88af1cfe631672e71e4da4e60267 Mon Sep 17 00:00:00 2001 From: John Gilbert Date: Mon, 23 Dec 2024 22:41:52 -0500 Subject: [PATCH 4/4] v1.0.29 --- package-lock.json | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3b5b4776..019b9e92 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.0.28", + "version": "1.0.29", "lockfileVersion": 3, "requires": true, "packages": { diff --git a/package.json b/package.json index 9cc04a9b..a0262dec 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.0.28", + "version": "1.0.29", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws",