Skip to content

Commit

Permalink
WIP readPath resolvePath readLocation
Browse files Browse the repository at this point in the history
  • Loading branch information
attilakreiner committed Jun 14, 2024
1 parent b6c2ff2 commit 2da3603
Show file tree
Hide file tree
Showing 31 changed files with 359 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void attachProxyBinding(
namespaceGenerator.init(binding);
final List<String> labels = configs.stream().map(c -> c.apiLabel).collect(toList());
final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get, labels);
composite.readURL = binding.readURL;
composite.readLocation = binding.readLocation;
attach.accept(composite);
updateNamespace(configs, composite, new ArrayList<>(asyncapis.values()));
}
Expand Down Expand Up @@ -256,7 +256,7 @@ private void attachServerClientBinding(
namespaceConfig.servers.forEach(s -> s.setAsyncapiProtocol(
namespaceGenerator.resolveProtocol(s.protocol(), options, namespaceConfig.asyncapis, namespaceConfig.servers)));
final NamespaceConfig composite = namespaceGenerator.generate(binding, namespaceConfig);
composite.readURL = binding.readURL;
composite.readLocation = binding.readLocation;
attach.accept(composite);
updateNamespace(namespaceConfig.configs, composite, namespaceConfig.asyncapis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.aklivity.zilla.runtime.binding.echo.internal.bench;

import java.net.InetAddress;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.nio.file.Path;
import java.time.Clock;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -319,8 +319,8 @@ public ConverterHandler supplyWriteConverter(
}

@Override
public URL resolvePath(
String path)
public Path resolvePath(
String location)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class GrpcOptionsConfigAdapter implements OptionsConfigAdapterSpi,

private final GrpcProtobufParser parser = new GrpcProtobufParser();

private Function<String, String> readURL;
private Function<String, String> readLocation;

@Override
public Kind kind()
Expand Down Expand Up @@ -88,7 +88,7 @@ public OptionsConfig adaptFromJson(
public void adaptContext(
ConfigAdapterContext context)
{
this.readURL = context::readURL;
this.readLocation = context::readLocation;
}

private List<GrpcProtobufConfig> asListProtobufs(
Expand All @@ -103,7 +103,7 @@ private GrpcProtobufConfig asProtobuf(
JsonValue value)
{
final String location = ((JsonString) value).getString();
final String protobuf = readURL.apply(location);
final String protobuf = readLocation.apply(location);

return parser.parse(location, protobuf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void initJson() throws IOException
{
content = new String(resource.readAllBytes(), UTF_8);
}
Mockito.doReturn(content).when(context).readURL("protobuf/echo.proto");
Mockito.doReturn(content).when(context).readLocation("protobuf/echo.proto");
adapter = new OptionsConfigAdapter(OptionsConfigAdapterSpi.Kind.BINDING, context);
adapter.adaptType("grpc");
JsonbConfig config = new JsonbConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void attach(
Object2ObjectHashMap::new));

this.composite = namespaceGenerator.generate(binding, openapis, asyncapis, openapiSchemaIdsByApiId::get);
this.composite.readURL = binding.readURL;
this.composite.readLocation = binding.readLocation;
attach.accept(this.composite);

BindingConfig mappingBinding = composite.bindings.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void attach(
for (OpenapiNamespaceConfig namespaceConfig : namespaceConfigs.values())
{
final NamespaceConfig composite = namespaceGenerator.generate(binding, namespaceConfig);
composite.readURL = binding.readURL;
composite.readLocation = binding.readLocation;
attach.accept(composite);
namespaceConfig.configs.forEach(c ->
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import static io.aklivity.zilla.runtime.engine.internal.stream.StreamId.isInitial;
import static java.lang.ThreadLocal.withInitial;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.nio.file.Path;
import java.time.Clock;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -85,7 +83,7 @@ public class TlsWorker implements EngineContext
private final BindingFactory factory;
private final VaultFactory vaultFactory;
private final Configuration config;
private final URL configURL;
private final Path configPath;

private final TlsSignaler signaler;

Expand All @@ -105,7 +103,7 @@ public TlsWorker(
.readonly(false)
.build()
.bufferPool();
this.configURL = config.configURL();
this.configPath = config.configPath();

this.signaler = new TlsSignaler();

Expand Down Expand Up @@ -387,19 +385,10 @@ public ConverterHandler supplyWriteConverter(
}

@Override
public URL resolvePath(
String path)
public Path resolvePath(
String location)
{
URL resolved = null;
try
{
resolved = new URL(configURL, path);
}
catch (MalformedURLException ex)
{
rethrowUnchecked(ex);
}
return resolved;
return configPath.resolveSibling(location);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
package io.aklivity.zilla.runtime.catalog.filesystem.internal;

import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -34,7 +35,7 @@ public class FilesystemCatalogHandler implements CatalogHandler
private final CRC32C crc32c;
private final FilesystemEventContext event;
private final long catalogId;
private final Function<String, URL> resolvePath;
private final Function<String, Path> resolvePath;

public FilesystemCatalogHandler(
FilesystemOptionsConfig config,
Expand Down Expand Up @@ -72,8 +73,8 @@ private void registerSchema(
{
try
{
URL storeURL = resolvePath.apply(config.path);
try (InputStream input = storeURL.openStream())
Path storePath = resolvePath.apply(config.path);
try (InputStream input = Files.newInputStream(storePath))
{
String schema = new String(input.readAllBytes());
int schemaId = generateCRC32C(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;

import java.net.URL;
import java.nio.file.Path;

import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -38,7 +39,7 @@
public class FilesystemCatalogFactoryTest
{
@Test
public void shouldLoadAndCreate()
public void shouldLoadAndCreate() throws Exception
{
Configuration config = new Configuration();
CatalogFactory factory = CatalogFactory.instantiate();
Expand All @@ -50,7 +51,8 @@ public void shouldLoadAndCreate()
EngineContext engineContext = mock(EngineContext.class);
URL url = FilesystemCatalogFactoryTest.class
.getResource("../../../../specs/catalog/filesystem/config/asyncapi/mqtt.yaml");
Mockito.doReturn(url).when(engineContext).resolvePath("asyncapi/mqtt.yaml");
Path path = Path.of(url.toURI());
Mockito.doReturn(path).when(engineContext).resolvePath("asyncapi/mqtt.yaml");

CatalogContext context = catalog.supply(engineContext);
assertThat(context, instanceOf(FilesystemCatalogContext.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.mock;

import java.net.URL;
import java.nio.file.Path;

import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
Expand All @@ -41,13 +42,14 @@ public class FilesystemIT
private EngineContext context = mock(EngineContext.class);

@Before
public void setup()
public void setup() throws Exception
{
config = new FilesystemOptionsConfig(singletonList(
new FilesystemSchemaConfig("subject1", "asyncapi/mqtt.yaml")));

URL url = FilesystemIT.class.getResource("../../../../specs/catalog/filesystem/config/asyncapi/mqtt.yaml");
Mockito.doReturn(url).when(context).resolvePath("asyncapi/mqtt.yaml");
Path path = Path.of(url.toURI());
Mockito.doReturn(path).when(context).resolvePath("asyncapi/mqtt.yaml");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package io.aklivity.zilla.runtime.engine;

import static io.aklivity.zilla.runtime.engine.internal.layouts.metrics.HistogramsLayout.BUCKETS;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.stream.Collectors.toList;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -80,7 +79,7 @@ public final class Engine implements Collector, AutoCloseable
private final AtomicInteger nextTaskId;
private final ThreadFactory factory;

private final URL configURL;
private final Path configPath;
private final List<EngineWorker> workers;
private final boolean readonly;
private final EngineConfiguration config;
Expand Down Expand Up @@ -149,7 +148,7 @@ public final class Engine implements Collector, AutoCloseable
for (int workerIndex = 0; workerIndex < workerCount; workerIndex++)
{
EngineWorker worker =
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, bindings, exporters,
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, this::resolvePath, bindings, exporters,
guards, vaults, catalogs, models, metricGroups, this, this::supplyEventReader,
eventFormatterFactory, workerIndex, readonly, this::process);
workers.add(worker);
Expand Down Expand Up @@ -178,7 +177,7 @@ public final class Engine implements Collector, AutoCloseable
final Map<String, Guard> guardsByType = guards.stream()
.collect(Collectors.toMap(g -> g.name(), g -> g));

this.configURL = config.configURL();
this.configPath = config.configPath();
EngineManager manager = new EngineManager(
schemaTypes,
bindingsByType::get,
Expand All @@ -192,8 +191,9 @@ public final class Engine implements Collector, AutoCloseable
context,
config,
extensions,
this.configURL,
this::readURL);
this.configPath,
this::readPath,
this::readLocation);

this.bindings = bindings;
this.tasks = tasks;
Expand Down Expand Up @@ -285,18 +285,13 @@ public static EngineBuilder builder()
return new EngineBuilder();
}

private String readURL(
String location)
private String readPath(
Path path)
{
String result;
try
{
URL url = new URL(configURL, location);
URLConnection connection = url.openConnection();
try (InputStream input = connection.getInputStream())
{
result = new String(input.readAllBytes(), UTF_8);
}
result = Files.readString(path);
}
catch (Exception ex)
{
Expand All @@ -305,6 +300,18 @@ private String readURL(
return result;
}

public Path resolvePath(
String location)
{
return configPath.resolveSibling(location);
}

private String readLocation(
String location)
{
return readPath(resolvePath(location));
}

private Thread newTaskThread(
Runnable r)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -146,6 +147,34 @@ public URL configURL()
return ENGINE_CONFIG_URL.get(this);
}

public Path configPath()
{
Path configPath = null;
try
{
URI configUri = configURL().toURI();
if ("file".equals(configUri.getScheme()) && !Path.of(configUri.getSchemeSpecificPart()).isAbsolute())
{
// this works for relative file e.g. file:zilla.yaml
Path basePath = Path.of("").toAbsolutePath();
configPath = basePath.resolve(configUri.getSchemeSpecificPart());
}
else
{
// this works for absolute file e.g. file:/path/dir/zilla.yaml
// this works for http e.g. http://localhost:7115/zilla.yaml
// this works for jar e.g. jar:file:/path/engine.jar!/package/zilla.yaml
// (the jar filesystem is opened and closed by EngineRule)
configPath = Path.of(configUri);
}
}
catch (Exception ex)
{
rethrowUnchecked(ex);
}
return configPath;
}

public int configPollIntervalSeconds()
{
return ENGINE_CONFIG_POLL_INTERVAL_SECONDS.getAsInt(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.aklivity.zilla.runtime.engine;

import java.net.InetAddress;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.nio.file.Path;
import java.time.Clock;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -157,8 +157,8 @@ ConverterHandler supplyReadConverter(
ConverterHandler supplyWriteConverter(
ModelConfig config);

URL resolvePath(
String path);
Path resolvePath(
String location);

Metric resolveMetric(
String name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class BindingConfig
public transient long id;
public transient long entryId;
public transient ToLongFunction<String> resolveId;
public transient Function<String, String> readURL;
public transient Function<String, String> readLocation;

public transient long vaultId;
public transient String qvault;
Expand Down
Loading

0 comments on commit 2da3603

Please sign in to comment.