Skip to content

Commit

Permalink
[ISSUE #9152] Broker getConsumeStats supports inputting multiple topi…
Browse files Browse the repository at this point in the history
…cs (#9153)

* [ISSUE #9152] The getConsumeStats supports inputting multiple topics
  • Loading branch information
qianye1001 authored Feb 5, 2025
1 parent ff94b99 commit 8086fc5
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1947,37 +1947,21 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
try {
final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
ConsumeStats consumeStats = new ConsumeStats();
List<String> topicListProvided = requestHeader.fetchTopicList();
String topicProvided = requestHeader.getTopic();
String group = requestHeader.getConsumerGroup();

Set<String> topics = new HashSet<>();
if (UtilAll.isBlank(requestHeader.getTopic())) {
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
} else {
topics.add(requestHeader.getTopic());
}
ConsumeStats consumeStats = new ConsumeStats();
Set<String> topicsForCollecting = getTopicsForCollectingConsumeStats(topicListProvided, topicProvided, group);

for (String topic : topics) {
for (String topic : topicsForCollecting) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic);
continue;
}

TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);

{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);

if (null == findSubscriptionData
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
LOGGER.warn(
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, "
+ "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup());
continue;
}
}

for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
Expand Down Expand Up @@ -2038,6 +2022,38 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
return response;
}

private Set<String> getTopicsForCollectingConsumeStats(List<String> topicListProvided, String topicProvided,
String group) {
Set<String> topicsForCollecting = new HashSet<>();
if (!topicListProvided.isEmpty()) {
// if topic list is provided, only collect the topics in the list
// and ignore subscription check
topicsForCollecting.addAll(topicListProvided);
} else {
// In order to be compatible with the old logic,
// even if the topic has been provided here, the subscription will be checked.
if (UtilAll.isBlank(topicProvided)) {
topicsForCollecting.addAll(
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group));
} else {
topicsForCollecting.add(topicProvided);
}
int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group);
Iterator<String> iterator = topicsForCollecting.iterator();
while (iterator.hasNext()) {
String topic = iterator.next();
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
if (findSubscriptionData == null && subscriptionCount > 0) {
LOGGER.warn(
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}",
topic, group);
iterator.remove();
}
}
}
return topicsForCollecting;
}

private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,16 +1748,27 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List<String> topicList,
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final long timeoutMillis)
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis);
}

public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
final List<String> topicList, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
requestHeader.updateTopicList(topicList);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;

@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET)
@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.GET)
public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader {
private static final String CONFIG_TYPE_SEPARATOR = ";";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,62 @@
package org.apache.rocketmq.remoting.protocol.header;

import com.google.common.base.MoreObjects;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.common.resource.RocketMQResource;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;

@RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET)
public class GetConsumeStatsRequestHeader extends TopicRequestHeader {
private static final String TOPIC_NAME_SEPARATOR = ";";

@CFNotNull
@RocketMQResource(ResourceType.GROUP)
private String consumerGroup;

@RocketMQResource(ResourceType.TOPIC)
private String topic;

// if topicList is provided, topic will be ignored
@RocketMQResource(value = ResourceType.TOPIC, splitter = TOPIC_NAME_SEPARATOR)
private String topicList;

@Override
public void checkFields() throws RemotingCommandException {
}

public List<String> fetchTopicList() {
if (StringUtils.isBlank(topicList)) {
return Collections.emptyList();
}
return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR));
}

public void updateTopicList(List<String> topicList) {
if (topicList == null || topicList.isEmpty()) {
return;
}
StringBuilder sb = new StringBuilder();
topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR));
this.setTopicList(sb.toString());
}

public String getTopicList() {
return topicList;
}

public void setTopicList(String topicList) {
this.topicList = topicList;
}

public String getConsumerGroup() {
return consumerGroup;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol.header;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class GetConsumeStatsRequestHeaderTest {

private GetConsumeStatsRequestHeader header;

@Before
public void setUp() {
header = new GetConsumeStatsRequestHeader();
}

@Test
public void updateTopicList_NullTopicList_DoesNotUpdate() {
header.updateTopicList(null);
assertNull(header.getTopicList());
}

@Test
public void updateTopicList_EmptyTopicList_SetsEmptyString() {
header.updateTopicList(Collections.emptyList());
assertNull(header.getTopicList());
}

@Test
public void updateTopicList_SingleTopic_SetsSingleTopicString() {
List<String> topicList = Collections.singletonList("TopicA");
header.updateTopicList(topicList);
assertEquals("TopicA;", header.getTopicList());
}

@Test
public void updateTopicList_MultipleTopics_SetsMultipleTopicsString() {
List<String> topicList = Arrays.asList("TopicA", "TopicB", "TopicC");
header.updateTopicList(topicList);
assertEquals("TopicA;TopicB;TopicC;", header.getTopicList());
}

@Test
public void updateTopicList_RepeatedTopics_SetsRepeatedTopicsString() {
List<String> topicList = Arrays.asList("TopicA", "TopicA", "TopicB");
header.updateTopicList(topicList);
assertEquals("TopicA;TopicA;TopicB;", header.getTopicList());
}

@Test
public void fetchTopicList_NullTopicList_ReturnsEmptyList() {
header.setTopicList(null);
List<String> topicList = header.fetchTopicList();
assertEquals(Collections.emptyList(), topicList);

header.updateTopicList(new ArrayList<>());
topicList = header.fetchTopicList();
assertEquals(Collections.emptyList(), topicList);
}

@Test
public void fetchTopicList_EmptyTopicList_ReturnsEmptyList() {
header.setTopicList("");
List<String> topicList = header.fetchTopicList();
assertEquals(Collections.emptyList(), topicList);
}

@Test
public void fetchTopicList_BlankTopicList_ReturnsEmptyList() {
header.setTopicList(" ");
List<String> topicList = header.fetchTopicList();
assertEquals(Collections.emptyList(), topicList);
}

@Test
public void fetchTopicList_SingleTopic_ReturnsSingleTopicList() {
header.setTopicList("TopicA");
List<String> topicList = header.fetchTopicList();
assertEquals(Collections.singletonList("TopicA"), topicList);
}

@Test
public void fetchTopicList_MultipleTopics_ReturnsTopicList() {
header.setTopicList("TopicA;TopicB;TopicC");
List<String> topicList = header.fetchTopicList();
assertEquals(Arrays.asList("TopicA", "TopicB", "TopicC"), topicList);
}

@Test
public void fetchTopicList_TopicListEndsWithSeparator_ReturnsTopicList() {
header.setTopicList("TopicA;TopicB;");
List<String> topicList = header.fetchTopicList();
assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
}

@Test
public void fetchTopicList_TopicListStartsWithSeparator_ReturnsTopicList() {
header.setTopicList(";TopicA;TopicB");
List<String> topicList = header.fetchTopicList();
assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public void testMessageTrackDetail() throws InterruptedException, RemotingExcept
connection.setConnectionSet(connections);
when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(connection);
ConsumeStats consumeStats = new ConsumeStats();
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), isNull(), anyLong())).thenReturn(consumeStats);
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String) isNull(), anyLong())).thenReturn(consumeStats);
List<MessageTrack> broadcastMessageTracks = defaultMQAdminExt.messageTrackDetail(messageExt);
assertThat(broadcastMessageTracks.size()).isEqualTo(2);
assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);
Expand Down

0 comments on commit 8086fc5

Please sign in to comment.