Skip to content

Commit

Permalink
Fix context cancellation in buffered consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
francescopepe committed Jul 7, 2023
1 parent 96467d1 commit 6cc7e52
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
11 changes: 7 additions & 4 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (c *multiMessageConsumer) consume(concurrency int, errorCh chan<- error, me
}

select {
case <-consumers: // Use an available comsumer
case <-consumers: // Use an available consumer
case <-buffer.CtxExpired():
errorCh <- errBufferCtxExpired

Expand All @@ -186,16 +186,19 @@ func (c *multiMessageConsumer) consume(concurrency int, errorCh chan<- error, me
continue
}

ctx, cancelCtx := buffer.PullContext()

wg.Add(1)
go func(ctx context.Context, messages []messages.Message) {
go func(ctx context.Context, ctxCancelFunc context.CancelFunc, msgs []messages.Message) {
defer func() {
wg.Done()
consumers <- struct{}{} // Release consumer
ctxCancelFunc() // Cancel context
}()

// Process the messages
c.processMessages(errorCh, deleteCh, ctx, messages)
}(buffer.Context(), buffer.Messages())
c.processMessages(errorCh, deleteCh, ctx, msgs)
}(ctx, cancelCtx, buffer.Messages())

// Reset buffer
buffer.Reset()
Expand Down
9 changes: 9 additions & 0 deletions internal/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ func (b *BufferWithContextTimeout) Context() context.Context {
return b.ctx
}

func (b *BufferWithContextTimeout) PullContext() (context.Context, context.CancelFunc) {
ctx, cancelCtx := b.ctx, b.cancelCtx

b.ctx = context.Background() // Create a context that doesn't expire
b.cancelCtx = func() {}

return ctx, cancelCtx
}

func NewBufferWithContextTimeout(config BufferWithContextTimeoutConfiguration) *BufferWithContextTimeout {
return &BufferWithContextTimeout{
Buffer: NewMessageBuffer(BufferConfiguration{
Expand Down

0 comments on commit 6cc7e52

Please sign in to comment.