diff --git a/CHANGELOG.md b/CHANGELOG.md index 42d4143fa1..a9b01dfa8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased * [#21](https://github.com/osmosis-labs/cometbft/pull/21) Move websocket logs to Debug +* [#22](https://github.com/osmosis-labs/cometbft/pull/22) Fix txSearch pagination performance issues ## v0.37.4-v24-osmo-2 diff --git a/go.mod b/go.mod index a089ee4f53..6ee88bb521 100644 --- a/go.mod +++ b/go.mod @@ -33,8 +33,8 @@ require ( github.com/spf13/cobra v1.6.1 github.com/spf13/viper v1.13.0 github.com/stretchr/testify v1.8.4 - golang.org/x/crypto v0.17.0 - golang.org/x/net v0.19.0 + golang.org/x/crypto v0.21.0 + golang.org/x/net v0.23.0 google.golang.org/grpc v1.58.3 ) @@ -279,8 +279,8 @@ require ( golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect golang.org/x/exp/typeparams v0.0.0-20220827204233-334a2380cb91 // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/sys v0.15.0 // indirect - golang.org/x/term v0.15.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/tools v0.13.0 // indirect diff --git a/go.sum b/go.sum index 88a83ae85e..a36a9aa1fb 100644 --- a/go.sum +++ b/go.sum @@ -962,8 +962,8 @@ golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1061,8 +1061,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= -golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1171,15 +1171,15 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= -golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 684395fb11..f3ceb7a4b9 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -232,7 +232,7 @@ func BlockSearch( totalCount := len(results) perPage := validatePerPage(perPagePtr) - page, err := validatePage(pagePtr, perPage, totalCount) + page, err := ValidatePage(pagePtr, perPage, totalCount) if err != nil { return nil, err } diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 4d195e1c3d..05402ab42a 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -29,7 +29,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in totalCount := len(validators.Validators) perPage := validatePerPage(perPagePtr) - page, err := validatePage(pagePtr, perPage, totalCount) + page, err := ValidatePage(pagePtr, perPage, totalCount) if err != nil { return nil, err } diff --git a/rpc/core/env.go b/rpc/core/env.go index 5b87ab7dd0..73c2be0bbe 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -104,7 +104,7 @@ type Environment struct { //---------------------------------------------- -func validatePage(pagePtr *int, perPage, totalCount int) (int, error) { +func ValidatePage(pagePtr *int, perPage, totalCount int) (int, error) { if perPage < 1 { panic(fmt.Sprintf("zero or negative perPage: %d", perPage)) } diff --git a/rpc/core/env_test.go b/rpc/core/env_test.go index b44c21a4cb..54e7739d5a 100644 --- a/rpc/core/env_test.go +++ b/rpc/core/env_test.go @@ -40,7 +40,7 @@ func TestPaginationPage(t *testing.T) { } for _, c := range cases { - p, err := validatePage(&c.page, c.perPage, c.totalCount) + p, err := ValidatePage(&c.page, c.perPage, c.totalCount) if c.expErr { assert.Error(t, err) continue @@ -50,7 +50,7 @@ func TestPaginationPage(t *testing.T) { } // nil case - p, err := validatePage(nil, 1, 1) + p, err := ValidatePage(nil, 1, 1) if assert.NoError(t, err) { assert.Equal(t, 1, p) } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 6fbc7b6ee5..11de06446c 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -3,9 +3,7 @@ package core import ( "errors" "fmt" - "sort" - cmtmath "github.com/cometbft/cometbft/libs/math" cmtquery "github.com/cometbft/cometbft/libs/pubsub/query" ctypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" @@ -70,47 +68,28 @@ func TxSearch( return nil, err } - results, err := env.TxIndexer.Search(ctx.Context(), q) - if err != nil { - return nil, err + // Validate number of results per page + perPage := validatePerPage(perPagePtr) + if pagePtr == nil { + // Default to page 1 if not specified + pagePtr = new(int) + *pagePtr = 1 } - // sort results (must be done before pagination) - switch orderBy { - case "desc": - sort.Slice(results, func(i, j int) bool { - if results[i].Height == results[j].Height { - return results[i].Index > results[j].Index - } - return results[i].Height > results[j].Height - }) - case "asc", "": - sort.Slice(results, func(i, j int) bool { - if results[i].Height == results[j].Height { - return results[i].Index < results[j].Index - } - return results[i].Height < results[j].Height - }) - default: - return nil, errors.New("expected order_by to be either `asc` or `desc` or empty") + pagSettings := ctypes.Pagination{ + OrderDesc: orderBy == "desc", + IsPaginated: true, + Page: *pagePtr, + PerPage: perPage, } - // paginate results - totalCount := len(results) - perPage := validatePerPage(perPagePtr) - - page, err := validatePage(pagePtr, perPage, totalCount) + results, totalCount, err := env.TxIndexer.Search(ctx.Context(), q, pagSettings) if err != nil { return nil, err } - skipCount := validateSkipCount(page, perPage) - pageSize := cmtmath.MinInt(perPage, totalCount-skipCount) - - apiResults := make([]*ctypes.ResultTx, 0, pageSize) - for i := skipCount; i < skipCount+pageSize; i++ { - r := results[i] - + apiResults := make([]*ctypes.ResultTx, 0, len(results)) + for _, r := range results { var proof types.TxProof if prove { block := env.BlockStore.LoadBlock(r.Height) diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 4e5cbb1c23..7221fe84ba 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -254,3 +254,13 @@ type ResultEvent struct { Data types.TMEventData `json:"data"` Events map[string][]string `json:"events"` } + +// Pagination provides pagination information for queries. +// This allows us to use the same TxSearch API for pruning to return all relevant data, +// while still limiting public queries to pagination. +type Pagination struct { + OrderDesc bool + IsPaginated bool + Page int + PerPage int +} diff --git a/state/indexer/sink/psql/backport.go b/state/indexer/sink/psql/backport.go index 184687318e..a05720fbf4 100644 --- a/state/indexer/sink/psql/backport.go +++ b/state/indexer/sink/psql/backport.go @@ -19,6 +19,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/pubsub/query" + ctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/cometbft/cometbft/state/txindex" "github.com/cometbft/cometbft/types" ) @@ -55,8 +56,8 @@ func (BackportTxIndexer) Get([]byte) (*abci.TxResult, error) { // Search is implemented to satisfy the TxIndexer interface, but it is not // supported by the psql event sink and reports an error for all inputs. -func (BackportTxIndexer) Search(context.Context, *query.Query) ([]*abci.TxResult, error) { - return nil, errors.New("the TxIndexer.Search method is not supported") +func (BackportTxIndexer) Search(ctx context.Context, q *query.Query, pagSettings ctypes.Pagination) ([]*abci.TxResult, int, error) { + return nil, 0, errors.New("the TxIndexer.Search method is not supported") } // BlockIndexer returns a bridge that implements the CometBFT v0.34 block diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index a70c461c2f..bb71a3011a 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -6,6 +6,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/pubsub/query" + ctypes "github.com/cometbft/cometbft/rpc/core/types" ) // XXX/TODO: These types should be moved to the indexer package. @@ -25,7 +26,7 @@ type TxIndexer interface { Get(hash []byte) (*abci.TxResult, error) // Search allows you to query for transactions. - Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) + Search(ctx context.Context, q *query.Query, pagSettings ctypes.Pagination) ([]*abci.TxResult, int, error) } // Batch groups together multiple Index operations to be performed at the same time. diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 308c6580fc..6369b04c4f 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "math/big" + "sort" "strconv" "strings" @@ -15,6 +16,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/pubsub/query" + ctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/cometbft/cometbft/state/indexer" "github.com/cometbft/cometbft/state/txindex" "github.com/cometbft/cometbft/types" @@ -197,36 +199,36 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba // // Search will exit early and return any result fetched so far, // when a message is received on the context chan. -func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { +func (txi *TxIndex) Search(ctx context.Context, q *query.Query, pagSettings ctypes.Pagination) ([]*abci.TxResult, int, error) { select { case <-ctx.Done(): - return make([]*abci.TxResult, 0), nil + return make([]*abci.TxResult, 0), 0, nil default: } var hashesInitialized bool - filteredHashes := make(map[string][]byte) + filteredHashes := make(map[string]TxInfo) // get a list of conditions (like "tx.height > 5") conditions, err := q.Conditions() if err != nil { - return nil, fmt.Errorf("error during parsing conditions from query: %w", err) + return nil, 0, fmt.Errorf("error during parsing conditions from query: %w", err) } // if there is a hash condition, return the result immediately hash, ok, err := lookForHash(conditions) if err != nil { - return nil, fmt.Errorf("error during searching for a hash in the query: %w", err) + return nil, 0, fmt.Errorf("error during searching for a hash in the query: %w", err) } else if ok { res, err := txi.Get(hash) switch { case err != nil: - return []*abci.TxResult{}, fmt.Errorf("error while retrieving the result: %w", err) + return []*abci.TxResult{}, 0, fmt.Errorf("error while retrieving the result: %w", err) case res == nil: - return []*abci.TxResult{}, nil + return []*abci.TxResult{}, 0, nil default: - return []*abci.TxResult{res}, nil + return []*abci.TxResult{res}, 0, nil } } @@ -296,14 +298,66 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul } } - results := make([]*abci.TxResult, 0, len(filteredHashes)) + numResults := len(filteredHashes) + + // Convert map keys to slice for deterministic ordering + hashKeys := make([]string, 0, numResults) + for k := range filteredHashes { + hashKeys = append(hashKeys, k) + } + + // Sort by height + sort.Slice(hashKeys, func(i, j int) bool { + hi := filteredHashes[hashKeys[i]].Height + hj := filteredHashes[hashKeys[j]].Height + if hi == hj { + // If heights are equal, sort lexicographically + if pagSettings.OrderDesc { + return hashKeys[i] > hashKeys[j] + } else { + return hashKeys[i] < hashKeys[j] + } + } + if pagSettings.OrderDesc { + return hi > hj + + } else { + return hi < hj + } + }) + + // If paginated, determine which hash keys to return + if pagSettings.IsPaginated { + // Now that we know the total number of results, validate that the page + // requested is within bounds + pagSettings.Page, err = validatePage(&pagSettings.Page, pagSettings.PerPage, numResults) + if err != nil { + return nil, 0, err + } + + // Calculate pagination start and end indices + startIndex := (pagSettings.Page - 1) * pagSettings.PerPage + endIndex := startIndex + pagSettings.PerPage + + // Apply pagination limits + if endIndex > len(hashKeys) { + endIndex = len(hashKeys) + } + if startIndex >= len(hashKeys) { + return []*abci.TxResult{}, 0, nil + } + + hashKeys = hashKeys[startIndex:endIndex] + } + + results := make([]*abci.TxResult, 0, len(hashKeys)) resultMap := make(map[string]struct{}) RESULTS_LOOP: - for _, h := range filteredHashes { - + for _, hKey := range hashKeys { + h := filteredHashes[hKey].TxBytes res, err := txi.Get(h) if err != nil { - return nil, fmt.Errorf("failed to get Tx{%X}: %w", h, err) + return nil, 0, fmt.Errorf("failed to get Tx{%X}: %w", h, err) } hashString := string(h) if _, ok := resultMap[hashString]; !ok { @@ -318,7 +372,7 @@ RESULTS_LOOP: } } - return results, nil + return results, numResults, nil } func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error) { @@ -331,9 +385,18 @@ func lookForHash(conditions []query.Condition) (hash []byte, ok bool, err error) return } -func (txi *TxIndex) setTmpHashes(tmpHeights map[string][]byte, it dbm.Iterator) { +func (txi *TxIndex) setTmpHashes(tmpHeights map[string]TxInfo, it dbm.Iterator, height int64) { eventSeq := extractEventSeqFromKey(it.Key()) - tmpHeights[string(it.Value())+eventSeq] = it.Value() + txInfo := TxInfo{ + TxBytes: it.Value(), + Height: height, + } + tmpHeights[string(it.Value())+eventSeq] = txInfo +} + +type TxInfo struct { + TxBytes []byte + Height int64 } // match returns all matching txs by hash that meet a given condition and start @@ -341,21 +404,26 @@ func (txi *TxIndex) setTmpHashes(tmpHeights map[string][]byte, it dbm.Iterator) // non-intersecting matches are removed. // // NOTE: filteredHashes may be empty if no previous condition has matched. +// +// Additionally, this method retrieves the height of the hash via the key, +// and adds it to the TxInfo struct, which is then added to the filteredHashes. +// This is done to paginate the results prior to retrieving all the TxResults, +// which is needed for performance reasons. func (txi *TxIndex) match( ctx context.Context, c query.Condition, startKeyBz []byte, - filteredHashes map[string][]byte, + filteredHashes map[string]TxInfo, firstRun bool, heightInfo HeightInfo, -) map[string][]byte { +) map[string]TxInfo { // A previous match was attempted but resulted in no matches, so we return // no matches (assuming AND operand). if !firstRun && len(filteredHashes) == 0 { return filteredHashes } - tmpHashes := make(map[string][]byte) + tmpHashes := make(map[string]TxInfo) switch c.Op { case query.OpEqual: @@ -375,7 +443,7 @@ func (txi *TxIndex) match( continue } - txi.setTmpHashes(tmpHashes, it) + txi.setTmpHashes(tmpHashes, it, keyHeight) // Potentially exit early. select { case <-ctx.Done(): @@ -402,7 +470,7 @@ func (txi *TxIndex) match( if err != nil || !checkHeightConditions(heightInfo, keyHeight) { continue } - txi.setTmpHashes(tmpHashes, it) + txi.setTmpHashes(tmpHashes, it, keyHeight) // Potentially exit early. select { @@ -436,7 +504,7 @@ func (txi *TxIndex) match( if err != nil || !checkHeightConditions(heightInfo, keyHeight) { continue } - txi.setTmpHashes(tmpHashes, it) + txi.setTmpHashes(tmpHashes, it, keyHeight) } // Potentially exit early. @@ -463,24 +531,25 @@ func (txi *TxIndex) match( // 2. A previous match was not attempted, so we return all results. return tmpHashes } - // Remove/reduce matches in filteredHashes that were not found in this // match (tmpHashes). REMOVE_LOOP: for k, v := range filteredHashes { tmpHash := tmpHashes[k] - if tmpHash == nil || !bytes.Equal(tmpHash, v) { + if tmpHash.TxBytes == nil || !bytes.Equal(tmpHash.TxBytes, v.TxBytes) { delete(filteredHashes, k) - - // Potentially exit early. - select { - case <-ctx.Done(): - break REMOVE_LOOP - default: - } + } else { + // If there is a match, update the height in filteredHashes + v.Height = tmpHash.Height + filteredHashes[k] = v + } + // Potentially exit early. + select { + case <-ctx.Done(): + break REMOVE_LOOP + default: } } - return filteredHashes } @@ -489,21 +558,26 @@ REMOVE_LOOP: // any non-intersecting matches are removed. // // NOTE: filteredHashes may be empty if no previous condition has matched. +// +// Additionally, this method retrieves the height of the hash via the key, +// and adds it to the TxInfo struct, which is then added to the filteredHashes. +// This is done to paginate the results prior to retrieving all the TxResults, +// which is needed for performance reasons. func (txi *TxIndex) matchRange( ctx context.Context, qr indexer.QueryRange, startKey []byte, - filteredHashes map[string][]byte, + filteredHashes map[string]TxInfo, firstRun bool, heightInfo HeightInfo, -) map[string][]byte { +) map[string]TxInfo { // A previous match was attempted but resulted in no matches, so we return // no matches (assuming AND operand). if !firstRun && len(filteredHashes) == 0 { return filteredHashes } - tmpHashes := make(map[string][]byte) + tmpHashes := make(map[string]TxInfo) it, err := dbm.IteratePrefix(txi.store, startKey) if err != nil { @@ -524,15 +598,22 @@ LOOP: if !ok { continue LOOP } + // Regardless of the query condition, we retrieve the height in order to sort later + keyHeight, err := extractHeightFromKey(it.Key()) + if err != nil { + continue LOOP + } if qr.Key != types.TxHeightKey { - keyHeight, err := extractHeightFromKey(it.Key()) - if err != nil || !checkHeightConditions(heightInfo, keyHeight) { + // If the query condition specifies a height range, we need to check if the height + // of the transaction is within the range + if !checkHeightConditions(heightInfo, keyHeight) { continue LOOP } } + if checkBounds(qr, v) { - txi.setTmpHashes(tmpHashes, it) + txi.setTmpHashes(tmpHashes, it, keyHeight) } // XXX: passing time in a ABCI Events is not yet implemented @@ -570,15 +651,19 @@ LOOP: REMOVE_LOOP: for k, v := range filteredHashes { tmpHash := tmpHashes[k] - if tmpHash == nil || !bytes.Equal(tmpHashes[k], v) { + if tmpHash.TxBytes == nil || !bytes.Equal(tmpHash.TxBytes, v.TxBytes) { delete(filteredHashes, k) + } else { + // If there is a match, update the height in filteredHashes + v.Height = tmpHash.Height + filteredHashes[k] = v + } - // Potentially exit early. - select { - case <-ctx.Done(): - break REMOVE_LOOP - default: - } + // Potentially exit early. + select { + case <-ctx.Done(): + break REMOVE_LOOP + default: } } @@ -688,3 +773,24 @@ func lookForHeight(conditions []query.Condition) (height int64) { } return 0 } + +func validatePage(pagePtr *int, perPage, totalCount int) (int, error) { + if perPage < 1 { + return 1, fmt.Errorf("zero or negative perPage: %d", perPage) + } + + if pagePtr == nil { // no page parameter + return 1, nil + } + + pages := ((totalCount - 1) / perPage) + 1 + if pages == 0 { + pages = 1 // one page (even if it's empty) + } + page := *pagePtr + if page <= 0 || page > pages { + return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page) + } + + return page, nil +} diff --git a/state/txindex/kv/kv_bench_test.go b/state/txindex/kv/kv_bench_test.go index 7b99ea1a12..a2592ccaaf 100644 --- a/state/txindex/kv/kv_bench_test.go +++ b/state/txindex/kv/kv_bench_test.go @@ -67,7 +67,7 @@ func BenchmarkTxSearch(b *testing.B) { ctx := context.Background() for i := 0; i < b.N; i++ { - if _, err := indexer.Search(ctx, txQuery); err != nil { + if _, _, err := indexer.Search(ctx, txQuery, DefaultPagination); err != nil { b.Errorf("failed to query for txs: %s", err) } } diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index e8246ece04..e779095e4e 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -15,10 +15,18 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/pubsub/query" cmtrand "github.com/cometbft/cometbft/libs/rand" + ctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/cometbft/cometbft/state/txindex" "github.com/cometbft/cometbft/types" ) +var DefaultPagination = ctypes.Pagination{ + IsPaginated: true, + Page: 1, + PerPage: 100, + OrderDesc: false, +} + func TestBigInt(t *testing.T) { indexer := NewTxIndex(db.NewMemDB()) @@ -80,7 +88,7 @@ func TestBigInt(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(ctx, query.MustParse(tc.q)) + results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination) assert.NoError(t, err) assert.Len(t, results, tc.resultsLength) if tc.resultsLength > 0 && tc.txRes != nil { @@ -212,7 +220,7 @@ func TestTxSearch(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(ctx, query.MustParse(tc.q)) + results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination) assert.NoError(t, err) assert.Len(t, results, tc.resultsLength) @@ -310,7 +318,7 @@ func TestTxSearchEventMatch(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(ctx, query.MustParse(tc.q)) + results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination) assert.NoError(t, err) assert.Len(t, results, tc.resultsLength) @@ -386,7 +394,7 @@ func TestTxSearchEventMatchByHeight(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(ctx, query.MustParse(tc.q)) + results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination) assert.NoError(t, err) assert.Len(t, results, tc.resultsLength) @@ -418,7 +426,7 @@ func TestTxSearchWithCancelation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - results, err := indexer.Search(ctx, query.MustParse("account.number = 1")) + results, _, err := indexer.Search(ctx, query.MustParse("account.number = 1"), DefaultPagination) assert.NoError(t, err) assert.Empty(t, results) } @@ -491,7 +499,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { - results, err := indexer.Search(ctx, query.MustParse(tc.q)) + results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination) require.NoError(t, err) for _, txr := range results { for _, tr := range tc.results { @@ -573,7 +581,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { ctx := context.Background() for _, tc := range testCases { - results, err := indexer.Search(ctx, query.MustParse(tc.q)) + results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination) assert.NoError(t, err) len := 0 if tc.found { @@ -730,7 +738,7 @@ func TestTxSearchMultipleTxs(t *testing.T) { ctx := context.Background() - results, err := indexer.Search(ctx, query.MustParse("account.number >= 1")) + results, _, err := indexer.Search(ctx, query.MustParse("account.number >= 1"), DefaultPagination) assert.NoError(t, err) require.Len(t, results, 3) diff --git a/state/txindex/mocks/tx_indexer.go b/state/txindex/mocks/tx_indexer.go index daaca389f1..280e9f2399 100644 --- a/state/txindex/mocks/tx_indexer.go +++ b/state/txindex/mocks/tx_indexer.go @@ -5,9 +5,11 @@ package mocks import ( context "context" - query "github.com/cometbft/cometbft/libs/pubsub/query" + coretypes "github.com/cometbft/cometbft/rpc/core/types" mock "github.com/stretchr/testify/mock" + query "github.com/cometbft/cometbft/libs/pubsub/query" + txindex "github.com/cometbft/cometbft/state/txindex" types "github.com/cometbft/cometbft/abci/types" @@ -69,27 +71,34 @@ func (_m *TxIndexer) Index(result *types.TxResult) error { return r0 } -// Search provides a mock function with given fields: ctx, q -func (_m *TxIndexer) Search(ctx context.Context, q *query.Query) ([]*types.TxResult, error) { - ret := _m.Called(ctx, q) +// Search provides a mock function with given fields: ctx, q, pagSettings +func (_m *TxIndexer) Search(ctx context.Context, q *query.Query, pagSettings coretypes.Pagination) ([]*types.TxResult, int, error) { + ret := _m.Called(ctx, q, pagSettings) var r0 []*types.TxResult - if rf, ok := ret.Get(0).(func(context.Context, *query.Query) []*types.TxResult); ok { - r0 = rf(ctx, q) + if rf, ok := ret.Get(0).(func(context.Context, *query.Query, coretypes.Pagination) []*types.TxResult); ok { + r0 = rf(ctx, q, pagSettings) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]*types.TxResult) } } - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *query.Query) error); ok { - r1 = rf(ctx, q) + var r1 int + if rf, ok := ret.Get(1).(func(context.Context, *query.Query, coretypes.Pagination) int); ok { + r1 = rf(ctx, q, pagSettings) } else { - r1 = ret.Error(1) + r1 = ret.Get(1).(int) } - return r0, r1 + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, *query.Query, coretypes.Pagination) error); ok { + r2 = rf(ctx, q, pagSettings) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } type mockConstructorTestingTNewTxIndexer interface { diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index 3e881e826f..43ad5665de 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -6,6 +6,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/pubsub/query" + ctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/cometbft/cometbft/state/txindex" ) @@ -29,6 +30,6 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { return nil } -func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { - return []*abci.TxResult{}, nil +func (txi *TxIndex) Search(ctx context.Context, q *query.Query, pagSettings ctypes.Pagination) ([]*abci.TxResult, int, error) { + return []*abci.TxResult{}, 0, nil }