From 1138ab3fcda2a4907be205de96dea3a3bdfe6e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Tue, 15 Oct 2024 18:56:36 +0200 Subject: [PATCH] app/eth2wrap: fallback beacon nodes Implement a way to provide eth2wrap with two classes of beacon nodes addresses: standard and fallback beacon nodes. When one of the multi BN calls fails, eth2wrap wrappers will try to get an available fallback BN from a list and re-do the call on that. If no fallback BNs is specified, return the original error. If the fallback BN call fails, return fallback error instead. This PR firstly introduces concepts and code, will introduce CLI parameters and initialization code later. --- app/eth2wrap/eth2wrap.go | 98 ++-- app/eth2wrap/eth2wrap_gen.go | 660 +++++++++++++++++++++---- app/eth2wrap/fallback.go | 56 +++ app/eth2wrap/fallback_internal_test.go | 49 ++ app/eth2wrap/genwrap/genwrap.go | 20 +- app/eth2wrap/multi.go | 130 ++++- app/eth2wrap/multi_test.go | 10 +- 7 files changed, 859 insertions(+), 164 deletions(-) create mode 100644 app/eth2wrap/fallback.go create mode 100644 app/eth2wrap/fallback_internal_test.go diff --git a/app/eth2wrap/eth2wrap.go b/app/eth2wrap/eth2wrap.go index a8e198f88..a078e5937 100644 --- a/app/eth2wrap/eth2wrap.go +++ b/app/eth2wrap/eth2wrap.go @@ -57,7 +57,21 @@ func Instrument(clients ...Client) (Client, error) { return nil, errors.New("clients empty") } - return newMulti(clients), nil + // TODO(gsora): remove once the implementation is agreed upon and + // wiring is complete. + fb := NewFallbackClient(0, [4]byte{}, nil) + + return newMulti(clients, fb), nil +} + +// InstrumentWithFallback returns a new multi instrumented client using the provided clients as backends and fallback +// respectively. +func InstrumentWithFallback(fallback *FallbackClient, clients ...Client) (Client, error) { + if len(clients) == 0 { + return nil, errors.New("clients empty") + } + + return newMulti(clients, fallback), nil } // WithSyntheticDuties wraps the provided client adding synthetic duties. @@ -71,43 +85,58 @@ func WithSyntheticDuties(cl Client) Client { // NewMultiHTTP returns a new instrumented multi eth2 http client. func NewMultiHTTP(timeout time.Duration, forkVersion [4]byte, addresses ...string) (Client, error) { + return Instrument(newClients(timeout, forkVersion, addresses)...) +} + +// newClients returns a slice of Client initialized with the provided settings. +func newClients(timeout time.Duration, forkVersion [4]byte, addresses []string) []Client { var clients []Client for _, address := range addresses { - parameters := []eth2http.Parameter{ - eth2http.WithLogLevel(zeroLogInfo), - eth2http.WithAddress(address), - eth2http.WithTimeout(timeout), - eth2http.WithAllowDelayedStart(true), - eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)), - } + clients = append(clients, newBeaconClient(timeout, forkVersion, address)) + } - cl := newLazy(func(ctx context.Context) (Client, error) { - eth2Svc, err := eth2http.New(ctx, parameters...) - if err != nil { - return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address)) - } - eth2Http, ok := eth2Svc.(*eth2http.Service) - if !ok { - return nil, errors.New("invalid eth2 http service") - } + return clients +} - adaptedCl := AdaptEth2HTTP(eth2Http, timeout) - adaptedCl.SetForkVersion(forkVersion) +// newBeaconClient returns a Client with the provided settings. +func newBeaconClient(timeout time.Duration, forkVersion [4]byte, address string) Client { + parameters := []eth2http.Parameter{ + eth2http.WithLogLevel(zeroLogInfo), + eth2http.WithAddress(address), + eth2http.WithTimeout(timeout), + eth2http.WithAllowDelayedStart(true), + eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)), + } - return adaptedCl, nil - }) + cl := newLazy(func(ctx context.Context) (Client, error) { + eth2Svc, err := eth2http.New(ctx, parameters...) + if err != nil { + return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address)) + } + eth2Http, ok := eth2Svc.(*eth2http.Service) + if !ok { + return nil, errors.New("invalid eth2 http service") + } - clients = append(clients, cl) - } + adaptedCl := AdaptEth2HTTP(eth2Http, timeout) + adaptedCl.SetForkVersion(forkVersion) + + return adaptedCl, nil + }) - return Instrument(clients...) + return cl +} + +type provideArgs struct { + client Client + fallback *FallbackClient } // provide calls the work function with each client in parallel, returning the // first successful result or first error. // The bestIdxFunc is called with the index of the client returning a successful response. -func provide[O any](ctx context.Context, clients []Client, - work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector, +func provide[O any](ctx context.Context, clients []Client, fallback *FallbackClient, + work forkjoin.Work[provideArgs, O], isSuccessFunc func(O) bool, bestSelector *bestSelector, ) (O, error) { if isSuccessFunc == nil { isSuccessFunc = func(O) bool { return true } @@ -118,12 +147,15 @@ func provide[O any](ctx context.Context, clients []Client, forkjoin.WithWorkers(len(clients)), ) for _, client := range clients { - fork(client) + fork(provideArgs{ + client: client, + fallback: fallback, + }) } defer cancel() var ( - nokResp forkjoin.Result[Client, O] + nokResp forkjoin.Result[provideArgs, O] hasNokResp bool zero O ) @@ -132,7 +164,7 @@ func provide[O any](ctx context.Context, clients []Client, return zero, ctx.Err() } else if res.Err == nil && isSuccessFunc(res.Output) { if bestSelector != nil { - bestSelector.Increment(res.Input.Address()) + bestSelector.Increment(res.Input.client.Address()) } return res.Output, nil @@ -154,10 +186,10 @@ func provide[O any](ctx context.Context, clients []Client, type empty struct{} // submit proxies provide, but returns nil instead of a successful result. -func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error { - _, err := provide(ctx, clients, - func(ctx context.Context, cl Client) (empty, error) { - return empty{}, work(ctx, cl) +func submit(ctx context.Context, clients []Client, fallback *FallbackClient, work func(context.Context, provideArgs) error, selector *bestSelector) error { + _, err := provide(ctx, clients, fallback, + func(ctx context.Context, args provideArgs) (empty, error) { + return empty{}, work(ctx, args) }, nil, selector, ) diff --git a/app/eth2wrap/eth2wrap_gen.go b/app/eth2wrap/eth2wrap_gen.go index 1f71504cd..c859187fc 100644 --- a/app/eth2wrap/eth2wrap_gen.go +++ b/app/eth2wrap/eth2wrap_gen.go @@ -72,9 +72,23 @@ type Client interface { func (m multi) SlotDuration(ctx context.Context) (time.Duration, error) { const label = "slot_duration" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (time.Duration, error) { - return cl.SlotDuration(ctx) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (time.Duration, error) { + res0, err := args.client.SlotDuration(ctx) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.SlotDuration(ctx) + } + + return res0, err }, nil, m.selector, ) @@ -94,9 +108,23 @@ func (m multi) SlotDuration(ctx context.Context) (time.Duration, error) { func (m multi) SlotsPerEpoch(ctx context.Context) (uint64, error) { const label = "slots_per_epoch" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (uint64, error) { - return cl.SlotsPerEpoch(ctx) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (uint64, error) { + res0, err := args.client.SlotsPerEpoch(ctx) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.SlotsPerEpoch(ctx) + } + + return res0, err }, nil, m.selector, ) @@ -114,9 +142,23 @@ func (m multi) SignedBeaconBlock(ctx context.Context, opts *api.SignedBeaconBloc const label = "signed_beacon_block" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*spec.VersionedSignedBeaconBlock], error) { - return cl.SignedBeaconBlock(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*spec.VersionedSignedBeaconBlock], error) { + res0, err := args.client.SignedBeaconBlock(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.SignedBeaconBlock(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -134,9 +176,23 @@ func (m multi) AggregateAttestation(ctx context.Context, opts *api.AggregateAtte const label = "aggregate_attestation" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*phase0.Attestation], error) { - return cl.AggregateAttestation(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*phase0.Attestation], error) { + res0, err := args.client.AggregateAttestation(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.AggregateAttestation(ctx, opts) + } + + return res0, err }, isAggregateAttestationOk, m.selector, ) @@ -154,9 +210,23 @@ func (m multi) SubmitAggregateAttestations(ctx context.Context, aggregateAndProo const label = "submit_aggregate_attestations" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitAggregateAttestations(ctx, aggregateAndProofs) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitAggregateAttestations(ctx, aggregateAndProofs) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitAggregateAttestations(ctx, aggregateAndProofs) + } + + return err }, m.selector, ) @@ -174,9 +244,23 @@ func (m multi) AttestationData(ctx context.Context, opts *api.AttestationDataOpt const label = "attestation_data" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*phase0.AttestationData], error) { - return cl.AttestationData(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*phase0.AttestationData], error) { + res0, err := args.client.AttestationData(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.AttestationData(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -194,9 +278,23 @@ func (m multi) SubmitAttestations(ctx context.Context, attestations []*phase0.At const label = "submit_attestations" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitAttestations(ctx, attestations) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitAttestations(ctx, attestations) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitAttestations(ctx, attestations) + } + + return err }, m.selector, ) @@ -214,9 +312,23 @@ func (m multi) AttesterDuties(ctx context.Context, opts *api.AttesterDutiesOpts) const label = "attester_duties" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[[]*apiv1.AttesterDuty], error) { - return cl.AttesterDuties(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[[]*apiv1.AttesterDuty], error) { + res0, err := args.client.AttesterDuties(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.AttesterDuties(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -234,9 +346,23 @@ func (m multi) AttesterDuties(ctx context.Context, opts *api.AttesterDutiesOpts) func (m multi) DepositContract(ctx context.Context, opts *api.DepositContractOpts) (*api.Response[*apiv1.DepositContract], error) { const label = "deposit_contract" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*apiv1.DepositContract], error) { - return cl.DepositContract(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*apiv1.DepositContract], error) { + res0, err := args.client.DepositContract(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.DepositContract(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -255,9 +381,23 @@ func (m multi) SyncCommitteeDuties(ctx context.Context, opts *api.SyncCommitteeD const label = "sync_committee_duties" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[[]*apiv1.SyncCommitteeDuty], error) { - return cl.SyncCommitteeDuties(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[[]*apiv1.SyncCommitteeDuty], error) { + res0, err := args.client.SyncCommitteeDuties(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.SyncCommitteeDuties(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -275,9 +415,23 @@ func (m multi) SubmitSyncCommitteeMessages(ctx context.Context, messages []*alta const label = "submit_sync_committee_messages" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitSyncCommitteeMessages(ctx, messages) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitSyncCommitteeMessages(ctx, messages) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitSyncCommitteeMessages(ctx, messages) + } + + return err }, m.selector, ) @@ -295,9 +449,23 @@ func (m multi) SubmitSyncCommitteeSubscriptions(ctx context.Context, subscriptio const label = "submit_sync_committee_subscriptions" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitSyncCommitteeSubscriptions(ctx, subscriptions) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitSyncCommitteeSubscriptions(ctx, subscriptions) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitSyncCommitteeSubscriptions(ctx, subscriptions) + } + + return err }, m.selector, ) @@ -315,9 +483,23 @@ func (m multi) SyncCommitteeContribution(ctx context.Context, opts *api.SyncComm const label = "sync_committee_contribution" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*altair.SyncCommitteeContribution], error) { - return cl.SyncCommitteeContribution(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*altair.SyncCommitteeContribution], error) { + res0, err := args.client.SyncCommitteeContribution(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.SyncCommitteeContribution(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -335,9 +517,23 @@ func (m multi) SubmitSyncCommitteeContributions(ctx context.Context, contributio const label = "submit_sync_committee_contributions" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitSyncCommitteeContributions(ctx, contributionAndProofs) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitSyncCommitteeContributions(ctx, contributionAndProofs) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitSyncCommitteeContributions(ctx, contributionAndProofs) + } + + return err }, m.selector, ) @@ -355,9 +551,23 @@ func (m multi) Proposal(ctx context.Context, opts *api.ProposalOpts) (*api.Respo const label = "proposal" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*api.VersionedProposal], error) { - return cl.Proposal(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*api.VersionedProposal], error) { + res0, err := args.client.Proposal(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.Proposal(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -375,9 +585,23 @@ func (m multi) Proposal(ctx context.Context, opts *api.ProposalOpts) (*api.Respo func (m multi) BeaconBlockRoot(ctx context.Context, opts *api.BeaconBlockRootOpts) (*api.Response[*phase0.Root], error) { const label = "beacon_block_root" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*phase0.Root], error) { - return cl.BeaconBlockRoot(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*phase0.Root], error) { + res0, err := args.client.BeaconBlockRoot(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.BeaconBlockRoot(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -395,9 +619,23 @@ func (m multi) SubmitProposal(ctx context.Context, opts *api.SubmitProposalOpts) const label = "submit_proposal" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitProposal(ctx, opts) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitProposal(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitProposal(ctx, opts) + } + + return err }, m.selector, ) @@ -415,9 +653,23 @@ func (m multi) SubmitBeaconCommitteeSubscriptions(ctx context.Context, subscript const label = "submit_beacon_committee_subscriptions" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitBeaconCommitteeSubscriptions(ctx, subscriptions) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitBeaconCommitteeSubscriptions(ctx, subscriptions) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitBeaconCommitteeSubscriptions(ctx, subscriptions) + } + + return err }, m.selector, ) @@ -435,9 +687,23 @@ func (m multi) SubmitBlindedProposal(ctx context.Context, opts *api.SubmitBlinde const label = "submit_blinded_proposal" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitBlindedProposal(ctx, opts) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitBlindedProposal(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitBlindedProposal(ctx, opts) + } + + return err }, m.selector, ) @@ -455,9 +721,23 @@ func (m multi) SubmitValidatorRegistrations(ctx context.Context, registrations [ const label = "submit_validator_registrations" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitValidatorRegistrations(ctx, registrations) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitValidatorRegistrations(ctx, registrations) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitValidatorRegistrations(ctx, registrations) + } + + return err }, m.selector, ) @@ -475,9 +755,23 @@ func (m multi) Fork(ctx context.Context, opts *api.ForkOpts) (*api.Response[*pha const label = "fork" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*phase0.Fork], error) { - return cl.Fork(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*phase0.Fork], error) { + res0, err := args.client.Fork(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.Fork(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -495,9 +789,23 @@ func (m multi) ForkSchedule(ctx context.Context, opts *api.ForkScheduleOpts) (*a const label = "fork_schedule" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[[]*phase0.Fork], error) { - return cl.ForkSchedule(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[[]*phase0.Fork], error) { + res0, err := args.client.ForkSchedule(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.ForkSchedule(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -515,9 +823,23 @@ func (m multi) ForkSchedule(ctx context.Context, opts *api.ForkScheduleOpts) (*a func (m multi) Genesis(ctx context.Context, opts *api.GenesisOpts) (*api.Response[*apiv1.Genesis], error) { const label = "genesis" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*apiv1.Genesis], error) { - return cl.Genesis(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*apiv1.Genesis], error) { + res0, err := args.client.Genesis(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.Genesis(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -535,9 +857,23 @@ func (m multi) NodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api const label = "node_syncing" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[*apiv1.SyncState], error) { - return cl.NodeSyncing(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[*apiv1.SyncState], error) { + res0, err := args.client.NodeSyncing(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.NodeSyncing(ctx, opts) + } + + return res0, err }, isSyncStateOk, m.selector, ) @@ -555,9 +891,23 @@ func (m multi) NodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api func (m multi) NodeVersion(ctx context.Context, opts *api.NodeVersionOpts) (*api.Response[string], error) { const label = "node_version" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[string], error) { - return cl.NodeVersion(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[string], error) { + res0, err := args.client.NodeVersion(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.NodeVersion(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -576,9 +926,23 @@ func (m multi) NodeVersion(ctx context.Context, opts *api.NodeVersionOpts) (*api func (m multi) SubmitProposalPreparations(ctx context.Context, preparations []*apiv1.ProposalPreparation) error { const label = "submit_proposal_preparations" - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitProposalPreparations(ctx, preparations) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitProposalPreparations(ctx, preparations) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitProposalPreparations(ctx, preparations) + } + + return err }, m.selector, ) @@ -596,9 +960,23 @@ func (m multi) ProposerDuties(ctx context.Context, opts *api.ProposerDutiesOpts) const label = "proposer_duties" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[[]*apiv1.ProposerDuty], error) { - return cl.ProposerDuties(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[[]*apiv1.ProposerDuty], error) { + res0, err := args.client.ProposerDuties(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.ProposerDuties(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -616,9 +994,23 @@ func (m multi) ProposerDuties(ctx context.Context, opts *api.ProposerDutiesOpts) func (m multi) Spec(ctx context.Context, opts *api.SpecOpts) (*api.Response[map[string]any], error) { const label = "spec" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[map[string]any], error) { - return cl.Spec(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[map[string]any], error) { + res0, err := args.client.Spec(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.Spec(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -636,9 +1028,23 @@ func (m multi) Validators(ctx context.Context, opts *api.ValidatorsOpts) (*api.R const label = "validators" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*api.Response[map[phase0.ValidatorIndex]*apiv1.Validator], error) { - return cl.Validators(ctx, opts) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*api.Response[map[phase0.ValidatorIndex]*apiv1.Validator], error) { + res0, err := args.client.Validators(ctx, opts) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.Validators(ctx, opts) + } + + return res0, err }, nil, m.selector, ) @@ -656,9 +1062,23 @@ func (m multi) SubmitVoluntaryExit(ctx context.Context, voluntaryExit *phase0.Si const label = "submit_voluntary_exit" defer latency(label)() - err := submit(ctx, m.clients, - func(ctx context.Context, cl Client) error { - return cl.SubmitVoluntaryExit(ctx, voluntaryExit) + err := submit(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) error { + err := args.client.SubmitVoluntaryExit(ctx, voluntaryExit) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return err + } + + defer args.fallback.place() + + return fe.SubmitVoluntaryExit(ctx, voluntaryExit) + } + + return err }, m.selector, ) @@ -676,9 +1096,23 @@ func (m multi) SubmitVoluntaryExit(ctx context.Context, voluntaryExit *phase0.Si func (m multi) Domain(ctx context.Context, domainType phase0.DomainType, epoch phase0.Epoch) (phase0.Domain, error) { const label = "domain" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (phase0.Domain, error) { - return cl.Domain(ctx, domainType, epoch) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (phase0.Domain, error) { + res0, err := args.client.Domain(ctx, domainType, epoch) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.Domain(ctx, domainType, epoch) + } + + return res0, err }, nil, m.selector, ) @@ -699,9 +1133,23 @@ func (m multi) Domain(ctx context.Context, domainType phase0.DomainType, epoch p func (m multi) GenesisDomain(ctx context.Context, domainType phase0.DomainType) (phase0.Domain, error) { const label = "genesis_domain" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (phase0.Domain, error) { - return cl.GenesisDomain(ctx, domainType) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (phase0.Domain, error) { + res0, err := args.client.GenesisDomain(ctx, domainType) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.GenesisDomain(ctx, domainType) + } + + return res0, err }, nil, m.selector, ) @@ -719,9 +1167,23 @@ func (m multi) GenesisDomain(ctx context.Context, domainType phase0.DomainType) func (m multi) GenesisTime(ctx context.Context) (time.Time, error) { const label = "genesis_time" - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (time.Time, error) { - return cl.GenesisTime(ctx) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (time.Time, error) { + res0, err := args.client.GenesisTime(ctx) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return res0, err + } + + defer args.fallback.place() + + return fe.GenesisTime(ctx) + } + + return res0, err }, nil, m.selector, ) diff --git a/app/eth2wrap/fallback.go b/app/eth2wrap/fallback.go new file mode 100644 index 000000000..cab1c1bf3 --- /dev/null +++ b/app/eth2wrap/fallback.go @@ -0,0 +1,56 @@ +// Copyright © 2022-2024 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 + +package eth2wrap + +import ( + "sync" + "time" + + "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/z" +) + +// FallbackClient holds a list of initialized Clients to be used when main client calls +// return errors. +type FallbackClient struct { + clients []Client + next int + + lock sync.Mutex +} + +// NewFallbackClient initializes a FallbackClient with the provided settings +func NewFallbackClient(timeout time.Duration, forkVersion [4]byte, addresses []string) *FallbackClient { + return &FallbackClient{ + clients: newClients(timeout, forkVersion, addresses), + } +} + +// pick returns an available fallback client. +// If no clients are available, it'll return an error. +func (f *FallbackClient) pick() (Client, error) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.next >= len(f.clients) { + return nil, errors.New("all fallback clients have been taken", z.Int("total", len(f.clients))) + } + + ret := f.clients[f.next] + f.next++ + + return ret, nil +} + +// place returns a client back to the fallback client list. +// Callers must not re-use a client previously taken through pick() after this function has been called. +func (f *FallbackClient) place() { + f.lock.Lock() + defer f.lock.Unlock() + + if len(f.clients) == 0 { + return // no clients initialized, no need to place anything + } + + f.next-- +} diff --git a/app/eth2wrap/fallback_internal_test.go b/app/eth2wrap/fallback_internal_test.go new file mode 100644 index 000000000..811d8f72e --- /dev/null +++ b/app/eth2wrap/fallback_internal_test.go @@ -0,0 +1,49 @@ +// Copyright © 2022-2024 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 + +package eth2wrap + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_fallbackClient(t *testing.T) { + // 4 clients + f := NewFallbackClient(1*time.Second, [4]byte{1, 2, 3, 4}, []string{ + "https://google.com", + "https://x.com", + "https://google.com", + "https://x.com", + }) + + // pick four + for range 4 { + _, err := f.pick() + require.NoError(t, err) + } + + // pick fifth, error + _, err := f.pick() + require.ErrorContains(t, err, "all fallback clients have been taken") + + // put one back + f.place() + + // pick again, no error + _, err = f.pick() + require.NoError(t, err) +} + +func Test_fallbackClient_noneSpecified(t *testing.T) { + // 4 clients + f := NewFallbackClient(1*time.Second, [4]byte{1, 2, 3, 4}, []string{}) + + // pick one, error + _, err := f.pick() + require.ErrorContains(t, err, "all fallback clients have been taken") + + // place one, error + f.place() +} diff --git a/app/eth2wrap/genwrap/genwrap.go b/app/eth2wrap/genwrap/genwrap.go index 6251976a0..2bbf128d8 100644 --- a/app/eth2wrap/genwrap/genwrap.go +++ b/app/eth2wrap/genwrap/genwrap.go @@ -66,9 +66,23 @@ type Client interface { {{if .Latency}}defer latency(label)() {{end}} - {{.ResultNames}} := {{.DoFunc}}(ctx, m.clients, - func(ctx context.Context, cl Client) ({{.ResultTypes}}){ - return cl.{{.Name}}({{.ParamNames}}) + {{.ResultNames}} := {{.DoFunc}}(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) ({{.ResultTypes}}){ + {{.ResultNames}} := args.client.{{.Name}}({{.ParamNames}}) + if err != nil { + // use a fallback BN if any + fe, fallbackErr := args.fallback.pick() + if fallbackErr != nil { + // no fallback endpoint available, return previous error + return {{.ResultNames}} + } + + defer args.fallback.place() + + return fe.{{.Name}}({{.ParamNames}}) + } + + return {{.ResultNames}} }, {{.SuccessFunc}} m.selector, ) diff --git a/app/eth2wrap/multi.go b/app/eth2wrap/multi.go index 9dbdf52b3..8e8a4b707 100644 --- a/app/eth2wrap/multi.go +++ b/app/eth2wrap/multi.go @@ -11,16 +11,24 @@ import ( ) // NewMultiForT creates a new mutil client for testing. -func NewMultiForT(clients []Client) Client { +func NewMultiForT(clients []Client, client ...*FallbackClient) Client { + var fb *FallbackClient + + if len(client) > 0 { + fb = client[0] + } + return &multi{ clients: clients, + fallback: fb, selector: newBestSelector(bestPeriod), } } -func newMulti(clients []Client) Client { +func newMulti(clients []Client, fallback *FallbackClient) Client { return multi{ clients: clients, + fallback: fallback, selector: newBestSelector(bestPeriod), } } @@ -28,9 +36,12 @@ func newMulti(clients []Client) Client { // multi implements Client by wrapping multiple clients, calling them in parallel // and returning the first successful response. // It also adds prometheus metrics and error wrapping. -// It also implements a best client selector. +// It also implements a "best client" selector. +// When any of the Clients specified fails a request, it will re-try it on the specified +// fallback endpoints, if any. type multi struct { clients []Client + fallback *FallbackClient selector *bestSelector } @@ -83,9 +94,9 @@ func (m multi) ActiveValidators(ctx context.Context) (ActiveValidators, error) { const label = "active_validators" // No latency since this is a cached endpoint. - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (ActiveValidators, error) { - return cl.ActiveValidators(ctx) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (ActiveValidators, error) { + return args.client.ActiveValidators(ctx) }, nil, nil, ) @@ -101,9 +112,9 @@ func (m multi) CompleteValidators(ctx context.Context) (CompleteValidators, erro const label = "complete_validators" // No latency since this is a cached endpoint. - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (CompleteValidators, error) { - return cl.CompleteValidators(ctx) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (CompleteValidators, error) { + return args.client.CompleteValidators(ctx) }, nil, nil, ) @@ -119,9 +130,23 @@ func (m multi) ProposerConfig(ctx context.Context) (*eth2exp.ProposerConfigRespo const label = "proposer_config" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (*eth2exp.ProposerConfigResponse, error) { - return cl.ProposerConfig(ctx) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (*eth2exp.ProposerConfigResponse, error) { + res, bnErr := args.client.ProposerConfig(ctx) + if bnErr != nil { + // use a fallback BN if any + fe, err := args.fallback.pick() + if err != nil { + // no fallback endpoint available, return previous error + return res, bnErr + } + + defer args.fallback.place() + + return fe.ProposerConfig(ctx) + } + + return res, bnErr }, nil, m.selector, ) @@ -137,9 +162,23 @@ func (m multi) AggregateBeaconCommitteeSelections(ctx context.Context, selection const label = "aggregate_beacon_committee_selections" defer latency(label)() - res0, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) ([]*eth2exp.BeaconCommitteeSelection, error) { - return cl.AggregateBeaconCommitteeSelections(ctx, selections) + res0, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) ([]*eth2exp.BeaconCommitteeSelection, error) { + res, bnErr := args.client.AggregateBeaconCommitteeSelections(ctx, selections) + if bnErr != nil { + // use a fallback BN if any + fe, err := m.fallback.pick() + if err != nil { + // no fallback endpoint available, return previous error + return res, bnErr + } + + defer args.fallback.place() + + return fe.AggregateBeaconCommitteeSelections(ctx, selections) + } + + return res, bnErr }, nil, m.selector, ) @@ -155,10 +194,25 @@ func (m multi) AggregateSyncCommitteeSelections(ctx context.Context, selections const label = "aggregate_sync_committee_selections" defer latency(label)() - res, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) ([]*eth2exp.SyncCommitteeSelection, error) { - return cl.AggregateSyncCommitteeSelections(ctx, selections) + res, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) ([]*eth2exp.SyncCommitteeSelection, error) { + res, bnErr := args.client.AggregateSyncCommitteeSelections(ctx, selections) + if bnErr != nil { + // use a fallback BN if any + fe, err := m.fallback.pick() + if err != nil { + // no fallback endpoint available, return previous error + return res, bnErr + } + + defer args.fallback.place() + + return fe.AggregateSyncCommitteeSelections(ctx, selections) + } + + return res, bnErr }, + nil, m.selector, ) if err != nil { @@ -173,9 +227,23 @@ func (m multi) BlockAttestations(ctx context.Context, stateID string) ([]*eth2p0 const label = "block_attestations" defer latency(label)() - res, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) ([]*eth2p0.Attestation, error) { - return cl.BlockAttestations(ctx, stateID) + res, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) ([]*eth2p0.Attestation, error) { + res, bnErr := args.client.BlockAttestations(ctx, stateID) + if bnErr != nil { + // use a fallback BN if any + fe, err := m.fallback.pick() + if err != nil { + // no fallback endpoint available, return previous error + return res, bnErr + } + + defer args.fallback.place() + + return fe.BlockAttestations(ctx, stateID) + } + + return res, bnErr }, nil, m.selector, ) @@ -191,9 +259,23 @@ func (m multi) NodePeerCount(ctx context.Context) (int, error) { const label = "node_peer_count" defer latency(label)() - res, err := provide(ctx, m.clients, - func(ctx context.Context, cl Client) (int, error) { - return cl.NodePeerCount(ctx) + res, err := provide(ctx, m.clients, m.fallback, + func(ctx context.Context, args provideArgs) (int, error) { + res, bnErr := args.client.NodePeerCount(ctx) + if bnErr != nil { + // use a fallback BN if any + fe, err := m.fallback.pick() + if err != nil { + // no fallback endpoint available, return previous error + return res, bnErr + } + + defer args.fallback.place() + + return fe.NodePeerCount(ctx) + } + + return res, bnErr }, nil, m.selector, ) diff --git a/app/eth2wrap/multi_test.go b/app/eth2wrap/multi_test.go index 1036583fd..663eeeb11 100644 --- a/app/eth2wrap/multi_test.go +++ b/app/eth2wrap/multi_test.go @@ -60,7 +60,7 @@ func TestMulti_NodePeerCount(t *testing.T) { client.On("Address").Return("test").Once() client.On("NodePeerCount", mock.Anything).Return(5, nil).Once() - m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}) + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, eth2wrap.NewFallbackClient(0, [4]byte{}, nil)) c, err := m.NodePeerCount(context.Background()) require.NoError(t, err) @@ -80,7 +80,7 @@ func TestMulti_BlockAttestations(t *testing.T) { client.On("Address").Return("test").Once() client.On("BlockAttestations", mock.Anything, "state").Return(atts, nil).Once() - m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}) + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, eth2wrap.NewFallbackClient(0, [4]byte{}, nil)) atts2, err := m.BlockAttestations(ctx, "state") require.NoError(t, err) @@ -101,7 +101,7 @@ func TestMulti_AggregateSyncCommitteeSelections(t *testing.T) { client.On("Address").Return("test").Once() client.On("AggregateSyncCommitteeSelections", mock.Anything, partsel).Return(selections, nil).Once() - m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}) + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, eth2wrap.NewFallbackClient(0, [4]byte{}, nil)) selections2, err := m.AggregateSyncCommitteeSelections(ctx, partsel) require.NoError(t, err) @@ -122,7 +122,7 @@ func TestMulti_AggregateBeaconCommitteeSelections(t *testing.T) { client.On("Address").Return("test").Once() client.On("AggregateBeaconCommitteeSelections", mock.Anything, partsel).Return(selections, nil).Once() - m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}) + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, eth2wrap.NewFallbackClient(0, [4]byte{}, nil)) selections2, err := m.AggregateBeaconCommitteeSelections(ctx, partsel) require.NoError(t, err) @@ -142,7 +142,7 @@ func TestMulti_ProposerConfig(t *testing.T) { client.On("Address").Return("test").Once() client.On("ProposerConfig", mock.Anything).Return(resp, nil).Once() - m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}) + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client}, eth2wrap.NewFallbackClient(0, [4]byte{}, nil)) resp2, err := m.ProposerConfig(ctx) require.NoError(t, err)