From 6cc7e52060a5d337e111834fb6bc41ccdcf11414 Mon Sep 17 00:00:00 2001 From: Francesco Pepe <3891780+francescopepe@users.noreply.github.com> Date: Fri, 7 Jul 2023 16:51:07 +0100 Subject: [PATCH] Fix context cancellation in buffered consumer --- consumers.go | 11 +++++++---- internal/messages/messages.go | 9 +++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/consumers.go b/consumers.go index e6cf140..ed609c2 100644 --- a/consumers.go +++ b/consumers.go @@ -177,7 +177,7 @@ func (c *multiMessageConsumer) consume(concurrency int, errorCh chan<- error, me } select { - case <-consumers: // Use an available comsumer + case <-consumers: // Use an available consumer case <-buffer.CtxExpired(): errorCh <- errBufferCtxExpired @@ -186,16 +186,19 @@ func (c *multiMessageConsumer) consume(concurrency int, errorCh chan<- error, me continue } + ctx, cancelCtx := buffer.PullContext() + wg.Add(1) - go func(ctx context.Context, messages []messages.Message) { + go func(ctx context.Context, ctxCancelFunc context.CancelFunc, msgs []messages.Message) { defer func() { wg.Done() consumers <- struct{}{} // Release consumer + ctxCancelFunc() // Cancel context }() // Process the messages - c.processMessages(errorCh, deleteCh, ctx, messages) - }(buffer.Context(), buffer.Messages()) + c.processMessages(errorCh, deleteCh, ctx, msgs) + }(ctx, cancelCtx, buffer.Messages()) // Reset buffer buffer.Reset() diff --git a/internal/messages/messages.go b/internal/messages/messages.go index 4030a8b..13c9ba4 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -156,6 +156,15 @@ func (b *BufferWithContextTimeout) Context() context.Context { return b.ctx } +func (b *BufferWithContextTimeout) PullContext() (context.Context, context.CancelFunc) { + ctx, cancelCtx := b.ctx, b.cancelCtx + + b.ctx = context.Background() // Create a context that doesn't expire + b.cancelCtx = func() {} + + return ctx, cancelCtx +} + func NewBufferWithContextTimeout(config BufferWithContextTimeoutConfiguration) *BufferWithContextTimeout { return &BufferWithContextTimeout{ Buffer: NewMessageBuffer(BufferConfiguration{