Skip to content

Commit 4445013

Browse files
authored
[ISSUE #1283]⚡️Optimize name server DefaultRequestProcessor (#1284)
1 parent f9b544d commit 4445013

File tree

2 files changed

+18
-13
lines changed

2 files changed

+18
-13
lines changed

rocketmq-namesrv/src/processor/default_request_processor.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ impl DefaultRequestProcessor {
359359
.decode_command_custom_header::<RegisterTopicRequestHeader>()
360360
.expect("decode RegisterTopicRequestHeader failed");
361361
if let Some(ref body) = request.body() {
362-
let topic_route_data = SerdeJsonUtils::decode::<TopicRouteData>(body).unwrap();
362+
let topic_route_data = TopicRouteData::decode(body).unwrap_or_default();
363363
if !topic_route_data.queue_datas.is_empty() {
364364
self.route_info_manager
365365
.register_topic(request_header.topic, topic_route_data.queue_datas)
@@ -371,7 +371,7 @@ impl DefaultRequestProcessor {
371371
fn get_kv_list_by_namespace(&self, request: RemotingCommand) -> RemotingCommand {
372372
let request_header = request
373373
.decode_command_custom_header::<GetKVListByNamespaceRequestHeader>()
374-
.unwrap();
374+
.expect("decode GetKVListByNamespaceRequestHeader failed");
375375
let value = self
376376
.kvconfig_manager
377377
.get_kv_list_by_namespace(&request_header.namespace);
@@ -396,10 +396,10 @@ impl DefaultRequestProcessor {
396396

397397
let request_header = request
398398
.decode_command_custom_header::<GetTopicsByClusterRequestHeader>()
399-
.unwrap();
399+
.expect("decode GetTopicsByClusterRequestHeader failed");
400400
let topics_by_cluster = self
401401
.route_info_manager
402-
.get_topics_by_cluster(request_header.cluster.as_str());
402+
.get_topics_by_cluster(&request_header.cluster);
403403
RemotingCommand::create_response_command().set_body(topics_by_cluster.encode())
404404
}
405405

rocketmq-namesrv/src/route/route_info_manager.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -762,15 +762,21 @@ impl RouteInfoManager {
762762
drop(lock)
763763
}
764764

765-
pub(crate) fn register_topic(&mut self, topic: CheetahString, queue_data_vec: Vec<QueueData>) {
765+
pub(crate) fn register_topic(&self, topic: CheetahString, queue_data_vec: Vec<QueueData>) {
766766
if queue_data_vec.is_empty() {
767767
return;
768768
}
769-
769+
let lock = self.lock.write();
770770
if !self.topic_queue_table.contains_key(&topic) {
771-
self.topic_queue_table.insert(topic.clone(), HashMap::new());
771+
self.topic_queue_table
772+
.mut_from_ref()
773+
.insert(topic.clone(), HashMap::new());
772774
}
773-
let queue_data_map = self.topic_queue_table.get_mut(&topic).unwrap();
775+
let queue_data_map = self
776+
.topic_queue_table
777+
.mut_from_ref()
778+
.get_mut(&topic)
779+
.unwrap();
774780
let vec_length = queue_data_vec.len();
775781
for queue_data in queue_data_vec {
776782
if !self
@@ -785,6 +791,7 @@ impl RouteInfoManager {
785791
}
786792
queue_data_map.insert(queue_data.broker_name().clone(), queue_data);
787793
}
794+
drop(lock);
788795

789796
if queue_data_map.len() > vec_length {
790797
info!("Topic route already exist.{}, {:?}", topic, queue_data_map)
@@ -793,8 +800,9 @@ impl RouteInfoManager {
793800
}
794801
}
795802

796-
pub(crate) fn get_topics_by_cluster(&self, cluster: &str) -> TopicList {
803+
pub(crate) fn get_topics_by_cluster(&self, cluster: &CheetahString) -> TopicList {
797804
let mut topic_list = Vec::new();
805+
let lock = self.lock.read();
798806
if let Some(broker_name_set) = self.cluster_addr_table.get(cluster) {
799807
for broker_name in broker_name_set {
800808
for (topic, queue_data_map) in self.topic_queue_table.iter() {
@@ -804,6 +812,7 @@ impl RouteInfoManager {
804812
}
805813
}
806814
}
815+
drop(lock);
807816
TopicList {
808817
topic_list,
809818
broker_addr: None,
@@ -821,10 +830,6 @@ impl RouteInfoManager {
821830
}
822831
if !self.broker_addr_table.is_empty() {
823832
for broker_addr in self.broker_addr_table.values() {
824-
/*for ip in broker_addr.broker_addrs().values() {
825-
broker_addr_out = Some(ip.clone());
826-
break;
827-
}*/
828833
let broker_addrs = broker_addr.broker_addrs();
829834
if !broker_addrs.is_empty() {
830835
broker_addr_out = Some(broker_addrs.values().next().unwrap().clone());

0 commit comments

Comments
 (0)