Skip to content

Commit

Permalink
ctx in operator, validator runner and queue
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolie-ssv committed Nov 13, 2024
1 parent 7c8b076 commit fbbee6e
Show file tree
Hide file tree
Showing 37 changed files with 298 additions and 252 deletions.
10 changes: 5 additions & 5 deletions operator/duties/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (

eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/operator/duties/dutystore"
)
Expand Down Expand Up @@ -79,7 +79,7 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) {
buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1)
h.logger.Debug("🛠 ticker event", zap.String("epoch_slot_pos", buildStr))

h.processExecution(currentEpoch, slot)
h.processExecution(ctx, currentEpoch, slot) // TODO use the correct ctx here
h.processFetching(ctx, currentEpoch, slot)

slotsPerEpoch := h.network.Beacon.SlotsPerEpoch()
Expand Down Expand Up @@ -166,7 +166,7 @@ func (h *AttesterHandler) processFetching(ctx context.Context, epoch phase0.Epoc
}
}

func (h *AttesterHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot) {
func (h *AttesterHandler) processExecution(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) {
duties := h.duties.CommitteeSlotDuties(epoch, slot)
if duties == nil {
return
Expand All @@ -192,7 +192,7 @@ func (h *AttesterHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot)
}
}

h.dutiesExecutor.ExecuteDuties(h.logger, toExecute)
h.dutiesExecutor.ExecuteDuties(ctx, h.logger, toExecute)
}

func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, epoch phase0.Epoch) error {
Expand Down
8 changes: 4 additions & 4 deletions operator/duties/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/operator/duties/dutystore"
)

Expand Down Expand Up @@ -67,7 +67,7 @@ func (h *CommitteeHandler) HandleDuties(ctx context.Context) {
}

h.logger.Debug("🛠 ticker event", zap.String("period_epoch_slot_pos", buildStr))
h.processExecution(period, epoch, slot)
h.processExecution(ctx, period, epoch, slot) // TODO use the correct ctx here

case <-h.reorg:
// do nothing
Expand All @@ -78,15 +78,15 @@ func (h *CommitteeHandler) HandleDuties(ctx context.Context) {
}
}

