diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Bulkhead.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Bulkhead.java new file mode 100644 index 0000000..3bdf91b --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Bulkhead.java @@ -0,0 +1,156 @@ +/** + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; + +/** + * A bulkhead that can limit concurrent executions with a given context. + * + * @param the context type to run tasks with + */ +public interface Bulkhead { + + /** + * Executes the given {@link CompletionStage} with the given context. It is recommended to set + * a timeout on the given {@link CompletionStage} to ensure progress. + * + * @param supplier the task to run + * @param context the context to run the task with + * @param the type of the {@link CompletionStage}s + * @return a new {@link CompletionStage} with the same completion result as the supplier, + * except for when this bulkhead cannot accept the task, in which case it completes + * exceptionally with a {@link RejectedExecutionException} + */ + CompletionStage executeCompletionStage(Supplier> supplier, ContextT context); + + /** + * A bulkhead that drains the backlog in parallel and executes its tasks in parallel (while + * limiting concurrency). This bulkhead offers additional methods to run synchronous tasks in + * calling threads (which is not recommended in a serial drainer). + * + * @param the context type to run tasks with + */ + interface ParallelDrainingBulkhead extends Bulkhead { + + /** + * Executes the given {@link Supplier} with the given context. + * + * @param supplier the task to run + * @param context the context to run the task with + * @param the type of the result + * @return a {@link CompletionStage} with a completion result set by running the supplier, + * except for when this bulkhead cannot accept the task, in which case it completes + * exceptionally with a {@link RejectedExecutionException} + */ + default CompletionStage executeSupplier(Supplier supplier, ContextT context) { + return executeCompletionStage(() -> { + try { + return CompletableFuture.completedFuture(supplier.get()); + } catch (Throwable t) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(t); + return failed; + } + }, context); + } + + /** + * Executes the given {@link Runnable} with the given context. + * + * @param runnable the task to run + * @param context the context to run the task with + * @return a {@link CompletionStage} with a completion result set by running the runnable, + * except for when this bulkhead cannot accept the task, in which case it completes + * exceptionally with a {@link RejectedExecutionException} + */ + default CompletionStage execute(Runnable runnable, ContextT context) { + return executeSupplier(() -> { + runnable.run(); + return null; + }, context); + } + + /** + * Executes the given {@link Callable} with the given context. + * + * @param callable the task to run + * @param context the context to run the task with + * @return a {@link CompletionStage} with a completion result set by running the callable, + * except for when this bulkhead cannot accept the task, in which case it completes + * exceptionally with a {@link RejectedExecutionException} + */ + default CompletionStage execute(Callable callable, ContextT context) { + return executeSupplier(() -> { + try { + return callable.call(); + } catch (Exception e) { + throw new CompletionException(e); + } + }, context); + } + } + + /** + * A bulkhead that executes its tasks without contexts. Implementations can freely exchange + * permits from {@link Limiter}s for tasks without consequences (e.g., maintaining total + * execution order), allowing for parallelism. + */ + interface GlobalBulkhead extends Bulkhead { + + default CompletionStage executeCompletionStage(Supplier> supplier) { + return executeCompletionStage(supplier, null); + } + } + + /** + * A bulkhead that drains the backlog in parallel and executes its tasks in parallel without + * contexts. + */ + interface GlobalParallelDrainingBulkhead extends GlobalBulkhead, ParallelDrainingBulkhead { + + default CompletionStage executeSupplier(Supplier supplier) { + return executeSupplier(supplier, null); + } + + default CompletionStage execute(Runnable runnable) { + return execute(runnable, null); + } + + default CompletionStage execute(Callable callable) { + return execute(callable, null); + } + + } + + /** + * A bulkhead that executes its tasks with contexts that have low cardinality (i.e., few + * distinct values). This allows for maintaining execution order by exchanging permits from + * {@link Limiter}s across tasks with equal contexts. Implementations can take advantage of + * this, e.g., by maintaining a partial FIFO order of task execution and parallize draining + * backlogs. Crucially, contexts should have proper {@link Object#equals(Object)} and + * {@link Object#hashCode()} implementations. + * + * @param the context type with low cardinality to run tasks with + */ + interface LowCardinalityContextBulkhead extends Bulkhead { + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java index 8817af5..d629a4d 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java @@ -29,6 +29,29 @@ public interface Limiter { /** */ interface Listener { + + enum Result { + SUCCESS, + IGNORE, + DROPPED + } + + default void on(Result result) { + switch (result) { + case SUCCESS: + onSuccess(); + break; + case IGNORE: + onIgnore(); + break; + case DROPPED: + onDropped(); + break; + default: + throw new IllegalArgumentException("Unknown result: " + result); + } + } + /** * Notification that the operation succeeded and internally measured latency should be * used as an RTT sample diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/AbstractDispatcherBulkhead.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/AbstractDispatcherBulkhead.java new file mode 100644 index 0000000..a2432b6 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/AbstractDispatcherBulkhead.java @@ -0,0 +1,190 @@ +/** + * Copyright 2025 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Bulkhead; +import com.netflix.concurrency.limits.Limiter; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * A dispatcher {@link Bulkhead} that uses a {@link Limiter} to control concurrency and a backlog + * {@link BlockingQueue} to hold pending tasks. + *

+ * Using this {@link Bulkhead} is suitable in cases where dispatching tasks is cheap and can be done + * by threads calling {@link #executeCompletionStage(Supplier, Object)}, or threads that complete + * the dispatched tasks. Ideally, the actual work of these tasks, e.g., the transport of gRPC calls, + * is done by a separate {@link Executor}. This bulkhead however guarantees there are no more + * concurrent tasks running beyond what the given {@link Limiter} allows. + * + * @param the context type to run tasks with + */ +public abstract class AbstractDispatcherBulkhead implements Bulkhead { + + protected final Limiter limiter; + + protected final Queue> backlog; + + protected final Function exceptionClassifier; + + protected final int maxDispatchPerCall; + + protected AbstractDispatcherBulkhead(Limiter limiter, + Queue> backlog, + Function exceptionClassifier, + int maxDispatchPerCall) { + this.limiter = limiter; + this.backlog = backlog; + this.exceptionClassifier = exceptionClassifier; + this.maxDispatchPerCall = maxDispatchPerCall; + } + + public final int getMaxDispatchPerCall() { + return maxDispatchPerCall; + } + + @Override + public final CompletionStage executeCompletionStage(Supplier> supplier, ContextT context) { + final CompletableFuture result = new CompletableFuture<>(); + + try { + backlog.add(new BulkheadTask<>(supplier, result, context)); + drain(); + } catch (IllegalStateException ise) { + result.completeExceptionally(new RejectedExecutionException("Backlog full", ise)); + } + + return result; + } + + protected abstract void drain(); + + protected final void dispatch(BulkheadTask task, Limiter.Listener listener) { + final CompletionStage stage; + try { + stage = task.supplier.get(); + } catch (RuntimeException re) { + // Failed before a meaningful RTT measurement could be made. + listener.onIgnore(); + task.result.completeExceptionally(re); + drain(); + return; + } + + stage.whenComplete( + (value, throwable) -> { + try { + if (throwable == null) { + listener.onSuccess(); + task.result.complete(value); + } else { + Limiter.Listener.Result result = classifyException(throwable); + listener.on(result); + task.result.completeExceptionally(throwable); + } + } finally { + // Completion frees capacity; kick the drainer to fill newly available tokens. + drain(); + } + }); + } + + private Limiter.Listener.Result classifyException(Throwable throwable) { + return (throwable instanceof CompletionException || throwable instanceof ExecutionException) + && throwable.getCause() != null + ? classifyException(throwable.getCause()) + : exceptionClassifier.apply(throwable); + + } + + protected static abstract class AbstractBuilder> { + + protected Limiter limiter; + + protected Queue> backlog; + + protected Function exceptionClassifier; + + protected int maxDispatchPerCall; + + public final BuilderT limiter(Limiter limiter) { + this.limiter = limiter; + return self(); + } + + private BuilderT backlog(Queue> backlog) { + this.backlog = backlog; + return self(); + } + + public final BuilderT backlog(int size) { + if (size < 0) { + return backlog(new ConcurrentLinkedQueue<>()); + } else if (size == 0) { + return backlog(new SynchronousQueue<>()); + } else if (size >= 10_000) { + return backlog(new LinkedBlockingQueue<>(size)); + } else { + return backlog(new ArrayBlockingQueue<>(size)); + } + } + + public final BuilderT exceptionClassifier(Function exceptionClassifier) { + this.exceptionClassifier = exceptionClassifier; + return self(); + } + + + public final BuilderT maxDispatchPerCall(int maxDispatchPerCall) { + this.maxDispatchPerCall = maxDispatchPerCall; + return self(); + } + + @SuppressWarnings("unchecked") + private BuilderT self() { + return (BuilderT) this; + } + + public abstract Bulkhead build(); + } + + protected static class BulkheadTask { + + final Supplier> supplier; + + final CompletableFuture result; + + final ContextT context; + + BulkheadTask(Supplier> supplier, CompletableFuture result, ContextT context) { + this.supplier = supplier; + this.result = result; + this.context = context; + } + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/EnumContextPartialFifoOrderBulkhead.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/EnumContextPartialFifoOrderBulkhead.java new file mode 100644 index 0000000..b42da7a --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/EnumContextPartialFifoOrderBulkhead.java @@ -0,0 +1,76 @@ +/** + * Copyright 2025 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Bulkhead; +import com.netflix.concurrency.limits.Bulkhead.LowCardinalityContextBulkhead; +import com.netflix.concurrency.limits.Bulkhead.ParallelDrainingBulkhead; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +/** + * A non-blocking {@link Bulkhead} that maintains a partial FIFO ordering of task execution; tasks + * with equal contexts are executed in FIFO order, but tasks with different contexts may be + * reordered to allow parallel draining. This class is intended for use with low-cardinality enum + * contexts, where each context gets its own {@link Bulkhead} (and backlog) to allow parallel + * draining. + * + * @param the {@link Enum} context type to run tasks with + * @see FifoParallelDispatcherBulkhead + */ +public class EnumContextPartialFifoOrderBulkhead> implements + LowCardinalityContextBulkhead, + ParallelDrainingBulkhead { + + private final Map> bulkheads; + + private EnumContextPartialFifoOrderBulkhead(Map> bulkheads) { + this.bulkheads = bulkheads; + } + + @Override + public CompletionStage executeCompletionStage(Supplier> supplier, + ContextT context) { + return bulkheads.get(context).executeCompletionStage(supplier, context); + } + + public static > Builder newBuilder(Class clazz) { + return new Builder<>(clazz); + } + + public static class Builder> extends + AbstractDispatcherBulkhead.AbstractBuilder> { + + private final Class clazz; + + private Builder(Class clazz) { + this.clazz = clazz; + } + + @Override + public EnumContextPartialFifoOrderBulkhead build() { + final Map> bulkheads = new EnumMap<>(clazz); + for (ContextT context : clazz.getEnumConstants()) { + final Bulkhead bulkhead = new SingletonContextDispatcherBulkhead<>( + limiter, backlog, exceptionClassifier, maxDispatchPerCall, context); + bulkheads.put(context, bulkhead); + } + return new EnumContextPartialFifoOrderBulkhead<>(bulkheads); + } + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/FifoParallelDispatcherBulkhead.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/FifoParallelDispatcherBulkhead.java new file mode 100644 index 0000000..54b1fc0 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/FifoParallelDispatcherBulkhead.java @@ -0,0 +1,73 @@ +/** + * Copyright 2025 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Bulkhead; +import com.netflix.concurrency.limits.Bulkhead.GlobalParallelDrainingBulkhead; +import com.netflix.concurrency.limits.Limiter; +import java.util.Optional; +import java.util.Queue; +import java.util.function.Function; + +/** + * A non-blocking dispatcher {@link Bulkhead} that guarantees FIFO ordering of task execution. Does + * not support contexts for task execution in favor of parallelism. + * + * @see AbstractDispatcherBulkhead + */ +public class FifoParallelDispatcherBulkhead + extends AbstractDispatcherBulkhead + implements GlobalParallelDrainingBulkhead { + + private FifoParallelDispatcherBulkhead(Limiter limiter, + Queue> backlog, + Function exceptionClassifier, + int maxDispatchPerCall) { + super(limiter, backlog, exceptionClassifier, maxDispatchPerCall); + } + + + @Override + protected void drain() { + int dispatched = 0; + while (dispatched < maxDispatchPerCall && !backlog.isEmpty()) { + final Optional listener = limiter.acquire(null); + if (!listener.isPresent()) { + return; + } + + final BulkheadTask head = backlog.poll(); + if (head == null) { + listener.get().onIgnore(); + } else { + dispatch(head, listener.get()); + dispatched++; + } + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends AbstractBuilder { + + @Override + public FifoParallelDispatcherBulkhead build() { + return new FifoParallelDispatcherBulkhead(limiter, backlog, exceptionClassifier, maxDispatchPerCall); + } + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/FifoSerialDispatcherBulkhead.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/FifoSerialDispatcherBulkhead.java new file mode 100644 index 0000000..ca3f4da --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/FifoSerialDispatcherBulkhead.java @@ -0,0 +1,93 @@ +/** + * Copyright 2025 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Bulkhead; +import com.netflix.concurrency.limits.Limiter; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +/** + * A non-blocking dispatcher {@link Bulkhead} that guarantees FIFO ordering of task execution. + *

+ * The downside of maintaining this strict FIFO ordering is that draining cannot be done in + * parallel, hence {@link RoundRobinDispatcherBulkhead} may provide higher throughput in some + * scenarios. However, FIFO ordering may provide lower latencies. + * + * @param the context type to run tasks with + * @see AbstractDispatcherBulkhead + */ +public class FifoSerialDispatcherBulkhead extends AbstractDispatcherBulkhead { + + private final AtomicInteger wip = new AtomicInteger(); + + private FifoSerialDispatcherBulkhead(Limiter limiter, + Queue> backlog, + Function exceptionClassifier, + int maxDispatchPerCall) { + super(limiter, backlog, exceptionClassifier, maxDispatchPerCall); + } + + public int getWip() { + return wip.get(); + } + + @Override + protected void drain() { + if (wip.getAndIncrement() == 0) { + drainLoop(); + } + } + + private void drainLoop() { + int todo = 1; + + while (todo > 0) { + int dispatched = 0; + BulkheadTask head; + while (dispatched < maxDispatchPerCall && (head = backlog.peek()) != null) { + final Optional listener = limiter.acquire(head.context); + if (!listener.isPresent()) { + break; + } + + head = backlog.poll(); + if (head == null) { + listener.get().onIgnore(); + } else { + dispatch(head, listener.get()); + dispatched++; + } + } + + todo = wip.addAndGet(-todo); + } + } + + public static Builder newBuilder() { + return new Builder<>(); + } + + public static class Builder extends AbstractBuilder> { + + @Override + public FifoSerialDispatcherBulkhead build() { + return new FifoSerialDispatcherBulkhead<>(limiter, backlog, exceptionClassifier, maxDispatchPerCall); + } + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/RoundRobinDispatcherBulkhead.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/RoundRobinDispatcherBulkhead.java new file mode 100644 index 0000000..9cf9ed9 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/RoundRobinDispatcherBulkhead.java @@ -0,0 +1,77 @@ +/** + * Copyright 2025 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Bulkhead; +import com.netflix.concurrency.limits.Bulkhead.ParallelDrainingBulkhead; +import com.netflix.concurrency.limits.Limiter; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Function; + +/** + * A non-blocking dispatcher {@link Bulkhead} with round-robin style task execution; when no permit + * can be obtained from the {@link Limiter}, tasks are added back to (the tail) of the queue. This + * means there is no FIFO guarantee on the order of task execution. However, draining the queue can + * be done in parallel, which may provide higher throughput, but higher latencies than + * {@link FifoSerialDispatcherBulkhead} in some scenarios. + * + * @param the context type to run tasks with + * @see AbstractDispatcherBulkhead + */ +public class RoundRobinDispatcherBulkhead extends AbstractDispatcherBulkhead + implements ParallelDrainingBulkhead { + + private RoundRobinDispatcherBulkhead(Limiter limiter, + Queue> backlog, + Function exceptionClassifier, + int maxDispatchPerCall) { + super(limiter, backlog, exceptionClassifier, maxDispatchPerCall); + } + + @Override + protected void drain() { + BulkheadTask head; + int dispatched; + for (head = backlog.poll(), dispatched = 0; + head != null && dispatched < maxDispatchPerCall; + head = backlog.poll(), dispatched++) { + final Optional listener = limiter.acquire(head.context); + if (!listener.isPresent()) { + try { + backlog.add(head); + } catch (IllegalStateException ise) { + head.result.completeExceptionally(new RejectedExecutionException("Backlog full", ise)); + } + return; + } + + dispatch(head, listener.get()); + } + } + + public static Builder newBuilder() { + return new Builder<>(); + } + + public static class Builder extends AbstractBuilder> { + + public RoundRobinDispatcherBulkhead build() { + return new RoundRobinDispatcherBulkhead<>(limiter, backlog, exceptionClassifier, maxDispatchPerCall); + } + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/SingletonContextDispatcherBulkhead.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/SingletonContextDispatcherBulkhead.java new file mode 100644 index 0000000..e23d141 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/bulkhead/SingletonContextDispatcherBulkhead.java @@ -0,0 +1,70 @@ +/** + * Copyright 2025 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Bulkhead; +import com.netflix.concurrency.limits.Limiter; +import java.util.Optional; +import java.util.Queue; +import java.util.function.Function; + +/** + * A non-blocking dispatcher {@link Bulkhead} that guarantees FIFO ordering of task execution for a + * given fixed context. Since there is only one context, draining can be done in parallel while + * maintaining FIFO (because permits for tasks with equal contexts can be exchanged freely). This + * class is for internal use only; clients should use {@link EnumContextPartialFifoOrderBulkhead} or + * {@link FifoParallelDispatcherBulkhead}. + * + * @param the context type to run tasks with + * @see EnumContextPartialFifoOrderBulkhead + * @see FifoParallelDispatcherBulkhead + */ +class SingletonContextDispatcherBulkhead extends AbstractDispatcherBulkhead { + + private final ContextT context; + + SingletonContextDispatcherBulkhead(Limiter limiter, + Queue> backlog, + Function exceptionClassifier, + int maxDispatchPerCall, + ContextT context) { + super(limiter, backlog, exceptionClassifier, maxDispatchPerCall); + this.context = context; + } + + @Override + protected void drain() { + int dispatched = 0; + BulkheadTask head; + while (dispatched < maxDispatchPerCall && (head = backlog.peek()) != null) { + assert head.context == context; + final Optional listener = limiter.acquire(context); + if (!listener.isPresent()) { + return; + } + + head = backlog.poll(); + if (head == null) { + listener.get().onIgnore(); + } else { + assert head.context == context; + dispatch(head, listener.get()); + dispatched++; + } + } + + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/EnumContextPartialFifoOrderBulkheadTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/EnumContextPartialFifoOrderBulkheadTest.java new file mode 100644 index 0000000..0e49547 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/EnumContextPartialFifoOrderBulkheadTest.java @@ -0,0 +1,76 @@ +/** + * Copyright 2025 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.FixedLimit; +import com.netflix.concurrency.limits.limiter.AbstractPartitionedLimiter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; +import org.junit.Test; + +public class EnumContextPartialFifoOrderBulkheadTest { + + @Test + public void testSuccessfulExecution() { + Limiter limiter = new TestLimiterBuilder() + .partition(TestContext.REQUEST.name(), .5) + .partition(TestContext.RESPONSE.name(), .5) + .partitionResolver(TestContext::name) + .limit(FixedLimit.of(2)) + .build(); + + EnumContextPartialFifoOrderBulkhead bulkhead = EnumContextPartialFifoOrderBulkhead.newBuilder(TestContext.class) + .limiter(limiter) + .backlog(1) + .exceptionClassifier(t -> Limiter.Listener.Result.IGNORE) + .maxDispatchPerCall(1) + .build(); + + AtomicBoolean requestExecuted = new AtomicBoolean(false); + + CompletionStage requestResult = bulkhead.executeCompletionStage( + () -> CompletableFuture.runAsync( + () -> requestExecuted.set(true)), TestContext.REQUEST); + + AtomicBoolean responseExecuted = new AtomicBoolean(false); + + CompletionStage responseResult = bulkhead.executeCompletionStage( + () -> CompletableFuture.runAsync( + () -> responseExecuted.set(true)), TestContext.RESPONSE); + + requestResult.toCompletableFuture().join(); + responseResult.toCompletableFuture().join(); + + Assert.assertTrue(requestExecuted.get()); + Assert.assertTrue(responseExecuted.get()); + } + + static class TestLimiterBuilder extends AbstractPartitionedLimiter.Builder { + + @Override + protected TestLimiterBuilder self() { + return this; + } + } + + enum TestContext { + REQUEST, + RESPONSE + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/FifoParallelDispatcherBulkheadTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/FifoParallelDispatcherBulkheadTest.java new file mode 100644 index 0000000..1fbc4ac --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/FifoParallelDispatcherBulkheadTest.java @@ -0,0 +1,50 @@ +/** + * Copyright 2025 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.FixedLimit; +import com.netflix.concurrency.limits.limiter.SimpleLimiter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; +import org.junit.Test; + +public class FifoParallelDispatcherBulkheadTest { + + @Test + public void testSuccessfulExecution() { + Limiter limiter = SimpleLimiter.newBuilder().limit(FixedLimit.of(1)).build(); + + FifoParallelDispatcherBulkhead bulkhead = FifoParallelDispatcherBulkhead.newBuilder() + .limiter(limiter) + .backlog(1) + .exceptionClassifier(t -> Limiter.Listener.Result.IGNORE) + .maxDispatchPerCall(1) + .build(); + + AtomicBoolean executed = new AtomicBoolean(false); + + CompletionStage result = bulkhead.executeCompletionStage( + () -> CompletableFuture.runAsync( + () -> executed.set(true)), null); + + result.toCompletableFuture().join(); + + Assert.assertTrue(executed.get()); + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/FifoSerialDispatcherBulkheadTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/FifoSerialDispatcherBulkheadTest.java new file mode 100644 index 0000000..7a4bec2 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/FifoSerialDispatcherBulkheadTest.java @@ -0,0 +1,50 @@ +/** + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.FixedLimit; +import com.netflix.concurrency.limits.limiter.SimpleLimiter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; +import org.junit.Test; + +public class FifoSerialDispatcherBulkheadTest { + + @Test + public void testSuccessfulExecution() { + Limiter limiter = SimpleLimiter.newBuilder().limit(FixedLimit.of(1)).build(); + + FifoSerialDispatcherBulkhead bulkhead = FifoSerialDispatcherBulkhead.newBuilder() + .limiter(limiter) + .backlog(1) + .exceptionClassifier(t -> Limiter.Listener.Result.IGNORE) + .maxDispatchPerCall(1) + .build(); + + AtomicBoolean executed = new AtomicBoolean(false); + + CompletionStage result = bulkhead.executeCompletionStage( + () -> CompletableFuture.runAsync( + () -> executed.set(true)), null); + + result.toCompletableFuture().join(); + + Assert.assertTrue(executed.get()); + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/RoundRobinDispatcherBulkheadTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/RoundRobinDispatcherBulkheadTest.java new file mode 100644 index 0000000..12692c0 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/bulkhead/RoundRobinDispatcherBulkheadTest.java @@ -0,0 +1,50 @@ +/** + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.bulkhead; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.FixedLimit; +import com.netflix.concurrency.limits.limiter.SimpleLimiter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; +import org.junit.Test; + +public class RoundRobinDispatcherBulkheadTest { + + @Test + public void testSuccessfulExecution() { + Limiter limiter = SimpleLimiter.newBuilder().limit(FixedLimit.of(1)).build(); + + RoundRobinDispatcherBulkhead bulkhead = RoundRobinDispatcherBulkhead.newBuilder() + .limiter(limiter) + .backlog(1) + .exceptionClassifier(t -> Limiter.Listener.Result.IGNORE) + .maxDispatchPerCall(1) + .build(); + + AtomicBoolean executed = new AtomicBoolean(false); + + CompletionStage result = bulkhead.executeCompletionStage( + () -> CompletableFuture.runAsync( + () -> executed.set(true)), null); + + result.toCompletableFuture().join(); + + Assert.assertTrue(executed.get()); + } +}