Skip to content

Commit

Permalink
core: revert wire recaster component changes (#2877)
Browse files Browse the repository at this point in the history
Reverts #2752 as it was not working as expected and was blocking registrations health checks changes.

category: misc
ticket: #2669
  • Loading branch information
dB2510 authored Feb 13, 2024
1 parent f28b00c commit 3237887
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 38 deletions.
36 changes: 20 additions & 16 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,8 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

recaster, err := newRecaster(ctx, eth2Cl, cluster.Validators,
conf.BuilderAPI, conf.TestConfig.BroadcastCallback)
if err != nil {
if err = wireRecaster(ctx, eth2Cl, sched, sigAgg, broadcaster, cluster.Validators,
conf.BuilderAPI, conf.TestConfig.BroadcastCallback); err != nil {
return errors.Wrap(err, "wire recaster")
}

Expand All @@ -507,7 +506,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
core.WithTracking(track, inclusion),
core.WithAsyncRetry(retryer),
}
core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, recaster, opts...)
core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)

err = wireValidatorMock(ctx, conf, eth2Cl, pubshares, sched)
if err != nil {
Expand Down Expand Up @@ -578,11 +577,12 @@ func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, t
return nil
}

// newRecaster returns the rebroadcaster component with pre-generate registration stored in its memory.
// The wiring of recaster is done in core.Wire to support tracking of re-broadcasted duties.
func newRecaster(ctx context.Context, eth2Cl eth2wrap.Client, validators []*manifestpb.Validator, builderAPI bool,
// wireRecaster wires the rebroadcaster component to scheduler, sigAgg and broadcaster.
// This is not done in core.Wire since recaster isn't really part of the official core workflow (yet).
func wireRecaster(ctx context.Context, eth2Cl eth2wrap.Client, sched core.Scheduler, sigAgg core.SigAgg,
broadcaster core.Broadcaster, validators []*manifestpb.Validator, builderAPI bool,
callback func(context.Context, core.Duty, core.SignedDataSet) error,
) (core.Recaster, error) {
) error {
recaster, err := bcast.NewRecaster(func(ctx context.Context) (map[eth2p0.BLSPubKey]struct{}, error) {
valList, err := eth2Cl.ActiveValidators(ctx)
if err != nil {
Expand All @@ -598,15 +598,19 @@ func newRecaster(ctx context.Context, eth2Cl eth2wrap.Client, validators []*mani
return ret, nil
})
if err != nil {
return nil, errors.Wrap(err, "recaster init")
return errors.Wrap(err, "recaster init")
}

sched.SubscribeSlots(recaster.SlotTicked)
sigAgg.Subscribe(recaster.Store)
recaster.Subscribe(broadcaster.Broadcast)

if callback != nil {
recaster.Subscribe(callback)
}

if !builderAPI {
return recaster, nil
return nil
}

for _, val := range validators {
Expand All @@ -617,30 +621,30 @@ func newRecaster(ctx context.Context, eth2Cl eth2wrap.Client, validators []*mani

reg := new(eth2api.VersionedSignedValidatorRegistration)
if err := json.Unmarshal(val.BuilderRegistrationJson, reg); err != nil {
return nil, errors.Wrap(err, "unmarshal validator registration")
return errors.Wrap(err, "unmarshal validator registration")
}

pubkey, err := core.PubKeyFromBytes(val.PublicKey)
if err != nil {
return nil, errors.Wrap(err, "core pubkey from bytes")
return errors.Wrap(err, "core pubkey from bytes")
}

signedData, err := core.NewVersionedSignedValidatorRegistration(reg)
if err != nil {
return nil, errors.Wrap(err, "new versioned signed validator registration")
return errors.Wrap(err, "new versioned signed validator registration")
}

slot, err := slotFromTimestamp(ctx, eth2Cl, reg.V1.Message.Timestamp)
if err != nil {
return nil, errors.Wrap(err, "calculate slot from timestamp")
return errors.Wrap(err, "calculate slot from timestamp")
}

if err = recaster.Store(ctx, core.NewBuilderRegistrationDuty(slot), core.SignedDataSet{pubkey: signedData}); err != nil {
return nil, errors.Wrap(err, "recaster store registration")
return errors.Wrap(err, "recaster store registration")
}
}

return recaster, nil
return nil
}

// newTracker creates and starts a new tracker instance.
Expand Down
22 changes: 0 additions & 22 deletions core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,6 @@ type Broadcaster interface {
Broadcast(context.Context, Duty, SignedDataSet) error
}

// Recaster rebroadcasts aggregated signed duty set periodically to the beacon node.
type Recaster interface {
// SlotTicked is called when new slots tick.
SlotTicked(context.Context, Slot) error

// Store stores aggregate signed duty for rebroadcasting.
Store(context.Context, Duty, SignedDataSet) error

// Subscribe subscribes to rebroadcasted duties.
Subscribe(func(context.Context, Duty, SignedDataSet) error)
}

// InclusionChecker checks whether submitted duties have been included on-chain.
// TODO(corver): Merge this with tracker below as a compose multi tracker.
type InclusionChecker interface {
Expand Down Expand Up @@ -254,9 +242,6 @@ type wireFuncs struct {
AggSigDBStore func(context.Context, Duty, SignedDataSet) error
AggSigDBAwait func(context.Context, Duty, PubKey) (SignedData, error)
BroadcasterBroadcast func(context.Context, Duty, SignedDataSet) error
RecasterSlotTicked func(context.Context, Slot) error
RecasterSubscribe func(func(context.Context, Duty, SignedDataSet) error)
RecasterStore func(context.Context, Duty, SignedDataSet) error
}

// WireOption defines a functional option to configure wiring.
Expand All @@ -273,7 +258,6 @@ func Wire(sched Scheduler,
sigAgg SigAgg,
aggSigDB AggSigDB,
bcast Broadcaster,
recast Recaster,
opts ...WireOption,
) {
w := wireFuncs{
Expand Down Expand Up @@ -314,9 +298,6 @@ func Wire(sched Scheduler,
AggSigDBStore: aggSigDB.Store,
AggSigDBAwait: aggSigDB.Await,
BroadcasterBroadcast: bcast.Broadcast,
RecasterSubscribe: recast.Subscribe,
RecasterSlotTicked: recast.SlotTicked,
RecasterStore: recast.Store,
}

for _, opt := range opts {
Expand Down Expand Up @@ -345,7 +326,4 @@ func Wire(sched Scheduler,
w.ParSigDBSubscribeThreshold(w.SigAggAggregate)
w.SigAggSubscribe(w.AggSigDBStore)
w.SigAggSubscribe(w.BroadcasterBroadcast)
w.SchedulerSubscribeSlots(w.RecasterSlotTicked)
w.SigAggSubscribe(w.RecasterStore)
w.RecasterSubscribe(w.BroadcasterBroadcast)
}

0 comments on commit 3237887

Please sign in to comment.