From 53ced7f31ce280bce40581fb6cc635c34c7e02a6 Mon Sep 17 00:00:00 2001 From: Kengo Seki Date: Mon, 28 Nov 2022 09:31:48 +0900 Subject: [PATCH] Replace parquet-tools with parquet-avro The newer version of parquet-tools is not available via Maven Central as it was deprecated in Parquet 1.12.0 and removed from source in 1.12.3 (PARQUET-2020). This PR replaces it with another module so that we can catch up with the recent version of Parquet later. --- build.gradle | 3 ++- .../s3_parquet/EmbulkPluginTestHelper.scala | 24 ++++++------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/build.gradle b/build.gradle index b6eb299..f0a56e9 100644 --- a/build.gradle +++ b/build.gradle @@ -53,7 +53,8 @@ dependencies { } testImplementation "org.embulk:embulk-core:0.9.23:tests" testImplementation "org.scalatest:scalatest_2.13:3.1.1" - testImplementation 'org.apache.parquet:parquet-tools:1.11.0' + testImplementation 'org.apache.parquet:parquet-avro:1.11.0' + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-avro:2.14.0' testImplementation 'org.apache.hadoop:hadoop-client:2.9.2' } diff --git a/src/test/scala/org/embulk/output/s3_parquet/EmbulkPluginTestHelper.scala b/src/test/scala/org/embulk/output/s3_parquet/EmbulkPluginTestHelper.scala index b040d4b..c532b34 100644 --- a/src/test/scala/org/embulk/output/s3_parquet/EmbulkPluginTestHelper.scala +++ b/src/test/scala/org/embulk/output/s3_parquet/EmbulkPluginTestHelper.scala @@ -13,12 +13,13 @@ import com.amazonaws.services.s3.transfer.{ TransferManagerBuilder } import com.google.inject.{Binder, Guice, Module, Stage} +import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path => HadoopPath} +import org.apache.parquet.avro.AvroReadSupport import org.apache.parquet.hadoop.{ParquetFileReader, ParquetReader} import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.schema.MessageType -import org.apache.parquet.tools.read.{SimpleReadSupport, SimpleRecord} import org.embulk.{TestPluginSourceModule, TestUtilityModule} import org.embulk.config.{ ConfigLoader, @@ -229,27 +230,16 @@ abstract class EmbulkPluginTestHelper ) ) { reader => messageTypeTest(reader.getFileMetaData.getSchema) } - val reader: ParquetReader[SimpleRecord] = ParquetReader + val reader: ParquetReader[GenericRecord] = ParquetReader .builder( - new SimpleReadSupport(), + new AvroReadSupport[GenericRecord](), new HadoopPath(pathString) ) .build() - def read( - reader: ParquetReader[SimpleRecord], - records: Seq[Seq[AnyRef]] = Seq() - ): Seq[Seq[AnyRef]] = { - val simpleRecord: SimpleRecord = reader.read() - if (simpleRecord != null) { - val r: Seq[AnyRef] = simpleRecord.getValues - .map(_.getValue) - return read(reader, records :+ r) - } - records - } - try read(reader) - finally reader.close() + Iterator.continually(reader.read()).takeWhile(_ != null).map( + record => record.getSchema.getFields.map(f => record.get(f.name())) + ).toSeq } def loadConfigSourceFromYamlString(yaml: String): ConfigSource =