From 74a660ce573d1936c1327c7efac9f075a4272782 Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Thu, 5 Dec 2024 17:04:16 +0800 Subject: [PATCH] feat: change some SYSTEM_ERROR to more meaningful error code Change-Id: I4b6ffa5aa18325eeadc29941c5788244c2770423 --- .../AbstractSendMessageProcessor.java | 4 ++-- .../processor/AdminBrokerProcessor.java | 22 +++++++++---------- .../processor/ConsumerManageProcessor.java | 4 ++-- .../processor/NotificationProcessor.java | 2 +- .../processor/PeekMessageProcessor.java | 2 +- .../processor/PollingInfoProcessor.java | 2 +- .../broker/processor/PopMessageProcessor.java | 4 ++-- .../processor/PullMessageProcessor.java | 2 +- .../remoting/protocol/ResponseCode.java | 2 ++ 9 files changed, 23 insertions(+), 21 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index ba2d1b5f320..39befedaa22 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -467,7 +467,7 @@ protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(requestHeader.getTopic()); if (!result.isValid()) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(result.getRemark()); return response; } @@ -522,7 +522,7 @@ protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, RemotingHelper.parseChannelRemoteAddr(ctx.channel())); LOGGER.warn(errorInfo); - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(errorInfo); return response; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4c341dde920..55887b3588c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -424,7 +424,7 @@ private RemotingCommand getSubscriptionGroup(ChannelHandlerContext ctx, SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup()); if (groupConfig == null) { LOGGER.error("No group in this broker, client: {} group: {}", ctx.channel().remoteAddress(), requestHeader.getGroup()); - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark("No group in this broker"); return response; } @@ -513,13 +513,13 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext try { TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); if (!result.isValid()) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(result.getRemark()); return response; } if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { if (TopicValidator.isSystemTopic(topic)) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("The topic[" + topic + "] is conflict with system topic."); return response; } @@ -540,7 +540,7 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(); String msgTypeAttrValue = topicConfig.getAttributes().get(msgTypeAttrKey); if (msgTypeAttrValue != null && msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("MIXED message type is not supported."); return response; } @@ -603,13 +603,13 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont String topic = topicConfig.getTopicName(); TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); if (!result.isValid()) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(result.getRemark()); return response; } if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { if (TopicValidator.isSystemTopic(topic)) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("The topic[" + topic + "] is conflict with system topic."); return response; } @@ -619,7 +619,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(); String msgTypeAttrValue = topicConfig.getAttributes().get(msgTypeAttrKey); if (msgTypeAttrValue != null && msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("MIXED message type is not supported."); return response; } @@ -673,13 +673,13 @@ private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerCo TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic); if (!result.isValid()) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(result.getRemark()); return response; } if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { if (TopicValidator.isSystemTopic(topic)) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("The topic[" + topic + "] is conflict with system topic."); return response; } @@ -720,14 +720,14 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, String topic = requestHeader.getTopic(); if (UtilAll.isBlank(topic)) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("The specified topic is blank."); return response; } if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) { if (TopicValidator.isSystemTopic(topic)) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("The topic[" + topic + "] is conflict with system topic."); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 9b3ef603de7..dfa755d7c44 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -177,13 +177,13 @@ private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, Remoting } if (queueId == null) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("QueueId is null, topic is " + topic); return response; } if (offset == null) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark("Offset is null, topic is " + topic); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index b4ebd9c4a99..6317d6ad7d2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -112,7 +112,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); POP_LOGGER.warn(errorInfo); - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(errorInfo); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java index 8473e3a2865..40117b74a54 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java @@ -114,7 +114,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); LOG.warn(errorInfo); - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(errorInfo); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java index 65a4d7d7851..f7baac144e6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java @@ -89,7 +89,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); POP_LOGGER.warn(errorInfo); - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(errorInfo); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index e0454afa3ca..05efc14b7b4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -252,7 +252,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC return response; } if (requestHeader.getMaxMsgNums() > 32) { - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(String.format("the broker[%s] pop message's num is greater than 32", this.brokerController.getBrokerConfig().getBrokerIP1())); return response; @@ -288,7 +288,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); POP_LOGGER.warn(errorInfo); - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(errorInfo); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 2ad2c9e93e4..5b11bc2fef4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -371,7 +371,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); LOGGER.warn(errorInfo); - response.setCode(ResponseCode.SYSTEM_ERROR); + response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(errorInfo); return response; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java index b19355487e5..e2ce81d95b9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java @@ -55,6 +55,8 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int FILTER_DATA_NOT_LATEST = 28; + public static final int INVALID_PARAMETER = 29; + public static final int TRANSACTION_SHOULD_COMMIT = 200; public static final int TRANSACTION_SHOULD_ROLLBACK = 201;