Skip to content

Commit

Permalink
feat(failover): return 503 to batcher when eigenda is down
Browse files Browse the repository at this point in the history
chore: go mod tidy to generate go.mod

feat: dealing with new eigenda-client grpc errors + ErrorFailover convention

comment: fix typo

feat(handlers): postShared returns 429 when disperser rate limited client

flag(eigenda): rename RetriesBeforeFailover -> PutRetries

reviewer correctly pointed out that retrying was more general than only for failovers

lint: nolint exhaustive switch check for Put case
  • Loading branch information
samlaf committed Oct 31, 2024
1 parent 390e491 commit 727a63d
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 9 deletions.
11 changes: 11 additions & 0 deletions flags/eigendaflags/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
ConfirmationDepthFlagName = withFlagPrefix("confirmation-depth")
EthRPCURLFlagName = withFlagPrefix("eth-rpc")
SvcManagerAddrFlagName = withFlagPrefix("svc-manager-addr")
// Flags that are proxy specific, and not used by the eigenda-client
PutRetriesFlagName = withFlagPrefix("put-retries")
)

func withFlagPrefix(s string) string {
Expand Down Expand Up @@ -137,6 +139,15 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
Category: category,
Required: true,
},
// Flags that are proxy specific, and not used by the eigenda-client
// TODO: should we move this to a more specific category, like EIGENDA_STORE?
&cli.UintFlag{
Name: PutRetriesFlagName,
Usage: "Number of times to retry blob dispersals.",
Value: 3,
EnvVars: []string{withEnvPrefix(envPrefix, "PUT_RETRIES")},
Category: category,
},
}
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.22.0

require (
github.com/Layr-Labs/eigenda v0.8.5-0.20241031144746-e2ead56a306d
github.com/avast/retry-go/v4 v4.6.0
github.com/consensys/gnark-crypto v0.12.1
github.com/ethereum-optimism/optimism v1.9.4-0.20240927020138-a9c7f349d10b
github.com/ethereum/go-ethereum v1.14.11
Expand All @@ -20,6 +21,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/redis v0.33.0
github.com/urfave/cli/v2 v2.27.4
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
google.golang.org/grpc v1.64.1
)

