diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java index 6002d1f5a4d..b256aa5d763 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -33,6 +34,7 @@ import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConcurrentTopicList; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader; @@ -131,6 +133,24 @@ public void getAllTopicList() { assertThat(topicList.getTopicList()).contains("TestTopic", "TestTopic1", "TestTopic2"); } + @Test + public void hugeTopicListAddTest() { + TopicList topicList = new TopicList(); + ConcurrentTopicList concurrentTopicList = new ConcurrentTopicList(); + HashSet topics= new HashSet<>(); + for(int i=0; i< 100000 ; ++i) { + topics.add("Topic" + i); + } + long startTime = System.currentTimeMillis(); + topicList.getTopicList().addAll(topics); + long endTime = System.currentTimeMillis(); + assertThat(endTime - startTime < 1000).isTrue(); + startTime = System.currentTimeMillis(); + concurrentTopicList.getTopicList().addAll(topics); + endTime = System.currentTimeMillis(); + assertThat(endTime - startTime > 3000).isTrue(); + } + @Test public void registerBroker() { // Register master broker diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ConcurrentTopicList.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ConcurrentTopicList.java new file mode 100644 index 00000000000..4161c9c13a2 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ConcurrentTopicList.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.body; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class ConcurrentTopicList extends RemotingSerializable { + private Set topicList = new CopyOnWriteArraySet<>(); + private String brokerAddr; + + public Set getTopicList() { + return topicList; + } + + public void setTopicList(Set topicList) { + this.topicList = topicList; + } + + public String getBrokerAddr() { + return brokerAddr; + } + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java index 30edfb5a987..93770e9db1f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/TopicList.java @@ -16,12 +16,12 @@ */ package org.apache.rocketmq.remoting.protocol.body; +import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class TopicList extends RemotingSerializable { - private Set topicList = new CopyOnWriteArraySet<>(); + private Set topicList = new HashSet<>(); private String brokerAddr; public Set getTopicList() { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 40bd5d56d30..04b754c7d62 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConcurrentTopicList; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; @@ -505,7 +506,7 @@ public TopicList queryTopicsByConsumer(String group) } @Override - public AdminToolResult queryTopicsByConsumerConcurrent(String group) { + public AdminToolResult queryTopicsByConsumerConcurrent(String group) { return defaultMQAdminExtImpl.queryTopicsByConsumerConcurrent(group); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 331b24d6068..ce568b98998 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -84,6 +84,7 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConcurrentTopicList; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; @@ -1084,7 +1085,7 @@ public TopicList queryTopicsByConsumer( } @Override - public AdminToolResult queryTopicsByConsumerConcurrent(final String group) { + public AdminToolResult queryTopicsByConsumerConcurrent(final String group) { return adminToolExecute(new AdminToolHandler() { @Override public AdminToolResult doExecute() throws Exception { @@ -1094,7 +1095,7 @@ public AdminToolResult doExecute() throws Exception { if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) { return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "router info not found."); } - final TopicList result = new TopicList(); + final ConcurrentTopicList result = new ConcurrentTopicList(); final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size()); for (final BrokerData bd : topicRouteData.getBrokerDatas()) { threadPoolExecutor.submit(new Runnable() { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 3148fc0987e..f043363b25f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -42,6 +42,7 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConcurrentTopicList; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; @@ -243,7 +244,7 @@ GroupList queryTopicConsumeByWho(final String topic) throws RemotingConnectExcep TopicList queryTopicsByConsumer( final String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException; - AdminToolResult queryTopicsByConsumerConcurrent(final String group); + AdminToolResult queryTopicsByConsumerConcurrent(final String group); SubscriptionData querySubscription(final String group, final String topic) throws InterruptedException, MQBrokerException, RemotingException, MQClientException;