diff --git a/pom.xml b/pom.xml
index 0d4ffa9d..a279aa7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,10 @@
org.springframework.boot
spring-boot-starter-actuator
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
diff --git a/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java b/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java
new file mode 100644
index 00000000..74dfcce7
--- /dev/null
+++ b/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java
@@ -0,0 +1,36 @@
+package org.gridsuite.loadflow.server.config;
+
+import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Configuration
+public class RabbitConsumerConfig {
+ /*
+ * RabbitMQ consumer priority:
+ * https://www.rabbitmq.com/docs/consumer-priority
+ *
+ * Each container creates exactly one AMQP consumer with prefetch=1 and its own priority.
+ * When dispatching messages, RabbitMQ always selects the highest-priority consumer
+ * that is available.
+ */
+ @Bean
+ public ListenerContainerCustomizer customizer() {
+ /*
+ * Using AtomicInteger as in org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java
+ * We expect cloud stream to call our customizer exactly once in order for each container so it will produce a sequence of increasing priorities
+ */
+ AtomicInteger index = new AtomicInteger();
+ return (container, destination, group) -> {
+ if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(group, "loadflowGroup")) {
+ smlc.setConsumerArguments(Map.of("x-priority", index.getAndIncrement()));
+ }
+ };
+ }
+}
diff --git a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java
index 10558123..2e64dcba 100644
--- a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java
+++ b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java
@@ -545,9 +545,33 @@ public static String getNextLimitName(LimitViolationInfos limitViolationInfos, N
return temporaryLimit != null ? temporaryLimit.getName() : null;
}
+ /*
+ * Spring Cloud Stream does not allow customizing each consumer within a single listener
+ * container (i.e. when concurrency = N)
+ *
+ * Since we need to customize each consumer individually, we simulate "concurrency = N"
+ * by creating N listener containers, each with concurrency = 1.
+ *
+ * This requires defining one Consumer bean per container, which explains
+ * the duplicated methods below.
+ */
@Bean
- @Override
- public Consumer> consumeRun() {
+ public Consumer> consumeRun1() {
+ return super.consumeRun();
+ }
+
+ @Bean
+ public Consumer> consumeRun2() {
+ return super.consumeRun();
+ }
+
+ @Bean
+ public Consumer> consumeRun3() {
+ return super.consumeRun();
+ }
+
+ @Bean
+ public Consumer> consumeRun4() {
return super.consumeRun();
}
diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml
index 3d8db43a..1ec238e1 100644
--- a/src/main/resources/config/application.yaml
+++ b/src/main/resources/config/application.yaml
@@ -4,15 +4,25 @@ spring:
cloud:
function:
- definition: consumeRun;consumeCancel
+ definition: consumeRun1;consumeRun2;consumeRun3;consumeRun4;consumeCancel
stream:
bindings:
- consumeRun-in-0:
+ # Spring Cloud Stream does not allow customizing each consumer within a single listener
+ # container (i.e. when concurrency = N)
+ #
+ # Since we need to customize each consumer individually, we simulate "concurrency = N"
+ # by creating N listener containers, each with concurrency = 1.
+ consumeRun1-in-0: &consumeRunConfig
destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run
group: loadflowGroup
consumer:
- concurrency: 4
max-attempts: 1
+ consumeRun2-in-0:
+ <<: *consumeRunConfig
+ consumeRun3-in-0:
+ <<: *consumeRunConfig
+ consumeRun4-in-0:
+ <<: *consumeRunConfig
publishRun-out-0:
destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run
publishResult-out-0:
@@ -28,7 +38,8 @@ spring:
output-bindings: publishRun-out-0;publishResult-out-0;publishCancel-out-0;publishStopped-out-0;publishCancelFailed-out-0
rabbit:
bindings:
- consumeRun-in-0:
+ # See comment on spring.cloud.stream.bindings.consumeRun1-in-0
+ consumeRun1-in-0: &consumeRunRabbitConfig
consumer:
auto-bind-dlq: true
dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx
@@ -37,6 +48,12 @@ spring:
quorum:
enabled: true
delivery-limit: 2
+ consumeRun2-in-0:
+ <<: *consumeRunRabbitConfig
+ consumeRun3-in-0:
+ <<: *consumeRunRabbitConfig
+ consumeRun4-in-0:
+ <<: *consumeRunRabbitConfig
powsybl-ws:
database:
name: loadflow
diff --git a/src/test/resources/application-default.yml b/src/test/resources/application-default.yml
index 9762ed63..f8ac484c 100644
--- a/src/test/resources/application-default.yml
+++ b/src/test/resources/application-default.yml
@@ -8,6 +8,10 @@ spring:
hibernate:
#to turn off schema validation that fails (because of clob types) and blocks tests even if the the schema is compatible
ddl-auto: none
+ cloud:
+ function:
+ # disable consumeRun2/3/4 during test - all of them receive the "loadflowGroup" messages otherwise
+ definition: consumeRun1;consumeCancel
logging:
level: