Skip to content

Commit 9def069

Browse files
authored
[ISSUE #2384]🚀Implement PopMessageProcessor shutdown🤡 (#2385)
1 parent 84cfeb5 commit 9def069

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

rocketmq-broker/src/long_polling/long_polling_service/pop_long_polling_service.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use rocketmq_rust::ArcMut;
3434
use rocketmq_store::consume_queue::consume_queue_ext::CqExtUnit;
3535
use rocketmq_store::filter::MessageFilter;
3636
use rocketmq_store::log_file::MessageStore;
37+
use tokio::select;
3738
use tokio::sync::Notify;
3839
use tracing::error;
3940
use tracing::warn;
@@ -74,7 +75,11 @@ impl<MS: MessageStore, RP: RequestProcessor + Sync + 'static> PopLongPollingServ
7475
pub fn start(this: ArcMut<Self>) {
7576
tokio::spawn(async move {
7677
loop {
77-
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
78+
select! {
79+
_ = this.notify.notified() => {break;}
80+
_ = tokio::time::sleep(tokio::time::Duration::from_millis(20)) => {}
81+
}
82+
7883
if this.polling_map.is_empty() {
7984
continue;
8085
}
@@ -108,6 +113,10 @@ impl<MS: MessageStore, RP: RequestProcessor + Sync + 'static> PopLongPollingServ
108113
});
109114
}
110115

116+
pub fn shutdown(&mut self) {
117+
self.notify.notify_waiters();
118+
}
119+
111120
fn clean_unused_resource(&self) {
112121
warn!("clean_unused_resource start");
113122
}

rocketmq-broker/src/processor/pop_message_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1322,7 +1322,7 @@ where
13221322
}
13231323

13241324
pub fn shutdown(&mut self) {
1325-
warn!("PopMessageProcessor shutdown unimplemented, need to implement");
1325+
self.pop_long_polling_service.shutdown();
13261326
}
13271327
}
13281328

0 commit comments

Comments
 (0)