Skip to content

Commit

Permalink
fix: check all transactions in monitor check pending with timeout (#4867
Browse files Browse the repository at this point in the history
)

Co-authored-by: Janoš Guljaš <janos@users.noreply.github.com>
  • Loading branch information
martinconic and janos authored Oct 17, 2024
1 parent e7bc895 commit 0e42d83
Showing 1 changed file with 21 additions and 27 deletions.
48 changes: 21 additions & 27 deletions pkg/transaction/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type transactionMonitor struct {
}

type transactionWatch struct {
start time.Time
receiptC chan types.Receipt // channel to which the receipt will be written once available
errC chan error // error channel (primarily for cancelled transactions)
}
Expand Down Expand Up @@ -91,6 +92,7 @@ func (tm *transactionMonitor) WatchTransaction(txHash common.Hash, nonce uint64)
}

tm.watchesByNonce[nonce][txHash] = append(tm.watchesByNonce[nonce][txHash], transactionWatch{
start: time.Now(),
receiptC: receiptC,
errC: errC,
})
Expand Down Expand Up @@ -169,44 +171,36 @@ func (tm *transactionMonitor) watchPending() {
}
}

// potentiallyConfirmedTxWatches returns all watches with nonce less than what was specified
func (tm *transactionMonitor) potentiallyConfirmedTxWatches(nonce uint64) (watches map[uint64]map[common.Hash][]transactionWatch) {
func (tm *transactionMonitor) hasWatches() bool {
tm.lock.Lock()
defer tm.lock.Unlock()
return len(tm.watchesByNonce) > 0
}

potentiallyConfirmedTxWatches := make(map[uint64]map[common.Hash][]transactionWatch)
for n, watches := range tm.watchesByNonce {
if n < nonce {
potentiallyConfirmedTxWatches[n] = watches
func watchStart(watches []transactionWatch) time.Time {
if len(watches) == 0 {
return time.Time{}
}
start := watches[0].start
for _, w := range watches[1:] {
if w.start.Before(start) {
start = w.start
}
}

return potentiallyConfirmedTxWatches
}

func (tm *transactionMonitor) hasWatches() bool {
tm.lock.Lock()
defer tm.lock.Unlock()
return len(tm.watchesByNonce) > 0
return start
}

// check pending checks the given block (number) for confirmed or cancelled transactions
func (tm *transactionMonitor) checkPending(block uint64) error {
nonce, err := tm.backend.NonceAt(tm.ctx, tm.sender, new(big.Int).SetUint64(block))
if err != nil {
return err
}

// transactions with a nonce lower or equal to what is found on-chain are either confirmed or (at least temporarily) cancelled
potentiallyConfirmedTxWatches := tm.potentiallyConfirmedTxWatches(nonce)

confirmedNonces := make(map[uint64]*types.Receipt)
var cancelledNonces []uint64
for nonceGroup, watchMap := range potentiallyConfirmedTxWatches {
for txHash := range watchMap {
for nonceGroup, watchMap := range tm.watchesByNonce {
for txHash, watches := range watchMap {
receipt, err := tm.backend.TransactionReceipt(tm.ctx, txHash)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
// wait for a few blocks to be mined before considering a transaction not existing
transactionWatchNotFoundTimeout := 5 * tm.pollingInterval
if errors.Is(err, ethereum.NotFound) && watchStart(watches).Before(time.Now().Add(transactionWatchNotFoundTimeout)) {
// if both err and receipt are nil, there is no receipt
// the reason why we consider this only potentially cancelled is to catch cases where after a reorg the original transaction wins
continue
Expand All @@ -220,7 +214,7 @@ func (tm *transactionMonitor) checkPending(block uint64) error {
}
}

for nonceGroup := range potentiallyConfirmedTxWatches {
for nonceGroup := range tm.watchesByNonce {
if _, ok := confirmedNonces[nonceGroup]; ok {
continue
}
Expand All @@ -240,7 +234,7 @@ func (tm *transactionMonitor) checkPending(block uint64) error {
defer tm.lock.Unlock()

for nonce, receipt := range confirmedNonces {
for txHash, watches := range potentiallyConfirmedTxWatches[nonce] {
for txHash, watches := range tm.watchesByNonce[nonce] {
if receipt.TxHash == txHash {
for _, watch := range watches {
select {
Expand Down

0 comments on commit 0e42d83

Please sign in to comment.