Skip to content

Commit

Permalink
Add ";msg:<name>" option, add -x (--skip-bytes) option
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Jul 20, 2024
1 parent 99bbee1 commit e87e275
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 67 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ In-Memory Analytics for Kafka using DuckDB.
<id> (use schema id from SR)
Default for key: binary
Default for value: latest
The proto/latest/<id> serde formats can
also take a message type name, e.g.
proto:<schema|@file>;msg:<name>
in case multiple message types exist
-r, --schema-registry-url=<url> SR (Schema Registry) URL
-q, --query=<query> SQL query to execute. If none is specified,
interactive sqlline mode is used
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>io.kcache</groupId>
<artifactId>kwack</artifactId>
<version>0.3.0</version>
<version>0.4.0</version>
<packaging>jar</packaging>

<name>kwack</name>
Expand Down
65 changes: 27 additions & 38 deletions src/main/java/io/kcache/kwack/KwackConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public class KwackConfig extends KafkaCacheConfig {
public static final String DB_DOC = "DuckDB db, appended to 'jdbc:duckdb:'";
public static final String DB_DEFAULT = ":memory:";

public static final String SKIP_BYTES_CONFIG = "skip.bytes";
public static final String SKIP_BYTES_DOC =
"Extra bytes to skip when deserializing with an external schema";

public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
public static final String SSL_KEYSTORE_LOCATION_DOC =
"Location of the keystore file to use for SSL. This is required for HTTPS.";
Expand Down Expand Up @@ -267,6 +271,11 @@ public class KwackConfig extends KafkaCacheConfig {
DB_DEFAULT,
Importance.MEDIUM,
DB_DOC
).define(SKIP_BYTES_CONFIG,
Type.INT,
0,
Importance.LOW,
SKIP_BYTES_DOC
).define(
SSL_KEYSTORE_LOCATION_CONFIG,
Type.STRING,
Expand Down Expand Up @@ -447,6 +456,10 @@ public String getDbUrl() {
return "jdbc:duckdb:" + db;
}

/**
 * Returns the number of extra leading bytes to skip from each record payload
 * before deserialization, as configured via {@code skip.bytes}
 * ({@link #SKIP_BYTES_CONFIG}); defaults to 0.
 */
public int getSkipBytes() {
return getInt(SKIP_BYTES_CONFIG);
}

private static String getDefaultHost() {
try {
return InetAddress.getLocalHost().getCanonicalHostName();
Expand Down Expand Up @@ -518,7 +531,7 @@ public static class Serde {
private final SerdeType serdeType;
private int id;
private final String schema;
private final String refs;
private final String msg;

public static final Serde KEY_DEFAULT = new Serde(SerdeType.BINARY);
public static final Serde VALUE_DEFAULT = new Serde(SerdeType.LATEST);
Expand All @@ -527,14 +540,14 @@ public Serde(String value) {
int id = 0;
String schema = null;
String format = value;
String refs = null;
String msg = null;
int index = value.indexOf(':');
if (index > 0) {
format = value.substring(0, index);
int lastIndex = value.lastIndexOf(";refs:");
int lastIndex = value.lastIndexOf(";msg:");
if (lastIndex > 0) {
schema = value.substring(index + 1, lastIndex);
refs = value.substring(lastIndex + ";refs:".length());
msg = value.substring(lastIndex + ";msg:".length());
} else {
schema = value.substring(index + 1);
}
Expand All @@ -554,18 +567,18 @@ public Serde(String value) {
this.serdeType = serdeType;
this.id = id;
this.schema = schema;
this.refs = refs;
this.msg = msg;
}

/**
 * Creates a Serde of the given type with no schema id, no inline/file schema,
 * and no message type name (delegates to the full constructor).
 */
public Serde(SerdeType serdeType) {
this(serdeType, 0, null, null);
}

public Serde(SerdeType serdeType, int id, String schema, String refs) {
public Serde(SerdeType serdeType, int id, String schema, String msg) {
this.serdeType = serdeType;
this.id = id;
this.schema = schema;
this.refs = refs;
this.msg = msg;
}

public SerdeType getSerdeType() {
Expand Down Expand Up @@ -608,32 +621,8 @@ public String getSchema() {
}
}

/**
 * Resolves the configured schema references.
 *
 * <p>If {@code refs} is null or empty, returns an empty list. If it starts
 * with {@code @}, the remainder is treated as a path to a JSON file whose
 * contents are the references; otherwise {@code refs} itself is parsed as JSON.
 *
 * @return the parsed list of schema references, possibly empty
 * @throws IllegalArgumentException if the referenced file cannot be read
 */
public List<SchemaReference> getSchemaReferences() {
    String str;
    if (refs == null || refs.isEmpty()) {
        return Collections.emptyList();
    } else if (refs.startsWith("@")) {
        // BUG FIX: the filename comes from refs, not from the schema field
        String file = refs.substring(1);
        try {
            str = Files.readString(Paths.get(file));
        } catch (IOException e) {
            // Preserve the underlying cause for diagnosability
            throw new IllegalArgumentException("Could not read file: " + file, e);
        }
    } else {
        str = refs;
    }
    return parseRefs(str);
}

private static List<SchemaReference> parseRefs(String str) {
List<SchemaReference> list;
try {
list = objectMapper.readValue(str, new TypeReference<>() {
});
} catch (Exception e) {
throw new ConfigException("Could not parse refs " + str, e);
}
return list;
/**
 * Returns the Protobuf message type name supplied via the {@code ;msg:<name>}
 * suffix of the serde spec, or null if none was given.
 */
public String getMessage() {
return msg;
}

@Override
Expand All @@ -644,12 +633,12 @@ public boolean equals(Object o) {
return id == serde.id
&& serdeType == serde.serdeType
&& Objects.equals(schema, serde.schema)
&& Objects.equals(refs, serde.refs);
&& Objects.equals(msg, serde.msg);
}

@Override
public int hashCode() {
return Objects.hash(serdeType, id, schema, refs);
return Objects.hash(serdeType, id, schema, msg);
}

@Override
Expand All @@ -664,9 +653,9 @@ public String toString() {
sb.append(serdeType);
sb.append(":");
sb.append(schema);
if (refs != null && !refs.isEmpty()) {
sb.append(";refs:");
sb.append(refs);
if (msg != null && !msg.isEmpty()) {
sb.append(";msg:");
sb.append(msg);
}
return sb.toString();
default:
Expand Down
48 changes: 34 additions & 14 deletions src/main/java/io/kcache/kwack/KwackEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import static io.kcache.kwack.schema.ColumnStrategy.NULL_STRATEGY;

import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.kcache.CacheUpdateHandler;
Expand Down Expand Up @@ -108,7 +109,6 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import sqlline.BuiltInProperty;
import sqlline.SqlLine;
import sqlline.SqlLine.Status;

Expand All @@ -120,7 +120,7 @@ public class KwackEngine implements Configurable, Closeable {
public static final String ROWVAL = "rowval";
public static final String ROWINFO = "rowinfo";

private static final int ZERO_BYTE = 0x0;
private static final int MAGIC_BYTE = 0x0;

private KwackConfig config;
private DuckDBConnection conn;
Expand Down Expand Up @@ -355,25 +355,34 @@ private Tuple2<Serde, ParsedSchema> getSchema(String subject, Serde serde) {
case AVRO:
case JSON:
case PROTO:
return parseSchema(serde).map(s -> Tuple.of(serde, s))
return parseSchema(serde).map(s -> createTuple(serde, s))
.orElseGet(() -> Tuple.of(new Serde(SerdeType.BINARY), null));
case LATEST:
return getLatestSchema(subject).map(s -> Tuple.of(serde, s))
return getLatestSchema(subject).map(s -> createTuple(serde, s))
.orElseGet(() -> Tuple.of(new Serde(SerdeType.BINARY), null));
case ID:
return getSchemaById(serde.getId()).map(s -> Tuple.of(serde, s))
return getSchemaById(serde.getId()).map(s -> createTuple(serde, s))
.orElseGet(() -> Tuple.of(new Serde(SerdeType.BINARY), null));
default:
throw new IllegalArgumentException("Illegal serde type: " + serde.getSerdeType());
}
}

/**
 * Pairs the serde with its parsed schema. When the serde names a specific
 * Protobuf message type and the schema is a ProtobufSchema, the schema is
 * narrowed to that message type via copy(); otherwise the schema is used as-is.
 */
private Tuple2<Serde, ParsedSchema> createTuple(Serde serde, ParsedSchema schema) {
    String messageName = serde.getMessage();
    boolean hasMessageName = messageName != null && !messageName.isEmpty();
    ParsedSchema resolved = schema;
    if (hasMessageName && schema instanceof ProtobufSchema) {
        resolved = ((ProtobufSchema) schema).copy(messageName);
    }
    return Tuple.of(serde, resolved);
}

public Optional<ParsedSchema> parseSchema(Serde serde) {
String schemaType = serde.getSchemaType();
String schema = serde.getSchema();
List<SchemaReference> references = serde.getSchemaReferences();
try {
Schema s = new Schema(null, null, null, schemaType, references, schema);
Schema s = new Schema(null, null, null, schemaType, null, schema);
ParsedSchema parsedSchema =
getSchemaProvider(schemaType).parseSchemaOrElseThrow(s, false, false);
parsedSchema.validate(false);
Expand Down Expand Up @@ -431,14 +440,25 @@ private Tuple2<Context, Object> deserialize(boolean isKey, String topic, byte[]

Deserializer<?> deserializer = getDeserializer(schema);

if (serde.usesExternalSchema()) {
if (serde.usesExternalSchema() || config.getSkipBytes() > 0) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
out.write(ZERO_BYTE);
out.write(ByteBuffer.allocate(4).putInt(serde.getId()).array());
if (serde.getSerdeType() == SerdeType.PROTO) {
out.write(ZERO_BYTE); // assume message type is first in schema
if (serde.usesExternalSchema()) {
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(4).putInt(serde.getId()).array());
if (serde.getSerdeType() == SerdeType.PROTO) {
MessageIndexes indexes;
if (serde.getMessage() != null && !serde.getMessage().isEmpty()) {
ProtobufSchema protobufSchema = (ProtobufSchema) schema._2;
indexes = protobufSchema.toMessageIndexes(serde.getMessage());
} else {
// assume message type is first in schema
indexes = new MessageIndexes(Collections.singletonList(0));
}
out.write(indexes.toByteArray());
}
}
out.write(bytes);
int skipBytes = config.getSkipBytes();
out.write(bytes, skipBytes, bytes.length - skipBytes);
bytes = out.toByteArray();
}
}
Expand Down
19 changes: 18 additions & 1 deletion src/main/java/io/kcache/kwack/KwackMain.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kcache.kwack;

import static io.kcache.kwack.KwackEngine.MOCK_SR;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kcache.KafkaCacheConfig;
import io.kcache.kwack.KwackConfig.ListPropertyParser;
Expand Down Expand Up @@ -90,7 +92,12 @@ public class KwackMain implements Callable<Integer> {
+ " latest (use latest version in SR) |\n"
+ " <id> (use schema id from SR)\n"
+ " Default for key: binary\n"
+ " Default for value: latest",
+ " Default for value: latest\n"
+ "The proto/latest/<id> serde formats can\n"
+ "also take a message type name, e.g.\n"
+ " proto:<schema|@file>;msg:<name>\n"
+ "in case multiple message types exist",

paramLabel = "<topic=serde>")
private Map<String, KwackConfig.Serde> valueSerdes;

Expand Down Expand Up @@ -123,6 +130,11 @@ public class KwackMain implements Callable<Integer> {
description = "DuckDB db, appended to 'jdbc:duckdb:' Default: :memory:", paramLabel = "<db>")
private String db;

@Option(names = {"-x", "--skip-bytes"},
description = "Extra bytes to skip when deserializing with an external schema",
paramLabel = "<bytes>")
private Integer bytesToSkip;

@Option(names = {"-X", "--property"},
description = "Set configuration property.", paramLabel = "<prop=val>")
private Map<String, String> properties;
Expand Down Expand Up @@ -204,8 +216,13 @@ private KwackConfig updateConfig() {
if (db != null) {
props.put(KwackConfig.DB_CONFIG, db);
}
if (bytesToSkip != null) {
props.put(KwackConfig.SKIP_BYTES_CONFIG, String.valueOf(bytesToSkip));
}
if (schemaRegistryUrl != null) {
props.put(KwackConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
} else {
props.put(KwackConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SR);
}
if (properties != null) {
props.putAll(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,7 @@ public class ProtobufTransformer implements Transformer {
public ColumnDef schemaToColumnDef(Context ctx, ParsedSchema parsedSchema) {
ProtobufSchema protobufSchema = (ProtobufSchema) parsedSchema;
Descriptor descriptor = protobufSchema.toDescriptor();
FileDescriptor fileDescriptor = descriptor.getFile();
List<Descriptor> messageTypes = fileDescriptor.getMessageTypes();
if (messageTypes.size() == 1) {
return schemaToColumnDef(ctx, descriptor);
}
LinkedHashMap<String, ColumnDef> columnDefs = new LinkedHashMap<>();
for (Descriptor messageType : messageTypes) {
columnDefs.put(messageType.getName(), schemaToColumnDef(ctx, messageType));
}
return new UnionColumnDef(columnDefs, NULL_STRATEGY);
return schemaToColumnDef(ctx, descriptor);
}

private ColumnDef schemaToColumnDef(Context ctx, Descriptor descriptor) {
Expand Down Expand Up @@ -358,11 +349,15 @@ private Object messageToColumn(
continue;
}
if (msg.hasOneof(oneOfDescriptor)) {
UnionColumnDef uColDef = (UnionColumnDef)
structColumnDef.getColumnDefs().get(oneOfDescriptor.getName());
FieldDescriptor fd = msg.getOneofFieldDescriptor(oneOfDescriptor);
String uBranch = fd.getName();
ctx.putUnionBranch(uColDef, uBranch);
Object obj = msg.getField(fd);
if (obj != null) {
attributes.add(messageToColumn(ctx, obj,
structColumnDef.getColumnDefs().get(oneOfDescriptor.getName())));
uColDef.getColumnDefs().get(uBranch)));
} else {
attributes.add(null);
}
Expand Down
1 change: 0 additions & 1 deletion src/test/java/io/kcache/kwack/ProtobufNoSRTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ public void testComplex() throws IOException {
Observable<Map<String, Object>> obs = engine.start();
List<Map<String, Object>> lm = Lists.newArrayList(obs.blockingIterable().iterator());
Map<String, Object> m = lm.get(0);
m = (Map<String, Object>) m.get("rowval");
assertEquals("test", m.get("name"));
assertEquals("testUser", m.get("mystring"));
assertEquals(Base64.getEncoder().encodeToString(new byte[]{0, 1, 2}), m.get("mybytes"));
Expand Down
1 change: 0 additions & 1 deletion src/test/java/io/kcache/kwack/ProtobufTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public void testComplex() throws IOException {
Observable<Map<String, Object>> obs = engine.start();
List<Map<String, Object>> lm = Lists.newArrayList(obs.blockingIterable().iterator());
Map<String, Object> m = lm.get(0);
m = (Map<String, Object>) m.get("rowval");
assertEquals("test", m.get("name"));
assertEquals("testUser", m.get("mystring"));
assertEquals(Base64.getEncoder().encodeToString(new byte[]{0, 1, 2}), m.get("mybytes"));
Expand Down

0 comments on commit e87e275

Please sign in to comment.