Skip to content

Commit

Permalink
Add optimistic header v2 endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
avalonche committed May 6, 2024
1 parent 4e02e4b commit 0ea2b26
Show file tree
Hide file tree
Showing 10 changed files with 865 additions and 170 deletions.
24 changes: 21 additions & 3 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var (

SlotsPerEpoch = uint64(cli.GetEnvInt("SLOTS_PER_EPOCH", 32))
DurationPerEpoch = DurationPerSlot * time.Duration(SlotsPerEpoch)

EmptyTxRoot = "0x7ffe241ea60187fdb0187bfa22de35d1f9bed7ab061d9401fd47e34a54fbede1"
)

func SlotToEpoch(slot uint64) uint64 {
Expand All @@ -38,10 +40,10 @@ type BuilderStatus struct {
IsOptimistic bool
}

// Profile captures performance metrics for the block submission handler. Each
// BlockSubmissionProfile captures performance metrics for the block submission handler. Each
// field corresponds to the number of microseconds in each stage. The `Total`
// field is the number of microseconds taken for entire flow.
type Profile struct {
type BlockSubmissionProfile struct {
PayloadLoad uint64
Decode uint64
Prechecks uint64
Expand All @@ -50,6 +52,22 @@ type Profile struct {
Total uint64
}

func (p *Profile) String() string {
func (p *BlockSubmissionProfile) String() string {
return fmt.Sprintf("%v,%v,%v,%v,%v", p.Decode, p.Prechecks, p.Simulation, p.RedisUpdate, p.Total)
}

// HeaderSubmissionProfile captures performance metrics for the header submission handler. Each
// field corresponds to the number of microseconds at the start of each stage.
type HeaderSubmissionProfile struct {
PayloadLoad uint64
Decode uint64
Prechecks uint64
Signature uint64
RedisChecks uint64
RedisUpdate uint64
Total uint64
}

func (p *HeaderSubmissionProfile) String() string {
return fmt.Sprintf("%v,%v,%v,%v,%v,%v,%v", p.PayloadLoad, p.Decode, p.Prechecks, p.Signature, p.RedisChecks, p.RedisUpdate, p.Total)
}
5 changes: 5 additions & 0 deletions common/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,14 @@ func CreateTestBlockSubmission(t *testing.T, builderPubkey string, value *uint25
Message: bidTrace,
ExecutionPayload: &deneb.ExecutionPayload{ //nolint:exhaustruct
BaseFeePerGas: uint256.NewInt(0),
ExtraData: make([]byte, 32),
Transactions: make([]bellatrix.Transaction, 0),
Withdrawals: make([]*capella.Withdrawal, 0),
},
BlobsBundle: &builderApiDeneb.BlobsBundle{ //nolint:exhaustruct
Commitments: make([]deneb.KZGCommitment, 0),
Proofs: make([]deneb.KZGProof, 0),
Blobs: make([]deneb.Blob, 0),
},
Signature: phase0.BLSSignature{},
},
Expand Down
4 changes: 2 additions & 2 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type IDatabaseService interface {
GetValidatorRegistration(pubkey string) (*ValidatorRegistrationEntry, error)
GetValidatorRegistrationsForPubkeys(pubkeys []string) ([]*ValidatorRegistrationEntry, error)

SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.Profile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error)
SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.BlockSubmissionProfile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error)
GetBlockSubmissionEntry(slot uint64, proposerPubkey, blockHash string) (entry *BuilderBlockSubmissionEntry, err error)
GetBuilderSubmissions(filters GetBuilderSubmissionsFilters) ([]*BuilderBlockSubmissionEntry, error)
GetBuilderSubmissionsBySlots(slotFrom, slotTo uint64) (entries []*BuilderBlockSubmissionEntry, err error)
Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *DatabaseService) GetLatestValidatorRegistrations(timestampOnly bool) ([
return registrations, err
}

func (s *DatabaseService) SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.Profile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error) {
func (s *DatabaseService) SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.BlockSubmissionProfile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error) {
// Save execution_payload: insert, or if already exists update to be able to return the id ('on conflict do nothing' doesn't return an id)
execPayloadEntry, err := PayloadToExecPayloadEntry(payload)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
feeRecipient = bellatrix.ExecutionAddress{0x02}
blockHashStr = "0xa645370cc112c2e8e3cce121416c7dc849e773506d4b6fb9b752ada711355369"
testDBDSN = common.GetEnv("TEST_DB_DSN", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
profile = common.Profile{
profile = common.BlockSubmissionProfile{
Decode: 42,
Prechecks: 43,
Simulation: 44,
Expand Down
2 changes: 1 addition & 1 deletion database/mockdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (db MockDB) GetLatestValidatorRegistrations(timestampOnly bool) ([]*Validat
return nil, nil
}

func (db MockDB) SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.Profile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error) {
func (db MockDB) SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.BlockSubmissionProfile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error) {
return nil, nil
}

Expand Down
104 changes: 56 additions & 48 deletions datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

builderApi "github.com/attestantio/go-builder-client/api"
builderApiDeneb "github.com/attestantio/go-builder-client/api/deneb"
builderApiV1 "github.com/attestantio/go-builder-client/api/v1"
builderSpec "github.com/attestantio/go-builder-client/spec"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/capella"
Expand Down Expand Up @@ -496,31 +497,26 @@ type SaveBidAndUpdateTopBidResponse struct {
PrevTopBidValue *big.Int

TimePrep time.Duration
TimeSavePayload time.Duration
TimeSaveBid time.Duration
TimeSaveTrace time.Duration
TimeSavePayload time.Duration
TimeUpdateTopBid time.Duration
TimeUpdateFloor time.Duration
}

func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis.Pipeliner, trace *common.BidTraceV2WithBlobFields, payload *common.VersionedSubmitBlockRequest, getPayloadResponse *builderApi.VersionedSubmitBlindedBlockResponse, getHeaderResponse *builderSpec.VersionedSignedBuilderBid, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) {
func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis.Pipeliner, trace *builderApiV1.BidTrace, blockSubmission *common.BlockSubmissionInfo, getPayloadResponse *builderApi.VersionedSubmitBlindedBlockResponse, getHeaderResponse *builderSpec.VersionedSignedBuilderBid, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) {
var prevTime, nextTime time.Time
prevTime = time.Now()

submission, err := common.GetBlockSubmissionInfo(payload)
if err != nil {
return state, err
}

// Load latest bids for a given slot+parent+proposer
builderBids, err := NewBuilderBidsFromRedis(ctx, r, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String())
builderBids, err := NewBuilderBidsFromRedis(ctx, r, pipeliner, trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String())
if err != nil {
return state, err
}

// Load floor value (if not passed in already)
if floorValue == nil {
floorValue, err = r.GetFloorBidValue(ctx, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String())
floorValue, err = r.GetFloorBidValue(ctx, pipeliner, trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String())
if err != nil {
return state, err
}
Expand All @@ -534,7 +530,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis
state.PrevTopBidValue = state.TopBidValue

// Abort now if non-cancellation bid is lower than floor value
isBidAboveFloor := submission.BidTrace.Value.ToBig().Cmp(floorValue) == 1
isBidAboveFloor := trace.Value.ToBig().Cmp(floorValue) == 1
if !isCancellationEnabled && !isBidAboveFloor {
return state, nil
}
Expand All @@ -547,61 +543,73 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis
//
// Time to save things in Redis
//
// 1. Save the execution payload
switch payload.Version {
case spec.DataVersionCapella:
err = r.SaveExecutionPayloadCapella(ctx, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ProposerPubkey.String(), submission.BidTrace.BlockHash.String(), getPayloadResponse.Capella)
if err != nil {
return state, err
}
case spec.DataVersionDeneb:
err = r.SavePayloadContentsDeneb(ctx, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ProposerPubkey.String(), submission.BidTrace.BlockHash.String(), getPayloadResponse.Deneb)
if err != nil {
return state, err
}
case spec.DataVersionUnknown, spec.DataVersionPhase0, spec.DataVersionAltair, spec.DataVersionBellatrix:
return state, fmt.Errorf("unsupported payload version: %s", payload.Version) //nolint:goerr113
}

// Record time needed to save payload
nextTime = time.Now().UTC()
state.TimeSavePayload = nextTime.Sub(prevTime)
prevTime = nextTime

// 2. Save latest bid for this builder
err = r.SaveBuilderBid(ctx, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String(), submission.BidTrace.BuilderPubkey.String(), reqReceivedAt, getHeaderResponse)
// 1. Save latest bid for this builder
err = r.SaveBuilderBid(ctx, pipeliner, trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String(), trace.BuilderPubkey.String(), reqReceivedAt, getHeaderResponse)
if err != nil {
return state, err
}
builderBids.bidValues[submission.BidTrace.BuilderPubkey.String()] = submission.BidTrace.Value.ToBig()
builderBids.bidValues[trace.BuilderPubkey.String()] = trace.Value.ToBig()

// Record time needed to save bid
nextTime = time.Now().UTC()
state.TimeSaveBid = nextTime.Sub(prevTime)
prevTime = nextTime

// 3. Save the bid trace
err = r.SaveBidTrace(ctx, pipeliner, trace)
if err != nil {
return state, err
// 2. Save the bid trace
if blockSubmission != nil {
bidTrace := common.BidTraceV2WithBlobFields{
BidTrace: *trace,
BlockNumber: blockSubmission.BlockNumber,
NumTx: uint64(len(blockSubmission.Transactions)),
NumBlobs: uint64(len(blockSubmission.Blobs)),
BlobGasUsed: blockSubmission.BlobGasUsed,
ExcessBlobGas: blockSubmission.ExcessBlobGas,
}
err = r.SaveBidTrace(ctx, pipeliner, &bidTrace)
if err != nil {
return state, err
}

// Record time needed to save trace
nextTime = time.Now().UTC()
state.TimeSaveTrace = nextTime.Sub(prevTime)
prevTime = nextTime
}

// Record time needed to save trace
nextTime = time.Now().UTC()
state.TimeSaveTrace = nextTime.Sub(prevTime)
prevTime = nextTime
// 3. Save the execution payload
if getPayloadResponse != nil {
switch getPayloadResponse.Version {
case spec.DataVersionCapella:
err = r.SaveExecutionPayloadCapella(ctx, pipeliner, trace.Slot, trace.ProposerPubkey.String(), trace.BlockHash.String(), getPayloadResponse.Capella)
if err != nil {
return state, err
}
case spec.DataVersionDeneb:
err = r.SavePayloadContentsDeneb(ctx, pipeliner, trace.Slot, trace.ProposerPubkey.String(), trace.BlockHash.String(), getPayloadResponse.Deneb)
if err != nil {
return state, err
}
case spec.DataVersionUnknown, spec.DataVersionPhase0, spec.DataVersionAltair, spec.DataVersionBellatrix:
return state, fmt.Errorf("unsupported payload version: %s", getPayloadResponse.Version) //nolint:goerr113
}

// Record time needed to save payload
nextTime = time.Now().UTC()
state.TimeSavePayload = nextTime.Sub(prevTime)
prevTime = nextTime
}

// If top bid value hasn't change, abort now
_, state.TopBidValue = builderBids.getTopBid()
if state.TopBidValue.Cmp(state.PrevTopBidValue) == 0 {
return state, nil
}

state, err = r._updateTopBid(ctx, pipeliner, state, builderBids, submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String(), floorValue)
state, err = r._updateTopBid(ctx, pipeliner, state, builderBids, trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String(), floorValue)
if err != nil {
return state, err
}
state.IsNewTopBid = submission.BidTrace.Value.ToBig().Cmp(state.TopBidValue) == 0
state.IsNewTopBid = trace.Value.ToBig().Cmp(state.TopBidValue) == 0
// An Exec happens in _updateTopBid.
state.WasBidSaved = true

Expand All @@ -615,8 +623,8 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis
}

// Non-cancellable bid above floor should set new floor
keyBidSource := r.keyLatestBidByBuilder(submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String(), submission.BidTrace.BuilderPubkey.String())
keyFloorBid := r.keyFloorBid(submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String())
keyBidSource := r.keyLatestBidByBuilder(trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String(), trace.BuilderPubkey.String())
keyFloorBid := r.keyFloorBid(trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String())
c := pipeliner.Copy(ctx, keyBidSource, keyFloorBid, 0, true)
_, err = pipeliner.Exec(ctx)
if err != nil {
Expand All @@ -634,8 +642,8 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis
return state, err
}

keyFloorBidValue := r.keyFloorBidValue(submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String())
err = pipeliner.Set(ctx, keyFloorBidValue, submission.BidTrace.Value.Dec(), expiryBidCache).Err()
keyFloorBidValue := r.keyFloorBidValue(trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String())
err = pipeliner.Set(ctx, keyFloorBidValue, trace.Value.Dec(), expiryBidCache).Err()
if err != nil {
return state, err
}
Expand Down
Loading

0 comments on commit 0ea2b26

Please sign in to comment.