diff --git a/README.md b/README.md
index a93d318..14a75bc 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,8 @@ const stream = new DynamoStreamHandler({
     /* ... create the "context", e.g. data sources ... */
     return { doSomething: () => null };
   },
+  // Optionally specify a concurrency setting for processing events.
+  concurrency: 5,
 })
   .onInsert(async (ctx, entity) => {
     // INSERT actions receive a single strongly typed new entities
@@ -107,6 +109,8 @@ const queue = new SQSMessageHandler({
     /* ... create the "context", e.g. data sources ... */
     return { doSomething: () => null };
   },
+  // Optionally specify a concurrency setting for processing events.
+  concurrency: 5,
 })
   .onMessage(async (ctx, message) => {
     // `ctx` contains the nice result of `createRunContext`:
@@ -158,3 +162,15 @@ test('something', async () => {
   expect(context.doSomething).toHaveBeenCalledTimes(3)
 })
 ```
+
+### Parallel Processing + Ordering
+
+By default, the abstractions in `@lifeomic/delta` (`DynamoStreamHandler` and `SQSMessageHandler`) will process events in parallel. To control the parallelization, specify a `concurrency` value when creating the handler.
+
+These abstractions also ensure that, within a batch of events, the correct _ordering_ of events is maintained according to the ordering semantics of the upstream event source, even when processing in parallel.
+
+In `DynamoStreamHandler`, events for the same _key_ will always be processed serially -- events from different keys will be processed in parallel.
+
+In `SQSMessageHandler`, events with the same `MessageGroupId` will always be processed serially -- events with different `MessageGroupId` values will be processed in parallel.
+
+**Note**: while the ordering semantics above will always be preserved, events that do _not_ need to be ordered will not necessarily be processed in the same order they were received in the batch (even when using a `concurrency` value of `1`).
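+
+Under the hood, both handlers use the same strategy: each batch is grouped by an ordering key (the record's `Keys` for DynamoDB streams, the `MessageGroupId` for SQS), groups are processed in parallel up to the `concurrency` limit, and events within a group are processed serially. A simplified sketch of that internal approach (the real `processWithOrdering` utility also accepts a `stopOnError` option):
+
+```typescript
+import pMap from 'p-map';
+import groupBy from 'lodash/groupBy';
+
+const processWithOrdering = async <T>(
+  params: { items: T[]; orderBy: (item: T) => string; concurrency: number },
+  process: (item: T) => Promise<void>,
+) => {
+  // Group the items by their ordering key. Groups run in parallel,
+  // while items within a single group run serially.
+  const groups = Object.values(groupBy(params.items, params.orderBy));
+  await pMap(
+    groups,
+    async (group) => {
+      for (const item of group) {
+        await process(item);
+      }
+    },
+    { concurrency: params.concurrency },
+  );
+};
+```
+
+Because grouping happens before any processing begins, events that do not share an ordering key may complete in any order relative to one another.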
diff --git a/package.json b/package.json
index ef49b6d..587b154 100644
--- a/package.json
+++ b/package.json
@@ -29,6 +29,7 @@
     "@lifeomic/logging": "^4.0.0",
     "@lifeomic/typescript-config": "^1.0.3",
     "@types/jest": "^27.4.1",
+    "@types/lodash": "^4.14.195",
     "@types/uuid": "^8.3.4",
     "conventional-changelog-conventionalcommits": "^4.6.3",
     "eslint": "^8.9.0",
@@ -44,6 +45,8 @@
   "dependencies": {
     "@aws-sdk/util-dynamodb": "^3.369.0",
     "@types/aws-lambda": "^8.10.92",
+    "lodash": "^4.17.21",
+    "p-map": "^4.0.0",
     "uuid": "^8.3.2"
   },
   "peerDependencies": {
diff --git a/src/dynamo-streams.test.ts b/src/dynamo-streams.test.ts
index 33ceea4..80abfbf 100644
--- a/src/dynamo-streams.test.ts
+++ b/src/dynamo-streams.test.ts
@@ -4,7 +4,11 @@ import { DynamoStreamHandler } from './dynamo-streams';
 import { marshall } from '@aws-sdk/util-dynamodb';
 import { z } from 'zod';
 
-const TestSchema = z.object({ id: z.string(), name: z.string().optional() });
+const TestSchema = z.object({
+  id: z.string(),
+  name: z.string().optional(),
+  otherValue: z.string().optional(),
+});
 
 const testSerializer = {
   parse: (object: any) => TestSchema.parse(object),
@@ -170,16 +174,16 @@
     })
       // onInsert twice to test same event through multiple actions
       .onInsert((ctx, entity) => {
-        ctx.dataSources.doSomething(entity);
+        ctx.dataSources.doSomething('insert 1', entity);
       })
       .onInsert((ctx, entity) => {
-        ctx.dataSources.doSomething(entity);
+        ctx.dataSources.doSomething('insert 2', entity);
       })
       .onModify((ctx, oldEntity, newEntity) => {
-        ctx.dataSources.doSomething(oldEntity, newEntity);
+        ctx.dataSources.doSomething('modify', oldEntity, newEntity);
       })
       .onRemove((ctx, entity) => {
-        ctx.dataSources.doSomething(entity);
+        ctx.dataSources.doSomething('remove', entity);
       })
       .lambda();
 
@@ -189,27 +193,27 @@
         {
           eventName: 'INSERT',
           dynamodb: {
-            NewImage: marshall({ id: 'new-insert-varied-lambda' }) as any,
+            NewImage: marshall({ id: 'test-id-1' }) as any,
           },
         },
         {
           eventName: 'MODIFY',
           dynamodb: {
-            OldImage: marshall({ id: 'old-modify-varied-lambda' }) as any,
-            NewImage: marshall({ id: 'new-modify-varied-lambda' }) as any,
+            OldImage: marshall({ id: 'test-id-1' }) as any,
+            NewImage: marshall({ id: 'test-id-1', name: 'new name' }) as any,
           },
         },
         {
           eventName: 'REMOVE',
           dynamodb: {
-            OldImage: marshall({ id: 'old-remove-varied-lambda' }) as any,
+            OldImage: marshall({ id: 'test-id-2' }) as any,
           },
         },
         // A second remove event to test multiple events through a single action
         {
           eventName: 'REMOVE',
           dynamodb: {
-            OldImage: marshall({ id: 'old-remove-varied-lambda-second' }),
+            OldImage: marshall({ id: 'test-id-0' }),
           },
         },
       ],
@@ -220,22 +224,23 @@
 
     expect(dataSources.doSomething).toHaveBeenCalledTimes(5);
 
-    expect(dataSources.doSomething).toHaveBeenNthCalledWith(1, {
-      id: 'new-insert-varied-lambda',
-    });
-    expect(dataSources.doSomething).toHaveBeenNthCalledWith(2, {
-      id: 'new-insert-varied-lambda',
+    expect(dataSources.doSomething).toHaveBeenNthCalledWith(1, 'insert 1', {
+      id: 'test-id-1',
     });
     expect(dataSources.doSomething).toHaveBeenNthCalledWith(
-      3,
-      { id: 'old-modify-varied-lambda' },
-      { id: 'new-modify-varied-lambda' },
+      2,
+      'modify',
+      { id: 'test-id-1' },
+      { id: 'test-id-1', name: 'new name' },
     );
-    expect(dataSources.doSomething).toHaveBeenNthCalledWith(4, {
-      id: 'old-remove-varied-lambda',
+    expect(dataSources.doSomething).toHaveBeenNthCalledWith(3, 'remove', {
+      id: 'test-id-2',
+    });
+    expect(dataSources.doSomething).toHaveBeenNthCalledWith(4, 'remove', {
+      id: 'test-id-0',
     });
-    expect(dataSources.doSomething).toHaveBeenNthCalledWith(5, {
-      id: 'old-remove-varied-lambda-second',
+    expect(dataSources.doSomething).toHaveBeenNthCalledWith(5, 'insert 2', {
+      id: 'test-id-1',
     });
   });
 
@@ -318,54 +323,55 @@
     })
       // onInsert twice to test same event through multiple actions
       .onInsert((ctx, entity) => {
-        ctx.dataSources.doSomething(entity);
+        ctx.dataSources.doSomething('insert 1', entity);
       })
       .onInsert((ctx, entity) => {
-        ctx.dataSources.doSomething(entity);
+        ctx.dataSources.doSomething('insert 2', entity);
       })
       .onModify((ctx, oldEntity, newEntity) => {
-        ctx.dataSources.doSomething(oldEntity, newEntity);
+        ctx.dataSources.doSomething('modify', oldEntity, newEntity);
       })
       .onRemove((ctx, entity) => {
-        ctx.dataSources.doSomething(entity);
+        ctx.dataSources.doSomething('remove', entity);
       })
       .harness();
 
     await sendEvent({
       records: [
-        { type: 'insert', entity: { id: 'new-insert-varied-harness' } },
+        { type: 'insert', entity: { id: 'test-id-1' } },
         {
           type: 'modify',
-          oldEntity: { id: 'old-modify-varied-harness' },
-          newEntity: { id: 'new-modify-varied-harness' },
+          oldEntity: { id: 'test-id-1' },
+          newEntity: { id: 'test-id-1', name: 'new name' },
         },
-        { type: 'remove', entity: { id: 'old-remove-varied-harness' } },
+        { type: 'remove', entity: { id: 'test-id-2' } },
         // A second remove event to test multiple events through a single action
         {
           type: 'remove',
-          entity: { id: 'old-remove-varied-harness-second' },
+          entity: { id: 'test-id-0' },
         },
       ],
     });
 
     expect(dataSources.doSomething).toHaveBeenCalledTimes(5);
 
-    expect(dataSources.doSomething).toHaveBeenNthCalledWith(1, {
-      id: 'new-insert-varied-harness',
-    });
-    expect(dataSources.doSomething).toHaveBeenNthCalledWith(2, {
-      id: 'new-insert-varied-harness',
+    expect(dataSources.doSomething).toHaveBeenNthCalledWith(1, 'insert 1', {
+      id: 'test-id-1',
     });
     expect(dataSources.doSomething).toHaveBeenNthCalledWith(
-      3,
-      { id: 'old-modify-varied-harness' },
-      { id: 'new-modify-varied-harness' },
+      2,
+      'modify',
+      { id: 'test-id-1' },
+      { id: 'test-id-1', name: 'new name' },
    );
-    expect(dataSources.doSomething).toHaveBeenNthCalledWith(4, {
-      id: 'old-remove-varied-harness',
+    expect(dataSources.doSomething).toHaveBeenNthCalledWith(3, 'remove', {
+      id: 'test-id-2',
+    });
+    expect(dataSources.doSomething).toHaveBeenNthCalledWith(4, 'remove', {
+      id: 'test-id-0',
     });
-    expect(dataSources.doSomething).toHaveBeenNthCalledWith(5, {
-      id: 'old-remove-varied-harness-second',
+    expect(dataSources.doSomething).toHaveBeenNthCalledWith(5, 'insert 2', {
+      id: 'test-id-1',
     });
   });
 
@@ -508,4 +514,227 @@
       );
     });
   });
+
+  describe('parallelization', () => {
+    const wait = (ms: number) =>
+      new Promise((resolve) => setTimeout(resolve, ms));
+
+    test('processes events in parallel, while maintaining ordering by Keys', async () => {
+      const mocks = {
+        insert: { started: jest.fn(), finished: jest.fn() },
+        modify: { started: jest.fn(), finished: jest.fn() },
+        remove: { started: jest.fn(), finished: jest.fn() },
+      };
+
+      const handler = new DynamoStreamHandler({
+        logger,
+        parse: testSerializer.parse,
+        createRunContext: () => ({}),
+      })
+        .onInsert(async (ctx, item) => {
+          mocks.insert.started(Date.now(), item);
+          await wait(100);
+          mocks.insert.finished(Date.now(), item);
+        })
+        .onModify(async (ctx, oldItem, newItem) => {
+          mocks.modify.started(Date.now(), oldItem, newItem);
+          await wait(100);
+          mocks.modify.finished(Date.now(), oldItem, newItem);
+        })
+        .onRemove(async (ctx, item) => {
+          mocks.remove.started(Date.now(), item);
+          await wait(100);
+          mocks.remove.finished(Date.now(), item);
+        })
+        .lambda();
+
+      const start = Date.now();
+
+      await handler(
+        {
+          Records: [
+            {
+              eventName: 'INSERT',
+              dynamodb: {
+                Keys: marshall({ id: 'test-id-1', name: 'test-name-1' }) as any,
+                NewImage: marshall({
+                  id: 'test-id-1',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-1',
+                }) as any,
+              },
+            },
+            {
+              eventName: 'MODIFY',
+              dynamodb: {
+                Keys: marshall({ id: 'test-id-1', name: 'test-name-1' }) as any,
+                NewImage: marshall({
+                  id: 'test-id-1',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-1',
+                }) as any,
+                OldImage: marshall({
+                  id: 'test-id-1',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-2',
+                }) as any,
+              },
+            },
+            {
+              eventName: 'MODIFY',
+              dynamodb: {
+                Keys: marshall({ id: 'test-id-2', name: 'test-name-1' }) as any,
+                NewImage: marshall({
+                  id: 'test-id-2',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-1',
+                }) as any,
+                OldImage: marshall({
+                  id: 'test-id-2',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-2',
+                }) as any,
+              },
+            },
+            {
+              eventName: 'REMOVE',
+              dynamodb: {
+                Keys: marshall({ id: 'test-id-1', name: 'test-name-1' }) as any,
+                OldImage: marshall({
+                  id: 'test-id-1',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-2',
+                }) as any,
+              },
+            },
+          ],
+        },
+        {} as any,
+        null as any,
+      );
+
+      const end = Date.now();
+
+      // This assertion confirms that the group doesn't process in less than 300ms.
+      // If it did, then some of the ordered events would be parallelized, which would be bad.
+      expect(end - start).toBeGreaterThanOrEqual(300);
+
+      // This assertion confirms that there is some parallelization happening.
+      expect(end - start).toBeLessThan(400);
+
+      // Now, let's also explicitly assert that the events were processed in order.
+      const insertFinishedTime = mocks.insert.finished.mock.calls.find(
+        ([, { id }]) => id === 'test-id-1',
+      )[0];
+      const modifyStartedTime = mocks.modify.started.mock.calls.find(
+        ([, { id }]) => id === 'test-id-1',
+      )[0];
+
+      expect(modifyStartedTime).toBeGreaterThanOrEqual(insertFinishedTime);
+
+      const modifyFinishedTime = mocks.modify.finished.mock.calls.find(
+        ([, { id }]) => id === 'test-id-1',
+      )[0];
+
+      const removeStartedTime = mocks.remove.started.mock.calls.find(
+        ([, { id }]) => id === 'test-id-1',
+      )[0];
+
+      expect(removeStartedTime).toBeGreaterThanOrEqual(modifyFinishedTime);
+    });
+
+    test('concurrency can be set to 1, which will result in serial processing', async () => {
+      const processMock = jest.fn();
+
+      const handler = new DynamoStreamHandler({
+        logger,
+        parse: testSerializer.parse,
+        createRunContext: () => ({}),
+        concurrency: 1,
+      })
+        .onInsert(async (ctx, item) => {
+          processMock('insert', item);
+          await wait(100);
+        })
+        .onModify(async (ctx, oldItem, newItem) => {
+          processMock('modify', oldItem, newItem);
+          await wait(100);
+        })
+        .onRemove(async (ctx, item) => {
+          processMock('remove', item);
+          await wait(100);
+        })
+        .lambda();
+
+      const start = Date.now();
+
+      await handler(
+        {
+          Records: [
+            {
+              eventName: 'INSERT',
+              dynamodb: {
+                Keys: marshall({ id: 'test-id-1', name: 'test-name-1' }) as any,
+                NewImage: marshall({
+                  id: 'test-id-1',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-1',
+                }) as any,
+              },
+            },
+            {
+              eventName: 'MODIFY',
+              dynamodb: {
+                Keys: marshall({ id: 'test-id-1', name: 'test-name-1' }) as any,
+                OldImage: marshall({
+                  id: 'test-id-1',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-1',
+                }) as any,
+                NewImage: marshall({
+                  id: 'test-id-1',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-2',
+                }) as any,
+              },
+            },
+            {
+              eventName: 'MODIFY',
+              dynamodb: {
+                Keys: marshall({ id: 'test-id-2', name: 'test-name-1' }) as any,
+                OldImage: marshall({
+                  id: 'test-id-2',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-1',
+                }) as any,
+                NewImage: marshall({
+                  id: 'test-id-2',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-2',
+                }) as any,
+              },
+            },
+            {
+              eventName: 'REMOVE',
+              dynamodb: {
+                Keys: marshall({ id: 'test-id-1', name: 'test-name-1' }) as any,
+                OldImage: marshall({
+                  id: 'test-id-1',
+                  name: 'test-name-1',
+                  otherValue: 'test-value-2',
+                }) as any,
+              },
+            },
+          ],
+        },
+        {} as any,
+        null as any,
+      );
+
+      const end = Date.now();
+
+      // This assertion provides some reasonable confirmation that parallelization is not happening.
+      expect(end - start).toBeGreaterThanOrEqual(400);
+    });
+  });
 });
diff --git a/src/dynamo-streams.ts b/src/dynamo-streams.ts
index d2f9759..a7acee1 100644
--- a/src/dynamo-streams.ts
+++ b/src/dynamo-streams.ts
@@ -2,7 +2,11 @@ import { LoggerInterface } from '@lifeomic/logging';
 import { v4 as uuid } from 'uuid';
 import { DynamoDBStreamEvent, DynamoDBStreamHandler } from 'aws-lambda';
 import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
-import { BaseContext, withHealthCheckHandling } from './utils';
+import {
+  BaseContext,
+  processWithOrdering,
+  withHealthCheckHandling,
+} from './utils';
 
 export type DynamoStreamHandlerConfig<Entity, Context> = {
   /**
@@ -20,6 +24,12 @@
    * Create a "context" for the lambda execution. (e.g. "data sources")
    */
   createRunContext: (base: BaseContext) => Context | Promise<Context>;
+  /**
+   * The maximum concurrency for processing records.
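+   *
+   * Concurrency applies across ordering groups -- records that share the
+   * same key are always processed serially.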
+   *
+   * @default 5
+   */
+  concurrency?: number;
 };
 
 export type InsertAction<Entity, Context> = (
@@ -170,82 +180,111 @@
     context.logger.info({ event }, 'Processing DynamoDB stream event');
 
-    // Iterate through every event.
-    for (const record of event.Records) {
-      const recordLogger = this.config.logger.child({ record });
-      if (!record.dynamodb) {
-        recordLogger.error(
-          { record },
-          'The dynamodb property was not present on event',
-        );
-        continue;
-      }
-
-      // Unmarshall the entities.
-      const oldEntity =
-        record.dynamodb.OldImage &&
-        // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
-        this.config.parse(unmarshall(record.dynamodb.OldImage as any));
-
-      const newEntity =
-        record.dynamodb.NewImage &&
-        // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
-        this.config.parse(unmarshall(record.dynamodb.NewImage as any));
-
-      // Handle INSERT events -- invoke the INSERT actions in order.
-      if (record.eventName === 'INSERT') {
-        if (!newEntity) {
-          recordLogger.error(
-            { record },
-            'No NewImage was defined for an INSERT event',
+    await processWithOrdering(
+      {
+        items: event.Records,
+        orderBy: (record) => {
+          const KeyObject = record.dynamodb?.Keys;
+
+          // This scenario should only ever happen in tests.
+          if (!KeyObject) {
+            return uuid();
+          }
+
+          // We need to order by key -- so, just stringify the key.
+          //
+          // But, add custom logic to ensure that the key object is stringified
+          // deterministically, regardless of the order of its keys. (e.g. we
+          // should stringify { a: 1, b: 2 } and { b: 2, a: 1 } to the same string)
+          //
+          // It's possible that AWS already ensures that the keys are deterministically
+          // ordered, and therefore we don't need to do this. But we add this logic just
+          // to be extra sure.
+          return JSON.stringify(
+            Object.keys(KeyObject)
+              .sort()
+              .map((key) => [key, KeyObject[key]]),
           );
-          continue;
-        }
-
-        for (const action of this.actions.insert) {
-          await action({ ...context, logger: recordLogger }, newEntity);
-        }
-      }
-      // Handle MODIFY events -- invoke the MODIFY actions in order.
-      else if (record.eventName === 'MODIFY') {
-        if (!oldEntity) {
+        },
+        concurrency: this.config.concurrency ?? 5,
+        stopOnError: false,
+      },
+      async (record) => {
+        const recordLogger = this.config.logger.child({ record });
+        if (!record.dynamodb) {
          recordLogger.error(
            { record },
-            'No OldImage was defined for a MODIFY event',
+            'The dynamodb property was not present on event',
          );
-          continue;
-        }
-        if (!newEntity) {
-          recordLogger.error(
-            { record },
-            'No NewImage was defined for a MODIFY event',
-          );
-          continue;
+          return;
        }
 
-        for (const action of this.actions.modify) {
-          await action(
-            { ...context, logger: recordLogger },
-            oldEntity,
-            newEntity,
-          );
+        // Unmarshall the entities.
+        const oldEntity =
+          record.dynamodb.OldImage &&
+          // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
+          this.config.parse(unmarshall(record.dynamodb.OldImage as any));
+
+        const newEntity =
+          record.dynamodb.NewImage &&
+          // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
+          this.config.parse(unmarshall(record.dynamodb.NewImage as any));
+
+        // Handle INSERT events -- invoke the INSERT actions in order.
+        if (record.eventName === 'INSERT') {
+          if (!newEntity) {
+            recordLogger.error(
+              { record },
+              'No NewImage was defined for an INSERT event',
+            );
+            return;
+          }
+
+          for (const action of this.actions.insert) {
+            await action({ ...context, logger: recordLogger }, newEntity);
+          }
+        }
        }
-      }
-      // Handle REMOVE events -- invoke the REMOVE actions in order.
-      else if (record.eventName === 'REMOVE') {
-        if (!oldEntity) {
-          recordLogger.error(
-            { record },
-            'No OldImage was defined for a REMOVE event',
-          );
-          continue;
+        // Handle MODIFY events -- invoke the MODIFY actions in order.
+        else if (record.eventName === 'MODIFY') {
+          if (!oldEntity) {
+            recordLogger.error(
+              { record },
+              'No OldImage was defined for a MODIFY event',
+            );
+            return;
+          }
+          if (!newEntity) {
+            recordLogger.error(
+              { record },
+              'No NewImage was defined for a MODIFY event',
+            );
+            return;
+          }
+
+          for (const action of this.actions.modify) {
+            await action(
+              { ...context, logger: recordLogger },
+              oldEntity,
+              newEntity,
+            );
+          }
+        }
        }
-
-        for (const action of this.actions.remove) {
-          await action({ ...context, logger: recordLogger }, oldEntity);
+        // Handle REMOVE events -- invoke the REMOVE actions in order.
+        else if (record.eventName === 'REMOVE') {
+          if (!oldEntity) {
+            recordLogger.error(
+              { record },
+              'No OldImage was defined for a REMOVE event',
+            );
+            return;
+          }
+
+          for (const action of this.actions.remove) {
+            await action({ ...context, logger: recordLogger }, oldEntity);
+          }
        }
-      }
-    }
+      },
+    );
   });
 }
diff --git a/src/sqs.test.ts b/src/sqs.test.ts
index 69009e0..3dac844 100644
--- a/src/sqs.test.ts
+++ b/src/sqs.test.ts
@@ -59,7 +59,11 @@ describe('SQSMessageHandler', () => {
     }).lambda();
 
     await lambda(
-      { Records: [{ body: JSON.stringify({ data: 'test-event-1' }) }] } as any,
+      {
+        Records: [
+          { attributes: {}, body: JSON.stringify({ data: 'test-event-1' }) },
+        ],
+      } as any,
       {} as any,
     );
 
@@ -79,6 +83,7 @@
       logger,
       parseMessage: testSerializer.parseMessage,
      createRunContext: () => dataSources,
+      concurrency: 1,
     })
       .onMessage((ctx, message) => {
         ctx.doSomething('first-handler', message);
@@ -91,10 +96,10 @@
     await lambda(
       {
         Records: [
-          { body: JSON.stringify({ data: 'test-event-1' }) },
-          { body: JSON.stringify({ data: 'test-event-2' }) },
-          { body: JSON.stringify({ data: 'test-event-3' }) },
-          { body: JSON.stringify({ data: 'test-event-4' }) },
+          { attributes: {}, body: JSON.stringify({ data: 'test-event-1' }) },
+          { attributes: {}, body: JSON.stringify({ data: 'test-event-2' }) },
+          { attributes: {}, body: JSON.stringify({ data: 'test-event-3' }) },
+          { attributes: {}, body: JSON.stringify({ data: 'test-event-4' }) },
         ],
       } as any,
       {} as any,
@@ -214,4 +219,119 @@
       expect(dataSources.doSomething).toHaveBeenCalledWith(testValue);
     });
   });
+
+  const wait = (ms: number) =>
+    new Promise((resolve) => setTimeout(resolve, ms));
+
+  describe('parallelization', () => {
+    test('processes events in parallel', async () => {
+      const handler = new SQSMessageHandler({
+        logger,
+        parseMessage: testSerializer.parseMessage,
+        createRunContext: () => ({}),
+      })
+        .onMessage(async () => {
+          // We'll wait 100ms per event, then use that to make timing
+          // assertions below.
+          await wait(100);
+        })
+        .lambda();
+
+      const start = Date.now();
+
+      await handler(
+        {
+          Records: [
+            { attributes: {}, body: JSON.stringify({ data: 'test-event-1' }) },
+            { attributes: {}, body: JSON.stringify({ data: 'test-event-2' }) },
+            { attributes: {}, body: JSON.stringify({ data: 'test-event-3' }) },
+          ] as any,
+        },
+        {} as any,
+      );
+
+      const end = Date.now();
+
+      // This assertion confirms there is parallel processing.
+      expect(end - start).toBeLessThan(200);
+    });
+
+    test('maintains ordering by MessageGroupId', async () => {
+      const messageStarted = jest.fn();
+      const messageFinished = jest.fn();
+      const handler = new SQSMessageHandler({
+        logger,
+        parseMessage: testSerializer.parseMessage,
+        createRunContext: () => ({}),
+      })
+        .onMessage(async (ctx, msg) => {
+          messageStarted(msg, Date.now());
+          // We'll wait 100ms per event, then use that to make timing
+          // assertions below.
+          await wait(100);
+          messageFinished(msg, Date.now());
+        })
+        .lambda();
+
+      const start = Date.now();
+
+      await handler(
+        {
+          Records: [
+            {
+              attributes: { MessageGroupId: 'group-id' },
+              body: JSON.stringify({ data: 'test-event-1' }),
+            },
+            { attributes: {}, body: JSON.stringify({ data: 'test-event-2' }) },
+            {
+              attributes: { MessageGroupId: 'group-id-2' },
+              body: JSON.stringify({ data: 'test-event-other-1' }),
+            },
+            {
+              attributes: { MessageGroupId: 'group-id' },
+              body: JSON.stringify({ data: 'test-event-3' }),
+            },
+            {
+              attributes: { MessageGroupId: 'group-id-2' },
+              body: JSON.stringify({ data: 'test-event-other-2' }),
+            },
+            { attributes: {}, body: JSON.stringify({ data: 'test-event-4' }) },
+          ] as any,
+        },
+        {} as any,
+      );
+
+      const end = Date.now();
+
+      // This assertion confirms that the group doesn't process in less than 200ms.
+      // If it did, then the events would be fully parallelized, which would be bad.
+      expect(end - start).toBeGreaterThan(200);
+
+      // This assertion confirms that there is at least some parallelization happening.
+      expect(end - start).toBeLessThanOrEqual(450);
+
+      // Now, let's also assert that event 3 was processed _after_ the end of event 1.
+      const event1FinishedTime = messageFinished.mock.calls.find(
+        (call) => call[0].data === 'test-event-1',
+      )[1];
+
+      const event3StartedTime = messageStarted.mock.calls.find(
+        (call) => call[0].data === 'test-event-3',
+      )[1];
+
+      expect(event3StartedTime).toBeGreaterThanOrEqual(event1FinishedTime);
+
+      const eventOther1FinishedTime = messageFinished.mock.calls.find(
+        (call) => call[0].data === 'test-event-other-1',
+      )[1];
+
+      const eventOther2StartedTime = messageStarted.mock.calls.find(
+        (call) => call[0].data === 'test-event-other-2',
+      )[1];
+
+      expect(eventOther2StartedTime).toBeGreaterThanOrEqual(
+        eventOther1FinishedTime,
+      );
+    });
+  });
 });
diff --git a/src/sqs.ts b/src/sqs.ts
index e22f4f2..32246ef 100644
--- a/src/sqs.ts
+++ b/src/sqs.ts
@@ -1,7 +1,11 @@
 import { LoggerInterface } from '@lifeomic/logging';
 import { v4 as uuid } from 'uuid';
 import { SQSEvent, Context as AWSContext } from 'aws-lambda';
-import { BaseContext, withHealthCheckHandling } from './utils';
+import {
+  BaseContext,
+  processWithOrdering,
+  withHealthCheckHandling,
+} from './utils';
 
 export type SQSMessageHandlerConfig<Message, Context> = {
   /**
@@ -16,6 +20,12 @@
    * Create a "context" for the lambda execution. (e.g. "data sources")
    */
   createRunContext: (base: BaseContext) => Context | Promise<Context>;
+  /**
+   * The maximum concurrency for processing messages.
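+   *
+   * Concurrency applies across message groups -- messages that share the
+   * same MessageGroupId are always processed serially.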
+   *
+   * @default 5
+   */
+  concurrency?: number;
 };
 
 export type SQSMessageAction<Message, Context> = (
@@ -81,19 +91,31 @@
     // 2. Process all the records.
     context.logger.info({ event }, 'Processing SQS topic message');
 
-    for (const record of event.Records) {
-      const messageLogger = context.logger.child({
-        messageId: record.messageId,
-      });
-
-      const parsedMessage = this.config.parseMessage(record.body);
-
-      for (const action of this.messageActions) {
-        await action({ ...context, logger: messageLogger }, parsedMessage);
-      }
-      messageLogger.info('Successfully processed message');
-    }
+    await processWithOrdering(
+      {
+        items: event.Records,
+        // If there is not a MessageGroupId, then we don't care about
+        // the ordering for the event. We can just generate a UUID for the
+        // ordering key.
+        orderBy: (record) => record.attributes.MessageGroupId ?? uuid(),
+        concurrency: this.config.concurrency ?? 5,
+        stopOnError: false,
+      },
+      async (record) => {
+        const messageLogger = context.logger.child({
+          messageId: record.messageId,
+        });
+
+        const parsedMessage = this.config.parseMessage(record.body);
+
+        for (const action of this.messageActions) {
+          await action({ ...context, logger: messageLogger }, parsedMessage);
+        }
+
+        messageLogger.info('Successfully processed message');
+      },
+    );
 
     context.logger.info('Succesfully processed all messages');
   });
@@ -120,7 +142,11 @@
       (msg) =>
         // We don't need to mock every field on this event -- there are lots.
         // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
-        ({ messageId: uuid(), body: stringifyMessage(msg) } as any),
+        ({
+          attributes: {},
+          messageId: uuid(),
+          body: stringifyMessage(msg),
+        } as any),
     ),
   };
diff --git a/src/utils.ts b/src/utils.ts
index ef9d132..0085df4 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -1,5 +1,7 @@
 import { LoggerInterface } from '@lifeomic/logging';
 import { Context } from 'aws-lambda';
+import pMap from 'p-map';
+import groupBy from 'lodash/groupBy';
 
 export type BaseContext = {
   logger: LoggerInterface;
@@ -22,3 +24,49 @@
 
     return handler(event, context);
   };
+
+export type ProcessWithOrderingParams<T> = {
+  items: T[];
+  orderBy: (msg: T) => string;
+  concurrency: number;
+  stopOnError: boolean;
+};
+
+/**
+ * A utility for performing parallel asynchronous processing of a
+ * list of items, while also maintaining ordering.
+ *
+ * For example, in the case of DynamoDB streams:
+ *
+ * We want to maximize throughput, while also maintaining the ordering
+ * guarantees from Dynamo.
+ *
+ * Dynamo guarantees that we will not receive events out-of-order for a
+ * single item. But, it is possible that we will receive multiple events
+ * for the same item in a single batch.
+ *
+ * So, we can handle events concurrently, but we need to ensure we never
+ * handle multiple events for the same item at the same time. To prevent
+ * that, we will:
+ * - Re-organize the list of events into a "list of lists", where each
+ *   element corresponds to a single item.
+ * - Then, we'll process the lists in parallel.
+ *
+ * This same scenario is true for SQS FIFO queues, which will order messages
+ * by MessageGroupId.
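+ *
+ * @example
+ * // A sketch of the intended usage, mirroring how the SQS handler
+ * // calls this utility (uuid is imported at the call site):
+ * await processWithOrdering(
+ *   {
+ *     items: event.Records,
+ *     orderBy: (record) => record.attributes.MessageGroupId ?? uuid(),
+ *     concurrency: 5,
+ *     stopOnError: false,
+ *   },
+ *   async (record) => {
+ *     // ... process a single record ...
+ *   },
+ * );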
+ */
+export const processWithOrdering = async <T>(
+  params: ProcessWithOrderingParams<T>,
+  process: (item: T) => Promise<void>,
+) => {
+  const lists = Object.values(groupBy(params.items, params.orderBy));
+  await pMap(
+    lists,
+    async (list) => {
+      for (const item of list) {
+        await process(item);
+      }
+    },
+    { concurrency: params.concurrency, stopOnError: params.stopOnError },
+  );
+};
diff --git a/tsconfig.json b/tsconfig.json
index be6163b..88738aa 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -8,7 +8,7 @@
     "outDir": "./dist",
     "inlineSources": false,
     "inlineSourceMap": false,
-    "esModuleInterop": false,
+    "esModuleInterop": true,
     "declaration": true
   }
 }
diff --git a/yarn.lock b/yarn.lock
index f2ea613..fc2c0a3 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -1868,6 +1868,11 @@
   resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.9.tgz#97edc9037ea0c38585320b28964dde3b39e4660d"
   integrity sha512-qcUXuemtEu+E5wZSJHNxUXeCZhAfXKQ41D+duX+VYPde7xyEVZci+/oXKJL13tnRs9lR2pr4fod59GT6/X1/yQ==
 
+"@types/lodash@^4.14.195":
+  version "4.14.195"
+  resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.195.tgz#bafc975b252eb6cea78882ce8a7b6bf22a6de632"
+  integrity sha512-Hwx9EUgdwf2GLarOjQp5ZH8ZmblzcbTBC2wtQWNKARBSxM9ezRIAUpeDTgoQRAFB0+8CNWXVA9+MaSOzOF3nPg==
+
 "@types/minimist@^1.2.0":
   version "1.2.2"
   resolved "https://registry.yarnpkg.com/@types/minimist/-/minimist-1.2.2.tgz#ee771e2ba4b3dc5b372935d549fd9617bf345b8c"