Skip to content

Commit

Permalink
Bug fixes and improvements to support asyncapi http, sse, and kafka i…
Browse files Browse the repository at this point in the history
…ntegration (#1124)

* Adjust padding to accommodate good enough headers and don't include  partial data frame while computing crc32c value

* filtering by structured value field(s)

* model-json extract support

* Test Converter update to support extracted header for ITs

* incorrect index fix

* incorrect index fix for number

* update to reuse matcher object

* support for Avro model and header format change

* support for Protobuf model

* Support other format data types in catalog

* Support kafka key validation

* Check if the binding exists

---------

Co-authored-by: Ankit Kumar <ankitjames25@gmail.com>
  • Loading branch information
akrambek and ankitk-me authored Jul 4, 2024
1 parent dc43223 commit 8a922d4
Show file tree
Hide file tree
Showing 41 changed files with 487 additions and 201 deletions.
6 changes: 6 additions & 0 deletions runtime/binding-asyncapi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>catalog-karapace</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>model-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchema;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiServer;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.binding.http.config.HttpAuthorizationConfig;
import io.aklivity.zilla.runtime.binding.http.config.HttpConditionConfig;
Expand Down Expand Up @@ -148,6 +147,7 @@ else if ("sse".equals(server.protocol()))
.exit("sse_server0")
.when(HttpConditionConfig::builder)
.header(":path", path)
.header(":method", "GET")
.build()
.build();
}
Expand Down Expand Up @@ -209,18 +209,13 @@ private <C> HttpRequestConfigBuilder<C> injectContent(
{
if (messages != null)
{
for (Map.Entry<String, AsyncapiMessage> messageEntry : messages.entrySet())
{
AsyncapiMessageView message =
AsyncapiMessageView.of(asyncapi.components.messages, messageEntry.getValue());
request.
request.
content(JsonModelConfig::builder)
.catalog()
.name(INLINE_CATALOG_NAME)
.inject(cataloged -> injectSchema(cataloged, asyncapi, message))
.build()
.build();
}
.catalog()
.name(INLINE_CATALOG_NAME)
.inject(cataloged -> injectValueSchemas(cataloged, asyncapi, messages))
.build()
.build();
}
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.config;

import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.MINIMIZE_QUOTES;
import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER;
import static io.aklivity.zilla.runtime.common.feature.FeatureFilter.featureEnabled;
import static java.util.stream.Collectors.toList;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.io.StringReader;
import java.io.StringWriter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -33,27 +34,25 @@
import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;
import jakarta.json.JsonValue;
import jakarta.json.JsonWriter;
import jakarta.json.bind.Jsonb;
import jakarta.json.bind.JsonbBuilder;

import org.agrona.collections.MutableInteger;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;

import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiServerConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiKafkaServerBindings;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiSchemaItem;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiServer;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiTrait;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineOptionsConfig;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineSchemaConfigBuilder;
import io.aklivity.zilla.runtime.catalog.karapace.config.KarapaceOptionsConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
Expand All @@ -63,8 +62,9 @@

public abstract class AsyncapiNamespaceGenerator
{
protected static final String INLINE_CATALOG_NAME = "catalog0";
protected static final String CATALOG_NAME = "catalog0";
protected static final String INLINE_CATALOG_TYPE = "inline";
protected static final String KARAPACE_CATALOG_TYPE = "karapace";
protected static final String VERSION_LATEST = "latest";
protected static final AsyncapiOptionsConfig EMPTY_OPTION =
new AsyncapiOptionsConfig(null, null, null, null, null, null, null);
Expand Down Expand Up @@ -218,13 +218,33 @@ protected <C> NamespaceConfigBuilder<C> injectCatalog(
NamespaceConfigBuilder<C> namespace,
List<Asyncapi> asyncapis)
{
final boolean injectCatalog = asyncapis.stream()
.anyMatch(a -> a.components != null && a.components.schemas != null && !a.components.schemas.isEmpty());
if (injectCatalog)
Optional<AsyncapiServer> server = asyncapis.stream()
.filter(a -> a.servers.entrySet().stream().anyMatch(s -> s.getValue().bindings != null &&
s.getValue().bindings.kafka != null))
.map(s -> s.servers.values())
.flatMap(Collection::stream)
.filter(s -> s.bindings.kafka != null)
.findFirst();
final boolean injectCatalog = asyncapis.stream().anyMatch(AsyncapiNamespaceGenerator::hasSchemas);
if (server.isPresent())
{
AsyncapiKafkaServerBindings kafka = server.get().bindings.kafka;
namespace
.catalog()
.name(CATALOG_NAME)
.type(KARAPACE_CATALOG_TYPE)
.options(KarapaceOptionsConfig::builder)
.url(kafka.schemaRegistryUrl)
.context("default")
.maxAge(Duration.ofHours(1))
.build()
.build();
}
else if (injectCatalog)
{
namespace
.catalog()
.name(INLINE_CATALOG_NAME)
.name(CATALOG_NAME)
.type(INLINE_CATALOG_TYPE)
.options(InlineOptionsConfig::builder)
.subjects()
Expand All @@ -242,22 +262,18 @@ protected <C> InlineSchemaConfigBuilder<C> injectSubjects(
{
for (Asyncapi asyncapi : asyncapis)
{
if (asyncapi.components != null && asyncapi.components.schemas != null && !asyncapi.components.schemas.isEmpty())
if (hasSchemas(asyncapi))
{
try (Jsonb jsonb = JsonbBuilder.create())
{
YAMLMapper yaml = YAMLMapper.builder()
.disable(WRITE_DOC_START_MARKER)
.enable(MINIMIZE_QUOTES)
.build();
for (Map.Entry<String, AsyncapiSchemaItem> entry : asyncapi.components.schemas.entrySet())
{
AsyncapiSchemaView schema = AsyncapiSchemaView.of(asyncapi.components.schemas, entry.getValue());

subjects
.subject(entry.getKey())
.version(VERSION_LATEST)
.schema(writeSchemaYaml(jsonb, yaml, schema))
.schema(writeSchemaJson(jsonb, schema))
.build();
}
if (asyncapi.components.messageTraits != null)
Expand All @@ -268,7 +284,7 @@ protected <C> InlineSchemaConfigBuilder<C> injectSubjects(
subjects
.subject(k)
.version(VERSION_LATEST)
.schema(writeSchemaYaml(jsonb, yaml, v))
.schema(writeSchemaJson(jsonb, v))
.build());
}
}
Expand All @@ -282,39 +298,40 @@ protected <C> InlineSchemaConfigBuilder<C> injectSubjects(
return subjects;
}

protected static String writeSchemaYaml(
private static boolean hasSchemas(
Asyncapi asyncapi)
{
return asyncapi.components != null &&
asyncapi.components.schemas != null &&
!asyncapi.components.schemas.isEmpty();
}

protected static String writeSchemaJson(
Jsonb jsonb,
YAMLMapper yaml,
Object schema)
{
String result = null;
try
{
String schemaJson = jsonb.toJson(schema);
String schemaJson = jsonb.toJson(schema);

JsonReader reader = Json.createReader(new StringReader(schemaJson));
JsonObject jsonObject = reader.readObject();
JsonReader reader = Json.createReader(new StringReader(schemaJson));
JsonValue jsonValue = reader.readValue();

JsonObject modifiedJsonObject = jsonObject.getJsonObject("schema");
if (jsonValue instanceof JsonObject)
{
JsonObject jsonObject = (JsonObject) jsonValue;

if (modifiedJsonObject != null)
if (jsonObject.containsKey("schema"))
{
JsonValue modifiedJsonValue = jsonObject.get("schema");
StringWriter stringWriter = new StringWriter();
JsonWriter jsonWriter = Json.createWriter(stringWriter);
jsonWriter.writeObject(modifiedJsonObject);
jsonWriter.write(modifiedJsonValue);
jsonWriter.close();

schemaJson = stringWriter.toString();
}

JsonNode json = new ObjectMapper().readTree(schemaJson);
result = yaml.writeValueAsString(json);
}
catch (JsonProcessingException ex)
{
rethrowUnchecked(ex);
}
return result;

return schemaJson;
}

protected <C> BindingConfigBuilder<C> injectMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
Expand Down Expand Up @@ -116,7 +117,20 @@ public <C>BindingConfigBuilder<C> injectProtocolClientOptions(
return binding;
}

protected <C> CatalogedConfigBuilder<C> injectSchema(
protected <C> CatalogedConfigBuilder<C> injectKeySchema(
CatalogedConfigBuilder<C> cataloged,
Asyncapi asyncapi,
AsyncapiMessageView message)
{
String schema = AsyncapiSchemaView.of(asyncapi.components.schemas, message.key()).refKey();
cataloged.schema()
.version(VERSION_LATEST)
.subject(schema)
.build();
return cataloged;
}

protected <C> CatalogedConfigBuilder<C> injectValueSchema(
CatalogedConfigBuilder<C> cataloged,
Asyncapi asyncapi,
AsyncapiMessageView message)
Expand All @@ -129,6 +143,25 @@ protected <C> CatalogedConfigBuilder<C> injectSchema(
return cataloged;
}

protected <C> CatalogedConfigBuilder<C> injectValueSchemas(
CatalogedConfigBuilder<C> cataloged,
Asyncapi asyncapi,
Map<String, AsyncapiMessage> messages)
{
for (Map.Entry<String, AsyncapiMessage> messageEntry : messages.entrySet())
{
AsyncapiMessageView message =
AsyncapiMessageView.of(asyncapi.components.messages, messageEntry.getValue());
String schema = AsyncapiSchemaView.of(asyncapi.components.schemas, message.payload()).refKey();
cataloged.schema()
.version(VERSION_LATEST)
.subject(schema)
.build();
}

return cataloged;
}

protected ModelConfig injectModel(
Asyncapi asyncapi,
AsyncapiMessageView message)
Expand All @@ -145,25 +178,28 @@ else if (jsonContentType.reset(contentType).matches())
model = JsonModelConfig.builder()
.catalog()
.name(INLINE_CATALOG_NAME)
.inject(catalog -> injectSchema(catalog, asyncapi, message))
.inject(catalog -> injectValueSchema(catalog, asyncapi, message))
.build()
.build();
}
else if (avroContentType.reset(contentType).matches())
{
model = AvroModelConfig.builder()
.view("json")
.catalog()
.name(INLINE_CATALOG_NAME)
.inject(catalog -> injectSchema(catalog, asyncapi, message))
.inject(catalog -> injectValueSchema(catalog, asyncapi, message))
.build()
.build();
}
else if (protobufContentType.reset(contentType).matches())
{
model = ProtobufModelConfig.builder()
.view("json")
.catalog()
.name(INLINE_CATALOG_NAME)
.inject(catalog -> injectSchema(catalog, asyncapi, message))
.inject(catalog -> injectValueSchema(catalog, asyncapi, message))

.build()
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
import io.aklivity.zilla.runtime.binding.sse.config.SseConditionConfig;
import io.aklivity.zilla.runtime.binding.sse.config.SseOptionsConfig;
import io.aklivity.zilla.runtime.binding.sse.config.SseOptionsConfigBuilder;
Expand All @@ -35,6 +34,7 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig;

@Incubating
public class AsyncapiSseProtocol extends AsyncapiProtocol
Expand Down Expand Up @@ -172,15 +172,12 @@ private <C> SsePathConfigBuilder<C> injectValue(
{
if (messages != null)
{
for (Map.Entry<String, AsyncapiMessage> messageEntry : messages.entrySet())
{
AsyncapiMessageView message =
AsyncapiMessageView.of(asyncapi.components.messages, messageEntry.getValue());
if (message.payload() != null)
{
request.content(injectModel(asyncapi, message));
}
}
request.content(JsonModelConfig::builder)
.catalog()
.name(INLINE_CATALOG_NAME)
.inject(cataloged -> injectValueSchemas(cataloged, asyncapi, messages))
.build()
.build();
}
return request;
}
Expand Down
Loading

0 comments on commit 8a922d4

Please sign in to comment.