Skip to content

Commit

Permalink
More expressive multi-fetch message model
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 9, 2024
1 parent c1aa133 commit e939ff2
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 53 deletions.
44 changes: 19 additions & 25 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use quickwit_ingest::{
};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::ingest::ingester::{
fetch_message, FetchEof, FetchPayload, IngesterService, TruncateShardsRequest,
TruncateShardsSubrequest,
FetchEof, FetchPayload, IngesterService, TruncateShardsRequest, TruncateShardsSubrequest,
};
use quickwit_proto::ingest::IngestV2Error;
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -473,33 +472,28 @@ impl Source for IngestSource {
let deadline = now + *EMIT_BATCHES_TIMEOUT;
loop {
match time::timeout_at(deadline, self.fetch_stream.next()).await {
Ok(Ok(MultiFetchMessage {
fetch_message,
force_commit,
})) => {
if force_commit {
batch_builder.force_commit();
}
match fetch_message.message {
Some(fetch_message::Message::Payload(fetch_payload)) => {
self.process_fetch_payload(&mut batch_builder, fetch_payload)?;
Ok(Ok(multi_fetch_message)) => match multi_fetch_message {
MultiFetchMessage::Payload(fetch_payload) => {
self.process_fetch_payload(&mut batch_builder, fetch_payload)?;

if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT {
break;
}
}
Some(fetch_message::Message::Eof(fetch_eof)) => {
self.process_fetch_eof(&mut batch_builder, fetch_eof)?;
if force_commit {
batch_builder.force_commit()
}
if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT {
break;
}
None => {
warn!("received empty fetch message");
continue;
}
MultiFetchMessage::Eof {
fetch_eof,
force_commit,
} => {
self.process_fetch_eof(&mut batch_builder, fetch_eof)?;
if force_commit {
batch_builder.force_commit()
}
}
}
MultiFetchMessage::Empty => {
warn!("received empty fetch message");
continue;
}
},
Ok(Err(fetch_stream_error)) => {
self.process_fetch_stream_error(&mut batch_builder, fetch_stream_error)?;
}
Expand Down
70 changes: 42 additions & 28 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,6 @@ impl fmt::Debug for FetchStreamTask {
}
}

/// Wrapper around [`FetchMessage`] that keeps track of both the amount of data
/// in flight and its origin.
#[derive(Debug)]
struct TrackedFetchMessage {
/// The ingester from which the message was fetched.
source_node_id: NodeId,
/// The actual message, together with a tracker of the amount of in-flight data.
fetch_message: InFlightValue<FetchMessage>,
/// A guard to track the ingesters that still have in-flight data.
_tracked_ingesters_ref: Arc<Vec<NodeId>>,
}

/// Message yielded by the multi-fetch stream: the fetched [`FetchMessage`]
/// accompanied by a commit hint for the consumer.
pub struct MultiFetchMessage {
/// Whether the consumer should force a commit after handling this message.
pub force_commit: bool,
/// The message received from the ingester.
pub fetch_message: FetchMessage,
}

impl FetchStreamTask {
pub fn spawn(
open_fetch_stream_request: OpenFetchStreamRequest,
Expand Down Expand Up @@ -282,6 +265,28 @@ pub struct FetchStreamError {
pub ingest_error: IngestV2Error,
}

/// Wrapper around [`FetchMessage`] that keeps track of both the amount of data
/// in flight and its origin.
#[derive(Debug)]
struct TrackedFetchMessage {
/// The ingester from which the message was fetched.
source_node_id: NodeId,
/// The actual message, together with a tracker of the amount of in-flight data.
fetch_message: InFlightValue<FetchMessage>,
/// A guard to track the ingesters that still have in-flight data.
_tracked_ingesters_ref: Arc<Vec<NodeId>>,
}

/// Message yielded by the multi-fetch stream, with one variant per kind of
/// content found in the underlying `FetchMessage`.
pub enum MultiFetchMessage {
/// A fetch payload received from an ingester.
Payload(FetchPayload),
/// An end-of-stream notification received from an ingester.
Eof {
/// The EOF message as received.
fetch_eof: FetchEof,
/// Set when the EOF reports `is_decommissioning` and the ingester it came
/// from is not among the stream's active ingesters; the consumer is then
/// expected to force a commit.
force_commit: bool,
},
/// Fetch messages should not be empty. This variant is produced when the
/// received message carries neither a payload nor an EOF.
Empty,
}

/// Combines multiple fetch streams originating from different ingesters into a single stream. It
/// tolerates the failure of ingesters and automatically fails over to replica shards.
pub struct MultiFetchStream {
Expand Down Expand Up @@ -409,17 +414,19 @@ impl MultiFetchStream {
let fetch_message = fetch_message.into_inner();
drop(_tracked_ingesters_ref);

let force_commit =
if let Some(fetch_message::Message::Eof(fetch_eof)) = &fetch_message.message {
fetch_eof.is_decommissioning && !self.active_ingesters.contains(&source_node_id)
} else {
false
};
let multi_fetch_message = match fetch_message.message {
Some(fetch_message::Message::Payload(fetch_payload)) => {
MultiFetchMessage::Payload(fetch_payload)
}
Some(fetch_message::Message::Eof(fetch_eof)) => MultiFetchMessage::Eof {
force_commit: fetch_eof.is_decommissioning
&& !self.active_ingesters.contains(&source_node_id),
fetch_eof,
},
None => MultiFetchMessage::Empty,
};

Ok(MultiFetchMessage {
force_commit,
fetch_message,
})
Ok(multi_fetch_message)
}

/// Resets the stream by aborting all the active fetch tasks and dropping all queued responses.
Expand Down Expand Up @@ -712,7 +719,13 @@ pub(super) mod tests {

/// Test helper converting a [`MultiFetchMessage`] back into a plain
/// [`FetchMessage`], discarding the `force_commit` flag.
///
/// # Panics
///
/// Panics on [`MultiFetchMessage::Empty`], which has no `FetchMessage`
/// counterpart built here.
impl From<MultiFetchMessage> for FetchMessage {
fn from(val: MultiFetchMessage) -> Self {
val.fetch_message
match val {
MultiFetchMessage::Payload(fetch_payload) => {
FetchMessage::new_payload(fetch_payload)
}
MultiFetchMessage::Eof { fetch_eof, .. } => FetchMessage::new_eof(fetch_eof),
MultiFetchMessage::Empty => panic!("fetch messages should not be empty"),
}
}
}

Expand Down Expand Up @@ -924,6 +937,7 @@ pub(super) mod tests {
assert_eq!(fetch_eof.source_id, source_id);
assert_eq!(fetch_eof.shard_id(), shard_id);
assert_eq!(fetch_eof.eof_position, Some(Position::eof(3u64)));
assert_eq!(fetch_eof.is_decommissioning, false);

fetch_task_handle.await.unwrap();
}
Expand Down

0 comments on commit e939ff2

Please sign in to comment.