Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 49 additions & 47 deletions linera-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use linera_base::{
doc_scalar,
hashed::Hashed,
identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
time::timer::{sleep, timeout},
};
#[cfg(with_testing)]
use linera_chain::ChainExecutionContext;
Expand Down Expand Up @@ -231,8 +230,6 @@ pub enum WorkerError {
},
#[error("The block proposal is invalid: {0}")]
InvalidBlockProposal(String),
#[error("The worker is too busy to handle new chains")]
FullChainWorkerCache,
#[error("Failed to join spawned worker task")]
JoinError,
#[error("Blob was not required by any pending block")]
Expand All @@ -241,6 +238,16 @@ pub enum WorkerError {
TooManyPublishedBlobs(u64),
#[error("Missing network description")]
MissingNetworkDescription,
#[error("ChainWorkerActor for chain {chain_id} stopped executing unexpectedly: {error}")]
ChainActorSendError {
chain_id: ChainId,
error: Box<dyn std::error::Error + Send + Sync>,
},
#[error("ChainWorkerActor for chain {chain_id} stopped executing without responding: {error}")]
ChainActorRecvError {
chain_id: ChainId,
error: Box<dyn std::error::Error + Send + Sync>,
},
}

impl From<ChainError> for WorkerError {
Expand Down Expand Up @@ -749,51 +756,26 @@ where
.await
}

/// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
#[instrument(level = "trace", target = "telemetry_only", skip(self, request_builder), fields(
nickname = %self.nickname,
chain_id = %chain_id
))]
/// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`.
async fn query_chain_worker<Response>(
&self,
chain_id: ChainId,
request_builder: impl FnOnce(
oneshot::Sender<Result<Response, WorkerError>>,
) -> ChainWorkerRequest<StorageClient::Context>,
) -> Result<Response, WorkerError> {
let chain_actor = self.get_chain_worker_endpoint(chain_id).await?;
// Build the request.
let (callback, response) = oneshot::channel();
let request = request_builder(callback);

chain_actor
.send((request_builder(callback), tracing::Span::current()))
.expect("`ChainWorkerActor` stopped executing unexpectedly");

response
.await
.expect("`ChainWorkerActor` stopped executing without responding")
}

/// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it
/// to the cache if needed.
#[instrument(level = "trace", target = "telemetry_only", skip(self), fields(
nickname = %self.nickname,
chain_id = %chain_id
))]
async fn get_chain_worker_endpoint(
&self,
chain_id: ChainId,
) -> Result<ChainActorEndpoint<StorageClient>, WorkerError> {
let (sender, new_receiver) = timeout(Duration::from_secs(3), async move {
loop {
match self.try_get_chain_worker_endpoint(chain_id) {
Some(endpoint) => break endpoint,
None => sleep(Duration::from_millis(250)).await,
}
}
})
.await
.map_err(|_| WorkerError::FullChainWorkerCache)?;
// Call the endpoint, possibly a new one.
let new_receiver = self.call_and_maybe_create_chain_worker_endpoint(chain_id, request)?;

// We just created an endpoint: spawn the actor.
if let Some(receiver) = new_receiver {
let delivery_notifier = self
.delivery_notifiers
Expand Down Expand Up @@ -826,36 +808,56 @@ where
.spawn_task(actor_task);
}

Ok(sender)
// Finally, wait for a response.
match response.await {
Err(e) => {
// The actor endpoint was dropped. Better luck next time!
Err(WorkerError::ChainActorRecvError {
chain_id,
error: Box::new(e),
})
}
Ok(response) => response,
}
}

/// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, attempting to create one
/// and add it to the cache if needed.
///
/// Returns [`None`] if the cache is full and no candidate for eviction was found.
/// Finds an endpoint and sends the request to it, creating the endpoint if necessary.
#[instrument(level = "trace", target = "telemetry_only", skip(self), fields(
nickname = %self.nickname,
chain_id = %chain_id
))]
#[expect(clippy::type_complexity)]
fn try_get_chain_worker_endpoint(
fn call_and_maybe_create_chain_worker_endpoint(
&self,
chain_id: ChainId,
) -> Option<(
ChainActorEndpoint<StorageClient>,
request: ChainWorkerRequest<StorageClient::Context>,
) -> Result<
Option<
mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
>,
)> {
WorkerError,
> {
let mut chain_workers = self.chain_workers.lock().unwrap();

if let Some(endpoint) = chain_workers.get(&chain_id) {
Some((endpoint.clone(), None))
let (sender, new_receiver) = if let Some(endpoint) = chain_workers.remove(&chain_id) {
(endpoint, None)
} else {
let (sender, receiver) = mpsc::unbounded_channel();
chain_workers.insert(chain_id, sender.clone());
Some((sender, Some(receiver)))
(sender, Some(receiver))
};

if let Err(e) = sender.send((request, tracing::Span::current())) {
// The actor was dropped. Give up without (re-)inserting the endpoint in the cache.
return Err(WorkerError::ChainActorSendError {
chain_id,
error: Box::new(e),
});
}

// Put back the sender in the cache for next time.
chain_workers.insert(chain_id, sender);

Ok(new_receiver)
}

#[instrument(skip_all, fields(
Expand Down
Loading