Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an option to avoid schedule receipt check immediately #124

Merged
merged 4 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|blockQueueLength|Internal queue length for notifying the confirmations manager of new blocks|`int`|`50`
|fetchReceiptUponEntry|Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout|`boolean`|`false`
|notificationQueueLength|Internal queue length for notifying the confirmations manager of new transactions/events|`int`|`50`
|receiptWorkers|Number of workers to use to query in parallel for receipts|`int`|`10`
|required|Number of confirmations required to consider a transaction/event final|`int`|`20`
Expand Down
24 changes: 18 additions & 6 deletions internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type blockConfirmationManager struct {
pendingMux sync.Mutex
receiptChecker *receiptChecker
retry *retry.Retry
fetchReceiptUponEntry bool
done chan struct{}
}

Expand All @@ -119,6 +120,7 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A
MaximumDelay: config.GetDuration(tmconfig.ConfirmationsRetryMaxDelay),
Factor: config.GetFloat64(tmconfig.ConfirmationsRetryFactor),
},
fetchReceiptUponEntry: config.GetBool(tmconfig.ConfirmationsFetchReceiptUponEntry),
}
bcm.ctx, bcm.cancelFunc = context.WithCancel(baseContext)
// add a log context for this specific confirmation manager (as there are many within the )
Expand All @@ -140,6 +142,7 @@ type pendingItem struct {
added time.Time
notifiedConfirmations []*apitypes.Confirmation
confirmations []*apitypes.Confirmation
scheduledAtLeastOnce bool
confirmed bool
queuedStale *list.Element // protected by receiptChecker mux
lastReceiptCheck time.Time // protected by receiptChecker mux
Expand Down Expand Up @@ -328,6 +331,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
notifications := make([]*Notification, 0)
blockHashes := make([]string, 0)
triggerType := ""
receivedFirstBlock := false
for {
select {
case bhe := <-bcm.newBlockHashes:
Expand Down Expand Up @@ -389,22 +393,28 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
}
// Clear the notifications array now we've processed them (we keep the slice memory)
notifications = notifications[:0]

scheduleAllTxReceipts := !receivedFirstBlock && blockHashCount > 0
// Mark receipts stale after duration
bcm.staleReceiptCheck()
bcm.scheduleReceiptChecks(scheduleAllTxReceipts)
receivedFirstBlock = receivedFirstBlock || blockHashCount > 0
log.L(bcm.ctx).Tracef("[TimeTrace] Confirmation listener processed %d block hashes and %d notifications in %s, trigger type: %s", blockHashCount, notificationCount, time.Since(startTime), triggerType)

}

}

