Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/**
* Copyright 2025 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.concurrency.limits;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

/**
* A bulkhead that can limit concurrent executions with a given context.
*
* @param <ContextT> the context type to run tasks with
*/
public interface Bulkhead<ContextT> {

/**
* Executes the given {@link CompletionStage} with the given context. It is recommended to set
* a timeout on the given {@link CompletionStage} to ensure progress.
*
* @param supplier the task to run
* @param context the context to run the task with
* @param <T> the type of the {@link CompletionStage}s
* @return a new {@link CompletionStage} with the same completion result as the supplier,
* except for when this bulkhead cannot accept the task, in which case it completes
* exceptionally with a {@link RejectedExecutionException}
*/
<T> CompletionStage<T> executeCompletionStage(Supplier<? extends CompletionStage<T>> supplier, ContextT context);

/**
* A bulkhead that drains the backlog in parallel and executes its tasks in parallel (while
* limiting concurrency). This bulkhead offers additional methods to run synchronous tasks in
* calling threads (which is not recommended in a serial drainer).
*
* @param <ContextT> the context type to run tasks with
*/
interface ParallelDrainingBulkhead<ContextT> extends Bulkhead<ContextT> {

/**
* Executes the given {@link Supplier} with the given context.
*
* @param supplier the task to run
* @param context the context to run the task with
* @param <T> the type of the result
* @return a {@link CompletionStage} with a completion result set by running the supplier,
* except for when this bulkhead cannot accept the task, in which case it completes
* exceptionally with a {@link RejectedExecutionException}
*/
default <T> CompletionStage<T> executeSupplier(Supplier<T> supplier, ContextT context) {
return executeCompletionStage(() -> {
try {
return CompletableFuture.completedFuture(supplier.get());
} catch (Throwable t) {
CompletableFuture<T> failed = new CompletableFuture<>();
failed.completeExceptionally(t);
return failed;
}
}, context);
}

/**
* Executes the given {@link Runnable} with the given context.
*
* @param runnable the task to run
* @param context the context to run the task with
* @return a {@link CompletionStage} with a completion result set by running the runnable,
* except for when this bulkhead cannot accept the task, in which case it completes
* exceptionally with a {@link RejectedExecutionException}
*/
default CompletionStage<Void> execute(Runnable runnable, ContextT context) {
return executeSupplier(() -> {
runnable.run();
return null;
}, context);
}

/**
* Executes the given {@link Callable} with the given context.
*
* @param callable the task to run
* @param context the context to run the task with
* @return a {@link CompletionStage} with a completion result set by running the callable,
* except for when this bulkhead cannot accept the task, in which case it completes
* exceptionally with a {@link RejectedExecutionException}
*/
default <T> CompletionStage<T> execute(Callable<T> callable, ContextT context) {
return executeSupplier(() -> {
try {
return callable.call();
} catch (Exception e) {
throw new CompletionException(e);
}
}, context);
}
}

/**
* A bulkhead that executes its tasks without contexts. Implementations can freely exchange
* permits from {@link Limiter}s for tasks without consequences (e.g., maintaining total
* execution order), allowing for parallelism.
*/
interface GlobalBulkhead extends Bulkhead<Void> {

default <T> CompletionStage<T> executeCompletionStage(Supplier<? extends CompletionStage<T>> supplier) {
return executeCompletionStage(supplier, null);
}
}

/**
* A bulkhead that drains the backlog in parallel and executes its tasks in parallel without
* contexts.
*/
interface GlobalParallelDrainingBulkhead extends GlobalBulkhead, ParallelDrainingBulkhead<Void> {

default <T> CompletionStage<T> executeSupplier(Supplier<T> supplier) {
return executeSupplier(supplier, null);
}

default CompletionStage<Void> execute(Runnable runnable) {
return execute(runnable, null);
}

default <T> CompletionStage<T> execute(Callable<T> callable) {
return execute(callable, null);
}

}

/**
* A bulkhead that executes its tasks with contexts that have low cardinality (i.e., few
* distinct values). This allows for maintaining execution order by exchanging permits from
* {@link Limiter}s across tasks with equal contexts. Implementations can take advantage of
* this, e.g., by maintaining a partial FIFO order of task execution and parallize draining
* backlogs. Crucially, contexts should have proper {@link Object#equals(Object)} and
* {@link Object#hashCode()} implementations.
*
* @param <ContextT> the context type with low cardinality to run tasks with
*/
interface LowCardinalityContextBulkhead<ContextT> extends Bulkhead<ContextT> {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,29 @@ public interface Limiter<ContextT> {
/**
*/
interface Listener {

enum Result {
SUCCESS,
IGNORE,
DROPPED
}

default void on(Result result) {
switch (result) {
case SUCCESS:
onSuccess();
break;
case IGNORE:
onIgnore();
break;
case DROPPED:
onDropped();
break;
default:
throw new IllegalArgumentException("Unknown result: " + result);
}
}

/**
* Notification that the operation succeeded and internally measured latency should be
* used as an RTT sample
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/**
* Copyright 2025 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.concurrency.limits.bulkhead;

import com.netflix.concurrency.limits.Bulkhead;
import com.netflix.concurrency.limits.Limiter;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* A dispatcher {@link Bulkhead} that uses a {@link Limiter} to control concurrency and a backlog
* {@link BlockingQueue} to hold pending tasks.
* <p>
* Using this {@link Bulkhead} is suitable in cases where dispatching tasks is cheap and can be done
* by threads calling {@link #executeCompletionStage(Supplier, Object)}, or threads that complete
* the dispatched tasks. Ideally, the actual work of these tasks, e.g., the transport of gRPC calls,
* is done by a separate {@link Executor}. This bulkhead however guarantees there are no more
* concurrent tasks running beyond what the given {@link Limiter} allows.
*
* @param <ContextT> the context type to run tasks with
*/
public abstract class AbstractDispatcherBulkhead<ContextT> implements Bulkhead<ContextT> {

protected final Limiter<ContextT> limiter;

protected final Queue<BulkheadTask<?, ContextT>> backlog;

protected final Function<Throwable, Limiter.Listener.Result> exceptionClassifier;

protected final int maxDispatchPerCall;

protected AbstractDispatcherBulkhead(Limiter<ContextT> limiter,
Queue<BulkheadTask<?, ContextT>> backlog,
Function<Throwable, Limiter.Listener.Result> exceptionClassifier,
int maxDispatchPerCall) {
this.limiter = limiter;
this.backlog = backlog;
this.exceptionClassifier = exceptionClassifier;
this.maxDispatchPerCall = maxDispatchPerCall;
}

public final int getMaxDispatchPerCall() {
return maxDispatchPerCall;
}

@Override
public final <T> CompletionStage<T> executeCompletionStage(Supplier<? extends CompletionStage<T>> supplier, ContextT context) {
final CompletableFuture<T> result = new CompletableFuture<>();

try {
backlog.add(new BulkheadTask<>(supplier, result, context));
drain();
} catch (IllegalStateException ise) {
result.completeExceptionally(new RejectedExecutionException("Backlog full", ise));
}

return result;
}

protected abstract void drain();

protected final <T> void dispatch(BulkheadTask<T, ContextT> task, Limiter.Listener listener) {
final CompletionStage<T> stage;
try {
stage = task.supplier.get();
} catch (RuntimeException re) {
// Failed before a meaningful RTT measurement could be made.
listener.onIgnore();
task.result.completeExceptionally(re);
drain();
return;
}

stage.whenComplete(
(value, throwable) -> {
try {
if (throwable == null) {
listener.onSuccess();
task.result.complete(value);
} else {
Limiter.Listener.Result result = classifyException(throwable);
listener.on(result);
task.result.completeExceptionally(throwable);
}
} finally {
// Completion frees capacity; kick the drainer to fill newly available tokens.
drain();
}
});
}

private Limiter.Listener.Result classifyException(Throwable throwable) {
return (throwable instanceof CompletionException || throwable instanceof ExecutionException)
&& throwable.getCause() != null
? classifyException(throwable.getCause())
: exceptionClassifier.apply(throwable);

}

protected static abstract class AbstractBuilder<ContextT, BuilderT extends AbstractBuilder<ContextT, BuilderT>> {

protected Limiter<ContextT> limiter;

protected Queue<BulkheadTask<?, ContextT>> backlog;

protected Function<Throwable, Limiter.Listener.Result> exceptionClassifier;

protected int maxDispatchPerCall;

public final BuilderT limiter(Limiter<ContextT> limiter) {
this.limiter = limiter;
return self();
}

private BuilderT backlog(Queue<BulkheadTask<?, ContextT>> backlog) {
this.backlog = backlog;
return self();
}

public final BuilderT backlog(int size) {
if (size < 0) {
return backlog(new ConcurrentLinkedQueue<>());
} else if (size == 0) {
return backlog(new SynchronousQueue<>());
} else if (size >= 10_000) {
return backlog(new LinkedBlockingQueue<>(size));
} else {
return backlog(new ArrayBlockingQueue<>(size));
}
}

public final BuilderT exceptionClassifier(Function<Throwable, Limiter.Listener.Result> exceptionClassifier) {
this.exceptionClassifier = exceptionClassifier;
return self();
}


public final BuilderT maxDispatchPerCall(int maxDispatchPerCall) {
this.maxDispatchPerCall = maxDispatchPerCall;
return self();
}

@SuppressWarnings("unchecked")
private BuilderT self() {
return (BuilderT) this;
}

public abstract Bulkhead<ContextT> build();
}

protected static class BulkheadTask<T, ContextT> {

final Supplier<? extends CompletionStage<T>> supplier;

final CompletableFuture<T> result;

final ContextT context;

BulkheadTask(Supplier<? extends CompletionStage<T>> supplier, CompletableFuture<T> result, ContextT context) {
this.supplier = supplier;
this.result = result;
this.context = context;
}
}
}
Loading