-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1302]🔥Rocketmq-rust client supports request code QUERY_ASSIGNMENT(400) #1415
Conversation
WalkthroughThe changes in this pull request primarily involve the modification of the Changes
Assessment against linked issues
Possibly related issues
Possibly related PRs
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
1253-1300
: Implementation looks good with minor suggestions.The implementation is well-structured and follows the established patterns. Consider these improvements:
- Add error logging when response body decoding fails
- Consider adding debug assertions for parameter validation
pub async fn query_assignment( &mut self, addr: &CheetahString, topic: CheetahString, consumer_group: CheetahString, client_id: CheetahString, strategy_name: CheetahString, message_model: MessageModel, timeout: u64, ) -> Result<Option<HashSet<MessageQueueAssignment>>> { + debug_assert!(!topic.is_empty(), "topic should not be empty"); + debug_assert!(!consumer_group.is_empty(), "consumer_group should not be empty"); + debug_assert!(!client_id.is_empty(), "client_id should not be empty"); let request_body = QueryAssignmentRequestBody { topic, consumer_group, client_id, strategy_name, message_model, }; let request = RemotingCommand::new_request(RequestCode::QueryAssignment, request_body.encode()); let response = self .remoting_client .invoke_async( Some(&mix_all::broker_vip_channel( self.client_config.vip_channel_enabled, addr, )), request, timeout, ) .await?; if ResponseCode::from(response.code()) == ResponseCode::Success { let body = response.body(); if let Some(body) = body { let assignment = QueryAssignmentResponseBody::decode(body.as_ref()); if let Ok(assignment) = assignment { return Ok(Some(assignment.message_queue_assignments)); } + error!("Failed to decode QueryAssignmentResponseBody: {:?}", assignment.err()); } return Ok(None); } Err(MQBrokerError( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), addr.to_string(), )) }rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (3)
48-48
: Confirm necessity of changingQUERY_ASSIGNMENT_TIMEOUT
tou64
The constant
QUERY_ASSIGNMENT_TIMEOUT
has been updated fromu32
tou64
. Ensure that this change aligns with the expected data types for timeout values elsewhere in the codebase to prevent potential type mismatches.
145-147
: Simplify loop by using the loop variable directlyThe
retry_times
variable is manually incremented within the loop. This can be simplified by using the loop variable directly, improving readability.Refactor the loop as follows:
-let mut retry_times = 0; -for _ in 0..TIMEOUT_CHECK_TIMES { - retry_times += 1; +for retry_times in 1..=TIMEOUT_CHECK_TIMES {This change eliminates the need for a separate
retry_times
variable and makes the loop bounds clearer.
167-168
: HandleRequestTimeoutError
explicitly in match armThe match arm for
MQClientError::RequestTimeoutError
is currently empty, which might lead to confusion about how timeouts are handled.Consider adding a comment or logging to clarify that timeouts are being silently retried:
Err(e) => match e { MQClientError::RequestTimeoutError(_, _) => { + // Timeout occurred, will retry } _ => {
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (3)
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
(2 hunks)rocketmq-client/src/factory/mq_client_instance.rs
(2 hunks)rocketmq-client/src/implementation/mq_client_api_impl.rs
(4 hunks)
🔇 Additional comments (4)
rocketmq-client/src/implementation/mq_client_api_impl.rs (2)
29-29
: LGTM! Imports are well organized.
The new imports for message queue assignment types are correctly placed and necessary for the new functionality.
Also applies to: 45-46
1253-1300
: Verify integration with RebalanceImpl.
The new query_assignment method is likely called from RebalanceImpl for message queue assignment. Let's verify the integration.
✅ Verification successful
Integration with RebalanceImpl is properly implemented
The verification shows that query_assignment
is correctly integrated within the RebalanceImpl
:
- It's called from
try_query_assignment
method inrocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
- The integration follows the expected flow where
try_query_assignment
is used as part of the rebalancing logic:if !self.client_rebalance(topic) && self.try_query_assignment(topic).await
- All required parameters are properly passed from RebalanceImpl to the query_assignment method
- The implementation includes proper error handling and timeout (QUERY_ASSIGNMENT_TIMEOUT)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the integration of query_assignment with RebalanceImpl
# Search for usage of query_assignment in RebalanceImpl
ast-grep --pattern $'impl RebalanceImpl {
$$$
try_query_assignment($$$) {
$$$
query_assignment($$$)
$$$
}
$$$
}'
# Search for any other usages of query_assignment
rg -A 5 'query_assignment'
Length of output: 11266
rocketmq-client/src/factory/mq_client_instance.rs (2)
31-31
: LGTM: Import changes are appropriate.
The new imports for MessageQueueAssignment
and MessageModel
are correctly added to support the query assignment functionality.
Also applies to: 37-37
1154-1186
: Implementation looks solid, verify integration with rebalance logic.
The query_assignment implementation is well-structured and follows proper error handling patterns. The method correctly:
- Attempts to find broker address
- Updates topic route info if needed
- Makes the API call with all required parameters
- Returns appropriate Result type
Let's verify the integration with rebalance logic:
✅ Verification successful
The search results show that query_assignment
is properly integrated with the rebalance logic in rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
. Let's verify the complete rebalance flow with another search to ensure the integration is correct:
Integration with rebalance logic is properly implemented
The verification shows that query_assignment
is correctly integrated into the rebalance flow:
- It's called within
try_query_assignment
method as part of the rebalance process - The rebalance logic follows a clear decision tree:
- First checks for client-side rebalance
- If client rebalance is not used, attempts query assignment
- Falls back to topic-based rebalance if needed
- Proper error handling and state management is implemented through the use of locks and checks
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if query_assignment is properly integrated with rebalance implementation
# Expect: Usage of query_assignment in rebalance-related code
# Search for rebalance implementations that might use query_assignment
ast-grep --pattern 'try_rebalance($$$) {
$$$
query_assignment($$$)
$$$
}'
# Search for any direct calls to query_assignment
rg -A 5 'query_assignment\('
Length of output: 5053
Script:
#!/bin/bash
# Check the rebalance implementation that uses query_assignment
rg -B 5 -A 10 'try_query_assignment' rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
# Check the complete rebalance flow
ast-grep --pattern 'impl RebalanceImpl {
$$$
async fn try_rebalance($$$) {
$$$
}
$$$
}'
Length of output: 1586
async fn try_query_assignment(&mut self, topic: &CheetahString) -> bool { | ||
let topic_client_rebalance = self.topic_client_rebalance.read().await; | ||
if topic_client_rebalance.contains_key(topic) { | ||
return false; | ||
} | ||
drop(topic_client_rebalance); | ||
let topic_broker_rebalance = self.topic_broker_rebalance.read().await; | ||
if topic_broker_rebalance.contains_key(topic) { | ||
return true; | ||
} | ||
drop(topic_broker_rebalance); | ||
let strategy_name = if let Some(strategy) = &self.allocate_message_queue_strategy { | ||
CheetahString::from_static_str(strategy.get_name()) | ||
} else { | ||
CheetahString::from_static_str("unknown") | ||
}; | ||
let mut retry_times = 0; | ||
for _ in 0..TIMEOUT_CHECK_TIMES { | ||
retry_times += 1; | ||
match self | ||
.client_instance | ||
.as_mut() | ||
.unwrap() | ||
.query_assignment( | ||
topic, | ||
self.consumer_group.as_ref().unwrap(), | ||
&strategy_name, | ||
self.message_model.unwrap(), | ||
QUERY_ASSIGNMENT_TIMEOUT, | ||
) | ||
.await | ||
{ | ||
Ok(_) => { | ||
let mut topic_broker_rebalance = self.topic_broker_rebalance.write().await; | ||
topic_broker_rebalance.insert(topic.clone(), topic.clone()); | ||
return true; | ||
} | ||
Err(e) => match e { | ||
MQClientError::RequestTimeoutError(_, _) => {} | ||
_ => { | ||
error!("tryQueryAssignment error {}.", e); | ||
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | ||
topic_client_rebalance.insert(topic.clone(), topic.clone()); | ||
return false; | ||
} | ||
}, | ||
} | ||
} | ||
|
||
if retry_times >= TIMEOUT_CHECK_TIMES { | ||
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | ||
topic_client_rebalance.insert(topic.clone(), topic.clone()); | ||
return false; | ||
} | ||
true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unreachable code and logical inconsistency in try_query_assignment
The try_query_assignment
function may contain unreachable code due to the unconditional return true
at the end. Given that the loop runs from 0..TIMEOUT_CHECK_TIMES
, and retry_times
is incremented on each iteration, retry_times
will always be equal to TIMEOUT_CHECK_TIMES
after the loop, causing the function to return false
just before the final return true
.
Consider refactoring to ensure the function returns the correct value:
if retry_times >= TIMEOUT_CHECK_TIMES {
let mut topic_client_rebalance = self.topic_client_rebalance.write().await;
topic_client_rebalance.insert(topic.clone(), topic.clone());
return false;
}
- return true;
+ // Since the function will have already returned true on success, we can return false here
+ return false;
Alternatively, adjust the loop to correctly reflect the retry logic and ensure the final return statement is appropriate.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async fn try_query_assignment(&mut self, topic: &CheetahString) -> bool { | |
let topic_client_rebalance = self.topic_client_rebalance.read().await; | |
if topic_client_rebalance.contains_key(topic) { | |
return false; | |
} | |
drop(topic_client_rebalance); | |
let topic_broker_rebalance = self.topic_broker_rebalance.read().await; | |
if topic_broker_rebalance.contains_key(topic) { | |
return true; | |
} | |
drop(topic_broker_rebalance); | |
let strategy_name = if let Some(strategy) = &self.allocate_message_queue_strategy { | |
CheetahString::from_static_str(strategy.get_name()) | |
} else { | |
CheetahString::from_static_str("unknown") | |
}; | |
let mut retry_times = 0; | |
for _ in 0..TIMEOUT_CHECK_TIMES { | |
retry_times += 1; | |
match self | |
.client_instance | |
.as_mut() | |
.unwrap() | |
.query_assignment( | |
topic, | |
self.consumer_group.as_ref().unwrap(), | |
&strategy_name, | |
self.message_model.unwrap(), | |
QUERY_ASSIGNMENT_TIMEOUT, | |
) | |
.await | |
{ | |
Ok(_) => { | |
let mut topic_broker_rebalance = self.topic_broker_rebalance.write().await; | |
topic_broker_rebalance.insert(topic.clone(), topic.clone()); | |
return true; | |
} | |
Err(e) => match e { | |
MQClientError::RequestTimeoutError(_, _) => {} | |
_ => { | |
error!("tryQueryAssignment error {}.", e); | |
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | |
topic_client_rebalance.insert(topic.clone(), topic.clone()); | |
return false; | |
} | |
}, | |
} | |
} | |
if retry_times >= TIMEOUT_CHECK_TIMES { | |
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | |
topic_client_rebalance.insert(topic.clone(), topic.clone()); | |
return false; | |
} | |
true | |
async fn try_query_assignment(&mut self, topic: &CheetahString) -> bool { | |
let topic_client_rebalance = self.topic_client_rebalance.read().await; | |
if topic_client_rebalance.contains_key(topic) { | |
return false; | |
} | |
drop(topic_client_rebalance); | |
let topic_broker_rebalance = self.topic_broker_rebalance.read().await; | |
if topic_broker_rebalance.contains_key(topic) { | |
return true; | |
} | |
drop(topic_broker_rebalance); | |
let strategy_name = if let Some(strategy) = &self.allocate_message_queue_strategy { | |
CheetahString::from_static_str(strategy.get_name()) | |
} else { | |
CheetahString::from_static_str("unknown") | |
}; | |
let mut retry_times = 0; | |
for _ in 0..TIMEOUT_CHECK_TIMES { | |
retry_times += 1; | |
match self | |
.client_instance | |
.as_mut() | |
.unwrap() | |
.query_assignment( | |
topic, | |
self.consumer_group.as_ref().unwrap(), | |
&strategy_name, | |
self.message_model.unwrap(), | |
QUERY_ASSIGNMENT_TIMEOUT, | |
) | |
.await | |
{ | |
Ok(_) => { | |
let mut topic_broker_rebalance = self.topic_broker_rebalance.write().await; | |
topic_broker_rebalance.insert(topic.clone(), topic.clone()); | |
return true; | |
} | |
Err(e) => match e { | |
MQClientError::RequestTimeoutError(_, _) => {} | |
_ => { | |
error!("tryQueryAssignment error {}.", e); | |
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | |
topic_client_rebalance.insert(topic.clone(), topic.clone()); | |
return false; | |
} | |
}, | |
} | |
} | |
if retry_times >= TIMEOUT_CHECK_TIMES { | |
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | |
topic_client_rebalance.insert(topic.clone(), topic.clone()); | |
return false; | |
} | |
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
1286-1291
: Consider enhancing error handling and variable naming.A few suggestions to improve the code:
- Add error logging when body decoding fails
- Use more descriptive variable names (e.g.,
response_body
instead ofbody
)let body = response.body(); if let Some(body) = body { - let assignment = QueryAssignmentResponseBody::decode(body.as_ref()); - if let Ok(assignment) = assignment { + let response_body = QueryAssignmentResponseBody::decode(body.as_ref()); + if let Ok(response_body) = response_body { + tracing::debug!("Successfully decoded QueryAssignmentResponseBody"); return Ok(Some(assignment.message_queue_assignments)); + } else { + tracing::error!("Failed to decode QueryAssignmentResponseBody"); } }rocketmq-client/src/factory/mq_client_instance.rs (1)
1162-1168
: Add error logging for broker address lookup failures.Consider adding error logging when broker address lookup fails to help with debugging and monitoring.
let mut broker_addr = self.find_broker_addr_by_topic(topic).await; if broker_addr.is_none() { + warn!("Failed to find broker address for topic {}, attempting to update topic route info", topic); self.update_topic_route_info_from_name_server_topic(topic) .await; broker_addr = self.find_broker_addr_by_topic(topic).await; + if broker_addr.is_none() { + warn!("Still unable to find broker address for topic {} after update", topic); + } }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (3)
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
(2 hunks)rocketmq-client/src/factory/mq_client_instance.rs
(2 hunks)rocketmq-client/src/implementation/mq_client_api_impl.rs
(4 hunks)
🔇 Additional comments (6)
rocketmq-client/src/implementation/mq_client_api_impl.rs (2)
29-29
: LGTM! Imports are well-organized.
The new imports for message queue assignment types are correctly placed and necessary for implementing the QUERY_ASSIGNMENT functionality.
Also applies to: 45-46
1253-1300
: LGTM! Well-structured implementation of QUERY_ASSIGNMENT.
The implementation follows RocketMQ client patterns with proper async/await usage, error handling, and protocol adherence.
rocketmq-client/src/factory/mq_client_instance.rs (2)
31-31
: LGTM: Required imports added for query_assignment functionality.
The new imports for MessageQueueAssignment
and MessageModel
are correctly placed and necessary for the new functionality.
Also applies to: 37-37
1154-1186
: LGTM: Well-structured query_assignment implementation.
The implementation follows good practices:
- Proper error handling using Result
- Efficient broker address lookup with retry
- Clean parameter passing and ownership management
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (2)
44-44
: Importing MQClientError
Enhances Error Handling
The addition of use crate::error::MQClientError;
is appropriate as it facilitates more precise error handling in the try_query_assignment
method.
48-48
: Type Update for QUERY_ASSIGNMENT_TIMEOUT
Ensures Compatibility
Changing QUERY_ASSIGNMENT_TIMEOUT
from u32
to u64
aligns its type with the expected parameter type in functions that use this constant, preventing potential type mismatch errors.
async fn try_query_assignment(&mut self, topic: &CheetahString) -> bool { | ||
let topic_client_rebalance = self.topic_client_rebalance.read().await; | ||
if topic_client_rebalance.contains_key(topic) { | ||
return false; | ||
} | ||
drop(topic_client_rebalance); | ||
let topic_broker_rebalance = self.topic_broker_rebalance.read().await; | ||
if topic_broker_rebalance.contains_key(topic) { | ||
return true; | ||
} | ||
drop(topic_broker_rebalance); | ||
let strategy_name = if let Some(strategy) = &self.allocate_message_queue_strategy { | ||
CheetahString::from_static_str(strategy.get_name()) | ||
} else { | ||
CheetahString::from_static_str("unknown") | ||
}; | ||
let mut retry_times = 0; | ||
for _ in 0..TIMEOUT_CHECK_TIMES { | ||
retry_times += 1; | ||
match self | ||
.client_instance | ||
.as_mut() | ||
.unwrap() | ||
.query_assignment( | ||
topic, | ||
self.consumer_group.as_ref().unwrap(), | ||
&strategy_name, | ||
self.message_model.unwrap(), | ||
QUERY_ASSIGNMENT_TIMEOUT, | ||
) | ||
.await | ||
{ | ||
Ok(_) => { | ||
let mut topic_broker_rebalance = self.topic_broker_rebalance.write().await; | ||
topic_broker_rebalance.insert(topic.clone(), topic.clone()); | ||
return true; | ||
} | ||
Err(e) => match e { | ||
MQClientError::RequestTimeoutError(_, _) => {} | ||
_ => { | ||
error!("tryQueryAssignment error {}.", e); | ||
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | ||
topic_client_rebalance.insert(topic.clone(), topic.clone()); | ||
return false; | ||
} | ||
}, | ||
} | ||
} | ||
|
||
if retry_times >= TIMEOUT_CHECK_TIMES { | ||
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | ||
topic_client_rebalance.insert(topic.clone(), topic.clone()); | ||
return false; | ||
} | ||
true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logical Error in Retry Mechanism of try_query_assignment
Method
The condition if retry_times >= TIMEOUT_CHECK_TIMES
at line 178 will always be true
after the loop concludes, rendering the return true
statement at line 183 unreachable. As a result, the function may incorrectly return false
even when the query assignment could succeed.
To correct this, adjust the loop and increment logic to accurately reflect the number of retries and ensure the function returns true
when appropriate. Here's a suggested fix:
145 let mut retry_times = 0;
146 loop {
147 match self
148 .client_instance
149 .as_mut()
150 .unwrap()
151 .query_assignment(
152 topic,
153 self.consumer_group.as_ref().unwrap(),
154 &strategy_name,
155 self.message_model.unwrap(),
156 QUERY_ASSIGNMENT_TIMEOUT,
157 )
158 .await
159 {
160 Ok(_) => {
161 let mut topic_broker_rebalance = self.topic_broker_rebalance.write().await;
162 topic_broker_rebalance.insert(topic.clone(), topic.clone());
163 return true;
164 }
165 Err(e) => match e {
166 MQClientError::RequestTimeoutError(_, _) => {
+ retry_times += 1;
+ if retry_times >= TIMEOUT_CHECK_TIMES {
+ let mut topic_client_rebalance = self.topic_client_rebalance.write().await;
+ topic_client_rebalance.insert(topic.clone(), topic.clone());
+ return false;
+ }
+ continue;
}
167 _ => {
168 error!("tryQueryAssignment error {}.", e);
169 let mut topic_client_rebalance = self.topic_client_rebalance.write().await;
170 topic_client_rebalance.insert(topic.clone(), topic.clone());
171 return false;
172 }
173 },
174 }
175 }
-178 if retry_times >= TIMEOUT_CHECK_TIMES {
-179 let mut topic_client_rebalance = self.topic_client_rebalance.write().await;
-180 topic_client_rebalance.insert(topic.clone(), topic.clone());
-181 return false;
-182 }
-183 true
This revision moves the retry_times
increment inside the timeout error handling and adds a continue
statement to retry the operation. It ensures that the function only returns false
after exhausting all retries due to timeouts, and the return true
statement becomes reachable when the operation succeeds.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async fn try_query_assignment(&mut self, topic: &CheetahString) -> bool { | |
let topic_client_rebalance = self.topic_client_rebalance.read().await; | |
if topic_client_rebalance.contains_key(topic) { | |
return false; | |
} | |
drop(topic_client_rebalance); | |
let topic_broker_rebalance = self.topic_broker_rebalance.read().await; | |
if topic_broker_rebalance.contains_key(topic) { | |
return true; | |
} | |
drop(topic_broker_rebalance); | |
let strategy_name = if let Some(strategy) = &self.allocate_message_queue_strategy { | |
CheetahString::from_static_str(strategy.get_name()) | |
} else { | |
CheetahString::from_static_str("unknown") | |
}; | |
let mut retry_times = 0; | |
for _ in 0..TIMEOUT_CHECK_TIMES { | |
retry_times += 1; | |
match self | |
.client_instance | |
.as_mut() | |
.unwrap() | |
.query_assignment( | |
topic, | |
self.consumer_group.as_ref().unwrap(), | |
&strategy_name, | |
self.message_model.unwrap(), | |
QUERY_ASSIGNMENT_TIMEOUT, | |
) | |
.await | |
{ | |
Ok(_) => { | |
let mut topic_broker_rebalance = self.topic_broker_rebalance.write().await; | |
topic_broker_rebalance.insert(topic.clone(), topic.clone()); | |
return true; | |
} | |
Err(e) => match e { | |
MQClientError::RequestTimeoutError(_, _) => {} | |
_ => { | |
error!("tryQueryAssignment error {}.", e); | |
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | |
topic_client_rebalance.insert(topic.clone(), topic.clone()); | |
return false; | |
} | |
}, | |
} | |
} | |
if retry_times >= TIMEOUT_CHECK_TIMES { | |
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | |
topic_client_rebalance.insert(topic.clone(), topic.clone()); | |
return false; | |
} | |
true | |
async fn try_query_assignment(&mut self, topic: &CheetahString) -> bool { | |
let topic_client_rebalance = self.topic_client_rebalance.read().await; | |
if topic_client_rebalance.contains_key(topic) { | |
return false; | |
} | |
drop(topic_client_rebalance); | |
let topic_broker_rebalance = self.topic_broker_rebalance.read().await; | |
if topic_broker_rebalance.contains_key(topic) { | |
return true; | |
} | |
drop(topic_broker_rebalance); | |
let strategy_name = if let Some(strategy) = &self.allocate_message_queue_strategy { | |
CheetahString::from_static_str(strategy.get_name()) | |
} else { | |
CheetahString::from_static_str("unknown") | |
}; | |
let mut retry_times = 0; | |
loop { | |
match self | |
.client_instance | |
.as_mut() | |
.unwrap() | |
.query_assignment( | |
topic, | |
self.consumer_group.as_ref().unwrap(), | |
&strategy_name, | |
self.message_model.unwrap(), | |
QUERY_ASSIGNMENT_TIMEOUT, | |
) | |
.await | |
{ | |
Ok(_) => { | |
let mut topic_broker_rebalance = self.topic_broker_rebalance.write().await; | |
topic_broker_rebalance.insert(topic.clone(), topic.clone()); | |
return true; | |
} | |
Err(e) => match e { | |
MQClientError::RequestTimeoutError(_, _) => { | |
retry_times += 1; | |
if retry_times >= TIMEOUT_CHECK_TIMES { | |
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | |
topic_client_rebalance.insert(topic.clone(), topic.clone()); | |
return false; | |
} | |
continue; | |
} | |
_ => { | |
error!("tryQueryAssignment error {}.", e); | |
let mut topic_client_rebalance = self.topic_client_rebalance.write().await; | |
topic_client_rebalance.insert(topic.clone(), topic.clone()); | |
return false; | |
} | |
}, | |
} | |
} | |
} |
Which Issue(s) This PR Fixes(Closes)
Fixes #1302
Brief Description
How Did You Test This Change?
Summary by CodeRabbit