Skip to content

Commit

Permalink
Merge pull request #94 from hyperledger/diag-ldb-conf
Browse files Browse the repository at this point in the history
Fix confirmations incremental dispatch
  • Loading branch information
nguyer authored Jul 10, 2023
2 parents 6ff003a + f210a7b commit bdeb537
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 25 deletions.
39 changes: 29 additions & 10 deletions internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,20 +571,39 @@ func (bcm *blockConfirmationManager) dispatchConfirmations(item *pendingItem) {
for i, c := range item.confirmations {
if !newFork && i < len(item.notifiedConfirmations) {
newFork = c.BlockHash != item.notifiedConfirmations[i].BlockHash
if newFork {
// Notify of the full set
notificationConfirmations = append([]*apitypes.Confirmation{}, item.confirmations...)
break
}
} else {
// Only notify of the additional ones
notificationConfirmations = append(notificationConfirmations, c)
}
notificationConfirmations = append(notificationConfirmations, c)
}

notification := &apitypes.ConfirmationsNotification{
Confirmed: item.confirmed,
NewFork: newFork,
Confirmations: notificationConfirmations,
// Possible for us to re-dispatch the same confirmations, if we are notified about a block later after
// after we previously did a crawl for blocks.
// So we protect here against dispatching an empty array
if len(notificationConfirmations) > 0 || item.confirmed {
notification := &apitypes.ConfirmationsNotification{
Confirmed: item.confirmed,
NewFork: newFork,
Confirmations: notificationConfirmations,
}
// Take a copy of the notification confirmations so we know what we have previously notified next time round
// (not safe to keep a reference, in case it's modified by the callback).
previouslyNotified := len(item.notifiedConfirmations)
if newFork {
item.notifiedConfirmations = append([]*apitypes.Confirmation{}, notificationConfirmations...)
} else {
item.notifiedConfirmations = append(item.notifiedConfirmations, notificationConfirmations...)
}
log.L(bcm.ctx).Infof("Confirmation notification item=%s confirmed=%t confirmations=%d newFork=%t previouslyNotified=%d",
item.getKey(), notification.Confirmed, len(item.confirmations),
notification.NewFork, previouslyNotified)
item.confirmationsCallback(bcm.ctx, notification)
}
item.notifiedConfirmations = notificationConfirmations
log.L(bcm.ctx).Infof("Confirmation notification item=%s confirmed=%t confirmations=%d newFork=%t notifiedConfirmations=%d",
item.getKey(), notification.Confirmed, len(item.confirmations),
notification.NewFork, len(notificationConfirmations))
item.confirmationsCallback(bcm.ctx, notification)

}

Expand Down
198 changes: 183 additions & 15 deletions internal/confirmations/confirmations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newTestBlockConfirmationManagerCustomConfig(t *testing.T) (*blockConfirmati
func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager(t, true)

confirmed := make(chan []*apitypes.Confirmation, 1)
confirmed := make(chan *apitypes.ConfirmationsNotification, 1)
eventToConfirm := &EventInfo{
ID: &ffcapi.EventID{
ListenerID: fftypes.NewUUID(),
Expand All @@ -61,9 +61,7 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) {
LogIndex: 10,
},
Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) {
if notification.Confirmed {
confirmed <- notification.Confirmations
}
confirmed <- notification
},
}

Expand Down Expand Up @@ -148,12 +146,22 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) {

bcm.Start()

// First get the block 1002 & 1004 confirmation notifications - but we're not confirmed yet
dispatched := <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1002),
apitypes.ConfirmationFromBlock(block1003),
}, dispatched.Confirmations)
assert.True(t, dispatched.NewFork)
assert.False(t, dispatched.Confirmed)

// Then get the 1004 with the confirmed true
dispatched = <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1004),
}, dispatched)
}, dispatched.Confirmations)
assert.False(t, dispatched.NewFork)
assert.True(t, dispatched.Confirmed)

bcm.Stop()

