Skip to content

Commit

Permalink
*: consensus abstraction (#3327)
Browse files Browse the repository at this point in the history
Summary of changes:
1. Introduced & implemented `ConsensusController` interface. 
2. Refactored the entire `consensus` package with increased test coverage.
3. Added `cluster run --consensus-protocol=xyz` flag.
4. Wired Priority protocol to change the current core consensus algorithm.
5. Added support for running two consensus protocols at a time (one for Priority, which always uses QBFT, and another for Core).
6. Added `docs/consensus.md`

category: feature
ticket: #3304
  • Loading branch information
pinebit authored Nov 4, 2024
1 parent 45c1ef5 commit 3b7e052
Show file tree
Hide file tree
Showing 50 changed files with 1,738 additions and 636 deletions.
88 changes: 58 additions & 30 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ import (
"github.com/obolnetwork/charon/core/aggsigdb"
"github.com/obolnetwork/charon/core/bcast"
"github.com/obolnetwork/charon/core/consensus"
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
"github.com/obolnetwork/charon/core/consensus/protocols"
"github.com/obolnetwork/charon/core/consensus/qbft"
"github.com/obolnetwork/charon/core/dutydb"
"github.com/obolnetwork/charon/core/fetcher"
"github.com/obolnetwork/charon/core/infosync"
Expand Down Expand Up @@ -92,6 +93,7 @@ type Config struct {
SimnetBMockFuzz bool
TestnetConfig eth2util.Network
ProcDirectory string
ConsensusProtocol string

TestConfig TestConfig
}
Expand Down Expand Up @@ -257,8 +259,6 @@ func Run(ctx context.Context, conf Config) (err error) {

wirePeerInfo(life, tcpNode, peerIDs, cluster.GetInitialMutationHash(), sender, conf.BuilderAPI)

consensusDebugger := consensus.NewDebugger()

// seenPubkeys channel to send seen public keys from validatorapi to monitoringapi.
seenPubkeys := make(chan core.PubKey)
seenPubkeysFunc := func(pk core.PubKey) {
Expand All @@ -281,6 +281,8 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

consensusDebugger := consensus.NewDebugger()

wireMonitoringAPI(ctx, life, conf.MonitoringAddr, conf.DebugAddr, tcpNode, eth2Cl, peerIDs,
promRegistry, consensusDebugger, pubkeys, seenPubkeys, vapiCalls, len(cluster.GetValidators()))

Expand Down Expand Up @@ -524,16 +526,25 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

retryer := retry.New[core.Duty](deadlineFunc)
retryer := retry.New(deadlineFunc)

cons, startCons, err := newConsensus(cluster, tcpNode, p2pKey, sender,
deadlinerFunc("consensus"), gaterFunc, consensusDebugger.AddInstance)
// Consensus
consensusController, err := consensus.NewConsensusController(
ctx, tcpNode, sender, peers, p2pKey,
deadlineFunc, gaterFunc, consensusDebugger)
if err != nil {
return err
}

defaultConsensus := consensusController.DefaultConsensus()
startConsensusCtrl := lifecycle.HookFuncCtx(consensusController.Start)

coreConsensus := consensusController.CurrentConsensus() // initially points to DefaultConsensus()

// Priority protocol always uses QBFTv2.
err = wirePrioritise(ctx, conf, life, tcpNode, peerIDs, int(cluster.GetThreshold()),
sender.SendReceive, cons, sched, p2pKey, deadlineFunc)
sender.SendReceive, defaultConsensus, sched, p2pKey, deadlineFunc,
consensusController, cluster.GetConsensusProtocol())
if err != nil {
return err
}
Expand All @@ -553,12 +564,13 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

// Core always uses the "current" consensus that is changed dynamically.
opts := []core.WireOption{
core.WithTracing(),
core.WithTracking(track, inclusion),
core.WithAsyncRetry(retryer),
}
core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)
core.Wire(sched, fetch, coreConsensus, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)

err = wireValidatorMock(ctx, conf, eth2Cl, pubshares, sched)
if err != nil {
Expand All @@ -570,7 +582,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
}

life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartScheduler, lifecycle.HookFuncErr(sched.Run))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PConsensus, startCons)
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PConsensus, startConsensusCtrl)
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartAggSigDB, lifecycle.HookFuncCtx(aggSigDB.Run))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartParSigDB, lifecycle.HookFuncCtx(parSigDB.Trim))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartTracker, lifecycle.HookFuncCtx(inclusion.Run))
Expand All @@ -585,8 +597,9 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, tcpNode host.Host,
peers []peer.ID, threshold int, sendFunc p2p.SendReceiveFunc, coreCons core.Consensus,
sched core.Scheduler, p2pKey *k1.PrivateKey, deadlineFunc func(duty core.Duty) (time.Time, bool),
consensusController core.ConsensusController, clusterPreferredProtocol string,
) error {
cons, ok := coreCons.(*consensus.Component)
cons, ok := coreCons.(*qbft.Consensus)
if !ok {
// Priority protocol not supported for leader cast.
return nil
Expand All @@ -602,9 +615,22 @@ func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, t
return err
}

// The initial protocols order as defined by implementation is altered by:
// 1. Prioritizing the cluster (lock) preferred protocol to the top.
// 2. Prioritizing the protocol specified by CLI flag (cluster run) to the top.
// In all cases this prioritizes all versions of the protocol identified by name.
// The order of these operations is important.
allProtocols := Protocols()
if clusterPreferredProtocol != "" {
allProtocols = protocols.PrioritizeProtocolsByName(clusterPreferredProtocol, allProtocols)
}
if conf.ConsensusProtocol != "" {
allProtocols = protocols.PrioritizeProtocolsByName(conf.ConsensusProtocol, allProtocols)
}

isync := infosync.New(prio,
version.Supported(),
Protocols(),
allProtocols,
ProposalTypes(conf.BuilderAPI, conf.SyntheticBlockProposals),
)

Expand All @@ -621,6 +647,26 @@ func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, t
prio.Subscribe(conf.TestConfig.PrioritiseCallback)
}

