Skip to content

Commit

Permalink
Support remote zilla configuration with change detection (#1071)
Browse files Browse the repository at this point in the history
* Remove BindingConfig.readLocation
* Remove GuardedConfig.readPath
* Rename ConfigAdapterContext.readLocation to ConfigAdapterContext.readResource
* Deprecate ConfigAdapterContext for removal
* Add EngineConfiguration.configURI() to resolve absolute file path
* Remove EngineConfiguration.configURL() but use to default EngineConfiguration.configURI()
* Simplify EngineManager
* Gather resources on NamespaceConfig from member configs
* Consolidate config and resource watcher as EngineConfigWatcher
* Resolve watched paths based on path filesystem provider scheme
* Configure HttpFileSystem poll internval duration via Map<String,?> env
* Simplify ReconfigureHttpIT scripts and include /zilla.yaml in request path
* HttpFileSystem per origin (root path) not per individual path
* Track HttpPath change count vs read count to implement simple caching
* No watch event needed for identical response body
* Handle status 204 with null body and infer delay for optional prefer wait

---------

Co-authored-by: John Fallows <john.r.fallows@gmail.com>
  • Loading branch information
attilakreiner and jfallows authored Jun 26, 2024
1 parent 89f2863 commit 230eec2
Show file tree
Hide file tree
Showing 120 changed files with 5,229 additions and 1,073 deletions.
6 changes: 6 additions & 0 deletions cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>filesystem-http</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>metrics-stream</artifactId>
Expand Down
1 change: 1 addition & 0 deletions cloud/docker-image/src/main/docker/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<include>io/aklivity/zilla/binding-*/**</include>
<include>io/aklivity/zilla/catalog-*/**</include>
<include>io/aklivity/zilla/exporter-*/**</include>
<include>io/aklivity/zilla/filesystem-*/**</include>
<include>io/aklivity/zilla/guard-*/**</include>
<include>io/aklivity/zilla/metrics-*/**</include>
<include>io/aklivity/zilla/model-*/**</include>
Expand Down
1 change: 1 addition & 0 deletions cloud/docker-image/src/main/docker/zpm.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"io.aklivity.zilla:exporter-otlp",
"io.aklivity.zilla:exporter-prometheus",
"io.aklivity.zilla:exporter-stdout",
"io.aklivity.zilla:filesystem-http",
"io.aklivity.zilla:guard-jwt",
"io.aklivity.zilla:metrics-stream",
"io.aklivity.zilla:metrics-http",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ private void attachProxyBinding(
namespaceGenerator.init(binding);
final List<String> labels = configs.stream().map(c -> c.apiLabel).collect(toList());
final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get, labels);
composite.readURL = binding.readURL;
attach.accept(composite);
updateNamespace(configs, composite, new ArrayList<>(asyncapis.values()));
}
Expand Down Expand Up @@ -349,7 +348,6 @@ private void attachServerClientBinding(
namespaceConfig.servers.forEach(s -> s.setAsyncapiProtocol(
namespaceGenerator.resolveProtocol(s.protocol(), options, namespaceConfig.asyncapis, namespaceConfig.servers)));
final NamespaceConfig composite = namespaceGenerator.generate(binding, namespaceConfig);
composite.readURL = binding.readURL;
attach.accept(composite);
updateNamespace(namespaceConfig.configs, composite, namespaceConfig.asyncapis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.aklivity.zilla.runtime.binding.echo.internal.bench;

import java.net.InetAddress;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.nio.file.Path;
import java.time.Clock;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -319,8 +319,8 @@ public ConverterHandler supplyWriteConverter(
}

@Override
public URL resolvePath(
String path)
public Path resolvePath(
String location)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class GrpcOptionsConfigAdapter implements OptionsConfigAdapterSpi,

private final GrpcProtobufParser parser = new GrpcProtobufParser();

private Function<String, String> readURL;
private Function<String, String> readResource;

@Override
public Kind kind()
Expand Down Expand Up @@ -88,7 +88,7 @@ public OptionsConfig adaptFromJson(
public void adaptContext(
ConfigAdapterContext context)
{
this.readURL = context::readURL;
this.readResource = context::readResource;
}

private List<GrpcProtobufConfig> asListProtobufs(
Expand All @@ -103,7 +103,7 @@ private GrpcProtobufConfig asProtobuf(
JsonValue value)
{
final String location = ((JsonString) value).getString();
final String protobuf = readURL.apply(location);
final String protobuf = readResource.apply(location);

return parser.parse(location, protobuf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void initJson() throws IOException
{
content = new String(resource.readAllBytes(), UTF_8);
}
Mockito.doReturn(content).when(context).readURL("protobuf/echo.proto");
Mockito.doReturn(content).when(context).readResource("protobuf/echo.proto");
adapter = new OptionsConfigAdapter(OptionsConfigAdapterSpi.Kind.BINDING, context);
adapter.adaptType("grpc");
JsonbConfig config = new JsonbConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import io.aklivity.zilla.runtime.binding.http.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.http.internal.types.String8FW;
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class HttpOptionsConfig extends OptionsConfig
Expand Down Expand Up @@ -55,41 +56,46 @@ public static <T> HttpOptionsConfigBuilder<T> builder(
HttpAuthorizationConfig authorization,
List<HttpRequestConfig> requests)
{
super(requests != null && !requests.isEmpty()
? requests.stream()
.flatMap(request -> Stream.concat(
Stream.of(request.content),
Stream.concat(
request.headers != null
? request.headers.stream().flatMap(header -> Stream.of(header != null ? header.model : null))
: Stream.empty(),
Stream.concat(
request.pathParams != null
? request.pathParams.stream().flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty(),
Stream.concat(
request.queryParams != null
? request.queryParams.stream().flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty(),
Stream.concat(request.responses != null
? request.responses.stream().flatMap(param -> Stream.of(param != null
? param.content
: null))
: Stream.empty(), request.responses != null
? request.responses.stream()
.flatMap(response -> response.headers != null
? response.headers.stream()
.flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty())
: Stream.empty())
)))).filter(Objects::nonNull))
.collect(Collectors.toList())
: emptyList());

super(resolveModels(requests), List.of());
this.versions = versions;
this.overrides = overrides;
this.access = access;
this.authorization = authorization;
this.requests = requests;
}

private static List<ModelConfig> resolveModels(
List<HttpRequestConfig> requests)
{
return requests != null && !requests.isEmpty()
? requests.stream()
.flatMap(request -> Stream.concat(
Stream.of(request.content),
Stream.concat(
request.headers != null
? request.headers.stream().flatMap(header -> Stream.of(header != null ? header.model : null))
: Stream.empty(),
Stream.concat(
request.pathParams != null
? request.pathParams.stream().flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty(),
Stream.concat(
request.queryParams != null
? request.queryParams.stream().flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty(),
Stream.concat(request.responses != null
? request.responses.stream().flatMap(param -> Stream.of(param != null
? param.content
: null))
: Stream.empty(), request.responses != null
? request.responses.stream()
.flatMap(response -> response.headers != null
? response.headers.stream()
.flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty())
: Stream.empty())
)))).filter(Objects::nonNull))
.collect(Collectors.toList())
: emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.stream.Stream;

import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class KafkaOptionsConfig extends OptionsConfig
Expand All @@ -49,15 +50,21 @@ public static <T> KafkaOptionsConfigBuilder<T> builder(
List<KafkaServerConfig> servers,
KafkaSaslConfig sasl)
{
super(topics != null && !topics.isEmpty()
? topics.stream()
.flatMap(t -> Stream.of(t.key, t.value))
.filter(Objects::nonNull)
.collect(toList())
: emptyList());
super(resolveModels(topics), List.of());
this.bootstrap = bootstrap;
this.topics = topics;
this.servers = servers;
this.sasl = sasl;
}

private static List<ModelConfig> resolveModels(
List<KafkaTopicConfig> topics)
{
return topics != null && !topics.isEmpty()
? topics.stream()
.flatMap(t -> Stream.of(t.key, t.value))
.filter(Objects::nonNull)
.collect(toList())
: emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Stream;

import io.aklivity.zilla.runtime.binding.mqtt.internal.config.MqttVersion;
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public class MqttOptionsConfig extends OptionsConfig
Expand All @@ -50,18 +51,24 @@ public MqttOptionsConfig(
List<MqttTopicConfig> topics,
List<MqttVersion> versions)
{
super(topics != null && !topics.isEmpty()
super(resolveModels(topics), List.of());
this.authorization = authorization;
this.topics = topics;
this.versions = versions;
}

private static List<ModelConfig> resolveModels(
List<MqttTopicConfig> topics)
{
return topics != null && !topics.isEmpty()
? topics.stream()
.flatMap(topic -> Stream.concat(
Stream.of(topic.content),
Stream.of(topic.content),
Optional.ofNullable(topic.userProperties).orElseGet(Collections::emptyList).stream()
.flatMap(p -> Stream.of(p.value))
.filter(Objects::nonNull))
.flatMap(p -> Stream.of(p.value))
.filter(Objects::nonNull))
.filter(Objects::nonNull))
.collect(Collectors.toList())
: emptyList());
this.authorization = authorization;
this.topics = topics;
this.versions = versions;
: emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ public void attach(
Object2ObjectHashMap::new));

this.composite = namespaceGenerator.generate(binding, openapis, asyncapis, openapiSchemaIdsByApiId::get);
this.composite.readURL = binding.readURL;
attach.accept(this.composite);

BindingConfig mappingBinding = composite.bindings.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ public void attach(
for (OpenapiNamespaceConfig namespaceConfig : namespaceConfigs.values())
{
final NamespaceConfig composite = namespaceGenerator.generate(binding, namespaceConfig);
composite.readURL = binding.readURL;
attach.accept(composite);
namespaceConfig.configs.forEach(c ->
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class SseOptionsConfig extends OptionsConfig
Expand All @@ -46,14 +47,20 @@ public static <T> SseOptionsConfigBuilder<T> builder(
int retry,
List<SseRequestConfig> requests)
{
super(requests != null && !requests.isEmpty()
super(resolveModels(requests), List.of());
this.retry = retry;
this.requests = requests;
}

private static List<ModelConfig> resolveModels(
List<SseRequestConfig> requests)
{
return requests != null && !requests.isEmpty()
? requests.stream()
.flatMap(path ->
Stream.of(path.content)
.filter(Objects::nonNull))
.collect(Collectors.toList())
: emptyList());
this.retry = retry;
this.requests = requests;
: emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import static io.aklivity.zilla.runtime.engine.internal.stream.StreamId.isInitial;
import static java.lang.ThreadLocal.withInitial;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.nio.file.Path;
import java.time.Clock;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -85,7 +83,7 @@ public class TlsWorker implements EngineContext
private final BindingFactory factory;
private final VaultFactory vaultFactory;
private final Configuration config;
private final URL configURL;
private final Path configPath;

private final TlsSignaler signaler;

Expand All @@ -105,7 +103,7 @@ public TlsWorker(
.readonly(false)
.build()
.bufferPool();
this.configURL = config.configURL();
this.configPath = Path.of(config.configURI());

this.signaler = new TlsSignaler();

Expand Down Expand Up @@ -387,19 +385,10 @@ public ConverterHandler supplyWriteConverter(
}

@Override
public URL resolvePath(
String path)
public Path resolvePath(
String location)
{
URL resolved = null;
try
{
resolved = new URL(configURL, path);
}
catch (MalformedURLException ex)
{
rethrowUnchecked(ex);
}
return resolved;
return configPath.resolveSibling(location);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
package io.aklivity.zilla.runtime.catalog.filesystem.internal;

import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -34,7 +35,7 @@ public class FilesystemCatalogHandler implements CatalogHandler
private final CRC32C crc32c;
private final FilesystemEventContext event;
private final long catalogId;
private final Function<String, URL> resolvePath;
private final Function<String, Path> resolvePath;

public FilesystemCatalogHandler(
FilesystemOptionsConfig config,
Expand Down Expand Up @@ -72,8 +73,8 @@ private void registerSchema(
{
try
{
URL storeURL = resolvePath.apply(config.path);
try (InputStream input = storeURL.openStream())
Path storePath = resolvePath.apply(config.path);
try (InputStream input = Files.newInputStream(storePath))
{
String schema = new String(input.readAllBytes());
int schemaId = generateCRC32C(schema);
Expand Down
Loading

0 comments on commit 230eec2

Please sign in to comment.