Skip to content

Commit 367a724

Browse files
authored
Detect missing events in test exporter (#1128)
1 parent 137de5e commit 367a724

File tree

37 files changed

+501
-57
lines changed

37 files changed

+501
-57
lines changed

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import static java.lang.System.currentTimeMillis;
3737
import static java.lang.ThreadLocal.withInitial;
3838
import static java.util.concurrent.TimeUnit.MILLISECONDS;
39+
import static java.util.concurrent.TimeUnit.SECONDS;
3940
import static org.agrona.CloseHelper.quietClose;
41+
import static org.agrona.LangUtil.rethrowUnchecked;
4042
import static org.agrona.concurrent.AgentRunner.startOnThread;
4143

4244
import java.net.InetAddress;
@@ -64,7 +66,6 @@
6466
import java.util.function.LongUnaryOperator;
6567
import java.util.function.Supplier;
6668

67-
import org.agrona.CloseHelper;
6869
import org.agrona.DeadlineTimerWheel;
6970
import org.agrona.DeadlineTimerWheel.TimerHandler;
7071
import org.agrona.DirectBuffer;
@@ -789,8 +790,15 @@ public void doStart()
789790

790791
public void doClose()
791792
{
792-
CloseHelper.close(runner);
793-
thread = null;
793+
try
794+
{
795+
Consumer<Thread> timeout = t -> rethrowUnchecked(new IllegalStateException("close timeout"));
796+
runner.close((int) SECONDS.toMillis(5L), timeout);
797+
}
798+
finally
799+
{
800+
thread = null;
801+
}
794802
}
795803

796804
@Override

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/EngineConfigWatcher.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ private static WatchService newWatchService(
284284
{
285285
try
286286
{
287-
watcher = fileSystem.newWatchService();
287+
watcher = newWatchService(fileSystem);
288288
}
289289
catch (UnsupportedOperationException ex)
290290
{
@@ -302,4 +302,10 @@ private static WatchService newWatchService(
302302

303303
return watcher;
304304
}
305+
306+
private static WatchService newWatchService(
307+
FileSystem fileSystem) throws IOException
308+
{
309+
return fileSystem.newWatchService();
310+
}
305311
}

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/EngineRule.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -367,14 +367,14 @@ public void evaluate() throws Throwable
367367
}
368368
finally
369369
{
370-
if (!allowErrors)
371-
{
372-
assertEmpty(errors);
373-
}
374370
if (fs != null)
375371
{
376372
fs.close();
377373
}
374+
if (!allowErrors)
375+
{
376+
assertEmpty(errors);
377+
}
378378
}
379379
}
380380
}

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
3535
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
3636
import io.aklivity.zilla.runtime.engine.guard.GuardHandler;
37+
import io.aklivity.zilla.runtime.engine.model.ConverterHandler;
38+
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
3739
import io.aklivity.zilla.runtime.engine.namespace.NamespacedId;
3840
import io.aklivity.zilla.runtime.engine.test.internal.binding.config.TestBindingOptionsConfig;
3941
import io.aklivity.zilla.runtime.engine.test.internal.binding.config.TestBindingOptionsConfig.CatalogAssertion;
@@ -79,6 +81,7 @@ final class TestBindingFactory implements BindingHandler
7981
private final Long2LongHashMap router;
8082
private final TestEventContext event;
8183