Expand All @@ -163,7 +171,7 @@ func TestBlockConfirmationManagerE2ENewEvent(t *testing.T) {
func TestBlockConfirmationManagerE2EFork(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager(t, true)

confirmed := make(chan []*apitypes.Confirmation, 1)
confirmed := make(chan *apitypes.ConfirmationsNotification, 1)
eventToConfirm := &EventInfo{
ID: &ffcapi.EventID{
ListenerID: fftypes.NewUUID(),
Expand All @@ -174,9 +182,7 @@ func TestBlockConfirmationManagerE2EFork(t *testing.T) {
LogIndex: 10,
},
Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) {
if notification.Confirmed {
confirmed <- notification.Confirmations
}
confirmed <- notification
},
}

Expand Down Expand Up @@ -280,12 +286,162 @@ func TestBlockConfirmationManagerE2EFork(t *testing.T) {

bcm.Start()

// Notified of 1002 - new fork as base
dispatched := <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1002),
}, dispatched.Confirmations)
assert.True(t, dispatched.NewFork)
assert.False(t, dispatched.Confirmed)

// Only notified of 1003b which is in the confirmation chain - not a new fork, and not confirmed
dispatched = <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1003b),
}, dispatched.Confirmations)
assert.False(t, dispatched.NewFork)
assert.False(t, dispatched.Confirmed)

// Notified of 1004 and confirmation
dispatched = <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1004),
}, dispatched)
}, dispatched.Confirmations)
assert.False(t, dispatched.NewFork)
assert.True(t, dispatched.Confirmed)

bcm.Stop()

mca.AssertExpectations(t)

}

func TestBlockConfirmationManagerE2EForkReNotifyConfirmations(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager(t, true)

confirmed := make(chan *apitypes.ConfirmationsNotification, 3)
eventToConfirm := &EventInfo{
ID: &ffcapi.EventID{
ListenerID: fftypes.NewUUID(),
TransactionHash: "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7",
BlockHash: "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542",
BlockNumber: 1001,
TransactionIndex: 5,
LogIndex: 10,
},
Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) {
confirmed <- notification
},
}

// The next filter gives us 1002, and a first 1003 block - which will later be removed
block1002 := &apitypes.BlockInfo{
BlockNumber: 1002,
BlockHash: "0x64fd8179b80dd255d52ce60d7f265c0506be810e2f3df52463fadeb44bb4d2df",
ParentHash: "0x0e32d749a86cfaf551d528b5b121cea456f980a39e5b8136eb8e85dbc744a542",
}
block1003a := &apitypes.BlockInfo{
BlockNumber: 1003,
BlockHash: "0x46210d224888265c269359529618bf2f6adb2697ff52c63c10f16a2391bdd295",
ParentHash: "0x64fd8179b80dd255d52ce60d7f265c0506be810e2f3df52463fadeb44bb4d2df",
}

blockHashes := bcm.NewBlockHashes()

// Have the event notification in flight from the beginning
err := bcm.Notify(&Notification{
NotificationType: NewEventLog,
Event: eventToConfirm,
})
assert.NoError(t, err)

// Then we get the final fork up to our confirmation
block1003b := &apitypes.BlockInfo{
BlockNumber: 1003,
BlockHash: "0xed21f4f73d150f16f922ae82b7485cd936ae1eca4c027516311b928360a347e8",
ParentHash: "0x64fd8179b80dd255d52ce60d7f265c0506be810e2f3df52463fadeb44bb4d2df",
}
block1004 := &apitypes.BlockInfo{
BlockNumber: 1004,
BlockHash: "0x110282339db2dfe4bfd13d78375f7883048cac6bc12f8393bd080a4e263d5d21",
ParentHash: "0xed21f4f73d150f16f922ae82b7485cd936ae1eca4c027516311b928360a347e8",
}
mca.On("BlockInfoByHash", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByHashRequest) bool {
return r.BlockHash == block1003b.BlockHash
})).Return(&ffcapi.BlockInfoByHashResponse{
BlockInfo: ffcapi.BlockInfo{
BlockNumber: fftypes.NewFFBigInt(int64(block1003b.BlockNumber)),
BlockHash: block1003b.BlockHash,
ParentHash: block1003b.ParentHash,
},
}, ffcapi.ErrorReason(""), nil).Once()
mca.On("BlockInfoByHash", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByHashRequest) bool {
return r.BlockHash == block1004.BlockHash
})).Return(&ffcapi.BlockInfoByHashResponse{
BlockInfo: ffcapi.BlockInfo{
BlockNumber: fftypes.NewFFBigInt(int64(block1004.BlockNumber)),
BlockHash: block1004.BlockHash,
ParentHash: block1004.ParentHash,
},
}, ffcapi.ErrorReason(""), nil).Once()

