From b7e96ddf45ee87aae91b3d88ffa6910dc7460718 Mon Sep 17 00:00:00 2001 From: Alessandro Pagnin Date: Wed, 29 Jan 2025 09:39:45 +0100 Subject: [PATCH] fix: heartbeat go routine gone rogue (#1030) When running integration tests with high concurrency, race warnings frequently appeared, often triggered by the heartbeat. It seems that after some refactoring, a goroutine was left behind without any synchronization mechanism, leading to the issue. The fix should be straightforward: simply remove the goroutine. --- v2/pkg/engine/resolve/resolve.go | 46 +++++++++++++++----------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 80365ac60..b05b0af26 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -426,35 +426,33 @@ func (r *Resolver) handleHeartbeat(data []byte) { continue } - go func() { - if r.options.Debug { - fmt.Printf("resolver:heartbeat:subscription:%d\n", s.id.SubscriptionID) - } + if r.options.Debug { + fmt.Printf("resolver:heartbeat:subscription:%d\n", s.id.SubscriptionID) + } - s.mux.Lock() - if _, err := s.writer.Write(data); err != nil { - if errors.Is(err, context.Canceled) { - // client disconnected - s.mux.Unlock() - _ = r.AsyncUnsubscribeSubscription(s.id) - return - } - r.asyncErrorWriter.WriteError(c, err, nil, s.writer) - } - err := s.writer.Flush() - s.mux.Unlock() - if err != nil { + s.mux.Lock() + if _, err := s.writer.Write(data); err != nil { + if errors.Is(err, context.Canceled) { // client disconnected + s.mux.Unlock() _ = r.AsyncUnsubscribeSubscription(s.id) return } - if r.options.Debug { - fmt.Printf("resolver:heartbeat:subscription:flushed:%d\n", s.id.SubscriptionID) - } - if r.reporter != nil { - r.reporter.SubscriptionUpdateSent() - } - }() + r.asyncErrorWriter.WriteError(c, err, nil, s.writer) + } + err := s.writer.Flush() + s.mux.Unlock() + if err != nil { + // client disconnected + _ = r.AsyncUnsubscribeSubscription(s.id) + return + } + if r.options.Debug { + fmt.Printf("resolver:heartbeat:subscription:flushed:%d\n", s.id.SubscriptionID) + } + if r.reporter != nil { + r.reporter.SubscriptionUpdateSent() + } } }