diff --git a/config.md b/config.md index cd804e6a..56e8d207 100644 --- a/config.md +++ b/config.md @@ -44,6 +44,7 @@ |Key|Description|Type|Default Value| |---|-----------|----|-------------| |blockQueueLength|Internal queue length for notifying the confirmations manager of new blocks|`int`|`50` +|fetchReceiptUponEntry|Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout|`boolean`|`false` |notificationQueueLength|Internal queue length for notifying the confirmations manager of new transactions/events|`int`|`50` |receiptWorkers|Number of workers to use to query in parallel for receipts|`int`|`10` |required|Number of confirmations required to consider a transaction/event final|`int`|`20` diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index af519be6..a05effa5 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -99,6 +99,7 @@ type blockConfirmationManager struct { pendingMux sync.Mutex receiptChecker *receiptChecker retry *retry.Retry + fetchReceiptUponEntry bool done chan struct{} } @@ -119,6 +120,7 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A MaximumDelay: config.GetDuration(tmconfig.ConfirmationsRetryMaxDelay), Factor: config.GetFloat64(tmconfig.ConfirmationsRetryFactor), }, + fetchReceiptUponEntry: config.GetBool(tmconfig.ConfirmationsFetchReceiptUponEntry), } bcm.ctx, bcm.cancelFunc = context.WithCancel(baseContext) // add a log context for this specific confirmation manager (as there are many within the ) @@ -140,6 +142,7 @@ type pendingItem struct { added time.Time notifiedConfirmations []*apitypes.Confirmation confirmations []*apitypes.Confirmation + scheduledAtLeastOnce bool confirmed bool queuedStale *list.Element // protected by receiptChecker mux lastReceiptCheck time.Time // protected by receiptChecker mux @@ -328,6 +331,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() { notifications := make([]*Notification, 0) blockHashes := make([]string, 0) triggerType := "" + receivedFirstBlock := false for { select { case bhe := <-bcm.newBlockHashes: @@ -389,22 +393,28 @@ func (bcm *blockConfirmationManager) confirmationsListener() { } // Clear the notifications array now we've processed them (we keep the slice memory) notifications = notifications[:0] - + scheduleAllTxReceipts := !receivedFirstBlock && blockHashCount > 0 // Mark receipts stale after duration - bcm.staleReceiptCheck() + bcm.scheduleReceiptChecks(scheduleAllTxReceipts) + receivedFirstBlock = receivedFirstBlock || blockHashCount > 0 log.L(bcm.ctx).Tracef("[TimeTrace] Confirmation listener processed %d block hashes and %d notifications in %s, trigger type: %s", blockHashCount, notificationCount, time.Since(startTime), triggerType) } } -func (bcm *blockConfirmationManager) staleReceiptCheck() { +func (bcm *blockConfirmationManager) scheduleReceiptChecks(receivedBlocksFirstTime bool) { now := time.Now() for _, pending := range bcm.pending { // For efficiency we do a dirty read on the receipt check time before going into the locking // check within the receipt checker - if pending.pType == pendingTypeTransaction && now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout { - bcm.receiptChecker.schedule(pending, true /* suspected timeout - prompts re-check in the lock */) + if pending.pType == pendingTypeTransaction { + if receivedBlocksFirstTime && !pending.scheduledAtLeastOnce { + bcm.receiptChecker.schedule(pending, false) + } else if now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout { + // schedule stale receipt checks + bcm.receiptChecker.schedule(pending, true /* suspected timeout - prompts re-check in the lock */) + } } } } @@ -423,7 +433,9 @@ func (bcm *blockConfirmationManager) processNotifications(notifications []*Notif case NewTransaction: newItem := n.transactionPendingItem() bcm.addOrReplaceItem(newItem) - bcm.receiptChecker.schedule(newItem, false) + if bcm.fetchReceiptUponEntry { + bcm.receiptChecker.schedule(newItem, false) + } case RemovedEventLog: bcm.removeItem(n.eventPendingItem(), true) case RemovedTransaction: diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index 1b61d3a7..39d2dd8a 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -462,6 +462,7 @@ func TestBlockConfirmationManagerE2EForkReNotifyConfirmations(t *testing.T) { func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) { bcm, mca := newTestBlockConfirmationManager(t, true) + bcm.fetchReceiptUponEntry = true // mark fetch receipt upon entry to do a fetch receipt before any blocks were retrieved confirmed := make(chan *apitypes.ConfirmationsNotification, 1) receiptReceived := make(chan *ffcapi.TransactionReceiptResponse, 1) @@ -643,7 +644,7 @@ func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) { bcm.Stop() mca.AssertExpectations(t) - + // false } func TestBlockConfirmationManagerE2EHistoricalEvent(t *testing.T) { @@ -1185,7 +1186,7 @@ func TestCheckReceiptWalkFail(t *testing.T) { bcm.dispatchReceipt(pending, receipt, blocks) } -func TestStaleReceiptCheck(t *testing.T) { +func TestScheduleReceiptCheck(t *testing.T) { bcm, _ := newTestBlockConfirmationManager(t, false) emm := &metricsmocks.EventMetricsEmitter{} @@ -1197,16 +1198,24 @@ func TestStaleReceiptCheck(t *testing.T) { emm.On("RecordConfirmationMetrics", mock.Anything, mock.Anything).Maybe() bcm.receiptChecker = newReceiptChecker(bcm, 0, emm) - txHash := "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347" - pending := &pendingItem{ - pType: pendingTypeTransaction, - lastReceiptCheck: time.Now().Add(-1 * time.Hour), - transactionHash: txHash, + pendingStale := &pendingItem{ // stale + pType: pendingTypeTransaction, + lastReceiptCheck: time.Now().Add(-1 * time.Hour), + transactionHash: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + scheduledAtLeastOnce: true, + } + + pendingNotScheduled := &pendingItem{ // not scheduled + pType: pendingTypeTransaction, + lastReceiptCheck: time.Now().Add(-1 * time.Hour), + transactionHash: "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7", + scheduledAtLeastOnce: false, } - bcm.pending[pending.getKey()] = pending - bcm.staleReceiptCheck() + bcm.pending[pendingStale.getKey()] = pendingStale + bcm.pending[pendingNotScheduled.getKey()] = pendingNotScheduled + bcm.scheduleReceiptChecks(true) - assert.Equal(t, bcm.receiptChecker.entries.Len(), 1) + assert.Equal(t, bcm.receiptChecker.entries.Len(), 2) } diff --git a/internal/confirmations/receipt_checker.go b/internal/confirmations/receipt_checker.go index 8daf6bdb..e4d2861d 100644 --- a/internal/confirmations/receipt_checker.go +++ b/internal/confirmations/receipt_checker.go @@ -149,6 +149,7 @@ func (rc *receiptChecker) schedule(pending *pendingItem, suspectedTimeout bool) return } pending.queuedStale = rc.entries.PushBack(pending) + pending.scheduledAtLeastOnce = true rc.cond.Signal() rc.cond.L.Unlock() // Log (outside the lock as it's a contended one) diff --git a/internal/confirmations/receipt_checker_test.go b/internal/confirmations/receipt_checker_test.go index 56cb6858..a42fb96d 100644 --- a/internal/confirmations/receipt_checker_test.go +++ b/internal/confirmations/receipt_checker_test.go @@ -130,5 +130,4 @@ func TestCheckReceiptDoubleQueueProtection(t *testing.T) { // to a worker, and when it's successfully executed (or put back on the end of the list) bcm.receiptChecker.schedule(pending, false) assert.Zero(t, bcm.receiptChecker.entries.Len()) - } diff --git a/internal/tmconfig/tmconfig.go b/internal/tmconfig/tmconfig.go index 0463dc85..cf877701 100644 --- a/internal/tmconfig/tmconfig.go +++ b/internal/tmconfig/tmconfig.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -30,6 +30,7 @@ var ( ConfirmationsRequired = ffc("confirmations.required") ConfirmationsBlockQueueLength = ffc("confirmations.blockQueueLength") ConfirmationsStaleReceiptTimeout = ffc("confirmations.staleReceiptTimeout") + ConfirmationsFetchReceiptUponEntry = ffc("confirmations.fetchReceiptUponEntry") ConfirmationsNotificationQueueLength = ffc("confirmations.notificationQueueLength") ConfirmationsReceiptWorkers = ffc("confirmations.receiptWorkers") ConfirmationsRetryInitDelay = ffc("confirmations.retry.initialDelay") @@ -98,6 +99,7 @@ func setDefaults() { viper.SetDefault(string(ConfirmationsRetryInitDelay), "100ms") viper.SetDefault(string(ConfirmationsRetryMaxDelay), "15s") viper.SetDefault(string(ConfirmationsRetryFactor), 2.0) + viper.SetDefault(string(ConfirmationsFetchReceiptUponEntry), false) viper.SetDefault(string(EventStreamsDefaultsBatchSize), 50) viper.SetDefault(string(EventStreamsDefaultsBatchTimeout), "5s") diff --git a/internal/tmmsgs/en_config_descriptions.go b/internal/tmmsgs/en_config_descriptions.go index 99c84d3f..14422fc8 100644 --- a/internal/tmmsgs/en_config_descriptions.go +++ b/internal/tmmsgs/en_config_descriptions.go @@ -45,6 +45,7 @@ var ( ConfigConfirmationsNotificationsQueueLength = ffc("config.confirmations.notificationQueueLength", "Internal queue length for notifying the confirmations manager of new transactions/events", i18n.IntType) ConfigConfirmationsRequired = ffc("config.confirmations.required", "Number of confirmations required to consider a transaction/event final", i18n.IntType) ConfigConfirmationsStaleReceiptTimeout = ffc("config.confirmations.staleReceiptTimeout", "Duration after which to force a receipt check for a pending transaction", i18n.TimeDurationType) + ConfigConfirmationsFetchReceiptUponEntry = ffc("config.confirmations.fetchReceiptUponEntry", "Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout", i18n.BooleanType) ConfigConfirmationsReceiptWorkers = ffc("config.confirmations.receiptWorkers", "Number of workers to use to query in parallel for receipts", i18n.IntType) ConfigTransactionsNonceStateTimeout = ffc("config.transactions.nonceStateTimeout", "How old the most recently submitted transaction record in our local state needs to be, before we make a request to the node to query the next nonce for a signing address", i18n.TimeDurationType)