mca.On("BlockInfoByNumber", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByNumberRequest) bool {
return r.BlockNumber.Uint64() == 1002
})).Return(&ffcapi.BlockInfoByNumberResponse{
BlockInfo: ffcapi.BlockInfo{
BlockNumber: fftypes.NewFFBigInt(int64(block1002.BlockNumber)),
BlockHash: block1002.BlockHash,
ParentHash: block1002.ParentHash,
},
}, ffcapi.ErrorReason(""), nil)
mca.On("BlockInfoByNumber", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByNumberRequest) bool {
return r.BlockNumber.Uint64() == 1003
})).Run(func(args mock.Arguments) {
// When we download 1003a, notify of 1003b
blockHashes <- &ffcapi.BlockHashEvent{
BlockHashes: []string{
block1003b.BlockHash,
block1004.BlockHash,
},
}
}).Return(&ffcapi.BlockInfoByNumberResponse{
BlockInfo: ffcapi.BlockInfo{
BlockNumber: fftypes.NewFFBigInt(int64(block1003a.BlockNumber)),
BlockHash: block1003a.BlockHash,
ParentHash: block1003a.ParentHash,
},
}, ffcapi.ErrorReason(""), nil)
mca.On("BlockInfoByNumber", mock.Anything, mock.MatchedBy(func(r *ffcapi.BlockInfoByNumberRequest) bool {
return r.BlockNumber.Uint64() == 1004
})).Return(nil, ffcapi.ErrorReasonNotFound, fmt.Errorf("not found"))

bcm.Start()

// Notified of 1002 and the original 1003, as the initial fork
dispatched := <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1002),
apitypes.ConfirmationFromBlock(block1003a),
}, dispatched.Confirmations)
assert.True(t, dispatched.NewFork)
assert.False(t, dispatched.Confirmed)

// Then notified of the complete new fork - including 1002 again
dispatched = <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1002),
apitypes.ConfirmationFromBlock(block1003b),
}, dispatched.Confirmations)
assert.True(t, dispatched.NewFork)
assert.False(t, dispatched.Confirmed)

// Notified of 1004 and confirmation
dispatched = <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1004),
}, dispatched.Confirmations)
assert.False(t, dispatched.NewFork)
assert.True(t, dispatched.Confirmed)

bcm.Stop()

Expand All @@ -296,14 +452,12 @@ func TestBlockConfirmationManagerE2EFork(t *testing.T) {
func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager(t, true)

confirmed := make(chan []*apitypes.Confirmation, 1)
confirmed := make(chan *apitypes.ConfirmationsNotification, 1)
receiptReceived := make(chan *ffcapi.TransactionReceiptResponse, 1)
txToConfirmForkA := &TransactionInfo{
TransactionHash: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) {
if notification.Confirmed {
confirmed <- notification.Confirmations
}
confirmed <- notification
},
Receipt: func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse) {
receiptReceived <- receipt
Expand Down Expand Up @@ -457,9 +611,23 @@ func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) {
dispatched := <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1002b),
}, dispatched.Confirmations)
assert.True(t, dispatched.NewFork)
assert.False(t, dispatched.Confirmed)

dispatched = <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1003),
}, dispatched.Confirmations)
assert.False(t, dispatched.NewFork)
assert.False(t, dispatched.Confirmed)

dispatched = <-confirmed
assert.Equal(t, []*apitypes.Confirmation{
apitypes.ConfirmationFromBlock(block1004),
}, dispatched)
}, dispatched.Confirmations)
assert.False(t, dispatched.NewFork)
assert.True(t, dispatched.Confirmed)

bcm.Stop()

Expand Down
1 change: 1 addition & 0 deletions pkg/txhandler/simple/policyloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ func (sth *simpleTransactionHandler) HandleTransactionConfirmations(ctx context.
pending.confirmed = notification.Confirmed
pending.confirmNotify = fftypes.Now()
pending.confirmations = notification
log.L(ctx).Infof("Received %d confirmations (resync=%t)", len(notification.Confirmations), notification.NewFork)
sth.mux.Unlock()

sth.markInflightUpdate()
Expand Down

0 comments on commit bdeb537

Please sign in to comment.