Skip to content

Commit

Permalink
polygon/bridge: Support Astrid bridge on standalone and privateapi se…
Browse files Browse the repository at this point in the history
…rver (#11693)

Fixes #11485
  • Loading branch information
shohamc1 authored Sep 7, 2024
1 parent cabea06 commit 79d6617
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 50 deletions.
68 changes: 39 additions & 29 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/erigontech/erigon-lib/kv/temporal"
"github.com/erigontech/erigon-lib/log/v3"
libstate "github.com/erigontech/erigon-lib/state"
"github.com/erigontech/erigon/polygon/bridge"

"github.com/erigontech/erigon/cmd/rpcdaemon/cli/httpcfg"
"github.com/erigontech/erigon/cmd/rpcdaemon/graphql"
Expand Down Expand Up @@ -94,6 +95,7 @@ var rootCmd = &cobra.Command{

var (
stateCacheStr string
polygonSync bool
)

func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
Expand All @@ -105,6 +107,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().BoolVar(&cfg.GraphQLEnabled, "graphql", false, "enables graphql endpoint (disabled by default)")
rootCmd.PersistentFlags().Uint64Var(&cfg.Gascap, "rpc.gascap", 50_000_000, "Sets a cap on gas that can be used in eth_call/estimateGas")
rootCmd.PersistentFlags().Uint64Var(&cfg.MaxTraces, "trace.maxtraces", 200, "Sets a limit on traces that can be returned in trace_filter")
rootCmd.PersistentFlags().BoolVar(&polygonSync, "polygon.sync", false, "Enable if Erigon has been synced using the new polygon sync component")

rootCmd.PersistentFlags().StringVar(&cfg.RpcAllowListFilePath, utils.RpcAccessListFlag.Name, "", "Specify granular (method-by-method) API allowlist")
rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, utils.RpcBatchConcurrencyFlag.Name, 2, utils.RpcBatchConcurrencyFlag.Usage)
Expand Down Expand Up @@ -320,24 +323,24 @@ func EmbeddedServices(ctx context.Context,
func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
stateCache kvcache.Cache, blockReader services.FullBlockReader, engine consensus.EngineReader,
ff *rpchelper.Filters, err error) {
ff *rpchelper.Filters, bridgeReader bridge.ReaderService, err error) {
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
return nil, nil, nil, nil, nil, nil, nil, ff, errors.New("either remote db or local db must be specified")
return nil, nil, nil, nil, nil, nil, nil, ff, nil, errors.New("either remote db or local db must be specified")
}
creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("open tls cert: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
}
conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to execution service privateApi: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
}

remoteBackendClient := remote.NewETHBACKENDClient(conn)
remoteKvClient := remote.NewKVClient(conn)
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKvClient).Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to remoteKv: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
}

// Configure DB first
Expand All @@ -362,10 +365,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, err
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
if compatErr := checkDbCompatibility(ctx, rwKv); compatErr != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, compatErr
return nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr
}
db = rwKv

Expand All @@ -380,10 +383,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}
return nil
}); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, err
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
if cc == nil {
return nil, nil, nil, nil, nil, nil, nil, ff, errors.New("chain config not found in db. Need start erigon at least once on this db")
return nil, nil, nil, nil, nil, nil, nil, ff, nil, errors.New("chain config not found in db. Need start erigon at least once on this db")
}

// Configure sapshots
Expand All @@ -401,7 +404,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
cr := rawdb.NewCanonicalReader(txNumsReader)
agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("create aggregator: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

Expand Down Expand Up @@ -452,7 +455,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger

db, err = temporal.New(rwKv, agg)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, nil, err
}
stateCache = kvcache.NewDummy()
}
Expand All @@ -476,7 +479,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
if cfg.TxPoolApiAddr != cfg.PrivateApiAddr {
txpoolConn, err = grpcutil.Connect(creds, cfg.TxPoolApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to txpool api: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to txpool api: %w", err)
}
}

Expand All @@ -496,29 +499,36 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
var remoteCE *remoteConsensusEngine

