Skip to content

Commit

Permalink
feat(lib/cchain): support grpc cprovider (#2216)
Browse files Browse the repository at this point in the history
Implement a gRPC cprovider that connects directly to CosmosSDK gprc
server bypassing cometBFT. This reduces negative impact of CometBFT ABCI
queries on chain performance and speeds up queries.

issue: fixes #2002
  • Loading branch information
corverroos authored Oct 18, 2024
1 parent 7802ce7 commit b928597
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 172 deletions.
2 changes: 1 addition & 1 deletion cli/cmd/initnodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func maybeDownloadGenesis(ctx context.Context, network netconf.ID) error {
return errors.Wrap(err, "create rpc client")
}
stubNamer := func(xchain.ChainVersion) string { return "" }
cprov := cprovider.NewABCIProvider(rpcCl, network, stubNamer)
cprov := cprovider.NewABCI(rpcCl, network, stubNamer)

execution, consensus, err := cprov.GenesisFiles(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cli/cmd/staking.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func setupClients(
return nil, nil, nil, errors.Wrap(err, "new tendermint client")
}

cprov := provider.NewABCIProvider(cl, conf.Network, netconf.ChainVersionNamer(conf.Network))
cprov := provider.NewABCI(cl, conf.Network, netconf.ChainVersionNamer(conf.Network))

eth, err := ethclient.Dial(chainMeta.Name, conf.ExecutionRPC)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions e2e/app/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func StartMonitoringReceipts(ctx context.Context, def Definition) func() error {
}

network := NetworkFromDef(def) // Safe to call NetworkFromDef since this after netman.DeployContracts
cProvider := cprovider.NewABCIProvider(client, def.Testnet.Network, netconf.ChainVersionNamer(def.Testnet.Network))
cProvider := cprovider.NewABCI(client, def.Testnet.Network, netconf.ChainVersionNamer(def.Testnet.Network))
xProvider := xprovider.New(network, def.Backends().RPCClients(), cProvider)
cChainID := def.Testnet.Network.Static().OmniConsensusChainIDUint64()

Expand Down Expand Up @@ -153,7 +153,7 @@ func MonitorCProvider(ctx context.Context, node *e2e.Node, network netconf.Netwo
return errors.Wrap(err, "getting client")
}

cprov := cprovider.NewABCIProvider(client, network.ID, netconf.ChainVersionNamer(network.ID))
cprov := cprovider.NewABCI(client, network.ID, netconf.ChainVersionNamer(network.ID))

for _, chain := range network.Chains {
for _, chainVer := range chain.ChainVersions() {
Expand Down
4 changes: 2 additions & 2 deletions e2e/test/attestations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestApprovedAttestations(t *testing.T) {

client, err := node.Client()
require.NoError(t, err)
cprov := provider.NewABCIProvider(client, network.ID, netconf.ChainVersionNamer(netconf.Simnet))
cprov := provider.NewABCI(client, network.ID, netconf.ChainVersionNamer(netconf.Simnet))

ctx := context.Background()
for _, portal := range portals {
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestApprovedValUpdates(t *testing.T) {

client, err := node.Client()
require.NoError(t, err)
cprov := provider.NewABCIProvider(client, network.ID, netconf.ChainVersionNamer(netconf.Simnet))
cprov := provider.NewABCI(client, network.ID, netconf.ChainVersionNamer(netconf.Simnet))

addr, err := k1util.PubKeyToAddress(node.PrivvalKey.PubKey())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion e2e/test/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestCLIOperator(t *testing.T) {
cl, err := http.New(testnet.Network.Static().ConsensusRPC(), "/websocket")
require.NoError(t, err)

cprov := provider.NewABCIProvider(cl, network.ID, netconf.ChainVersionNamer(network.ID))
cprov := provider.NewABCI(cl, network.ID, netconf.ChainVersionNamer(network.ID))

// wait for validator to be created
const valChangeWait = 15 * time.Second
Expand Down
3 changes: 1 addition & 2 deletions halo/app/lazyvoter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/omni-network/omni/halo/comet"
vtypes "github.com/omni-network/omni/halo/valsync/types"
"github.com/omni-network/omni/lib/cchain"
cprovider "github.com/omni-network/omni/lib/cchain/provider"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/expbackoff"
Expand Down Expand Up @@ -66,7 +65,7 @@ func (l *voterLoader) LazyLoad(
netID netconf.ID,
omniEVMCl ethclient.Client,
endpoints xchain.RPCEndpoints,
cprov cprovider.Provider,
cprov cchain.Provider,
privKey crypto.PrivKey,
voterStateFile string,
cmtAPI comet.API,
Expand Down
2 changes: 1 addition & 1 deletion halo/app/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestPruningHistory(t *testing.T) {
cl, err := rpchttp.New(cfg.Comet.RPC.ListenAddress, "/websocket")
require.NoError(t, err)

cprov := cprovider.NewABCIProvider(cl, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet))
cprov := cprovider.NewABCI(cl, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet))

// Wait until we get to block 1.
waitUntilHeight := uint64(1)
Expand Down
26 changes: 20 additions & 6 deletions halo/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
halocfg "github.com/omni-network/omni/halo/config"
"github.com/omni-network/omni/halo/genutil/genserve"
"github.com/omni-network/omni/lib/buildinfo"
"github.com/omni-network/omni/lib/cchain"
cprovider "github.com/omni-network/omni/lib/cchain/provider"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient"
Expand Down Expand Up @@ -174,7 +175,15 @@ func Start(ctx context.Context, cfg Config) (<-chan error, func(context.Context)
cmtAPI := comet.NewAPI(rpcClient)
app.SetCometAPI(cmtAPI)

cProvider := cprovider.NewABCIProvider(rpcClient, cfg.Network, netconf.ChainVersionNamer(cfg.Network))
clientCtx := app.ClientContext(ctx).WithClient(rpcClient).WithHomeDir(cfg.HomeDir)
if err := startRPCServers(ctx, cfg, app, sdkLogger, metrics, asyncAbort, clientCtx); err != nil {
return nil, nil, err
}

cProvider, err := newCProvider(rpcClient, cfg)
if err != nil {
return nil, nil, err
}

go func() {
err := voter.LazyLoad(
Expand All @@ -193,11 +202,6 @@ func Start(ctx context.Context, cfg Config) (<-chan error, func(context.Context)
}
}()

clientCtx := app.ClientContext(ctx).WithClient(rpcClient).WithHomeDir(cfg.HomeDir)
if err := startRPCServers(ctx, cfg, app, sdkLogger, metrics, asyncAbort, clientCtx); err != nil {
return nil, nil, err
}

log.Info(ctx, "Starting CometBFT")

if err := cmtNode.Start(); err != nil {
Expand Down Expand Up @@ -239,6 +243,16 @@ func Start(ctx context.Context, cfg Config) (<-chan error, func(context.Context)
}, nil
}

// newCProvider returns a new cchain provider. Either GRPC if enabled since it is faster,
// otherwise the ABCI provider.
func newCProvider(rpcClient *rpclocal.Local, cfg Config) (cchain.Provider, error) {
if cfg.SDKGRPC.Enable {
return cprovider.NewGRPC(cfg.SDKGRPC.Address, cfg.Network, netconf.ChainVersionNamer(cfg.Network))
}

return cprovider.NewABCI(rpcClient, cfg.Network, netconf.ChainVersionNamer(cfg.Network)), nil
}

// startRPCServers starts the Cosmos REST and gRPC servers.
func startRPCServers(
ctx context.Context,
Expand Down
40 changes: 4 additions & 36 deletions halo/app/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ import (

haloapp "github.com/omni-network/omni/halo/app"
uluwatu1 "github.com/omni-network/omni/halo/app/upgrades/uluwatu"
atypes "github.com/omni-network/omni/halo/attest/types"
halocmd "github.com/omni-network/omni/halo/cmd"
halocfg "github.com/omni-network/omni/halo/config"
ptypes "github.com/omni-network/omni/halo/portal/types"
"github.com/omni-network/omni/lib/cchain/grpc"
cprovider "github.com/omni-network/omni/lib/cchain/provider"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/log"
Expand All @@ -29,8 +26,6 @@ import (
"github.com/cometbft/cometbft/types"

db "github.com/cosmos/cosmos-db"
sltypes "github.com/cosmos/cosmos-sdk/x/slashing/types"
stypes "github.com/cosmos/cosmos-sdk/x/staking/types"
"github.com/stretchr/testify/require"
)

Expand All @@ -54,7 +49,9 @@ func TestSmoke(t *testing.T) {
cl, err := rpchttp.New(cfg.Comet.RPC.ListenAddress, "/websocket")
require.NoError(t, err)

cprov := cprovider.NewABCIProvider(cl, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet))
cprov := cprovider.NewABCI(cl, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet))
cprovGRPC, err := cprovider.NewGRPC(cfg.SDKGRPC.Address, netconf.Simnet, netconf.ChainVersionNamer(netconf.Simnet))
require.NoError(t, err)

// Wait until we get to block 3.
const target = uint64(3)
Expand All @@ -70,8 +67,8 @@ func TestSmoke(t *testing.T) {

testReadyEndpoint(t, cfg)
testAPI(t, cfg)
testGRPC(t, ctx, cfg)
testCProvider(t, ctx, cprov)
testCProvider(t, ctx, cprovGRPC)

genSet, err := cl.Validators(ctx, int64Ptr(1), nil, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -153,35 +150,6 @@ func testReadyEndpoint(t *testing.T, cfg haloapp.Config) {
require.NoError(t, err)
}

func testGRPC(t *testing.T, ctx context.Context, cfg haloapp.Config) {
t.Helper()
cl, err := grpc.Dial(cfg.SDKGRPC.Address)
require.NoError(t, err)

vals, err := cl.Staking.Validators(ctx, &stypes.QueryValidatorsRequest{})
require.NoError(t, err)
require.NotEmpty(t, vals.Validators)

infos, err := cl.Slashing.SigningInfos(ctx, &sltypes.QuerySigningInfosRequest{})
require.NoError(t, err)
require.NotEmpty(t, infos.Info)

pResp, err := cl.Slashing.Params(ctx, &sltypes.QueryParamsRequest{})
require.NoError(t, err)
require.Equal(t, uluwatu1.SlashingParams, pResp.Params)

_, err = cl.Portal.Block(ctx, &ptypes.BlockRequest{Latest: true})
require.NoError(t, err)

_, err = cl.Attest.ListAllAttestations(ctx, &atypes.ListAllAttestationsRequest{
ChainId: cfg.Network.Static().OmniExecutionChainID,
ConfLevel: uint32(xchain.ConfFinalized),
Status: 1,
FromOffset: 0,
})
require.NoError(t, err)
}

func testCProvider(t *testing.T, ctx context.Context, cprov cprovider.Provider) {
t.Helper()

Expand Down
33 changes: 0 additions & 33 deletions lib/cchain/grpc/client.go

This file was deleted.

5 changes: 0 additions & 5 deletions lib/cchain/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
rtypes "github.com/omni-network/omni/halo/registry/types"
"github.com/omni-network/omni/lib/xchain"

rpcclient "github.com/cometbft/cometbft/rpc/client"

"github.com/ethereum/go-ethereum/common"

utypes "cosmossdk.io/x/upgrade/types"
Expand Down Expand Up @@ -77,9 +75,6 @@ type Provider interface {
// GenesisFiles returns the execution (optional) and consensus genesis files.
GenesisFiles(ctx context.Context) (execution []byte, consensus []byte, err error)

// CometClient returns the underlying cometBFT RPC client.
CometClient() rpcclient.Client

// Portals returns the portals registered in the registry module.
Portals(ctx context.Context) ([]rtypes.Portal, bool, error)

Expand Down
Loading

0 comments on commit b928597

Please sign in to comment.