Skip to content

Commit 31e14b1

Browse files
committed
Fix bug where actions rarely get timedout on rejoin
Fix case when an action is scheduled, then disconnected, then sits in an idle stat until it is considered timed out, then it gets joined onto another client, the matching engine logic will mark it timed out before the client is able to mark it keep_alive. closes: #1568
1 parent 0915e03 commit 31e14b1

File tree

3 files changed

+93
-3
lines changed

3 files changed

+93
-3
lines changed

nativelink-scheduler/src/memory_awaited_action_db.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,15 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
783783
};
784784
*connected_clients += 1;
785785

786+
// Immediately mark the keep alive, we don't need to wake anyone
787+
// so we always fake that it was not actually changed.
788+
// Failing update the client could lead to the client connecting
789+
// then not updating the keep alive in time, resulting in the
790+
// operation timing out due to async behavior.
791+
tx.send_if_modified(|awaited_action| {
792+
awaited_action.update_client_keep_alive((self.now_fn)().now());
793+
false
794+
});
786795
let subscription = tx.subscribe();
787796

788797
self.client_operation_to_awaited_action

nativelink-scheduler/src/store_awaited_action_db.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -490,18 +490,31 @@ where
490490
.await
491491
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
492492
match maybe_awaited_action {
493-
Some(awaited_action) => {
493+
Some(mut awaited_action) => {
494494
// TODO(allada) We don't support joining completed jobs because we
495495
// need to also check that all the data is still in the cache.
496496
if awaited_action.state().stage.is_finished() {
497497
return Ok(None);
498498
}
499499
// TODO(allada) We only care about the operation_id here, we should
500500
// have a way to tell the decoder we only care about specific fields.
501-
let operation_id = awaited_action.operation_id();
501+
let operation_id = awaited_action.operation_id().clone();
502+
503+
awaited_action.update_client_keep_alive((self.now_fn)().now());
504+
let update_res = inner_update_awaited_action(self.store.as_ref(), awaited_action)
505+
.await
506+
.err_tip(|| "In OperationSubscriber::changed");
507+
if let Err(err) = update_res {
508+
event!(
509+
Level::WARN,
510+
"Error updating client keep alive in RedisAwaitedActionDb::try_subscribe - {err:?} - This is not a critical error, but we did decide to create a new action instead of joining an existing one."
511+
);
512+
return Ok(None);
513+
}
514+
502515
Ok(Some(OperationSubscriber::new(
503516
Some(client_operation_id.clone()),
504-
OperationIdToAwaitedAction(Cow::Owned(operation_id.clone())),
517+
OperationIdToAwaitedAction(Cow::Owned(operation_id)),
505518
Arc::downgrade(&self.store),
506519
self.now_fn.clone(),
507520
)))

nativelink-scheduler/tests/simple_scheduler_test.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2221,3 +2221,71 @@ async fn client_reconnect_keeps_action_alive() -> Result<(), Error> {
22212221

22222222
Ok(())
22232223
}
2224+
2225+
#[nativelink_test]
2226+
async fn client_timesout_job_then_same_action_requested() -> Result<(), Error> {
2227+
const CLIENT_ACTION_TIMEOUT_S: u64 = 60;
2228+
let task_change_notify = Arc::new(Notify::new());
2229+
let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback(
2230+
&SimpleSpec {
2231+
worker_timeout_s: WORKER_TIMEOUT_S,
2232+
client_action_timeout_s: CLIENT_ACTION_TIMEOUT_S,
2233+
..Default::default()
2234+
},
2235+
memory_awaited_action_db_factory(
2236+
0,
2237+
&task_change_notify.clone(),
2238+
MockInstantWrapped::default,
2239+
),
2240+
|| async move {},
2241+
task_change_notify,
2242+
MockInstantWrapped::default,
2243+
);
2244+
let action_digest = DigestInfo::new([99u8; 32], 512);
2245+
2246+
{
2247+
let insert_timestamp = make_system_time(1);
2248+
let mut action_listener =
2249+
setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp)
2250+
.await
2251+
.unwrap();
2252+
2253+
// We should get one notification saying it's queued.
2254+
assert_eq!(
2255+
action_listener.changed().await.unwrap().stage,
2256+
ActionStage::Queued
2257+
);
2258+
2259+
let changed_fut = action_listener.changed();
2260+
tokio::pin!(changed_fut);
2261+
2262+
MockClock::advance(Duration::from_secs(2));
2263+
scheduler.do_try_match_for_test().await.unwrap();
2264+
assert_eq!(poll!(&mut changed_fut), Poll::Pending);
2265+
}
2266+
2267+
MockClock::advance(Duration::from_secs(CLIENT_ACTION_TIMEOUT_S + 1));
2268+
2269+
{
2270+
let insert_timestamp = make_system_time(1);
2271+
let mut action_listener =
2272+
setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp)
2273+
.await
2274+
.unwrap();
2275+
2276+
// We should get one notification saying it's queued.
2277+
assert_eq!(
2278+
action_listener.changed().await.unwrap().stage,
2279+
ActionStage::Queued
2280+
);
2281+
2282+
let changed_fut = action_listener.changed();
2283+
tokio::pin!(changed_fut);
2284+
2285+
MockClock::advance(Duration::from_secs(2));
2286+
tokio::task::yield_now().await;
2287+
assert_eq!(poll!(&mut changed_fut), Poll::Pending);
2288+
}
2289+
2290+
Ok(())
2291+
}

0 commit comments

Comments
 (0)