diff --git a/src/main/java/org/eclipse/jetty/reactive/client/ReactiveRequest.java b/src/main/java/org/eclipse/jetty/reactive/client/ReactiveRequest.java index 8781d40..a7103f5 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/ReactiveRequest.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/ReactiveRequest.java @@ -22,8 +22,8 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.Request; import org.eclipse.jetty.io.Content.Chunk; +import org.eclipse.jetty.reactive.client.internal.AdapterRequestContent; import org.eclipse.jetty.reactive.client.internal.PublisherContent; -import org.eclipse.jetty.reactive.client.internal.PublisherRequestContent; import org.eclipse.jetty.reactive.client.internal.RequestEventPublisher; import org.eclipse.jetty.reactive.client.internal.ResponseEventPublisher; import org.eclipse.jetty.reactive.client.internal.ResponseListenerProcessor; @@ -175,7 +175,7 @@ public Builder(Request request) { * @return this instance */ public Builder content(Content content) { - request.body(new PublisherRequestContent(content)); + request.body(new AdapterRequestContent(content)); return this; } @@ -302,6 +302,15 @@ public interface Content extends Publisher { */ public String getContentType(); + /** + *

Rewinds this content, if possible.

+ * + * @return whether this request content was rewound + */ + public default boolean rewind() { + return false; + } + public static Content fromString(String string, String mediaType, Charset charset) { return new StringContent(string, mediaType, charset); } diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractEventPublisher.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractEventPublisher.java index 3bdf7bb..4f5336c 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractEventPublisher.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractEventPublisher.java @@ -50,7 +50,7 @@ protected void onRequest(Subscriber subscriber, long n) { protected void emit(T event) { Subscriber subscriber = null; try (AutoLock ignored = lock()) { - if (!isCancelled() && demand > 0) { + if (demand > 0) { --demand; subscriber = subscriber(); } diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSingleProcessor.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSingleProcessor.java index 2f17cf8..4c6a223 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSingleProcessor.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSingleProcessor.java @@ -20,9 +20,21 @@ import org.eclipse.jetty.util.MathUtils; import org.eclipse.jetty.util.thread.AutoLock; import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +/** + *

A {@link Processor} that allows a single {@link Subscriber} at a time, + * and can subscribe to only one {@link Publisher} at a time.

+ *

The implementation acts as a {@link Subscriber} to upstream input, + * and acts as a {@link Publisher} for the downstream output.

+ *

Subclasses implement the transformation of the input elements into the + * output elements by overriding {@link #onNext(Object)}.

+ * + * @param the type of the input elements + * @param the type of the output elements + */ public abstract class AbstractSingleProcessor extends AbstractSinglePublisher implements Processor { private Subscription upStream; private long demand; @@ -44,10 +56,12 @@ public void cancel() { } private void upStreamCancel() { - upStreamCancel(upStream()); - } - - private void upStreamCancel(Subscription upStream) { + Subscription upStream; + try (AutoLock ignored = lock()) { + upStream = this.upStream; + // Null-out the field to allow re-subscriptions. + this.upStream = null; + } if (upStream != null) { upStream.cancel(); } @@ -59,7 +73,8 @@ protected void onRequest(Subscriber subscriber, long n) { Subscription upStream; try (AutoLock ignored = lock()) { demand = MathUtils.cappedAdd(this.demand, n); - upStream = upStream(); + upStream = this.upStream; + // If there is not upStream yet, remember the demand. this.demand = upStream == null ? demand : 0; } upStreamRequest(upStream, demand); @@ -79,28 +94,27 @@ private void upStreamRequest(Subscription upStream, long demand) { public void onSubscribe(Subscription subscription) { Objects.requireNonNull(subscription, "invalid 'null' subscription"); long demand = 0; - boolean cancel = false; + Throwable failure = null; try (AutoLock ignored = lock()) { if (this.upStream != null) { - cancel = true; + failure = new IllegalStateException("multiple subscriptions not supported"); } else { - if (isCancelled()) { - cancel = true; - } else { - this.upStream = subscription; - demand = this.demand; - this.demand = 0; - } + this.upStream = subscription; + // The demand stored so far will be forwarded. + demand = this.demand; + this.demand = 0; } } - if (cancel) { + if (failure != null) { subscription.cancel(); + onError(failure); } else if (demand > 0) { + // Forward any previously stored demand. subscription.request(demand); } } - protected Subscription upStream() { + private Subscription upStream() { try (AutoLock ignored = lock()) { return upStream; } diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSinglePublisher.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSinglePublisher.java index f27858a..7364fe6 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSinglePublisher.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSinglePublisher.java @@ -16,7 +16,6 @@ package org.eclipse.jetty.reactive.client.internal; import java.util.Objects; -import java.util.concurrent.CancellationException; import org.eclipse.jetty.util.thread.AutoLock; import org.reactivestreams.Publisher; @@ -26,16 +25,15 @@ import org.slf4j.LoggerFactory; /** - * A Publisher that supports a single Subscriber. + *

A {@link Publisher} that supports a single {@link Subscriber} at a time.

* - * @param the type of items emitted by this Publisher + * @param the type of items emitted by this {@link Publisher} */ public abstract class AbstractSinglePublisher implements Publisher, Subscription { private static final Logger logger = LoggerFactory.getLogger(AbstractSinglePublisher.class); private final AutoLock lock = new AutoLock(); private Subscriber subscriber; - private boolean cancelled; protected AutoLock lock() { return lock.lock(); @@ -49,11 +47,7 @@ public void subscribe(Subscriber subscriber) { if (this.subscriber != null) { failure = new IllegalStateException("multiple subscribers not supported"); } else { - if (isCancelled()) { - failure = new CancellationException(); - } else { - this.subscriber = subscriber; - } + this.subscriber = subscriber; } } if (logger.isDebugEnabled()) { @@ -76,10 +70,7 @@ public void request(long n) { Subscriber subscriber; Throwable failure = null; try (AutoLock ignored = lock()) { - if (isCancelled()) { - return; - } - subscriber = subscriber(); + subscriber = this.subscriber; if (n <= 0) { failure = new IllegalArgumentException("reactive stream violation rule 3.9"); } @@ -99,20 +90,14 @@ protected void onFailure(Subscriber subscriber, Throwable failure) { @Override public void cancel() { - Subscriber subscriber; try (AutoLock ignored = lock()) { - cancelled = true; - subscriber = this.subscriber; - this.subscriber = null; - } - if (logger.isDebugEnabled()) { - logger.debug("{} cancelled subscription from {}", this, subscriber); - } - } - - protected boolean isCancelled() { - try (AutoLock ignored = lock()) { - return cancelled; + if (subscriber != null) { + if (logger.isDebugEnabled()) { + logger.debug("{} cancelled subscription from {}", this, subscriber); + } + } + // Null-out the field to allow re-subscriptions. + subscriber = null; } } diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/AdapterRequestContent.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/AdapterRequestContent.java new file mode 100644 index 0000000..4f5ff29 --- /dev/null +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/AdapterRequestContent.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.jetty.reactive.client.internal; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.reactive.client.ReactiveRequest; +import org.eclipse.jetty.util.thread.AutoLock; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

A {@link Request.Content} whose source is a {@link ReactiveRequest.Content}.

+ */ +public class AdapterRequestContent implements Request.Content { + private static final Logger logger = LoggerFactory.getLogger(AdapterRequestContent.class); + + private final ReactiveRequest.Content reactiveContent; + private Bridge bridge; + + public AdapterRequestContent(ReactiveRequest.Content content) { + this.reactiveContent = content; + } + + @Override + public long getLength() { + return reactiveContent.getLength(); + } + + @Override + public Content.Chunk read() { + return getOrCreateBridge().read(); + } + + @Override + public void demand(Runnable runnable) { + getOrCreateBridge().demand(runnable); + } + + @Override + public void fail(Throwable failure) { + getOrCreateBridge().fail(failure); + } + + @Override + public boolean rewind() { + boolean rewound = reactiveContent.rewind(); + if (rewound) { + if (bridge != null) { + bridge.cancel(); + bridge = null; + } + } + return rewound; + } + + private Bridge getOrCreateBridge() { + if (bridge == null) { + bridge = new Bridge(); + } + return bridge; + } + + @Override + public String getContentType() { + return reactiveContent.getContentType(); + } + + @Override + public String toString() { + return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + } + + /** + *

A bridge between the {@link Request.Content} read by the {@link HttpClient} + * implementation and the {@link ReactiveRequest.Content} provided by applications.

+ *

The first access to the {@link Request.Content} from the {@link HttpClient} + * implementation creates the bridge and forwards the access, calling either + * {@link #read()} or {@link #demand(Runnable)}. + * Method {@link #read()} returns the current {@link Content.Chunk}. + * Method {@link #demand(Runnable)} forwards the demand to the {@link ReactiveRequest.Content}, + * which in turns calls {@link #onNext(Content.Chunk)}, providing the current chunk + * returned by {@link #read()}.

+ */ + private class Bridge implements Subscriber { + private final AutoLock lock = new AutoLock(); + private Subscription subscription; + private Content.Chunk chunk; + private Throwable failure; + private boolean complete; + private Runnable demand; + + private Bridge() { + reactiveContent.subscribe(this); + } + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + } + + @Override + public void onNext(Content.Chunk c) { + Runnable onDemand; + try (AutoLock ignored = lock.lock()) { + chunk = c; + onDemand = demand; + demand = null; + } + + if (logger.isDebugEnabled()) { + logger.debug("content {} on {}", c, this); + } + + invokeDemand(onDemand); + } + + @Override + public void onError(Throwable error) { + Runnable onDemand; + try (AutoLock ignored = lock.lock()) { + failure = error; + onDemand = demand; + demand = null; + } + + if (logger.isDebugEnabled()) { + logger.debug("error on {}", this, error); + } + + if (onDemand != null) { + invokeDemand(onDemand); + } + } + + @Override + public void onComplete() { + Runnable onDemand; + try (AutoLock ignored = lock.lock()) { + complete = true; + onDemand = demand; + demand = null; + } + + if (logger.isDebugEnabled()) { + logger.debug("complete on {}", this); + } + + if (onDemand != null) { + invokeDemand(onDemand); + } + } + + private Content.Chunk read() { + try (AutoLock ignored = lock.lock()) { + Content.Chunk result = chunk; + if (result == null) { + if (complete) { + result = Content.Chunk.EOF; + } else if (failure != null) { + result = Content.Chunk.from(failure); + } + } + chunk = Content.Chunk.next(result); + if (logger.isDebugEnabled()) { + logger.debug("read {} on {}", result, this); + } + return result; + } + } + + private void demand(Runnable onDemand) { + try (AutoLock ignored = lock.lock()) { + if (demand != null) { + throw new IllegalStateException("demand already exists"); + } + demand = onDemand; + } + + if (logger.isDebugEnabled()) { + logger.debug("demand {} on {}", onDemand, this); + } + + // Forward the demand. + subscription.request(1); + } + + private void invokeDemand(Runnable demand) { + try { + if (logger.isDebugEnabled()) { + logger.debug("invoking demand callback {} on {}", demand, this); + } + demand.run(); + } catch (Throwable x) { + fail(x); + } + } + + private void fail(Throwable failure) { + if (logger.isDebugEnabled()) { + logger.debug("failure while processing request content on {}", this, failure); + } + cancel(); + } + + private void cancel() { + subscription.cancel(); + } + } +} diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/PublisherContent.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/PublisherContent.java index 7cf9600..6ee9e55 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/PublisherContent.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/PublisherContent.java @@ -20,18 +20,31 @@ import org.eclipse.jetty.io.Content; import org.eclipse.jetty.reactive.client.ReactiveRequest; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; /** - * A {@link ReactiveRequest.Content} that wraps a Publisher. + * A {@link ReactiveRequest.Content} that wraps a {@link Publisher}. */ public class PublisherContent extends AbstractSingleProcessor implements ReactiveRequest.Content { + private final Publisher publisher; private final String contentType; public PublisherContent(Publisher publisher, String contentType) { + this.publisher = publisher; this.contentType = Objects.requireNonNull(contentType); + } + + @Override + public void subscribe(Subscriber subscriber) { + super.subscribe(subscriber); publisher.subscribe(this); } + @Override + public void onNext(Content.Chunk chunk) { + downStreamOnNext(chunk); + } + @Override public long getLength() { return -1; @@ -43,7 +56,7 @@ public String getContentType() { } @Override - public void onNext(Content.Chunk chunk) { - downStreamOnNext(chunk); + public boolean rewind() { + return true; } } diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/PublisherRequestContent.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/PublisherRequestContent.java deleted file mode 100644 index 4cb37fe..0000000 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/PublisherRequestContent.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.eclipse.jetty.reactive.client.internal; - -import org.eclipse.jetty.client.Request; -import org.eclipse.jetty.io.Content; -import org.eclipse.jetty.io.content.AsyncContent; -import org.eclipse.jetty.reactive.client.ReactiveRequest; -import org.eclipse.jetty.util.Callback; -import org.reactivestreams.Subscriber; - -public class PublisherRequestContent implements Request.Content, Subscriber { - private final AsyncContent asyncContent = new AsyncContent(); - private final ReactiveRequest.Content reactiveContent; - private org.reactivestreams.Subscription subscription; - - public PublisherRequestContent(ReactiveRequest.Content content) { - this.reactiveContent = content; - content.subscribe(this); - } - - @Override - public long getLength() { - return reactiveContent.getLength(); - } - - @Override - public Content.Chunk read() { - return asyncContent.read(); - } - - @Override - public void demand(Runnable runnable) { - asyncContent.demand(runnable); - } - - @Override - public void fail(Throwable failure) { - onError(failure); - } - - @Override - public String getContentType() { - return reactiveContent.getContentType(); - } - - @Override - public void onSubscribe(org.reactivestreams.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(Content.Chunk chunk) { - asyncContent.write(chunk.isLast(), chunk.getByteBuffer(), Callback.from(chunk::release, - Callback.from(() -> subscription.request(1), x -> subscription.cancel()))); - } - - @Override - public void onError(Throwable failure) { - asyncContent.fail(failure); - } - - @Override - public void onComplete() { - asyncContent.close(); - } - - @Override - public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); - } -} diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/QueuedSinglePublisher.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/QueuedSinglePublisher.java index 99ceeb6..68bb76c 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/QueuedSinglePublisher.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/QueuedSinglePublisher.java @@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory; public class QueuedSinglePublisher extends AbstractSinglePublisher { - public static final Terminal COMPLETE = Subscriber::onComplete; + private static final Terminal COMPLETE = Subscriber::onComplete; private static final Logger logger = LoggerFactory.getLogger(QueuedSinglePublisher.class); private final Queue items = new ArrayDeque<>(); diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java index d05054e..362f0f3 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java @@ -30,7 +30,7 @@ public class StringContent extends AbstractSinglePublisher implem private final String mediaType; private final Charset encoding; private final byte[] bytes; - private boolean complete; + private State state = State.INITIAL; public StringContent(String string, String mediaType, Charset encoding) { this.mediaType = Objects.requireNonNull(mediaType); @@ -48,12 +48,32 @@ public String getContentType() { return mediaType + ";charset=" + encoding.name(); } + @Override + public boolean rewind() { + state = State.INITIAL; + return true; + } + @Override protected void onRequest(Subscriber subscriber, long n) { - if (!complete) { - complete = true; - subscriber.onNext(Content.Chunk.from(ByteBuffer.wrap(bytes), true)); - subscriber.onComplete(); + switch (state) { + case INITIAL: { + state = State.CONTENT; + subscriber.onNext(Content.Chunk.from(ByteBuffer.wrap(bytes), false)); + break; + } + case CONTENT: { + state = State.COMPLETE; + subscriber.onComplete(); + break; + } + default: { + break; + } } } + + private enum State { + INITIAL, CONTENT, COMPLETE + } } diff --git a/src/test/java/org/eclipse/jetty/reactive/client/RxJava2Test.java b/src/test/java/org/eclipse/jetty/reactive/client/RxJava2Test.java index 7faf333..818658f 100644 --- a/src/test/java/org/eclipse/jetty/reactive/client/RxJava2Test.java +++ b/src/test/java/org/eclipse/jetty/reactive/client/RxJava2Test.java @@ -23,7 +23,9 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -35,11 +37,13 @@ import io.reactivex.rxjava3.core.Single; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.reactive.client.internal.AbstractSingleProcessor; +import org.eclipse.jetty.reactive.client.internal.AbstractSinglePublisher; import org.eclipse.jetty.reactive.client.internal.BufferingProcessor; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; @@ -47,6 +51,9 @@ import org.eclipse.jetty.util.Blocker; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.MathUtils; +import org.eclipse.jetty.util.thread.AutoLock; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -619,6 +626,70 @@ public void onComplete() { assertEquals(content.flip(), ByteBuffer.wrap(original)); } + @ParameterizedTest + @MethodSource("protocols") + public void testReproducibleContent(String protocol) throws Exception { + prepare(protocol, new Handler.Abstract() { + @Override + public boolean handle(Request request, Response response, Callback callback) { + String target = Request.getPathInContext(request); + if (!target.equals("/ok")) { + Response.sendRedirect(request, response, callback, HttpStatus.TEMPORARY_REDIRECT_307, "/ok", true); + } else { + Content.copy(request, response, callback); + } + return true; + } + }); + + String text = "hello world"; + ReactiveRequest request = ReactiveRequest.newBuilder(httpClient().newRequest(uri()).method(HttpMethod.POST)) + .content(ReactiveRequest.Content.fromString(text, "text/plain", StandardCharsets.UTF_8)) + .build(); + String content = Single.fromPublisher(request.response(ReactiveResponse.Content.asString())) + .blockingGet(); + + assertEquals(text, content); + } + + @Disabled("Requires Jetty Issue #10879") + @ParameterizedTest + @MethodSource("protocols") + public void testReproducibleContentSplitAndDelayed(String protocol) throws Exception { + prepare(protocol, new Handler.Abstract() { + @Override + public boolean handle(Request request, Response response, Callback callback) { + String target = Request.getPathInContext(request); + if (!target.equals("/ok")) { + Response.sendRedirect(request, response, callback, HttpStatus.TEMPORARY_REDIRECT_307, "/ok", true); + } else { + Content.copy(request, response, callback); + } + return true; + } + }); + + String text1 = "hello"; + String text2 = "world"; + ChunkListSinglePublisher publisher = new ChunkListSinglePublisher(); + // Offer content to trigger the sending of the request and the processing on the server. + publisher.offer(StandardCharsets.UTF_8.encode(text1)); + ReactiveRequest request = ReactiveRequest.newBuilder(httpClient().newRequest(uri()).method(HttpMethod.POST)) + .content(ReactiveRequest.Content.fromPublisher(publisher, "text/plain", StandardCharsets.UTF_8)) + .build(); + Single flow = Single.fromPublisher(request.response(ReactiveResponse.Content.asString())); + // Send the request by subscribing as a CompletableFuture. + CompletableFuture completable = flow.toCompletionStage().toCompletableFuture(); + + // Allow the redirect to happen. + Thread.sleep(1000); + + publisher.offer(StandardCharsets.UTF_8.encode(text2)); + publisher.complete(); + + assertEquals(text1 + text2, completable.get(5, TimeUnit.SECONDS)); + } + private record Pair(X one, Y two) { } @@ -627,4 +698,102 @@ private BufferedResponse(ReactiveResponse response) { this(response, new ArrayList<>()); } } + + private static class ChunkListSinglePublisher extends AbstractSinglePublisher { + private final List byteBuffers = new ArrayList<>(); + private boolean complete; + private boolean stalled; + private long demand; + private int index; + + private ChunkListSinglePublisher() { + reset(); + } + + private void reset() { + try (AutoLock ignored = lock()) { + complete = false; + stalled = true; + demand = 0; + index = 0; + } + } + + private void offer(ByteBuffer byteBuffer) { + Subscriber subscriber; + try (AutoLock ignored = lock()) { + byteBuffers.add(Objects.requireNonNull(byteBuffer)); + subscriber = subscriber(); + if (subscriber != null) { + stalled = false; + } + } + if (subscriber != null) { + proceed(subscriber); + } + } + + private void complete() { + complete = true; + Subscriber subscriber = subscriber(); + if (subscriber != null) { + proceed(subscriber); + } + } + + @Override + protected void onRequest(Subscriber subscriber, long n) { + boolean proceed = false; + try (AutoLock ignored = lock()) { + demand = MathUtils.cappedAdd(demand, n); + if (stalled) { + stalled = false; + proceed = true; + } + } + if (proceed) { + proceed(subscriber); + } + } + + private void proceed(Subscriber subscriber) { + while (true) { + ByteBuffer byteBuffer = null; + boolean notify = false; + try (AutoLock ignored = lock()) { + if (index < byteBuffers.size()) { + if (demand > 0) { + byteBuffer = byteBuffers.get(index); + ++index; + --demand; + notify = true; + } else { + stalled = true; + } + } else { + if (complete) { + notify = true; + } else { + stalled = true; + } + } + } + if (notify) { + if (byteBuffer != null) { + subscriber.onNext(Content.Chunk.from(byteBuffer.slice(), false)); + continue; + } else { + subscriber.onComplete(); + } + } + break; + } + } + + @Override + public void cancel() { + reset(); + super.cancel(); + } + } }