Skip to content

Commit

Permalink
Fix eventTracker deadlock by using helper function block
Browse files Browse the repository at this point in the history
  • Loading branch information
bitwiseguy committed Oct 9, 2023
1 parent 3b27f46 commit fbf16c1
Showing 1 changed file with 36 additions and 33 deletions.
69 changes: 36 additions & 33 deletions node/engine/chainservice/eth_chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,46 +372,49 @@ func (ecs *EthChainService) listenForEventLogs(errorChan chan<- error, eventChan
return

case err := <-ecs.eventSub.Err():
latestBlockNum := ecs.GetLastConfirmedBlockNum()
// Use helper function block to ensure "defer" statement is called for all exit paths
func() {
latestBlockNum := ecs.GetLastConfirmedBlockNum()

ecs.eventTracker.mu.Lock()
defer ecs.eventTracker.mu.Unlock()
ecs.eventTracker.mu.Lock()
defer ecs.eventTracker.mu.Unlock()

if err != nil {
ecs.logger.Warn("error in chain event subscription: " + err.Error())
ecs.eventSub.Unsubscribe()
} else {
ecs.logger.Warn("chain event subscription closed")
}

resubscribed := false // Flag to indicate whether resubscription was successful

// Use exponential backoff loop to attempt to re-establish subscription
for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 {
eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan)
if err != nil {
ecs.logger.Warn("failed to resubscribe to chain events, retrying", "backoffTime", backoffTime)
time.Sleep(backoffTime)
continue
ecs.logger.Warn("error in chain event subscription: " + err.Error())
ecs.eventSub.Unsubscribe()
} else {
ecs.logger.Warn("chain event subscription closed")
}

ecs.eventSub = eventSub
ecs.logger.Debug("resubscribed to chain events")
err = ecs.checkForMissedEvents(latestBlockNum)
if err != nil {
errorChan <- fmt.Errorf("subscribeFilterLogs failed during checkForMissedEvents: " + err.Error())
return
resubscribed := false // Flag to indicate whether resubscription was successful

// Use exponential backoff loop to attempt to re-establish subscription
for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 {
eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan)
if err != nil {
ecs.logger.Warn("failed to resubscribe to chain events, retrying", "backoffTime", backoffTime)
time.Sleep(backoffTime)
continue
}

ecs.eventSub = eventSub
ecs.logger.Debug("resubscribed to chain events")
err = ecs.checkForMissedEvents(latestBlockNum)
if err != nil {
errorChan <- fmt.Errorf("subscribeFilterLogs failed during checkForMissedEvents: " + err.Error())
return
}

resubscribed = true
break
}

resubscribed = true
break
}

if !resubscribed {
ecs.logger.Error("subscribeFilterLogs failed to resubscribe")
errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe")
return
}
if !resubscribed {
ecs.logger.Error("subscribeFilterLogs failed to resubscribe")
errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe")
return
}
}()

case <-time.After(RESUB_INTERVAL):
// Due to https://github.com/ethereum/go-ethereum/issues/23845 we can't rely on a long running subscription.
Expand Down

0 comments on commit fbf16c1

Please sign in to comment.