diff --git a/cli/cmd/initnodes.go b/cli/cmd/initnodes.go index f941cf712..e8e1a2690 100644 --- a/cli/cmd/initnodes.go +++ b/cli/cmd/initnodes.go @@ -170,7 +170,7 @@ func maybeDownloadGenesis(ctx context.Context, network netconf.ID) error { return errors.Wrap(err, "create rpc client") } stubNamer := func(xchain.ChainVersion) string { return "" } - cprov := cprovider.NewABCIProvider(rpcCl, network, stubNamer) + cprov := cprovider.NewABCI(rpcCl, network, stubNamer) execution, consensus, err := cprov.GenesisFiles(ctx) if err != nil { diff --git a/cli/cmd/staking.go b/cli/cmd/staking.go index ec93e71fc..53541d9d6 100644 --- a/cli/cmd/staking.go +++ b/cli/cmd/staking.go @@ -487,7 +487,7 @@ func setupClients( return nil, nil, nil, errors.Wrap(err, "new tendermint client") } - cprov := provider.NewABCIProvider(cl, conf.Network, netconf.ChainVersionNamer(conf.Network)) + cprov := provider.NewABCI(cl, conf.Network, netconf.ChainVersionNamer(conf.Network)) eth, err := ethclient.Dial(chainMeta.Name, conf.ExecutionRPC) if err != nil { diff --git a/e2e/app/monitor.go b/e2e/app/monitor.go index c07d5d418..d3f77e1e6 100644 --- a/e2e/app/monitor.go +++ b/e2e/app/monitor.go @@ -44,7 +44,7 @@ func StartMonitoringReceipts(ctx context.Context, def Definition) func() error { } network := NetworkFromDef(def) // Safe to call NetworkFromDef since this after netman.DeployContracts - cProvider := cprovider.NewABCIProvider(client, def.Testnet.Network, netconf.ChainVersionNamer(def.Testnet.Network)) + cProvider := cprovider.NewABCI(client, def.Testnet.Network, netconf.ChainVersionNamer(def.Testnet.Network)) xProvider := xprovider.New(network, def.Backends().RPCClients(), cProvider) cChainID := def.Testnet.Network.Static().OmniConsensusChainIDUint64() @@ -153,7 +153,7 @@ func MonitorCProvider(ctx context.Context, node *e2e.Node, network netconf.Netwo return errors.Wrap(err, "getting client") } - cprov := cprovider.NewABCIProvider(client, network.ID, netconf.ChainVersionNamer(network.ID)) + cprov := cprovider.NewABCI(client, network.ID, netconf.ChainVersionNamer(network.ID)) for _, chain := range network.Chains { for _, chainVer := range chain.ChainVersions() { diff --git a/e2e/test/attestations_test.go b/e2e/test/attestations_test.go index 040c12078..3f2e0d672 100644 --- a/e2e/test/attestations_test.go +++ b/e2e/test/attestations_test.go @@ -30,7 +30,7 @@ func TestApprovedAttestations(t *testing.T) { client, err := node.Client() require.NoError(t, err) - cprov := provider.NewABCIProvider(client, network.ID, netconf.ChainVersionNamer(netconf.Simnet)) + cprov := provider.NewABCI(client, network.ID, netconf.ChainVersionNamer(netconf.Simnet)) ctx := context.Background() for _, portal := range portals { @@ -80,7 +80,7 @@ func TestApprovedValUpdates(t *testing.T) { client, err := node.Client() require.NoError(t, err) - cprov := provider.NewABCIProvider(client, network.ID, netconf.ChainVersionNamer(netconf.Simnet)) + cprov := provider.NewABCI(client, network.ID, netconf.ChainVersionNamer(netconf.Simnet)) addr, err := k1util.PubKeyToAddress(node.PrivvalKey.PubKey()) require.NoError(t, err) diff --git a/e2e/test/cli_test.go b/e2e/test/cli_test.go index 5cbe65d19..0ae814449 100644 --- a/e2e/test/cli_test.go +++ b/e2e/test/cli_test.go @@ -89,7 +89,7 @@ func TestCLIOperator(t *testing.T) { cl, err := http.New(testnet.Network.Static().ConsensusRPC(), "/websocket") require.NoError(t, err) - cprov := provider.NewABCIProvider(cl, network.ID, netconf.ChainVersionNamer(network.ID)) + cprov := provider.NewABCI(cl, network.ID, netconf.ChainVersionNamer(network.ID)) // wait for validator to be created const valChangeWait = 15 * time.Second diff --git a/halo/app/lazyvoter.go b/halo/app/lazyvoter.go index 22dd68b9e..a3a10ba68 100644 --- a/halo/app/lazyvoter.go +++ b/halo/app/lazyvoter.go @@ -10,7 +10,6 @@ import ( "github.com/omni-network/omni/halo/comet" vtypes "github.com/omni-network/omni/halo/valsync/types" "github.com/omni-network/omni/lib/cchain" - cprovider "github.com/omni-network/omni/lib/cchain/provider" "github.com/omni-network/omni/lib/errors" "github.com/omni-network/omni/lib/ethclient" "github.com/omni-network/omni/lib/expbackoff" @@ -66,7 +65,7 @@ func (l *voterLoader) LazyLoad( netID netconf.ID, omniEVMCl ethclient.Client, endpoints xchain.RPCEndpoints, - cprov cprovider.Provider, + cprov cchain.Provider, privKey crypto.PrivKey, voterStateFile string, cmtAPI comet.API, diff --git a/halo/app/pruning_test.go b/halo/app/pruning_test.go index 46d89ec2b..faa4f8481 100644 --- a/halo/app/pruning_test.go +++ b/halo/app/pruning_test.go @@ -45,7 +45,7 @@ func TestPruningHistory(t *testing.T) { cl, err := rpchttp.New(cfg.Comet.RPC.ListenAddress, "/websocket") require.NoError(t, err) - cprov := cprovider.NewABCIProvider(cl, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet)) + cprov := cprovider.NewABCI(cl, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet)) // Wait until we get to block 1. waitUntilHeight := uint64(1) diff --git a/halo/app/start.go b/halo/app/start.go index 6e7d8a938..bd8197f54 100644 --- a/halo/app/start.go +++ b/halo/app/start.go @@ -10,6 +10,7 @@ import ( halocfg "github.com/omni-network/omni/halo/config" "github.com/omni-network/omni/halo/genutil/genserve" "github.com/omni-network/omni/lib/buildinfo" + "github.com/omni-network/omni/lib/cchain" cprovider "github.com/omni-network/omni/lib/cchain/provider" "github.com/omni-network/omni/lib/errors" "github.com/omni-network/omni/lib/ethclient" @@ -174,7 +175,15 @@ func Start(ctx context.Context, cfg Config) (<-chan error, func(context.Context) cmtAPI := comet.NewAPI(rpcClient) app.SetCometAPI(cmtAPI) - cProvider := cprovider.NewABCIProvider(rpcClient, cfg.Network, netconf.ChainVersionNamer(cfg.Network)) + clientCtx := app.ClientContext(ctx).WithClient(rpcClient).WithHomeDir(cfg.HomeDir) + if err := startRPCServers(ctx, cfg, app, sdkLogger, metrics, asyncAbort, clientCtx); err != nil { + return nil, nil, err + } + + cProvider, err := newCProvider(rpcClient, cfg) + if err != nil { + return nil, nil, err + } go func() { err := voter.LazyLoad( @@ -193,11 +202,6 @@ func Start(ctx context.Context, cfg Config) (<-chan error, func(context.Context) } }() - clientCtx := app.ClientContext(ctx).WithClient(rpcClient).WithHomeDir(cfg.HomeDir) - if err := startRPCServers(ctx, cfg, app, sdkLogger, metrics, asyncAbort, clientCtx); err != nil { - return nil, nil, err - } - log.Info(ctx, "Starting CometBFT") if err := cmtNode.Start(); err != nil { @@ -239,6 +243,16 @@ func Start(ctx context.Context, cfg Config) (<-chan error, func(context.Context) }, nil } +// newCProvider returns a new cchain provider. Either GRPC if enabled since it is faster, +// otherwise the ABCI provider. +func newCProvider(rpcClient *rpclocal.Local, cfg Config) (cchain.Provider, error) { + if cfg.SDKGRPC.Enable { + return cprovider.NewGRPC(cfg.SDKGRPC.Address, cfg.Network, netconf.ChainVersionNamer(cfg.Network)) + } + + return cprovider.NewABCI(rpcClient, cfg.Network, netconf.ChainVersionNamer(cfg.Network)), nil +} + // startRPCServers starts the Cosmos REST and gRPC servers. func startRPCServers( ctx context.Context, diff --git a/halo/app/start_test.go b/halo/app/start_test.go index f946aca8c..56a2cfbc6 100644 --- a/halo/app/start_test.go +++ b/halo/app/start_test.go @@ -13,11 +13,8 @@ import ( haloapp "github.com/omni-network/omni/halo/app" uluwatu1 "github.com/omni-network/omni/halo/app/upgrades/uluwatu" - atypes "github.com/omni-network/omni/halo/attest/types" halocmd "github.com/omni-network/omni/halo/cmd" halocfg "github.com/omni-network/omni/halo/config" - ptypes "github.com/omni-network/omni/halo/portal/types" - "github.com/omni-network/omni/lib/cchain/grpc" cprovider "github.com/omni-network/omni/lib/cchain/provider" "github.com/omni-network/omni/lib/ethclient" "github.com/omni-network/omni/lib/log" @@ -29,8 +26,6 @@ import ( "github.com/cometbft/cometbft/types" db "github.com/cosmos/cosmos-db" - sltypes "github.com/cosmos/cosmos-sdk/x/slashing/types" - stypes "github.com/cosmos/cosmos-sdk/x/staking/types" "github.com/stretchr/testify/require" ) @@ -54,7 +49,9 @@ func TestSmoke(t *testing.T) { cl, err := rpchttp.New(cfg.Comet.RPC.ListenAddress, "/websocket") require.NoError(t, err) - cprov := cprovider.NewABCIProvider(cl, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet)) + cprov := cprovider.NewABCI(cl, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet)) + cprovGRPC, err := cprovider.NewGRPC(cfg.SDKGRPC.Address, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet)) + require.NoError(t, err) // Wait until we get to block 3. const target = uint64(3) @@ -70,8 +67,8 @@ func TestSmoke(t *testing.T) { testReadyEndpoint(t, cfg) testAPI(t, cfg) - testGRPC(t, ctx, cfg) testCProvider(t, ctx, cprov) + testCProvider(t, ctx, cprovGRPC) genSet, err := cl.Validators(ctx, int64Ptr(1), nil, nil) require.NoError(t, err) @@ -153,35 +150,6 @@ func testReadyEndpoint(t *testing.T, cfg haloapp.Config) { require.NoError(t, err) } -func testGRPC(t *testing.T, ctx context.Context, cfg haloapp.Config) { - t.Helper() - cl, err := grpc.Dial(cfg.SDKGRPC.Address) - require.NoError(t, err) - - vals, err := cl.Staking.Validators(ctx, &stypes.QueryValidatorsRequest{}) - require.NoError(t, err) - require.NotEmpty(t, vals.Validators) - - infos, err := cl.Slashing.SigningInfos(ctx, &sltypes.QuerySigningInfosRequest{}) - require.NoError(t, err) - require.NotEmpty(t, infos.Info) - - pResp, err := cl.Slashing.Params(ctx, &sltypes.QueryParamsRequest{}) - require.NoError(t, err) - require.Equal(t, uluwatu1.SlashingParams, pResp.Params) - - _, err = cl.Portal.Block(ctx, &ptypes.BlockRequest{Latest: true}) - require.NoError(t, err) - - _, err = cl.Attest.ListAllAttestations(ctx, &atypes.ListAllAttestationsRequest{ - ChainId: cfg.Network.Static().OmniExecutionChainID, - ConfLevel: uint32(xchain.ConfFinalized), - Status: 1, - FromOffset: 0, - }) - require.NoError(t, err) -} - func testCProvider(t *testing.T, ctx context.Context, cprov cprovider.Provider) { t.Helper() diff --git a/lib/cchain/grpc/client.go b/lib/cchain/grpc/client.go deleted file mode 100644 index 1447a6ef1..000000000 --- a/lib/cchain/grpc/client.go +++ /dev/null @@ -1,33 +0,0 @@ -package grpc - -import ( - atypes "github.com/omni-network/omni/halo/attest/types" - ptypes "github.com/omni-network/omni/halo/portal/types" - "github.com/omni-network/omni/lib/errors" - - sltypes "github.com/cosmos/cosmos-sdk/x/slashing/types" - sttypes "github.com/cosmos/cosmos-sdk/x/staking/types" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -type Client struct { - Staking sttypes.QueryClient - Slashing sltypes.QueryClient - Attest atypes.QueryClient - Portal ptypes.QueryClient -} - -func Dial(target string) (Client, error) { - grpcClient, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return Client{}, errors.Wrap(err, "new grpc client") - } - - return Client{ - Staking: sttypes.NewQueryClient(grpcClient), - Slashing: sltypes.NewQueryClient(grpcClient), - Attest: atypes.NewQueryClient(grpcClient), - Portal: ptypes.NewQueryClient(grpcClient), - }, nil -} diff --git a/lib/cchain/provider.go b/lib/cchain/provider.go index f2485a8b3..6c6c0a844 100644 --- a/lib/cchain/provider.go +++ b/lib/cchain/provider.go @@ -6,8 +6,6 @@ import ( rtypes "github.com/omni-network/omni/halo/registry/types" "github.com/omni-network/omni/lib/xchain" - rpcclient "github.com/cometbft/cometbft/rpc/client" - "github.com/ethereum/go-ethereum/common" utypes "cosmossdk.io/x/upgrade/types" @@ -77,9 +75,6 @@ type Provider interface { // GenesisFiles returns the execution (optional) and consensus genesis files. GenesisFiles(ctx context.Context) (execution []byte, consensus []byte, err error) - // CometClient returns the underlying cometBFT RPC client. - CometClient() rpcclient.Client - // Portals returns the portals registered in the registry module. Portals(ctx context.Context) ([]rtypes.Portal, bool, error) diff --git a/lib/cchain/provider/abci.go b/lib/cchain/provider/abci.go index bbb054035..d6406a5a6 100644 --- a/lib/cchain/provider/abci.go +++ b/lib/cchain/provider/abci.go @@ -26,6 +26,7 @@ import ( errorsmod "cosmossdk.io/errors" utypes "cosmossdk.io/x/upgrade/types" + "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" dtypes "github.com/cosmos/cosmos-sdk/x/distribution/types" @@ -34,8 +35,12 @@ import ( gogogrpc "github.com/cosmos/gogoproto/grpc" "github.com/cosmos/gogoproto/proto" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) +// Dial returns a ABCI provider to the provided network connecting to well-known public RPCs. func Dial(network netconf.ID) (Provider, error) { consRPC := network.Static().ConsensusRPC() if consRPC == "" { @@ -47,27 +52,45 @@ func Dial(network netconf.ID) (Provider, error) { return Provider{}, errors.Wrap(err, "new tendermint client") } - return NewABCIProvider(cl, network, netconf.ChainVersionNamer(network)), nil + return NewABCI(cl, network, netconf.ChainVersionNamer(network)), nil } -func NewABCIProvider(cmtCl rpcclient.Client, network netconf.ID, chainNamer func(xchain.ChainVersion) string) Provider { +// NewABCI returns a new provider using the provided cometBFT ABCI client. +func NewABCI(cmtCl rpcclient.Client, network netconf.ID, chainNamer func(xchain.ChainVersion) string) Provider { + return newProvider(rpcAdaptor{abci: cmtCl}, network, chainNamer) +} + +// NewGRPC returns a new provider using the provided gRPC server address. +// This is preferred to NewABCI as it bypasses CometBFT so is much faster +// and doesn't affect chain performance. +func NewGRPC(target string, network netconf.ID, chainNamer func(xchain.ChainVersion) string) (Provider, error) { + grpcClient, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return Provider{}, errors.Wrap(err, "new grpc client") + } + + return newProvider(grpcClient, network, chainNamer), nil +} + +func newProvider(cc gogogrpc.ClientConn, network netconf.ID, chainNamer func(xchain.ChainVersion) string) Provider { // Stream backoff for 1s, querying new attestations after 1 consensus block backoffFunc := func(ctx context.Context) func() { return expbackoff.New(ctx, expbackoff.WithPeriodicConfig(time.Second)) } - acl := atypes.NewQueryClient(rpcAdaptor{abci: cmtCl}) - vcl := vtypes.NewQueryClient(rpcAdaptor{abci: cmtCl}) - pcl := ptypes.NewQueryClient(rpcAdaptor{abci: cmtCl}) - rcl := rtypes.NewQueryClient(rpcAdaptor{abci: cmtCl}) - gcl := genserve.NewQueryClient(rpcAdaptor{abci: cmtCl}) - ucl := utypes.NewQueryClient(rpcAdaptor{abci: cmtCl}) - scl := stypes.NewQueryClient(rpcAdaptor{abci: cmtCl}) - dcl := dtypes.NewQueryClient(rpcAdaptor{abci: cmtCl}) - slcl := sltypes.NewQueryClient(rpcAdaptor{abci: cmtCl}) + acl := atypes.NewQueryClient(cc) + vcl := vtypes.NewQueryClient(cc) + pcl := ptypes.NewQueryClient(cc) + rcl := rtypes.NewQueryClient(cc) + gcl := genserve.NewQueryClient(cc) + ucl := utypes.NewQueryClient(cc) + scl := stypes.NewQueryClient(cc) + dcl := dtypes.NewQueryClient(cc) + slcl := sltypes.NewQueryClient(cc) + cmtcl := cmtservice.NewServiceClient(cc) return Provider{ - fetch: newABCIFetchFunc(acl, cmtCl, chainNamer), + fetch: newABCIFetchFunc(acl, cmtcl, chainNamer), allAtts: newABCIAllAttsFunc(acl), latest: newABCILatestFunc(acl), window: newABCIWindowFunc(acl), @@ -81,12 +104,10 @@ func NewABCIProvider(cmtCl rpcclient.Client, network netconf.ID, chainNamer func genesisFunc: newABCIGenesisFunc(gcl), plannedFunc: newABCIPlannedUpgradeFunc(ucl), appliedFunc: newABCIAppliedUpgradeFunc(ucl), - chainID: newChainIDFunc(cmtCl), - header: cmtCl.Header, + chainID: newChainIDFunc(cmtcl), backoffFunc: backoffFunc, chainNamer: chainNamer, network: network, - cometCl: cmtCl, } } @@ -160,7 +181,7 @@ func newABCIPlannedUpgradeFunc(ucl utypes.QueryClient) planedUpgradeFunc { } // newChainIDFunc returns a function that returns the consensus chain ID. It caches the result. -func newChainIDFunc(abci rpcclient.SignClient) chainIDFunc { +func newChainIDFunc(cmtCl cmtservice.ServiceClient) chainIDFunc { var mu sync.Mutex var chainID uint64 @@ -174,12 +195,12 @@ func newChainIDFunc(abci rpcclient.SignClient) chainIDFunc { ctx, span := tracer.Start(ctx, spanName("chain_id")) defer span.End() - resp, err := abci.Header(ctx, nil) + resp, err := cmtCl.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{}) if err != nil { return 0, errors.Wrap(err, "abci header") } - chainID, err = netconf.ConsensusChainIDStr2Uint64(resp.Header.ChainID) + chainID, err = netconf.ConsensusChainIDStr2Uint64(resp.SdkBlock.Header.ChainID) if err != nil { return 0, errors.Wrap(err, "parse chain ID") } @@ -201,7 +222,7 @@ func newABCIRewards(cl dtypes.QueryClient) rewardsFunc { } resp, err := cl.ValidatorOutstandingRewards(ctx, req) - if errors.Is(err, sdkerrors.ErrKeyNotFound) { + if errors.Is(err, sdkerrors.ErrKeyNotFound) || status.Code(err) == codes.NotFound { return 0, false, nil } else if err != nil { incQueryErr(endpoint) @@ -222,7 +243,7 @@ func newABCIValFunc(cl stypes.QueryClient) valFunc { valAddr := sdk.ValAddress(operatorAddr.Bytes()) resp, err := cl.Validator(ctx, &stypes.QueryValidatorRequest{ValidatorAddr: valAddr.String()}) - if errors.Is(err, sdkerrors.ErrKeyNotFound) { + if errors.Is(err, sdkerrors.ErrKeyNotFound) || status.Code(err) == codes.NotFound { return cchain.SDKValidator{}, false, nil } else if err != nil { incQueryErr(endpoint) @@ -265,7 +286,7 @@ func newABCIValsetFunc(cl vtypes.QueryClient) valsetFunc { defer span.End() resp, err := cl.ValidatorSet(ctx, &vtypes.ValidatorSetRequest{Id: valSetID, Latest: latest}) - if errors.Is(err, sdkerrors.ErrKeyNotFound) { + if errors.Is(err, sdkerrors.ErrKeyNotFound) || status.Code(err) == codes.NotFound { return valSetResponse{}, false, nil } else if err != nil { incQueryErr(endpoint) @@ -322,7 +343,7 @@ func newABCIAllAttsFunc(cl atypes.QueryClient) allAttsFunc { } } -func newABCIFetchFunc(cl atypes.QueryClient, client rpcclient.Client, chainNamer func(xchain.ChainVersion) string) fetchFunc { +func newABCIFetchFunc(attCl atypes.QueryClient, cmtCl cmtservice.ServiceClient, chainNamer func(xchain.ChainVersion) string) fetchFunc { return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64) ([]xchain.Attestation, error) { const endpoint = "fetch_attestations" defer latency(endpoint)() @@ -333,18 +354,16 @@ func newABCIFetchFunc(cl atypes.QueryClient, client rpcclient.Client, chainNamer chainName := chainNamer(chainVer) // try fetching from latest height - atts, ok, err := attsFromAtHeight(ctx, cl, chainVer, fromOffset, 0) + atts, ok, err := attsFromAtHeight(ctx, attCl, chainVer, fromOffset, 0) if err != nil { incQueryErr(endpoint) return nil, errors.Wrap(err, "abci query attestations-from") - } - - if ok { + } else if ok { fetchStepsMetrics(chainName, 0, 0) return atts, nil } - earliestAttestationAtLatestHeight, ok, err := queryEarliestAttestation(ctx, cl, chainVer, 0) + earliestAttestationAtLatestHeight, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, 0) if err != nil { incQueryErr(endpoint) return nil, errors.Wrap(err, "abci query earliest-attestation-in-state") @@ -357,19 +376,17 @@ func newABCIFetchFunc(cl atypes.QueryClient, client rpcclient.Client, chainNamer return []xchain.Attestation{}, nil } - offsetHeight, err := searchOffsetInHistory(ctx, client, cl, chainVer, chainName, fromOffset) + offsetHeight, err := searchOffsetInHistory(ctx, cmtCl, attCl, chainVer, chainName, fromOffset) if err != nil { incQueryErr(endpoint) return nil, errors.Wrap(err, "searching offset in history") } - atts, attsFromOk, err := attsFromAtHeight(ctx, cl, chainVer, fromOffset, offsetHeight) + atts, ok, err = attsFromAtHeight(ctx, attCl, chainVer, fromOffset, offsetHeight) if err != nil { incQueryErr(endpoint) return nil, errors.Wrap(err, "abci query attestations-from") - } - - if !attsFromOk { + } else if !ok { return nil, errors.New("expected to find attestations [BUG]") } @@ -429,7 +446,7 @@ func newABCIPortalBlockFunc(pcl ptypes.QueryClient) portalBlockFunc { defer span.End() resp, err := pcl.Block(ctx, &ptypes.BlockRequest{Id: attestOffset, Latest: latest}) - if errors.Is(err, sdkerrors.ErrKeyNotFound) { + if errors.Is(err, sdkerrors.ErrKeyNotFound) || status.Code(err) == codes.NotFound { return nil, false, nil } else if err != nil { incQueryErr(endpoint) @@ -449,7 +466,7 @@ func newABCINetworkFunc(pcl rtypes.QueryClient) networkFunc { defer span.End() resp, err := pcl.Network(ctx, &rtypes.NetworkRequest{Id: networkID, Latest: latest}) - if errors.Is(err, sdkerrors.ErrKeyNotFound) { + if errors.Is(err, sdkerrors.ErrKeyNotFound) || status.Code(err) == codes.NotFound { return nil, false, nil } else if err != nil { incQueryErr(endpoint) @@ -548,18 +565,20 @@ func spanName(endpoint string) string { // searchOffsetInHistory searches the consensus state history and // returns a historical consensus block height that contains an approved attestation // for the provided chain version and fromOffset. -func searchOffsetInHistory(ctx context.Context, client rpcclient.Client, cl atypes.QueryClient, chainVer xchain.ChainVersion, chainName string, fromOffset uint64) (uint64, error) { +func searchOffsetInHistory(ctx context.Context, cmtCl cmtservice.ServiceClient, attCl atypes.QueryClient, chainVer xchain.ChainVersion, chainName string, fromOffset uint64) (uint64, error) { const endpoint = "search_offset" defer latency(endpoint)() // Exponentially backoff to find a good start point for binary search, this prefers more recent queries - info, err := client.ABCIInfo(ctx) + + latestBlockResp, err := cmtCl.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{}) if err != nil { - return 0, errors.Wrap(err, "abci query info") + return 0, errors.Wrap(err, "query latest block") } + latestHeight := uint64(latestBlockResp.SdkBlock.Header.Height) var startHeightIndex uint64 - endHeightIndex := uint64(info.Response.LastBlockHeight) + endHeightIndex := latestHeight lookback := uint64(1) var lookbackStepsCounter uint64 // For metrics only queryHeight := endHeightIndex @@ -572,17 +591,17 @@ func searchOffsetInHistory(ctx context.Context, client rpcclient.Client, cl atyp queryHeight -= lookback } - if queryHeight == 0 || queryHeight >= uint64(info.Response.LastBlockHeight) { + if queryHeight == 0 || queryHeight >= latestHeight { return 0, errors.New("unexpected query height [BUG]", "height", queryHeight) // This should never happen } - earliestAtt, ok, err := queryEarliestAttestation(ctx, cl, chainVer, queryHeight) + earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, queryHeight) if IsErrHistoryPruned(err) { // We've jumped to before the prune height, but _might_ still have the requested offset - earliestStoreHeight, err := getEarliestStoreHeight(ctx, cl, chainVer, queryHeight+1) + earliestStoreHeight, err := getEarliestStoreHeight(ctx, attCl, chainVer, queryHeight+1) if err != nil { return 0, errors.Wrap(err, "failed to get earliest store height") } - earliestAtt, ok, err = queryEarliestAttestation(ctx, cl, chainVer, earliestStoreHeight) + earliestAtt, ok, err = queryEarliestAttestation(ctx, attCl, chainVer, earliestStoreHeight) if err != nil { incQueryErr(endpoint) return 0, errors.Wrap(err, "abci query earliest-attestation-in-state") @@ -620,7 +639,7 @@ func searchOffsetInHistory(ctx context.Context, client rpcclient.Client, cl atyp binarySearchStepsCounter++ midHeightIndex := startHeightIndex + umath.SubtractOrZero(endHeightIndex, startHeightIndex)/2 - earliestAtt, ok, err := queryEarliestAttestation(ctx, cl, chainVer, midHeightIndex) + earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, midHeightIndex) if err != nil { incQueryErr(endpoint) return 0, errors.Wrap(err, "abci query earliest-attestation-in-state") @@ -632,7 +651,7 @@ func searchOffsetInHistory(ctx context.Context, client rpcclient.Client, cl atyp continue } - latestAtt, ok, err := queryLatestAttestation(ctx, cl, chainVer, midHeightIndex) + latestAtt, ok, err := queryLatestAttestation(ctx, attCl, chainVer, midHeightIndex) if err != nil { incQueryErr(endpoint) return 0, errors.Wrap(err, "abci query latest-attestation") @@ -643,7 +662,7 @@ func searchOffsetInHistory(ctx context.Context, client rpcclient.Client, cl atyp } if fromOffset >= earliestAtt.AttestOffset && fromOffset <= latestAtt.AttestOffset { - log.Debug(ctx, "Fetching offset from history", "chain", chainName, "from", fromOffset, "latest", info.Response.LastBlockHeight, "found", midHeightIndex, "lookback", lookbackStepsCounter, "search", binarySearchStepsCounter) + log.Debug(ctx, "Fetching offset from history", "chain", chainName, "from", fromOffset, "latest", latestHeight, "found", midHeightIndex, "lookback", lookbackStepsCounter, "search", binarySearchStepsCounter) fetchStepsMetrics(chainName, lookbackStepsCounter, binarySearchStepsCounter) return midHeightIndex, nil @@ -687,7 +706,7 @@ func queryEarliestAttestation(ctx context.Context, cl atypes.QueryClient, chainV ChainId: chainVer.ID, ConfLevel: uint32(chainVer.ConfLevel), }) - if errors.Is(err, sdkerrors.ErrKeyNotFound) { + if errors.Is(err, sdkerrors.ErrKeyNotFound) || status.Code(err) == codes.NotFound { return xchain.Attestation{}, false, nil } else if err != nil { return xchain.Attestation{}, false, err @@ -709,7 +728,7 @@ func queryLatestAttestation(ctx context.Context, cl atypes.QueryClient, chainVer ChainId: chainVer.ID, ConfLevel: uint32(chainVer.ConfLevel), }) - if errors.Is(err, sdkerrors.ErrKeyNotFound) { + if errors.Is(err, sdkerrors.ErrKeyNotFound) || status.Code(err) == codes.NotFound { return xchain.Attestation{}, false, nil } else if err != nil { return xchain.Attestation{}, false, err @@ -733,9 +752,7 @@ func attsFromAtHeight(ctx context.Context, cl atypes.QueryClient, chainVer xchai }) if err != nil { return []xchain.Attestation{}, false, errors.Wrap(err, "abci query attestations-from") - } - - if len(resp.Attestations) == 0 { + } else if len(resp.Attestations) == 0 { return []xchain.Attestation{}, false, nil } diff --git a/lib/cchain/provider/provider.go b/lib/cchain/provider/provider.go index 62c821eab..57431e9a1 100644 --- a/lib/cchain/provider/provider.go +++ b/lib/cchain/provider/provider.go @@ -18,9 +18,6 @@ import ( "github.com/omni-network/omni/lib/tracer" "github.com/omni-network/omni/lib/xchain" - rpcclient "github.com/cometbft/cometbft/rpc/client" - ctypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/ethereum/go-ethereum/common" upgradetypes "cosmossdk.io/x/upgrade/types" @@ -42,7 +39,6 @@ type valFunc func(ctx context.Context, operator common.Address) (cchain.SDKValid type valsFunc func(ctx context.Context) ([]cchain.SDKValidator, error) type rewardsFunc func(ctx context.Context, operator common.Address) (float64, bool, error) type valsetFunc func(ctx context.Context, valSetID uint64, latest bool) (valSetResponse, bool, error) -type headerFunc func(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) type chainIDFunc func(ctx context.Context) (uint64, error) type genesisFunc func(ctx context.Context) (execution []byte, consensus []byte, err error) type planedUpgradeFunc func(ctx context.Context) (upgradetypes.Plan, bool, error) @@ -58,7 +54,6 @@ type valSetResponse struct { // Provider implements cchain.Provider. type Provider struct { - cometCl rpcclient.Client fetch fetchFunc allAtts allAttsFunc latest latestFunc @@ -69,7 +64,6 @@ type Provider struct { vals valsFunc rewards rewardsFunc chainID chainIDFunc - header headerFunc portalBlock portalBlockFunc networkFunc networkFunc genesisFunc genesisFunc @@ -93,10 +87,6 @@ func NewProviderForT(_ *testing.T, fetch fetchFunc, latest latestFunc, window wi } } -func (p Provider) CometClient() rpcclient.Client { - return p.cometCl -} - func (p Provider) CurrentPlannedPlan(ctx context.Context) (upgradetypes.Plan, bool, error) { return p.plannedFunc(ctx) } diff --git a/lib/cchain/provider/xblock_internal_test.go b/lib/cchain/provider/xblock_internal_test.go index 11e0e860f..4ed74aa08 100644 --- a/lib/cchain/provider/xblock_internal_test.go +++ b/lib/cchain/provider/xblock_internal_test.go @@ -3,23 +3,19 @@ package provider import ( "context" "testing" - "time" ptypes "github.com/omni-network/omni/halo/portal/types" rtypes "github.com/omni-network/omni/halo/registry/types" "github.com/omni-network/omni/lib/cchain" "github.com/omni-network/omni/lib/tutil" - ctypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/cometbft/cometbft/types" - fuzz "github.com/google/gofuzz" "github.com/stretchr/testify/require" ) //go:generate go test . -golden -clean -func setupTest(t *testing.T) (uint64, func(ctx context.Context, h uint64, _ bool) (*rtypes.NetworkResponse, bool, error), valsetFunc, chainIDFunc, headerFunc, portalBlockFunc) { +func setupTest(t *testing.T) (uint64, func(ctx context.Context, h uint64, _ bool) (*rtypes.NetworkResponse, bool, error), valsetFunc, chainIDFunc, portalBlockFunc) { t.Helper() f := fuzz.NewWithSeed(99).NilChance(0).Funcs( // Fuzz valid validators. @@ -35,8 +31,6 @@ func setupTest(t *testing.T) (uint64, func(ctx context.Context, h uint64, _ bool var height uint64 f.Fuzz(&height) - timestamp := time.Unix(1712312027, 0).UTC() - valFunc := func(ctx context.Context, h uint64, _ bool) (valSetResponse, bool, error) { require.EqualValues(t, height, h) var resp []cchain.PortalValidator @@ -57,13 +51,6 @@ func setupTest(t *testing.T) (uint64, func(ctx context.Context, h uint64, _ bool chainFunc := func(ctx context.Context) (uint64, error) { return 77, nil } - headerFunc := func(ctx context.Context, h *int64) (*ctypes.ResultHeader, error) { - return &ctypes.ResultHeader{ - Header: &types.Header{ - Time: timestamp, - }, - }, nil - } portalBlockFunc := func(ctx context.Context, h uint64, _ bool) (*ptypes.BlockResponse, bool, error) { var valSetMsg ptypes.Msg f.Fuzz(&valSetMsg) @@ -82,15 +69,15 @@ func setupTest(t *testing.T) (uint64, func(ctx context.Context, h uint64, _ bool }, true, nil } - return height, networkFunc, valFunc, chainFunc, headerFunc, portalBlockFunc + return height, networkFunc, valFunc, chainFunc, portalBlockFunc } // TestXBlock ensures we receive expected xblock response from provider. func TestXBlock(t *testing.T) { t.Parallel() - height, networkFunc, valFunc, chainFunc, headerFunc, portalBlockFunc := setupTest(t) - prov := Provider{valset: valFunc, networkFunc: networkFunc, chainID: chainFunc, header: headerFunc, portalBlock: portalBlockFunc} + height, networkFunc, valFunc, chainFunc, portalBlockFunc := setupTest(t) + prov := Provider{valset: valFunc, networkFunc: networkFunc, chainID: chainFunc, portalBlock: portalBlockFunc} block, ok, err := prov.XBlock(context.Background(), height, false) require.NoError(t, err) @@ -111,8 +98,8 @@ func TestXBlock_MaliciousResponse(t *testing.T) { }, true, nil } - height, networkFunc, valFunc, chainFunc, headerFunc, _ := setupTest(t) - prov := Provider{valset: valFunc, networkFunc: networkFunc, chainID: chainFunc, header: headerFunc, portalBlock: portalBlockFunc} + height, networkFunc, valFunc, chainFunc, _ := setupTest(t) + prov := Provider{valset: valFunc, networkFunc: networkFunc, chainID: chainFunc, portalBlock: portalBlockFunc} _, ok, err := prov.XBlock(context.Background(), height, false) require.Error(t, err) require.Equal(t, "unexpected empty block [BUG]", err.Error()) diff --git a/lib/xchain/connect/connect.go b/lib/xchain/connect/connect.go index 7aa73966a..122860867 100644 --- a/lib/xchain/connect/connect.go +++ b/lib/xchain/connect/connect.go @@ -16,6 +16,7 @@ import ( "github.com/omni-network/omni/lib/xchain" xprovider "github.com/omni-network/omni/lib/xchain/provider" + rpcclient "github.com/cometbft/cometbft/rpc/client" rpchttp "github.com/cometbft/cometbft/rpc/client/http" "github.com/ethereum/go-ethereum/common" @@ -27,6 +28,7 @@ type Connector struct { XProvider xchain.Provider CProvider cchain.Provider EthClients map[uint64]ethclient.Client + CmtCl rpcclient.Client } // Backend returns an ethbackend for the given chainID. @@ -98,7 +100,7 @@ func New(ctx context.Context, netID netconf.ID, endpoints xchain.RPCEndpoints) ( return Connector{}, errors.Wrap(err, "comet rpc client") } - cprov := cprovider.NewABCIProvider(cometCl, netID, netconf.ChainVersionNamer(netID)) + cprov := cprovider.NewABCI(cometCl, netID, netconf.ChainVersionNamer(netID)) xprov := xprovider.New(network, ethClients, cprov) @@ -107,6 +109,7 @@ func New(ctx context.Context, netID netconf.ID, endpoints xchain.RPCEndpoints) ( XProvider: xprov, CProvider: cprov, EthClients: ethClients, + CmtCl: cometCl, }, nil } diff --git a/monitor/app/app.go b/monitor/app/app.go index 330d21a0a..d4000f883 100644 --- a/monitor/app/app.go +++ b/monitor/app/app.go @@ -68,7 +68,7 @@ func Run(ctx context.Context, cfg Config) error { return err } - cprov := cprovider.NewABCIProvider(tmClient, network.ID, netconf.ChainVersionNamer(cfg.Network)) + cprov := cprovider.NewABCI(tmClient, network.ID, netconf.ChainVersionNamer(cfg.Network)) xprov := xprovider.New(network, ethClients, cprov) diff --git a/relayer/app/app.go b/relayer/app/app.go index 5c7f7e26e..27be8ab16 100644 --- a/relayer/app/app.go +++ b/relayer/app/app.go @@ -54,7 +54,7 @@ func Run(ctx context.Context, cfg Config) error { return err } - cprov := cprovider.NewABCIProvider(tmClient, network.ID, netconf.ChainVersionNamer(cfg.Network)) + cprov := cprovider.NewABCI(tmClient, network.ID, netconf.ChainVersionNamer(cfg.Network)) xprov := xprovider.New(network, rpcClientPerChain, cprov) pricer := newTokenPricer(ctx)