Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1303]🚀Add QueryAssignmentRequestBody and QueryAssignmentResponseBody #1304

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rocketmq-common/src/common/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub mod message_ext;
pub mod message_ext_broker_inner;
pub mod message_id;
pub mod message_queue;
pub mod message_queue_assignment;
pub mod message_single;

/// This module defines the `MessageTrait` trait, which provides a flexible interface for working
Expand Down
88 changes: 87 additions & 1 deletion rocketmq-common/src/common/message/message_enum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::fmt;

use serde::de;
use serde::de::Visitor;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;

#[derive(Debug, PartialEq, Copy, Clone, Default)]
pub enum MessageType {
#[default]
Expand Down Expand Up @@ -47,7 +56,7 @@
}
}

#[derive(Debug, PartialEq, Copy, Clone)]
#[derive(Debug, PartialEq, Copy, Clone, Hash, Eq)]
pub enum MessageRequestMode {
Pull,
Pop,
Expand All @@ -62,6 +71,48 @@
}
}

impl Serialize for MessageRequestMode {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(match *self {
MessageRequestMode::Pull => "PULL",
MessageRequestMode::Pop => "POP",
})
}
}

impl<'de> Deserialize<'de> for MessageRequestMode {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct MessageRequestModeVisitor;

impl Visitor<'_> for MessageRequestModeVisitor {
type Value = MessageRequestMode;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string representing a MessageRequestMode")
}

Check warning on line 98 in rocketmq-common/src/common/message/message_enum.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/message/message_enum.rs#L96-L98

Added lines #L96 - L98 were not covered by tests

fn visit_str<E>(self, value: &str) -> Result<MessageRequestMode, E>
where
E: de::Error,
{
match value {
"PULL" => Ok(MessageRequestMode::Pull),
"POP" => Ok(MessageRequestMode::Pop),
_ => Err(de::Error::unknown_variant(value, &["PULL", "POP"])),
}
}
}

deserializer.deserialize_str(MessageRequestModeVisitor)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -108,4 +159,39 @@
assert_eq!(MessageRequestMode::Pull.get_name(), "PULL");
assert_eq!(MessageRequestMode::Pop.get_name(), "POP");
}

#[test]
fn serialize_message_request_mode_pull() {
let mode = MessageRequestMode::Pull;
let serialized = serde_json::to_string(&mode).unwrap();
assert_eq!(serialized, "\"PULL\"");
}

#[test]
fn serialize_message_request_mode_pop() {
let mode = MessageRequestMode::Pop;
let serialized = serde_json::to_string(&mode).unwrap();
assert_eq!(serialized, "\"POP\"");
}

#[test]
fn deserialize_message_request_mode_pull() {
let json = "\"PULL\"";
let deserialized: MessageRequestMode = serde_json::from_str(json).unwrap();
assert_eq!(deserialized, MessageRequestMode::Pull);
}

#[test]
fn deserialize_message_request_mode_pop() {
let json = "\"POP\"";
let deserialized: MessageRequestMode = serde_json::from_str(json).unwrap();
assert_eq!(deserialized, MessageRequestMode::Pop);
}

#[test]
fn deserialize_message_request_mode_invalid() {
let json = "\"INVALID\"";
let deserialized: Result<MessageRequestMode, _> = serde_json::from_str(json);
assert!(deserialized.is_err());
}
}
57 changes: 57 additions & 0 deletions rocketmq-common/src/common/message/message_queue_assignment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::hash::Hash;
use std::hash::Hasher;

use cheetah_string::CheetahString;
use serde::Deserialize;
use serde::Serialize;

use crate::common::message::message_enum::MessageRequestMode;
use crate::common::message::message_queue::MessageQueue;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

Check warning on line 28 in rocketmq-common/src/common/message/message_queue_assignment.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/message/message_queue_assignment.rs#L28

Added line #L28 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct MessageQueueAssignment {
pub message_queue: Option<MessageQueue>,
pub mode: MessageRequestMode,
pub attachments: Option<HashMap<CheetahString, CheetahString>>,
}

impl Hash for MessageQueueAssignment {
fn hash<H: Hasher>(&self, state: &mut H) {
self.message_queue.hash(state);
self.mode.hash(state);
if let Some(ref attachments) = self.attachments {
for (key, value) in attachments {
key.hash(state);
value.hash(state);
}
}
}

Check warning on line 46 in rocketmq-common/src/common/message/message_queue_assignment.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/message/message_queue_assignment.rs#L37-L46

Added lines #L37 - L46 were not covered by tests
}

