Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use batch confirmation that was upstreamed to eigenda client #192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ jobs:
go-version: '1.22' # The Go version to download (if necessary) and use.
- run: go version

- name: Checkout EigenDA
- name: Checkout code
uses: actions/checkout@v3

- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.60
version: v1.61
args: --timeout 3m
72 changes: 71 additions & 1 deletion flags/eigendaflags/cli.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package eigendaflags

import (
"fmt"
"log"
"strconv"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
Expand All @@ -21,6 +24,9 @@ var (
PutBlobEncodingVersionFlagName = withFlagPrefix("put-blob-encoding-version")
DisablePointVerificationModeFlagName = withFlagPrefix("disable-point-verification-mode")
WaitForFinalizationFlagName = withFlagPrefix("wait-for-finalization")
ConfirmationDepthFlagName = withFlagPrefix("confirmation-depth")
EthRPCURLFlagName = withFlagPrefix("eth-rpc")
SvcManagerAddrFlagName = withFlagPrefix("svc-manager-addr")
)

func withFlagPrefix(s string) string {
Expand Down Expand Up @@ -101,11 +107,41 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
EnvVars: []string{withEnvPrefix(envPrefix, "WAIT_FOR_FINALIZATION")},
Value: false,
Category: category,
Hidden: true,
Action: func(_ *cli.Context, _ bool) error {
return fmt.Errorf("flag --%s is deprecated, instead use --%s finalized", WaitForFinalizationFlagName, ConfirmationDepthFlagName)
},
},
&cli.StringFlag{
Name: ConfirmationDepthFlagName,
Usage: "Number of Ethereum blocks to wait after the blob's batch has been included on-chain, " +
"before returning from PutBlob calls. Can either be a number or 'finalized'.",
EnvVars: []string{withEnvPrefix(envPrefix, "CONFIRMATION_DEPTH")},
Value: "0",
Category: category,
Action: func(_ *cli.Context, val string) error {
return validateConfirmationFlag(val)
},
},
&cli.StringFlag{
Name: EthRPCURLFlagName,
Usage: "URL of the Ethereum RPC endpoint. Needed to confirm blobs landed onchain.",
EnvVars: []string{withEnvPrefix(envPrefix, "ETH_RPC")},
Category: category,
Required: true,
},
&cli.StringFlag{
Name: SvcManagerAddrFlagName,
Usage: "Address of the EigenDAServiceManager contract. Required to confirm blobs landed onchain. See https://github.com/Layr-Labs/eigenlayer-middleware/?tab=readme-ov-file#current-mainnet-deployment",
EnvVars: []string{withEnvPrefix(envPrefix, "SERVICE_MANAGER_ADDR")},
Category: category,
Required: true,
},
}
}

