Commit bb7e244

storcon: fix heartbeats timing out causing a panic (#10902)
Fix an issue caused by PR #10891: that PR introduced timeouts for heartbeats, where we hang up on the other side of the oneshot channel if a timeout happens (the future gets cancelled, so the receiver is dropped). This hang-up made the heartbeat task panic once it did obtain a response, because we unwrap the result of sending the reply. According to the logs, that panic is the last sign of life we see from that process invocation. I'm not sure what actually brings down the process; in theory tokio [should continue](https://docs.rs/tokio/latest/tokio/runtime/enum.UnhandledPanic.html#variant.Ignore) running, but it doesn't seem to. Alternative to #10901.
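A minimal, self-contained sketch of the failure mode (a standalone tokio binary, not the storage controller code): `tokio::time::timeout` drops the oneshot receiver when the deadline fires, so a later `Sender::send` returns `Err`, and unwrapping that result panics the responding task.

```rust
use std::time::Duration;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // The requester holds `rx`; the responder holds `tx` and replies later.
    let (tx, rx) = oneshot::channel::<&'static str>();

    // Responder: simulates a slow heartbeat, then unwraps the send result.
    let responder = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(200)).await;
        // Panics here: the requester timed out and dropped `rx`, so send() returns Err.
        tx.send("heartbeat response").unwrap();
    });

    // Requester: gives up after 50ms; the timed-out future (and `rx` with it) is dropped.
    let reply = tokio::time::timeout(Duration::from_millis(50), rx).await;
    assert!(reply.is_err());

    // The responder task ended in a panic, surfaced through the JoinError.
    assert!(responder.await.unwrap_err().is_panic());
}
```

In this sketch the panic stays contained to the spawned task and the process keeps running; a `panic = "abort"` profile setting would be one way such a panic could take the whole process down.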
1 parent: 787b98f

File tree: 2 files changed, +17 −9 lines


storage_controller/src/heartbeater.rs

+6 −1

@@ -140,8 +140,13 @@ where
                 request = self.receiver.recv() => {
                     match request {
                         Some(req) => {
+                            if req.reply.is_closed() {
+                                // Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them
+                                continue;
+                            }
                             let res = self.heartbeat(req.servers).await;
-                            req.reply.send(res).unwrap();
+                            // Ignore the return value in order to not panic if the heartbeat function's future was cancelled
+                            _ = req.reply.send(res);
                         },
                         None => { return; }
                     }
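The same request/reply pattern in isolation, with a hypothetical `Request` struct and `expensive_work` function standing in for the heartbeater's real types: the worker skips requests whose reply channel is already closed and never unwraps the reply send, so a requester timing out can no longer panic it.

```rust
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

struct Request {
    payload: u64,
    reply: oneshot::Sender<u64>,
}

async fn worker(mut receiver: mpsc::Receiver<Request>) {
    while let Some(req) = receiver.recv().await {
        // The requester may have timed out and dropped its receiver already;
        // skip the work instead of computing a reply nobody will read.
        if req.reply.is_closed() {
            continue;
        }
        let result = expensive_work(req.payload).await;
        // The receiver can also be dropped while we were working; ignore the
        // error instead of unwrapping it, so the worker task never panics.
        let _ = req.reply.send(result);
    }
}

async fn expensive_work(payload: u64) -> u64 {
    tokio::time::sleep(Duration::from_millis(100)).await;
    payload * 2
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tokio::spawn(worker(rx));

    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Request { payload: 21, reply: reply_tx }).await.unwrap();

    // Even if this timeout fires and drops `reply_rx`, the worker won't panic.
    match tokio::time::timeout(Duration::from_secs(1), reply_rx).await {
        Ok(Ok(value)) => println!("got {value}"),
        _ => println!("heartbeat timed out"),
    }
}
```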

storage_controller/src/service.rs

+11 −8

@@ -815,13 +815,12 @@ impl Service {
         };

         tracing::info!("Sending initial heartbeats...");
-        let res_ps = self
-            .heartbeater_ps
-            .heartbeat(Arc::new(nodes_to_heartbeat))
-            .await;
         // Put a small, but reasonable timeout to get the initial heartbeats of the safekeepers to avoid a storage controller downtime
         const SK_TIMEOUT: Duration = Duration::from_secs(5);
-        let res_sk = tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks)).await;
+        let (res_ps, res_sk) = tokio::join!(
+            self.heartbeater_ps.heartbeat(Arc::new(nodes_to_heartbeat)),
+            tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks))
+        );

         let mut online_nodes = HashMap::new();
         if let Ok(deltas) = res_ps {

@@ -1064,8 +1063,12 @@ impl Service {
             locked.safekeepers.clone()
         };

-        let res_ps = self.heartbeater_ps.heartbeat(nodes).await;
-        let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await;
+        const SK_TIMEOUT: Duration = Duration::from_secs(3);
+        let (res_ps, res_sk) = tokio::join!(
+            self.heartbeater_ps.heartbeat(nodes),
+            tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(safekeepers))
+        );
+
         if let Ok(deltas) = res_ps {
             let mut to_handle = Vec::default();

@@ -1167,7 +1170,7 @@ impl Service {
                 }
             }
         }
-        if let Ok(deltas) = res_sk {
+        if let Ok(Ok(deltas)) = res_sk {
            let mut locked = self.inner.write().unwrap();
            let mut safekeepers = (*locked.safekeepers).clone();
            for (id, state) in deltas.0 {
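A sketch of the result shapes this produces, with hypothetical heartbeat functions standing in for the `Service` methods: `tokio::join!` polls both heartbeats concurrently, and `tokio::time::timeout` wraps the safekeeper result in an extra `Result`, which is why the safekeeper branch now matches on `Ok(Ok(deltas))` (an outer `Err` means the timeout elapsed).

```rust
use std::time::Duration;

// Hypothetical stand-ins for the pageserver / safekeeper heartbeat calls.
async fn heartbeat_pageservers() -> Result<Vec<u32>, &'static str> {
    Ok(vec![1, 2, 3])
}

async fn heartbeat_safekeepers() -> Result<Vec<u32>, &'static str> {
    Ok(vec![4, 5])
}

#[tokio::main]
async fn main() {
    const SK_TIMEOUT: Duration = Duration::from_secs(3);

    // join! awaits both futures concurrently and returns both results.
    let (res_ps, res_sk) = tokio::join!(
        heartbeat_pageservers(),
        tokio::time::timeout(SK_TIMEOUT, heartbeat_safekeepers()),
    );

    // res_ps: Result<Vec<u32>, &str>
    if let Ok(deltas) = res_ps {
        println!("pageserver deltas: {deltas:?}");
    }

    // res_sk: Result<Result<Vec<u32>, &str>, Elapsed>;
    // the outer Err means SK_TIMEOUT elapsed before the heartbeat finished.
    if let Ok(Ok(deltas)) = res_sk {
        println!("safekeeper deltas: {deltas:?}");
    }
}
```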
