Skip to content

Commit

Permalink
Fixed a rare ConcurrentModificationException issue in SpringMongoSubs…
Browse files Browse the repository at this point in the history
…criptionModel if the subscription model is shutdown while it's restarting
  • Loading branch information
johanhaleby committed Jan 19, 2024
1 parent 903c1af commit 095f39d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Domain event subscriptions now accepts metadata as the first parameter, besides just the event. The metadata currently contains the stream version and stream id, which can be useful when building projections.
* Fixed a bug in SpringMongoSubscriptionModel in which it didn't restart correctly on non DataAccessException's
* Introducing Decider support (experimental)
* Fixed a rare ConcurrentModificationException issue in SpringMongoSubscriptionModel if the subscription model is shutdown while it's restarting
* Upgraded from Kotlin 1.9.20 to 1.9.22
* Upgraded amqp-client from 5.16.0 to 5.20.0
* Upgraded Spring Boot from 3.1.4 to 3.2.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ void blocking_spring_subscription_allows_resuming_events_from_where_it_left_when

// When
mongoEventStore.write("1", 0, serialize(nameDefined1));
// The subscription is async so we need to wait for it
// The subscription is async, so we need to wait for it
await().atMost(ONE_SECOND).and().dontCatchUncaughtExceptions().untilAtomic(counter, equalTo(1));
// Since an exception occurred we need to run the stream again, but first we need to close the old subscription
subscriptionModel.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,13 @@ private org.springframework.data.mongodb.core.messaging.Subscription registerNew
Throwable cause = throwable.getCause();
if (cause instanceof MongoQueryException) {
log.warn("Caught {} ({}) for subscription {}, will restart!", MongoQueryException.class.getSimpleName(), cause.getMessage(), subscriptionId, throwable);
restartInternalSubscription(subscriptionId, StartAt.subscriptionModelDefault());
restartInternalSubscriptionInNewThread(subscriptionId, StartAt.subscriptionModelDefault());
} else if (cause instanceof MongoCommandException && ((MongoCommandException) cause).getErrorCode() == CHANGE_STREAM_HISTORY_LOST_ERROR_CODE) {
String restartMessage = restartSubscriptionsOnChangeStreamHistoryLost ? "will restart subscription from current time." :
"will not restart subscription! Consider remove the subscription from the durable storage or use a catch-up subscription to get up to speed if needed.";
if (restartSubscriptionsOnChangeStreamHistoryLost) {
log.warn("There was not enough oplog to resume subscription {}, {}", subscriptionId, restartMessage, throwable);
restartInternalSubscription(subscriptionId, StartAt.now());
restartInternalSubscriptionInNewThread(subscriptionId, StartAt.now());
} else {
log.error("There was not enough oplog to resume subscription {}, {}", subscriptionId, restartMessage, throwable);
}
Expand All @@ -326,34 +326,38 @@ private org.springframework.data.mongodb.core.messaging.Subscription registerNew
}
} else {
log.error("Error caught for subscription {}: {} {}. Will restart!", subscriptionId, cause.getClass().getName(), cause.getMessage(), throwable);
restartInternalSubscription(subscriptionId, StartAt.subscriptionModelDefault());
restartInternalSubscriptionInNewThread(subscriptionId, StartAt.subscriptionModelDefault());
}
} else if (isCursorNoLongerOpen(throwable)) {
if (log.isDebugEnabled()) {
log.debug("Cursor is no longer open for subscription {}, this may happen if you pause a subscription very soon after subscribing.", subscriptionId, throwable);
}
} else {
log.error("An error occurred for subscription {}, will restart", subscriptionId, throwable);
restartInternalSubscription(subscriptionId, StartAt.subscriptionModelDefault());
restartInternalSubscriptionInNewThread(subscriptionId, StartAt.subscriptionModelDefault());
}
});
}

private void restartInternalSubscription(String subscriptionId, StartAt startAt) {
private void restartInternalSubscriptionInNewThread(String subscriptionId, StartAt startAt) {
InternalSubscription internalSubscription = runningSubscriptions.get(subscriptionId);
if (internalSubscription != null) {
new Thread(() -> {
// We restart from now!!
org.springframework.data.mongodb.core.messaging.Subscription oldSpringSubscription = internalSubscription.getSpringSubscription();
ChangeStreamRequest<Document> newChangeStreamRequestFromNow = internalSubscription.newChangeStreamRequest(startAt);
org.springframework.data.mongodb.core.messaging.Subscription newSpringSubscription = registerNewSpringSubscription(subscriptionId, newChangeStreamRequestFromNow);
internalSubscription.occurrentSubscription.changeSubscription(newSpringSubscription);
messageListenerContainer.remove(oldSpringSubscription);
log.info("Subscription {} successfully restarted", subscriptionId);
}).start();
new Thread(() -> restartSubscriptionInThisThread(subscriptionId, startAt, internalSubscription)).start();
}
}

// This method is synchronized to avoid ConcurrentModificationException is the subscription model is shutdown
// at the same time as restarting the subscription.
private synchronized void restartSubscriptionInThisThread(String subscriptionId, StartAt startAt, InternalSubscription internalSubscription) {
// We restart from now!!
org.springframework.data.mongodb.core.messaging.Subscription oldSpringSubscription = internalSubscription.getSpringSubscription();
ChangeStreamRequest<Document> newChangeStreamRequestFromNow = internalSubscription.newChangeStreamRequest(startAt);
org.springframework.data.mongodb.core.messaging.Subscription newSpringSubscription = registerNewSpringSubscription(subscriptionId, newChangeStreamRequestFromNow);
internalSubscription.occurrentSubscription.changeSubscription(newSpringSubscription);
messageListenerContainer.remove(oldSpringSubscription);
log.info("Subscription {} successfully restarted", subscriptionId);
}

private static boolean isCursorNoLongerOpen(Throwable throwable) {
return throwable instanceof IllegalStateException && throwable.getMessage().startsWith("Cursor") && throwable.getMessage().endsWith("is not longer open.");
}
Expand Down

0 comments on commit 095f39d

Please sign in to comment.