impl Default for MessageQueueAssignment {
fn default() -> Self {
MessageQueueAssignment {
message_queue: None,
mode: MessageRequestMode::Pull,
attachments: None,
}
}

Check warning on line 56 in rocketmq-common/src/common/message/message_queue_assignment.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/message/message_queue_assignment.rs#L50-L56

Added lines #L50 - L56 were not covered by tests
}
2 changes: 2 additions & 0 deletions rocketmq-remoting/src/protocol/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub mod group_list;
pub mod kv_table;
pub mod pop_process_queue_info;
pub mod process_queue_info;
pub mod query_assignment_request_body;
pub mod query_assignment_response_body;
pub mod request;
pub mod response;
pub mod topic;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use cheetah_string::CheetahString;
use serde::Deserialize;
use serde::Serialize;

use crate::protocol::heartbeat::message_model::MessageModel;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]

Check warning on line 23 in rocketmq-remoting/src/protocol/body/query_assignment_request_body.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/query_assignment_request_body.rs#L23

Added line #L23 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct QueryAssignmentRequestBody {
pub topic: CheetahString,
pub consumer_group: CheetahString,
pub client_id: CheetahString,
pub strategy_name: CheetahString,
pub message_model: MessageModel,
}
Comment on lines +23 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add documentation and consider implementing additional functionality

The structure definition needs the following improvements:

  1. Add documentation comments (rustdoc) for the structure and its fields
  2. Consider adding field validation
  3. Consider implementing additional traits or methods if needed

Add documentation like this:

+/// Request body for querying message queue assignments
+/// 
+/// This structure is used in the RocketMQ protocol to request assignment information
+/// for a specific topic and consumer group combination.
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 #[serde(rename_all = "camelCase")]
 pub struct QueryAssignmentRequestBody {
+    /// The topic name for which assignments are being queried
     pub topic: CheetahString,
+    /// The consumer group requesting the assignments
     pub consumer_group: CheetahString,
+    /// Unique identifier of the client making the request
     pub client_id: CheetahString,
+    /// Name of the assignment strategy to be used
     pub strategy_name: CheetahString,
+    /// Message model type for the assignments
     pub message_model: MessageModel,
 }

Consider adding unit tests:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_query_assignment_request_body_serialization() {
        // Add serialization/deserialization tests
    }
}

Comment on lines +1 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Request and response bodies are properly structured but processor implementation is incomplete

The QueryAssignmentRequestBody and QueryAssignmentResponseBody are well-defined, but the processor implementation in rocketmq-broker/src/processor/query_assignment_processor.rs is incomplete with a todo!() macro. This needs to be implemented before the feature can be used.

  • rocketmq-broker/src/processor/query_assignment_processor.rs: Processor implementation is a stub with todo!()
  • rocketmq-remoting/src/code/request_code.rs: Request code 400 is properly registered for QueryAssignment
  • rocketmq-remoting/src/protocol/body/query_assignment_request_body.rs: Request body structure is complete
  • rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs: Response body structure is complete
🔗 Analysis chain

Verify integration with QueryAssignmentResponseBody

This request body should have a corresponding response body structure and proper integration points.


Let me check the response body implementation to ensure proper integration.


Let me check one more thing to verify if these types are properly integrated into the protocol/command system.


Let me check one final thing to verify the query assignment processor implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the complete query assignment feature implementation

# Find the corresponding response body
fd -e rs "query_assignment_response_body"

# Check for handler/processor implementations
rg -l "QueryAssignmentRequestBody|QueryAssignmentResponseBody" --type rust

# Look for test files covering this functionality
fd -e rs "test.*query.*assignment"

Length of output: 364


Script:

#!/bin/bash
# Check the response body implementation
cat rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs

# Check if these types are used in any request/response handling code
rg -A 5 "QueryAssignmentRequestBody|QueryAssignmentResponseBody" --type rust

Length of output: 2282


Script:

#!/bin/bash
# Check for command code and request/response type registration
rg -l "RemotingCommand|RequestCode" --type rust

# Then check the content of potential command registration files
ast-grep --pattern 'RequestCode::$_'

Length of output: 31228


Script:

#!/bin/bash
# Check the query assignment processor implementation
cat rocketmq-broker/src/processor/query_assignment_processor.rs

# Check if QueryAssignment request code is properly registered
rg -A 5 "RequestCode::QueryAssignment" --type rust

Length of output: 1907

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashSet;

use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
use serde::Deserialize;
use serde::Serialize;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]

Check warning on line 23 in rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/query_assignment_response_body.rs#L23

Added line #L23 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct QueryAssignmentResponseBody {
pub message_queue_assignments: HashSet<MessageQueueAssignment>,
}
Loading