Skip to content

Commit

Permalink
[Producer] respect context cancellation in Flush (apache#1165)
Browse files Browse the repository at this point in the history
### Motivation

The producer's `Flush` method does not respect context cancellation. If the caller's context get's cancelled, it will have to wait for the producer to finish flushing.

### Modifications

This change adds a `FlushWithCtx` method which takes a context and selects on two channels.
  • Loading branch information
jayshrivastava authored Feb 2, 2024
1 parent 8776135 commit 2a28e21
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 24 deletions.
6 changes: 3 additions & 3 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
}
wg.Wait()

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)

// send another batch
Expand Down Expand Up @@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) {
}, nil)
}

producer.Flush()
producer.FlushWithCtx(context.Background())

for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
Expand Down Expand Up @@ -3932,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o
log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize())
})
}
assert.Nil(t, producer.Flush())
assert.Nil(t, producer.FlushWithCtx(context.Background()))

msgIds := make([]MessageID, BatchingMaxSize)
for i := 0; i < BatchingMaxSize; i++ {
Expand Down
4 changes: 4 additions & 0 deletions pulsar/internal/pulsartracing/producer_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error {
return nil
}

func (p *mockProducer) FlushWithCtx(ctx context.Context) error {
return nil
}

func (p *mockProducer) Close() {}
7 changes: 5 additions & 2 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,13 @@ type Producer interface {
// return the last sequence id published by this producer.
LastSequenceID() int64

// Flush all the messages buffered in the client and wait until all messages have been successfully
// persisted.
// Deprecated: Use `FlushWithCtx()` instead.
Flush() error

// Flush all the messages buffered in the client and wait until all messageshave been successfully
// persisted.
FlushWithCtx(ctx context.Context) error

// Close the producer and releases resources allocated
// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
// of errors, pending writes will not be retried.
Expand Down
6 changes: 5 additions & 1 deletion pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 {
}

func (p *producer) Flush() error {
return p.FlushWithCtx(context.Background())
}

func (p *producer) FlushWithCtx(ctx context.Context) error {
p.RLock()
defer p.RUnlock()

for _, pp := range p.producers {
if err := pp.Flush(); err != nil {
if err := pp.FlushWithCtx(ctx); err != nil {
return err
}

Expand Down
18 changes: 15 additions & 3 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,15 +1422,27 @@ func (p *partitionProducer) LastSequenceID() int64 {
}

func (p *partitionProducer) Flush() error {
return p.FlushWithCtx(context.Background())
}

func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
flushReq := &flushRequest{
doneCh: make(chan struct{}),
err: nil,
}
p.cmdChan <- flushReq
select {
case <-ctx.Done():
return ctx.Err()
case p.cmdChan <- flushReq:
}

// wait for the flush request to complete
<-flushReq.doneCh
return flushReq.err
select {
case <-ctx.Done():
return ctx.Err()
case <-flushReq.doneCh:
return flushReq.err
}
}

func (p *partitionProducer) getProducerState() producerState {
Expand Down
24 changes: 12 additions & 12 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestProducerAsyncSend(t *testing.T) {
assert.NoError(t, err)
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)

wg.Wait()
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestProducerFlushDisableBatching(t *testing.T) {
assert.NoError(t, err)
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)

wg.Wait()
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestFlushInProducer(t *testing.T) {
})
assert.Nil(t, err)
}
err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()

Expand Down Expand Up @@ -429,7 +429,7 @@ func TestFlushInProducer(t *testing.T) {
assert.Nil(t, err)
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()

Expand Down Expand Up @@ -500,7 +500,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
}

// After flush, should be able to consume.
err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)

wg.Wait()
Expand Down Expand Up @@ -1717,7 +1717,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) {
}

}
producer.Flush()
producer.FlushWithCtx(context.Background())

//// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Expand Down Expand Up @@ -1808,7 +1808,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
assert.NotNil(t, id)
})
}
producer.Flush()
producer.FlushWithCtx(context.Background())

//// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Expand Down Expand Up @@ -2027,9 +2027,9 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// flush pending msg
err = producer1.Flush()
err = producer1.FlushWithCtx(context.Background())
assert.NoError(t, err)
err = producer2.Flush()
err = producer2.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())

Expand Down Expand Up @@ -2118,9 +2118,9 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// flush pending msg
err = producer1.Flush()
err = producer1.FlushWithCtx(context.Background())
assert.NoError(t, err)
err = producer2.Flush()
err = producer2.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())

Expand Down Expand Up @@ -2244,7 +2244,7 @@ func TestMemLimitContextCancel(t *testing.T) {
cancel()
wg.Wait()

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())

Expand Down
6 changes: 3 additions & 3 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) {
})
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)

// create reader on 5th message (not included)
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
})
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)

// create reader on 5th message (not included)
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestReaderSeek(t *testing.T) {
seekID = id
}
}
err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)

for i := 0; i < N; i++ {
Expand Down

0 comments on commit 2a28e21

Please sign in to comment.