Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLUGIN-1755: Use Kryo for serialisation #710

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<aws.sdk.version>1.11.133</aws.sdk.version>
<bigquery.connector.hadoop2.version>0.10.2-hadoop2</bigquery.connector.hadoop2.version>
<bouncycastle.version>1.56</bouncycastle.version>
<cdap.version>6.10.0</cdap.version>
<cdap.version>6.11.0-SNAPSHOT</cdap.version>
<chlorine.version>1.1.5</chlorine.version>
<commons.validator.version>1.6</commons.validator.version>
<commons-io.version>2.5</commons-io.version>
Expand Down
5 changes: 5 additions & 0 deletions wrangler-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@
<artifactId>guava-retrying</artifactId>
<version>${guava.retrying.version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.wrangler.utils;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.cdap.wrangler.api.Row;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
* A helper class with allows Serialization and Deserialization using Kryo
* We should register all schema classes present in {@link SchemaConverter}
**/
public class RowSerializer {

private final Kryo kryo;
private static final Gson GSON = new Gson();

public RowSerializer() {
kryo = new Kryo();
// Register all classes from SchemaConverter
kryo.register(Row.class);
kryo.register(ArrayList.class);
kryo.register(LocalDate.class);
kryo.register(LocalTime.class);
kryo.register(ZonedDateTime.class);
kryo.register(Map.class);
kryo.register(JsonNull.class);
// JsonPrimitive does not have no-arg constructor hence we need a
// custom serializer
kryo.register(JsonPrimitive.class, new JsonSerializer());
kryo.register(JsonArray.class);
kryo.register(JsonObject.class);
// Support deprecated util.date classes
kryo.register(Date.class);
kryo.register(java.sql.Date.class);
kryo.register(Time.class);
kryo.register(Timestamp.class);
}

public byte[] fromRows(List<Row> rows) {
Output output = new Output(1024, -1);
kryo.writeClassAndObject(output, rows);
return output.getBuffer();
}

public List<Row> toRows(byte[] bytes) {
Input input = new Input(bytes);
List<Row> result = (List<Row>) kryo.readClassAndObject(input);
return result;
}

static class JsonSerializer extends Serializer<JsonElement> {

@Override
public void write(Kryo kryo, Output output, JsonElement object) {
output.writeString(GSON.toJson(object));
}

@Override
public JsonElement read(Kryo kryo, Input input, Class<JsonElement> type) {
return GSON.fromJson(input.readString(), type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public Schema getSchema(Object value, String name) throws RecordConvertorExcepti
* @param name name of the field
* @param recordPrefix prefix to append at the beginning of a custom record
* @return the schema of this object
* NOTE: ANY NEWLY SUPPORTED DATATYPE SHOULD ALSO BE REGISTERED IN {@link RowSerializer}
*/
@Nullable
public Schema getSchema(Object value, String name, @Nullable String recordPrefix) throws RecordConvertorException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,5 @@ public final class JsonTestData {
+ " }"
+ " }";
public static final String EMPTY_OBJECT = "{ \"dividesplitdetails\":{\"type0\":[]}}";
public static final String NULL_OBJECT = "{ \"dividesplitdetails\":{\"type0\":null, \"type1\":0}}";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.wrangler.utils;

import com.google.common.collect.Lists;
import com.google.gson.JsonParser;
import io.cdap.wrangler.TestingRig;
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.Row;
import org.junit.Assert;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RowSerializerTest {

private static final String[] TESTS = new String[]{
JsonTestData.BASIC,
JsonTestData.SIMPLE_JSON_OBJECT,
JsonTestData.ARRAY_OF_OBJECTS,
JsonTestData.JSON_ARRAY_WITH_OBJECT,
JsonTestData.COMPLEX_1,
JsonTestData.ARRAY_OF_NUMBERS,
JsonTestData.ARRAY_OF_STRING,
JsonTestData.COMPLEX_2,
JsonTestData.EMPTY_OBJECT,
JsonTestData.NULL_OBJECT,
JsonTestData.FB_JSON
};

private static final String[] directives = new String[]{
"set-column body json:Parse(body)"
};

@Test
public void testJsonTypes() throws Exception {
SchemaConverter converter = new SchemaConverter();
RecordConvertor recordConvertor = new RecordConvertor();
JsonParser parser = new JsonParser();
RecipePipeline executor = TestingRig.execute(directives);
for (String test : TESTS) {
Row row = new Row("body", test);

List<Row> expectedRows = executor.execute(Lists.newArrayList(row));
byte[] serializedRows = new RowSerializer().fromRows(expectedRows);
List<Row> gotRows = new RowSerializer().toRows(serializedRows);
Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
}
}

@Test
public void testLogicalTypes() throws Exception {
Row testRow = new Row();
testRow.add("id", 1);
testRow.add("name", "abc");
testRow.add("date", LocalDate.of(2018, 11, 11));
testRow.add("time", LocalTime.of(11, 11, 11));
testRow.add("timestamp", ZonedDateTime.of(2018, 11, 11, 11, 11, 11, 0, ZoneId.of("UTC")));
testRow.add("bigdecimal", new BigDecimal(new BigInteger("123456"), 5));
testRow.add("datetime", LocalDateTime.now());
List<Row> expectedRows = Collections.singletonList(testRow);
byte[] serializedRows = new RowSerializer().fromRows(expectedRows);
List<Row> gotRows = new RowSerializer().toRows(serializedRows);
Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
}

@Test
public void testCollectionTypes() throws Exception {
List<Integer> list = new ArrayList<>();
list.add(null);
list.add(1);
list.add(2);
Set<Integer> set = new HashSet<>();
set.add(null);
set.add(1);
set.add(2);
Map<String, Integer> map = new HashMap<>();
map.put("null", null);
map.put("1", 1);
map.put("2", 2);

Row testRow = new Row();
testRow.add("list", list);
testRow.add("set", set);
testRow.add("map", map);

List<Row> expectedRows = Collections.singletonList(testRow);
byte[] serializedRows = new RowSerializer().fromRows(expectedRows);
List<Row> gotRows = new RowSerializer().toRows(serializedRows);
Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.cdap.cdap.api.service.worker.RunnableTask;
import io.cdap.cdap.api.service.worker.RunnableTaskContext;
import io.cdap.cdap.api.service.worker.SystemAppTaskContext;
import io.cdap.cdap.features.Feature;
import io.cdap.directives.aggregates.DefaultTransientStore;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.CompileException;
Expand All @@ -44,6 +45,7 @@
import io.cdap.wrangler.registry.UserDirectiveRegistry;
import io.cdap.wrangler.utils.ObjectSerDe;

import io.cdap.wrangler.utils.RowSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -57,6 +59,7 @@ public class RemoteExecutionTask implements RunnableTask {

private static final Gson GSON = new Gson();


@Override
public void run(RunnableTaskContext runnableTaskContext) throws Exception {
RemoteDirectiveRequest directiveRequest = GSON.fromJson(runnableTaskContext.getParam(),
Expand Down Expand Up @@ -121,7 +124,12 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
}

runnableTaskContext.setTerminateOnComplete(hasUDD.get() || EL.isUsed());
runnableTaskContext.writeResult(objectSerDe.toByteArray(rows));

if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(systemAppContext)) {
runnableTaskContext.writeResult(new RowSerializer().fromRows(rows));
} else {
runnableTaskContext.writeResult(objectSerDe.toByteArray(rows));
}
} catch (DirectiveParseException | ClassNotFoundException | CompileException e) {
throw new BadRequestException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.cdap.etl.proto.ArtifactSelectorConfig;
import io.cdap.cdap.etl.proto.connection.ConnectorDetail;
import io.cdap.cdap.etl.proto.connection.SampleResponse;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.wrangler.PropertyIds;
Expand Down Expand Up @@ -79,6 +80,7 @@
import io.cdap.wrangler.store.workspace.WorkspaceStore;
import io.cdap.wrangler.utils.ObjectSerDe;
import io.cdap.wrangler.utils.RowHelper;
import io.cdap.wrangler.utils.RowSerializer;
import io.cdap.wrangler.utils.SchemaConverter;
import io.cdap.wrangler.utils.StructuredToRowTransformer;
import org.apache.commons.lang3.StringEscapeUtils;
Expand Down Expand Up @@ -624,7 +626,11 @@ private <E extends Exception> List<Row> executeRemotely(String namespace, List<S
.withNamespace(namespace)
.build();
byte[] bytes = getContext().runTask(runnableTaskRequest);
return new ObjectSerDe<List<Row>>().toObject(bytes);
if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(getContext())) {
return new RowSerializer().toRows(bytes);
} else {
return new ObjectSerDe<List<Row>>().toObject(bytes);
}
}

private List<Row> getSample(SampleResponse sampleResponse) {
Expand Down
Loading