Skip to content

Commit

Permalink
Merge pull request #51 from lifeomic/parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
swain authored Aug 2, 2023
2 parents 87e1538 + 8087b66 commit f34b671
Show file tree
Hide file tree
Showing 9 changed files with 617 additions and 131 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const stream = new DynamoStreamHandler({
/* ... create the "context", e.g. data sources ... */
return { doSomething: () => null };
},
// Optionally specify a concurrency setting for processing events.
concurrency: 5,
})
.onInsert(async (ctx, entity) => {
// INSERT actions receive a single strongly typed new entities
Expand Down Expand Up @@ -107,6 +109,8 @@ const queue = new SQSMessageHandler({
/* ... create the "context", e.g. data sources ... */
return { doSomething: () => null };
},
// Optionally specify a concurrency setting for processing events.
concurrency: 5,
})
.onMessage(async (ctx, message) => {
// `ctx` contains the nice result of `createRunContext`:
Expand Down Expand Up @@ -158,3 +162,15 @@ test('something', async () => {
expect(context.doSomething).toHaveBeenCalledTimes(3)
})
```

### Parallel Processing + Ordering

By default, the abstractions in `@lifeomic/delta` (`DynamoStreamHandler` and `SQSMessageHandler`) will process events in parallel. To control the parallelization, specify a `concurrency` value when creating the handler.

These abstractions also ensure that within a batch of events correct _ordering_ of events is maintained according to the ordering semantics of the upstream event source, even when processing in parallel.

In `DynamoStreamHandler`, events for the same _key_ will always be processed serially -- events from different keys will be processed in parallel.

In `SQSMessageHandler`, events with the same `MessageGroupId` will always processed serially -- events with different `MessageGroupId` values will be processed in parallel.

**Note**: while the ordering semantics above will always be preserved, events that do _not_ need to be ordered will not necessarily be processed in the same order they were received in the batch (even when using a `concurrency` value of `1`).
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"@lifeomic/logging": "^4.0.0",
"@lifeomic/typescript-config": "^1.0.3",
"@types/jest": "^27.4.1",
"@types/lodash": "^4.14.195",
"@types/uuid": "^8.3.4",
"conventional-changelog-conventionalcommits": "^4.6.3",
"eslint": "^8.9.0",
Expand All @@ -44,6 +45,8 @@
"dependencies": {
"@aws-sdk/util-dynamodb": "^3.369.0",
"@types/aws-lambda": "^8.10.92",
"lodash": "^4.17.21",
"p-map": "^4.0.0",
"uuid": "^8.3.2"
},
"peerDependencies": {
Expand Down
Loading

0 comments on commit f34b671

Please sign in to comment.