Skip to content

Commit

Permalink
Minor fix to deserializer cache
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Jul 27, 2024
1 parent 2ef627a commit 127e0f7
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>io.kcache</groupId>
<artifactId>kwack</artifactId>
<version>0.7.0</version>
<version>0.8.0</version>
<packaging>jar</packaging>

<name>kwack</name>
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/io/kcache/kwack/KwackEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.reactivex.rxjava3.core.Observable;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import io.vavr.Tuple3;
import java.io.ByteArrayOutputStream;
import java.io.UncheckedIOException;
import java.sql.Blob;
Expand Down Expand Up @@ -132,7 +133,7 @@ public class KwackEngine implements Configurable, Closeable {
private Map<String, Serde> valueSerdes;
private final Map<String, ColumnDef> keyColDefs = new HashMap<>();
private final Map<String, ColumnDef> valueColDefs = new HashMap<>();
private final Map<Tuple2<Serde, ParsedSchema>, Deserializer<?>> deserializers = new HashMap<>();
private final Map<Tuple3<Boolean, Serde, ParsedSchema>, Deserializer<?>> deserializers = new HashMap<>();
private final Map<ParsedSchema, ColumnDef> columnDefs = new HashMap<>();
private String query;
private EnumSet<RowAttribute> rowAttributes;
Expand Down Expand Up @@ -391,7 +392,7 @@ public Optional<ParsedSchema> parseSchema(Serde serde) {
serde.setId(id);
return Optional.of(parsedSchema);
} catch (Exception e) {
LOG.error("Could not parse schema " + schema, e);
LOG.error("Could not parse schema {}", schema, e);
return Optional.empty();
}
}
Expand Down Expand Up @@ -494,7 +495,8 @@ private Tuple2<Context, Object> deserialize(boolean isKey, String topic, byte[]
}

public Deserializer<?> getDeserializer(boolean isKey, Tuple2<Serde, ParsedSchema> schema) {
return deserializers.computeIfAbsent(schema, k -> createDeserializer(isKey, schema));
return deserializers.computeIfAbsent(Tuple.of(isKey, schema._1, schema._2),
k -> createDeserializer(isKey, schema));
}

private Deserializer<?> createDeserializer(boolean isKey, Tuple2<Serde, ParsedSchema> schema) {
Expand Down
1 change: 0 additions & 1 deletion src/test/java/io/kcache/kwack/AvroKeyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
Expand Down

0 comments on commit 127e0f7

Please sign in to comment.