func (h *CommitteeHandler) processExecution(period uint64, epoch phase0.Epoch, slot phase0.Slot) {
func (h *CommitteeHandler) processExecution(ctx context.Context, period uint64, epoch phase0.Epoch, slot phase0.Slot) {
attDuties := h.attDuties.CommitteeSlotDuties(epoch, slot)
syncDuties := h.syncDuties.CommitteePeriodDuties(period)
if attDuties == nil && syncDuties == nil {
return
}

committeeMap := h.buildCommitteeDuties(attDuties, syncDuties, epoch, slot)
h.dutiesExecutor.ExecuteCommitteeDuties(h.logger, committeeMap)
h.dutiesExecutor.ExecuteCommitteeDuties(ctx, h.logger, committeeMap)
}

func (h *CommitteeHandler) buildCommitteeDuties(attDuties []*eth2apiv1.AttesterDuty, syncDuties []*eth2apiv1.SyncCommitteeDuty, epoch phase0.Epoch, slot phase0.Slot) committeeDutiesMap {
Expand Down
13 changes: 6 additions & 7 deletions operator/duties/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (
"fmt"
"time"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"

eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/operator/duties/dutystore"
)
Expand Down Expand Up @@ -74,9 +73,9 @@ func (h *ProposerHandler) HandleDuties(ctx context.Context) {
h.fetchFirst = false
h.indicesChanged = false
h.processFetching(ctx, currentEpoch)
h.processExecution(currentEpoch, slot)
h.processExecution(ctx, currentEpoch, slot) // TODO use the correct ctx here
} else {
h.processExecution(currentEpoch, slot)
h.processExecution(ctx, currentEpoch, slot) // TODO use the correct ctx here
if h.indicesChanged {
h.indicesChanged = false
h.processFetching(ctx, currentEpoch)
Expand Down Expand Up @@ -130,7 +129,7 @@ func (h *ProposerHandler) processFetching(ctx context.Context, epoch phase0.Epoc
}
}

func (h *ProposerHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot) {
func (h *ProposerHandler) processExecution(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) {
duties := h.duties.CommitteeSlotDuties(epoch, slot)
if duties == nil {
return
Expand All @@ -155,7 +154,7 @@ func (h *ProposerHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot)
toExecute = append(toExecute, h.toSpecDuty(d, spectypes.BNRoleProposer))
}
}
h.dutiesExecutor.ExecuteDuties(h.logger, toExecute)
h.dutiesExecutor.ExecuteDuties(ctx, h.logger, toExecute)
}

func (h *ProposerHandler) fetchAndProcessDuties(ctx context.Context, epoch phase0.Epoch) error {
Expand Down
17 changes: 8 additions & 9 deletions operator/duties/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/beacon/goclient"
"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/logging/fields"
Expand Down Expand Up @@ -56,15 +55,15 @@ const (
// DutiesExecutor is an interface for executing duties.
type DutiesExecutor interface {
ExecuteGenesisDuties(logger *zap.Logger, duties []*genesisspectypes.Duty)
ExecuteDuties(logger *zap.Logger, duties []*spectypes.ValidatorDuty)
ExecuteCommitteeDuties(logger *zap.Logger, duties committeeDutiesMap)
ExecuteDuties(ctx context.Context, logger *zap.Logger, duties []*spectypes.ValidatorDuty)
ExecuteCommitteeDuties(ctx context.Context, logger *zap.Logger, duties committeeDutiesMap)
}

// DutyExecutor is an interface for executing duty.
type DutyExecutor interface {
ExecuteGenesisDuty(logger *zap.Logger, duty *genesisspectypes.Duty)
ExecuteDuty(logger *zap.Logger, duty *spectypes.ValidatorDuty)
ExecuteCommitteeDuty(logger *zap.Logger, committeeID spectypes.CommitteeID, duty *spectypes.CommitteeDuty)
ExecuteDuty(ctx context.Context, logger *zap.Logger, duty *spectypes.ValidatorDuty)
ExecuteCommitteeDuty(ctx context.Context, logger *zap.Logger, committeeID spectypes.CommitteeID, duty *spectypes.CommitteeDuty)
}

type BeaconNode interface {
Expand Down Expand Up @@ -394,7 +393,7 @@ func (s *Scheduler) ExecuteGenesisDuties(logger *zap.Logger, duties []*genesissp
}

// ExecuteDuties tries to execute the given duties
func (s *Scheduler) ExecuteDuties(logger *zap.Logger, duties []*spectypes.ValidatorDuty) {
func (s *Scheduler) ExecuteDuties(ctx context.Context, logger *zap.Logger, duties []*spectypes.ValidatorDuty) {
for _, duty := range duties {
duty := duty
logger := s.loggerWithDutyContext(logger, duty)
Expand All @@ -407,13 +406,13 @@ func (s *Scheduler) ExecuteDuties(logger *zap.Logger, duties []*spectypes.Valida
if duty.Type == spectypes.BNRoleAttester || duty.Type == spectypes.BNRoleSyncCommittee {
s.waitOneThirdOrValidBlock(duty.Slot)
}
s.dutyExecutor.ExecuteDuty(logger, duty)
s.dutyExecutor.ExecuteDuty(ctx, logger, duty)
}()
}
}

// ExecuteCommitteeDuties tries to execute the given committee duties
func (s *Scheduler) ExecuteCommitteeDuties(logger *zap.Logger, duties committeeDutiesMap) {
func (s *Scheduler) ExecuteCommitteeDuties(ctx context.Context, logger *zap.Logger, duties committeeDutiesMap) {
for _, committee := range duties {
duty := committee.duty
logger := s.loggerWithCommitteeDutyContext(logger, committee)
Expand All @@ -427,7 +426,7 @@ func (s *Scheduler) ExecuteCommitteeDuties(logger *zap.Logger, duties committeeD
slotDelayHistogram.Observe(float64(slotDelay.Milliseconds()))
go func() {
s.waitOneThirdOrValidBlock(duty.Slot)
s.dutyExecutor.ExecuteCommitteeDuty(logger, committee.id, duty)
s.dutyExecutor.ExecuteCommitteeDuty(ctx, logger, committee.id, duty)
}()
}
}
Expand Down
32 changes: 16 additions & 16 deletions operator/duties/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions operator/duties/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func setupSchedulerAndMocks(t *testing.T, handlers []dutyHandler, currentSlot *S
func setExecuteDutyFunc(s *Scheduler, executeDutiesCall chan []*spectypes.ValidatorDuty, executeDutiesCallSize int) {
executeDutiesBuffer := make(chan *spectypes.ValidatorDuty, executeDutiesCallSize)

s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteDuty(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteDuty(context.TODO(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(logger *zap.Logger, duty *spectypes.ValidatorDuty) error {
logger.Debug("🏃 Executing duty", zap.Any("duty", duty))
executeDutiesBuffer <- duty
Expand Down Expand Up @@ -218,14 +218,14 @@ func setExecuteGenesisDutyFunc(s *Scheduler, executeDutiesCall chan []*genesissp
func setExecuteDutyFuncs(s *Scheduler, executeDutiesCall chan committeeDutiesMap, executeDutiesCallSize int) {
executeDutiesBuffer := make(chan *committeeDuty, executeDutiesCallSize)

s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteDuty(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteDuty(context.TODO(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(logger *zap.Logger, duty *spectypes.ValidatorDuty) error {
logger.Debug("🏃 Executing duty", zap.Any("duty", duty))
return nil
},
).AnyTimes()

s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteCommitteeDuty(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteCommitteeDuty(context.TODO(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(logger *zap.Logger, committeeID spectypes.CommitteeID, duty *spectypes.CommitteeDuty) {
logger.Debug("🏃 Executing committee duty", zap.Any("duty", duty))
executeDutiesBuffer <- &committeeDuty{id: committeeID, duty: duty}
Expand Down
10 changes: 5 additions & 5 deletions operator/duties/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (

eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/operator/duties/dutystore"
)
Expand Down Expand Up @@ -87,7 +87,7 @@ func (h *SyncCommitteeHandler) HandleDuties(ctx context.Context) {
h.logger.Debug("🛠 ticker event", zap.String("period_epoch_slot_pos", buildStr))

ctx, cancel := context.WithDeadline(ctx, h.network.Beacon.GetSlotStartTime(slot+1).Add(100*time.Millisecond))
h.processExecution(period, slot)
h.processExecution(ctx, period, slot) // TODO use the correct ctx here
h.processFetching(ctx, period, true)
cancel()

Expand Down Expand Up @@ -162,7 +162,7 @@ func (h *SyncCommitteeHandler) processFetching(ctx context.Context, period uint6
}
}

func (h *SyncCommitteeHandler) processExecution(period uint64, slot phase0.Slot) {
func (h *SyncCommitteeHandler) processExecution(ctx context.Context, period uint64, slot phase0.Slot) {
// range over duties and execute
duties := h.duties.CommitteePeriodDuties(period)
if duties == nil {
Expand All @@ -189,7 +189,7 @@ func (h *SyncCommitteeHandler) processExecution(period uint64, slot phase0.Slot)
}
}

h.dutiesExecutor.ExecuteDuties(h.logger, toExecute)
h.dutiesExecutor.ExecuteDuties(ctx, h.logger, toExecute)
}

func (h *SyncCommitteeHandler) fetchAndProcessDuties(ctx context.Context, period uint64, waitForInitial bool) error {
Expand Down
8 changes: 4 additions & 4 deletions operator/duties/validatorregistration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"encoding/hex"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"

"github.com/attestantio/go-eth2-client/spec/phase0"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
)

const validatorRegistrationEpochInterval = uint64(10)
Expand Down Expand Up @@ -66,7 +66,7 @@ func (h *ValidatorRegistrationHandler) HandleDuties(ctx context.Context) {
// no need for other params
}})
} else {
h.dutiesExecutor.ExecuteDuties(h.logger, []*spectypes.ValidatorDuty{{
h.dutiesExecutor.ExecuteDuties(ctx, h.logger, []*spectypes.ValidatorDuty{{ // TODO use the correct ctx here
Type: spectypes.BNRoleValidatorRegistration,
ValidatorIndex: share.ValidatorIndex,
PubKey: pk,
Expand Down
11 changes: 5 additions & 6 deletions operator/duties/voluntary_exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"
"math/big"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"

"github.com/attestantio/go-eth2-client/spec/phase0"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/operator/duties/dutystore"
)
Expand Down Expand Up @@ -59,7 +58,7 @@ func (h *VoluntaryExitHandler) HandleDuties(ctx context.Context) {
next = h.ticker.Next()

h.logger.Debug("🛠 ticker event", fields.Slot(currentSlot))
h.processExecution(currentSlot)
h.processExecution(ctx, currentSlot) // TODO use the right ctx here

case exitDescriptor, ok := <-h.validatorExitCh:
if !ok {
Expand Down Expand Up @@ -104,7 +103,7 @@ func (h *VoluntaryExitHandler) HandleDuties(ctx context.Context) {
}
}

func (h *VoluntaryExitHandler) processExecution(slot phase0.Slot) {
func (h *VoluntaryExitHandler) processExecution(ctx context.Context, slot phase0.Slot) {
var dutiesForExecution, pendingDuties []*spectypes.ValidatorDuty

for _, duty := range h.dutyQueue {
Expand Down Expand Up @@ -134,7 +133,7 @@ func (h *VoluntaryExitHandler) processExecution(slot phase0.Slot) {
}

if dutyCount := len(dutiesForExecution); dutyCount != 0 {
h.dutiesExecutor.ExecuteDuties(h.logger, dutiesForExecution)
h.dutiesExecutor.ExecuteDuties(ctx, h.logger, dutiesForExecution)
h.logger.Debug("executed voluntary exit duties",
fields.Slot(slot),
fields.Count(dutyCount))
Expand Down
Loading

0 comments on commit fbbee6e

Please sign in to comment.