Skip to content

Commit

Permalink
Caplin: Fixed up validator's attestation rate during epoch slots (#12377
Browse files Browse the repository at this point in the history
)

Fixes:
- Optimized `head` retrieval in the beacon API.
- Optimized `GetHead`
- Fixed timeouts on missing peers with libp2p
- Add parallel peer forwarding when not enough peers for a sub-topic are
present
- Fixed attestation producer for a bunch of fucking annoying edge cases.

---------

Co-authored-by: Kewei <kewei.train@gmail.com>
  • Loading branch information
Giulio2002 and domiwei authored Oct 28, 2024
1 parent 04aa6bb commit 0ff97f9
Show file tree
Hide file tree
Showing 31 changed files with 443 additions and 209 deletions.
4 changes: 2 additions & 2 deletions cl/beacon/handler/attestation_rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
if err != nil {
return nil, err
}
_, headSlot, err := a.forkchoiceStore.GetHead()
_, headSlot, statusCode, err := a.getHead()
if err != nil {
return nil, err
return nil, beaconhttp.NewEndpointError(statusCode, err)
}
headEpoch := headSlot / a.beaconChainCfg.SlotsPerEpoch
if epoch > headEpoch {
Expand Down
74 changes: 59 additions & 15 deletions cl/beacon/handler/block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/beacon/beaconhttp"
"github.com/erigontech/erigon/cl/beacon/builder"
"github.com/erigontech/erigon/cl/beacon/synced_data"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/cltypes/solid"
Expand Down Expand Up @@ -75,6 +76,42 @@ var (

var defaultGraffitiString = "Caplin"

const missedTimeout = 500 * time.Millisecond

func (a *ApiHandler) waitUntilHeadStateAtEpochIsReadyOrCountAsMissed(ctx context.Context, syncedData synced_data.SyncedData, epoch uint64) error {
timer := time.NewTimer(missedTimeout)
checkIfSlotIsThere := func() (bool, error) {
tx, err := a.indiciesDB.BeginRo(ctx)
if err != nil {
return false, err
}
defer tx.Rollback()
blockRoot, err := beacon_indicies.ReadCanonicalBlockRoot(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return false, err
}
return blockRoot != (libcommon.Hash{}), nil
}

defer timer.Stop()
for {
select {
case <-timer.C:
return nil
case <-ctx.Done():
return fmt.Errorf("waiting for head state to reach slot %d: %w", epoch, ctx.Err())
default:
}
ready, err := checkIfSlotIsThere()
if err != nil {
return err
}
if ready {
return nil
}
time.Sleep(30 * time.Millisecond)
}
}
func (a *ApiHandler) GetEthV1ValidatorAttestationData(
w http.ResponseWriter,
r *http.Request,
Expand All @@ -83,6 +120,24 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err)
}
// wait until the head state is at the target slot or later
err = a.waitUntilHeadStateAtEpochIsReadyOrCountAsMissed(r.Context(), a.syncedData, *slot/a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, err)
}
tx, err := a.indiciesDB.BeginRo(r.Context())
if err != nil {
return nil, err
}
defer tx.Rollback()
headState := a.syncedData.HeadState()
if headState == nil {
return nil, beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
errors.New("beacon node is still syncing"),
)
}

committeeIndex, err := beaconhttp.Uint64FromQueryParams(r, "committee_index")
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err)
Expand All @@ -105,16 +160,10 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
committeeIndex = &zero
}

headState := a.syncedData.HeadState()
if headState == nil {
return nil, beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
errors.New("beacon node is still syncing"),
)
}

attestationData, err := a.attestationProducer.ProduceAndCacheAttestationData(
tx,
headState,
a.syncedData.HeadRoot(),
*slot,
*committeeIndex,
)
Expand Down Expand Up @@ -188,12 +237,7 @@ func (a *ApiHandler) GetEthV3ValidatorBlock(
)
}

baseBlockRoot, err := s.BlockRoot()
if err != nil {
log.Warn("Failed to get block root", "err", err)
return nil, err
}

