refactor(host): simplify shutdown binary provider usage
Signed-off-by: Brooks Townsend <brooksmtownsend@gmail.com>
brooksmtownsend committed Feb 5, 2025
1 parent f85c6be commit e748d2d
Showing 3 changed files with 142 additions and 144 deletions.
crates/host/src/wasmbus/ctl.rs (15 additions, 2 deletions)
@@ -492,7 +492,7 @@ impl ControlInterfaceServer for Host {
         let provider_ref = request.provider_ref();
         let annotations = request.annotations();
 
-        if let Err(err) = self
+        if let Err(err) = Arc::clone(&self)
             .handle_start_provider_task(
                 config,
                 provider_id,
@@ -538,9 +538,16 @@ impl ControlInterfaceServer for Host {
             return Ok(CtlResponse::error("provider with that ID is not running"));
         };
         let Provider {
-            ref annotations, ..
+            ref annotations,
+            mut tasks,
+            shutdown,
+            ..
         } = entry.remove();
 
+        // Set the shutdown flag to true to stop health checks and config updates. Also
+        // prevents restarting the provider but does not stop the provider process.
+        shutdown.store(true, Ordering::Relaxed);
+
         // Send a request to the provider, requesting a graceful shutdown
         let req = serde_json::to_vec(&json!({ "host_id": host_id }))
             .context("failed to encode provider stop request")?;
@@ -566,7 +573,13 @@
                 provider_id,
                 "provider did not gracefully shut down in time, shutting down forcefully"
             );
+            // NOTE: The provider child process is spawned with [tokio::process::Command::kill_on_drop],
+            // so dropping the task will send a SIGKILL to the provider process.
         }
 
+        // Stop the provider and health check / config changes tasks
+        tasks.abort_all();
+
         info!(provider_id, "provider stopped");
         self.publish_event(
             "provider_stopped",
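The ctl.rs changes above encode a three-step stop sequence: flip an `AtomicBool` so nothing restarts the provider, wait out a grace period for a clean exit, then fall back to `JoinSet::abort_all` and `kill_on_drop` for forceful teardown. Below is a minimal, self-contained sketch of that sequence; the `sleep` child process, the one-second poll loop, and `GRACE` are illustrative stand-ins, not the host's real types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::process::Command;
use tokio::task::JoinSet;

// Illustrative grace period; the host uses its own shutdown deadline.
const GRACE: Duration = Duration::from_secs(5);

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let shutdown = Arc::new(AtomicBool::new(false));
    let mut tasks: JoinSet<()> = JoinSet::new();

    // Stand-in for the provider binary. kill_on_drop(true) means dropping
    // the Child sends SIGKILL if the process is still running.
    let mut child = Command::new("sleep").arg("30").kill_on_drop(true).spawn()?;

    // Stand-in for the health-check / config-watch supervision tasks.
    let flag = Arc::clone(&shutdown);
    tasks.spawn(async move {
        while !flag.load(Ordering::Relaxed) {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });

    // 1. Flip the flag first so nothing can restart the provider from here on.
    shutdown.store(true, Ordering::Relaxed);
    // 2. Give the process a grace period (the host sends a shutdown RPC here).
    if tokio::time::timeout(GRACE, child.wait()).await.is_err() {
        // 3. Forceful fallback: dropping the Child delivers SIGKILL.
        drop(child);
    }
    // 4. Tear down the remaining supervision tasks.
    tasks.abort_all();
    Ok(())
}
```

Flipping the flag before waiting is the important ordering: it guarantees the restart loop cannot respawn the process during the grace window.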
crates/host/src/wasmbus/mod.rs (34 additions, 32 deletions)
@@ -1054,39 +1054,41 @@ impl Host {
     #[instrument(level = "debug", skip_all)]
     async fn inventory(&self) -> HostInventory {
         trace!("generating host inventory");
-        let components = self.components.read().await;
-        let components: Vec<_> = stream::iter(components.iter())
-            .filter_map(|(id, component)| async move {
-                let mut description = ComponentDescription::builder()
-                    .id(id.into())
-                    .image_ref(component.image_reference.to_string())
-                    .annotations(component.annotations.clone().into_iter().collect())
-                    .max_instances(component.max_instances.get().try_into().unwrap_or(u32::MAX))
-                    .revision(
-                        component
-                            .claims()
-                            .and_then(|claims| claims.metadata.as_ref())
-                            .and_then(|jwt::Component { rev, .. }| *rev)
-                            .unwrap_or_default(),
-                    );
-                // Add name if present
-                if let Some(name) = component
-                    .claims()
-                    .and_then(|claims| claims.metadata.as_ref())
-                    .and_then(|metadata| metadata.name.as_ref())
-                    .cloned()
-                {
-                    description = description.name(name);
-                };
+        let components: Vec<_> = {
+            let components = self.components.read().await;
+            stream::iter(components.iter())
+                .filter_map(|(id, component)| async move {
+                    let mut description = ComponentDescription::builder()
+                        .id(id.into())
+                        .image_ref(component.image_reference.to_string())
+                        .annotations(component.annotations.clone().into_iter().collect())
+                        .max_instances(component.max_instances.get().try_into().unwrap_or(u32::MAX))
+                        .revision(
+                            component
+                                .claims()
+                                .and_then(|claims| claims.metadata.as_ref())
+                                .and_then(|jwt::Component { rev, .. }| *rev)
+                                .unwrap_or_default(),
+                        );
+                    // Add name if present
+                    if let Some(name) = component
+                        .claims()
+                        .and_then(|claims| claims.metadata.as_ref())
+                        .and_then(|metadata| metadata.name.as_ref())
+                        .cloned()
+                    {
+                        description = description.name(name);
+                    };
 
-                Some(
-                    description
-                        .build()
-                        .expect("failed to build component description: {e}"),
-                )
-            })
-            .collect()
-            .await;
+                    Some(
+                        description
+                            .build()
+                            .expect("failed to build component description: {e}"),
+                    )
+                })
+                .collect()
+                .await
+        };
 
         let providers: Vec<_> = self
             .providers
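The `inventory` change is purely about lock scope: binding the read guard inside a block expression releases the `components` read lock as soon as the descriptions are collected, rather than holding it across the provider-listing awaits that follow. A stripped-down sketch of the pattern, with a toy map standing in for the host's component registry:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let components: Arc<RwLock<HashMap<String, u32>>> =
        Arc::new(RwLock::new(HashMap::from([("component-1".to_string(), 1)])));

    let names: Vec<String> = {
        // The read guard lives only inside this block...
        let components = components.read().await;
        components.keys().cloned().collect()
    }; // ...and is dropped here, releasing the lock.

    // Later awaits (the provider inventory in the real code) run lock-free.
    println!("{names:?}");
}
```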
crates/host/src/wasmbus/providers/mod.rs (93 additions, 110 deletions)
@@ -19,9 +19,9 @@ use cloudevents::EventBuilderV10;
 use futures::{stream, Future, StreamExt};
 use nkeys::XKey;
 use tokio::io::AsyncWriteExt;
+use tokio::process;
 use tokio::sync::RwLock;
 use tokio::task::JoinSet;
-use tokio::{process, select};
 use tracing::{error, instrument, trace, warn};
 use uuid::Uuid;
 use wascap::jwt::{CapabilityProvider, Token};
@@ -242,7 +242,6 @@ impl Host {
             Arc::clone(&self.host_config.lattice),
             self.host_key.public_key(),
             provider_id.to_string(),
-            shutdown.clone(),
         ));
 
         Ok(tasks)
@@ -282,7 +281,6 @@
             Arc::clone(&config_bundle),
             Arc::clone(&lattice),
             provider_id.clone(),
-            shutdown.clone(),
         ));
         loop {
             let mut child = child.write().await;
@@ -344,7 +342,6 @@
             new_config_bundle,
             Arc::clone(&lattice),
             provider_id.clone(),
-            shutdown.clone(),
         ));
 
         // Restart the provider by attempting to re-execute the binary with the same
@@ -428,7 +425,6 @@ fn check_health(
     lattice: Arc<str>,
     host_id: String,
     provider_id: String,
-    shutdown: Arc<AtomicBool>,
 ) -> impl Future<Output = ()> {
     let health_subject =
         async_nats::Subject::from(format!("wasmbus.rpc.{lattice}.{provider_id}.health"));
@@ -440,90 +436,87 @@
     health_check.reset_after(Duration::from_secs(5));
     async move {
         loop {
-            select! {
-                _ = health_check.tick() => {
-                    trace!(?provider_id, "performing provider health check");
-                    let request = async_nats::Request::new()
-                        .payload(Bytes::new())
-                        .headers(injector_to_headers(&TraceContextInjector::default_with_span()));
-                    if let Ok(async_nats::Message { payload, ..}) = rpc_nats.send_request(
-                        health_subject.clone(),
-                        request,
-                    ).await {
-                        match (serde_json::from_slice::<HealthCheckResponse>(&payload), previous_healthy) {
-                            (Ok(HealthCheckResponse { healthy: true, ..}), false) => {
-                                trace!(?provider_id, "provider health check succeeded");
-                                previous_healthy = true;
-                                if let Err(e) = event::publish(
-                                    &event_builder,
-                                    &ctl_nats,
-                                    &lattice,
-                                    "health_check_passed",
-                                    event::provider_health_check(
-                                        &host_id,
-                                        &provider_id,
-                                    )
-                                ).await {
-                                    warn!(
-                                        ?e,
-                                        ?provider_id,
-                                        "failed to publish provider health check succeeded event",
-                                    );
-                                }
-                            },
-                            (Ok(HealthCheckResponse { healthy: false, ..}), true) => {
-                                trace!(?provider_id, "provider health check failed");
-                                previous_healthy = false;
-                                if let Err(e) = event::publish(
-                                    &event_builder,
-                                    &ctl_nats,
-                                    &lattice,
-                                    "health_check_failed",
-                                    event::provider_health_check(
-                                        &host_id,
-                                        &provider_id,
-                                    )
-                                ).await {
-                                    warn!(
-                                        ?e,
-                                        ?provider_id,
-                                        "failed to publish provider health check failed event",
-                                    );
-                                }
-                            }
-                            // If the provider health status didn't change, we simply publish a health check status event
-                            (Ok(_), _) => {
-                                if let Err(e) = event::publish(
-                                    &event_builder,
-                                    &ctl_nats,
-                                    &lattice,
-                                    "health_check_status",
-                                    event::provider_health_check(
-                                        &host_id,
-                                        &provider_id,
-                                    )
-                                ).await {
-                                    warn!(
-                                        ?e,
-                                        ?provider_id,
-                                        "failed to publish provider health check status event",
-                                    );
-                                }
-                            },
-                            _ => warn!(
-                                ?provider_id,
-                                "failed to deserialize provider health check response"
-                            ),
-                        }
-                    } else {
-                        warn!(?provider_id, "failed to request provider health, retrying in 30 seconds");
-                    }
-                }
-                true = async { shutdown.load(Ordering::Relaxed) } => {
-                    trace!(?provider_id, "received shutdown signal, stopping health check task");
-                    break;
-                }
-            }
+            let _ = health_check.tick().await;
+            trace!(?provider_id, "performing provider health check");
+            let request =
+                async_nats::Request::new()
+                    .payload(Bytes::new())
+                    .headers(injector_to_headers(
+                        &TraceContextInjector::default_with_span(),
+                    ));
+            if let Ok(async_nats::Message { payload, .. }) =
+                rpc_nats.send_request(health_subject.clone(), request).await
+            {
+                match (
+                    serde_json::from_slice::<HealthCheckResponse>(&payload),
+                    previous_healthy,
+                ) {
+                    (Ok(HealthCheckResponse { healthy: true, .. }), false) => {
+                        trace!(?provider_id, "provider health check succeeded");
+                        previous_healthy = true;
+                        if let Err(e) = event::publish(
+                            &event_builder,
+                            &ctl_nats,
+                            &lattice,
+                            "health_check_passed",
+                            event::provider_health_check(&host_id, &provider_id),
+                        )
+                        .await
+                        {
+                            warn!(
+                                ?e,
+                                ?provider_id,
+                                "failed to publish provider health check succeeded event",
+                            );
+                        }
+                    }
+                    (Ok(HealthCheckResponse { healthy: false, .. }), true) => {
+                        trace!(?provider_id, "provider health check failed");
+                        previous_healthy = false;
+                        if let Err(e) = event::publish(
+                            &event_builder,
+                            &ctl_nats,
+                            &lattice,
+                            "health_check_failed",
+                            event::provider_health_check(&host_id, &provider_id),
+                        )
+                        .await
+                        {
+                            warn!(
+                                ?e,
+                                ?provider_id,
+                                "failed to publish provider health check failed event",
+                            );
+                        }
+                    }
+                    // If the provider health status didn't change, we simply publish a health check status event
+                    (Ok(_), _) => {
+                        if let Err(e) = event::publish(
+                            &event_builder,
+                            &ctl_nats,
+                            &lattice,
+                            "health_check_status",
+                            event::provider_health_check(&host_id, &provider_id),
+                        )
+                        .await
+                        {
+                            warn!(
+                                ?e,
+                                ?provider_id,
+                                "failed to publish provider health check status event",
+                            );
+                        }
+                    }
+                    _ => warn!(
+                        ?provider_id,
+                        "failed to deserialize provider health check response"
+                    ),
+                }
+            } else {
+                warn!(
+                    ?provider_id,
+                    "failed to request provider health, retrying in 30 seconds"
+                );
+            }
         }
     }
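With `select!` gone, the health-check loop awaits the interval tick unconditionally; shutdown is no longer polled through the atomic flag but delivered by aborting the task. A rough sketch of the new loop shape, where `check_once` is a stand-in for the NATS request on `wasmbus.rpc.{lattice}.{provider_id}.health`:

```rust
use std::time::Duration;

use tokio::time::{interval, MissedTickBehavior};

// Stand-in for sending the health request over NATS and parsing the reply.
async fn check_once() -> bool {
    true
}

#[tokio::main]
async fn main() {
    let mut health_check = interval(Duration::from_secs(30));
    health_check.set_missed_tick_behavior(MissedTickBehavior::Delay);
    let mut previous_healthy = false;

    let task = tokio::spawn(async move {
        loop {
            // No select! and no shutdown branch: just wait for the next tick.
            health_check.tick().await;
            let healthy = check_once().await;
            if healthy != previous_healthy {
                previous_healthy = healthy;
                // The host publishes health_check_passed / health_check_failed
                // on transitions like this one.
            }
        }
    });

    // Shutdown is now external: the host calls JoinSet::abort_all on its tasks.
    task.abort();
    assert!(task.await.unwrap_err().is_cancelled());
}
```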
@@ -538,38 +531,28 @@ fn watch_config(
     config: Arc<RwLock<ConfigBundle>>,
     lattice: Arc<str>,
     provider_id: String,
-    shutdown: Arc<AtomicBool>,
 ) -> impl Future<Output = ()> {
     let subject = provider_config_update_subject(&lattice, &provider_id);
     trace!(?provider_id, "starting config update listener");
     async move {
         loop {
             let mut config = config.write().await;
-            select! {
-                maybe_update = config.changed() => {
-                    let Ok(update) = maybe_update else {
-                        // TODO: shouldn't this be continue?
-                        break;
-                    };
-                    trace!(?provider_id, "provider config bundle changed");
-                    let bytes = match serde_json::to_vec(&*update) {
-                        Ok(bytes) => bytes,
-                        Err(err) => {
-                            error!(%err, ?provider_id, ?lattice, "failed to serialize configuration update ");
-                            continue;
-                        }
-                    };
-                    trace!(?provider_id, subject, "publishing config bundle bytes");
-                    if let Err(err) = rpc_nats.publish(subject.clone(), Bytes::from(bytes)).await {
-                        error!(%err, ?provider_id, ?lattice, "failed to publish configuration update bytes to component");
-                    }
-                }
-                true = async { shutdown.load(Ordering::Relaxed) } => {
-                    trace!(?provider_id, "received shutdown signal, stopping config update listener");
-                    // TODO: shouldn't this be return?
-                    break;
-                }
-            }
+            if let Ok(update) = config.changed().await {
+                trace!(?provider_id, "provider config bundle changed");
+                let bytes = match serde_json::to_vec(&*update) {
+                    Ok(bytes) => bytes,
+                    Err(err) => {
+                        error!(%err, ?provider_id, ?lattice, "failed to serialize configuration update ");
+                        continue;
+                    }
+                };
+                trace!(?provider_id, subject, "publishing config bundle bytes");
+                if let Err(err) = rpc_nats.publish(subject.clone(), Bytes::from(bytes)).await {
+                    error!(%err, ?provider_id, ?lattice, "failed to publish configuration update bytes to component");
+                }
+            } else {
+                break;
+            };
         }
     }
 }
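The config watcher gets the same simplification: the loop awaits the next change and breaks when the source is closed, and external shutdown is handled by task abort rather than a `select!` branch. A sketch using `tokio::sync::watch` as a stand-in for the host's `ConfigBundle`:

```rust
use tokio::sync::watch;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(String::from("initial"));
    let mut tasks = JoinSet::new();

    tasks.spawn(async move {
        // Mirror of `if let Ok(update) = config.changed().await { .. } else { break }`:
        // Ok(()) means a new value arrived; Err means the sender side is gone.
        while rx.changed().await.is_ok() {
            let update = rx.borrow_and_update().clone();
            println!("publishing config update: {update}");
        }
    });

    tx.send(String::from("updated")).expect("receiver still alive");
    drop(tx); // Closing the channel ends the loop cleanly...
    while tasks.join_next().await.is_some() {} // ...or the host can abort_all().
}
```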
