From 0f67683abf3583d3b58afae3b9dbe50c77bf3fbb Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Thu, 23 Oct 2025 18:10:54 -0400 Subject: [PATCH 1/2] Add ability to set observeOn buffer size per stage We've found that these buffers are very important tuning parameters. Additionally, they are very stage specific. Sometimes, you need want small buffers and other times large buffers depending on the performance characteristics of the stage. Setting them all via the default RX configuration is not a strong strategy. Especially because that needs to be set when mantis-agent spins up; not necessarily when the job spins up. This commit is mostly plumbing to get the buffer size to the stage executors. --- .../io/mantisrx/runtime/GroupToGroup.java | 12 ++++++++- .../io/mantisrx/runtime/GroupToScalar.java | 12 ++++++++- .../java/io/mantisrx/runtime/KeyToKey.java | 12 ++++++++- .../java/io/mantisrx/runtime/KeyToScalar.java | 12 ++++++++- .../mantisrx/runtime/KeyValueStageConfig.java | 8 ++++-- .../io/mantisrx/runtime/ScalarToGroup.java | 12 ++++++++- .../java/io/mantisrx/runtime/ScalarToKey.java | 12 ++++++++- .../io/mantisrx/runtime/ScalarToScalar.java | 12 ++++++++- .../java/io/mantisrx/runtime/StageConfig.java | 25 +++++++++++++++---- .../runtime/executor/StageExecutors.java | 21 ++++++++-------- .../sink/ServerSentEventRequestHandler.java | 11 -------- 11 files changed, 114 insertions(+), 35 deletions(-) diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToGroup.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToGroup.java index dc414ca29..5ae88bd3f 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToGroup.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToGroup.java @@ -53,7 +53,7 @@ public class GroupToGroup extends KeyValueStageConfig { GroupToGroup(GroupComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -76,6 +76,7 @@ public static class Config { // always assume a stateful calculation is being made // do not allow config to override private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -135,6 +136,15 @@ public Config withParameters(List> params) return this; } + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } + } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java index cfb4cf35a..a99927f5a 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java @@ -56,7 +56,7 @@ public class GroupToScalar extends StageConfig { GroupToScalar(GroupToScalarComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -79,6 +79,7 @@ public static class Config { // do not allow config override private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; private int concurrency = DEFAULT_STAGE_CONCURRENCY; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -149,6 +150,15 @@ public Config withParameters(List> params) { return this; } + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } + } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java index 5760934cd..0c05e06f3 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java @@ -54,7 +54,7 @@ public class KeyToKey extends KeyValueStageConfig { KeyToKey(KeyComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, inputKeyCodec, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -78,6 +78,7 @@ public static class Config { // always assume a stateful calculation is being made // do not allow config to override private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -136,6 +137,15 @@ public Config withParameters(List> params) this.parameters = params; return this; } + + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToScalar.java index d7cb9ed25..927914a76 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToScalar.java @@ -45,7 +45,7 @@ public class KeyToScalar extends StageConfig { KeyToScalar(ToScalarComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -67,6 +67,7 @@ public static class Config { // 'stateful group calculation' use case // do not allow config override private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -117,6 +118,15 @@ public Config withParameters(List> params) { return this; } + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } + } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java index 4e0cd6907..2cd1f6b44 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java @@ -33,11 +33,15 @@ public abstract class KeyValueStageConfig extends StageConfig { private final Codec keyCodec; public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); + this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY, rx.internal.util.RxRingBuffer.SIZE); } public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { - super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency); + this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, concurrency, rx.internal.util.RxRingBuffer.SIZE); + } + + public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency, int bufferSize) { + super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency, bufferSize); this.keyCodec = outputKeyCodec; } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java index 7fb2a3206..c85f90887 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java @@ -54,7 +54,7 @@ public class ScalarToGroup extends KeyValueStageConfig { public ScalarToGroup(ToGroupComputation computation, Config config, Codec inputCodec) { - super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; @@ -77,6 +77,7 @@ public static class Config { // default input type is concurrent for 'grouping' use case private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.CONCURRENT; private int concurrency = DEFAULT_STAGE_CONCURRENCY; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default private List> parameters = Collections.emptyList(); @@ -155,5 +156,14 @@ public Config withParameters(List> params) { this.parameters = params; return this; } + + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToKey.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToKey.java index 0a2e10425..c3af90a0e 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToKey.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToKey.java @@ -51,7 +51,7 @@ public class ScalarToKey extends KeyValueStageConfig { ScalarToKey(ToKeyComputation computation, Config config, Codec inputCodec) { - super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters); + super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, DEFAULT_STAGE_CONCURRENCY, config.bufferSize); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -72,6 +72,7 @@ public static class Config { // default input type is concurrent for 'grouping' use case private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.CONCURRENT; private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); /** @@ -140,5 +141,14 @@ public Config withParameters(List> params) { this.parameters = params; return this; } + + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java index 7ba4309e2..42d5b2db4 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java @@ -42,7 +42,7 @@ public class ScalarToScalar extends StageConfig { public ScalarToScalar(ScalarComputation computation, Config config, Codec inputCodec) { - super(config.description, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, null, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.bufferSize); this.computation = computation; this.inputStrategy = config.inputStrategy; this.parameters = config.parameters; @@ -68,6 +68,7 @@ public static class Config { // default input type is serial for 'collecting' use case private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; private volatile int concurrency = StageConfig.DEFAULT_STAGE_CONCURRENCY; + private int bufferSize = rx.internal.util.RxRingBuffer.SIZE; private List> parameters = Collections.emptyList(); @@ -129,5 +130,14 @@ public INPUT_STRATEGY getInputStrategy() { public int getConcurrency() { return concurrency; } + + public Config bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public int getBufferSize() { + return bufferSize; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java index a920c1987..82c1912f7 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java @@ -21,6 +21,7 @@ import io.mantisrx.runtime.parameter.ParameterDefinition; import java.util.Collections; import java.util.List; +import rx.internal.util.RxRingBuffer; public abstract class StageConfig { @@ -40,35 +41,44 @@ public abstract class StageConfig { // number of inner observables processed private int concurrency = DEFAULT_STAGE_CONCURRENCY; + // buffer size for observeOn scheduler, defaults to RxRingBuffer.SIZE + private int bufferSize = RxRingBuffer.SIZE; + public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy) { - this(description, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), DEFAULT_STAGE_CONCURRENCY); + this(description, null, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), DEFAULT_STAGE_CONCURRENCY, RxRingBuffer.SIZE); } public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - this(description, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); + this(description, null, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY, RxRingBuffer.SIZE); } public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); + this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY, RxRingBuffer.SIZE); } public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, int concurrency) { - this(description, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), concurrency); + this(description, null, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), concurrency, RxRingBuffer.SIZE); } public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { - this(description, null, inputCodec, outputCodec, inputStrategy, params, concurrency); + this(description, null, inputCodec, outputCodec, inputStrategy, params, concurrency, RxRingBuffer.SIZE); } public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { + this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency, RxRingBuffer.SIZE); + } + + public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec, + Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, + int concurrency, int bufferSize) { this.description = description; this.inputKeyCodec = inputKeyCodec; this.inputCodec = inputCodec; @@ -76,6 +86,7 @@ public StageConfig(String description, Codec inputKeyCodec, Codec inpu this.inputStrategy = inputStrategy; this.parameters = params; this.concurrency = concurrency; + this.bufferSize = bufferSize; } public String getDescription() { @@ -109,5 +120,9 @@ public int getConcurrency() { return concurrency; } + public int getBufferSize() { + return bufferSize; + } + public enum INPUT_STRATEGY {NONE_SPECIFIED, SERIAL, CONCURRENT} } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java index 96458dde9..bdc25ab25 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/StageExecutors.java @@ -204,7 +204,7 @@ private static Observable> executeMantisGroups(Observabl */ @SuppressWarnings("unchecked") private static Observable> executeMantisGroupsInParallel(Observable>> go, Computation computation, - final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency) { + final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency, final int bufferSize) { logger.info("initializing {}", computation.getClass().getCanonicalName()); computation.init(context); @@ -218,7 +218,7 @@ private static Observable> executeMantisGroupsInParallel .lift(new MonitorOperator<>("worker_stage_outer")) .map(observable -> c .call(context, observable - .observeOn(Schedulers.computation()) + .observeOn(Schedulers.computation(), bufferSize) .lift(new MonitorOperator<>("worker_stage_inner_input"))) .lift(new MonitorOperator<>("worker_stage_inner_output"))); @@ -238,7 +238,7 @@ private static Observable> executeMantisGroupsInParallel .groupBy(e -> Math.abs(e.getKeyValue().hashCode()) % concurrency) .flatMap(gbo -> c .call(context, gbo - .observeOn(mantisRxSingleThreadSchedulers[gbo.getKey()]) + .observeOn(mantisRxSingleThreadSchedulers[gbo.getKey()], bufferSize) .lift(new MonitorOperator>("worker_stage_inner_input"))) .lift(new MonitorOperator("worker_stage_inner_output")))); } @@ -277,7 +277,7 @@ private static Observable> executeInners(Observable Observable> executeInnersInParallel(Observable> oo, Computation computation, - final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency) { + final Context context, final boolean applyTimeoutToInners, final long timeout, final int concurrency, final int bufferSize) { logger.info("initializing {}", computation.getClass().getCanonicalName()); computation.init(context); @@ -290,7 +290,7 @@ private static Observable> executeInnersInParallel(Observab .lift(new MonitorOperator<>("worker_stage_outer")) .map(observable -> c .call(context, observable - .observeOn(Schedulers.computation()) + .observeOn(Schedulers.computation(), bufferSize) .lift(new MonitorOperator("worker_stage_inner_input"))) .lift(new MonitorOperator("worker_stage_inner_output"))); } else { @@ -307,7 +307,7 @@ private static Observable> executeInnersInParallel(Observab .flatMap(go -> c .call(context, go - .observeOn(mantisRxSingleThreadSchedulers[go.getKey().intValue()]) + .observeOn(mantisRxSingleThreadSchedulers[go.getKey().intValue()], bufferSize) .lift(new MonitorOperator<>("worker_stage_inner_input"))) .lift(new MonitorOperator<>("worker_stage_inner_output")))); } @@ -352,7 +352,8 @@ private static Observable> setupScalarToScalarStage(ScalarT context, false, Integer.MAX_VALUE, - resolveStageConcurrency(context, stage.getConcurrency())); + resolveStageConcurrency(context, stage.getConcurrency()), + stage.getBufferSize()); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable> merged = Observable.just(Observable.merge(source)); return executeInners(merged, stage.getComputation(), context, false, Integer.MAX_VALUE); @@ -368,7 +369,7 @@ private static Observable>> se // check if job overrides the default input strategy if (inputType == StageConfig.INPUT_STRATEGY.CONCURRENT) { return executeInnersInParallel(source, stage.getComputation(), context, true, - stage.getKeyExpireTimeSeconds(), resolveStageConcurrency(context, stage.getConcurrency())); + stage.getKeyExpireTimeSeconds(), resolveStageConcurrency(context, stage.getConcurrency()), stage.getBufferSize()); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable> merged = Observable.just(Observable.merge(source)); return executeInners(merged, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds()); @@ -385,7 +386,7 @@ private static Observable>> setupSca // check if job overrides the default input strategy if (inputType == StageConfig.INPUT_STRATEGY.CONCURRENT) { return executeInnersInParallel(source, stage.getComputation(), context, true, - stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(context, stage.getConcurrency())); + stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(context, stage.getConcurrency()), stage.getBufferSize()); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable> merged = Observable.just(Observable.merge(source)); return executeInners(merged, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds()); @@ -446,7 +447,7 @@ private static Observable> setupGroupToScalarStage(Group if (inputType == StageConfig.INPUT_STRATEGY.CONCURRENT) { logger.info("Execute Groups in PARALLEL!!!!"); return executeMantisGroupsInParallel(source, stage.getComputation(), context, true, - stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(context, stage.getConcurrency())); + stage.getKeyExpireTimeSeconds(),resolveStageConcurrency(context, stage.getConcurrency()), stage.getBufferSize()); } else if (inputType == StageConfig.INPUT_STRATEGY.SERIAL) { Observable>> merged = Observable.just(Observable.merge(source)); diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java index 39cf2abb3..8f3e49698 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java @@ -127,17 +127,6 @@ public Observable handle(HttpServerRequest request, // copy reference, then apply request specific filters, sampling Observable requestObservable = observableToServe; - // decouple the observable on a separate thread and add backpressure handling - String decoupleSSE = "false";//ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("sse.decouple", "false"); - //TODO Below condition would be always false during if condition. - // Since decoupleSSE would be false and matching with true as string - // would always ignore code inside if block - if ("true".equals(decoupleSSE)) { - final BasicTag sockAddrTag = new BasicTag("sockAddr", Optional.ofNullable(socketAddrStr).orElse("none")); - requestObservable = requestObservable - .lift(new DropOperator<>("outgoing_ServerSentEventRequestHandler", sockAddrTag)) - .observeOn(Schedulers.io()); - } response.getHeaders().set("Access-Control-Allow-Origin", "*"); response.getHeaders().set("content-type", "text/event-stream"); response.getHeaders().set("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate"); From 3254f911d491287da8b8762ea4b6aa7c2d48eb10 Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Thu, 23 Oct 2025 18:29:01 -0400 Subject: [PATCH 2/2] clean up --- .../io/mantisrx/runtime/KeyValueStageConfig.java | 2 +- .../main/java/io/mantisrx/runtime/StageConfig.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java index 2cd1f6b44..e48106d31 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java @@ -33,7 +33,7 @@ public abstract class KeyValueStageConfig extends StageConfig { private final Codec keyCodec; public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY, rx.internal.util.RxRingBuffer.SIZE); + this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); } public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java index 82c1912f7..f7edc1051 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java @@ -42,32 +42,32 @@ public abstract class StageConfig { private int concurrency = DEFAULT_STAGE_CONCURRENCY; // buffer size for observeOn scheduler, defaults to RxRingBuffer.SIZE - private int bufferSize = RxRingBuffer.SIZE; + private final int bufferSize; public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy) { - this(description, null, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), DEFAULT_STAGE_CONCURRENCY, RxRingBuffer.SIZE); + this(description, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), DEFAULT_STAGE_CONCURRENCY); } public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - this(description, null, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY, RxRingBuffer.SIZE); + this(description, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); } public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY, RxRingBuffer.SIZE); + this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); } public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, int concurrency) { - this(description, null, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), concurrency, RxRingBuffer.SIZE); + this(description, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), concurrency); } public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { - this(description, null, inputCodec, outputCodec, inputStrategy, params, concurrency, RxRingBuffer.SIZE); + this(description, null, inputCodec, outputCodec, inputStrategy, params, concurrency); } public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec,