Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7567] transactionProducer get the topic route before sending the message #7569

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ public void start(final boolean startFactory) throws MQClientException {
mQClientFactory.start();
}

this.initTopicRoute();

this.mqFaultStrategy.startDetector();

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
Expand Down Expand Up @@ -1741,6 +1743,19 @@ private void prepareSendRequest(final Message msg, long timeout) {
}
}

private void initTopicRoute() {
List<String> topics = this.defaultMQProducer.getTopics();
if (topics != null && topics.size() > 0) {
topics.forEach(topic -> {
String newTopic = NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic);
TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(newTopic);
if (topicPublishInfo == null || !topicPublishInfo.ok()) {
log.warn("No route info of this topic: " + newTopic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO));
}
});
}
}

public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private String producerGroup;

/**
* Topics that need to be initialized for transaction producer
*/
private List<String> topics;

/**
* Just for testing or demo program
*/
Expand Down Expand Up @@ -235,6 +240,22 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}

/**
* Constructor specifying namespace, producer group, topics and RPC hook.
*
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
* @param topics Topic that needs to be initialized for routing
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
this.topics = topics;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}

/**
* Constructor specifying producer group and enabled msg trace flag.
*
Expand Down Expand Up @@ -290,6 +311,41 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
}
}

/**
* Constructor specifying namespace, producer group, topics, RPC hook, enabled msgTrace flag and customized trace topic
* name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
* @param topics Topic that needs to be initialized for routing
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, final List<String> topics,
RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
this.topics = topics;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}

@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
Expand Down Expand Up @@ -1332,4 +1388,11 @@ public void setBackPressureForAsyncSendSize(int backPressureForAsyncSendSize) {
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
}

public List<String> getTopics() {
return topics;
}

public void setTopics(List<String> topics) {
this.topics = topics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.producer;

import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
Expand All @@ -36,19 +37,31 @@ public TransactionMQProducer() {
}

public TransactionMQProducer(final String producerGroup) {
this(null, producerGroup, null);
this(null, producerGroup, null, null);
}

public TransactionMQProducer(final String producerGroup, final List<String> topics) {
this(null, producerGroup, topics, null);
}

public TransactionMQProducer(final String namespace, final String producerGroup) {
this(namespace, producerGroup, null);
this(namespace, producerGroup, null, null);
}

public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics) {
this(namespace, producerGroup, topics, null);
}

public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
this(null, producerGroup, rpcHook);
this(null, producerGroup, null, rpcHook);
}

public TransactionMQProducer(final String producerGroup, final List<String> topics, RPCHook rpcHook) {
this(null, producerGroup, topics, rpcHook);
}

public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
super(namespace, producerGroup, rpcHook);
public TransactionMQProducer(final String namespace, final String producerGroup, final List<String> topics, RPCHook rpcHook) {
super(namespace, producerGroup, topics, rpcHook);
}

public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -39,7 +40,7 @@ public class TransactionProducer {

public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));

// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
Expand Down