Skip to content

Commit

Permalink
feat(cannon): Wait for forks to activate per deriver (#268)
Browse files Browse the repository at this point in the history
* feat(cannon): Wait for forks to activate per deriver

* feat(cannon): Wait for forks to activate per deriver
  • Loading branch information
samcm authored Jan 12, 2024
1 parent c1d6911 commit f1fdc94
Show file tree
Hide file tree
Showing 15 changed files with 126 additions and 20 deletions.
59 changes: 51 additions & 8 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
//nolint:gosec // only exposed if pprofAddr config is set
_ "net/http/pprof"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/beevik/ntp"
aBlockprint "github.com/ethpandaops/xatu/pkg/cannon/blockprint"
"github.com/ethpandaops/xatu/pkg/cannon/coordinator"
Expand Down Expand Up @@ -557,18 +558,60 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
return c.handleNewDecoratedEvents(ctx, events)
})

c.log.
WithField("deriver", deriver.Name()).
WithField("type", deriver.CannonType()).
Info("Starting cannon event deriver")

if err := deriver.Start(ctx); err != nil {
return err
}
go func() {
if err := c.startDeriverWhenReady(ctx, d); err != nil {
c.log.WithError(err).Error("Failed to start deriver")
}
}()
}

return nil
})

return nil
}

func (c *Cannon) startDeriverWhenReady(ctx context.Context, d deriver.EventDeriver) error {
for {
// Handle derivers that require phase0, since its not actually a fork it'll never appear
// in the spec.
if d.ActivationFork() != ethereum.ForkNamePhase0 {
spec, err := c.beacon.Node().Spec()
if err != nil {
c.log.WithError(err).Error("Failed to get spec")

time.Sleep(5 * time.Second)

continue
}

slot := c.beacon.Node().Wallclock().Slots().Current()

fork, err := spec.ForkEpochs.GetByName(d.ActivationFork())
if err != nil {
c.log.WithError(err).Errorf("unknown activation fork: %s", d.ActivationFork())

time.Sleep(5 * time.Second)

continue
}

if !fork.Active(phase0.Slot(slot.Number()), spec.SlotsPerEpoch) {
// Sleep until the next epochl and then retrty
c.log.Debug("Derived epoch is not active yet, sleeping until next epoch")

epoch := c.beacon.Metadata().Wallclock().Epochs().Current()

time.Sleep(time.Until(epoch.TimeWindow().End()))

continue
}
}

c.log.
WithField("deriver", d.Name()).
Info("Starting cannon event deriver")

return d.Start(ctx)
}
}
8 changes: 6 additions & 2 deletions pkg/cannon/deriver/beacon/eth/v1/beacon_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
)

type BeaconBlobDeriverConfig struct {
Enabled bool `yaml:"enabled" default:"false"`
Enabled bool `yaml:"enabled" default:"true"`
}

