From 3dbb780a6f28b06f6c5c826921732f88e4e779c5 Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Thu, 5 Feb 2026 09:59:31 +0100 Subject: [PATCH 1/6] POC for rabbitmq loadbalancing Signed-off-by: LE SAULNIER Kevin --- pom.xml | 4 + .../server/config/CustomConsumersFactory.java | 98 +++++++++++++++++++ .../service/LoadFlowMessageListener.java | 47 +++++++++ src/main/resources/config/application.yaml | 19 +--- 4 files changed, 150 insertions(+), 18 deletions(-) create mode 100644 src/main/java/org/gridsuite/loadflow/server/config/CustomConsumersFactory.java create mode 100644 src/main/java/org/gridsuite/loadflow/server/service/LoadFlowMessageListener.java diff --git a/pom.xml b/pom.xml index fb327634..09968280 100644 --- a/pom.xml +++ b/pom.xml @@ -175,6 +175,10 @@ org.springframework.boot spring-boot-starter-actuator + + org.springframework.boot + spring-boot-starter-amqp + diff --git a/src/main/java/org/gridsuite/loadflow/server/config/CustomConsumersFactory.java b/src/main/java/org/gridsuite/loadflow/server/config/CustomConsumersFactory.java new file mode 100644 index 00000000..7a3f582a --- /dev/null +++ b/src/main/java/org/gridsuite/loadflow/server/config/CustomConsumersFactory.java @@ -0,0 +1,98 @@ +package org.gridsuite.loadflow.server.config; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.gridsuite.loadflow.server.service.LoadFlowMessageListener; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.annotation.Bean; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Component +public class CustomConsumersFactory { + + private final ConnectionFactory connectionFactory; + private final LoadFlowMessageListener loadflowMessageListener; + + private final List containers = new ArrayList<>(); + + public CustomConsumersFactory( + ConnectionFactory connectionFactory, + LoadFlowMessageListener loadflowMessageListener + ) { + this.connectionFactory = connectionFactory; + this.loadflowMessageListener = loadflowMessageListener; + } + + @Bean + public Queue loadflowRunQueue() { + return QueueBuilder.durable("loadflowGroup").build(); + } + + @Bean + public TopicExchange loadflowExchange() { + return new TopicExchange("loadflow.run", true, false); + } + + @Bean + Binding loadflowRunBinding(Queue loadflowRunQueue, + TopicExchange loadflowExchange) { + return BindingBuilder + .bind(loadflowRunQueue) + .to(loadflowExchange) + .with("#"); + } + + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory cf) { + return new RabbitAdmin(cf); + } + + @PostConstruct + public void start() { + SimpleMessageListenerContainer consumerContainer = + new SimpleMessageListenerContainer(connectionFactory); + + consumerContainer.setQueueNames("loadflowGroup"); + consumerContainer.setConcurrentConsumers(1); + consumerContainer.setPrefetchCount(1); + consumerContainer.setBeanName("run-loadflow1"); + consumerContainer.setMessageListener(loadflowMessageListener); + consumerContainer.setConsumerArguments(Map.of("x-priority", 3)); + consumerContainer.setAutoStartup(false); + consumerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); + containers.add(consumerContainer); + + consumerContainer = + new SimpleMessageListenerContainer(connectionFactory); + + consumerContainer.setQueueNames("loadflowGroup"); + consumerContainer.setConcurrentConsumers(1); + consumerContainer.setPrefetchCount(1); + consumerContainer.setBeanName("run-loadflow2"); + consumerContainer.setMessageListener(loadflowMessageListener); + consumerContainer.setConsumerArguments(Map.of("x-priority", 2)); + consumerContainer.setAutoStartup(false); + consumerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); + + containers.add(consumerContainer); + } + + @EventListener(ApplicationReadyEvent.class) + public void startContainers() { + containers.forEach(SimpleMessageListenerContainer::start); + } + + @PreDestroy + public void stop() { + containers.forEach(SimpleMessageListenerContainer::stop); + } +} diff --git a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowMessageListener.java b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowMessageListener.java new file mode 100644 index 00000000..a96cbbfc --- /dev/null +++ b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowMessageListener.java @@ -0,0 +1,47 @@ +package org.gridsuite.loadflow.server.service; + +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +@Component +public class LoadFlowMessageListener implements ChannelAwareMessageListener { + private final LoadFlowWorkerService loadFlowWorkerService; + + public LoadFlowMessageListener(LoadFlowWorkerService loadFlowWorkerService) { + this.loadFlowWorkerService = loadFlowWorkerService; + } + + @Override + public void onMessage(Message message, Channel channel) throws IOException { + long tag = message.getMessageProperties().getDeliveryTag(); + try { + // payload + String payload = new String( + message.getBody(), + StandardCharsets.UTF_8 + ); + + // headers + org.springframework.messaging.Message springMessage = MessageBuilder + .withPayload(payload) + .copyHeaders(message.getMessageProperties().getHeaders()) + .build(); + + loadFlowWorkerService.consumeRun().accept(springMessage); + + channel.basicAck(tag, false); + } catch (Exception e) { + + channel.basicReject(tag, false); + + throw e; + } + + } +} diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index 3d8db43a..3899f1d3 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -4,15 +4,9 @@ spring: cloud: function: - definition: consumeRun;consumeCancel + definition: consumeCancel stream: bindings: - consumeRun-in-0: - destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run - group: loadflowGroup - consumer: - concurrency: 4 - max-attempts: 1 publishRun-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run publishResult-out-0: @@ -26,17 +20,6 @@ spring: publishCancelFailed-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.cancelfailed output-bindings: publishRun-out-0;publishResult-out-0;publishCancel-out-0;publishStopped-out-0;publishCancelFailed-out-0 - rabbit: - bindings: - consumeRun-in-0: - consumer: - auto-bind-dlq: true - dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx - dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx.dlq - dead-letter-exchange-type: topic - quorum: - enabled: true - delivery-limit: 2 powsybl-ws: database: name: loadflow From 701d5cac46ba6aafa42052dbe516083cedf3eef9 Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Fri, 6 Feb 2026 13:16:10 +0100 Subject: [PATCH 2/6] working solution - need refactor Signed-off-by: LE SAULNIER Kevin --- pom.xml | 1 - .../server/config/CustomConsumersFactory.java | 98 ------------------- .../server/config/RabbitConsumerConfig.java | 24 +++++ .../service/LoadFlowMessageListener.java | 47 --------- .../server/service/LoadFlowWorkerService.java | 18 +++- src/main/resources/config/application.yaml | 60 +++++++++++- 6 files changed, 99 insertions(+), 149 deletions(-) delete mode 100644 src/main/java/org/gridsuite/loadflow/server/config/CustomConsumersFactory.java create mode 100644 src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java delete mode 100644 src/main/java/org/gridsuite/loadflow/server/service/LoadFlowMessageListener.java diff --git a/pom.xml b/pom.xml index 09968280..bc756580 100644 --- a/pom.xml +++ b/pom.xml @@ -169,7 +169,6 @@ org.springframework.cloud spring-cloud-stream-binder-rabbit - runtime org.springframework.boot diff --git a/src/main/java/org/gridsuite/loadflow/server/config/CustomConsumersFactory.java b/src/main/java/org/gridsuite/loadflow/server/config/CustomConsumersFactory.java deleted file mode 100644 index 7a3f582a..00000000 --- a/src/main/java/org/gridsuite/loadflow/server/config/CustomConsumersFactory.java +++ /dev/null @@ -1,98 +0,0 @@ -package org.gridsuite.loadflow.server.config; - -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import org.gridsuite.loadflow.server.service.LoadFlowMessageListener; -import org.springframework.amqp.core.*; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitAdmin; -import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.annotation.Bean; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -@Component -public class CustomConsumersFactory { - - private final ConnectionFactory connectionFactory; - private final LoadFlowMessageListener loadflowMessageListener; - - private final List containers = new ArrayList<>(); - - public CustomConsumersFactory( - ConnectionFactory connectionFactory, - LoadFlowMessageListener loadflowMessageListener - ) { - this.connectionFactory = connectionFactory; - this.loadflowMessageListener = loadflowMessageListener; - } - - @Bean - public Queue loadflowRunQueue() { - return QueueBuilder.durable("loadflowGroup").build(); - } - - @Bean - public TopicExchange loadflowExchange() { - return new TopicExchange("loadflow.run", true, false); - } - - @Bean - Binding loadflowRunBinding(Queue loadflowRunQueue, - TopicExchange loadflowExchange) { - return BindingBuilder - .bind(loadflowRunQueue) - .to(loadflowExchange) - .with("#"); - } - - @Bean - public RabbitAdmin rabbitAdmin(ConnectionFactory cf) { - return new RabbitAdmin(cf); - } - - @PostConstruct - public void start() { - SimpleMessageListenerContainer consumerContainer = - new SimpleMessageListenerContainer(connectionFactory); - - consumerContainer.setQueueNames("loadflowGroup"); - consumerContainer.setConcurrentConsumers(1); - consumerContainer.setPrefetchCount(1); - consumerContainer.setBeanName("run-loadflow1"); - consumerContainer.setMessageListener(loadflowMessageListener); - consumerContainer.setConsumerArguments(Map.of("x-priority", 3)); - consumerContainer.setAutoStartup(false); - consumerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); - containers.add(consumerContainer); - - consumerContainer = - new SimpleMessageListenerContainer(connectionFactory); - - consumerContainer.setQueueNames("loadflowGroup"); - consumerContainer.setConcurrentConsumers(1); - consumerContainer.setPrefetchCount(1); - consumerContainer.setBeanName("run-loadflow2"); - consumerContainer.setMessageListener(loadflowMessageListener); - consumerContainer.setConsumerArguments(Map.of("x-priority", 2)); - consumerContainer.setAutoStartup(false); - consumerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); - - containers.add(consumerContainer); - } - - @EventListener(ApplicationReadyEvent.class) - public void startContainers() { - containers.forEach(SimpleMessageListenerContainer::start); - } - - @PreDestroy - public void stop() { - containers.forEach(SimpleMessageListenerContainer::stop); - } -} diff --git a/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java b/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java new file mode 100644 index 00000000..363db987 --- /dev/null +++ b/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java @@ -0,0 +1,24 @@ +package org.gridsuite.loadflow.server.config; + +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.cloud.stream.config.ListenerContainerCustomizer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +@Configuration +public class RabbitConsumerConfig { + @Bean + public ListenerContainerCustomizer customizer() { + AtomicInteger index = new AtomicInteger(); + return (container, destination, group) -> { + if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(destination, "loadflow.run.loadflowGroup")) { + smlc.setConsumerArguments(Map.of("x-priority", index.getAndIncrement())); + } + }; + } +} diff --git a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowMessageListener.java b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowMessageListener.java deleted file mode 100644 index a96cbbfc..00000000 --- a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowMessageListener.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.gridsuite.loadflow.server.service; - -import com.rabbitmq.client.Channel; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -@Component -public class LoadFlowMessageListener implements ChannelAwareMessageListener { - private final LoadFlowWorkerService loadFlowWorkerService; - - public LoadFlowMessageListener(LoadFlowWorkerService loadFlowWorkerService) { - this.loadFlowWorkerService = loadFlowWorkerService; - } - - @Override - public void onMessage(Message message, Channel channel) throws IOException { - long tag = message.getMessageProperties().getDeliveryTag(); - try { - // payload - String payload = new String( - message.getBody(), - StandardCharsets.UTF_8 - ); - - // headers - org.springframework.messaging.Message springMessage = MessageBuilder - .withPayload(payload) - .copyHeaders(message.getMessageProperties().getHeaders()) - .build(); - - loadFlowWorkerService.consumeRun().accept(springMessage); - - channel.basicAck(tag, false); - } catch (Exception e) { - - channel.basicReject(tag, false); - - throw e; - } - - } -} diff --git a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java index 10558123..a51b6802 100644 --- a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java +++ b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java @@ -546,8 +546,22 @@ public static String getNextLimitName(LimitViolationInfos limitViolationInfos, N } @Bean - @Override - public Consumer> consumeRun() { + public Consumer> consumeRun1() { + return super.consumeRun(); + } + + @Bean + public Consumer> consumeRun2() { + return super.consumeRun(); + } + + @Bean + public Consumer> consumeRun3() { + return super.consumeRun(); + } + + @Bean + public Consumer> consumeRun4() { return super.consumeRun(); } diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index 3899f1d3..9d37337f 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -4,9 +4,29 @@ spring: cloud: function: - definition: consumeCancel + definition: consumeRun1;consumeRun2;consumeRun3;consumeRun4;consumeCancel stream: bindings: + consumeRun1-in-0: + destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run + group: loadflowGroup + consumer: + max-attempts: 1 + consumeRun2-in-0: + destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run + group: loadflowGroup + consumer: + max-attempts: 1 + consumeRun3-in-0: + destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run + group: loadflowGroup + consumer: + max-attempts: 1 + consumeRun4-in-0: + destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run + group: loadflowGroup + consumer: + max-attempts: 1 publishRun-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run publishResult-out-0: @@ -20,6 +40,44 @@ spring: publishCancelFailed-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.cancelfailed output-bindings: publishRun-out-0;publishResult-out-0;publishCancel-out-0;publishStopped-out-0;publishCancelFailed-out-0 + rabbit: + bindings: + consumeRun1-in-0: + consumer: + auto-bind-dlq: true + dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx + dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx.dlq + dead-letter-exchange-type: topic + quorum: + enabled: true + delivery-limit: 2 + consumeRun2-in-0: + consumer: + auto-bind-dlq: true + dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx + dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx.dlq + dead-letter-exchange-type: topic + quorum: + enabled: true + delivery-limit: 2 + consumeRun3-in-0: + consumer: + auto-bind-dlq: true + dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx + dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx.dlq + dead-letter-exchange-type: topic + quorum: + enabled: true + delivery-limit: 2 + consumeRun4-in-0: + consumer: + auto-bind-dlq: true + dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx + dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx.dlq + dead-letter-exchange-type: topic + quorum: + enabled: true + delivery-limit: 2 powsybl-ws: database: name: loadflow From 41fab65736cdeaf1b6e1c1adaedf09ca45cc1a84 Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Fri, 6 Feb 2026 13:30:46 +0100 Subject: [PATCH 3/6] add comment + yaml factorisation Signed-off-by: LE SAULNIER Kevin --- .../server/config/RabbitConsumerConfig.java | 10 +++- src/main/resources/config/application.yaml | 46 ++++--------------- 2 files changed, 17 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java b/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java index 363db987..afd05282 100644 --- a/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java +++ b/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java @@ -12,11 +12,19 @@ @Configuration public class RabbitConsumerConfig { + /* + * RabbitMQ consumer priority: + * https://www.rabbitmq.com/docs/consumer-priority + * + * Each container creates exactly one AMQP consumer with prefetch=1 and its own priority. + * When dispatching messages, RabbitMQ always selects the highest-priority consumer + * that is available. + */ @Bean public ListenerContainerCustomizer customizer() { AtomicInteger index = new AtomicInteger(); return (container, destination, group) -> { - if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(destination, "loadflow.run.loadflowGroup")) { + if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(group, "loadflowGroup")) { smlc.setConsumerArguments(Map.of("x-priority", index.getAndIncrement())); } }; diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index 9d37337f..f7452a0e 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -7,26 +7,17 @@ spring: definition: consumeRun1;consumeRun2;consumeRun3;consumeRun4;consumeCancel stream: bindings: - consumeRun1-in-0: + consumeRun1-in-0: &consumeRunConfig destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run group: loadflowGroup consumer: max-attempts: 1 consumeRun2-in-0: - destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run - group: loadflowGroup - consumer: - max-attempts: 1 + <<: *consumeRunConfig consumeRun3-in-0: - destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run - group: loadflowGroup - consumer: - max-attempts: 1 + <<: *consumeRunConfig consumeRun4-in-0: - destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run - group: loadflowGroup - consumer: - max-attempts: 1 + <<: *consumeRunConfig publishRun-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run publishResult-out-0: @@ -42,7 +33,7 @@ spring: output-bindings: publishRun-out-0;publishResult-out-0;publishCancel-out-0;publishStopped-out-0;publishCancelFailed-out-0 rabbit: bindings: - consumeRun1-in-0: + consumeRun1-in-0: &consumeRunRabbitConfig consumer: auto-bind-dlq: true dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx @@ -52,32 +43,11 @@ spring: enabled: true delivery-limit: 2 consumeRun2-in-0: - consumer: - auto-bind-dlq: true - dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx - dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx.dlq - dead-letter-exchange-type: topic - quorum: - enabled: true - delivery-limit: 2 + <<: *consumeRunRabbitConfig consumeRun3-in-0: - consumer: - auto-bind-dlq: true - dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx - dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx.dlq - dead-letter-exchange-type: topic - quorum: - enabled: true - delivery-limit: 2 + <<: *consumeRunRabbitConfig consumeRun4-in-0: - consumer: - auto-bind-dlq: true - dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx - dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx.dlq - dead-letter-exchange-type: topic - quorum: - enabled: true - delivery-limit: 2 + <<: *consumeRunRabbitConfig powsybl-ws: database: name: loadflow From 235bf33fc66d93135d5c3f1267cc3ac11870b4e8 Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Fri, 6 Feb 2026 13:46:49 +0100 Subject: [PATCH 4/6] revert pom change Signed-off-by: LE SAULNIER Kevin --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index bc756580..09968280 100644 --- a/pom.xml +++ b/pom.xml @@ -169,6 +169,7 @@ org.springframework.cloud spring-cloud-stream-binder-rabbit + runtime org.springframework.boot From 661d6a55716b3ee642274772bf4a2551c9340ee8 Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Fri, 6 Feb 2026 14:11:41 +0100 Subject: [PATCH 5/6] fix tests Signed-off-by: LE SAULNIER Kevin --- src/test/resources/application-default.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/test/resources/application-default.yml b/src/test/resources/application-default.yml index 9762ed63..f8ac484c 100644 --- a/src/test/resources/application-default.yml +++ b/src/test/resources/application-default.yml @@ -8,6 +8,10 @@ spring: hibernate: #to turn off schema validation that fails (because of clob types) and blocks tests even if the the schema is compatible ddl-auto: none + cloud: + function: + # disable consumeRun2/3/4 during test - all of them receive the "loadflowGroup" messages otherwise + definition: consumeRun1;consumeCancel logging: level: From 7a31da1da50388f8dc3e728374219def8451b030 Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Mon, 9 Feb 2026 14:02:19 +0100 Subject: [PATCH 6/6] add comments from PR remarks Signed-off-by: LE SAULNIER Kevin --- .../loadflow/server/config/RabbitConsumerConfig.java | 4 ++++ .../loadflow/server/service/LoadFlowWorkerService.java | 10 ++++++++++ src/main/resources/config/application.yaml | 6 ++++++ 3 files changed, 20 insertions(+) diff --git a/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java b/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java index afd05282..74dfcce7 100644 --- a/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java +++ b/src/main/java/org/gridsuite/loadflow/server/config/RabbitConsumerConfig.java @@ -22,6 +22,10 @@ public class RabbitConsumerConfig { */ @Bean public ListenerContainerCustomizer customizer() { + /* + * Using AtomicInteger as in org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java + * We expect cloud stream to call our customizer exactly once in order for each container so it will produce a sequence of increasing priorities + */ AtomicInteger index = new AtomicInteger(); return (container, destination, group) -> { if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(group, "loadflowGroup")) { diff --git a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java index a51b6802..2e64dcba 100644 --- a/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java +++ b/src/main/java/org/gridsuite/loadflow/server/service/LoadFlowWorkerService.java @@ -545,6 +545,16 @@ public static String getNextLimitName(LimitViolationInfos limitViolationInfos, N return temporaryLimit != null ? temporaryLimit.getName() : null; } + /* + * Spring Cloud Stream does not allow customizing each consumer within a single listener + * container (i.e. when concurrency = N) + * + * Since we need to customize each consumer individually, we simulate "concurrency = N" + * by creating N listener containers, each with concurrency = 1. + * + * This requires defining one Consumer bean per container, which explains + * the duplicated methods below. + */ @Bean public Consumer> consumeRun1() { return super.consumeRun(); diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index f7452a0e..1ec238e1 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -7,6 +7,11 @@ spring: definition: consumeRun1;consumeRun2;consumeRun3;consumeRun4;consumeCancel stream: bindings: + # Spring Cloud Stream does not allow customizing each consumer within a single listener + # container (i.e. when concurrency = N) + # + # Since we need to customize each consumer individually, we simulate "concurrency = N" + # by creating N listener containers, each with concurrency = 1. consumeRun1-in-0: &consumeRunConfig destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run group: loadflowGroup @@ -33,6 +38,7 @@ spring: output-bindings: publishRun-out-0;publishResult-out-0;publishCancel-out-0;publishStopped-out-0;publishCancelFailed-out-0 rabbit: bindings: + # See comment on spring.cloud.stream.bindings.consumeRun1-in-0 consumeRun1-in-0: &consumeRunRabbitConfig consumer: auto-bind-dlq: true