func ReadConfig(ctx *cli.Context) clients.EigenDAClientConfig {
waitForFinalization, confirmationDepth := parseConfirmationFlag(ctx.String(ConfirmationDepthFlagName))
return clients.EigenDAClientConfig{
RPC: ctx.String(DisperserRPCFlagName),
StatusQueryRetryInterval: ctx.Duration(StatusQueryRetryIntervalFlagName),
Expand All @@ -116,6 +152,40 @@ func ReadConfig(ctx *cli.Context) clients.EigenDAClientConfig {
SignerPrivateKeyHex: ctx.String(SignerPrivateKeyHexFlagName),
PutBlobEncodingVersion: codecs.BlobEncodingVersion(ctx.Uint(PutBlobEncodingVersionFlagName)),
DisablePointVerificationMode: ctx.Bool(DisablePointVerificationModeFlagName),
WaitForFinalization: ctx.Bool(WaitForFinalizationFlagName),
WaitForFinalization: waitForFinalization,
WaitForConfirmationDepth: confirmationDepth,
EthRpcUrl: ctx.String(EthRPCURLFlagName),
SvcManagerAddr: ctx.String(SvcManagerAddrFlagName),
}
}

// parse the val (either "finalized" or a number) into waitForFinalization (bool) and confirmationDepth (uint64).
func parseConfirmationFlag(val string) (bool, uint64) {
if val == "finalized" {
return true, 0
}

depth, err := strconv.ParseUint(val, 10, 64)
if err != nil {
panic("this should never happen, as the flag is validated before this point")
}

return false, depth
}

func validateConfirmationFlag(val string) error {
if val == "finalized" {
return nil
}

depth, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return fmt.Errorf("confirmation-depth must be either 'finalized' or a number, got: %s", val)
}

if depth >= 64 {
log.Printf("Warning: confirmation depth set to %d, which is > 2 epochs (64). Consider using 'finalized' instead.\n", depth)
}

return nil
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22
toolchain go1.22.0

require (
github.com/Layr-Labs/eigenda v0.8.4
github.com/Layr-Labs/eigenda v0.8.5-0.20241028201743-5fe3e910a22d
github.com/consensys/gnark-crypto v0.12.1
github.com/ethereum-optimism/optimism v1.9.4-0.20240927020138-a9c7f349d10b
github.com/ethereum/go-ethereum v1.14.11
Expand Down Expand Up @@ -39,6 +39,7 @@ require (
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.31.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/kms v1.31.0 // indirect
Expand Down Expand Up @@ -173,7 +174,7 @@ require (
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/miekg/dns v1.1.62 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Layr-Labs/eigenda v0.8.4 h1:6znMJcPLtchYixbUddCeKCKqwbpsAf/4CX/fBnWlIpc=
github.com/Layr-Labs/eigenda v0.8.4/go.mod h1:MzSFbxDQ1/tMcLlfxqz08YubB3rd+E2xme2p7hwP2YM=
github.com/Layr-Labs/eigenda v0.8.5-0.20241028201743-5fe3e910a22d h1:s5U1TaWhC1J2rwc9IQdU/COGvuXALCKMd9GbONUZMxc=
github.com/Layr-Labs/eigenda v0.8.5-0.20241028201743-5fe3e910a22d/go.mod h1:sqUNf9Ak+EfAX82jDxrb4QbT/g3DViWD3b7YIk36skk=
github.com/Layr-Labs/eigensdk-go v0.1.7-0.20240507215523-7e4891d5099a h1:L/UsJFw9M31FD/WgXTPFB0oxbq9Cu4Urea1xWPMQS7Y=
github.com/Layr-Labs/eigensdk-go v0.1.7-0.20240507215523-7e4891d5099a/go.mod h1:OF9lmS/57MKxS0xpSpX0qHZl0SKkDRpvJIvsGvMN1y8=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
Expand Down Expand Up @@ -583,8 +583,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
Expand Down
7 changes: 4 additions & 3 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ type Config struct {

// ReadConfig ... parses the Config from the provided flags or environment variables.
func ReadConfig(ctx *cli.Context) Config {
edaClientConfig := eigendaflags.ReadConfig(ctx)
return Config{
RedisConfig: redis.ReadConfig(ctx),
S3Config: s3.ReadConfig(ctx),
EdaClientConfig: eigendaflags.ReadConfig(ctx),
VerifierConfig: verify.ReadConfig(ctx),
EdaClientConfig: edaClientConfig,
VerifierConfig: verify.ReadConfig(ctx, edaClientConfig),
MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName),
MemstoreConfig: memstore.ReadConfig(ctx),
FallbackTargets: ctx.StringSlice(flags.FallbackTargetsFlagName),
Expand Down Expand Up @@ -80,7 +81,7 @@ func (cfg *Config) Check() error {
// TODO: move this verification logic to verify/cli.go
if cfg.VerifierConfig.VerifyCerts {
if cfg.MemstoreEnabled {
return fmt.Errorf("cannot enable cert verification when memstore is enabled")
return fmt.Errorf("cannot enable cert verification when memstore is enabled. use --%s", verify.CertVerificationDisabledFlagName)
}
if cfg.VerifierConfig.RPCURL == "" {
return fmt.Errorf("cert verification enabled but eth rpc is not set")
Expand Down
38 changes: 9 additions & 29 deletions store/generated_key/eigenda/eigenda.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package eigenda

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -66,46 +65,27 @@ func (e Store) Put(ctx context.Context, value []byte) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("EigenDA client failed to re-encode blob: %w", err)
}
// TODO: We should move this length check inside PutBlob
if uint64(len(encodedBlob)) > e.cfg.MaxBlobSizeBytes {
return nil, fmt.Errorf("%w: blob length %d, max blob size %d", store.ErrProxyOversizedBlob, len(value), e.cfg.MaxBlobSizeBytes)
}

dispersalStart := time.Now()
blobInfo, err := e.client.PutBlob(ctx, value)
if err != nil {
// TODO: we will want to filter for errors here and return a 503 when needed
// ie when dispersal itself failed, or that we timed out waiting for batch to land onchain
return nil, err
}
cert := (*verify.Certificate)(blobInfo)

err = e.verifier.VerifyCommitment(cert.BlobHeader.Commitment, encodedBlob)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to verify commitment: %w", err)
}

dispersalDuration := time.Since(dispersalStart)
remainingTimeout := e.cfg.StatusQueryTimeout - dispersalDuration

ticker := time.NewTicker(12 * time.Second) // avg. eth block time
defer ticker.Stop()
ctx, cancel := context.WithTimeout(context.Background(), remainingTimeout)
defer cancel()

done := false
for !done {
select {
case <-ctx.Done():
return nil, fmt.Errorf("timed out when trying to verify the DA certificate for a blob batch after dispersal")
case <-ticker.C:
err = e.verifier.VerifyCert(cert)
switch {
case err == nil:
done = true
case errors.Is(err, verify.ErrBatchMetadataHashNotFound):
e.log.Info("Blob confirmed, waiting for sufficient confirmation depth...", "targetDepth", e.cfg.EthConfirmationDepth)
default:
return nil, err
}
}
err = e.verifier.VerifyCert(ctx, cert)
if err != nil {
return nil, fmt.Errorf("failed to verify DA cert: %w", err)
}

bytes, err := rlp.EncodeToBytes(cert)
Expand All @@ -128,7 +108,7 @@ func (e Store) BackendType() store.BackendType {

// Key is used to recover certificate fields and that verifies blob
// against commitment to ensure data is valid and non-tampered.
func (e Store) Verify(key []byte, value []byte) error {
func (e Store) Verify(ctx context.Context, key []byte, value []byte) error {
var cert verify.Certificate
err := rlp.DecodeBytes(key, &cert)
if err != nil {
Expand All @@ -148,5 +128,5 @@ func (e Store) Verify(key []byte, value []byte) error {
}

// verify DA certificate against EigenDA's batch metadata that's bridged to Ethereum
return e.verifier.VerifyCert(&cert)
return e.verifier.VerifyCert(ctx, &cert)
}
2 changes: 1 addition & 1 deletion store/generated_key/memstore/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
return certBytes, nil
}

func (e *MemStore) Verify(_, _ []byte) error {
func (e *MemStore) Verify(_ context.Context, _, _ []byte) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions store/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (m *Manager) Get(ctx context.Context, key []byte, cm commitments.Commitment
}

// 2 - verify blob hash against commitment key digest
err = m.s3.Verify(key, value)
err = m.s3.Verify(ctx, key, value)
if err != nil {
return nil, err
}
Expand All @@ -82,7 +82,7 @@ func (m *Manager) Get(ctx context.Context, key []byte, cm commitments.Commitment
data, err := m.eigenda.Get(ctx, key)
if err == nil {
// verify
err = m.eigenda.Verify(key, data)
err = m.eigenda.Verify(ctx, key, data)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func (m *Manager) putKeccak256Mode(ctx context.Context, key []byte, value []byte
return nil, errors.New("S3 is disabled but is only supported for posting known commitment keys")
}

err := m.s3.Verify(key, value)
err := m.s3.Verify(ctx, key, value)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion store/precomputed_key/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (r *Store) Put(ctx context.Context, key []byte, value []byte) error {
return r.client.Set(ctx, string(key), string(value), r.eviction).Err()
}

func (r *Store) Verify(_ []byte, _ []byte) error {
func (r *Store) Verify(_ context.Context, _, _ []byte) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion store/precomputed_key/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *Store) Put(ctx context.Context, key []byte, value []byte) error {
return nil
}

func (s *Store) Verify(key []byte, value []byte) error {
func (s *Store) Verify(_ context.Context, key []byte, value []byte) error {
h := crypto.Keccak256Hash(value)
if !bytes.Equal(h[:], key) {
return fmt.Errorf("key does not match value, expected: %s got: %s", hex.EncodeToString(key), h.Hex())
Expand Down
6 changes: 3 additions & 3 deletions store/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type ISecondary interface {
CachingEnabled() bool
FallbackEnabled() bool
HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error
MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error)
MultiSourceRead(context.Context, []byte, bool, func(context.Context, []byte, []byte) error) ([]byte, error)
WriteSubscriptionLoop(ctx context.Context)
}

Expand Down Expand Up @@ -141,7 +141,7 @@ func (sm *SecondaryManager) WriteSubscriptionLoop(ctx context.Context) {
// MultiSourceRead ... reads from a set of backends and returns the first successfully read blob
// NOTE: - this can also be parallelized when reading from multiple sources and discarding connections that fail
// - for complete optimization we can profile secondary storage backends to determine the fastest / most reliable and always rout to it first
func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []byte, fallback bool, verify func([]byte, []byte) error) ([]byte, error) {
func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []byte, fallback bool, verify func(context.Context, []byte, []byte) error) ([]byte, error) {
var sources []PrecomputedKeyStore
if fallback {
sources = sm.fallbacks
Expand All @@ -167,7 +167,7 @@ func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []by

// verify cert:data using provided verification function
sm.verifyLock.Lock()
err = verify(commitment, data)
err = verify(ctx, commitment, data)
if err != nil {
cb(Failed)
log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType())
Expand Down
2 changes: 1 addition & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Store interface {
// Backend returns the backend type provider of the store.
BackendType() BackendType
// Verify verifies the given key-value pair.
Verify(key []byte, value []byte) error
Verify(ctx context.Context, key []byte, value []byte) error
}

type GeneratedKeyStore interface {
Expand Down
14 changes: 0 additions & 14 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,6 @@ func Contains[P comparable](s []P, e P) bool {
return false
}

func EqualSlices[P comparable](s1, s2 []P) bool {
if len(s1) != len(s2) {
return false
}

for i := 0; i < len(s1); i++ {
if s1[i] != s2[i] {
return false
}
}

return true
}

func ParseBytesAmount(s string) (uint64, error) {
s = strings.TrimSpace(s)

Expand Down
Loading
Loading