Skip to content

Commit a720b7f

Browse files
authored
[ISSUE #1503]🍻Refactor PullMessageResponseHeader♻️ (#1506)
1 parent 7c97393 commit a720b7f

File tree

4 files changed

+313
-131
lines changed

4 files changed

+313
-131
lines changed

rocketmq-broker/src/processor/default_pull_message_result_handler.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -270,14 +270,13 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
270270
request_header.consumer_group,
271271
offset_moved_event.offset_request,
272272
offset_moved_event.offset_new,
273-
response_header.suggest_which_broker_id.unwrap()
273+
response_header.suggest_which_broker_id
274274
);
275275
} else {
276276
let response_header = response
277277
.read_custom_header_mut::<PullMessageResponseHeader>()
278278
.unwrap();
279-
response_header.suggest_which_broker_id =
280-
Some(subscription_group_config.broker_id());
279+
response_header.suggest_which_broker_id = subscription_group_config.broker_id();
281280
response.set_code_ref(ResponseCode::PullRetryImmediately);
282281
warn!(
283282
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, \
@@ -421,9 +420,9 @@ impl DefaultPullMessageResultHandler {
421420
) {
422421
let mut response_header = PullMessageResponseHeader::default();
423422
response.set_remark_mut(format!("{:?}", get_message_result.status()));
424-
response_header.next_begin_offset = Some(get_message_result.next_begin_offset());
425-
response_header.min_offset = Some(get_message_result.min_offset());
426-
response_header.max_offset = Some(get_message_result.max_offset());
423+
response_header.next_begin_offset = get_message_result.next_begin_offset();
424+
response_header.min_offset = get_message_result.min_offset();
425+
response_header.max_offset = get_message_result.max_offset();
427426
response_header.topic_sys_flag = Some(topic_sys_flag);
428427
response_header.group_sys_flag = Some(subscription_group_config.group_sys_flag());
429428

@@ -488,13 +487,12 @@ impl DefaultPullMessageResultHandler {
488487
if broker_config.slave_read_enable && !broker_config.is_in_broker_container {
489488
if get_message_result.suggest_pulling_from_slave() {
490489
response_header.suggest_which_broker_id =
491-
Some(subscription_group_config.which_broker_when_consume_slowly());
490+
subscription_group_config.which_broker_when_consume_slowly();
492491
} else {
493-
response_header.suggest_which_broker_id =
494-
Some(subscription_group_config.broker_id());
492+
response_header.suggest_which_broker_id = subscription_group_config.broker_id();
495493
}
496494
} else {
497-
response_header.suggest_which_broker_id = Some(MASTER_ID);
495+
response_header.suggest_which_broker_id = MASTER_ID;
498496
}
499497

500498
if broker_config.broker_identity.broker_id != MASTER_ID
@@ -506,11 +504,11 @@ impl DefaultPullMessageResultHandler {
506504
request_header.topic,
507505
request_header.queue_id,
508506
request_header.consumer_group,
509-
response_header.next_begin_offset.unwrap(),
510-
response_header.min_offset.unwrap(),
511-
response_header.max_offset.unwrap()
507+
response_header.next_begin_offset,
508+
response_header.min_offset,
509+
response_header.max_offset
512510
);
513-
response_header.suggest_which_broker_id = Some(MASTER_ID);
511+
response_header.suggest_which_broker_id = MASTER_ID;
514512
if get_message_result.status() != Some(GetMessageStatus::Found) {
515513
response.set_code_ref(ResponseCode::PullRetryImmediately);
516514
}

rocketmq-broker/src/processor/pull_message_processor.rs

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -246,14 +246,14 @@ pub fn rewrite_response_for_static_topic(
246246
if code != ResponseCode::Success {
247247
let mut is_revised = false;
248248
if leader_item.gen == current_item.gen {
249-
if request_offset > max_offset.unwrap() {
249+
if request_offset > max_offset {
250250
if code == ResponseCode::PullOffsetMoved {
251251
response_code = ResponseCode::PullOffsetMoved;
252252
next_begin_offset = max_offset;
253253
} else {
254254
response_code = code;
255255
}
256-
} else if request_offset < min_offset.unwrap() {
256+
} else if request_offset < min_offset {
257257
next_begin_offset = min_offset;
258258
response_code = ResponseCode::PullRetryImmediately;
259259
} else {
@@ -262,7 +262,7 @@ pub fn rewrite_response_for_static_topic(
262262
}
263263

264264
if earlist_item.gen == current_item.gen {
265-
if request_offset < min_offset.unwrap() {
265+
if request_offset < min_offset {
266266
/*if code == ResponseCode::PullOffsetMoved {
267267
response_code = ResponseCode::PullOffsetMoved;
268268
next_begin_offset = min_offset;
@@ -272,13 +272,13 @@ pub fn rewrite_response_for_static_topic(
272272
}*/
273273
response_code = ResponseCode::PullOffsetMoved;
274274
next_begin_offset = min_offset;
275-
} else if request_offset >= max_offset.unwrap() {
275+
} else if request_offset >= max_offset {
276276
if let Some(next_item) =
277277
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true)
278278
{
279279
is_revised = true;
280-
next_begin_offset = Some(next_item.start_offset);
281-
min_offset = Some(next_item.start_offset);
280+
next_begin_offset = next_item.start_offset;
281+
min_offset = next_item.start_offset;
282282
max_offset = min_offset;
283283
response_code = ResponseCode::PullRetryImmediately;
284284
} else {
@@ -293,15 +293,15 @@ pub fn rewrite_response_for_static_topic(
293293
&& leader_item.gen != current_item.gen
294294
&& earlist_item.gen != current_item.gen
295295
{
296-
if request_offset < min_offset? {
296+
if request_offset < min_offset {
297297
next_begin_offset = min_offset;
298298
response_code = ResponseCode::PullRetryImmediately;
299-
} else if request_offset >= max_offset? {
299+
} else if request_offset >= max_offset {
300300
if let Some(next_item) =
301301
TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true)
302302
{
303-
next_begin_offset = Some(next_item.start_offset);
304-
min_offset = Some(next_item.start_offset);
303+
next_begin_offset = next_item.start_offset;
304+
min_offset = next_item.start_offset;
305305
max_offset = min_offset;
306306
response_code = ResponseCode::PullRetryImmediately;
307307
} else {
@@ -313,26 +313,20 @@ pub fn rewrite_response_for_static_topic(
313313
}
314314
}
315315

316-
if current_item.check_if_end_offset_decided()
317-
&& next_begin_offset.unwrap() >= current_item.end_offset
318-
{
319-
next_begin_offset = Some(current_item.end_offset);
316+
if current_item.check_if_end_offset_decided() && next_begin_offset >= current_item.end_offset {
317+
next_begin_offset = current_item.end_offset;
320318
}
321319

322320
response_header.next_begin_offset =
323-
Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap()));
324-
response_header.min_offset =
325-
Some(current_item.compute_static_queue_offset_strictly(
326-
min_offset.unwrap().max(current_item.start_offset),
321+
current_item.compute_static_queue_offset_strictly(next_begin_offset);
322+
response_header.min_offset = current_item
323+
.compute_static_queue_offset_strictly(min_offset.max(current_item.start_offset));
324+
response_header.max_offset = current_item
325+
.compute_static_queue_offset_strictly(max_offset)
326+
.max(TopicQueueMappingDetail::compute_max_offset_from_mapping(
327+
mapping_detail,
328+
mapping_context.global_id,
327329
));
328-
response_header.max_offset = Some(
329-
current_item
330-
.compute_static_queue_offset_strictly(max_offset.unwrap())
331-
.max(TopicQueueMappingDetail::compute_max_offset_from_mapping(
332-
mapping_detail,
333-
mapping_context.global_id,
334-
)),
335-
);
336330
response_header.offset_delta = Some(current_item.compute_offset_delta());
337331

338332
if code != ResponseCode::Success {

rocketmq-client/src/implementation/mq_client_api_impl.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -980,12 +980,12 @@ impl MQClientAPIImpl {
980980
let pull_result = PullResultExt {
981981
pull_result: PullResult {
982982
pull_status,
983-
next_begin_offset: response_header.next_begin_offset.unwrap_or(0) as u64,
984-
min_offset: response_header.min_offset.unwrap_or(0) as u64,
985-
max_offset: response_header.max_offset.unwrap_or(0) as u64,
983+
next_begin_offset: response_header.next_begin_offset as u64,
984+
min_offset: response_header.min_offset as u64,
985+
max_offset: response_header.max_offset as u64,
986986
msg_found_list: vec![],
987987
},
988-
suggest_which_broker_id: response_header.suggest_which_broker_id.unwrap_or(0),
988+
suggest_which_broker_id: response_header.suggest_which_broker_id,
989989
message_binary: response.take_body(),
990990
offset_delta: response_header.offset_delta,
991991
};

0 commit comments

Comments
 (0)