baseBlockRoot := a.syncedData.HeadRoot()
sourceBlock, err := a.blockReader.ReadBlockByRoot(ctx, tx, baseBlockRoot)
if err != nil {
log.Warn("Failed to get source block", "err", err, "root", baseBlockRoot)
Expand Down Expand Up @@ -1161,7 +1205,7 @@ func (a *ApiHandler) findBestAttestationsForBlockProduction(
for _, att := range atts {
expectedReward, err := computeAttestationReward(s, att)
if err != nil {
log.Warn("[Block Production] Could not compute expected attestation reward", "reason", err)
log.Debug("[Block Production] Could not compute expected attestation reward", "reason", err)
continue
}
if expectedReward == 0 {
Expand Down
11 changes: 4 additions & 7 deletions cl/beacon/handler/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,13 @@ type headerResponse struct {
Header *cltypes.SignedBeaconBlockHeader `json:"header"`
}

type getHeadersRequest struct {
Slot *uint64 `json:"slot,omitempty,string"`
ParentRoot *libcommon.Hash `json:"root,omitempty"`
}

func (a *ApiHandler) rootFromBlockId(ctx context.Context, tx kv.Tx, blockId *beaconhttp.SegmentID) (root libcommon.Hash, err error) {
switch {
case blockId.Head():
root, _, err = a.forkchoiceStore.GetHead()
var statusCode int
root, _, statusCode, err = a.getHead()
if err != nil {
return libcommon.Hash{}, err
return libcommon.Hash{}, beaconhttp.NewEndpointError(statusCode, err)
}
case blockId.Finalized():
root = a.forkchoiceStore.FinalizedCheckpoint().Root
Expand Down Expand Up @@ -201,6 +197,7 @@ func (a *ApiHandler) GetEthV1BeaconBlockRoot(w http.ResponseWriter, r *http.Requ
return nil, err
}
isOptimistic := a.forkchoiceStore.IsRootOptimistic(root)

// check if the root exist
slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, root)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cl/beacon/handler/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ func (a *ApiHandler) GetEth1V1BuilderStatesExpectedWithdrawals(w http.ResponseWr
if a.beaconChainCfg.GetCurrentStateVersion(*slot/a.beaconChainCfg.SlotsPerEpoch) < clparams.CapellaVersion {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, errors.New("the specified state is not a capella state"))
}
headRoot, _, err := a.forkchoiceStore.GetHead()
headRoot, _, statusCode, err := a.getHead()
if err != nil {
return nil, err
return nil, beaconhttp.NewEndpointError(statusCode, err)
}

if a.syncedData.Syncing() {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("beacon node is syncing"))
}
Expand Down
8 changes: 4 additions & 4 deletions cl/beacon/handler/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ func (a *ApiHandler) GetEthV2DebugBeaconHeads(w http.ResponseWriter, r *http.Req
if a.syncedData.Syncing() {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("beacon node is syncing"))
}
hash, slotNumber, err := a.forkchoiceStore.GetHead()
root, slot, statusCode, err := a.getHead()
if err != nil {
return nil, err
return nil, beaconhttp.NewEndpointError(statusCode, err)
}
return newBeaconResponse(
[]interface{}{
map[string]interface{}{
"slot": strconv.FormatUint(slotNumber, 10),
"root": hash,
"slot": strconv.FormatUint(slot, 10),
"root": root,
"execution_optimistic": false,
},
}), nil
Expand Down
18 changes: 18 additions & 0 deletions cl/beacon/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package handler

import (
"errors"
"net/http"
"sync"

Expand Down Expand Up @@ -107,6 +108,7 @@ type ApiHandler struct {
proposerSlashingService services.ProposerSlashingService
builderClient builder.BuilderClient
validatorsMonitor monitor.ValidatorMonitor
enableMemoizedHeadState bool
}

func NewApiHandler(
Expand Down Expand Up @@ -141,6 +143,7 @@ func NewApiHandler(
proposerSlashingService services.ProposerSlashingService,
builderClient builder.BuilderClient,
validatorMonitor monitor.ValidatorMonitor,
enableMemoizedHeadState bool,
) *ApiHandler {
blobBundles, err := lru.New[common.Bytes48, BlobBundle]("blobs", maxBlobBundleCacheSize)
if err != nil {
Expand Down Expand Up @@ -183,6 +186,7 @@ func NewApiHandler(
proposerSlashingService: proposerSlashingService,
builderClient: builderClient,
validatorsMonitor: validatorMonitor,
enableMemoizedHeadState: enableMemoizedHeadState,
}
}

Expand Down Expand Up @@ -352,3 +356,17 @@ func (a *ApiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
})
a.mux.ServeHTTP(w, r)
}

func (a *ApiHandler) getHead() (common.Hash, uint64, int, error) {
if a.enableMemoizedHeadState {
if a.syncedData.Syncing() {
return common.Hash{}, 0, http.StatusServiceUnavailable, errors.New("beacon node is syncing")
}
return a.syncedData.HeadRoot(), a.syncedData.HeadSlot(), 0, nil
}
blockRoot, blockSlot, err := a.forkchoiceStore.GetHead(nil)
if err != nil {
return common.Hash{}, 0, http.StatusInternalServerError, err
}
return blockRoot, blockSlot, 0, nil
}
5 changes: 1 addition & 4 deletions cl/beacon/handler/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ func (a *ApiHandler) blockRootFromStateId(ctx context.Context, tx kv.Tx, stateId

switch {
case stateId.Head():
root, _, err = a.forkchoiceStore.GetHead()
if err != nil {
return libcommon.Hash{}, http.StatusInternalServerError, err
}
root, _, httpStatusErr, err = a.getHead()
return
case stateId.Finalized():
root = a.forkchoiceStore.FinalizedCheckpoint().Root
Expand Down
1 change: 1 addition & 0 deletions cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
proposerSlashingService,
nil,
mockValidatorMonitor,
false,
) // TODO: add tests
h.Init()
return
Expand Down
1 change: 1 addition & 0 deletions cl/beacon/handler/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (t *validatorTestSuite) SetupTest() {
nil,
nil,
nil,
false,
)
t.gomockCtrl = gomockCtrl
}
Expand Down
2 changes: 2 additions & 0 deletions cl/beacon/synced_data/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package synced_data

import (
"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/phase1/core/state"
)
Expand All @@ -29,4 +30,5 @@ type SyncedData interface {
HeadStateMutator() abstract.BeaconStateMutator
Syncing() bool
HeadSlot() uint64
HeadRoot() common.Hash
}
48 changes: 43 additions & 5 deletions cl/beacon/synced_data/mock_services/synced_data_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0ff97f9

Please sign in to comment.