Skip to content

Commit

Permalink
[ISSUE #1303]🚀Add QueryAssignmentRequestBody and QueryAssignmentRespo…
Browse files Browse the repository at this point in the history
…nseBody
  • Loading branch information
mxsm committed Nov 25, 2024
1 parent b45646d commit c53f376
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 1 deletion.
1 change: 1 addition & 0 deletions rocketmq-common/src/common/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub mod message_ext;
pub mod message_ext_broker_inner;
pub mod message_id;
pub mod message_queue;
pub mod message_queue_assignment;
pub mod message_single;

/// This module defines the `MessageTrait` trait, which provides a flexible interface for working
Expand Down
88 changes: 87 additions & 1 deletion rocketmq-common/src/common/message/message_enum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::fmt;

use serde::de;
use serde::de::Visitor;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;

#[derive(Debug, PartialEq, Copy, Clone, Default)]
pub enum MessageType {
#[default]
Expand Down Expand Up @@ -47,7 +56,7 @@ impl MessageType {
}
}

#[derive(Debug, PartialEq, Copy, Clone)]
#[derive(Debug, PartialEq, Copy, Clone, Hash, Eq)]
pub enum MessageRequestMode {
Pull,
Pop,
Expand All @@ -62,6 +71,48 @@ impl MessageRequestMode {
}
}

impl Serialize for MessageRequestMode {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(match *self {
MessageRequestMode::Pull => "PULL",
MessageRequestMode::Pop => "POP",
})
}
}

impl<'de> Deserialize<'de> for MessageRequestMode {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct MessageRequestModeVisitor;

impl Visitor<'_> for MessageRequestModeVisitor {
type Value = MessageRequestMode;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string representing a MessageRequestMode")
}

Check warning on line 98 in rocketmq-common/src/common/message/message_enum.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/message/message_enum.rs#L96-L98

Added lines #L96 - L98 were not covered by tests

fn visit_str<E>(self, value: &str) -> Result<MessageRequestMode, E>
where
E: de::Error,
{
match value {
"PULL" => Ok(MessageRequestMode::Pull),
"POP" => Ok(MessageRequestMode::Pop),
_ => Err(de::Error::unknown_variant(value, &["PULL", "POP"])),
}
}
}

deserializer.deserialize_str(MessageRequestModeVisitor)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -108,4 +159,39 @@ mod tests {
assert_eq!(MessageRequestMode::Pull.get_name(), "PULL");
assert_eq!(MessageRequestMode::Pop.get_name(), "POP");
}

#[test]
fn serialize_message_request_mode_pull() {
let mode = MessageRequestMode::Pull;
let serialized = serde_json::to_string(&mode).unwrap();
assert_eq!(serialized, "\"PULL\"");
}

#[test]
fn serialize_message_request_mode_pop() {
let mode = MessageRequestMode::Pop;
let serialized = serde_json::to_string(&mode).unwrap();
assert_eq!(serialized, "\"POP\"");
}

#[test]
fn deserialize_message_request_mode_pull() {
let json = "\"PULL\"";
let deserialized: MessageRequestMode = serde_json::from_str(json).unwrap();
assert_eq!(deserialized, MessageRequestMode::Pull);
}

#[test]
fn deserialize_message_request_mode_pop() {
let json = "\"POP\"";
let deserialized: MessageRequestMode = serde_json::from_str(json).unwrap();
assert_eq!(deserialized, MessageRequestMode::Pop);
}

#[test]
fn deserialize_message_request_mode_invalid() {
let json = "\"INVALID\"";
let deserialized: Result<MessageRequestMode, _> = serde_json::from_str(json);
assert!(deserialized.is_err());
}
}
57 changes: 57 additions & 0 deletions rocketmq-common/src/common/message/message_queue_assignment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::hash::Hash;
use std::hash::Hasher;

use cheetah_string::CheetahString;
use serde::Deserialize;
use serde::Serialize;

use crate::common::message::message_enum::MessageRequestMode;
use crate::common::message::message_queue::MessageQueue;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

Check warning on line 28 in rocketmq-common/src/common/message/message_queue_assignment.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/message/message_queue_assignment.rs#L28

Added line #L28 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct MessageQueueAssignment {
pub message_queue: Option<MessageQueue>,
pub mode: MessageRequestMode,
pub attachments: Option<HashMap<CheetahString, CheetahString>>,
}

impl Hash for MessageQueueAssignment {
fn hash<H: Hasher>(&self, state: &mut H) {
self.message_queue.hash(state);
self.mode.hash(state);
if let Some(ref attachments) = self.attachments {
for (key, value) in attachments {
key.hash(state);
value.hash(state);
}
}
}

Check warning on line 46 in rocketmq-common/src/common/message/message_queue_assignment.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/message/message_queue_assignment.rs#L37-L46

Added lines #L37 - L46 were not covered by tests
}

impl Default for MessageQueueAssignment {
fn default() -> Self {
MessageQueueAssignment {
message_queue: None,
mode: MessageRequestMode::Pull,
attachments: None,
}
}

Check warning on line 56 in rocketmq-common/src/common/message/message_queue_assignment.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/message/message_queue_assignment.rs#L50-L56

Added lines #L50 - L56 were not covered by tests
}
2 changes: 2 additions & 0 deletions rocketmq-remoting/src/protocol/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub mod group_list;
pub mod kv_table;
pub mod pop_process_queue_info;
pub mod process_queue_info;
pub mod query_assignment_request_body;
pub mod query_assignment_response_body;
pub mod request;
pub mod response;
pub mod topic;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use cheetah_string::CheetahString;
use serde::Deserialize;
use serde::Serialize;

use crate::protocol::heartbeat::message_model::MessageModel;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]

Check warning on line 23 in rocketmq-remoting/src/protocol/body/query_assignment_request_body.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/query_assignment_request_body.rs#L23

Added line #L23 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct QueryAssignmentRequestBody {
pub topic: CheetahString,
pub consumer_group: CheetahString,
pub client_id: CheetahString,
pub strategy_name: CheetahString,
pub message_model: MessageModel,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashSet;

use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
use serde::Deserialize;
use serde::Serialize;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]

Check warning on line 23 in rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs#L23

Added line #L23 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct QueryAssignmentResponseBody {
pub message_queue_assignments: HashSet<MessageQueueAssignment>,
}

0 comments on commit c53f376

Please sign in to comment.