From bc1b97df96aa8ca0ac33f583d9bca98ab8968947 Mon Sep 17 00:00:00 2001 From: Johan Haleby Date: Thu, 18 Jan 2024 08:41:17 +0100 Subject: [PATCH] Added much more debug logging to CompetingConsumerSubscriptionModel --- ...LeaseCompetingConsumerStrategySupport.java | 4 +- .../CompetingConsumerSubscriptionModel.java | 47 +++++++++++++++++-- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/subscription/mongodb/common/blocking/competing-consumer-strategy/src/main/java/org/occurrent/subscription/mongodb/blocking/ccs/internal/MongoLeaseCompetingConsumerStrategySupport.java b/subscription/mongodb/common/blocking/competing-consumer-strategy/src/main/java/org/occurrent/subscription/mongodb/blocking/ccs/internal/MongoLeaseCompetingConsumerStrategySupport.java index a4299955a..61c6916b8 100644 --- a/subscription/mongodb/common/blocking/competing-consumer-strategy/src/main/java/org/occurrent/subscription/mongodb/blocking/ccs/internal/MongoLeaseCompetingConsumerStrategySupport.java +++ b/subscription/mongodb/common/blocking/competing-consumer-strategy/src/main/java/org/occurrent/subscription/mongodb/blocking/ccs/internal/MongoLeaseCompetingConsumerStrategySupport.java @@ -155,6 +155,8 @@ private enum Status { } private static void logDebug(String message, Object... params) { - log.debug(message, params); + if (log.isDebugEnabled()) { + log.debug(message, params); + } } } \ No newline at end of file diff --git a/subscription/util/blocking/competing-consumer-subscription/src/main/java/org/occurrent/subscription/blocking/competingconsumers/CompetingConsumerSubscriptionModel.java b/subscription/util/blocking/competing-consumer-subscription/src/main/java/org/occurrent/subscription/blocking/competingconsumers/CompetingConsumerSubscriptionModel.java index fd018b28f..1fa35f132 100644 --- a/subscription/util/blocking/competing-consumer-subscription/src/main/java/org/occurrent/subscription/blocking/competingconsumers/CompetingConsumerSubscriptionModel.java +++ b/subscription/util/blocking/competing-consumer-subscription/src/main/java/org/occurrent/subscription/blocking/competingconsumers/CompetingConsumerSubscriptionModel.java @@ -6,6 +6,8 @@ import org.occurrent.subscription.SubscriptionFilter; import org.occurrent.subscription.api.blocking.*; import org.occurrent.subscription.api.blocking.CompetingConsumerStrategy.CompetingConsumerListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Objects; import java.util.Optional; @@ -45,6 +47,7 @@ * If the above code is executed on multiple nodes/processes, then only one subscriber will receive events. */ public class CompetingConsumerSubscriptionModel implements DelegatingSubscriptionModel, SubscriptionModel, SubscriptionModelLifeCycle, CompetingConsumerListener { + private static final Logger log = LoggerFactory.getLogger(CompetingConsumerSubscriptionModel.class); private final SubscriptionModel delegate; private final CompetingConsumerStrategy competingConsumerStrategy; @@ -72,14 +75,19 @@ public Subscription subscribe(String subscriberId, String subscriptionId, Subscr Objects.requireNonNull(subscriberId, "SubscriberId cannot be null"); Objects.requireNonNull(subscriptionId, "SubscriptionId cannot be null"); + logDebug("Starting CompetingConsumer subscription (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); + SubscriptionIdAndSubscriberId subscriptionIdAndSubscriberId = SubscriptionIdAndSubscriberId.from(subscriptionId, subscriberId); final CompetingConsumerSubscription competingConsumerSubscription; if (competingConsumerStrategy.registerCompetingConsumer(subscriptionId, subscriberId)) { + logDebug("Successfully registered CompetingConsumer subscription (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); Subscription subscription = delegate.subscribe(subscriptionId, filter, startAt, action); competingConsumerSubscription = new CompetingConsumerSubscription(subscriptionId, subscriberId, subscription); competingConsumers.put(subscriptionIdAndSubscriberId, new CompetingConsumer(subscriptionIdAndSubscriberId, new CompetingConsumerState.Running())); } else { + logDebug("CompetingConsumer already registered, overriding to Waiting (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); competingConsumers.put(subscriptionIdAndSubscriberId, new CompetingConsumer(subscriptionIdAndSubscriberId, new CompetingConsumerState.Waiting(() -> { + logDebug("Starting delegated CompetingConsumer subscription after waiting (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); if (!delegate.isRunning()) { delegate.start(); } @@ -103,6 +111,7 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, */ @Override public synchronized void cancelSubscription(String subscriptionId) { + logDebug("Cancelling CompetingConsumer subscription (subscriptionId={})", subscriptionId); delegate.cancelSubscription(subscriptionId); findFirstCompetingConsumerMatching(cc -> cc.hasSubscriptionId(subscriptionId)) .ifPresent(cc -> unregisterCompetingConsumer(cc, __ -> competingConsumers.remove(cc.subscriptionIdAndSubscriberId))); @@ -113,12 +122,16 @@ public synchronized void cancelSubscription(String subscriptionId) { */ @Override public synchronized void stop() { + logDebug("Stopping CompetingConsumer subscription model"); if (!isRunning()) { return; } delegate.stop(); - unregisterAllCompetingConsumers(cc -> competingConsumers.put(cc.subscriptionIdAndSubscriberId, cc.registerPaused())); + unregisterAllCompetingConsumers(cc -> { + logDebug("Stopped CompetingConsumer subscription (subscriberId={}, subscriptionId={})", cc.getSubscriberId(), cc.getSubscriptionId()); + competingConsumers.put(cc.subscriptionIdAndSubscriberId, cc.registerPaused()); + }); } /** @@ -126,6 +139,7 @@ public synchronized void stop() { */ @Override public synchronized void start() { + logDebug("Starting CompetingConsumer subscription model"); if (isRunning()) { throw new IllegalStateException(CompetingConsumerSubscriptionModel.class.getSimpleName() + " is already started"); } @@ -137,6 +151,7 @@ public synchronized void start() { competingConsumers.values().stream() .filter(not(CompetingConsumer::isRunning)) .forEach(cc -> { + logDebug("Starting CompetingConsumer subscription (subscriberId={}, subscriptionId={}, state={})", cc.getSubscriberId(), cc.getSubscriptionId(), cc.state.getClass().getSimpleName()); // Only change state if we have permission to consume if (cc.isWaiting()) { // Registering a competing consumer will start the subscription automatically if lock was acquired @@ -177,6 +192,7 @@ public boolean isPaused(String subscriptionId) { */ @Override public synchronized Subscription resumeSubscription(String subscriptionId) { + logDebug("Trying to resume CompetingConsumer subscription (subscriptionId={})", subscriptionId); if (isRunning(subscriptionId)) { throw new IllegalArgumentException("Subscription " + subscriptionId + " is not paused"); } @@ -184,7 +200,9 @@ public synchronized Subscription resumeSubscription(String subscriptionId) { .map(competingConsumer -> { final Subscription subscription; String subscriberId = competingConsumer.getSubscriberId(); - if (hasLock(subscriptionId, subscriberId)) { + boolean hasLock = hasLock(subscriptionId, subscriberId); + logDebug("Resuming CompetingConsumer (subscriberId={}, subscriptionId={}, state={}, hasLock={})", subscriberId, subscriptionId, competingConsumer.state.getClass().getSimpleName(), hasLock); + if (hasLock) { if (competingConsumer.isWaiting()) { subscription = startWaitingConsumer(competingConsumer); } else { @@ -210,6 +228,7 @@ public synchronized Subscription resumeSubscription(String subscriptionId) { */ @Override public synchronized void pauseSubscription(String subscriptionId) { + logDebug("Trying to pause CompetingConsumer subscription (subscriptionId={})", subscriptionId); if (isPaused(subscriptionId)) { throw new IllegalArgumentException("Subscription " + subscriptionId + " is already paused"); } @@ -236,6 +255,7 @@ public SubscriptionModel getDelegatedSubscriptionModel() { @PreDestroy @Override public synchronized void shutdown() { + logDebug("Trying to resume CompetingConsumer subscription model"); delegate.shutdown(); unregisterAllCompetingConsumers(cc -> competingConsumers.remove(cc.subscriptionIdAndSubscriberId)); competingConsumerStrategy.removeListener(this); @@ -244,8 +264,10 @@ public synchronized void shutdown() { @Override public synchronized void onConsumeGranted(String subscriptionId, String subscriberId) { + logDebug("Consumption granted to CompetingConsumer (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); CompetingConsumer competingConsumer = competingConsumers.get(SubscriptionIdAndSubscriberId.from(subscriptionId, subscriberId)); if (competingConsumer == null) { + logDebug("Failed to find CompetingConsumer, returning (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); return; } @@ -253,7 +275,9 @@ public synchronized void onConsumeGranted(String subscriptionId, String subscrib startWaitingConsumer(competingConsumer); } else if (competingConsumer.isPaused()) { CompetingConsumerState.Paused state = (CompetingConsumerState.Paused) competingConsumer.state; - if (!state.pausedByUser) { + if (state.pausedByUser) { + logDebug("Won't resume CompetingConsumer, because it was paused by user (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); + } else { resumeSubscription(subscriptionId); } } @@ -261,28 +285,36 @@ public synchronized void onConsumeGranted(String subscriptionId, String subscrib @Override public synchronized void onConsumeProhibited(String subscriptionId, String subscriberId) { + logDebug("Consumption granted to CompetingConsumer (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); SubscriptionIdAndSubscriberId subscriptionIdAndSubscriberId = SubscriptionIdAndSubscriberId.from(subscriptionId, subscriberId); CompetingConsumer competingConsumer = competingConsumers.get(subscriptionIdAndSubscriberId); if (competingConsumer == null) { + logDebug("CompetingConsumer couldn't be found onConsumeProhibited (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); return; } if (competingConsumer.isRunning()) { + logDebug("CompetingConsumer is running, will pause subscription and consumers (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); pauseSubscription(subscriptionId); pauseConsumer(competingConsumer, false); } else if (competingConsumer.isPaused()) { + logDebug("CompetingConsumer is paused (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); CompetingConsumerState.Paused paused = (CompetingConsumerState.Paused) competingConsumer.state; pauseConsumer(competingConsumer, paused.pausedByUser); + } else { + logDebug("CompetingConsumer is neither running nor paused, won't do anything (subscriberId={}, subscriptionId={}, state={})", subscriberId, subscriptionId, competingConsumer.state.getClass().getSimpleName()); } } private Subscription startWaitingConsumer(CompetingConsumer cc) { + logDebug("Start CompetingConsumer that has previously been waiting (subscriberId={}, subscriptionId={})", cc.getSubscriberId(), cc.getSubscriptionId()); String subscriptionId = cc.getSubscriptionId(); competingConsumers.put(SubscriptionIdAndSubscriberId.from(subscriptionId, cc.getSubscriberId()), cc.registerRunning()); return ((CompetingConsumerState.Waiting) cc.state).startSubscription(); } private void pauseConsumer(CompetingConsumer cc, boolean pausedByUser) { + logDebug("Pausing CompetingConsumer (subscriberId={}, subscriptionId={}, pausedByUser={})", cc.getSubscriberId(), cc.getSubscriptionId(), pausedByUser); SubscriptionIdAndSubscriberId subscriptionIdAndSubscriberId = SubscriptionIdAndSubscriberId.from(cc.getSubscriptionId(), cc.getSubscriberId()); competingConsumers.put(subscriptionIdAndSubscriberId, cc.registerPaused(pausedByUser)); } @@ -389,6 +421,7 @@ public boolean hasPermissionToConsume() { } private void unregisterAllCompetingConsumers(Consumer andDo) { + logDebug("Unregistering all CompetingConsumer's"); unregisterCompetingConsumersMatching(CompetingConsumer::isRunning, andDo); } @@ -397,11 +430,13 @@ private void unregisterCompetingConsumersMatching(Predicate p } private synchronized void unregisterCompetingConsumer(CompetingConsumer cc, Consumer and) { + logDebug("Unregistering CompetingConsumer (subscriberId={}, subscriptionId={})", cc.getSubscriberId(), cc.getSubscriptionId()); and.accept(cc); competingConsumerStrategy.unregisterCompetingConsumer(cc.getSubscriptionId(), cc.getSubscriberId()); } private boolean registerCompetingConsumer(String subscriptionId, String subscriberId) { + logDebug("Registering CompetingConsumer (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId); return competingConsumerStrategy.registerCompetingConsumer(subscriptionId, subscriberId); } @@ -416,4 +451,10 @@ private Optional findFirstCompetingConsumerMatching(Predicate private Stream findCompetingConsumersMatching(Predicate predicate) { return competingConsumers.values().stream().filter(predicate); } + + private static void logDebug(String message, Object... params) { + if (log.isDebugEnabled()) { + log.debug(message, params); + } + } } \ No newline at end of file