Skip to content

Commit a7d493b

Browse files
authored
transactionProducer get the topic route before sending the message (#7569)
1 parent ca721b0 commit a7d493b

File tree

4 files changed

+98
-6
lines changed

4 files changed

+98
-6
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,8 @@ public void start(final boolean startFactory) throws MQClientException {
262262
mQClientFactory.start();
263263
}
264264

265+
this.initTopicRoute();
266+
265267
this.mqFaultStrategy.startDetector();
266268

267269
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
@@ -1740,6 +1742,19 @@ private void prepareSendRequest(final Message msg, long timeout) {
17401742
}
17411743
}
17421744

1745+
private void initTopicRoute() {
1746+
List<String> topics = this.defaultMQProducer.getTopics();
1747+
if (topics != null && topics.size() > 0) {
1748+
topics.forEach(topic -> {
1749+
String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic);
1750+
TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic);
1751+
if (topicPublishInfo == null || !topicPublishInfo.ok()) {
1752+
log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO));
1753+
}
1754+
});
1755+
}
1756+
}
1757+
17431758
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
17441759
return topicPublishInfoTable;
17451760
}

client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
8787
*/
8888
private String producerGroup;
8989

90+
/**
91+
* Topics that need to be initialized for transaction producer
92+
*/
93+
private List<String> topics;
94+
9095
/**
9196
* Just for testing or demo program
9297
*/
@@ -235,6 +240,22 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
235240
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
236241
}
237242

243+
/**
244+
* Constructor specifying namespace, producer group, topics and RPC hook.
245+
*
246+
* @param namespace Namespace for this MQ Producer instance.
247+
* @param producerGroup Producer group, see the name-sake field.
248+
* @param topics Topic that needs to be initialized for routing
249+
* @param rpcHook RPC hook to execute per each remoting command execution.
250+
*/
251+
public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
252+
this.namespace = namespace;
253+
this.producerGroup = producerGroup;
254+
this.topics = topics;
255+
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
256+
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
257+
}
258+
238259
/**
239260
* Constructor specifying producer group and enabled msg trace flag.
240261
*
@@ -290,6 +311,41 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
290311
}
291312
}
292313

314+
/**
315+
* Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic
316+
* name.
317+
*
318+
* @param namespace Namespace for this MQ Producer instance.
319+
* @param producerGroup Producer group, see the name-sake field.
320+
* @param topics Topic that needs to be initialized for routing
321+
* @param rpcHook RPC hook to execute per each remoting command execution.
322+
* @param enableMsgTrace Switch flag instance for message trace.
323+
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
324+
* trace topic name.
325+
*/
326+
public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics,
327+
RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
328+
this.namespace = namespace;
329+
this.producerGroup = producerGroup;
330+
this.topics = topics;
331+
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
332+
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
333+
//if client open the message trace feature
334+
if (enableMsgTrace) {
335+
try {
336+
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
337+
dispatcher.setHostProducer(this.defaultMQProducerImpl);
338+
traceDispatcher = dispatcher;
339+
this.defaultMQProducerImpl.registerSendMessageHook(
340+
new SendMessageTraceHookImpl(traceDispatcher));
341+
this.defaultMQProducerImpl.registerEndTransactionHook(
342+
new EndTransactionTraceHookImpl(traceDispatcher));
343+
} catch (Throwable e) {
344+
logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
345+
}
346+
}
347+
}
348+
293349
@Override
294350
public void setUseTLS(boolean useTLS) {
295351
super.setUseTLS(useTLS);
@@ -1316,4 +1372,11 @@ public void setBackPressureForAsyncSendSize(int backPressureForAsyncSendSize) {
13161372
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
13171373
}
13181374

1375+
public List<String> getTopics() {
1376+
return topics;
1377+
}
1378+
1379+
public void setTopics(List<String> topics) {
1380+
this.topics = topics;
1381+
}
13191382
}

client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.client.producer;
1818

19+
import java.util.List;
1920
import java.util.concurrent.ExecutorService;
2021
import org.apache.rocketmq.client.exception.MQClientException;
2122
import org.apache.rocketmq.common.message.Message;
@@ -36,19 +37,31 @@ public TransactionMQProducer() {
3637
}
3738

3839
public TransactionMQProducer(final String producerGroup) {
39-
this(null, producerGroup, null);
40+
this(null, producerGroup, null, null);
41+
}
42+
43+
public TransactionMQProducer(final String producerGroup, final List<String> topics) {
44+
this(null, producerGroup, topics, null);
4045
}
4146

4247
public TransactionMQProducer(final String namespace, final String producerGroup) {
43-
this(namespace, producerGroup, null);
48+
this(namespace, producerGroup, null, null);
49+
}
50+
51+
public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics) {
52+
this(namespace, producerGroup, topics, null);
4453
}
4554

4655
public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
47-
this(null, producerGroup, rpcHook);
56+
this(null, producerGroup, null, rpcHook);
57+
}
58+
59+
public TransactionMQProducer(final String producerGroup, final List<String> topics, RPCHook rpcHook) {
60+
this(null, producerGroup, topics, rpcHook);
4861
}
4962

50-
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
51-
super(namespace, producerGroup, rpcHook);
63+
public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
64+
super(namespace, producerGroup, topics, rpcHook);
5265
}
5366

5467
public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {

example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.rocketmq.remoting.common.RemotingHelper;
2525

2626
import java.io.UnsupportedEncodingException;
27+
import java.util.Arrays;
2728
import java.util.concurrent.ArrayBlockingQueue;
2829
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.ThreadPoolExecutor;
@@ -39,7 +40,7 @@ public class TransactionProducer {
3940

4041
public static void main(String[] args) throws MQClientException, InterruptedException {
4142
TransactionListener transactionListener = new TransactionListenerImpl();
42-
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
43+
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));
4344

4445
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
4546
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

0 commit comments

Comments
 (0)