Skip to content

Commit

Permalink
[LUM-834] Better handling of queries timeouts, merkle tree verification and cosmetic fixes (#54)
Browse files Browse the repository at this point in the history

* Better handling of queries timeouts

* code format

* Logic improvement

* Use the block height as submission height

* Added v1.6.1 upgrade handler

* Code cleaning

* Added v1.6.2

* Use the revision height from connection itself as submission height

* Removed unused upgrade handlers

* temporarily remove tests related to icq queries

* Delete the query in case of failed proof verification

* fix import ordering issue

* Code cleaning

* Update app.go

* Update msg_server_query_response.go

* Downgrade used IBC version

* Patch the upgrade handler

* Update app.go

* Prepare v1.6.1 release

* Leftover

* Fixed nil result reference

* Update abci.go

* Only check for historical

* Update proposal_register_pool.go

* Chicken egg issue again

---------

Co-authored-by: RLM <rcd.remy@gmail.com>
  • Loading branch information
Segfaultd and Ricardo-Remy authored Sep 28, 2023
1 parent d0ea389 commit 0f96fc6
Show file tree
Hide file tree
Showing 19 changed files with 588 additions and 306 deletions.
8 changes: 3 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,10 +892,8 @@ func (app *App) registerUpgradeHandlers() {
return app.mm.RunMigrations(ctx, app.configurator, fromVM)
})

