Skip to content

Commit abffe04

Browse files
committed
fix IT race condition
1 parent 0623ecf commit abffe04

File tree

1 file changed

+14
-11
lines changed
  • solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder

1 file changed

+14
-11
lines changed

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,36 +1079,39 @@ public <T> void testConsumerReconnect(
10791079

10801080
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
10811081

1082-
logger.info(String.format("Disabling egress to queue %s", queue0));
1082+
logger.info("Disabling egress to queue {}", queue0);
10831083
sempV2Api.config().updateMsgVpnQueue(vpnName, queue0, new ConfigMsgVpnQueue().egressEnabled(false), null, null);
10841084
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
10851085

1086-
logger.info(String.format("Enabling egress to queue %s", queue0));
1086+
logger.info("Enabling egress to queue {}", queue0);
10871087
sempV2Api.config().updateMsgVpnQueue(vpnName, queue0, new ConfigMsgVpnQueue().egressEnabled(true), null, null);
10881088
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
10891089

10901090
logger.info("Stopping producer");
10911091
producerStop.set(true);
10921092
int numMsgsSent = producerFuture.get(5, TimeUnit.SECONDS);
10931093

1094-
softly.assertThat(queue0).satisfies(q -> retryAssert(1, TimeUnit.MINUTES, () ->
1094+
logger.info("Waiting for consumer to finish processing messages");
1095+
softly.assertThat(queue0).satisfies(q -> retryAssert(5, TimeUnit.MINUTES, () ->
10951096
assertThat(sempV2Api.monitor()
10961097
.getMsgVpnQueueMsgs(vpnName, q, Integer.MAX_VALUE, null, null, null)
10971098
.getData()
10981099
.size())
10991100
.as("Expected queue %s to be empty after rebind", q)
11001101
.isEqualTo(0)));
11011102

1102-
MonitorMsgVpnQueue queueState = sempV2Api.monitor()
1103-
.getMsgVpnQueue(vpnName, queue0, null)
1104-
.getData();
1103+
softly.assertThat(queue0).satisfies(q -> retryAssert(1, TimeUnit.MINUTES, () -> {
1104+
MonitorMsgVpnQueue queueState = sempV2Api.monitor()
1105+
.getMsgVpnQueue(vpnName, q, null)
1106+
.getData();
11051107

1106-
softly.assertThat(queueState.getDisabledBindFailureCount()).isGreaterThan(0);
1107-
softly.assertThat(uniquePayloadsReceived.size()).isEqualTo(numMsgsSent);
1108-
softly.assertThat(numMsgsConsumed.get()).isGreaterThanOrEqualTo(numMsgsSent);
1108+
logger.info("num-sent: {}, num-consumed: {}, num-redelivered: {}", numMsgsSent, numMsgsConsumed.get(),
1109+
queueState.getRedeliveredMsgCount());
1110+
softly.assertThat(queueState.getDisabledBindFailureCount()).isGreaterThan(0);
1111+
softly.assertThat(uniquePayloadsReceived.size()).isEqualTo(numMsgsSent);
1112+
softly.assertThat(numMsgsConsumed.get()).isGreaterThanOrEqualTo(numMsgsSent);
1113+
}));
11091114

1110-
logger.info("num-sent: {}, num-consumed: {}, num-redelivered: {}", numMsgsSent, numMsgsConsumed.get(),
1111-
queueState.getRedeliveredMsgCount());
11121115
producerBinding.unbind();
11131116
consumerBinding.unbind();
11141117
}

0 commit comments

Comments
 (0)