if cfg.WithDatadir {
switch {
case cc != nil:
switch {
case cc.Bor != nil:
var borKv kv.RoDB

// bor (consensus) specific db
borDbPath := filepath.Join(cfg.DataDir, "bor")
logger.Warn("[rpc] Opening Bor db", "path", borDbPath)
borKv, err = kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Accede().Open(ctx)
if cc != nil && cc.Bor != nil {
if polygonSync {
stateReceiverContractAddress := cc.Bor.GetStateReceiverContract()
bridgeReader, err = bridge.AssembleReader(ctx, cfg.DataDir, logger, stateReceiverContractAddress)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, err
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
// Skip the compatibility check, until we have a schema in erigon-lib
engine = bor.NewRo(cc, borKv, blockReader, logger)
default:
engine = ethash.NewFaker()
}

default:
// NOTE: bor_* RPCs are not fully supported when using polygon.sync (https://github.com/erigontech/erigon/issues/11171)
var borKv kv.RoDB

// bor (consensus) specific db
borDbPath := filepath.Join(cfg.DataDir, "bor")
logger.Warn("[rpc] Opening Bor db", "path", borDbPath)
borKv, err = kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
// Skip the compatibility check, until we have a schema in erigon-lib
engine = bor.NewRo(cc, borKv, blockReader, logger)
} else {
engine = ethash.NewFaker()
}
} else {
if cc != nil && cc.Bor != nil && polygonSync {
stateReceiverContractAddress := cc.Bor.GetStateReceiverContract()
bridgeReader = bridge.NewRemoteReader(remoteBackendClient, stateReceiverContractAddress)
}

remoteCE = &remoteConsensusEngine{}
engine = remoteCE
}
Expand All @@ -544,7 +554,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}()

ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger)
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, err
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, bridgeReader, err
}

func StartRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error {
Expand Down
7 changes: 5 additions & 2 deletions cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func main() {
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
logger := debug.SetupCobra(cmd, "sentry")
db, backend, txPool, mining, stateCache, blockReader, engine, ff, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
db, backend, txPool, mining, stateCache, blockReader, engine, ff, bridgeReader, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("Could not connect to DB", "err", err)
Expand All @@ -49,8 +49,11 @@ func main() {
}
defer db.Close()
defer engine.Close()
if bridgeReader != nil {
defer bridgeReader.Close()
}

apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger, nil)
apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger, bridgeReader)
rpc.PreAllocateRPCMetricLabels(apiList)
if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil {
logger.Error(err.Error())
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/rpcdaemontest/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func CreateTestGrpcConn(t *testing.T, m *mock.MockSentry) (context.Context, *grp
server := grpc.NewServer()

remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events,
m.BlockReader, log.New(), builder.NewLatestBlockBuiltStore()))
m.BlockReader, log.New(), builder.NewLatestBlockBuiltStore(), nil))
txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer)
txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi, m.Log))
listener := bufconn.Listen(1024 * 1024)
Expand Down
1 change: 1 addition & 0 deletions erigon-lib/chain/chain_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type BorConfig interface {
IsNapoli(num uint64) bool
GetNapoliBlock() *big.Int
IsAhmedabad(number uint64) bool
GetStateReceiverContract() string
}

func (c *Config) String() string {
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}

// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, blockReader, logger, latestBlockBuiltStore)
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, blockReader, logger, latestBlockBuiltStore, polygonBridge)
// initialize engine backend

blockSnapBuildSema := semaphore.NewWeighted(int64(dbg.BuildSnapshotAllowance))
Expand Down
49 changes: 45 additions & 4 deletions ethdb/privateapi/ethbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/core/types"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/direct"
Expand All @@ -40,6 +41,11 @@ import (
"github.com/erigontech/erigon/turbo/shards"
)

type bridgeReader interface {
Events(ctx context.Context, blockNum uint64) ([]*types.Message, error)
EventTxnLookup(ctx context.Context, borTxHash libcommon.Hash) (uint64, bool, error)
}