app.UpgradeKeeper.SetUpgradeHandler("v1.6.0", func(ctx sdk.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
app.Logger().Info("Starting v1.6.0 upgrade")

app.Logger().Info("v1.6.0 upgrade applied")
app.UpgradeKeeper.SetUpgradeHandler("v1.6.1", func(ctx sdk.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
app.Logger().Info("Starting v1.6.1 upgrade")
return app.mm.RunMigrations(ctx, app.configurator, fromVM)
})

Expand Down Expand Up @@ -981,7 +979,7 @@ func (app *App) registerUpgradeHandlers() {
app.SetStoreLoader(upgradetypes.UpgradeStoreLoader(upgradeInfo.Height, &storeUpgrades))
}

if upgradeInfo.Name == "v1.6.0" && !app.UpgradeKeeper.IsSkipHeight(upgradeInfo.Height) {
if upgradeInfo.Name == "v1.6.1" && !app.UpgradeKeeper.IsSkipHeight(upgradeInfo.Height) {
storeUpgrades := storetypes.StoreUpgrades{}
app.SetStoreLoader(upgradetypes.UpgradeStoreLoader(upgradeInfo.Height, &storeUpgrades))
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/cosmos/cosmos-proto v1.0.0-beta.2
github.com/cosmos/cosmos-sdk v0.47.5
github.com/cosmos/gogoproto v1.4.10
github.com/cosmos/ibc-go/v7 v7.3.0
github.com/cosmos/ibc-go/v7 v7.2.0
github.com/cosmos/ics23/go v0.10.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ github.com/cosmos/gogoproto v1.4.10 h1:QH/yT8X+c0F4ZDacDv3z+xE3WU1P1Z3wQoLMBRJoK
github.com/cosmos/gogoproto v1.4.10/go.mod h1:3aAZzeRWpAwr+SS/LLkICX2/kDFyaYVzckBDzygIxek=
github.com/cosmos/iavl v0.20.0 h1:fTVznVlepH0KK8NyKq8w+U7c2L6jofa27aFX6YGlm38=
github.com/cosmos/iavl v0.20.0/go.mod h1:WO7FyvaZJoH65+HFOsDir7xU9FWk2w9cHXNW1XHcl7A=
github.com/cosmos/ibc-go/v7 v7.3.0 h1:QtGeVMi/3JeLWuvEuC60sBHpAF40Oenx/y+bP8+wRRw=
github.com/cosmos/ibc-go/v7 v7.3.0/go.mod h1:mUmaHFXpXrEdcxfdXyau+utZf14pGKVUiXwYftRZZfQ=
github.com/cosmos/ibc-go/v7 v7.2.0 h1:dx0DLUl7rxdyZ8NiT6UsrbzKOJx/w7s+BOaewFRH6cg=
github.com/cosmos/ibc-go/v7 v7.2.0/go.mod h1:OOcjKIRku/j1Xs1RgKK0yvKRrJ5iFuZYMetR1n3yMlc=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/cosmos/ledger-cosmos-go v0.12.2 h1:/XYaBlE2BJxtvpkHiBm97gFGSGmYGKunKyF3nNqAXZA=
Expand Down
14 changes: 13 additions & 1 deletion proto/stride/interchainquery/v1/genesis.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,31 @@ package stride.interchainquery.v1;

import "gogoproto/gogo.proto";
import "cosmos_proto/cosmos.proto";
import "google/protobuf/duration.proto";

option go_package = "github.com/lum-network/chain/x/icqueries/types";

enum TimeoutPolicy {
REJECT_QUERY_RESPONSE = 0;
RETRY_QUERY_REQUEST = 1;
EXECUTE_QUERY_CALLBACK = 2;
}

message Query {
string id = 1;
string connection_id = 2;
string chain_id = 3;
string query_type = 4;
bytes request = 5;
string callback_module_name = 7;
string callback_id = 8;
uint64 ttl = 9;
uint64 timeout_timestamp = 9;
bool request_sent = 11;
string extra_id = 12;
TimeoutPolicy timeout_policy = 13;
google.protobuf.Duration timeout_duration = 14
[ (gogoproto.nullable) = false, (gogoproto.stdduration) = true ];
uint64 submission_height = 15;
}

message DataPoint {
Expand Down
73 changes: 43 additions & 30 deletions x/icqueries/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,48 @@ func (k *Keeper) EndBlocker(ctx sdk.Context) {
defer telemetry.ModuleMeasureSince(types.ModuleName, time.Now(), telemetry.MetricKeyEndBlocker)
logger := k.Logger(ctx).With("ctx", "blocker_icq")

// Loop through queries and emit them
k.IterateQueries(ctx, func(_ int64, query types.Query) (stop bool) {
if !query.RequestSent {
events := sdk.Events{}
logger.Info(fmt.Sprintf("InterchainQuery event emitted %s", query.Id))
// QUESTION: Do we need to emit this event twice?
event := sdk.NewEvent(
sdk.EventTypeMessage,
sdk.NewAttribute(sdk.AttributeKeyModule, types.AttributeValueCategory),
sdk.NewAttribute(sdk.AttributeKeyAction, types.AttributeValueQuery),
sdk.NewAttribute(types.AttributeKeyQueryId, query.Id),
sdk.NewAttribute(types.AttributeKeyChainId, query.ChainId),
sdk.NewAttribute(types.AttributeKeyConnectionId, query.ConnectionId),
sdk.NewAttribute(types.AttributeKeyType, query.QueryType),
sdk.NewAttribute(types.AttributeKeyHeight, "0"),
sdk.NewAttribute(types.AttributeKeyRequest, hex.EncodeToString(query.Request)),
)
events = append(events, event)

event.Type = "query_request"
events = append(events, event)

query.RequestSent = true
k.SetQuery(ctx, query)

// Emit events, it will append
ctx.EventManager().EmitEvents(events)
logger.Info(fmt.Sprintf("[ICQ] Emitted a total of %d events, on block %d for query %s", len(events), ctx.BlockHeight(), query.Id))
events := sdk.Events{}

// We can loop on each query since when response is received, we delete them, there will never be much to process
for _, query := range k.AllQueries(ctx) {
if query.RequestSent {
// If the query has timed out here, it means we still haven't received the relayer timeout notification.
// This is fault-tolerant piece of code, since if the relayer confirmation finally arrives, it will just get ignored
if query.HasTimedOut(ctx.BlockTime()) {
// Try to invoke the timeout query, but continue in case of failure (panicking here is heavily dangerous)
if err := k.HandleQueryTimeout(ctx, nil, query); err != nil {
logger.Error(fmt.Sprintf("Error encountered while handling query timeout : %v", err.Error()))
continue
}
// If everything went fine, just delete the query data
k.DeleteQuery(ctx, query.Id)
}
continue
}
return false
})
logger.Info(fmt.Sprintf("InterchainQuery event emitted %s", query.Id))
event := sdk.NewEvent(
sdk.EventTypeMessage,
sdk.NewAttribute(sdk.AttributeKeyModule, types.AttributeValueCategory),
sdk.NewAttribute(sdk.AttributeKeyAction, types.AttributeValueQuery),
sdk.NewAttribute(types.AttributeKeyQueryId, query.Id),
sdk.NewAttribute(types.AttributeKeyChainId, query.ChainId),
sdk.NewAttribute(types.AttributeKeyConnectionId, query.ConnectionId),
sdk.NewAttribute(types.AttributeKeyType, query.QueryType),
sdk.NewAttribute(types.AttributeKeyHeight, "0"),
sdk.NewAttribute(types.AttributeKeyRequest, hex.EncodeToString(query.Request)),
)

// Emit our event twice
events = append(events, event)
event.Type = "query_request"
events = append(events, event)

// Patch our query
query.RequestSent = true
k.SetQuery(ctx, query)
}

if len(events) > 0 {
ctx.EventManager().EmitEvents(events)
}
}
203 changes: 171 additions & 32 deletions x/icqueries/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,25 @@ package keeper

import (
"fmt"
"net/url"
"sort"
"strings"
"time"

ibcconnectiontypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types"

errorsmod "cosmossdk.io/errors"
"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/codec"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types"
commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types"
"github.com/cosmos/ibc-go/v7/modules/core/exported"
ibckeeper "github.com/cosmos/ibc-go/v7/modules/core/keeper"
tendermint "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint"
ics23 "github.com/cosmos/ics23/go"
"github.com/spf13/cast"

"github.com/lum-network/chain/x/icqueries/types"
)
Expand Down Expand Up @@ -45,45 +55,174 @@ func (k *Keeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With("module", fmt.Sprintf("x/%s", types.ModuleName))
}

func (k *Keeper) MakeRequest(ctx sdk.Context, module string, callbackId string, chainId string, connectionId string, extraId string, queryType string, request []byte, ttl uint64) error {
k.Logger(ctx).Info(fmt.Sprintf("Submitting ICQ Request - module=%s, callbackId=%s, connectionId=%s, queryType=%s, ttl=%d", module, callbackId, connectionId, queryType, ttl))
func (k *Keeper) MakeRequest(ctx sdk.Context, module string, callbackId string, chainId string, connectionId string, extraId string, queryType string, request []byte, timeoutPolicy types.TimeoutPolicy, timeoutDuration time.Duration) error {
k.Logger(ctx).Info(fmt.Sprintf("Submitting ICQ Request - module=%s, callbackId=%s, connectionId=%s, queryType=%s", module, callbackId, connectionId, queryType))

// Compute our timestamp ttl
timeoutTimestamp := uint64(ctx.BlockTime().UnixNano() + timeoutDuration.Nanoseconds())

// Set the submission height on the Query to the latest light client height
// In the query response, this will be used to verify that the query wasn't historical
connection, found := k.IBCKeeper.ConnectionKeeper.GetConnection(ctx, connectionId)
if !found {
return errorsmod.Wrapf(ibcconnectiontypes.ErrConnectionNotFound, connectionId)
}
clientState, found := k.IBCKeeper.ClientKeeper.GetClientState(ctx, connection.ClientId)
if !found {
return errorsmod.Wrapf(clienttypes.ErrClientNotFound, connection.ClientId)
}

// Construct our query payload and validate it
query := k.NewQuery(ctx, module, callbackId, chainId, connectionId, extraId, queryType, request, timeoutTimestamp, timeoutPolicy, timeoutDuration, clientState.GetLatestHeight().GetRevisionHeight())
if err := k.ValidateQuery(ctx, query); err != nil {
return err
}

// Store the actual query
k.SetQuery(ctx, *query)
return nil
}

// RetryRequest re-submits a previously emitted ICQ, typically after a timeout.
// It flips RequestSent back to false so the EndBlocker re-emits the request,
// and pushes the timeout deadline forward by the query's configured duration
// so the retried query does not immediately time out again.
func (k *Keeper) RetryRequest(ctx sdk.Context, queryId string) error {
	q, ok := k.GetQuery(ctx, queryId)
	if !ok {
		return errorsmod.Wrapf(types.ErrInvalidQuery, "Query %s not found", queryId)
	}

	k.Logger(ctx).Info(fmt.Sprintf("Queuing ICQ Retry - Query Type: %s, Query ID: %s", q.CallbackId, q.Id))

	// Mark the query as pending again and refresh its deadline relative to the
	// current block time (now + configured duration).
	q.RequestSent = false
	q.TimeoutTimestamp = uint64(ctx.BlockTime().UnixNano() + q.TimeoutDuration.Nanoseconds())
	k.SetQuery(ctx, q)

	return nil
}

// VerifyKeyProof check if the query requires proving; if it does, verify it!
func (k Keeper) VerifyKeyProof(ctx sdk.Context, msg *types.MsgSubmitQueryResponse, query types.Query) error {
pathParts := strings.Split(query.QueryType, "/")

// the query does NOT have an associated proof, so no need to verify it.
if pathParts[len(pathParts)-1] != "key" {
return nil
}

// If the query is a "key" proof query, verify the results are valid by checking the poof
if msg.ProofOps == nil {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Unable to validate proof. No proof submitted")
}

// Get the client consensus state at the height 1 block above the message height
proofHeight, err := cast.ToUint64E(msg.Height)
if err != nil {
return err
}
height := clienttypes.NewHeight(clienttypes.ParseChainID(query.ChainId), proofHeight+1)

// Confirm the query proof height occurred after the submission height
if proofHeight < query.SubmissionHeight {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Query proof height (%d) is older than the submission height (%d)", proofHeight, query.SubmissionHeight)
}

// Confirm the connectionId and chainId are valid
if connectionId == "" {
errMsg := "[ICQ Validation Check] Failed! connection id cannot be empty"
k.Logger(ctx).Error(errMsg)
return errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, errMsg)
// Get the client state and consensus state from the ConnectionId
connection, found := k.IBCKeeper.ConnectionKeeper.GetConnection(ctx, query.ConnectionId)
if !found {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "ConnectionId %s does not exist", query.ConnectionId)
}
if !strings.HasPrefix(connectionId, "connection") {
errMsg := "[ICQ Validation Check] Failed! connection id must begin with 'connection'"
k.Logger(ctx).Error(errMsg)
return errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, errMsg)
consensusState, found := k.IBCKeeper.ClientKeeper.GetClientConsensusState(ctx, connection.ClientId, height)
if !found {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Consensus state not found for client %s and height %d", connection.ClientId, height)
}
if chainId == "" {
errMsg := "[ICQ Validation Check] Failed! chain_id cannot be empty"
k.Logger(ctx).Error(errMsg)
return errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, errMsg)
clientState, found := k.IBCKeeper.ClientKeeper.GetClientState(ctx, connection.ClientId)
if !found {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Unable to fetch client state for client %s", connection.ClientId)
}

// Confirm the module and callbackId exist
if module != "" {
if _, exists := k.callbacks[module]; !exists {
err := fmt.Errorf("no callback handler registered for module %s", module)
k.Logger(ctx).Error(err.Error())
return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "no callback handler registered for module")
// Cast the client and consensus state to tendermint type
tendermintConsensusState, ok := consensusState.(*tendermint.ConsensusState)
if !ok {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Only tendermint consensus state is supported (%s provided)", consensusState.ClientType())
}
tendermintClientState, ok := clientState.(*tendermint.ClientState)
if !ok {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Only tendermint client state is supported (%s provided)", clientState.ClientType())
}
var stateRoot exported.Root = tendermintConsensusState.Root
var clientStateProof []*ics23.ProofSpec = tendermintClientState.ProofSpecs

// Get the merkle path and merkle proof
path := commitmenttypes.NewMerklePath([]string{pathParts[1], url.PathEscape(string(query.Request))}...)
merkleProof, err := commitmenttypes.ConvertProofs(msg.ProofOps)
if err != nil {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Error converting proofs: %s", err.Error())
}

// If we got a non-nil response, verify inclusion proof
if len(msg.Result) != 0 {
if err := merkleProof.VerifyMembership(clientStateProof, stateRoot, path, msg.Result); err != nil {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Unable to verify membership proof: %s", err.Error())
}
if exists := k.callbacks[module].HasICQCallback(callbackId); !exists {
err := fmt.Errorf("no callback %s registered for module %s", callbackId, module)
k.Logger(ctx).Error(err.Error())
return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "no callback handler registered for module")
k.Logger(ctx).Info(fmt.Sprintf("Inclusion proof validated - QueryId %s", query.Id))

} else {
// if we got a nil query response, verify non inclusion proof.
if err := merkleProof.VerifyNonMembership(clientStateProof, stateRoot, path); err != nil {
return errorsmod.Wrapf(types.ErrInvalidICQProof, "Unable to verify non-membership proof: %s", err.Error())
}
k.Logger(ctx).Info(fmt.Sprintf("Non-inclusion proof validated - QueryId %s", query.Id))
}

// Save the query to the store
// If the same query is re-requested, it will get replace in the store with an updated TTL
// and the RequestSent bool reset to false
query := k.NewQuery(ctx, module, callbackId, chainId, connectionId, extraId, queryType, request, ttl)
k.SetQuery(ctx, *query)

return nil
}

// HandleQueryTimeout dispatches a timed-out query to the behavior selected by
// its timeout policy: drop the response, re-queue the request, or invoke the
// registered callback with a TIMEOUT status. msg may be nil when the timeout
// was detected locally (e.g. from the EndBlocker) rather than via a relayer
// submission.
func (k Keeper) HandleQueryTimeout(ctx sdk.Context, msg *types.MsgSubmitQueryResponse, query types.Query) error {
	logger := k.Logger(ctx)
	logger.Error(fmt.Sprintf("QUERY TIMEOUT - QueryId: %s, TTL: %d, BlockTime: %d", query.Id, query.TimeoutTimestamp, ctx.BlockHeader().Time.UnixNano()))

	policy := query.TimeoutPolicy
	switch policy {
	case types.TimeoutPolicy_REJECT_QUERY_RESPONSE:
		// Nothing to do: the eventual response (if any) will simply be ignored.
		logger.Info(fmt.Sprintf("Rejecting query %s", query.GetId()))
		return nil

	case types.TimeoutPolicy_RETRY_QUERY_REQUEST:
		// Re-arm the query so the EndBlocker emits it again.
		logger.Info(fmt.Sprintf("Retrying query %s", query.GetId()))
		return k.RetryRequest(ctx, query.GetId())

	case types.TimeoutPolicy_EXECUTE_QUERY_CALLBACK:
		// Let the owning module decide what a timeout means for it.
		logger.Info(fmt.Sprintf("Invoking query %s callback", query.GetId()))
		return k.InvokeCallback(ctx, msg, query, types.QueryResponseStatus_TIMEOUT)

	default:
		return fmt.Errorf("unsupported query timeout policy: %s", policy.String())
	}
}

// InvokeCallback routes a query result to the module that registered the
// query's callback id. Module handlers are scanned in sorted-name order so the
// dispatch is deterministic across nodes. msg may be nil (e.g. on a local
// timeout), in which case a nil result is passed to the callback. Returns
// types.ErrICQCallbackNotFound when no registered module knows the callback id.
func (k Keeper) InvokeCallback(ctx sdk.Context, msg *types.MsgSubmitQueryResponse, query types.Query, status types.QueryResponseStatus) error {
	// The relayer message may be absent; fall back to a nil result payload.
	var result []byte
	if msg != nil {
		result = msg.Result
	}

	// Map iteration order is random in Go, so collect and sort the module
	// names to keep the callback lookup deterministic.
	names := make([]string, 0, len(k.callbacks))
	for name := range k.callbacks {
		names = append(names, name)
	}
	sort.Strings(names)

	// Dispatch to the first module whose handler recognizes this callback id.
	for _, name := range names {
		if handler := k.callbacks[name]; handler.HasICQCallback(query.CallbackId) {
			return handler.CallICQCallback(ctx, query.CallbackId, result, query, status)
		}
	}

	return types.ErrICQCallbackNotFound
}
Loading

0 comments on commit 0f96fc6

Please sign in to comment.