Skip to content

Commit

Permalink
Merge pull request #1819 from statechannels/fix-resubscribe
Browse files Browse the repository at this point in the history
Fix bug in chain event resubscribe
  • Loading branch information
bitwiseguy committed Oct 10, 2023
2 parents 3aab58d + 9cbd41b commit e7e508f
Showing 1 changed file with 36 additions and 28 deletions.
64 changes: 36 additions & 28 deletions node/engine/chainservice/eth_chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,41 +372,49 @@ func (ecs *EthChainService) listenForEventLogs(errorChan chan<- error, eventChan
return

case err := <-ecs.eventSub.Err():
ecs.eventTracker.mu.Lock()
defer ecs.eventTracker.mu.Unlock()
// Use helper function block to ensure "defer" statement is called for all exit paths
func() {
latestBlockNum := ecs.GetLastConfirmedBlockNum()

latestBlockNum := ecs.eventTracker.latestBlockNum
ecs.eventTracker.mu.Lock()
defer ecs.eventTracker.mu.Unlock()

if err != nil {
ecs.logger.Warn("error in chain event subscription: " + err.Error())
ecs.eventSub.Unsubscribe()
} else {
ecs.logger.Warn("chain event subscription closed")
}

// Use exponential backoff loop to attempt to re-establish subscription
for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 {
eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan)
if err != nil {
ecs.logger.Warn("failed to resubscribe to chain events, retrying", "backoffTime", backoffTime)
time.Sleep(backoffTime)
continue
ecs.logger.Warn("error in chain event subscription: " + err.Error())
ecs.eventSub.Unsubscribe()
} else {
ecs.logger.Warn("chain event subscription closed")
}

ecs.eventSub = eventSub
ecs.logger.Debug("resubscribed to chain events")
err = ecs.checkForMissedEvents(latestBlockNum)
if err != nil {
errorChan <- fmt.Errorf("subscribeFilterLogs failed during checkForMissedEvents: " + err.Error())
return
resubscribed := false // Flag to indicate whether resubscription was successful

// Use exponential backoff loop to attempt to re-establish subscription
for backoffTime := MIN_BACKOFF_TIME; backoffTime < MAX_BACKOFF_TIME; backoffTime *= 2 {
eventSub, err := ecs.chain.SubscribeFilterLogs(ecs.ctx, eventQuery, eventChan)
if err != nil {
ecs.logger.Warn("failed to resubscribe to chain events, retrying", "backoffTime", backoffTime)
time.Sleep(backoffTime)
continue
}

ecs.eventSub = eventSub
ecs.logger.Debug("resubscribed to chain events")
err = ecs.checkForMissedEvents(latestBlockNum)
if err != nil {
errorChan <- fmt.Errorf("subscribeFilterLogs failed during checkForMissedEvents: " + err.Error())
return
}

resubscribed = true
break
}

break
}

ecs.logger.Error("subscribeFilterLogs failed to resubscribe")
errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe")
return
if !resubscribed {
ecs.logger.Error("subscribeFilterLogs failed to resubscribe")
errorChan <- fmt.Errorf("subscribeFilterLogs failed to resubscribe")
return
}
}()

case <-time.After(RESUB_INTERVAL):
// Due to https://github.com/ethereum/go-ethereum/issues/23845 we can't rely on a long running subscription.
Expand Down

0 comments on commit e7e508f

Please sign in to comment.