Skip to content

Commit

Permalink
Completed unit tested function for block listeners
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Jun 10, 2024
1 parent 4a72a03 commit fbce717
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 1 deletion.
1 change: 1 addition & 0 deletions internal/confirmations/confirmed_block_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (cbl *confirmedBlockListener) processBlockNotification(block *apitypes.Bloc
if cbl.waitingForFromBlock {
// by definition we won't find anything in cbl.blocksSinceCheckpoint below
cbl.fromBlock = block.BlockNumber.Uint64()
cbl.waitingForFromBlock = false
} else if block.BlockNumber.Uint64() < cbl.fromBlock {
log.L(cbl.ctx).Debugf("Notification of block %d/%s < fromBlock %d", block.BlockNumber, block.BlockHash, cbl.fromBlock)
return
Expand Down
60 changes: 60 additions & 0 deletions internal/confirmations/confirmed_block_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,52 @@ func TestCBLCatchUpToHeadFromZeroWithConfirmations(t *testing.T) {
mca.AssertExpectations(t)
}

func TestCBLListenFromCurrentBlock(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager()

esDispatch := make(chan *ffcapi.ListenerEvent)

id := fftypes.NewUUID()

blocks := testBlockArray(15)

mbiNum := mca.On("BlockInfoByNumber", mock.Anything, mock.Anything)
mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) })

mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything)
mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocks) })

bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch)
assert.NoError(t, err)

// Notify starting at block 5
bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{
BlockHashes: []string{blocks[5].BlockHash},
})

// Randomly notify below that too, which will be ignored
bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{
BlockHashes: []string{blocks[1].BlockHash},
})

for i := 5; i < len(blocks)-bcm.requiredConfirmations; i++ {
b := <-esDispatch
assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo)
}

time.Sleep(1 * time.Millisecond)
assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations)
select {
case <-esDispatch:
assert.Fail(t, "should not have received block in confirmation window")
default: // good - we should have the confirmations sat there, but no dispatch
}

bcm.Stop()
mca.AssertExpectations(t)
}

func TestCBLHandleReorgInConfirmationWindow1(t *testing.T) {
// test where the reorg happens at the edge of the confirmation window
testCBLHandleReorgInConfirmationWindow(t,
Expand Down Expand Up @@ -253,6 +299,20 @@ func TestCBLHandleRandomConflictingBlockNotification(t *testing.T) {
mca.AssertExpectations(t)
}

func TestCBLStartBadFromBlock(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager()

esDispatch := make(chan *ffcapi.ListenerEvent)

id := fftypes.NewUUID()

_, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "wrong", nil, esDispatch)
assert.Regexp(t, "FF21090", err)

bcm.Stop()
mca.AssertExpectations(t)
}

func TestProcessBlockHashesSwallowsFailure(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager()
cbl := &confirmedBlockListener{
Expand Down
4 changes: 4 additions & 0 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ func (es *eventStream) AddOrUpdateListener(ctx context.Context, id *fftypes.UUID
}
}
} else if isNew && startedState != nil {
if l.spec.Type != nil && *l.spec.Type == apitypes.ListenerTypeBlocks {
return spec, l.es.confirmations.StartConfirmedBlockListener(ctx, l.spec.ID, *l.spec.FromBlock, nil /* new so no checkpoint */, es.batchChannel)
}
// Start the new listener - no checkpoint needed here
return spec, l.start(startedState, nil)
}
Expand Down Expand Up @@ -726,6 +729,7 @@ func (es *eventStream) checkConfirmedEventForBatch(e *ffcapi.ListenerEvent) (l *
listenerID = e.BlockEvent.ListenerID
eToLog = e.BlockEvent
default:
log.L(es.bgCtx).Errorf("Invalid event cannot be dispatched: %+v", e)
return nil, nil
}
es.mux.Lock()
Expand Down
136 changes: 135 additions & 1 deletion internal/events/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,18 +463,25 @@ func TestWebSocketEventStreamsE2EBlocks(t *testing.T) {
}), mock.Anything).Run(func(args mock.Arguments) {
started <- args[4].(chan<- *ffcapi.ListenerEvent)
}).Return(nil)
mcm.On("StopConfirmedBlockListener", mock.Anything, l.ID).Return(nil)

msp := es.persistence.(*persistencemocks.Persistence)
// load existing checkpoint on start
msp.On("GetCheckpoint", mock.Anything, mock.Anything).Return(&apitypes.EventStreamCheckpoint{
StreamID: es.spec.ID,
Time: fftypes.Now(),
Listeners: map[fftypes.UUID]json.RawMessage{
*l.ID: []byte(`{"block":10000}`),
},
}, nil) // existing checkpoint
}, nil)
// write a valid checkpoint
msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool {
return cp.StreamID.Equals(es.spec.ID) && string(cp.Listeners[*l.ID]) == `{"block":10001}`
})).Return(nil)
// write a checkpoint when we delete
msp.On("WriteCheckpoint", mock.Anything, mock.MatchedBy(func(cp *apitypes.EventStreamCheckpoint) bool {
return cp.StreamID.Equals(es.spec.ID) && cp.Listeners[*l.ID] == nil
})).Return(nil)

senderChannel, _, receiverChannel := mockWSChannels(es.wsChannels.(*wsmocks.WebSocketChannels))

Expand Down Expand Up @@ -515,6 +522,9 @@ func TestWebSocketEventStreamsE2EBlocks(t *testing.T) {
},
}

err = es.RemoveListener(es.bgCtx, l.ID)
assert.NoError(t, err)

err = es.Stop(es.bgCtx)
assert.NoError(t, err)

