Skip to content

Commit

Permalink
fix(#191): Routing namespace --> Storage
Browse files Browse the repository at this point in the history
  • Loading branch information
epociask committed Oct 28, 2024
1 parent 705f740 commit 7fd2d74
Show file tree
Hide file tree
Showing 25 changed files with 487 additions and 366 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ In order to disperse to the EigenDA network in production, or at high throughput
| `--s3.path` | | `$EIGENDA_PROXY_S3_PATH` | Bucket path for S3 storage. |
| `--s3.endpoint` | | `$EIGENDA_PROXY_S3_ENDPOINT` | Endpoint for S3 storage. |
| `--s3.enable-tls` | | `$EIGENDA_PROXY_S3_ENABLE_TLS` | Enable TLS connection to S3 endpoint. |
| `--routing.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. |
| `--routing.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. |
| `--routing.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. |
| `--store.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fallback backend targets. Supports S3. Backup storage locations to read from in the event of EigenDA retrieval failure. |
| `--store.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. Caches data to backend targets after dispersing to DA; read from before attempting retrieval from EigenDA. |
| `--store.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. |
| `--s3.timeout` | `5s` | `$EIGENDA_PROXY_S3_TIMEOUT` | timeout for S3 storage operations (e.g. get, put) |
| `--redis.db` | `0` | `$EIGENDA_PROXY_REDIS_DB` | redis database to use after connecting to server |
| `--redis.endpoint` | `""` | `$EIGENDA_PROXY_REDIS_ENDPOINT` | redis endpoint url |
Expand Down
2 changes: 1 addition & 1 deletion utils/utils.go → common/common.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package common

import (
"fmt"
Expand Down
6 changes: 3 additions & 3 deletions utils/utils_test.go → common/common_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package utils_test
package common_test

import (
"fmt"
"testing"

"github.com/Layr-Labs/eigenda-proxy/utils"
"github.com/Layr-Labs/eigenda-proxy/common"
)

func TestParseByteAmount(t *testing.T) {
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestParseByteAmount(t *testing.T) {
t.Run(fmt.Sprintf("Input: %s", tc.input), func(t *testing.T) {
t.Parallel()

got, err := utils.ParseBytesAmount(tc.input)
got, err := common.ParseBytesAmount(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("wantErr: %v, got error: %v", tc.wantErr, err)
}
Expand Down
83 changes: 83 additions & 0 deletions common/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package common

import (
"context"
"fmt"
"strings"
)

// BackendType ... Storage backend type
type BackendType uint8

// Enumeration of the storage backends the proxy can route to.
// UnknownBackendType is deliberately separated from the valid values
// and acts as the sentinel for unrecognized inputs.
const (
	EigenDABackendType BackendType = iota
	MemoryBackendType
	S3BackendType
	RedisBackendType

	UnknownBackendType
)

// Sentinel errors returned when a payload exceeds the configured size limits.
var (
	ErrProxyOversizedBlob   = fmt.Errorf("encoded blob is larger than max blob size")
	ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed")
)

// String returns the human-readable name of the backend type,
// or "Unknown" for any value outside the enumerated set.
func (b BackendType) String() string {
	names := map[BackendType]string{
		EigenDABackendType: "EigenDA",
		MemoryBackendType:  "Memory",
		S3BackendType:      "S3",
		RedisBackendType:   "Redis",
	}
	if name, ok := names[b]; ok {
		return name
	}
	return "Unknown"
}

// StringToBackendType parses a case-insensitive backend name into its
// BackendType value. Any unrecognized string maps to UnknownBackendType.
func StringToBackendType(s string) BackendType {
	backends := map[string]BackendType{
		"eigenda": EigenDABackendType,
		"memory":  MemoryBackendType,
		"s3":      S3BackendType,
		"redis":   RedisBackendType,
	}
	if bt, ok := backends[strings.ToLower(s)]; ok {
		return bt
	}
	return UnknownBackendType
}

// Store ... the minimal contract shared by every storage backend
// (EigenDA, memory, S3, Redis) the proxy routes to.
type Store interface {
	// BackendType returns the backend type provider of the store.
	BackendType() BackendType
	// Verify verifies the given key-value pair.
	Verify(key []byte, value []byte) error
}

// GeneratedKeyStore ... a Store whose keys are produced by the backend
// itself on insertion (e.g. a DA certificate returned after dispersal).
type GeneratedKeyStore interface {
	Store
	// Get retrieves the given key if it's present in the key-value data store.
	Get(ctx context.Context, key []byte) ([]byte, error)
	// Put inserts the given value into the key-value data store and
	// returns the backend-generated key under which it was stored.
	Put(ctx context.Context, value []byte) (key []byte, err error)
}

// PrecomputedKeyStore ... a Store addressed by caller-supplied keys
// (e.g. S3 or Redis, where the key is known before the write).
type PrecomputedKeyStore interface {
	Store
	// Get retrieves the given key if it's present in the key-value data store.
	Get(ctx context.Context, key []byte) ([]byte, error)
	// Put inserts the given value into the key-value data store under the given key.
	Put(ctx context.Context, key []byte, value []byte) error
}
3 changes: 2 additions & 1 deletion e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/e2e"
"github.com/Layr-Labs/eigenda-proxy/store"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
Expand Down Expand Up @@ -51,7 +52,7 @@ func requireDispersalRetrievalEigenDA(t *testing.T, cm *metrics.CountMap, mode c
}

// requireWriteReadSecondary ... ensure that secondary backend was successfully written/read to/from
func requireWriteReadSecondary(t *testing.T, cm *metrics.CountMap, bt store.BackendType) {
func requireWriteReadSecondary(t *testing.T, cm *metrics.CountMap, bt common.BackendType) {
writeCount, err := cm.Get(http.MethodPut, store.Success, bt.String())
require.NoError(t, err)
require.True(t, writeCount > 0)
Expand Down
2 changes: 1 addition & 1 deletion e2e/safety_checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestKeccak256CommitmentRequestErrorsWhenS3NotSet(t *testing.T) {
testCfg.UseKeccak256ModeS3 = true

tsConfig := e2e.TestSuiteConfig(testCfg)
tsConfig.EigenDAConfig.S3Config.Endpoint = ""
tsConfig.EigenDAConfig.StorageConfig.S3Config.Endpoint = ""
ts, kill := e2e.CreateTestSuite(tsConfig)
defer kill()

Expand Down
8 changes: 4 additions & 4 deletions e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/stretchr/testify/require"

"github.com/Layr-Labs/eigenda-proxy/e2e"
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestProxyCaching(t *testing.T) {
defer kill()

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.S3BackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}

Expand All @@ -119,7 +119,7 @@ func TestProxyCachingWithRedis(t *testing.T) {
defer kill()

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.RedisBackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.RedisBackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}

Expand Down Expand Up @@ -163,6 +163,6 @@ func TestProxyReadFallback(t *testing.T) {
require.Equal(t, expectedBlob, actualBlob)

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.S3BackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}
20 changes: 12 additions & 8 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"strings"
"time"

"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/metrics"
"github.com/Layr-Labs/eigenda-proxy/server"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3"
"github.com/Layr-Labs/eigenda-proxy/utils"
"github.com/Layr-Labs/eigenda-proxy/verify"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/encoding/kzg"
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestConfig(useMemory bool) *Cfg {
}

func createRedisConfig(eigendaCfg server.Config) server.CLIConfig {
eigendaCfg.RedisConfig = redis.Config{
eigendaCfg.StorageConfig.RedisConfig = redis.Config{
Endpoint: redisEndpoint,
Password: "",
DB: 0,
Expand All @@ -144,7 +145,7 @@ func createS3Config(eigendaCfg server.Config) server.CLIConfig {
bucketName := "eigenda-proxy-test-" + RandStr(10)
createS3Bucket(bucketName)

eigendaCfg.S3Config = s3.Config{
eigendaCfg.StorageConfig.S3Config = s3.Config{
Bucket: bucketName,
Path: "",
Endpoint: minioEndpoint,
Expand Down Expand Up @@ -178,7 +179,7 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
pollInterval = time.Minute * 1
}

maxBlobLengthBytes, err := utils.ParseBytesAmount("16mib")
maxBlobLengthBytes, err := common.ParseBytesAmount("16mib")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -210,7 +211,10 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
BlobExpiration: testCfg.Expiration,
MaxBlobSizeBytes: maxBlobLengthBytes,
},
AsyncPutWorkers: testCfg.WriteThreadCount,

StorageConfig: store.Config{
AsyncPutWorkers: testCfg.WriteThreadCount,
},
}

if testCfg.UseMemory {
Expand All @@ -223,15 +227,15 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
cfg = createS3Config(eigendaCfg)

case testCfg.UseS3Caching:
eigendaCfg.CacheTargets = []string{"S3"}
eigendaCfg.StorageConfig.CacheTargets = []string{"S3"}
cfg = createS3Config(eigendaCfg)

case testCfg.UseS3Fallback:
eigendaCfg.FallbackTargets = []string{"S3"}
eigendaCfg.StorageConfig.FallbackTargets = []string{"S3"}
cfg = createS3Config(eigendaCfg)

case testCfg.UseRedisCaching:
eigendaCfg.CacheTargets = []string{"redis"}
eigendaCfg.StorageConfig.CacheTargets = []string{"redis"}
cfg = createRedisConfig(eigendaCfg)

default:
Expand Down
22 changes: 11 additions & 11 deletions flags/eigendaflags/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func withDeprecatedFlagPrefix(s string) string {
return "eigenda-" + s
}

func withDeprecatedEnvPrefix(envPrefix, s string) string {
return envPrefix + "_" + s
func withDeprecatedEnvPrefix(envPrefix, s string) []string {
return []string{envPrefix + "_" + s}
}

// CLIFlags ... used for EigenDA client configuration
Expand All @@ -38,7 +38,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
&cli.StringFlag{
Name: DeprecatedDisperserRPCFlagName,
Usage: "RPC endpoint of the EigenDA disperser.",
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "DISPERSER_RPC")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "DISPERSER_RPC"),
Category: category,
Action: func(*cli.Context, string) error {
return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead",
Expand All @@ -50,7 +50,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
Name: DeprecatedStatusQueryTimeoutFlagName,
Usage: "Duration to wait for a blob to finalize after being sent for dispersal. Default is 30 minutes.",
Value: 30 * time.Minute,
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "STATUS_QUERY_TIMEOUT")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "STATUS_QUERY_TIMEOUT"),
Category: category,
Action: func(*cli.Context, time.Duration) error {
return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead",
Expand All @@ -62,7 +62,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
Name: DeprecatedStatusQueryRetryIntervalFlagName,
Usage: "Interval between retries when awaiting network blob finalization. Default is 5 seconds.",
Value: 5 * time.Second,
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "STATUS_QUERY_INTERVAL")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "STATUS_QUERY_INTERVAL"),
Category: category,
Action: func(*cli.Context, time.Duration) error {
return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead",
Expand All @@ -74,7 +74,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
Name: DeprecatedDisableTLSFlagName,
Usage: "Disable TLS for gRPC communication with the EigenDA disperser. Default is false.",
Value: false,
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "GRPC_DISABLE_TLS")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "GRPC_DISABLE_TLS"),
Category: category,
Action: func(*cli.Context, bool) error {
return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead",
Expand All @@ -86,7 +86,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
Name: DeprecatedResponseTimeoutFlagName,
Usage: "Total time to wait for a response from the EigenDA disperser. Default is 60 seconds.",
Value: 60 * time.Second,
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "RESPONSE_TIMEOUT")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "RESPONSE_TIMEOUT"),
Category: category,
Action: func(*cli.Context, time.Duration) error {
return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead",
Expand All @@ -98,7 +98,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
Name: DeprecatedCustomQuorumIDsFlagName,
Usage: "Custom quorum IDs for writing blobs. Should not include default quorums 0 or 1.",
Value: cli.NewUintSlice(),
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "CUSTOM_QUORUM_IDS")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "CUSTOM_QUORUM_IDS"),
Category: category,
Action: func(*cli.Context, []uint) error {
return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead",
Expand All @@ -109,7 +109,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
&cli.StringFlag{
Name: DeprecatedSignerPrivateKeyHexFlagName,
Usage: "Hex-encoded signer private key. This key should not be associated with an Ethereum address holding any funds.",
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "SIGNER_PRIVATE_KEY_HEX")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "SIGNER_PRIVATE_KEY_HEX"),
Category: category,
Action: func(*cli.Context, string) error {
return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead",
Expand All @@ -120,7 +120,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
&cli.UintFlag{
Name: DeprecatedPutBlobEncodingVersionFlagName,
Usage: "Blob encoding version to use when writing blobs from the high-level interface.",
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "PUT_BLOB_ENCODING_VERSION")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "PUT_BLOB_ENCODING_VERSION"),
Value: 0,
Category: category,
Action: func(*cli.Context, uint) error {
Expand All @@ -132,7 +132,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag {
&cli.BoolFlag{
Name: DeprecatedDisablePointVerificationModeFlagName,
Usage: "Disable point verification mode. This mode performs IFFT on data before writing and FFT on data after reading. Disabling requires supplying the entire blob for verification against the KZG commitment.",
EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "DISABLE_POINT_VERIFICATION_MODE")},
EnvVars: withDeprecatedEnvPrefix(envPrefix, "DISABLE_POINT_VERIFICATION_MODE"),
Value: false,
Category: category,
Action: func(*cli.Context, bool) error {
Expand Down
Loading

0 comments on commit 7fd2d74

Please sign in to comment.