Skip to content

Commit

Permalink
fix: add wait group to handle message (#77)
Browse files Browse the repository at this point in the history
## Description


This PR adds wait group to handle messages concurrently. 

Background: 
When I used `parse blocks`cmd to parse 1 specific height that has transactions & messages,  the messages can be lost because of being handled through go routines and when the main program is done, all go routines are closed. 

## Checklist
- [x] Targeted PR against correct branch.
- [ ] Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
- [ ] Wrote unit tests.  
- [x] Re-reviewed `Files changed` in the Github PR explorer.
  • Loading branch information
huichiaotsou authored and MonikaCat committed Jan 19, 2024
1 parent 9eeac40 commit a6bde6d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- ([\#74](https://github.com/forbole/juno/pull/74)) Added database block count to prometheus to improve alert monitoring
- ([\#75](https://github.com/forbole/juno/pull/75)) Allow modules to handle MsgExec inner messages
- ([\#76](https://github.com/forbole/juno/pull/76)) Return 0 as height for `GetLastBlockHeight()` method if there are no blocks saved in database
- ([\#77](https://github.com/forbole/juno/pull/77)) Add wait group to handle messages concurrently
- ([\#79](https://github.com/forbole/juno/pull/79)) Use `sqlx` instead of `sql` while dealing with a PostgreSQL database
- ([\#83](https://github.com/forbole/juno/pull/83)) Bump `github.com/tendermint/tendermint` to `v0.34.22`

Expand Down
7 changes: 6 additions & 1 deletion database/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ func (db *Database) GetLastBlockHeight() (int64, error) {
stmt := `SELECT height FROM block ORDER BY height DESC LIMIT 1;`

var height int64
if err := db.SQL.QueryRow(stmt).Scan(&height); err != nil {
err := db.SQL.QueryRow(stmt).Scan(&height)
if err != nil {
if strings.Contains(err.Error(), "no rows in result set") {
// If no rows stored in block table, return 0 as height
return 0, nil
}
return 0, fmt.Errorf("error while getting last block height, error: %s", err)
}

Expand Down
27 changes: 20 additions & 7 deletions parser/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parser
import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/x/authz"
Expand Down Expand Up @@ -327,18 +328,23 @@ func (w Worker) handleMessage(index int, msg sdk.Msg, tx *types.Tx) {
// ExportTxs accepts a slice of transactions and persists then inside the database.
// An error is returned if the write fails.
func (w Worker) ExportTxs(txs []*types.Tx) error {
// handle all transactions inside the block
var wg sync.WaitGroup

for _, tx := range txs {
// save the transaction
// Save the transaction
err := w.saveTx(tx)
if err != nil {
return fmt.Errorf("error while storing txs: %s", err)
}

// call the tx handlers
go w.handleTx(tx)
// Handle the transaction concurrently
wg.Add(1)
go func(tx *types.Tx) {
defer wg.Done()
w.handleTx(tx)
}(tx)

// handle all messages contained inside the transaction
// Parse all the messages contained inside the transaction
sdkMsgs := make([]sdk.Msg, len(tx.Body.Messages))
for i, msg := range tx.Body.Messages {
var stdMsg sdk.Msg
Expand All @@ -349,12 +355,19 @@ func (w Worker) ExportTxs(txs []*types.Tx) error {
sdkMsgs[i] = stdMsg
}

// call the msg handlers
// Handle all the messages concurrently
for i, sdkMsg := range sdkMsgs {
go w.handleMessage(i, sdkMsg, tx)
wg.Add(1)
go func(i int, sdkMsg sdk.Msg) {
defer wg.Done()
w.handleMessage(i, sdkMsg, tx)
}(i, sdkMsg)
}
}

// Wait all transactions and messages to be parsed
wg.Wait()

totalBlocks := w.db.GetTotalBlocks()
logging.DbBlockCount.WithLabelValues("total_blocks_in_db").Set(float64(totalBlocks))

Expand Down

0 comments on commit a6bde6d

Please sign in to comment.