Skip to content

Commit a7e5034

Browse files
committed
Allow configuring the rabbitmq health checks
Fix #2326 - allows disabling health check and readiness check per channel Fix #2019 - add an attribute to support lazy subscription
1 parent 9bcde20 commit a7e5034

File tree

4 files changed

+227
-22
lines changed

4 files changed

+227
-22
lines changed

documentation/src/main/docs/rabbitmq/rabbitmq-health.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,17 @@ On the outbound side (sending records to RabbitMQ), the check verifies
1010
that the sender is not disconnected from the broker; the sender *may*
1111
still be in an initialized state (connection not yet attempted), but
1212
this is regarded as live/ready.
13+
14+
You can disable health reporting by setting the `health-enabled` attribute of the channel to `false`.
15+
It disables both liveness and readiness.
16+
You can disable readiness reporting by setting the `health-readiness-enabled` attribute of the channel to `false`.
17+
18+
## @Channel and lazy subscription
19+
20+
When you inject a channel using `@Channel` annotation, you are responsible for subscribing to the channel.
21+
Until the subscription happens, the channel is not connected to the broker and thus cannot receive messages.
22+
The default health check will fail in this case.
23+
24+
To handle this use case, you need to configure the `health-lazy-subscription` attribute of the channel to `true`.
25+
It configures the health check to not fail if there are no subscription yet.
26+

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
11
package io.smallrye.reactive.messaging.rabbitmq;
22

3-
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
4-
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
5-
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;
3+
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.*;
64
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;
75
import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;
86
import static java.time.Duration.ofSeconds;
97

10-
import java.util.Arrays;
11-
import java.util.HashMap;
12-
import java.util.List;
13-
import java.util.Map;
14-
import java.util.NoSuchElementException;
15-
import java.util.Optional;
8+
import java.util.*;
169
import java.util.concurrent.ConcurrentHashMap;
1710
import java.util.concurrent.CopyOnWriteArrayList;
1811
import java.util.concurrent.Flow;
@@ -96,6 +89,11 @@
9689
@ConnectorAttribute(name = "client-options-name", direction = INCOMING_AND_OUTGOING, description = "The name of the RabbitMQ Client Option bean used to customize the RabbitMQ client configuration", type = "string", alias = "rabbitmq-client-options-name")
9790
@ConnectorAttribute(name = "credentials-provider-name", direction = INCOMING_AND_OUTGOING, description = "The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client", type = "string", alias = "rabbitmq-credentials-provider-name")
9891

92+
// Health
93+
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
94+
@ConnectorAttribute(name = "health-readiness-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether readiness health reporting is enabled (default) or disabled", defaultValue = "true")
95+
@ConnectorAttribute(name = "health-lazy-subscription", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the liveness and readiness checks should report 'ok' when there is no subscription yet. This is useful when injecting the channel with `@Inject @Channel(\"...\") Multi<...> multi;`", defaultValue = "false")
96+
9997
// Exchange
10098
@ConnectorAttribute(name = "exchange.name", direction = INCOMING_AND_OUTGOING, description = "The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to \"\", the default exchange is used.", type = "string")
10199
@ConnectorAttribute(name = "exchange.durable", direction = INCOMING_AND_OUTGOING, description = "Whether the exchange is durable", type = "boolean", defaultValue = "true")
@@ -181,6 +179,12 @@ private enum ChannelStatus {
181179

182180
private final Map<String, ChannelStatus> outgoingChannelStatus = new ConcurrentHashMap<>();
183181

182+
private final List<String> incomingReadiness = new CopyOnWriteArrayList<>();
183+
private final List<String> lazySubscriptions = new CopyOnWriteArrayList<>();
184+
private final List<String> outgoingReadiness = new CopyOnWriteArrayList<>();
185+
private final List<String> incomingLiveness = new CopyOnWriteArrayList<>();
186+
private final List<String> outgoingLiveness = new CopyOnWriteArrayList<>();
187+
184188
@Inject
185189
ExecutionHolder executionHolder;
186190

@@ -302,6 +306,17 @@ public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
302306
multi = multi.broadcast().toAllSubscribers();
303307
}
304308

309+
// Register health check
310+
if (ic.getHealthEnabled()) {
311+
incomingLiveness.add(ic.getChannel());
312+
if (ic.getHealthReadinessEnabled()) {
313+
incomingReadiness.add(ic.getChannel());
314+
}
315+
if (ic.getHealthLazySubscription()) {
316+
lazySubscriptions.add(ic.getChannel());
317+
}
318+
}
319+
305320
return multi;
306321
}
307322

@@ -419,6 +434,14 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(final Config config)
419434
getSender);
420435
subscriptions.put(oc.getChannel(), processor);
421436

