From 8e219ed49a818b9655df7dbedf04d1cca3c3b743 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet <1105398+ma2bd@users.noreply.github.com> Date: Thu, 9 Oct 2025 12:42:36 -0400 Subject: [PATCH 1/3] handle dropped actors without crashing the RPC thread --- linera-core/src/worker.rs | 108 +++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index c9fadf0ff6cb..9001607272a1 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -15,7 +15,6 @@ use linera_base::{ doc_scalar, hashed::Hashed, identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId}, - time::timer::{sleep, timeout}, }; #[cfg(with_testing)] use linera_chain::ChainExecutionContext; @@ -231,8 +230,6 @@ pub enum WorkerError { }, #[error("The block proposal is invalid: {0}")] InvalidBlockProposal(String), - #[error("The worker is too busy to handle new chains")] - FullChainWorkerCache, #[error("Failed to join spawned worker task")] JoinError, #[error("Blob was not required by any pending block")] @@ -241,6 +238,10 @@ pub enum WorkerError { TooManyPublishedBlobs(u64), #[error("Missing network description")] MissingNetworkDescription, + #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")] + ChainActorSendError { chain_id: ChainId, error: String }, + #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")] + ChainActorRecvError { chain_id: ChainId, error: String }, } impl From for WorkerError { @@ -749,11 +750,11 @@ where .await } + /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`. #[instrument(level = "trace", target = "telemetry_only", skip(self, request_builder), fields( nickname = %self.nickname, chain_id = %chain_id ))] - /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`. async fn query_chain_worker( &self, chain_id: ChainId, @@ -761,39 +762,10 @@ where oneshot::Sender>, ) -> ChainWorkerRequest, ) -> Result { - let chain_actor = self.get_chain_worker_endpoint(chain_id).await?; - let (callback, response) = oneshot::channel(); - - chain_actor - .send((request_builder(callback), tracing::Span::current())) - .expect("`ChainWorkerActor` stopped executing unexpectedly"); - - response - .await - .expect("`ChainWorkerActor` stopped executing without responding") - } - - /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it - /// to the cache if needed. - #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( - nickname = %self.nickname, - chain_id = %chain_id - ))] - async fn get_chain_worker_endpoint( - &self, - chain_id: ChainId, - ) -> Result, WorkerError> { - let (sender, new_receiver) = timeout(Duration::from_secs(3), async move { - loop { - match self.try_get_chain_worker_endpoint(chain_id) { - Some(endpoint) => break endpoint, - None => sleep(Duration::from_millis(250)).await, - } - } - }) - .await - .map_err(|_| WorkerError::FullChainWorkerCache)?; + let (response, new_receiver) = + self.call_and_maybe_create_chain_worker_endpoint(chain_id, request_builder)?; + // We just created an endpoint: spawn the actor. if let Some(receiver) = new_receiver { let delivery_notifier = self .delivery_notifiers @@ -826,36 +798,66 @@ where .spawn_task(actor_task); } - Ok(sender) + // Finally, wait a response. + match response.await { + Err(e) => { + // The actor endpoint was dropped. Better luck next time! + Err(WorkerError::ChainActorRecvError { + chain_id, + error: e.to_string(), + }) + } + Ok(response) => response, + } } - /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, attempting to create one - /// and add it to the cache if needed. - /// - /// Returns [`None`] if the cache is full and no candidate for eviction was found. - #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( + /// Find an endpoint and call it. Create the endpoint if necessary. + #[instrument(level = "trace", target = "telemetry_only", skip(self, request_builder), fields( nickname = %self.nickname, chain_id = %chain_id ))] #[expect(clippy::type_complexity)] - fn try_get_chain_worker_endpoint( + fn call_and_maybe_create_chain_worker_endpoint( &self, chain_id: ChainId, - ) -> Option<( - ChainActorEndpoint, - Option< - mpsc::UnboundedReceiver<(ChainWorkerRequest, tracing::Span)>, - >, - )> { + request_builder: impl FnOnce( + oneshot::Sender>, + ) -> ChainWorkerRequest, + ) -> Result< + ( + oneshot::Receiver>, + Option< + mpsc::UnboundedReceiver<( + ChainWorkerRequest, + tracing::Span, + )>, + >, + ), + WorkerError, + > { let mut chain_workers = self.chain_workers.lock().unwrap(); - if let Some(endpoint) = chain_workers.get(&chain_id) { - Some((endpoint.clone(), None)) + let (sender, new_receiver) = if let Some(endpoint) = chain_workers.remove(&chain_id) { + (endpoint, None) } else { let (sender, receiver) = mpsc::unbounded_channel(); - chain_workers.insert(chain_id, sender.clone()); - Some((sender, Some(receiver))) + (sender, Some(receiver)) + }; + + let (callback, response) = oneshot::channel(); + + if let Err(e) = sender.send((request_builder(callback), tracing::Span::current())) { + // The actor was dropped. Give up without (re-)inserting the endpoint in the cache. + return Err(WorkerError::ChainActorSendError { + chain_id, + error: e.to_string(), + }); } + + // Put back the sender in the cache for next time. + chain_workers.insert(chain_id, sender); + + Ok((response, new_receiver)) } #[instrument(skip_all, fields( From 2cc8db814eab6280fd21af58dd7b4e91e1583f84 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet <1105398+ma2bd@users.noreply.github.com> Date: Thu, 9 Oct 2025 13:52:37 -0400 Subject: [PATCH 2/3] nit --- linera-core/src/worker.rs | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 9001607272a1..7245271fcacb 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -762,8 +762,12 @@ where oneshot::Sender>, ) -> ChainWorkerRequest, ) -> Result { - let (response, new_receiver) = - self.call_and_maybe_create_chain_worker_endpoint(chain_id, request_builder)?; + // Build the request. + let (callback, response) = oneshot::channel(); + let request = request_builder(callback); + + // Call the endpoint, possibly a new one. + let new_receiver = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?; // We just created an endpoint: spawn the actor. if let Some(receiver) = new_receiver { @@ -812,27 +816,19 @@ where } /// Find an endpoint and call it. Create the endpoint if necessary. - #[instrument(level = "trace", target = "telemetry_only", skip(self, request_builder), fields( + #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( nickname = %self.nickname, chain_id = %chain_id ))] #[expect(clippy::type_complexity)] - fn call_and_maybe_create_chain_worker_endpoint( + fn call_and_maybe_create_chain_worker_endpoint( &self, chain_id: ChainId, - request_builder: impl FnOnce( - oneshot::Sender>, - ) -> ChainWorkerRequest, + request: ChainWorkerRequest, ) -> Result< - ( - oneshot::Receiver>, - Option< - mpsc::UnboundedReceiver<( - ChainWorkerRequest, - tracing::Span, - )>, - >, - ), + Option< + mpsc::UnboundedReceiver<(ChainWorkerRequest, tracing::Span)>, + >, WorkerError, > { let mut chain_workers = self.chain_workers.lock().unwrap(); @@ -844,9 +840,7 @@ where (sender, Some(receiver)) }; - let (callback, response) = oneshot::channel(); - - if let Err(e) = sender.send((request_builder(callback), tracing::Span::current())) { + if let Err(e) = sender.send((request, tracing::Span::current())) { // The actor was dropped. Give up without (re-)inserting the endpoint in the cache. return Err(WorkerError::ChainActorSendError { chain_id, @@ -857,7 +851,7 @@ where // Put back the sender in the cache for next time. chain_workers.insert(chain_id, sender); - Ok((response, new_receiver)) + Ok(new_receiver) } #[instrument(skip_all, fields( From 4a97c523b3828b64a695af7323d83b80637abf64 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet <1105398+ma2bd@users.noreply.github.com> Date: Thu, 9 Oct 2025 13:56:11 -0400 Subject: [PATCH 3/3] address reviewer comment --- linera-core/src/worker.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 7245271fcacb..6af111954364 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -239,9 +239,15 @@ pub enum WorkerError { #[error("Missing network description")] MissingNetworkDescription, #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")] - ChainActorSendError { chain_id: ChainId, error: String }, + ChainActorSendError { + chain_id: ChainId, + error: Box, + }, #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")] - ChainActorRecvError { chain_id: ChainId, error: String }, + ChainActorRecvError { + chain_id: ChainId, + error: Box, + }, } impl From for WorkerError { @@ -808,7 +814,7 @@ where // The actor endpoint was dropped. Better luck next time! Err(WorkerError::ChainActorRecvError { chain_id, - error: e.to_string(), + error: Box::new(e), }) } Ok(response) => response, @@ -844,7 +850,7 @@ where // The actor was dropped. Give up without (re-)inserting the endpoint in the cache. return Err(WorkerError::ChainActorSendError { chain_id, - error: e.to_string(), + error: Box::new(e), }); }