diff --git a/README.md b/README.md index 9cecb0c3..af655bee 100644 --- a/README.md +++ b/README.md @@ -63,9 +63,9 @@ In order to disperse to the EigenDA network in production, or at high throughput | `--s3.path` | | `$EIGENDA_PROXY_S3_PATH` | Bucket path for S3 storage. | | `--s3.endpoint` | | `$EIGENDA_PROXY_S3_ENDPOINT` | Endpoint for S3 storage. | | `--s3.enable-tls` | | `$EIGENDA_PROXY_S3_ENABLE_TLS` | Enable TLS connection to S3 endpoint. | -| `--routing.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. | -| `--routing.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. | -| `--routing.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. | +| `--store.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. | +| `--store.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. | +| `--store.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. | | `--s3.timeout` | `5s` | `$EIGENDA_PROXY_S3_TIMEOUT` | timeout for S3 storage operations (e.g. 
get, put) | | `--redis.db` | `0` | `$EIGENDA_PROXY_REDIS_DB` | redis database to use after connecting to server | | `--redis.endpoint` | `""` | `$EIGENDA_PROXY_REDIS_ENDPOINT` | redis endpoint url | diff --git a/utils/utils.go b/common/common.go similarity index 99% rename from utils/utils.go rename to common/common.go index aa83aa95..f1e69f13 100644 --- a/utils/utils.go +++ b/common/common.go @@ -1,4 +1,4 @@ -package utils +package common import ( "fmt" diff --git a/utils/utils_test.go b/common/common_test.go similarity index 92% rename from utils/utils_test.go rename to common/common_test.go index 02177587..93a1ab16 100644 --- a/utils/utils_test.go +++ b/common/common_test.go @@ -1,10 +1,10 @@ -package utils_test +package common_test import ( "fmt" "testing" - "github.com/Layr-Labs/eigenda-proxy/utils" + "github.com/Layr-Labs/eigenda-proxy/common" ) func TestParseByteAmount(t *testing.T) { @@ -46,7 +46,7 @@ func TestParseByteAmount(t *testing.T) { t.Run(fmt.Sprintf("Input: %s", tc.input), func(t *testing.T) { t.Parallel() - got, err := utils.ParseBytesAmount(tc.input) + got, err := common.ParseBytesAmount(tc.input) if (err != nil) != tc.wantErr { t.Errorf("wantErr: %v, got error: %v", tc.wantErr, err) } diff --git a/common/store.go b/common/store.go new file mode 100644 index 00000000..79220715 --- /dev/null +++ b/common/store.go @@ -0,0 +1,83 @@ +package common + +import ( + "context" + "fmt" + "strings" +) + +// BackendType ... 
Storage backend type +type BackendType uint8 + +const ( + EigenDABackendType BackendType = iota + MemoryBackendType + S3BackendType + RedisBackendType + + UnknownBackendType +) + +var ( + ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size") + ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed") +) + +func (b BackendType) String() string { + switch b { + case EigenDABackendType: + return "EigenDA" + case MemoryBackendType: + return "Memory" + case S3BackendType: + return "S3" + case RedisBackendType: + return "Redis" + case UnknownBackendType: + fallthrough + default: + return "Unknown" + } +} + +func StringToBackendType(s string) BackendType { + lower := strings.ToLower(s) + + switch lower { + case "eigenda": + return EigenDABackendType + case "memory": + return MemoryBackendType + case "s3": + return S3BackendType + case "redis": + return RedisBackendType + case "unknown": + fallthrough + default: + return UnknownBackendType + } +} + +type Store interface { + // Backend returns the backend type provider of the store. + BackendType() BackendType + // Verify verifies the given key-value pair. + Verify(key []byte, value []byte) error +} + +type GeneratedKeyStore interface { + Store + // Get retrieves the given key if it's present in the key-value data store. + Get(ctx context.Context, key []byte) ([]byte, error) + // Put inserts the given value into the key-value data store. + Put(ctx context.Context, value []byte) (key []byte, err error) +} + +type PrecomputedKeyStore interface { + Store + // Get retrieves the given key if it's present in the key-value data store. + Get(ctx context.Context, key []byte) ([]byte, error) + // Put inserts the given value into the key-value data store. 
+ Put(ctx context.Context, key []byte, value []byte) error +} diff --git a/e2e/main_test.go b/e2e/main_test.go index 39a2c062..2b9bac05 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -7,6 +7,7 @@ import ( "github.com/Layr-Labs/eigenda-proxy/client" "github.com/Layr-Labs/eigenda-proxy/commitments" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/e2e" "github.com/Layr-Labs/eigenda-proxy/store" altda "github.com/ethereum-optimism/optimism/op-alt-da" @@ -51,7 +52,7 @@ func requireDispersalRetrievalEigenDA(t *testing.T, cm *metrics.CountMap, mode c } // requireWriteReadSecondary ... ensure that secondary backend was successfully written/read to/from -func requireWriteReadSecondary(t *testing.T, cm *metrics.CountMap, bt store.BackendType) { +func requireWriteReadSecondary(t *testing.T, cm *metrics.CountMap, bt common.BackendType) { writeCount, err := cm.Get(http.MethodPut, store.Success, bt.String()) require.NoError(t, err) require.True(t, writeCount > 0) diff --git a/e2e/safety_checks_test.go b/e2e/safety_checks_test.go index 3cc0bc70..9f248278 100644 --- a/e2e/safety_checks_test.go +++ b/e2e/safety_checks_test.go @@ -145,7 +145,7 @@ func TestKeccak256CommitmentRequestErrorsWhenS3NotSet(t *testing.T) { testCfg.UseKeccak256ModeS3 = true tsConfig := e2e.TestSuiteConfig(testCfg) - tsConfig.EigenDAConfig.S3Config.Endpoint = "" + tsConfig.EigenDAConfig.StorageConfig.S3Config.Endpoint = "" ts, kill := e2e.CreateTestSuite(tsConfig) defer kill() diff --git a/e2e/server_test.go b/e2e/server_test.go index abc0e7b6..19accff1 100644 --- a/e2e/server_test.go +++ b/e2e/server_test.go @@ -6,7 +6,7 @@ import ( "github.com/Layr-Labs/eigenda-proxy/client" "github.com/Layr-Labs/eigenda-proxy/commitments" - "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/stretchr/testify/require" "github.com/Layr-Labs/eigenda-proxy/e2e" @@ -100,7 +100,7 @@ func TestProxyCaching(t *testing.T) { defer kill() 
requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000)) - requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.S3BackendType) + requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType) requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode) } @@ -119,7 +119,7 @@ func TestProxyCachingWithRedis(t *testing.T) { defer kill() requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000)) - requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.RedisBackendType) + requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.RedisBackendType) requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode) } @@ -163,6 +163,6 @@ func TestProxyReadFallback(t *testing.T) { require.Equal(t, expectedBlob, actualBlob) requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000)) - requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.S3BackendType) + requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType) requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode) } diff --git a/e2e/setup.go b/e2e/setup.go index fe4aefa1..cf30a834 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -8,12 +8,13 @@ import ( "strings" "time" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/metrics" "github.com/Layr-Labs/eigenda-proxy/server" + "github.com/Layr-Labs/eigenda-proxy/store" "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" - "github.com/Layr-Labs/eigenda-proxy/utils" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/encoding/kzg" @@ -127,7 +128,7 @@ func TestConfig(useMemory bool) *Cfg { } func 
createRedisConfig(eigendaCfg server.Config) server.CLIConfig { - eigendaCfg.RedisConfig = redis.Config{ + eigendaCfg.StorageConfig.RedisConfig = redis.Config{ Endpoint: redisEndpoint, Password: "", DB: 0, @@ -144,7 +145,7 @@ func createS3Config(eigendaCfg server.Config) server.CLIConfig { bucketName := "eigenda-proxy-test-" + RandStr(10) createS3Bucket(bucketName) - eigendaCfg.S3Config = s3.Config{ + eigendaCfg.StorageConfig.S3Config = s3.Config{ Bucket: bucketName, Path: "", Endpoint: minioEndpoint, @@ -178,7 +179,7 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig { pollInterval = time.Minute * 1 } - maxBlobLengthBytes, err := utils.ParseBytesAmount("16mib") + maxBlobLengthBytes, err := common.ParseBytesAmount("16mib") if err != nil { panic(err) } @@ -210,7 +211,10 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig { BlobExpiration: testCfg.Expiration, MaxBlobSizeBytes: maxBlobLengthBytes, }, - AsyncPutWorkers: testCfg.WriteThreadCount, + + StorageConfig: store.Config{ + AsyncPutWorkers: testCfg.WriteThreadCount, + }, } if testCfg.UseMemory { @@ -223,15 +227,15 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig { cfg = createS3Config(eigendaCfg) case testCfg.UseS3Caching: - eigendaCfg.CacheTargets = []string{"S3"} + eigendaCfg.StorageConfig.CacheTargets = []string{"S3"} cfg = createS3Config(eigendaCfg) case testCfg.UseS3Fallback: - eigendaCfg.FallbackTargets = []string{"S3"} + eigendaCfg.StorageConfig.FallbackTargets = []string{"S3"} cfg = createS3Config(eigendaCfg) case testCfg.UseRedisCaching: - eigendaCfg.CacheTargets = []string{"redis"} + eigendaCfg.StorageConfig.CacheTargets = []string{"redis"} cfg = createRedisConfig(eigendaCfg) default: diff --git a/flags/eigendaflags/deprecated.go b/flags/eigendaflags/deprecated.go index 7c406316..a09b124b 100644 --- a/flags/eigendaflags/deprecated.go +++ b/flags/eigendaflags/deprecated.go @@ -28,8 +28,8 @@ func withDeprecatedFlagPrefix(s string) string { return "eigenda-" + s } -func 
withDeprecatedEnvPrefix(envPrefix, s string) string { - return envPrefix + "_" + s +func withDeprecatedEnvPrefix(envPrefix, s string) []string { + return []string{envPrefix + "_" + s} } // CLIFlags ... used for EigenDA client configuration @@ -38,7 +38,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { &cli.StringFlag{ Name: DeprecatedDisperserRPCFlagName, Usage: "RPC endpoint of the EigenDA disperser.", - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "DISPERSER_RPC")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "DISPERSER_RPC"), Category: category, Action: func(*cli.Context, string) error { return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", @@ -50,7 +50,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { Name: DeprecatedStatusQueryTimeoutFlagName, Usage: "Duration to wait for a blob to finalize after being sent for dispersal. Default is 30 minutes.", Value: 30 * time.Minute, - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "STATUS_QUERY_TIMEOUT")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "STATUS_QUERY_TIMEOUT"), Category: category, Action: func(*cli.Context, time.Duration) error { return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", @@ -62,7 +62,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { Name: DeprecatedStatusQueryRetryIntervalFlagName, Usage: "Interval between retries when awaiting network blob finalization. 
Default is 5 seconds.", Value: 5 * time.Second, - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "STATUS_QUERY_INTERVAL")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "STATUS_QUERY_INTERVAL"), Category: category, Action: func(*cli.Context, time.Duration) error { return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", @@ -74,7 +74,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { Name: DeprecatedDisableTLSFlagName, Usage: "Disable TLS for gRPC communication with the EigenDA disperser. Default is false.", Value: false, - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "GRPC_DISABLE_TLS")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "GRPC_DISABLE_TLS"), Category: category, Action: func(*cli.Context, bool) error { return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", @@ -86,7 +86,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { Name: DeprecatedResponseTimeoutFlagName, Usage: "Total time to wait for a response from the EigenDA disperser. Default is 60 seconds.", Value: 60 * time.Second, - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "RESPONSE_TIMEOUT")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "RESPONSE_TIMEOUT"), Category: category, Action: func(*cli.Context, time.Duration) error { return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", @@ -98,7 +98,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { Name: DeprecatedCustomQuorumIDsFlagName, Usage: "Custom quorum IDs for writing blobs. 
Should not include default quorums 0 or 1.", Value: cli.NewUintSlice(), - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "CUSTOM_QUORUM_IDS")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "CUSTOM_QUORUM_IDS"), Category: category, Action: func(*cli.Context, []uint) error { return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", @@ -109,7 +109,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { &cli.StringFlag{ Name: DeprecatedSignerPrivateKeyHexFlagName, Usage: "Hex-encoded signer private key. This key should not be associated with an Ethereum address holding any funds.", - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "SIGNER_PRIVATE_KEY_HEX")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "SIGNER_PRIVATE_KEY_HEX"), Category: category, Action: func(*cli.Context, string) error { return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", @@ -120,7 +120,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { &cli.UintFlag{ Name: DeprecatedPutBlobEncodingVersionFlagName, Usage: "Blob encoding version to use when writing blobs from the high-level interface.", - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "PUT_BLOB_ENCODING_VERSION")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "PUT_BLOB_ENCODING_VERSION"), Value: 0, Category: category, Action: func(*cli.Context, uint) error { @@ -132,7 +132,7 @@ func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { &cli.BoolFlag{ Name: DeprecatedDisablePointVerificationModeFlagName, Usage: "Disable point verification mode. This mode performs IFFT on data before writing and FFT on data after reading. 
Disabling requires supplying the entire blob for verification against the KZG commitment.", - EnvVars: []string{withDeprecatedEnvPrefix(envPrefix, "DISABLE_POINT_VERIFICATION_MODE")}, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "DISABLE_POINT_VERIFICATION_MODE"), Value: false, Category: category, Action: func(*cli.Context, bool) error { diff --git a/flags/flags.go b/flags/flags.go index 9ee798bb..103afeab 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -2,6 +2,7 @@ package flags import ( "github.com/Layr-Labs/eigenda-proxy/flags/eigendaflags" + "github.com/Layr-Labs/eigenda-proxy/store" "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" @@ -17,6 +18,8 @@ const ( EigenDAClientCategory = "EigenDA Client" EigenDADeprecatedCategory = "DEPRECATED EIGENDA CLIENT FLAGS -- THESE WILL BE REMOVED IN V2.0.0" MemstoreFlagsCategory = "Memstore (for testing purposes - replaces EigenDA backend)" + StorageFlagsCategory = "Storage" + StorageDeprecatedCategory = "DEPRECATED STORAGE FLAGS -- THESE WILL BE REMOVED IN V2.0.0" RedisCategory = "Redis Cache/Fallback" S3Category = "S3 Cache/Fallback" VerifierCategory = "KZG and Cert Verifier" @@ -26,12 +29,6 @@ const ( const ( ListenAddrFlagName = "addr" PortFlagName = "port" - - // routing flags - // TODO: change "routing" --> "secondary" - FallbackTargetsFlagName = "routing.fallback-targets" - CacheTargetsFlagName = "routing.cache-targets" - ConcurrentWriteThreads = "routing.concurrent-write-routines" ) const EnvVarPrefix = "EIGENDA_PROXY" @@ -55,24 +52,6 @@ func CLIFlags() []cli.Flag { Value: 3100, EnvVars: prefixEnvVars("PORT"), }, - &cli.StringSliceFlag{ - Name: FallbackTargetsFlagName, - Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.", - Value: cli.NewStringSlice(), - EnvVars: prefixEnvVars("FALLBACK_TARGETS"), - }, - &cli.StringSliceFlag{ - Name: 
CacheTargetsFlagName, - Usage: "List of caching targets to use fast reads from EigenDA.", - Value: cli.NewStringSlice(), - EnvVars: prefixEnvVars("CACHE_TARGETS"), - }, - &cli.IntFlag{ - Name: ConcurrentWriteThreads, - Usage: "Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes.", - Value: 0, - EnvVars: prefixEnvVars("CONCURRENT_WRITE_THREADS"), - }, } return flags @@ -87,6 +66,8 @@ func init() { Flags = append(Flags, opmetrics.CLIFlags(EnvVarPrefix)...) Flags = append(Flags, eigendaflags.CLIFlags(EnvVarPrefix, EigenDAClientCategory)...) Flags = append(Flags, eigendaflags.DeprecatedCLIFlags(EnvVarPrefix, EigenDADeprecatedCategory)...) + Flags = append(Flags, store.CLIFlags(EnvVarPrefix, StorageFlagsCategory)...) + Flags = append(Flags, store.DeprecatedCLIFlags(EnvVarPrefix, StorageDeprecatedCategory)...) Flags = append(Flags, redis.CLIFlags(EnvVarPrefix, RedisCategory)...) Flags = append(Flags, s3.CLIFlags(EnvVarPrefix, S3Category)...) Flags = append(Flags, memstore.CLIFlags(EnvVarPrefix, MemstoreFlagsCategory)...) 
diff --git a/server/config.go b/server/config.go index bfa2030e..684d9f5b 100644 --- a/server/config.go +++ b/server/config.go @@ -5,13 +5,9 @@ import ( "github.com/urfave/cli/v2" - "github.com/Layr-Labs/eigenda-proxy/flags" "github.com/Layr-Labs/eigenda-proxy/flags/eigendaflags" "github.com/Layr-Labs/eigenda-proxy/store" "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" - "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" - "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" - "github.com/Layr-Labs/eigenda-proxy/utils" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" @@ -20,52 +16,23 @@ import ( type Config struct { EdaClientConfig clients.EigenDAClientConfig + MemstoreConfig memstore.Config + StorageConfig store.Config VerifierConfig verify.Config MemstoreEnabled bool - MemstoreConfig memstore.Config - - // routing - AsyncPutWorkers int - FallbackTargets []string - CacheTargets []string - - // secondary storage - RedisConfig redis.Config - S3Config s3.Config } // ReadConfig ... parses the Config from the provided flags or environment variables. func ReadConfig(ctx *cli.Context) Config { return Config{ - RedisConfig: redis.ReadConfig(ctx), - S3Config: s3.ReadConfig(ctx), EdaClientConfig: eigendaflags.ReadConfig(ctx), - VerifierConfig: verify.ReadConfig(ctx), - MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName), MemstoreConfig: memstore.ReadConfig(ctx), - FallbackTargets: ctx.StringSlice(flags.FallbackTargetsFlagName), - CacheTargets: ctx.StringSlice(flags.CacheTargetsFlagName), - } -} - -// checkTargets ... 
verifies that a backend target slice is constructed correctly -func (cfg *Config) checkTargets(targets []string) error { - if len(targets) == 0 { - return nil - } - - if utils.ContainsDuplicates(targets) { - return fmt.Errorf("duplicate targets provided: %+v", targets) - } + StorageConfig: store.ReadConfig(ctx), + VerifierConfig: verify.ReadConfig(ctx), - for _, t := range targets { - if store.StringToBackendType(t) == store.Unknown { - return fmt.Errorf("unknown fallback target provided: %s", t) - } + MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName), } - - return nil } // Check ... verifies that configuration values are adequately set @@ -90,42 +57,7 @@ func (cfg *Config) Check() error { } } - if cfg.S3Config.CredentialType == s3.CredentialTypeUnknown && cfg.S3Config.Endpoint != "" { - return fmt.Errorf("s3 credential type must be set") - } - if cfg.S3Config.CredentialType == s3.CredentialTypeStatic { - if cfg.S3Config.Endpoint != "" && (cfg.S3Config.AccessKeyID == "" || cfg.S3Config.AccessKeySecret == "") { - return fmt.Errorf("s3 endpoint is set, but access key id or access key secret is not set") - } - } - - if cfg.RedisConfig.Endpoint == "" && cfg.RedisConfig.Password != "" { - return fmt.Errorf("redis password is set, but endpoint is not") - } - - err := cfg.checkTargets(cfg.FallbackTargets) - if err != nil { - return err - } - - err = cfg.checkTargets(cfg.CacheTargets) - if err != nil { - return err - } - - // verify that same target is not in both fallback and cache targets - for _, t := range cfg.FallbackTargets { - if utils.Contains(cfg.CacheTargets, t) { - return fmt.Errorf("target %s is in both fallback and cache targets", t) - } - } - - // verify that thread counts are sufficiently set - if cfg.AsyncPutWorkers >= 100 { - return fmt.Errorf("number of secondary write workers can't be greater than 100") - } - - return nil + return cfg.StorageConfig.Check() } type CLIConfig struct { diff --git a/server/config_test.go b/server/config_test.go index 
01f87099..64bde141 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -4,10 +4,8 @@ import ( "testing" "time" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" - "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" - "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" - "github.com/Layr-Labs/eigenda-proxy/utils" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/encoding/kzg" @@ -15,25 +13,11 @@ import ( ) func validCfg() *Config { - maxBlobLengthBytes, err := utils.ParseBytesAmount("2MiB") + maxBlobLengthBytes, err := common.ParseBytesAmount("2MiB") if err != nil { panic(err) } return &Config{ - RedisConfig: redis.Config{ - Endpoint: "localhost:6379", - Password: "password", - DB: 0, - Eviction: 10 * time.Minute, - }, - S3Config: s3.Config{ - Bucket: "test-bucket", - Path: "", - Endpoint: "http://localhost:9000", - EnableTLS: false, - AccessKeyID: "access-key-id", - AccessKeySecret: "access-key-secret", - }, EdaClientConfig: clients.EigenDAClientConfig{ RPC: "http://localhost:8545", StatusQueryRetryInterval: 5 * time.Second, @@ -107,88 +91,4 @@ func TestConfigVerification(t *testing.T) { }) }) - t.Run("MissingS3AccessKeys", func(t *testing.T) { - cfg := validCfg() - - cfg.S3Config.CredentialType = s3.CredentialTypeStatic - cfg.S3Config.Endpoint = "http://localhost:9000" - cfg.S3Config.AccessKeyID = "" - - err := cfg.Check() - require.Error(t, err) - }) - - t.Run("MissingS3Credential", func(t *testing.T) { - cfg := validCfg() - - cfg.S3Config.CredentialType = s3.CredentialTypeUnknown - - err := cfg.Check() - require.Error(t, err) - }) - - t.Run("MissingEigenDADisperserRPC", func(t *testing.T) { - cfg := validCfg() - cfg.EdaClientConfig.RPC = "" - cfg.MemstoreEnabled = false - - err := cfg.Check() - require.Error(t, err) - }) - - t.Run("InvalidFallbackTarget", func(t *testing.T) { - cfg := validCfg() - 
cfg.FallbackTargets = []string{"postgres"} - - err := cfg.Check() - require.Error(t, err) - }) - - t.Run("InvalidCacheTarget", func(t *testing.T) { - cfg := validCfg() - cfg.CacheTargets = []string{"postgres"} - - err := cfg.Check() - require.Error(t, err) - }) - t.Run("InvalidCacheTarget", func(t *testing.T) { - cfg := validCfg() - cfg.CacheTargets = []string{"postgres"} - - err := cfg.Check() - require.Error(t, err) - }) - - t.Run("DuplicateCacheTargets", func(t *testing.T) { - cfg := validCfg() - cfg.CacheTargets = []string{"s3", "s3"} - - err := cfg.Check() - require.Error(t, err) - }) - - t.Run("DuplicateFallbackTargets", func(t *testing.T) { - cfg := validCfg() - cfg.FallbackTargets = []string{"s3", "s3"} - - err := cfg.Check() - require.Error(t, err) - }) - - t.Run("OverlappingCacheFallbackTargets", func(t *testing.T) { - cfg := validCfg() - cfg.FallbackTargets = []string{"s3"} - cfg.CacheTargets = []string{"s3"} - - err := cfg.Check() - require.Error(t, err) - }) - - t.Run("BadRedisConfiguration", func(t *testing.T) { - cfg := validCfg() - cfg.RedisConfig.Endpoint = "" - - err := cfg.Check() - require.Error(t, err) - }) } diff --git a/server/handlers.go b/server/handlers.go index b9877cfb..514d3585 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -9,7 +9,7 @@ import ( "net/http" "github.com/Layr-Labs/eigenda-proxy/commitments" - "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/gorilla/mux" ) @@ -180,7 +180,7 @@ func (svr *Server) handlePostShared(w http.ResponseWriter, r *http.Request, comm Err: fmt.Errorf("put request failed with commitment %v (commitment mode %v): %w", comm, meta.Mode, err), Meta: meta, } - if errors.Is(err, store.ErrEigenDAOversizedBlob) || errors.Is(err, store.ErrProxyOversizedBlob) { + if errors.Is(err, common.ErrEigenDAOversizedBlob) || errors.Is(err, common.ErrProxyOversizedBlob) { // we add here any error that should be returned as a 400 instead of a 500. 
// currently only includes oversized blob requests http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/server/load_store.go b/server/load_store.go index be298c87..e79145bf 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/metrics" "github.com/Layr-Labs/eigenda-proxy/store" "github.com/Layr-Labs/eigenda-proxy/store/generated_key/eigenda" @@ -18,29 +19,29 @@ import ( // TODO - create structured abstraction for dependency injection vs. overloading stateless functions // populateTargets ... creates a list of storage backends based on the provided target strings -func populateTargets(targets []string, s3 store.PrecomputedKeyStore, redis *redis.Store) []store.PrecomputedKeyStore { - stores := make([]store.PrecomputedKeyStore, len(targets)) +func populateTargets(targets []string, s3 common.PrecomputedKeyStore, redis *redis.Store) []common.PrecomputedKeyStore { + stores := make([]common.PrecomputedKeyStore, len(targets)) for i, f := range targets { - b := store.StringToBackendType(f) + b := common.StringToBackendType(f) switch b { - case store.RedisBackendType: + case common.RedisBackendType: if redis == nil { panic(fmt.Sprintf("Redis backend is not configured but specified in targets: %s", f)) } stores[i] = redis - case store.S3BackendType: + case common.S3BackendType: if s3 == nil { panic(fmt.Sprintf("S3 backend is not configured but specified in targets: %s", f)) } stores[i] = s3 - case store.EigenDABackendType, store.MemoryBackendType: + case common.EigenDABackendType, common.MemoryBackendType: panic(fmt.Sprintf("Invalid target for fallback: %s", f)) - case store.Unknown: + case common.UnknownBackendType: fallthrough default: @@ -55,21 +56,21 @@ func populateTargets(targets []string, s3 store.PrecomputedKeyStore, redis *redi func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metrics.Metricer) 
(store.IManager, error) { // create S3 backend store (if enabled) var err error - var s3Store store.PrecomputedKeyStore + var s3Store *s3.Store var redisStore *redis.Store - if cfg.EigenDAConfig.S3Config.Bucket != "" && cfg.EigenDAConfig.S3Config.Endpoint != "" { + if cfg.EigenDAConfig.StorageConfig.S3Config.Bucket != "" && cfg.EigenDAConfig.StorageConfig.S3Config.Endpoint != "" { log.Info("Using S3 backend") - s3Store, err = s3.NewS3(cfg.EigenDAConfig.S3Config) + s3Store, err = s3.NewStore(cfg.EigenDAConfig.StorageConfig.S3Config) if err != nil { return nil, fmt.Errorf("failed to create S3 store: %w", err) } } - if cfg.EigenDAConfig.RedisConfig.Endpoint != "" { + if cfg.EigenDAConfig.StorageConfig.RedisConfig.Endpoint != "" { log.Info("Using Redis backend") // create Redis backend store - redisStore, err = redis.NewStore(&cfg.EigenDAConfig.RedisConfig) + redisStore, err = redis.NewStore(&cfg.EigenDAConfig.StorageConfig.RedisConfig) if err != nil { return nil, fmt.Errorf("failed to create Redis store: %w", err) } @@ -91,7 +92,7 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr } // create EigenDA backend store - var eigenDA store.GeneratedKeyStore + var eigenDA common.GeneratedKeyStore if cfg.EigenDAConfig.MemstoreEnabled { log.Info("Using memstore backend for EigenDA") eigenDA, err = memstore.New(ctx, verifier, log, cfg.EigenDAConfig.MemstoreConfig) @@ -120,15 +121,15 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr } // create secondary storage router - fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore) - caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore) + fallbacks := populateTargets(cfg.EigenDAConfig.StorageConfig.FallbackTargets, s3Store, redisStore) + caches := populateTargets(cfg.EigenDAConfig.StorageConfig.CacheTargets, s3Store, redisStore) secondary := store.NewSecondaryManager(log, m, caches, fallbacks) if secondary.Enabled() 
{ // only spin-up go routines if secondary storage is enabled // NOTE: in the future the number of threads could be made configurable via env - log.Debug("Starting secondary write loop(s)", "count", cfg.EigenDAConfig.AsyncPutWorkers) + log.Debug("Starting secondary write loop(s)", "count", cfg.EigenDAConfig.StorageConfig.AsyncPutWorkers) - for i := 0; i < cfg.EigenDAConfig.AsyncPutWorkers; i++ { + for i := 0; i < cfg.EigenDAConfig.StorageConfig.AsyncPutWorkers; i++ { go secondary.WriteSubscriptionLoop(ctx) } } diff --git a/store/cli.go b/store/cli.go new file mode 100644 index 00000000..fb776b06 --- /dev/null +++ b/store/cli.go @@ -0,0 +1,59 @@ +package store + +import ( + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" + "github.com/urfave/cli/v2" +) + +var ( + FallbackTargetsFlagName = withFlagPrefix("fallback-targets") + CacheTargetsFlagName = withFlagPrefix("cache-targets") + ConcurrentWriteThreads = withFlagPrefix("concurrent-write-routines") +) + +func withFlagPrefix(s string) string { + return "store." + s +} + +func withEnvPrefix(envPrefix, s string) []string { + return []string{envPrefix + "_STORE_" + s} +} + +// CLIFlags ... 
used for secondary storage (fallback/cache) configuration +// category is used to group the flags in the help output (see https://cli.urfave.org/v2/examples/flags/#grouping) +func CLIFlags(envPrefix, category string) []cli.Flag { + return []cli.Flag{ + &cli.StringSliceFlag{ + Name: FallbackTargetsFlagName, + Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.", + Value: cli.NewStringSlice(), + EnvVars: withEnvPrefix(envPrefix, "FALLBACK_TARGETS"), + Category: category, + }, + &cli.StringSliceFlag{ + Name: CacheTargetsFlagName, + Usage: "List of caching targets to use fast reads from EigenDA.", + Value: cli.NewStringSlice(), + EnvVars: withEnvPrefix(envPrefix, "CACHE_TARGETS"), + Category: category, + }, + &cli.IntFlag{ + Name: ConcurrentWriteThreads, + Usage: "Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes.", + Value: 0, + EnvVars: withEnvPrefix(envPrefix, "CONCURRENT_WRITE_THREADS"), + Category: category, + }, + } +} + +func ReadConfig(ctx *cli.Context) Config { + return Config{ + AsyncPutWorkers: ctx.Int(ConcurrentWriteThreads), + FallbackTargets: ctx.StringSlice(FallbackTargetsFlagName), + CacheTargets: ctx.StringSlice(CacheTargetsFlagName), + RedisConfig: redis.ReadConfig(ctx), + S3Config: s3.ReadConfig(ctx), + } +} diff --git a/store/deprecated_flags.go b/store/deprecated_flags.go new file mode 100644 index 00000000..b0b7eec1 --- /dev/null +++ b/store/deprecated_flags.go @@ -0,0 +1,67 @@ +package store + +import ( + "fmt" + + "github.com/urfave/cli/v2" +) + +// All of these flags are deprecated and will be removed in release v2.0.0 +// we leave them here with actions that crash the program to ensure they are not used, +// and to make it easier for users to find the new flags (instead of silently crashing late during execution +// because some flag's env var was changed but the user forgot to update it) +var ( + DeprecatedFallbackTargetsFlagName = 
withDeprecatedFlagPrefix("fallback-targets") + DeprecatedCacheTargetsFlagName = withDeprecatedFlagPrefix("cache-targets") + DeprecatedConcurrentWriteThreads = withDeprecatedFlagPrefix("concurrent-write-routines") +) + +func withDeprecatedFlagPrefix(s string) string { + return "routing." + s +} + +func withDeprecatedEnvPrefix(envPrefix, s string) []string { + return []string{envPrefix + "_" + s} +} + +// CLIFlags ... used for EigenDA client configuration +func DeprecatedCLIFlags(envPrefix, category string) []cli.Flag { + return []cli.Flag{ + &cli.StringSliceFlag{ + Name: DeprecatedFallbackTargetsFlagName, + Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.", + Value: cli.NewStringSlice(), + EnvVars: withDeprecatedEnvPrefix(envPrefix, "FALLBACK_TARGETS"), + Category: category, + Action: func(*cli.Context, []string) error { + return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", + DeprecatedFallbackTargetsFlagName, withDeprecatedEnvPrefix(envPrefix, "FALLBACK_TARGETS"), + FallbackTargetsFlagName, withEnvPrefix(envPrefix, "FALLBACK_TARGETS")) + }, + }, + &cli.StringSliceFlag{ + Name: DeprecatedCacheTargetsFlagName, + Usage: "List of caching targets to use fast reads from EigenDA.", + Value: cli.NewStringSlice(), + EnvVars: withDeprecatedEnvPrefix(envPrefix, "CACHE_TARGETS"), + Category: category, + Action: func(*cli.Context, []string) error { + return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", + DeprecatedCacheTargetsFlagName, withDeprecatedEnvPrefix(envPrefix, "CACHE_TARGETS"), + CacheTargetsFlagName, withEnvPrefix(envPrefix, "CACHE_TARGETS")) + }, + }, + &cli.IntFlag{ + Name: DeprecatedConcurrentWriteThreads, + Usage: "Number of threads spun-up for async secondary storage insertions. 
 (<=0) denotes single threaded insertions where (>0) indicates decoupled writes.", + Value: 0, + EnvVars: withDeprecatedEnvPrefix(envPrefix, "CONCURRENT_WRITE_THREADS"), + Category: category, + Action: func(*cli.Context, int) error { + return fmt.Errorf("flag --%s (env var %s) is deprecated, use --%s (env var %s) instead", + DeprecatedConcurrentWriteThreads, withDeprecatedEnvPrefix(envPrefix, "CONCURRENT_WRITE_THREADS"), + ConcurrentWriteThreads, withEnvPrefix(envPrefix, "CONCURRENT_WRITE_THREADS")) + }, + }, + } +} diff --git a/store/generated_key/eigenda/eigenda.go b/store/generated_key/eigenda/eigenda.go index 56ace6c3..59da1d46 100644 --- a/store/generated_key/eigenda/eigenda.go +++ b/store/generated_key/eigenda/eigenda.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" "github.com/ethereum/go-ethereum/log" @@ -31,7 +31,7 @@ type Store struct { log log.Logger } -var _ store.GeneratedKeyStore = (*Store)(nil) +var _ common.GeneratedKeyStore = (*Store)(nil) func NewStore(client *clients.EigenDAClient, v *verify.Verifier, log log.Logger, cfg *StoreConfig) (*Store, error) { @@ -67,7 +67,7 @@ func (e Store) Put(ctx context.Context, value []byte) ([]byte, error) { return nil, fmt.Errorf("EigenDA client failed to re-encode blob: %w", err) } if uint64(len(encodedBlob)) > e.cfg.MaxBlobSizeBytes { - return nil, fmt.Errorf("%w: blob length %d, max blob size %d", store.ErrProxyOversizedBlob, len(value), e.cfg.MaxBlobSizeBytes) + return nil, fmt.Errorf("%w: blob length %d, max blob size %d", common.ErrProxyOversizedBlob, len(value), e.cfg.MaxBlobSizeBytes) } dispersalStart := time.Now() @@ -116,14 +116,9 @@ func (e Store) Put(ctx context.Context, value []byte) ([]byte, error) { return bytes, nil } -// Entries are a no-op for EigenDA Store -func (e Store) Stats() *store.Stats { - return nil -} - // Backend returns 
the backend type for EigenDA Store -func (e Store) BackendType() store.BackendType { - return store.EigenDABackendType +func (e Store) BackendType() common.BackendType { + return common.EigenDABackendType } // Key is used to recover certificate fields and that verifies blob diff --git a/store/generated_key/memstore/memstore.go b/store/generated_key/memstore/memstore.go index b0dd7ab5..7d31c30a 100644 --- a/store/generated_key/memstore/memstore.go +++ b/store/generated_key/memstore/memstore.go @@ -11,10 +11,10 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" - "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients/codecs" - "github.com/Layr-Labs/eigenda/api/grpc/common" + eigenda_common "github.com/Layr-Labs/eigenda/api/grpc/common" "github.com/Layr-Labs/eigenda/api/grpc/disperser" "github.com/ethereum/go-ethereum/crypto" ) @@ -49,7 +49,7 @@ type MemStore struct { reads int } -var _ store.GeneratedKeyStore = (*MemStore)(nil) +var _ common.GeneratedKeyStore = (*MemStore)(nil) // New ... 
constructor func New( @@ -140,7 +140,7 @@ func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) { } if uint64(len(encodedVal)) > e.config.MaxBlobSizeBytes { - return nil, fmt.Errorf("%w: blob length %d, max blob size %d", store.ErrProxyOversizedBlob, len(value), e.config.MaxBlobSizeBytes) + return nil, fmt.Errorf("%w: blob length %d, max blob size %d", common.ErrProxyOversizedBlob, len(value), e.config.MaxBlobSizeBytes) } e.Lock() @@ -164,7 +164,7 @@ func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) { cert := &verify.Certificate{ BlobHeader: &disperser.BlobHeader{ - Commitment: &common.G1Commitment{ + Commitment: &eigenda_common.G1Commitment{ X: commitment.X.Marshal(), Y: commitment.Y.Marshal(), }, @@ -222,16 +222,6 @@ func (e *MemStore) Verify(_, _ []byte) error { return nil } -// Stats ... returns the current usage metrics of the in-memory key-value data store. -func (e *MemStore) Stats() *store.Stats { - e.RLock() - defer e.RUnlock() - return &store.Stats{ - Entries: len(e.store), - Reads: e.reads, - } -} - -func (e *MemStore) BackendType() store.BackendType { - return store.MemoryBackendType +func (e *MemStore) BackendType() common.BackendType { + return common.MemoryBackendType } diff --git a/store/manager.go b/store/manager.go index 79dfb9c8..cce52678 100644 --- a/store/manager.go +++ b/store/manager.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/Layr-Labs/eigenda-proxy/commitments" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/ethereum/go-ethereum/log" ) @@ -21,15 +22,15 @@ type IManager interface { type Manager struct { log log.Logger // primary storage backends - eigenda GeneratedKeyStore // ALT DA commitment type for OP mode && simple commitment mode for standard /client - s3 PrecomputedKeyStore // OP commitment mode && keccak256 commitment type + eigenda common.GeneratedKeyStore // ALT DA commitment type for OP mode && simple commitment mode for standard /client + s3 common.PrecomputedKeyStore // OP 
commitment mode && keccak256 commitment type // secondary storage backends (caching and fallbacks) secondary ISecondary } // NewManager ... Init -func NewManager(eigenda GeneratedKeyStore, s3 PrecomputedKeyStore, l log.Logger, +func NewManager(eigenda common.GeneratedKeyStore, s3 common.PrecomputedKeyStore, l log.Logger, secondary ISecondary) (IManager, error) { return &Manager{ log: l, diff --git a/store/precomputed_key/redis/redis.go b/store/precomputed_key/redis/redis.go index a90b202f..b8fa32b4 100644 --- a/store/precomputed_key/redis/redis.go +++ b/store/precomputed_key/redis/redis.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/go-redis/redis/v8" ) @@ -27,7 +27,7 @@ type Store struct { client *redis.Client } -var _ store.PrecomputedKeyStore = (*Store)(nil) +var _ common.PrecomputedKeyStore = (*Store)(nil) // NewStore ... constructor func NewStore(cfg *Config) (*Store, error) { @@ -75,6 +75,6 @@ func (r *Store) Verify(_ []byte, _ []byte) error { return nil } -func (r *Store) BackendType() store.BackendType { - return store.RedisBackendType +func (r *Store) BackendType() common.BackendType { + return common.RedisBackendType } diff --git a/store/precomputed_key/s3/s3.go b/store/precomputed_key/s3/s3.go index 5754dac8..1535bcd4 100644 --- a/store/precomputed_key/s3/s3.go +++ b/store/precomputed_key/s3/s3.go @@ -10,7 +10,7 @@ import ( "path" "strings" - "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/ethereum/go-ethereum/crypto" "github.com/minio/minio-go/v7" @@ -34,7 +34,7 @@ func StringToCredentialType(s string) CredentialType { } } -var _ store.PrecomputedKeyStore = (*Store)(nil) +var _ common.PrecomputedKeyStore = (*Store)(nil) type CredentialType string type Config struct { @@ -59,7 +59,7 @@ func isGoogleEndpoint(endpoint string) bool { return strings.Contains(endpoint, "storage.googleapis.com") } -func NewS3(cfg 
Config) (*Store, error) { +func NewStore(cfg Config) (*Store, error) { putObjectOptions := minio.PutObjectOptions{} if isGoogleEndpoint(cfg.Endpoint) { putObjectOptions.DisableContentSha256 = true // Avoid chunk signatures on GCS: https://github.com/minio/minio-go/issues/1922 @@ -116,8 +116,8 @@ func (s *Store) Verify(key []byte, value []byte) error { return nil } -func (s *Store) BackendType() store.BackendType { - return store.S3BackendType +func (s *Store) BackendType() common.BackendType { + return common.S3BackendType } func creds(cfg Config) *credentials.Credentials { diff --git a/store/secondary.go b/store/secondary.go index f78b9020..37df93ad 100644 --- a/store/secondary.go +++ b/store/secondary.go @@ -6,6 +6,7 @@ import ( "net/http" "sync" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda-proxy/metrics" "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum/go-ethereum/crypto" @@ -44,8 +45,8 @@ type SecondaryManager struct { log log.Logger m metrics.Metricer - caches []PrecomputedKeyStore - fallbacks []PrecomputedKeyStore + caches []common.PrecomputedKeyStore + fallbacks []common.PrecomputedKeyStore verifyLock sync.RWMutex topic chan PutNotify @@ -53,9 +54,9 @@ type SecondaryManager struct { } // NewSecondaryManager ... 
creates a new secondary storage router -func NewSecondaryManager(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) ISecondary { +func NewSecondaryManager(log log.Logger, m metrics.Metricer, caches []common.PrecomputedKeyStore, fallbacks []common.PrecomputedKeyStore) ISecondary { return &SecondaryManager{ - topic: make(chan PutNotify), // yes channel is un-buffered which dispersing consumption across routines helps alleviate + topic: make(chan PutNotify), // channel is un-buffered which dispersing consumption across routines helps alleviate log: log, m: m, caches: caches, @@ -142,7 +143,7 @@ func (sm *SecondaryManager) WriteSubscriptionLoop(ctx context.Context) { // NOTE: - this can also be parallelized when reading from multiple sources and discarding connections that fail // - for complete optimization we can profile secondary storage backends to determine the fastest / most reliable and always rout to it first func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []byte, fallback bool, verify func([]byte, []byte) error) ([]byte, error) { - var sources []PrecomputedKeyStore + var sources []common.PrecomputedKeyStore if fallback { sources = sm.fallbacks } else { diff --git a/store/store.go b/store/store.go index 404fedcc..6b65b6db 100644 --- a/store/store.go +++ b/store/store.go @@ -1,88 +1,79 @@ package store import ( - "context" "fmt" - "strings" + + "github.com/Layr-Labs/eigenda-proxy/common" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" ) -type BackendType uint8 +type Config struct { + AsyncPutWorkers int + FallbackTargets []string + CacheTargets []string -const ( - EigenDABackendType BackendType = iota - MemoryBackendType - S3BackendType - RedisBackendType + // secondary storage cfgs + RedisConfig redis.Config + S3Config s3.Config +} - Unknown -) +// checkTargets ... 
verifies that a backend target slice is constructed correctly +func (cfg *Config) checkTargets(targets []string) error { + if len(targets) == 0 { + return nil + } -var ( - ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size") - ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed") -) + if common.ContainsDuplicates(targets) { + return fmt.Errorf("duplicate targets provided: %+v", targets) + } -func (b BackendType) String() string { - switch b { - case EigenDABackendType: - return "EigenDA" - case MemoryBackendType: - return "Memory" - case S3BackendType: - return "S3" - case RedisBackendType: - return "Redis" - case Unknown: - fallthrough - default: - return "Unknown" + for _, t := range targets { + if common.StringToBackendType(t) == common.UnknownBackendType { + return fmt.Errorf("unknown fallback target provided: %s", t) + } } + + return nil } -func StringToBackendType(s string) BackendType { - lower := strings.ToLower(s) - - switch lower { - case "eigenda": - return EigenDABackendType - case "memory": - return MemoryBackendType - case "s3": - return S3BackendType - case "redis": - return RedisBackendType - case "unknown": - fallthrough - default: - return Unknown +// Check ... 
verifies that configuration values are adequately set +func (cfg *Config) Check() error { + + if cfg.S3Config.CredentialType == s3.CredentialTypeUnknown && cfg.S3Config.Endpoint != "" { + return fmt.Errorf("s3 credential type must be set") + } + if cfg.S3Config.CredentialType == s3.CredentialTypeStatic { + if cfg.S3Config.Endpoint != "" && (cfg.S3Config.AccessKeyID == "" || cfg.S3Config.AccessKeySecret == "") { + return fmt.Errorf("s3 endpoint is set, but access key id or access key secret is not set") + } } -} -// Used for E2E tests -type Stats struct { - Entries int - Reads int -} + if cfg.RedisConfig.Endpoint == "" && cfg.RedisConfig.Password != "" { + return fmt.Errorf("redis password is set, but endpoint is not") + } -type Store interface { - // Backend returns the backend type provider of the store. - BackendType() BackendType - // Verify verifies the given key-value pair. - Verify(key []byte, value []byte) error -} + err := cfg.checkTargets(cfg.FallbackTargets) + if err != nil { + return err + } -type GeneratedKeyStore interface { - Store - // Get retrieves the given key if it's present in the key-value data store. - Get(ctx context.Context, key []byte) ([]byte, error) - // Put inserts the given value into the key-value data store. - Put(ctx context.Context, value []byte) (key []byte, err error) -} + err = cfg.checkTargets(cfg.CacheTargets) + if err != nil { + return err + } + + // verify that same target is not in both fallback and cache targets + for _, t := range cfg.FallbackTargets { + if common.Contains(cfg.CacheTargets, t) { + return fmt.Errorf("target %s is in both fallback and cache targets", t) + } + } + + // verify that thread counts are sufficiently set + if cfg.AsyncPutWorkers >= 100 { + return fmt.Errorf("number of secondary write workers can't be greater than 100") + } -type PrecomputedKeyStore interface { - Store - // Get retrieves the given key if it's present in the key-value data store. 
- Get(ctx context.Context, key []byte) ([]byte, error) - // Put inserts the given value into the key-value data store. - Put(ctx context.Context, key []byte, value []byte) error + return nil } diff --git a/store/store_test.go b/store/store_test.go new file mode 100644 index 00000000..0f214aa3 --- /dev/null +++ b/store/store_test.go @@ -0,0 +1,115 @@ +package store_test + +import ( + "testing" + "time" + + "github.com/Layr-Labs/eigenda-proxy/store" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" + "github.com/stretchr/testify/require" +) + +func validCfg() *store.Config { + return &store.Config{ + RedisConfig: redis.Config{ + Endpoint: "localhost:6379", + Password: "password", + DB: 0, + Eviction: 10 * time.Minute, + }, + S3Config: s3.Config{ + Bucket: "test-bucket", + Path: "", + Endpoint: "http://localhost:9000", + EnableTLS: false, + AccessKeyID: "access-key-id", + AccessKeySecret: "access-key-secret", + }, + } +} + +func TestConfigVerification(t *testing.T) { + t.Run("ValidConfig", func(t *testing.T) { + cfg := validCfg() + + err := cfg.Check() + require.NoError(t, err) + }) + + t.Run("MissingS3AccessKeys", func(t *testing.T) { + cfg := validCfg() + + cfg.S3Config.CredentialType = s3.CredentialTypeStatic + cfg.S3Config.Endpoint = "http://localhost:9000" + cfg.S3Config.AccessKeyID = "" + + err := cfg.Check() + require.Error(t, err) + }) + + t.Run("MissingS3Credential", func(t *testing.T) { + cfg := validCfg() + + cfg.S3Config.CredentialType = s3.CredentialTypeUnknown + + err := cfg.Check() + require.Error(t, err) + }) + + t.Run("InvalidFallbackTarget", func(t *testing.T) { + cfg := validCfg() + cfg.FallbackTargets = []string{"postgres"} + + err := cfg.Check() + require.Error(t, err) + }) + + t.Run("InvalidCacheTarget", func(t *testing.T) { + cfg := validCfg() + cfg.CacheTargets = []string{"postgres"} + + err := cfg.Check() + require.Error(t, err) + }) + t.Run("InvalidCacheTarget", 
func(t *testing.T) { + cfg := validCfg() + cfg.CacheTargets = []string{"postgres"} + + err := cfg.Check() + require.Error(t, err) + }) + + t.Run("DuplicateCacheTargets", func(t *testing.T) { + cfg := validCfg() + cfg.CacheTargets = []string{"s3", "s3"} + + err := cfg.Check() + require.Error(t, err) + }) + + t.Run("DuplicateFallbackTargets", func(t *testing.T) { + cfg := validCfg() + cfg.FallbackTargets = []string{"s3", "s3"} + + err := cfg.Check() + require.Error(t, err) + }) + + t.Run("OverlappingCacheFallbackTargets", func(t *testing.T) { + cfg := validCfg() + cfg.FallbackTargets = []string{"s3"} + cfg.CacheTargets = []string{"s3"} + + err := cfg.Check() + require.Error(t, err) + }) + + t.Run("BadRedisConfiguration", func(t *testing.T) { + cfg := validCfg() + cfg.RedisConfig.Endpoint = "" + + err := cfg.Check() + require.Error(t, err) + }) +} diff --git a/verify/cli.go b/verify/cli.go index f41135f4..afe98ccd 100644 --- a/verify/cli.go +++ b/verify/cli.go @@ -4,7 +4,7 @@ import ( "fmt" "runtime" - "github.com/Layr-Labs/eigenda-proxy/utils" + "github.com/Layr-Labs/eigenda-proxy/common" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/urfave/cli/v2" ) @@ -113,7 +113,7 @@ func CLIFlags(envPrefix, category string) []cli.Flag { HasBeenSet: true, Action: func(_ *cli.Context, maxBlobLengthStr string) error { // parse the string to a uint64 and set the maxBlobLengthBytes var to be used by ReadConfig() - numBytes, err := utils.ParseBytesAmount(maxBlobLengthStr) + numBytes, err := common.ParseBytesAmount(maxBlobLengthStr) if err != nil { return fmt.Errorf("failed to parse max blob length flag: %w", err) }