diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index b4232de067..24f2fa62e0 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -8,6 +8,21 @@ plugins { id 'com.google.protobuf' version '0.9.4' } +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:buffer-common') @@ -52,7 +67,6 @@ dependencies { testImplementation 'org.apache.kafka:kafka_2.13:3.4.0:test' testImplementation 'org.apache.curator:curator-test:5.5.0' testImplementation 'io.confluent:kafka-schema-registry:7.4.0' - testImplementation testLibs.junit.vintage testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test' testImplementation 'org.apache.kafka:connect-json:3.4.0' testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39') @@ -64,6 +78,8 @@ dependencies { testImplementation libs.commons.io testImplementation libs.armeria.grpc + integrationTestImplementation testLibs.junit.vintage + constraints { implementation('org.mozilla:rhino') { version { @@ -85,21 +101,6 @@ test { useJUnitPlatform() } -sourceSets { - integrationTest { - java { - compileClasspath += main.output + test.output - runtimeClasspath += main.output + test.output - srcDir file('src/integrationTest/java') - } - } -} - -configurations { - integrationTestImplementation.extendsFrom testImplementation - integrationTestRuntime.extendsFrom testRuntime -} - task integrationTest(type: Test) { group = 'verification' testClassesDirs = sourceSets.integrationTest.output.classesDirs diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java index fc75bfcdff..6d009aec14 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java @@ -161,7 +161,7 @@ private void checkTopicCreationCriteriaAndCreateTopic() { } - public KafkaCustomProducer createProducer() { + private KafkaCustomProducer createProducer() { // TODO: Add the DLQSink here. new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting) return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting, expressionEvaluator, sinkContext, pluginMetrics, true); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfigTest.java index 1a297b5175..63e08c18ec 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkConfigTest.java @@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java index 1984e40516..081655f2a1 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java @@ -14,6 +14,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -28,6 +29,8 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; +import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory; import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; import org.springframework.test.util.ReflectionTestUtils; import org.yaml.snakeyaml.Yaml; @@ -48,7 +51,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -58,10 +63,8 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class KafkaSinkTest { - - - KafkaSink kafkaSink; - + @Mock + KafkaCustomProducer kafkaCustomProducer; KafkaSinkConfig kafkaSinkConfig; @@ -80,8 +83,6 @@ public class KafkaSinkTest { Event event; - KafkaSink spySink; - private static final Integer totalWorkers = 1; MockedStatic executorsMockedStatic; @@ -118,16 +119,11 @@ void setUp() throws Exception { when(pluginSetting.getPipelineName()).thenReturn("Kafka-sink"); event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); when(sinkContext.getTagsTargetKey()).thenReturn("tag"); - kafkaSink = new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactoryMock, pluginMetrics, mock(ExpressionEvaluator.class), sinkContext, awsCredentialsSupplier); - spySink = spy(kafkaSink); executorsMockedStatic = mockStatic(Executors.class); props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9093"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - ReflectionTestUtils.setField(spySink, "executorService", executorService); - - } @AfterEach @@ -135,13 +131,28 @@ public void after() { executorsMockedStatic.close(); } + private KafkaSink createObjectUnderTest() { + final KafkaSink objectUnderTest; + try(final MockedConstruction ignored = mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> { + when(mock.createProducer(any(), any(), any(), any(), any(), any(), anyBoolean())).thenReturn(kafkaCustomProducer); + })) { + objectUnderTest = new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactoryMock, pluginMetrics, mock(ExpressionEvaluator.class), sinkContext, awsCredentialsSupplier); + } + ReflectionTestUtils.setField(objectUnderTest, "executorService", executorService); + return spy(objectUnderTest); + + } + @Test public void doOutputTest() { ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", null); when(executorService.submit(any(ProducerWorker.class))).thenReturn(futureTask); final Collection records = Arrays.asList(new Record(event)); - spySink.doOutput(records); - verify(spySink).doOutput(records); + final KafkaSink objectUnderTest = createObjectUnderTest(); + + objectUnderTest.doOutput(records); + + verify(objectUnderTest).doOutput(records); } @@ -149,21 +160,24 @@ public void doOutputTest() { public void doOutputExceptionTest() { final Collection records = Arrays.asList(new Record(event)); when(executorService.submit(any(ProducerWorker.class))).thenThrow(new RuntimeException()); - assertThrows(RuntimeException.class, () -> spySink.doOutput(records)); + final KafkaSink objectUnderTest = createObjectUnderTest(); + assertThrows(RuntimeException.class, () -> objectUnderTest.doOutput(records)); } @Test public void doOutputEmptyRecordsTest() { final Collection records = Arrays.asList(); - spySink.doOutput(records); - verify(spySink).doOutput(records); + final KafkaSink objectUnderTest = createObjectUnderTest(); + objectUnderTest.doOutput(records); + verify(objectUnderTest).doOutput(records); } @Test public void shutdownTest() { - spySink.shutdown(); - verify(spySink).shutdown(); + final KafkaSink objectUnderTest = createObjectUnderTest(); + objectUnderTest.shutdown(); + verify(objectUnderTest).shutdown(); } @Test @@ -173,28 +187,31 @@ public void shutdownExceptionTest() throws InterruptedException { when(executorService.awaitTermination( 1000L, TimeUnit.MILLISECONDS)).thenThrow(interruptedException); - spySink.shutdown(); + createObjectUnderTest().shutdown(); } @Test public void doInitializeTest() { - spySink.doInitialize(); - verify(spySink).doInitialize(); + final KafkaSink objectUnderTest = createObjectUnderTest(); + objectUnderTest.doInitialize(); + verify(objectUnderTest).doInitialize(); } @Test public void doInitializeNullPointerExceptionTest() { when(Executors.newFixedThreadPool(totalWorkers)).thenThrow(NullPointerException.class); - assertThrows(NullPointerException.class, () -> spySink.doInitialize()); + final KafkaSink objectUnderTest = createObjectUnderTest(); + assertThrows(NullPointerException.class, () -> objectUnderTest.doInitialize()); } @Test public void isReadyTest() { - ReflectionTestUtils.setField(kafkaSink, "sinkInitialized", true); - assertEquals(true, kafkaSink.isReady()); + final KafkaSink objectUnderTest = createObjectUnderTest(); + ReflectionTestUtils.setField(objectUnderTest, "sinkInitialized", true); + assertEquals(true, objectUnderTest.isReady()); } @Test @@ -213,6 +230,8 @@ public void doOutputTestForAutoTopicCreate() { when(executorService.submit(any(ProducerWorker.class))).thenReturn(futureTask); final Collection records = Arrays.asList(new Record(event)); - assertThrows(RuntimeException.class, () -> spySink.doOutput(records)); + final KafkaSink objectUnderTest = createObjectUnderTest(); + + assertThrows(RuntimeException.class, () -> objectUnderTest.doOutput(records)); } }