diff --git a/pom.xml b/pom.xml
index 18ea1fdfe..94284fb88 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
1.11.133
0.10.2-hadoop2
1.56
- 6.10.0
+ 6.11.0-SNAPSHOT
1.1.5
1.6
2.5
diff --git a/wrangler-core/pom.xml b/wrangler-core/pom.xml
index d75f48b18..7a48f916e 100644
--- a/wrangler-core/pom.xml
+++ b/wrangler-core/pom.xml
@@ -170,6 +170,11 @@
guava-retrying
${guava.retrying.version}
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo</artifactId>
+      <version>4.0.2</version>
+    </dependency>
org.antlr
antlr4
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/RowSerializer.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/RowSerializer.java
new file mode 100644
index 000000000..6520412f9
--- /dev/null
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/RowSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler.utils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonNull;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import io.cdap.wrangler.api.Row;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A helper class which allows serialization and deserialization of {@link Row} lists using Kryo.
+ * All value classes supported by {@link SchemaConverter} should be registered here.
+ *
+ * <p>NOTE(review): the Kryo documentation states that a {@link Kryo} instance is not thread safe,
+ * so an instance of this class should not be shared between threads without synchronization.
+ **/
+public class RowSerializer {
+
+  private static final Gson GSON = new Gson();
+  // Initial capacity of the Kryo output buffer in bytes; the buffer grows on demand.
+  private static final int INITIAL_BUFFER_SIZE = 1024;
+  // Passed to Output as the maximum buffer size; -1 means no upper bound.
+  private static final int UNBOUNDED_BUFFER_SIZE = -1;
+
+  private final Kryo kryo;
+
+  /**
+   * Creates a serializer backed by a new {@link Kryo} instance with the row value classes registered.
+   * The registration order is kept stable because the writer and the reader must register
+   * classes identically.
+   */
+  public RowSerializer() {
+    kryo = new Kryo();
+    // Register all classes from SchemaConverter
+    kryo.register(Row.class);
+    kryo.register(ArrayList.class);
+    kryo.register(LocalDate.class);
+    kryo.register(LocalTime.class);
+    kryo.register(ZonedDateTime.class);
+    kryo.register(Map.class);
+    kryo.register(JsonNull.class);
+    // JsonPrimitive does not have a no-arg constructor, hence we need a
+    // custom serializer
+    kryo.register(JsonPrimitive.class, new JsonSerializer());
+    kryo.register(JsonArray.class);
+    kryo.register(JsonObject.class);
+    // Support deprecated util.date classes
+    kryo.register(Date.class);
+    kryo.register(java.sql.Date.class);
+    kryo.register(Time.class);
+    kryo.register(Timestamp.class);
+  }
+
+  /**
+   * Serializes the given rows into a byte array.
+   *
+   * @param rows the rows to serialize
+   * @return a byte array containing exactly the serialized bytes
+   */
+  public byte[] fromRows(List<Row> rows) {
+    Output output = new Output(INITIAL_BUFFER_SIZE, UNBOUNDED_BUFFER_SIZE);
+    kryo.writeClassAndObject(output, rows);
+    // toBytes() copies only the bytes that were written. getBuffer() would return the
+    // whole backing array, including the unused capacity after the serialized data.
+    return output.toBytes();
+  }
+
+  /**
+   * Deserializes rows from a byte array produced by {@link #fromRows(List)}.
+   *
+   * @param bytes the serialized rows
+   * @return the deserialized list of rows
+   */
+  @SuppressWarnings("unchecked")
+  public List<Row> toRows(byte[] bytes) {
+    Input input = new Input(bytes);
+    // The payload is written by fromRows, so the top-level object is a list of rows.
+    return (List<Row>) kryo.readClassAndObject(input);
+  }
+
+  /**
+   * Kryo serializer that writes a {@link JsonElement} as its JSON string and parses it back with Gson.
+   */
+  static class JsonSerializer extends Serializer<JsonElement> {
+
+    @Override
+    public void write(Kryo kryo, Output output, JsonElement object) {
+      output.writeString(GSON.toJson(object));
+    }
+
+    @Override
+    public JsonElement read(Kryo kryo, Input input, Class<JsonElement> type) {
+      return GSON.fromJson(input.readString(), type);
+    }
+  }
+}
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java
index 65ba2ae5d..2dd00ef16 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java
@@ -100,6 +100,7 @@ public Schema getSchema(Object value, String name) throws RecordConvertorExcepti
* @param name name of the field
* @param recordPrefix prefix to append at the beginning of a custom record
* @return the schema of this object
+ * NOTE: ANY NEWLY SUPPORTED DATATYPE SHOULD ALSO BE REGISTERED IN {@link RowSerializer}
*/
@Nullable
public Schema getSchema(Object value, String name, @Nullable String recordPrefix) throws RecordConvertorException {
diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java
index aaad5eb2b..b5fa647fc 100644
--- a/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java
+++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java
@@ -194,4 +194,5 @@ public final class JsonTestData {
+ " }"
+ " }";
public static final String EMPTY_OBJECT = "{ \"dividesplitdetails\":{\"type0\":[]}}";
+ public static final String NULL_OBJECT = "{ \"dividesplitdetails\":{\"type0\":null, \"type1\":0}}";
}
diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/RowSerializerTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/RowSerializerTest.java
new file mode 100644
index 000000000..97b45b28c
--- /dev/null
+++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/RowSerializerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler.utils;
+
+import com.google.common.collect.Lists;
+import com.google.gson.JsonParser;
+import io.cdap.wrangler.TestingRig;
+import io.cdap.wrangler.api.RecipePipeline;
+import io.cdap.wrangler.api.Row;
+import org.junit.Assert;
+import org.junit.Test;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Round-trip tests for {@link RowSerializer}: rows are serialized with
+ * {@code fromRows}, deserialized with {@code toRows} on a separate serializer
+ * instance, and compared element by element with the original rows.
+ */
+public class RowSerializerTest {
+
+ // JSON documents from JsonTestData used as inputs by testJsonTypes.
+ private static final String[] TESTS = new String[]{
+ JsonTestData.BASIC,
+ JsonTestData.SIMPLE_JSON_OBJECT,
+ JsonTestData.ARRAY_OF_OBJECTS,
+ JsonTestData.JSON_ARRAY_WITH_OBJECT,
+ JsonTestData.COMPLEX_1,
+ JsonTestData.ARRAY_OF_NUMBERS,
+ JsonTestData.ARRAY_OF_STRING,
+ JsonTestData.COMPLEX_2,
+ JsonTestData.EMPTY_OBJECT,
+ JsonTestData.NULL_OBJECT,
+ JsonTestData.FB_JSON
+ };
+
+ // Recipe executed on every input row before it is serialized.
+ // Presumably replaces the "body" string with its parsed JSON value - confirm against the directive docs.
+ private static final String[] directives = new String[]{
+ "set-column body json:Parse(body)"
+ };
+
+ /**
+ * For each document in TESTS: builds a row with a "body" column, runs the recipe on it,
+ * and checks that the resulting rows survive a serialize/deserialize round trip.
+ */
+ @Test
+ public void testJsonTypes() throws Exception {
+ // NOTE(review): converter, recordConvertor and parser are not referenced again in this method.
+ SchemaConverter converter = new SchemaConverter();
+ RecordConvertor recordConvertor = new RecordConvertor();
+ JsonParser parser = new JsonParser();
+ RecipePipeline executor = TestingRig.execute(directives);
+ for (String test : TESTS) {
+ Row row = new Row("body", test);
+
+ List expectedRows = executor.execute(Lists.newArrayList(row));
+ // Separate RowSerializer instances are used for writing and reading.
+ byte[] serializedRows = new RowSerializer().fromRows(expectedRows);
+ List gotRows = new RowSerializer().toRows(serializedRows);
+ Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
+ }
+ }
+
+ /**
+ * Round-trips a single row holding an int, a String, a LocalDate, a LocalTime,
+ * a ZonedDateTime, a BigDecimal and a LocalDateTime value.
+ */
+ @Test
+ public void testLogicalTypes() throws Exception {
+ Row testRow = new Row();
+ testRow.add("id", 1);
+ testRow.add("name", "abc");
+ testRow.add("date", LocalDate.of(2018, 11, 11));
+ testRow.add("time", LocalTime.of(11, 11, 11));
+ testRow.add("timestamp", ZonedDateTime.of(2018, 11, 11, 11, 11, 11, 0, ZoneId.of("UTC")));
+ testRow.add("bigdecimal", new BigDecimal(new BigInteger("123456"), 5));
+ testRow.add("datetime", LocalDateTime.now());
+ List expectedRows = Collections.singletonList(testRow);
+ byte[] serializedRows = new RowSerializer().fromRows(expectedRows);
+ List gotRows = new RowSerializer().toRows(serializedRows);
+ Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
+ }
+
+ /**
+ * Round-trips a single row holding an ArrayList, a HashSet and a HashMap,
+ * each of which contains a null entry in addition to the values 1 and 2.
+ */
+ @Test
+ public void testCollectionTypes() throws Exception {
+ List list = new ArrayList<>();
+ list.add(null);
+ list.add(1);
+ list.add(2);
+ Set set = new HashSet<>();
+ set.add(null);
+ set.add(1);
+ set.add(2);
+ Map map = new HashMap<>();
+ map.put("null", null);
+ map.put("1", 1);
+ map.put("2", 2);
+
+ Row testRow = new Row();
+ testRow.add("list", list);
+ testRow.add("set", set);
+ testRow.add("map", map);
+
+ List expectedRows = Collections.singletonList(testRow);
+ byte[] serializedRows = new RowSerializer().fromRows(expectedRows);
+ List gotRows = new RowSerializer().toRows(serializedRows);
+ Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
+ }
+}
diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java
index 27216247f..97ecd8eba 100644
--- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java
+++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java
@@ -19,6 +19,7 @@
import io.cdap.cdap.api.service.worker.RunnableTask;
import io.cdap.cdap.api.service.worker.RunnableTaskContext;
import io.cdap.cdap.api.service.worker.SystemAppTaskContext;
+import io.cdap.cdap.features.Feature;
import io.cdap.directives.aggregates.DefaultTransientStore;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.CompileException;
@@ -44,6 +45,7 @@
import io.cdap.wrangler.registry.UserDirectiveRegistry;
import io.cdap.wrangler.utils.ObjectSerDe;
+import io.cdap.wrangler.utils.RowSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -57,6 +59,7 @@ public class RemoteExecutionTask implements RunnableTask {
private static final Gson GSON = new Gson();
+
@Override
public void run(RunnableTaskContext runnableTaskContext) throws Exception {
RemoteDirectiveRequest directiveRequest = GSON.fromJson(runnableTaskContext.getParam(),
@@ -121,7 +124,12 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
}
runnableTaskContext.setTerminateOnComplete(hasUDD.get() || EL.isUsed());
- runnableTaskContext.writeResult(objectSerDe.toByteArray(rows));
+
+ if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(systemAppContext)) {
+ runnableTaskContext.writeResult(new RowSerializer().fromRows(rows));
+ } else {
+ runnableTaskContext.writeResult(objectSerDe.toByteArray(rows));
+ }
} catch (DirectiveParseException | ClassNotFoundException | CompileException e) {
throw new BadRequestException(e.getMessage(), e);
}
diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java
index 82b45521e..6d2667df9 100644
--- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java
+++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java
@@ -39,6 +39,7 @@
import io.cdap.cdap.etl.proto.ArtifactSelectorConfig;
import io.cdap.cdap.etl.proto.connection.ConnectorDetail;
import io.cdap.cdap.etl.proto.connection.SampleResponse;
+import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.wrangler.PropertyIds;
@@ -79,6 +80,7 @@
import io.cdap.wrangler.store.workspace.WorkspaceStore;
import io.cdap.wrangler.utils.ObjectSerDe;
import io.cdap.wrangler.utils.RowHelper;
+import io.cdap.wrangler.utils.RowSerializer;
import io.cdap.wrangler.utils.SchemaConverter;
import io.cdap.wrangler.utils.StructuredToRowTransformer;
import org.apache.commons.lang3.StringEscapeUtils;
@@ -624,7 +626,11 @@ private List executeRemotely(String namespace, List>().toObject(bytes);
+ if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(getContext())) {
+ return new RowSerializer().toRows(bytes);
+ } else {
+ return new ObjectSerDe>().toObject(bytes);
+ }
}
private List getSample(SampleResponse sampleResponse) {