diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index df70b0dd0b..d66e23765d 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { } wg.Wait() - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // send another batch @@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { }, nil) } - producer.Flush() + producer.FlushWithCtx(context.Background()) for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) @@ -3932,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize()) }) } - assert.Nil(t, producer.Flush()) + assert.Nil(t, producer.FlushWithCtx(context.Background())) msgIds := make([]MessageID, BatchingMaxSize) for i := 0; i < BatchingMaxSize; i++ { diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go index 8d8e6965b8..1c2c712fcf 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error { return nil } +func (p *mockProducer) FlushWithCtx(ctx context.Context) error { + return nil +} + func (p *mockProducer) Close() {} diff --git a/pulsar/producer.go b/pulsar/producer.go index 70d152c78b..f8013a16ff 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -237,10 +237,13 @@ type Producer interface { // return the last sequence id published by this producer. LastSequenceID() int64 - // Flush all the messages buffered in the client and wait until all messages have been successfully - // persisted. + // Deprecated: Use `FlushWithCtx()` instead. Flush() error + // Flush all the messages buffered in the client and wait until all messageshave been successfully + // persisted. + FlushWithCtx(ctx context.Context) error + // Close the producer and releases resources allocated // No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case // of errors, pending writes will not be retried. diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 3c45b597d0..ca923108fe 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 { } func (p *producer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *producer) FlushWithCtx(ctx context.Context) error { p.RLock() defer p.RUnlock() for _, pp := range p.producers { - if err := pp.Flush(); err != nil { + if err := pp.FlushWithCtx(ctx); err != nil { return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b79053e38..fbcc5b9776 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1422,15 +1422,27 @@ func (p *partitionProducer) LastSequenceID() int64 { } func (p *partitionProducer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *partitionProducer) FlushWithCtx(ctx context.Context) error { flushReq := &flushRequest{ doneCh: make(chan struct{}), err: nil, } - p.cmdChan <- flushReq + select { + case <-ctx.Done(): + return ctx.Err() + case p.cmdChan <- flushReq: + } // wait for the flush request to complete - <-flushReq.doneCh - return flushReq.err + select { + case <-ctx.Done(): + return ctx.Err() + case <-flushReq.doneCh: + return flushReq.err + } } func (p *partitionProducer) getProducerState() producerState { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 3b9ea7e8da..ba5911565e 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -159,7 +159,7 @@ func TestProducerAsyncSend(t *testing.T) { assert.NoError(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -220,7 +220,7 @@ func TestProducerFlushDisableBatching(t *testing.T) { assert.NoError(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -387,7 +387,7 @@ func TestFlushInProducer(t *testing.T) { }) assert.Nil(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -429,7 +429,7 @@ func TestFlushInProducer(t *testing.T) { assert.Nil(t, err) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -500,7 +500,7 @@ func TestFlushInPartitionedProducer(t *testing.T) { } // After flush, should be able to consume. - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.Nil(t, err) wg.Wait() @@ -1717,7 +1717,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) { } } - producer.Flush() + producer.FlushWithCtx(context.Background()) //// create consumer consumer, err := client.Subscribe(ConsumerOptions{ @@ -1808,7 +1808,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) { assert.NotNil(t, id) }) } - producer.Flush() + producer.FlushWithCtx(context.Background()) //// create consumer consumer, err := client.Subscribe(ConsumerOptions{ @@ -2027,9 +2027,9 @@ func TestMemLimitRejectProducerMessages(t *testing.T) { assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) // flush pending msg - err = producer1.Flush() + err = producer1.FlushWithCtx(context.Background()) assert.NoError(t, err) - err = producer2.Flush() + err = producer2.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) @@ -2118,9 +2118,9 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) { assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) // flush pending msg - err = producer1.Flush() + err = producer1.FlushWithCtx(context.Background()) assert.NoError(t, err) - err = producer2.Flush() + err = producer2.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) @@ -2244,7 +2244,7 @@ func TestMemLimitContextCancel(t *testing.T) { cancel() wg.Wait() - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ec10f8f162..c8228a7ca9 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -277,7 +277,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) { }) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // create reader on 5th message (not included) @@ -353,7 +353,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) { }) } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // create reader on 5th message (not included) @@ -592,7 +592,7 @@ func TestReaderSeek(t *testing.T) { seekID = id } } - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) for i := 0; i < N; i++ {