Skip to content

Commit

Permalink
fix: optimize for HighLevelProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
kalinkrustev committed May 16, 2024
1 parent 539b90f commit 17b086e
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
"@mojaloop/central-services-logger": "11.3.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.3.5",
"@mojaloop/central-services-stream": "11.2.5",
"@mojaloop/central-services-stream": "11.2.6",
"@mojaloop/event-sdk": "14.0.3-snapshot.3",
"@mojaloop/ml-number": "11.2.4",
"@mojaloop/object-store-lib": "12.0.3",
Expand Down
12 changes: 6 additions & 6 deletions src/handlers/positions/handlerBatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ const positions = async (error, messages) => {
// Iterate through consumedMessages
const bins = {}
const lastPerPartition = {}
for (const message of consumedMessages) {

This comment has been minimized.

Copy link
@vijayg10

vijayg10 May 17, 2024

Contributor

HI @kalinkrustev is it safe to use promise.all for consumedMessages instead of for loop? Don't we get race conditions?
May be we can span.audit can be taken out of the for loop and using Promise.all for that?

This comment has been minimized.

Copy link
@kalinkrustev

kalinkrustev May 17, 2024

Author Contributor

I do not think there will be race conditions, the rest of the code in the loop is sync and will be executed sequentially. The only difference is that the await happens outside the loop. What will change compared to before is what happens in case of an error. The new code will execute the whole loop before noticing any error. But I do not see an issue with that.

await Promise.all(consumedMessages.map(message => {
const histTimerMsgEnd = Metrics.getHistogram(
'transfer_position',
'Process a prepare transfer message',
Expand Down Expand Up @@ -126,8 +126,8 @@ const positions = async (error, messages) => {
lastPerPartition[message.partition] = message
}

await span.audit(message, EventSdk.AuditEventAction.start)
}
return span.audit(message, EventSdk.AuditEventAction.start)
}))

// Start DB Transaction
const trx = await BatchPositionModel.startDbTransaction()
Expand All @@ -148,12 +148,12 @@ const positions = async (error, messages) => {
await trx.commit()

// Loop through results and produce notification messages and audit messages
for (const item of result.notifyMessages) {
await Promise.all(result.notifyMessages.map(item => {
// Produce notification message and audit message
const action = item.binItem.message?.value.metadata.event.action
const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
}
return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
}))
histTimerEnd({ success: true })
} catch (err) {
// If Bin Processor returns failure
Expand Down

0 comments on commit 17b086e

Please sign in to comment.