Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #90]Support update namesrv config #1316

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions rocketmq-common/src/common/mix_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

use std::collections::HashMap;
use std::env;

use cheetah_string::CheetahString;
Expand Down Expand Up @@ -159,6 +160,29 @@ pub fn human_readable_byte_count(bytes: i64, si: bool) -> String {
format!("{:.1} {}B", bytes / unit.powi(exp), pre)
}

pub fn string_to_properties(input: &str) -> Option<HashMap<CheetahString, CheetahString>> {
let mut properties = HashMap::new();

for line in input.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
// Skip empty lines or comments
continue;
}

if let Some((key, value)) = line.split_once('=') {
// Convert key and value to CheetahString
let key = CheetahString::from(key.trim());
let value = CheetahString::from(value.trim());
properties.insert(key, value);
} else {
return None; // Return None if the line isn't in `key=value` format
}
}

Some(properties)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -241,4 +265,33 @@ mod tests {
fn returns_false_for_none_metadata() {
assert!(!is_lmq(None));
}

#[test]
fn test_string_to_properties_valid_input() {
let input = r#"
# This is a comment
key1=value1
key2 = value2
key3=value3
"#;

let result = string_to_properties(input).expect("Parsing should succeed");
let mut expected = HashMap::new();
expected.insert(CheetahString::from("key1"), CheetahString::from("value1"));
expected.insert(CheetahString::from("key2"), CheetahString::from("value2"));
expected.insert(CheetahString::from("key3"), CheetahString::from("value3"));

assert_eq!(result, expected);
}

#[test]
fn test_string_to_properties_invalid_line() {
let input = r#"
key1=value1
invalid_line
"#;

let result = string_to_properties(input);
assert!(result.is_none(), "Parsing should fail for invalid input");
}
}
230 changes: 230 additions & 0 deletions rocketmq-common/src/common/namesrv/namesrv_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

use std::collections::HashMap;
use std::env;

use cheetah_string::CheetahString;
use serde::Deserialize;

use crate::common::mix_all::ROCKETMQ_HOME_ENV;
Expand Down Expand Up @@ -144,6 +146,118 @@
pub fn new() -> NamesrvConfig {
Self::default()
}

/// Splits the `config_black_list` into a `Vec<CheetahString>` for easier usage.
pub fn get_config_blacklist(&self) -> Vec<CheetahString> {
self.config_black_list
.split(';')
.map(|s| CheetahString::from(s.trim()))
.collect()
}

Check warning on line 156 in rocketmq-common/src/common/namesrv/namesrv_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/namesrv/namesrv_config.rs#L151-L156

Added lines #L151 - L156 were not covered by tests

