Skip to content

Commit

Permalink
address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Swain Molster committed Aug 2, 2023
1 parent 39785fa commit 8087b66
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 31 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,4 @@ In `DynamoStreamHandler`, events for the same _key_ will always be processed ser

In `SQSMessageHandler`, events with the same `MessageGroupId` will always be processed serially -- events with different `MessageGroupId` values will be processed in parallel.

**Note**: when using a `concurrency` value of `1`, the ordering semantics above will still be preserved. But, events that do _not_ need to be ordered will not necessarily be processed in the same order they were received in the batch.
**Note**: while the ordering semantics above will always be preserved, events that do _not_ need to be ordered will not necessarily be processed in the same order they were received in the batch (even when using a `concurrency` value of `1`).
2 changes: 1 addition & 1 deletion src/dynamo-streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ describe('DynamoStreamHandler', () => {

const end = Date.now();

// This assertion confirms that the group doesn't process in less than 200ms.
// This assertion confirms that the group doesn't process in less than 300ms.
// If it did, then some of the ordered events would be parallelized, which would be bad.
expect(end - start).toBeGreaterThanOrEqual(300);

Expand Down
37 changes: 17 additions & 20 deletions src/dynamo-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,6 @@ export type TestEvent<Entity> = {
records: TestRecord<Entity>[];
};

/**
 * Stringifies an unmarshalled DynamoDB key deterministically, regardless
 * of ordering of keys.
 *
 * Entries are sorted by key before serializing, so `{ a: 1, b: 2 }` and
 * `{ b: 2, a: 1 }` produce the same string.
 *
 * @param obj - Flat object whose values are strings, numbers, or booleans.
 * @returns A JSON string of `obj` that does not depend on the insertion
 * order of its keys.
 */
const deterministicStringify = (obj: {
  [key: string]: string | number | boolean;
}): string => {
  // Compare keys the same way the default `Array.prototype.sort` does for
  // strings (by UTF-16 code units), so the resulting order is unchanged.
  const sortedEntries = Object.entries(obj).sort(([left], [right]) => {
    if (left < right) {
      return -1;
    }
    return left > right ? 1 : 0;
  });

  // `Object.fromEntries` defines each entry as an own property. Plain
  // assignment (`result[key] = value`) would route a "__proto__" key through
  // the prototype setter and silently drop it from the output.
  return JSON.stringify(Object.fromEntries(sortedEntries));
};

/**
* An abstraction for a DynamoDB stream handler.
*/
Expand Down Expand Up @@ -200,13 +184,26 @@ export class DynamoStreamHandler<Entity, Context> {
{
items: event.Records,
orderBy: (record) => {
const KeyObject = record.dynamodb?.Keys;

// This scenario should only ever happen in tests.
if (!record.dynamodb?.Keys) {
if (!KeyObject) {
return uuid();
}
return deterministicStringify(
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
unmarshall(record.dynamodb.Keys as any),

// We need to order by key -- so, just stringify the key.
//
// But, add custom logic to ensure that the key object is stringified
// deterministically, regardless of the order of its keys. (e.g. we
// should stringify { a: 1, b: 2 } and { b: 2, a: 1 } to the same string)
//
// It's possible that AWS already ensures that the keys are deterministically
// ordered, and therefore we don't need to do this. But we add this logic just
// to be extra sure.
return JSON.stringify(
Object.keys(KeyObject)
.sort()
.map((key) => [key, KeyObject[key]]),
);
},
concurrency: this.config.concurrency ?? 5,
Expand Down
24 changes: 23 additions & 1 deletion src/sqs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,18 @@ describe('SQSMessageHandler', () => {
body: JSON.stringify({ data: 'test-event-1' }),
},
{ attributes: {}, body: JSON.stringify({ data: 'test-event-2' }) },
{
attributes: { MessageGroupId: 'group-id-2' },
body: JSON.stringify({ data: 'test-event-other-1' }),
},
{
attributes: { MessageGroupId: 'group-id' },
body: JSON.stringify({ data: 'test-event-3' }),
},
{
attributes: { MessageGroupId: 'group-id-2' },
body: JSON.stringify({ data: 'test-event-other-2' }),
},
{ attributes: {}, body: JSON.stringify({ data: 'test-event-4' }) },
] as any,
},
Expand All @@ -299,8 +307,10 @@ describe('SQSMessageHandler', () => {
// If it did, then the events would be fully parallelized, which would be bad.
expect(end - start).toBeGreaterThan(200);

// Now, let's also assert that event 3 was processed _after_ the end of event 1.
// This assertion confirms that there is at least some parallelization happening.
expect(end - start).toBeLessThanOrEqual(450);

// Now, let's also assert that event 3 was processed _after_ the end of event 1.
const event1FinishedTime = messageFinished.mock.calls.find(
(call) => call[0].data === 'test-event-1',
)[1];
Expand All @@ -310,6 +320,18 @@ describe('SQSMessageHandler', () => {
)[1];

expect(event3StartedTime).toBeGreaterThanOrEqual(event1FinishedTime);

const eventOther1FinishedTime = messageFinished.mock.calls.find(
(call) => call[0].data === 'test-event-other-1',
)[1];

const eventOther2StartedTime = messageStarted.mock.calls.find(
(call) => call[0].data === 'test-event-other-2',
)[1];

expect(eventOther2StartedTime).toBeGreaterThanOrEqual(
eventOther1FinishedTime,
);
});
});
});
2 changes: 1 addition & 1 deletion src/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export class SQSMessageHandler<Message, Context> {
await processWithOrdering(
{
items: event.Records,
// If there is not a MessageGroupId, then we can don't care about
// If there is not a MessageGroupId, then we don't care about
// the ordering for the event. We can just generate a UUID for the
// ordering key.
orderBy: (record) => record.attributes.MessageGroupId ?? uuid(),
Expand Down
8 changes: 1 addition & 7 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ export const withHealthCheckHandling =
return handler(event, context);
};

/**
 * Partitions `arr` into lists of items that share the same string key, as
 * computed by `predicate`. Only the grouped lists are returned; the keys
 * themselves are discarded.
 */
const groupIntoLists = <T>(arr: T[], predicate: (item: T) => string): T[][] =>
  Object.values(groupBy(arr, (item) => predicate(item)));

export type ProcessWithOrderingParams<T> = {
items: T[];
orderBy: (msg: T) => string;
Expand Down Expand Up @@ -65,7 +59,7 @@ export const processWithOrdering = async <T>(
params: ProcessWithOrderingParams<T>,
process: (item: T) => Promise<void>,
) => {
const lists = groupIntoLists(params.items, params.orderBy);
const lists = Object.values(groupBy(params.items, params.orderBy));
await pMap(
lists,
async (list) => {
Expand Down

0 comments on commit 8087b66

Please sign in to comment.