diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index c87e7d277a2..c01605c017a 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -506,6 +506,9 @@ impl Source for IngestSource { num_millis=%now.elapsed().as_millis(), "Sending doc batch to indexer." ); + if !self.fetch_stream.has_active_shard_subscriptions() { + batch_builder.force_commit(); + } let message = batch_builder.build(); ctx.send_message(doc_processor_mailbox, message).await?; } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 53f6b0aeeea..65739d89242 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -294,6 +294,14 @@ impl MultiFetchStream { self.fetch_message_tx.clone() } + pub fn has_active_shard_subscriptions(&self) -> bool { + tracing::info!( + tx_count = self.fetch_message_tx.strong_count(), + "has_active_shard_subscriptions" + ); + self.fetch_message_tx.strong_count() > 1 + } + /// Subscribes to a shard and fails over to the replica if an error occurs. #[allow(clippy::too_many_arguments)] pub async fn subscribe( @@ -1864,8 +1872,9 @@ pub(super) mod tests { let client_id = "test-client".to_string(); let ingester_pool = IngesterPool::default(); let retry_params = RetryParams::for_test(); - let _multi_fetch_stream = + let multi_fetch_stream = MultiFetchStream::new(self_node_id, client_id, ingester_pool, retry_params); + assert!(!multi_fetch_stream.has_active_shard_subscriptions()) // TODO: Backport from original branch. } } diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs index 5b79ff021b9..7ba381a35ab 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::HashSet; use std::time::Duration; use futures_util::FutureExt; @@ -602,7 +603,6 @@ async fn test_shutdown_single_node() { sandbox.enable_ingest_v2(); - // Create index sandbox .indexer_rest_client .indexes() @@ -625,41 +625,29 @@ async fn test_shutdown_single_node() { .await .unwrap(); - // Ensure that the index is ready to accept records. - ingest_with_retry( - &sandbox.indexer_rest_client, - index_id, - ingest_json!({"body": "one"}), - CommitType::Force, - ) - .await - .unwrap(); - - // Test force commit sandbox .indexer_rest_client .ingest( index_id, - ingest_json!({"body": "two"}), + ingest_json!({"body": "commit me before shutdown"}), None, None, - CommitType::Force, + CommitType::Auto, ) .await .unwrap(); + // assert that we don't wait the commit timeout (60s) tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown()) .await .unwrap() .unwrap(); } -/// When the control plane is on a different node, it might be shutdown -/// before the ingest pipeline is scheduled on the indexer. #[tokio::test] -async fn test_shutdown_control_plane_early_shutdown() { +async fn test_shutdown_indexer_first() { initialize_tests(); - let sandbox = ClusterSandboxBuilder::default() + let mut sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Indexer]) .add_node([ QuickwitService::ControlPlane, @@ -671,10 +659,8 @@ async fn test_shutdown_control_plane_early_shutdown() { .await; let index_id = "test_shutdown_separate_indexer"; - // TODO: make this test work with ingest v2 (#5068) - // sandbox.enable_ingest_v2(); + sandbox.enable_ingest_v2(); - // Create index sandbox .indexer_rest_client .indexes() @@ -688,7 +674,7 @@ async fn test_shutdown_control_plane_early_shutdown() { - name: body type: text indexing_settings: - commit_timeout_secs: 1 + commit_timeout_secs: 60 "# ), ConfigFormat::Yaml, @@ -697,28 +683,40 @@ async fn test_shutdown_control_plane_early_shutdown() { .await .unwrap(); - // Ensure that the index is ready to accept records. - ingest_with_retry( - &sandbox.indexer_rest_client, - index_id, - ingest_json!({"body": "one"}), - CommitType::Force, + sandbox + .indexer_rest_client + .ingest( + index_id, + ingest_json!({"body": "commit me before shutdown"}), + None, + None, + CommitType::Auto, + ) + .await + .unwrap(); + + // assert that we don't wait the commit timeout (60s) + tokio::time::timeout( + std::time::Duration::from_secs(10), + sandbox.shutdown_services(&HashSet::from_iter([QuickwitService::Indexer])), ) .await + .unwrap() .unwrap(); + // still, the doc should be committed during the graceful shutdown + sandbox.assert_hit_count(index_id, "body:before", 1).await; + tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown()) .await .unwrap() .unwrap(); } -/// When the control plane/metastore are shutdown before the indexer, the -/// indexer shutdown should not hang indefinitely #[tokio::test] -async fn test_shutdown_separate_indexer() { +async fn test_shutdown_metastore_first() { initialize_tests(); - let sandbox = ClusterSandboxBuilder::default() + let mut sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Indexer]) .add_node([ QuickwitService::ControlPlane, @@ -731,6 +729,8 @@ async fn test_shutdown_separate_indexer() { let index_id = "test_shutdown_separate_indexer"; // TODO: make this test work with ingest v2 (#5068) + // When the control plane/metastore are shutdown before the indexer, the + // indexer shutdown should not hang indefinitely // sandbox.enable_ingest_v2(); // Create index @@ -766,10 +766,18 @@ async fn test_shutdown_separate_indexer() { .await .unwrap(); - sandbox - .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1) - .await - .unwrap(); + tokio::time::timeout( + std::time::Duration::from_secs(10), + sandbox.shutdown_services(&HashSet::from_iter([ + QuickwitService::ControlPlane, + QuickwitService::Searcher, + QuickwitService::Metastore, + QuickwitService::Janitor, + ])), + ) + .await + .unwrap() + .unwrap(); tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown()) .await