Skip to content

Commit 97cbe18

Browse files
authored
feat(queueType): Allow to set queueType from client (#146)
* feat(queueType): Allow to set queueType from client * update some docs deps
1 parent 07ba566 commit 97cbe18

File tree

19 files changed

+1534
-1005
lines changed

19 files changed

+1534
-1005
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.reactivecommons.async.rabbit;
2+
3+
import lombok.AccessLevel;
4+
import lombok.NoArgsConstructor;
5+
6+
import java.util.UUID;
7+
8+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
9+
public class InstanceIdentifier {
10+
private static final String INSTANCE_ID = UUID.randomUUID().toString().replace("-", "");
11+
12+
public static String getInstanceId(String kind) {
13+
return getInstanceId(kind, INSTANCE_ID);
14+
}
15+
16+
public static String getInstanceId(String kind, String defaultHost) {
17+
String host = System.getenv("HOSTNAME");
18+
if (host == null || host.isEmpty()) {
19+
return defaultHost + "-" + kind;
20+
}
21+
return host + "-" + kind;
22+
}
23+
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/TopologyCreator.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,17 @@
1414

1515
@Log
1616
/*
17-
Direct use of channel is temporal, remove when https://github.com/reactor/reactor-rabbitmq/issues/37 is fixed in 1.0.0.RC2
17+
Direct use of channel is temporal, remove when https://github.com/reactor/reactor-rabbitmq/issues/37 is fixed in 1.0
18+
.0.RC2
1819
*/
1920
public class TopologyCreator {
2021

2122
private final Sender sender;
23+
private final String queueType;
2224

23-
public TopologyCreator(Sender sender) {
25+
public TopologyCreator(Sender sender, String queueType) {
2426
this.sender = sender;
27+
this.queueType = queueType != null ? queueType : "classic";
2528
}
2629

2730
public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange) {
@@ -30,7 +33,7 @@ public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange) {
3033
}
3134

3235
public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queue) {
33-
return sender.declare(queue)
36+
return sender.declare(fillQueueType(queue))
3437
.onErrorMap(TopologyDefException::new);
3538
}
3639

@@ -85,6 +88,20 @@ public Mono<AMQP.Queue.DeclareOk> declareQueue(String name, Optional<Integer> ma
8588
return declare(specification);
8689
}
8790

91+
protected QueueSpecification fillQueueType(QueueSpecification specification) {
92+
String resolvedQueueType = this.queueType;
93+
if (specification.isAutoDelete() || specification.isExclusive()) {
94+
resolvedQueueType = "classic";
95+
}
96+
Map<String, Object> args = specification.getArguments();
97+
if (args == null) {
98+
args = new HashMap<>();
99+
}
100+
args.put("x-queue-type", resolvedQueueType);
101+
specification.arguments(args);
102+
return specification;
103+
}
104+
88105
public static class TopologyDefException extends RuntimeException {
89106
public TopologyDefException(Throwable cause) {
90107
super(cause);

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredCommandHandl
146146
throw new RuntimeException("Unknown handler type");
147147
}
148148

149+
@Override
150+
protected String getKind() {
151+
return "commands";
152+
}
149153
}
150154

151155

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListene
139139
}
140140
throw new RuntimeException("Unknown handler type");
141141
}
142+
143+
@Override
144+
protected String getKind() {
145+
return "events";
146+
}
142147
}
143148

144149

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationNotificationListener.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,8 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListene
101101
throw new RuntimeException("Unknown handler type");
102102
}
103103

104+
@Override
105+
protected String getKind() {
106+
return "notifications";
107+
}
104108
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,11 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
183183
protected Object parseMessageForReporter(Message msj) {
184184
return converter.readAsyncQueryStructure(msj);
185185
}
186+
187+
@Override
188+
protected String getKind() {
189+
return "queries";
190+
}
186191
}
187192

188193

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationReplyListener.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import lombok.extern.java.Log;
55
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
66
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
7+
import org.reactivecommons.async.rabbit.InstanceIdentifier;
78
import org.reactivecommons.async.rabbit.RabbitMessage;
89
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
910
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1011
import reactor.core.publisher.Flux;
1112
import reactor.core.publisher.Mono;
13+
import reactor.rabbitmq.ConsumeOptions;
1214
import reactor.rabbitmq.Receiver;
1315

