Skip to content

Commit

Permalink
Verify message was published on single partition
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie committed Jun 25, 2023
1 parent 9cc6824 commit c432172
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,16 @@ func TestMessageSingleRouter(t *testing.T) {
assert.Nil(t, err)
defer client.Close()

numOfMessages := 10

consumer, err := client.Subscribe(ConsumerOptions{
Topic: "my-single-partitioned-topic",
SubscriptionName: "my-sub",
})

assert.Nil(t, err)
defer consumer.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: "my-single-partitioned-topic",
MessageRouter: NewSinglePartitionRouter(),
Expand All @@ -585,11 +595,30 @@ func TestMessageSingleRouter(t *testing.T) {

ctx := context.Background()

ID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte("hello"),
})
assert.Nil(t, err)
assert.NotNil(t, ID)
for i := 0; i < numOfMessages; i++ {
ID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte("hello"),
})
assert.Nil(t, err)
assert.NotNil(t, ID)
}

// Verify message was published on single partition
msgCount := 0
msgPartitionMap := make(map[string]int)
for i := 0; i < numOfMessages; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)
consumer.Ack(msg)
msgCount++
msgPartitionMap[msg.Topic()]++
}
assert.Equal(t, msgCount, numOfMessages)
assert.Equal(t, len(msgPartitionMap), 1)
for _, i := range msgPartitionMap {
assert.Equal(t, i, numOfMessages)
}

}

Expand Down

0 comments on commit c432172

Please sign in to comment.