prio.Subscribe(func(ctx context.Context, _ core.Duty, tr []priority.TopicResult) error {
for _, t := range tr {
if t.Topic == infosync.TopicProtocol {
allProtocols := t.PrioritiesOnly()
preferredConsensusProtocol := protocols.MostPreferredConsensusProtocol(allProtocols)
preferredConsensusProtocolID := protocol.ID(preferredConsensusProtocol)

if err := consensusController.SetCurrentConsensusForProtocol(ctx, preferredConsensusProtocolID); err != nil {
log.Error(ctx, "Failed to set current consensus protocol", err, z.Str("protocol", preferredConsensusProtocol))
} else {
log.Info(ctx, "Current consensus protocol changed", z.Str("protocol", preferredConsensusProtocol))
}

break
}
}

return nil
})

life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartPeerInfo, lifecycle.HookFuncCtx(prio.Start))

return nil
Expand Down Expand Up @@ -918,24 +964,6 @@ func configureEth2Client(ctx context.Context, forkVersion []byte, addrs []string
return eth2Cl, nil
}

// newConsensus builds the consensus component for the given cluster and hands
// it back together with the lifecycle hook that starts it.
//
// The cluster peers are resolved via manifest.ClusterPeers and passed, along
// with the remaining arguments, to consensus.New. Any error from either step
// is returned unchanged, with nil for both the component and the hook.
func newConsensus(cluster *manifestpb.Cluster, tcpNode host.Host, p2pKey *k1.PrivateKey,
	sender *p2p.Sender, deadliner core.Deadliner, gaterFunc core.DutyGaterFunc,
	qbftSniffer func(*pbv1.SniffedConsensusInstance),
) (core.Consensus, lifecycle.IHookFunc, error) {
	clusterPeers, err := manifest.ClusterPeers(cluster)
	if err != nil {
		return nil, nil, err
	}

	component, err := consensus.New(tcpNode, sender, clusterPeers, p2pKey, deadliner, gaterFunc, qbftSniffer)
	if err != nil {
		return nil, nil, err
	}

	// The start hook is bound to the freshly created component's Start method.
	startHook := lifecycle.HookFuncCtx(component.Start)

	return component, startHook, nil
}

// createMockValidators creates mock validators identified by their public shares.
func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet {
resp := make(beaconmock.ValidatorSet)
Expand Down Expand Up @@ -1079,7 +1107,7 @@ func (h httpServeHook) Call(context.Context) error {
// Protocols returns the list of supported Protocols in order of precedence.
func Protocols() []protocol.ID {
var resp []protocol.ID
resp = append(resp, consensus.Protocols()...)
resp = append(resp, protocols.Protocols()...)
resp = append(resp, parsigex.Protocols()...)
resp = append(resp, peerinfo.Protocols()...)
resp = append(resp, priority.Protocols()...)
Expand Down
13 changes: 7 additions & 6 deletions cluster/manifest/mutationlegacylock.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,13 @@ func transformLegacyLock(input *manifestpb.Cluster, signed *manifestpb.SignedMut
}

return &manifestpb.Cluster{
Name: lock.Name,
Threshold: int32(lock.Threshold),
DkgAlgorithm: lock.DKGAlgorithm,
ForkVersion: lock.ForkVersion,
Validators: vals,
Operators: ops,
Name: lock.Name,
Threshold: int32(lock.Threshold),
DkgAlgorithm: lock.DKGAlgorithm,
ForkVersion: lock.ForkVersion,
ConsensusProtocol: lock.ConsensusProtocol,
Validators: vals,
Operators: ops,
}, nil
}

Expand Down
Loading

0 comments on commit 3b7e052

Please sign in to comment.