From 1b9dc81ff7744df3e0093a1f905b670e475a032b Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Mon, 25 Nov 2024 22:58:46 +0200 Subject: [PATCH 1/3] feat: add initial update implementation --- rocketmq-common/src/common/mix_all.rs | 53 ++++ .../src/common/namesrv/namesrv_config.rs | 230 ++++++++++++++++++ .../src/kvconfig/kvconfig_mananger.rs | 8 + .../processor/default_request_processor.rs | 65 +++++ rocketmq-remoting/src/code/response_code.rs | 2 + 5 files changed, 358 insertions(+) diff --git a/rocketmq-common/src/common/mix_all.rs b/rocketmq-common/src/common/mix_all.rs index be71ef62..9961733b 100644 --- a/rocketmq-common/src/common/mix_all.rs +++ b/rocketmq-common/src/common/mix_all.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +use std::collections::HashMap; use std::env; use cheetah_string::CheetahString; @@ -159,6 +160,29 @@ pub fn human_readable_byte_count(bytes: i64, si: bool) -> String { format!("{:.1} {}B", bytes / unit.powi(exp), pre) } +pub fn string_to_properties(input: &str) -> Option> { + let mut properties = HashMap::new(); + + for line in input.lines() { + let line = line.trim(); + if line.is_empty() || line.starts_with('#') { + // Skip empty lines or comments + continue; + } + + if let Some((key, value)) = line.split_once('=') { + // Convert key and value to CheetahString + let key = CheetahString::from(key.trim()); + let value = CheetahString::from(value.trim()); + properties.insert(key, value); + } else { + return None; // Return None if the line isn't in `key=value` format + } + } + + Some(properties) +} + #[cfg(test)] mod tests { use super::*; @@ -241,4 +265,33 @@ mod tests { fn returns_false_for_none_metadata() { assert!(!is_lmq(None)); } + + #[test] + fn test_string_to_properties_valid_input() { + let input = r#" + # This is a comment + key1=value1 + key2 = value2 + key3=value3 + "#; + + let result = string_to_properties(input).expect("Parsing should succeed"); + let mut expected = HashMap::new(); + expected.insert(CheetahString::from("key1"), CheetahString::from("value1")); + expected.insert(CheetahString::from("key2"), CheetahString::from("value2")); + expected.insert(CheetahString::from("key3"), CheetahString::from("value3")); + + assert_eq!(result, expected); + } + + #[test] + fn test_string_to_properties_invalid_line() { + let input = r#" + key1=value1 + invalid_line + "#; + + let result = string_to_properties(input); + assert!(result.is_none(), "Parsing should fail for invalid input"); + } } diff --git a/rocketmq-common/src/common/namesrv/namesrv_config.rs b/rocketmq-common/src/common/namesrv/namesrv_config.rs index 4b5c795f..186ee7e5 100644 --- a/rocketmq-common/src/common/namesrv/namesrv_config.rs +++ b/rocketmq-common/src/common/namesrv/namesrv_config.rs @@ -15,8 +15,10 @@ * limitations under the License. */ +use std::collections::HashMap; use std::env; +use cheetah_string::CheetahString; use serde::Deserialize; use crate::common::mix_all::ROCKETMQ_HOME_ENV; @@ -144,6 +146,118 @@ impl NamesrvConfig { pub fn new() -> NamesrvConfig { Self::default() } + + /// Splits the `config_black_list` into a `Vec` for easier usage. + pub fn get_config_blacklist(&self) -> Vec { + self.config_black_list + .split(';') + .map(|s| CheetahString::from(s.trim())) + .collect() + } + + pub fn update( + &mut self, + properties: HashMap, + ) -> Result<(), String> { + for (key, value) in properties { + match key.as_str() { + "rocketmqHome" => self.rocketmq_home = value.to_string(), + "kvConfigPath" => self.kv_config_path = value.to_string(), + "configStorePath" => self.config_store_path = value.to_string(), + "productEnvName" => self.product_env_name = value.to_string(), + "clusterTest" => { + self.cluster_test = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "orderMessageEnable" => { + self.order_message_enable = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "clientRequestThreadPoolNums" => { + self.client_request_thread_pool_nums = value + .parse() + .map_err(|_| format!("Invalid integer value for key '{}'", key))? + } + "defaultThreadPoolNums" => { + self.default_thread_pool_nums = value + .parse() + .map_err(|_| format!("Invalid integer value for key '{}'", key))? + } + "clientRequestThreadPoolQueueCapacity" => { + self.client_request_thread_pool_queue_capacity = value + .parse() + .map_err(|_| format!("Invalid integer value for key '{}'", key))? + } + "defaultThreadPoolQueueCapacity" => { + self.default_thread_pool_queue_capacity = value + .parse() + .map_err(|_| format!("Invalid integer value for key '{}'", key))? + } + "scanNotActiveBrokerInterval" => { + self.scan_not_active_broker_interval = value + .parse() + .map_err(|_| format!("Invalid value for key '{}'", key))? + } + "unRegisterBrokerQueueCapacity" => { + self.unregister_broker_queue_capacity = value + .parse() + .map_err(|_| format!("Invalid integer value for key '{}'", key))? + } + "supportActingMaster" => { + self.support_acting_master = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "enableAllTopicList" => { + self.enable_all_topic_list = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "enableTopicList" => { + self.enable_topic_list = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "notifyMinBrokerIdChanged" => { + self.notify_min_broker_id_changed = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "enableControllerInNamesrv" => { + self.enable_controller_in_namesrv = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "needWaitForService" => { + self.need_wait_for_service = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "waitSecondsForService" => { + self.wait_seconds_for_service = value + .parse() + .map_err(|_| format!("Invalid integer value for key '{}'", key))? + } + "deleteTopicWithBrokerRegistration" => { + self.delete_topic_with_broker_registration = value + .parse() + .map_err(|_| format!("Invalid boolean value for key '{}'", key))? + } + "configBlackList" => { + self.config_black_list = value + .parse() + .map_err(|_| format!("Invalid string value for key '{}'", key))? + } + _ => { + return Err(format!("Unknown configuration key: '{}'", key)); + } + } + } + + Ok(()) + } } #[cfg(test)] @@ -204,4 +318,120 @@ mod tests { "configBlackList;configStorePath;kvConfigPath".to_string() ); } + + #[test] + fn test_namesrv_config_update() { + let mut config = NamesrvConfig::new(); + + let mut properties = HashMap::new(); + properties.insert( + CheetahString::from("rocketmqHome"), + CheetahString::from("/new/path"), + ); + properties.insert( + CheetahString::from("kvConfigPath"), + CheetahString::from("/new/kvConfigPath"), + ); + properties.insert( + CheetahString::from("configStorePath"), + CheetahString::from("/new/configStorePath"), + ); + properties.insert( + CheetahString::from("productEnvName"), + CheetahString::from("new_env"), + ); + properties.insert( + CheetahString::from("clusterTest"), + CheetahString::from("true"), + ); + properties.insert( + CheetahString::from("orderMessageEnable"), + CheetahString::from("true"), + ); + properties.insert( + CheetahString::from("clientRequestThreadPoolNums"), + CheetahString::from("10"), + ); + properties.insert( + CheetahString::from("defaultThreadPoolNums"), + CheetahString::from("20"), + ); + properties.insert( + CheetahString::from("clientRequestThreadPoolQueueCapacity"), + CheetahString::from("10000"), + ); + properties.insert( + CheetahString::from("defaultThreadPoolQueueCapacity"), + CheetahString::from("20000"), + ); + properties.insert( + CheetahString::from("scanNotActiveBrokerInterval"), + CheetahString::from("15000"), + ); + properties.insert( + CheetahString::from("unRegisterBrokerQueueCapacity"), + CheetahString::from("4000"), + ); + properties.insert( + CheetahString::from("supportActingMaster"), + CheetahString::from("true"), + ); + properties.insert( + CheetahString::from("enableAllTopicList"), + CheetahString::from("false"), + ); + properties.insert( + CheetahString::from("enableTopicList"), + CheetahString::from("false"), + ); + properties.insert( + CheetahString::from("notifyMinBrokerIdChanged"), + CheetahString::from("true"), + ); + properties.insert( + CheetahString::from("enableControllerInNamesrv"), + CheetahString::from("true"), + ); + properties.insert( + CheetahString::from("needWaitForService"), + CheetahString::from("true"), + ); + properties.insert( + CheetahString::from("waitSecondsForService"), + CheetahString::from("30"), + ); + properties.insert( + CheetahString::from("deleteTopicWithBrokerRegistration"), + CheetahString::from("true"), + ); + properties.insert( + CheetahString::from("configBlackList"), + CheetahString::from("newBlackList"), + ); + + let result = config.update(properties); + assert!(result.is_ok()); + + assert_eq!(config.rocketmq_home, "/new/path"); + assert_eq!(config.kv_config_path, "/new/kvConfigPath"); + assert_eq!(config.config_store_path, "/new/configStorePath"); + assert_eq!(config.product_env_name, "new_env"); + assert_eq!(config.cluster_test, true); + assert_eq!(config.order_message_enable, true); + assert_eq!(config.client_request_thread_pool_nums, 10); + assert_eq!(config.default_thread_pool_nums, 20); + assert_eq!(config.client_request_thread_pool_queue_capacity, 10000); + assert_eq!(config.default_thread_pool_queue_capacity, 20000); + assert_eq!(config.scan_not_active_broker_interval, 15000); + assert_eq!(config.unregister_broker_queue_capacity, 4000); + assert_eq!(config.support_acting_master, true); + assert_eq!(config.enable_all_topic_list, false); + assert_eq!(config.enable_topic_list, false); + assert_eq!(config.notify_min_broker_id_changed, true); + assert_eq!(config.enable_controller_in_namesrv, true); + assert_eq!(config.need_wait_for_service, true); + assert_eq!(config.wait_seconds_for_service, 30); + assert_eq!(config.delete_topic_with_broker_registration, true); + assert_eq!(config.config_black_list, "newBlackList"); + } } diff --git a/rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs b/rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs index a3921c18..b62d40f1 100644 --- a/rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs +++ b/rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs @@ -96,6 +96,14 @@ impl KVConfigManager { } } + /// Updates the Namesrv configuration. + pub fn update_namesrv_config( + &mut self, + updates: HashMap, + ) -> Result<(), String> { + self.namesrv_config.update(updates) + } + /// Persists the current key-value configurations to a file. pub fn persist(&mut self) { let wrapper = diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index 07c69bc8..a54ff0c7 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -15,10 +15,13 @@ * limitations under the License. */ +use core::str; +use std::collections::HashMap; use std::net::SocketAddr; use cheetah_string::CheetahString; use rocketmq_common::common::mix_all; +use rocketmq_common::common::mix_all::string_to_properties; use rocketmq_common::common::mq_version::RocketMqVersion; use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils; use rocketmq_common::CRC32Utils; @@ -102,6 +105,7 @@ impl DefaultRequestProcessor { RequestCode::GetHasUnitSubUnunitTopicList => { self.get_has_unit_sub_un_unit_topic_list(request) } + RequestCode::UpdateNamesrvConfig => self.update_config(request), _ => RemotingCommand::create_response_command_with_code( RemotingSysResponseCode::SystemError, ), @@ -436,6 +440,55 @@ impl DefaultRequestProcessor { RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::SystemError) .set_remark("disable") } + + fn update_config(&mut self, request: RemotingCommand) -> RemotingCommand { + if let Some(body) = request.body() { + let body_str = match str::from_utf8(&body) { + Ok(s) => s, + Err(e) => { + return RemotingCommand::create_response_command_with_code( + RemotingSysResponseCode::SystemError, + ) + .set_remark(format!("UnsupportedEncodingException {:?}", e)); + } + }; + + let properties = match string_to_properties(body_str) { + Some(props) => props, + None => { + return RemotingCommand::create_response_command_with_code( + RemotingSysResponseCode::SystemError, + ) + .set_remark("string_to_properties error".to_string()); + } + }; + if validate_blacklist_config_exist( + &properties, + &self + .kvconfig_manager + .get_namesrv_config() + .get_config_blacklist(), + ) { + return RemotingCommand::create_response_command_with_code( + RemotingSysResponseCode::NoPermission, + ) + .set_remark("Cannot update config in blacklist.".to_string()); + } + + let result = self.kvconfig_manager.update_namesrv_config(properties); + if let Err(e) = result { + return RemotingCommand::create_response_command_with_code( + RemotingSysResponseCode::SystemError, + ) + .set_remark(format!("Update error {:?}", e)); + } + } + + return RemotingCommand::create_response_command_with_code( + RemotingSysResponseCode::Success, + ) + .set_remark(CheetahString::empty()); + } } fn extract_register_topic_config_from_request( @@ -487,6 +540,18 @@ fn check_sum_crc32( true } +fn validate_blacklist_config_exist( + properties: &HashMap, + config_blacklist: &[CheetahString], +) -> bool { + for black_config in config_blacklist { + if properties.contains_key(black_config.as_str()) { + return true; + } + } + false +} + #[cfg(test)] mod tests { use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader; diff --git a/rocketmq-remoting/src/code/response_code.rs b/rocketmq-remoting/src/code/response_code.rs index 0f300691..772e4f20 100644 --- a/rocketmq-remoting/src/code/response_code.rs +++ b/rocketmq-remoting/src/code/response_code.rs @@ -21,6 +21,7 @@ pub enum RemotingSysResponseCode { SystemBusy = 2, RequestCodeNotSupported = 3, TransactionFailed = 4, + NoPermission = 16, } impl From for i32 { @@ -37,6 +38,7 @@ impl From for RemotingSysResponseCode { 2 => RemotingSysResponseCode::SystemBusy, 3 => RemotingSysResponseCode::RequestCodeNotSupported, 4 => RemotingSysResponseCode::TransactionFailed, + 16 => RemotingSysResponseCode::NoPermission, _ => RemotingSysResponseCode::SystemError, } } From f6515e69ad59cad19069153d1a2f73916d58e9ae Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Mon, 25 Nov 2024 23:06:43 +0200 Subject: [PATCH 2/3] fix: clippy --- rocketmq-namesrv/src/processor/default_request_processor.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index a54ff0c7..1514f1f3 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -484,10 +484,8 @@ impl DefaultRequestProcessor { } } - return RemotingCommand::create_response_command_with_code( - RemotingSysResponseCode::Success, - ) - .set_remark(CheetahString::empty()); + RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::Success) + .set_remark(CheetahString::empty()) } } From 3caffaa3f76938b7e4d7d2e0257660caa331fb6b Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Mon, 25 Nov 2024 23:09:30 +0200 Subject: [PATCH 3/3] fix: clippy body reference --- rocketmq-namesrv/src/processor/default_request_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index 1514f1f3..0712e9d9 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -443,7 +443,7 @@ impl DefaultRequestProcessor { fn update_config(&mut self, request: RemotingCommand) -> RemotingCommand { if let Some(body) = request.body() { - let body_str = match str::from_utf8(&body) { + let body_str = match str::from_utf8(body) { Ok(s) => s, Err(e) => { return RemotingCommand::create_response_command_with_code(