Skip to content

Commit

Permalink
- adds offset handling in projection service
Browse files Browse the repository at this point in the history
  • Loading branch information
Reef3rm4n committed Jul 3, 2023
1 parent 94e26c0 commit 8382925
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/maven-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ jobs:
distribution: 'adopt'
java-version: '17'

- name: Build with Maven
run: mvn -B package --file pom.xml -DskipTests

- name: Set up Apache Maven Central
uses: actions/setup-java@v3
with: # running setup-java again overwrites the settings.xml
Expand All @@ -30,7 +27,7 @@ jobs:
gpg-passphrase: GPG_PASSPHRASE # env variable for GPG private key passphrase

- name: Publish to Apache Maven Central
run: mvn deploy -DskipTests
run: mvn clean deploy -DskipTests
env:
OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }}
OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.es4j.core.objects.Es4jError;
import io.es4j.core.objects.JournalOffsetBuilder;
import io.es4j.core.objects.JournalOffsetKey;
import io.es4j.core.projections.EventStreamListener;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.OffsetStore;
import io.es4j.infrastructure.models.EventStreamBuilder;
Expand All @@ -14,6 +15,8 @@
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.eventbus.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;
Expand All @@ -26,6 +29,9 @@ public class ProjectionService {
private final EventStore eventStore;
private final Class<? extends Aggregate> aClass;

protected static final Logger LOGGER = LoggerFactory.getLogger(ProjectionService.class);


public ProjectionService(OffsetStore offsetStore, EventStore eventStore, Class<? extends Aggregate> aClass) {
this.offsetStore = offsetStore;
this.eventStore = eventStore;
Expand Down Expand Up @@ -79,7 +85,7 @@ private static void handleThrowable(Message<JsonObject> message, Throwable throw
}

private void handle(Throwable throwable) {

LOGGER.error("Unhandled exception", throwable);
}

public Uni<List<io.es4j.infrastructure.models.Event>> next(ProjectionStream projectionStream) {
Expand All @@ -97,6 +103,9 @@ public Uni<List<io.es4j.infrastructure.models.Event>> next(ProjectionStream proj
.aggregateIds(projectionStream.aggregateIds())
.build()
)
.flatMap(events -> offsetStore.put(journalOffset.updateOffset(events))
.replaceWith(events)
)
);
}

Expand Down

0 comments on commit 8382925

Please sign in to comment.