type BeaconBlobDeriver struct {
Expand All @@ -57,6 +57,10 @@ func (b *BeaconBlobDeriver) CannonType() xatu.CannonType {
return BeaconBlobDeriverName
}

func (b *BeaconBlobDeriver) ActivationFork() string {
return ethereum.ForkNameDeneb
}

func (b *BeaconBlobDeriver) Name() string {
return BeaconBlobDeriverName.String()
}
Expand All @@ -75,7 +79,7 @@ func (b *BeaconBlobDeriver) Start(ctx context.Context) error {
b.log.Info("Beacon blob deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v1/proposer_duty.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (b *ProposerDutyDeriver) CannonType() xatu.CannonType {
return ProposerDutyDeriverName
}

func (b *ProposerDutyDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *ProposerDutyDeriver) Name() string {
return ProposerDutyDeriverName.String()
}
Expand All @@ -74,7 +78,7 @@ func (b *ProposerDutyDeriver) Start(ctx context.Context) error {
b.log.Info("Proposer duty deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (a *AttesterSlashingDeriver) CannonType() xatu.CannonType {
return AttesterSlashingDeriverName
}

func (a *AttesterSlashingDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (a *AttesterSlashingDeriver) Name() string {
return AttesterSlashingDeriverName.String()
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/beacon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (b *BeaconBlockDeriver) CannonType() xatu.CannonType {
return BeaconBlockDeriverName
}

func (b *BeaconBlockDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *BeaconBlockDeriver) Name() string {
return BeaconBlockDeriverName.String()
}
Expand All @@ -77,7 +81,7 @@ func (b *BeaconBlockDeriver) Start(ctx context.Context) error {
b.log.Info("Beacon block deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/bls_to_execution_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (b *BLSToExecutionChangeDeriver) OnEventsDerived(ctx context.Context, fn fu
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}

func (b *BLSToExecutionChangeDeriver) ActivationFork() string {
return ethereum.ForkNameCapella
}

func (b *BLSToExecutionChangeDeriver) Start(ctx context.Context) error {
if !b.cfg.Enabled {
b.log.Info("BLS to execution change deriver disabled")
Expand All @@ -74,7 +78,7 @@ func (b *BLSToExecutionChangeDeriver) Start(ctx context.Context) error {
b.log.Info("BLS to execution change deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (b *DepositDeriver) Name() string {
return DepositDeriverName.String()
}

func (b *DepositDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *DepositDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}
Expand All @@ -72,7 +76,7 @@ func (b *DepositDeriver) Start(ctx context.Context) error {
b.log.Info("Deposit deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/elaborated_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (b *ElaboratedAttestationDeriver) Name() string {
return ElaboratedAttestationDeriverName.String()
}

func (b *ElaboratedAttestationDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *ElaboratedAttestationDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}
Expand All @@ -73,7 +77,7 @@ func (b *ElaboratedAttestationDeriver) Start(ctx context.Context) error {
b.log.Info("Elaborated attestation deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/execution_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (b *ExecutionTransactionDeriver) CannonType() xatu.CannonType {
return ExecutionTransactionDeriverName
}

func (b *ExecutionTransactionDeriver) ActivationFork() string {
return ethereum.ForkNameBellatrix
}

func (b *ExecutionTransactionDeriver) Name() string {
return ExecutionTransactionDeriverName.String()
}
Expand All @@ -75,7 +79,7 @@ func (b *ExecutionTransactionDeriver) Start(ctx context.Context) error {
b.log.Info("Execution transaction deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/proposer_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (b *ProposerSlashingDeriver) Name() string {
return ProposerSlashingDeriverName.String()
}

func (b *ProposerSlashingDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *ProposerSlashingDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}
Expand All @@ -72,7 +76,7 @@ func (b *ProposerSlashingDeriver) Start(ctx context.Context) error {
b.log.Info("Proposer slashing deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/voluntary_exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (b *VoluntaryExitDeriver) CannonType() xatu.CannonType {
return VoluntaryExitDeriverName
}

func (b *VoluntaryExitDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *VoluntaryExitDeriver) Name() string {
return VoluntaryExitDeriverName.String()
}
Expand All @@ -72,7 +76,7 @@ func (b *VoluntaryExitDeriver) Start(ctx context.Context) error {
b.log.Info("Voluntary exit deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/beacon/eth/v2/withdrawal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (b *WithdrawalDeriver) Name() string {
return WithdrawalDeriverName.String()
}

func (b *WithdrawalDeriver) ActivationFork() string {
return ethereum.ForkNameCapella
}

func (b *WithdrawalDeriver) OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error) {
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}
Expand All @@ -72,7 +76,7 @@ func (b *WithdrawalDeriver) Start(ctx context.Context) error {
b.log.Info("Withdrawal deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/cannon/deriver/blockprint/block_classification.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (b *BlockClassificationDeriver) CannonType() xatu.CannonType {
return BlockClassificationName
}

func (b *BlockClassificationDeriver) ActivationFork() string {
return ethereum.ForkNamePhase0
}

func (b *BlockClassificationDeriver) Name() string {
return BlockClassificationName.String()
}
Expand All @@ -85,7 +89,7 @@ func (b *BlockClassificationDeriver) Start(ctx context.Context) error {
b.log.Info("BlockClassification deriver enabled")

// Start our main loop
go b.run(ctx)
b.run(ctx)

return nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/cannon/deriver/event_deriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package deriver
import (
"context"

v1 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v1"
v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2"
"github.com/ethpandaops/xatu/pkg/cannon/deriver/blockprint"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand All @@ -15,6 +16,8 @@ type EventDeriver interface {
CannonType() xatu.CannonType
// Callbacks
OnEventsDerived(ctx context.Context, fn func(ctx context.Context, events []*xatu.DecoratedEvent) error)
// ActivationFork is the fork at which the deriver should start deriving events
ActivationFork() string
}

// Ensure that derivers implements the EventDeriver interface
Expand All @@ -28,3 +31,5 @@ var _ EventDeriver = &v2.WithdrawalDeriver{}
var _ EventDeriver = &v2.BeaconBlockDeriver{}
var _ EventDeriver = &blockprint.BlockClassificationDeriver{}
var _ EventDeriver = &v2.ElaboratedAttestationDeriver{}
var _ EventDeriver = &v1.ProposerDutyDeriver{}
var _ EventDeriver = &v1.BeaconBlobDeriver{}
10 changes: 10 additions & 0 deletions pkg/cannon/ethereum/forks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ethereum

var (
ForkNamePhase0 = "PHASE0"
ForkNameAltair = "ALTAIR"
ForkNameBellatrix = "BELLATRIX"
ForkNameCapella = "CAPELLA"
ForkNameDeneb = "DENEB"
ForkNameElectra = "ELECTRA"
)

0 comments on commit f1fdc94

Please sign in to comment.