Skip to content

Commit

Permalink
Fixed so that inserting events with "any" WriteCondition never fails …
Browse files Browse the repository at this point in the history
…even if more than two threads are writing events to the same stream at the same time. (Fixed in MongoEventStore and SpringMongoEventStore)
  • Loading branch information
johanhaleby committed Oct 11, 2024
1 parent efe97a0 commit fe99e0b
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 42 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### Changelog next version
* Fixed so that inserting events with "any" WriteCondition never fails even if more than two threads are writing events to the same stream at the same time. (Fixed in MongoEventStore and SpringMongoEventStore)

### 0.19.5 (2024-09-27)
* Fixed so that blocking and reactive EventStoreQueries really uses `SortBy.unsorted()` by default as was intended in the previous release.

Expand Down
5 changes: 5 additions & 0 deletions eventstore/mongodb/native/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
<artifactId>mongodb-native-filter-bsonfilter-conversion</artifactId>
<version>0.19.6-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>retry</artifactId>
<version>0.19.6-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@
import org.occurrent.eventstore.mongodb.internal.MongoExceptionTranslator.WriteContext;
import org.occurrent.eventstore.mongodb.internal.StreamVersionDiff;
import org.occurrent.filter.Filter;
import org.occurrent.functionalsupport.internal.FunctionalSupport.Pair;
import org.occurrent.mongodb.spring.filterbsonfilterconversion.internal.FilterToBsonFilterConverter;
import org.occurrent.mongodb.timerepresentation.TimeRepresentation;
import org.occurrent.retry.RetryStrategy;

import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand All @@ -65,7 +64,7 @@
import static org.occurrent.eventstore.mongodb.internal.MongoExceptionTranslator.translateException;
import static org.occurrent.eventstore.mongodb.internal.OccurrentCloudEventMongoDocumentMapper.convertToCloudEvent;
import static org.occurrent.eventstore.mongodb.internal.OccurrentCloudEventMongoDocumentMapper.convertToDocument;
import static org.occurrent.functionalsupport.internal.FunctionalSupport.zip;
import static org.occurrent.functionalsupport.internal.FunctionalSupport.mapWithIndex;

