From 6421d48c2a40f75ccf9f75c27f8458c285f4078c Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Tue, 17 Oct 2023 09:54:31 +0800 Subject: [PATCH 1/2] feat(op-node/op-batcher/op-proposer): add fallbackClient (#55) * FallbackClient impl * double check fail count * RegisterSubscribeFunc * FallbackClient for op-batcher,op-proposer * miss currentClient * add log and change order * fallback client add fallbackThreshold * add validateRpc * add document * add document * Put the switching logic into goroutine and modify the code according to the comments * add metrics and don't switch url when error is Rpc.Error * use const to remove magic number * fix NoopTxMetrics * add TestL1FallbackClient_SwitchUrl e2e case * should be >= threshold * change threshold to 20 * log->logT * miss make channel * fix lint --------- Co-authored-by: Welkin --- op-batcher/batcher/config.go | 4 +- op-batcher/batcher/driver.go | 2 +- op-batcher/flags/flags.go | 2 +- op-batcher/metrics/metrics.go | 5 +- op-batcher/metrics/noop.go | 3 + op-e2e/actions/fallback_client_test.go | 120 +++++++++ op-e2e/actions/l1_miner.go | 7 + op-e2e/actions/l1_replica.go | 45 ++++ op-node/flags/flags.go | 2 +- op-node/metrics/metrics.go | 10 + op-node/node/client.go | 19 ++ op-node/node/node.go | 7 + op-node/sources/fallback_client.go | 282 +++++++++++++++++++ op-proposer/flags/flags.go | 2 +- op-proposer/metrics/metrics.go | 5 +- op-proposer/metrics/noop.go | 3 + op-proposer/proposer/config.go | 5 +- op-proposer/proposer/l2_output_submitter.go | 2 +- op-service/client/ethclient.go | 52 ++++ op-service/client/fallback_client.go | 284 ++++++++++++++++++++ op-service/metrics/balance.go | 4 +- op-service/txmgr/cli.go | 7 +- op-service/txmgr/metrics/noop.go | 1 + op-service/txmgr/metrics/tx_metrics.go | 4 + op-service/txmgr/txmgr.go | 2 +- 25 files changed, 861 insertions(+), 18 deletions(-) create mode 100644 op-e2e/actions/fallback_client_test.go create mode 100644 op-node/sources/fallback_client.go create mode 100644 op-service/client/fallback_client.go diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index ecd241b3fd..7ca9ef0f13 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -3,6 +3,8 @@ package batcher import ( "time" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli" @@ -22,7 +24,7 @@ import ( type Config struct { log log.Logger metr metrics.Metricer - L1Client *ethclient.Client + L1Client client.EthClient L2Client *ethclient.Client RollupNode *sources.RollupClient TxManager txmgr.TxManager diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 09893c2ed4..5bbd884cff 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -53,7 +53,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri // Connect to L1 and L2 providers. Perform these last since they are the // most expensive. - l1Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout) + l1Client, err := opclient.DialEthClientWithTimeoutAndFallback(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout, l, opclient.BatcherFallbackThreshold, m) if err != nil { return nil, err } diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index c82e26d096..4fcb9daf77 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -22,7 +22,7 @@ var ( // Required flags L1EthRpcFlag = cli.StringFlag{ Name: "l1-eth-rpc", - Usage: "HTTP provider URL for L1", + Usage: "HTTP provider URL for L1. Multiple alternative addresses are supported, separated by commas, and the first address is used by default", EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "L1_ETH_RPC"), } L2EthRpcFlag = cli.StringFlag{ diff --git a/op-batcher/metrics/metrics.go b/op-batcher/metrics/metrics.go index 9fba14da6e..466432a01a 100644 --- a/op-batcher/metrics/metrics.go +++ b/op-batcher/metrics/metrics.go @@ -3,9 +3,10 @@ package metrics import ( "context" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/prometheus/client_golang/prometheus" @@ -185,7 +186,7 @@ func (m *Metrics) Document() []opmetrics.DocumentedMetric { } func (m *Metrics) StartBalanceMetrics(ctx context.Context, - l log.Logger, client *ethclient.Client, account common.Address) { + l log.Logger, client ethereum.ChainStateReader, account common.Address) { opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account) } diff --git a/op-batcher/metrics/noop.go b/op-batcher/metrics/noop.go index d949d8844c..6293fcf7c8 100644 --- a/op-batcher/metrics/noop.go +++ b/op-batcher/metrics/noop.go @@ -35,3 +35,6 @@ func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {} func (*noopMetrics) RecordBatchTxSubmitted() {} func (*noopMetrics) RecordBatchTxSuccess() {} func (*noopMetrics) RecordBatchTxFailed() {} + +func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) { +} diff --git a/op-e2e/actions/fallback_client_test.go b/op-e2e/actions/fallback_client_test.go new file mode 100644 index 0000000000..a5fad0e0a7 --- /dev/null +++ b/op-e2e/actions/fallback_client_test.go @@ -0,0 +1,120 @@ +package actions + +import ( + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" + "github.com/ethereum-optimism/optimism/op-node/client" + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/sources" + "github.com/ethereum-optimism/optimism/op-node/testlog" + service_client "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/require" + "math/big" + "testing" + "time" +) + +func setupFallbackClientTest(t Testing, sd *e2eutils.SetupData, log log.Logger, l1Url string) (*L1Miner, *L1Replica, *L1Replica, *L2Engine, *L2Sequencer, *sources.FallbackClient) { + jwtPath := e2eutils.WriteDefaultJWT(t) + + miner := NewL1MinerWithPort(t, log, sd.L1Cfg, 8545) + l1_2 := NewL1ReplicaWithPort(t, log, sd.L1Cfg, 8546) + l1_3 := NewL1ReplicaWithPort(t, log, sd.L1Cfg, 8547) + isMultiUrl, urlList := service_client.MultiUrlParse(l1Url) + require.True(t, isMultiUrl) + opts := []client.RPCOption{ + client.WithHttpPollInterval(0), + client.WithDialBackoff(10), + } + rpc, err := client.NewRPC(t.Ctx(), log, urlList[0], opts...) + require.NoError(t, err) + fallbackClient := sources.NewFallbackClient(t.Ctx(), rpc, urlList, log, sd.RollupCfg.L1ChainID, sd.RollupCfg.Genesis.L1, func(url string) (client.RPC, error) { + return client.NewRPC(t.Ctx(), log, url, opts...) + }) + l1F, err := sources.NewL1Client(fallbackClient, log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic)) + require.NoError(t, err) + engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) + l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) + require.NoError(t, err) + + sequencer := NewL2Sequencer(t, log, l1F, l2Cl, sd.RollupCfg, 0) + return miner, l1_2, l1_3, engine, sequencer, fallbackClient.(*sources.FallbackClient) +} + +func TestL1FallbackClient_SwitchUrl(gt *testing.T) { + t := NewDefaultTesting(gt) + p := &e2eutils.TestParams{ + MaxSequencerDrift: 300, + SequencerWindowSize: 200, + ChannelTimeout: 120, + L1BlockTime: 12, + } + dp := e2eutils.MakeDeployParams(t, p) + sd := e2eutils.Setup(t, dp, defaultAlloc) + logT := testlog.Logger(t, log.LvlDebug) + miner, l1_2, _, engine, sequencer, fallbackClient := setupFallbackClientTest(t, sd, logT, "http://127.0.0.1:8545,http://127.0.0.1:8546,http://127.0.0.1:8547") + miner.ActL1SetFeeRecipient(common.Address{'A'}) + + sequencer.ActL2PipelineFull(t) + + signer := types.LatestSigner(sd.L2Cfg.Config) + cl := engine.EthClient() + aliceTx := func() { + n, err := cl.PendingNonceAt(t.Ctx(), dp.Addresses.Alice) + require.NoError(t, err) + tx := types.MustSignNewTx(dp.Secrets.Alice, signer, &types.DynamicFeeTx{ + ChainID: sd.L2Cfg.Config.ChainID, + Nonce: n, + GasTipCap: big.NewInt(2 * params.GWei), + GasFeeCap: new(big.Int).Add(miner.l1Chain.CurrentBlock().BaseFee, big.NewInt(2*params.GWei)), + Gas: params.TxGas, + To: &dp.Addresses.Bob, + Value: e2eutils.Ether(2), + }) + require.NoError(gt, cl.SendTransaction(t.Ctx(), tx)) + } + makeL2BlockWithAliceTx := func() { + aliceTx() + sequencer.ActL2StartBlock(t) + engine.ActL2IncludeTx(dp.Addresses.Alice)(t) // include a test tx from alice + sequencer.ActL2EndBlock(t) + } + + errRpc := miner.RPCClient().CallContext(t.Ctx(), nil, "admin_stopHTTP") + require.NoError(t, errRpc) + + l2BlockCount := 0 + for i := 0; i < 6; i++ { + miner.ActL1StartBlock(12)(t) + miner.ActL1EndBlock(t) + newBlock := miner.l1Chain.GetBlockByHash(miner.l1Chain.CurrentBlock().Hash()) + _, err := l1_2.l1Chain.InsertChain([]*types.Block{newBlock}) + require.NoError(t, err) + + sequencer.L2Verifier.l1State.HandleNewL1HeadBlock(eth.L1BlockRef{ + Hash: newBlock.Hash(), + Number: newBlock.NumberU64(), + ParentHash: newBlock.ParentHash(), + Time: newBlock.Time(), + }) + origin := miner.l1Chain.CurrentBlock() + + for sequencer.SyncStatus().UnsafeL2.Time+sd.RollupCfg.BlockTime < origin.Time { + makeL2BlockWithAliceTx() + //require.Equal(t, uint64(i), sequencer.SyncStatus().UnsafeL2.L1Origin.Number, "no L1 origin change before time matches") + l2BlockCount++ + if l2BlockCount == 23 { + require.Equal(t, 1, fallbackClient.GetCurrentIndex(), "fallback client should switch url to second url") + errRpc2 := miner.RPCClient().CallContext(t.Ctx(), nil, "admin_startHTTP", "127.0.0.1", 8545, "*", "eth,net,web3,debug,admin,txpool", "*") + require.NoError(t, errRpc2) + } + if l2BlockCount == 34 { + require.Equal(t, 0, fallbackClient.GetCurrentIndex(), "fallback client should recover url to first url") + } + time.Sleep(500 * time.Millisecond) + } + } +} diff --git a/op-e2e/actions/l1_miner.go b/op-e2e/actions/l1_miner.go index 8e3afe26b9..6be9c08d22 100644 --- a/op-e2e/actions/l1_miner.go +++ b/op-e2e/actions/l1_miner.go @@ -39,6 +39,13 @@ func NewL1Miner(t Testing, log log.Logger, genesis *core.Genesis) *L1Miner { } } +func NewL1MinerWithPort(t Testing, log log.Logger, genesis *core.Genesis, port int) *L1Miner { + rep := NewL1ReplicaWithPort(t, log, genesis, port) + return &L1Miner{ + L1Replica: *rep, + } +} + // ActL1StartBlock returns an action to build a new L1 block on top of the head block, // with timeDelta added to the head block time. func (s *L1Miner) ActL1StartBlock(timeDelta uint64) Action { diff --git a/op-e2e/actions/l1_replica.go b/op-e2e/actions/l1_replica.go index d3d18dec54..065dab7cdc 100644 --- a/op-e2e/actions/l1_replica.go +++ b/op-e2e/actions/l1_replica.go @@ -89,6 +89,51 @@ func NewL1Replica(t Testing, log log.Logger, genesis *core.Genesis) *L1Replica { } } +func NewL1ReplicaWithPort(t Testing, log log.Logger, genesis *core.Genesis, port int) *L1Replica { + ethCfg := ðconfig.Config{ + NetworkId: genesis.Config.ChainID.Uint64(), + Genesis: genesis, + RollupDisableTxPoolGossip: true, + } + nodeCfg := &node.Config{ + Name: "l1-geth", + WSHost: "127.0.0.1", + WSPort: port, + HTTPHost: "127.0.0.1", + HTTPPort: port, + WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal"}, + HTTPModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal"}, + DataDir: "", // in-memory + P2P: p2p.Config{ + NoDiscovery: true, + NoDial: true, + }, + } + n, err := node.New(nodeCfg) + require.NoError(t, err) + t.Cleanup(func() { + _ = n.Close() + }) + + backend, err := eth.New(n, ethCfg) + require.NoError(t, err) + backend.Merger().FinalizePoS() + + n.RegisterAPIs(tracers.APIs(backend.APIBackend)) + + require.NoError(t, n.Start(), "failed to start L1 geth node") + return &L1Replica{ + log: log, + node: n, + eth: backend, + l1Chain: backend.BlockChain(), + l1Database: backend.ChainDb(), + l1Cfg: genesis, + l1Signer: types.LatestSigner(genesis.Config), + failL1RPC: nil, + } +} + // ActL1RewindToParent rewinds the L1 chain to parent block of head func (s *L1Replica) ActL1RewindToParent(t Testing) { s.ActL1RewindDepth(1)(t) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 0c9df1251b..878b437ffe 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -25,7 +25,7 @@ var ( /* Required Flags */ L1NodeAddr = cli.StringFlag{ Name: "l1", - Usage: "Address of L1 User JSON-RPC endpoint to use (eth namespace required)", + Usage: "Address of L1 User JSON-RPC endpoint to use (eth namespace required). Multiple alternative addresses are supported, separated by commas, and the first address is used by default", Value: "http://127.0.0.1:8545", EnvVar: prefixEnvVar("L1_ETH_RPC"), } diff --git a/op-node/metrics/metrics.go b/op-node/metrics/metrics.go index af66152216..2c2751042e 100644 --- a/op-node/metrics/metrics.go +++ b/op-node/metrics/metrics.go @@ -76,6 +76,7 @@ type Metricer interface { RecordIPUnban() RecordDial(allow bool) RecordAccept(allow bool) + RecordL1UrlSwitchEvent() } // Metrics tracks all the metrics for the op-node. @@ -99,6 +100,7 @@ type Metrics struct { DerivationErrors *EventMetrics SequencingErrors *EventMetrics PublishingErrors *EventMetrics + L1UrlSwitchEvent *EventMetrics P2PReqDurationSeconds *prometheus.HistogramVec P2PReqTotal *prometheus.CounterVec @@ -236,6 +238,7 @@ func NewMetrics(procName string) *Metrics { DerivationErrors: NewEventMetrics(factory, ns, "derivation_errors", "derivation errors"), SequencingErrors: NewEventMetrics(factory, ns, "sequencing_errors", "sequencing errors"), PublishingErrors: NewEventMetrics(factory, ns, "publishing_errors", "p2p publishing errors"), + L1UrlSwitchEvent: NewEventMetrics(factory, ns, "l1_url_switch", "L1 URL switch events"), SequencerInconsistentL1Origin: NewEventMetrics(factory, ns, "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"), SequencerResets: NewEventMetrics(factory, ns, "sequencer_resets", "sequencer resets"), @@ -725,6 +728,10 @@ func (m *Metrics) RecordAccept(allow bool) { } } +func (m *Metrics) RecordL1UrlSwitchEvent() { + m.L1UrlSwitchEvent.RecordEvent() +} + type noopMetricer struct{} var NoopMetrics Metricer = new(noopMetricer) @@ -845,3 +852,6 @@ func (n *noopMetricer) RecordDial(allow bool) { func (n *noopMetricer) RecordAccept(allow bool) { } + +func (n *noopMetricer) RecordL1UrlSwitchEvent() { +} diff --git a/op-node/node/client.go b/op-node/node/client.go index 4d70abe813..825170cd13 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/sources" + service_client "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum/go-ethereum/log" gn "github.com/ethereum/go-ethereum/node" @@ -175,6 +176,11 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize)) } + isMultiUrl, urlList := service_client.MultiUrlParse(cfg.L1NodeAddr) + if isMultiUrl { + return fallbackClientWrap(ctx, log, urlList, cfg, rollupCfg, opts...) + } + l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr, opts...) if err != nil { return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err) @@ -184,6 +190,19 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf return l1Node, rpcCfg, nil } +func fallbackClientWrap(ctx context.Context, logger log.Logger, urlList []string, cfg *L1EndpointConfig, rollupCfg *rollup.Config, opts ...client.RPCOption) (client.RPC, *sources.L1ClientConfig, error) { + l1Node, err := client.NewRPC(ctx, logger, urlList[0], opts...) + if err != nil { + return nil, nil, fmt.Errorf("failed to dial L1 address (%s): %w", urlList[0], err) + } + l1Node = sources.NewFallbackClient(ctx, l1Node, urlList, logger, rollupCfg.L1ChainID, rollupCfg.Genesis.L1, func(url string) (client.RPC, error) { + return client.NewRPC(ctx, logger, url, opts...) + }) + rpcCfg := sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind) + rpcCfg.MaxRequestsPerBatch = cfg.BatchSize + return l1Node, rpcCfg, nil +} + // PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1 type PreparedL1Endpoint struct { Client client.RPC diff --git a/op-node/node/node.go b/op-node/node/node.go index c76591fa98..6dc6ce3045 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -138,6 +138,13 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error { } return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head) }) + + if fallbackClient, ok := l1Node.(*sources.FallbackClient); ok { + fallbackClient.RegisterSubscribeFunc(func() (event.Subscription, error) { + return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head) + }, &n.l1HeadsSub) + fallbackClient.RegisterMetrics(n.metrics) + } go func() { err, ok := <-n.l1HeadsSub.Err() if !ok { diff --git a/op-node/sources/fallback_client.go b/op-node/sources/fallback_client.go new file mode 100644 index 0000000000..c8ed56f1a7 --- /dev/null +++ b/op-node/sources/fallback_client.go @@ -0,0 +1,282 @@ +package sources + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum-optimism/optimism/op-node/client" + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/metrics" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +// FallbackClient is an RPC client, it can automatically switch to the next l1 endpoint +// when there is a problem with the current l1 endpoint +// and automatically switch back after the first l1 endpoint recovers. +type FallbackClient struct { + // firstRpc is created by the first of the l1 urls, it should be used first in a healthy state + firstRpc client.RPC + urlList []string + rpcInitFunc func(url string) (client.RPC, error) + lastMinuteFail atomic.Int64 + currentRpc atomic.Pointer[client.RPC] + currentIndex int + mx sync.Mutex + log log.Logger + isInFallbackState bool + subscribeFunc func() (event.Subscription, error) + l1HeadsSub *ethereum.Subscription + l1ChainId *big.Int + l1Block eth.BlockID + ctx context.Context + isClose chan struct{} + metrics metrics.Metricer +} + +const threshold int64 = 20 + +// NewFallbackClient returns a new FallbackClient. l1ChainId and l1Block are used to check +// whether the newly switched rpc is legal. +func NewFallbackClient(ctx context.Context, rpc client.RPC, urlList []string, log log.Logger, l1ChainId *big.Int, l1Block eth.BlockID, rpcInitFunc func(url string) (client.RPC, error)) client.RPC { + fallbackClient := &FallbackClient{ + ctx: ctx, + firstRpc: rpc, + urlList: urlList, + log: log, + rpcInitFunc: rpcInitFunc, + currentIndex: 0, + l1ChainId: l1ChainId, + l1Block: l1Block, + isClose: make(chan struct{}), + } + fallbackClient.currentRpc.Store(&rpc) + go func() { + ticker := time.NewTicker(1 * time.Minute) + for { + select { + case <-ticker.C: + log.Debug("FallbackClient clear lastMinuteFail 0") + fallbackClient.lastMinuteFail.Store(0) + case <-fallbackClient.isClose: + return + default: + if fallbackClient.lastMinuteFail.Load() >= threshold { + fallbackClient.switchCurrentRpc() + } + } + time.Sleep(1 * time.Second) + } + }() + return fallbackClient +} + +func (l *FallbackClient) Close() { + l.mx.Lock() + defer l.mx.Unlock() + l.isClose <- struct{}{} + currentRpc := *l.currentRpc.Load() + currentRpc.Close() + if currentRpc != l.firstRpc { + l.firstRpc.Close() + } +} + +func (l *FallbackClient) CallContext(ctx context.Context, result any, method string, args ...any) error { + err := (*l.currentRpc.Load()).CallContext(ctx, result, method, args...) + if err != nil { + l.handleErr(err) + } + return err +} + +func (l *FallbackClient) handleErr(err error) { + if errors.Is(err, rpc.ErrNoResult) { + return + } + var targetErr rpc.Error + if errors.As(err, &targetErr) { + return + } + l.lastMinuteFail.Add(1) +} + +func (l *FallbackClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { + err := (*l.currentRpc.Load()).BatchCallContext(ctx, b) + if err != nil { + l.handleErr(err) + } + return err +} + +func (l *FallbackClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { + subscribe, err := (*l.currentRpc.Load()).EthSubscribe(ctx, channel, args...) + if err != nil { + l.handleErr(err) + } + return subscribe, err +} + +func (l *FallbackClient) switchCurrentRpc() { + if l.currentIndex >= len(l.urlList) { + l.log.Error("the fallback client has tried all urls, but all failed") + return + } + l.mx.Lock() + defer l.mx.Unlock() + if l.lastMinuteFail.Load() <= threshold { + return + } + if l.metrics != nil { + l.metrics.RecordL1UrlSwitchEvent() + } + for { + l.currentIndex++ + if l.currentIndex >= len(l.urlList) { + l.log.Error("the fallback client has tried all urls, but all failed") + break + } + err := l.switchCurrentRpcLogic() + if err != nil { + l.log.Warn("the fallback client failed to switch the current client", "err", err) + } else { + break + } + } +} + +func (l *FallbackClient) switchCurrentRpcLogic() error { + url := l.urlList[l.currentIndex] + newRpc, err := l.rpcInitFunc(url) + if err != nil { + return fmt.Errorf("the fallback client init RPC failed,url:%s, err:%w", url, err) + } + vErr := l.validateRpc(newRpc) + if vErr != nil { + return vErr + } + lastRpc := *l.currentRpc.Load() + l.currentRpc.Store(&newRpc) + if lastRpc != l.firstRpc { + lastRpc.Close() + } + l.lastMinuteFail.Store(0) + if l.subscribeFunc != nil { + err := l.reSubscribeNewRpc(url) + if err != nil { + return err + } + } + l.log.Info("switched current rpc to new url", "url", url) + if !l.isInFallbackState { + l.isInFallbackState = true + l.recoverIfFirstRpcHealth() + } + return nil +} + +func (l *FallbackClient) reSubscribeNewRpc(url string) error { + (*l.l1HeadsSub).Unsubscribe() + subscriptionNew, err := l.subscribeFunc() + if err != nil { + l.log.Error("can not subscribe new url", "url", url, "err", err) + return err + } else { + *l.l1HeadsSub = subscriptionNew + } + return nil +} + +func (l *FallbackClient) recoverIfFirstRpcHealth() { + go func() { + count := 0 + for { + var id hexutil.Big + err := l.firstRpc.CallContext(l.ctx, &id, "eth_chainId") + if err != nil { + count = 0 + time.Sleep(3 * time.Second) + continue + } + count++ + if count >= 3 { + break + } + } + l.mx.Lock() + defer l.mx.Unlock() + if !l.isInFallbackState { + return + } + lastRpc := *l.currentRpc.Load() + l.currentRpc.Store(&l.firstRpc) + lastRpc.Close() + l.lastMinuteFail.Store(0) + l.currentIndex = 0 + l.isInFallbackState = false + if l.subscribeFunc != nil { + err := l.reSubscribeNewRpc(l.urlList[0]) + if err != nil { + l.log.Error("can not subscribe new url", "url", l.urlList[0], "err", err) + } + } + l.log.Info("recover the current rpc to the first rpc", "url", l.urlList[0]) + }() +} + +func (l *FallbackClient) RegisterSubscribeFunc(f func() (event.Subscription, error), l1HeadsSub *ethereum.Subscription) { + l.subscribeFunc = f + l.l1HeadsSub = l1HeadsSub +} + +func (l *FallbackClient) validateRpc(newRpc client.RPC) error { + chainID, err := l.ChainID(l.ctx, newRpc) + if err != nil { + return err + } + if l.l1ChainId.Cmp(chainID) != 0 { + return fmt.Errorf("incorrect L1 RPC chain id %d, expected %d", chainID, l.l1ChainId) + } + l1GenesisBlockRef, err := l.l1BlockRefByNumber(l.ctx, l.l1Block.Number, newRpc) + if err != nil { + return err + } + if l1GenesisBlockRef.Hash != l.l1Block.Hash { + return fmt.Errorf("incorrect L1 genesis block hash %s, expected %s", l1GenesisBlockRef.Hash, l.l1Block.Hash) + } + return nil +} + +func (l *FallbackClient) ChainID(ctx context.Context, rpc client.RPC) (*big.Int, error) { + var id hexutil.Big + err := rpc.CallContext(ctx, &id, "eth_chainId") + if err != nil { + return nil, err + } + return (*big.Int)(&id), nil +} + +func (l *FallbackClient) l1BlockRefByNumber(ctx context.Context, number uint64, newRpc client.RPC) (*rpcHeader, error) { + var header *rpcHeader + err := newRpc.CallContext(ctx, &header, "eth_getBlockByNumber", numberID(number).Arg(), false) // headers are just blocks without txs + if err != nil { + return nil, err + } + return header, nil +} + +func (l *FallbackClient) RegisterMetrics(metrics metrics.Metricer) { + l.metrics = metrics +} + +func (l *FallbackClient) GetCurrentIndex() int { + return l.currentIndex +} diff --git a/op-proposer/flags/flags.go b/op-proposer/flags/flags.go index 595932e6d2..6d8021fa51 100644 --- a/op-proposer/flags/flags.go +++ b/op-proposer/flags/flags.go @@ -20,7 +20,7 @@ var ( // Required Flags L1EthRpcFlag = cli.StringFlag{ Name: "l1-eth-rpc", - Usage: "HTTP provider URL for L1", + Usage: "HTTP provider URL for L1. Multiple alternative addresses are supported, separated by commas, and the first address is used by default", EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "L1_ETH_RPC"), } RollupRpcFlag = cli.StringFlag{ diff --git a/op-proposer/metrics/metrics.go b/op-proposer/metrics/metrics.go index df149a7aa8..0f54978242 100644 --- a/op-proposer/metrics/metrics.go +++ b/op-proposer/metrics/metrics.go @@ -3,10 +3,11 @@ package metrics import ( "context" + "github.com/ethereum/go-ethereum" + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/prometheus/client_golang/prometheus" @@ -80,7 +81,7 @@ func (m *Metrics) Serve(ctx context.Context, host string, port int) error { } func (m *Metrics) StartBalanceMetrics(ctx context.Context, - l log.Logger, client *ethclient.Client, account common.Address) { + l log.Logger, client ethereum.ChainStateReader, account common.Address) { opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account) } diff --git a/op-proposer/metrics/noop.go b/op-proposer/metrics/noop.go index 14973bfd13..a353a5ff53 100644 --- a/op-proposer/metrics/noop.go +++ b/op-proposer/metrics/noop.go @@ -17,3 +17,6 @@ func (*noopMetrics) RecordInfo(version string) {} func (*noopMetrics) RecordUp() {} func (*noopMetrics) RecordL2BlocksProposed(l2ref eth.L2BlockRef) {} + +func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) { +} diff --git a/op-proposer/proposer/config.go b/op-proposer/proposer/config.go index 59fb6a5ba7..55933e81bb 100644 --- a/op-proposer/proposer/config.go +++ b/op-proposer/proposer/config.go @@ -3,8 +3,9 @@ package proposer import ( "time" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" "github.com/urfave/cli" "github.com/ethereum-optimism/optimism/op-node/sources" @@ -24,7 +25,7 @@ type Config struct { PollInterval time.Duration NetworkTimeout time.Duration TxManager txmgr.TxManager - L1Client *ethclient.Client + L1Client client.EthClient RollupClient *sources.RollupClient AllowNonFinalized bool } diff --git a/op-proposer/proposer/l2_output_submitter.go b/op-proposer/proposer/l2_output_submitter.go index e92da9a50f..89f6c06755 100644 --- a/op-proposer/proposer/l2_output_submitter.go +++ b/op-proposer/proposer/l2_output_submitter.go @@ -158,7 +158,7 @@ func NewL2OutputSubmitterConfigFromCLIConfig(cfg CLIConfig, l log.Logger, m metr // Connect to L1 and L2 providers. Perform these last since they are the most expensive. ctx := context.Background() - l1Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout) + l1Client, err := opclient.DialEthClientWithTimeoutAndFallback(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout, l, opclient.ProposerFallbackThreshold, m) if err != nil { return nil, err } diff --git a/op-service/client/ethclient.go b/op-service/client/ethclient.go index c5949f67c4..e5b4d4db0e 100644 --- a/op-service/client/ethclient.go +++ b/op-service/client/ethclient.go @@ -2,6 +2,11 @@ package client import ( "context" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "math/big" "time" "github.com/ethereum/go-ethereum/ethclient" @@ -16,3 +21,50 @@ func DialEthClientWithTimeout(ctx context.Context, url string, timeout time.Dura return ethclient.DialContext(ctxt, url) } + +const BatcherFallbackThreshold int64 = 10 +const ProposerFallbackThreshold int64 = 3 +const TxmgrFallbackThreshold int64 = 3 + +// DialEthClientWithTimeoutAndFallback will try to dial within the timeout period and create an EthClient. +// If the URL is a multi URL, then a fallbackClient will be created to add the fallback capability to the client +func DialEthClientWithTimeoutAndFallback(ctx context.Context, url string, timeout time.Duration, l log.Logger, fallbackThreshold int64, m FallbackClientMetricer) (EthClient, error) { + ctxt, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + isMultiUrl, urlList := MultiUrlParse(url) + if isMultiUrl { + firstEthClient, err := ethclient.DialContext(ctxt, urlList[0]) + if err != nil { + return nil, err + } + fallbackClient := NewFallbackClient(firstEthClient, urlList, l, fallbackThreshold, m, func(url string) (EthClient, error) { + ctxtIn, cancelIn := context.WithTimeout(ctx, timeout) + defer cancelIn() + ethClientNew, err := ethclient.DialContext(ctxtIn, url) + if err != nil { + return nil, err + } + return ethClientNew, nil + }) + return fallbackClient, nil + } + + return ethclient.DialContext(ctxt, url) +} + +type EthClient interface { + ChainID(ctx context.Context) (*big.Int, error) + BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + StorageAt(ctx context.Context, account common.Address, key common.Hash, blockNumber *big.Int) ([]byte, error) + CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) + NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) + BlockNumber(ctx context.Context) (uint64, error) + TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + SendTransaction(ctx context.Context, tx *types.Transaction) error + SuggestGasTipCap(ctx context.Context) (*big.Int, error) + PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) + EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) + CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) + Close() +} diff --git a/op-service/client/fallback_client.go b/op-service/client/fallback_client.go new file mode 100644 index 0000000000..1157bd2580 --- /dev/null +++ b/op-service/client/fallback_client.go @@ -0,0 +1,284 @@ +package client + +import ( + "context" + "errors" + "math/big" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +func MultiUrlParse(url string) (isMultiUrl bool, urlList []string) { + if strings.Contains(url, ",") { + return true, strings.Split(url, ",") + } + return false, []string{} +} + +type FallbackClientMetricer interface { + RecordL1UrlSwitchEvt(url string) +} + +type FallbackClientMetrics struct { + l1UrlSwitchEvt opmetrics.EventVec +} + +func (f *FallbackClientMetrics) RecordL1UrlSwitchEvt(url string) { + f.l1UrlSwitchEvt.Record(url) +} + +func NewFallbackClientMetrics(ns string, factory opmetrics.Factory) *FallbackClientMetrics { + return &FallbackClientMetrics{ + l1UrlSwitchEvt: opmetrics.NewEventVec(factory, ns, "", "l1_url_switch", "l1 url switch", []string{"url_idx"}), + } +} + +// FallbackClient is an EthClient, it can automatically switch to the next l1 endpoint +// when there is a problem with the current l1 endpoint +// and automatically switch back after the first l1 endpoint recovers. +type FallbackClient struct { + // firstClient is created by the first of the l1 urls, it should be used first in a healthy state + firstClient EthClient + urlList []string + clientInitFunc func(url string) (EthClient, error) + lastMinuteFail atomic.Int64 + currentClient atomic.Pointer[EthClient] + currentIndex int + mx sync.Mutex + log log.Logger + isInFallbackState bool + // fallbackThreshold specifies how many errors have occurred in the past 1 minute to trigger the switching logic + fallbackThreshold int64 + isClose chan struct{} + metrics FallbackClientMetricer +} + +// NewFallbackClient returns a new FallbackClient. +func NewFallbackClient(rpc EthClient, urlList []string, log log.Logger, fallbackThreshold int64, m FallbackClientMetricer, clientInitFunc func(url string) (EthClient, error)) EthClient { + fallbackClient := &FallbackClient{ + firstClient: rpc, + urlList: urlList, + log: log, + clientInitFunc: clientInitFunc, + currentIndex: 0, + fallbackThreshold: fallbackThreshold, + metrics: m, + isClose: make(chan struct{}), + } + fallbackClient.currentClient.Store(&rpc) + go func() { + ticker := time.NewTicker(1 * time.Minute) + for { + select { + case <-ticker.C: + log.Debug("FallbackClient clear lastMinuteFail 0") + fallbackClient.lastMinuteFail.Store(0) + case <-fallbackClient.isClose: + return + default: + if fallbackClient.lastMinuteFail.Load() >= fallbackClient.fallbackThreshold { + fallbackClient.switchCurrentClient() + } + } + time.Sleep(1 * time.Second) + } + }() + return fallbackClient +} + +func (l *FallbackClient) BlockNumber(ctx context.Context) (uint64, error) { + number, err := (*l.currentClient.Load()).BlockNumber(ctx) + if err != nil { + l.handleErr(err) + } + return number, err +} + +func (l *FallbackClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + receipt, err := (*l.currentClient.Load()).TransactionReceipt(ctx, txHash) + if err != nil { + l.handleErr(err) + } + return receipt, err +} + +func (l *FallbackClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { + err := (*l.currentClient.Load()).SendTransaction(ctx, tx) + if err != nil { + l.handleErr(err) + } + return err +} + +func (l *FallbackClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { + tipCap, err := (*l.currentClient.Load()).SuggestGasTipCap(ctx) + if err != nil { + l.handleErr(err) + } + return tipCap, err +} + +func (l *FallbackClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + at, err := (*l.currentClient.Load()).PendingNonceAt(ctx, account) + if err != nil { + l.handleErr(err) + } + return at, err +} + +func (l *FallbackClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { + estimateGas, err := (*l.currentClient.Load()).EstimateGas(ctx, msg) + if err != nil { + l.handleErr(err) + } + return estimateGas, err +} + +func (l *FallbackClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + contract, err := (*l.currentClient.Load()).CallContract(ctx, call, blockNumber) + if err != nil { + l.handleErr(err) + } + return contract, err +} + +func (l *FallbackClient) Close() { + l.mx.Lock() + defer l.mx.Unlock() + l.isClose <- struct{}{} + currentClient := *l.currentClient.Load() + currentClient.Close() + if currentClient != l.firstClient { + l.firstClient.Close() + } +} + +func (l *FallbackClient) ChainID(ctx context.Context) (*big.Int, error) { + id, err := (*l.currentClient.Load()).ChainID(ctx) + if err != nil { + l.handleErr(err) + } + return id, err +} + +func (l *FallbackClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + balanceAt, err := (*l.currentClient.Load()).BalanceAt(ctx, account, blockNumber) + if err != nil { + l.handleErr(err) + } + return balanceAt, err +} + +func (l *FallbackClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + headerByNumber, err := (*l.currentClient.Load()).HeaderByNumber(ctx, number) + if err != nil { + l.handleErr(err) + } + return headerByNumber, err +} + +func (l *FallbackClient) StorageAt(ctx context.Context, account common.Address, key common.Hash, blockNumber *big.Int) ([]byte, error) { + storageAt, err := (*l.currentClient.Load()).StorageAt(ctx, account, key, blockNumber) + if err != nil { + l.handleErr(err) + } + return storageAt, err +} + +func (l *FallbackClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { + codeAt, err := (*l.currentClient.Load()).CodeAt(ctx, account, blockNumber) + if err != nil { + l.handleErr(err) + } + return codeAt, err +} + +func (l *FallbackClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + nonceAt, err := (*l.currentClient.Load()).NonceAt(ctx, account, blockNumber) + if err != nil { + l.handleErr(err) + } + return nonceAt, err +} + +func (l *FallbackClient) handleErr(err error) { + if errors.Is(err, rpc.ErrNoResult) { + return + } + var targetErr rpc.Error + if errors.As(err, &targetErr) { + return + } + l.lastMinuteFail.Add(1) +} + +func (l *FallbackClient) switchCurrentClient() { + l.mx.Lock() + defer l.mx.Unlock() + if l.lastMinuteFail.Load() <= l.fallbackThreshold { + return + } + l.currentIndex++ + if l.currentIndex >= len(l.urlList) { + l.log.Error("the fallback client has tried all urls") + return + } + l.metrics.RecordL1UrlSwitchEvt(strconv.Itoa(l.currentIndex)) + url := l.urlList[l.currentIndex] + newClient, err := l.clientInitFunc(url) + if err != nil { + l.log.Error("the fallback client failed to switch the current client", "url", url, "err", err) + return + } + lastClient := *l.currentClient.Load() + l.currentClient.Store(&newClient) + if lastClient != l.firstClient { + lastClient.Close() + } + l.lastMinuteFail.Store(0) + l.log.Info("switched current rpc to new url", "url", url) + if !l.isInFallbackState { + l.isInFallbackState = true + l.recoverIfFirstRpcHealth() + } +} + +func (l *FallbackClient) recoverIfFirstRpcHealth() { + go func() { + count := 0 + for { + _, err := l.firstClient.ChainID(context.Background()) + if err != nil { + count = 0 + time.Sleep(3 * time.Second) + continue + } + count++ + if count >= 3 { + break + } + } + l.mx.Lock() + defer l.mx.Unlock() + if !l.isInFallbackState { + return + } + lastClient := *l.currentClient.Load() + l.currentClient.Store(&l.firstClient) + lastClient.Close() + l.lastMinuteFail.Store(0) + l.currentIndex = 0 + l.isInFallbackState = false + l.log.Info("recover the current client to the first client", "url", l.urlList[0]) + }() +} diff --git a/op-service/metrics/balance.go b/op-service/metrics/balance.go index de9066846c..a219679216 100644 --- a/op-service/metrics/balance.go +++ b/op-service/metrics/balance.go @@ -2,11 +2,11 @@ package metrics import ( "context" + "github.com/ethereum/go-ethereum" "math/big" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/prometheus/client_golang/prometheus" @@ -25,7 +25,7 @@ func weiToEther(wei *big.Int) float64 { // LaunchBalanceMetrics fires off a go rountine that queries the balance of the supplied account & periodically records it // to the balance metric of the namespace. The balance of the account is recorded in Ether (not Wei). // Cancel the supplied context to shut down the go routine -func LaunchBalanceMetrics(ctx context.Context, log log.Logger, r *prometheus.Registry, ns string, client *ethclient.Client, account common.Address) { +func LaunchBalanceMetrics(ctx context.Context, log log.Logger, r *prometheus.Registry, ns string, client ethereum.ChainStateReader, account common.Address) { go func() { balanceGuage := promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: ns, diff --git a/op-service/txmgr/cli.go b/op-service/txmgr/cli.go index 61173b6fc8..34cfd981dd 100644 --- a/op-service/txmgr/cli.go +++ b/op-service/txmgr/cli.go @@ -4,14 +4,15 @@ import ( "context" "errors" "fmt" + txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics" "math/big" "time" opservice "github.com/ethereum-optimism/optimism/op-service" + service_client "github.com/ethereum-optimism/optimism/op-service/client" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" "github.com/ethereum-optimism/optimism/op-signer/client" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli" ) @@ -174,14 +175,14 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig { } } -func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) { +func NewConfig(cfg CLIConfig, l log.Logger, m txmetrics.TxMetricer) (Config, error) { if err := cfg.Check(); err != nil { return Config{}, fmt.Errorf("invalid config: %w", err) } ctx, cancel := context.WithTimeout(context.Background(), cfg.NetworkTimeout) defer cancel() - l1, err := ethclient.DialContext(ctx, cfg.L1RPCURL) + l1, err := service_client.DialEthClientWithTimeoutAndFallback(ctx, cfg.L1RPCURL, service_client.DefaultDialTimeout, l, service_client.TxmgrFallbackThreshold, m) if err != nil { return Config{}, fmt.Errorf("could not dial eth client: %w", err) } diff --git a/op-service/txmgr/metrics/noop.go b/op-service/txmgr/metrics/noop.go index 36dd32b91d..82a1642afd 100644 --- a/op-service/txmgr/metrics/noop.go +++ b/op-service/txmgr/metrics/noop.go @@ -11,3 +11,4 @@ func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {} func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {} func (*NoopTxMetrics) TxPublished(string) {} func (*NoopTxMetrics) RPCError() {} +func (m *NoopTxMetrics) RecordL1UrlSwitchEvt(url string) {} diff --git a/op-service/txmgr/metrics/tx_metrics.go b/op-service/txmgr/metrics/tx_metrics.go index 171dfcd5dd..bd77135222 100644 --- a/op-service/txmgr/metrics/tx_metrics.go +++ b/op-service/txmgr/metrics/tx_metrics.go @@ -1,6 +1,7 @@ package metrics import ( + "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" @@ -16,6 +17,7 @@ type TxMetricer interface { TxConfirmed(*types.Receipt) TxPublished(string) RPCError() + client.FallbackClientMetricer } type TxMetrics struct { @@ -30,6 +32,7 @@ type TxMetrics struct { publishEvent metrics.Event confirmEvent metrics.EventVec rpcError prometheus.Counter + *client.FallbackClientMetrics } func receiptStatusString(receipt *types.Receipt) string { @@ -104,6 +107,7 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics { Help: "Temporary: Count of RPC errors (like timeouts) that have occurred", Subsystem: "txmgr", }), + FallbackClientMetrics: client.NewFallbackClientMetrics(ns, factory), } } diff --git a/op-service/txmgr/txmgr.go b/op-service/txmgr/txmgr.go index 7301795583..1e18b9c6e3 100644 --- a/op-service/txmgr/txmgr.go +++ b/op-service/txmgr/txmgr.go @@ -94,7 +94,7 @@ type SimpleTxManager struct { // NewSimpleTxManager initializes a new SimpleTxManager with the passed Config. func NewSimpleTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLIConfig) (*SimpleTxManager, error) { - conf, err := NewConfig(cfg, l) + conf, err := NewConfig(cfg, l, m) if err != nil { return nil, err } From 965e3a1a19fad00ed59f1ad1139ee0f815907072 Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Tue, 17 Oct 2023 09:56:00 +0800 Subject: [PATCH 2/2] bugfix(op-node): syncClient incorrectly removes peer issue (#50) * bugfix: only when no connection is available, we can remove the peer from syncClient * add unit test to prove effectiveness of the fixing code * use EventBus replace of notify * remove * change to old version of solution which just adds a condition to disconnected event --------- Co-authored-by: Welkin --- op-node/p2p/node.go | 5 +++- op-node/p2p/sync_test.go | 54 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/op-node/p2p/node.go b/op-node/p2p/node.go index 0d8f3f961c..2ef9de8165 100644 --- a/op-node/p2p/node.go +++ b/op-node/p2p/node.go @@ -97,7 +97,10 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l n.syncCl.AddPeer(conn.RemotePeer()) }, DisconnectedF: func(nw network.Network, conn network.Conn) { - n.syncCl.RemovePeer(conn.RemotePeer()) + // only when no connection is available, we can remove the peer + if nw.Connectedness(conn.RemotePeer()) == network.NotConnected { + n.syncCl.RemovePeer(conn.RemotePeer()) + } }, }) n.syncCl.Start() diff --git a/op-node/p2p/sync_test.go b/op-node/p2p/sync_test.go index a11bf91681..1f8995f97e 100644 --- a/op-node/p2p/sync_test.go +++ b/op-node/p2p/sync_test.go @@ -288,3 +288,57 @@ func TestMultiPeerSync(t *testing.T) { require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload") } } + +func TestNetworkNotifyAddPeerAndRemovePeer(t *testing.T) { + t.Parallel() + log := testlog.Logger(t, log.LvlDebug) + + cfg, _ := setupSyncTestData(25) + + confA := TestingConfig(t) + confB := TestingConfig(t) + hostA, err := confA.Host(log.New("host", "A"), nil, metrics.NoopMetrics) + require.NoError(t, err, "failed to launch host A") + defer hostA.Close() + hostB, err := confB.Host(log.New("host", "B"), nil, metrics.NoopMetrics) + require.NoError(t, err, "failed to launch host B") + defer hostB.Close() + + syncCl := NewSyncClient(log, cfg, hostA.NewStream, func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error { + return nil + }, metrics.NoopMetrics) + + waitChan := make(chan struct{}, 1) + hostA.Network().Notify(&network.NotifyBundle{ + ConnectedF: func(nw network.Network, conn network.Conn) { + syncCl.AddPeer(conn.RemotePeer()) + waitChan <- struct{}{} + }, + DisconnectedF: func(nw network.Network, conn network.Conn) { + // only when no connection is available, we can remove the peer + if nw.Connectedness(conn.RemotePeer()) == network.NotConnected { + syncCl.RemovePeer(conn.RemotePeer()) + } + waitChan <- struct{}{} + }, + }) + syncCl.Start() + + err = hostA.Connect(context.Background(), peer.AddrInfo{ID: hostB.ID(), Addrs: hostB.Addrs()}) + require.NoError(t, err, "failed to connect to peer B from peer A") + require.Equal(t, hostA.Network().Connectedness(hostB.ID()), network.Connected) + + //wait for async add process done + <-waitChan + _, ok := syncCl.peers[hostB.ID()] + require.True(t, ok, "peerB should exist in syncClient") + + err = hostA.Network().ClosePeer(hostB.ID()) + require.NoError(t, err, "close peer fail") + + //wait for async removing process done + <-waitChan + _, peerBExist3 := syncCl.peers[hostB.ID()] + require.True(t, !peerBExist3, "peerB should not exist in syncClient") + +}