diff --git a/src/main/java/io/kcache/kwack/KwackEngine.java b/src/main/java/io/kcache/kwack/KwackEngine.java index 4a0117e..7b37975 100644 --- a/src/main/java/io/kcache/kwack/KwackEngine.java +++ b/src/main/java/io/kcache/kwack/KwackEngine.java @@ -621,7 +621,7 @@ private void initTable(DuckDBConnection conn, String topic) { try { conn.createStatement().execute(ddl); } catch (SQLException e) { - LOG.warn("Could not execute DDL: {}", e.getMessage()); + // ignore, as type may already exist if more than one topic is being processed } } @@ -801,8 +801,9 @@ public void handleUpdate(Headers headers, if (valueSerde.usesSchemaRegistry()) { valueSchemaId = schemaIdFor(value.get()); } + Object originalKey = keyObj._1 != null ? keyObj._1.getOriginalMessage() : null; valueObj = deserializeValue( - topic, keyObj._1.getOriginalMessage(), value != null ? value.get() : null); + topic, originalKey, value != null ? value.get() : null); Struct rowInfo = null; if (rowInfoSize > 0) {