From d7ba055e05af860645caee429bfd328e9c3b6fb6 Mon Sep 17 00:00:00 2001 From: Johan Haleby Date: Fri, 16 Feb 2024 15:25:24 +0100 Subject: [PATCH] Use time-based subscription position in CatchupSubscriptionModel if it's not possible to get the global position from MongoDB --- .../durable/catchup/CatchupSubscriptionModel.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/subscription/util/blocking/catchup-subscription/src/main/java/org/occurrent/subscription/blocking/durable/catchup/CatchupSubscriptionModel.java b/subscription/util/blocking/catchup-subscription/src/main/java/org/occurrent/subscription/blocking/durable/catchup/CatchupSubscriptionModel.java index 4d876fd3a..1c17866f4 100644 --- a/subscription/util/blocking/catchup-subscription/src/main/java/org/occurrent/subscription/blocking/durable/catchup/CatchupSubscriptionModel.java +++ b/subscription/util/blocking/catchup-subscription/src/main/java/org/occurrent/subscription/blocking/durable/catchup/CatchupSubscriptionModel.java @@ -135,7 +135,8 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, // Here's the reason why we're forcing the wrapping subscription to be a PositionAwareBlockingSubscription. // This is in order to be 100% safe since we need to take events that are published meanwhile the EventStoreQuery // is executed. Thus, we need the global position of the subscription at the time of starting the query. - SubscriptionPosition globalSubscriptionPosition = subscriptionModel.globalSubscriptionPosition(); + SubscriptionPosition globalSubscriptionPositionFromSubscriptionModel = subscriptionModel.globalSubscriptionPosition(); + SubscriptionPosition globalSubscriptionPosition = Objects.requireNonNullElseGet(globalSubscriptionPositionFromSubscriptionModel, () -> new TimeBasedSubscriptionPosition(OffsetDateTime.now())); FixedSizeCache cache = new FixedSizeCache(config.cacheSize); final Stream stream; @@ -180,14 +181,11 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, // (i.e. written by the catch-up subscription), we save the globalSubscriptionPosition. // The reason that we need to write the time-based subscription position in this case // is that the wrapped subscription might not support time-based subscriptions. - if ((position == null || isTimeBasedSubscriptionPosition(position)) && globalSubscriptionPosition != null) { + if (position == null || isTimeBasedSubscriptionPosition(position)) { position = cfg.storage().save(subscriptionId, globalSubscriptionPosition); - } else if (position == null) { - // Position can still be null here if globalSubscriptionPosition is null, if so, we start at the "subscriptionModelDefault" - return StartAt.subscriptionModelDefault(); } return StartAt.subscriptionPosition(position); - }).orElse(() -> globalSubscriptionPosition == null ? StartAt.subscriptionModelDefault() : StartAt.subscriptionPosition(globalSubscriptionPosition))); + }).orElse(() -> StartAt.subscriptionPosition(globalSubscriptionPosition))); final Subscription subscription; if (subscriptionsWasCancelledOrShutdown) {