Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1500]♻️Refactor SendMessageRequestHeaderV2🍻 #1501

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion rocketmq-remoting/src/protocol/command_custom_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@
/// # Arguments
///
/// * `_fields` - A reference to a `HashMap` that contains the fields to be decoded.
fn decode_fast(&mut self, _fields: &HashMap<CheetahString, CheetahString>) {}
fn decode_fast(
&mut self,
_fields: &HashMap<CheetahString, CheetahString>,
) -> crate::Result<()> {
Ok(())
}

Check warning on line 79 in rocketmq-remoting/src/protocol/command_custom_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/command_custom_header.rs#L74-L79

Added lines #L74 - L79 were not covered by tests

/// Indicates whether the implementing type supports fast codec.
///
Expand All @@ -82,6 +87,37 @@
fn support_fast_codec(&self) -> bool {
false
}

/// Retrieves the value associated with the specified field from the provided map.
///
/// # Arguments
///
/// * `map` - A reference to a `HashMap` containing `CheetahString` keys and values.
/// * `field` - A reference to a `CheetahString` representing the field to retrieve.
///
/// # Returns
///
/// * `Ok(CheetahString)` - If the field is found in the map, returns the associated value.
/// * `Err(RemotingError::RemotingCommandError)` - If the field is not found in the map, returns
/// an error indicating the field is required.
///
/// # Errors
///
/// This function returns a `RemotingError::RemotingCommandError` if the specified field is not
/// found in the map.
#[inline(always)]
fn get_and_check_not_none(
&self,
map: &HashMap<CheetahString, CheetahString>,
field: &CheetahString,
) -> crate::Result<CheetahString> {
match map.get(field) {
Some(value) => Ok(value.clone()),
None => Err(crate::remoting_error::RemotingError::RemotingCommandError(
format!("The field {} is required.", field),
)),

Check warning on line 118 in rocketmq-remoting/src/protocol/command_custom_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/command_custom_header.rs#L109-L118

Added lines #L109 - L118 were not covered by tests
}
}

Check warning on line 120 in rocketmq-remoting/src/protocol/command_custom_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/command_custom_header.rs#L120

Added line #L120 was not covered by tests
}

pub trait AsAny: Any {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use crate::protocol::command_custom_header::FromMap;
use crate::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader;
use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
use crate::remoting_error::RemotingError::RemotingCommandError;
use crate::rpc::topic_request_header::TopicRequestHeader;

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -145,33 +146,30 @@
}
}

