Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run tests on the current JVM / Gradle 8.8 #4730

Merged
merged 6 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ subprojects {

test {
useJUnitPlatform()
javaLauncher = javaToolchains.launcherFor {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tells Gradle to use the current JVM version for running the tests. This is what now allows our tests to run on Java 17 or 21.

languageVersion = JavaLanguageVersion.current()
}
reports {
junitXml.required
html.required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
private Thread retryThread;
private int maxRetries;
private int waitTimeMs;
private SinkThread sinkThread;

public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand All @@ -51,7 +52,8 @@ public void initialize() {
// the exceptions which are not retryable.
doInitialize();
if (!isReady() && retryThread == null) {
retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
retryThread = new Thread(sinkThread);
retryThread.start();
}
}
Expand All @@ -76,7 +78,7 @@ public void output(Collection<T> records) {
@Override
public void shutdown() {
if (retryThread != null) {
retryThread.stop();
Copy link
Member Author

@dlvenable dlvenable Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java 21, Thread::stop always throws an UnsupportedOperationException. So I've changed this approach to tell the runnable below to stop itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have other locations in plugins where threads are stopped like this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran IntelliJ's "Find Usages" feature on Thread::stop. It found this usage, but found no others. So I don't believe so.

sinkThread.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class SinkThread implements Runnable {
private int maxRetries;
private int waitTimeMs;

private volatile boolean isStopped = false;

public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
this.sink = sink;
this.maxRetries = maxRetries;
Expand All @@ -19,11 +21,15 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
@Override
public void run() {
int numRetries = 0;
while (!sink.isReady() && numRetries++ < maxRetries) {
while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
try {
Thread.sleep(waitTimeMs);
sink.doInitialize();
} catch (InterruptedException e){}
}
}

public void stop() {
isStopped = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -30,6 +25,12 @@
import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractSinkTest {
private int count;
Expand Down Expand Up @@ -71,13 +72,13 @@ void testMetrics() {
}

@Test
void testSinkNotReady() {
void testSinkNotReady() throws InterruptedException {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSink<Record<String>> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
Expand All @@ -87,7 +88,10 @@ void testSinkNotReady() {
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
Thread.sleep(200);
assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown));
}

@Test
Expand Down
3 changes: 0 additions & 3 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason, in Java 17 and 21, this results in all the logs going to stdout in the Gradle task. This makes understanding the build very difficult as there is far too much noise.

testImplementation testLibs.spring.test
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand Down Expand Up @@ -89,8 +88,6 @@ task integrationTest(type: Test) {

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.UUID;
import java.util.stream.Stream;

Expand Down Expand Up @@ -218,7 +218,7 @@ static class SomeUnknownTypesArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
arguments(Random.class),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java 21 (and maybe 17?), Random is final and this causes Mockito to fail when it mocks it. So, we test on the Timer class instead.

arguments(Timer.class),
arguments(InputStream.class),
arguments(File.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
return Stream.of(
Arguments.of(0, randomInt + 1, 0.0),
Arguments.of(1, 100, 1.0),
Arguments.of(randomInt, randomInt, 100.0),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can sometimes be zero, and this will cause the test to fail. So, ensure it is not 0.

Arguments.of(randomInt + 1, randomInt + 1, 100.0),
Arguments.of(randomInt, randomInt + 250, ((double) randomInt / (randomInt + 250)) * 100),
Arguments.of(6, 9, 66.66666666666666),
Arguments.of(531, 1000, 53.1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Copy link
Member Author

@dlvenable dlvenable Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly running the IntelliJ formatter. The only other changes are the truncatedTo. See comments below.

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -28,6 +31,7 @@
import java.io.ByteArrayInputStream;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -56,7 +60,7 @@ public EventJsonInputCodec createInputCodec() {
@ParameterizedTest
@ValueSource(strings = {"", "{}"})
public void emptyTest(String input) throws Exception {
input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}";
input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[" + input + "]}";
ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes());
inputCodec = createInputCodec();
Consumer<Record<Event>> consumer = mock(Consumer.class);
Expand All @@ -70,15 +74,15 @@ public void inCompatibleVersionTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In later versions of Java on Linux, there is an issue where the nanosecond resolution results in test failures.

Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"3.0\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
Expand All @@ -95,24 +99,24 @@ public void basicTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -126,24 +130,24 @@ public void test_with_timeReceivedOverridden() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now().minusSeconds(5);
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS).minusSeconds(5);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime)));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -159,7 +163,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final JacksonEvent event = (JacksonEvent) logBuilder.build();

return event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -22,6 +25,7 @@
import org.opensearch.dataprepper.model.log.JacksonLog;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -64,7 +68,7 @@ public void basicTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -75,8 +79,8 @@ public void basicTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -90,7 +94,7 @@ public void multipleEventsTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -103,8 +107,8 @@ public void multipleEventsTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(3));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -122,7 +126,7 @@ public void extendedTest() throws Exception {

Set<String> tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
List<String> tagsList = tags.stream().collect(Collectors.toList());
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
Instant origTime = startTime.minusSeconds(5);
event.getMetadata().setExternalOriginationTime(origTime);
Expand All @@ -135,11 +139,11 @@ public void extendedTest() throws Exception {
outputCodec.complete(outputStream);
assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON));
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags(), equalTo(tags));
Expand All @@ -157,7 +161,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final JacksonEvent event = (JacksonEvent) logBuilder.build();

return event;
}
Expand Down
Loading
Loading