Skip to content

Commit 055ac99

Browse files
authored
Merge pull request #6916 from onflow/tim/6854-consensus-matching-engine-componentmanager-refactor
Refactor Consensus Matching Engine: `engine.Unit` -> `ComponentManager`
2 parents 92d5d24 + 85e2881 commit 055ac99

File tree

2 files changed

+70
-102
lines changed

2 files changed

+70
-102
lines changed

engine/consensus/matching/engine.go

Lines changed: 53 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
sealing "github.com/onflow/flow-go/engine/consensus"
1212
"github.com/onflow/flow-go/model/flow"
1313
"github.com/onflow/flow-go/module"
14+
"github.com/onflow/flow-go/module/component"
15+
"github.com/onflow/flow-go/module/irrecoverable"
1416
"github.com/onflow/flow-go/module/metrics"
1517
"github.com/onflow/flow-go/network"
1618
"github.com/onflow/flow-go/network/channels"
@@ -27,7 +29,7 @@ const defaultIncorporatedBlockQueueCapacity = 10
2729
// Engine is a wrapper struct for `Core` which implements consensus algorithm.
2830
// Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals.
2931
type Engine struct {
30-
unit *engine.Unit
32+
component.Component
3133
log zerolog.Logger
3234
me module.Local
3335
core sealing.MatchingCore
@@ -69,7 +71,6 @@ func NewEngine(
6971

7072
e := &Engine{
7173
log: log.With().Str("engine", "matching.Engine").Logger(),
72-
unit: engine.NewUnit(),
7374
me: me,
7475
core: core,
7576
state: state,
@@ -83,6 +84,12 @@ func NewEngine(
8384
pendingIncorporatedBlocks: pendingIncorporatedBlocks,
8485
}
8586

87+
e.Component = component.NewComponentManagerBuilder().
88+
AddWorker(e.inboundEventsProcessingLoop).
89+
AddWorker(e.finalizationProcessingLoop).
90+
AddWorker(e.blockIncorporatedEventsProcessingLoop).
91+
Build()
92+
8693
// register engine with the receipt provider
8794
_, err = net.Register(channels.ReceiveReceipts, e)
8895
if err != nil {
@@ -92,79 +99,34 @@ func NewEngine(
9299
return e, nil
93100
}
94101

95-
// Ready returns a ready channel that is closed once the engine has fully
96-
// started. For consensus engine, this is true once the underlying consensus
97-
// algorithm has started.
98-
func (e *Engine) Ready() <-chan struct{} {
99-
e.unit.Launch(e.inboundEventsProcessingLoop)
100-
e.unit.Launch(e.finalizationProcessingLoop)
101-
e.unit.Launch(e.blockIncorporatedEventsProcessingLoop)
102-
return e.unit.Ready()
103-
}
104-
105-
// Done returns a done channel that is closed once the engine has fully stopped.
106-
// For the consensus engine, we wait for hotstuff to finish.
107-
func (e *Engine) Done() <-chan struct{} {
108-
return e.unit.Done()
109-
}
110-
111-
// SubmitLocal submits an event originating on the local node.
112-
func (e *Engine) SubmitLocal(event interface{}) {
113-
err := e.ProcessLocal(event)
114-
if err != nil {
115-
e.log.Fatal().Err(err).Msg("internal error processing event")
116-
}
117-
}
118-
119-
// Submit submits the given event from the node with the given origin ID
120-
// for processing in a non-blocking manner. It returns instantly and logs
121-
// a potential processing error internally when done.
122-
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
123-
err := e.Process(channel, originID, event)
124-
if err != nil {
125-
e.log.Fatal().Err(err).Msg("internal error processing event")
126-
}
127-
}
128-
129-
// ProcessLocal processes an event originating on the local node.
130-
func (e *Engine) ProcessLocal(event interface{}) error {
131-
return e.process(e.me.NodeID(), event)
132-
}
133-
134-
// Process processes the given event from the node with the given origin ID in
135-
// a blocking manner. It returns the potential processing error when done.
102+
// Process receives events from the network and checks their type,
103+
// before enqueuing them to be processed by a worker in a non-blocking manner.
104+
// No errors expected during normal operation (errors are logged instead).
136105
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
137-
err := e.process(originID, event)
138-
if err != nil {
139-
if engine.IsIncompatibleInputTypeError(err) {
140-
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
141-
return nil
142-
}
143-
return fmt.Errorf("unexpected error while processing engine message: %w", err)
106+
receipt, ok := event.(*flow.ExecutionReceipt)
107+
if !ok {
108+
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
109+
return nil
144110
}
111+
e.addReceiptToQueue(receipt)
145112
return nil
146113
}
147114

148-
// process events for the matching engine on the consensus node.
149-
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
150-
receipt, ok := event.(*flow.ExecutionReceipt)
151-
if !ok {
152-
return fmt.Errorf("no matching processor for message of type %T from origin %x: %w", event, originID[:],
153-
engine.IncompatibleInputTypeError)
154-
}
115+
// addReceiptToQueue adds an execution receipt to the queue of the matching engine, to be processed by a worker
116+
func (e *Engine) addReceiptToQueue(receipt *flow.ExecutionReceipt) {
155117
e.metrics.MessageReceived(metrics.EngineSealing, metrics.MessageExecutionReceipt)
156118
e.pendingReceipts.Push(receipt)
157119
e.inboundEventsNotifier.Notify()
158-
return nil
159120
}
160121

161-
// HandleReceipt ingests receipts from the Requester module.
122+
// HandleReceipt ingests receipts from the Requester module, adding them to the queue.
162123
func (e *Engine) HandleReceipt(originID flow.Identifier, receipt flow.Entity) {
163124
e.log.Debug().Msg("received receipt from requester engine")
164-
err := e.process(originID, receipt)
165-
if err != nil {
166-
e.log.Fatal().Err(err).Msg("internal error processing event from requester module")
125+
r, ok := receipt.(*flow.ExecutionReceipt)
126+
if !ok {
127+
e.log.Fatal().Err(engine.IncompatibleInputTypeError).Msg("internal error processing event from requester module")
167128
}
129+
e.addReceiptToQueue(r)
168130
}
169131

170132
// OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer`
@@ -183,10 +145,10 @@ func (e *Engine) OnBlockIncorporated(incorporatedBlock *model.Block) {
183145
}
184146

185147
// processIncorporatedBlock selects receipts that were included into incorporated block and submits them
186-
// for further processing by matching core.
148+
// to the matching core for further processing.
187149
// Without the logic below, the sealing engine would produce IncorporatedResults
188150
// only from receipts received directly from ENs. sealing Core would not know about
189-
// Receipts that are incorporated by other nodes in their blocks blocks (but never
151+
// Receipts that are incorporated by other nodes in their blocks (but never
190152
// received directly from the EN).
191153
// No errors expected during normal operations.
192154
func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error {
@@ -205,61 +167,67 @@ func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error {
205167
return nil
206168
}
207169

208-
// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events
209-
func (e *Engine) finalizationProcessingLoop() {
170+
// finalizationProcessingLoop contains the logic for processing of finalization events.
171+
// This method is intended to be executed by a dedicated worker / goroutine.
172+
func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
210173
finalizationNotifier := e.finalizationEventsNotifier.Channel()
174+
ready()
211175
for {
212176
select {
213-
case <-e.unit.Quit():
177+
case <-ctx.Done():
214178
return
215179
case <-finalizationNotifier:
216180
err := e.core.OnBlockFinalization()
217181
if err != nil {
218-
e.log.Fatal().Err(err).Msg("could not process last finalized event")
182+
ctx.Throw(fmt.Errorf("could not process last finalized event: %w", err))
219183
}
220184
}
221185
}
222186
}
223187

224-
// blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events.
225-
func (e *Engine) blockIncorporatedEventsProcessingLoop() {
188+
// blockIncorporatedEventsProcessingLoop contains the logic for processing block incorporated events.
189+
// This method is intended to be executed by a dedicated worker / goroutine.
190+
func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
226191
c := e.blockIncorporatedNotifier.Channel()
227-
192+
ready()
228193
for {
229194
select {
230-
case <-e.unit.Quit():
195+
case <-ctx.Done():
231196
return
232197
case <-c:
233-
err := e.processBlockIncorporatedEvents()
198+
err := e.processBlockIncorporatedEvents(ctx)
234199
if err != nil {
235-
e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message")
200+
ctx.Throw(fmt.Errorf("internal error processing block incorporated queued message: %w", err))
236201
}
237202
}
238203
}
239204
}
240205

241-
func (e *Engine) inboundEventsProcessingLoop() {
206+
// inboundEventsProcessingLoop contains the logic for processing execution receipts, received
207+
// from the network via Process, from the Requester module via HandleReceipt, or from incorporated blocks.
208+
// This method is intended to be executed by a dedicated worker / goroutine.
209+
func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
242210
c := e.inboundEventsNotifier.Channel()
243-
211+
ready()
244212
for {
245213
select {
246-
case <-e.unit.Quit():
214+
case <-ctx.Done():
247215
return
248216
case <-c:
249-
err := e.processAvailableEvents()
217+
err := e.processExecutionReceipts(ctx)
250218
if err != nil {
251-
e.log.Fatal().Err(err).Msg("internal error processing queued message")
219+
ctx.Throw(fmt.Errorf("internal error processing queued execution receipt: %w", err))
252220
}
253221
}
254222
}
255223
}
256224

257225
// processBlockIncorporatedEvents performs processing of block incorporated hot stuff events.
258226
// No errors expected during normal operations.
259-
func (e *Engine) processBlockIncorporatedEvents() error {
227+
func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContext) error {
260228
for {
261229
select {
262-
case <-e.unit.Quit():
230+
case <-ctx.Done():
263231
return nil
264232
default:
265233
}
@@ -279,27 +247,18 @@ func (e *Engine) processBlockIncorporatedEvents() error {
279247
}
280248
}
281249

282-
// processAvailableEvents processes _all_ available events (untrusted messages
250+
// processExecutionReceipts processes execution receipts
283251
// from other nodes as well as internally trusted.
284252
// No errors expected during normal operations.
285-
func (e *Engine) processAvailableEvents() error {
253+
func (e *Engine) processExecutionReceipts(ctx irrecoverable.SignalerContext) error {
286254
for {
287255
select {
288-
case <-e.unit.Quit():
256+
case <-ctx.Done():
289257
return nil
290258
default:
291259
}
292260

293-
msg, ok := e.pendingIncorporatedBlocks.Pop()
294-
if ok {
295-
err := e.processIncorporatedBlock(msg.(flow.Identifier))
296-
if err != nil {
297-
return fmt.Errorf("could not process incorporated block: %w", err)
298-
}
299-
continue
300-
}
301-
302-
msg, ok = e.pendingReceipts.Pop()
261+
msg, ok := e.pendingReceipts.Pop()
303262
if ok {
304263
err := e.core.ProcessReceipt(msg.(*flow.ExecutionReceipt))
305264
if err != nil {

engine/consensus/matching/engine_test.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package matching
22

33
import (
4+
"context"
45
"sync"
56
"testing"
67
"time"
@@ -10,9 +11,9 @@ import (
1011
"github.com/stretchr/testify/suite"
1112

1213
"github.com/onflow/flow-go/consensus/hotstuff/model"
13-
"github.com/onflow/flow-go/engine"
1414
mockconsensus "github.com/onflow/flow-go/engine/consensus/mock"
1515
"github.com/onflow/flow-go/model/flow"
16+
"github.com/onflow/flow-go/module/irrecoverable"
1617
"github.com/onflow/flow-go/module/metrics"
1718
mockmodule "github.com/onflow/flow-go/module/mock"
1819
"github.com/onflow/flow-go/network/channels"
@@ -36,6 +37,7 @@ type MatchingEngineSuite struct {
3637

3738
// Matching Engine
3839
engine *Engine
40+
cancel context.CancelFunc
3941
}
4042

4143
func (s *MatchingEngineSuite) SetupTest() {
@@ -57,7 +59,17 @@ func (s *MatchingEngineSuite) SetupTest() {
5759
s.engine, err = NewEngine(unittest.Logger(), net, me, metrics, metrics, s.state, s.receipts, s.index, s.core)
5860
require.NoError(s.T(), err)
5961

60-
<-s.engine.Ready()
62+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background())
63+
s.cancel = cancel
64+
s.engine.Start(ctx)
65+
unittest.AssertClosesBefore(s.T(), s.engine.Ready(), 10*time.Millisecond)
66+
}
67+
68+
func (s *MatchingEngineSuite) TearDownTest() {
69+
if s.cancel != nil {
70+
s.cancel()
71+
unittest.AssertClosesBefore(s.T(), s.engine.Done(), 10*time.Millisecond)
72+
}
6173
}
6274

6375
// TestOnFinalizedBlock tests if finalized block gets processed when send through `Engine`.
@@ -135,15 +147,12 @@ func (s *MatchingEngineSuite) TestMultipleProcessingItems() {
135147
s.core.AssertExpectations(s.T())
136148
}
137149

138-
// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type
139-
// was submitted from network layer.
150+
// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type
151+
// (byzantine message) was submitted from network layer.
140152
func (s *MatchingEngineSuite) TestProcessUnsupportedMessageType() {
141153
invalidEvent := uint64(42)
142154
err := s.engine.Process("ch", unittest.IdentifierFixture(), invalidEvent)
143155
// shouldn't result in error since byzantine inputs are expected
144156
require.NoError(s.T(), err)
145-
// in case of local processing error cannot be consumed since all inputs are trusted
146-
err = s.engine.ProcessLocal(invalidEvent)
147-
require.Error(s.T(), err)
148-
require.True(s.T(), engine.IsIncompatibleInputTypeError(err))
157+
// Local processing happens only via HandleReceipt, which will log.Fatal on invalid input
149158
}

0 commit comments

Comments
 (0)