Skip to content

Commit

Permalink
Merge branch 'main' of github.com:hyperledger/firefly-transaction-man…
Browse files Browse the repository at this point in the history
…ager into block-listener

Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Jun 12, 2024
2 parents adc7fa2 + 32d8a0c commit 072aa38
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 18 deletions.
1 change: 1 addition & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|blockQueueLength|Internal queue length for notifying the confirmations manager of new blocks|`int`|`50`
|fetchReceiptUponEntry|Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout|`boolean`|`false`
|notificationQueueLength|Internal queue length for notifying the confirmations manager of new transactions/events|`int`|`50`
|receiptWorkers|Number of workers to use to query in parallel for receipts|`int`|`10`
|required|Number of confirmations required to consider a transaction/event final|`int`|`20`
Expand Down
24 changes: 18 additions & 6 deletions internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type blockConfirmationManager struct {
retry *retry.Retry
cblLock sync.Mutex
cbls map[fftypes.UUID]*confirmedBlockListener
fetchReceiptUponEntry bool
done chan struct{}
}

Expand All @@ -124,6 +125,7 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A
MaximumDelay: config.GetDuration(tmconfig.ConfirmationsRetryMaxDelay),
Factor: config.GetFloat64(tmconfig.ConfirmationsRetryFactor),
},
fetchReceiptUponEntry: config.GetBool(tmconfig.ConfirmationsFetchReceiptUponEntry),
}
bcm.ctx, bcm.cancelFunc = context.WithCancel(baseContext)
// add a log context for this specific confirmation manager (as there are many within the )
Expand All @@ -145,6 +147,7 @@ type pendingItem struct {
added time.Time
notifiedConfirmations []*apitypes.Confirmation
confirmations []*apitypes.Confirmation
scheduledAtLeastOnce bool
confirmed bool
queuedStale *list.Element // protected by receiptChecker mux
lastReceiptCheck time.Time // protected by receiptChecker mux
Expand Down Expand Up @@ -358,6 +361,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
notifications := make([]*Notification, 0)
blockHashes := make([]string, 0)
triggerType := ""
receivedFirstBlock := false
for {
select {
case bhe := <-bcm.newBlockHashes:
Expand Down Expand Up @@ -424,22 +428,28 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
}
// Clear the notifications array now we've processed them (we keep the slice memory)
notifications = notifications[:0]

scheduleAllTxReceipts := !receivedFirstBlock && blockHashCount > 0
// Mark receipts stale after duration
bcm.staleReceiptCheck()
bcm.scheduleReceiptChecks(scheduleAllTxReceipts)
receivedFirstBlock = receivedFirstBlock || blockHashCount > 0
log.L(bcm.ctx).Tracef("[TimeTrace] Confirmation listener processed %d block hashes and %d notifications in %s, trigger type: %s", blockHashCount, notificationCount, time.Since(startTime), triggerType)

}

}

