Skip to content

Commit

Permalink
chore: add memstore config to return failover errors on puts
Browse files Browse the repository at this point in the history
This is useful for testing failover in batcher.

Right now a new proxy would need to be spun up with this config turned on. We should consider adding an admin rpc rpi to turn these on/off dynamically.
  • Loading branch information
samlaf committed Feb 13, 2025
1 parent b183b0e commit 2c49666
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 18 deletions.
39 changes: 28 additions & 11 deletions store/generated_key/memstore/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
)

var (
EnabledFlagName = withFlagPrefix("enabled")
ExpirationFlagName = withFlagPrefix("expiration")
PutLatencyFlagName = withFlagPrefix("put-latency")
GetLatencyFlagName = withFlagPrefix("get-latency")
EnabledFlagName = withFlagPrefix("enabled")
ExpirationFlagName = withFlagPrefix("expiration")
PutLatencyFlagName = withFlagPrefix("put-latency")
GetLatencyFlagName = withFlagPrefix("get-latency")
PutReturnsFailoverErrorFlagName = withFlagPrefix("put-returns-failover-error")
)

func withFlagPrefix(s string) string {
Expand All @@ -39,13 +40,21 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
Usage: "Whether to use memstore for DA logic.",
EnvVars: []string{withEnvPrefix(envPrefix, "ENABLED"), withDeprecatedEnvPrefix(envPrefix, "ENABLED")},
Category: category,
Action: func(_ *cli.Context, _ bool) error {
Action: func(ctx *cli.Context, enabled bool) error {
if _, ok := os.LookupEnv(withDeprecatedEnvPrefix(envPrefix, "ENABLED")); ok {
return fmt.Errorf("env var %s is deprecated for flag %s, use %s instead",
withDeprecatedEnvPrefix(envPrefix, "ENABLED"),
EnabledFlagName,
withEnvPrefix(envPrefix, "ENABLED"))
}
if enabled {
// If memstore is enabled, we disable cert verification,
// because memstore generates some meaningless certs.
err := ctx.Set(verify.CertVerificationDisabledFlagName, "true")
if err != nil {
return fmt.Errorf("failed to set %s: %w", verify.CertVerificationDisabledFlagName, err)
}
}
return nil
},
},
Expand All @@ -67,18 +76,25 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
},
&cli.DurationFlag{
Name: PutLatencyFlagName,
Usage: "Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.",
Usage: "(FOR TESTING) Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.",
Value: 0,
EnvVars: []string{withEnvPrefix(envPrefix, "PUT_LATENCY")},
Category: category,
},
&cli.DurationFlag{
Name: GetLatencyFlagName,
Usage: "Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.",
Usage: "(FOR TESTING) Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.",
Value: 0,
EnvVars: []string{withEnvPrefix(envPrefix, "GET_LATENCY")},
Category: category,
},
&cli.BoolFlag{
Name: PutReturnsFailoverErrorFlagName,
Usage: fmt.Sprintf("(FOR TESTING) When true, Put requests will return a failover error, after sleeping for --%s duration.", PutLatencyFlagName),
Value: false,
EnvVars: []string{withEnvPrefix(envPrefix, "PUT_RETURNS_FAILOVER_ERROR")},
Category: category,
},
}
}

