Skip to content

Commit

Permalink
Merge branch 'main' into pinebit/v2-endpoint-warn
Browse files Browse the repository at this point in the history
  • Loading branch information
pinebit authored Jan 8, 2025
2 parents 3107739 + 853ed7f commit 0b1e3ad
Show file tree
Hide file tree
Showing 26 changed files with 1,133 additions and 484 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.62.0
version: v1.62.2
- name: notify failure
if: failure() && github.ref == 'refs/heads/main'
env:
Expand Down
5 changes: 1 addition & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ linters:
- gocyclo
- godot
- godox
- gomnd
- gomoddirectives
- inamedparam
- interfacebloat
Expand All @@ -182,6 +181,4 @@ linters:
- tagliatelle
- varnamelen
- wsl
# Deprecated
- goerr113
- execinquery
- err113
2 changes: 1 addition & 1 deletion .pre-commit/run_linter.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash

VERSION="1.62.0"
VERSION="1.62.2"

if ! command -v golangci-lint &> /dev/null
then
Expand Down
98 changes: 65 additions & 33 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,21 @@ func Instrument(clients ...Client) (Client, error) {
return nil, errors.New("clients empty")
}

return newMulti(clients), nil
// TODO(gsora): remove once the implementation is agreed upon and
// wiring is complete.
fb := NewFallbackClient(0, [4]byte{}, nil)

return newMulti(clients, fb), nil
}

// InstrumentWithFallback returns a new multi instrumented client using the provided clients as backends and fallback
// respectively.
func InstrumentWithFallback(fallback *FallbackClient, clients ...Client) (Client, error) {
if len(clients) == 0 {
return nil, errors.New("clients empty")
}

return newMulti(clients, fallback), nil
}

// WithSyntheticDuties wraps the provided client adding synthetic duties.
Expand All @@ -71,43 +85,58 @@ func WithSyntheticDuties(cl Client) Client {

// NewMultiHTTP returns a new instrumented multi eth2 http client.
func NewMultiHTTP(timeout time.Duration, forkVersion [4]byte, addresses ...string) (Client, error) {
return Instrument(newClients(timeout, forkVersion, addresses)...)
}

// newClients returns a slice of Client initialized with the provided settings.
func newClients(timeout time.Duration, forkVersion [4]byte, addresses []string) []Client {
var clients []Client
for _, address := range addresses {
parameters := []eth2http.Parameter{
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
}
clients = append(clients, newBeaconClient(timeout, forkVersion, address))
}

cl := newLazy(func(ctx context.Context) (Client, error) {
eth2Svc, err := eth2http.New(ctx, parameters...)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
}
eth2Http, ok := eth2Svc.(*eth2http.Service)
if !ok {
return nil, errors.New("invalid eth2 http service")
}
return clients
}

adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
adaptedCl.SetForkVersion(forkVersion)
// newBeaconClient returns a Client with the provided settings.
func newBeaconClient(timeout time.Duration, forkVersion [4]byte, address string) Client {
parameters := []eth2http.Parameter{
eth2http.WithLogLevel(zeroLogInfo),
eth2http.WithAddress(address),
eth2http.WithTimeout(timeout),
eth2http.WithAllowDelayedStart(true),
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
}

return adaptedCl, nil
})
cl := newLazy(func(ctx context.Context) (Client, error) {
eth2Svc, err := eth2http.New(ctx, parameters...)
if err != nil {
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
}
eth2Http, ok := eth2Svc.(*eth2http.Service)
if !ok {
return nil, errors.New("invalid eth2 http service")
}

clients = append(clients, cl)
}
adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
adaptedCl.SetForkVersion(forkVersion)

return adaptedCl, nil
})

return Instrument(clients...)
return cl
}

type provideArgs struct {
client Client
fallback *FallbackClient
}

// provide calls the work function with each client in parallel, returning the
// first successful result or first error.
// The bestIdxFunc is called with the index of the client returning a successful response.
func provide[O any](ctx context.Context, clients []Client,
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
func provide[O any](ctx context.Context, clients []Client, fallback *FallbackClient,
work forkjoin.Work[provideArgs, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
) (O, error) {
if isSuccessFunc == nil {
isSuccessFunc = func(O) bool { return true }
Expand All @@ -118,12 +147,15 @@ func provide[O any](ctx context.Context, clients []Client,
forkjoin.WithWorkers(len(clients)),
)
for _, client := range clients {
fork(client)
fork(provideArgs{
client: client,
fallback: fallback,
})
}
defer cancel()

var (
nokResp forkjoin.Result[Client, O]
nokResp forkjoin.Result[provideArgs, O]
hasNokResp bool
zero O
)
Expand All @@ -132,7 +164,7 @@ func provide[O any](ctx context.Context, clients []Client,
return zero, ctx.Err()
} else if res.Err == nil && isSuccessFunc(res.Output) {
if bestSelector != nil {
bestSelector.Increment(res.Input.Address())
bestSelector.Increment(res.Input.client.Address())
}

return res.Output, nil
Expand All @@ -154,10 +186,10 @@ func provide[O any](ctx context.Context, clients []Client,
type empty struct{}

// submit proxies provide, but returns nil instead of a successful result.
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error {
_, err := provide(ctx, clients,
func(ctx context.Context, cl Client) (empty, error) {
return empty{}, work(ctx, cl)
func submit(ctx context.Context, clients []Client, fallback *FallbackClient, work func(context.Context, provideArgs) error, selector *bestSelector) error {
_, err := provide(ctx, clients, fallback,
func(ctx context.Context, args provideArgs) (empty, error) {
return empty{}, work(ctx, args)
},
nil, selector,
)
Expand Down
Loading

0 comments on commit 0b1e3ad

Please sign in to comment.