Skip to content

Commit

Permalink
We can not wait for ack from publishing to NATS core
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Aug 30, 2024
1 parent 161a1fc commit 5934cc7
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
26 changes: 23 additions & 3 deletions rhio/src/nats/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,20 @@ use crate::nats::consumer::{Consumer, ConsumerId, JetStreamEvent};

pub enum ToNatsActor {
Publish {
/// Wait for acknowledgment of NATS JetStream.
///
/// Important: If we're sending a regular NATS Core message (for example during a
/// request-response flow), messages will _never_ be acknowledged. In this case this flag
/// should be set to false.
wait_for_ack: bool,

/// NATS subject to which this message is published to.
subject: Subject,

/// Payload of message.
payload: Vec<u8>,

/// Channel to receive result. Can fail if server did not acknowledge message in time.
reply: oneshot::Sender<Result<()>>,
},
Subscribe {
Expand Down Expand Up @@ -104,11 +116,12 @@ impl NatsActor {
async fn on_actor_message(&mut self, msg: ToNatsActor) -> Result<()> {
match msg {
ToNatsActor::Publish {
wait_for_ack,
subject,
payload,
reply,
} => {
let result = self.on_publish(subject, payload).await;
let result = self.on_publish(wait_for_ack, subject, payload).await;
reply.send(result).ok();
}
ToNatsActor::Subscribe {
Expand All @@ -131,12 +144,19 @@ impl NatsActor {
/// Publish a message inside an existing stream.
///
/// This method fails if the stream does not exist on the NATS server
async fn on_publish(&self, subject: Subject, payload: Vec<u8>) -> Result<()> {
async fn on_publish(
&self,
wait_for_ack: bool,
subject: Subject,
payload: Vec<u8>,
) -> Result<()> {
let server_ack = self.nats_jetstream.publish(subject, payload.into()).await?;

// Wait until the server confirmed receiving this message, to make sure it got delivered
// and persisted
server_ack.await?;
if wait_for_ack {
server_ack.await.context("publish message to nats server")?;
}

Ok(())
}
Expand Down
8 changes: 7 additions & 1 deletion rhio/src/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@ impl Nats {
reply_rx.await?
}

pub async fn publish(&self, subject: Subject, payload: Vec<u8>) -> Result<()> {
pub async fn publish(
&self,
wait_for_ack: bool,
subject: Subject,
payload: Vec<u8>,
) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.nats_actor_tx
.send(ToNatsActor::Publish {
wait_for_ack,
subject,
payload,
reply,
Expand Down
6 changes: 4 additions & 2 deletions rhio/src/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl NodeActor {
.extract()
.ok_or(anyhow!("missing 'subject' field in header"))?;
let payload = encode_operation(operation.header, operation.body)?;
self.nats.publish(subject, payload).await?;
self.nats.publish(true, subject, payload).await?;

Ok(())
}
Expand Down Expand Up @@ -317,8 +317,10 @@ impl NodeActor {

// If the control command requested a response via NATS Core, we will provide it!
if let Some(subject) = reply_subject {
// Since NATS Core messages are never acknowledged ("fire and forget"), we set
// the flag to "false" to never wait for an ACK
self.nats
.publish(subject.to_string(), hash.to_bytes())
.publish(false, subject.to_string(), hash.to_bytes())
.await?;
}
}
Expand Down

0 comments on commit 5934cc7

Please sign in to comment.