Skip to content

Commit 902314e

Browse files
authored
[ISSUE #2004]🍻Implement PopBufferMergeService-1 🚀 (#2022)
1 parent 5f8e290 commit 902314e

File tree

5 files changed

+69
-15
lines changed

5 files changed

+69
-15
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ use crate::processor::default_pull_message_result_handler::DefaultPullMessageRes
7676
use crate::processor::end_transaction_processor::EndTransactionProcessor;
7777
use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter;
7878
use crate::processor::pop_message_processor::PopMessageProcessor;
79-
use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService;
8079
use crate::processor::pull_message_processor::PullMessageProcessor;
8180
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
8281
use crate::processor::query_assignment_processor::QueryAssignmentProcessor;
@@ -597,7 +596,6 @@ impl BrokerRuntime {
597596
Arc::new(self.consumer_offset_manager.clone()),
598597
self.consumer_order_info_manager.clone(),
599598
self.broker_stats_manager.clone(),
600-
ArcMut::new(PopBufferMergeService),
601599
self.escape_bridge.clone(),
602600
pop_message_processor,
603601
)),

rocketmq-broker/src/processor/ack_message_processor.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,11 @@ use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
5757
use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager;
5858
use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter;
5959
use crate::processor::pop_message_processor::PopMessageProcessor;
60-
use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService;
6160
use crate::topic::manager::topic_config_manager::TopicConfigManager;
6261

