From 471e0d7f45358967ed7135a988e10338372611b5 Mon Sep 17 00:00:00 2001 From: Samuel Stokes Date: Mon, 9 Oct 2023 10:45:25 -0400 Subject: [PATCH 1/3] Fix chain event resubscribe case --- node/engine/chainservice/eth_chainservice.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/node/engine/chainservice/eth_chainservice.go b/node/engine/chainservice/eth_chainservice.go index 33cca008a..7673e2eaa 100644 --- a/node/engine/chainservice/eth_chainservice.go +++ b/node/engine/chainservice/eth_chainservice.go @@ -384,6 +384,8 @@ func (ecs *EthChainService) listenForEventLogs(errorChan chan<- error, eventChan ecs.logger.Warn("chain event subscription closed") } + resubscribed := false // Flag to indicate whether resubscription was successful + // Use exponential backoff loop to attempt to re-establish subscription for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 { eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan) @@ -401,12 +403,15 @@ func (ecs *EthChainService) listenForEventLogs(errorChan chan<- error, eventChan return } + resubscribed = true break } - ecs.logger.Error("subscribeFilterLogs failed to resubscribe") - errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe") - return + if !resubscribed { + ecs.logger.Error("subscribeFilterLogs failed to resubscribe") + errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe") + return + } case <-time.After(RESUB_INTERVAL): // Due to https://github.com/ethereum/go-ethereum/issues/23845 we can't rely on a long running subscription. From 3b27f4671f5db173ec438776c502cb7bd7cf35eb Mon Sep 17 00:00:00 2001 From: Samuel Stokes Date: Mon, 9 Oct 2023 11:09:25 -0400 Subject: [PATCH 2/3] Use GetLastConfirmedBlockNum to set checkForMissedEvents startBlock --- node/engine/chainservice/eth_chainservice.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/engine/chainservice/eth_chainservice.go b/node/engine/chainservice/eth_chainservice.go index 7673e2eaa..8d3c6804a 100644 --- a/node/engine/chainservice/eth_chainservice.go +++ b/node/engine/chainservice/eth_chainservice.go @@ -372,11 +372,11 @@ func (ecs *EthChainService) listenForEventLogs(errorChan chan<- error, eventChan return case err := <-ecs.eventSub.Err(): + latestBlockNum := ecs.GetLastConfirmedBlockNum() + ecs.eventTracker.mu.Lock() defer ecs.eventTracker.mu.Unlock() - latestBlockNum := ecs.eventTracker.latestBlockNum - if err != nil { ecs.logger.Warn("error in chain event subscription: " + err.Error()) ecs.eventSub.Unsubscribe() From fbf16c14a8afb677bde89d6fbf17359dd9bbf83a Mon Sep 17 00:00:00 2001 From: Samuel Stokes Date: Mon, 9 Oct 2023 14:01:25 -0400 Subject: [PATCH 3/3] Fix eventTracker deadlock by using helper function block --- node/engine/chainservice/eth_chainservice.go | 69 ++++++++++---------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/node/engine/chainservice/eth_chainservice.go b/node/engine/chainservice/eth_chainservice.go index 8d3c6804a..0c834a7e9 100644 --- a/node/engine/chainservice/eth_chainservice.go +++ b/node/engine/chainservice/eth_chainservice.go @@ -372,46 +372,49 @@ func (ecs *EthChainService) listenForEventLogs(errorChan chan<- error, eventChan return case err := <-ecs.eventSub.Err(): - latestBlockNum := ecs.GetLastConfirmedBlockNum() + // Use helper function block to ensure "defer" statement is called for all exit paths + func() { + latestBlockNum := ecs.GetLastConfirmedBlockNum() - ecs.eventTracker.mu.Lock() - defer ecs.eventTracker.mu.Unlock() + ecs.eventTracker.mu.Lock() + defer ecs.eventTracker.mu.Unlock() - if err != nil { - ecs.logger.Warn("error in chain event subscription: " + err.Error()) - ecs.eventSub.Unsubscribe() - } else { - ecs.logger.Warn("chain event subscription closed") - } - - resubscribed := false // Flag to indicate whether resubscription was successful - - // Use exponential backoff loop to attempt to re-establish subscription - for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 { - eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan) if err != nil { - ecs.logger.Warn("failed to resubscribe to chain events, retrying", "backoffTime", backoffTime) - time.Sleep(backoffTime) - continue + ecs.logger.Warn("error in chain event subscription: " + err.Error()) + ecs.eventSub.Unsubscribe() + } else { + ecs.logger.Warn("chain event subscription closed") } - ecs.eventSub = eventSub - ecs.logger.Debug("resubscribed to chain events") - err = ecs.checkForMissedEvents(latestBlockNum) - if err != nil { - errorChan <- fmt.Errorf("subscribeFilterLogs failed during checkForMissedEvents: " + err.Error()) - return + resubscribed := false // Flag to indicate whether resubscription was successful + + // Use exponential backoff loop to attempt to re-establish subscription + for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 { + eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan) + if err != nil { + ecs.logger.Warn("failed to resubscribe to chain events, retrying", "backoffTime", backoffTime) + time.Sleep(backoffTime) + continue + } + + ecs.eventSub = eventSub + ecs.logger.Debug("resubscribed to chain events") + err = ecs.checkForMissedEvents(latestBlockNum) + if err != nil { + errorChan <- fmt.Errorf("subscribeFilterLogs failed during checkForMissedEvents: " + err.Error()) + return + } + + resubscribed = true + break } - resubscribed = true - break - } - - if !resubscribed { - ecs.logger.Error("subscribeFilterLogs failed to resubscribe") - errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe") - return - } + if !resubscribed { + ecs.logger.Error("subscribeFilterLogs failed to resubscribe") + errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe") + return + } + }() case <-time.After(RESUB_INTERVAL): // Due to https://github.com/ethereum/go-ethereum/issues/23845 we can't rely on a long running subscription.