Skip to content

Commit

Permalink
chore: Handle heartbeats in topics (momentohq#126)
Browse files Browse the repository at this point in the history
This adds support for heartbeats that will be rolled out more broadly
soon. We'll need to be able to accept these messages and allow the
subscription to continue.
  • Loading branch information
kvcache authored Feb 15, 2023
1 parent 77e254d commit 41108e9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ members = [
exclude = [ "example" ]

[dependencies]
momento-protos = { version = "=0.42.4" }
momento-protos = { version = "0.43.0" }
log = "0.4.17"
hyper = { version = "0.14" }
h2 = { version = "0.3" }
Expand Down
53 changes: 37 additions & 16 deletions src/preview/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,25 +171,41 @@ pub struct Subscription {
inner: tonic::Streaming<pubsub::SubscriptionItem>,
}

enum MapKind {
Heartbeat,
RealItem(SubscriptionItem),
BrokenProtocolMissingAttribute(&'static str),
StreamClosed,
}

impl Subscription {
/// Wait for the next item in the stream.
///
/// Result::Ok(Some(item)) -> the server sent you a subscription item!
/// Result::Ok(None) -> the server is done - there will be no more items!
/// Result::Err(MomentoError) -> something went wrong - log it and maybe reach out if you need help!
pub async fn item(&mut self) -> Result<Option<SubscriptionItem>, MomentoError> {
self.inner
.message()
.await
.map_err(|e| e.into())
.map(Subscription::map_into)
loop {
let next = self
.inner
.message()
.await
.map_err(MomentoError::from)
.map(Subscription::map_into)?;
match next {
MapKind::RealItem(item) => return Ok(Some(item)),
MapKind::StreamClosed => return Ok(None),
MapKind::Heartbeat => log::debug!("received a heartbeat"),
MapKind::BrokenProtocolMissingAttribute(missing_attribute) => log::warn!("Missing attribute: {missing_attribute} - do you need to update your Momento SDK version?"),
}
}
}

/// Yeah this is a pain, but doing it here lets us yield a simpler-typed subscription stream.
/// Also, we don't want to expose protocol buffers types outside of the sdk, so some type map
/// had to happen. It's all one-off at the moment though so might as well leave it as one
/// triangle expression =)
fn map_into(possible_item: Option<pubsub::SubscriptionItem>) -> Option<SubscriptionItem> {
fn map_into(possible_item: Option<pubsub::SubscriptionItem>) -> MapKind {
match possible_item {
Some(item) => match item.kind {
Some(kind) => match kind {
Expand All @@ -198,7 +214,7 @@ impl Subscription {
let sequence_number = item.topic_sequence_number;
match value.kind {
Some(topic_value_kind) => {
Some(SubscriptionItem::Value(SubscriptionValue {
MapKind::RealItem(SubscriptionItem::Value(SubscriptionValue {
topic_sequence_number: sequence_number,
kind: match topic_value_kind {
pubsub::topic_value::Kind::Text(text) => {
Expand All @@ -210,25 +226,30 @@ impl Subscription {
},
}))
}
// Broken protocol
None => Some(SubscriptionItem::Discontinuity(Discontinuity {
last_sequence_number: None,
new_sequence_number: sequence_number,
})),
// This is kind of a broken protocol situation - but we do have a sequence number
// so communicating the discontinuity at least allows downstream consumers to
// take action on a partially-unsupported stream.
None => MapKind::RealItem(SubscriptionItem::Discontinuity(
Discontinuity {
last_sequence_number: None,
new_sequence_number: sequence_number,
},
)),
}
}
None => None, // Broken protocol
None => MapKind::BrokenProtocolMissingAttribute("value kind"),
},
pubsub::subscription_item::Kind::Discontinuity(discontinuity) => {
Some(SubscriptionItem::Discontinuity(Discontinuity {
MapKind::RealItem(SubscriptionItem::Discontinuity(Discontinuity {
last_sequence_number: Some(discontinuity.last_topic_sequence),
new_sequence_number: discontinuity.new_topic_sequence,
}))
}
pubsub::subscription_item::Kind::Heartbeat(_) => MapKind::Heartbeat,
},
None => None, // Broken protocol,
None => MapKind::BrokenProtocolMissingAttribute("item kind"),
},
None => None, // Normal end-of-stream from server
None => MapKind::StreamClosed, // Normal end-of-stream from server
}
}
}
Expand Down

0 comments on commit 41108e9

Please sign in to comment.