6362
pub struct AckMessageProcessor<MS> {
6463
topic_config_manager: TopicConfigManager,
6564
message_store: ArcMut<MS>,
66-
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
6765
escape_bridge: ArcMut<EscapeBridge<MS>>,
6866
store_host: SocketAddr,
6967
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
@@ -90,8 +88,6 @@ where
9088
AckMessageProcessor {
9189
topic_config_manager,
9290
message_store,
93-
/* need to implement PopBufferMergeService */
94-
pop_buffer_merge_service: ArcMut::new(PopBufferMergeService),
9591
escape_bridge,
9692
store_host,
9793
pop_inflight_message_counter,
@@ -377,7 +373,8 @@ where
377373
ack_msg.set_pop_time(pop_time);
378374
ack_msg.set_broker_name(broker_name);
379375
if self
380-
.pop_buffer_merge_service
376+
.pop_message_processor
377+
.pop_buffer_merge_service_mut()
381378
.add_ack(r_qid, ack_msg.as_ref())
382379
{
383380
return;

rocketmq-broker/src/processor/change_invisible_time_processor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ use crate::failover::escape_bridge::EscapeBridge;
5151
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
5252
use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager;
5353
use crate::processor::pop_message_processor::PopMessageProcessor;
54-
use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService;
5554
use crate::topic::manager::topic_config_manager::TopicConfigManager;
5655

5756
pub struct ChangeInvisibleTimeProcessor<MS> {
@@ -61,7 +60,6 @@ pub struct ChangeInvisibleTimeProcessor<MS> {
6160
consumer_offset_manager: Arc<ConsumerOffsetManager>,
6261
consumer_order_info_manager: Arc<ConsumerOrderInfoManager<MS>>,
6362
broker_stats_manager: Arc<BrokerStatsManager>,
64-
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
6563
escape_bridge: ArcMut<EscapeBridge<MS>>,
6664
revive_topic: CheetahString,
6765
store_host: SocketAddr,
@@ -76,7 +74,6 @@ impl<MS> ChangeInvisibleTimeProcessor<MS> {
7674
consumer_offset_manager: Arc<ConsumerOffsetManager>,
7775
consumer_order_info_manager: Arc<ConsumerOrderInfoManager<MS>>,
7876
broker_stats_manager: Arc<BrokerStatsManager>,
79-
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
8077
escape_bridge: ArcMut<EscapeBridge<MS>>,
8178
pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
8279
) -> Self {
@@ -93,7 +90,6 @@ impl<MS> ChangeInvisibleTimeProcessor<MS> {
9390
consumer_offset_manager,
9491
consumer_order_info_manager,
9592
broker_stats_manager,
96-
pop_buffer_merge_service,
9793
escape_bridge,
9894
revive_topic: CheetahString::from_string(revive_topic),
9995
store_host,
@@ -275,7 +271,11 @@ where
275271
request_header.topic.as_str(),
276272
1,
277273
);
278-
if self.pop_buffer_merge_service.add_ack(rq_id, &ack_msg) {
274+
if self
275+
.pop_message_processor
276+
.pop_buffer_merge_service_mut()
277+
.add_ack(rq_id, &ack_msg)
278+
{
279279
return Ok(());
280280
}
281281
let mut inner = MessageExtBrokerInner::default();

rocketmq-broker/src/processor/pop_message_processor.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ pub struct PopMessageProcessor<MS> {
9696
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
9797
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
9898
queue_lock_manager: QueueLockManager,
99+
revive_topic: CheetahString,
99100
}
100101

101102
impl<MS> PopMessageProcessor<MS> {
@@ -111,6 +112,10 @@ impl<MS> PopMessageProcessor<MS> {
111112
consumer_filter_manager: Arc<ConsumerFilterManager>,
112113
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
113114
) -> Self {
115+
let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic(
116+
broker_config.broker_identity.broker_cluster_name.as_str(),
117+
));
118+
let queue_lock_manager = QueueLockManager::new();
114119
PopMessageProcessor {
115120
consumer_offset_manager,
116121
consumer_manager,
@@ -123,9 +128,13 @@ impl<MS> PopMessageProcessor<MS> {
123128
consumer_filter_manager,
124129
ck_message_number: Default::default(),
125130
pop_long_polling_service: ArcMut::new(PopLongPollingService),
126-
pop_buffer_merge_service: ArcMut::new(PopBufferMergeService),
131+
pop_buffer_merge_service: ArcMut::new(PopBufferMergeService::new(
132+
revive_topic.clone(),
133+
queue_lock_manager.clone(),
134+
)),
127135
pop_inflight_message_counter,
128-
queue_lock_manager: QueueLockManager::new(),
136+
queue_lock_manager,
137+
revive_topic,
129138
}
130139
}
131140
}
@@ -1160,6 +1169,14 @@ impl<MS> PopMessageProcessor<MS> {
11601169
PopAckConstants::CK_TAG
11611170
)
11621171
}
1172+
1173+
pub fn pop_buffer_merge_service(&self) -> &ArcMut<PopBufferMergeService> {
1174+
&self.pop_buffer_merge_service
1175+
}
1176+
1177+
pub fn pop_buffer_merge_service_mut(&mut self) -> &mut ArcMut<PopBufferMergeService> {
1178+
&mut self.pop_buffer_merge_service
1179+
}
11631180
}
11641181

11651182
struct TimedLock {
@@ -1202,6 +1219,7 @@ impl TimedLock {
12021219
}
12031220
}
12041221

1222+
#[derive(Clone)]
12051223
pub struct QueueLockManager {
12061224
expired_local_cache: Arc<Mutex<HashMap<CheetahString, TimedLock>>>,
12071225
}

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,53 @@ use std::sync::atomic::Ordering;
2121
use std::sync::Arc;
2222

2323
use cheetah_string::CheetahString;
24+
use dashmap::DashMap;
2425
use rocketmq_common::common::pop_ack_constants::PopAckConstants;
2526
use rocketmq_common::TimeUtils::get_current_millis;
2627
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
2728
use rocketmq_store::pop::AckMessage;
2829

29-
pub(crate) struct PopBufferMergeService;
30+
use crate::processor::pop_message_processor::QueueLockManager;
31+
32+
pub(crate) struct PopBufferMergeService {
33+
buffer: DashMap<CheetahString /* mergeKey */, PopCheckPointWrapper>,
34+
commit_offsets:
35+
DashMap<CheetahString /* topic@cid@queueId */, QueueWithTime<PopCheckPointWrapper>>,
36+
serving: AtomicBool,
37+
counter: AtomicI32,
38+
scan_times: i32,
39+
revive_topic: CheetahString,
40+
queue_lock_manager: QueueLockManager,
41+
interval: u64,
42+
minute5: u64,
43+
count_of_minute1: u64,
44+
count_of_second1: u64,
45+
count_of_second30: u64,
46+
batch_ack_index_list: Vec<u8>,
47+
master: AtomicBool,
48+
}
49+
50+
impl PopBufferMergeService {
51+
pub fn new(revive_topic: CheetahString, queue_lock_manager: QueueLockManager) -> Self {
52+
let interval = 5;
53+
Self {
54+
buffer: DashMap::with_capacity(1024 * 16),
55+
commit_offsets: DashMap::with_capacity(1024),
56+
serving: AtomicBool::new(true),
57+
counter: AtomicI32::new(0),
58+
scan_times: 0,
59+
revive_topic,
60+
queue_lock_manager,
61+
interval,
62+
minute5: 5 * 60 * 1000,
63+
count_of_minute1: 60 * 1000 / interval,
64+
count_of_second1: 1000 / interval,
65+
count_of_second30: 30 * 1000 / interval,
66+
batch_ack_index_list: Vec::with_capacity(32),
67+
master: AtomicBool::new(false),
68+
}
69+
}
70+
}
3071

3172
impl PopBufferMergeService {
3273
pub fn add_ack(&mut self, _revive_qid: i32, _ack_msg: &dyn AckMessage) -> bool {

0 commit comments

Comments
 (0)