Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Fixed issue in backoff builder configuration (#3984)
Browse files Browse the repository at this point in the history
### Motivation

In #3848 there was a mismatch of the values when converting from constructor to builder. The values for `setMandatoryStop()` and `setMax()` were inverted. 

That makes a client to keep reconnecting every 100 millis, instead of doing the expected backoff.
  • Loading branch information
merlimat authored and sijie committed Apr 9, 2019
1 parent 14a5ee0 commit 6bce00b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class Backoff {
@VisibleForTesting
Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
TimeUnit unitMandatoryStop, Clock clock) {
this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
Expand All @@ -65,16 +65,16 @@ public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, l

public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
TimeUnit unitMandatoryStop, long backoffIntervalMs, long maxBackoffIntervalMs) {
this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(),
this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(),
backoffIntervalMs, maxBackoffIntervalMs);
}

public long next() {
long current = this.next;
if (current < max) {
this.next = Math.min(this.next * 2, this.max);
}

// Check for mandatory stop
if (!mandatoryStopMade) {
long now = clock.millis();
Expand All @@ -84,14 +84,14 @@ public long next() {
} else {
timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis;
}

if (timeElapsedSinceFirstBackoff + current > mandatoryStop) {
current = Math.max(initial, mandatoryStop - timeElapsedSinceFirstBackoff);
mandatoryStopMade = true;
}
}
// Randomly decrease the timeout up to 10% to avoid simultaneous retries

// Randomly decrease the timeout up to 10% to avoid simultaneous retries
// If current < 10 then current/10 < 1 and we get an exception from Random saying "Bound must be positive"
if (current > 10) {
current -= random.nextInt((int) current / 10);
Expand Down Expand Up @@ -119,13 +119,13 @@ long getFirstBackoffTimeInMillis() {
long backoffIntervalNanos() {
return backoffIntervalNanos;
}

@VisibleForTesting
long maxBackoffIntervalNanos() {
return maxBackoffIntervalNanos;
}
public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,

public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,
long defaultInterval, long maxBackoffInterval) {
long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
long currentTime = System.nanoTime();
Expand All @@ -141,13 +141,9 @@ public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial,
// if the current time is less than the time at which next retry should occur, we should backoff
return currentTime < (initialTimestampInNano + interval);
}

public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts,
return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts,
DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
}

public boolean instanceShouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, backoffIntervalNanos, maxBackoffIntervalNanos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ enum SubscriptionMode {
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}

Expand All @@ -171,7 +171,7 @@ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic
subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
}
}

protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
Expand Down Expand Up @@ -221,10 +221,10 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat

this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMandatoryStop(60, TimeUnit.SECONDS)
.setMax(0, TimeUnit.MILLISECONDS)
.useUserConfiguredIntervals(backoffIntervalNanos,
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(60, TimeUnit.SECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.useUserConfiguredIntervals(backoffIntervalNanos,
maxBackoffIntervalNanos)
.create(),
this);
Expand Down Expand Up @@ -1471,12 +1471,12 @@ CompletableFuture<MessageId> getLastMessageIdAsync() {
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMax(0, TimeUnit.MILLISECONDS)
.useUserConfiguredIntervals(backoffIntervalNanos,
.setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.useUserConfiguredIntervals(backoffIntervalNanos,
maxBackoffIntervalNanos)
.create();

CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>();

internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMandatoryStop(60, TimeUnit.SECONDS)
.setMax(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
.useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
.setMax(60, TimeUnit.SECONDS)
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
.useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos())
.create(),
this);

grabCnx();
}

Expand Down

0 comments on commit 6bce00b

Please sign in to comment.