From 727a63d37eebc37d5b5eb80d899ff9c9af1b14f1 Mon Sep 17 00:00:00 2001 From: Samuel Laferriere Date: Fri, 25 Oct 2024 17:23:36 +0100 Subject: [PATCH] feat(failover): return 503 to batcher when eigenda is down chore: go mod tidy to generate go.mod feat: dealing with new eigenda-client grpc errors + ErrorFailover convention comment: fix typo feat(handlers): postShared returns 429 when disperser rate limited client flag(eigenda): rename RetriesBeforeFailover -> PutRetries reviewer correctly pointed out that retrying was more general than only for failovers lint: nolint exhaustive switch check for Put case --- flags/eigendaflags/cli.go | 11 +++++++ go.mod | 3 +- go.sum | 2 ++ server/config.go | 2 ++ server/errors.go | 22 +++++++++++++ server/handlers.go | 14 ++++++--- server/load_store.go | 1 + store/generated_key/eigenda/eigenda.go | 43 +++++++++++++++++++++++++- store/store.go | 3 +- 9 files changed, 92 insertions(+), 9 deletions(-) diff --git a/flags/eigendaflags/cli.go b/flags/eigendaflags/cli.go index 9a13c772..eb87ffd0 100644 --- a/flags/eigendaflags/cli.go +++ b/flags/eigendaflags/cli.go @@ -27,6 +27,8 @@ var ( ConfirmationDepthFlagName = withFlagPrefix("confirmation-depth") EthRPCURLFlagName = withFlagPrefix("eth-rpc") SvcManagerAddrFlagName = withFlagPrefix("svc-manager-addr") + // Flags that are proxy specific, and not used by the eigenda-client + PutRetriesFlagName = withFlagPrefix("put-retries") ) func withFlagPrefix(s string) string { @@ -137,6 +139,15 @@ func CLIFlags(envPrefix, category string) []cli.Flag { Category: category, Required: true, }, + // Flags that are proxy specific, and not used by the eigenda-client + // TODO: should we move this to a more specific category, like EIGENDA_STORE? + &cli.UintFlag{ + Name: PutRetriesFlagName, + Usage: "Number of times to retry blob dispersals.", + Value: 3, + EnvVars: []string{withEnvPrefix(envPrefix, "PUT_RETRIES")}, + Category: category, + }, } } diff --git a/go.mod b/go.mod index 38e89c39..3f261ef8 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ toolchain go1.22.0 require ( github.com/Layr-Labs/eigenda v0.8.5-0.20241031144746-e2ead56a306d + github.com/avast/retry-go/v4 v4.6.0 github.com/consensys/gnark-crypto v0.12.1 github.com/ethereum-optimism/optimism v1.9.4-0.20240927020138-a9c7f349d10b github.com/ethereum/go-ethereum v1.14.11 @@ -20,6 +21,7 @@ require ( github.com/testcontainers/testcontainers-go/modules/redis v0.33.0 github.com/urfave/cli/v2 v2.27.4 golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa + google.golang.org/grpc v1.64.1 ) require ( @@ -283,7 +285,6 @@ require ( golang.org/x/time v0.6.0 // indirect golang.org/x/tools v0.24.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect - google.golang.org/grpc v1.64.1 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 8e602d79..b636e0b5 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5 github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= +github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= +github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU= diff --git a/server/config.go b/server/config.go index 3dc9d55c..22e62bc6 100644 --- a/server/config.go +++ b/server/config.go @@ -21,6 +21,7 @@ import ( type Config struct { EdaClientConfig clients.EigenDAClientConfig VerifierConfig verify.Config + PutRetries uint MemstoreEnabled bool MemstoreConfig memstore.Config @@ -43,6 +44,7 @@ func ReadConfig(ctx *cli.Context) Config { S3Config: s3.ReadConfig(ctx), EdaClientConfig: edaClientConfig, VerifierConfig: verify.ReadConfig(ctx, edaClientConfig), + PutRetries: ctx.Uint(eigendaflags.PutRetriesFlagName), MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName), MemstoreConfig: memstore.ReadConfig(ctx), FallbackTargets: ctx.StringSlice(flags.FallbackTargetsFlagName), diff --git a/server/errors.go b/server/errors.go index 77ff759b..a3435497 100644 --- a/server/errors.go +++ b/server/errors.go @@ -1,9 +1,13 @@ package server import ( + "errors" "fmt" "github.com/Layr-Labs/eigenda-proxy/commitments" + "github.com/Layr-Labs/eigenda-proxy/store" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // MetaError includes both an error and commitment metadata @@ -22,3 +26,21 @@ func (me MetaError) Error() string { func (me MetaError) Unwrap() error { return me.Err } + +func is400(err error) bool { + // proxy requests are super simple (clients basically only pass bytes), so the only 400 possible + // is passing a blob that's too big. + // + // Any 400s returned by the disperser are due to formatting bugs in proxy code, for eg. badly + // IFFT'ing or encoding the blob, so we shouldn't return a 400 to the client. + // See https://github.com/Layr-Labs/eigenda/blob/bee55ed9207f16153c3fd8ebf73c219e68685def/api/errors.go#L22 + // for the 400s returned by the disperser server (currently only INVALID_ARGUMENT). + return errors.Is(err, store.ErrProxyOversizedBlob) +} + +func is429(err error) bool { + // grpc RESOURCE_EXHAUSTED is returned by the disperser server when the client has sent too many requests + // in a short period of time. This is a client-side issue, so we should return the 429 to the client. + st, isGRPCError := status.FromError(err) + return isGRPCError && st.Code() == codes.ResourceExhausted +} diff --git a/server/handlers.go b/server/handlers.go index ea04872b..f38ca817 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -9,7 +9,7 @@ import ( "net/http" "github.com/Layr-Labs/eigenda-proxy/commitments" - "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/Layr-Labs/eigenda/api" "github.com/gorilla/mux" ) @@ -181,11 +181,15 @@ func (svr *Server) handlePostShared(w http.ResponseWriter, r *http.Request, comm Err: fmt.Errorf("put request failed with commitment %v (commitment mode %v): %w", comm, meta.Mode, err), Meta: meta, } - if errors.Is(err, store.ErrEigenDAOversizedBlob) || errors.Is(err, store.ErrProxyOversizedBlob) { - // we add here any error that should be returned as a 400 instead of a 500. - // currently only includes oversized blob requests + switch { + case is400(err): http.Error(w, err.Error(), http.StatusBadRequest) - } else { + case is429(err): + http.Error(w, err.Error(), http.StatusTooManyRequests) + case errors.Is(err, &api.ErrorFailover{}): + // this tells the caller (batcher) to failover to ethda b/c eigenda is temporarily down + http.Error(w, err.Error(), http.StatusServiceUnavailable) + default: http.Error(w, err.Error(), http.StatusInternalServerError) } return err diff --git a/server/load_store.go b/server/load_store.go index be298c87..5d69c861 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -111,6 +111,7 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr MaxBlobSizeBytes: cfg.EigenDAConfig.MemstoreConfig.MaxBlobSizeBytes, EthConfirmationDepth: cfg.EigenDAConfig.VerifierConfig.EthConfirmationDepth, StatusQueryTimeout: cfg.EigenDAConfig.EdaClientConfig.StatusQueryTimeout, + PutRetries: cfg.EigenDAConfig.PutRetries, }, ) } diff --git a/store/generated_key/eigenda/eigenda.go b/store/generated_key/eigenda/eigenda.go index 33f7ea36..f0db5375 100644 --- a/store/generated_key/eigenda/eigenda.go +++ b/store/generated_key/eigenda/eigenda.go @@ -8,8 +8,13 @@ import ( "github.com/Layr-Labs/eigenda-proxy/store" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/api/grpc/disperser" + + "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type StoreConfig struct { @@ -20,6 +25,9 @@ type StoreConfig struct { // total duration time that client waits for blob to confirm StatusQueryTimeout time.Duration + + // number of times to retry eigenda blob dispersals + PutRetries uint } // Store does storage interactions and verifications for blobs with DA. @@ -70,7 +78,40 @@ func (e Store) Put(ctx context.Context, value []byte) ([]byte, error) { return nil, fmt.Errorf("%w: blob length %d, max blob size %d", store.ErrProxyOversizedBlob, len(value), e.cfg.MaxBlobSizeBytes) } - blobInfo, err := e.client.PutBlob(ctx, value) + // We attempt to disperse the blob to EigenDA up to 3 times, unless we get a 400 error on any attempt. + blobInfo, err := retry.DoWithData( + func() (*disperser.BlobInfo, error) { + return e.client.PutBlob(ctx, value) + }, + retry.RetryIf(func(err error) bool { + st, isGRPCError := status.FromError(err) + if !isGRPCError { + // api.ErrorFailover is returned, so we should retry + return true + } + //nolint:exhaustive // we only care about a few grpc error codes + switch st.Code() { + case codes.InvalidArgument: + // we don't retry 400 errors because there is no point, + // we are passing invalid data + return false + case codes.ResourceExhausted: + // we retry on 429s because *can* mean we are being rate limited + // we sleep 1 second... very arbitrarily, because we don't have more info. + // grpc error itself should return a backoff time, + // see https://github.com/Layr-Labs/eigenda/issues/845 for more details + time.Sleep(1 * time.Second) + return true + default: + return true + } + }), + // only return the last error. If it is an api.ErrorFailover, then the handler will convert + // it to an http 503 to signify to the client (batcher) to failover to ethda + // b/c eigenda is temporarily down. + retry.LastErrorOnly(true), + retry.Attempts(e.cfg.PutRetries), + ) if err != nil { // TODO: we will want to filter for errors here and return a 503 when needed // ie when dispersal itself failed, or that we timed out waiting for batch to land onchain diff --git a/store/store.go b/store/store.go index 8c50e8f3..f4d349b4 100644 --- a/store/store.go +++ b/store/store.go @@ -18,8 +18,7 @@ const ( ) var ( - ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size") - ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed") + ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size") ) func (b BackendType) String() string {