diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 8bb2ce7c7f9ac..205dc074c8e12 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -244,6 +244,22 @@
test
+
+
+ org.testng
+ testng
+ ${testng.version}
+
+
+ org.apache.pulsar
+ pulsar-client-api
+ ${project.version}
+
+
+ org.apache.pulsar
+ pulsar-client-admin-api
+ ${project.version}
+
diff --git a/tests/integration/src/main/java/org/apache/pulsar/tests/integration/IntegTest.java b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/IntegTest.java
new file mode 100644
index 0000000000000..fc4b7b6274d5e
--- /dev/null
+++ b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/IntegTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class IntegTest {
+ public PulsarClient client;
+ public PulsarAdmin admin;
+
+ public IntegTest(PulsarClient client, PulsarAdmin admin) {
+ this.client = client;
+ this.admin = admin;
+ }
+
+}
diff --git a/tests/integration/src/main/java/org/apache/pulsar/tests/integration/IntegTestUtils.java b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/IntegTestUtils.java
new file mode 100644
index 0000000000000..49398f7d9bf52
--- /dev/null
+++ b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/IntegTestUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import static org.testng.Assert.assertTrue;
+import java.util.concurrent.ThreadLocalRandom;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+@Slf4j
+public class IntegTestUtils {
+ private static String randomName(int numChars) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numChars; i++) {
+ sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+ }
+ return sb.toString();
+ }
+
+ protected static String generateNamespaceName() {
+ return "ns-" + randomName(8);
+ }
+
+ protected static String generateTopicName(String topicPrefix, boolean isPersistent) {
+ return generateTopicName("default", topicPrefix, isPersistent);
+ }
+
+ protected static String generateTopicName(String namespace, String topicPrefix, boolean isPersistent) {
+ String topicName = new StringBuilder(topicPrefix)
+ .append("-")
+ .append(randomName(8))
+ .append("-")
+ .append(System.currentTimeMillis())
+ .toString();
+ if (isPersistent) {
+ return "persistent://public/" + namespace + "/" + topicName;
+ } else {
+ return "non-persistent://public/" + namespace + "/" + topicName;
+ }
+ }
+
+ public static String getNonPartitionedTopic(PulsarAdmin admin, String topicPrefix, boolean isPersistent)
+ throws Exception {
+ String nsName = generateNamespaceName();
+ admin.namespaces().createNamespace("public/" + nsName);
+ return generateTopicName(nsName, topicPrefix, isPersistent);
+ }
+
+ public static String getPartitionedTopic(PulsarAdmin admin, String topicPrefix, boolean isPersistent,
+ int partitions) throws Exception {
+ assertTrue(partitions > 0, "partitions must greater than 1");
+ String nsName = generateNamespaceName();
+ admin.namespaces().createNamespace("public/" + nsName);
+ String topicName = generateTopicName(nsName, topicPrefix, isPersistent);
+ admin.topics().createPartitionedTopic(topicName, partitions);
+ return topicName;
+ }
+}
diff --git a/tests/integration/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingTest.java b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingTest.java
new file mode 100644
index 0000000000000..4b01b2a3fb57e
--- /dev/null
+++ b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingTest.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import static org.apache.pulsar.tests.integration.IntegTestUtils.getNonPartitionedTopic;
+import static org.apache.pulsar.tests.integration.IntegTestUtils.getPartitionedTopic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.IntegTest;
+
+@Slf4j
+public class TopicMessagingTest extends IntegTest {
+
+
+ public TopicMessagingTest(PulsarClient client, PulsarAdmin admin) {
+ super(client, admin);
+ }
+
+ public void nonPartitionedTopicSendAndReceiveWithExclusive(boolean isPersistent) throws Exception {
+ log.info("-- Starting nonPartitionedTopicSendAndReceiveWithExclusive test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-non-partitioned-consume-exclusive", isPersistent);
+ @Cleanup final Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ try {
+ client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ fail("should be failed");
+ } catch (PulsarClientException ignore) {
+ }
+ final int messagesToSend = 10;
+ final String producerName = "producerForExclusive";
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName(producerName)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ log.info("public messages complete.");
+ receiveMessagesCheckOrderAndDuplicate(Collections.singletonList(consumer), messagesToSend);
+ log.info("-- Exiting nonPartitionedTopicSendAndReceiveWithExclusive test --");
+ }
+
+ public void partitionedTopicSendAndReceiveWithExclusive(boolean isPersistent) throws Exception {
+ log.info("-- Starting partitionedTopicSendAndReceiveWithExclusive test --");
+ final int partitions = 3;
+ String topicName = getPartitionedTopic(admin, "test-partitioned-consume-exclusive", isPersistent, partitions);
+ List> consumerList = new ArrayList<>(3);
+ for (int i = 0; i < partitions; i++) {
+ Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName + "-partition-" + i)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ consumerList.add(consumer);
+ }
+ assertEquals(partitions, consumerList.size());
+ try {
+ client.newConsumer(Schema.STRING)
+ .topic(topicName + "-partition-" + 0)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ fail("should be failed");
+ } catch (PulsarClientException ignore) {
+ }
+ final int messagesToSend = 9;
+ final String producerName = "producerForExclusive";
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName(producerName)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ log.info("public messages complete.");
+ receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
+ // To simulate a consumer crashed
+ Consumer crashedConsumer = consumerList.remove(0);
+ crashedConsumer.close();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend - 3);
+ closeConsumers(consumerList);
+ log.info("-- Exiting partitionedTopicSendAndReceiveWithExclusive test --");
+ }
+
+ public void nonPartitionedTopicSendAndReceiveWithFailover(boolean isPersistent) throws Exception {
+ log.info("-- Starting nonPartitionedTopicSendAndReceiveWithFailover test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-non-partitioned-consume-failover", isPersistent);
+ List> consumerList = new ArrayList<>(2);
+ final Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+ consumerList.add(consumer);
+ final Consumer standbyConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+ assertNotNull(standbyConsumer);
+ assertTrue(standbyConsumer.isConnected());
+ consumerList.add(standbyConsumer);
+ final int messagesToSend = 10;
+ final String producerName = "producerForFailover";
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName(producerName)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ log.info("public messages complete.");
+ receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
+ // To simulate a consumer crashed
+ Consumer crashedConsumer = consumerList.remove(0);
+ // wait ack send
+ Thread.sleep(3000);
+ crashedConsumer.close();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
+ closeConsumers(consumerList);
+ log.info("-- Exiting nonPartitionedTopicSendAndReceiveWithFailover test --");
+ }
+
+ public void partitionedTopicSendAndReceiveWithFailover(boolean isPersistent) throws Exception {
+ log.info("-- Starting partitionedTopicSendAndReceiveWithFailover test --");
+ final int partitions = 3;
+ String topicName = getPartitionedTopic(admin, "test-partitioned-consume-failover", isPersistent, partitions);
+ List> consumerList = new ArrayList<>(3);
+ Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+ consumerList.add(consumer);
+ Consumer standbyConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+ assertNotNull(standbyConsumer);
+ assertTrue(standbyConsumer.isConnected());
+ consumerList.add(standbyConsumer);
+ assertEquals(consumerList.size(), 2);
+ final int messagesToSend = 9;
+ final String producerName = "producerForFailover";
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName(producerName)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ log.info("public messages complete.");
+ receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
+ // To simulate a consumer crashed
+ Consumer crashedConsumer = consumerList.remove(0);
+ // wait ack send
+ Thread.sleep(3000);
+ crashedConsumer.close();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
+ closeConsumers(consumerList);
+ log.info("-- Exiting partitionedTopicSendAndReceiveWithFailover test --");
+ }
+
+ public void nonPartitionedTopicSendAndReceiveWithShared(boolean isPersistent) throws Exception {
+ log.info("-- Starting nonPartitionedTopicSendAndReceiveWithShared test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-non-partitioned-consume-shared", isPersistent);
+ List> consumerList = new ArrayList<>(2);
+ final Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ consumerList.add(consumer);
+ Consumer moreConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ assertNotNull(moreConsumer);
+ assertTrue(moreConsumer.isConnected());
+ consumerList.add(moreConsumer);
+ final int messagesToSend = 10;
+ final String producerName = "producerForShared";
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName(producerName)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ log.info("public messages complete.");
+ receiveMessagesCheckDuplicate(consumerList, messagesToSend);
+ // To simulate a consumer crashed
+ Consumer crashedConsumer = consumerList.remove(0);
+ crashedConsumer.close();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ receiveMessagesCheckDuplicate(consumerList, messagesToSend);
+ closeConsumers(consumerList);
+ log.info("-- Exiting nonPartitionedTopicSendAndReceiveWithShared test --");
+ }
+
+ public void partitionedTopicSendAndReceiveWithShared(boolean isPersistent) throws Exception {
+ log.info("-- Starting partitionedTopicSendAndReceiveWithShared test --");
+ final int partitions = 3;
+ String topicName = getPartitionedTopic(admin, "test-partitioned-consume-shared", isPersistent, partitions);
+ List> consumerList = new ArrayList<>(3);
+ for (int i = 0; i < partitions; i++) {
+ Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ consumerList.add(consumer);
+ }
+ assertEquals(partitions, consumerList.size());
+ final int messagesToSend = 10;
+ final String producerName = "producerForFailover";
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName(producerName)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ log.info("public messages complete.");
+ receiveMessagesCheckDuplicate(consumerList, messagesToSend);
+ // To simulate a consumer crashed
+ Consumer crashedConsumer = consumerList.remove(0);
+ crashedConsumer.close();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+ receiveMessagesCheckDuplicate(consumerList, messagesToSend);
+ closeConsumers(consumerList);
+ log.info("-- Exiting partitionedTopicSendAndReceiveWithShared test --");
+ }
+
+ public void nonPartitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) throws Exception {
+ log.info("-- Starting nonPartitionedTopicSendAndReceiveWithKeyShared test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-non-partitioned-consume-key-shared", isPersistent);
+ List> consumerList = new ArrayList<>(2);
+ Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ assertTrue(consumer.isConnected());
+ consumerList.add(consumer);
+ Consumer moreConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ assertNotNull(moreConsumer);
+ assertTrue(moreConsumer.isConnected());
+ consumerList.add(moreConsumer);
+ final int messagesToSend = 10;
+ final String producerName = "producerForKeyShared";
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName(producerName)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage()
+ .key(UUID.randomUUID().toString())
+ .value(producer.getProducerName() + "-" + i)
+ .send();
+ assertNotNull(messageId);
+ }
+ log.info("publish messages complete.");
+ receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
+ // To simulate a consumer crashed
+ Consumer crashedConsumer = consumerList.remove(0);
+ crashedConsumer.close();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage()
+ .key(UUID.randomUUID().toString())
+ .value(producer.getProducerName() + "-" + i)
+ .send();
+ assertNotNull(messageId);
+ }
+ receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
+ closeConsumers(consumerList);
+ log.info("-- Exiting nonPartitionedTopicSendAndReceiveWithKeyShared test --");
+ }
+
+ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) throws Exception {
+ log.info("-- Starting partitionedTopicSendAndReceiveWithKeyShared test --");
+ final int partitions = 3;
+ String topicName = getPartitionedTopic(admin, "test-partitioned-consume-key-shared", isPersistent, partitions);
+ List> consumerList = new ArrayList<>(2);
+ Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ assertTrue(consumer.isConnected());
+ consumerList.add(consumer);
+ Consumer moreConsumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ assertNotNull(moreConsumer);
+ assertTrue(moreConsumer.isConnected());
+ consumerList.add(moreConsumer);
+ final int messagesToSend = 10;
+ final String producerName = "producerForKeyShared";
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName(producerName)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage()
+ .key(UUID.randomUUID().toString())
+ .value(producer.getProducerName() + "-" + i)
+ .send();
+ assertNotNull(messageId);
+ }
+ log.info("publish messages complete.");
+ receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
+ // To simulate a consumer crashed
+ Consumer crashedConsumer = consumerList.remove(0);
+ crashedConsumer.close();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage()
+ .key(UUID.randomUUID().toString())
+ .value(producer.getProducerName() + "-" + i)
+ .send();
+ assertNotNull(messageId);
+ }
+ receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
+ closeConsumers(consumerList);
+ log.info("-- Exiting partitionedTopicSendAndReceiveWithKeyShared test --");
+ }
+
+
+ protected static > void receiveMessagesCheckOrderAndDuplicate
+ (List> consumerList, int messagesToReceive) throws PulsarClientException {
+ Set messagesReceived = new HashSet<>();
+ for (Consumer consumer : consumerList) {
+ Message currentReceived;
+ Map> lastReceivedMap = new HashMap<>();
+ while (true) {
+ try {
+ currentReceived = consumer.receive(3, TimeUnit.SECONDS);
+ } catch (PulsarClientException e) {
+ log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
+ break;
+ }
+ // Make sure that messages are received in order
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ if (lastReceivedMap.containsKey(currentReceived.getTopicName())) {
+ assertTrue(currentReceived.getMessageId().compareTo(
+ lastReceivedMap.get(currentReceived.getTopicName()).getMessageId()) > 0,
+ "Received messages are not in order.");
+ }
+ } else {
+ break;
+ }
+ lastReceivedMap.put(currentReceived.getTopicName(), currentReceived);
+ // Make sure that there are no duplicates
+ assertTrue(messagesReceived.add(currentReceived.getValue()),
+ "Received duplicate message " + currentReceived.getValue());
+ }
+ }
+ assertEquals(messagesToReceive, messagesReceived.size());
+ }
+
+ protected static void receiveMessagesCheckDuplicate
+ (List> consumerList, int messagesToReceive) throws PulsarClientException {
+ Set messagesReceived = new HashSet<>();
+ for (Consumer consumer : consumerList) {
+ Message currentReceived = null;
+ while (true) {
+ try {
+ currentReceived = consumer.receive(3, TimeUnit.SECONDS);
+ } catch (PulsarClientException e) {
+ log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
+ break;
+ }
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ // Make sure that there are no duplicates
+ assertTrue(messagesReceived.add(currentReceived.getValue()),
+ "Received duplicate message " + currentReceived.getValue());
+ } else {
+ break;
+ }
+ }
+ }
+ assertEquals(messagesReceived.size(), messagesToReceive);
+ }
+
+ protected static void receiveMessagesCheckStickyKeyAndDuplicate
+ (List> consumerList, int messagesToReceive) throws PulsarClientException {
+ Map> consumerKeys = new HashMap<>();
+ Set messagesReceived = new HashSet<>();
+ for (Consumer consumer : consumerList) {
+ Message currentReceived;
+ while (true) {
+ try {
+ currentReceived = consumer.receive(3, TimeUnit.SECONDS);
+ } catch (PulsarClientException e) {
+ log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
+ break;
+ }
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ assertNotNull(currentReceived.getKey());
+ consumerKeys.putIfAbsent(consumer.getConsumerName(), new HashSet<>());
+ consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
+ // Make sure that there are no duplicates
+ assertTrue(messagesReceived.add(currentReceived.getValue()),
+ "Received duplicate message " + currentReceived.getValue());
+ } else {
+ break;
+ }
+ }
+ }
+ // Make sure key will not be distributed to multiple consumers (except null key)
+ Set allKeys = new HashSet<>();
+ consumerKeys.forEach((k, v) -> v.stream().filter(Objects::nonNull).forEach(key -> {
+ assertTrue(allKeys.add(key),
+ "Key " + key + " is distributed to multiple consumers");
+ }));
+ assertEquals(messagesReceived.size(), messagesToReceive);
+ }
+
+ protected static void closeConsumers(List> consumerList) throws PulsarClientException {
+ Iterator> iterator = consumerList.iterator();
+ while (iterator.hasNext()) {
+ iterator.next().close();
+ iterator.remove();
+ }
+ }
+
+}
diff --git a/tests/integration/src/main/java/org/apache/pulsar/tests/integration/messaging/package-info.java b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/messaging/package-info.java
new file mode 100644
index 0000000000000..7d9f1b036c659
--- /dev/null
+++ b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/messaging/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration.messaging;
\ No newline at end of file
diff --git a/tests/integration/src/main/java/org/apache/pulsar/tests/integration/package-info.java b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/package-info.java
new file mode 100644
index 0000000000000..68c0199d5f32f
--- /dev/null
+++ b/tests/integration/src/main/java/org/apache/pulsar/tests/integration/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration;
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
index 0d6b6f1abe4cf..e6a1a4583e269 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
@@ -26,7 +26,7 @@
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +35,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public class ClientToolTest extends TopicMessagingBase {
+public class ClientToolTest extends MessagingBase {
private static final int MESSAGE_COUNT = 10;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
index 8c4f3a137aa31..737a583665c23 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
@@ -23,13 +23,13 @@
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-public class PerfToolTest extends TopicMessagingBase {
+public class PerfToolTest extends MessagingBase {
private static final int MESSAGE_COUNT = 50;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b78a832f60933..3579a5e48e876 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -2171,11 +2171,11 @@ private void checkSchemaForAutoSchema(Message message, String bas
}
}
- private PulsarClient getPulsarClient() throws PulsarClientException {
+ protected PulsarClient getPulsarClient() throws PulsarClientException {
return PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
}
- private PulsarAdmin getPulsarAdmin() throws PulsarClientException {
+ protected PulsarAdmin getPulsarAdmin() throws PulsarClientException {
return PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
index ddedacc531a7c..70f177155a46e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
@@ -46,7 +46,7 @@ public abstract class MessagingBase extends PulsarTestSuite {
protected String methodName;
@BeforeMethod(alwaysRun = true)
- public void beforeMethod(Method m) throws Exception {
+ public void beforeMethod(Method m) {
methodName = m.getName();
}
@@ -54,7 +54,7 @@ protected String getNonPartitionedTopic(String topicPrefix, boolean isPersistent
String nsName = generateNamespaceName();
pulsarCluster.createNamespace(nsName);
- return generateTopicName(nsName, topicPrefix, true);
+ return generateTopicName(nsName, topicPrefix, isPersistent);
}
protected String getPartitionedTopic(String topicPrefix, boolean isPersistent, int partitions) throws Exception {
@@ -62,7 +62,7 @@ protected String getPartitionedTopic(String topicPrefix, boolean isPersistent, i
String nsName = generateNamespaceName();
pulsarCluster.createNamespace(nsName);
- String topicName = generateTopicName(nsName, topicPrefix, true);
+ String topicName = generateTopicName(nsName, topicPrefix, isPersistent);
pulsarCluster.createPartitionedTopic(topicName, partitions);
return topicName;
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
index 618053ac000e2..1ec39ae8c75d6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
@@ -18,20 +18,23 @@
*/
package org.apache.pulsar.tests.integration.messaging;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.function.Supplier;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.ITest;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
-public class MessagingSmokeTest extends TopicMessagingBase implements ITest {
+public class MessagingSmokeTest extends MessagingBase implements ITest {
+
+ TopicMessagingTest test;
@Factory
- public static Object[] messagingTests() {
+ public static Object[] messagingTests() throws IOException {
List> tests = List.of(
new MessagingSmokeTest("Extensible Load Manager",
Map.of("loadManagerClassName", ExtensibleLoadManagerImpl.class.getName(),
@@ -46,10 +49,16 @@ public static Object[] messagingTests() {
private final String name;
- public MessagingSmokeTest(String name, Map brokerEnvs) {
+ public MessagingSmokeTest(String name, Map brokerEnvs) throws IOException {
super();
this.brokerEnvs.putAll(brokerEnvs);
this.name = name;
+
+ }
+
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new TopicMessagingTest(getPulsarClient(), getPulsarAdmin());
}
@Override
@@ -57,51 +66,51 @@ public String getTestName() {
return name;
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testNonPartitionedTopicMessagingWithExclusive(TopicDomain topicDomain)
throws Exception {
- nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.nonPartitionedTopicSendAndReceiveWithExclusive(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testPartitionedTopicMessagingWithExclusive(TopicDomain topicDomain)
throws Exception {
- partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.partitionedTopicSendAndReceiveWithExclusive(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testNonPartitionedTopicMessagingWithFailover(TopicDomain topicDomain)
throws Exception {
- nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.nonPartitionedTopicSendAndReceiveWithFailover(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testPartitionedTopicMessagingWithFailover(TopicDomain topicDomain)
throws Exception {
- partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.partitionedTopicSendAndReceiveWithFailover(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testNonPartitionedTopicMessagingWithShared(TopicDomain topicDomain)
throws Exception {
- nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.nonPartitionedTopicSendAndReceiveWithShared(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testPartitionedTopicMessagingWithShared(TopicDomain topicDomain)
throws Exception {
- partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.partitionedTopicSendAndReceiveWithShared(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testNonPartitionedTopicMessagingWithKeyShared(TopicDomain topicDomain)
throws Exception {
- nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.nonPartitionedTopicSendAndReceiveWithKeyShared(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testPartitionedTopicMessagingWithKeyShared(TopicDomain topicDomain)
throws Exception {
- partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.partitionedTopicSendAndReceiveWithKeyShared(TopicDomain.persistent.equals(topicDomain));
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
index 0b379af3957c7..dbbcc36883ea3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
@@ -18,50 +18,57 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-public class NonPersistentTopicMessagingTest extends TopicMessagingBase {
+public class NonPersistentTopicMessagingTest extends MessagingBase {
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), false);
+ TopicMessagingTest test;
+
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new TopicMessagingTest(getPulsarClient(), getPulsarAdmin());
+ }
+
+ @Test
+ public void testNonPartitionedTopicMessagingWithExclusive() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithExclusive(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), false);
+ @Test
+ public void testPartitionedTopicMessagingWithExclusive() throws Exception {
+ test.partitionedTopicSendAndReceiveWithExclusive(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), false);
+ @Test
+ public void testNonPartitionedTopicMessagingWithFailover() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithFailover(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), false);
+ @Test
+ public void testPartitionedTopicMessagingWithFailover() throws Exception {
+ test.partitionedTopicSendAndReceiveWithFailover(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), false);
+ @Test
+ public void testNonPartitionedTopicMessagingWithShared() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithShared(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), false);
+ @Test
+ public void testPartitionedTopicMessagingWithShared() throws Exception {
+ test.partitionedTopicSendAndReceiveWithShared(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), false);
+ @Test
+ public void testNonPartitionedTopicMessagingWithKeyShared() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithKeyShared(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), false);
+ @Test
+ public void testPartitionedTopicMessagingWithKeyShared() throws Exception {
+ test.partitionedTopicSendAndReceiveWithKeyShared(false);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java
index 5675aa88ec6cc..5e711e2b56f42 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java
@@ -18,51 +18,59 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-public class PersistentTopicMessagingTest extends TopicMessagingBase {
+public class PersistentTopicMessagingTest extends MessagingBase {
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), true);
+ TopicMessagingTest test;
+
+
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new TopicMessagingTest(getPulsarClient(), getPulsarAdmin());
+ }
+
+ @Test
+ public void testNonPartitionedTopicMessagingWithExclusive() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithExclusive(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), true);
+ @Test
+ public void testPartitionedTopicMessagingWithExclusive() throws Exception {
+ test.partitionedTopicSendAndReceiveWithExclusive(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), true);
+ @Test
+ public void testNonPartitionedTopicMessagingWithFailover() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithFailover(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), true);
+ @Test
+ public void testPartitionedTopicMessagingWithFailover() throws Exception {
+ test.partitionedTopicSendAndReceiveWithFailover(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), true);
+ @Test
+ public void testNonPartitionedTopicMessagingWithShared() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithShared(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), true);
+ @Test
+ public void testPartitionedTopicMessagingWithShared() throws Exception {
+ test.partitionedTopicSendAndReceiveWithShared( true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), true);
+ @Test
+ public void testNonPartitionedTopicMessagingWithKeyShared() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithKeyShared( true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), true);
+ @Test
+ public void testPartitionedTopicMessagingWithKeyShared() throws Exception {
+ test.partitionedTopicSendAndReceiveWithKeyShared( true);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
index 98000c6f40636..7591d3697d84d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
@@ -29,14 +29,14 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-public class TestBrokerInterceptors extends TopicMessagingBase {
+public class TestBrokerInterceptors extends MessagingBase {
private static final String PREFIX = "PULSAR_PREFIX_";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestEntryFilters.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestEntryFilters.java
index f41551b248db0..5e544738868c1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestEntryFilters.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestEntryFilters.java
@@ -28,12 +28,12 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.function.Supplier;
-public class TestEntryFilters extends TopicMessagingBase {
+public class TestEntryFilters extends MessagingBase {
private static final String PREFIX = "PULSAR_PREFIX_";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index 8b99f21373560..106ccf8cd9716 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -18,13 +18,17 @@
*/
package org.apache.pulsar.tests.integration.topologies;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.annotations.DataProvider;
@@ -34,12 +38,49 @@
@Slf4j
public abstract class PulsarClusterTestBase extends PulsarTestBase {
+
+ public final static String CLIENT_CONFIG_FILE_PATH_PROPERTY_NAME = "client.config.file.path";
+
protected final Map brokerEnvs = new HashMap<>();
protected final Map bookkeeperEnvs = new HashMap<>();
protected final Map proxyEnvs = new HashMap<>();
protected final List brokerAdditionalPorts = new LinkedList<>();
protected final List bookieAdditionalPorts = new LinkedList<>();
+
+
+ private Map readClientConfigs(String clientConfFilePath) throws IOException {
+ Properties prop = new Properties(System.getProperties());
+ try (FileInputStream input = new FileInputStream(clientConfFilePath)) {
+ prop.load(input);
+ }
+ Map map = new HashMap<>();
+ for (String key : prop.stringPropertyNames()) {
+ map.put(key, prop.get(key));
+ }
+
+ return map;
+ }
+
+ protected PulsarClient getPulsarClient() throws IOException {
+ var clientConfFilePath = System.getProperty(CLIENT_CONFIG_FILE_PATH_PROPERTY_NAME);
+
+ if (clientConfFilePath == null) {
+ return PulsarClient.builder().serviceUrl(getPulsarCluster().getPlainTextServiceUrl()).build();
+ }
+
+ return PulsarClient.builder().loadConf(readClientConfigs(clientConfFilePath)).build();
+ }
+
+ protected PulsarAdmin getPulsarAdmin() throws IOException {
+ var clientConfFilePath = System.getProperty(CLIENT_CONFIG_FILE_PATH_PROPERTY_NAME);
+
+ if (clientConfFilePath == null) {
+ return PulsarAdmin.builder().serviceHttpUrl(getPulsarCluster().getHttpServiceUrl()).build();
+ }
+ return PulsarAdmin.builder().loadConf(readClientConfigs(clientConfFilePath)).build();
+ }
+
@Override
protected final void setup() throws Exception {
setupCluster();
@@ -101,6 +142,18 @@ public Object[][] serviceUrlAndTopicDomain() {
};
}
+ @DataProvider(name = "topicDomain")
+ public Object[][] topicDomain() {
+ return new Object[][] {
+ {
+ TopicDomain.persistent
+ },
+ {
+ TopicDomain.non_persistent
+ },
+ };
+ }
+
protected PulsarAdmin pulsarAdmin;
protected PulsarCluster pulsarCluster;