fn decode_fast(&mut self, fields: &HashMap<CheetahString, CheetahString>) {
if let Some(v) = fields.get(&CheetahString::from_slice("a")) {
self.a = v.clone();
}
if let Some(v) = fields.get(&CheetahString::from_slice("b")) {
self.b = v.clone();
}
if let Some(v) = fields.get(&CheetahString::from_slice("c")) {
self.c = v.clone();
}

if let Some(v) = fields.get(&CheetahString::from_slice("d")) {
self.d = v.parse().unwrap();
}
if let Some(v) = fields.get(&CheetahString::from_slice("e")) {
self.e = v.parse().unwrap();
}
if let Some(v) = fields.get(&CheetahString::from_slice("f")) {
self.f = v.parse().unwrap();
}
if let Some(v) = fields.get(&CheetahString::from_slice("g")) {
self.g = v.parse().unwrap();
}

if let Some(v) = fields.get(&CheetahString::from_slice("h")) {
self.h = v.parse().unwrap();
}
fn decode_fast(&mut self, fields: &HashMap<CheetahString, CheetahString>) -> crate::Result<()> {
self.a = self.get_and_check_not_none(fields, &CheetahString::from_slice("a"))?; //producerGroup
self.b = self.get_and_check_not_none(fields, &CheetahString::from_slice("b"))?; //topic
self.c = self.get_and_check_not_none(fields, &CheetahString::from_slice("c"))?; //defaultTopic
self.d = self
.get_and_check_not_none(fields, &CheetahString::from_slice("d"))?
.parse()
.map_err(|_| RemotingCommandError("Parse field d error".to_string()))?; //defaultTopicQueueNums
self.e = self
.get_and_check_not_none(fields, &CheetahString::from_slice("e"))?
.parse()
.map_err(|_| RemotingCommandError("Parse field e error".to_string()))?; //queueId
self.f = self
.get_and_check_not_none(fields, &CheetahString::from_slice("f"))?
.parse()
.map_err(|_| RemotingCommandError("Parse field f error".to_string()))?; //sysFlag
self.g = self
.get_and_check_not_none(fields, &CheetahString::from_slice("g"))?
.parse()
.map_err(|_| RemotingCommandError("Parse field g error".to_string()))?; //bornTimestamp
self.h = self
.get_and_check_not_none(fields, &CheetahString::from_slice("h"))?
.parse()
.map_err(|_| RemotingCommandError("Parse field h error".to_string()))?; //flag

Check warning on line 172 in rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs#L149-L172

Added lines #L149 - L172 were not covered by tests

if let Some(v) = fields.get(&CheetahString::from_slice("i")) {
self.i = Some(v.clone());
Expand All @@ -182,20 +180,30 @@
}

if let Some(v) = fields.get(&CheetahString::from_slice("k")) {
self.k = Some(v.parse().unwrap());
self.k = Some(
v.parse()
.map_err(|_| RemotingCommandError("Parse field k error".to_string()))?,

Check warning on line 185 in rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs#L184-L185

Added lines #L184 - L185 were not covered by tests
);
}

if let Some(v) = fields.get(&CheetahString::from_slice("l")) {
self.l = Some(v.parse().unwrap());
self.l = Some(
v.parse()
.map_err(|_| RemotingCommandError("Parse field l error".to_string()))?,

Check warning on line 192 in rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs#L191-L192

Added lines #L191 - L192 were not covered by tests
);
}

if let Some(v) = fields.get(&CheetahString::from_slice("m")) {
self.m = Some(v.parse().unwrap());
self.m = Some(
v.parse()
.map_err(|_| RemotingCommandError("Parse field m error".to_string()))?,

Check warning on line 199 in rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs#L198-L199

Added lines #L198 - L199 were not covered by tests
);
}

if let Some(v) = fields.get(&CheetahString::from_slice("n")) {
self.n = Some(v.clone());
}
Ok(())

Check warning on line 206 in rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header_v2.rs#L206

Added line #L206 was not covered by tests
}

fn support_fast_codec(&self) -> bool {
Expand Down Expand Up @@ -438,3 +446,242 @@
self.e = queue_id.unwrap();
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use cheetah_string::CheetahString;

use super::*;

#[test]
fn send_message_request_header_v2_serializes_correctly() {
let header = SendMessageRequestHeaderV2 {
a: CheetahString::from_static_str("test_producer_group"),
b: CheetahString::from_static_str("test_topic"),
c: CheetahString::from_static_str("test_default_topic"),
d: 8,
e: 1,
f: 0,
g: 1622547800000,
h: 0,
i: Some(CheetahString::from_static_str("test_properties")),
j: Some(3),
k: Some(true),
l: Some(5),
m: Some(false),
n: Some(CheetahString::from_static_str("test_broker_name")),
topic_request_header: None,
};
let map = header.to_map().unwrap();
assert_eq!(
map.get(&CheetahString::from_static_str("a")).unwrap(),
"test_producer_group"
);
assert_eq!(
map.get(&CheetahString::from_static_str("b")).unwrap(),
"test_topic"
);
assert_eq!(
map.get(&CheetahString::from_static_str("c")).unwrap(),
"test_default_topic"
);
assert_eq!(map.get(&CheetahString::from_static_str("d")).unwrap(), "8");
assert_eq!(map.get(&CheetahString::from_static_str("e")).unwrap(), "1");
assert_eq!(map.get(&CheetahString::from_static_str("f")).unwrap(), "0");
assert_eq!(
map.get(&CheetahString::from_static_str("g")).unwrap(),
"1622547800000"
);
assert_eq!(map.get(&CheetahString::from_static_str("h")).unwrap(), "0");
assert_eq!(
map.get(&CheetahString::from_static_str("i")).unwrap(),
"test_properties"
);
assert_eq!(map.get(&CheetahString::from_static_str("j")).unwrap(), "3");
assert_eq!(
map.get(&CheetahString::from_static_str("k")).unwrap(),
"true"
);
assert_eq!(map.get(&CheetahString::from_static_str("l")).unwrap(), "5");
assert_eq!(
map.get(&CheetahString::from_static_str("m")).unwrap(),
"false"
);
assert_eq!(
map.get(&CheetahString::from_static_str("n")).unwrap(),
"test_broker_name"
);
}

#[test]
fn send_message_request_header_v2_deserializes_correctly() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("a"),
CheetahString::from_static_str("test_producer_group"),
);
map.insert(
CheetahString::from_static_str("b"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("c"),
CheetahString::from_static_str("test_default_topic"),
);
map.insert(
CheetahString::from_static_str("d"),
CheetahString::from_static_str("8"),
);
map.insert(
CheetahString::from_static_str("e"),
CheetahString::from_static_str("1"),
);
map.insert(
CheetahString::from_static_str("f"),
CheetahString::from_static_str("0"),
);
map.insert(
CheetahString::from_static_str("g"),
CheetahString::from_static_str("1622547800000"),
);
map.insert(
CheetahString::from_static_str("h"),
CheetahString::from_static_str("0"),
);
map.insert(
CheetahString::from_static_str("i"),
CheetahString::from_static_str("test_properties"),
);
map.insert(
CheetahString::from_static_str("j"),
CheetahString::from_static_str("3"),
);
map.insert(
CheetahString::from_static_str("k"),
CheetahString::from_static_str("true"),
);
map.insert(
CheetahString::from_static_str("l"),
CheetahString::from_static_str("5"),
);
map.insert(
CheetahString::from_static_str("m"),
CheetahString::from_static_str("false"),
);
map.insert(
CheetahString::from_static_str("n"),
CheetahString::from_static_str("test_broker_name"),
);

let header = <SendMessageRequestHeaderV2 as FromMap>::from(&map).unwrap();
assert_eq!(header.a, "test_producer_group");
assert_eq!(header.b, "test_topic");
assert_eq!(header.c, "test_default_topic");
assert_eq!(header.d, 8);
assert_eq!(header.e, 1);
assert_eq!(header.f, 0);
assert_eq!(header.g, 1622547800000);
assert_eq!(header.h, 0);
assert_eq!(header.i.unwrap(), "test_properties");
assert_eq!(header.j.unwrap(), 3);
assert_eq!(header.k.unwrap(), true);
assert_eq!(header.l.unwrap(), 5);
assert_eq!(header.m.unwrap(), false);
assert_eq!(header.n.unwrap(), "test_broker_name");
}

#[test]
fn send_message_request_header_v2_handles_missing_optional_fields() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("a"),
CheetahString::from_static_str("test_producer_group"),
);
map.insert(
CheetahString::from_static_str("b"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("c"),
CheetahString::from_static_str("test_default_topic"),
);
map.insert(
CheetahString::from_static_str("d"),
CheetahString::from_static_str("8"),
);
map.insert(
CheetahString::from_static_str("e"),
CheetahString::from_static_str("1"),
);
map.insert(
CheetahString::from_static_str("f"),
CheetahString::from_static_str("0"),
);
map.insert(
CheetahString::from_static_str("g"),
CheetahString::from_static_str("1622547800000"),
);
map.insert(
CheetahString::from_static_str("h"),
CheetahString::from_static_str("0"),
);

let header = <SendMessageRequestHeaderV2 as FromMap>::from(&map).unwrap();
assert_eq!(header.a, "test_producer_group");
assert_eq!(header.b, "test_topic");
assert_eq!(header.c, "test_default_topic");
assert_eq!(header.d, 8);
assert_eq!(header.e, 1);
assert_eq!(header.f, 0);
assert_eq!(header.g, 1622547800000);
assert_eq!(header.h, 0);
assert!(header.i.is_none());
assert!(header.j.is_none());
assert!(header.k.is_none());
assert!(header.l.is_none());
assert!(header.m.is_none());
assert!(header.n.is_none());
}

#[test]
fn send_message_request_header_v2_handles_invalid_data() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("a"),
CheetahString::from_static_str("test_producer_group"),
);
map.insert(
CheetahString::from_static_str("b"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("c"),
CheetahString::from_static_str("test_default_topic"),
);
map.insert(
CheetahString::from_static_str("d"),
CheetahString::from_static_str("invalid"),
);
map.insert(
CheetahString::from_static_str("e"),
CheetahString::from_static_str("invalid"),
);
map.insert(
CheetahString::from_static_str("f"),
CheetahString::from_static_str("invalid"),
);
map.insert(
CheetahString::from_static_str("g"),
CheetahString::from_static_str("invalid"),
);
map.insert(
CheetahString::from_static_str("h"),
CheetahString::from_static_str("invalid"),
);

let result = <SendMessageRequestHeaderV2 as FromMap>::from(&map);
assert!(result.is_err());
}
}
Loading
Loading