From d0b4e782b019340d19c005209a8ff5a9627c7eb8 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 23 Aug 2022 15:49:16 -0700 Subject: [PATCH] Repro of https://github.com/datastax/pulsar/issues/112 --- .../integration/topics/TestTopicDeletion.java | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java new file mode 100644 index 00000000000000..a28a8443dc1465 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.topics; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.fail; + +/** + * Test cases for compaction. + */ +@Slf4j +public class TestTopicDeletion extends PulsarTestSuite { + + final private boolean unload = false; + final private int numBrokers = 2; + + public void setupCluster() throws Exception { + brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10"); + brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", "false"); + brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false"); + this.setupCluster(""); + } + + protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster( + String clusterName, + PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { + specBuilder.numBrokers(numBrokers); + specBuilder.enableContainerLog(true); + return specBuilder; + } + + @Test(dataProvider = "ServiceUrls", timeOut=600_000) + public void testPartitionedTopicForceDeletion(Supplier serviceUrl) throws Exception { + + log.info("Creating tenant and namespace"); + + final String tenant = "test-partitioned-topic-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/partitioned-topic"; + final int numPartitions = numBrokers * 3; + final int numKeys = numPartitions * 50; + final String subscriptionName = "sub1"; + + this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin"); + + this.createNamespace(namespace); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-retention", "--size", "100M", "--time", "100m", namespace); + + this.createPartitionedTopic(topic, numPartitions); + + try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) { + + log.info("Creating consumer"); + Consumer consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subscriptionName) + .subscribe(); + + log.info("Producing messages"); + try(Producer producer = client.newProducer() + .topic(topic) + .create() + ) { + for (int i = 0; i < numKeys; i++) { + producer.newMessage() + .key("" + i) + .value(("value-" + i).getBytes(UTF_8)) + .sendAsync(); + } + producer.flush(); + log.info("Successfully wrote {} values", numKeys); + } + + log.info("Consuming half of the messages"); + for (int i = 0; i < numKeys / 2; i++) { + Message m = consumer.receive(1, TimeUnit.MINUTES); + log.info("Read value {}", m.getKey()); + } + + if (unload) { + log.info("Unloading topic"); + pulsarCluster.runAdminCommandOnAnyBroker("topics", + "unload", topic); + } + + ContainerExecResult res; + log.info("Deleting the topic"); + res = pulsarCluster.runAdminCommandOnAnyBroker("topics", + "delete-partitioned-topic", "--force", topic); + consumer.close(); + assertEquals(0, res.getExitCode()); + + // parts of topic deletion are async, let them finish + Thread.sleep(5000); + + log.info("Deleting the topic again"); + try { + res = pulsarCluster.runAdminCommandOnAnyBroker("topics", + "delete-partitioned-topic", "--force", topic); + assertNotEquals(0, res.getExitCode()); + } catch (ContainerExecException e) { + log.info("Second delete failed with ContainerExecException, could be ok", e); + if (!e.getMessage().contains("with error code 1")) { + fail("Expected different error code"); + } + } + + // should succeed + log.info("Creating the topic again"); + this.createPartitionedTopic(topic, numPartitions); + } + } + + + private ContainerExecResult createTenantName(final String tenantName, + final String allowedClusterName, + final String adminRoleName) throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "tenants", "create", "--allowed-clusters", allowedClusterName, + "--admin-roles", adminRoleName, tenantName); + assertEquals(0, result.getExitCode()); + return result; + } + + private ContainerExecResult createNamespace(final String Ns) throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", + "create", + "--clusters", + pulsarCluster.getClusterName(), Ns); + assertEquals(0, result.getExitCode()); + return result; + } + + private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions) + throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "topics", + "create-partitioned-topic", + "--partitions", "" + numPartitions, + partitionedTopicName); + assertEquals(0, result.getExitCode()); + return result; + } + + +}