diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java index 21462d252a..d4e4d9813d 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState; +import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; @@ -25,6 +26,7 @@ import java.util.concurrent.Executors; import java.util.stream.Collectors; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -108,7 +110,9 @@ void test_export_run() throws InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> exportScheduler.run()); - Thread.sleep(100); + await() + .atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> verify(coordinator, times(2)).createPartition(any())); executorService.shutdownNow(); @@ -161,7 +165,9 @@ void test_export_run_multiple_partitions() throws InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> exportScheduler.run()); - Thread.sleep(100); + await() + .atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> verify(coordinator, times(4)).createPartition(any())); executorService.shutdownNow(); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorkerTest.java index 1fa4185ff3..916ac157d9 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportWorkerTest.java @@ -28,12 +28,14 @@ import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition; +import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -160,7 +162,9 @@ public void test_shouldProcessPartitionSuccess(final String partitionKey) throws } }); - Thread.sleep(100); + await() + .atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> verify(mongoClient).getDatabase(eq("test"))); executorService.shutdownNow(); // Then dependencies are called verify(mongoClient).getDatabase(eq("test"));