func (bcm *blockConfirmationManager) staleReceiptCheck() {
func (bcm *blockConfirmationManager) scheduleReceiptChecks(receivedBlocksFirstTime bool) {
now := time.Now()
for _, pending := range bcm.pending {
// For efficiency we do a dirty read on the receipt check time before going into the locking
// check within the receipt checker
if pending.pType == pendingTypeTransaction && now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout {
bcm.receiptChecker.schedule(pending, true /* suspected timeout - prompts re-check in the lock */)
if pending.pType == pendingTypeTransaction {
if receivedBlocksFirstTime && !pending.scheduledAtLeastOnce {
bcm.receiptChecker.schedule(pending, false)
} else if now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout {
// schedule stale receipt checks
bcm.receiptChecker.schedule(pending, true /* suspected timeout - prompts re-check in the lock */)
}
}
}
}
Expand All @@ -423,7 +433,9 @@ func (bcm *blockConfirmationManager) processNotifications(notifications []*Notif
case NewTransaction:
newItem := n.transactionPendingItem()
bcm.addOrReplaceItem(newItem)
bcm.receiptChecker.schedule(newItem, false)
if bcm.fetchReceiptUponEntry {
bcm.receiptChecker.schedule(newItem, false)
}
case RemovedEventLog:
bcm.removeItem(n.eventPendingItem(), true)
case RemovedTransaction:
Expand Down
29 changes: 19 additions & 10 deletions internal/confirmations/confirmations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func TestBlockConfirmationManagerE2EForkReNotifyConfirmations(t *testing.T) {

func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) {
bcm, mca := newTestBlockConfirmationManager(t, true)
bcm.fetchReceiptUponEntry = true // mark fetch receipt upon entry to do a fetch receipt before any blocks were retrieved

confirmed := make(chan *apitypes.ConfirmationsNotification, 1)
receiptReceived := make(chan *ffcapi.TransactionReceiptResponse, 1)
Expand Down Expand Up @@ -643,7 +644,7 @@ func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) {
bcm.Stop()

mca.AssertExpectations(t)

// false
}

func TestBlockConfirmationManagerE2EHistoricalEvent(t *testing.T) {
Expand Down Expand Up @@ -1185,7 +1186,7 @@ func TestCheckReceiptWalkFail(t *testing.T) {
bcm.dispatchReceipt(pending, receipt, blocks)
}

func TestStaleReceiptCheck(t *testing.T) {
func TestScheduleReceiptCheck(t *testing.T) {

bcm, _ := newTestBlockConfirmationManager(t, false)
emm := &metricsmocks.EventMetricsEmitter{}
Expand All @@ -1197,16 +1198,24 @@ func TestStaleReceiptCheck(t *testing.T) {
emm.On("RecordConfirmationMetrics", mock.Anything, mock.Anything).Maybe()
bcm.receiptChecker = newReceiptChecker(bcm, 0, emm)

txHash := "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"
pending := &pendingItem{
pType: pendingTypeTransaction,
lastReceiptCheck: time.Now().Add(-1 * time.Hour),
transactionHash: txHash,
pendingStale := &pendingItem{ // stale
pType: pendingTypeTransaction,
lastReceiptCheck: time.Now().Add(-1 * time.Hour),
transactionHash: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
scheduledAtLeastOnce: true,
}

pendingNotScheduled := &pendingItem{ // not scheduled
pType: pendingTypeTransaction,
lastReceiptCheck: time.Now().Add(-1 * time.Hour),
transactionHash: "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7",
scheduledAtLeastOnce: false,
}
bcm.pending[pending.getKey()] = pending
bcm.staleReceiptCheck()
bcm.pending[pendingStale.getKey()] = pendingStale
bcm.pending[pendingNotScheduled.getKey()] = pendingNotScheduled
bcm.scheduleReceiptChecks(true)

assert.Equal(t, bcm.receiptChecker.entries.Len(), 1)
assert.Equal(t, bcm.receiptChecker.entries.Len(), 2)

}

Expand Down
1 change: 1 addition & 0 deletions internal/confirmations/receipt_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (rc *receiptChecker) schedule(pending *pendingItem, suspectedTimeout bool)
return
}
pending.queuedStale = rc.entries.PushBack(pending)
pending.scheduledAtLeastOnce = true
rc.cond.Signal()
rc.cond.L.Unlock()
// Log (outside the lock as it's a contended one)
Expand Down
1 change: 0 additions & 1 deletion internal/confirmations/receipt_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,4 @@ func TestCheckReceiptDoubleQueueProtection(t *testing.T) {
// to a worker, and when it's successfully executed (or put back on the end of the list)
bcm.receiptChecker.schedule(pending, false)
assert.Zero(t, bcm.receiptChecker.entries.Len())

}
4 changes: 3 additions & 1 deletion internal/tmconfig/tmconfig.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,6 +30,7 @@ var (
ConfirmationsRequired = ffc("confirmations.required")
ConfirmationsBlockQueueLength = ffc("confirmations.blockQueueLength")
ConfirmationsStaleReceiptTimeout = ffc("confirmations.staleReceiptTimeout")
ConfirmationsFetchReceiptUponEntry = ffc("confirmations.fetchReceiptUponEntry")
ConfirmationsNotificationQueueLength = ffc("confirmations.notificationQueueLength")
ConfirmationsReceiptWorkers = ffc("confirmations.receiptWorkers")
ConfirmationsRetryInitDelay = ffc("confirmations.retry.initialDelay")
Expand Down Expand Up @@ -98,6 +99,7 @@ func setDefaults() {
viper.SetDefault(string(ConfirmationsRetryInitDelay), "100ms")
viper.SetDefault(string(ConfirmationsRetryMaxDelay), "15s")
viper.SetDefault(string(ConfirmationsRetryFactor), 2.0)
viper.SetDefault(string(ConfirmationsFetchReceiptUponEntry), false)

viper.SetDefault(string(EventStreamsDefaultsBatchSize), 50)
viper.SetDefault(string(EventStreamsDefaultsBatchTimeout), "5s")
Expand Down
1 change: 1 addition & 0 deletions internal/tmmsgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
ConfigConfirmationsNotificationsQueueLength = ffc("config.confirmations.notificationQueueLength", "Internal queue length for notifying the confirmations manager of new transactions/events", i18n.IntType)
ConfigConfirmationsRequired = ffc("config.confirmations.required", "Number of confirmations required to consider a transaction/event final", i18n.IntType)
ConfigConfirmationsStaleReceiptTimeout = ffc("config.confirmations.staleReceiptTimeout", "Duration after which to force a receipt check for a pending transaction", i18n.TimeDurationType)
ConfigConfirmationsFetchReceiptUponEntry = ffc("config.confirmations.fetchReceiptUponEntry", "Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout", i18n.BooleanType)
ConfigConfirmationsReceiptWorkers = ffc("config.confirmations.receiptWorkers", "Number of workers to use to query in parallel for receipts", i18n.IntType)

ConfigTransactionsNonceStateTimeout = ffc("config.transactions.nonceStateTimeout", "How old the most recently submitted transaction record in our local state needs to be, before we make a request to the node to query the next nonce for a signing address", i18n.TimeDurationType)
Expand Down