diff --git a/CHANGELOG.md b/CHANGELOG.md index 22617011..77914d1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - ([\#74](https://github.com/forbole/juno/pull/74)) Added database block count to prometheus to improve alert monitoring - ([\#75](https://github.com/forbole/juno/pull/75)) Allow modules to handle MsgExec inner messages - ([\#76](https://github.com/forbole/juno/pull/76)) Return 0 as height for `GetLastBlockHeight()` method if there are no blocks saved in database +- ([\#77](https://github.com/forbole/juno/pull/77)) Add wait group to handle messages concurrently - ([\#79](https://github.com/forbole/juno/pull/79)) Use `sqlx` instead of `sql` while dealing with a PostgreSQL database - ([\#83](https://github.com/forbole/juno/pull/83)) Bump `github.com/tendermint/tendermint` to `v0.34.22` diff --git a/database/postgresql/postgresql.go b/database/postgresql/postgresql.go index 1013a222..77d2c2ad 100644 --- a/database/postgresql/postgresql.go +++ b/database/postgresql/postgresql.go @@ -101,7 +101,12 @@ func (db *Database) GetLastBlockHeight() (int64, error) { stmt := `SELECT height FROM block ORDER BY height DESC LIMIT 1;` var height int64 - if err := db.SQL.QueryRow(stmt).Scan(&height); err != nil { + err := db.SQL.QueryRow(stmt).Scan(&height) + if err != nil { + if strings.Contains(err.Error(), "no rows in result set") { + // If no rows stored in block table, return 0 as height + return 0, nil + } return 0, fmt.Errorf("error while getting last block height, error: %s", err) } diff --git a/parser/worker.go b/parser/worker.go index 507b73dc..779a94db 100644 --- a/parser/worker.go +++ b/parser/worker.go @@ -3,6 +3,7 @@ package parser import ( "encoding/json" "fmt" + "sync" "time" "github.com/cosmos/cosmos-sdk/x/authz" @@ -327,18 +328,23 @@ func (w Worker) handleMessage(index int, msg sdk.Msg, tx *types.Tx) { // ExportTxs accepts a slice of transactions and persists then inside the database. // An error is returned if the write fails. func (w Worker) ExportTxs(txs []*types.Tx) error { - // handle all transactions inside the block + var wg sync.WaitGroup + for _, tx := range txs { - // save the transaction + // Save the transaction err := w.saveTx(tx) if err != nil { return fmt.Errorf("error while storing txs: %s", err) } - // call the tx handlers - go w.handleTx(tx) + // Handle the transaction concurrently + wg.Add(1) + go func(tx *types.Tx) { + defer wg.Done() + w.handleTx(tx) + }(tx) - // handle all messages contained inside the transaction + // Parse all the messages contained inside the transaction sdkMsgs := make([]sdk.Msg, len(tx.Body.Messages)) for i, msg := range tx.Body.Messages { var stdMsg sdk.Msg @@ -349,12 +355,19 @@ func (w Worker) ExportTxs(txs []*types.Tx) error { sdkMsgs[i] = stdMsg } - // call the msg handlers + // Handle all the messages concurrently for i, sdkMsg := range sdkMsgs { - go w.handleMessage(i, sdkMsg, tx) + wg.Add(1) + go func(i int, sdkMsg sdk.Msg) { + defer wg.Done() + w.handleMessage(i, sdkMsg, tx) + }(i, sdkMsg) } } + // Wait all transactions and messages to be parsed + wg.Wait() + totalBlocks := w.db.GetTotalBlocks() logging.DbBlockCount.WithLabelValues("total_blocks_in_db").Set(float64(totalBlocks))