84+
private ConverterHandler valueType;
8285
private List<CatalogHandler> catalogs;
8386
private SchemaConfig catalog;
8487
private List<CatalogAssertion> catalogAssertions;
@@ -111,6 +114,11 @@ public void attach(
111114
TestBindingOptionsConfig options = (TestBindingOptionsConfig) binding.options;
112115
if (options != null)
113116
{
117+
if (options.value != null)
118+
{
119+
this.valueType = context.supplyWriteConverter(options.value);
120+
}
121+
114122
if (options.cataloged != null)
115123
{
116124
this.catalog = options.cataloged.size() != 0 ? options.cataloged.get(0).schemas.get(0) : null;
@@ -119,18 +127,21 @@ public void attach(
119127
{
120128
int namespaceId = context.supplyTypeId(binding.namespace);
121129
int catalogId = context.supplyTypeId(catalog.name);
122-
catalogs.add(context.supplyCatalog(NamespacedId.id(namespaceId, catalogId)));
130+
final CatalogHandler handler = context.supplyCatalog(NamespacedId.id(namespaceId, catalogId));
131+
catalogs.add(handler);
123132
}
124133
this.catalogAssertions = options.catalogAssertions != null && !options.catalogAssertions.isEmpty() ?
125134
options.catalogAssertions.get(0).assertions : null;
126135
}
136+
127137
if (options.authorization != null)
128138
{
129139
int namespaceId = context.supplyTypeId(binding.namespace);
130140
int guardId = context.supplyTypeId(options.authorization.name);
131141
this.guard = context.supplyGuard(NamespacedId.id(namespaceId, guardId));
132142
this.credentials = options.authorization.credentials;
133143
}
144+
134145
this.events = options.events;
135146
}
136147
}
@@ -336,7 +347,16 @@ private void onInitialData(
336347

337348
initialSeq = sequence + reserved;
338349

339-
target.doInitialData(traceId, flags, reserved, payload);
350+
if (valueType != null &&
351+
valueType.convert(traceId, routedId, payload.buffer(), payload.offset(), payload.sizeof(),
352+
ValueConsumer.NOP) < 0)
353+
{
354+
target.doInitialAbort(traceId);
355+
}
356+
else
357+
{
358+
target.doInitialData(traceId, flags, reserved, payload);
359+
}
340360
}
341361

342362
private void onInitialEnd(

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import java.util.function.Function;
2020

2121
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
22+
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
2223
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
2324

2425
public final class TestBindingOptionsConfig extends OptionsConfig
2526
{
27+
public final ModelConfig value;
2628
public final String mode;
2729
public final TestAuthorizationConfig authorization;
2830
public final List<CatalogedConfig> cataloged;
@@ -41,12 +43,15 @@ public static <T> TestBindingOptionsConfigBuilder<T> builder(
4143
}
4244

4345
TestBindingOptionsConfig(
46+
ModelConfig value,
4447
String mode,
4548
TestAuthorizationConfig authorization,
4649
List<CatalogedConfig> cataloged,
4750
List<Event> events,
4851
List<CatalogAssertions> catalogAssertions)
4952
{
53+
super(value != null ? List.of(value) : List.of(), List.of());
54+
this.value = value;
5055
this.mode = mode;
5156
this.authorization = authorization;
5257
this.cataloged = cataloged;

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigAdapter.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import jakarta.json.JsonValue;
2727

2828
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
29+
import io.aklivity.zilla.runtime.engine.config.ModelConfigAdapter;
2930
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
3031
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;
3132
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
@@ -35,6 +36,7 @@ public final class TestBindingOptionsConfigAdapter implements OptionsConfigAdapt
3536
{
3637
public static final String DEFAULT_ASSERTION_SCHEMA = new String();
3738

39+
private static final String VALUE_NAME = "value";
3840
private static final String MODE_NAME = "mode";
3941
private static final String CATALOG_NAME = "catalog";
4042
private static final String AUTHORIZATION_NAME = "authorization";
@@ -47,6 +49,8 @@ public final class TestBindingOptionsConfigAdapter implements OptionsConfigAdapt
4749
private static final String SCHEMA_NAME = "schema";
4850
private static final String DELAY_NAME = "delay";
4951

52+
private final ModelConfigAdapter model = new ModelConfigAdapter();
53+
5054
private final SchemaConfigAdapter schema = new SchemaConfigAdapter();
5155

5256
@Override
@@ -69,10 +73,16 @@ public JsonObject adaptToJson(
6973

7074
JsonObjectBuilder object = Json.createObjectBuilder();
7175

76+
if (testOptions.value != null)
77+
{
78+
object.add(VALUE_NAME, model.adaptToJson(testOptions.value));
79+
}
80+
7281
if (testOptions.mode != null)
7382
{
7483
object.add(MODE_NAME, testOptions.mode);
7584
}
85+
7686
if (testOptions.cataloged != null && !testOptions.cataloged.isEmpty())
7787
{
7888
JsonObjectBuilder catalogs = Json.createObjectBuilder();
@@ -87,6 +97,7 @@ public JsonObject adaptToJson(
8797
}
8898
object.add(CATALOG_NAME, catalogs);
8999
}
100+
90101
if (testOptions.catalogAssertions != null)
91102
{
92103
JsonObjectBuilder assertions = Json.createObjectBuilder();
@@ -107,6 +118,7 @@ public JsonObject adaptToJson(
107118
assertions.add(CATALOG_NAME, catalogAssertions);
108119
object.add(ASSERTIONS_NAME, assertions);
109120
}
121+
110122
if (testOptions.authorization != null)
111123
{
112124
JsonObjectBuilder credentials = Json.createObjectBuilder();
@@ -115,6 +127,7 @@ public JsonObject adaptToJson(
115127
authorization.add(testOptions.authorization.name, credentials);
116128
object.add(AUTHORIZATION_NAME, authorization);
117129
}
130+
118131
if (testOptions.events != null)
119132
{
120133
JsonArrayBuilder events = Json.createArrayBuilder();
@@ -139,10 +152,16 @@ public OptionsConfig adaptFromJson(
139152

140153
if (object != null)
141154
{
155+
if (object.containsKey(VALUE_NAME))
156+
{
157+
testOptions.value(model.adaptFromJson(object.get(VALUE_NAME)));
158+
}
159+
142160
if (object.containsKey(MODE_NAME))
143161
{
144162
testOptions.mode(object.getString(MODE_NAME));
145163
}
164+
146165
if (object.containsKey(CATALOG_NAME))
147166
{
148167
JsonObject catalogsJson = object.getJsonObject(CATALOG_NAME);
@@ -161,6 +180,7 @@ public OptionsConfig adaptFromJson(
161180
}
162181
testOptions.catalog(catalogs);
163182
}
183+
164184
if (object.containsKey(ASSERTIONS_NAME))
165185
{
166186
JsonObject assertionsJson = object.getJsonObject(ASSERTIONS_NAME);
@@ -184,6 +204,7 @@ public OptionsConfig adaptFromJson(
184204
}
185205
}
186206
}
207+
187208
if (object.containsKey(AUTHORIZATION_NAME))
188209
{
189210
JsonObject authorization = object.getJsonObject(AUTHORIZATION_NAME);
@@ -198,6 +219,7 @@ public OptionsConfig adaptFromJson(
198219
}
199220
}
200221
}
222+
201223
if (object.containsKey(EVENTS_NAME))
202224
{
203225
JsonArray events = object.getJsonArray(EVENTS_NAME);

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/config/TestBindingOptionsConfigBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121

2222
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
2323
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
24+
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
2425
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
2526

2627
public final class TestBindingOptionsConfigBuilder<T> extends ConfigBuilder<T, TestBindingOptionsConfigBuilder<T>>
2728
{
2829
private final Function<OptionsConfig, T> mapper;
2930

31+
private ModelConfig value;
3032
private String mode;
3133
private TestAuthorizationConfig authorization;
3234
private List<CatalogedConfig> catalogs;
@@ -46,6 +48,13 @@ protected Class<TestBindingOptionsConfigBuilder<T>> thisType()
4648
return (Class<TestBindingOptionsConfigBuilder<T>>) getClass();
4749
}
4850

51+
public TestBindingOptionsConfigBuilder<T> value(
52+
ModelConfig value)
53+
{
54+
this.value = value;
55+
return this;
56+
}
57+
4958
public TestBindingOptionsConfigBuilder<T> mode(
5059
String mode)
5160
{
@@ -95,6 +104,6 @@ public TestBindingOptionsConfigBuilder<T> catalogAssertions(
95104
@Override
96105
public T build()
97106
{
98-
return mapper.apply(new TestBindingOptionsConfig(mode, authorization, catalogs, events, catalogAssertions));
107+
return mapper.apply(new TestBindingOptionsConfig(value, mode, authorization, catalogs, events, catalogAssertions));
99108
}
100109
}

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/exporter/TestExporterHandler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,19 @@ public int export()
5959
@Override
6060
public void stop()
6161
{
62+
try
63+
{
64+
// drain events
65+
while (options.events != null &&
66+
eventIndex < options.events.size())
67+
{
68+
readEvent.read(this::handleEvent, Integer.MAX_VALUE);
69+
}
70+
}
71+
catch (Exception ex)
72+
{
73+
assert options.events == null || eventIndex == options.events.size();
74+
}
6275
}
6376

6477
private void handleEvent(
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
RULE watcher service failed
2-
CLASS ^java.nio.file.FileSystem
3-
METHOD newWatchService
2+
CLASS io.aklivity.zilla.runtime.engine.internal.watcher.EngineConfigWatcher
3+
METHOD newWatchService(java.nio.file.FileSystem)
44
IF TRUE
55
DO throw new java.io.IOException("[failed]")
66
ENDRULE

0 commit comments

Comments
 (0)