Skip to content

Commit

Permalink
Fixes #194 - Introduce reproducible content.
Browse files Browse the repository at this point in the history
Introduced ReactiveRequest.Content.rewind(), to be forwarded to Jetty's Request.Content.

Reworked AbstractSingleProcessor and other classes to allow the subscribe-cancel-subscribe sequence, since it is allowed by the specification.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Nov 13, 2023
1 parent 5f9e8bc commit b32ace4
Show file tree
Hide file tree
Showing 10 changed files with 490 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.io.Content.Chunk;
import org.eclipse.jetty.reactive.client.internal.AdapterRequestContent;
import org.eclipse.jetty.reactive.client.internal.PublisherContent;
import org.eclipse.jetty.reactive.client.internal.PublisherRequestContent;
import org.eclipse.jetty.reactive.client.internal.RequestEventPublisher;
import org.eclipse.jetty.reactive.client.internal.ResponseEventPublisher;
import org.eclipse.jetty.reactive.client.internal.ResponseListenerProcessor;
Expand Down Expand Up @@ -175,7 +175,7 @@ public Builder(Request request) {
* @return this instance
*/
public Builder content(Content content) {
request.body(new PublisherRequestContent(content));
request.body(new AdapterRequestContent(content));
return this;
}

Expand Down Expand Up @@ -302,6 +302,15 @@ public interface Content extends Publisher<Chunk> {
*/
public String getContentType();

/**
* <p>Rewinds this content, if possible.</p>
*
* @return whether this request content was rewound
*/
public default boolean rewind() {
return false;
}

public static Content fromString(String string, String mediaType, Charset charset) {
return new StringContent(string, mediaType, charset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected void onRequest(Subscriber<? super T> subscriber, long n) {
protected void emit(T event) {
Subscriber<? super T> subscriber = null;
try (AutoLock ignored = lock()) {
if (!isCancelled() && demand > 0) {
if (demand > 0) {
--demand;
subscriber = subscriber();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,21 @@
import org.eclipse.jetty.util.MathUtils;
import org.eclipse.jetty.util.thread.AutoLock;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* <p>A {@link Processor} that allows a single {@link Subscriber} at a time,
* and can subscribe to only one {@link Publisher} at a time.</p>
* <p>The implementation acts as a {@link Subscriber} to upstream input,
* and acts as a {@link Publisher} for the downstream output.</p>
* <p>Subclasses implement the transformation of the input elements into the
* output elements by overriding {@link #onNext(Object)}.</p>
*
* @param <I> the type of the input elements
* @param <O> the type of the output elements
*/
public abstract class AbstractSingleProcessor<I, O> extends AbstractSinglePublisher<O> implements Processor<I, O> {
private Subscription upStream;
private long demand;
Expand All @@ -44,10 +56,12 @@ public void cancel() {
}

private void upStreamCancel() {
upStreamCancel(upStream());
}

private void upStreamCancel(Subscription upStream) {
Subscription upStream;
try (AutoLock ignored = lock()) {
upStream = this.upStream;
// Null-out the field to allow re-subscriptions.
this.upStream = null;
}
if (upStream != null) {
upStream.cancel();
}
Expand All @@ -59,7 +73,8 @@ protected void onRequest(Subscriber<? super O> subscriber, long n) {
Subscription upStream;
try (AutoLock ignored = lock()) {
demand = MathUtils.cappedAdd(this.demand, n);
upStream = upStream();
upStream = this.upStream;
// If there is not upStream yet, remember the demand.
this.demand = upStream == null ? demand : 0;
}
upStreamRequest(upStream, demand);
Expand All @@ -79,28 +94,27 @@ private void upStreamRequest(Subscription upStream, long demand) {
public void onSubscribe(Subscription subscription) {
Objects.requireNonNull(subscription, "invalid 'null' subscription");
long demand = 0;
boolean cancel = false;
Throwable failure = null;
try (AutoLock ignored = lock()) {
if (this.upStream != null) {
cancel = true;
failure = new IllegalStateException("multiple subscriptions not supported");
} else {
if (isCancelled()) {
cancel = true;
} else {
this.upStream = subscription;
demand = this.demand;
this.demand = 0;
}
this.upStream = subscription;
// The demand stored so far will be forwarded.
demand = this.demand;
this.demand = 0;
}
}
if (cancel) {
if (failure != null) {
subscription.cancel();
onError(failure);
} else if (demand > 0) {
// Forward any previously stored demand.
subscription.request(demand);
}
}

protected Subscription upStream() {
private Subscription upStream() {
try (AutoLock ignored = lock()) {
return upStream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.eclipse.jetty.reactive.client.internal;

import java.util.Objects;
import java.util.concurrent.CancellationException;

import org.eclipse.jetty.util.thread.AutoLock;
import org.reactivestreams.Publisher;
Expand All @@ -26,16 +25,15 @@
import org.slf4j.LoggerFactory;

/**
* A Publisher that supports a single Subscriber.
* <p>A {@link Publisher} that supports a single {@link Subscriber} at a time.</p>
*
* @param <T> the type of items emitted by this Publisher
* @param <T> the type of items emitted by this {@link Publisher}
*/
public abstract class AbstractSinglePublisher<T> implements Publisher<T>, Subscription {
private static final Logger logger = LoggerFactory.getLogger(AbstractSinglePublisher.class);

private final AutoLock lock = new AutoLock();
private Subscriber<? super T> subscriber;
private boolean cancelled;

protected AutoLock lock() {
return lock.lock();
Expand All @@ -49,11 +47,7 @@ public void subscribe(Subscriber<? super T> subscriber) {
if (this.subscriber != null) {
failure = new IllegalStateException("multiple subscribers not supported");
} else {
if (isCancelled()) {
failure = new CancellationException();
} else {
this.subscriber = subscriber;
}
this.subscriber = subscriber;
}
}
if (logger.isDebugEnabled()) {
Expand All @@ -76,10 +70,7 @@ public void request(long n) {
Subscriber<? super T> subscriber;
Throwable failure = null;
try (AutoLock ignored = lock()) {
if (isCancelled()) {
return;
}
subscriber = subscriber();
subscriber = this.subscriber;
if (n <= 0) {
failure = new IllegalArgumentException("reactive stream violation rule 3.9");
}
Expand All @@ -99,20 +90,14 @@ protected void onFailure(Subscriber<? super T> subscriber, Throwable failure) {

@Override
public void cancel() {
Subscriber<? super T> subscriber;
try (AutoLock ignored = lock()) {
cancelled = true;
subscriber = this.subscriber;
this.subscriber = null;
}
if (logger.isDebugEnabled()) {
logger.debug("{} cancelled subscription from {}", this, subscriber);
}
}

protected boolean isCancelled() {
try (AutoLock ignored = lock()) {
return cancelled;
if (subscriber != null) {
if (logger.isDebugEnabled()) {
logger.debug("{} cancelled subscription from {}", this, subscriber);
}
}
// Null-out the field to allow re-subscriptions.
subscriber = null;
}
}

Expand Down
Loading

0 comments on commit b32ace4

Please sign in to comment.