Greatly reduce the time that the KafkaSinkTest takes by mocking the constructor. Update the unit tests to JUnit 5. Only the integration tests need JUnit 4 now, but these use quite a few JUnit 4 features. (#3972)

Signed-off-by: David Venable <dlv@amazon.com>
dlvenable committed Jan 17, 2024
1 parent a64f409 commit 334239b
Showing 4 changed files with 63 additions and 43 deletions.
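
The central change is in KafkaSinkTest: rather than letting the KafkaSink constructor build a real KafkaCustomProducerFactory, the test now wraps construction in Mockito's mockConstruction so the factory (and the producer it would create) is replaced with a mock, which is what removes most of the test's runtime. The sketch below illustrates that pattern in isolation; it is a minimal, hypothetical example (ProducerFactory, Producer, and SinkUnderTest are stand-ins, not the plugin's real types) and assumes a Mockito version with the inline mock maker, which mockConstruction requires.

import org.mockito.MockedConstruction;

import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;

// Hypothetical collaborator whose real constructor is expensive (e.g., it would
// reach out to Kafka); the test intercepts `new ProducerFactory()` instead.
class ProducerFactory {
    Producer createProducer() {
        return new Producer();
    }
}

class Producer {
    String send(final String message) {
        return "sent:" + message;
    }
}

// Hypothetical class under test: it constructs the factory inside its own
// constructor, so a plain @Mock field cannot intercept it; mockConstruction can.
class SinkUnderTest {
    private final Producer producer;

    SinkUnderTest() {
        this.producer = new ProducerFactory().createProducer();
    }

    String output(final String message) {
        return producer.send(message);
    }
}

class MockedConstructionSketch {
    static SinkUnderTest createObjectUnderTest(final Producer mockProducer) {
        // While this try-with-resources block is open, every `new ProducerFactory()`
        // returns a Mockito mock, so the slow real constructor never runs and the
        // factory hands back the supplied mock producer.
        try (MockedConstruction<ProducerFactory> ignored =
                     mockConstruction(ProducerFactory.class, (mock, context) ->
                             when(mock.createProducer()).thenReturn(mockProducer))) {
            return new SinkUnderTest();
        }
    }
}

Compare this with createObjectUnderTest() in the KafkaSinkTest diff below, which applies the same pattern to KafkaCustomProducerFactory and only needs the mocked construction to stay open while the KafkaSink constructor runs.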
33 changes: 17 additions & 16 deletions data-prepper-plugins/kafka-plugins/build.gradle
@@ -8,6 +8,21 @@ plugins {
id 'com.google.protobuf' version '0.9.4'
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
@@ -52,7 +67,6 @@ dependencies {
testImplementation 'org.apache.kafka:kafka_2.13:3.4.0:test'
testImplementation 'org.apache.curator:curator-test:5.5.0'
testImplementation 'io.confluent:kafka-schema-registry:7.4.0'
testImplementation testLibs.junit.vintage
testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test'
testImplementation 'org.apache.kafka:connect-json:3.4.0'
testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39')
@@ -64,6 +78,8 @@ dependencies {
testImplementation libs.commons.io
testImplementation libs.armeria.grpc

integrationTestImplementation testLibs.junit.vintage

constraints {
implementation('org.mozilla:rhino') {
version {
@@ -85,21 +101,6 @@ test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs
(second changed file)
@@ -161,7 +161,7 @@ private void checkTopicCreationCriteriaAndCreateTopic() {

}

public KafkaCustomProducer createProducer() {
private KafkaCustomProducer createProducer() {
// TODO: Add the DLQSink here. new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting)
return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting, expressionEvaluator, sinkContext, pluginMetrics, true);
}
(third changed file)
@@ -22,7 +22,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;


(fourth changed file)
@@ -14,6 +14,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
@@ -28,6 +29,8 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker;
import org.springframework.test.util.ReflectionTestUtils;
import org.yaml.snakeyaml.Yaml;
@@ -48,7 +51,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -58,10 +63,8 @@
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class KafkaSinkTest {


KafkaSink kafkaSink;

@Mock
KafkaCustomProducer kafkaCustomProducer;

KafkaSinkConfig kafkaSinkConfig;

@@ -80,8 +83,6 @@ public class KafkaSinkTest {

Event event;

KafkaSink spySink;

private static final Integer totalWorkers = 1;

MockedStatic<Executors> executorsMockedStatic;
@@ -118,52 +119,65 @@ void setUp() throws Exception {
when(pluginSetting.getPipelineName()).thenReturn("Kafka-sink");
event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
when(sinkContext.getTagsTargetKey()).thenReturn("tag");
kafkaSink = new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactoryMock, pluginMetrics, mock(ExpressionEvaluator.class), sinkContext, awsCredentialsSupplier);
spySink = spy(kafkaSink);
executorsMockedStatic = mockStatic(Executors.class);
props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9093");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
ReflectionTestUtils.setField(spySink, "executorService", executorService);


}

@AfterEach
public void after() {
executorsMockedStatic.close();
}

private KafkaSink createObjectUnderTest() {
final KafkaSink objectUnderTest;
try(final MockedConstruction<KafkaCustomProducerFactory> ignored = mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> {
when(mock.createProducer(any(), any(), any(), any(), any(), any(), anyBoolean())).thenReturn(kafkaCustomProducer);
})) {
objectUnderTest = new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactoryMock, pluginMetrics, mock(ExpressionEvaluator.class), sinkContext, awsCredentialsSupplier);
}
ReflectionTestUtils.setField(objectUnderTest, "executorService", executorService);
return spy(objectUnderTest);

}

@Test
public void doOutputTest() {
ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", null);
when(executorService.submit(any(ProducerWorker.class))).thenReturn(futureTask);
final Collection records = Arrays.asList(new Record(event));
spySink.doOutput(records);
verify(spySink).doOutput(records);
final KafkaSink objectUnderTest = createObjectUnderTest();

objectUnderTest.doOutput(records);

verify(objectUnderTest).doOutput(records);
}


@Test
public void doOutputExceptionTest() {
final Collection records = Arrays.asList(new Record(event));
when(executorService.submit(any(ProducerWorker.class))).thenThrow(new RuntimeException());
assertThrows(RuntimeException.class, () -> spySink.doOutput(records));
final KafkaSink objectUnderTest = createObjectUnderTest();
assertThrows(RuntimeException.class, () -> objectUnderTest.doOutput(records));
}

@Test
public void doOutputEmptyRecordsTest() {
final Collection records = Arrays.asList();
spySink.doOutput(records);
verify(spySink).doOutput(records);
final KafkaSink objectUnderTest = createObjectUnderTest();
objectUnderTest.doOutput(records);
verify(objectUnderTest).doOutput(records);

}

@Test
public void shutdownTest() {
spySink.shutdown();
verify(spySink).shutdown();
final KafkaSink objectUnderTest = createObjectUnderTest();
objectUnderTest.shutdown();
verify(objectUnderTest).shutdown();
}

@Test
@@ -173,28 +187,31 @@ public void shutdownExceptionTest() throws InterruptedException {

when(executorService.awaitTermination(
1000L, TimeUnit.MILLISECONDS)).thenThrow(interruptedException);
spySink.shutdown();

createObjectUnderTest().shutdown();
}


@Test
public void doInitializeTest() {
spySink.doInitialize();
verify(spySink).doInitialize();
final KafkaSink objectUnderTest = createObjectUnderTest();
objectUnderTest.doInitialize();
verify(objectUnderTest).doInitialize();
}

@Test
public void doInitializeNullPointerExceptionTest() {
when(Executors.newFixedThreadPool(totalWorkers)).thenThrow(NullPointerException.class);
assertThrows(NullPointerException.class, () -> spySink.doInitialize());
final KafkaSink objectUnderTest = createObjectUnderTest();
assertThrows(NullPointerException.class, () -> objectUnderTest.doInitialize());
}


@Test
public void isReadyTest() {
ReflectionTestUtils.setField(kafkaSink, "sinkInitialized", true);
assertEquals(true, kafkaSink.isReady());
final KafkaSink objectUnderTest = createObjectUnderTest();
ReflectionTestUtils.setField(objectUnderTest, "sinkInitialized", true);
assertEquals(true, objectUnderTest.isReady());
}

@Test
@@ -213,6 +230,8 @@ public void doOutputTestForAutoTopicCreate() {

when(executorService.submit(any(ProducerWorker.class))).thenReturn(futureTask);
final Collection records = Arrays.asList(new Record(event));
assertThrows(RuntimeException.class, () -> spySink.doOutput(records));
final KafkaSink objectUnderTest = createObjectUnderTest();

assertThrows(RuntimeException.class, () -> objectUnderTest.doOutput(records));
}
}
