Skip to content

Commit

Permalink
fix(#191): Routing namespace --> Storage (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
epociask authored Oct 31, 2024
1 parent 390e491 commit f263439
Show file tree
Hide file tree
Showing 24 changed files with 474 additions and 354 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ In order to disperse to the EigenDA network in production, or at high throughput
| `--s3.path` | | `$EIGENDA_PROXY_S3_PATH` | Bucket path for S3 storage. |
| `--s3.endpoint` | | `$EIGENDA_PROXY_S3_ENDPOINT` | Endpoint for S3 storage. |
| `--s3.enable-tls` | | `$EIGENDA_PROXY_S3_ENABLE_TLS` | Enable TLS connection to S3 endpoint. |
| `--routing.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. |
| `--routing.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. |
| `--routing.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. |
| `--storage.fallback-targets` | `[]` | `$EIGENDA_PROXY_STORAGE_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. |
| `--storage.cache-targets` | `[]` | `$EIGENDA_PROXY_STORAGE_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. |
| `--storage.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_STORAGE_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. |
| `--s3.timeout` | `5s` | `$EIGENDA_PROXY_S3_TIMEOUT` | timeout for S3 storage operations (e.g. get, put) |
| `--redis.db` | `0` | `$EIGENDA_PROXY_REDIS_DB` | redis database to use after connecting to server |
| `--redis.endpoint` | `""` | `$EIGENDA_PROXY_REDIS_ENDPOINT` | redis endpoint url |
Expand Down
2 changes: 1 addition & 1 deletion utils/utils.go → common/common.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package common

import (
"fmt"
Expand Down
6 changes: 3 additions & 3 deletions utils/utils_test.go → common/common_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package utils_test
package common_test

import (
"fmt"
"testing"

"github.com/Layr-Labs/eigenda-proxy/utils"
"github.com/Layr-Labs/eigenda-proxy/common"
)

func TestParseByteAmount(t *testing.T) {
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestParseByteAmount(t *testing.T) {
t.Run(fmt.Sprintf("Input: %s", tc.input), func(t *testing.T) {
t.Parallel()

got, err := utils.ParseBytesAmount(tc.input)
got, err := common.ParseBytesAmount(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("wantErr: %v, got error: %v", tc.wantErr, err)
}
Expand Down
83 changes: 83 additions & 0 deletions common/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package common

import (
"context"
"fmt"
"strings"
)

// BackendType ... Storage backend type
type BackendType uint8

const (
EigenDABackendType BackendType = iota
MemoryBackendType
S3BackendType
RedisBackendType

UnknownBackendType
)

var (
ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size")
ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed")
)

func (b BackendType) String() string {
switch b {
case EigenDABackendType:
return "EigenDA"
case MemoryBackendType:
return "Memory"
case S3BackendType:
return "S3"
case RedisBackendType:
return "Redis"
case UnknownBackendType:
fallthrough
default:
return "Unknown"
}
}

func StringToBackendType(s string) BackendType {
lower := strings.ToLower(s)

switch lower {
case "eigenda":
return EigenDABackendType
case "memory":
return MemoryBackendType
case "s3":
return S3BackendType
case "redis":
return RedisBackendType
case "unknown":
fallthrough
default:
return UnknownBackendType
}
}

type Store interface {
// Backend returns the backend type provider of the store.
BackendType() BackendType
// Verify verifies the given key-value pair.
Verify(ctx context.Context, key []byte, value []byte) error
}

type GeneratedKeyStore interface {
Store
// Get retrieves the given key if it's present in the key-value data store.
Get(ctx context.Context, key []byte) ([]byte, error)
// Put inserts the given value into the key-value data store.
Put(ctx context.Context, value []byte) (key []byte, err error)
}

type PrecomputedKeyStore interface {
Store
// Get retrieves the given key if it's present in the key-value data store.
Get(ctx context.Context, key []byte) ([]byte, error)
// Put inserts the given value into the key-value data store.
Put(ctx context.Context, key []byte, value []byte) error
}
3 changes: 2 additions & 1 deletion e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/e2e"
"github.com/Layr-Labs/eigenda-proxy/store"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
Expand Down Expand Up @@ -54,7 +55,7 @@ func requireDispersalRetrievalEigenDA(t *testing.T, cm *metrics.CountMap, mode c
}

// requireWriteReadSecondary ... ensure that secondary backend was successfully written/read to/from
func requireWriteReadSecondary(t *testing.T, cm *metrics.CountMap, bt store.BackendType) {
func requireWriteReadSecondary(t *testing.T, cm *metrics.CountMap, bt common.BackendType) {
writeCount, err := cm.Get(http.MethodPut, store.Success, bt.String())
require.NoError(t, err)
require.True(t, writeCount > 0)
Expand Down
2 changes: 1 addition & 1 deletion e2e/safety_checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestKeccak256CommitmentRequestErrorsWhenS3NotSet(t *testing.T) {
testCfg.UseKeccak256ModeS3 = true

tsConfig := e2e.TestSuiteConfig(testCfg)
tsConfig.EigenDAConfig.S3Config.Endpoint = ""
tsConfig.EigenDAConfig.StorageConfig.S3Config.Endpoint = ""
ts, kill := e2e.CreateTestSuite(tsConfig)
defer kill()

Expand Down
8 changes: 4 additions & 4 deletions e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/stretchr/testify/require"

"github.com/Layr-Labs/eigenda-proxy/e2e"
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestProxyCaching(t *testing.T) {
defer kill()

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.S3BackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}

Expand All @@ -119,7 +119,7 @@ func TestProxyCachingWithRedis(t *testing.T) {
defer kill()

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.RedisBackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.RedisBackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}

Expand Down Expand Up @@ -163,6 +163,6 @@ func TestProxyReadFallback(t *testing.T) {
require.Equal(t, expectedBlob, actualBlob)

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.S3BackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}
20 changes: 12 additions & 8 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"strings"
"time"

"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/metrics"
"github.com/Layr-Labs/eigenda-proxy/server"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3"
"github.com/Layr-Labs/eigenda-proxy/utils"
"github.com/Layr-Labs/eigenda-proxy/verify"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/encoding/kzg"
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestConfig(useMemory bool) *Cfg {
}

func createRedisConfig(eigendaCfg server.Config) server.CLIConfig {
eigendaCfg.RedisConfig = redis.Config{
eigendaCfg.StorageConfig.RedisConfig = redis.Config{
Endpoint: redisEndpoint,
Password: "",
DB: 0,
Expand All @@ -144,7 +145,7 @@ func createS3Config(eigendaCfg server.Config) server.CLIConfig {
bucketName := "eigenda-proxy-test-" + RandStr(10)
createS3Bucket(bucketName)

eigendaCfg.S3Config = s3.Config{
eigendaCfg.StorageConfig.S3Config = s3.Config{
Bucket: bucketName,
Path: "",
Endpoint: minioEndpoint,
Expand Down Expand Up @@ -178,7 +179,7 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
pollInterval = time.Minute * 1
}

maxBlobLengthBytes, err := utils.ParseBytesAmount("16mib")
maxBlobLengthBytes, err := common.ParseBytesAmount("16mib")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -213,7 +214,10 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
BlobExpiration: testCfg.Expiration,
MaxBlobSizeBytes: maxBlobLengthBytes,
},
AsyncPutWorkers: testCfg.WriteThreadCount,

StorageConfig: store.Config{
AsyncPutWorkers: testCfg.WriteThreadCount,
},
}

if testCfg.UseMemory {
Expand All @@ -226,15 +230,15 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
cfg = createS3Config(eigendaCfg)

case testCfg.UseS3Caching:
eigendaCfg.CacheTargets = []string{"S3"}
eigendaCfg.StorageConfig.CacheTargets = []string{"S3"}
cfg = createS3Config(eigendaCfg)

case testCfg.UseS3Fallback:
eigendaCfg.FallbackTargets = []string{"S3"}
eigendaCfg.StorageConfig.FallbackTargets = []string{"S3"}
cfg = createS3Config(eigendaCfg)

case testCfg.UseRedisCaching:
eigendaCfg.CacheTargets = []string{"redis"}
eigendaCfg.StorageConfig.CacheTargets = []string{"redis"}
cfg = createRedisConfig(eigendaCfg)

default:
Expand Down
29 changes: 5 additions & 24 deletions flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flags

import (
"github.com/Layr-Labs/eigenda-proxy/flags/eigendaflags"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3"
Expand All @@ -17,6 +18,8 @@ const (
EigenDAClientCategory = "EigenDA Client"
EigenDADeprecatedCategory = "DEPRECATED EIGENDA CLIENT FLAGS -- THESE WILL BE REMOVED IN V2.0.0"
MemstoreFlagsCategory = "Memstore (for testing purposes - replaces EigenDA backend)"
StorageFlagsCategory = "Storage"
StorageDeprecatedCategory = "DEPRECATED STORAGE FLAGS -- THESE WILL BE REMOVED IN V2.0.0"
RedisCategory = "Redis Cache/Fallback"
S3Category = "S3 Cache/Fallback"
VerifierCategory = "KZG and Cert Verifier"
Expand All @@ -26,12 +29,6 @@ const (
const (
ListenAddrFlagName = "addr"
PortFlagName = "port"

// routing flags
// TODO: change "routing" --> "secondary"
FallbackTargetsFlagName = "routing.fallback-targets"
CacheTargetsFlagName = "routing.cache-targets"
ConcurrentWriteThreads = "routing.concurrent-write-routines"
)

const EnvVarPrefix = "EIGENDA_PROXY"
Expand All @@ -55,24 +52,6 @@ func CLIFlags() []cli.Flag {
Value: 3100,
EnvVars: prefixEnvVars("PORT"),
},
&cli.StringSliceFlag{
Name: FallbackTargetsFlagName,
Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.",
Value: cli.NewStringSlice(),
EnvVars: prefixEnvVars("FALLBACK_TARGETS"),
},
&cli.StringSliceFlag{
Name: CacheTargetsFlagName,
Usage: "List of caching targets to use fast reads from EigenDA.",
Value: cli.NewStringSlice(),
EnvVars: prefixEnvVars("CACHE_TARGETS"),
},
&cli.IntFlag{
Name: ConcurrentWriteThreads,
Usage: "Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes.",
Value: 0,
EnvVars: prefixEnvVars("CONCURRENT_WRITE_THREADS"),
},
}

return flags
Expand All @@ -87,6 +66,8 @@ func init() {
Flags = append(Flags, opmetrics.CLIFlags(EnvVarPrefix)...)
Flags = append(Flags, eigendaflags.CLIFlags(EnvVarPrefix, EigenDAClientCategory)...)
Flags = append(Flags, eigendaflags.DeprecatedCLIFlags(EnvVarPrefix, EigenDADeprecatedCategory)...)
Flags = append(Flags, store.CLIFlags(EnvVarPrefix, StorageFlagsCategory)...)
Flags = append(Flags, store.DeprecatedCLIFlags(EnvVarPrefix, StorageDeprecatedCategory)...)
Flags = append(Flags, redis.CLIFlags(EnvVarPrefix, RedisCategory)...)
Flags = append(Flags, s3.CLIFlags(EnvVarPrefix, S3Category)...)
Flags = append(Flags, memstore.CLIFlags(EnvVarPrefix, MemstoreFlagsCategory)...)
Expand Down
Loading

0 comments on commit f263439

Please sign in to comment.