diff --git a/pulsar-inttest-client/pom.xml b/pulsar-inttest-client/pom.xml
index e30dd6c84ca0c..70ff707364693 100644
--- a/pulsar-inttest-client/pom.xml
+++ b/pulsar-inttest-client/pom.xml
@@ -39,9 +39,9 @@
${project.version}
- org.testng
- testng
- ${testng.version}
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
org.apache.pulsar
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/DelayMessaging.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/DelayMessaging.java
index c2fd0318fd3a7..d6e4c0dd2ebd0 100644
--- a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/DelayMessaging.java
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/DelayMessaging.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.messaging;
import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getPartitionedTopic;
+import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -31,7 +32,6 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.integration.IntegTest;
-import org.testng.Assert;
/**
* Delay messaging test.
@@ -77,14 +77,16 @@ public void delayMsgBlockTest() throws Exception {
// receive message at first time
Message message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
- Assert.assertNotNull(message, "Can't receive message at the first time.");
+ assertThat(message).isNotNull().as("Can't receive message at the first time.");
consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
// receive retry messages
for (int i = 0; i < redeliverCnt; i++) {
message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
- Assert.assertNotNull(message, "Consumer can't receive message in double delayTimeSeconds time "
- + delayTimeSeconds * 2 + "s");
+ assertThat(message)
+ .isNotNull()
+ .as("Consumer can't receive message in double delayTimeSeconds time "
+ + delayTimeSeconds * 2 + "s");
log.info("receive msg. reConsumeTimes: {}", message.getProperty("RECONSUMETIMES"));
consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
}
@@ -97,7 +99,7 @@ public void delayMsgBlockTest() throws Exception {
.subscribe();
message = dltConsumer.receive(10, TimeUnit.SECONDS);
- Assert.assertNotNull(message, "Dead letter topic consumer can't receive message.");
+ assertThat(message).isNotNull().as("Dead letter topic consumer can't receive message.");
}
}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/GeoReplication.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/GeoReplication.java
index 939acfcc00cfa..03b14ae9a1d7d 100644
--- a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/GeoReplication.java
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/GeoReplication.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.messaging;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -30,7 +32,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.tests.integration.GeoRepIntegTest;
import org.awaitility.Awaitility;
-import org.testng.Assert;
/**
* Geo replication test.
@@ -50,9 +51,9 @@ public void testTopicReplication(String domain) throws Exception {
adminA.topics().createPartitionedTopic(topic, 10);
} catch (Exception e) {
log.error("Failed to create partitioned topic {}.", topic, e);
- Assert.fail("Failed to create partitioned topic " + topic);
+ fail("Failed to create partitioned topic " + topic);
}
- Assert.assertEquals(adminA.topics().getPartitionedTopicMetadata(topic).partitions, 10);
+ assertThat(adminA.topics().getPartitionedTopicMetadata(topic).partitions).isEqualTo(10);
});
log.info("Test geo-replication produce and consume for topic {}.", topic);
@@ -76,7 +77,7 @@ public void testTopicReplication(String domain) throws Exception {
for (int i = 0; i < 10; i++) {
Message message = c.receive(10, TimeUnit.SECONDS);
- Assert.assertNotNull(message);
+ assertThat(message).isNotNull();
}
log.info("Successfully consume message from cluster {} for topic {}.", "cluster2", topic);
}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessaging.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessaging.java
index 42036f087e076..55b3d7e5c11d7 100644
--- a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessaging.java
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessaging.java
@@ -20,7 +20,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getNonPartitionedTopic;
-import static org.testng.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
import java.util.stream.IntStream;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
@@ -62,7 +62,7 @@ public void testNonDurableConsumer() throws Exception {
for (int i = 0; i < numMessages; i++) {
Message msg = consumer.receive();
- assertEquals(new String(msg.getValue(), UTF_8), "message-" + i);
+ assertThat(new String(msg.getValue(), UTF_8)).isEqualTo("message-" + i);
}
}
}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/ReaderMessaging.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/ReaderMessaging.java
index 9a8849a116fe3..972c763129086 100644
--- a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/ReaderMessaging.java
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/ReaderMessaging.java
@@ -19,8 +19,7 @@
package org.apache.pulsar.tests.integration.messaging;
import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getNonPartitionedTopic;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -58,24 +57,24 @@ public void testReaderReconnectAndRead() throws Exception {
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value("message-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
for (int i = 0; i < messagesToSend; i++) {
Message msg = reader.readNext();
- assertEquals(msg.getValue(), "message-" + i);
+ assertThat(msg.getValue()).isEqualTo("message-" + i);
}
admin.topics().unload(topicName);
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value("message-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
for (int i = 0; i < messagesToSend; i++) {
Message msg = reader.readNext();
- assertEquals(msg.getValue(), "message-" + i);
+ assertThat(msg.getValue()).isEqualTo("message-" + i);
}
log.info("-- Exiting testReaderReconnectAndRead test --");
@@ -103,24 +102,24 @@ public void testReaderReconnectAndReadBatchMessages()
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value("message-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
for (int i = 0; i < messagesToSend; i++) {
Message msg = reader.readNext();
- assertEquals(msg.getValue(), "message-" + i);
+ assertThat(msg.getValue()).isEqualTo("message-" + i);
}
admin.topics().unload(topicName);
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value("message-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
for (int i = 0; i < messagesToSend; i++) {
Message msg = reader.readNext();
- assertEquals(msg.getValue(), "message-" + i);
+ assertThat(msg.getValue()).isEqualTo("message-" + i);
}
log.info("-- Exiting testReaderReconnectAndReadBatchMessages test --");
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessaging.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessaging.java
index e0a283962fa4d..6a9e4e7be56a6 100644
--- a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessaging.java
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessaging.java
@@ -20,10 +20,8 @@
import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getNonPartitionedTopic;
import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getPartitionedTopic;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -82,7 +80,7 @@ public void nonPartitionedTopicSendAndReceiveWithExclusive(boolean isPersistent)
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckOrderAndDuplicate(Collections.singletonList(consumer), messagesToSend);
@@ -102,7 +100,7 @@ public void partitionedTopicSendAndReceiveWithExclusive(boolean isPersistent) th
.subscribe();
consumerList.add(consumer);
}
- assertEquals(partitions, consumerList.size());
+ assertThat(consumerList.size()).isEqualTo(partitions);
try {
client.newConsumer(Schema.STRING)
.topic(topicName + "-partition-" + 0)
@@ -122,7 +120,7 @@ public void partitionedTopicSendAndReceiveWithExclusive(boolean isPersistent) th
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
@@ -131,7 +129,7 @@ public void partitionedTopicSendAndReceiveWithExclusive(boolean isPersistent) th
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend - 3);
closeConsumers(consumerList);
@@ -153,8 +151,8 @@ public void nonPartitionedTopicSendAndReceiveWithFailover(boolean isPersistent)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
- assertNotNull(standbyConsumer);
- assertTrue(standbyConsumer.isConnected());
+ assertThat(standbyConsumer).isNotNull();
+ assertThat(standbyConsumer.isConnected()).isTrue();
consumerList.add(standbyConsumer);
final int messagesToSend = 10;
final String producerName = "producerForFailover";
@@ -165,7 +163,7 @@ public void nonPartitionedTopicSendAndReceiveWithFailover(boolean isPersistent)
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
@@ -176,7 +174,7 @@ public void nonPartitionedTopicSendAndReceiveWithFailover(boolean isPersistent)
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
@@ -199,10 +197,10 @@ public void partitionedTopicSendAndReceiveWithFailover(boolean isPersistent) thr
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
- assertNotNull(standbyConsumer);
- assertTrue(standbyConsumer.isConnected());
+ assertThat(standbyConsumer).isNotNull();
+ assertThat(standbyConsumer.isConnected()).isTrue();
consumerList.add(standbyConsumer);
- assertEquals(consumerList.size(), 2);
+ assertThat(consumerList.size()).isEqualTo(2);
final int messagesToSend = 9;
final String producerName = "producerForFailover";
@Cleanup final Producer producer = client.newProducer(Schema.STRING)
@@ -212,7 +210,7 @@ public void partitionedTopicSendAndReceiveWithFailover(boolean isPersistent) thr
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
@@ -223,7 +221,7 @@ public void partitionedTopicSendAndReceiveWithFailover(boolean isPersistent) thr
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
@@ -245,8 +243,8 @@ public void nonPartitionedTopicSendAndReceiveWithShared(boolean isPersistent) th
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
- assertNotNull(moreConsumer);
- assertTrue(moreConsumer.isConnected());
+ assertThat(moreConsumer).isNotNull();
+ assertThat(moreConsumer.isConnected()).isTrue();
consumerList.add(moreConsumer);
final int messagesToSend = 10;
final String producerName = "producerForShared";
@@ -257,7 +255,7 @@ public void nonPartitionedTopicSendAndReceiveWithShared(boolean isPersistent) th
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckDuplicate(consumerList, messagesToSend);
@@ -266,7 +264,7 @@ public void nonPartitionedTopicSendAndReceiveWithShared(boolean isPersistent) th
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
@@ -286,7 +284,7 @@ public void partitionedTopicSendAndReceiveWithShared(boolean isPersistent) throw
.subscribe();
consumerList.add(consumer);
}
- assertEquals(partitions, consumerList.size());
+ assertThat(consumerList.size()).isEqualTo(partitions);
final int messagesToSend = 10;
final String producerName = "producerForFailover";
@Cleanup final Producer producer = client.newProducer(Schema.STRING)
@@ -296,7 +294,7 @@ public void partitionedTopicSendAndReceiveWithShared(boolean isPersistent) throw
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckDuplicate(consumerList, messagesToSend);
@@ -305,7 +303,7 @@ public void partitionedTopicSendAndReceiveWithShared(boolean isPersistent) throw
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
@@ -321,15 +319,15 @@ public void nonPartitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
- assertTrue(consumer.isConnected());
+ assertThat(consumer.isConnected()).isTrue();
consumerList.add(consumer);
Consumer moreConsumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
- assertNotNull(moreConsumer);
- assertTrue(moreConsumer.isConnected());
+ assertThat(moreConsumer).isNotNull();
+ assertThat(moreConsumer.isConnected()).isTrue();
consumerList.add(moreConsumer);
final int messagesToSend = 10;
final String producerName = "producerForKeyShared";
@@ -343,7 +341,7 @@ public void nonPartitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent)
.key(UUID.randomUUID().toString())
.value(producer.getProducerName() + "-" + i)
.send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("publish messages complete.");
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
@@ -355,7 +353,7 @@ public void nonPartitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent)
.key(UUID.randomUUID().toString())
.value(producer.getProducerName() + "-" + i)
.send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
@@ -372,15 +370,15 @@ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) th
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
- assertTrue(consumer.isConnected());
+ assertThat(consumer.isConnected()).isTrue();
consumerList.add(consumer);
Consumer moreConsumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
- assertNotNull(moreConsumer);
- assertTrue(moreConsumer.isConnected());
+ assertThat(moreConsumer).isNotNull();
+ assertThat(moreConsumer.isConnected()).isTrue();
consumerList.add(moreConsumer);
final int messagesToSend = 10;
final String producerName = "producerForKeyShared";
@@ -394,7 +392,7 @@ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) th
.key(UUID.randomUUID().toString())
.value(producer.getProducerName() + "-" + i)
.send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("publish messages complete.");
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
@@ -406,7 +404,7 @@ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) th
.key(UUID.randomUUID().toString())
.value(producer.getProducerName() + "-" + i)
.send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
@@ -431,20 +429,22 @@ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) th
if (currentReceived != null) {
consumer.acknowledge(currentReceived);
if (lastReceivedMap.containsKey(currentReceived.getTopicName())) {
- assertTrue(currentReceived.getMessageId().compareTo(
- lastReceivedMap.get(currentReceived.getTopicName()).getMessageId()) > 0,
- "Received messages are not in order.");
+ assertThat(currentReceived.getMessageId().compareTo(
+ lastReceivedMap.get(currentReceived.getTopicName()).getMessageId()) > 0)
+ .as("Received messages are not in order.")
+ .isTrue();
}
} else {
break;
}
lastReceivedMap.put(currentReceived.getTopicName(), currentReceived);
// Make sure that there are no duplicates
- assertTrue(messagesReceived.add(currentReceived.getValue()),
- "Received duplicate message " + currentReceived.getValue());
+ assertThat(messagesReceived.add(currentReceived.getValue())).
+ isTrue()
+ .as("Received duplicate message " + currentReceived.getValue());
}
}
- assertEquals(messagesToReceive, messagesReceived.size());
+ assertThat(messagesReceived.size()).isEqualTo(messagesToReceive);
}
protected static void receiveMessagesCheckDuplicate
@@ -462,14 +462,15 @@ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) th
if (currentReceived != null) {
consumer.acknowledge(currentReceived);
// Make sure that there are no duplicates
- assertTrue(messagesReceived.add(currentReceived.getValue()),
- "Received duplicate message " + currentReceived.getValue());
+ assertThat(messagesReceived.add(currentReceived.getValue()))
+ .isTrue()
+ .as("Received duplicate message " + currentReceived.getValue());
} else {
break;
}
}
}
- assertEquals(messagesReceived.size(), messagesToReceive);
+ assertThat(messagesReceived.size()).isEqualTo(messagesToReceive);
}
protected static void receiveMessagesCheckStickyKeyAndDuplicate
@@ -487,12 +488,13 @@ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) th
}
if (currentReceived != null) {
consumer.acknowledge(currentReceived);
- assertNotNull(currentReceived.getKey());
+ assertThat(currentReceived.getKey()).isNotNull();
consumerKeys.putIfAbsent(consumer.getConsumerName(), new HashSet<>());
consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
// Make sure that there are no duplicates
- assertTrue(messagesReceived.add(currentReceived.getValue()),
- "Received duplicate message " + currentReceived.getValue());
+ assertThat(messagesReceived.add(currentReceived.getValue()))
+ .isTrue()
+ .as("Received duplicate message " + currentReceived.getValue());
} else {
break;
}
@@ -501,10 +503,11 @@ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) th
// Make sure key will not be distributed to multiple consumers (except null key)
Set allKeys = new HashSet<>();
consumerKeys.forEach((k, v) -> v.stream().filter(Objects::nonNull).forEach(key -> {
- assertTrue(allKeys.add(key),
- "Key " + key + " is distributed to multiple consumers");
+ assertThat(allKeys.add(key))
+ .isTrue()
+ .as("Key " + key + " is distributed to multiple consumers");
}));
- assertEquals(messagesReceived.size(), messagesToReceive);
+ assertThat(messagesReceived.size()).isEqualTo(messagesToReceive);
}
protected static void closeConsumers(List> consumerList) throws PulsarClientException {
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
index 4bb0bb85c946d..e473a05d8aa8f 100644
--- a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
@@ -20,6 +20,7 @@
import static java.util.stream.Collectors.joining;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -98,6 +99,8 @@ protected PulsarClient getPulsarClient(PulsarCluster cluster) throws IOException
}
protected PulsarAdmin getPulsarAdmin(PulsarCluster cluster) throws IOException {
- return PulsarAdmin.builder().serviceHttpUrl(cluster.getHttpServiceUrl()).build();
+ return PulsarAdmin.builder().serviceHttpUrl(cluster.getHttpServiceUrl())
+ .requestTimeout(30, TimeUnit.SECONDS)
+ .build();
}
}