Skip to content

Commit

Permalink
Added much more debug logging to CompetingConsumerSubscriptionModel
Browse files Browse the repository at this point in the history
  • Loading branch information
johanhaleby committed Jan 18, 2024
1 parent 7db40d6 commit bc1b97d
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ private enum Status {
}

private static void logDebug(String message, Object... params) {
log.debug(message, params);
if (log.isDebugEnabled()) {
log.debug(message, params);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.api.blocking.*;
import org.occurrent.subscription.api.blocking.CompetingConsumerStrategy.CompetingConsumerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -45,6 +47,7 @@
* If the above code is executed on multiple nodes/processes, then only <i>one</i> subscriber will receive events.
*/
public class CompetingConsumerSubscriptionModel implements DelegatingSubscriptionModel, SubscriptionModel, SubscriptionModelLifeCycle, CompetingConsumerListener {
private static final Logger log = LoggerFactory.getLogger(CompetingConsumerSubscriptionModel.class);

private final SubscriptionModel delegate;
private final CompetingConsumerStrategy competingConsumerStrategy;
Expand Down Expand Up @@ -72,14 +75,19 @@ public Subscription subscribe(String subscriberId, String subscriptionId, Subscr
Objects.requireNonNull(subscriberId, "SubscriberId cannot be null");
Objects.requireNonNull(subscriptionId, "SubscriptionId cannot be null");

logDebug("Starting CompetingConsumer subscription (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);

SubscriptionIdAndSubscriberId subscriptionIdAndSubscriberId = SubscriptionIdAndSubscriberId.from(subscriptionId, subscriberId);
final CompetingConsumerSubscription competingConsumerSubscription;
if (competingConsumerStrategy.registerCompetingConsumer(subscriptionId, subscriberId)) {
logDebug("Successfully registered CompetingConsumer subscription (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
Subscription subscription = delegate.subscribe(subscriptionId, filter, startAt, action);
competingConsumerSubscription = new CompetingConsumerSubscription(subscriptionId, subscriberId, subscription);
competingConsumers.put(subscriptionIdAndSubscriberId, new CompetingConsumer(subscriptionIdAndSubscriberId, new CompetingConsumerState.Running()));
} else {
logDebug("CompetingConsumer already registered, overriding to Waiting (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
competingConsumers.put(subscriptionIdAndSubscriberId, new CompetingConsumer(subscriptionIdAndSubscriberId, new CompetingConsumerState.Waiting(() -> {
logDebug("Starting delegated CompetingConsumer subscription after waiting (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
if (!delegate.isRunning()) {
delegate.start();
}
Expand All @@ -103,6 +111,7 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,
*/
@Override
public synchronized void cancelSubscription(String subscriptionId) {
logDebug("Cancelling CompetingConsumer subscription (subscriptionId={})", subscriptionId);
delegate.cancelSubscription(subscriptionId);
findFirstCompetingConsumerMatching(cc -> cc.hasSubscriptionId(subscriptionId))
.ifPresent(cc -> unregisterCompetingConsumer(cc, __ -> competingConsumers.remove(cc.subscriptionIdAndSubscriberId)));
Expand All @@ -113,19 +122,24 @@ public synchronized void cancelSubscription(String subscriptionId) {
*/
@Override
public synchronized void stop() {
logDebug("Stopping CompetingConsumer subscription model");
if (!isRunning()) {
return;
}

delegate.stop();
unregisterAllCompetingConsumers(cc -> competingConsumers.put(cc.subscriptionIdAndSubscriberId, cc.registerPaused()));
unregisterAllCompetingConsumers(cc -> {
logDebug("Stopped CompetingConsumer subscription (subscriberId={}, subscriptionId={})", cc.getSubscriberId(), cc.getSubscriptionId());
competingConsumers.put(cc.subscriptionIdAndSubscriberId, cc.registerPaused());
});
}

/**
* @see SubscriptionModelLifeCycle#start()
*/
@Override
public synchronized void start() {
logDebug("Starting CompetingConsumer subscription model");
if (isRunning()) {
throw new IllegalStateException(CompetingConsumerSubscriptionModel.class.getSimpleName() + " is already started");
}
Expand All @@ -137,6 +151,7 @@ public synchronized void start() {
competingConsumers.values().stream()
.filter(not(CompetingConsumer::isRunning))
.forEach(cc -> {
logDebug("Starting CompetingConsumer subscription (subscriberId={}, subscriptionId={}, state={})", cc.getSubscriberId(), cc.getSubscriptionId(), cc.state.getClass().getSimpleName());
// Only change state if we have permission to consume
if (cc.isWaiting()) {
// Registering a competing consumer will start the subscription automatically if lock was acquired
Expand Down Expand Up @@ -177,14 +192,17 @@ public boolean isPaused(String subscriptionId) {
*/
@Override
public synchronized Subscription resumeSubscription(String subscriptionId) {
logDebug("Trying to resume CompetingConsumer subscription (subscriptionId={})", subscriptionId);
if (isRunning(subscriptionId)) {
throw new IllegalArgumentException("Subscription " + subscriptionId + " is not paused");
}
return findFirstCompetingConsumerMatching(competingConsumer -> competingConsumer.hasSubscriptionId(subscriptionId))
.map(competingConsumer -> {
final Subscription subscription;
String subscriberId = competingConsumer.getSubscriberId();
if (hasLock(subscriptionId, subscriberId)) {
boolean hasLock = hasLock(subscriptionId, subscriberId);
logDebug("Resuming CompetingConsumer (subscriberId={}, subscriptionId={}, state={}, hasLock={})", subscriberId, subscriptionId, competingConsumer.state.getClass().getSimpleName(), hasLock);
if (hasLock) {
if (competingConsumer.isWaiting()) {
subscription = startWaitingConsumer(competingConsumer);
} else {
Expand All @@ -210,6 +228,7 @@ public synchronized Subscription resumeSubscription(String subscriptionId) {
*/
@Override
public synchronized void pauseSubscription(String subscriptionId) {
logDebug("Trying to pause CompetingConsumer subscription (subscriptionId={})", subscriptionId);
if (isPaused(subscriptionId)) {
throw new IllegalArgumentException("Subscription " + subscriptionId + " is already paused");
}
Expand All @@ -236,6 +255,7 @@ public SubscriptionModel getDelegatedSubscriptionModel() {
@PreDestroy
@Override
public synchronized void shutdown() {
logDebug("Trying to resume CompetingConsumer subscription model");
delegate.shutdown();
unregisterAllCompetingConsumers(cc -> competingConsumers.remove(cc.subscriptionIdAndSubscriberId));
competingConsumerStrategy.removeListener(this);
Expand All @@ -244,45 +264,57 @@ public synchronized void shutdown() {

@Override
public synchronized void onConsumeGranted(String subscriptionId, String subscriberId) {
logDebug("Consumption granted to CompetingConsumer (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
CompetingConsumer competingConsumer = competingConsumers.get(SubscriptionIdAndSubscriberId.from(subscriptionId, subscriberId));
if (competingConsumer == null) {
logDebug("Failed to find CompetingConsumer, returning (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
return;
}

if (competingConsumer.isWaiting()) {
startWaitingConsumer(competingConsumer);
} else if (competingConsumer.isPaused()) {
CompetingConsumerState.Paused state = (CompetingConsumerState.Paused) competingConsumer.state;
if (!state.pausedByUser) {
if (state.pausedByUser) {
logDebug("Won't resume CompetingConsumer, because it was paused by user (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
} else {
resumeSubscription(subscriptionId);
}
}
}

@Override
public synchronized void onConsumeProhibited(String subscriptionId, String subscriberId) {
logDebug("Consumption granted to CompetingConsumer (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
SubscriptionIdAndSubscriberId subscriptionIdAndSubscriberId = SubscriptionIdAndSubscriberId.from(subscriptionId, subscriberId);
CompetingConsumer competingConsumer = competingConsumers.get(subscriptionIdAndSubscriberId);
if (competingConsumer == null) {
logDebug("CompetingConsumer couldn't be found onConsumeProhibited (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
return;
}

if (competingConsumer.isRunning()) {
logDebug("CompetingConsumer is running, will pause subscription and consumers (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
pauseSubscription(subscriptionId);
pauseConsumer(competingConsumer, false);
} else if (competingConsumer.isPaused()) {
logDebug("CompetingConsumer is paused (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
CompetingConsumerState.Paused paused = (CompetingConsumerState.Paused) competingConsumer.state;
pauseConsumer(competingConsumer, paused.pausedByUser);
} else {
logDebug("CompetingConsumer is neither running nor paused, won't do anything (subscriberId={}, subscriptionId={}, state={})", subscriberId, subscriptionId, competingConsumer.state.getClass().getSimpleName());
}
}

private Subscription startWaitingConsumer(CompetingConsumer cc) {
logDebug("Start CompetingConsumer that has previously been waiting (subscriberId={}, subscriptionId={})", cc.getSubscriberId(), cc.getSubscriptionId());
String subscriptionId = cc.getSubscriptionId();
competingConsumers.put(SubscriptionIdAndSubscriberId.from(subscriptionId, cc.getSubscriberId()), cc.registerRunning());
return ((CompetingConsumerState.Waiting) cc.state).startSubscription();
}

private void pauseConsumer(CompetingConsumer cc, boolean pausedByUser) {
logDebug("Pausing CompetingConsumer (subscriberId={}, subscriptionId={}, pausedByUser={})", cc.getSubscriberId(), cc.getSubscriptionId(), pausedByUser);
SubscriptionIdAndSubscriberId subscriptionIdAndSubscriberId = SubscriptionIdAndSubscriberId.from(cc.getSubscriptionId(), cc.getSubscriberId());
competingConsumers.put(subscriptionIdAndSubscriberId, cc.registerPaused(pausedByUser));
}
Expand Down Expand Up @@ -389,6 +421,7 @@ public boolean hasPermissionToConsume() {
}

private void unregisterAllCompetingConsumers(Consumer<CompetingConsumer> andDo) {
logDebug("Unregistering all CompetingConsumer's");
unregisterCompetingConsumersMatching(CompetingConsumer::isRunning, andDo);
}

Expand All @@ -397,11 +430,13 @@ private void unregisterCompetingConsumersMatching(Predicate<CompetingConsumer> p
}

private synchronized void unregisterCompetingConsumer(CompetingConsumer cc, Consumer<CompetingConsumer> and) {
logDebug("Unregistering CompetingConsumer (subscriberId={}, subscriptionId={})", cc.getSubscriberId(), cc.getSubscriptionId());
and.accept(cc);
competingConsumerStrategy.unregisterCompetingConsumer(cc.getSubscriptionId(), cc.getSubscriberId());
}

private boolean registerCompetingConsumer(String subscriptionId, String subscriberId) {
logDebug("Registering CompetingConsumer (subscriberId={}, subscriptionId={})", subscriberId, subscriptionId);
return competingConsumerStrategy.registerCompetingConsumer(subscriptionId, subscriberId);
}

Expand All @@ -416,4 +451,10 @@ private Optional<CompetingConsumer> findFirstCompetingConsumerMatching(Predicate
private Stream<CompetingConsumer> findCompetingConsumersMatching(Predicate<CompetingConsumer> predicate) {
return competingConsumers.values().stream().filter(predicate);
}

private static void logDebug(String message, Object... params) {
if (log.isDebugEnabled()) {
log.debug(message, params);
}
}
}

0 comments on commit bc1b97d

Please sign in to comment.