Fix use of SR for record key
rayokota committed Jul 27, 2024
1 parent a6d8bb3 commit 1ed1493
Showing 4 changed files with 254 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -3,7 +3,7 @@

<groupId>io.kcache</groupId>
<artifactId>kwack</artifactId>
- <version>0.6.0</version>
+ <version>0.7.0</version>
<packaging>jar</packaging>

<name>kwack</name>
25 changes: 18 additions & 7 deletions src/main/java/io/kcache/kwack/KwackEngine.java
@@ -22,6 +22,7 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+ import io.confluent.kafka.serializers.KafkaJsonDeserializerConfig;
import io.kcache.CacheUpdateHandler;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
@@ -438,7 +439,7 @@ private Tuple2<Context, Object> deserialize(boolean isKey, String topic, byte[]
Tuple2<Serde, ParsedSchema> schema =
isKey ? getKeySchema(serde, topic) : getValueSchema(serde, topic);

- Deserializer<?> deserializer = getDeserializer(schema);
+ Deserializer<?> deserializer = getDeserializer(isKey, schema);

if (serde.usesExternalSchema() || config.getSkipBytes() > 0) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
@@ -492,11 +493,11 @@ private Tuple2<Context, Object> deserialize(boolean isKey, String topic, byte[]
return Tuple.of(ctx, object);
}

- public Deserializer<?> getDeserializer(Tuple2<Serde, ParsedSchema> schema) {
- return deserializers.computeIfAbsent(schema, this::createDeserializer);
+ public Deserializer<?> getDeserializer(boolean isKey, Tuple2<Serde, ParsedSchema> schema) {
+ return deserializers.computeIfAbsent(schema, k -> createDeserializer(isKey, schema));
}

- private Deserializer<?> createDeserializer(Tuple2<Serde, ParsedSchema> schema) {
+ private Deserializer<?> createDeserializer(boolean isKey, Tuple2<Serde, ParsedSchema> schema) {
if (schema._2 != null) {
ParsedSchema parsedSchema = schema._2;
SchemaRegistryClient schemaRegistry = null;
@@ -517,18 +518,28 @@ private Deserializer<?> createDeserializer(Tuple2<Serde, ParsedSchema> schema) {
originals.put(AbstractKafkaSchemaSerDeConfig.USE_SCHEMA_ID, schema._1.getId());
break;
}
+ Deserializer<?> deserializer = null;
switch (parsedSchema.schemaType()) {
case "AVRO":
// This allows BigDecimal to be passed through unchanged
originals.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);
- return new KafkaAvroDeserializer(schemaRegistry, originals);
+ deserializer = new KafkaAvroDeserializer(schemaRegistry);
+ break;
case "JSON":
- return new KafkaJsonSchemaDeserializer<>(schemaRegistry, originals);
+ // Set the type to null so JsonNode is produced
+ // Otherwise the type defaults to Object.class which produces a LinkedHashMap
+ originals.put(KafkaJsonDeserializerConfig.JSON_KEY_TYPE, null);
+ originals.put(KafkaJsonDeserializerConfig.JSON_VALUE_TYPE, null);
+ deserializer = new KafkaJsonSchemaDeserializer<>(schemaRegistry);
+ break;
case "PROTOBUF":
- return new KafkaProtobufDeserializer<>(schemaRegistry, originals);
+ deserializer = new KafkaProtobufDeserializer<>(schemaRegistry);
+ break;
default:
throw new IllegalArgumentException("Illegal type " + parsedSchema.schemaType());
}
+ deserializer.configure(originals, isKey);
+ return deserializer;
} else {
switch (schema._1.getSerdeType()) {
case STRING:
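
For context on the change above: Confluent's Schema Registry deserializers receive the key/value distinction through the standard Kafka Deserializer.configure(configs, isKey) call, which is why the new code constructs each deserializer with only the SchemaRegistryClient and configures it afterwards. The following is a minimal, illustrative sketch of that pattern; it is not part of the commit, and the mock client, URL, and class name are made up for the example.

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;

public class KeyValueDeserializerSketch {
    public static void main(String[] args) {
        // Illustrative registry client and config map; kwack builds these from its own configuration.
        SchemaRegistryClient client = new MockSchemaRegistryClient();
        Map<String, Object> originals = new HashMap<>();
        originals.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://example");
        originals.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);

        // Construct with the client only, then apply the config map in key position.
        // Passing isKey=true is the piece the commit adds for record keys.
        KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer(client);
        keyDeserializer.configure(originals, true);

        // The same map applied with isKey=false configures the value-side deserializer.
        KafkaAvroDeserializer valueDeserializer = new KafkaAvroDeserializer(client);
        valueDeserializer.configure(originals, false);
    }
}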
25 changes: 19 additions & 6 deletions src/test/java/io/kcache/kwack/AbstractSchemaTest.java
@@ -16,25 +16,38 @@ protected Properties createProducerProps(String schemaRegistryUrl) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- org.apache.kafka.common.serialization.BytesSerializer.class);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializer());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getValueSerializer());
return props;
}

protected abstract String getTopic();

+ protected Class<?> getKeySerializer() {
+ return org.apache.kafka.common.serialization.BytesSerializer.class;
+ }

protected abstract Class<?> getValueSerializer();

protected KafkaProducer createProducer(Properties props) {
return new KafkaProducer(props);
}

protected void produce(KafkaProducer producer, String topic, Object[] objects) {
- ProducerRecord<Bytes, Object> record;
- for (Object object : objects) {
- byte[] bytes = ByteBuffer.allocate(4).putInt(object.hashCode()).array();
- record = new ProducerRecord<>(topic, Bytes.wrap(bytes), object);
+ produce(producer, topic, null, objects);
+ }
+
+ protected void produce(KafkaProducer producer, String topic, Object[] keys, Object[] values) {
+ ProducerRecord<Object, Object> record;
+ for (int i = 0; i < values.length; i++) {
+ Object value = values[i];
+ Object key;
+ if (keys != null) {
+ key = keys[i];
+ } else {
+ key = Bytes.wrap(ByteBuffer.allocate(4).putInt(value.hashCode()).array());
+ }
+ record = new ProducerRecord<>(topic, key, value);
producer.send(record);
}
}
216 changes: 216 additions & 0 deletions src/test/java/io/kcache/kwack/AvroKeyTest.java
@@ -0,0 +1,216 @@
package io.kcache.kwack;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.reactivex.rxjava3.core.Observable;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.jupiter.api.Test;

public class AvroKeyTest extends AbstractSchemaTest {

@Override
protected Properties createProducerProps(String schemaRegistryUrl) {
Properties props = super.createProducerProps(schemaRegistryUrl);
props.put(KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);
return props;
}

private Schema createSimpleSchema() {
return new Schema.Parser().parse(
"{\"namespace\": \"namespace\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"test\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"f1\", \"type\": \"string\"},\n"
+ " {\"name\": \"f2\", \"type\": \"int\"}\n"
+ "]\n"
+ "}");
}

private IndexedRecord createSimpleRecord() {
return createSimpleRecord(123);
}

private IndexedRecord createSimpleRecord(int f2) {
Schema schema = createSimpleSchema();
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "hi");
avroRecord.put("f2", f2);
return avroRecord;
}

private Schema createEnumSchema() {
String enumSchema = "{\"name\": \"Kind\",\"namespace\": \"example.avro\",\n"
+ " \"type\": \"enum\",\n"
+ " \"symbols\" : [\"ONE\", \"TWO\", \"THREE\"]\n"
+ "}";
Schema.Parser parser = new Schema.Parser();
return parser.parse(enumSchema);
}

private Schema createFixedSchema() {
String fixedSchema = "{\"name\": \"Fixed\",\n"
+ " \"type\": \"fixed\",\n"
+ " \"size\" : 4\n"
+ "}";
Schema.Parser parser = new Schema.Parser();
return parser.parse(fixedSchema);
}

private Schema createComplexSchema() {
return new Schema.Parser().parse(
"{\"namespace\": \"namespace\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"test\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"null\", \"type\": \"null\"},\n"
+ " {\"name\": \"boolean\", \"type\": \"boolean\"},\n"
+ " {\"name\": \"int\", \"type\": \"int\"},\n"
+ " {\"name\": \"long\", \"type\": \"long\"},\n"
+ " {\"name\": \"float\", \"type\": \"float\"},\n"
+ " {\"name\": \"double\", \"type\": \"double\"},\n"
+ " {\"name\": \"bytes\", \"type\": \"bytes\"},\n"
+ " {\"name\": \"string\", \"type\": \"string\", \"aliases\": [\"string_alias\"]},\n"
+ " {\"name\": \"enum\",\n"
+ " \"type\": {\n"
+ " \"name\": \"Kind\",\n"
+ " \"type\": \"enum\",\n"
+ " \"symbols\" : [\"ONE\", \"TWO\", \"THREE\"]\n"
+ " }\n"
+ " },\n"
+ " {\"name\": \"array\",\n"
+ " \"type\": {\n"
+ " \"type\": \"array\",\n"
+ " \"items\" : \"string\"\n"
+ " }\n"
+ " },\n"
+ " {\"name\": \"map\",\n"
+ " \"type\": {\n"
+ " \"type\": \"map\",\n"
+ " \"values\" : \"string\"\n"
+ " }\n"
+ " },\n"
+ " {\"name\": \"nullable_string\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"union\", \"type\": [\"null\", \"string\", \"int\"]},\n"
+ " {\"name\": \"fixed\",\n"
+ " \"type\": {\n"
+ " \"name\": \"Fixed\",\n"
+ " \"type\": \"fixed\",\n"
+ " \"size\" : 4\n"
+ " }\n"
+ " },\n"
+ " {\"name\": \"decimal\", \"type\": {\"type\": \"bytes\",\n"
+ " \"logicalType\": \"decimal\", \"precision\": 5, \"scale\": 2}},\n"
+ " {\"name\": \"uuid\", \"type\": {\"type\": \"string\", \"logicalType\": \"uuid\"}},\n"
+ " {\"name\": \"date\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},\n"
+ " {\"name\": \"time\", \"type\": {\"type\": \"int\", \"logicalType\": \"time-millis\"}},\n"
+ " {\"name\": \"timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}\n"
+ "]\n"
+ "}");
}

private IndexedRecord createComplexRecord() {
Schema enumSchema = createEnumSchema();
Schema fixedSchema = createFixedSchema();
Schema schema = createComplexSchema();
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("null", null);
avroRecord.put("boolean", true);
avroRecord.put("int", 1);
avroRecord.put("long", 2L);
avroRecord.put("float", 3.0f);
avroRecord.put("double", 4.0d);
avroRecord.put("bytes", ByteBuffer.wrap(new byte[]{0, 1, 2}));
avroRecord.put("string", "testUser");
avroRecord.put("enum", new GenericData.EnumSymbol(enumSchema, "ONE"));
avroRecord.put("array", ImmutableList.of("hi", "there"));
avroRecord.put("map", ImmutableMap.of("bye", "there"));
avroRecord.put("nullable_string", "zap");
avroRecord.put("union", 123);
avroRecord.put("fixed", new GenericData.Fixed(fixedSchema, new byte[]{0, 0, 0, 0}));
avroRecord.put("decimal", new BigDecimal("123.45"));
avroRecord.put("uuid", UUID.fromString("d21998e8-8737-432e-a83c-13768dabd821"));
avroRecord.put("date", LocalDate.of(2024, 1, 1));
avroRecord.put("time", LocalTime.of(8, 30, 30));
avroRecord.put("timestamp", Instant.ofEpochSecond(1234567890L));
return avroRecord;
}

@Test
public void testComplexKey() throws IOException {
IndexedRecord key = createComplexRecord();
IndexedRecord value = createSimpleRecord();
Properties producerProps = createProducerProps(MOCK_URL);
KafkaProducer producer = createProducer(producerProps);
produce(producer, getTopic(), new Object[] { key }, new Object[] { value });
producer.close();

engine.init();
Observable<Map<String, Object>> obs = engine.start();
List<Map<String, Object>> lm = Lists.newArrayList(obs.blockingIterable().iterator());
Map<String, Object> row = lm.get(0);
Map<String, Object> m = (Map<String, Object>) row.get("rowkey");
assertNull(m.get("null"));
assertEquals(true, m.get("boolean"));
assertEquals(1, m.get("int"));
assertEquals(2L, m.get("long"));
assertEquals(3.0f, m.get("float"));
assertEquals(4.0d, m.get("double"));
assertEquals(Base64.getEncoder().encodeToString(new byte[]{0, 1, 2}), m.get("bytes"));
assertEquals("testUser", m.get("string"));
assertEquals("ONE", m.get("enum"));
assertEquals(ImmutableList.of("hi", "there"), m.get("array"));
assertEquals(ImmutableMap.of("bye", "there"), m.get("map"));
assertEquals("zap", m.get("nullable_string"));
assertEquals(123, m.get("union"));
assertEquals(Base64.getEncoder().encodeToString(new byte[]{0, 0, 0, 0}), m.get("fixed"));
assertEquals(new BigDecimal("123.45"), m.get("decimal"));
assertEquals(UUID.fromString("d21998e8-8737-432e-a83c-13768dabd821"), m.get("uuid"));
assertEquals(LocalDate.of(2024, 1, 1), m.get("date"));
assertEquals(LocalTime.of(8, 30, 30), m.get("time"));
assertEquals(Timestamp.from(Instant.ofEpochSecond(1234567890L)), m.get("timestamp"));
}

@Override
protected String getTopic() {
return "test-avro";
}

@Override
protected Class<?> getKeySerializer() {
return io.confluent.kafka.serializers.KafkaAvroSerializer.class;
}

@Override
protected Class<?> getValueSerializer() {
return io.confluent.kafka.serializers.KafkaAvroSerializer.class;
}

@Override
protected void injectKwackProperties(Properties props) {
super.injectKwackProperties(props);
props.put(KwackConfig.KEY_SERDES_CONFIG, getTopic() + "=latest");
}
}
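
As an aside (not part of the commit), the rows returned by engine.start() can be consumed directly from the Observable, and with the key now deserialized through Schema Registry, the Avro key appears as a nested Map under "rowkey". A small illustrative fragment, reusing the engine field, imports, and column names from the test above:

// Illustrative only: filters rows on a field of the deserialized Avro key.
Observable<Map<String, Object>> rows = engine.start();
rows.filter(row -> {
        Map<String, Object> key = (Map<String, Object>) row.get("rowkey");
        return key != null && "testUser".equals(key.get("string"));
    })
    .subscribe(System.out::println);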
