From 095f39d58220686b3b955b9259654c38cc6148dd Mon Sep 17 00:00:00 2001 From: Johan Haleby Date: Fri, 19 Jan 2024 08:32:43 +0100 Subject: [PATCH] Fixed a rare ConcurrentModificationException issue in SpringMongoSubscriptionModel if the subscription model is shutdown while it's restarting --- changelog.md | 1 + ...gMongoSubscriptionPositionStorageTest.java | 2 +- .../SpringMongoSubscriptionModel.java | 32 +++++++++++-------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/changelog.md b/changelog.md index 2696da2dd..1c6b0bc5f 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,7 @@ * Domain event subscriptions now accepts metadata as the first parameter, besides just the event. The metadata currently contains the stream version and stream id, which can be useful when building projections. * Fixed a bug in SpringMongoSubscriptionModel in which it didn't restart correctly on non DataAccessException's * Introducing Decider support (experimental) +* Fixed a rare ConcurrentModificationException issue in SpringMongoSubscriptionModel if the subscription model is shutdown while it's restarting * Upgraded from Kotlin 1.9.20 to 1.9.22 * Upgraded amqp-client from 5.16.0 to 5.20.0 * Upgraded Spring Boot from 3.1.4 to 3.2.1 diff --git a/subscription/mongodb/spring/blocking-position-storage/src/test/java/org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionPositionStorageTest.java b/subscription/mongodb/spring/blocking-position-storage/src/test/java/org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionPositionStorageTest.java index 5e989a979..48178dba5 100644 --- a/subscription/mongodb/spring/blocking-position-storage/src/test/java/org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionPositionStorageTest.java +++ b/subscription/mongodb/spring/blocking-position-storage/src/test/java/org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionPositionStorageTest.java @@ -320,7 +320,7 @@ void blocking_spring_subscription_allows_resuming_events_from_where_it_left_when // When mongoEventStore.write("1", 0, serialize(nameDefined1)); - // The subscription is async so we need to wait for it + // The subscription is async, so we need to wait for it await().atMost(ONE_SECOND).and().dontCatchUncaughtExceptions().untilAtomic(counter, equalTo(1)); // Since an exception occurred we need to run the stream again, but first we need to close the old subscription subscriptionModel.shutdown(); diff --git a/subscription/mongodb/spring/blocking/src/main/java/org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionModel.java b/subscription/mongodb/spring/blocking/src/main/java/org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionModel.java index aa9d739bb..3af3bc09e 100644 --- a/subscription/mongodb/spring/blocking/src/main/java/org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionModel.java +++ b/subscription/mongodb/spring/blocking/src/main/java/org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionModel.java @@ -310,13 +310,13 @@ private org.springframework.data.mongodb.core.messaging.Subscription registerNew Throwable cause = throwable.getCause(); if (cause instanceof MongoQueryException) { log.warn("Caught {} ({}) for subscription {}, will restart!", MongoQueryException.class.getSimpleName(), cause.getMessage(), subscriptionId, throwable); - restartInternalSubscription(subscriptionId, StartAt.subscriptionModelDefault()); + restartInternalSubscriptionInNewThread(subscriptionId, StartAt.subscriptionModelDefault()); } else if (cause instanceof MongoCommandException && ((MongoCommandException) cause).getErrorCode() == CHANGE_STREAM_HISTORY_LOST_ERROR_CODE) { String restartMessage = restartSubscriptionsOnChangeStreamHistoryLost ? "will restart subscription from current time." : "will not restart subscription! Consider remove the subscription from the durable storage or use a catch-up subscription to get up to speed if needed."; if (restartSubscriptionsOnChangeStreamHistoryLost) { log.warn("There was not enough oplog to resume subscription {}, {}", subscriptionId, restartMessage, throwable); - restartInternalSubscription(subscriptionId, StartAt.now()); + restartInternalSubscriptionInNewThread(subscriptionId, StartAt.now()); } else { log.error("There was not enough oplog to resume subscription {}, {}", subscriptionId, restartMessage, throwable); } @@ -326,7 +326,7 @@ private org.springframework.data.mongodb.core.messaging.Subscription registerNew } } else { log.error("Error caught for subscription {}: {} {}. Will restart!", subscriptionId, cause.getClass().getName(), cause.getMessage(), throwable); - restartInternalSubscription(subscriptionId, StartAt.subscriptionModelDefault()); + restartInternalSubscriptionInNewThread(subscriptionId, StartAt.subscriptionModelDefault()); } } else if (isCursorNoLongerOpen(throwable)) { if (log.isDebugEnabled()) { @@ -334,26 +334,30 @@ private org.springframework.data.mongodb.core.messaging.Subscription registerNew } } else { log.error("An error occurred for subscription {}, will restart", subscriptionId, throwable); - restartInternalSubscription(subscriptionId, StartAt.subscriptionModelDefault()); + restartInternalSubscriptionInNewThread(subscriptionId, StartAt.subscriptionModelDefault()); } }); } - private void restartInternalSubscription(String subscriptionId, StartAt startAt) { + private void restartInternalSubscriptionInNewThread(String subscriptionId, StartAt startAt) { InternalSubscription internalSubscription = runningSubscriptions.get(subscriptionId); if (internalSubscription != null) { - new Thread(() -> { - // We restart from now!! - org.springframework.data.mongodb.core.messaging.Subscription oldSpringSubscription = internalSubscription.getSpringSubscription(); - ChangeStreamRequest newChangeStreamRequestFromNow = internalSubscription.newChangeStreamRequest(startAt); - org.springframework.data.mongodb.core.messaging.Subscription newSpringSubscription = registerNewSpringSubscription(subscriptionId, newChangeStreamRequestFromNow); - internalSubscription.occurrentSubscription.changeSubscription(newSpringSubscription); - messageListenerContainer.remove(oldSpringSubscription); - log.info("Subscription {} successfully restarted", subscriptionId); - }).start(); + new Thread(() -> restartSubscriptionInThisThread(subscriptionId, startAt, internalSubscription)).start(); } } + // This method is synchronized to avoid ConcurrentModificationException is the subscription model is shutdown + // at the same time as restarting the subscription. + private synchronized void restartSubscriptionInThisThread(String subscriptionId, StartAt startAt, InternalSubscription internalSubscription) { + // We restart from now!! + org.springframework.data.mongodb.core.messaging.Subscription oldSpringSubscription = internalSubscription.getSpringSubscription(); + ChangeStreamRequest newChangeStreamRequestFromNow = internalSubscription.newChangeStreamRequest(startAt); + org.springframework.data.mongodb.core.messaging.Subscription newSpringSubscription = registerNewSpringSubscription(subscriptionId, newChangeStreamRequestFromNow); + internalSubscription.occurrentSubscription.changeSubscription(newSpringSubscription); + messageListenerContainer.remove(oldSpringSubscription); + log.info("Subscription {} successfully restarted", subscriptionId); + } + private static boolean isCursorNoLongerOpen(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getMessage().startsWith("Cursor") && throwable.getMessage().endsWith("is not longer open."); }