Updates our usage of the Apache Parquet project to use their new interfaces over the old Hadoop ones. This allows us to be ready to extract Hadoop as other changes are made to the Parquet project. Removes some Hadoop transitive dependencies and makes Hadoop runtime-only where possible. Adds a test for INT96 and cleans up some test files. Contributes toward #4612. (#4623)

Signed-off-by: David Venable <dlv@amazon.com>
dlvenable authored Jul 1, 2024
1 parent f27b808 commit 1538288
Showing 9 changed files with 92 additions and 20 deletions.
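
The central change in this commit is swapping Hadoop's Configuration and path abstractions for Parquet's own ParquetConfiguration and InputFile/OutputFile interfaces. As a minimal sketch of the read side under those new interfaces (the class and method names below are illustrative, not code from this commit; assumes parquet-avro 1.14.0 and a file on the local filesystem):

import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.LocalInputFile;

import java.io.IOException;
import java.nio.file.Path;

class LocalParquetReadSketch {
    static void readAll(final Path parquetFile) throws IOException {
        // PlainParquetConfiguration implements Parquet's own ParquetConfiguration,
        // so no org.apache.hadoop.conf.Configuration is needed at compile time.
        final PlainParquetConfiguration configuration = new PlainParquetConfiguration();
        try (ParquetReader<GenericRecord> reader =
                     AvroParquetReader.<GenericRecord>builder(new LocalInputFile(parquetFile), configuration)
                             .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println(record); // each row as an Avro GenericRecord
            }
        }
    }
}

Hadoop is not removed outright: ParquetReader still lives in parquet-hadoop, so hadoop-common stays on the runtime classpath, which is why the build changes below demote it to runtimeOnly rather than dropping it.
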
16 changes: 12 additions & 4 deletions data-prepper-plugins/parquet-codecs/build.gradle
@@ -7,16 +7,24 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation libs.avro.core
implementation libs.hadoop.common
implementation(libs.hadoop.mapreduce) {
exclude group: 'org.apache.hadoop', module: 'hadoop-hdfs-client'
}
implementation 'org.apache.commons:commons-text:1.11.0'
implementation 'org.apache.parquet:parquet-avro:1.14.0'
implementation 'org.apache.parquet:parquet-column:1.14.0'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation 'org.apache.parquet:parquet-hadoop:1.14.0'
runtimeOnly(libs.hadoop.common) {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
}
runtimeOnly(libs.hadoop.mapreduce) {
exclude group: 'org.apache.hadoop', module: 'hadoop-hdfs-client'
}
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-test-event')
testImplementation(libs.hadoop.common) {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
}

constraints {
implementation('com.nimbusds:nimbus-jose-jwt') {
ParquetInputCodec.java
@@ -6,8 +6,9 @@
package org.opensearch.dataprepper.plugins.codec.parquet;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
@@ -46,13 +47,13 @@ public class ParquetInputCodec implements InputCodec {

private static final Logger LOG = LoggerFactory.getLogger(ParquetInputCodec.class);

private final Configuration configuration;
private final ParquetConfiguration configuration;
private final EventFactory eventFactory;

@DataPrepperPluginConstructor
public ParquetInputCodec(final EventFactory eventFactory) {
this.eventFactory = eventFactory;
configuration = new Configuration();
configuration = new PlainParquetConfiguration();
configuration.setBoolean(READ_INT96_AS_FIXED, true);
}

@@ -80,8 +81,7 @@ public void parse(final InputFile inputFile, final DecompressionEngine decompres
}

private void parseParquetFile(final InputFile inputFile, final Consumer<Record<Event>> eventConsumer) throws IOException {
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile)
.withConf(this.configuration)
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile, this.configuration)
.build()) {
GenericRecordJsonEncoder encoder = new GenericRecordJsonEncoder();
GenericRecord record = null;
ParquetInputCodecTest.java
@@ -8,8 +8,17 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -33,12 +42,15 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.time.OffsetDateTime;
import java.time.temporal.JulianFields;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -161,6 +173,22 @@ public void parseInputFile_parsesCorrectly() throws IOException {
assertRecordsCorrect(actualRecords);
}

@Test
public void parseInputStream_parsesCorrectly_with_int96() throws IOException {
final File testDataFile = File.createTempFile(FILE_PREFIX + "-int96-", FILE_SUFFIX);
testDataFile.deleteOnExit();
generateTestDataInt96(testDataFile);
InputStream targetStream = new FileInputStream(testDataFile);

parquetInputCodec.parse(targetStream, mockConsumer);

final ArgumentCaptor<Record<Event>> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
verify(mockConsumer, times(10)).accept(recordArgumentCaptor.capture());

final List<Record<Event>> actualRecords = recordArgumentCaptor.getAllValues();
assertThat(actualRecords.size(), equalTo(10));
}

@Test
public void parseInputFile_snappyInputFile() throws IOException, URISyntaxException {
URL resource = getClass().getClassLoader().getResource("sample.snappy.parquet");
@@ -203,8 +231,10 @@ public void parseInputFile_testParquetFile() throws IOException, URISyntaxExcept
private static void generateTestData(final File file) throws IOException {
Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new LocalOutputFile(file))
final ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new LocalOutputFile(file))
.withSchema(schema)
.withConf(new PlainParquetConfiguration())
.withEncryption(null)
.build();

for (int i = 0; i < 10; i++) {
@@ -220,6 +250,34 @@ private static void generateTestData(final File file) throws IOException {
writer.close();
}

/**
* Generates a Parquet file with INT96 data. This must use the example
* schema rather than Avro; otherwise it would not correctly reproduce the
* possible INT96 error.
*
* @param file The file for Parquet
*/
private static void generateTestDataInt96(final File file) throws IOException {
final MessageType schema = new MessageType("test", List.of(
new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT96, "my_timestamp_value")
));
final PlainParquetConfiguration conf = new PlainParquetConfiguration();
conf.setStrings(WRITE_FIXED_AS_INT96, "my_timestamp_value");
conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(file))
.withConf(conf)
.withEncryption(null)
.build();

for (int i = 0; i < 10; i++) {
final Group group = new SimpleGroup(schema);
group.add("my_timestamp_value", createInt96());

writer.write(group);
}
writer.close();
}

private void assertRecordsCorrect(final List<Record<Event>> records) {
assertThat(records.size(), equalTo(10));
for (int i = 0; i < 10; i++) {
@@ -240,5 +298,9 @@ private void assertRecordsCorrect(final List<Record<Event>> records) {
assertThat(record.getData().getMetadata().getEventType(), equalTo(EVENT_TYPE));
}
}

private static NanoTime createInt96() {
return new NanoTime((int) OffsetDateTime.now().getLong(JulianFields.JULIAN_DAY), System.nanoTime());
}
}
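
For context on the INT96 test above: ParquetInputCodec enables READ_INT96_AS_FIXED, so an INT96 column like the one written by generateTestDataInt96 should come back through parquet-avro as a 12-byte Avro fixed value instead of failing the read. A hypothetical read-side sketch follows; the column name matches the test, but treating the value as a GenericData.Fixed with an 8-byte nanos-of-day plus 4-byte Julian-day layout is an assumption about parquet-avro's INT96 handling, not something this commit asserts.

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.LocalInputFile;

import java.io.File;
import java.io.IOException;

import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;

class Int96ReadSketch {
    static void dumpInt96(final File parquetFile) throws IOException {
        final PlainParquetConfiguration conf = new PlainParquetConfiguration();
        conf.setBoolean(READ_INT96_AS_FIXED, true); // the same switch ParquetInputCodec flips
        try (ParquetReader<GenericRecord> reader =
                     AvroParquetReader.<GenericRecord>builder(new LocalInputFile(parquetFile.toPath()), conf)
                             .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                // Assumed: INT96 surfaces as a 12-byte fixed (8 bytes nanos-of-day,
                // then a 4-byte Julian day), mirroring the NanoTime layout used above.
                final GenericData.Fixed int96 = (GenericData.Fixed) record.get("my_timestamp_value");
                System.out.println(int96.bytes().length); // expected: 12
            }
        }
    }
}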

5 changes: 4 additions & 1 deletion data-prepper-plugins/s3-sink/build.gradle
@@ -19,7 +19,10 @@ dependencies {
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.9.22'
implementation project(':data-prepper-plugins:avro-codecs')
implementation libs.avro.core
implementation libs.hadoop.common
implementation(libs.hadoop.common) {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
}
implementation 'org.apache.parquet:parquet-avro:1.14.0'
implementation 'software.amazon.awssdk:apache-client'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.9.22'
@@ -9,6 +9,7 @@
import org.apache.avro.util.Utf8;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -65,7 +66,7 @@ public void validate(int expectedRecords, final List<Map<String, Object>> sample
int validatedRecords = 0;

int count = 0;
try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile)
try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile, new PlainParquetConfiguration())
.build()) {
GenericRecord record;

@@ -12,17 +12,15 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
@@ -79,6 +77,7 @@
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.ArrayList;
@@ -413,7 +412,7 @@ private List<HashMap<String, Object>> createParquetRecordsList(final InputStream
final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX);
Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
List<HashMap<String, Object>> actualRecordList = new ArrayList<>();
try (ParquetFileReader parquetFileReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()), ParquetReadOptions.builder().build())) {
try (final ParquetFileReader parquetFileReader = new ParquetFileReader(new LocalInputFile(Path.of(tempFile.toURI())), ParquetReadOptions.builder().build())) {
final ParquetMetadata footer = parquetFileReader.getFooter();
final MessageType schema = createdParquetSchema(footer);
PageReadStore pages;
@@ -6,17 +6,15 @@

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
@@ -46,6 +44,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
@@ -556,7 +555,7 @@ private List<Map<String, Object>> createParquetRecordsList(final InputStream inp
final File tempFile = new File(tempDirectory, FILE_NAME);
Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
List<Map<String, Object>> actualRecordList = new ArrayList<>();
try (ParquetFileReader parquetFileReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()), ParquetReadOptions.builder().build())) {
try (final ParquetFileReader parquetFileReader = new ParquetFileReader(new LocalInputFile(Path.of(tempFile.toURI())), ParquetReadOptions.builder().build())) {
final ParquetMetadata footer = parquetFileReader.getFooter();
final MessageType schema = createdParquetSchema(footer);
PageReadStore pages;