Expand Down Expand Up @@ -1157,6 +1167,103 @@ func TestStartWithExistingBlockListener(t *testing.T) {
mfc.AssertExpectations(t)
}

func TestStartAndAddBadListenerType(t *testing.T) {

l := &apitypes.Listener{
ID: fftypes.NewUUID(),
Name: strPtr("ut_listener"),
Type: (*fftypes.FFEnum)(strPtr("wrong")),
}

mfc := &ffcapimocks.API{}

es, err := newTestEventStreamWithListener(t, mfc, `{
"name": "ut_stream"
}`)
assert.NoError(t, err)

_, err = es.AddOrUpdateListener(es.bgCtx, l.ID, l, false)
assert.Regexp(t, "FF21089.*wrong", err)

mfc.AssertExpectations(t)
}

func TestStartWithBlockListenerFailBeforeStart(t *testing.T) {

es := newTestEventStream(t, `{
"name": "ut_stream",
"websocket": {
"topic": "ut_stream"
}
}`)

l := &apitypes.Listener{
ID: apitypes.NewULID(),
Name: strPtr("ut_listener"),
Type: &apitypes.ListenerTypeBlocks,
FromBlock: strPtr(ffcapi.FromBlockLatest),
}

mfc := es.connector.(*ffcapimocks.API)
mfc.On("EventStreamStart", mock.Anything, mock.Anything).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil)
mfc.On("EventStreamStopped", mock.Anything, mock.Anything).Return(&ffcapi.EventStreamStoppedResponse{}, ffcapi.ErrorReason(""), nil)

mcm := es.confirmations.(*confirmationsmocks.Manager)
mcm.On("StartConfirmedBlockListener", mock.Anything, l.ID, "latest", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))

msp := es.persistence.(*persistencemocks.Persistence)
msp.On("GetCheckpoint", mock.Anything, mock.Anything).Return(nil, nil)

_, err := es.AddOrUpdateListener(es.bgCtx, l.ID, l, false)
assert.NoError(t, err)

err = es.Start(es.bgCtx)
assert.Regexp(t, "pop", err)

err = es.Stop(es.bgCtx)
assert.NoError(t, err)

mfc.AssertExpectations(t)
}

func TestAddBlockListenerFailAfterStart(t *testing.T) {

es := newTestEventStream(t, `{
"name": "ut_stream",
"websocket": {
"topic": "ut_stream"
}
}`)

l := &apitypes.Listener{
ID: apitypes.NewULID(),
Name: strPtr("ut_listener"),
Type: &apitypes.ListenerTypeBlocks,
FromBlock: strPtr(ffcapi.FromBlockLatest),
}

mfc := es.connector.(*ffcapimocks.API)
mfc.On("EventStreamStart", mock.Anything, mock.Anything).Return(&ffcapi.EventStreamStartResponse{}, ffcapi.ErrorReason(""), nil)
mfc.On("EventStreamStopped", mock.Anything, mock.Anything).Return(&ffcapi.EventStreamStoppedResponse{}, ffcapi.ErrorReason(""), nil)

mcm := es.confirmations.(*confirmationsmocks.Manager)
mcm.On("StartConfirmedBlockListener", mock.Anything, l.ID, "latest", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))

msp := es.persistence.(*persistencemocks.Persistence)
msp.On("GetCheckpoint", mock.Anything, mock.Anything).Return(nil, nil)

err := es.Start(es.bgCtx)
assert.NoError(t, err)

_, err = es.AddOrUpdateListener(es.bgCtx, l.ID, l, false)
assert.Regexp(t, "pop", err)

err = es.Stop(es.bgCtx)
assert.NoError(t, err)

mfc.AssertExpectations(t)
}

func TestAttemptResetNonExistentListener(t *testing.T) {

es := newTestEventStream(t, `{
Expand Down Expand Up @@ -1947,3 +2054,30 @@ func TestHWMCheckpointFail(t *testing.T) {
msp.AssertExpectations(t)
mcm.AssertExpectations(t)
}

func TestCheckConfirmedEventForBatchIgnoreInvalid(t *testing.T) {

es := newTestEventStream(t, `{"name": "ut_stream"}`)

l, ewc := es.checkConfirmedEventForBatch(&ffcapi.ListenerEvent{})
assert.Nil(t, l)
assert.Nil(t, ewc)
}

func TestBuildBlockAddREquestBadCheckpoint(t *testing.T) {

spec := &apitypes.Listener{
ID: apitypes.NewULID(),
Name: strPtr("ut_listener"),
Type: &apitypes.ListenerTypeBlocks,
FromBlock: strPtr(ffcapi.FromBlockLatest),
}
l := &listener{spec: spec}

blar := l.buildBlockAddRequest(context.Background(), &apitypes.EventStreamCheckpoint{
Listeners: apitypes.CheckpointListeners{
*spec.ID: json.RawMessage([]byte("!!wrong")),
},
})
assert.Nil(t, blar.Checkpoint)
}
8 changes: 8 additions & 0 deletions pkg/ffcapi/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ func TestSortEvents(t *testing.T) {
}
}

func TestSortBlockEventsString(t *testing.T) {

assert.Equal(t, "block[12345/0x9614ad189f45ecff5f4949b22891c6bca7d83b40b50d8104bed101bc94395257]", (&BlockEvent{BlockInfo: BlockInfo{
BlockNumber: fftypes.NewFFBigInt(12345),
BlockHash: "0x9614ad189f45ecff5f4949b22891c6bca7d83b40b50d8104bed101bc94395257",
}}).String())
}

func TestBlockListenerCheckpoint(t *testing.T) {

b10 := &BlockListenerCheckpoint{Block: 10}
Expand Down

0 comments on commit fbce717

Please sign in to comment.