Skip to content

Commit

Permalink
app/eth2wrap: fallback beacon nodes
Browse files Browse the repository at this point in the history
Implement a way to provide eth2wrap with two classes of beacon nodes addresses: standard and fallback beacon nodes.

When one of the multi BN calls fails, eth2wrap wrappers will try to get an available fallback BN from a list and re-do the call on that.

If no fallback BNs is specified, return the original error.

If the fallback BN call fails, return fallback error instead.

This PR firstly introduces concepts and code, will introduce CLI parameters and initialization code later.
  • Loading branch information
gsora committed Oct 15, 2024
1 parent 4bbb36e commit 1138ab3
Show file tree
Hide file tree
Showing 7 changed files with 859 additions and 164 deletions.
98 changes: 65 additions & 33 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,21 @@ func Instrument(clients ...Client) (Client, error) {
return nil, errors.New("clients empty")
}

return newMulti(clients), nil
// TODO(gsora): remove once the implementation is agreed upon and
// wiring is complete.
fb := NewFallbackClient(0, [4]byte{}, nil)

return newMulti(clients, fb), nil
}

// InstrumentWithFallback returns a new multi instrumented client using the provided clients as backends and fallback
// respectively.
func InstrumentWithFallback(fallback *FallbackClient, clients ...Client) (Client, error) {
if len(clients) == 0 {
return nil, errors.New("clients empty")
}

Check warning on line 72 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L69-L72

Added lines #L69 - L72 were not covered by tests

return newMulti(clients, fallback), nil

Check warning on line 74 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L74

Added line #L74 was not covered by tests
}

// WithSyntheticDuties wraps the provided client adding synthetic duties.
Expand All @@ -71,43 +85,58 @@ func WithSyntheticDuties(cl Client) Client {

// NewMultiHTTP returns a new instrumented multi eth2 http client.
func NewMultiHTTP(timeout time.Duration, forkVersion [4]byte, addresses ...string) (Client, error) {
return Instrument(newClients(timeout, forkVersion, addresses)...)
}

// newClients returns a slice of Client initialized with the provided settings.
func newClients(timeout time.Duration, forkVersion [4]byte, addresses []string) []Client {
var clients []Client
for _, address := range addresses {
parameters := []eth2http.Parameter{
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
}
clients = append(clients, newBeaconClient(timeout, forkVersion, address))
}

cl := newLazy(func(ctx context.Context) (Client, error) {
eth2Svc, err := eth2http.New(ctx, parameters...)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
}
eth2Http, ok := eth2Svc.(*eth2http.Service)
if !ok {
return nil, errors.New("invalid eth2 http service")
}
return clients
}

adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
adaptedCl.SetForkVersion(forkVersion)
// newBeaconClient returns a Client with the provided settings.
func newBeaconClient(timeout time.Duration, forkVersion [4]byte, address string) Client {
parameters := []eth2http.Parameter{
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
}

return adaptedCl, nil
})
cl := newLazy(func(ctx context.Context) (Client, error) {
eth2Svc, err := eth2http.New(ctx, parameters...)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
}

Check warning on line 115 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L114-L115

Added lines #L114 - L115 were not covered by tests
eth2Http, ok := eth2Svc.(*eth2http.Service)
if !ok {
return nil, errors.New("invalid eth2 http service")
}

Check warning on line 119 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L118-L119

Added lines #L118 - L119 were not covered by tests

clients = append(clients, cl)
}
adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
adaptedCl.SetForkVersion(forkVersion)

return adaptedCl, nil
})

return Instrument(clients...)
return cl
}

type provideArgs struct {
client Client
fallback *FallbackClient
}

// provide calls the work function with each client in parallel, returning the
// first successful result or first error.
// The bestIdxFunc is called with the index of the client returning a successful response.
func provide[O any](ctx context.Context, clients []Client,
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
func provide[O any](ctx context.Context, clients []Client, fallback *FallbackClient,
work forkjoin.Work[provideArgs, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
) (O, error) {
if isSuccessFunc == nil {
isSuccessFunc = func(O) bool { return true }
Expand All @@ -118,12 +147,15 @@ func provide[O any](ctx context.Context, clients []Client,
forkjoin.WithWorkers(len(clients)),
)
for _, client := range clients {
fork(client)
fork(provideArgs{
client: client,
fallback: fallback,
})
}
defer cancel()

var (
nokResp forkjoin.Result[Client, O]
nokResp forkjoin.Result[provideArgs, O]
hasNokResp bool
zero O
)
Expand All @@ -132,7 +164,7 @@ func provide[O any](ctx context.Context, clients []Client,
return zero, ctx.Err()
} else if res.Err == nil && isSuccessFunc(res.Output) {
if bestSelector != nil {
bestSelector.Increment(res.Input.Address())
bestSelector.Increment(res.Input.client.Address())
}

return res.Output, nil
Expand All @@ -154,10 +186,10 @@ func provide[O any](ctx context.Context, clients []Client,
type empty struct{}

// submit proxies provide, but returns nil instead of a successful result.
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error {
_, err := provide(ctx, clients,
func(ctx context.Context, cl Client) (empty, error) {
return empty{}, work(ctx, cl)
func submit(ctx context.Context, clients []Client, fallback *FallbackClient, work func(context.Context, provideArgs) error, selector *bestSelector) error {
_, err := provide(ctx, clients, fallback,
func(ctx context.Context, args provideArgs) (empty, error) {
return empty{}, work(ctx, args)
},
nil, selector,
)
Expand Down
Loading

0 comments on commit 1138ab3

Please sign in to comment.