Fix flaky upsert integration tests (#14447)
Jackie-Jiang authored Nov 14, 2024
1 parent 476415f commit a49f312
Showing 5 changed files with 286 additions and 319 deletions.
@@ -59,8 +59,6 @@
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -568,61 +566,22 @@ protected List<File> unpackTarData(String tarFileName, File outputDir)
return TarCompressionUtils.untar(inputStream, outputDir);
}

/**
* Pushes the data in the given Avro files into a Kafka stream.
*
* @param avroFiles List of Avro files
*/
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, "localhost:" + getKafkaPort(), getKafkaTopic(),
getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), injectTombstones());
}

/**
* Pushes the data in the given CSV file into a Kafka stream.
*
* @param csvFile CSV file
*/
protected void pushCsvIntoKafka(File csvFile, String kafkaTopic, @Nullable Integer partitionColumnIndex)
throws Exception {
String kafkaBroker = "localhost:" + getKafkaPort();
StreamDataProducer producer = null;
try {
producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
getDefaultKafkaProducerProperties(kafkaBroker));
ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones(),
producer);
} catch (Exception e) {
if (producer != null) {
producer.close();
}
throw e;
}
ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, "localhost:" + getKafkaPort(), kafkaTopic,
partitionColumnIndex, injectTombstones());
}

protected void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic, @Nullable Integer partitionColumnIndex) {
String kafkaBroker = "localhost:" + getKafkaPort();
StreamDataProducer producer = null;
try {
producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
getDefaultKafkaProducerProperties(kafkaBroker));
ClusterIntegrationTestUtils.pushCsvIntoKafka(csvRecords, kafkaTopic, partitionColumnIndex, injectTombstones(),
producer);
} catch (Exception e) {
if (producer != null) {
producer.close();
}
}
}

private Properties getDefaultKafkaProducerProperties(String kafkaBroker) {
Properties properties = new Properties();
properties.put("metadata.broker.list", kafkaBroker);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
return properties;
protected void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic, @Nullable Integer partitionColumnIndex)
throws Exception {
ClusterIntegrationTestUtils.pushCsvIntoKafka(csvRecords, "localhost:" + getKafkaPort(), kafkaTopic,
partitionColumnIndex, injectTombstones());
}

protected boolean injectTombstones() {
@@ -661,14 +620,21 @@ protected List<File> getRealtimeAvroFiles(List<File> avroFiles, int numRealtimeS
}

protected void startKafka() {
startKafka(KafkaStarterUtils.DEFAULT_KAFKA_PORT);
startKafkaWithoutTopic();
createKafkaTopic(getKafkaTopic());
}

protected void startKafkaWithoutTopic() {
startKafkaWithoutTopic(KafkaStarterUtils.DEFAULT_KAFKA_PORT);
}

protected void startKafkaWithoutTopic(int port) {
_kafkaStarters = KafkaStarterUtils.startServers(getNumKafkaBrokers(), port, getKafkaZKAddress(),
KafkaStarterUtils.getDefaultKafkaConfiguration());
}

protected void startKafka(int port) {
Properties kafkaConfig = KafkaStarterUtils.getDefaultKafkaConfiguration();
_kafkaStarters = KafkaStarterUtils.startServers(getNumKafkaBrokers(), port, getKafkaZKAddress(), kafkaConfig);
_kafkaStarters.get(0)
.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
protected void createKafkaTopic(String topic) {
_kafkaStarters.get(0).createTopic(topic, KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
}

protected void stopKafka() {
@@ -706,7 +672,7 @@ protected void waitForAllDocsLoaded(long timeoutMs)
}

protected void waitForDocsLoaded(long timeoutMs, boolean raiseError, String tableName) {
final long countStarResult = getCountStarResult();
long countStarResult = getCountStarResult();
TestUtils.waitForCondition(() -> getCurrentCountStarResult(tableName) == countStarResult, 100L, timeoutMs,
"Failed to load " + countStarResult + " documents", raiseError, Duration.ofMillis(timeoutMs / 10));
}
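A minimal sketch of how a subclass could use the new split-out helpers; the class, method, and topic names below are hypothetical and not taken from this commit:

// Hypothetical subclass: start the Kafka brokers once without the default
// topic, then create each per-test topic explicitly via the new helper.
public class ExampleUpsertIntegrationTest extends BaseClusterIntegrationTest {
  protected void setUpKafka() {
    startKafkaWithoutTopic();                // brokers only, no topic created
    createKafkaTopic("exampleUpsertTopic");  // hypothetical per-test topic
  }
}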
@@ -61,7 +61,6 @@
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.request.PinotQuery;
@@ -361,16 +360,32 @@ public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
}

public static StreamDataProducer getKafkaProducer(String kafkaBroker)
throws Exception {
Properties properties = new Properties();
properties.put("metadata.broker.list", kafkaBroker);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
}
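With producer construction centralized in getKafkaProducer, the push helpers below can use try-with-resources; a minimal usage sketch, assuming a placeholder broker address and topic:

// StreamDataProducer is used in try-with-resources throughout this commit,
// so it is closed even when produce() throws. Broker/topic are placeholders.
try (StreamDataProducer producer = getKafkaProducer("localhost:19092")) {
  producer.produce("exampleTopic", Longs.toByteArray(0L), "value".getBytes(StandardCharsets.UTF_8));
}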

/**
* Push the records from the given Avro files into a Kafka stream.
*
* @param csvFile CSV File name
* @param kafkaTopic Kafka topic
* @param partitionColumnIndex Optional Index of the partition column
* @throws Exception
* Push the records from the given CSV file into a Kafka stream.
*/
public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
@Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
public static void pushCsvIntoKafka(File csvFile, String kafkaBroker, String kafkaTopic,
@Nullable Integer partitionColumnIndex, boolean injectTombstones)
throws Exception {
try (StreamDataProducer producer = getKafkaProducer(kafkaBroker)) {
pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones, producer);
}
}
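The two overloads follow the pattern used for every push method in this change: the broker-string variant owns the producer's lifecycle, while the producer-taking variant lets a caller reuse one producer across pushes. A hedged sketch of the reuse case (file, broker, and topics are placeholders):

// Hypothetical reuse: one producer pushes the same CSV file to two topics,
// avoiding a new Kafka producer per call.
File csvFile = new File("example.csv");  // placeholder file
try (StreamDataProducer producer = getKafkaProducer("localhost:19092")) {
  pushCsvIntoKafka(csvFile, "topicA", null, false, producer);
  pushCsvIntoKafka(csvFile, "topicB", null, false, producer);
}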

/**
* Push the records from the given CSV file into a Kafka stream.
*/
public static void pushCsvIntoKafka(File csvFile, String kafkaTopic, @Nullable Integer partitionColumnIndex,
boolean injectTombstones, StreamDataProducer producer)
throws Exception {
long counter = 0;
if (injectTombstones) {
@@ -380,7 +395,7 @@ public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
producer.produce(kafkaTopic, Longs.toByteArray(counter++), null);
}
}
CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
CSVFormat csvFormat = CSVFormat.Builder.create().setSkipHeaderRecord(true).build();
try (CSVParser parser = CSVParser.parse(csvFile, StandardCharsets.UTF_8, csvFormat)) {
for (CSVRecord csv : parser) {
byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(counter++)
@@ -396,12 +411,18 @@ public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
}

/**
* Push the records from the given Avro files into a Kafka stream.
*
* @param csvRecords List of CSV record string
* @param kafkaTopic Kafka topic
* @param partitionColumnIndex Optional Index of the partition column
* @throws Exception
* Push the given CSV records into a Kafka stream.
*/
public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaBroker, String kafkaTopic,
@Nullable Integer partitionColumnIndex, boolean injectTombstones)
throws Exception {
try (StreamDataProducer producer = getKafkaProducer(kafkaBroker)) {
pushCsvIntoKafka(csvRecords, kafkaTopic, partitionColumnIndex, injectTombstones, producer);
}
}

/**
* Push the CSV records into a Kafka stream.
*/
public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
@Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
@@ -414,7 +435,7 @@ public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
producer.produce(kafkaTopic, Longs.toByteArray(counter++), null);
}
}
CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
CSVFormat csvFormat = CSVFormat.Builder.create().setSkipHeaderRecord(true).build();
for (String recordCsv : csvRecords) {
try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
for (CSVRecord csv : parser) {
@@ -433,28 +454,23 @@ public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,

/**
* Push the records from the given Avro files into a Kafka stream.
*
* @param avroFiles List of Avro files
* @param kafkaBroker Kafka broker config
* @param kafkaTopic Kafka topic
* @param maxNumKafkaMessagesPerBatch Maximum number of Kafka messages per batch
* @param header Optional Kafka message header
* @param partitionColumn Optional partition column
* @throws Exception
*/
public static void pushAvroIntoKafka(List<File> avroFiles, String kafkaBroker, String kafkaTopic,
int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn,
boolean injectTombstones)
throws Exception {
Properties properties = new Properties();
properties.put("metadata.broker.list", kafkaBroker);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");

StreamDataProducer producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
try (StreamDataProducer producer = getKafkaProducer(kafkaBroker)) {
pushAvroIntoKafka(avroFiles, kafkaTopic, maxNumKafkaMessagesPerBatch, header, partitionColumn, injectTombstones,
producer);
}
}

/**
* Push the records from the given Avro files into a Kafka stream.
*/
public static void pushAvroIntoKafka(List<File> avroFiles, String kafkaTopic, int maxNumKafkaMessagesPerBatch,
@Nullable byte[] header, @Nullable String partitionColumn, boolean injectTombstones, StreamDataProducer producer)
throws Exception {
long counter = 0;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
if (injectTombstones) {
@@ -487,29 +503,31 @@ public static void pushAvroIntoKafka(List<File> avroFiles, String kafkaBroker, S
}

/**
* Push the records from the given Avro files into a Kafka stream.
*
* @param avroFiles List of Avro files
* @param kafkaBroker Kafka broker config
* @param kafkaTopic Kafka topic
* @param header Optional Kafka message header
* @param partitionColumn Optional partition column
* @param commit if the transaction commits or aborts
* @throws Exception
* Push the records from the given Avro files into a Kafka stream within a transaction.
*/
public static void pushAvroIntoKafkaWithTransaction(List<File> avroFiles, String kafkaBroker, String kafkaTopic,
int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn, boolean commit)
throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBroker);
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("request.required.acks", "1");
props.put("transactional.id", "test-transaction");
props.put("transaction.state.log.replication.factor", "2");

Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
// initiate transaction.
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaBroker);
properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("request.required.acks", "1");
properties.put("transactional.id", "test-transaction");
properties.put("transaction.state.log.replication.factor", "2");
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
pushAvroIntoKafkaWithTransaction(avroFiles, kafkaTopic, maxNumKafkaMessagesPerBatch, header, partitionColumn,
commit, producer);
}
}
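The removed javadoc above notes that the commit flag decides whether the transaction commits or aborts, so a test can exercise the abort path; a hedged sketch (all arguments are placeholders):

// Hypothetical: records pushed in an aborted transaction should be invisible
// to consumers using read_committed isolation.
pushAvroIntoKafkaWithTransaction(avroFiles, "localhost:19092", "exampleTopic",
    1000, null, null, /* commit */ false);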

/**
* Push the records from the given Avro files into a Kafka stream within a transaction.
*/
public static void pushAvroIntoKafkaWithTransaction(List<File> avroFiles, String kafkaTopic,
int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn, boolean commit,
KafkaProducer<byte[], byte[]> producer)
throws Exception {
producer.initTransactions();
producer.beginTransaction();
long counter = 0;
@@ -543,31 +561,26 @@ public static void pushAvroIntoKafkaWithTransaction(List<File> avroFiles, String
}

/**
* Push random generated
*
* @param avroFile Sample Avro file used to extract the Avro schema
* @param kafkaBroker Kafka broker config
* @param kafkaTopic Kafka topic
* @param numKafkaMessagesToPush Number of Kafka messages to push
* @param maxNumKafkaMessagesPerBatch Maximum number of Kafka messages per batch
* @param header Optional Kafka message header
* @param partitionColumn Optional partition column
* @throws Exception
* Push randomly generated records with the given Avro file schema into a Kafka stream.
*/
@SuppressWarnings("unused")
public static void pushRandomAvroIntoKafka(File avroFile, String kafkaBroker, String kafkaTopic,
int numKafkaMessagesToPush, int maxNumKafkaMessagesPerBatch, @Nullable byte[] header,
@Nullable String partitionColumn)
throws Exception {
Properties properties = new Properties();
properties.put("metadata.broker.list", kafkaBroker);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
try (StreamDataProducer producer = getKafkaProducer(kafkaBroker)) {
pushRandomAvroIntoKafka(avroFile, kafkaTopic, numKafkaMessagesToPush, maxNumKafkaMessagesPerBatch, header,
partitionColumn, producer);
}
}

/**
* Push randomly generated records with the given Avro file schema into a Kafka stream.
*/
public static void pushRandomAvroIntoKafka(File avroFile, String kafkaTopic, int numKafkaMessagesToPush,
int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn,
StreamDataProducer producer)
throws Exception {
long counter = 0;
StreamDataProducer producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
BinaryEncoder binaryEncoder = new EncoderFactory().directBinaryEncoder(outputStream, null);
@@ -990,8 +1003,7 @@ private static void comparePinotResultsWithExpectedValues(Set<String> expectedVa
if ((!isLimitSet || limit > h2NumRows) && !expectedValues.contains(actualValue)) {
throw new RuntimeException(String.format(
"Selection result differ in Pinot from H2: Pinot row: [ %s ] not found in H2 result set: [%s].",
actualValue, expectedValues)
);
actualValue, expectedValues));
}
if (!orderByColumns.isEmpty()) {
// Check actual group value is the same as expected group value in the same order.