From 8087b66cb777e76e889b1da858545e4fb5d861e6 Mon Sep 17 00:00:00 2001 From: Swain Molster Date: Wed, 2 Aug 2023 12:16:34 -0400 Subject: [PATCH] address PR feedback --- README.md | 2 +- src/dynamo-streams.test.ts | 2 +- src/dynamo-streams.ts | 37 +++++++++++++++++-------------------- src/sqs.test.ts | 24 +++++++++++++++++++++++- src/sqs.ts | 2 +- src/utils.ts | 8 +------- 6 files changed, 44 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index d13fc62..14a75bc 100644 --- a/README.md +++ b/README.md @@ -173,4 +173,4 @@ In `DynamoStreamHandler`, events for the same _key_ will always be processed ser In `SQSMessageHandler`, events with the same `MessageGroupId` will always processed serially -- events with different `MessageGroupId` values will be processed in parallel. -**Note**: when using a `concurrency` value of `1`, the ordering semantics above will still be preserved. But, events that do _not_ need to be ordered will not necessarily be processed in the same order they were received in the batch. +**Note**: while the ordering semantics above will always be preserved, events that do _not_ need to be ordered will not necessarily be processed in the same order they were received in the batch (even when using a `concurrency` value of `1`). diff --git a/src/dynamo-streams.test.ts b/src/dynamo-streams.test.ts index 3f4e30a..80abfbf 100644 --- a/src/dynamo-streams.test.ts +++ b/src/dynamo-streams.test.ts @@ -615,7 +615,7 @@ describe('DynamoStreamHandler', () => { const end = Date.now(); - // This assertion confirms that the group doesn't process in less than 200ms. + // This assertion confirms that the group doesn't process in less than 300ms. // If it did, then some of the ordered events would be parallelized, which would be bad. expect(end - start).toBeGreaterThanOrEqual(300); diff --git a/src/dynamo-streams.ts b/src/dynamo-streams.ts index 9c78e95..a7acee1 100644 --- a/src/dynamo-streams.ts +++ b/src/dynamo-streams.ts @@ -80,22 +80,6 @@ export type TestEvent = { records: TestRecord[]; }; -/** - * Stringifies an unmarshalled DynamoDB key deterministically, regardless - * of ordering of keys. - */ -const deterministicStringify = (obj: { - [key: string]: string | number | boolean; -}): string => - JSON.stringify( - Object.keys(obj) - .sort() - .reduce((result, key) => { - result[key] = obj[key]; - return result; - }, {} as any), - ); - /** * An abstraction for a DynamoDB stream handler. */ @@ -200,13 +184,26 @@ export class DynamoStreamHandler { { items: event.Records, orderBy: (record) => { + const KeyObject = record.dynamodb?.Keys; + // This scenario should only ever happen in tests. - if (!record.dynamodb?.Keys) { + if (!KeyObject) { return uuid(); } - return deterministicStringify( - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - unmarshall(record.dynamodb.Keys as any), + + // We need to order by key -- so, just stringify the key. + // + // But, add custom logic to ensure that the key object is stringified + // determinstically, regardless of the order of its keys. (e.g. we + // should stringify { a: 1, b: 2 } and { b: 2, a: 1 } to the same string) + // + // It's possible that AWS already ensures that the keys are deterministically + // ordered, and therefore we don't need to do this. But we add this logic just + // to be extra sure. + return JSON.stringify( + Object.keys(KeyObject) + .sort() + .map((key) => [key, KeyObject[key]]), ); }, concurrency: this.config.concurrency ?? 5, diff --git a/src/sqs.test.ts b/src/sqs.test.ts index 7e6ff19..3dac844 100644 --- a/src/sqs.test.ts +++ b/src/sqs.test.ts @@ -283,10 +283,18 @@ describe('SQSMessageHandler', () => { body: JSON.stringify({ data: 'test-event-1' }), }, { attributes: {}, body: JSON.stringify({ data: 'test-event-2' }) }, + { + attributes: { MessageGroupId: 'group-id-2' }, + body: JSON.stringify({ data: 'test-event-other-1' }), + }, { attributes: { MessageGroupId: 'group-id' }, body: JSON.stringify({ data: 'test-event-3' }), }, + { + attributes: { MessageGroupId: 'group-id-2' }, + body: JSON.stringify({ data: 'test-event-other-2' }), + }, { attributes: {}, body: JSON.stringify({ data: 'test-event-4' }) }, ] as any, }, @@ -299,8 +307,10 @@ describe('SQSMessageHandler', () => { // If it did, then the events would be fully parallelized, which would be bad. expect(end - start).toBeGreaterThan(200); - // Now, let's also assert that event 3 was processed _after_ the end of event 1. + // This assertion confirms that there is at least some parallelization happening. + expect(end - start).toBeLessThanOrEqual(450); + // Now, let's also assert that event 3 was processed _after_ the end of event 1. const event1FinishedTime = messageFinished.mock.calls.find( (call) => call[0].data === 'test-event-1', )[1]; @@ -310,6 +320,18 @@ describe('SQSMessageHandler', () => { )[1]; expect(event3StartedTime).toBeGreaterThanOrEqual(event1FinishedTime); + + const eventOther1FinishedTime = messageFinished.mock.calls.find( + (call) => call[0].data === 'test-event-other-1', + )[1]; + + const eventOther2StartedTime = messageStarted.mock.calls.find( + (call) => call[0].data === 'test-event-other-2', + )[1]; + + expect(eventOther2StartedTime).toBeGreaterThanOrEqual( + eventOther1FinishedTime, + ); }); }); }); diff --git a/src/sqs.ts b/src/sqs.ts index 228a48c..32246ef 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -95,7 +95,7 @@ export class SQSMessageHandler { await processWithOrdering( { items: event.Records, - // If there is not a MessageGroupId, then we can don't care about + // If there is not a MessageGroupId, then we don't care about // the ordering for the event. We can just generate a UUID for the // ordering key. orderBy: (record) => record.attributes.MessageGroupId ?? uuid(), diff --git a/src/utils.ts b/src/utils.ts index e40a661..0085df4 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -25,12 +25,6 @@ export const withHealthCheckHandling = return handler(event, context); }; -const groupIntoLists = (arr: T[], predicate: (item: T) => string): T[][] => { - const grouped = groupBy(arr, (r) => predicate(r)); - const entries = Object.entries(grouped); - return entries.map(([, items]) => items); -}; - export type ProcessWithOrderingParams = { items: T[]; orderBy: (msg: T) => string; @@ -65,7 +59,7 @@ export const processWithOrdering = async ( params: ProcessWithOrderingParams, process: (item: T) => Promise, ) => { - const lists = groupIntoLists(params.items, params.orderBy); + const lists = Object.values(groupBy(params.items, params.orderBy)); await pMap( lists, async (list) => {