diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 2d2a1940f7..a99cec350c 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -426,7 +426,10 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn } if cmid, ok := msgID.(*chunkMessageID); ok { - return pc.unAckChunksTracker.ack(cmid) + if txn == nil { + return pc.unAckChunksTracker.ack(cmid) + } + return pc.unAckChunksTracker.ackWithTxn(cmid, txn) } trackingID := toTrackingMessageID(msgID) @@ -2202,9 +2205,19 @@ func (u *unAckChunksTracker) remove(cmid *chunkMessageID) { } func (u *unAckChunksTracker) ack(cmid *chunkMessageID) error { + return u.ackWithTxn(cmid, nil) +} + +func (u *unAckChunksTracker) ackWithTxn(cmid *chunkMessageID, txn Transaction) error { ids := u.get(cmid) for _, id := range ids { - if err := u.pc.AckID(id); err != nil { + var err error + if txn == nil { + err = u.pc.AckID(id) + } else { + err = u.pc.AckIDWithTxn(id, txn) + } + if err != nil { return err } } diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go index 66a82ccad1..385b197e00 100644 --- a/pulsar/transaction_test.go +++ b/pulsar/transaction_test.go @@ -423,6 +423,14 @@ func TestTransactionAbort(t *testing.T) { // Abort the transaction. _ = txn.Abort(context.Background()) + consumerShouldNotReceiveMessage(t, consumer) + + // Clean up: Close the consumer and producer instances. + consumer.Close() + producer.Close() +} + +func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) { // Expectation: The consumer should not receive any messages. done := make(chan struct{}) go func() { @@ -438,8 +446,97 @@ func TestTransactionAbort(t *testing.T) { require.Fail(t, "The consumer should not receive any messages") case <-time.After(time.Second): } +} - // Clean up: Close the consumer and producer instances. +func TestAckChunkMessage(t *testing.T) { + topic := newTopicName() + sub := "my-sub" + + // Prepare: Create PulsarClient and initialize the transaction coordinator client. + _, client := createTcClient(t) + + // Create transaction and register the send operation. + txn, err := client.NewTransaction(time.Hour) + require.Nil(t, err) + txn.(*transaction).registerSendOrAckOp() + + // Create a producer with chunking enabled to send a large message that will be split into chunks. + producer, err := client.CreateProducer(ProducerOptions{ + Name: "test", + Topic: topic, + EnableChunking: true, + DisableBatching: true, + }) + require.NoError(t, err) + require.NotNil(t, producer) + defer producer.Close() + + // Subscribe to the consumer. + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + Type: Exclusive, + SubscriptionName: sub, + }) + require.NoError(t, err) + defer consumer.Close() + + // Send a large message that will be split into chunks. + msgID, err := producer.Send(context.Background(), &ProducerMessage{ + Transaction: txn, + Payload: createTestMessagePayload(_brokerMaxMessageSize), + }) + require.NoError(t, err) + _, ok := msgID.(*chunkMessageID) + require.True(t, ok) + + err = txn.Commit(context.Background()) + require.Nil(t, err) + + // Receive the message using a new transaction and ack it. + txn2, err := client.NewTransaction(time.Hour) + require.Nil(t, err) + message, err := consumer.Receive(context.Background()) + require.Nil(t, err) + + err = consumer.AckWithTxn(message, txn2) + require.Nil(t, err) + + txn2.Abort(context.Background()) + + // Close the consumer to simulate reconnection and receive the same message again. consumer.Close() - producer.Close() + + // Subscribe to the consumer again. + consumer, err = client.Subscribe(ConsumerOptions{ + Topic: topic, + Type: Exclusive, + SubscriptionName: sub, + }) + require.Nil(t, err) + message, err = consumer.Receive(context.Background()) + require.Nil(t, err) + require.NotNil(t, message) + + // Create a new transaction and ack the message again. + txn3, err := client.NewTransaction(time.Hour) + require.Nil(t, err) + + err = consumer.AckWithTxn(message, txn3) + require.Nil(t, err) + + // Commit the third transaction. + err = txn3.Commit(context.Background()) + require.Nil(t, err) + + // Close the consumer again. + consumer.Close() + + // Subscribe to the consumer again and verify that no message is received. + consumer, err = client.Subscribe(ConsumerOptions{ + Topic: topic, + Type: Exclusive, + SubscriptionName: sub, + }) + require.Nil(t, err) + consumerShouldNotReceiveMessage(t, consumer) }