func (bcm *blockConfirmationManager) staleReceiptCheck() {
func (bcm *blockConfirmationManager) scheduleReceiptChecks(receivedBlocksFirstTime bool) {
now := time.Now()
for _, pending := range bcm.pending {
// For efficiency we do a dirty read on the receipt check time before going into the locking
// check within the receipt checker
if pending.pType == pendingTypeTransaction && now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout {
bcm.receiptChecker.schedule(pending, true /* suspected timeout - prompts re-check in the lock */)
if pending.pType == pendingTypeTransaction {
if receivedBlocksFirstTime && !pending.scheduledAtLeastOnce {
bcm.receiptChecker.schedule(pending, false)
} else if now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout {
// schedule stale receipt checks
bcm.receiptChecker.schedule(pending, true /* suspected timeout - prompts re-check in the lock */)
}
}
}
}
Expand All @@ -458,7 +468,9 @@ func (bcm *blockConfirmationManager) processNotifications(notifications []*Notif
case NewTransaction:
newItem := n.transactionPendingItem()
bcm.addOrReplaceItem(newItem)
bcm.receiptChecker.schedule(newItem, false)
if bcm.fetchReceiptUponEntry {
bcm.receiptChecker.schedule(newItem, false)
}
case RemovedEventLog:
bcm.removeItem(n.eventPendingItem(), true)
case RemovedTransaction:
Expand Down
29 changes: 19 additions & 10 deletions internal/confirmations/confirmations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func TestBlockConfirmationManagerE2EForkReNotifyConfirmations(t *testing.T) {

func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager()
bcm.fetchReceiptUponEntry = true // mark fetch receipt upon entry to do a fetch receipt before any blocks were retrieved

confirmed := make(chan *apitypes.ConfirmationsNotification, 1)
receiptReceived := make(chan *ffcapi.TransactionReceiptResponse, 1)
Expand Down Expand Up @@ -647,7 +648,7 @@ func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) {
bcm.Stop()

mca.AssertExpectations(t)

// false
}

func TestBlockConfirmationManagerE2EHistoricalEvent(t *testing.T) {
Expand Down Expand Up @@ -1193,7 +1194,7 @@ func TestCheckReceiptWalkFail(t *testing.T) {
bcm.dispatchReceipt(pending, receipt, blocks)
}

func TestStaleReceiptCheck(t *testing.T) {
func TestScheduleReceiptCheck(t *testing.T) {

bcm, _ := newTestBlockConfirmationManager()
emm := &metricsmocks.EventMetricsEmitter{}
Expand All @@ -1205,16 +1206,24 @@ func TestStaleReceiptCheck(t *testing.T) {
emm.On("RecordConfirmationMetrics", mock.Anything, mock.Anything).Maybe()
bcm.receiptChecker = newReceiptChecker(bcm, 0, emm)

txHash := "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"
pending := &pendingItem{
pType: pendingTypeTransaction,
lastReceiptCheck: time.Now().Add(-1 * time.Hour),
transactionHash: txHash,
pendingStale := &pendingItem{ // stale
pType: pendingTypeTransaction,
lastReceiptCheck: time.Now().Add(-1 * time.Hour),
transactionHash: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
scheduledAtLeastOnce: true,
}

pendingNotScheduled := &pendingItem{ // not scheduled
pType: pendingTypeTransaction,
lastReceiptCheck: time.Now().Add(-1 * time.Hour),
transactionHash: "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7",
scheduledAtLeastOnce: false,
}
bcm.pending[pending.getKey()] = pending
bcm.staleReceiptCheck()
bcm.pending[pendingStale.getKey()] = pendingStale
bcm.pending[pendingNotScheduled.getKey()] = pendingNotScheduled
bcm.scheduleReceiptChecks(true)

assert.Equal(t, bcm.receiptChecker.entries.Len(), 1)
assert.Equal(t, bcm.receiptChecker.entries.Len(), 2)

}

Expand Down
1 change: 1 addition & 0 deletions internal/confirmations/receipt_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (rc *receiptChecker) schedule(pending *pendingItem, suspectedTimeout bool)
return
}
pending.queuedStale = rc.entries.PushBack(pending)
pending.scheduledAtLeastOnce = true
rc.cond.Signal()
rc.cond.L.Unlock()
// Log (outside the lock as it's a contended one)
Expand Down
1 change: 0 additions & 1 deletion internal/confirmations/receipt_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,4 @@ func TestCheckReceiptDoubleQueueProtection(t *testing.T) {
// to a worker, and when it's successfully executed (or put back on the end of the list)
bcm.receiptChecker.schedule(pending, false)
assert.Zero(t, bcm.receiptChecker.entries.Len())

}
4 changes: 3 additions & 1 deletion internal/tmconfig/tmconfig.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,6 +30,7 @@ var (
ConfirmationsRequired = ffc("confirmations.required")
ConfirmationsBlockQueueLength = ffc("confirmations.blockQueueLength")
ConfirmationsStaleReceiptTimeout = ffc("confirmations.staleReceiptTimeout")
ConfirmationsFetchReceiptUponEntry = ffc("confirmations.fetchReceiptUponEntry")
ConfirmationsNotificationQueueLength = ffc("confirmations.notificationQueueLength")
ConfirmationsReceiptWorkers = ffc("confirmations.receiptWorkers")
ConfirmationsRetryInitDelay = ffc("confirmations.retry.initialDelay")
Expand Down Expand Up @@ -98,6 +99,7 @@ func setDefaults() {
viper.SetDefault(string(ConfirmationsRetryInitDelay), "100ms")
viper.SetDefault(string(ConfirmationsRetryMaxDelay), "15s")
viper.SetDefault(string(ConfirmationsRetryFactor), 2.0)
viper.SetDefault(string(ConfirmationsFetchReceiptUponEntry), false)

viper.SetDefault(string(EventStreamsDefaultsBatchSize), 50)
viper.SetDefault(string(EventStreamsDefaultsBatchTimeout), "5s")
Expand Down
1 change: 1 addition & 0 deletions internal/tmmsgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
ConfigConfirmationsNotificationsQueueLength = ffc("config.confirmations.notificationQueueLength", "Internal queue length for notifying the confirmations manager of new transactions/events", i18n.IntType)
ConfigConfirmationsRequired = ffc("config.confirmations.required", "Number of confirmations required to consider a transaction/event final", i18n.IntType)
ConfigConfirmationsStaleReceiptTimeout = ffc("config.confirmations.staleReceiptTimeout", "Duration after which to force a receipt check for a pending transaction", i18n.TimeDurationType)
ConfigConfirmationsFetchReceiptUponEntry = ffc("config.confirmations.fetchReceiptUponEntry", "Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout", i18n.BooleanType)
ConfigConfirmationsReceiptWorkers = ffc("config.confirmations.receiptWorkers", "Number of workers to use to query in parallel for receipts", i18n.IntType)

ConfigTransactionsNonceStateTimeout = ffc("config.transactions.nonceStateTimeout", "How old the most recently submitted transaction record in our local state needs to be, before we make a request to the node to query the next nonce for a signing address", i18n.TimeDurationType)
Expand Down

0 comments on commit 072aa38

Please sign in to comment.