diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java index 1cb2e1e1eaa4c..868c375531578 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java @@ -55,7 +55,7 @@ public class Backoff { @VisibleForTesting Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop, TimeUnit unitMandatoryStop, Clock clock) { - this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock, + this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS); } public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop, @@ -65,16 +65,16 @@ public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, l public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop, TimeUnit unitMandatoryStop, long backoffIntervalMs, long maxBackoffIntervalMs) { - this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(), + this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(), backoffIntervalMs, maxBackoffIntervalMs); } - + public long next() { long current = this.next; if (current < max) { this.next = Math.min(this.next * 2, this.max); } - + // Check for mandatory stop if (!mandatoryStopMade) { long now = clock.millis(); @@ -84,14 +84,14 @@ public long next() { } else { timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis; } - + if (timeElapsedSinceFirstBackoff + current > mandatoryStop) { current = Math.max(initial, mandatoryStop - timeElapsedSinceFirstBackoff); mandatoryStopMade = true; } } - - // Randomly decrease the timeout up to 10% to avoid simultaneous retries + + // Randomly decrease the timeout up to 10% to avoid simultaneous retries // If current < 10 then current/10 < 1 and we get an exception from Random saying "Bound must be positive" if (current > 10) { current -= random.nextInt((int) current / 10); @@ -119,13 +119,13 @@ long getFirstBackoffTimeInMillis() { long backoffIntervalNanos() { return backoffIntervalNanos; } - + @VisibleForTesting long maxBackoffIntervalNanos() { return maxBackoffIntervalNanos; } - - public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts, + + public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts, long defaultInterval, long maxBackoffInterval) { long initialTimestampInNano = unitInitial.toNanos(initialTimestamp); long currentTime = System.nanoTime(); @@ -141,13 +141,9 @@ public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, // if the current time is less than the time at which next retry should occur, we should backoff return currentTime < (initialTimestampInNano + interval); } - + public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) { - return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, + return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS); } - - public boolean instanceShouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) { - return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, backoffIntervalNanos, maxBackoffIntervalNanos); - } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6dc96d7645a95..9b24d357cf707 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -155,7 +155,7 @@ enum SubscriptionMode { static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, ExecutorService listenerExecutor, int partitionIndex, CompletableFuture> subscribeFuture, SubscriptionMode subscriptionMode, MessageId startMessageId, Schema schema, ConsumerInterceptors interceptors) { - return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, + return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS); } @@ -171,7 +171,7 @@ static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos); } } - + protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, ExecutorService listenerExecutor, int partitionIndex, CompletableFuture> subscribeFuture, SubscriptionMode subscriptionMode, MessageId startMessageId, Schema schema, ConsumerInterceptors interceptors, @@ -221,10 +221,10 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMandatoryStop(60, TimeUnit.SECONDS) - .setMax(0, TimeUnit.MILLISECONDS) - .useUserConfiguredIntervals(backoffIntervalNanos, + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMax(60, TimeUnit.SECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .useUserConfiguredIntervals(backoffIntervalNanos, maxBackoffIntervalNanos) .create(), this); @@ -1471,12 +1471,12 @@ CompletableFuture getLastMessageIdAsync() { AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); Backoff backoff = new BackoffBuilder() .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) - .setMax(0, TimeUnit.MILLISECONDS) - .useUserConfiguredIntervals(backoffIntervalNanos, + .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .useUserConfiguredIntervals(backoffIntervalNanos, maxBackoffIntervalNanos) .create(); - + CompletableFuture getLastMessageIdFuture = new CompletableFuture<>(); internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index c80e529a56d6f..b4a0993b49c8c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -185,13 +185,13 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder() .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMandatoryStop(60, TimeUnit.SECONDS) - .setMax(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS) - .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(), + .setMax(60, TimeUnit.SECONDS) + .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS) + .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(), client.getConfiguration().getMaxBackoffIntervalNanos()) .create(), this); - + grabCnx(); }