Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1775]🤡Simply MQClientInstance#query_assignment method code and add doc for method🔥 #1776

Merged
merged 1 commit into from
Dec 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,25 @@
false
}

/// Queries the assignment for a given topic.
///
/// This function attempts to find the broker address for the specified topic. If the broker
/// address is not found, it updates the topic route information from the name server and
/// retries. If the broker address is found, it queries the assignment from the broker.
///
/// # Arguments
///
/// * `topic` - A reference to a `CheetahString` representing the topic to query.
/// * `consumer_group` - A reference to a `CheetahString` representing the consumer group.
/// * `strategy_name` - A reference to a `CheetahString` representing the allocation strategy
/// name.
/// * `message_model` - The message model to use for the query.
/// * `timeout` - The timeout duration for the query.
///
/// # Returns
///
/// A `Result` containing an `Option` with a `HashSet` of `MessageQueueAssignment` if the query
/// is successful, or an error if it fails.
pub async fn query_assignment(
&mut self,
topic: &CheetahString,
Expand All @@ -1159,27 +1178,33 @@
message_model: MessageModel,
timeout: u64,
) -> Result<Option<HashSet<MessageQueueAssignment>>> {
// Try to find broker address
let mut broker_addr = self.find_broker_addr_by_topic(topic).await;

// If not found, update and retry
if broker_addr.is_none() {
self.update_topic_route_info_from_name_server_topic(topic)
.await;
broker_addr = self.find_broker_addr_by_topic(topic).await;
}
if let Some(broker_addr) = broker_addr {
let client_id = self.client_id.clone();
self.mq_client_api_impl
.as_mut()
.unwrap()
.query_assignment(
&broker_addr,
topic.clone(),
consumer_group.clone(),
strategy_name.clone(),
client_id,
message_model,
timeout,
)
.await
match self.mq_client_api_impl.as_mut() {
Some(api_impl) => {
api_impl
.query_assignment(
&broker_addr,
topic.clone(),
consumer_group.clone(),
strategy_name.clone(),
client_id,
message_model,
timeout,
)
.await

Check warning on line 1204 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L1192-L1204

Added lines #L1192 - L1204 were not covered by tests
}
None => mq_client_err!("mq_client_api_impl is None"),

Check warning on line 1206 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L1206

Added line #L1206 was not covered by tests
}
} else {
Ok(None)
}
Expand Down
Loading