Skip to content

Commit

Permalink
[ISSUE #1283]⚡️Optimize name server DefaultRequestProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Nov 24, 2024
1 parent f9b544d commit 474a002
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
8 changes: 4 additions & 4 deletions rocketmq-namesrv/src/processor/default_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ impl DefaultRequestProcessor {
.decode_command_custom_header::<RegisterTopicRequestHeader>()
.expect("decode RegisterTopicRequestHeader failed");
if let Some(ref body) = request.body() {
let topic_route_data = SerdeJsonUtils::decode::<TopicRouteData>(body).unwrap();
let topic_route_data = TopicRouteData::decode(body).unwrap_or_default();

Check warning on line 362 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L362

Added line #L362 was not covered by tests
if !topic_route_data.queue_datas.is_empty() {
self.route_info_manager
.register_topic(request_header.topic, topic_route_data.queue_datas)
Expand All @@ -371,7 +371,7 @@ impl DefaultRequestProcessor {
fn get_kv_list_by_namespace(&self, request: RemotingCommand) -> RemotingCommand {
let request_header = request
.decode_command_custom_header::<GetKVListByNamespaceRequestHeader>()
.unwrap();
.expect("decode GetKVListByNamespaceRequestHeader failed");

Check warning on line 374 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L374

Added line #L374 was not covered by tests
let value = self
.kvconfig_manager
.get_kv_list_by_namespace(&request_header.namespace);
Expand All @@ -396,10 +396,10 @@ impl DefaultRequestProcessor {

let request_header = request
.decode_command_custom_header::<GetTopicsByClusterRequestHeader>()
.unwrap();
.expect("decode GetTopicsByClusterRequestHeader failed");

Check warning on line 399 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L399

Added line #L399 was not covered by tests
let topics_by_cluster = self
.route_info_manager
.get_topics_by_cluster(request_header.cluster.as_str());
.get_topics_by_cluster(&request_header.cluster);

Check warning on line 402 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L402

Added line #L402 was not covered by tests
RemotingCommand::create_response_command().set_body(topics_by_cluster.encode())
}

Expand Down
23 changes: 14 additions & 9 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,15 +762,21 @@ impl RouteInfoManager {
drop(lock)
}

pub(crate) fn register_topic(&mut self, topic: CheetahString, queue_data_vec: Vec<QueueData>) {
pub(crate) fn register_topic(&self, topic: CheetahString, queue_data_vec: Vec<QueueData>) {

Check warning on line 765 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L765

Added line #L765 was not covered by tests
if queue_data_vec.is_empty() {
return;
}

let lock = self.lock.write();

Check warning on line 769 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L769

Added line #L769 was not covered by tests
if !self.topic_queue_table.contains_key(&topic) {
self.topic_queue_table.insert(topic.clone(), HashMap::new());
self.topic_queue_table
.mut_from_ref()
.insert(topic.clone(), HashMap::new());

Check warning on line 773 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L771-L773

Added lines #L771 - L773 were not covered by tests
}
let queue_data_map = self.topic_queue_table.get_mut(&topic).unwrap();
let queue_data_map = self
.topic_queue_table
.mut_from_ref()
.get_mut(&topic)
.unwrap();

Check warning on line 779 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L775-L779

Added lines #L775 - L779 were not covered by tests
let vec_length = queue_data_vec.len();
for queue_data in queue_data_vec {
if !self
Expand All @@ -785,6 +791,7 @@ impl RouteInfoManager {
}
queue_data_map.insert(queue_data.broker_name().clone(), queue_data);
}
drop(lock);

Check warning on line 794 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L794

Added line #L794 was not covered by tests

if queue_data_map.len() > vec_length {
info!("Topic route already exist.{}, {:?}", topic, queue_data_map)
Expand All @@ -793,8 +800,9 @@ impl RouteInfoManager {
}
}

pub(crate) fn get_topics_by_cluster(&self, cluster: &str) -> TopicList {
pub(crate) fn get_topics_by_cluster(&self, cluster: &CheetahString) -> TopicList {

Check warning on line 803 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L803

Added line #L803 was not covered by tests
let mut topic_list = Vec::new();
let lock = self.lock.read();

Check warning on line 805 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L805

Added line #L805 was not covered by tests
if let Some(broker_name_set) = self.cluster_addr_table.get(cluster) {
for broker_name in broker_name_set {
for (topic, queue_data_map) in self.topic_queue_table.iter() {
Expand All @@ -804,6 +812,7 @@ impl RouteInfoManager {
}
}
}
drop(lock);

Check warning on line 815 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L815

Added line #L815 was not covered by tests
TopicList {
topic_list,
broker_addr: None,
Expand All @@ -821,10 +830,6 @@ impl RouteInfoManager {
}
if !self.broker_addr_table.is_empty() {
for broker_addr in self.broker_addr_table.values() {
/*for ip in broker_addr.broker_addrs().values() {
broker_addr_out = Some(ip.clone());
break;
}*/
let broker_addrs = broker_addr.broker_addrs();
if !broker_addrs.is_empty() {
broker_addr_out = Some(broker_addrs.values().next().unwrap().clone());
Expand Down

0 comments on commit 474a002

Please sign in to comment.