/**
* This is an {@link EventStore} that stores events in MongoDB using the "native" synchronous java driver MongoDB.
Expand Down Expand Up @@ -184,32 +183,53 @@ public WriteResult write(String streamId, WriteCondition writeCondition, Stream<
throw new IllegalArgumentException(WriteCondition.class.getSimpleName() + " cannot be null");
}

try (ClientSession clientSession = mongoClient.startSession()) {
StreamVersionDiff streamVersionDiff = clientSession.withTransaction(() -> {
long currentStreamVersion = currentStreamVersion(streamId, clientSession);

if (!isFulfilled(currentStreamVersion, writeCondition)) {
throw new WriteConditionNotFulfilledException(streamId, currentStreamVersion, writeCondition, String.format("%s was not fulfilled. Expected version %s but was %s.", WriteCondition.class.getSimpleName(), writeCondition, currentStreamVersion));
}

List<Document> cloudEventDocuments = zip(LongStream.iterate(currentStreamVersion + 1, i -> i + 1).boxed(), events, Pair::new)
.map(pair -> convertToDocument(timeRepresentation, streamId, pair.t1, pair.t2))
.collect(Collectors.toList());

if (cloudEventDocuments.isEmpty()) {
return StreamVersionDiff.of(currentStreamVersion, currentStreamVersion);
} else {
try {
eventCollection.insertMany(clientSession, cloudEventDocuments);
} catch (MongoException e) {
throw translateException(new WriteContext(streamId, currentStreamVersion, writeCondition), e);
}
final long newStreamVersion = cloudEventDocuments.get(cloudEventDocuments.size() - 1).getLong(STREAM_VERSION);
return StreamVersionDiff.of(currentStreamVersion, newStreamVersion);
}
}, transactionOptions);
return new WriteResult(streamId, streamVersionDiff.oldStreamVersion, streamVersionDiff.newStreamVersion);
// This is an (ugly) hack to fix problems when write condition is "any" and we have parallel writes
// to the same stream. This will cause MongoDB to throw an exception since we're in a transaction.
// But in this case we should just retry since if the user has specified "any" as stream version
// he/she will expect that the events are just written to the event store and WriteConditionNotFulfilledException
// should not be thrown. Since the write method takes a "Stream" of events we can't simply retry since,
// on the first retry, the stream would already have been consumed. Thus, we preemptively convert the "events"
// stream into a list when write condition is any. This way, we can retry without errors.
final BiFunction<Stream<CloudEvent>, Long, List<Document>> convertCloudEventsToDocuments;
if (writeCondition.isAnyStreamVersion()) {
List<CloudEvent> cached = events.toList();
convertCloudEventsToDocuments = (cloudEvents, currentStreamVersion) -> convertCloudEventsToDocuments(streamId, cached.stream(), currentStreamVersion);
} else {
convertCloudEventsToDocuments = (cloudEvents, currentStreamVersion) -> convertCloudEventsToDocuments(streamId, cloudEvents, currentStreamVersion);
}

Supplier<WriteResult> writeEvents = () -> {
try (ClientSession clientSession = mongoClient.startSession()) {
StreamVersionDiff streamVersionDiff = clientSession.withTransaction(() -> {
long currentStreamVersion = currentStreamVersion(streamId, clientSession);

if (!isFulfilled(currentStreamVersion, writeCondition)) {
throw new WriteConditionNotFulfilledException(streamId, currentStreamVersion, writeCondition, String.format("%s was not fulfilled. Expected version %s but was %s.", WriteCondition.class.getSimpleName(), writeCondition, currentStreamVersion));
}

List<Document> cloudEventDocuments = convertCloudEventsToDocuments.apply(events, currentStreamVersion);

if (cloudEventDocuments.isEmpty()) {
return StreamVersionDiff.of(currentStreamVersion, currentStreamVersion);
} else {
try {
eventCollection.insertMany(clientSession, cloudEventDocuments);
} catch (MongoException e) {
throw translateException(new WriteContext(streamId, currentStreamVersion, writeCondition), e);
}
final long newStreamVersion = cloudEventDocuments.get(cloudEventDocuments.size() - 1).getLong(STREAM_VERSION);
return StreamVersionDiff.of(currentStreamVersion, newStreamVersion);
}
}, transactionOptions);
return new WriteResult(streamId, streamVersionDiff.oldStreamVersion, streamVersionDiff.newStreamVersion);
}
};

return RetryStrategy.retry().retryIf(__ -> writeCondition.isAnyStreamVersion()).execute(writeEvents);
}

private List<Document> convertCloudEventsToDocuments(String streamId, Stream<CloudEvent> cloudEvents, long currentStreamVersion) {
return mapWithIndex(cloudEvents, currentStreamVersion, pair -> convertToDocument(timeRepresentation, streamId, pair.t1, pair.t2)).toList();
}

private static boolean isFulfilled(long currentStreamVersion, WriteCondition writeCondition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,56 @@ void parallel_writes_to_event_store_throws_WriteConditionNotFulfilledException()
// Then
Awaitility.await().atMost(4, SECONDS).untilAsserted(() -> assertThat(exception).hasValue(new WriteConditionNotFulfilledException("name", 1, writeCondition, "WriteCondition was not fulfilled. Expected version to be equal to 0 but was 1.")));
}

@EnabledOnOs(MAC)
@RepeatedIfExceptionsTest(repeats = 5, suspend = 200, minSuccess = 5)
void parallel_writes_to_event_store_does_not_throw_WriteConditionNotFulfilledException_when_write_condition_is_any() {
// Given
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
WriteCondition writeCondition = WriteCondition.anyStreamVersion();
AtomicReference<Throwable> exception = new AtomicReference<>();

// When
new Thread(() -> {
NameWasChanged event = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Ikk Doe");
try {
await(cyclicBarrier);
persist("name", writeCondition, event);
} catch (Exception e) {
exception.set(e);
}
}).start();

new Thread(() -> {
NameWasChanged event = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Ikkster Doe");
try {
await(cyclicBarrier);
persist("name", writeCondition, event);
} catch (Exception e) {
exception.set(e);
}
}).start();

new Thread(() -> {
NameWasChanged event = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Ikkster Doe2");
try {
await(cyclicBarrier);
persist("name", writeCondition, event);
} catch (Exception e) {
exception.set(e);
}
}).start();

// Then
Awaitility.await().atMost(4, SECONDS).untilAsserted(() -> {
EventStream<CloudEvent> eventStream = eventStore.read("name");
assertThat(deserialize(eventStream.events()))
.extracting(it -> (NameWasChanged) it)
.extracting(NameWasChanged::name)
.contains("Ikk Doe", "Ikkster Doe", "Ikkster Doe2");
});
assertThat(exception.get()).isNull();
}
}

@Nested
Expand Down
5 changes: 5 additions & 0 deletions eventstore/mongodb/spring/blocking/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
<artifactId>mongodb-spring-sort-conversion</artifactId>
<version>0.19.6-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.occurrent</groupId>
<artifactId>retry</artifactId>
<version>0.19.6-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.occurrent.filter.Filter;
import org.occurrent.mongodb.spring.filterqueryconversion.internal.FilterConverter;
import org.occurrent.mongodb.timerepresentation.TimeRepresentation;
import org.occurrent.retry.RetryStrategy;
import org.springframework.dao.DataAccessException;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
Expand Down Expand Up @@ -147,17 +148,9 @@ public WriteResult write(String streamId, WriteCondition writeCondition, Stream<
return new StreamVersionDiff(currentStreamVersion, newStreamVersion);
};

StreamVersionDiff streamVersion;
try {
streamVersion = transactionTemplate.execute(writeLogic);
} catch (WriteConditionNotFulfilledException e) {
if (writeCondition.isAnyStreamVersion()) {
// See comments on "convertCloudEventsToDocuments" above
streamVersion = transactionTemplate.execute(writeLogic);
} else {
throw e;
}
}
StreamVersionDiff streamVersion = RetryStrategy.retry()
.retryIf(__ -> writeCondition.isAnyStreamVersion())
.execute(() -> transactionTemplate.execute(writeLogic));
return new WriteResult(streamId, streamVersion.oldStreamVersion, streamVersion.newStreamVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ void parallel_writes_to_event_store_throws_WriteConditionNotFulfilledException_w
@RepeatedIfExceptionsTest(repeats = 5, suspend = 200, minSuccess = 5)
void parallel_writes_to_event_store_does_not_throw_WriteConditionNotFulfilledException_when_write_condition_is_any() {
// Given
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
WriteCondition writeCondition = WriteCondition.anyStreamVersion();
AtomicReference<Throwable> exception = new AtomicReference<>();

Expand All @@ -783,13 +783,23 @@ void parallel_writes_to_event_store_does_not_throw_WriteConditionNotFulfilledExc
}
}).start();

new Thread(() -> {
NameWasChanged event = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Ikkster Doe2");
try {
await(cyclicBarrier);
persist("name", writeCondition, event);
} catch (Exception e) {
exception.set(e);
}
}).start();

// Then
Awaitility.await().atMost(4, SECONDS).untilAsserted(() -> {
EventStream<CloudEvent> eventStream = eventStore.read("name");
assertThat(deserialize(eventStream.events()))
.extracting(it -> (NameWasChanged) it)
.extracting(NameWasChanged::name)
.contains("Ikk Doe", "Ikkster Doe");
.contains("Ikk Doe", "Ikkster Doe", "Ikkster Doe2");
});
assertThat(exception.get()).isNull();
}
Expand Down

0 comments on commit fe99e0b

Please sign in to comment.