diff --git a/.github/workflows/maven-publish.yml b/.github/workflows/maven-publish.yml index 73dbbbf..6066440 100644 --- a/.github/workflows/maven-publish.yml +++ b/.github/workflows/maven-publish.yml @@ -15,9 +15,6 @@ jobs: distribution: 'adopt' java-version: '17' - - name: Build with Maven - run: mvn -B package --file pom.xml -DskipTests - - name: Set up Apache Maven Central uses: actions/setup-java@v3 with: # running setup-java again overwrites the settings.xml @@ -30,7 +27,7 @@ jobs: gpg-passphrase: GPG_PASSPHRASE # env variable for GPG private key passphrase - name: Publish to Apache Maven Central - run: mvn deploy -DskipTests + run: mvn clean deploy -DskipTests env: OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} diff --git a/es4j-core/src/main/java/io/es4j/infrastructure/bus/ProjectionService.java b/es4j-core/src/main/java/io/es4j/infrastructure/bus/ProjectionService.java index fcd4ba4..e7aa7d9 100644 --- a/es4j-core/src/main/java/io/es4j/infrastructure/bus/ProjectionService.java +++ b/es4j-core/src/main/java/io/es4j/infrastructure/bus/ProjectionService.java @@ -5,6 +5,7 @@ import io.es4j.core.objects.Es4jError; import io.es4j.core.objects.JournalOffsetBuilder; import io.es4j.core.objects.JournalOffsetKey; +import io.es4j.core.projections.EventStreamListener; import io.es4j.infrastructure.EventStore; import io.es4j.infrastructure.OffsetStore; import io.es4j.infrastructure.models.EventStreamBuilder; @@ -14,6 +15,8 @@ import io.vertx.core.json.JsonObject; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.core.eventbus.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Objects; @@ -26,6 +29,9 @@ public class ProjectionService { private final EventStore eventStore; private final Class aClass; + protected static final Logger LOGGER = LoggerFactory.getLogger(ProjectionService.class); + + public ProjectionService(OffsetStore offsetStore, EventStore eventStore, Class aClass) { this.offsetStore = offsetStore; this.eventStore = eventStore; @@ -79,7 +85,7 @@ private static void handleThrowable(Message message, Throwable throw } private void handle(Throwable throwable) { - + LOGGER.error("Unhandled exception", throwable); } public Uni> next(ProjectionStream projectionStream) { @@ -97,6 +103,9 @@ public Uni> next(ProjectionStream proj .aggregateIds(projectionStream.aggregateIds()) .build() ) + .flatMap(events -> offsetStore.put(journalOffset.updateOffset(events)) + .replaceWith(events) + ) ); }