diff --git a/node/engine/chainservice/eth_chainservice.go b/node/engine/chainservice/eth_chainservice.go index 33cca008a..0c834a7e9 100644 --- a/node/engine/chainservice/eth_chainservice.go +++ b/node/engine/chainservice/eth_chainservice.go @@ -372,41 +372,49 @@ func (ecs *EthChainService) listenForEventLogs(errorChan chan<- error, eventChan return case err := <-ecs.eventSub.Err(): - ecs.eventTracker.mu.Lock() - defer ecs.eventTracker.mu.Unlock() + // Use helper function block to ensure "defer" statement is called for all exit paths + func() { + latestBlockNum := ecs.GetLastConfirmedBlockNum() - latestBlockNum := ecs.eventTracker.latestBlockNum + ecs.eventTracker.mu.Lock() + defer ecs.eventTracker.mu.Unlock() - if err != nil { - ecs.logger.Warn("error in chain event subscription: " + err.Error()) - ecs.eventSub.Unsubscribe() - } else { - ecs.logger.Warn("chain event subscription closed") - } - - // Use exponential backoff loop to attempt to re-establish subscription - for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 { - eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan) if err != nil { - ecs.logger.Warn("failed to resubscribe to chain events, retrying", "backoffTime", backoffTime) - time.Sleep(backoffTime) - continue + ecs.logger.Warn("error in chain event subscription: " + err.Error()) + ecs.eventSub.Unsubscribe() + } else { + ecs.logger.Warn("chain event subscription closed") } - ecs.eventSub = eventSub - ecs.logger.Debug("resubscribed to chain events") - err = ecs.checkForMissedEvents(latestBlockNum) - if err != nil { - errorChan <- fmt.Errorf("subscribeFilterLogs failed during checkForMissedEvents: " + err.Error()) - return + resubscribed := false // Flag to indicate whether resubscription was successful + + // Use exponential backoff loop to attempt to re-establish subscription + for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 { + eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan) + if err != nil { + ecs.logger.Warn("failed to resubscribe to chain events, retrying", "backoffTime", backoffTime) + time.Sleep(backoffTime) + continue + } + + ecs.eventSub = eventSub + ecs.logger.Debug("resubscribed to chain events") + err = ecs.checkForMissedEvents(latestBlockNum) + if err != nil { + errorChan <- fmt.Errorf("subscribeFilterLogs failed during checkForMissedEvents: " + err.Error()) + return + } + + resubscribed = true + break } - break - } - - ecs.logger.Error("subscribeFilterLogs failed to resubscribe") - errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe") - return + if !resubscribed { + ecs.logger.Error("subscribeFilterLogs failed to resubscribe") + errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe") + return + } + }() case <-time.After(RESUB_INTERVAL): // Due to https://github.com/ethereum/go-ethereum/issues/23845 we can't rely on a long running subscription.