diff --git a/runtime/binding-asyncapi/pom.xml b/runtime/binding-asyncapi/pom.xml
index 560c92cc56..3e6d3b2c3a 100644
--- a/runtime/binding-asyncapi/pom.xml
+++ b/runtime/binding-asyncapi/pom.xml
@@ -107,6 +107,12 @@
${project.version}
provided
+
+ io.aklivity.zilla
+ catalog-karapace
+ ${project.version}
+ provided
+
io.aklivity.zilla
model-core
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpProtocol.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpProtocol.java
index 5eb6e55b49..960ae3c73b 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpProtocol.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiHttpProtocol.java
@@ -28,7 +28,6 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchema;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiServer;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
-import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.binding.http.config.HttpAuthorizationConfig;
import io.aklivity.zilla.runtime.binding.http.config.HttpConditionConfig;
@@ -148,6 +147,7 @@ else if ("sse".equals(server.protocol()))
.exit("sse_server0")
.when(HttpConditionConfig::builder)
.header(":path", path)
+ .header(":method", "GET")
.build()
.build();
}
@@ -209,18 +209,13 @@ private HttpRequestConfigBuilder injectContent(
{
if (messages != null)
{
- for (Map.Entry messageEntry : messages.entrySet())
- {
- AsyncapiMessageView message =
- AsyncapiMessageView.of(asyncapi.components.messages, messageEntry.getValue());
- request.
+ request.
content(JsonModelConfig::builder)
- .catalog()
- .name(INLINE_CATALOG_NAME)
- .inject(cataloged -> injectSchema(cataloged, asyncapi, message))
- .build()
- .build();
- }
+ .catalog()
+ .name(INLINE_CATALOG_NAME)
+ .inject(cataloged -> injectValueSchemas(cataloged, asyncapi, messages))
+ .build()
+ .build();
}
return request;
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java
index a9a1675075..b31d275e4a 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiNamespaceGenerator.java
@@ -14,17 +14,18 @@
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.config;
-import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.MINIMIZE_QUOTES;
-import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER;
import static io.aklivity.zilla.runtime.common.feature.FeatureFilter.featureEnabled;
import static java.util.stream.Collectors.toList;
import static org.agrona.LangUtil.rethrowUnchecked;
import java.io.StringReader;
import java.io.StringWriter;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -33,20 +34,17 @@
import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;
+import jakarta.json.JsonValue;
import jakarta.json.JsonWriter;
import jakarta.json.bind.Jsonb;
import jakarta.json.bind.JsonbBuilder;
import org.agrona.collections.MutableInteger;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
-
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiServerConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiKafkaServerBindings;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchemaItem;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiServer;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiTrait;
@@ -54,6 +52,7 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineOptionsConfig;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineSchemaConfigBuilder;
+import io.aklivity.zilla.runtime.catalog.karapace.config.KarapaceOptionsConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
@@ -63,8 +62,9 @@
public abstract class AsyncapiNamespaceGenerator
{
- protected static final String INLINE_CATALOG_NAME = "catalog0";
+ protected static final String CATALOG_NAME = "catalog0";
protected static final String INLINE_CATALOG_TYPE = "inline";
+ protected static final String KARAPACE_CATALOG_TYPE = "karapace";
protected static final String VERSION_LATEST = "latest";
protected static final AsyncapiOptionsConfig EMPTY_OPTION =
new AsyncapiOptionsConfig(null, null, null, null, null, null, null);
@@ -218,13 +218,33 @@ protected NamespaceConfigBuilder injectCatalog(
NamespaceConfigBuilder namespace,
List asyncapis)
{
- final boolean injectCatalog = asyncapis.stream()
- .anyMatch(a -> a.components != null && a.components.schemas != null && !a.components.schemas.isEmpty());
- if (injectCatalog)
+ Optional server = asyncapis.stream()
+ .filter(a -> a.servers.entrySet().stream().anyMatch(s -> s.getValue().bindings != null &&
+ s.getValue().bindings.kafka != null))
+ .map(s -> s.servers.values())
+ .flatMap(Collection::stream)
+ .filter(s -> s.bindings.kafka != null)
+ .findFirst();
+ final boolean injectCatalog = asyncapis.stream().anyMatch(AsyncapiNamespaceGenerator::hasSchemas);
+ if (server.isPresent())
+ {
+ AsyncapiKafkaServerBindings kafka = server.get().bindings.kafka;
+ namespace
+ .catalog()
+ .name(CATALOG_NAME)
+ .type(KARAPACE_CATALOG_TYPE)
+ .options(KarapaceOptionsConfig::builder)
+ .url(kafka.schemaRegistryUrl)
+ .context("default")
+ .maxAge(Duration.ofHours(1))
+ .build()
+ .build();
+ }
+ else if (injectCatalog)
{
namespace
.catalog()
- .name(INLINE_CATALOG_NAME)
+ .name(CATALOG_NAME)
.type(INLINE_CATALOG_TYPE)
.options(InlineOptionsConfig::builder)
.subjects()
@@ -242,14 +262,10 @@ protected InlineSchemaConfigBuilder injectSubjects(
{
for (Asyncapi asyncapi : asyncapis)
{
- if (asyncapi.components != null && asyncapi.components.schemas != null && !asyncapi.components.schemas.isEmpty())
+ if (hasSchemas(asyncapi))
{
try (Jsonb jsonb = JsonbBuilder.create())
{
- YAMLMapper yaml = YAMLMapper.builder()
- .disable(WRITE_DOC_START_MARKER)
- .enable(MINIMIZE_QUOTES)
- .build();
for (Map.Entry entry : asyncapi.components.schemas.entrySet())
{
AsyncapiSchemaView schema = AsyncapiSchemaView.of(asyncapi.components.schemas, entry.getValue());
@@ -257,7 +273,7 @@ protected InlineSchemaConfigBuilder injectSubjects(
subjects
.subject(entry.getKey())
.version(VERSION_LATEST)
- .schema(writeSchemaYaml(jsonb, yaml, schema))
+ .schema(writeSchemaJson(jsonb, schema))
.build();
}
if (asyncapi.components.messageTraits != null)
@@ -268,7 +284,7 @@ protected InlineSchemaConfigBuilder injectSubjects(
subjects
.subject(k)
.version(VERSION_LATEST)
- .schema(writeSchemaYaml(jsonb, yaml, v))
+ .schema(writeSchemaJson(jsonb, v))
.build());
}
}
@@ -282,39 +298,40 @@ protected InlineSchemaConfigBuilder injectSubjects(
return subjects;
}
- protected static String writeSchemaYaml(
+ private static boolean hasSchemas(
+ Asyncapi asyncapi)
+ {
+ return asyncapi.components != null &&
+ asyncapi.components.schemas != null &&
+ !asyncapi.components.schemas.isEmpty();
+ }
+
+ protected static String writeSchemaJson(
Jsonb jsonb,
- YAMLMapper yaml,
Object schema)
{
- String result = null;
- try
- {
- String schemaJson = jsonb.toJson(schema);
+ String schemaJson = jsonb.toJson(schema);
- JsonReader reader = Json.createReader(new StringReader(schemaJson));
- JsonObject jsonObject = reader.readObject();
+ JsonReader reader = Json.createReader(new StringReader(schemaJson));
+ JsonValue jsonValue = reader.readValue();
- JsonObject modifiedJsonObject = jsonObject.getJsonObject("schema");
+ if (jsonValue instanceof JsonObject)
+ {
+ JsonObject jsonObject = (JsonObject) jsonValue;
- if (modifiedJsonObject != null)
+ if (jsonObject.containsKey("schema"))
{
+ JsonValue modifiedJsonValue = jsonObject.get("schema");
StringWriter stringWriter = new StringWriter();
JsonWriter jsonWriter = Json.createWriter(stringWriter);
- jsonWriter.writeObject(modifiedJsonObject);
+ jsonWriter.write(modifiedJsonValue);
jsonWriter.close();
schemaJson = stringWriter.toString();
}
-
- JsonNode json = new ObjectMapper().readTree(schemaJson);
- result = yaml.writeValueAsString(json);
- }
- catch (JsonProcessingException ex)
- {
- rethrowUnchecked(ex);
}
- return result;
+
+ return schemaJson;
}
protected BindingConfigBuilder injectMetrics(
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProtocol.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProtocol.java
index 239a6fbb97..015de7cd4a 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProtocol.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiProtocol.java
@@ -25,6 +25,7 @@
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
@@ -116,7 +117,20 @@ public BindingConfigBuilder injectProtocolClientOptions(
return binding;
}
- protected CatalogedConfigBuilder injectSchema(
+ protected CatalogedConfigBuilder injectKeySchema(
+ CatalogedConfigBuilder cataloged,
+ Asyncapi asyncapi,
+ AsyncapiMessageView message)
+ {
+ String schema = AsyncapiSchemaView.of(asyncapi.components.schemas, message.key()).refKey();
+ cataloged.schema()
+ .version(VERSION_LATEST)
+ .subject(schema)
+ .build();
+ return cataloged;
+ }
+
+ protected CatalogedConfigBuilder injectValueSchema(
CatalogedConfigBuilder cataloged,
Asyncapi asyncapi,
AsyncapiMessageView message)
@@ -129,6 +143,25 @@ protected CatalogedConfigBuilder injectSchema(
return cataloged;
}
+ protected CatalogedConfigBuilder injectValueSchemas(
+ CatalogedConfigBuilder cataloged,
+ Asyncapi asyncapi,
+ Map messages)
+ {
+ for (Map.Entry messageEntry : messages.entrySet())
+ {
+ AsyncapiMessageView message =
+ AsyncapiMessageView.of(asyncapi.components.messages, messageEntry.getValue());
+ String schema = AsyncapiSchemaView.of(asyncapi.components.schemas, message.payload()).refKey();
+ cataloged.schema()
+ .version(VERSION_LATEST)
+ .subject(schema)
+ .build();
+ }
+
+ return cataloged;
+ }
+
protected ModelConfig injectModel(
Asyncapi asyncapi,
AsyncapiMessageView message)
@@ -145,25 +178,28 @@ else if (jsonContentType.reset(contentType).matches())
model = JsonModelConfig.builder()
.catalog()
.name(INLINE_CATALOG_NAME)
- .inject(catalog -> injectSchema(catalog, asyncapi, message))
+ .inject(catalog -> injectValueSchema(catalog, asyncapi, message))
.build()
.build();
}
else if (avroContentType.reset(contentType).matches())
{
model = AvroModelConfig.builder()
+ .view("json")
.catalog()
.name(INLINE_CATALOG_NAME)
- .inject(catalog -> injectSchema(catalog, asyncapi, message))
+ .inject(catalog -> injectValueSchema(catalog, asyncapi, message))
.build()
.build();
}
else if (protobufContentType.reset(contentType).matches())
{
model = ProtobufModelConfig.builder()
+ .view("json")
.catalog()
.name(INLINE_CATALOG_NAME)
- .inject(catalog -> injectSchema(catalog, asyncapi, message))
+ .inject(catalog -> injectValueSchema(catalog, asyncapi, message))
+
.build()
.build();
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiSseProtocol.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiSseProtocol.java
index 4ce76db4ac..a9505a137e 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiSseProtocol.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiSseProtocol.java
@@ -26,7 +26,6 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
-import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
import io.aklivity.zilla.runtime.binding.sse.config.SseConditionConfig;
import io.aklivity.zilla.runtime.binding.sse.config.SseOptionsConfig;
import io.aklivity.zilla.runtime.binding.sse.config.SseOptionsConfigBuilder;
@@ -35,6 +34,7 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
+import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig;
@Incubating
public class AsyncapiSseProtocol extends AsyncapiProtocol
@@ -172,15 +172,12 @@ private SsePathConfigBuilder injectValue(
{
if (messages != null)
{
- for (Map.Entry messageEntry : messages.entrySet())
- {
- AsyncapiMessageView message =
- AsyncapiMessageView.of(asyncapi.components.messages, messageEntry.getValue());
- if (message.payload() != null)
- {
- request.content(injectModel(asyncapi, message));
- }
- }
+ request.content(JsonModelConfig::builder)
+ .catalog()
+ .name(INLINE_CATALOG_NAME)
+ .inject(cataloged -> injectValueSchemas(cataloged, asyncapi, messages))
+ .build()
+ .build();
}
return request;
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AyncapiKafkaProtocol.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AyncapiKafkaProtocol.java
index 0d17b133bd..48f31317ff 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AyncapiKafkaProtocol.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AyncapiKafkaProtocol.java
@@ -36,7 +36,12 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.KindConfig;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
+import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
+import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;
+import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig;
+import io.aklivity.zilla.runtime.model.protobuf.config.ProtobufModelConfig;
+
public class AyncapiKafkaProtocol extends AsyncapiProtocol
{
@@ -153,6 +158,9 @@ private KafkaOptionsConfigBuilder injectKafkaTopicOptions(
AsyncapiOperation operation = asyncapi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncapi.channels, operation.channel);
String topic = channel.address();
+ String replyTo = operation.reply != null
+ ? AsyncapiChannelView.of(asyncapi.channels, operation.reply.channel).address()
+ : null;
if (channel.messages() != null && !channel.messages().isEmpty() ||
channel.parameters() != null && !channel.parameters().isEmpty())
@@ -167,9 +175,29 @@ private KafkaOptionsConfigBuilder injectKafkaTopicOptions(
.topic(KafkaTopicConfig::builder)
.name(topic)
.inject(topicConfig -> injectHeader(topicConfig, kafkaTopic))
+ .inject(topicConfig -> injectKey(topicConfig, asyncapi, channel.messages()))
.inject(topicConfig -> injectValue(topicConfig, asyncapi, channel.messages()))
.build()
.build();
+
+ if (replyTo != null)
+ {
+ KafkaTopicConfig kafkaReply = kafka != null && kafka.topics != null
+ ? kafka.topics.stream()
+ .filter(t -> t.name.equals(replyTo))
+ .findFirst()
+ .orElse(null)
+ : null;
+ AsyncapiChannelView replyView = AsyncapiChannelView.of(asyncapi.channels, operation.reply.channel);
+
+ options
+ .topic(KafkaTopicConfig::builder)
+ .name(replyTo)
+ .inject(topicConfig -> injectHeader(topicConfig, kafkaReply))
+ .inject(topicConfig -> injectValue(topicConfig, asyncapi, replyView.messages()))
+ .build()
+ .build();
+ }
}
}
}
@@ -195,13 +223,13 @@ private KafkaTopicConfigBuilder injectHeader(
{
if (kafkaTopic != null)
{
- kafkaTopic.headers.forEach(h -> topic.header(h.name.asString(), h.path));
+ kafkaTopic.headers.forEach(h -> topic.header(h.name, h.path));
}
return topic;
}
- private KafkaTopicConfigBuilder injectValue(
+ private KafkaTopicConfigBuilder injectKey(
KafkaTopicConfigBuilder topic,
Asyncapi asyncapi,
Map messages)
@@ -212,11 +240,64 @@ private KafkaTopicConfigBuilder injectValue(
{
AsyncapiMessageView message =
AsyncapiMessageView.of(asyncapi.components.messages, messageEntry.getValue());
- if (message.payload() != null)
+ if (message.key() != null)
+ {
+ topic.key(AvroModelConfig.builder()
+ .catalog()
+ .name(INLINE_CATALOG_NAME)
+ .inject(catalog -> injectKeySchema(catalog, asyncapi, message))
+ .build()
+ .build());
+ }
+ }
+ }
+ return topic;
+ }
+
+ private KafkaTopicConfigBuilder injectValue(
+ KafkaTopicConfigBuilder topic,
+ Asyncapi asyncapi,
+ Map messages)
+ {
+ if (messages != null)
+ {
+ AsyncapiMessageView message =
+ AsyncapiMessageView.of(asyncapi.components.messages, messages.values().stream().findFirst().get());
+ String contentType = message.contentType() == null ? asyncapi.defaultContentType : message.contentType();
+ ModelConfig model = null;
+ if (contentType != null)
+ {
+ if (jsonContentType.reset(contentType).matches())
{
- topic.value(injectModel(asyncapi, message));
+ model = JsonModelConfig.builder()
+ .catalog()
+ .name(INLINE_CATALOG_NAME)
+ .inject(catalog -> injectValueSchemas(catalog, asyncapi, messages))
+ .build()
+ .build();
+ }
+ else if (avroContentType.reset(contentType).matches())
+ {
+ model = AvroModelConfig.builder()
+ .view("json")
+ .catalog()
+ .name(INLINE_CATALOG_NAME)
+ .inject(catalog -> injectValueSchemas(catalog, asyncapi, messages))
+ .build()
+ .build();
+ }
+ else if (protobufContentType.reset(contentType).matches())
+ {
+ model = ProtobufModelConfig.builder()
+ .view("json")
+ .catalog()
+ .name(INLINE_CATALOG_NAME)
+ .inject(catalog -> injectValueSchemas(catalog, asyncapi, messages))
+ .build()
+ .build();
}
}
+ topic.value(model);
}
return topic;
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiKafkaMessageBinding.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiKafkaMessageBinding.java
new file mode 100644
index 0000000000..60d5fa7326
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiKafkaMessageBinding.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;
+
+public class AsyncapiKafkaMessageBinding
+{
+ public AsyncapiMultiFormatSchema key;
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiKafkaServerBindings.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiKafkaServerBindings.java
new file mode 100644
index 0000000000..3df459793f
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiKafkaServerBindings.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;
+
+public class AsyncapiKafkaServerBindings
+{
+ public String schemaRegistryUrl;
+ public String schemaRegistryVendor;
+ public String bindingVersion;
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java
index d93614ba2b..ff2299e232 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessage.java
@@ -27,6 +27,7 @@ public class AsyncapiMessage
public AsyncapiSchemaItem payload;
public List traits;
public AsyncapiCorrelationId correlationId;
+ public AsyncapiMessageBindings bindings;
@JsonbProperty("$ref")
public String ref;
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessageBindings.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessageBindings.java
new file mode 100644
index 0000000000..1d1f806751
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiMessageBindings.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;
+
+public class AsyncapiMessageBindings
+{
+ public AsyncapiKafkaMessageBinding kafka;
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiSchemasPayloadDeserializer.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiSchemasPayloadDeserializer.java
index 893bc48672..58720aab86 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiSchemasPayloadDeserializer.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiSchemasPayloadDeserializer.java
@@ -30,10 +30,7 @@ public class AsyncapiSchemasPayloadDeserializer implements JsonbDeserializer deserialize(
- JsonParser parser,
- DeserializationContext ctx,
- Type rtType)
+ public Map deserialize(JsonParser parser, DeserializationContext ctx, Type rtType)
{
Map result = new HashMap<>();
Jsonb jsonb = JsonbBuilder.create();
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiServer.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiServer.java
index 96120ac88f..d734bd8146 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiServer.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiServer.java
@@ -26,4 +26,5 @@ public class AsyncapiServer
public String protocol;
public JsonArray security;
public Map variables;
+ public AsyncapiServerBindings bindings;
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiServerBindings.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiServerBindings.java
new file mode 100644
index 0000000000..c02b966131
--- /dev/null
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/model/AsyncapiServerBindings.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;
+
+public class AsyncapiServerBindings
+{
+ public AsyncapiKafkaServerBindings kafka;
+}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java
index d74d06482f..f3e1d1df6d 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiCorrelationIdView.java
@@ -56,7 +56,7 @@ private AsyncapiCorrelationIdView(
Map correlationIds,
AsyncapiCorrelationId correlationId)
{
- super(correlationIds, "#/components/correlationIds/(\\w+)");
+ super(correlationIds, "#/components/correlationIds/(.+)");
if (correlationId.ref != null)
{
correlationId = resolveRef(correlationId.ref);
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java
index 4cf620e50f..6772f11c15 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiMessageView.java
@@ -19,6 +19,7 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiCorrelationId;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
+import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMultiFormatSchema;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchema;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchemaItem;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiTrait;
@@ -42,10 +43,18 @@ public String contentType()
return message.contentType;
}
+ public AsyncapiMultiFormatSchema key()
+ {
+ return message.bindings != null && message.bindings.kafka != null
+ ? message.bindings.kafka.key
+ : null;
+ }
+
public AsyncapiSchemaItem payload()
{
- return (AsyncapiSchemaItem) message.payload;
+ return message.payload;
}
+
public List traits()
{
return message.traits;
@@ -67,7 +76,7 @@ private AsyncapiMessageView(
Map messages,
AsyncapiMessage message)
{
- super(messages, "#/components/messages/(\\w+)");
+ super(messages, "#/components/messages/(.+)");
this.message = message.ref == null ? message : resolveRef(message.ref);
}
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiSchemaView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiSchemaView.java
index 3c2d4782cc..d5077248f0 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiSchemaView.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiSchemaView.java
@@ -96,7 +96,7 @@ private AsyncapiSchemaView(
Map schemas,
AsyncapiSchemaItem schema)
{
- super(schemas, "#/components/schemas/(\\w+)");
+ super(schemas, "#/components/schemas/(.+)");
if (schema.ref != null)
{
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiTraitView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiTraitView.java
index 38a4ec4e7f..6330bad890 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiTraitView.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiTraitView.java
@@ -44,7 +44,7 @@ private AsyncapiTraitView(
Map traits,
AsyncapiTrait trait)
{
- super(traits, "#/components/messageTraits/(\\w+)");
+ super(traits, "#/components/messageTraits/(.+)");
this.trait = trait.ref == null ? trait : resolveRef(trait.ref);
}
}
diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiVariableView.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiVariableView.java
index 33a43e39fc..0240f0bb58 100644
--- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiVariableView.java
+++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/view/AsyncapiVariableView.java
@@ -48,7 +48,7 @@ private AsyncapiVariableView(
Map variables,
AsyncapiVariable variable)
{
- super(variables, "#/components/serverVariables/(\\w+)");
+ super(variables, "#/components/serverVariables/(.+)");
this.variable = variable.ref == null ? variable : resolveRef(variable.ref);
}
}
diff --git a/runtime/binding-asyncapi/src/main/moditect/module-info.java b/runtime/binding-asyncapi/src/main/moditect/module-info.java
index 7c6eaa0dc3..b0d205b370 100644
--- a/runtime/binding-asyncapi/src/main/moditect/module-info.java
+++ b/runtime/binding-asyncapi/src/main/moditect/module-info.java
@@ -27,6 +27,7 @@
requires io.aklivity.zilla.runtime.binding.tcp;
requires io.aklivity.zilla.runtime.binding.tls;
requires io.aklivity.zilla.runtime.catalog.inline;
+ requires io.aklivity.zilla.runtime.catalog.karapace;
requires io.aklivity.zilla.runtime.guard.jwt;
requires io.aklivity.zilla.runtime.vault.filesystem;
requires io.aklivity.zilla.runtime.model.core;
diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicConfigBuilder.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicConfigBuilder.java
index 88994a9923..f2b4bede76 100644
--- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicConfigBuilder.java
+++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicConfigBuilder.java
@@ -15,8 +15,6 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.config;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -25,7 +23,6 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType;
-import io.aklivity.zilla.runtime.binding.kafka.internal.types.String32FW;
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
@@ -33,9 +30,12 @@ public final class KafkaTopicConfigBuilder extends ConfigBuilder mapper;
private String name;
private KafkaOffsetType defaultOffset;
@@ -50,6 +50,7 @@ public final class KafkaTopicConfigBuilder extends ConfigBuilder();
this.matcher = PATH_PATTERN.matcher("");
+ this.internalMatcher = INTERNAL_PATH_PATTERN.matcher("");
}
@Override
@@ -104,8 +105,12 @@ public KafkaTopicConfigBuilder header(
}
if (matcher.reset(path).matches())
{
- this.headers.add(new KafkaTopicHeaderType(new String32FW(name, UTF_8),
- String.format(INTERNAL_PATH_PATTERN, matcher.group(1))));
+ this.headers.add(new KafkaTopicHeaderType(name,
+ String.format(INTERNAL_VALUE, matcher.group(1))));
+ }
+ else if (internalMatcher.reset(path).matches())
+ {
+ this.headers.add(new KafkaTopicHeaderType(name, path));
}
return this;
}
diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicHeaderType.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicHeaderType.java
index c103c219aa..62005946c8 100644
--- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicHeaderType.java
+++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicHeaderType.java
@@ -15,15 +15,13 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.config;
-import io.aklivity.zilla.runtime.binding.kafka.internal.types.String32FW;
-
public class KafkaTopicHeaderType
{
- public final String32FW name;
+ public final String name;
public final String path;
public KafkaTopicHeaderType(
- String32FW name,
+ String name,
String path)
{
this.name = name;
diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCacheFile.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCacheFile.java
index b031aff963..874e903cf8 100644
--- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCacheFile.java
+++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCacheFile.java
@@ -142,6 +142,14 @@ public T readBytes(
return visitor.visit(mappedBuf, position, capacity);
}
+ public T readBytes(
+ int position,
+ int maxLimit,
+ Flyweight.Visitor visitor)
+ {
+ return visitor.visit(mappedBuf, position, maxLimit);
+ }
+
public int readInt(
int position)
{
diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java
index 77240eccfa..70095e9abd 100644
--- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java
+++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java
@@ -35,6 +35,7 @@
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.cache.KafkaCacheEntryFW.FIELD_OFFSET_SEQUENCE;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.cache.KafkaCacheEntryFW.FIELD_OFFSET_TIMESTAMP;
import static java.nio.ByteBuffer.allocateDirect;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.agrona.BitUtil.SIZE_OF_INT;
@@ -122,6 +123,8 @@ public final class KafkaCachePartition
private final MutableDirectBuffer valueInfo = new UnsafeBuffer(new byte[Integer.BYTES]);
private final Varint32FW varintRO = new Varint32FW();
+ private final String32FW.Builder stringRW = new String32FW.Builder()
+ .wrap(new UnsafeBuffer(new byte[256]), 0, 256);;
private final Varint32FW.Builder varintRW = new Varint32FW.Builder().wrap(new UnsafeBuffer(new byte[5]), 0, 5);
private final Array32FW headersRO = new Array32FW(new KafkaHeaderFW());
private final Array32FW.Builder trailersRW =
@@ -148,6 +151,7 @@ public final class KafkaCachePartition
private KafkaCacheEntryFW ancestorEntry;
private final AtomicLong produceCapacity;
private final OctetsFW octetsRO = new OctetsFW();
+ private final KafkaKeyFW keyRO = new KafkaKeyFW();
public KafkaCachePartition(
Path location,
@@ -351,7 +355,6 @@ public void writeEntry(
KafkaKeyFW key,
ArrayFW headers,
OctetsFW value,
- KafkaCacheEntryFW ancestor,
int entryFlags,
KafkaDeltaType deltaType,
ConverterHandler convertKey,
@@ -359,10 +362,9 @@ public void writeEntry(
boolean verbose,
List headerTypes)
{
- final long keyHash = computeHash(key);
final int valueLength = value != null ? value.sizeof() : -1;
writeEntryStart(context, traceId, bindingId, offset, entryMark, valueMark, timestamp, producerId, key,
- keyHash, valueLength, ancestor, entryFlags, deltaType, value, convertKey, convertValue, verbose);
+ valueLength, null, entryFlags, deltaType, value, convertKey, convertValue, verbose);
writeEntryContinue(value);
writeEntryFinish(headers, deltaType, context, traceId, bindingId, FLAGS_COMPLETE, offset, entryMark, valueMark,
convertValue, verbose, headerTypes);
@@ -378,9 +380,8 @@ public void writeEntryStart(
long timestamp,
long producerId,
KafkaKeyFW key,
- long keyHash,
int valueLength,
- KafkaCacheEntryFW ancestor,
+ IntFunction findAncestor,
int entryFlags,
KafkaDeltaType deltaType,
OctetsFW payload,
@@ -408,16 +409,6 @@ public void writeEntryStart(
logFile.mark();
- final long ancestorOffset = ancestor != null ? ancestor.offset$() : NO_ANCESTOR_OFFSET;
- final int deltaPosition = deltaType == JSON_PATCH &&
- ancestor != null && ancestor.valueLen() != -1 &&
- valueLength != -1
- ? deltaFile.capacity()
- : NO_DELTA_POSITION;
-
- assert deltaPosition == NO_DELTA_POSITION || ancestor != null;
- this.ancestorEntry = ancestor;
-
int convertedPos = NO_CONVERTED_POSITION;
if (valueLength != -1 && convertValue != ConverterHandler.NONE)
{
@@ -438,21 +429,22 @@ public void writeEntryStart(
entryInfo.putLong(FIELD_OFFSET_OWNER_ID, producerId);
entryInfo.putLong(FIELD_OFFSET_ACKNOWLEDGE, NO_ACKNOWLEDGE);
entryInfo.putInt(FIELD_OFFSET_SEQUENCE, NO_SEQUENCE);
- entryInfo.putLong(FIELD_OFFSET_ANCESTOR, ancestorOffset);
+ entryInfo.putLong(FIELD_OFFSET_ANCESTOR, NO_ANCESTOR_OFFSET);
entryInfo.putLong(FIELD_OFFSET_DESCENDANT, NO_DESCENDANT_OFFSET);
entryInfo.putInt(FIELD_OFFSET_FLAGS, entryFlags);
entryInfo.putInt(FIELD_OFFSET_CONVERTED_POSITION, convertedPos);
- entryInfo.putInt(FIELD_OFFSET_DELTA_POSITION, deltaPosition);
+ entryInfo.putInt(FIELD_OFFSET_DELTA_POSITION, NO_DELTA_POSITION);
entryInfo.putShort(FIELD_OFFSET_ACK_MODE, KafkaAckMode.NONE.value());
logFile.appendBytes(entryInfo);
+ final int keyAt = logFile.capacity();
+
if (key.value() == null)
{
logFile.appendBytes(key);
}
else
{
- final int keyAt = logFile.capacity();
Varint32FW initLength = varintRW.set(0).build();
logFile.appendBytes(initLength);
@@ -463,8 +455,10 @@ public void writeEntryStart(
int keyShift = newLength.sizeof() - progress.sizeof();
if (keyShift > 0)
{
- logFile.readBytes(progress.limit(), octetsRO::wrap);
- logFile.writeBytes(newLength.limit(), octetsRO);
+ OctetsFW octets = logFile.readBytes(progress.limit(), progress.limit() + progress.value(), octetsRO::wrap);
+ logFile.writeBytes(newLength.limit(), octets);
+
+ logFile.advance(keyAt + newLength.limit());
}
logFile.writeBytes(keyAt, newLength);
logFile.appendBytes(buffer, index, length);
@@ -489,6 +483,23 @@ public void writeEntryStart(
valueMark.value = logFile.capacity();
+ final long keyHash = computeHash(logFile.readBytes(keyAt, keyRO::wrap));
+
+ final KafkaCacheEntryFW ancestor = findAncestor != null ? findAncestor.apply((int) keyHash) : null;
+
+ final long ancestorOffset = ancestor != null ? ancestor.offset$() : NO_ANCESTOR_OFFSET;
+ final int deltaPosition = deltaType == JSON_PATCH &&
+ ancestor != null && ancestor.valueLen() != -1 &&
+ valueLength != -1
+ ? deltaFile.capacity()
+ : NO_DELTA_POSITION;
+
+ logFile.writeLong(entryMark.value + FIELD_OFFSET_ANCESTOR, ancestorOffset);
+ logFile.writeInt(entryMark.value + FIELD_OFFSET_DELTA_POSITION, deltaPosition);
+
+ assert deltaPosition == NO_DELTA_POSITION || ancestor != null;
+ this.ancestorEntry = ancestor;
+
final long hashEntry = keyHash << 32 | logFile.markValue();
hashFile.appendLong(hashEntry);
@@ -548,7 +559,8 @@ public void writeEntryFinish(
final KafkaCacheFile indexFile = headSegment.indexFile();
final KafkaCacheFile convertedFile = headSegment.convertedFile();
- final int valueLength = logFile.capacity() - valueMark.value;
+ final int valueLength = logFile.readInt(valueMark.value - SIZE_OF_INT);
+ assert logFile.capacity() - valueMark.value == Math.max(valueLength, 0);
final int logAvailable = logFile.available();
final int logRequired = headers.sizeof();
@@ -594,7 +606,7 @@ else if (headerTypes != null && !headerTypes.isEmpty())
trailersRW.wrap(trailersRW.buffer(), 0, trailersRW.maxLimit());
for (KafkaTopicHeaderType header : headerTypes)
{
- String32FW name = header.name;
+ String32FW name = stringRW.set(header.name, UTF_8).build();
String path = header.path;
builder.item(h ->
{
diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaTopicConfigAdapter.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaTopicConfigAdapter.java
index 0602263d7f..d5095300bd 100644
--- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaTopicConfigAdapter.java
+++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaTopicConfigAdapter.java
@@ -83,7 +83,7 @@ public JsonObject adaptToJson(
JsonObjectBuilder headers = Json.createObjectBuilder();
for (KafkaTopicHeaderType header : topic.headers)
{
- headers.add(header.name.asString(), header.path);
+ headers.add(header.name, header.path);
}
object.add(HEADERS_NAME, headers);
}
diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java
index 1e867cda45..5980f829d3 100644
--- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java
+++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.IntFunction;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
@@ -778,8 +779,8 @@ private void onServerFanoutReplyFlush(
}
partition.writeEntry(context, traceId, routedId, partitionOffset, entryMark, valueMark, 0L, producerId,
- EMPTY_KEY, EMPTY_HEADERS, EMPTY_OCTETS, null,
- entryFlags, KafkaDeltaType.NONE, convertKey, convertValue, verbose, headerTypes);
+ EMPTY_KEY, EMPTY_HEADERS, EMPTY_OCTETS,
+ entryFlags, KafkaDeltaType.NONE, convertKey, convertValue, verbose, headerTypes);
if (result == KafkaTransactionResult.ABORT)
{
@@ -878,11 +879,11 @@ private void onServerFanoutReplyData(
this.deleteId = doServerFanoutInitialSignalAt(deleteAt, traceId, SIGNAL_SEGMENT_DELETE);
}
+ IntFunction findAncestor =
+ kh -> findAndMarkAncestor(key, nextHead, kh, partitionOffset);
final int entryFlags = (flags & FLAGS_SKIP) != 0x00 ? CACHE_ENTRY_FLAGS_ABORTED : 0x00;
- final long keyHash = partition.computeKeyHash(key);
- final KafkaCacheEntryFW ancestor = findAndMarkAncestor(key, nextHead, (int) keyHash, partitionOffset);
partition.writeEntryStart(context, traceId, routedId, partitionOffset, entryMark, valueMark, timestamp,
- producerId, key, keyHash, valueLength, ancestor, entryFlags, deltaType, valueFragment, convertKey,
+ producerId, key, valueLength, findAncestor, entryFlags, deltaType, valueFragment, convertKey,
convertValue, verbose);
}
diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartitionTest.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartitionTest.java
index 0c3f8c9ea0..f71e5eb1f2 100644
--- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartitionTest.java
+++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartitionTest.java
@@ -229,14 +229,14 @@ public void shouldCleanSegment() throws Exception
KafkaCacheSegment head10s = head10.segment();
partition.writeEntry(null, 1L, 1L, 11L, entryMark, valueMark, 0L, -1L,
- key, headers, value, null, 0x00, KafkaDeltaType.NONE, ConverterHandler.NONE,
+ key, headers, value, 0x00, KafkaDeltaType.NONE, ConverterHandler.NONE,
ConverterHandler.NONE, false, null);
long keyHash = partition.computeKeyHash(key);
KafkaCacheEntryFW ancestor = head10.findAndMarkAncestor(key, keyHash, 11L, ancestorRO);
partition.writeEntry(null, 1L, 1L, 12L, entryMark, valueMark, 0L, -1L,
- key, headers, value, ancestor, 0x00, KafkaDeltaType.NONE, ConverterHandler.NONE,
+ key, headers, value, 0x00, KafkaDeltaType.NONE, ConverterHandler.NONE,
ConverterHandler.NONE, false, null);
Node head15 = partition.append(15L);
@@ -287,14 +287,14 @@ public void shouldSeekAncestor() throws Exception
Node head10 = partition.append(10L);
partition.writeEntry(null, 1L, 1L, 11L, entryMark, valueMark, 0L, -1L,
- key, headers, value, null, 0x00, KafkaDeltaType.NONE, ConverterHandler.NONE,
+ key, headers, value, 0x00, KafkaDeltaType.NONE, ConverterHandler.NONE,
ConverterHandler.NONE, false, null);
long keyHash = partition.computeKeyHash(key);
KafkaCacheEntryFW ancestor = head10.findAndMarkAncestor(key, keyHash, 11L, ancestorRO);
partition.writeEntry(null, 1L, 1L, 12L, entryMark, valueMark, 0L, -1L,
- key, headers, value, ancestor, 0x00, KafkaDeltaType.NONE, ConverterHandler.NONE,
+ key, headers, value, 0x00, KafkaDeltaType.NONE, ConverterHandler.NONE,
ConverterHandler.NONE, false, null);
Node head15 = partition.append(15L);
diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaOptionsConfigAdapterTest.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaOptionsConfigAdapterTest.java
index 268c0834fa..9b3130708f 100644
--- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaOptionsConfigAdapterTest.java
+++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaOptionsConfigAdapterTest.java
@@ -240,7 +240,7 @@ public void shouldReadHeadersOptions()
assertThat(options, not(nullValue()));
assertThat(options.bootstrap, equalTo(singletonList("test")));
- assertEquals(options.topics.get(0).headers.get(0).name.asString(), "correlation-id");
+ assertEquals(options.topics.get(0).headers.get(0).name, "correlation-id");
assertEquals(options.topics.get(0).headers.get(0).path, "$.correlationId");
}
diff --git a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfig.java b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/config/KarapaceOptionsConfig.java
similarity index 95%
rename from runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfig.java
rename to runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/config/KarapaceOptionsConfig.java
index b02609fbf0..fc54ff9106 100644
--- a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfig.java
+++ b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/config/KarapaceOptionsConfig.java
@@ -12,7 +12,7 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package io.aklivity.zilla.runtime.catalog.karapace.internal.config;
+package io.aklivity.zilla.runtime.catalog.karapace.config;
import java.time.Duration;
import java.util.function.Function;
diff --git a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigBuilder.java b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/config/KarapaceOptionsConfigBuilder.java
similarity index 96%
rename from runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigBuilder.java
rename to runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/config/KarapaceOptionsConfigBuilder.java
index f212e79e74..bddfc95256 100644
--- a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigBuilder.java
+++ b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/config/KarapaceOptionsConfigBuilder.java
@@ -12,7 +12,7 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package io.aklivity.zilla.runtime.catalog.karapace.internal.config;
+package io.aklivity.zilla.runtime.catalog.karapace.config;
import java.time.Duration;
import java.util.function.Function;
diff --git a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceCatalogContext.java b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceCatalogContext.java
index 9290af9245..e292155871 100644
--- a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceCatalogContext.java
+++ b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceCatalogContext.java
@@ -16,7 +16,7 @@
import java.util.concurrent.ConcurrentMap;
-import io.aklivity.zilla.runtime.catalog.karapace.internal.config.KarapaceOptionsConfig;
+import io.aklivity.zilla.runtime.catalog.karapace.config.KarapaceOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
diff --git a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceCatalogHandler.java b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceCatalogHandler.java
index fbdd09e8ec..fad0cc1d74 100644
--- a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceCatalogHandler.java
+++ b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceCatalogHandler.java
@@ -32,7 +32,7 @@
import org.agrona.collections.Int2ObjectCache;
import org.agrona.concurrent.UnsafeBuffer;
-import io.aklivity.zilla.runtime.catalog.karapace.internal.config.KarapaceOptionsConfig;
+import io.aklivity.zilla.runtime.catalog.karapace.config.KarapaceOptionsConfig;
import io.aklivity.zilla.runtime.catalog.karapace.internal.serializer.RegisterSchemaRequest;
import io.aklivity.zilla.runtime.catalog.karapace.internal.types.KarapacePrefixFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
diff --git a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigAdapter.java b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigAdapter.java
index 9d53cbdae1..d1b2589a78 100644
--- a/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigAdapter.java
+++ b/runtime/catalog-karapace/src/main/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigAdapter.java
@@ -21,6 +21,8 @@
import jakarta.json.JsonObjectBuilder;
import jakarta.json.bind.adapter.JsonbAdapter;
+import io.aklivity.zilla.runtime.catalog.karapace.config.KarapaceOptionsConfig;
+import io.aklivity.zilla.runtime.catalog.karapace.config.KarapaceOptionsConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;
diff --git a/runtime/catalog-karapace/src/main/moditect/module-info.java b/runtime/catalog-karapace/src/main/moditect/module-info.java
index 2ada5786bb..cfa11a32b1 100644
--- a/runtime/catalog-karapace/src/main/moditect/module-info.java
+++ b/runtime/catalog-karapace/src/main/moditect/module-info.java
@@ -17,6 +17,8 @@
requires java.net.http;
requires io.aklivity.zilla.runtime.engine;
+ exports io.aklivity.zilla.runtime.catalog.karapace.config;
+
provides io.aklivity.zilla.runtime.engine.catalog.CatalogFactorySpi
with io.aklivity.zilla.runtime.catalog.karapace.internal.KarapaceCatalogFactorySpi;
diff --git a/runtime/catalog-karapace/src/test/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceIT.java b/runtime/catalog-karapace/src/test/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceIT.java
index 359a06a3ce..c8a8efa2ee 100644
--- a/runtime/catalog-karapace/src/test/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceIT.java
+++ b/runtime/catalog-karapace/src/test/java/io/aklivity/zilla/runtime/catalog/karapace/internal/KarapaceIT.java
@@ -32,7 +32,7 @@
import io.aklivity.k3po.runtime.junit.annotation.Specification;
import io.aklivity.k3po.runtime.junit.rules.K3poRule;
-import io.aklivity.zilla.runtime.catalog.karapace.internal.config.KarapaceOptionsConfig;
+import io.aklivity.zilla.runtime.catalog.karapace.config.KarapaceOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
diff --git a/runtime/catalog-karapace/src/test/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigAdapterTest.java b/runtime/catalog-karapace/src/test/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigAdapterTest.java
index e9f79e2cfe..f088cbb93f 100644
--- a/runtime/catalog-karapace/src/test/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigAdapterTest.java
+++ b/runtime/catalog-karapace/src/test/java/io/aklivity/zilla/runtime/catalog/karapace/internal/config/KarapaceOptionsConfigAdapterTest.java
@@ -28,6 +28,8 @@
import org.junit.Before;
import org.junit.Test;
+import io.aklivity.zilla.runtime.catalog.karapace.config.KarapaceOptionsConfig;
+
public class KarapaceOptionsConfigAdapterTest
{
private Jsonb jsonb;
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
index 946e714e4f..9f171c8e8b 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
@@ -134,19 +134,26 @@ protected final boolean validate(
boolean status = false;
try
{
- GenericRecord record = supplyRecord(schemaId);
- in.wrap(buffer, index, length);
- GenericDatumReader reader = supplyReader(schemaId);
Schema schema = supplySchema(schemaId);
- if (reader != null)
+ switch (schema.getType())
{
- decoderFactory.binaryDecoder(in, decoder);
- reader.read(record, decoder);
+ case STRING:
status = true;
-
+ break;
+ case RECORD:
+ GenericRecord record = supplyRecord(schemaId);
+ in.wrap(buffer, index, length);
+ GenericDatumReader reader = supplyReader(schemaId);
+ if (reader != null)
+ {
+ decoderFactory.binaryDecoder(in, decoder);
+ reader.read(record, decoder);
+ status = true;
+ }
+ progress = index;
+ extractFields(buffer, index + length, schema);
+ break;
}
- progress = index;
- extractFields(buffer, length, schema);
}
catch (IOException | AvroRuntimeException ex)
{
@@ -157,12 +164,12 @@ protected final boolean validate(
protected void extractFields(
DirectBuffer buffer,
- int length,
+ int limit,
Schema schema)
{
for (Schema.Field field : schema.getFields())
{
- extract(field.schema(), buffer, length, extracted.get(field.name()));
+ extract(field.schema(), buffer, limit, extracted.get(field.name()));
}
}
@@ -251,11 +258,13 @@ private int calculatePadding(
if (schema != null)
{
- padding = 2;
+ padding = 10;
if (schema.getType().equals(Schema.Type.RECORD))
{
for (Schema.Field field : schema.getFields())
{
+ padding += field.name().getBytes().length;
+
switch (field.schema().getType())
{
case RECORD:
@@ -265,24 +274,24 @@ private int calculatePadding(
}
case UNION:
{
- padding += field.name().getBytes().length + JSON_FIELD_UNION_LENGTH;
+ padding += JSON_FIELD_UNION_LENGTH;
break;
}
case MAP:
{
- padding += field.name().getBytes().length + JSON_FIELD_MAP_LENGTH +
+ padding += JSON_FIELD_MAP_LENGTH +
calculatePadding(field.schema().getValueType());
break;
}
case ARRAY:
{
- padding += field.name().getBytes().length + JSON_FIELD_ARRAY_LENGTH +
+ padding += JSON_FIELD_ARRAY_LENGTH +
calculatePadding(field.schema().getElementType());
break;
}
default:
{
- padding += field.name().getBytes().length + JSON_FIELD_STRUCTURE_LENGTH;
+ padding += JSON_FIELD_STRUCTURE_LENGTH;
break;
}
}
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
index f33405e7cd..98244bea93 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
@@ -196,7 +196,7 @@ record = reader.read(record, decoderFactory.binaryDecoder(in, decoder));
out.flush();
progress = index;
- extractFields(buffer, length, schema);
+ extractFields(buffer, index + length, schema);
}
}
catch (IOException | AvroRuntimeException ex)
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
index 2889dcf0f5..454eec8a55 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
@@ -85,18 +85,27 @@ private int serializeJsonRecord(
try
{
Schema schema = supplySchema(schemaId);
- GenericDatumReader reader = supplyReader(schemaId);
- GenericDatumWriter writer = supplyWriter(schemaId);
- if (reader != null)
+
+ switch (schema.getType())
{
- GenericRecord record = supplyRecord(schemaId);
- in.wrap(buffer, index, length);
- expandable.wrap(expandable.buffer());
- record = reader.read(record, decoderFactory.jsonDecoder(schema, in));
- encoderFactory.binaryEncoder(expandable, encoder);
- writer.write(record, encoder);
- encoder.flush();
- next.accept(expandable.buffer(), 0, expandable.position());
+ case STRING:
+ next.accept(buffer, index, length);
+ break;
+ case RECORD:
+ GenericDatumReader reader = supplyReader(schemaId);
+ GenericDatumWriter writer = supplyWriter(schemaId);
+ if (reader != null)
+ {
+ GenericRecord record = supplyRecord(schemaId);
+ in.wrap(buffer, index, length);
+ expandable.wrap(expandable.buffer());
+ record = reader.read(record, decoderFactory.jsonDecoder(schema, in));
+ encoderFactory.binaryEncoder(expandable, encoder);
+ writer.write(record, encoder);
+ encoder.flush();
+ next.accept(expandable.buffer(), 0, expandable.position());
+ }
+ break;
}
}
catch (IOException | AvroRuntimeException ex)
diff --git a/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java b/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java
index 2ef491ecd5..685b4a7a20 100644
--- a/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java
+++ b/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java
@@ -300,7 +300,7 @@ public void shouldVerifyPaddingLength()
0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
data.wrap(bytes, 0, bytes.length);
- assertEquals(260, converter.padding(data, 0, data.capacity()));
+ assertEquals(292, converter.padding(data, 0, data.capacity()));
}
@Test
diff --git a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.schema.patch.json b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.schema.patch.json
index eeb70d2a60..af181bceaf 100644
--- a/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.schema.patch.json
+++ b/specs/binding-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.schema.patch.json
@@ -154,6 +154,10 @@
"sasl":
{
"$ref": "#/$defs/options/binding/kafka/sasl"
+ },
+ "topics":
+ {
+ "$ref": "#/$defs/options/binding/kafka/cache_server_topics"
}
},
"additionalProperties": false
diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/schema/kafka.schema.patch.json b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/schema/kafka.schema.patch.json
index ca060534cd..ee79743099 100644
--- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/schema/kafka.schema.patch.json
+++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/schema/kafka.schema.patch.json
@@ -144,52 +144,7 @@
},
"topics":
{
- "title": "Topics",
- "type": "array",
- "items":
- {
- "type": "object",
- "additionalProperties": false,
- "properties":
- {
- "name":
- {
- "type": "string"
- },
- "defaultOffset":
- {
- "type": "string",
- "enum": [ "live", "historical" ]
- },
- "deltaType":
- {
- "type": "string",
- "enum": [ "none", "json_patch" ],
- "deprecated": true
- },
- "headers":
- {
- "type": "object",
- "patternProperties":
- {
- "^[a-zA-Z]+[a-zA-Z0-9\\._\\-]*$":
- {
- "type": "string",
- "pattern": "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$"
- }
- },
- "additionalProperties": false
- },
- "key":
- {
- "$ref": "#/$defs/converter"
- },
- "value":
- {
- "$ref": "#/$defs/converter"
- }
- }
- }
+ "$ref": "#/$defs/options/binding/kafka/cache_server_topics"
}
},
"additionalProperties": false
@@ -353,5 +308,61 @@
]
}
}
+ },
+ {
+ "op": "add",
+ "path": "/$defs/options/binding/kafka/cache_server_topics",
+ "value":
+ {
+ "topics":
+ {
+ "title": "Topics",
+ "type": "array",
+ "items":
+ {
+ "type": "object",
+ "additionalProperties": false,
+ "properties":
+ {
+ "name":
+ {
+ "type": "string"
+ },
+ "defaultOffset":
+ {
+ "type": "string",
+ "enum": [ "live", "historical" ]
+ },
+ "deltaType":
+ {
+ "type": "string",
+ "enum": [ "none", "json_patch" ],
+ "deprecated": true
+ },
+ "headers":
+ {
+ "type": "object",
+ "patternProperties":
+ {
+ "^[a-zA-Z]+[a-zA-Z0-9\\._\\-]*$":
+ {
+ "type": "string",
+ "pattern": "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$"
+ }
+ },
+ "additionalProperties": false
+ },
+ "key":
+ {
+ "$ref": "#/$defs/converter"
+ },
+ "value":
+ {
+ "$ref": "#/$defs/converter"
+ }
+ }
+ }
+ }
+ }
}
]