Skip to content

Commit

Permalink
- projection service
Browse files Browse the repository at this point in the history
- skeleton for business rules service
- fixes to pom
- projection route
  • Loading branch information
Reef3rm4n committed Jul 9, 2023
1 parent 516dda0 commit ee9d719
Show file tree
Hide file tree
Showing 57 changed files with 631 additions and 1,000 deletions.
5 changes: 5 additions & 0 deletions es4j-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
<groupId>io.soabase.record-builder</groupId>
<artifactId>record-builder-processor</artifactId>
</dependency>
<dependency>
<groupId>com.github.victools</groupId>
<artifactId>jsonschema-generator</artifactId>
<version>4.31.1</version>
</dependency>
<!-- Logging Dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
2 changes: 1 addition & 1 deletion es4j-core/src/main/java/io/es4j/Aggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ default E transformFrom(int schemaVersion, JsonObject event) {
ErrorSource.LOGIC,
Aggregator.class.getName(),
"missing schema versionTo " + schemaVersion,
"could not transform event",
"could not transform event %s to schema version %d".formatted(event, schemaVersion),
"aggregate.event.transform",
500
)
Expand Down
12 changes: 12 additions & 0 deletions es4j-core/src/main/java/io/es4j/Behaviour.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.es4j;

import java.util.Collections;
import java.util.List;

/**
Expand All @@ -23,4 +24,15 @@ public interface Behaviour<T extends Aggregate, C extends Command> {
*/
List<Event> process(T state, C command);


/**
* Retrieves the list of roles required to execute this command.
* This method provides a default implementation that returns an empty list.
*
* @return A list of required roles as strings. Default is an empty list.
*/
default List<String> requiredRoles() {
return Collections.emptyList();
}

}
10 changes: 0 additions & 10 deletions es4j-core/src/main/java/io/es4j/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,6 @@ default String uniqueId() {
return UUID.randomUUID().toString();
}

/**
* Retrieves the list of roles required to execute this command.
* This method provides a default implementation that returns an empty list.
*
* @return A list of required roles as strings. Default is an empty list.
*/
default List<String> requiredRoles() {
return Collections.emptyList();
}

