Skip to content

Commit

Permalink
- adds first draft for saga
Browse files Browse the repository at this point in the history
- adds frist draft for cdc
  • Loading branch information
Reef3rm4n committed Sep 13, 2023
1 parent a036bee commit 07420d9
Show file tree
Hide file tree
Showing 46 changed files with 1,370 additions and 21 deletions.
10 changes: 5 additions & 5 deletions es4j-core/src/main/java/io/es4j/Aggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ default int schemaVersion() {
}

/**
* Transforms the event based on the specified schema version.
* Migrates the event based on the specified schema version.
* The default implementation of this method throws an UnknownEvent exception,
* indicating that the event version transformation is not supported.
*
Expand All @@ -45,13 +45,13 @@ default int schemaVersion() {
* @return The transformed event, if the transformation is supported.
* @throws UnknownEvent if the event version transformation is not supported.
*/
default E transformFrom(int schemaVersion, JsonObject event) {
default E migrate(int schemaVersion, JsonObject event) {
throw new UnknownEvent(new Es4jError(
ErrorSource.LOGIC,
Aggregator.class.getName(),
"missing schema versionTo " + schemaVersion,
"could not transform event %s to schema version %d".formatted(event, schemaVersion),
"aggregate.event.transform",
"Unable to migrate event schema",
"Unable not migrate event %s to schema version %d".formatted(event, schemaVersion),
"aggregate.event.migration",
500
)
);
Expand Down
6 changes: 3 additions & 3 deletions es4j-core/src/main/java/io/es4j/AsyncProjection.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@
import java.util.Optional;

/**
* The PollingEventProjection interface defines the structure for event projections
* The AsyncProjection interface defines the structure for event projections
* that use a polling strategy.
*/
public interface AsyncProjection {

/**
* Apply the list of AggregateEvent objects to the projection.
* Apply the list of event objects to the projection.
*
* @param events the list of AggregateEvent objects to apply
* @return a Uni representing the completion of the operation
*/
Uni<Void> apply(List<AggregateEvent> events);

/**
* Optional filter for the event journal.
* Optional filter to be applied to the event journal.
*
* @return an Optional containing the EventJournalFilter, if one is defined
*/
Expand Down
5 changes: 4 additions & 1 deletion es4j-core/src/main/java/io/es4j/core/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ private Uni<JsonObject> replay(LoadAggregate loadAggregate) {
}

public Uni<JsonObject> process(Command command) {
// todo should not return aggregate state but instead void
// todo add fire and forget command
// todo add rejected command store
if (command instanceof LoadAggregate loadAggregate) {
return replay(loadAggregate);
}
Expand Down Expand Up @@ -101,7 +104,7 @@ private T aggregateEvent(T aggregateState, final Event event, Integer eventSchem
LOGGER.debug("Applying {} schema versionTo {} ", aggregator.delegate().getClass().getSimpleName(), aggregator.delegate().schemaVersion());
if (aggregator.delegate().schemaVersion() != eventSchemaVersion) {
LOGGER.debug("Schema versionTo mismatch, migrating event {} {} ", event.getClass().getName(), JsonObject.mapFrom(event).encodePrettily());
finalEvent = aggregator.delegate().transformFrom(eventSchemaVersion, JsonObject.mapFrom(event));
finalEvent = aggregator.delegate().migrate(eventSchemaVersion, JsonObject.mapFrom(event));
}
final var newAggregateState = (T) aggregator.delegate().apply(aggregateState, finalEvent);
LOGGER.debug("State after aggregation {}", newAggregateState);
Expand Down
46 changes: 46 additions & 0 deletions es4j-extensions/es4j-saga/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.es4j</groupId>
<artifactId>es4j-extensions</artifactId>
<version>0</version>
</parent>

<artifactId>es4j-saga</artifactId>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.es4j</groupId>
<artifactId>es4j-dependencies</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.es4j</groupId>
<artifactId>es4j-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.es4j</groupId>
<artifactId>es4j-sql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.soabase.record-builder</groupId>
<artifactId>record-builder-processor</artifactId>
</dependency>
</dependencies>
</project>
14 changes: 14 additions & 0 deletions es4j-extensions/es4j-saga/src/main/java/io/es4j/saga/Saga.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.es4j.saga;

import java.util.List;

public interface Saga<T, R> {

List<Class<? extends SagaTransaction<T>>> transactionOrder();

T supplyPayload(R request);
default boolean async() {
return false;
}

}
169 changes: 169 additions & 0 deletions es4j-extensions/es4j-saga/src/main/java/io/es4j/saga/SagaManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package io.es4j.saga;

import io.es4j.sql.exceptions.NotFound;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.json.JsonObject;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public record SagaManager<T extends SagaTrigger, R>(
Class<R> requestClass,
Class<T> payloadClass,
Saga<T, R> saga,
Set<? extends SagaTransaction<T>> transactions,
SagaStore sagaStore
) {

public boolean isMatch(String rClass) {
return requestClass.getName().equals(rClass);
}

public T supplyPayload(JsonObject request) {
return saga.supplyPayload(request.mapTo(requestClass));
}

public Uni<Void> process(JsonObject request, SagaStore sagaStore) {
final var payload = new AtomicReference<>(saga.supplyPayload(request.mapTo(requestClass)));
if (payload.get().fireAndForget()) {
handleSaga(sagaStore, payload)
.subscribe()
.with(UniHelper.NOOP);
return Uni.createFrom().voidItem();
}
return handleSaga(sagaStore, payload);
}

private Uni<Void> handleSaga(SagaStore sagaStore, AtomicReference<T> payload) {
return sagaStore.fetchSaga(payload.get().id())
.onFailure(NotFound.class).recoverWithNull().replaceWith(sagaRecord(payload))
.flatMap(sagaRecord -> switch (sagaRecord.state()) {
case STAGED -> Uni.createFrom().voidItem();
case INITIALIZED -> performSaga(sagaStore, payload, sagaRecord);
case COMMITTED -> rollback(sagaStore, payload, sagaRecord);
default -> Uni.createFrom().failure(new IllegalStateException());
})
.replaceWithVoid();
}


private SagaRecord sagaRecord(AtomicReference<T> payload) {
return new SagaRecord(
payload.get().id(),
SagaState.INITIALIZED,
transactions.stream().map(t -> new SagaTransactionRecord(t.name(), SagaTransactionState.INITIALIZED, null)).collect(Collectors.toSet()),
null,
JsonObject.mapFrom(payload.get()).getMap()
);
}

private Uni<?> rollback(SagaStore sagaStore, AtomicReference<T> payload, SagaRecord sagaRecord) {
return Uni.createFrom().voidItem();
}

private Uni<Void> performSaga(SagaStore sagaStore, AtomicReference<T> payload, SagaRecord sagaRecord) {
if (Objects.nonNull(sagaRecord.payload())) {
payload.set(JsonObject.mapFrom(sagaRecord.payload()).mapTo(payloadClass));
}
final var executionStack = new Stack<SagaTransaction<T>>();
final var rollbackStack = new Stack<SagaTransaction<T>>();
transactions.stream()
.filter(requiredTransaction -> sagaRecord.transactions().stream()
.noneMatch(executedTransaction -> executedTransaction.transactionName().equals(requiredTransaction.name()))
)
.forEach(executionStack::push);
return Multi.createFrom().iterable(executionStack)
.onItem().transformToUniAndConcatenate(transaction -> {
final var record = sagaRecord.transactions().stream().filter(t -> transaction.name().equals(t.transactionName())).findFirst().orElseThrow();
return processTransaction(payload, record, transaction, sagaRecord)
.onItemOrFailure().transformToUni(
(item, failure) -> {
rollbackStack.push(transaction);
if (Objects.nonNull(failure)) {
// todo interrupt flow trigger rollback
} else {

}
return null;
}
);
}
)
.collect().asList()
.onFailure().invoke(
throwable -> {

}
)
.replaceWithVoid();
}

private Uni<? extends T> processTransaction(AtomicReference<T> payload, SagaTransactionRecord sagaTransactionRecord, SagaTransaction<T> transaction, SagaRecord record) {
return switch (sagaTransactionRecord.state()) {
case INITIALIZED -> stage(record, sagaTransactionRecord, payload, transaction);
case STAGED -> commit(record, sagaTransactionRecord, payload, transaction);
case COMMIT_FAILURE, REVERT_FAILURE, STAGE_FAILURE -> rollback(record, sagaTransactionRecord, payload, transaction);
case COMMITTED, REVERTED -> Uni.createFrom().item(payload.get());
};
}

private Uni<T> rollback(SagaRecord sagaRecord, SagaTransactionRecord transactionRecord, AtomicReference<T> payload, SagaTransaction<T> transaction) {
return transaction.stage(payload.get()).map(payload::getAndSet)
.onFailure().retry().withBackOff(transaction.configuration().retryBackOff()).atMost(transaction.configuration().numberOfRetries())
.onItemOrFailure().call((item, failure) -> handleRollbackResult(sagaRecord, transactionRecord, item, failure));
}

private Uni<Void> handleRollbackResult(SagaRecord sagaRecord, SagaTransactionRecord transactionRecord, T item, Throwable failure) {
if (Objects.nonNull(failure)) {
final var tResult = SagaTransactionRecordBuilder.builder(transactionRecord).state(SagaTransactionState.REVERT_FAILURE).build();
sagaRecord.transactions().add(tResult);
} else {
final var tResult = SagaTransactionRecordBuilder.builder(transactionRecord).state(SagaTransactionState.REVERTED).build();
sagaRecord.transactions().add(tResult);
}
return sagaStore.update(SagaRecordBuilder.builder(sagaRecord.computeState())
.payload(JsonObject.mapFrom(item).getMap())
.build());
}

private Uni<T> stage(SagaRecord sagaRecord, SagaTransactionRecord transactionRecord, AtomicReference<T> payload, SagaTransaction<T> transaction) {
return transaction.stage(payload.get()).map(payload::getAndSet)
.onItemOrFailure().call((item, failure) -> handleStageResult(sagaRecord, transactionRecord, item, failure));
}

private Uni<Void> handleStageResult(SagaRecord sagaRecord, SagaTransactionRecord transactionRecord, T item, Throwable failure) {
if (Objects.nonNull(failure)) {
final var tResult = SagaTransactionRecordBuilder.builder(transactionRecord).state(SagaTransactionState.STAGE_FAILURE).build();
sagaRecord.transactions().add(tResult);
} else {
final var tResult = SagaTransactionRecordBuilder.builder(transactionRecord).state(SagaTransactionState.STAGED).build();
sagaRecord.transactions().add(tResult);
}
return sagaStore.update(SagaRecordBuilder.builder(sagaRecord.computeState())
.payload(JsonObject.mapFrom(item).getMap())
.build());
}

private Uni<T> commit(SagaRecord sagaRecord, SagaTransactionRecord transactionRecord, AtomicReference<T> payload, SagaTransaction<T> transaction) {
return transaction.commit(payload.get()).map(payload::getAndSet)
.onItemOrFailure().call((item, failure) -> handleCommitResult(sagaRecord, transactionRecord, item, failure));
}

private Uni<Void> handleCommitResult(SagaRecord sagaRecord, SagaTransactionRecord transactionRecord, T item, Throwable failure) {
if (Objects.nonNull(failure)) {
final var tResult = SagaTransactionRecordBuilder.builder(transactionRecord).state(SagaTransactionState.COMMIT_FAILURE).build();
sagaRecord.transactions().add(tResult);
} else {
final var tResult = SagaTransactionRecordBuilder.builder(transactionRecord).state(SagaTransactionState.COMMITTED).build();
sagaRecord.transactions().add(tResult);
}

return sagaStore.update(SagaRecordBuilder.builder(sagaRecord.computeState())
.payload(JsonObject.mapFrom(item).getMap())
.build());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.es4j.saga;


import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;

import java.util.List;

public class SagaOrchestrator {

List<SagaManager> sagas;
SagaStore sagaStore;



public Uni<Void> route(String className, JsonObject request) {
final var sagaWrapper = sagas.stream().filter(saga -> saga.isMatch(className)).findFirst().orElseThrow(() -> new IllegalArgumentException("Saga not found"));
final var payload = sagaWrapper.supplyPayload(request);

return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.es4j.saga;

import io.soabase.recordbuilder.core.RecordBuilder;

import java.util.Map;
import java.util.Set;


@RecordBuilder
public record SagaRecord(
String id,
SagaState state,
Set<SagaTransactionRecord> transactions,
Map<String, Object> trigger,
Map<String, Object> payload
) {

public SagaRecord computeState() {
if (transactions().stream().allMatch(t -> t.state() == SagaTransactionState.STAGED)) {
return SagaRecordBuilder.builder(this).state(SagaState.STAGED).build();
} else if (transactions().stream().allMatch(t -> t.state() == SagaTransactionState.COMMITTED)) {
return SagaRecordBuilder.builder(this).state(SagaState.COMMITTED).build();
} else if (transactions().stream().allMatch(t -> t.state() == SagaTransactionState.REVERTED)) {
return SagaRecordBuilder.builder(this).state(SagaState.REVERTED).build();
} else if (transactions().stream().anyMatch(t -> t.state().name().contains("FAILURE"))) {
return SagaRecordBuilder.builder(this).state(SagaState.FAILED).build();
} else {
return SagaRecordBuilder.builder(this).state(SagaState.INITIALIZED).build();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.es4j.saga;

public enum SagaState {
INITIALIZED, STAGED, COMMITTED, REVERTED, FAILED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.es4j.saga;

import io.smallrye.mutiny.Uni;


public interface SagaStore {


Uni<SagaRecord> fetchSaga(String id);
Uni<Void> update(SagaRecord command);

}
Loading

0 comments on commit 07420d9

Please sign in to comment.