Skip to content

Commit

Permalink
refactor: use config for memstore
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf committed Sep 6, 2024
1 parent 516ee3d commit 77a31ee
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 41 deletions.
7 changes: 6 additions & 1 deletion server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
var eigenda store.KeyGeneratedStore
if cfg.EigenDAConfig.MemstoreEnabled {
log.Info("Using mem-store backend for EigenDA")
eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration, cfg.EigenDAConfig.MemstorePutLatency, cfg.EigenDAConfig.MemstoreGetLatency)
eigenda, err = store.NewMemStore(ctx, verifier, log, store.MemStoreConfig{
MaxBlobSizeBytes: maxBlobLength,
BlobExpiration: cfg.EigenDAConfig.MemstoreBlobExpiration,
PutLatency: cfg.EigenDAConfig.MemstorePutLatency,
GetLatency: cfg.EigenDAConfig.MemstoreGetLatency,
})
} else {
var client *clients.EigenDAClient
log.Info("Using EigenDA backend")
Expand Down
67 changes: 36 additions & 31 deletions store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ const (
DefaultPruneInterval = 500 * time.Millisecond
)

type MemStoreConfig struct {
MaxBlobSizeBytes uint64
BlobExpiration time.Duration
// artificial latency added for memstore backend to mimic eigenda's latency
PutLatency time.Duration
GetLatency time.Duration
}

var memStoreConfigDefaults = MemStoreConfig{

Check failure on line 33 in store/memory.go

View workflow job for this annotation

GitHub Actions / Linter

var `memStoreConfigDefaults` is unused (unused)
MaxBlobSizeBytes: 1024 * 1024,
}

/*
MemStore is a simple in-memory store for blobs which uses an expiration
time to evict blobs to best emulate the ephemeral nature of blobs dispersed to
Expand All @@ -30,40 +42,33 @@ EigenDA operators.
type MemStore struct {
sync.RWMutex

l log.Logger
keyStarts map[string]time.Time
store map[string][]byte
verifier *verify.Verifier
codec codecs.BlobCodec
putLatency time.Duration
getLatency time.Duration

maxBlobSizeBytes uint64
blobExpiration time.Duration
reads int
config MemStoreConfig
l log.Logger
keyStarts map[string]time.Time
store map[string][]byte
verifier *verify.Verifier
codec codecs.BlobCodec

reads int
}

var _ KeyGeneratedStore = (*MemStore)(nil)

// NewMemStore ... constructor
func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger,
maxBlobSizeBytes uint64, blobExpiration time.Duration,
putLatency, getLatency time.Duration) (*MemStore, error) {
func NewMemStore(
ctx context.Context, verifier *verify.Verifier, l log.Logger, config MemStoreConfig,
) (*MemStore, error) {
store := &MemStore{
l: l,
keyStarts: make(map[string]time.Time),
store: make(map[string][]byte),
verifier: verifier,
codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
maxBlobSizeBytes: maxBlobSizeBytes,
blobExpiration: blobExpiration,
// artificial latency added for memstore backend to mimic eigenda's latency
putLatency: putLatency,
getLatency: getLatency,
l: l,
config: config,
keyStarts: make(map[string]time.Time),
store: make(map[string][]byte),
verifier: verifier,
codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
}

if store.blobExpiration != 0 {
l.Info("memstore expiration enabled", "time", store.blobExpiration)
if store.config.BlobExpiration != 0 {
l.Info("memstore expiration enabled", "time", store.config.BlobExpiration)
go store.EventLoop(ctx)
}

Expand Down Expand Up @@ -92,7 +97,7 @@ func (e *MemStore) pruneExpired() {
defer e.Unlock()

for commit, dur := range e.keyStarts {
if time.Since(dur) >= e.blobExpiration {
if time.Since(dur) >= e.config.BlobExpiration {
delete(e.keyStarts, commit)
delete(e.store, commit)

Expand All @@ -103,7 +108,7 @@ func (e *MemStore) pruneExpired() {

// Get fetches a value from the store.
func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
time.Sleep(e.getLatency)
time.Sleep(e.config.GetLatency)
e.reads++
e.RLock()
defer e.RUnlock()
Expand Down Expand Up @@ -131,9 +136,9 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {

// Put inserts a value into the store.
func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
time.Sleep(e.putLatency)
if uint64(len(value)) > e.maxBlobSizeBytes {
return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.maxBlobSizeBytes)
time.Sleep(e.config.PutLatency)
if uint64(len(value)) > e.config.MaxBlobSizeBytes {
return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.config.MaxBlobSizeBytes)
}

e.Lock()
Expand Down
21 changes: 12 additions & 9 deletions store/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ const (
testPreimage = "Four score and seven years ago"
)

func getDefaultTestConfig() MemStoreConfig {
return MemStoreConfig{
MaxBlobSizeBytes: 1024 * 1024,
BlobExpiration: time.Hour * 1000,
PutLatency: 0,
GetLatency: 0,
}
}

func TestGetSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -41,9 +50,7 @@ func TestGetSet(t *testing.T) {
ctx,
verifier,
log.New(),
1024*1024*2,
time.Hour*1000,
0, 0,
getDefaultTestConfig(),
)

require.NoError(t, err)
Expand Down Expand Up @@ -84,9 +91,7 @@ func TestExpiration(t *testing.T) {
ctx,
verifier,
log.New(),
1024*1024*2,
time.Millisecond*10,
0, 0,
getDefaultTestConfig(),
)

require.NoError(t, err)
Expand Down Expand Up @@ -133,9 +138,7 @@ func TestLatency(t *testing.T) {
ctx,
verifier,
log.New(),
1024*1024*2,
time.Millisecond*10,
putLatency, getLatency,
getDefaultTestConfig(),
)

require.NoError(t, err)
Expand Down

0 comments on commit 77a31ee

Please sign in to comment.