require (
Expand Down Expand Up @@ -283,7 +285,6 @@ require (
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/grpc v1.64.1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
Expand Down
2 changes: 2 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type Config struct {
EdaClientConfig clients.EigenDAClientConfig
VerifierConfig verify.Config
PutRetries uint

MemstoreEnabled bool
MemstoreConfig memstore.Config
Expand All @@ -43,6 +44,7 @@ func ReadConfig(ctx *cli.Context) Config {
S3Config: s3.ReadConfig(ctx),
EdaClientConfig: edaClientConfig,
VerifierConfig: verify.ReadConfig(ctx, edaClientConfig),
PutRetries: ctx.Uint(eigendaflags.PutRetriesFlagName),
MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName),
MemstoreConfig: memstore.ReadConfig(ctx),
FallbackTargets: ctx.StringSlice(flags.FallbackTargetsFlagName),
Expand Down
22 changes: 22 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package server

import (
"errors"
"fmt"

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/store"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// MetaError includes both an error and commitment metadata
Expand All @@ -22,3 +26,21 @@ func (me MetaError) Error() string {
func (me MetaError) Unwrap() error {
return me.Err
}

// is400 reports whether err should be surfaced to the caller as an HTTP 400.
//
// Requests to the proxy carry little more than raw bytes, so the one client
// mistake we can detect is a blob that exceeds the proxy's size limit
// (store.ErrProxyOversizedBlob).
//
// A 400 coming back from the disperser is deliberately NOT treated as a client
// error: it points at a formatting bug in the proxy itself (e.g. a bad IFFT or
// blob encoding), which is not the caller's fault.
// See https://github.com/Layr-Labs/eigenda/blob/bee55ed9207f16153c3fd8ebf73c219e68685def/api/errors.go#L22
// for the 400s the disperser server can return (currently only INVALID_ARGUMENT).
func is400(err error) bool {
	oversizedBlob := errors.Is(err, store.ErrProxyOversizedBlob)
	return oversizedBlob
}

// is429 reports whether err should be surfaced to the caller as an HTTP 429.
//
// The disperser server answers with gRPC RESOURCE_EXHAUSTED when the client
// has sent too many requests in a short period of time. That is a client-side
// problem, so the rate-limit signal is passed through to the client.
func is429(err error) bool {
	grpcStatus, ok := status.FromError(err)
	if !ok {
		// not a gRPC status error, so it cannot be a rate-limit response
		return false
	}
	return grpcStatus.Code() == codes.ResourceExhausted
}
14 changes: 9 additions & 5 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"net/http"

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda/api"
"github.com/gorilla/mux"
)

Expand Down Expand Up @@ -181,11 +181,15 @@ func (svr *Server) handlePostShared(w http.ResponseWriter, r *http.Request, comm
Err: fmt.Errorf("put request failed with commitment %v (commitment mode %v): %w", comm, meta.Mode, err),
Meta: meta,
}
if errors.Is(err, store.ErrEigenDAOversizedBlob) || errors.Is(err, store.ErrProxyOversizedBlob) {
// we add here any error that should be returned as a 400 instead of a 500.
// currently only includes oversized blob requests
switch {
case is400(err):
http.Error(w, err.Error(), http.StatusBadRequest)
} else {
case is429(err):
http.Error(w, err.Error(), http.StatusTooManyRequests)
case errors.Is(err, &api.ErrorFailover{}):
// this tells the caller (batcher) to failover to ethda b/c eigenda is temporarily down
http.Error(w, err.Error(), http.StatusServiceUnavailable)
default:
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return err
Expand Down
1 change: 1 addition & 0 deletions server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr
MaxBlobSizeBytes: cfg.EigenDAConfig.MemstoreConfig.MaxBlobSizeBytes,
EthConfirmationDepth: cfg.EigenDAConfig.VerifierConfig.EthConfirmationDepth,
StatusQueryTimeout: cfg.EigenDAConfig.EdaClientConfig.StatusQueryTimeout,
PutRetries: cfg.EigenDAConfig.PutRetries,
},
)
}
Expand Down
43 changes: 42 additions & 1 deletion store/generated_key/eigenda/eigenda.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/verify"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/disperser"

"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type StoreConfig struct {
Expand All @@ -20,6 +25,9 @@ type StoreConfig struct {

// total duration time that client waits for blob to confirm
StatusQueryTimeout time.Duration

// number of times to retry eigenda blob dispersals
PutRetries uint
}

// Store does storage interactions and verifications for blobs with DA.
Expand Down Expand Up @@ -70,7 +78,40 @@ func (e Store) Put(ctx context.Context, value []byte) ([]byte, error) {
return nil, fmt.Errorf("%w: blob length %d, max blob size %d", store.ErrProxyOversizedBlob, len(value), e.cfg.MaxBlobSizeBytes)
}

blobInfo, err := e.client.PutBlob(ctx, value)
// We attempt to disperse the blob to EigenDA up to e.cfg.PutRetries times (3 by default), unless we get a 400 error on any attempt.
blobInfo, err := retry.DoWithData(
func() (*disperser.BlobInfo, error) {
return e.client.PutBlob(ctx, value)
},
retry.RetryIf(func(err error) bool {
st, isGRPCError := status.FromError(err)
if !isGRPCError {
// api.ErrorFailover is returned, so we should retry
return true
}
//nolint:exhaustive // we only care about a few grpc error codes
switch st.Code() {
case codes.InvalidArgument:
// we don't retry 400 errors because there is no point,
// we are passing invalid data
return false
case codes.ResourceExhausted:
// we retry on 429s because it *can* mean we are being rate limited
// we sleep 1 second... very arbitrarily, because we don't have more info.
// grpc error itself should return a backoff time,
// see https://github.com/Layr-Labs/eigenda/issues/845 for more details
time.Sleep(1 * time.Second)
return true
default:
return true
}
}),
// only return the last error. If it is an api.ErrorFailover, then the handler will convert
// it to an http 503 to signify to the client (batcher) to failover to ethda
// b/c eigenda is temporarily down.
retry.LastErrorOnly(true),
retry.Attempts(e.cfg.PutRetries),
)
if err != nil {
// TODO: we will want to filter for errors here and return a 503 when needed
// ie when dispersal itself failed, or that we timed out waiting for batch to land onchain
Expand Down
3 changes: 1 addition & 2 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const (
)

var (
ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size")
ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed")
ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size")
)

func (b BackendType) String() string {
Expand Down

0 comments on commit 727a63d

Please sign in to comment.