diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs
index c9fadf0ff6c..6af11195436 100644
--- a/linera-core/src/worker.rs
+++ b/linera-core/src/worker.rs
@@ -15,7 +15,6 @@ use linera_base::{
     doc_scalar,
     hashed::Hashed,
     identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
-    time::timer::{sleep, timeout},
 };
 #[cfg(with_testing)]
 use linera_chain::ChainExecutionContext;
@@ -231,8 +230,6 @@ pub enum WorkerError {
     },
     #[error("The block proposal is invalid: {0}")]
     InvalidBlockProposal(String),
-    #[error("The worker is too busy to handle new chains")]
-    FullChainWorkerCache,
     #[error("Failed to join spawned worker task")]
     JoinError,
     #[error("Blob was not required by any pending block")]
@@ -241,6 +238,16 @@
     TooManyPublishedBlobs(u64),
     #[error("Missing network description")]
     MissingNetworkDescription,
+    #[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
+    ChainActorSendError {
+        chain_id: ChainId,
+        error: Box<dyn std::error::Error + Send + Sync>,
+    },
+    #[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
+    ChainActorRecvError {
+        chain_id: ChainId,
+        error: Box<dyn std::error::Error + Send + Sync>,
+    },
 }
 
 impl From<ChainError> for WorkerError {
@@ -749,11 +756,11 @@ where
         .await
     }
 
+    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
     #[instrument(level = "trace", target = "telemetry_only", skip(self, request_builder), fields(
         nickname = %self.nickname,
         chain_id = %chain_id
     ))]
-    /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
     async fn query_chain_worker<Response>(
         &self,
         chain_id: ChainId,
@@ -761,39 +768,14 @@ where
             oneshot::Sender<Result<Response, WorkerError>>,
         ) -> ChainWorkerRequest<StorageClient::Context>,
     ) -> Result<Response, WorkerError> {
-        let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
+        // Build the request.
         let (callback, response) = oneshot::channel();
+        let request = request_builder(callback);
 
-        chain_actor
-            .send((request_builder(callback), tracing::Span::current()))
-            .expect("`ChainWorkerActor` stopped executing unexpectedly");
-
-        response
-            .await
-            .expect("`ChainWorkerActor` stopped executing without responding")
-    }
-
-    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it
-    /// to the cache if needed.
-    #[instrument(level = "trace", target = "telemetry_only", skip(self), fields(
-        nickname = %self.nickname,
-        chain_id = %chain_id
-    ))]
-    async fn get_chain_worker_endpoint(
-        &self,
-        chain_id: ChainId,
-    ) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
-        let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
-            loop {
-                match self.try_get_chain_worker_endpoint(chain_id) {
-                    Some(endpoint) => break endpoint,
-                    None => sleep(Duration::from_millis(250)).await,
-                }
-            }
-        })
-        .await
-        .map_err(|_| WorkerError::FullChainWorkerCache)?;
+        // Call the endpoint, creating a new one if necessary.
+        let new_receiver = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;
 
+        // We just created an endpoint: spawn the actor.
         if let Some(receiver) = new_receiver {
             let delivery_notifier = self
                 .delivery_notifiers
@@ -826,36 +808,56 @@ where
                 .spawn_task(actor_task);
         }
 
-        Ok(sender)
+        // Finally, wait for a response.
+        match response.await {
+            Err(e) => {
+                // The actor endpoint was dropped. Better luck next time!
+                Err(WorkerError::ChainActorRecvError {
+                    chain_id,
+                    error: Box::new(e),
+                })
+            }
+            Ok(response) => response,
+        }
     }
 
-    /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, attempting to create one
-    /// and add it to the cache if needed.
-    ///
-    /// Returns [`None`] if the cache is full and no candidate for eviction was found.
+    /// Finds an endpoint and calls it, creating the endpoint if necessary.
     #[instrument(level = "trace", target = "telemetry_only", skip(self), fields(
         nickname = %self.nickname,
         chain_id = %chain_id
     ))]
     #[expect(clippy::type_complexity)]
-    fn try_get_chain_worker_endpoint(
+    fn call_and_maybe_create_chain_worker_endpoint(
         &self,
         chain_id: ChainId,
-    ) -> Option<(
-        ChainActorEndpoint<StorageClient>,
+        request: ChainWorkerRequest<StorageClient::Context>,
+    ) -> Result<
         Option<
             mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
         >,
-    )> {
+        WorkerError,
+    > {
         let mut chain_workers = self.chain_workers.lock().unwrap();
-        if let Some(endpoint) = chain_workers.get(&chain_id) {
-            Some((endpoint.clone(), None))
+        let (sender, new_receiver) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
+            (endpoint, None)
         } else {
             let (sender, receiver) = mpsc::unbounded_channel();
-            chain_workers.insert(chain_id, sender.clone());
-            Some((sender, Some(receiver)))
+            (sender, Some(receiver))
+        };
+
+        if let Err(e) = sender.send((request, tracing::Span::current())) {
+            // The actor was dropped. Give up without (re-)inserting the endpoint in the cache.
+            return Err(WorkerError::ChainActorSendError {
+                chain_id,
+                error: Box::new(e),
+            });
         }
+
+        // Put the sender back in the cache for next time.
+        chain_workers.insert(chain_id, sender);
+
+        Ok(new_receiver)
     }
 
     #[instrument(skip_all, fields(
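
For reference, here is a self-contained sketch of the send-then-reinsert pattern that the new `call_and_maybe_create_chain_worker_endpoint` uses: take the per-chain sender out of the map, send, and only put the sender back if the send succeeded, so a dead endpoint drops out of the cache instead of being handed out again. `Worker`, `Request`, `call_endpoint`, and the `String` errors below are hypothetical stand-ins, not the crate's API; the real code routes `ChainWorkerRequest` values and returns `WorkerError`.

```rust
use std::{collections::HashMap, sync::Mutex};

use tokio::sync::{mpsc, oneshot};

/// Stand-in for `linera_base::identifiers::ChainId`.
type ChainId = u64;

/// Stand-in for `ChainWorkerRequest`: a payload plus a oneshot callback.
struct Request {
    payload: String,
    callback: oneshot::Sender<String>,
}

#[derive(Default)]
struct Worker {
    /// Stand-in for the `chain_workers` cache of actor endpoints.
    endpoints: Mutex<HashMap<ChainId, mpsc::UnboundedSender<Request>>>,
}

impl Worker {
    /// Sends `request` to the per-chain actor, creating the channel if needed.
    /// Returns the receiver half when a new actor still has to be spawned.
    fn call_endpoint(
        &self,
        chain_id: ChainId,
        request: Request,
    ) -> Result<Option<mpsc::UnboundedReceiver<Request>>, String> {
        let mut endpoints = self.endpoints.lock().unwrap();
        // Take the sender out of the cache, or make a fresh channel.
        let (sender, new_receiver) = match endpoints.remove(&chain_id) {
            Some(sender) => (sender, None),
            None => {
                let (sender, receiver) = mpsc::unbounded_channel();
                (sender, Some(receiver))
            }
        };
        if sender.send(request).is_err() {
            // The actor stopped: report the error and leave the dead
            // sender out of the cache.
            return Err(format!("actor for chain {chain_id} is gone"));
        }
        // The send succeeded, so the endpoint is healthy: cache it again.
        endpoints.insert(chain_id, sender);
        Ok(new_receiver)
    }

    async fn query(&self, chain_id: ChainId, payload: String) -> Result<String, String> {
        let (callback, response) = oneshot::channel();
        let new_receiver = self.call_endpoint(chain_id, Request { payload, callback })?;
        // First request for this chain: spawn the actor loop.
        if let Some(mut receiver) = new_receiver {
            tokio::spawn(async move {
                while let Some(request) = receiver.recv().await {
                    let _ = request.callback.send(format!("handled {}", request.payload));
                }
            });
        }
        // If the actor dropped the callback, surface it as an error.
        response
            .await
            .map_err(|e| format!("no response for chain {chain_id}: {e}"))
    }
}

#[tokio::main]
async fn main() {
    let worker = Worker::default();
    assert_eq!(worker.query(1, "ping".into()).await.unwrap(), "handled ping");
}
```

Taking the sender out of the cache and re-inserting it only after a successful send is what lets a closed channel fall out of the cache naturally, replacing the old timeout/retry loop and the `FullChainWorkerCache` error.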