/**
* Retrieves the options associated with this command.
* This method provides a default implementation that returns the default options.
Expand Down
1 change: 1 addition & 0 deletions es4j-core/src/main/java/io/es4j/LiveStateProjection.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.es4j.core.objects.AggregateState;
import io.smallrye.mutiny.Uni;

// create javadoc for this interface
public interface LiveStateProjection<T extends Aggregate> {

Uni<Void> update(AggregateState<T> currentState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
public record AggregatorWrap<T extends Aggregate, E extends Event> (
Aggregator<T, E> delegate,
Class<T> entityAggregateClass,
Class<?> eventClass
Class<E> eventClass
){
}
11 changes: 0 additions & 11 deletions es4j-core/src/main/java/io/es4j/core/objects/Command.java

This file was deleted.

39 changes: 0 additions & 39 deletions es4j-core/src/main/java/io/es4j/core/objects/CommandHeaders.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@RecordBuilder
public record PublicQueryOptions(
public record DefaultFilters(
boolean desc,
Instant creationDateFrom,
Instant creationDateTo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class EventbusLiveStreams {
public static final String EVENT_STREAM = "event-stream";

public static String stateLiveStream(Class<? extends Aggregate> aggregateClass, String aggregateId, String tenantId) {
return new StringJoiner("/")
return new StringJoiner("/", "/","")
.add(STATE_STREAM)
.add(CommandHandler.camelToKebab(aggregateClass.getSimpleName()))
.add(tenantId)
Expand All @@ -21,7 +21,7 @@ public static String stateLiveStream(Class<? extends Aggregate> aggregateClass,
}

public static String eventLiveStream(Class<? extends Aggregate> aggregateClass, String aggregateId, String tenantId) {
return new StringJoiner("/")
return new StringJoiner("/","/","")
.add(EVENT_STREAM)
.add(CommandHandler.camelToKebab(aggregateClass.getSimpleName()))
.add(tenantId)
Expand Down
23 changes: 0 additions & 23 deletions es4j-core/src/main/java/io/es4j/core/objects/JournalOffset.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ public record LoadAggregate(
String aggregateId,
String tenant,
Long versionTo,
Instant dateTo,
CommandHeaders headers
Instant dateTo

) implements Command {
}
28 changes: 28 additions & 0 deletions es4j-core/src/main/java/io/es4j/core/objects/Offset.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.es4j.core.objects;


import io.es4j.infrastructure.models.Event;
import io.soabase.recordbuilder.core.RecordBuilder;
import io.vertx.core.shareddata.Shareable;

import java.io.Serializable;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

@RecordBuilder
public record Offset(
String consumer,
String tenantId,
Long idOffSet,
Long eventVersionOffset,
Instant lastUpdate,
Instant creationDate
) implements Serializable, Shareable {
public Offset updateOffset(List<io.es4j.infrastructure.models.Event> events) {
final var eventIdOffset = events.stream().map(Event::journalOffset)
.max(Comparator.naturalOrder())
.orElseThrow();
return new Offset(consumer, tenantId, eventIdOffset, null, Instant.now(), creationDate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.soabase.recordbuilder.core.RecordBuilder;

@RecordBuilder
public record JournalOffsetKey(
public record OffsetKey(
String consumer,
String tenantId
) {
Expand Down
12 changes: 0 additions & 12 deletions es4j-core/src/main/java/io/es4j/core/objects/ScheduledCommand.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


import io.es4j.core.objects.AggregateEvent;
import io.es4j.core.objects.Offset;
import io.es4j.core.objects.OffsetKey;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.OffsetStore;
import io.es4j.infrastructure.models.Event;
Expand All @@ -15,8 +17,6 @@
import io.es4j.PollingEventProjection;
import io.es4j.infrastructure.misc.EventParser;
import io.es4j.infrastructure.models.EventStream;
import io.es4j.core.objects.JournalOffset;
import io.es4j.core.objects.JournalOffsetKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -54,26 +54,26 @@ public Uni<Void> performTask() {
.replaceWithVoid();
}

private JournalOffsetKey getOffset() {
return new JournalOffsetKey(pollingEventProjection.getClass().getName(), pollingEventProjection.tenant());
private OffsetKey getOffset() {
return new OffsetKey(pollingEventProjection.getClass().getName(), pollingEventProjection.tenant());
}

private static EventStream streamStatement(PollingEventProjection pollingEventProjection, JournalOffset journalOffset) {
private static EventStream streamStatement(PollingEventProjection pollingEventProjection, Offset offset) {
AtomicReference<EventStream> eventStream = new AtomicReference<>();
pollingEventProjection.filter().ifPresentOrElse(
filter -> eventStream.set(
EventStreamBuilder.builder()
.events(filter.events())
.tenantId(pollingEventProjection.tenant())
.offset(journalOffset.idOffSet())
.offset(offset.idOffSet())
.batchSize(1000)
.tags(filter.tags())
.build()
),
() -> eventStream.set(
EventStreamBuilder.builder()
.tenantId(pollingEventProjection.tenant())
.offset(journalOffset.idOffSet())
.offset(offset.idOffSet())
.batchSize(1000)
.build()
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.es4j.core.tasks;

import io.es4j.Aggregate;
import io.es4j.core.objects.CommandHeaders;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.OffsetStore;
import io.es4j.infrastructure.models.EventStreamBuilder;
Expand All @@ -15,7 +14,7 @@
import io.smallrye.mutiny.tuples.Tuple2;
import io.es4j.core.objects.LoadAggregate;
import io.es4j.infrastructure.proxy.AggregateEventBusPoxy;
import io.es4j.core.objects.JournalOffsetKey;
import io.es4j.core.objects.OffsetKey;
import io.es4j.core.objects.StateProjectionWrapper;

import java.util.List;
Expand Down Expand Up @@ -50,7 +49,7 @@ public Uni<Void> performTask() {
// polling will have to be moved to a queue
// a new entry must be inserted in the queue for each one of the updated streams
stateProjectionWrapper.logger().debug("Polling events");
return offsetStore.get(new JournalOffsetKey(stateProjectionWrapper.pollingStateProjection().getClass().getName(), "default"))
return offsetStore.get(new OffsetKey(stateProjectionWrapper.pollingStateProjection().getClass().getName(), "default"))
.flatMap(journalOffset -> {
stateProjectionWrapper.logger().debug("Journal idOffset at {}", journalOffset.idOffSet());
return eventStore.fetch(EventStreamBuilder.builder()
Expand All @@ -71,8 +70,7 @@ public Uni<Void> performTask() {
tuple2.getItem1(),
tuple2.getItem2(),
null,
null,
CommandHeaders.defaultHeaders()
null
)
)
.flatMap(stateProjectionWrapper::update)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


import io.es4j.core.objects.*;
import io.es4j.infrastructure.bus.ProjectionService;
import io.es4j.infrastructure.bus.Es4jService;
import io.es4j.infrastructure.misc.Es4jServiceLoader;
import io.reactiverse.contextual.logging.ContextualData;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -47,7 +47,7 @@ public class AggregateVerticle<T extends Aggregate> extends AbstractVerticle imp
private List<BehaviourWrap> behaviourWraps;
private List<AggregatorWrap> aggregatorWraps;
private Infrastructure infrastructure;
private ProjectionService projectionService;
private Es4jService es4jService;

public AggregateVerticle(
final Class<T> aggregateClass,
Expand Down Expand Up @@ -81,13 +81,15 @@ public Uni<Void> asyncStart() {
infrastructure,
aggregateConfiguration
);
this.projectionService = new ProjectionService(
this.es4jService = new Es4jService(
infrastructure.offsetStore(),
infrastructure.eventStore(),
aggregateClass
aggregateClass,
aggregatorWraps,
behaviourWraps
);
// todo as behaviours are loaded also register consumer in order to avoid unsafe operation ?
return projectionService.register(vertx).flatMap(avoid -> registerAggregateBus());
return es4jService.register(vertx).flatMap(avoid -> registerAggregateBus());
}

private Uni<Void> registerAggregateBus() {
Expand Down
15 changes: 9 additions & 6 deletions es4j-core/src/main/java/io/es4j/infrastructure/OffsetStore.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package io.es4j.infrastructure;

import io.es4j.Aggregate;
import io.es4j.core.objects.Offset;
import io.es4j.infrastructure.models.OffsetFilter;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.es4j.core.objects.JournalOffset;
import io.es4j.core.objects.JournalOffsetKey;
import io.es4j.core.objects.OffsetKey;
import io.vertx.mutiny.core.Vertx;

public interface OffsetStore {
import java.util.List;

Uni<JournalOffset> put(JournalOffset journalOffset);
Uni<JournalOffset> get(JournalOffsetKey journalOffset);
public interface OffsetStore {

Uni<JournalOffset> reset(JournalOffset journalOffset);
Uni<Offset> put(Offset offset);
Uni<Offset> get(OffsetKey journalOffset);
Uni<Offset> reset(Offset offset);

Uni<List<Offset>> projections(OffsetFilter offsetFilter);
Uni<Void> stop();
void start(Class<? extends Aggregate> aggregateClass, Vertx vertx, JsonObject configuration);
Uni<Void> setup(Class<? extends Aggregate> aggregateClass, Vertx vertx, JsonObject configuration);
Expand Down
Loading

0 comments on commit ee9d719

Please sign in to comment.