437+
// Register health check
438+
if (oc.getHealthEnabled()) {
439+
outgoingLiveness.add(oc.getChannel());
440+
if (oc.getHealthReadinessEnabled()) {
441+
outgoingReadiness.add(oc.getChannel());
442+
}
443+
}
444+
422445
// Return a SubscriberBuilder
423446
return MultiUtils.via(processor, m -> m.onFailure().invoke(t -> {
424447
log.error(oc.getChannel(), t);
@@ -428,28 +451,55 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(final Config config)
428451

429452
@Override
430453
public HealthReport getReadiness() {
431-
return getHealth(false);
454+
return getHealth(incomingReadiness, outgoingReadiness);
432455
}
433456

434457
@Override
435458
public HealthReport getLiveness() {
436-
return getHealth(false);
459+
return getHealth(incomingLiveness, outgoingLiveness);
437460
}
438461

439-
public HealthReport getHealth(boolean strict) {
462+
public HealthReport getStrictHealth() {
440463
final HealthReport.HealthReportBuilder builder = HealthReport.builder();
441464

442-
// Add health for incoming channels; since connections are made immediately
443-
// for subscribers, we insist on channel status being connected
444465
incomingChannelStatus.forEach((channel, status) -> builder.add(channel, status == ChannelStatus.CONNECTED));
445466

446-
// Add health for outgoing channels; since connections are made only on
447-
// first message dispatch for publishers, we allow both connected and initialising
448-
// unless in strict mode, in order to avoid a possible deadly embrace in
449-
// kubernetes (k8s will refuse to allow the pod to accept traffic unless it is ready,
450-
// and only if it is ready can e.g. calls be made to it that would trigger a message dispatch).
451-
outgoingChannelStatus.forEach((channel, status) -> builder.add(channel,
452-
(strict) ? status == ChannelStatus.CONNECTED : status != ChannelStatus.NOT_CONNECTED));
467+
outgoingChannelStatus.forEach((channel, status) -> builder.add(channel, status == ChannelStatus.CONNECTED));
468+
469+
return builder.build();
470+
}
471+
472+
public HealthReport getHealth(List<String> incomingChannels, List<String> outgoingChannels) {
473+
final HealthReport.HealthReportBuilder builder = HealthReport.builder();
474+
475+
for (String channel : incomingChannels) {
476+
// Add health for incoming channels; since connections are made immediately
477+
// for subscribers, we insist on channel status being connected (unless lazy subscription is enabled).
478+
ChannelStatus status = incomingChannelStatus.get(channel);
479+
if (status == null) {
480+
builder.add(channel, false);
481+
} else {
482+
if (lazySubscriptions.contains(channel)) {
483+
builder.add(channel, status == ChannelStatus.CONNECTED || status == ChannelStatus.INITIALISING);
484+
} else {
485+
builder.add(channel, status == ChannelStatus.CONNECTED);
486+
}
487+
}
488+
}
489+
490+
for (String channel : outgoingChannels) {
491+
// Add health for outgoing channels; since connections are made only on
492+
// first message dispatch for publishers, we allow both connected and initialising
493+
// unless in strict mode, in order to avoid a possible deadly embrace in
494+
// kubernetes (k8s will refuse to allow the pod to accept traffic unless it is ready,
495+
// and only if it is ready can e.g. calls be made to it that would trigger a message dispatch).
496+
ChannelStatus status = outgoingChannelStatus.get(channel);
497+
if (status == null) {
498+
builder.add(channel, false);
499+
} else {
500+
builder.add(channel, status != ChannelStatus.NOT_CONNECTED);
501+
}
502+
}
453503

454504
return builder.build();
455505
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package io.smallrye.reactive.messaging.rabbitmq;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import jakarta.inject.Inject;
6+
7+
import org.eclipse.microprofile.reactive.messaging.Channel;
8+
import org.eclipse.microprofile.reactive.messaging.Emitter;
9+
import org.eclipse.microprofile.reactive.messaging.Incoming;
10+
import org.eclipse.microprofile.reactive.messaging.Outgoing;
11+
import org.junit.jupiter.api.Test;
12+
13+
import io.smallrye.mutiny.Multi;
14+
import io.smallrye.reactive.messaging.providers.extension.HealthCenter;
15+
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
16+
17+
public class HealthTest extends WeldTestBase {
18+
19+
private MapBasedConfig getBaseConfig() {
20+
return new MapBasedConfig()
21+
.with("mp.messaging.incoming.in.queue.name", "in")
22+
.with("mp.messaging.incoming.in.connector", RabbitMQConnector.CONNECTOR_NAME)
23+
.with("mp.messaging.incoming.in.host", host)
24+
.with("mp.messaging.incoming.in.port", port)
25+
.with("rabbitmq-username", username)
26+
.with("rabbitmq-password", password)
27+
.with("mp.messaging.outgoing.out.queue.name", "out")
28+
.with("mp.messaging.outgoing.out.connector", RabbitMQConnector.CONNECTOR_NAME)
29+
.with("mp.messaging.outgoing.out.host", host)
30+
.with("mp.messaging.outgoing.out.port", port);
31+
}
32+
33+
@Test
34+
void testReadinessAndLivenessEnabled() {
35+
MapBasedConfig config = getBaseConfig();
36+
37+
addBeans(MyApp.class);
38+
runApplication(config);
39+
HealthCenter health = container.getBeanManager().createInstance().select(HealthCenter.class).get();
40+
assertThat(health.getLiveness().isOk()).isTrue();
41+
assertThat(health.getLiveness().getChannels()).anySatisfy(ci -> {
42+
assertThat(ci.getChannel()).isEqualTo("in");
43+
assertThat(ci.isOk()).isTrue();
44+
})
45+
.anySatisfy(ci -> {
46+
assertThat(ci.getChannel()).isEqualTo("out");
47+
assertThat(ci.isOk()).isTrue();
48+
});
49+
50+
assertThat(health.getReadiness().isOk()).isTrue();
51+
assertThat(health.getReadiness().getChannels()).anySatisfy(ci -> {
52+
assertThat(ci.getChannel()).isEqualTo("in");
53+
assertThat(ci.isOk()).isTrue();
54+
})
55+
.anySatisfy(ci -> {
56+
assertThat(ci.getChannel()).isEqualTo("out");
57+
assertThat(ci.isOk()).isTrue();
58+
});
59+
}
60+
61+
@Test
62+
void testHealthDisabled() {
63+
MapBasedConfig config = getBaseConfig()
64+
.with("mp.messaging.incoming.in.health-enabled", false)
65+
.with("mp.messaging.outgoing.out.health-enabled", false);
66+
67+
addBeans(MyApp.class);
68+
runApplication(config);
69+
HealthCenter health = container.getBeanManager().createInstance().select(HealthCenter.class).get();
70+
assertThat(health.getLiveness().isOk()).isTrue();
71+
assertThat(health.getLiveness().getChannels()).isEmpty();
72+
73+
assertThat(health.getReadiness().isOk()).isTrue();
74+
assertThat(health.getReadiness().getChannels()).isEmpty();
75+
}
76+
77+
@Test
78+
void testReadinessDisabled() {
79+
MapBasedConfig config = getBaseConfig()
80+
.with("mp.messaging.incoming.in.health-readiness-enabled", false)
81+
.with("mp.messaging.outgoing.out.health-readiness-enabled", false);
82+
83+
addBeans(MyApp.class);
84+
runApplication(config);
85+
HealthCenter health = container.getBeanManager().createInstance().select(HealthCenter.class).get();
86+
assertThat(health.getLiveness().isOk()).isTrue();
87+
assertThat(health.getLiveness().getChannels()).hasSize(2);
88+
89+
assertThat(health.getReadiness().isOk()).isTrue();
90+
assertThat(health.getReadiness().getChannels()).isEmpty();
91+
}
92+
93+
@Test
94+
void testWithAppUsingChannels() {
95+
MapBasedConfig config = getBaseConfig()
96+
.with("mp.messaging.incoming.in.health-lazy-subscription", true);
97+
98+
addBeans(MyAppUsingChannels.class);
99+
runApplication(config);
100+
HealthCenter health = container.getBeanManager().createInstance().select(HealthCenter.class).get();
101+
assertThat(health.getLiveness().isOk()).isTrue();
102+
assertThat(health.getLiveness().getChannels()).anySatisfy(ci -> {
103+
assertThat(ci.getChannel()).isEqualTo("in");
104+
assertThat(ci.isOk()).isTrue();
105+
})
106+
.anySatisfy(ci -> {
107+
assertThat(ci.getChannel()).isEqualTo("out");
108+
assertThat(ci.isOk()).isTrue();
109+
});
110+
111+
assertThat(health.getReadiness().isOk()).isTrue();
112+
assertThat(health.getReadiness().getChannels()).anySatisfy(ci -> {
113+
assertThat(ci.getChannel()).isEqualTo("in");
114+
assertThat(ci.isOk()).isTrue();
115+
})
116+
.anySatisfy(ci -> {
117+
assertThat(ci.getChannel()).isEqualTo("out");
118+
assertThat(ci.isOk()).isTrue();
119+
});
120+
}
121+
122+
public static class MyApp {
123+
@Incoming("in")
124+
@Outgoing("out")
125+
public String process(String in) {
126+
return in;
127+
}
128+
}
129+
130+
public static class MyAppUsingChannels {
131+
132+
@Inject
133+
@Channel("in")
134+
Multi<String> in;
135+
136+
@Inject
137+
@Channel("out")
138+
Emitter<String> out;
139+
140+
}
141+
}

smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQBrokerTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public boolean isRabbitMQConnectorAvailable(WeldContainer container) {
9898

9999
// Use strict mode for health because that indicates that outgoing channels have got to the point where
100100
// a declared exchange has been established.
101-
return connector.getHealth(true).isOk();
101+
return connector.getStrictHealth().isOk();
102102
}
103103

104104
public boolean isRabbitMQConnectorReady(SeContainer container) {

0 commit comments

Comments
 (0)