diff --git a/rocketmq-common/src/common/message.rs b/rocketmq-common/src/common/message.rs index 2174b7fa..2c806541 100644 --- a/rocketmq-common/src/common/message.rs +++ b/rocketmq-common/src/common/message.rs @@ -37,6 +37,7 @@ pub mod message_ext; pub mod message_ext_broker_inner; pub mod message_id; pub mod message_queue; +pub mod message_queue_assignment; pub mod message_single; /// This module defines the `MessageTrait` trait, which provides a flexible interface for working diff --git a/rocketmq-common/src/common/message/message_enum.rs b/rocketmq-common/src/common/message/message_enum.rs index c247db9f..8ecd6645 100644 --- a/rocketmq-common/src/common/message/message_enum.rs +++ b/rocketmq-common/src/common/message/message_enum.rs @@ -14,6 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::fmt; + +use serde::de; +use serde::de::Visitor; +use serde::Deserialize; +use serde::Deserializer; +use serde::Serialize; +use serde::Serializer; + #[derive(Debug, PartialEq, Copy, Clone, Default)] pub enum MessageType { #[default] @@ -47,7 +56,7 @@ impl MessageType { } } -#[derive(Debug, PartialEq, Copy, Clone)] +#[derive(Debug, PartialEq, Copy, Clone, Hash, Eq)] pub enum MessageRequestMode { Pull, Pop, @@ -62,6 +71,48 @@ impl MessageRequestMode { } } +impl Serialize for MessageRequestMode { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(match *self { + MessageRequestMode::Pull => "PULL", + MessageRequestMode::Pop => "POP", + }) + } +} + +impl<'de> Deserialize<'de> for MessageRequestMode { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct MessageRequestModeVisitor; + + impl Visitor<'_> for MessageRequestModeVisitor { + type Value = MessageRequestMode; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a string representing a MessageRequestMode") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + match value { + "PULL" => Ok(MessageRequestMode::Pull), + "POP" => Ok(MessageRequestMode::Pop), + _ => Err(de::Error::unknown_variant(value, &["PULL", "POP"])), + } + } + } + + deserializer.deserialize_str(MessageRequestModeVisitor) + } +} + #[cfg(test)] mod tests { use super::*; @@ -108,4 +159,39 @@ mod tests { assert_eq!(MessageRequestMode::Pull.get_name(), "PULL"); assert_eq!(MessageRequestMode::Pop.get_name(), "POP"); } + + #[test] + fn serialize_message_request_mode_pull() { + let mode = MessageRequestMode::Pull; + let serialized = serde_json::to_string(&mode).unwrap(); + assert_eq!(serialized, "\"PULL\""); + } + + #[test] + fn serialize_message_request_mode_pop() { + let mode = MessageRequestMode::Pop; + let serialized = serde_json::to_string(&mode).unwrap(); + assert_eq!(serialized, "\"POP\""); + } + + #[test] + fn deserialize_message_request_mode_pull() { + let json = "\"PULL\""; + let deserialized: MessageRequestMode = serde_json::from_str(json).unwrap(); + assert_eq!(deserialized, MessageRequestMode::Pull); + } + + #[test] + fn deserialize_message_request_mode_pop() { + let json = "\"POP\""; + let deserialized: MessageRequestMode = serde_json::from_str(json).unwrap(); + assert_eq!(deserialized, MessageRequestMode::Pop); + } + + #[test] + fn deserialize_message_request_mode_invalid() { + let json = "\"INVALID\""; + let deserialized: Result = serde_json::from_str(json); + assert!(deserialized.is_err()); + } } diff --git a/rocketmq-common/src/common/message/message_queue_assignment.rs b/rocketmq-common/src/common/message/message_queue_assignment.rs new file mode 100644 index 00000000..b4c5a2e5 --- /dev/null +++ b/rocketmq-common/src/common/message/message_queue_assignment.rs @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; +use std::hash::Hash; +use std::hash::Hasher; + +use cheetah_string::CheetahString; +use serde::Deserialize; +use serde::Serialize; + +use crate::common::message::message_enum::MessageRequestMode; +use crate::common::message::message_queue::MessageQueue; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct MessageQueueAssignment { + pub message_queue: Option, + pub mode: MessageRequestMode, + pub attachments: Option>, +} + +impl Hash for MessageQueueAssignment { + fn hash(&self, state: &mut H) { + self.message_queue.hash(state); + self.mode.hash(state); + if let Some(ref attachments) = self.attachments { + for (key, value) in attachments { + key.hash(state); + value.hash(state); + } + } + } +} + +impl Default for MessageQueueAssignment { + fn default() -> Self { + MessageQueueAssignment { + message_queue: None, + mode: MessageRequestMode::Pull, + attachments: None, + } + } +} diff --git a/rocketmq-remoting/src/protocol/body.rs b/rocketmq-remoting/src/protocol/body.rs index 35ac52e5..2cdb58dd 100644 --- a/rocketmq-remoting/src/protocol/body.rs +++ b/rocketmq-remoting/src/protocol/body.rs @@ -30,6 +30,8 @@ pub mod group_list; pub mod kv_table; pub mod pop_process_queue_info; pub mod process_queue_info; +pub mod query_assignment_request_body; +pub mod query_assignment_response_body; pub mod request; pub mod response; pub mod topic; diff --git a/rocketmq-remoting/src/protocol/body/query_assignment_request_body.rs b/rocketmq-remoting/src/protocol/body/query_assignment_request_body.rs new file mode 100644 index 00000000..5fa6dd53 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/query_assignment_request_body.rs @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use cheetah_string::CheetahString; +use serde::Deserialize; +use serde::Serialize; + +use crate::protocol::heartbeat::message_model::MessageModel; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct QueryAssignmentRequestBody { + pub topic: CheetahString, + pub consumer_group: CheetahString, + pub client_id: CheetahString, + pub strategy_name: CheetahString, + pub message_model: MessageModel, +} diff --git a/rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs b/rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs new file mode 100644 index 00000000..70f783ab --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashSet; + +use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct QueryAssignmentResponseBody { + pub message_queue_assignments: HashSet, +}