From 2c49666f7befebac7eebc73b3c15dd514936fc60 Mon Sep 17 00:00:00 2001 From: Samuel Laferriere Date: Thu, 13 Feb 2025 01:22:40 -0500 Subject: [PATCH] chore: add memstore config to return failover errors on puts This is useful for testing failover in batcher. Right now a new proxy would need to be spun up with this config turned on. We should consider adding an admin rpc rpi to turn these on/off dynamically. --- store/generated_key/memstore/cli.go | 39 +++++++++++++------ store/generated_key/memstore/memstore.go | 25 ++++++++---- store/generated_key/memstore/memstore_test.go | 27 +++++++++++++ 3 files changed, 73 insertions(+), 18 deletions(-) diff --git a/store/generated_key/memstore/cli.go b/store/generated_key/memstore/cli.go index 39717212..6da3d6e1 100644 --- a/store/generated_key/memstore/cli.go +++ b/store/generated_key/memstore/cli.go @@ -10,10 +10,11 @@ import ( ) var ( - EnabledFlagName = withFlagPrefix("enabled") - ExpirationFlagName = withFlagPrefix("expiration") - PutLatencyFlagName = withFlagPrefix("put-latency") - GetLatencyFlagName = withFlagPrefix("get-latency") + EnabledFlagName = withFlagPrefix("enabled") + ExpirationFlagName = withFlagPrefix("expiration") + PutLatencyFlagName = withFlagPrefix("put-latency") + GetLatencyFlagName = withFlagPrefix("get-latency") + PutReturnsFailoverErrorFlagName = withFlagPrefix("put-returns-failover-error") ) func withFlagPrefix(s string) string { @@ -39,13 +40,21 @@ func CLIFlags(envPrefix, category string) []cli.Flag { Usage: "Whether to use memstore for DA logic.", EnvVars: []string{withEnvPrefix(envPrefix, "ENABLED"), withDeprecatedEnvPrefix(envPrefix, "ENABLED")}, Category: category, - Action: func(_ *cli.Context, _ bool) error { + Action: func(ctx *cli.Context, enabled bool) error { if _, ok := os.LookupEnv(withDeprecatedEnvPrefix(envPrefix, "ENABLED")); ok { return fmt.Errorf("env var %s is deprecated for flag %s, use %s instead", withDeprecatedEnvPrefix(envPrefix, "ENABLED"), EnabledFlagName, withEnvPrefix(envPrefix, "ENABLED")) } + if enabled { + // If memstore is enabled, we disable cert verification, + // because memstore generates some meaningless certs. + err := ctx.Set(verify.CertVerificationDisabledFlagName, "true") + if err != nil { + return fmt.Errorf("failed to set %s: %w", verify.CertVerificationDisabledFlagName, err) + } + } return nil }, }, @@ -67,18 +76,25 @@ func CLIFlags(envPrefix, category string) []cli.Flag { }, &cli.DurationFlag{ Name: PutLatencyFlagName, - Usage: "Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.", + Usage: "(FOR TESTING) Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.", Value: 0, EnvVars: []string{withEnvPrefix(envPrefix, "PUT_LATENCY")}, Category: category, }, &cli.DurationFlag{ Name: GetLatencyFlagName, - Usage: "Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.", + Usage: "(FOR TESTING) Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.", Value: 0, EnvVars: []string{withEnvPrefix(envPrefix, "GET_LATENCY")}, Category: category, }, + &cli.BoolFlag{ + Name: PutReturnsFailoverErrorFlagName, + Usage: fmt.Sprintf("(FOR TESTING) When true, Put requests will return a failover error, after sleeping for --%s duration.", PutLatencyFlagName), + Value: false, + EnvVars: []string{withEnvPrefix(envPrefix, "PUT_RETURNS_FAILOVER_ERROR")}, + Category: category, + }, } } @@ -88,9 +104,10 @@ func ReadConfig(ctx *cli.Context) Config { // right now we get it from the verifier cli, but there's probably a way to share flags more nicely? // maybe use a duplicate but hidden flag in memstore category, and set it using the action by reading // from the other flag? - MaxBlobSizeBytes: verify.MaxBlobLengthBytes, - BlobExpiration: ctx.Duration(ExpirationFlagName), - PutLatency: ctx.Duration(PutLatencyFlagName), - GetLatency: ctx.Duration(GetLatencyFlagName), + MaxBlobSizeBytes: verify.MaxBlobLengthBytes, + BlobExpiration: ctx.Duration(ExpirationFlagName), + PutLatency: ctx.Duration(PutLatencyFlagName), + GetLatency: ctx.Duration(GetLatencyFlagName), + PutReturnsFailoverError: ctx.Bool(PutReturnsFailoverErrorFlagName), } } diff --git a/store/generated_key/memstore/memstore.go b/store/generated_key/memstore/memstore.go index 9b1c7f6b..e043c1e9 100644 --- a/store/generated_key/memstore/memstore.go +++ b/store/generated_key/memstore/memstore.go @@ -3,6 +3,7 @@ package memstore import ( "context" "crypto/rand" + "errors" "fmt" "math/big" "sync" @@ -12,6 +13,7 @@ import ( "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/verify" + "github.com/Layr-Labs/eigenda/api" "github.com/Layr-Labs/eigenda/api/clients/codecs" eigenda_common "github.com/Layr-Labs/eigenda/api/grpc/common" "github.com/Layr-Labs/eigenda/api/grpc/disperser" @@ -27,9 +29,15 @@ const ( type Config struct { MaxBlobSizeBytes uint64 BlobExpiration time.Duration + // TODO: we should probably add an admin api to set the below values + // without having to restart the server // artificial latency added for memstore backend to mimic eigenda's latency PutLatency time.Duration GetLatency time.Duration + // when true, put requests will return an errorFailover error, + // after sleeping PutLatency duration. + // This can be used to simulate eigenda being down. + PutReturnsFailoverError bool } /* @@ -38,7 +46,7 @@ time to evict blobs to best emulate the ephemeral nature of blobs dispersed to EigenDA operators. */ type MemStore struct { - sync.RWMutex + mu sync.RWMutex config Config log logging.Logger @@ -90,8 +98,8 @@ func (e *MemStore) pruningLoop(ctx context.Context) { // pruneExpired ... removes expired blobs from the store based on the expiration time. func (e *MemStore) pruneExpired() { - e.Lock() - defer e.Unlock() + e.mu.Lock() + defer e.mu.Unlock() for commit, dur := range e.keyStarts { if time.Since(dur) >= e.config.BlobExpiration { @@ -107,8 +115,8 @@ func (e *MemStore) pruneExpired() { func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) { time.Sleep(e.config.GetLatency) e.reads++ - e.RLock() - defer e.RUnlock() + e.mu.RLock() + defer e.mu.RUnlock() var cert verify.Certificate err := rlp.DecodeBytes(commit, &cert) @@ -134,6 +142,9 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) { // Put inserts a value into the store. func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) { time.Sleep(e.config.PutLatency) + if e.config.PutReturnsFailoverError { + return nil, api.NewErrorFailover(errors.New("memstore in failover simulation mode")) + } encodedVal, err := e.codec.EncodeBlob(value) if err != nil { return nil, err @@ -143,8 +154,8 @@ func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) { return nil, fmt.Errorf("%w: blob length %d, max blob size %d", common.ErrProxyOversizedBlob, len(value), e.config.MaxBlobSizeBytes) } - e.Lock() - defer e.Unlock() + e.mu.Lock() + defer e.mu.Unlock() commitment, err := e.verifier.Commit(encodedVal) if err != nil { diff --git a/store/generated_key/memstore/memstore_test.go b/store/generated_key/memstore/memstore_test.go index b344d9a4..1e4a4efe 100644 --- a/store/generated_key/memstore/memstore_test.go +++ b/store/generated_key/memstore/memstore_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/Layr-Labs/eigenda-proxy/verify" + "github.com/Layr-Labs/eigenda/api" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/stretchr/testify/require" @@ -132,3 +133,29 @@ func TestLatency(t *testing.T) { require.GreaterOrEqual(t, time.Since(timeBeforeGet), getLatency) } + +func TestPutRetursFailoverErrorConfig(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil) + require.NoError(t, err) + + config := getDefaultMemStoreTestConfig() + ms, err := New(ctx, verifier, testLogger, config) + require.NoError(t, err) + + validKey, err := ms.Put(ctx, []byte("some-value")) + require.NoError(t, err) + + ms.config.PutReturnsFailoverError = true + + // failover mode should only affect Put route + _, err = ms.Get(ctx, validKey) + require.NoError(t, err) + + _, err = ms.Put(ctx, []byte("some-value")) + require.ErrorIs(t, err, &api.ErrorFailover{}) +}