Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 65 additions & 33 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,21 @@ func Instrument(clients ...Client) (Client, error) {
return nil, errors.New("clients empty")
}

return newMulti(clients), nil
// TODO(gsora): remove once the implementation is agreed upon and
// wiring is complete.
fb := NewFallbackClient(0, [4]byte{}, nil)

return newMulti(clients, fb), nil
}

// InstrumentWithFallback returns a new multi instrumented client using the provided clients as backends and fallback
// respectively.
func InstrumentWithFallback(fallback *FallbackClient, clients ...Client) (Client, error) {
if len(clients) == 0 {
return nil, errors.New("clients empty")
}

return newMulti(clients, fallback), nil
}

// WithSyntheticDuties wraps the provided client adding synthetic duties.
Expand All @@ -71,43 +85,58 @@ func WithSyntheticDuties(cl Client) Client {

// NewMultiHTTP returns a new instrumented multi eth2 http client.
func NewMultiHTTP(timeout time.Duration, forkVersion [4]byte, addresses ...string) (Client, error) {
return Instrument(newClients(timeout, forkVersion, addresses)...)
}

// newClients returns a slice of Client initialized with the provided settings.
func newClients(timeout time.Duration, forkVersion [4]byte, addresses []string) []Client {
var clients []Client
for _, address := range addresses {
parameters := []eth2http.Parameter{
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
}
clients = append(clients, newBeaconClient(timeout, forkVersion, address))
}

cl := newLazy(func(ctx context.Context) (Client, error) {
eth2Svc, err := eth2http.New(ctx, parameters...)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
}
eth2Http, ok := eth2Svc.(*eth2http.Service)
if !ok {
return nil, errors.New("invalid eth2 http service")
}
return clients
}

adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
adaptedCl.SetForkVersion(forkVersion)
// newBeaconClient returns a Client with the provided settings.
func newBeaconClient(timeout time.Duration, forkVersion [4]byte, address string) Client {
parameters := []eth2http.Parameter{
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
}

return adaptedCl, nil
})
cl := newLazy(func(ctx context.Context) (Client, error) {
eth2Svc, err := eth2http.New(ctx, parameters...)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
}
eth2Http, ok := eth2Svc.(*eth2http.Service)
if !ok {
return nil, errors.New("invalid eth2 http service")
}

clients = append(clients, cl)
}
adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
adaptedCl.SetForkVersion(forkVersion)

return adaptedCl, nil
})

return Instrument(clients...)
return cl
}

type provideArgs struct {
client Client
fallback *FallbackClient
}

// provide calls the work function with each client in parallel, returning the
// first successful result or first error.
// The bestIdxFunc is called with the index of the client returning a successful response.
func provide[O any](ctx context.Context, clients []Client,
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
func provide[O any](ctx context.Context, clients []Client, fallback *FallbackClient,
work forkjoin.Work[provideArgs, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
) (O, error) {
if isSuccessFunc == nil {
isSuccessFunc = func(O) bool { return true }
Expand All @@ -118,12 +147,15 @@ func provide[O any](ctx context.Context, clients []Client,
forkjoin.WithWorkers(len(clients)),
)
for _, client := range clients {
fork(client)
fork(provideArgs{
client: client,
fallback: fallback,
})
}
defer cancel()

var (
nokResp forkjoin.Result[Client, O]
nokResp forkjoin.Result[provideArgs, O]
hasNokResp bool
zero O
)
Expand All @@ -132,7 +164,7 @@ func provide[O any](ctx context.Context, clients []Client,
return zero, ctx.Err()
} else if res.Err == nil && isSuccessFunc(res.Output) {
if bestSelector != nil {
bestSelector.Increment(res.Input.Address())
bestSelector.Increment(res.Input.client.Address())
}

return res.Output, nil
Expand All @@ -154,10 +186,10 @@ func provide[O any](ctx context.Context, clients []Client,
type empty struct{}

// submit proxies provide, but returns nil instead of a successful result.
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error {
_, err := provide(ctx, clients,
func(ctx context.Context, cl Client) (empty, error) {
return empty{}, work(ctx, cl)
func submit(ctx context.Context, clients []Client, fallback *FallbackClient, work func(context.Context, provideArgs) error, selector *bestSelector) error {
_, err := provide(ctx, clients, fallback,
func(ctx context.Context, args provideArgs) (empty, error) {
return empty{}, work(ctx, args)
},
nil, selector,
)
Expand Down
Loading