Skip to content

Commit

Permalink
Take RestMulti headers and status into account when using SSE resourc…
Browse files Browse the repository at this point in the history
…e method
  • Loading branch information
geoand committed Dec 4, 2024
1 parent 316424b commit 8334aa1
Showing 1 changed file with 32 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jboss.resteasy.reactive.server.handlers;

import static org.jboss.resteasy.reactive.server.handlers.PublisherResponseHandler.AbstractMultiSubscriber.determineCustomizers;
import static org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl.EMPTY_BUFFER;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -103,7 +104,7 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber {

@Override
public void onNext(Object item) {
List<StreamingResponseCustomizer> customizers = determineCustomizers(!hadItem);
List<StreamingResponseCustomizer> customizers = determineCustomizers(publisher, !hadItem, staticCustomizers);
hadItem = true;
StreamingUtil.send(requestContext, customizers, item, messagePrefix(), messageSuffix())
.handle((v, t) -> {
Expand All @@ -125,33 +126,12 @@ public void onNext(Object item) {
});
}

private List<StreamingResponseCustomizer> determineCustomizers(boolean isFirst) {
// we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data
// at this point no matter the type of RestMulti we can safely obtain the headers and status
if (isFirst && (publisher instanceof RestMulti<?> restMulti)) {
Map<String, List<String>> headers = restMulti.getHeaders();
Integer status = restMulti.getStatus();
if (headers.isEmpty() && (status == null)) {
return staticCustomizers;
}
List<StreamingResponseCustomizer> result = new ArrayList<>(staticCustomizers.size() + 2);
result.addAll(staticCustomizers); // these are added first so that the result specific values will take precedence if there are conflicts
if (!headers.isEmpty()) {
result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers));
}
if (status != null) {
result.add(new StreamingResponseCustomizer.StatusCustomizer(status));
}
return result;
}

return staticCustomizers;
}

@Override
public void onComplete() {
if (!hadItem) {
StreamingUtil.setHeaders(requestContext, requestContext.serverResponse(), this.determineCustomizers(true));
StreamingUtil.setHeaders(requestContext, requestContext.serverResponse(), determineCustomizers(
this.publisher, true,
this.staticCustomizers));
}
if (json) {
String postfix = onCompleteText();
Expand Down Expand Up @@ -218,6 +198,31 @@ static abstract class AbstractMultiSubscriber implements Subscriber<Object> {
});
}

@SuppressWarnings("rawtypes")
protected static List<StreamingResponseCustomizer> determineCustomizers(Publisher publisher, boolean isFirst,
List<StreamingResponseCustomizer> staticCustomizers) {
// we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data
// at this point no matter the type of RestMulti we can safely obtain the headers and status
if (isFirst && (publisher instanceof RestMulti<?> restMulti)) {
Map<String, List<String>> headers = restMulti.getHeaders();
Integer status = restMulti.getStatus();
if (headers.isEmpty() && (status == null)) {
return staticCustomizers;
}
List<StreamingResponseCustomizer> result = new ArrayList<>(staticCustomizers.size() + 2);
result.addAll(staticCustomizers); // these are added first so that the result specific values will take precedence if there are conflicts
if (!headers.isEmpty()) {
result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers));
}
if (status != null) {
result.add(new StreamingResponseCustomizer.StatusCustomizer(status));
}
return result;
}

return staticCustomizers;
}

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
Expand Down Expand Up @@ -343,7 +348,8 @@ private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher<
demand = 1L;
}

SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers);
List<StreamingResponseCustomizer> customizers = determineCustomizers(result, true, streamingResponseCustomizers);
SseUtil.setHeaders(requestContext, requestContext.serverResponse(), customizers);
requestContext.suspend();
requestContext.serverResponse().write(EMPTY_BUFFER, throwable -> {
if (throwable == null) {
Expand Down

0 comments on commit 8334aa1

Please sign in to comment.