diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index 9a82c247bca5f5..d9db547c68c451 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -1,5 +1,6 @@ package org.jboss.resteasy.reactive.server.handlers; +import static org.jboss.resteasy.reactive.server.handlers.PublisherResponseHandler.AbstractMultiSubscriber.determineCustomizers; import static org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl.EMPTY_BUFFER; import java.nio.charset.StandardCharsets; @@ -103,7 +104,7 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { @Override public void onNext(Object item) { - List customizers = determineCustomizers(!hadItem); + List customizers = determineCustomizers(publisher, !hadItem, staticCustomizers); hadItem = true; StreamingUtil.send(requestContext, customizers, item, messagePrefix(), messageSuffix()) .handle((v, t) -> { @@ -125,33 +126,12 @@ public void onNext(Object item) { }); } - private List determineCustomizers(boolean isFirst) { - // we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data - // at this point no matter the type of RestMulti we can safely obtain the headers and status - if (isFirst && (publisher instanceof RestMulti restMulti)) { - Map> headers = restMulti.getHeaders(); - Integer status = restMulti.getStatus(); - if (headers.isEmpty() && (status == null)) { - return staticCustomizers; - } - List result = new ArrayList<>(staticCustomizers.size() + 2); - result.addAll(staticCustomizers); // these are added first so that the result specific values will take precedence if there are conflicts - if (!headers.isEmpty()) { - result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers)); - } - if (status != null) { - result.add(new StreamingResponseCustomizer.StatusCustomizer(status)); - } - return result; - } - - return staticCustomizers; - } - @Override public void onComplete() { if (!hadItem) { - StreamingUtil.setHeaders(requestContext, requestContext.serverResponse(), this.determineCustomizers(true)); + StreamingUtil.setHeaders(requestContext, requestContext.serverResponse(), determineCustomizers( + this.publisher, true, + this.staticCustomizers)); } if (json) { String postfix = onCompleteText(); @@ -218,6 +198,31 @@ static abstract class AbstractMultiSubscriber implements Subscriber { }); } + @SuppressWarnings("rawtypes") + protected static List determineCustomizers(Publisher publisher, boolean isFirst, + List staticCustomizers) { + // we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data + // at this point no matter the type of RestMulti we can safely obtain the headers and status + if (isFirst && (publisher instanceof RestMulti restMulti)) { + Map> headers = restMulti.getHeaders(); + Integer status = restMulti.getStatus(); + if (headers.isEmpty() && (status == null)) { + return staticCustomizers; + } + List result = new ArrayList<>(staticCustomizers.size() + 2); + result.addAll(staticCustomizers); // these are added first so that the result specific values will take precedence if there are conflicts + if (!headers.isEmpty()) { + result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers)); + } + if (status != null) { + result.add(new StreamingResponseCustomizer.StatusCustomizer(status)); + } + return result; + } + + return staticCustomizers; + } + @Override public void onSubscribe(Subscription s) { this.subscription = s; @@ -343,7 +348,8 @@ private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher< demand = 1L; } - SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers); + List customizers = determineCustomizers(result, true, streamingResponseCustomizers); + SseUtil.setHeaders(requestContext, requestContext.serverResponse(), customizers); requestContext.suspend(); requestContext.serverResponse().write(EMPTY_BUFFER, throwable -> { if (throwable == null) {