Skip to content

Commit

Permalink
[ISSUE ##1307]🚀Implement MessageRequestModeManager (#1308)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Nov 25, 2024
1 parent 5a5312a commit 6e8aaac
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 0 deletions.
1 change: 1 addition & 0 deletions rocketmq-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub(crate) mod controller;
pub(crate) mod error;
pub(crate) mod filter;
pub(crate) mod hook;
pub(crate) mod load_balance;
pub(crate) mod long_polling;
pub(crate) mod mqtrace;
pub(crate) mod offset;
Expand Down
17 changes: 17 additions & 0 deletions rocketmq-broker/src/load_balance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub(crate) mod message_request_mode_manager;
194 changes: 194 additions & 0 deletions rocketmq-broker/src/load_balance/message_request_mode_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
use rocketmq_rust::ArcMut;
use rocketmq_store::config::message_store_config::MessageStoreConfig;
use tracing::info;

use crate::broker_path_config_helper;

pub(crate) struct MessageRequestModeManager {
message_store_config: ArcMut<MessageStoreConfig>,
message_request_mode_map: Arc<
parking_lot::Mutex<
HashMap<
CheetahString, /* topic */
HashMap<CheetahString /* consumerGroup */, SetMessageRequestModeRequestBody>,
>,
>,
>,
}

impl MessageRequestModeManager {
pub fn new(message_store_config: ArcMut<MessageStoreConfig>) -> Self {
Self {
message_store_config,
message_request_mode_map: Arc::new(parking_lot::Mutex::new(HashMap::new())),
}
}

pub fn set_message_request_mode(
&self,
topic: CheetahString,
consumer_group: CheetahString,
request_body: SetMessageRequestModeRequestBody,
) {
let mut message_request_mode_map = self.message_request_mode_map.lock();
let consumer_group_map = message_request_mode_map.entry(topic.clone()).or_default();
consumer_group_map.insert(consumer_group, request_body);
}

pub fn get_message_request_mode(
&self,
topic: &CheetahString,
consumer_group: &CheetahString,
) -> Option<SetMessageRequestModeRequestBody> {
let message_request_mode_map = self.message_request_mode_map.lock();
if let Some(consumer_group_map) = message_request_mode_map.get(topic) {
if let Some(message_request_mode) = consumer_group_map.get(consumer_group) {
return Some(message_request_mode.clone());
}
}
None
}
}

impl ConfigManager for MessageRequestModeManager {
fn config_file_path(&self) -> String {
broker_path_config_helper::get_message_request_mode_path(
self.message_store_config.store_path_root_dir.as_str(),
)
}

fn encode_pretty(&self, pretty_format: bool) -> String {
if pretty_format {
SerdeJsonUtils::to_json_pretty(&*self.message_request_mode_map.lock())
.expect("encode failed")
} else {
SerdeJsonUtils::to_json(&*self.message_request_mode_map.lock()).expect("encode failed")
}
}

fn decode(&self, json_string: &str) {
info!(
"decode MessageRequestModeManager from json string:{}",
json_string
);
if json_string.is_empty() {
return;
}
let message_request_mode_map: HashMap<
CheetahString,
HashMap<CheetahString, SetMessageRequestModeRequestBody>,
> = SerdeJsonUtils::from_json_str(json_string).expect("decode failed");
let mut message_request_mode_map_ = self.message_request_mode_map.lock();
*message_request_mode_map_ = message_request_mode_map;
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
use rocketmq_rust::ArcMut;
use rocketmq_store::config::message_store_config::MessageStoreConfig;

use super::*;

#[test]
fn set_message_request_mode_adds_entry() {
let message_store_config = ArcMut::new(MessageStoreConfig::default());
let manager = MessageRequestModeManager::new(message_store_config);
let topic = CheetahString::from("test_topic");
let consumer_group = CheetahString::from("test_group");
let request_body = SetMessageRequestModeRequestBody::default();

manager.set_message_request_mode(
topic.clone(),
consumer_group.clone(),
request_body.clone(),
);
let result = manager.get_message_request_mode(&topic, &consumer_group);

//assert_eq!(result, Some(request_body));
}

#[test]
fn get_message_request_mode_returns_none_for_nonexistent_entry() {
let message_store_config = ArcMut::new(MessageStoreConfig::default());
let manager = MessageRequestModeManager::new(message_store_config);
let topic = CheetahString::from("nonexistent_topic");
let consumer_group = CheetahString::from("nonexistent_group");

let result = manager.get_message_request_mode(&topic, &consumer_group);

assert!(result.is_none());
}

#[test]
fn encode_pretty_returns_pretty_json() {
let message_store_config = ArcMut::new(MessageStoreConfig::default());
let manager = MessageRequestModeManager::new(message_store_config);
let topic = CheetahString::from("test_topic");
let consumer_group = CheetahString::from("test_group");
let request_body = SetMessageRequestModeRequestBody::default();

manager.set_message_request_mode(
topic.clone(),
consumer_group.clone(),
request_body.clone(),
);
let json = manager.encode_pretty(true);

assert!(json.contains("\n"));
assert!(json.contains("\"test_topic\""));
}

#[test]
fn decode_populates_message_request_mode_map() {
let message_store_config = ArcMut::new(MessageStoreConfig::default());
let manager = MessageRequestModeManager::new(message_store_config);
let json = r#"{
"test_topic": {
"test_group": {
"topic": "test_topic",
"consumerGroup": "test_group",
"mode": "PULL",
"popShareQueueNum": 0
}
}
}"#;

manager.decode(json);
let result = manager.get_message_request_mode(
&CheetahString::from("test_topic"),
&CheetahString::from("test_group"),
);

assert!(result.is_some());
assert_eq!(result.unwrap().mode, MessageRequestMode::Pull);
}
}

0 comments on commit 6e8aaac

Please sign in to comment.