Skip to content

Commit

Permalink
- adds event stores testing
Browse files Browse the repository at this point in the history
- adds offset stores testing
- adds access to event store in tests
- adds access to offset store in tests
- adds access to cache store in tests
  • Loading branch information
Reef3rm4n committed Jul 4, 2023
1 parent ed7810f commit 516dda0
Show file tree
Hide file tree
Showing 15 changed files with 455 additions and 174 deletions.
2 changes: 1 addition & 1 deletion es4j-core/src/main/java/io/es4j/core/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private List<io.es4j.infrastructure.models.Event> applyCommandBehaviour(final Ag
}

public static <X extends Aggregate> ArrayList<io.es4j.infrastructure.models.Event> transformEvents(AggregateState<X> state, Command finalCommand, Event[] array) {
final var currentVersion = state.currentVersion() == null ? 0 : state.currentVersion();
final var currentVersion = Objects.requireNonNullElse(state.currentVersion(), 0L);
return new ArrayList<>(IntStream.range(1, array.length + 1)
.mapToObj(index -> {
final var ev = array[index - 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class AggregateConfiguration {
private Integer snapshotThreshold = 500;
private Integer idempotencyThreshold = 50;

public Duration aggregateCacheTtlInMinutes() {
public Duration ttl() {
return cacheTtl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ private CaffeineWrapper() {
public static synchronized void setUp(AggregateConfiguration aggregateConfiguration) {
if (Objects.isNull(CAFFEINE)) {
CAFFEINE = Caffeine.newBuilder()
.expireAfterAccess(aggregateConfiguration.aggregateCacheTtlInMinutes())
.expireAfterAccess(aggregateConfiguration.ttl())
.initialCapacity(500)
.evictionListener((key, value, reason) -> logger.info("Aggregate evicted from cache {}", new JsonObject().put("reason", reason).put("key", key).encodePrettily()))
.removalListener((key, value, reason) -> logger.info("Aggregate removed from cache {}", new JsonObject().put("reason", reason).put("key", key).encodePrettily()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,14 @@ public Uni<Void> stop() {

@Override
public Uni<Void> setup(Class<? extends Aggregate> aggregateClass, Vertx vertx, JsonObject configuration) {
LOGGER.debug("Migrating database for {} with configuration {}", aggregateClass.getSimpleName(), configuration);
final var schema = camelToKebab(aggregateClass.getSimpleName());
LOGGER.debug("Migrating postgres schema {} configuration {}", schema, configuration);
configuration.put("schema", schema);
return LiquibaseHandler.liquibaseString(
vertx,
configuration,
"pg-event-store.xml",
Map.of("schema", camelToKebab(aggregateClass.getSimpleName()))
Map.of("schema", schema)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ private static JournalOffset getJournalOffset(EventJournalOffSet offset) {

@Override
public Uni<Void> setup(Class<? extends Aggregate> aggregateClass, Vertx vertx, JsonObject configuration) {
LOGGER.debug("Migrating database for {} with configuration {}", aggregateClass.getSimpleName(), configuration);
final var schema = camelToKebab(aggregateClass.getSimpleName());
LOGGER.debug("Migrating postgres schema {} configuration {}", schema, configuration);
configuration.put("schema", schema);
return LiquibaseHandler.liquibaseString(
vertx,
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</changeSet>
<changeSet id="event-journal-indexes" author="reeferman" context="vertx">
<addPrimaryKey tableName="event_store" columnNames="id" schemaName="${schema}"/>
<addUniqueConstraint tableName="event_store" columnNames="aggregate_id, event_version"
<addUniqueConstraint tableName="event_store" columnNames="aggregate_id, tenant, event_version"
schemaName="${schema}"/>
<createIndex tableName="event_store" indexName="index-2" schemaName="${schema}">
<column name="aggregate_id"/>
Expand Down
21 changes: 18 additions & 3 deletions es4j-test/src/main/java/io/es4j/Bootstrapper.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package io.es4j;

import io.es4j.infrastructure.AggregateCache;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.Infrastructure;
import io.es4j.infrastructure.OffsetStore;
import io.es4j.infrastructure.cache.CaffeineAggregateCache;
import io.es4j.infrastructure.misc.Es4jServiceLoader;
import io.es4j.sql.misc.Constants;
import io.vertx.core.DeploymentOptions;
import io.es4j.infrastructure.proxy.AggregateEventBusPoxy;
Expand All @@ -17,10 +23,12 @@
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

import java.util.Optional;

import static io.es4j.core.CommandHandler.camelToKebab;


public class Bootstrapper<T extends Aggregate> {
class Bootstrapper<T extends Aggregate> {
public AggregateEventBusPoxy<T> eventBusPoxy;
public AggregateHttpClient<T> httpClient;
private static Network network = Network.newNetwork();
Expand All @@ -39,6 +47,9 @@ public class Bootstrapper<T extends Aggregate> {
public Integer HTTP_PORT = Integer.parseInt(System.getenv().getOrDefault("HTTP_PORT", "8080"));
public Class<T> aggregateClass;
private GenericContainer zookeeperContainer;
public AggregateCache cache;
public EventStore eventStore;
public OffsetStore offsetStore;

public Bootstrapper(
Class<T> aggregateClass
Expand All @@ -51,6 +62,12 @@ public void bootstrap() {
config = configuration(aggregateClass).put("schema", camelToKebab(aggregateClass.getSimpleName()));
if (Boolean.TRUE.equals(infrastructure())) {
deployPgContainer();
vertx.deployVerticle(Es4jMain::new, new DeploymentOptions().setInstances(1).setConfig(config)).await().indefinitely();
this.cache = new CaffeineAggregateCache();
this.eventStore = Es4jServiceLoader.loadEventStore();
eventStore.start(aggregateClass, vertx, config);
this.offsetStore = Es4jServiceLoader.loadOffsetStore();
offsetStore.start(aggregateClass, vertx, config);
}
this.httpClient = new AggregateHttpClient<>(
WebClient.create(vertx, new WebClientOptions()
Expand All @@ -59,12 +76,10 @@ public void bootstrap() {
),
aggregateClass
);
vertx.deployVerticle(Es4jMain::new, new DeploymentOptions().setInstances(1).setConfig(config)).await().indefinitely();
this.eventBusPoxy = new AggregateEventBusPoxy<>(vertx, aggregateClass);
}



public Bootstrapper<T> setRemoteHost(String host) {
this.HTTP_HOST = host;
return this;
Expand Down
14 changes: 13 additions & 1 deletion es4j-test/src/main/java/io/es4j/Es4jExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import io.es4j.config.DatabaseConfigurationCache;
import io.es4j.config.orm.ConfigurationKey;
import io.es4j.core.objects.AggregateState;
import io.es4j.infrastructure.AggregateCache;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.OffsetStore;
import io.es4j.infrastructure.cache.CaffeineWrapper;
import io.es4j.infrastructure.config.FileConfigurationCache;
import io.es4j.infrastructure.models.AggregatePlainKey;
Expand Down Expand Up @@ -51,7 +54,10 @@ public void afterAll(ExtensionContext extensionContext) {
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return parameterContext.getParameter().getType() == AggregateEventBusPoxy.class
|| parameterContext.getParameter().getType() == AggregateHttpClient.class;
|| parameterContext.getParameter().getType() == AggregateHttpClient.class
|| parameterContext.getParameter().getType() == EventStore.class
|| parameterContext.getParameter().getType() == OffsetStore.class
|| parameterContext.getParameter().getType() == AggregateCache.class;
}

@Override
Expand All @@ -61,6 +67,12 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte
return bootstrapper.eventBusPoxy;
} else if (parameterContext.getParameter().getType().isAssignableFrom(AggregateHttpClient.class)) {
return bootstrapper.httpClient;
} else if (parameterContext.getParameter().getType().isAssignableFrom(EventStore.class)) {
return bootstrapper.eventStore;
} else if (parameterContext.getParameter().getType().isAssignableFrom(OffsetStore.class)) {
return bootstrapper.offsetStore;
} else if (parameterContext.getParameter().getType().isAssignableFrom(AggregateCache.class)) {
return bootstrapper.cache;
}
}
throw new IllegalStateException("Bootstrapper has not been initialized");
Expand Down
12 changes: 0 additions & 12 deletions es4j-test/src/main/java/io/es4j/InfrastructureTest.java

This file was deleted.

160 changes: 160 additions & 0 deletions es4j-test/src/test/java/io/es4j/infrastructure/EventStoreTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.es4j.infrastructure;

import io.es4j.domain.FakeAggregate;
import io.es4j.infra.pg.PgEventStore;
import io.es4j.infrastructure.models.AggregateEventStreamBuilder;
import io.es4j.infrastructure.models.AppendInstruction;
import io.es4j.infrastructure.models.Event;
import io.es4j.sql.misc.Constants;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class EventStoreTest {

public static final String TENANT_ID = "default";
private static PostgreSQLContainer POSTGRES_CONTAINER;
private static final Vertx vertx = Vertx.vertx();
private static final JsonObject CONFIGURATION = new JsonObject();

private static final Network NETWORK = Network.newNetwork();

@BeforeAll
static void setup() {
deployContainers();
}

@AfterAll
static void stop() {
destroyContainers();
}

@ParameterizedTest
@MethodSource("eventStores")
void append_ensure_uniqueness(EventStore eventStore) {
eventStore.setup(FakeAggregate.class, vertx, CONFIGURATION).await().indefinitely();
eventStore.start(FakeAggregate.class, vertx, CONFIGURATION);
// Define events and append instructions
final var aggregateId = UUID.randomUUID().toString();
final var goodAppend = createAppendInstruction(aggregateId);
final var conflictingAppend = createAppendInstruction(aggregateId);

// Append the first event
Assertions.assertDoesNotThrow(
() -> eventStore.append(goodAppend).await().indefinitely()
);
// Append the second event and expect an exception to be thrown
Assertions.assertThrows(
RuntimeException.class,
() -> eventStore.append(conflictingAppend).await().indefinitely());
}

@ParameterizedTest
@MethodSource("eventStores")
void append_and_fetch(EventStore eventStore) {
eventStore.setup(FakeAggregate.class, vertx, CONFIGURATION).await().indefinitely();
eventStore.start(FakeAggregate.class, vertx, CONFIGURATION);
// Define events and append instructions
final var aggregateId = UUID.randomUUID().toString();
int numberOfEvents = 100;
final var goodAppend = createAppendInstruction(aggregateId, numberOfEvents);
// Append the first event
Assertions.assertDoesNotThrow(
() -> eventStore.append(goodAppend).await().indefinitely()
);
// Append the second event and expect an exception to be thrown
final var events = Assertions.assertDoesNotThrow(
() -> eventStore.fetch(AggregateEventStreamBuilder.builder()
.aggregateId(aggregateId)
.tenantId(TENANT_ID)
.build()
).await().indefinitely()
);
Assertions.assertEquals(numberOfEvents, events.size());
}

@ParameterizedTest
@MethodSource("eventStores")
void append_and_stream(EventStore eventStore) {
eventStore.setup(FakeAggregate.class, vertx, CONFIGURATION).await().indefinitely();
eventStore.start(FakeAggregate.class, vertx, CONFIGURATION);
// Define events and append instructions
final var aggregateId = UUID.randomUUID().toString();
int numberOfEvents = 100;
final var goodAppend = createAppendInstruction(aggregateId, numberOfEvents);
// Append the first event
Assertions.assertDoesNotThrow(
() -> eventStore.append(goodAppend).await().indefinitely()
);
// Append the second event and expect an exception to be thrown
final var atomicInt = new AtomicInteger(0);
Assertions.assertDoesNotThrow(
() -> eventStore.stream(AggregateEventStreamBuilder.builder()
.aggregateId(aggregateId)
.tenantId(TENANT_ID)
.build(),
event -> atomicInt.incrementAndGet()
).await().indefinitely()
);
Assertions.assertEquals(numberOfEvents, atomicInt.get());
}

private static AppendInstruction<FakeAggregate> createAppendInstruction(String aggregateId, int numberOfEvents) {
Assertions.assertTrue(numberOfEvents > 0);
final var events = IntStream.range(0, numberOfEvents).mapToObj(i -> createEvent(aggregateId, (long) i)).toList();
return new AppendInstruction<>(
FakeAggregate.class, aggregateId, TENANT_ID, events);
}

private static AppendInstruction<FakeAggregate> createAppendInstruction(String aggregateId) {
return createAppendInstruction(aggregateId, 1);
}

private static Event createEvent(String aggregateId, Long version) {
return new Event(null, aggregateId, "test-event", version, new JsonObject(), TENANT_ID, "test-event", List.of(), 0);
}

// This method provides different implementations of EventStore
// to be used as parameters in the test
private static Stream<EventStore> eventStores() {
Stream<EventStore> eventStoreStream = Stream.of(
new PgEventStore()
// Add more implementations as needed
);
return eventStoreStream;
}


private static void deployContainers() {
POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:latest")
.withNetwork(NETWORK)
.waitingFor(Wait.forListeningPort());
POSTGRES_CONTAINER.start();
CONFIGURATION.put(Constants.PG_HOST, POSTGRES_CONTAINER.getHost())
.put(Constants.PG_PORT, POSTGRES_CONTAINER.getFirstMappedPort())
.put(Constants.PG_USER, POSTGRES_CONTAINER.getUsername())
.put(Constants.PG_PASSWORD, POSTGRES_CONTAINER.getPassword())
.put(Constants.PG_DATABASE, POSTGRES_CONTAINER.getDatabaseName())
.put(Constants.JDBC_URL, POSTGRES_CONTAINER.getJdbcUrl());
}

private static void destroyContainers() {
vertx.closeAndAwait();
POSTGRES_CONTAINER.stop();
POSTGRES_CONTAINER.close();
}
}
Loading

0 comments on commit 516dda0

Please sign in to comment.