// EthBackendAPIVersion
// 2.0.0 - move all mining-related methods to 'txpool/mining' server
// 2.1.0 - add NetPeerCount function
Expand All @@ -58,6 +64,7 @@ type EthBackendServer struct {
events *shards.Events
db kv.RoDB
blockReader services.FullBlockReader
bridgeReader bridgeReader
latestBlockBuiltStore *builder.LatestBlockBuiltStore

logsFilter *LogsFilterAggregator
Expand All @@ -74,9 +81,15 @@ type EthBackend interface {
}

func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *shards.Events, blockReader services.FullBlockReader,
logger log.Logger, latestBlockBuiltStore *builder.LatestBlockBuiltStore,
logger log.Logger, latestBlockBuiltStore *builder.LatestBlockBuiltStore, bridgeReader bridgeReader,
) *EthBackendServer {
s := &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader,
s := &EthBackendServer{
ctx: ctx,
eth: eth,
events: events,
db: db,
blockReader: blockReader,
bridgeReader: bridgeReader,
logsFilter: NewLogsFilterAggregator(events),
logger: logger,
latestBlockBuiltStore: latestBlockBuiltStore,
Expand Down Expand Up @@ -329,6 +342,18 @@ func (s *EthBackendServer) SubscribeLogs(server remote.ETHBACKEND_SubscribeLogsS
}

func (s *EthBackendServer) BorTxnLookup(ctx context.Context, req *remote.BorTxnLookupRequest) (*remote.BorTxnLookupReply, error) {
if s.bridgeReader != nil {
blockNum, ok, err := s.bridgeReader.EventTxnLookup(ctx, gointerfaces.ConvertH256ToHash(req.BorTxHash))
if err != nil {
return nil, err
}

return &remote.BorTxnLookupReply{
Present: ok,
BlockNumber: blockNum,
}, nil
}

tx, err := s.db.BeginRo(ctx)
if err != nil {
return nil, err
Expand All @@ -346,6 +371,22 @@ func (s *EthBackendServer) BorTxnLookup(ctx context.Context, req *remote.BorTxnL
}

func (s *EthBackendServer) BorEvents(ctx context.Context, req *remote.BorEventsRequest) (*remote.BorEventsReply, error) {
if s.bridgeReader != nil {
events, err := s.bridgeReader.Events(ctx, req.BlockNum)
if err != nil {
return nil, err
}

eventsRaw := make([][]byte, len(events))
for i, event := range events {
eventsRaw[i] = event.Data()
}

return &remote.BorEventsReply{
EventRlps: eventsRaw,
}, nil
}

tx, err := s.db.BeginRo(ctx)
if err != nil {
return nil, err
Expand All @@ -358,8 +399,8 @@ func (s *EthBackendServer) BorEvents(ctx context.Context, req *remote.BorEventsR
}

eventsRaw := make([][]byte, len(events))
for i, e := range events {
eventsRaw[i] = e
for i, event := range events {
eventsRaw[i] = event
}

return &remote.BorEventsReply{
Expand Down
4 changes: 4 additions & 0 deletions polygon/bor/borcfg/bor_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func (c *BorConfig) CalculateStateSyncDelay(number uint64) uint64 {
return borKeyValueConfigHelper(c.StateSyncConfirmationDelay, number)
}

func (c *BorConfig) GetStateReceiverContract() string {
return c.StateReceiverContract
}

func borKeyValueConfigHelper[T uint64 | common.Address](field map[string]T, number uint64) T {
fieldUint := make(map[uint64]T)
for k, v := range field {
Expand Down
2 changes: 1 addition & 1 deletion polygon/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type eventFetcher interface {
}

func Assemble(dataDir string, logger log.Logger, borConfig *borcfg.BorConfig, eventFetcher eventFetcher) *Bridge {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger)
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, false /* accede */)
bridgeStore := NewStore(bridgeDB)
reader := NewReader(bridgeStore, logger, borConfig.StateReceiverContract)
return NewBridge(bridgeStore, logger, borConfig, eventFetcher, reader)
Expand Down
Loading

0 comments on commit 79d6617

Please sign in to comment.