Skip to content

Commit

Permalink
perf: TxSearch pagination (backport cometbft#2855) (cometbft#2910)
Browse files Browse the repository at this point in the history
Since moving to faster blocks, Osmosis public RPC nodes have noticed
massive RAM spikes, resulting in nodes constantly crashing:

![Screenshot 2024-04-20 at 11 25
36 AM](https://github.com/osmosis-labs/cometbft/assets/40078083/18d0513e-25fc-4510-b4bd-b48472a9df69)

After heap profiling, the issue was clearly coming from TxSearch,
showing that it was unmarshaling a huge amount of data.

![Screenshot 2024-04-20 at 11 28
29 AM](https://github.com/osmosis-labs/cometbft/assets/40078083/5d88a66a-c72d-4752-8770-a2c00e6d7669)

After looking into the method, the issue is that txSearch retrieves all
hashes (filtered by the query condition), but we call Get (and therefore
unmarshal) every filtered transaction from the transaction index store,
regardless of whether the transactions are within the pagination
request. Therefore, if one were to call txSearch on an event that
happens on almost every transaction, this causes the node to unmarshal
essentially every transaction.

However, the key already contains all the data we need to sort the transaction
hashes without unmarshaling the transactions at all! This PR filters and
sorts the hashes, paginates them, and then only retrieves the
transactions that fall in the page being requested.

We have run this patch on two of our RPC nodes, and have seen zero
spikes on the patched ones thus far!

![Screenshot 2024-04-20 at 11 33
11 AM](https://github.com/osmosis-labs/cometbft/assets/40078083/fd815f81-5756-45bd-b1c0-818e6774ea53)

- [x] Tests written/updated
- [x] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [x] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [x] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec
<hr>This is an automatic backport of pull request cometbft#2855 done by
[Mergify](https://mergify.com).

---------

Co-authored-by: Adam Tucker <adam@osmosis.team>
Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
  • Loading branch information
3 people authored and PaddyMc committed Aug 19, 2024
1 parent 58c0aef commit 5bd76b0
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 117 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[state/indexer]` Fix txSearch performance issue
([\#2855](https://github.com/cometbft/cometbft/pull/2855))
4 changes: 2 additions & 2 deletions inspect/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func TestTxSearch(t *testing.T) {
txIndexerMock.On("Search", mock.Anything,
mock.MatchedBy(func(q *query.Query) bool {
return testQuery == strings.ReplaceAll(q.String(), " ", "")
})).
Return([]*abcitypes.TxResult{testTxResult}, nil)
}), mock.Anything).
Return([]*abcitypes.TxResult{testTxResult}, 1, nil)

rpcConfig := config.TestRPCConfig()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock)
Expand Down
4 changes: 2 additions & 2 deletions rpc/core/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ func (env *Environment) BlockSearch(

// sort results (must be done before pagination)
switch orderBy {
case "desc", "":
case Descending, "":
sort.Slice(results, func(i, j int) bool { return results[i] > results[j] })

case "asc":
case Ascending:
sort.Slice(results, func(i, j int) bool { return results[i] < results[j] })

default:
Expand Down
9 changes: 9 additions & 0 deletions rpc/core/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package core

// ErrInvalidOrderBy is returned when an RPC request supplies an order_by
// value other than "asc", "desc", or the empty string.
type ErrInvalidOrderBy struct {
	// OrderBy is the rejected value as received from the request.
	OrderBy string
}

// Error implements the error interface, reporting the offending order_by value.
func (e ErrInvalidOrderBy) Error() string {
	const prefix = "invalid order_by: expected `asc`, `desc` or ``, but got "
	return prefix + e.OrderBy
}
60 changes: 25 additions & 35 deletions rpc/core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package core
import (
"errors"
"fmt"
"sort"

cmtmath "github.com/cometbft/cometbft/libs/math"
cmtquery "github.com/cometbft/cometbft/libs/pubsub/query"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types"
"github.com/cometbft/cometbft/state/txindex"
"github.com/cometbft/cometbft/state/txindex/null"
"github.com/cometbft/cometbft/types"
)

// Allowed order_by values for search RPC endpoints (TxSearch, BlockSearch).
// The empty string is also accepted by callers and treated as a default.
const (
	// Ascending requests results sorted in ascending order.
	Ascending = "asc"
	// Descending requests results sorted in descending order.
	Descending = "desc"
)

// Tx allows you to query the transaction results. `nil` could mean the
// transaction is in the mempool, invalidated, or was not sent in the first
// place.
Expand Down Expand Up @@ -67,52 +71,38 @@ func (env *Environment) TxSearch(
return nil, errors.New("maximum query length exceeded")
}

q, err := cmtquery.New(query)
if err != nil {
return nil, err
// if orderBy is not "asc", "desc", or blank, return error
if orderBy != "" && orderBy != Ascending && orderBy != Descending {
return nil, ErrInvalidOrderBy{orderBy}
}

results, err := env.TxIndexer.Search(ctx.Context(), q)
q, err := cmtquery.New(query)
if err != nil {
return nil, err
}

// sort results (must be done before pagination)
switch orderBy {
case "desc":
sort.Slice(results, func(i, j int) bool {
if results[i].Height == results[j].Height {
return results[i].Index > results[j].Index
}
return results[i].Height > results[j].Height
})
case "asc", "":
sort.Slice(results, func(i, j int) bool {
if results[i].Height == results[j].Height {
return results[i].Index < results[j].Index
}
return results[i].Height < results[j].Height
})
default:
return nil, errors.New("expected order_by to be either `asc` or `desc` or empty")
// Validate number of results per page
perPage := env.validatePerPage(perPagePtr)
if pagePtr == nil {
// Default to page 1 if not specified
pagePtr = new(int)
*pagePtr = 1
}

// paginate results
totalCount := len(results)
perPage := env.validatePerPage(perPagePtr)
pagSettings := txindex.Pagination{
OrderDesc: orderBy == Descending,
IsPaginated: true,
Page: *pagePtr,
PerPage: perPage,
}

page, err := validatePage(pagePtr, perPage, totalCount)
results, totalCount, err := env.TxIndexer.Search(ctx.Context(), q, pagSettings)
if err != nil {
return nil, err
}

skipCount := validateSkipCount(page, perPage)
pageSize := cmtmath.MinInt(perPage, totalCount-skipCount)

apiResults := make([]*ctypes.ResultTx, 0, pageSize)
for i := skipCount; i < skipCount+pageSize; i++ {
r := results[i]

apiResults := make([]*ctypes.ResultTx, 0, len(results))
for _, r := range results {
var proof types.TxProof
if prove {
block := env.BlockStore.LoadBlock(r.Height)
Expand Down
4 changes: 2 additions & 2 deletions state/indexer/sink/psql/backport.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (BackportTxIndexer) Get([]byte) (*abci.TxResult, error) {

// Search is implemented to satisfy the TxIndexer interface, but it is not
// supported by the psql event sink and reports an error for all inputs.
func (BackportTxIndexer) Search(context.Context, *query.Query) ([]*abci.TxResult, error) {
return nil, errors.New("the TxIndexer.Search method is not supported")
func (BackportTxIndexer) Search(context.Context, *query.Query, txindex.Pagination) ([]*abci.TxResult, int, error) {
	// Unconditionally unsupported: all arguments (including the pagination
	// settings) are ignored, and a zero total count accompanies the error.
	return nil, 0, errors.New("the TxIndexer.Search method is not supported")
}

// SetLogger is a no-op; the backport indexer discards the provided logger.
func (BackportTxIndexer) SetLogger(log.Logger) {}
Expand Down
12 changes: 11 additions & 1 deletion state/txindex/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type TxIndexer interface {
Get(hash []byte) (*abci.TxResult, error)

// Search allows you to query for transactions.
Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error)
Search(ctx context.Context, q *query.Query, pagSettings Pagination) ([]*abci.TxResult, int, error)

//Set Logger
SetLogger(l log.Logger)
Expand All @@ -39,6 +39,16 @@ type Batch struct {
Ops []*abci.TxResult
}

// Pagination provides pagination information for queries.
// This allows us to use the same TxSearch API for pruning to return all relevant data,
// while still limiting public queries to pagination.
type Pagination struct {
	// OrderDesc requests results sorted in descending order when true.
	OrderDesc bool
	// IsPaginated enables pagination; when false, presumably all matching
	// results are returned (used by pruning) — confirm against the indexer.
	IsPaginated bool
	// Page is the page number to return (RPC callers default it to 1).
	Page int
	// PerPage is the maximum number of results per page.
	PerPage int
}

// NewBatch creates a new Batch.
func NewBatch(n int64) *Batch {
return &Batch{
Expand Down
Loading

0 comments on commit 5bd76b0

Please sign in to comment.