Skip to content

Commit

Permalink
Use awaitility to read data in Kafka buffer tests to fix flakiness (opensearch-project#4703)
Browse files Browse the repository at this point in the history

Use awaitility to read data in KafkaBufferIT to promote stability and speed of execution. Contributes toward opensearch-project#4168

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Sep 4, 2024
1 parent 7f7d2dd commit 82d811a
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/kafka-plugin-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
run: |
echo 'KAFKA_VERSION=${{ matrix.kafka }}' > data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/.env
docker compose --project-directory data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/zookeeper --env-file data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/.env up -d
sleep 10
sleep 2
- name: Wait for Kafka
run: |
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaStartIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void waitForKafka() {
try (AdminClient adminClient = AdminClient.create(props)) {
await().atMost(Duration.ofMinutes(3))
.pollDelay(Duration.ofSeconds(2))
.untilAsserted(() -> adminClient.listTopics().names().get());
.until(() -> adminClient.listTopics().names().get() != null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,6 +62,7 @@
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

@ExtendWith(MockitoExtension.class)
Expand All @@ -74,8 +73,6 @@ public class KafkaBufferIT {

private KafkaBufferConfig kafkaBufferConfig;
@Mock
private PluginFactory pluginFactory;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private AcknowledgementSet acknowledgementSet;
Expand All @@ -95,9 +92,7 @@ void setUp() {
random = new Random();
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
acknowledgementSet = mock(AcknowledgementSet.class);
lenient().doAnswer((a) -> {
return null;
}).when(acknowledgementSet).complete();
lenient().doAnswer((a) -> null).when(acknowledgementSet).complete();
lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

Expand Down Expand Up @@ -142,7 +137,7 @@ void write_and_read() throws TimeoutException {
Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -183,7 +178,7 @@ void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldExcep
Record<Event> record = createLargeRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -213,7 +208,7 @@ void writeBigJson_and_read() throws Exception {
inputJson += "]";
objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -243,7 +238,7 @@ void writeMultipleSmallJson_and_read() throws Exception {
objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000);
}

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -273,7 +268,7 @@ void writeBytes_and_read() throws Exception {
final String key = UUID.randomUUID().toString();
objectUnderTest.writeBytes(bytes, key, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -398,7 +393,7 @@ void write_and_read_encrypted() throws TimeoutException {
Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -526,7 +521,7 @@ void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeExcepti

testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead;

@ExtendWith(MockitoExtension.class)
public class KafkaBuffer_KmsIT {
Expand Down Expand Up @@ -177,7 +178,7 @@ void write_and_read_encrypted() throws TimeoutException {
Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -216,7 +217,7 @@ void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeExcepti

testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.buffer;

import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.awaitility.Awaitility.await;

/**
 * Test helper that polls a {@link KafkaBuffer} until it yields at least one record,
 * avoiding flaky single-shot reads against an eventually-consistent Kafka topic.
 */
final class ReadBufferHelper {
    // Utility class — not instantiable.
    private ReadBufferHelper() {
    }

    /**
     * Repeatedly reads from the buffer (short 500 ms reads) until a non-empty
     * result arrives or 30 seconds elapse.
     *
     * @param objectUnderTest the buffer to read from
     * @return the first read result containing at least one record
     * @throws org.awaitility.core.ConditionTimeoutException if no record arrives within 30 seconds
     */
    static Map.Entry<Collection<Record<Event>>, CheckpointState> awaitRead(final KafkaBuffer objectUnderTest) {
        // AtomicReference replaces the original unchecked Map.Entry[1] array as the
        // mutable cell written from inside the polling lambda.
        final AtomicReference<Map.Entry<Collection<Record<Event>>, CheckpointState>> lastReadResult =
                new AtomicReference<>();
        await()
                .atMost(Duration.ofSeconds(30))
                .until(() -> {
                    lastReadResult.set(objectUnderTest.read(500));
                    final Map.Entry<Collection<Record<Event>>, CheckpointState> result = lastReadResult.get();
                    return result != null && result.getKey() != null && !result.getKey().isEmpty();
                });
        return lastReadResult.get();
    }
}

0 comments on commit 82d811a

Please sign in to comment.