Skip to content

Commit

Permalink
core/consensus: add ShouldRun method (#2452)
Browse files Browse the repository at this point in the history
Adds a `ShouldRun` method to `instanceIO` to check if the instance was actually "running" rather than returning boolean from `getInstanceIO`.

category: bug
ticket: #2439
  • Loading branch information
dB2510 authored Jul 20, 2023
1 parent f14931a commit 8ad5803
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 8 deletions.
32 changes: 24 additions & 8 deletions core/consensus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func newInstanceIO() instanceIO {
return instanceIO{
participated: make(chan struct{}),
proposed: make(chan struct{}),
running: make(chan struct{}),
recvBuffer: make(chan msg, recvBuffer),
hashCh: make(chan [32]byte, 1),
valueCh: make(chan proto.Message, 1),
Expand All @@ -144,6 +145,7 @@ func newInstanceIO() instanceIO {
type instanceIO struct {
participated chan struct{} // Closed when Participate was called for this instance.
proposed chan struct{} // Closed when Propose was called for this instance.
running chan struct{} // Closed when runInstance was already called.
recvBuffer chan msg // Outer receive buffers.
hashCh chan [32]byte // Async input hash channel.
valueCh chan proto.Message // Async input value channel.
Expand Down Expand Up @@ -177,6 +179,20 @@ func (io *instanceIO) MarkProposed() error {
return nil
}

// ShouldRun checks the current status of the instance.
// If the instance is not already running, it returns true and marks the instance as running.
// It returns false if the instance is already running.
func (io *instanceIO) ShouldRun() bool {

This comment has been minimized.

Copy link
@corverroos

corverroos Jul 20, 2023

Contributor

this doesn;t need to be a pointer receiver, also what about renaming to MaybeStart returns true if the instance was't running and has been started by this call, otherwise it returns false if the instance was started in the past and is either running now or has completed?

select {
case <-io.running:
return false
default:
close(io.running)
}

return true
}

// New returns a new consensus QBFT component.
func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance),
Expand Down Expand Up @@ -320,7 +336,7 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes
return err
}

inst, running := c.getInstanceIO(duty)
inst := c.getInstanceIO(duty)

if err := inst.MarkProposed(); err != nil {
return errors.Wrap(err, "propose consensus", z.Any("duty", duty))
Expand Down Expand Up @@ -351,7 +367,7 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes
}
}()

if running { // Participate was already called, instance is running.
if !inst.ShouldRun() { // Participate was already called, instance is running.
return <-inst.errCh
}

Expand All @@ -370,13 +386,13 @@ func (c *Component) Participate(ctx context.Context, duty core.Duty) error {
return nil // Not an eager start timer, wait for Propose to start.
}

inst, running := c.getInstanceIO(duty)
inst := c.getInstanceIO(duty)

if err := inst.MarkParticipated(); err != nil {
return errors.Wrap(err, "participate consensus", z.Any("duty", duty))
}

if running {
if !inst.ShouldRun() {
return nil // Instance already running.
}

Expand All @@ -398,7 +414,7 @@ func (c *Component) runInstance(ctx context.Context, duty core.Duty) (err error)
z.Any("timer", string(roundTimer.Type())),
)

inst, _ := c.getInstanceIO(duty)
inst := c.getInstanceIO(duty)
defer func() {
inst.errCh <- err // Send resulting error to errCh.
}()
Expand Down Expand Up @@ -539,7 +555,7 @@ func (c *Component) getRecvBuffer(duty core.Duty) chan msg {
}

// getInstanceIO returns the duty's instance and true if it were previously created.
func (c *Component) getInstanceIO(duty core.Duty) (instanceIO, bool) {
func (c *Component) getInstanceIO(duty core.Duty) instanceIO {
c.mutable.Lock()
defer c.mutable.Unlock()

Expand All @@ -548,10 +564,10 @@ func (c *Component) getInstanceIO(duty core.Duty) (instanceIO, bool) {
inst = newInstanceIO()
c.mutable.instances[duty] = inst

return inst, false
return inst
}

return inst, true
return inst
}

// deleteInstanceIO deletes the instanceIO for the duty.
Expand Down
111 changes: 111 additions & 0 deletions core/consensus/component_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,75 @@ func TestComponentHandle(t *testing.T) {
}
}

func TestInstanceIO_ShouldRun(t *testing.T) {
t.Run("ShouldRun for new instance", func(t *testing.T) {
inst1 := newInstanceIO()
require.True(t, inst1.ShouldRun())
require.False(t, inst1.ShouldRun())
})

t.Run("ShouldRun after handle", func(t *testing.T) {
var c Component
c.deadliner = testDeadliner{}
c.mutable.instances = make(map[core.Duty]instanceIO)

// Generate a p2p private key.
p2pKey := testutil.GenerateInsecureK1Key(t, 0)
c.pubkeys = make(map[int64]*k1.PublicKey)
c.pubkeys[0] = p2pKey.PubKey()

duty := core.Duty{Slot: 42, Type: 1}
msg := &pbv1.ConsensusMsg{
Msg: randomMsg(t),
}
msg = signConsensusMsg(t, msg, p2pKey, duty)

// It should create new instance of instanceIO for the given duty.
_, _, err := c.handle(context.Background(), "peerID", msg)
require.NoError(t, err)

inst, ok := c.mutable.instances[duty]
require.True(t, ok)
require.True(t, inst.ShouldRun())
require.False(t, inst.ShouldRun())
})

t.Run("Call Propose after handle", func(t *testing.T) {
ctx := context.Background()

var c Component
c.deadliner = testDeadliner{}
c.mutable.instances = make(map[core.Duty]instanceIO)
c.timerFunc = getTimerFunc()

// Generate a p2p private key pair.
p2pKey := testutil.GenerateInsecureK1Key(t, 0)
c.pubkeys = make(map[int64]*k1.PublicKey)
c.pubkeys[0] = p2pKey.PubKey()

duty := core.Duty{Slot: 42, Type: 1}
msg := &pbv1.ConsensusMsg{
Msg: randomMsg(t),
}
msg = signConsensusMsg(t, msg, p2pKey, duty)

// It should create new instance of instanceIO for the given duty.
_, _, err := c.handle(ctx, "peerID", msg)
require.NoError(t, err)

pubkey := testutil.RandomCorePubKey(t)

// Propose should internally mark instance as running by calling inst.ShouldRun().
err = c.Propose(ctx, duty, core.UnsignedDataSet{pubkey: testutil.RandomCoreAttestationData(t)})
require.Error(t, err) // It should return an error as no peers are specified.

// Check if ShouldRun is called before.
inst, ok := c.mutable.instances[duty]
require.True(t, ok)
require.False(t, inst.ShouldRun())
})
}

// testDeadliner is a mock deadliner implementation.
type testDeadliner struct {
deadlineChan chan core.Duty
Expand All @@ -466,3 +535,45 @@ func (testDeadliner) Add(core.Duty) bool {
func (t testDeadliner) C() <-chan core.Duty {
return t.deadlineChan
}

func signConsensusMsg(t *testing.T, msg *pbv1.ConsensusMsg, privKey *k1.PrivateKey, duty core.Duty) *pbv1.ConsensusMsg {
t.Helper()

msg.Msg.Duty.Type = int32(duty.Type)
msg.Msg.PeerIdx = 0
msg.Msg.Duty = &pbv1.Duty{
Slot: duty.Slot,
Type: int32(duty.Type),
}

// Sign the base message
msgHash, err := hashProto(msg.Msg)
require.NoError(t, err)

sign, err := k1util.Sign(privKey, msgHash[:])
require.NoError(t, err)

msg.Msg.Signature = sign

// construct a justification
msg.Justification = []*pbv1.QBFTMsg{
randomMsg(t),
}

msg.Justification[0].PeerIdx = 0
msg.Justification[0].Duty = &pbv1.Duty{
Slot: duty.Slot,
Type: int32(duty.Type),
}

// Sign the justification
justHash, err := hashProto(msg.Justification[0])
require.NoError(t, err)

justSign, err := k1util.Sign(privKey, justHash[:])
require.NoError(t, err)

msg.Justification[0].Signature = justSign

return msg
}

0 comments on commit 8ad5803

Please sign in to comment.