Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.activemq.perf;

import java.util.concurrent.TimeUnit;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.MapMessage;
Expand All @@ -39,7 +41,11 @@
public class InactiveDurableTopicTest extends TestCase {
private static final transient Logger LOG = LoggerFactory.getLogger(InactiveDurableTopicTest.class);

private static final int MESSAGE_COUNT = 2000;
/**
* Keep the payload small so that the test completes quickly but still
* exercises durable subscription behaviour.
*/
private static final int MESSAGE_COUNT = 500;
private static final String DEFAULT_PASSWORD = "";
private static final String USERNAME = "testuser";
private static final String CLIENTID = "mytestclient";
Expand All @@ -55,21 +61,28 @@ public class InactiveDurableTopicTest extends TestCase {
private ActiveMQConnectionFactory connectionFactory;
private BrokerService broker;

private static final int SEND_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);
private static final long SEND_LOOP_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(2);
private static final long RECEIVE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final String BROKER_NAME = "inactiveDurableTopicTest";

@Override
protected void setUp() throws Exception {
super.setUp();
broker = new BrokerService();

//broker.setPersistenceAdapter(new KahaPersistenceAdapter());
broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
broker.setUseJmx(false);
broker.setBrokerName(BROKER_NAME);
// broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
broker.start();
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
// connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
connectionFactory = new ActiveMQConnectionFactory("vm://" + BROKER_NAME);
/*
* Doesn't matter if you enable or disable these, so just leaving them
* out for this test case connectionFactory.setAlwaysSessionAsync(true);
* connectionFactory.setAsyncDispatch(true);
*/
connectionFactory.setUseAsyncSend(true);
connectionFactory.setSendTimeout(SEND_TIMEOUT_MILLIS);
}

@Override
Expand Down Expand Up @@ -124,9 +137,13 @@ public void test2ProducerTestCase() {
assertNotNull(msg);
msg.setString("key1", "value1");
int loop;
long start = System.currentTimeMillis();
for (loop = 0; loop < MESSAGE_COUNT; loop++) {
msg.setInt("key2", loop);
publisher.send(msg, DELIVERY_MODE, DELIVERY_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
if (System.currentTimeMillis() - start > SEND_LOOP_TIMEOUT_MILLIS) {
throw new AssertionFailedError("Timed out sending messages at loop: " + loop);
}
if (loop % 5000 == 0) {
LOG.info("Sent " + loop + " messages");
}
Expand Down Expand Up @@ -163,7 +180,10 @@ public void test3CreateSubscription() throws Exception {
assertNotNull(subscriber);
int loop;
for (loop = 0; loop < MESSAGE_COUNT; loop++) {
subscriber.receive();
Message message = subscriber.receive(RECEIVE_TIMEOUT_MILLIS);
if (message == null) {
throw new AssertionFailedError("Timed out waiting for message " + loop);
}
if (loop % 500 == 0) {
LOG.debug("Received " + loop + " messages");
}
Expand Down