From a49206ff302ad9aef488cccae1d75bb7ac272c0c Mon Sep 17 00:00:00 2001 From: panzhi33 Date: Thu, 16 Nov 2023 16:37:16 +0800 Subject: [PATCH] transactionProducer get the topic route before sending the message --- .../impl/producer/DefaultMQProducerImpl.java | 15 +++++ .../client/producer/DefaultMQProducer.java | 63 +++++++++++++++++++ .../producer/TransactionMQProducer.java | 23 +++++-- .../transaction/TransactionProducer.java | 3 +- 4 files changed, 98 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index b0c212e46b6..566d7439ed4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -263,6 +263,8 @@ public void start(final boolean startFactory) throws MQClientException { mQClientFactory.start(); } + this.initTopicRoute(); + this.mqFaultStrategy.startDetector(); log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), @@ -1741,6 +1743,19 @@ private void prepareSendRequest(final Message msg, long timeout) { } } + private void initTopicRoute() { + List topics = this.defaultMQProducer.getTopics(); + if (topics != null && topics.size() > 0) { + topics.forEach(topic -> { + String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic); + TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic); + if (topicPublishInfo == null || !topicPublishInfo.ok()) { + log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO)); + } + }); + } + } + public ConcurrentMap getTopicPublishInfoTable() { return topicPublishInfoTable; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index c5b1b52230a..530b0c07d3e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ private String producerGroup; + /** + * Topics that need to be initialized for transaction producer + */ + private List topics; + /** * Just for testing or demo program */ @@ -235,6 +240,22 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); } + /** + * Constructor specifying namespace, producer group, topics and RPC hook. + * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + * @param topics Topic that needs to be initialized for routing + * @param rpcHook RPC hook to execute per each remoting command execution. + */ + public DefaultMQProducer(final String namespace, final String producerGroup, final List topics, RPCHook rpcHook) { + this.namespace = namespace; + this.producerGroup = producerGroup; + this.topics = topics; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + } + /** * Constructor specifying producer group and enabled msg trace flag. * @@ -290,6 +311,41 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC } } + /** + * Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic + * name. + * + * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + * @param topics Topic that needs to be initialized for routing + * @param rpcHook RPC hook to execute per each remoting command execution. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + public DefaultMQProducer(final String namespace, final String producerGroup, final List topics, + RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { + this.namespace = namespace; + this.producerGroup = producerGroup; + this.topics = topics; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + //if client open the message trace feature + if (enableMsgTrace) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); + dispatcher.setHostProducer(this.defaultMQProducerImpl); + traceDispatcher = dispatcher; + this.defaultMQProducerImpl.registerSendMessageHook( + new SendMessageTraceHookImpl(traceDispatcher)); + this.defaultMQProducerImpl.registerEndTransactionHook( + new EndTransactionTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + logger.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } + } + @Override public void setUseTLS(boolean useTLS) { super.setUseTLS(useTLS); @@ -1332,4 +1388,11 @@ public void setBackPressureForAsyncSendSize(int backPressureForAsyncSendSize) { defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize); } + public List getTopics() { + return topics; + } + + public void setTopics(List topics) { + this.topics = topics; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index baa8b440805..15df1ee06ac 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.producer; +import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; @@ -36,19 +37,31 @@ public TransactionMQProducer() { } public TransactionMQProducer(final String producerGroup) { - this(null, producerGroup, null); + this(null, producerGroup, null, null); + } + + public TransactionMQProducer(final String producerGroup, final List topics) { + this(null, producerGroup, topics, null); } public TransactionMQProducer(final String namespace, final String producerGroup) { - this(namespace, producerGroup, null); + this(namespace, producerGroup, null, null); + } + + public TransactionMQProducer(final String namespace, final String producerGroup, final List topics) { + this(namespace, producerGroup, topics, null); } public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { - this(null, producerGroup, rpcHook); + this(null, producerGroup, null, rpcHook); + } + + public TransactionMQProducer(final String producerGroup, final List topics, RPCHook rpcHook) { + this(null, producerGroup, topics, rpcHook); } - public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { - super(namespace, producerGroup, rpcHook); + public TransactionMQProducer(final String namespace, final String producerGroup, final List topics, RPCHook rpcHook) { + super(namespace, producerGroup, topics, rpcHook); } public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java index 5973c3c306c..d1d57c55ef6 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; +import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -39,7 +40,7 @@ public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); - TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP); + TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC)); // Uncomment the following line while debugging, namesrvAddr should be set to your local address // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);