diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index 2f9fe47a..4944ac43 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -1151,6 +1151,25 @@ impl MQClientInstance { false } + /// Queries the assignment for a given topic. + /// + /// This function attempts to find the broker address for the specified topic. If the broker + /// address is not found, it updates the topic route information from the name server and + /// retries. If the broker address is found, it queries the assignment from the broker. + /// + /// # Arguments + /// + /// * `topic` - A reference to a `CheetahString` representing the topic to query. + /// * `consumer_group` - A reference to a `CheetahString` representing the consumer group. + /// * `strategy_name` - A reference to a `CheetahString` representing the allocation strategy + /// name. + /// * `message_model` - The message model to use for the query. + /// * `timeout` - The timeout duration for the query. + /// + /// # Returns + /// + /// A `Result` containing an `Option` with a `HashSet` of `MessageQueueAssignment` if the query + /// is successful, or an error if it fails. pub async fn query_assignment( &mut self, topic: &CheetahString, @@ -1159,7 +1178,10 @@ impl MQClientInstance { message_model: MessageModel, timeout: u64, ) -> Result>> { + // Try to find broker address let mut broker_addr = self.find_broker_addr_by_topic(topic).await; + + // If not found, update and retry if broker_addr.is_none() { self.update_topic_route_info_from_name_server_topic(topic) .await; @@ -1167,19 +1189,22 @@ impl MQClientInstance { } if let Some(broker_addr) = broker_addr { let client_id = self.client_id.clone(); - self.mq_client_api_impl - .as_mut() - .unwrap() - .query_assignment( - &broker_addr, - topic.clone(), - consumer_group.clone(), - strategy_name.clone(), - client_id, - message_model, - timeout, - ) - .await + match self.mq_client_api_impl.as_mut() { + Some(api_impl) => { + api_impl + .query_assignment( + &broker_addr, + topic.clone(), + consumer_group.clone(), + strategy_name.clone(), + client_id, + message_model, + timeout, + ) + .await + } + None => mq_client_err!("mq_client_api_impl is None"), + } } else { Ok(None) }