pub fn update(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably this implementation could be improved by using deserialize_with derive?

&mut self,
properties: HashMap<CheetahString, CheetahString>,
) -> Result<(), String> {
for (key, value) in properties {
match key.as_str() {
"rocketmqHome" => self.rocketmq_home = value.to_string(),
"kvConfigPath" => self.kv_config_path = value.to_string(),
"configStorePath" => self.config_store_path = value.to_string(),
"productEnvName" => self.product_env_name = value.to_string(),
"clusterTest" => {
self.cluster_test = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"orderMessageEnable" => {
self.order_message_enable = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"clientRequestThreadPoolNums" => {
self.client_request_thread_pool_nums = value
.parse()
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
}
"defaultThreadPoolNums" => {
self.default_thread_pool_nums = value
.parse()
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
}
"clientRequestThreadPoolQueueCapacity" => {
self.client_request_thread_pool_queue_capacity = value
.parse()
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
}
"defaultThreadPoolQueueCapacity" => {
self.default_thread_pool_queue_capacity = value
.parse()
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
}
"scanNotActiveBrokerInterval" => {
self.scan_not_active_broker_interval = value
.parse()
.map_err(|_| format!("Invalid value for key '{}'", key))?
}
"unRegisterBrokerQueueCapacity" => {
self.unregister_broker_queue_capacity = value
.parse()
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
}
"supportActingMaster" => {
self.support_acting_master = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"enableAllTopicList" => {
self.enable_all_topic_list = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"enableTopicList" => {
self.enable_topic_list = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"notifyMinBrokerIdChanged" => {
self.notify_min_broker_id_changed = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"enableControllerInNamesrv" => {
self.enable_controller_in_namesrv = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"needWaitForService" => {
self.need_wait_for_service = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"waitSecondsForService" => {
self.wait_seconds_for_service = value
.parse()
.map_err(|_| format!("Invalid integer value for key '{}'", key))?
}
"deleteTopicWithBrokerRegistration" => {
self.delete_topic_with_broker_registration = value
.parse()
.map_err(|_| format!("Invalid boolean value for key '{}'", key))?
}
"configBlackList" => {
self.config_black_list = value
.parse()
.map_err(|_| format!("Invalid string value for key '{}'", key))?
}
_ => {
return Err(format!("Unknown configuration key: '{}'", key));

Check warning on line 254 in rocketmq-common/src/common/namesrv/namesrv_config.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-common/src/common/namesrv/namesrv_config.rs#L254

Added line #L254 was not covered by tests
}
}
}

Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -204,4 +318,120 @@
"configBlackList;configStorePath;kvConfigPath".to_string()
);
}

#[test]
fn test_namesrv_config_update() {
let mut config = NamesrvConfig::new();

let mut properties = HashMap::new();
properties.insert(
CheetahString::from("rocketmqHome"),
CheetahString::from("/new/path"),
);
properties.insert(
CheetahString::from("kvConfigPath"),
CheetahString::from("/new/kvConfigPath"),
);
properties.insert(
CheetahString::from("configStorePath"),
CheetahString::from("/new/configStorePath"),
);
properties.insert(
CheetahString::from("productEnvName"),
CheetahString::from("new_env"),
);
properties.insert(
CheetahString::from("clusterTest"),
CheetahString::from("true"),
);
properties.insert(
CheetahString::from("orderMessageEnable"),
CheetahString::from("true"),
);
properties.insert(
CheetahString::from("clientRequestThreadPoolNums"),
CheetahString::from("10"),
);
properties.insert(
CheetahString::from("defaultThreadPoolNums"),
CheetahString::from("20"),
);
properties.insert(
CheetahString::from("clientRequestThreadPoolQueueCapacity"),
CheetahString::from("10000"),
);
properties.insert(
CheetahString::from("defaultThreadPoolQueueCapacity"),
CheetahString::from("20000"),
);
properties.insert(
CheetahString::from("scanNotActiveBrokerInterval"),
CheetahString::from("15000"),
);
properties.insert(
CheetahString::from("unRegisterBrokerQueueCapacity"),
CheetahString::from("4000"),
);
properties.insert(
CheetahString::from("supportActingMaster"),
CheetahString::from("true"),
);
properties.insert(
CheetahString::from("enableAllTopicList"),
CheetahString::from("false"),
);
properties.insert(
CheetahString::from("enableTopicList"),
CheetahString::from("false"),
);
properties.insert(
CheetahString::from("notifyMinBrokerIdChanged"),
CheetahString::from("true"),
);
properties.insert(
CheetahString::from("enableControllerInNamesrv"),
CheetahString::from("true"),
);
properties.insert(
CheetahString::from("needWaitForService"),
CheetahString::from("true"),
);
properties.insert(
CheetahString::from("waitSecondsForService"),
CheetahString::from("30"),
);
properties.insert(
CheetahString::from("deleteTopicWithBrokerRegistration"),
CheetahString::from("true"),
);
properties.insert(
CheetahString::from("configBlackList"),
CheetahString::from("newBlackList"),
);

let result = config.update(properties);
assert!(result.is_ok());

assert_eq!(config.rocketmq_home, "/new/path");
assert_eq!(config.kv_config_path, "/new/kvConfigPath");
assert_eq!(config.config_store_path, "/new/configStorePath");
assert_eq!(config.product_env_name, "new_env");
assert_eq!(config.cluster_test, true);
assert_eq!(config.order_message_enable, true);
assert_eq!(config.client_request_thread_pool_nums, 10);
assert_eq!(config.default_thread_pool_nums, 20);
assert_eq!(config.client_request_thread_pool_queue_capacity, 10000);
assert_eq!(config.default_thread_pool_queue_capacity, 20000);
assert_eq!(config.scan_not_active_broker_interval, 15000);
assert_eq!(config.unregister_broker_queue_capacity, 4000);
assert_eq!(config.support_acting_master, true);
assert_eq!(config.enable_all_topic_list, false);
assert_eq!(config.enable_topic_list, false);
assert_eq!(config.notify_min_broker_id_changed, true);
assert_eq!(config.enable_controller_in_namesrv, true);
assert_eq!(config.need_wait_for_service, true);
assert_eq!(config.wait_seconds_for_service, 30);
assert_eq!(config.delete_topic_with_broker_registration, true);
assert_eq!(config.config_black_list, "newBlackList");
}
}
8 changes: 8 additions & 0 deletions rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@
}
}

/// Updates the Namesrv configuration.
pub fn update_namesrv_config(
&mut self,
updates: HashMap<CheetahString, CheetahString>,
) -> Result<(), String> {
self.namesrv_config.update(updates)
}

Check warning on line 105 in rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs#L100-L105

Added lines #L100 - L105 were not covered by tests

Comment on lines +99 to +106
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add unit tests for the new configuration update functionality.

The new update_namesrv_config method lacks test coverage. Consider adding tests to verify successful updates and error handling.

Add these test cases to the tests module:

#[test]
fn update_namesrv_config_successful() {
    let mut manager = create_kv_config_manager();
    let mut updates = HashMap::new();
    updates.insert("key".into(), "value".into());
    
    let result = manager.update_namesrv_config(updates);
    assert!(result.is_ok());
}

#[test]
fn update_namesrv_config_handles_errors() {
    let mut manager = create_kv_config_manager();
    let mut updates = HashMap::new();
    // Add invalid configuration that would cause an error
    updates.insert("invalid_key".into(), "invalid_value".into());
    
    let result = manager.update_namesrv_config(updates);
    assert!(result.is_err());
}

/// Persists the current key-value configurations to a file.
pub fn persist(&mut self) {
let wrapper =
Expand Down
Loading
Loading