1416
import java.util.logging.Level;
@@ -46,10 +48,12 @@ public void startListening(String routeKey) {
4648
if (createTopology) {
4749
flow = creator.declare(exchange(exchangeName).type("topic").durable(true)).then();
4850
}
51+
ConsumeOptions consumeOptions = new ConsumeOptions();
52+
consumeOptions.consumerTag(InstanceIdentifier.getInstanceId("replies"));
4953
deliveryFlux = flow
5054
.then(creator.declare(queue(queueName).durable(false).autoDelete(true).exclusive(true)))
5155
.then(creator.bind(binding(exchangeName, routeKey, queueName)))
52-
.thenMany(receiver.consumeAutoAck(queueName).doOnNext(delivery -> {
56+
.thenMany(receiver.consumeAutoAck(queueName, consumeOptions).doOnNext(delivery -> {
5357
try {
5458
final String correlationID = delivery.getProperties().getHeaders().get(CORRELATION_ID).toString();
5559
final boolean isEmpty = delivery.getProperties().getHeaders().get(COMPLETION_ONLY_SIGNAL) != null;

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package org.reactivecommons.async.rabbit.listeners;
22

33
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
45
import lombok.extern.java.Log;
56
import org.reactivecommons.async.commons.DiscardNotifier;
67
import org.reactivecommons.async.commons.FallbackStrategy;
78
import org.reactivecommons.async.commons.communications.Message;
89
import org.reactivecommons.async.commons.ext.CustomReporter;
910
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
11+
import org.reactivecommons.async.rabbit.InstanceIdentifier;
1012
import org.reactivecommons.async.rabbit.RabbitMessage;
1113
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1214
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
@@ -94,6 +96,7 @@ public void startListener() {
9496

9597
ConsumeOptions consumeOptions = new ConsumeOptions();
9698
consumeOptions.qos(messageListener.getPrefetchCount());
99+
consumeOptions.consumerTag(InstanceIdentifier.getInstanceId(getKind()));
97100

98101
if (createTopology) {
99102
this.messageFlux = setUpBindings(messageListener.getTopologyCreator())
@@ -138,7 +141,8 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
138141
return flow.doOnSuccess(o -> logExecution(executorPath, initTime, true))
139142
.subscribeOn(scheduler).thenReturn(msj);
140143
} catch (Exception e) {
141-
log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", msj.getProperties().getMessageId()));
144+
log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! " +
145+
"Severe Warning! ", msj.getProperties().getMessageId()));
142146
return Mono.error(e);
143147
}
144148
}
@@ -247,6 +251,8 @@ private Long getRetryNumber(AcknowledgableDelivery delivery) {
247251
}
248252

249253
protected abstract Object parseMessageForReporter(Message msj);
254+
255+
protected abstract String getKind();
250256
}
251257

252258

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.reactivecommons.async.rabbit;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import static org.assertj.core.api.Assertions.assertThat;
6+
7+
class InstanceIdentifierTest {
8+
9+
@Test
10+
void shouldGetInstanceIdFromUuid() {
11+
String instanceId = InstanceIdentifier.getInstanceId("events");
12+
var expectedLength = 39;
13+
assertThat(instanceId).endsWith("-events").hasSize(expectedLength);
14+
}
15+
16+
@Test
17+
void shouldGetInstanceIdFromEnv() {
18+
String instanceId = InstanceIdentifier.getInstanceId("events", "host123");
19+
assertThat(instanceId).isEqualTo("host123-events");
20+
}
21+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.reactivecommons.async.rabbit.communications;
2+
3+
import org.junit.jupiter.api.BeforeEach;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.mockito.Mock;
7+
import org.mockito.junit.jupiter.MockitoExtension;
8+
import reactor.rabbitmq.QueueSpecification;
9+
import reactor.rabbitmq.Sender;
10+
11+
import static org.assertj.core.api.Assertions.assertThat;
12+
13+
@ExtendWith(MockitoExtension.class)
14+
class TopologyCreatorTest {
15+
@Mock
16+
private Sender sender;
17+
18+
private TopologyCreator creator;
19+
20+
@BeforeEach
21+
void setUp() {
22+
creator = new TopologyCreator(sender, "quorum");
23+
}
24+
25+
@Test
26+
void shouldInjectQueueType() {
27+
QueueSpecification spec = creator.fillQueueType(QueueSpecification.queue("durable"));
28+
assertThat(spec.getArguments()).containsEntry("x-queue-type", "quorum");
29+
}
30+
31+
@Test
32+
void shouldForceClassicQueueTypeWhenAutodelete() {
33+
QueueSpecification spec = creator.fillQueueType(QueueSpecification.queue("autodelete").autoDelete(true));
34+
assertThat(spec.getArguments()).containsEntry("x-queue-type", "classic");
35+
}
36+
37+
@Test
38+
void shouldForceClassicQueueTypeWhenExclusive() {
39+
QueueSpecification spec = creator.fillQueueType(QueueSpecification.queue("exclusive").exclusive(true));
40+
assertThat(spec.getArguments()).containsEntry("x-queue-type", "classic");
41+
}
42+
}

0 commit comments

Comments
 (0)