Skip to content

Commit

Permalink
[fix] Fix ordering key not being set and parsed when batching is disa…
Browse files Browse the repository at this point in the history
…bled
  • Loading branch information
RobertIndie committed Jun 26, 2023
1 parent 56c9691 commit 36cb5fa
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
1 change: 1 addition & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
redeliveryCount: response.GetRedeliveryCount(),
schemaVersion: msgMeta.GetSchemaVersion(),
schemaInfoCache: pc.schemaInfoCache,
orderingKey: string(msgMeta.GetOrderingKey()),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
}
Expand Down
4 changes: 4 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ func (p *partitionProducer) genMetadata(msg *ProducerMessage,
mm.PartitionKey = proto.String(msg.Key)
}

if len(msg.OrderingKey) != 0 {
mm.OrderingKey = []byte(msg.OrderingKey)
}

if msg.Properties != nil {
mm.Properties = internal.ConvertFromStringMap(msg.Properties)
}
Expand Down
22 changes: 18 additions & 4 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1906,7 +1906,15 @@ func TestMemLimitContextCancel(t *testing.T) {
assert.NoError(t, err)
}

func TestSendMessagesWithMetadata(t *testing.T) {
func TestBatchSendMessagesWithMetadata(t *testing.T) {
testSendMessagesWithMetadata(t, false)
}

func TestNoBatchSendMessagesWithMetadata(t *testing.T) {
testSendMessagesWithMetadata(t, true)
}

func testSendMessagesWithMetadata(t *testing.T, disableBatch bool) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand All @@ -1917,7 +1925,7 @@ func TestSendMessagesWithMetadata(t *testing.T) {
topic := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
DisableBatching: disableBatch,
})
assert.Nil(t, err)

Expand All @@ -1928,13 +1936,19 @@ func TestSendMessagesWithMetadata(t *testing.T) {
assert.Nil(t, err)

msg := &ProducerMessage{EventTime: time.Now().Local(),
Payload: []byte("msg")}
Key: "my-key",
OrderingKey: "my-ordering-key",
Properties: map[string]string{"k1": "v1", "k2": "v2"},
Payload: []byte("msg")}

_, err = producer.Send(context.Background(), msg)
assert.Nil(t, err)

recvMsg, err := consumer.Receive(context.Background())
assert.Nil(t, err)

assert.Equal(t, internal.TimestampMillis(recvMsg.EventTime()), internal.TimestampMillis(msg.EventTime))
assert.Equal(t, internal.TimestampMillis(msg.EventTime), internal.TimestampMillis(recvMsg.EventTime()))
assert.Equal(t, msg.Key, recvMsg.Key())
assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey())
assert.Equal(t, msg.Properties, recvMsg.Properties())
}

0 comments on commit 36cb5fa

Please sign in to comment.