diff --git a/core/consensus/component.go b/core/consensus/component.go index 0ac0724ec..f05e2bed3 100644 --- a/core/consensus/component.go +++ b/core/consensus/component.go @@ -131,6 +131,7 @@ func newInstanceIO() instanceIO { return instanceIO{ participated: make(chan struct{}), proposed: make(chan struct{}), + running: make(chan struct{}), recvBuffer: make(chan msg, recvBuffer), hashCh: make(chan [32]byte, 1), valueCh: make(chan proto.Message, 1), @@ -144,6 +145,7 @@ func newInstanceIO() instanceIO { type instanceIO struct { participated chan struct{} // Closed when Participate was called for this instance. proposed chan struct{} // Closed when Propose was called for this instance. + running chan struct{} // Closed when runInstance was already called. recvBuffer chan msg // Outer receive buffers. hashCh chan [32]byte // Async input hash channel. valueCh chan proto.Message // Async input value channel. @@ -177,6 +179,20 @@ func (io *instanceIO) MarkProposed() error { return nil } +// ShouldRun checks the current status of the instance. +// If the instance is not already running, it returns true and marks the instance as running. +// It returns false if the instance is already running. +func (io *instanceIO) ShouldRun() bool { + select { + case <-io.running: + return false + default: + close(io.running) + } + + return true +} + // New returns a new consensus QBFT component. func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey, deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance), @@ -320,7 +336,7 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes return err } - inst, running := c.getInstanceIO(duty) + inst := c.getInstanceIO(duty) if err := inst.MarkProposed(); err != nil { return errors.Wrap(err, "propose consensus", z.Any("duty", duty)) @@ -351,7 +367,7 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes } }() - if running { // Participate was already called, instance is running. + if !inst.ShouldRun() { // Participate was already called, instance is running. return <-inst.errCh } @@ -370,13 +386,13 @@ func (c *Component) Participate(ctx context.Context, duty core.Duty) error { return nil // Not an eager start timer, wait for Propose to start. } - inst, running := c.getInstanceIO(duty) + inst := c.getInstanceIO(duty) if err := inst.MarkParticipated(); err != nil { return errors.Wrap(err, "participate consensus", z.Any("duty", duty)) } - if running { + if !inst.ShouldRun() { return nil // Instance already running. } @@ -398,7 +414,7 @@ func (c *Component) runInstance(ctx context.Context, duty core.Duty) (err error) z.Any("timer", string(roundTimer.Type())), ) - inst, _ := c.getInstanceIO(duty) + inst := c.getInstanceIO(duty) defer func() { inst.errCh <- err // Send resulting error to errCh. }() @@ -539,7 +555,7 @@ func (c *Component) getRecvBuffer(duty core.Duty) chan msg { } // getInstanceIO returns the duty's instance and true if it were previously created. -func (c *Component) getInstanceIO(duty core.Duty) (instanceIO, bool) { +func (c *Component) getInstanceIO(duty core.Duty) instanceIO { c.mutable.Lock() defer c.mutable.Unlock() @@ -548,10 +564,10 @@ func (c *Component) getInstanceIO(duty core.Duty) (instanceIO, bool) { inst = newInstanceIO() c.mutable.instances[duty] = inst - return inst, false + return inst } - return inst, true + return inst } // deleteInstanceIO deletes the instanceIO for the duty. diff --git a/core/consensus/component_internal_test.go b/core/consensus/component_internal_test.go index 6b4ea68e1..2e2c39010 100644 --- a/core/consensus/component_internal_test.go +++ b/core/consensus/component_internal_test.go @@ -454,6 +454,75 @@ func TestComponentHandle(t *testing.T) { } } +func TestInstanceIO_ShouldRun(t *testing.T) { + t.Run("ShouldRun for new instance", func(t *testing.T) { + inst1 := newInstanceIO() + require.True(t, inst1.ShouldRun()) + require.False(t, inst1.ShouldRun()) + }) + + t.Run("ShouldRun after handle", func(t *testing.T) { + var c Component + c.deadliner = testDeadliner{} + c.mutable.instances = make(map[core.Duty]instanceIO) + + // Generate a p2p private key. + p2pKey := testutil.GenerateInsecureK1Key(t, 0) + c.pubkeys = make(map[int64]*k1.PublicKey) + c.pubkeys[0] = p2pKey.PubKey() + + duty := core.Duty{Slot: 42, Type: 1} + msg := &pbv1.ConsensusMsg{ + Msg: randomMsg(t), + } + msg = signConsensusMsg(t, msg, p2pKey, duty) + + // It should create new instance of instanceIO for the given duty. + _, _, err := c.handle(context.Background(), "peerID", msg) + require.NoError(t, err) + + inst, ok := c.mutable.instances[duty] + require.True(t, ok) + require.True(t, inst.ShouldRun()) + require.False(t, inst.ShouldRun()) + }) + + t.Run("Call Propose after handle", func(t *testing.T) { + ctx := context.Background() + + var c Component + c.deadliner = testDeadliner{} + c.mutable.instances = make(map[core.Duty]instanceIO) + c.timerFunc = getTimerFunc() + + // Generate a p2p private key pair. + p2pKey := testutil.GenerateInsecureK1Key(t, 0) + c.pubkeys = make(map[int64]*k1.PublicKey) + c.pubkeys[0] = p2pKey.PubKey() + + duty := core.Duty{Slot: 42, Type: 1} + msg := &pbv1.ConsensusMsg{ + Msg: randomMsg(t), + } + msg = signConsensusMsg(t, msg, p2pKey, duty) + + // It should create new instance of instanceIO for the given duty. + _, _, err := c.handle(ctx, "peerID", msg) + require.NoError(t, err) + + pubkey := testutil.RandomCorePubKey(t) + + // Propose should internally mark instance as running by calling inst.ShouldRun(). + err = c.Propose(ctx, duty, core.UnsignedDataSet{pubkey: testutil.RandomCoreAttestationData(t)}) + require.Error(t, err) // It should return an error as no peers are specified. + + // Check if ShouldRun is called before. + inst, ok := c.mutable.instances[duty] + require.True(t, ok) + require.False(t, inst.ShouldRun()) + }) +} + // testDeadliner is a mock deadliner implementation. type testDeadliner struct { deadlineChan chan core.Duty @@ -466,3 +535,45 @@ func (testDeadliner) Add(core.Duty) bool { func (t testDeadliner) C() <-chan core.Duty { return t.deadlineChan } + +func signConsensusMsg(t *testing.T, msg *pbv1.ConsensusMsg, privKey *k1.PrivateKey, duty core.Duty) *pbv1.ConsensusMsg { + t.Helper() + + msg.Msg.Duty.Type = int32(duty.Type) + msg.Msg.PeerIdx = 0 + msg.Msg.Duty = &pbv1.Duty{ + Slot: duty.Slot, + Type: int32(duty.Type), + } + + // Sign the base message + msgHash, err := hashProto(msg.Msg) + require.NoError(t, err) + + sign, err := k1util.Sign(privKey, msgHash[:]) + require.NoError(t, err) + + msg.Msg.Signature = sign + + // construct a justification + msg.Justification = []*pbv1.QBFTMsg{ + randomMsg(t), + } + + msg.Justification[0].PeerIdx = 0 + msg.Justification[0].Duty = &pbv1.Duty{ + Slot: duty.Slot, + Type: int32(duty.Type), + } + + // Sign the justification + justHash, err := hashProto(msg.Justification[0]) + require.NoError(t, err) + + justSign, err := k1util.Sign(privKey, justHash[:]) + require.NoError(t, err) + + msg.Justification[0].Signature = justSign + + return msg +}