diff --git a/changelog.md b/changelog.md index cb3203e06..18f7d2748 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +### Changelog next version +* Fixed so that inserting events with "any" WriteCondition never fails even if more than two threads are writing events to the same stream at the same time. (Fixed in MongoEventStore and SpringMongoEventStore) + ### 0.19.5 (2024-09-27) * Fixed so that blocking and reactive EventStoreQueries really uses `SortBy.unsorted()` by default as was intended in the previous release. diff --git a/eventstore/mongodb/native/pom.xml b/eventstore/mongodb/native/pom.xml index 078b881dc..eb48967c9 100644 --- a/eventstore/mongodb/native/pom.xml +++ b/eventstore/mongodb/native/pom.xml @@ -41,6 +41,11 @@ mongodb-native-filter-bsonfilter-conversion 0.19.6-SNAPSHOT + + org.occurrent + retry + 0.19.6-SNAPSHOT + org.mongodb mongodb-driver-sync diff --git a/eventstore/mongodb/native/src/main/java/org/occurrent/eventstore/mongodb/nativedriver/MongoEventStore.java b/eventstore/mongodb/native/src/main/java/org/occurrent/eventstore/mongodb/nativedriver/MongoEventStore.java index 7c28a3251..5482a8223 100644 --- a/eventstore/mongodb/native/src/main/java/org/occurrent/eventstore/mongodb/nativedriver/MongoEventStore.java +++ b/eventstore/mongodb/native/src/main/java/org/occurrent/eventstore/mongodb/nativedriver/MongoEventStore.java @@ -37,18 +37,17 @@ import org.occurrent.eventstore.mongodb.internal.MongoExceptionTranslator.WriteContext; import org.occurrent.eventstore.mongodb.internal.StreamVersionDiff; import org.occurrent.filter.Filter; -import org.occurrent.functionalsupport.internal.FunctionalSupport.Pair; import org.occurrent.mongodb.spring.filterbsonfilterconversion.internal.FilterToBsonFilterConverter; import org.occurrent.mongodb.timerepresentation.TimeRepresentation; +import org.occurrent.retry.RetryStrategy; import java.net.URI; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.LongStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -65,7 +64,7 @@ import static org.occurrent.eventstore.mongodb.internal.MongoExceptionTranslator.translateException; import static org.occurrent.eventstore.mongodb.internal.OccurrentCloudEventMongoDocumentMapper.convertToCloudEvent; import static org.occurrent.eventstore.mongodb.internal.OccurrentCloudEventMongoDocumentMapper.convertToDocument; -import static org.occurrent.functionalsupport.internal.FunctionalSupport.zip; +import static org.occurrent.functionalsupport.internal.FunctionalSupport.mapWithIndex; /** * This is an {@link EventStore} that stores events in MongoDB using the "native" synchronous java driver MongoDB. @@ -184,32 +183,53 @@ public WriteResult write(String streamId, WriteCondition writeCondition, Stream< throw new IllegalArgumentException(WriteCondition.class.getSimpleName() + " cannot be null"); } - try (ClientSession clientSession = mongoClient.startSession()) { - StreamVersionDiff streamVersionDiff = clientSession.withTransaction(() -> { - long currentStreamVersion = currentStreamVersion(streamId, clientSession); - - if (!isFulfilled(currentStreamVersion, writeCondition)) { - throw new WriteConditionNotFulfilledException(streamId, currentStreamVersion, writeCondition, String.format("%s was not fulfilled. Expected version %s but was %s.", WriteCondition.class.getSimpleName(), writeCondition, currentStreamVersion)); - } - - List cloudEventDocuments = zip(LongStream.iterate(currentStreamVersion + 1, i -> i + 1).boxed(), events, Pair::new) - .map(pair -> convertToDocument(timeRepresentation, streamId, pair.t1, pair.t2)) - .collect(Collectors.toList()); - - if (cloudEventDocuments.isEmpty()) { - return StreamVersionDiff.of(currentStreamVersion, currentStreamVersion); - } else { - try { - eventCollection.insertMany(clientSession, cloudEventDocuments); - } catch (MongoException e) { - throw translateException(new WriteContext(streamId, currentStreamVersion, writeCondition), e); - } - final long newStreamVersion = cloudEventDocuments.get(cloudEventDocuments.size() - 1).getLong(STREAM_VERSION); - return StreamVersionDiff.of(currentStreamVersion, newStreamVersion); - } - }, transactionOptions); - return new WriteResult(streamId, streamVersionDiff.oldStreamVersion, streamVersionDiff.newStreamVersion); + // This is an (ugly) hack to fix problems when write condition is "any" and we have parallel writes + // to the same stream. This will cause MongoDB to throw an exception since we're in a transaction. + // But in this case we should just retry since if the user has specified "any" as stream version + // he/she will expect that the events are just written to the event store and WriteConditionNotFulfilledException + // should not be thrown. Since the write method takes a "Stream" of events we can't simply retry since, + // on the first retry, the stream would already have been consumed. Thus, we preemptively convert the "events" + // stream into a list when write condition is any. This way, we can retry without errors. + final BiFunction, Long, List> convertCloudEventsToDocuments; + if (writeCondition.isAnyStreamVersion()) { + List cached = events.toList(); + convertCloudEventsToDocuments = (cloudEvents, currentStreamVersion) -> convertCloudEventsToDocuments(streamId, cached.stream(), currentStreamVersion); + } else { + convertCloudEventsToDocuments = (cloudEvents, currentStreamVersion) -> convertCloudEventsToDocuments(streamId, cloudEvents, currentStreamVersion); } + + Supplier writeEvents = () -> { + try (ClientSession clientSession = mongoClient.startSession()) { + StreamVersionDiff streamVersionDiff = clientSession.withTransaction(() -> { + long currentStreamVersion = currentStreamVersion(streamId, clientSession); + + if (!isFulfilled(currentStreamVersion, writeCondition)) { + throw new WriteConditionNotFulfilledException(streamId, currentStreamVersion, writeCondition, String.format("%s was not fulfilled. Expected version %s but was %s.", WriteCondition.class.getSimpleName(), writeCondition, currentStreamVersion)); + } + + List cloudEventDocuments = convertCloudEventsToDocuments.apply(events, currentStreamVersion); + + if (cloudEventDocuments.isEmpty()) { + return StreamVersionDiff.of(currentStreamVersion, currentStreamVersion); + } else { + try { + eventCollection.insertMany(clientSession, cloudEventDocuments); + } catch (MongoException e) { + throw translateException(new WriteContext(streamId, currentStreamVersion, writeCondition), e); + } + final long newStreamVersion = cloudEventDocuments.get(cloudEventDocuments.size() - 1).getLong(STREAM_VERSION); + return StreamVersionDiff.of(currentStreamVersion, newStreamVersion); + } + }, transactionOptions); + return new WriteResult(streamId, streamVersionDiff.oldStreamVersion, streamVersionDiff.newStreamVersion); + } + }; + + return RetryStrategy.retry().retryIf(__ -> writeCondition.isAnyStreamVersion()).execute(writeEvents); + } + + private List convertCloudEventsToDocuments(String streamId, Stream cloudEvents, long currentStreamVersion) { + return mapWithIndex(cloudEvents, currentStreamVersion, pair -> convertToDocument(timeRepresentation, streamId, pair.t1, pair.t2)).toList(); } private static boolean isFulfilled(long currentStreamVersion, WriteCondition writeCondition) { diff --git a/eventstore/mongodb/native/src/test/java/org/occurrent/eventstore/mongodb/nativedriver/MongoEventStoreTest.java b/eventstore/mongodb/native/src/test/java/org/occurrent/eventstore/mongodb/nativedriver/MongoEventStoreTest.java index ed47ca725..e4804f475 100644 --- a/eventstore/mongodb/native/src/test/java/org/occurrent/eventstore/mongodb/nativedriver/MongoEventStoreTest.java +++ b/eventstore/mongodb/native/src/test/java/org/occurrent/eventstore/mongodb/nativedriver/MongoEventStoreTest.java @@ -765,6 +765,56 @@ void parallel_writes_to_event_store_throws_WriteConditionNotFulfilledException() // Then Awaitility.await().atMost(4, SECONDS).untilAsserted(() -> assertThat(exception).hasValue(new WriteConditionNotFulfilledException("name", 1, writeCondition, "WriteCondition was not fulfilled. Expected version to be equal to 0 but was 1."))); } + + @EnabledOnOs(MAC) + @RepeatedIfExceptionsTest(repeats = 5, suspend = 200, minSuccess = 5) + void parallel_writes_to_event_store_does_not_throw_WriteConditionNotFulfilledException_when_write_condition_is_any() { + // Given + CyclicBarrier cyclicBarrier = new CyclicBarrier(3); + WriteCondition writeCondition = WriteCondition.anyStreamVersion(); + AtomicReference exception = new AtomicReference<>(); + + // When + new Thread(() -> { + NameWasChanged event = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Ikk Doe"); + try { + await(cyclicBarrier); + persist("name", writeCondition, event); + } catch (Exception e) { + exception.set(e); + } + }).start(); + + new Thread(() -> { + NameWasChanged event = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Ikkster Doe"); + try { + await(cyclicBarrier); + persist("name", writeCondition, event); + } catch (Exception e) { + exception.set(e); + } + }).start(); + + new Thread(() -> { + NameWasChanged event = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Ikkster Doe2"); + try { + await(cyclicBarrier); + persist("name", writeCondition, event); + } catch (Exception e) { + exception.set(e); + } + }).start(); + + // Then + Awaitility.await().atMost(4, SECONDS).untilAsserted(() -> { + EventStream eventStream = eventStore.read("name"); + assertThat(deserialize(eventStream.events())) + .extracting(it -> (NameWasChanged) it) + .extracting(NameWasChanged::name) + .contains("Ikk Doe", "Ikkster Doe", "Ikkster Doe2"); + }); + assertThat(exception.get()).isNull(); + } } @Nested diff --git a/eventstore/mongodb/spring/blocking/pom.xml b/eventstore/mongodb/spring/blocking/pom.xml index 53040d26b..4fb2d1332 100644 --- a/eventstore/mongodb/spring/blocking/pom.xml +++ b/eventstore/mongodb/spring/blocking/pom.xml @@ -46,6 +46,11 @@ mongodb-spring-sort-conversion 0.19.6-SNAPSHOT + + org.occurrent + retry + 0.19.6-SNAPSHOT + org.springframework.data spring-data-mongodb diff --git a/eventstore/mongodb/spring/blocking/src/main/java/org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStore.java b/eventstore/mongodb/spring/blocking/src/main/java/org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStore.java index 732f58265..49ca1af93 100644 --- a/eventstore/mongodb/spring/blocking/src/main/java/org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStore.java +++ b/eventstore/mongodb/spring/blocking/src/main/java/org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStore.java @@ -36,6 +36,7 @@ import org.occurrent.filter.Filter; import org.occurrent.mongodb.spring.filterqueryconversion.internal.FilterConverter; import org.occurrent.mongodb.timerepresentation.TimeRepresentation; +import org.occurrent.retry.RetryStrategy; import org.springframework.dao.DataAccessException; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; @@ -147,17 +148,9 @@ public WriteResult write(String streamId, WriteCondition writeCondition, Stream< return new StreamVersionDiff(currentStreamVersion, newStreamVersion); }; - StreamVersionDiff streamVersion; - try { - streamVersion = transactionTemplate.execute(writeLogic); - } catch (WriteConditionNotFulfilledException e) { - if (writeCondition.isAnyStreamVersion()) { - // See comments on "convertCloudEventsToDocuments" above - streamVersion = transactionTemplate.execute(writeLogic); - } else { - throw e; - } - } + StreamVersionDiff streamVersion = RetryStrategy.retry() + .retryIf(__ -> writeCondition.isAnyStreamVersion()) + .execute(() -> transactionTemplate.execute(writeLogic)); return new WriteResult(streamId, streamVersion.oldStreamVersion, streamVersion.newStreamVersion); } diff --git a/eventstore/mongodb/spring/blocking/src/test/java/org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStoreTest.java b/eventstore/mongodb/spring/blocking/src/test/java/org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStoreTest.java index 01e65b707..44719ae2c 100644 --- a/eventstore/mongodb/spring/blocking/src/test/java/org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStoreTest.java +++ b/eventstore/mongodb/spring/blocking/src/test/java/org/occurrent/eventstore/mongodb/spring/blocking/SpringMongoEventStoreTest.java @@ -758,7 +758,7 @@ void parallel_writes_to_event_store_throws_WriteConditionNotFulfilledException_w @RepeatedIfExceptionsTest(repeats = 5, suspend = 200, minSuccess = 5) void parallel_writes_to_event_store_does_not_throw_WriteConditionNotFulfilledException_when_write_condition_is_any() { // Given - CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + CyclicBarrier cyclicBarrier = new CyclicBarrier(3); WriteCondition writeCondition = WriteCondition.anyStreamVersion(); AtomicReference exception = new AtomicReference<>(); @@ -783,13 +783,23 @@ void parallel_writes_to_event_store_does_not_throw_WriteConditionNotFulfilledExc } }).start(); + new Thread(() -> { + NameWasChanged event = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Ikkster Doe2"); + try { + await(cyclicBarrier); + persist("name", writeCondition, event); + } catch (Exception e) { + exception.set(e); + } + }).start(); + // Then Awaitility.await().atMost(4, SECONDS).untilAsserted(() -> { EventStream eventStream = eventStore.read("name"); assertThat(deserialize(eventStream.events())) .extracting(it -> (NameWasChanged) it) .extracting(NameWasChanged::name) - .contains("Ikk Doe", "Ikkster Doe"); + .contains("Ikk Doe", "Ikkster Doe", "Ikkster Doe2"); }); assertThat(exception.get()).isNull(); }