Skip to content

Commit

Permalink
[Kernel] Default Parquet reader implementation
Browse files Browse the repository at this point in the history
This PR is part of #1783.

It implements a Parquet reader based on `parquet-mr` and generates its output as columnar batches, using implementations of the `ColumnVector` and `ColumnarBatch` interfaces.

Tested with unit tests (UTs).

Closes #1846
  • Loading branch information
vkorukanti committed Jun 22, 2023
1 parent cb89436 commit 04a29a4
Show file tree
Hide file tree
Showing 28 changed files with 3,407 additions and 26 deletions.
6 changes: 1 addition & 5 deletions kernel/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,13 @@ lazy val kernelDefault = (project in file("kernel-default"))
scalaStyleSettings,
releaseSettings,
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-api" % hadoopVersion, // Configuration, Path
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion, // Configuration, Path
"io.delta" % "delta-storage" % deltaStorageVersion, // LogStore
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", // ObjectMapper
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"io.delta" %% "delta-core" % deltaSparkVersion % "test",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test", // SparkSession
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"junit" % "junit" % "4.11" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.util.function.Consumer;
import java.util.function.Function;

public interface CloseableIterator<T> extends Iterator<T>, Closeable {
public interface CloseableIterator<T> extends Iterator<T>, Closeable
{
default <U> CloseableIterator<U> map(Function<T, U> mapper) {
CloseableIterator<T> delegate = this;
return new CloseableIterator<U>() {
Expand Down
92 changes: 72 additions & 20 deletions kernel/kernel-api/src/main/java/io/delta/kernel/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,36 @@
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;

public class Utils {
public class Utils
{
/**
* Utility method to create a singleton {@link CloseableIterator}.
*
* @param elem Element to create iterator with.
* @param <T> Element type.
* @param <T> Element type.
* @return A {@link CloseableIterator} with just one element.
*/
public static <T> CloseableIterator<T> singletonCloseableIterator(T elem) {
return new CloseableIterator<T>() {
public static <T> CloseableIterator<T> singletonCloseableIterator(T elem)
{
return new CloseableIterator<T>()
{
private boolean accessed;

@Override
public void close() throws IOException {
public void close() throws IOException
{
// nothing to close
}

@Override
public boolean hasNext() {
public boolean hasNext()
{
return !accessed;
}

@Override
public T next() {
public T next()
{
accessed = true;
return elem;
}
Expand All @@ -61,14 +67,17 @@ public T next() {
/**
* Convert a {@link Iterator} to {@link CloseableIterator}. Useful when passing normal iterators
* for arguments that require {@link CloseableIterator} type.
*
* @param iter {@link Iterator} instance
* @param <T> Element type
* @return A {@link CloseableIterator} wrapping the given {@link Iterator}
*/
public static <T> CloseableIterator<T> toCloseableIterator(Iterator<T> iter) {
return new CloseableIterator<T>() {
public static <T> CloseableIterator<T> toCloseableIterator(Iterator<T> iter)
{
return new CloseableIterator<T>()
{
@Override
public void close() { }
public void close() {}

@Override
public boolean hasNext()
Expand All @@ -91,30 +100,36 @@ public T next()
* @return A {@link ColumnVector} with a single element {@code value}
*/
// TODO: add String to method name or make generic?
public static ColumnVector singletonColumnVector(String value) {
return new ColumnVector() {
public static ColumnVector singletonColumnVector(String value)
{
return new ColumnVector()
{
@Override
public DataType getDataType()
{
return StringType.INSTANCE;
}

@Override
public int getSize() {
public int getSize()
{
return 1;
}

@Override
public void close() {
public void close()
{
}

@Override
public boolean isNullAt(int rowId) {
public boolean isNullAt(int rowId)
{
return value == null;
}

@Override
public String getString(int rowId) {
public String getString(int rowId)
{
if (rowId != 0) {
throw new IllegalArgumentException("Invalid row id: " + rowId);
}
Expand All @@ -130,7 +145,8 @@ public String getString(int rowId) {
* @param scanState Scan state {@link Row}
* @return Physical schema to read from the data files.
*/
public static StructType getPhysicalSchema(Row scanState) {
public static StructType getPhysicalSchema(Row scanState)
{
// TODO needs io.delta.kernel.internal.data.ScanStateRow
throw new UnsupportedOperationException("not implemented yet");
}
Expand All @@ -142,7 +158,8 @@ public static StructType getPhysicalSchema(Row scanState) {
* @param scanFileInfo {@link Row} representing one scan file.
* @return a {@link FileStatus} object created from the given scan file row.
*/
public static FileStatus getFileStatus(Row scanFileInfo) {
public static FileStatus getFileStatus(Row scanFileInfo)
{
String path = scanFileInfo.getString(0);
Long size = scanFileInfo.getLong(2);

Expand All @@ -151,13 +168,48 @@ public static FileStatus getFileStatus(Row scanFileInfo) {

/**
* Close the iterator.
*
* @param i1
*/
public static void safeClose(CloseableIterator i1) {
public static void safeClose(CloseableIterator i1)
{
try {
i1.close();
} catch (IOException ioe) {
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

/**
 * Closes each of the given {@link CloseableIterator}s, skipping {@code null} entries.
 * {@link CloseableIterator#close()} is attempted on every non-null iterator even when an
 * earlier one fails. The first failure is wrapped in an unchecked {@link RuntimeException};
 * any later failures are attached to it as suppressed exceptions. The wrapped exception is
 * thrown only after all iterators have been processed.
 *
 * @param iters Iterators to close; individual entries may be {@code null}.
 */
public static void closeIterators(CloseableIterator<? extends Object>... iters)
{
    RuntimeException firstFailure = null;
    for (int idx = 0; idx < iters.length; idx++) {
        CloseableIterator<? extends Object> current = iters[idx];
        if (current != null) {
            try {
                current.close();
            }
            catch (Exception closeError) {
                if (firstFailure != null) {
                    // A previous close already failed; keep this one as suppressed.
                    firstFailure.addSuppressed(closeError);
                }
                else {
                    firstFailure = new RuntimeException(closeError);
                }
            }
        }
    }
    if (firstFailure == null) {
        return;
    }
    throw firstFailure;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

public class DefaultKernelUtils
{
private DefaultKernelUtils() {}

/**
 * Given the file schema of a Parquet file and the columns selected by Delta, return the
 * subschema of the file schema that covers the selected columns. Selected columns that
 * cannot be found in the file schema are left out of the result.
 *
 * @param fileSchema Schema of the Parquet file.
 * @param deltaType Columns selected by Delta.
 * @return Union of the message types built for each selected column that was found in
 *         {@code fileSchema}. When none of the selected columns is present in the file,
 *         a message type without fields, named after {@code fileSchema}, is returned.
 */
public static final MessageType pruneSchema(
    GroupType fileSchema, // parquet
    StructType deltaType) // delta-core
{
    return deltaType.fields().stream()
        .map(column -> {
            Type type = findSubFieldType(fileSchema, column);
            if (type == null) {
                // Column is not present in this file; dropped by the filter below.
                return null;
            }
            Type prunedSubfields = pruneSubfields(type, column.getDataType());
            return new MessageType(column.getName(), prunedSubfields);
        })
        .filter(Objects::nonNull)
        .reduce(MessageType::union)
        // No selected column exists in the file: return an empty schema instead of
        // failing with a bare NoSuchElementException from Optional.get().
        .orElseGet(() -> new MessageType(fileSchema.getName()));
}

/**
 * Look up the Parquet sub field of {@code groupType} that corresponds to the given Delta
 * Kernel {@code field}. An exact name match is preferred; if there is none, the first sub
 * field whose name matches ignoring case is used.
 *
 * @param groupType Parquet group type coming from the file schema.
 * @param field Sub field given as Delta Kernel's {@link StructField}
 * @return {@link Type} of the matching Parquet field, or {@code null} if no sub field
 *         matches.
 */
public static Type findSubFieldType(GroupType groupType, StructField field)
{
    // TODO: Need a way to search by id once we start supporting column mapping `id` mode.
    final String wantedName = field.getName();
    if (!groupType.containsField(wantedName)) {
        // Parquet is case-sensitive, but the engine that generated the parquet file may
        // not be. With no direct match, fall back to a case-insensitive scan.
        List<Type> candidates = groupType.getFields();
        for (int idx = 0; idx < candidates.size(); idx++) {
            Type candidate = candidates.get(idx);
            if (candidate.getName().equalsIgnoreCase(wantedName)) {
                return candidate;
            }
        }
        return null;
    }
    return groupType.getType(wantedName);
}

// Keeps only those sub fields of the Parquet type that have a counterpart in the Delta
// struct type. Only the immediate sub fields are pruned; they are kept as found, without
// descending into them. Non-struct Delta types are returned untouched.
private static Type pruneSubfields(Type type, DataType deltaDatatype)
{
    if (deltaDatatype instanceof StructType) {
        StructType deltaStruct = (StructType) deltaDatatype;
        // The Parquet type is cast to a group without a check when the Delta type is a struct.
        GroupType parquetGroup = (GroupType) type;

        List<Type> keptSubFields = new ArrayList<>();
        for (StructField deltaField : deltaStruct.fields()) {
            Type match = findSubFieldType(parquetGroup, deltaField);
            if (match != null) {
                keptSubFields.add(match);
            }
        }
        return parquetGroup.withNewFields(keptSubFields);
    }

    // there is no pruning for non-struct types
    return type;
}

/**
 * Validates an argument precondition, failing with a message-less
 * {@link IllegalArgumentException} when the condition does not hold.
 *
 * @param isValid {@code true} if the argument is acceptable, {@code false} to fail
 * @throws IllegalArgumentException if {@code isValid} is false
 */
public static void checkArgument(boolean isValid)
    throws IllegalArgumentException
{
    if (isValid) {
        return;
    }
    throw new IllegalArgumentException();
}

/**
 * Validates an argument precondition, failing with an {@link IllegalArgumentException}
 * carrying the given message when the condition does not hold.
 *
 * @param isValid {@code true} if the argument is acceptable, {@code false} to fail
 * @param message Message used for the exception.
 * @throws IllegalArgumentException if {@code isValid} is false
 */
public static void checkArgument(boolean isValid, String message)
    throws IllegalArgumentException
{
    if (isValid) {
        return;
    }
    throw new IllegalArgumentException(message);
}

/**
 * Validates an argument precondition, failing with an {@link IllegalArgumentException}
 * whose message is built from a format template when the condition does not hold. The
 * message is only formatted on failure.
 *
 * @param isValid {@code true} if the argument is acceptable, {@code false} to fail
 * @param message Format template for the exception message.
 * @param args Objects used to fill in {@code %s} placeholders in the message
 * @throws IllegalArgumentException if {@code isValid} is false
 */
public static void checkArgument(boolean isValid, String message, Object... args)
    throws IllegalArgumentException
{
    if (isValid) {
        return;
    }
    // String.valueOf turns a null template into the text "null" before formatting.
    String template = String.valueOf(message);
    throw new IllegalArgumentException(String.format(template, args));
}

/**
 * Validates a state precondition, failing with an {@link IllegalStateException} carrying
 * the given message when the condition does not hold.
 *
 * @param isValid {@code true} if the state is acceptable, {@code false} to fail
 * @param message Message used for the exception.
 * @throws IllegalStateException if {@code isValid} is false
 */
public static void checkState(boolean isValid, String message)
    throws IllegalStateException
{
    if (isValid) {
        return;
    }
    throw new IllegalStateException(message);
}
}
Loading

0 comments on commit 04a29a4

Please sign in to comment.