Skip to content

Commit

Permalink
Use time-based subscription position in CatchupSubscriptionModel if i…
Browse files Browse the repository at this point in the history
…t's not possible to get the global position from MongoDB
  • Loading branch information
johanhaleby committed Feb 16, 2024
1 parent 5b5790e commit d7ba055
Showing 1 changed file with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,
// Here's the reason why we're forcing the wrapping subscription to be a PositionAwareBlockingSubscription.
// This is in order to be 100% safe since we need to take events that are published meanwhile the EventStoreQuery
// is executed. Thus, we need the global position of the subscription at the time of starting the query.
SubscriptionPosition globalSubscriptionPosition = subscriptionModel.globalSubscriptionPosition();
SubscriptionPosition globalSubscriptionPositionFromSubscriptionModel = subscriptionModel.globalSubscriptionPosition();
SubscriptionPosition globalSubscriptionPosition = Objects.requireNonNullElseGet(globalSubscriptionPositionFromSubscriptionModel, () -> new TimeBasedSubscriptionPosition(OffsetDateTime.now()));

FixedSizeCache cache = new FixedSizeCache(config.cacheSize);
final Stream<CloudEvent> stream;
Expand Down Expand Up @@ -180,14 +181,11 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,
// (i.e. written by the catch-up subscription), we save the globalSubscriptionPosition.
// The reason that we need to write the time-based subscription position in this case
// is that the wrapped subscription might not support time-based subscriptions.
if ((position == null || isTimeBasedSubscriptionPosition(position)) && globalSubscriptionPosition != null) {
if (position == null || isTimeBasedSubscriptionPosition(position)) {
position = cfg.storage().save(subscriptionId, globalSubscriptionPosition);
} else if (position == null) {
// Position can still be null here if globalSubscriptionPosition is null, if so, we start at the "subscriptionModelDefault"
return StartAt.subscriptionModelDefault();
}
return StartAt.subscriptionPosition(position);
}).orElse(() -> globalSubscriptionPosition == null ? StartAt.subscriptionModelDefault() : StartAt.subscriptionPosition(globalSubscriptionPosition)));
}).orElse(() -> StartAt.subscriptionPosition(globalSubscriptionPosition)));

final Subscription subscription;
if (subscriptionsWasCancelledOrShutdown) {
Expand Down

0 comments on commit d7ba055

Please sign in to comment.