Expand All @@ -88,9 +104,10 @@ func ReadConfig(ctx *cli.Context) Config {
// right now we get it from the verifier cli, but there's probably a way to share flags more nicely?
// maybe use a duplicate but hidden flag in memstore category, and set it using the action by reading
// from the other flag?
MaxBlobSizeBytes: verify.MaxBlobLengthBytes,
BlobExpiration: ctx.Duration(ExpirationFlagName),
PutLatency: ctx.Duration(PutLatencyFlagName),
GetLatency: ctx.Duration(GetLatencyFlagName),
MaxBlobSizeBytes: verify.MaxBlobLengthBytes,
BlobExpiration: ctx.Duration(ExpirationFlagName),
PutLatency: ctx.Duration(PutLatencyFlagName),
GetLatency: ctx.Duration(GetLatencyFlagName),
PutReturnsFailoverError: ctx.Bool(PutReturnsFailoverErrorFlagName),
}
}
25 changes: 18 additions & 7 deletions store/generated_key/memstore/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package memstore
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"sync"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/verify"
"github.com/Layr-Labs/eigenda/api"
"github.com/Layr-Labs/eigenda/api/clients/codecs"
eigenda_common "github.com/Layr-Labs/eigenda/api/grpc/common"
"github.com/Layr-Labs/eigenda/api/grpc/disperser"
Expand All @@ -27,9 +29,15 @@ const (
type Config struct {
MaxBlobSizeBytes uint64
BlobExpiration time.Duration
// TODO: we should probably add an admin api to set the below values
// without having to restart the server
// artificial latency added for memstore backend to mimic eigenda's latency
PutLatency time.Duration
GetLatency time.Duration
// when true, put requests will return an errorFailover error,
// after sleeping PutLatency duration.
// This can be used to simulate eigenda being down.
PutReturnsFailoverError bool
}

/*
Expand All @@ -38,7 +46,7 @@ time to evict blobs to best emulate the ephemeral nature of blobs dispersed to
EigenDA operators.
*/
type MemStore struct {
sync.RWMutex
mu sync.RWMutex

config Config
log logging.Logger
Expand Down Expand Up @@ -90,8 +98,8 @@ func (e *MemStore) pruningLoop(ctx context.Context) {

// pruneExpired ... removes expired blobs from the store based on the expiration time.
func (e *MemStore) pruneExpired() {
e.Lock()
defer e.Unlock()
e.mu.Lock()
defer e.mu.Unlock()

for commit, dur := range e.keyStarts {
if time.Since(dur) >= e.config.BlobExpiration {
Expand All @@ -107,8 +115,8 @@ func (e *MemStore) pruneExpired() {
func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
time.Sleep(e.config.GetLatency)
e.reads++
e.RLock()
defer e.RUnlock()
e.mu.RLock()
defer e.mu.RUnlock()

var cert verify.Certificate
err := rlp.DecodeBytes(commit, &cert)
Expand All @@ -134,6 +142,9 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
// Put inserts a value into the store.
func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
time.Sleep(e.config.PutLatency)
if e.config.PutReturnsFailoverError {
return nil, api.NewErrorFailover(errors.New("memstore in failover simulation mode"))
}
encodedVal, err := e.codec.EncodeBlob(value)
if err != nil {
return nil, err
Expand All @@ -143,8 +154,8 @@ func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
return nil, fmt.Errorf("%w: blob length %d, max blob size %d", common.ErrProxyOversizedBlob, len(value), e.config.MaxBlobSizeBytes)
}

e.Lock()
defer e.Unlock()
e.mu.Lock()
defer e.mu.Unlock()

commitment, err := e.verifier.Commit(encodedVal)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions store/generated_key/memstore/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/Layr-Labs/eigenda-proxy/verify"
"github.com/Layr-Labs/eigenda/api"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -132,3 +133,29 @@ func TestLatency(t *testing.T) {
require.GreaterOrEqual(t, time.Since(timeBeforeGet), getLatency)

}

func TestPutRetursFailoverErrorConfig(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil)
require.NoError(t, err)

config := getDefaultMemStoreTestConfig()
ms, err := New(ctx, verifier, testLogger, config)
require.NoError(t, err)

validKey, err := ms.Put(ctx, []byte("some-value"))
require.NoError(t, err)

ms.config.PutReturnsFailoverError = true

// failover mode should only affect Put route
_, err = ms.Get(ctx, validKey)
require.NoError(t, err)

_, err = ms.Put(ctx, []byte("some-value"))
require.ErrorIs(t, err, &api.ErrorFailover{})
}

0 comments on commit 2c49666

Please sign in to comment.