Skip to content

Commit

Permalink
Add retry logic for router (#4027)
Browse files: browse the repository at this point in the history
* Add retry logic for router

* Fix various issues
  • Loading branch information
guilload authored Oct 27, 2023
1 parent 572d6d5 commit 742347b
Show file tree
Hide file tree
Showing 29 changed files with 1,177 additions and 418 deletions.
91 changes: 45 additions & 46 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 21 additions & 12 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,12 @@ fn convert_metastore_error<T>(
| MetastoreError::JsonDeserializeError { .. }
| MetastoreError::JsonSerializeError { .. }
| MetastoreError::NotFound(_) => true,
MetastoreError::Unavailable(_)
MetastoreError::Connection { .. }
| MetastoreError::Db { .. }
| MetastoreError::InconsistentControlPlaneState { .. }
| MetastoreError::Internal { .. }
| MetastoreError::Io { .. }
| MetastoreError::Connection { .. }
| MetastoreError::Db { .. } => false,
| MetastoreError::Unavailable(_) => false,
};
if is_transaction_certainly_aborted {
// If the metastore transaction is certain to have been aborted,
Expand Down Expand Up @@ -363,10 +364,22 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
request: GetOrCreateOpenShardsRequest,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
Ok(self
let response = match self
.ingest_controller
.get_or_create_open_shards(request, &mut self.model, ctx.progress())
.await)
.await
{
Ok(response) => response,
Err(ControlPlaneError::Metastore(metastore_error)) => {
return convert_metastore_error(metastore_error);
}
Err(control_plane_error) => {
return Ok(Err(control_plane_error));
}
};
// TODO: Why do we return an error if the indexing scheduler fails?
self.indexing_scheduler.on_index_change(&self.model).await?;
Ok(Ok(response))
}
}

Expand All @@ -379,7 +392,7 @@ mod tests {
ListIndexesMetadataResponseExt,
};
use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest;
use quickwit_proto::ingest::Shard;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore::{
EntityKind, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest,
ListShardsResponse, ListShardsSubresponse, MetastoreError, SourceType,
Expand Down Expand Up @@ -641,7 +654,7 @@ mod tests {
}

#[tokio::test]
async fn test_control_plane_get_open_shards() {
async fn test_control_plane_get_or_create_open_shards() {
let universe = Universe::with_accelerated_time();

let cluster_id = "test-cluster".to_string();
Expand Down Expand Up @@ -675,6 +688,7 @@ mod tests {
index_uid: "test-index:0".to_string(),
source_id: INGEST_SOURCE_ID.to_string(),
shard_id: 1,
shard_state: ShardState::Open as i32,
..Default::default()
}],
next_shard_id: 2,
Expand Down Expand Up @@ -716,11 +730,6 @@ mod tests {
universe.assert_quit().await;
}

#[tokio::test]
async fn test_control_plane_close_shards() {
// TODO: Write test when the RPC is actually called by ingesters.
}

#[tokio::test]
async fn test_control_plane_supervision_reload_from_metastore() {
let universe = Universe::default();
Expand Down
Loading

0 comments on commit 742347b

Please sign in to comment.