diff --git a/rocketmq-broker/src/broker_error.rs b/rocketmq-broker/src/broker_error.rs index d6cfc35e..a0fda721 100644 --- a/rocketmq-broker/src/broker_error.rs +++ b/rocketmq-broker/src/broker_error.rs @@ -35,6 +35,32 @@ pub enum BrokerError { ClientError(#[from] rocketmq_client_rust::client_error::MQClientError), } +impl From for rocketmq_remoting::remoting_error::RemotingError { + fn from(value: BrokerError) -> Self { + match value { + BrokerError::BrokerRemotingError(e) => e, + BrokerError::BrokerCommonError(e) => { + rocketmq_remoting::remoting_error::RemotingError::RemoteError(format!("{}", e)) + } + BrokerError::MQBrokerError(code, message, _) => { + rocketmq_remoting::remoting_error::RemotingError::RemoteError(format!( + "CODE:{}, Message:{}", + code, message + )) + } + BrokerError::IllegalArgumentError(e) => { + rocketmq_remoting::remoting_error::RemotingError::RemoteError(e) + } + BrokerError::ClientError(e) => { + rocketmq_remoting::remoting_error::RemotingError::RemotingCommandError(format!( + "{}", + e + )) + } + } + } +} + #[cfg(test)] mod tests { use rocketmq_remoting::remoting_error::RemotingError; diff --git a/rocketmq-broker/src/processor.rs b/rocketmq-broker/src/processor.rs index 8f8e1385..02576977 100644 --- a/rocketmq-broker/src/processor.rs +++ b/rocketmq-broker/src/processor.rs @@ -117,9 +117,10 @@ where | RequestCode::SendMessageV2 | RequestCode::SendBatchMessage | RequestCode::ConsumerSendMsgBack => { - self.send_message_processor + return self + .send_message_processor .process_request(channel, ctx, request_code, request) - .await + .await; } RequestCode::SendReplyMessage | RequestCode::SendReplyMessageV2 => { diff --git a/rocketmq-broker/src/processor/send_message_processor.rs b/rocketmq-broker/src/processor/send_message_processor.rs index 8a5d33d5..6fa66069 100644 --- a/rocketmq-broker/src/processor/send_message_processor.rs +++ b/rocketmq-broker/src/processor/send_message_processor.rs @@ -110,13 +110,13 @@ where ctx: ConnectionHandlerContext, request_code: RequestCode, request: RemotingCommand, - ) -> Option { + ) -> rocketmq_remoting::Result> { match request_code { RequestCode::ConsumerSendMsgBack => { //need to optimize self.inner .consumer_send_msg_back(&channel, &ctx, &request) - .unwrap() + .map_err(Into::into) } _ => { let mut request_header = parse_request_header(&request, request_code).unwrap(); //need to optimize @@ -129,7 +129,7 @@ where &mapping_context, ); if let Some(rewrite_result) = rewrite_result { - return Some(rewrite_result); + return Ok(Some(rewrite_result)); } let send_message_context = @@ -145,28 +145,30 @@ where }; if request_header.batch.is_none() || !request_header.batch.unwrap() { //handle single message - self.send_message( - &channel, - &ctx, - request, - send_message_context, - request_header, - mapping_context, - execute_send_message_hook_after, - ) - .await + Ok(self + .send_message( + &channel, + &ctx, + request, + send_message_context, + request_header, + mapping_context, + execute_send_message_hook_after, + ) + .await) } else { //handle batch message - self.send_batch_message( - &channel, - &ctx, - request, - send_message_context, - request_header, - mapping_context, - execute_send_message_hook_after, - ) - .await + Ok(self + .send_batch_message( + &channel, + &ctx, + request, + send_message_context, + request_header, + mapping_context, + execute_send_message_hook_after, + ) + .await) } } }