Skip to content

Commit

Permalink
Merge pull request #4 from 0xkalvin/feature/add-diagnostic-channels
Browse files Browse the repository at this point in the history
add diagnostic channels
  • Loading branch information
0xkalvin authored Jan 30, 2022
2 parents 12b751c + ab320e5 commit 6e8ed3b
Show file tree
Hide file tree
Showing 6 changed files with 1,941 additions and 22 deletions.
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,56 @@ process.once('SIGTERM', async (signal) => {
process.exit(0)
})
```

## Diagnostics Channels

The `sqs-poller` supports diagnostics channels (feature currently available only on Node.js v16+). It is the preferred way to instrument this package. The data is only published to these channels in case there are subscribers listening to them. The available channels are the following:
- `sqspoller:poller:eachMessage:start`
- Before a SQS message gets processed by the `eachMessage` handler, the SQS message gets published to this channel.

```js
const diagnosticsChannel = require('diagnostics_channel')

diagnosticsChannel.channel('sqspoller:poller:eachMessage:start').subscribe(({ message }) => {
console.log('body', message.Body)
console.log('ReceiptHandle', message.ReceiptHandle)
})
```

- `sqspoller:poller:eachMessage:end`
- When the `eachMessage` handler has been either resolved or rejected, the SQS message gets published to this channel.

```js
diagnosticsChannel.channel('sqspoller:poller:eachMessage:end').subscribe(({ message }) => {
console.log('body', message.Body)
console.log('ReceiptHandle', message.ReceiptHandle)
})
```
- `sqspoller:poller:eachMessage:error`
- When the `eachMessage` handler rejects, the SQS message and the error get published to this channel.

```js
diagnosticsChannel.channel('sqspoller:poller:eachMessage:error').subscribe(({ message, error }) => {
console.log('body', message.Body)
console.log('ReceiptHandle', message.ReceiptHandle)
console.log('error', error)
})
```
- `sqspoller:poller:eachBatch:start`
- Before a SQS message batch gets processed by the `eachBatch` handler, the batch (`messages`) gets published to this channel.
- `sqspoller:poller:eachBatch:end`
- When the `eachBatch` resolves or rejects, the message batch (`messages`) gets published to this channel.
- `sqspoller:poller:eachBatch:error`
- When the `eachBatch` rejects, the message batch (`messages`) and `error` get published to this channel.
- `sqspoller:poller:deleteMessage:start`
- Before deleting a message from SQS, the `message` gets published to this channel.
- `sqspoller:poller:deleteMessage:end`
- When the deleteMessage call resolves or rejects, the `message` gets published to this channel.
- `sqspoller:poller:deleteMessage:error`
- When the deleteMessage call rejects, the `message` and `error` get published to this channel.
- `sqspoller:poller:deleteBatch:start`
- Before deleting a message batch from SQS, the batch (`messages`) gets published to this channel.
- `sqspoller:poller:deleteBatch:end`
- When the deleteBatch call resolves or rejects, the batch (`messages`) gets published to this channel.
- `sqspoller:poller:deleteBatch:error`
- When the deleteBatch call rejects, the batch (`messages`) and `error` get published to this channel.
61 changes: 61 additions & 0 deletions examples/diagnostics-channels/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const { SQS } = require('@aws-sdk/client-sqs')
const diagnosticsChannel = require('diagnostics_channel')

const { Poller } = require('sqs-poller')

const sqsEndpoint = 'http://localhost:9324'
const sqsRegion = 'us-east-1'
const queueURL = `${sqsEndpoint}/queue/foo-queue`

const sqs = new SQS({
endpoint: sqsEndpoint,
region: sqsRegion,
})

const poller = new Poller({
queueUrl: queueURL,
sqsClient: sqs
})

// Send 100 messages to sqs queue
function sendMessages() {
const entries = Array(10).fill(0).map((_, index) => ({
MessageBody: JSON.stringify({ foo: 'bar' }),
Id: index,
}))

const promises = Array(10).fill(0).map(() => sqs.sendMessageBatch({
QueueUrl: queueURL,
Entries: entries
}))

return Promise.all(promises)
}

async function run() {
await sendMessages()

poller.start({
eachMessage: async function (message) {
return message
},
})

let counter = 1

diagnosticsChannel.channel('sqspoller:poller:eachMessage:start').subscribe(({ message }) => {
console.log(`Message ${counter}, id ${message.MessageId} - start`);
message.counter = counter
counter++
})

diagnosticsChannel.channel('sqspoller:poller:eachMessage:end').subscribe(({ message }) => {
console.log(`Message ${message.counter}, id ${message.MessageId} - end`);
})

diagnosticsChannel.channel('sqspoller:poller:eachMessage:error').subscribe(({ message, error }) => {
console.log(`Message ${message.counter}, id ${message.MessageId} - error`, error);
})
}

run()
Loading

0 comments on commit 6e8ed3b

Please sign in to comment.